You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

900 lines
25 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/fortytw2/leaktest"
  10. "github.com/stretchr/testify/mock"
  11. "github.com/stretchr/testify/require"
  12. dbm "github.com/tendermint/tm-db"
  13. abci "github.com/tendermint/tendermint/abci/types"
  14. "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/internal/p2p"
  16. "github.com/tendermint/tendermint/internal/proxy"
  17. proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks"
  18. smmocks "github.com/tendermint/tendermint/internal/state/mocks"
  19. "github.com/tendermint/tendermint/internal/statesync/mocks"
  20. "github.com/tendermint/tendermint/internal/store"
  21. "github.com/tendermint/tendermint/internal/test/factory"
  22. "github.com/tendermint/tendermint/libs/log"
  23. "github.com/tendermint/tendermint/light/provider"
  24. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  25. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  26. "github.com/tendermint/tendermint/types"
  27. )
// m is the metrics value that setup hands to NewReactor. It is created once,
// using the instrumentation namespace of the test configuration.
var m = PrometheusMetrics(config.TestConfig().Instrumentation.Namespace)

// testAppVersion is the application version used throughout these tests:
// mockLB writes it into every generated header and TestReactor_Sync returns
// it from the mocked InfoSync call.
const testAppVersion = 9
// reactorTestSuite bundles the Reactor under test with the mocks, stores and
// raw Go channels that setup wires into it, so that tests can feed envelopes
// in and observe what comes out.
type reactorTestSuite struct {
	// reactor is the Reactor built and started by setup.
	reactor *Reactor
	// syncer is a standalone syncer built by setup from the same mocks.
	syncer *syncer

	// Mocked application connections and state provider.
	conn          *proxymocks.AppConnSnapshot
	connQuery     *proxymocks.AppConnQuery
	stateProvider *mocks.StateProvider

	// Snapshot channel and the Go channels it was constructed from.
	snapshotChannel   *p2p.Channel
	snapshotInCh      chan p2p.Envelope
	snapshotOutCh     chan p2p.Envelope
	snapshotPeerErrCh chan p2p.PeerError

	// Chunk channel and the Go channels it was constructed from.
	chunkChannel   *p2p.Channel
	chunkInCh      chan p2p.Envelope
	chunkOutCh     chan p2p.Envelope
	chunkPeerErrCh chan p2p.PeerError

	// Light block channel and the Go channels it was constructed from.
	blockChannel   *p2p.Channel
	blockInCh      chan p2p.Envelope
	blockOutCh     chan p2p.Envelope
	blockPeerErrCh chan p2p.PeerError

	// Consensus params channel and the Go channels it was constructed from.
	paramsChannel   *p2p.Channel
	paramsInCh      chan p2p.Envelope
	paramsOutCh     chan p2p.Envelope
	paramsPeerErrCh chan p2p.PeerError

	// peerUpdateCh backs peerUpdates; tests push PeerUpdate values into it.
	peerUpdateCh chan p2p.PeerUpdate
	peerUpdates  *p2p.PeerUpdates

	// stateStore is a mock; blockStore is a real store over an in-memory DB.
	stateStore *smmocks.Store
	blockStore *store.BlockStore
}
  57. func setup(
  58. ctx context.Context,
  59. t *testing.T,
  60. conn *proxymocks.AppConnSnapshot,
  61. connQuery *proxymocks.AppConnQuery,
  62. stateProvider *mocks.StateProvider,
  63. chBuf uint,
  64. ) *reactorTestSuite {
  65. t.Helper()
  66. if conn == nil {
  67. conn = &proxymocks.AppConnSnapshot{}
  68. }
  69. if connQuery == nil {
  70. connQuery = &proxymocks.AppConnQuery{}
  71. }
  72. if stateProvider == nil {
  73. stateProvider = &mocks.StateProvider{}
  74. }
  75. rts := &reactorTestSuite{
  76. snapshotInCh: make(chan p2p.Envelope, chBuf),
  77. snapshotOutCh: make(chan p2p.Envelope, chBuf),
  78. snapshotPeerErrCh: make(chan p2p.PeerError, chBuf),
  79. chunkInCh: make(chan p2p.Envelope, chBuf),
  80. chunkOutCh: make(chan p2p.Envelope, chBuf),
  81. chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
  82. blockInCh: make(chan p2p.Envelope, chBuf),
  83. blockOutCh: make(chan p2p.Envelope, chBuf),
  84. blockPeerErrCh: make(chan p2p.PeerError, chBuf),
  85. paramsInCh: make(chan p2p.Envelope, chBuf),
  86. paramsOutCh: make(chan p2p.Envelope, chBuf),
  87. paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
  88. conn: conn,
  89. connQuery: connQuery,
  90. stateProvider: stateProvider,
  91. }
  92. rts.peerUpdateCh = make(chan p2p.PeerUpdate, chBuf)
  93. rts.peerUpdates = p2p.NewPeerUpdates(rts.peerUpdateCh, int(chBuf))
  94. rts.snapshotChannel = p2p.NewChannel(
  95. SnapshotChannel,
  96. new(ssproto.Message),
  97. rts.snapshotInCh,
  98. rts.snapshotOutCh,
  99. rts.snapshotPeerErrCh,
  100. )
  101. rts.chunkChannel = p2p.NewChannel(
  102. ChunkChannel,
  103. new(ssproto.Message),
  104. rts.chunkInCh,
  105. rts.chunkOutCh,
  106. rts.chunkPeerErrCh,
  107. )
  108. rts.blockChannel = p2p.NewChannel(
  109. LightBlockChannel,
  110. new(ssproto.Message),
  111. rts.blockInCh,
  112. rts.blockOutCh,
  113. rts.blockPeerErrCh,
  114. )
  115. rts.paramsChannel = p2p.NewChannel(
  116. ParamsChannel,
  117. new(ssproto.Message),
  118. rts.paramsInCh,
  119. rts.paramsOutCh,
  120. rts.paramsPeerErrCh,
  121. )
  122. rts.stateStore = &smmocks.Store{}
  123. rts.blockStore = store.NewBlockStore(dbm.NewMemDB())
  124. cfg := config.DefaultStateSyncConfig()
  125. rts.reactor = NewReactor(
  126. factory.DefaultTestChainID,
  127. 1,
  128. *cfg,
  129. log.TestingLogger(),
  130. conn,
  131. connQuery,
  132. rts.snapshotChannel,
  133. rts.chunkChannel,
  134. rts.blockChannel,
  135. rts.paramsChannel,
  136. rts.peerUpdates,
  137. rts.stateStore,
  138. rts.blockStore,
  139. "",
  140. m,
  141. )
  142. rts.syncer = newSyncer(
  143. *cfg,
  144. log.NewNopLogger(),
  145. conn,
  146. connQuery,
  147. stateProvider,
  148. rts.snapshotChannel,
  149. rts.chunkChannel,
  150. "",
  151. rts.reactor.metrics,
  152. )
  153. require.NoError(t, rts.reactor.Start(ctx))
  154. require.True(t, rts.reactor.IsRunning())
  155. t.Cleanup(func() {
  156. rts.reactor.Wait()
  157. require.False(t, rts.reactor.IsRunning())
  158. })
  159. return rts
  160. }
  161. func TestReactor_Sync(t *testing.T) {
  162. ctx, cancel := context.WithCancel(context.Background())
  163. defer cancel()
  164. const snapshotHeight = 7
  165. rts := setup(ctx, t, nil, nil, nil, 2)
  166. chain := buildLightBlockChain(t, 1, 10, time.Now())
  167. // app accepts any snapshot
  168. rts.conn.On("OfferSnapshotSync", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
  169. Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  170. // app accepts every chunk
  171. rts.conn.On("ApplySnapshotChunkSync", ctx, mock.AnythingOfType("types.RequestApplySnapshotChunk")).
  172. Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  173. // app query returns valid state app hash
  174. rts.connQuery.On("InfoSync", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
  175. AppVersion: testAppVersion,
  176. LastBlockHeight: snapshotHeight,
  177. LastBlockAppHash: chain[snapshotHeight+1].AppHash,
  178. }, nil)
  179. // store accepts state and validator sets
  180. rts.stateStore.On("Bootstrap", mock.AnythingOfType("state.State")).Return(nil)
  181. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  182. mock.AnythingOfType("*types.ValidatorSet")).Return(nil)
  183. closeCh := make(chan struct{})
  184. defer close(closeCh)
  185. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh,
  186. rts.blockInCh, closeCh, 0)
  187. go graduallyAddPeers(rts.peerUpdateCh, closeCh, 1*time.Second)
  188. go handleSnapshotRequests(t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
  189. {
  190. Height: uint64(snapshotHeight),
  191. Format: 1,
  192. Chunks: 1,
  193. },
  194. })
  195. go handleChunkRequests(t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
  196. go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh)
  197. // update the config to use the p2p provider
  198. rts.reactor.cfg.UseP2P = true
  199. rts.reactor.cfg.TrustHeight = 1
  200. rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
  201. rts.reactor.cfg.DiscoveryTime = 1 * time.Second
  202. // Run state sync
  203. _, err := rts.reactor.Sync(ctx)
  204. require.NoError(t, err)
  205. }
  206. func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
  207. ctx, cancel := context.WithCancel(context.Background())
  208. defer cancel()
  209. rts := setup(ctx, t, nil, nil, nil, 2)
  210. rts.chunkInCh <- p2p.Envelope{
  211. From: types.NodeID("aa"),
  212. Message: &ssproto.SnapshotsRequest{},
  213. }
  214. response := <-rts.chunkPeerErrCh
  215. require.Error(t, response.Err)
  216. require.Empty(t, rts.chunkOutCh)
  217. require.Contains(t, response.Err.Error(), "received unknown message")
  218. require.Equal(t, types.NodeID("aa"), response.NodeID)
  219. }
  220. func TestReactor_ChunkRequest(t *testing.T) {
  221. testcases := map[string]struct {
  222. request *ssproto.ChunkRequest
  223. chunk []byte
  224. expectResponse *ssproto.ChunkResponse
  225. }{
  226. "chunk is returned": {
  227. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  228. []byte{1, 2, 3},
  229. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}},
  230. },
  231. "empty chunk is returned, as empty": {
  232. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  233. []byte{},
  234. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{}},
  235. },
  236. "nil (missing) chunk is returned as missing": {
  237. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  238. nil,
  239. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  240. },
  241. "invalid request": {
  242. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  243. nil,
  244. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  245. },
  246. }
  247. bctx, bcancel := context.WithCancel(context.Background())
  248. defer bcancel()
  249. for name, tc := range testcases {
  250. t.Run(name, func(t *testing.T) {
  251. ctx, cancel := context.WithCancel(bctx)
  252. defer cancel()
  253. // mock ABCI connection to return local snapshots
  254. conn := &proxymocks.AppConnSnapshot{}
  255. conn.On("LoadSnapshotChunkSync", mock.Anything, abci.RequestLoadSnapshotChunk{
  256. Height: tc.request.Height,
  257. Format: tc.request.Format,
  258. Chunk: tc.request.Index,
  259. }).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
  260. rts := setup(ctx, t, conn, nil, nil, 2)
  261. rts.chunkInCh <- p2p.Envelope{
  262. From: types.NodeID("aa"),
  263. Message: tc.request,
  264. }
  265. response := <-rts.chunkOutCh
  266. require.Equal(t, tc.expectResponse, response.Message)
  267. require.Empty(t, rts.chunkOutCh)
  268. conn.AssertExpectations(t)
  269. })
  270. }
  271. }
  272. func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
  273. ctx, cancel := context.WithCancel(context.Background())
  274. defer cancel()
  275. rts := setup(ctx, t, nil, nil, nil, 2)
  276. rts.snapshotInCh <- p2p.Envelope{
  277. From: types.NodeID("aa"),
  278. Message: &ssproto.ChunkRequest{},
  279. }
  280. response := <-rts.snapshotPeerErrCh
  281. require.Error(t, response.Err)
  282. require.Empty(t, rts.snapshotOutCh)
  283. require.Contains(t, response.Err.Error(), "received unknown message")
  284. require.Equal(t, types.NodeID("aa"), response.NodeID)
  285. }
  286. func TestReactor_SnapshotsRequest(t *testing.T) {
  287. testcases := map[string]struct {
  288. snapshots []*abci.Snapshot
  289. expectResponses []*ssproto.SnapshotsResponse
  290. }{
  291. "no snapshots": {nil, []*ssproto.SnapshotsResponse{}},
  292. ">10 unordered snapshots": {
  293. []*abci.Snapshot{
  294. {Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1}},
  295. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  296. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  297. {Height: 1, Format: 1, Chunks: 7, Hash: []byte{1, 1}, Metadata: []byte{4}},
  298. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  299. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  300. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  301. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  302. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  303. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  304. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  305. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  306. },
  307. []*ssproto.SnapshotsResponse{
  308. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  309. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  310. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  311. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  312. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  313. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  314. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  315. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  316. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  317. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  318. },
  319. },
  320. }
  321. ctx, cancel := context.WithCancel(context.Background())
  322. defer cancel()
  323. for name, tc := range testcases {
  324. tc := tc
  325. t.Run(name, func(t *testing.T) {
  326. ctx, cancel := context.WithCancel(ctx)
  327. defer cancel()
  328. // mock ABCI connection to return local snapshots
  329. conn := &proxymocks.AppConnSnapshot{}
  330. conn.On("ListSnapshotsSync", mock.Anything, abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
  331. Snapshots: tc.snapshots,
  332. }, nil)
  333. rts := setup(ctx, t, conn, nil, nil, 100)
  334. rts.snapshotInCh <- p2p.Envelope{
  335. From: types.NodeID("aa"),
  336. Message: &ssproto.SnapshotsRequest{},
  337. }
  338. if len(tc.expectResponses) > 0 {
  339. retryUntil(ctx, t, func() bool { return len(rts.snapshotOutCh) == len(tc.expectResponses) }, time.Second)
  340. }
  341. responses := make([]*ssproto.SnapshotsResponse, len(tc.expectResponses))
  342. for i := 0; i < len(tc.expectResponses); i++ {
  343. e := <-rts.snapshotOutCh
  344. responses[i] = e.Message.(*ssproto.SnapshotsResponse)
  345. }
  346. require.Equal(t, tc.expectResponses, responses)
  347. require.Empty(t, rts.snapshotOutCh)
  348. })
  349. }
  350. }
  351. func TestReactor_LightBlockResponse(t *testing.T) {
  352. ctx, cancel := context.WithCancel(context.Background())
  353. defer cancel()
  354. rts := setup(ctx, t, nil, nil, nil, 2)
  355. var height int64 = 10
  356. h := factory.MakeRandomHeader()
  357. h.Height = height
  358. blockID := factory.MakeBlockIDWithHash(h.Hash())
  359. vals, pv := factory.RandValidatorSet(1, 10)
  360. vote, err := factory.MakeVote(pv[0], h.ChainID, 0, h.Height, 0, 2,
  361. blockID, factory.DefaultTestTime)
  362. require.NoError(t, err)
  363. sh := &types.SignedHeader{
  364. Header: h,
  365. Commit: &types.Commit{
  366. Height: h.Height,
  367. BlockID: blockID,
  368. Signatures: []types.CommitSig{
  369. vote.CommitSig(),
  370. },
  371. },
  372. }
  373. lb := &types.LightBlock{
  374. SignedHeader: sh,
  375. ValidatorSet: vals,
  376. }
  377. require.NoError(t, rts.blockStore.SaveSignedHeader(sh, blockID))
  378. rts.stateStore.On("LoadValidators", height).Return(vals, nil)
  379. rts.blockInCh <- p2p.Envelope{
  380. From: types.NodeID("aa"),
  381. Message: &ssproto.LightBlockRequest{
  382. Height: 10,
  383. },
  384. }
  385. require.Empty(t, rts.blockPeerErrCh)
  386. select {
  387. case response := <-rts.blockOutCh:
  388. require.Equal(t, types.NodeID("aa"), response.To)
  389. res, ok := response.Message.(*ssproto.LightBlockResponse)
  390. require.True(t, ok)
  391. receivedLB, err := types.LightBlockFromProto(res.LightBlock)
  392. require.NoError(t, err)
  393. require.Equal(t, lb, receivedLB)
  394. case <-time.After(1 * time.Second):
  395. t.Fatal("expected light block response")
  396. }
  397. }
  398. func TestReactor_BlockProviders(t *testing.T) {
  399. ctx, cancel := context.WithCancel(context.Background())
  400. defer cancel()
  401. rts := setup(ctx, t, nil, nil, nil, 2)
  402. rts.peerUpdateCh <- p2p.PeerUpdate{
  403. NodeID: types.NodeID("aa"),
  404. Status: p2p.PeerStatusUp,
  405. }
  406. rts.peerUpdateCh <- p2p.PeerUpdate{
  407. NodeID: types.NodeID("bb"),
  408. Status: p2p.PeerStatusUp,
  409. }
  410. closeCh := make(chan struct{})
  411. defer close(closeCh)
  412. chain := buildLightBlockChain(t, 1, 10, time.Now())
  413. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  414. peers := rts.reactor.peers.All()
  415. require.Len(t, peers, 2)
  416. providers := make([]provider.Provider, len(peers))
  417. for idx, peer := range peers {
  418. providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
  419. }
  420. wg := sync.WaitGroup{}
  421. for _, p := range providers {
  422. wg.Add(1)
  423. go func(t *testing.T, p provider.Provider) {
  424. defer wg.Done()
  425. for height := 2; height < 10; height++ {
  426. lb, err := p.LightBlock(ctx, int64(height))
  427. require.NoError(t, err)
  428. require.NotNil(t, lb)
  429. require.Equal(t, height, int(lb.Height))
  430. }
  431. }(t, p)
  432. }
  433. go func() { wg.Wait(); cancel() }()
  434. select {
  435. case <-time.After(time.Second):
  436. // not all of the requests to the dispatcher were responded to
  437. // within the timeout
  438. t.Fail()
  439. case <-ctx.Done():
  440. }
  441. }
  442. func TestReactor_StateProviderP2P(t *testing.T) {
  443. ctx, cancel := context.WithCancel(context.Background())
  444. defer cancel()
  445. rts := setup(ctx, t, nil, nil, nil, 2)
  446. // make syncer non nil else test won't think we are state syncing
  447. rts.reactor.syncer = rts.syncer
  448. peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
  449. peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
  450. rts.peerUpdateCh <- p2p.PeerUpdate{
  451. NodeID: peerA,
  452. Status: p2p.PeerStatusUp,
  453. }
  454. rts.peerUpdateCh <- p2p.PeerUpdate{
  455. NodeID: peerB,
  456. Status: p2p.PeerStatusUp,
  457. }
  458. closeCh := make(chan struct{})
  459. defer close(closeCh)
  460. chain := buildLightBlockChain(t, 1, 10, time.Now())
  461. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  462. go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh)
  463. rts.reactor.cfg.UseP2P = true
  464. rts.reactor.cfg.TrustHeight = 1
  465. rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
  466. for _, p := range []types.NodeID{peerA, peerB} {
  467. if !rts.reactor.peers.Contains(p) {
  468. rts.reactor.peers.Append(p)
  469. }
  470. }
  471. require.True(t, rts.reactor.peers.Len() >= 2, "peer network not configured")
  472. ictx, cancel := context.WithTimeout(ctx, time.Second)
  473. defer cancel()
  474. rts.reactor.mtx.Lock()
  475. err := rts.reactor.initStateProvider(ictx, factory.DefaultTestChainID, 1)
  476. rts.reactor.mtx.Unlock()
  477. require.NoError(t, err)
  478. rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
  479. actx, cancel := context.WithTimeout(ctx, 10*time.Second)
  480. defer cancel()
  481. appHash, err := rts.reactor.stateProvider.AppHash(actx, 5)
  482. require.NoError(t, err)
  483. require.Len(t, appHash, 32)
  484. state, err := rts.reactor.stateProvider.State(actx, 5)
  485. require.NoError(t, err)
  486. require.Equal(t, appHash, state.AppHash)
  487. require.Equal(t, types.DefaultConsensusParams(), &state.ConsensusParams)
  488. commit, err := rts.reactor.stateProvider.Commit(actx, 5)
  489. require.NoError(t, err)
  490. require.Equal(t, commit.BlockID, state.LastBlockID)
  491. added, err := rts.reactor.syncer.AddSnapshot(peerA, &snapshot{
  492. Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1},
  493. })
  494. require.NoError(t, err)
  495. require.True(t, added)
  496. }
  497. func TestReactor_Backfill(t *testing.T) {
  498. ctx, cancel := context.WithCancel(context.Background())
  499. defer cancel()
  500. // test backfill algorithm with varying failure rates [0, 10]
  501. failureRates := []int{0, 2, 9}
  502. for _, failureRate := range failureRates {
  503. failureRate := failureRate
  504. t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) {
  505. ctx, cancel := context.WithCancel(ctx)
  506. defer cancel()
  507. t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
  508. rts := setup(ctx, t, nil, nil, nil, 21)
  509. var (
  510. startHeight int64 = 20
  511. stopHeight int64 = 10
  512. stopTime = time.Date(2020, 1, 1, 0, 100, 0, 0, time.UTC)
  513. )
  514. peers := []string{"a", "b", "c", "d"}
  515. for _, peer := range peers {
  516. rts.peerUpdateCh <- p2p.PeerUpdate{
  517. NodeID: types.NodeID(peer),
  518. Status: p2p.PeerStatusUp,
  519. }
  520. }
  521. trackingHeight := startHeight
  522. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  523. mock.AnythingOfType("*types.ValidatorSet")).Return(func(lh, uh int64, vals *types.ValidatorSet) error {
  524. require.Equal(t, trackingHeight, lh)
  525. require.Equal(t, lh, uh)
  526. require.GreaterOrEqual(t, lh, stopHeight)
  527. trackingHeight--
  528. return nil
  529. })
  530. chain := buildLightBlockChain(t, stopHeight-1, startHeight+1, stopTime)
  531. closeCh := make(chan struct{})
  532. defer close(closeCh)
  533. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh,
  534. rts.blockInCh, closeCh, failureRate)
  535. err := rts.reactor.backfill(
  536. ctx,
  537. factory.DefaultTestChainID,
  538. startHeight,
  539. stopHeight,
  540. 1,
  541. factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
  542. stopTime,
  543. )
  544. if failureRate > 3 {
  545. require.Error(t, err)
  546. require.NotEqual(t, rts.reactor.backfilledBlocks, rts.reactor.backfillBlockTotal)
  547. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal)
  548. } else {
  549. require.NoError(t, err)
  550. for height := startHeight; height <= stopHeight; height++ {
  551. blockMeta := rts.blockStore.LoadBlockMeta(height)
  552. require.NotNil(t, blockMeta)
  553. }
  554. require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1))
  555. require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1))
  556. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfilledBlocks)
  557. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal)
  558. }
  559. require.Equal(t, rts.reactor.backfilledBlocks, rts.reactor.BackFilledBlocks())
  560. require.Equal(t, rts.reactor.backfillBlockTotal, rts.reactor.BackFillBlocksTotal())
  561. })
  562. }
  563. }
  564. // retryUntil will continue to evaluate fn and will return successfully when true
  565. // or fail when the timeout is reached.
  566. func retryUntil(ctx context.Context, t *testing.T, fn func() bool, timeout time.Duration) {
  567. ctx, cancel := context.WithTimeout(ctx, timeout)
  568. defer cancel()
  569. for {
  570. if fn() {
  571. return
  572. }
  573. require.NoError(t, ctx.Err())
  574. }
  575. }
  576. func handleLightBlockRequests(
  577. ctx context.Context,
  578. t *testing.T,
  579. chain map[int64]*types.LightBlock,
  580. receiving chan p2p.Envelope,
  581. sending chan p2p.Envelope,
  582. close chan struct{},
  583. failureRate int) {
  584. requests := 0
  585. errorCount := 0
  586. for {
  587. select {
  588. case <-ctx.Done():
  589. return
  590. case envelope := <-receiving:
  591. if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok {
  592. if requests%10 >= failureRate {
  593. lb, err := chain[int64(msg.Height)].ToProto()
  594. require.NoError(t, err)
  595. sending <- p2p.Envelope{
  596. From: envelope.To,
  597. Message: &ssproto.LightBlockResponse{
  598. LightBlock: lb,
  599. },
  600. }
  601. } else {
  602. switch errorCount % 3 {
  603. case 0: // send a different block
  604. vals, pv := factory.RandValidatorSet(3, 10)
  605. _, _, lb := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID(), vals, pv)
  606. differntLB, err := lb.ToProto()
  607. require.NoError(t, err)
  608. sending <- p2p.Envelope{
  609. From: envelope.To,
  610. Message: &ssproto.LightBlockResponse{
  611. LightBlock: differntLB,
  612. },
  613. }
  614. case 1: // send nil block i.e. pretend we don't have it
  615. sending <- p2p.Envelope{
  616. From: envelope.To,
  617. Message: &ssproto.LightBlockResponse{
  618. LightBlock: nil,
  619. },
  620. }
  621. case 2: // don't do anything
  622. }
  623. errorCount++
  624. }
  625. }
  626. case <-close:
  627. return
  628. }
  629. requests++
  630. }
  631. }
  632. func handleConsensusParamsRequest(
  633. ctx context.Context,
  634. t *testing.T,
  635. receiving, sending chan p2p.Envelope,
  636. closeCh chan struct{},
  637. ) {
  638. t.Helper()
  639. params := types.DefaultConsensusParams()
  640. paramsProto := params.ToProto()
  641. for {
  642. select {
  643. case <-ctx.Done():
  644. return
  645. case envelope := <-receiving:
  646. if ctx.Err() != nil {
  647. return
  648. }
  649. t.Log("received consensus params request")
  650. msg, ok := envelope.Message.(*ssproto.ParamsRequest)
  651. require.True(t, ok)
  652. sending <- p2p.Envelope{
  653. From: envelope.To,
  654. Message: &ssproto.ParamsResponse{
  655. Height: msg.Height,
  656. ConsensusParams: paramsProto,
  657. },
  658. }
  659. case <-closeCh:
  660. return
  661. }
  662. }
  663. }
  664. func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
  665. chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
  666. lastBlockID := factory.MakeBlockID()
  667. blockTime := startTime.Add(time.Duration(fromHeight-toHeight) * time.Minute)
  668. vals, pv := factory.RandValidatorSet(3, 10)
  669. for height := fromHeight; height < toHeight; height++ {
  670. vals, pv, chain[height] = mockLB(t, height, blockTime, lastBlockID, vals, pv)
  671. lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
  672. blockTime = blockTime.Add(1 * time.Minute)
  673. }
  674. return chain
  675. }
  676. func mockLB(t *testing.T, height int64, time time.Time, lastBlockID types.BlockID,
  677. currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator,
  678. ) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) {
  679. header, err := factory.MakeHeader(&types.Header{
  680. Height: height,
  681. LastBlockID: lastBlockID,
  682. Time: time,
  683. })
  684. header.Version.App = testAppVersion
  685. require.NoError(t, err)
  686. nextVals, nextPrivVals := factory.RandValidatorSet(3, 10)
  687. header.ValidatorsHash = currentVals.Hash()
  688. header.NextValidatorsHash = nextVals.Hash()
  689. header.ConsensusHash = types.DefaultConsensusParams().HashConsensusParams()
  690. lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
  691. voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, currentVals)
  692. commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, currentPrivVals, time)
  693. require.NoError(t, err)
  694. return nextVals, nextPrivVals, &types.LightBlock{
  695. SignedHeader: &types.SignedHeader{
  696. Header: header,
  697. Commit: commit,
  698. },
  699. ValidatorSet: currentVals,
  700. }
  701. }
  702. // graduallyAddPeers delivers a new randomly-generated peer update on peerUpdateCh once
  703. // per interval, until closeCh is closed. Each peer update is assigned a random node ID.
  704. func graduallyAddPeers(
  705. peerUpdateCh chan p2p.PeerUpdate,
  706. closeCh chan struct{},
  707. interval time.Duration,
  708. ) {
  709. ticker := time.NewTicker(interval)
  710. for {
  711. select {
  712. case <-ticker.C:
  713. peerUpdateCh <- p2p.PeerUpdate{
  714. NodeID: factory.RandomNodeID(),
  715. Status: p2p.PeerStatusUp,
  716. }
  717. case <-closeCh:
  718. return
  719. }
  720. }
  721. }
  722. func handleSnapshotRequests(
  723. t *testing.T,
  724. receivingCh chan p2p.Envelope,
  725. sendingCh chan p2p.Envelope,
  726. closeCh chan struct{},
  727. snapshots []snapshot,
  728. ) {
  729. t.Helper()
  730. for {
  731. select {
  732. case envelope := <-receivingCh:
  733. _, ok := envelope.Message.(*ssproto.SnapshotsRequest)
  734. require.True(t, ok)
  735. for _, snapshot := range snapshots {
  736. sendingCh <- p2p.Envelope{
  737. From: envelope.To,
  738. Message: &ssproto.SnapshotsResponse{
  739. Height: snapshot.Height,
  740. Format: snapshot.Format,
  741. Chunks: snapshot.Chunks,
  742. Hash: snapshot.Hash,
  743. Metadata: snapshot.Metadata,
  744. },
  745. }
  746. }
  747. case <-closeCh:
  748. return
  749. }
  750. }
  751. }
  752. func handleChunkRequests(
  753. t *testing.T,
  754. receivingCh chan p2p.Envelope,
  755. sendingCh chan p2p.Envelope,
  756. closeCh chan struct{},
  757. chunk []byte,
  758. ) {
  759. t.Helper()
  760. for {
  761. select {
  762. case envelope := <-receivingCh:
  763. msg, ok := envelope.Message.(*ssproto.ChunkRequest)
  764. require.True(t, ok)
  765. sendingCh <- p2p.Envelope{
  766. From: envelope.To,
  767. Message: &ssproto.ChunkResponse{
  768. Height: msg.Height,
  769. Format: msg.Format,
  770. Index: msg.Index,
  771. Chunk: chunk,
  772. Missing: false,
  773. },
  774. }
  775. case <-closeCh:
  776. return
  777. }
  778. }
  779. }