You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

303 lines
7.2 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "math/rand"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/require"
  10. "github.com/tendermint/tendermint/internal/test/factory"
  11. "github.com/tendermint/tendermint/types"
  12. )
  // Shared fixtures for the block queue tests in this file.

  // startHeight and stopHeight are handed to newBlockQueue as its first two
  // arguments; the tests count down from startHeight and expect to end up
  // below stopHeight.
  var startHeight, stopHeight int64 = 200, 100

  // stopTime is the time bound handed to newBlockQueue.
  var stopTime time.Time = time.Date(2019, 1, 1, 1, 0, 0, 0, time.UTC)

  // endTime is one second before stopTime; most tests stamp every mocked block
  // with it.
  var endTime time.Time = stopTime.Add(-time.Second)

  // numWorkers bounds the worker loops. Those loops iterate with
  // `i <= numWorkers`, so numWorkers+1 goroutines are started.
  var numWorkers int = 1
  20. func TestBlockQueueBasic(t *testing.T) {
  21. ctx, cancel := context.WithCancel(context.Background())
  22. defer cancel()
  23. peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
  24. require.NoError(t, err)
  25. queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1)
  26. wg := &sync.WaitGroup{}
  27. // asynchronously fetch blocks and add it to the queue
  28. for i := 0; i <= numWorkers; i++ {
  29. wg.Add(1)
  30. go func() {
  31. for {
  32. select {
  33. case height := <-queue.nextHeight():
  34. queue.add(mockLBResp(ctx, t, peerID, height, endTime))
  35. case <-queue.done():
  36. wg.Done()
  37. return
  38. }
  39. }
  40. }()
  41. }
  42. trackingHeight := startHeight
  43. wg.Add(1)
  44. loop:
  45. for {
  46. select {
  47. case <-queue.done():
  48. wg.Done()
  49. break loop
  50. case resp := <-queue.verifyNext():
  51. // assert that the queue serializes the blocks
  52. require.Equal(t, resp.block.Height, trackingHeight)
  53. trackingHeight--
  54. queue.success()
  55. }
  56. }
  57. wg.Wait()
  58. assert.Less(t, trackingHeight, stopHeight)
  59. }
  60. // Test with spurious failures and retries
  61. func TestBlockQueueWithFailures(t *testing.T) {
  62. ctx, cancel := context.WithCancel(context.Background())
  63. defer cancel()
  64. peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
  65. require.NoError(t, err)
  66. queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 200)
  67. wg := &sync.WaitGroup{}
  68. failureRate := 4
  69. for i := 0; i <= numWorkers; i++ {
  70. wg.Add(1)
  71. go func() {
  72. for {
  73. select {
  74. case height := <-queue.nextHeight():
  75. if rand.Intn(failureRate) == 0 {
  76. queue.retry(height)
  77. } else {
  78. queue.add(mockLBResp(ctx, t, peerID, height, endTime))
  79. }
  80. case <-queue.done():
  81. wg.Done()
  82. return
  83. }
  84. }
  85. }()
  86. }
  87. trackingHeight := startHeight
  88. for {
  89. select {
  90. case resp := <-queue.verifyNext():
  91. // assert that the queue serializes the blocks
  92. assert.Equal(t, resp.block.Height, trackingHeight)
  93. if rand.Intn(failureRate) == 0 {
  94. queue.retry(resp.block.Height)
  95. } else {
  96. trackingHeight--
  97. queue.success()
  98. }
  99. case <-queue.done():
  100. wg.Wait()
  101. assert.Less(t, trackingHeight, stopHeight)
  102. return
  103. }
  104. }
  105. }
  106. // Test that when all the blocks are retrieved that the queue still holds on to
  107. // it's workers and in the event of failure can still fetch the failed block
  108. func TestBlockQueueBlocks(t *testing.T) {
  109. peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
  110. require.NoError(t, err)
  111. queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 2)
  112. expectedHeight := startHeight
  113. retryHeight := stopHeight + 2
  114. ctx, cancel := context.WithCancel(context.Background())
  115. defer cancel()
  116. loop:
  117. for {
  118. select {
  119. case height := <-queue.nextHeight():
  120. require.Equal(t, height, expectedHeight)
  121. require.GreaterOrEqual(t, height, stopHeight)
  122. expectedHeight--
  123. queue.add(mockLBResp(ctx, t, peerID, height, endTime))
  124. case <-time.After(1 * time.Second):
  125. if expectedHeight >= stopHeight {
  126. t.Fatalf("expected next height %d", expectedHeight)
  127. }
  128. break loop
  129. }
  130. }
  131. // close any waiter channels that the previous worker left hanging
  132. for _, ch := range queue.waiters {
  133. close(ch)
  134. }
  135. queue.waiters = make([]chan int64, 0)
  136. wg := &sync.WaitGroup{}
  137. wg.Add(1)
  138. // so far so good. The worker is waiting. Now we fail a previous
  139. // block and check that the worker fetches them
  140. go func(t *testing.T) {
  141. defer wg.Done()
  142. select {
  143. case height := <-queue.nextHeight():
  144. require.Equal(t, retryHeight, height)
  145. case <-time.After(1 * time.Second):
  146. require.Fail(t, "queue didn't ask worker to fetch failed height")
  147. }
  148. }(t)
  149. queue.retry(retryHeight)
  150. wg.Wait()
  151. }
  152. func TestBlockQueueAcceptsNoMoreBlocks(t *testing.T) {
  153. peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
  154. require.NoError(t, err)
  155. queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1)
  156. defer queue.close()
  157. ctx, cancel := context.WithCancel(context.Background())
  158. defer cancel()
  159. loop:
  160. for {
  161. select {
  162. case height := <-queue.nextHeight():
  163. require.GreaterOrEqual(t, height, stopHeight)
  164. queue.add(mockLBResp(ctx, t, peerID, height, endTime))
  165. case <-time.After(1 * time.Second):
  166. break loop
  167. }
  168. }
  169. require.Len(t, queue.pending, int(startHeight-stopHeight)+1)
  170. queue.add(mockLBResp(ctx, t, peerID, stopHeight-1, endTime))
  171. require.Len(t, queue.pending, int(startHeight-stopHeight)+1)
  172. }
  173. // Test a scenario where more blocks are needed then just the stopheight because
  174. // we haven't found a block with a small enough time.
  175. func TestBlockQueueStopTime(t *testing.T) {
  176. peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
  177. require.NoError(t, err)
  178. queue := newBlockQueue(startHeight, stopHeight, 1, stopTime, 1)
  179. wg := &sync.WaitGroup{}
  180. ctx, cancel := context.WithCancel(context.Background())
  181. defer cancel()
  182. baseTime := stopTime.Add(-50 * time.Second)
  183. // asynchronously fetch blocks and add it to the queue
  184. for i := 0; i <= numWorkers; i++ {
  185. wg.Add(1)
  186. go func() {
  187. for {
  188. select {
  189. case height := <-queue.nextHeight():
  190. blockTime := baseTime.Add(time.Duration(height) * time.Second)
  191. queue.add(mockLBResp(ctx, t, peerID, height, blockTime))
  192. case <-queue.done():
  193. wg.Done()
  194. return
  195. }
  196. }
  197. }()
  198. }
  199. trackingHeight := startHeight
  200. for {
  201. select {
  202. case resp := <-queue.verifyNext():
  203. // assert that the queue serializes the blocks
  204. assert.Equal(t, resp.block.Height, trackingHeight)
  205. trackingHeight--
  206. queue.success()
  207. case <-queue.done():
  208. wg.Wait()
  209. assert.Less(t, trackingHeight, stopHeight-50)
  210. return
  211. }
  212. }
  213. }
  214. func TestBlockQueueInitialHeight(t *testing.T) {
  215. peerID, err := types.NewNodeID("0011223344556677889900112233445566778899")
  216. require.NoError(t, err)
  217. const initialHeight int64 = 120
  218. queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, 1)
  219. wg := &sync.WaitGroup{}
  220. ctx, cancel := context.WithCancel(context.Background())
  221. defer cancel()
  222. // asynchronously fetch blocks and add it to the queue
  223. for i := 0; i <= numWorkers; i++ {
  224. wg.Add(1)
  225. go func() {
  226. for {
  227. select {
  228. case height := <-queue.nextHeight():
  229. require.GreaterOrEqual(t, height, initialHeight)
  230. queue.add(mockLBResp(ctx, t, peerID, height, endTime))
  231. case <-queue.done():
  232. wg.Done()
  233. return
  234. }
  235. }
  236. }()
  237. }
  238. loop:
  239. for {
  240. select {
  241. case <-queue.done():
  242. wg.Wait()
  243. require.NoError(t, queue.error())
  244. break loop
  245. case resp := <-queue.verifyNext():
  246. require.GreaterOrEqual(t, resp.block.Height, initialHeight)
  247. queue.success()
  248. }
  249. }
  250. }
  251. func mockLBResp(ctx context.Context, t *testing.T, peer types.NodeID, height int64, time time.Time) lightBlockResponse {
  252. t.Helper()
  253. vals, pv := factory.ValidatorSet(ctx, t, 3, 10)
  254. _, _, lb := mockLB(ctx, t, height, time, factory.MakeBlockID(), vals, pv)
  255. return lightBlockResponse{
  256. block: lb,
  257. peer: peer,
  258. }
  259. }