You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

824 lines
24 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/fortytw2/leaktest"
  10. "github.com/stretchr/testify/mock"
  11. "github.com/stretchr/testify/require"
  12. dbm "github.com/tendermint/tm-db"
  13. abci "github.com/tendermint/tendermint/abci/types"
  14. "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/internal/p2p"
  16. "github.com/tendermint/tendermint/internal/statesync/mocks"
  17. "github.com/tendermint/tendermint/internal/test/factory"
  18. "github.com/tendermint/tendermint/libs/log"
  19. "github.com/tendermint/tendermint/light/provider"
  20. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  21. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  22. "github.com/tendermint/tendermint/proxy"
  23. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  24. smmocks "github.com/tendermint/tendermint/state/mocks"
  25. "github.com/tendermint/tendermint/store"
  26. "github.com/tendermint/tendermint/types"
  27. )
  28. type reactorTestSuite struct {
  29. reactor *Reactor
  30. syncer *syncer
  31. conn *proxymocks.AppConnSnapshot
  32. connQuery *proxymocks.AppConnQuery
  33. stateProvider *mocks.StateProvider
  34. snapshotChannel *p2p.Channel
  35. snapshotInCh chan p2p.Envelope
  36. snapshotOutCh chan p2p.Envelope
  37. snapshotPeerErrCh chan p2p.PeerError
  38. chunkChannel *p2p.Channel
  39. chunkInCh chan p2p.Envelope
  40. chunkOutCh chan p2p.Envelope
  41. chunkPeerErrCh chan p2p.PeerError
  42. blockChannel *p2p.Channel
  43. blockInCh chan p2p.Envelope
  44. blockOutCh chan p2p.Envelope
  45. blockPeerErrCh chan p2p.PeerError
  46. paramsChannel *p2p.Channel
  47. paramsInCh chan p2p.Envelope
  48. paramsOutCh chan p2p.Envelope
  49. paramsPeerErrCh chan p2p.PeerError
  50. peerUpdateCh chan p2p.PeerUpdate
  51. peerUpdates *p2p.PeerUpdates
  52. stateStore *smmocks.Store
  53. blockStore *store.BlockStore
  54. }
  55. func setup(
  56. t *testing.T,
  57. conn *proxymocks.AppConnSnapshot,
  58. connQuery *proxymocks.AppConnQuery,
  59. stateProvider *mocks.StateProvider,
  60. chBuf uint,
  61. ) *reactorTestSuite {
  62. t.Helper()
  63. if conn == nil {
  64. conn = &proxymocks.AppConnSnapshot{}
  65. }
  66. if connQuery == nil {
  67. connQuery = &proxymocks.AppConnQuery{}
  68. }
  69. if stateProvider == nil {
  70. stateProvider = &mocks.StateProvider{}
  71. }
  72. rts := &reactorTestSuite{
  73. snapshotInCh: make(chan p2p.Envelope, chBuf),
  74. snapshotOutCh: make(chan p2p.Envelope, chBuf),
  75. snapshotPeerErrCh: make(chan p2p.PeerError, chBuf),
  76. chunkInCh: make(chan p2p.Envelope, chBuf),
  77. chunkOutCh: make(chan p2p.Envelope, chBuf),
  78. chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
  79. blockInCh: make(chan p2p.Envelope, chBuf),
  80. blockOutCh: make(chan p2p.Envelope, chBuf),
  81. blockPeerErrCh: make(chan p2p.PeerError, chBuf),
  82. paramsInCh: make(chan p2p.Envelope, chBuf),
  83. paramsOutCh: make(chan p2p.Envelope, chBuf),
  84. paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
  85. conn: conn,
  86. connQuery: connQuery,
  87. stateProvider: stateProvider,
  88. }
  89. rts.peerUpdateCh = make(chan p2p.PeerUpdate, chBuf)
  90. rts.peerUpdates = p2p.NewPeerUpdates(rts.peerUpdateCh, int(chBuf))
  91. rts.snapshotChannel = p2p.NewChannel(
  92. SnapshotChannel,
  93. new(ssproto.Message),
  94. rts.snapshotInCh,
  95. rts.snapshotOutCh,
  96. rts.snapshotPeerErrCh,
  97. )
  98. rts.chunkChannel = p2p.NewChannel(
  99. ChunkChannel,
  100. new(ssproto.Message),
  101. rts.chunkInCh,
  102. rts.chunkOutCh,
  103. rts.chunkPeerErrCh,
  104. )
  105. rts.blockChannel = p2p.NewChannel(
  106. LightBlockChannel,
  107. new(ssproto.Message),
  108. rts.blockInCh,
  109. rts.blockOutCh,
  110. rts.blockPeerErrCh,
  111. )
  112. rts.paramsChannel = p2p.NewChannel(
  113. ParamsChannel,
  114. new(ssproto.Message),
  115. rts.paramsInCh,
  116. rts.paramsOutCh,
  117. rts.paramsPeerErrCh,
  118. )
  119. rts.stateStore = &smmocks.Store{}
  120. rts.blockStore = store.NewBlockStore(dbm.NewMemDB())
  121. cfg := config.DefaultStateSyncConfig()
  122. rts.reactor = NewReactor(
  123. factory.DefaultTestChainID,
  124. 1,
  125. *cfg,
  126. log.TestingLogger(),
  127. conn,
  128. connQuery,
  129. rts.snapshotChannel,
  130. rts.chunkChannel,
  131. rts.blockChannel,
  132. rts.paramsChannel,
  133. rts.peerUpdates,
  134. rts.stateStore,
  135. rts.blockStore,
  136. "",
  137. )
  138. rts.syncer = newSyncer(
  139. *cfg,
  140. log.NewNopLogger(),
  141. conn,
  142. connQuery,
  143. stateProvider,
  144. rts.snapshotOutCh,
  145. rts.chunkOutCh,
  146. "",
  147. )
  148. require.NoError(t, rts.reactor.Start())
  149. require.True(t, rts.reactor.IsRunning())
  150. t.Cleanup(func() {
  151. require.NoError(t, rts.reactor.Stop())
  152. require.False(t, rts.reactor.IsRunning())
  153. })
  154. return rts
  155. }
  156. func TestReactor_Sync(t *testing.T) {
  157. const snapshotHeight = 7
  158. rts := setup(t, nil, nil, nil, 2)
  159. chain := buildLightBlockChain(t, 1, 10, time.Now())
  160. // app accepts any snapshot
  161. rts.conn.On("OfferSnapshotSync", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
  162. Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  163. // app accepts every chunk
  164. rts.conn.On("ApplySnapshotChunkSync", ctx, mock.AnythingOfType("types.RequestApplySnapshotChunk")).
  165. Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  166. // app query returns valid state app hash
  167. rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(&abci.ResponseInfo{
  168. AppVersion: 9,
  169. LastBlockHeight: snapshotHeight,
  170. LastBlockAppHash: chain[snapshotHeight+1].AppHash,
  171. }, nil)
  172. // store accepts state and validator sets
  173. rts.stateStore.On("Bootstrap", mock.AnythingOfType("state.State")).Return(nil)
  174. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  175. mock.AnythingOfType("*types.ValidatorSet")).Return(nil)
  176. closeCh := make(chan struct{})
  177. defer close(closeCh)
  178. go handleLightBlockRequests(t, chain, rts.blockOutCh,
  179. rts.blockInCh, closeCh, 0)
  180. go graduallyAddPeers(rts.peerUpdateCh, closeCh, 1*time.Second)
  181. go handleSnapshotRequests(t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
  182. {
  183. Height: uint64(snapshotHeight),
  184. Format: 1,
  185. Chunks: 1,
  186. },
  187. })
  188. go handleChunkRequests(t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
  189. go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)
  190. // update the config to use the p2p provider
  191. rts.reactor.cfg.UseP2P = true
  192. rts.reactor.cfg.TrustHeight = 1
  193. rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
  194. rts.reactor.cfg.DiscoveryTime = 1 * time.Second
  195. // Run state sync
  196. _, err := rts.reactor.Sync(context.Background())
  197. require.NoError(t, err)
  198. }
  199. func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
  200. rts := setup(t, nil, nil, nil, 2)
  201. rts.chunkInCh <- p2p.Envelope{
  202. From: types.NodeID("aa"),
  203. Message: &ssproto.SnapshotsRequest{},
  204. }
  205. response := <-rts.chunkPeerErrCh
  206. require.Error(t, response.Err)
  207. require.Empty(t, rts.chunkOutCh)
  208. require.Contains(t, response.Err.Error(), "received unknown message")
  209. require.Equal(t, types.NodeID("aa"), response.NodeID)
  210. }
  211. func TestReactor_ChunkRequest(t *testing.T) {
  212. testcases := map[string]struct {
  213. request *ssproto.ChunkRequest
  214. chunk []byte
  215. expectResponse *ssproto.ChunkResponse
  216. }{
  217. "chunk is returned": {
  218. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  219. []byte{1, 2, 3},
  220. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}},
  221. },
  222. "empty chunk is returned, as empty": {
  223. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  224. []byte{},
  225. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{}},
  226. },
  227. "nil (missing) chunk is returned as missing": {
  228. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  229. nil,
  230. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  231. },
  232. "invalid request": {
  233. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  234. nil,
  235. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  236. },
  237. }
  238. for name, tc := range testcases {
  239. tc := tc
  240. t.Run(name, func(t *testing.T) {
  241. // mock ABCI connection to return local snapshots
  242. conn := &proxymocks.AppConnSnapshot{}
  243. conn.On("LoadSnapshotChunkSync", context.Background(), abci.RequestLoadSnapshotChunk{
  244. Height: tc.request.Height,
  245. Format: tc.request.Format,
  246. Chunk: tc.request.Index,
  247. }).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
  248. rts := setup(t, conn, nil, nil, 2)
  249. rts.chunkInCh <- p2p.Envelope{
  250. From: types.NodeID("aa"),
  251. Message: tc.request,
  252. }
  253. response := <-rts.chunkOutCh
  254. require.Equal(t, tc.expectResponse, response.Message)
  255. require.Empty(t, rts.chunkOutCh)
  256. conn.AssertExpectations(t)
  257. })
  258. }
  259. }
  260. func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
  261. rts := setup(t, nil, nil, nil, 2)
  262. rts.snapshotInCh <- p2p.Envelope{
  263. From: types.NodeID("aa"),
  264. Message: &ssproto.ChunkRequest{},
  265. }
  266. response := <-rts.snapshotPeerErrCh
  267. require.Error(t, response.Err)
  268. require.Empty(t, rts.snapshotOutCh)
  269. require.Contains(t, response.Err.Error(), "received unknown message")
  270. require.Equal(t, types.NodeID("aa"), response.NodeID)
  271. }
  272. func TestReactor_SnapshotsRequest(t *testing.T) {
  273. testcases := map[string]struct {
  274. snapshots []*abci.Snapshot
  275. expectResponses []*ssproto.SnapshotsResponse
  276. }{
  277. "no snapshots": {nil, []*ssproto.SnapshotsResponse{}},
  278. ">10 unordered snapshots": {
  279. []*abci.Snapshot{
  280. {Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1}},
  281. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  282. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  283. {Height: 1, Format: 1, Chunks: 7, Hash: []byte{1, 1}, Metadata: []byte{4}},
  284. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  285. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  286. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  287. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  288. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  289. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  290. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  291. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  292. },
  293. []*ssproto.SnapshotsResponse{
  294. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  295. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  296. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  297. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  298. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  299. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  300. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  301. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  302. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  303. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  304. },
  305. },
  306. }
  307. for name, tc := range testcases {
  308. tc := tc
  309. t.Run(name, func(t *testing.T) {
  310. // mock ABCI connection to return local snapshots
  311. conn := &proxymocks.AppConnSnapshot{}
  312. conn.On("ListSnapshotsSync", context.Background(), abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
  313. Snapshots: tc.snapshots,
  314. }, nil)
  315. rts := setup(t, conn, nil, nil, 100)
  316. rts.snapshotInCh <- p2p.Envelope{
  317. From: types.NodeID("aa"),
  318. Message: &ssproto.SnapshotsRequest{},
  319. }
  320. if len(tc.expectResponses) > 0 {
  321. retryUntil(t, func() bool { return len(rts.snapshotOutCh) == len(tc.expectResponses) }, time.Second)
  322. }
  323. responses := make([]*ssproto.SnapshotsResponse, len(tc.expectResponses))
  324. for i := 0; i < len(tc.expectResponses); i++ {
  325. e := <-rts.snapshotOutCh
  326. responses[i] = e.Message.(*ssproto.SnapshotsResponse)
  327. }
  328. require.Equal(t, tc.expectResponses, responses)
  329. require.Empty(t, rts.snapshotOutCh)
  330. })
  331. }
  332. }
  333. func TestReactor_LightBlockResponse(t *testing.T) {
  334. rts := setup(t, nil, nil, nil, 2)
  335. var height int64 = 10
  336. h := factory.MakeRandomHeader()
  337. h.Height = height
  338. blockID := factory.MakeBlockIDWithHash(h.Hash())
  339. vals, pv := factory.RandValidatorSet(1, 10)
  340. vote, err := factory.MakeVote(pv[0], h.ChainID, 0, h.Height, 0, 2,
  341. blockID, factory.DefaultTestTime)
  342. require.NoError(t, err)
  343. sh := &types.SignedHeader{
  344. Header: h,
  345. Commit: &types.Commit{
  346. Height: h.Height,
  347. BlockID: blockID,
  348. Signatures: []types.CommitSig{
  349. vote.CommitSig(),
  350. },
  351. },
  352. }
  353. lb := &types.LightBlock{
  354. SignedHeader: sh,
  355. ValidatorSet: vals,
  356. }
  357. require.NoError(t, rts.blockStore.SaveSignedHeader(sh, blockID))
  358. rts.stateStore.On("LoadValidators", height).Return(vals, nil)
  359. rts.blockInCh <- p2p.Envelope{
  360. From: types.NodeID("aa"),
  361. Message: &ssproto.LightBlockRequest{
  362. Height: 10,
  363. },
  364. }
  365. require.Empty(t, rts.blockPeerErrCh)
  366. select {
  367. case response := <-rts.blockOutCh:
  368. require.Equal(t, types.NodeID("aa"), response.To)
  369. res, ok := response.Message.(*ssproto.LightBlockResponse)
  370. require.True(t, ok)
  371. receivedLB, err := types.LightBlockFromProto(res.LightBlock)
  372. require.NoError(t, err)
  373. require.Equal(t, lb, receivedLB)
  374. case <-time.After(1 * time.Second):
  375. t.Fatal("expected light block response")
  376. }
  377. }
  378. func TestReactor_BlockProviders(t *testing.T) {
  379. rts := setup(t, nil, nil, nil, 2)
  380. rts.peerUpdateCh <- p2p.PeerUpdate{
  381. NodeID: types.NodeID("aa"),
  382. Status: p2p.PeerStatusUp,
  383. }
  384. rts.peerUpdateCh <- p2p.PeerUpdate{
  385. NodeID: types.NodeID("bb"),
  386. Status: p2p.PeerStatusUp,
  387. }
  388. closeCh := make(chan struct{})
  389. defer close(closeCh)
  390. chain := buildLightBlockChain(t, 1, 10, time.Now())
  391. go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  392. peers := rts.reactor.peers.All()
  393. require.Len(t, peers, 2)
  394. providers := make([]provider.Provider, len(peers))
  395. for idx, peer := range peers {
  396. providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
  397. }
  398. wg := sync.WaitGroup{}
  399. for _, p := range providers {
  400. wg.Add(1)
  401. go func(t *testing.T, p provider.Provider) {
  402. defer wg.Done()
  403. for height := 2; height < 10; height++ {
  404. lb, err := p.LightBlock(context.Background(), int64(height))
  405. require.NoError(t, err)
  406. require.NotNil(t, lb)
  407. require.Equal(t, height, int(lb.Height))
  408. }
  409. }(t, p)
  410. }
  411. ctx, cancel := context.WithCancel(context.Background())
  412. go func() { wg.Wait(); cancel() }()
  413. select {
  414. case <-time.After(time.Second):
  415. // not all of the requests to the dispatcher were responded to
  416. // within the timeout
  417. t.Fail()
  418. case <-ctx.Done():
  419. }
  420. }
  421. func TestReactor_StateProviderP2P(t *testing.T) {
  422. rts := setup(t, nil, nil, nil, 2)
  423. // make syncer non nil else test won't think we are state syncing
  424. rts.reactor.syncer = rts.syncer
  425. peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
  426. peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
  427. rts.peerUpdateCh <- p2p.PeerUpdate{
  428. NodeID: peerA,
  429. Status: p2p.PeerStatusUp,
  430. }
  431. rts.peerUpdateCh <- p2p.PeerUpdate{
  432. NodeID: peerB,
  433. Status: p2p.PeerStatusUp,
  434. }
  435. closeCh := make(chan struct{})
  436. defer close(closeCh)
  437. chain := buildLightBlockChain(t, 1, 10, time.Now())
  438. go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  439. go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)
  440. rts.reactor.cfg.UseP2P = true
  441. rts.reactor.cfg.TrustHeight = 1
  442. rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
  443. ctx := context.Background()
  444. rts.reactor.mtx.Lock()
  445. err := rts.reactor.initStateProvider(ctx, factory.DefaultTestChainID, 1)
  446. rts.reactor.mtx.Unlock()
  447. require.NoError(t, err)
  448. rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
  449. appHash, err := rts.reactor.stateProvider.AppHash(ctx, 5)
  450. require.NoError(t, err)
  451. require.Len(t, appHash, 32)
  452. state, err := rts.reactor.stateProvider.State(ctx, 5)
  453. require.NoError(t, err)
  454. require.Equal(t, appHash, state.AppHash)
  455. require.Equal(t, types.DefaultConsensusParams(), &state.ConsensusParams)
  456. commit, err := rts.reactor.stateProvider.Commit(ctx, 5)
  457. require.NoError(t, err)
  458. require.Equal(t, commit.BlockID, state.LastBlockID)
  459. added, err := rts.reactor.syncer.AddSnapshot(peerA, &snapshot{
  460. Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1},
  461. })
  462. require.NoError(t, err)
  463. require.True(t, added)
  464. }
  465. func TestReactor_Backfill(t *testing.T) {
  466. // test backfill algorithm with varying failure rates [0, 10]
  467. failureRates := []int{0, 2, 9}
  468. for _, failureRate := range failureRates {
  469. failureRate := failureRate
  470. t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) {
  471. t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
  472. rts := setup(t, nil, nil, nil, 21)
  473. var (
  474. startHeight int64 = 20
  475. stopHeight int64 = 10
  476. stopTime = time.Date(2020, 1, 1, 0, 100, 0, 0, time.UTC)
  477. )
  478. peers := []string{"a", "b", "c", "d"}
  479. for _, peer := range peers {
  480. rts.peerUpdateCh <- p2p.PeerUpdate{
  481. NodeID: types.NodeID(peer),
  482. Status: p2p.PeerStatusUp,
  483. }
  484. }
  485. trackingHeight := startHeight
  486. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  487. mock.AnythingOfType("*types.ValidatorSet")).Return(func(lh, uh int64, vals *types.ValidatorSet) error {
  488. require.Equal(t, trackingHeight, lh)
  489. require.Equal(t, lh, uh)
  490. require.GreaterOrEqual(t, lh, stopHeight)
  491. trackingHeight--
  492. return nil
  493. })
  494. chain := buildLightBlockChain(t, stopHeight-1, startHeight+1, stopTime)
  495. closeCh := make(chan struct{})
  496. defer close(closeCh)
  497. go handleLightBlockRequests(t, chain, rts.blockOutCh,
  498. rts.blockInCh, closeCh, failureRate)
  499. err := rts.reactor.backfill(
  500. context.Background(),
  501. factory.DefaultTestChainID,
  502. startHeight,
  503. stopHeight,
  504. 1,
  505. factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
  506. stopTime,
  507. )
  508. if failureRate > 3 {
  509. require.Error(t, err)
  510. } else {
  511. require.NoError(t, err)
  512. for height := startHeight; height <= stopHeight; height++ {
  513. blockMeta := rts.blockStore.LoadBlockMeta(height)
  514. require.NotNil(t, blockMeta)
  515. }
  516. require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1))
  517. require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1))
  518. }
  519. })
  520. }
  521. }
  522. // retryUntil will continue to evaluate fn and will return successfully when true
  523. // or fail when the timeout is reached.
  524. func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) {
  525. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  526. defer cancel()
  527. for {
  528. if fn() {
  529. return
  530. }
  531. require.NoError(t, ctx.Err())
  532. }
  533. }
  534. func handleLightBlockRequests(t *testing.T,
  535. chain map[int64]*types.LightBlock,
  536. receiving chan p2p.Envelope,
  537. sending chan p2p.Envelope,
  538. close chan struct{},
  539. failureRate int) {
  540. requests := 0
  541. errorCount := 0
  542. for {
  543. select {
  544. case envelope := <-receiving:
  545. if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok {
  546. if requests%10 >= failureRate {
  547. lb, err := chain[int64(msg.Height)].ToProto()
  548. require.NoError(t, err)
  549. sending <- p2p.Envelope{
  550. From: envelope.To,
  551. Message: &ssproto.LightBlockResponse{
  552. LightBlock: lb,
  553. },
  554. }
  555. } else {
  556. switch errorCount % 3 {
  557. case 0: // send a different block
  558. vals, pv := factory.RandValidatorSet(3, 10)
  559. _, _, lb := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID(), vals, pv)
  560. differntLB, err := lb.ToProto()
  561. require.NoError(t, err)
  562. sending <- p2p.Envelope{
  563. From: envelope.To,
  564. Message: &ssproto.LightBlockResponse{
  565. LightBlock: differntLB,
  566. },
  567. }
  568. case 1: // send nil block i.e. pretend we don't have it
  569. sending <- p2p.Envelope{
  570. From: envelope.To,
  571. Message: &ssproto.LightBlockResponse{
  572. LightBlock: nil,
  573. },
  574. }
  575. case 2: // don't do anything
  576. }
  577. errorCount++
  578. }
  579. }
  580. case <-close:
  581. return
  582. }
  583. requests++
  584. }
  585. }
  586. func handleConsensusParamsRequest(t *testing.T, receiving, sending chan p2p.Envelope, closeCh chan struct{}) {
  587. t.Helper()
  588. params := types.DefaultConsensusParams()
  589. paramsProto := params.ToProto()
  590. for {
  591. select {
  592. case envelope := <-receiving:
  593. t.Log("received consensus params request")
  594. msg, ok := envelope.Message.(*ssproto.ParamsRequest)
  595. require.True(t, ok)
  596. sending <- p2p.Envelope{
  597. From: envelope.To,
  598. Message: &ssproto.ParamsResponse{
  599. Height: msg.Height,
  600. ConsensusParams: paramsProto,
  601. },
  602. }
  603. case <-closeCh:
  604. return
  605. }
  606. }
  607. }
  608. func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
  609. chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
  610. lastBlockID := factory.MakeBlockID()
  611. blockTime := startTime.Add(time.Duration(fromHeight-toHeight) * time.Minute)
  612. vals, pv := factory.RandValidatorSet(3, 10)
  613. for height := fromHeight; height < toHeight; height++ {
  614. vals, pv, chain[height] = mockLB(t, height, blockTime, lastBlockID, vals, pv)
  615. lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
  616. blockTime = blockTime.Add(1 * time.Minute)
  617. }
  618. return chain
  619. }
  620. func mockLB(t *testing.T, height int64, time time.Time, lastBlockID types.BlockID,
  621. currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator,
  622. ) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) {
  623. header, err := factory.MakeHeader(&types.Header{
  624. Height: height,
  625. LastBlockID: lastBlockID,
  626. Time: time,
  627. })
  628. require.NoError(t, err)
  629. nextVals, nextPrivVals := factory.RandValidatorSet(3, 10)
  630. header.ValidatorsHash = currentVals.Hash()
  631. header.NextValidatorsHash = nextVals.Hash()
  632. header.ConsensusHash = types.DefaultConsensusParams().HashConsensusParams()
  633. lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
  634. voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, currentVals)
  635. commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, currentPrivVals, time)
  636. require.NoError(t, err)
  637. return nextVals, nextPrivVals, &types.LightBlock{
  638. SignedHeader: &types.SignedHeader{
  639. Header: header,
  640. Commit: commit,
  641. },
  642. ValidatorSet: currentVals,
  643. }
  644. }
  645. // graduallyAddPeers delivers a new randomly-generated peer update on peerUpdateCh once
  646. // per interval, until closeCh is closed. Each peer update is assigned a random node ID.
  647. func graduallyAddPeers(
  648. peerUpdateCh chan p2p.PeerUpdate,
  649. closeCh chan struct{},
  650. interval time.Duration,
  651. ) {
  652. ticker := time.NewTicker(interval)
  653. for {
  654. select {
  655. case <-ticker.C:
  656. peerUpdateCh <- p2p.PeerUpdate{
  657. NodeID: factory.RandomNodeID(),
  658. Status: p2p.PeerStatusUp,
  659. }
  660. case <-closeCh:
  661. return
  662. }
  663. }
  664. }
  665. func handleSnapshotRequests(
  666. t *testing.T,
  667. receivingCh chan p2p.Envelope,
  668. sendingCh chan p2p.Envelope,
  669. closeCh chan struct{},
  670. snapshots []snapshot,
  671. ) {
  672. t.Helper()
  673. for {
  674. select {
  675. case envelope := <-receivingCh:
  676. _, ok := envelope.Message.(*ssproto.SnapshotsRequest)
  677. require.True(t, ok)
  678. for _, snapshot := range snapshots {
  679. sendingCh <- p2p.Envelope{
  680. From: envelope.To,
  681. Message: &ssproto.SnapshotsResponse{
  682. Height: snapshot.Height,
  683. Format: snapshot.Format,
  684. Chunks: snapshot.Chunks,
  685. Hash: snapshot.Hash,
  686. Metadata: snapshot.Metadata,
  687. },
  688. }
  689. }
  690. case <-closeCh:
  691. return
  692. }
  693. }
  694. }
  695. func handleChunkRequests(
  696. t *testing.T,
  697. receivingCh chan p2p.Envelope,
  698. sendingCh chan p2p.Envelope,
  699. closeCh chan struct{},
  700. chunk []byte,
  701. ) {
  702. t.Helper()
  703. for {
  704. select {
  705. case envelope := <-receivingCh:
  706. msg, ok := envelope.Message.(*ssproto.ChunkRequest)
  707. require.True(t, ok)
  708. sendingCh <- p2p.Envelope{
  709. From: envelope.To,
  710. Message: &ssproto.ChunkResponse{
  711. Height: msg.Height,
  712. Format: msg.Format,
  713. Index: msg.Index,
  714. Chunk: chunk,
  715. Missing: false,
  716. },
  717. }
  718. case <-closeCh:
  719. return
  720. }
  721. }
  722. }