You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

961 lines
27 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/fortytw2/leaktest"
  10. "github.com/stretchr/testify/mock"
  11. "github.com/stretchr/testify/require"
  12. dbm "github.com/tendermint/tm-db"
  13. clientmocks "github.com/tendermint/tendermint/abci/client/mocks"
  14. abci "github.com/tendermint/tendermint/abci/types"
  15. "github.com/tendermint/tendermint/config"
  16. "github.com/tendermint/tendermint/internal/p2p"
  17. "github.com/tendermint/tendermint/internal/proxy"
  18. smmocks "github.com/tendermint/tendermint/internal/state/mocks"
  19. "github.com/tendermint/tendermint/internal/statesync/mocks"
  20. "github.com/tendermint/tendermint/internal/store"
  21. "github.com/tendermint/tendermint/internal/test/factory"
  22. "github.com/tendermint/tendermint/libs/log"
  23. "github.com/tendermint/tendermint/light/provider"
  24. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  25. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  26. "github.com/tendermint/tendermint/types"
  27. )
  28. var m = PrometheusMetrics(config.TestConfig().Instrumentation.Namespace)
  29. const testAppVersion = 9
  30. type reactorTestSuite struct {
  31. reactor *Reactor
  32. syncer *syncer
  33. conn *clientmocks.Client
  34. stateProvider *mocks.StateProvider
  35. snapshotChannel *p2p.Channel
  36. snapshotInCh chan p2p.Envelope
  37. snapshotOutCh chan p2p.Envelope
  38. snapshotPeerErrCh chan p2p.PeerError
  39. chunkChannel *p2p.Channel
  40. chunkInCh chan p2p.Envelope
  41. chunkOutCh chan p2p.Envelope
  42. chunkPeerErrCh chan p2p.PeerError
  43. blockChannel *p2p.Channel
  44. blockInCh chan p2p.Envelope
  45. blockOutCh chan p2p.Envelope
  46. blockPeerErrCh chan p2p.PeerError
  47. paramsChannel *p2p.Channel
  48. paramsInCh chan p2p.Envelope
  49. paramsOutCh chan p2p.Envelope
  50. paramsPeerErrCh chan p2p.PeerError
  51. peerUpdateCh chan p2p.PeerUpdate
  52. peerUpdates *p2p.PeerUpdates
  53. stateStore *smmocks.Store
  54. blockStore *store.BlockStore
  55. }
  56. func setup(
  57. ctx context.Context,
  58. t *testing.T,
  59. conn *clientmocks.Client,
  60. stateProvider *mocks.StateProvider,
  61. chBuf uint,
  62. ) *reactorTestSuite {
  63. t.Helper()
  64. if conn == nil {
  65. conn = &clientmocks.Client{}
  66. }
  67. rts := &reactorTestSuite{
  68. snapshotInCh: make(chan p2p.Envelope, chBuf),
  69. snapshotOutCh: make(chan p2p.Envelope, chBuf),
  70. snapshotPeerErrCh: make(chan p2p.PeerError, chBuf),
  71. chunkInCh: make(chan p2p.Envelope, chBuf),
  72. chunkOutCh: make(chan p2p.Envelope, chBuf),
  73. chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
  74. blockInCh: make(chan p2p.Envelope, chBuf),
  75. blockOutCh: make(chan p2p.Envelope, chBuf),
  76. blockPeerErrCh: make(chan p2p.PeerError, chBuf),
  77. paramsInCh: make(chan p2p.Envelope, chBuf),
  78. paramsOutCh: make(chan p2p.Envelope, chBuf),
  79. paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
  80. conn: conn,
  81. stateProvider: stateProvider,
  82. }
  83. rts.peerUpdateCh = make(chan p2p.PeerUpdate, chBuf)
  84. rts.peerUpdates = p2p.NewPeerUpdates(rts.peerUpdateCh, int(chBuf))
  85. rts.snapshotChannel = p2p.NewChannel(
  86. SnapshotChannel,
  87. new(ssproto.Message),
  88. rts.snapshotInCh,
  89. rts.snapshotOutCh,
  90. rts.snapshotPeerErrCh,
  91. )
  92. rts.chunkChannel = p2p.NewChannel(
  93. ChunkChannel,
  94. new(ssproto.Message),
  95. rts.chunkInCh,
  96. rts.chunkOutCh,
  97. rts.chunkPeerErrCh,
  98. )
  99. rts.blockChannel = p2p.NewChannel(
  100. LightBlockChannel,
  101. new(ssproto.Message),
  102. rts.blockInCh,
  103. rts.blockOutCh,
  104. rts.blockPeerErrCh,
  105. )
  106. rts.paramsChannel = p2p.NewChannel(
  107. ParamsChannel,
  108. new(ssproto.Message),
  109. rts.paramsInCh,
  110. rts.paramsOutCh,
  111. rts.paramsPeerErrCh,
  112. )
  113. rts.stateStore = &smmocks.Store{}
  114. rts.blockStore = store.NewBlockStore(dbm.NewMemDB())
  115. cfg := config.DefaultStateSyncConfig()
  116. chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
  117. switch desc.ID {
  118. case SnapshotChannel:
  119. return rts.snapshotChannel, nil
  120. case ChunkChannel:
  121. return rts.chunkChannel, nil
  122. case LightBlockChannel:
  123. return rts.blockChannel, nil
  124. case ParamsChannel:
  125. return rts.paramsChannel, nil
  126. default:
  127. return nil, fmt.Errorf("invalid channel; %v", desc.ID)
  128. }
  129. }
  130. logger := log.NewNopLogger()
  131. var err error
  132. rts.reactor, err = NewReactor(
  133. ctx,
  134. factory.DefaultTestChainID,
  135. 1,
  136. *cfg,
  137. logger.With("component", "reactor"),
  138. conn,
  139. chCreator,
  140. rts.peerUpdates,
  141. rts.stateStore,
  142. rts.blockStore,
  143. "",
  144. m,
  145. nil, // eventbus can be nil
  146. )
  147. require.NoError(t, err)
  148. rts.syncer = newSyncer(
  149. *cfg,
  150. logger.With("component", "syncer"),
  151. conn,
  152. stateProvider,
  153. rts.snapshotChannel,
  154. rts.chunkChannel,
  155. "",
  156. rts.reactor.metrics,
  157. )
  158. ctx, cancel := context.WithCancel(ctx)
  159. require.NoError(t, rts.reactor.Start(ctx))
  160. require.True(t, rts.reactor.IsRunning())
  161. t.Cleanup(cancel)
  162. t.Cleanup(rts.reactor.Wait)
  163. t.Cleanup(leaktest.Check(t))
  164. return rts
  165. }
  166. func TestReactor_Sync(t *testing.T) {
  167. ctx, cancel := context.WithCancel(context.Background())
  168. defer cancel()
  169. const snapshotHeight = 7
  170. rts := setup(ctx, t, nil, nil, 2)
  171. chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
  172. // app accepts any snapshot
  173. rts.conn.On("OfferSnapshot", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
  174. Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  175. // app accepts every chunk
  176. rts.conn.On("ApplySnapshotChunk", ctx, mock.AnythingOfType("types.RequestApplySnapshotChunk")).
  177. Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  178. // app query returns valid state app hash
  179. rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
  180. AppVersion: testAppVersion,
  181. LastBlockHeight: snapshotHeight,
  182. LastBlockAppHash: chain[snapshotHeight+1].AppHash,
  183. }, nil)
  184. // store accepts state and validator sets
  185. rts.stateStore.On("Bootstrap", mock.AnythingOfType("state.State")).Return(nil)
  186. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  187. mock.AnythingOfType("*types.ValidatorSet")).Return(nil)
  188. closeCh := make(chan struct{})
  189. defer close(closeCh)
  190. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh,
  191. rts.blockInCh, closeCh, 0)
  192. go graduallyAddPeers(ctx, t, rts.peerUpdateCh, closeCh, 1*time.Second)
  193. go handleSnapshotRequests(ctx, t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
  194. {
  195. Height: uint64(snapshotHeight),
  196. Format: 1,
  197. Chunks: 1,
  198. },
  199. })
  200. go handleChunkRequests(ctx, t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
  201. go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh)
  202. // update the config to use the p2p provider
  203. rts.reactor.cfg.UseP2P = true
  204. rts.reactor.cfg.TrustHeight = 1
  205. rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
  206. rts.reactor.cfg.DiscoveryTime = 1 * time.Second
  207. // Run state sync
  208. _, err := rts.reactor.Sync(ctx)
  209. require.NoError(t, err)
  210. }
  211. func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
  212. ctx, cancel := context.WithCancel(context.Background())
  213. defer cancel()
  214. rts := setup(ctx, t, nil, nil, 2)
  215. rts.chunkInCh <- p2p.Envelope{
  216. From: types.NodeID("aa"),
  217. Message: &ssproto.SnapshotsRequest{},
  218. }
  219. response := <-rts.chunkPeerErrCh
  220. require.Error(t, response.Err)
  221. require.Empty(t, rts.chunkOutCh)
  222. require.Contains(t, response.Err.Error(), "received unknown message")
  223. require.Equal(t, types.NodeID("aa"), response.NodeID)
  224. }
  225. func TestReactor_ChunkRequest(t *testing.T) {
  226. testcases := map[string]struct {
  227. request *ssproto.ChunkRequest
  228. chunk []byte
  229. expectResponse *ssproto.ChunkResponse
  230. }{
  231. "chunk is returned": {
  232. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  233. []byte{1, 2, 3},
  234. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}},
  235. },
  236. "empty chunk is returned, as empty": {
  237. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  238. []byte{},
  239. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{}},
  240. },
  241. "nil (missing) chunk is returned as missing": {
  242. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  243. nil,
  244. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  245. },
  246. "invalid request": {
  247. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  248. nil,
  249. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  250. },
  251. }
  252. bctx, bcancel := context.WithCancel(context.Background())
  253. defer bcancel()
  254. for name, tc := range testcases {
  255. t.Run(name, func(t *testing.T) {
  256. ctx, cancel := context.WithCancel(bctx)
  257. defer cancel()
  258. // mock ABCI connection to return local snapshots
  259. conn := &clientmocks.Client{}
  260. conn.On("LoadSnapshotChunk", mock.Anything, abci.RequestLoadSnapshotChunk{
  261. Height: tc.request.Height,
  262. Format: tc.request.Format,
  263. Chunk: tc.request.Index,
  264. }).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
  265. rts := setup(ctx, t, conn, nil, 2)
  266. rts.chunkInCh <- p2p.Envelope{
  267. From: types.NodeID("aa"),
  268. Message: tc.request,
  269. }
  270. response := <-rts.chunkOutCh
  271. require.Equal(t, tc.expectResponse, response.Message)
  272. require.Empty(t, rts.chunkOutCh)
  273. conn.AssertExpectations(t)
  274. })
  275. }
  276. }
  277. func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
  278. ctx, cancel := context.WithCancel(context.Background())
  279. defer cancel()
  280. rts := setup(ctx, t, nil, nil, 2)
  281. rts.snapshotInCh <- p2p.Envelope{
  282. From: types.NodeID("aa"),
  283. Message: &ssproto.ChunkRequest{},
  284. }
  285. response := <-rts.snapshotPeerErrCh
  286. require.Error(t, response.Err)
  287. require.Empty(t, rts.snapshotOutCh)
  288. require.Contains(t, response.Err.Error(), "received unknown message")
  289. require.Equal(t, types.NodeID("aa"), response.NodeID)
  290. }
  291. func TestReactor_SnapshotsRequest(t *testing.T) {
  292. testcases := map[string]struct {
  293. snapshots []*abci.Snapshot
  294. expectResponses []*ssproto.SnapshotsResponse
  295. }{
  296. "no snapshots": {nil, []*ssproto.SnapshotsResponse{}},
  297. ">10 unordered snapshots": {
  298. []*abci.Snapshot{
  299. {Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1}},
  300. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  301. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  302. {Height: 1, Format: 1, Chunks: 7, Hash: []byte{1, 1}, Metadata: []byte{4}},
  303. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  304. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  305. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  306. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  307. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  308. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  309. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  310. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  311. },
  312. []*ssproto.SnapshotsResponse{
  313. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  314. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  315. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  316. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  317. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  318. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  319. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  320. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  321. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  322. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  323. },
  324. },
  325. }
  326. ctx, cancel := context.WithCancel(context.Background())
  327. defer cancel()
  328. for name, tc := range testcases {
  329. tc := tc
  330. t.Run(name, func(t *testing.T) {
  331. ctx, cancel := context.WithCancel(ctx)
  332. defer cancel()
  333. // mock ABCI connection to return local snapshots
  334. conn := &clientmocks.Client{}
  335. conn.On("ListSnapshots", mock.Anything, abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
  336. Snapshots: tc.snapshots,
  337. }, nil)
  338. rts := setup(ctx, t, conn, nil, 100)
  339. rts.snapshotInCh <- p2p.Envelope{
  340. From: types.NodeID("aa"),
  341. Message: &ssproto.SnapshotsRequest{},
  342. }
  343. if len(tc.expectResponses) > 0 {
  344. retryUntil(ctx, t, func() bool { return len(rts.snapshotOutCh) == len(tc.expectResponses) }, time.Second)
  345. }
  346. responses := make([]*ssproto.SnapshotsResponse, len(tc.expectResponses))
  347. for i := 0; i < len(tc.expectResponses); i++ {
  348. e := <-rts.snapshotOutCh
  349. responses[i] = e.Message.(*ssproto.SnapshotsResponse)
  350. }
  351. require.Equal(t, tc.expectResponses, responses)
  352. require.Empty(t, rts.snapshotOutCh)
  353. })
  354. }
  355. }
  356. func TestReactor_LightBlockResponse(t *testing.T) {
  357. ctx, cancel := context.WithCancel(context.Background())
  358. defer cancel()
  359. rts := setup(ctx, t, nil, nil, 2)
  360. var height int64 = 10
  361. // generates a random header
  362. h := factory.MakeHeader(t, &types.Header{})
  363. h.Height = height
  364. blockID := factory.MakeBlockIDWithHash(h.Hash())
  365. vals, pv := factory.ValidatorSet(ctx, t, 1, 10)
  366. vote, err := factory.MakeVote(ctx, pv[0], h.ChainID, 0, h.Height, 0, 2,
  367. blockID, factory.DefaultTestTime)
  368. require.NoError(t, err)
  369. sh := &types.SignedHeader{
  370. Header: h,
  371. Commit: &types.Commit{
  372. Height: h.Height,
  373. BlockID: blockID,
  374. Signatures: []types.CommitSig{
  375. vote.CommitSig(),
  376. },
  377. },
  378. }
  379. lb := &types.LightBlock{
  380. SignedHeader: sh,
  381. ValidatorSet: vals,
  382. }
  383. require.NoError(t, rts.blockStore.SaveSignedHeader(sh, blockID))
  384. rts.stateStore.On("LoadValidators", height).Return(vals, nil)
  385. rts.blockInCh <- p2p.Envelope{
  386. From: types.NodeID("aa"),
  387. Message: &ssproto.LightBlockRequest{
  388. Height: 10,
  389. },
  390. }
  391. require.Empty(t, rts.blockPeerErrCh)
  392. select {
  393. case response := <-rts.blockOutCh:
  394. require.Equal(t, types.NodeID("aa"), response.To)
  395. res, ok := response.Message.(*ssproto.LightBlockResponse)
  396. require.True(t, ok)
  397. receivedLB, err := types.LightBlockFromProto(res.LightBlock)
  398. require.NoError(t, err)
  399. require.Equal(t, lb, receivedLB)
  400. case <-time.After(1 * time.Second):
  401. t.Fatal("expected light block response")
  402. }
  403. }
  404. func TestReactor_BlockProviders(t *testing.T) {
  405. ctx, cancel := context.WithCancel(context.Background())
  406. defer cancel()
  407. rts := setup(ctx, t, nil, nil, 2)
  408. rts.peerUpdateCh <- p2p.PeerUpdate{
  409. NodeID: types.NodeID("aa"),
  410. Status: p2p.PeerStatusUp,
  411. Channels: p2p.ChannelIDSet{
  412. SnapshotChannel: struct{}{},
  413. ChunkChannel: struct{}{},
  414. LightBlockChannel: struct{}{},
  415. ParamsChannel: struct{}{},
  416. },
  417. }
  418. rts.peerUpdateCh <- p2p.PeerUpdate{
  419. NodeID: types.NodeID("bb"),
  420. Status: p2p.PeerStatusUp,
  421. Channels: p2p.ChannelIDSet{
  422. SnapshotChannel: struct{}{},
  423. ChunkChannel: struct{}{},
  424. LightBlockChannel: struct{}{},
  425. ParamsChannel: struct{}{},
  426. },
  427. }
  428. closeCh := make(chan struct{})
  429. defer close(closeCh)
  430. chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
  431. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  432. peers := rts.reactor.peers.All()
  433. require.Len(t, peers, 2)
  434. providers := make([]provider.Provider, len(peers))
  435. for idx, peer := range peers {
  436. providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
  437. }
  438. wg := sync.WaitGroup{}
  439. for _, p := range providers {
  440. wg.Add(1)
  441. go func(t *testing.T, p provider.Provider) {
  442. defer wg.Done()
  443. for height := 2; height < 10; height++ {
  444. lb, err := p.LightBlock(ctx, int64(height))
  445. require.NoError(t, err)
  446. require.NotNil(t, lb)
  447. require.Equal(t, height, int(lb.Height))
  448. }
  449. }(t, p)
  450. }
  451. go func() { wg.Wait(); cancel() }()
  452. select {
  453. case <-time.After(time.Second):
  454. // not all of the requests to the dispatcher were responded to
  455. // within the timeout
  456. t.Fail()
  457. case <-ctx.Done():
  458. }
  459. }
  460. func TestReactor_StateProviderP2P(t *testing.T) {
  461. ctx, cancel := context.WithCancel(context.Background())
  462. defer cancel()
  463. rts := setup(ctx, t, nil, nil, 2)
  464. // make syncer non nil else test won't think we are state syncing
  465. rts.reactor.syncer = rts.syncer
  466. peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
  467. peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
  468. rts.peerUpdateCh <- p2p.PeerUpdate{
  469. NodeID: peerA,
  470. Status: p2p.PeerStatusUp,
  471. }
  472. rts.peerUpdateCh <- p2p.PeerUpdate{
  473. NodeID: peerB,
  474. Status: p2p.PeerStatusUp,
  475. }
  476. closeCh := make(chan struct{})
  477. defer close(closeCh)
  478. chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
  479. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  480. go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh)
  481. rts.reactor.cfg.UseP2P = true
  482. rts.reactor.cfg.TrustHeight = 1
  483. rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
  484. for _, p := range []types.NodeID{peerA, peerB} {
  485. if !rts.reactor.peers.Contains(p) {
  486. rts.reactor.peers.Append(p)
  487. }
  488. }
  489. require.True(t, rts.reactor.peers.Len() >= 2, "peer network not configured")
  490. ictx, cancel := context.WithTimeout(ctx, time.Second)
  491. defer cancel()
  492. rts.reactor.mtx.Lock()
  493. err := rts.reactor.initStateProvider(ictx, factory.DefaultTestChainID, 1)
  494. rts.reactor.mtx.Unlock()
  495. require.NoError(t, err)
  496. rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
  497. actx, cancel := context.WithTimeout(ctx, time.Second)
  498. defer cancel()
  499. appHash, err := rts.reactor.stateProvider.AppHash(actx, 5)
  500. require.NoError(t, err)
  501. require.Len(t, appHash, 32)
  502. state, err := rts.reactor.stateProvider.State(actx, 5)
  503. require.NoError(t, err)
  504. require.Equal(t, appHash, state.AppHash)
  505. require.Equal(t, types.DefaultConsensusParams(), &state.ConsensusParams)
  506. commit, err := rts.reactor.stateProvider.Commit(actx, 5)
  507. require.NoError(t, err)
  508. require.Equal(t, commit.BlockID, state.LastBlockID)
  509. added, err := rts.reactor.syncer.AddSnapshot(peerA, &snapshot{
  510. Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1},
  511. })
  512. require.NoError(t, err)
  513. require.True(t, added)
  514. }
  515. func TestReactor_Backfill(t *testing.T) {
  516. ctx, cancel := context.WithCancel(context.Background())
  517. defer cancel()
  518. // test backfill algorithm with varying failure rates [0, 10]
  519. failureRates := []int{0, 2, 9}
  520. for _, failureRate := range failureRates {
  521. failureRate := failureRate
  522. t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) {
  523. ctx, cancel := context.WithCancel(ctx)
  524. defer cancel()
  525. t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
  526. rts := setup(ctx, t, nil, nil, 21)
  527. var (
  528. startHeight int64 = 20
  529. stopHeight int64 = 10
  530. stopTime = time.Date(2020, 1, 1, 0, 100, 0, 0, time.UTC)
  531. )
  532. peers := []string{"a", "b", "c", "d"}
  533. for _, peer := range peers {
  534. rts.peerUpdateCh <- p2p.PeerUpdate{
  535. NodeID: types.NodeID(peer),
  536. Status: p2p.PeerStatusUp,
  537. Channels: p2p.ChannelIDSet{
  538. SnapshotChannel: struct{}{},
  539. ChunkChannel: struct{}{},
  540. LightBlockChannel: struct{}{},
  541. ParamsChannel: struct{}{},
  542. },
  543. }
  544. }
  545. trackingHeight := startHeight
  546. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  547. mock.AnythingOfType("*types.ValidatorSet")).Return(func(lh, uh int64, vals *types.ValidatorSet) error {
  548. require.Equal(t, trackingHeight, lh)
  549. require.Equal(t, lh, uh)
  550. require.GreaterOrEqual(t, lh, stopHeight)
  551. trackingHeight--
  552. return nil
  553. })
  554. chain := buildLightBlockChain(ctx, t, stopHeight-1, startHeight+1, stopTime)
  555. closeCh := make(chan struct{})
  556. defer close(closeCh)
  557. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh,
  558. rts.blockInCh, closeCh, failureRate)
  559. err := rts.reactor.backfill(
  560. ctx,
  561. factory.DefaultTestChainID,
  562. startHeight,
  563. stopHeight,
  564. 1,
  565. factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
  566. stopTime,
  567. )
  568. if failureRate > 3 {
  569. require.Error(t, err)
  570. require.NotEqual(t, rts.reactor.backfilledBlocks, rts.reactor.backfillBlockTotal)
  571. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal)
  572. } else {
  573. require.NoError(t, err)
  574. for height := startHeight; height <= stopHeight; height++ {
  575. blockMeta := rts.blockStore.LoadBlockMeta(height)
  576. require.NotNil(t, blockMeta)
  577. }
  578. require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1))
  579. require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1))
  580. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfilledBlocks)
  581. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal)
  582. }
  583. require.Equal(t, rts.reactor.backfilledBlocks, rts.reactor.BackFilledBlocks())
  584. require.Equal(t, rts.reactor.backfillBlockTotal, rts.reactor.BackFillBlocksTotal())
  585. })
  586. }
  587. }
  588. // retryUntil will continue to evaluate fn and will return successfully when true
  589. // or fail when the timeout is reached.
  590. func retryUntil(ctx context.Context, t *testing.T, fn func() bool, timeout time.Duration) {
  591. ctx, cancel := context.WithTimeout(ctx, timeout)
  592. defer cancel()
  593. for {
  594. if fn() {
  595. return
  596. }
  597. require.NoError(t, ctx.Err())
  598. }
  599. }
  600. func handleLightBlockRequests(
  601. ctx context.Context,
  602. t *testing.T,
  603. chain map[int64]*types.LightBlock,
  604. receiving chan p2p.Envelope,
  605. sending chan p2p.Envelope,
  606. close chan struct{},
  607. failureRate int) {
  608. requests := 0
  609. errorCount := 0
  610. for {
  611. select {
  612. case <-ctx.Done():
  613. return
  614. case envelope := <-receiving:
  615. if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok {
  616. if requests%10 >= failureRate {
  617. lb, err := chain[int64(msg.Height)].ToProto()
  618. require.NoError(t, err)
  619. select {
  620. case sending <- p2p.Envelope{
  621. From: envelope.To,
  622. Message: &ssproto.LightBlockResponse{
  623. LightBlock: lb,
  624. },
  625. }:
  626. case <-ctx.Done():
  627. return
  628. }
  629. } else {
  630. switch errorCount % 3 {
  631. case 0: // send a different block
  632. vals, pv := factory.ValidatorSet(ctx, t, 3, 10)
  633. _, _, lb := mockLB(ctx, t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID(), vals, pv)
  634. differntLB, err := lb.ToProto()
  635. require.NoError(t, err)
  636. select {
  637. case sending <- p2p.Envelope{
  638. From: envelope.To,
  639. Message: &ssproto.LightBlockResponse{
  640. LightBlock: differntLB,
  641. },
  642. }:
  643. case <-ctx.Done():
  644. return
  645. }
  646. case 1: // send nil block i.e. pretend we don't have it
  647. select {
  648. case sending <- p2p.Envelope{
  649. From: envelope.To,
  650. Message: &ssproto.LightBlockResponse{
  651. LightBlock: nil,
  652. },
  653. }:
  654. case <-ctx.Done():
  655. return
  656. }
  657. case 2: // don't do anything
  658. }
  659. errorCount++
  660. }
  661. }
  662. case <-close:
  663. return
  664. }
  665. requests++
  666. }
  667. }
  668. func handleConsensusParamsRequest(
  669. ctx context.Context,
  670. t *testing.T,
  671. receiving, sending chan p2p.Envelope,
  672. closeCh chan struct{},
  673. ) {
  674. t.Helper()
  675. params := types.DefaultConsensusParams()
  676. paramsProto := params.ToProto()
  677. for {
  678. select {
  679. case <-ctx.Done():
  680. return
  681. case envelope := <-receiving:
  682. msg, ok := envelope.Message.(*ssproto.ParamsRequest)
  683. if !ok {
  684. t.Errorf("message was %T which is not a params request", envelope.Message)
  685. return
  686. }
  687. select {
  688. case sending <- p2p.Envelope{
  689. From: envelope.To,
  690. Message: &ssproto.ParamsResponse{
  691. Height: msg.Height,
  692. ConsensusParams: paramsProto,
  693. },
  694. }:
  695. case <-ctx.Done():
  696. return
  697. case <-closeCh:
  698. return
  699. }
  700. case <-closeCh:
  701. return
  702. }
  703. }
  704. }
  705. func buildLightBlockChain(ctx context.Context, t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
  706. t.Helper()
  707. chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
  708. lastBlockID := factory.MakeBlockID()
  709. blockTime := startTime.Add(time.Duration(fromHeight-toHeight) * time.Minute)
  710. vals, pv := factory.ValidatorSet(ctx, t, 3, 10)
  711. for height := fromHeight; height < toHeight; height++ {
  712. vals, pv, chain[height] = mockLB(ctx, t, height, blockTime, lastBlockID, vals, pv)
  713. lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
  714. blockTime = blockTime.Add(1 * time.Minute)
  715. }
  716. return chain
  717. }
  718. func mockLB(ctx context.Context, t *testing.T, height int64, time time.Time, lastBlockID types.BlockID,
  719. currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator,
  720. ) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) {
  721. t.Helper()
  722. header := factory.MakeHeader(t, &types.Header{
  723. Height: height,
  724. LastBlockID: lastBlockID,
  725. Time: time,
  726. })
  727. header.Version.App = testAppVersion
  728. nextVals, nextPrivVals := factory.ValidatorSet(ctx, t, 3, 10)
  729. header.ValidatorsHash = currentVals.Hash()
  730. header.NextValidatorsHash = nextVals.Hash()
  731. header.ConsensusHash = types.DefaultConsensusParams().HashConsensusParams()
  732. lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
  733. voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, currentVals)
  734. commit, err := factory.MakeCommit(ctx, lastBlockID, height, 0, voteSet, currentPrivVals, time)
  735. require.NoError(t, err)
  736. return nextVals, nextPrivVals, &types.LightBlock{
  737. SignedHeader: &types.SignedHeader{
  738. Header: header,
  739. Commit: commit,
  740. },
  741. ValidatorSet: currentVals,
  742. }
  743. }
  744. // graduallyAddPeers delivers a new randomly-generated peer update on peerUpdateCh once
  745. // per interval, until closeCh is closed. Each peer update is assigned a random node ID.
  746. func graduallyAddPeers(
  747. ctx context.Context,
  748. t *testing.T,
  749. peerUpdateCh chan p2p.PeerUpdate,
  750. closeCh chan struct{},
  751. interval time.Duration,
  752. ) {
  753. ticker := time.NewTicker(interval)
  754. for {
  755. select {
  756. case <-ctx.Done():
  757. return
  758. case <-closeCh:
  759. return
  760. case <-ticker.C:
  761. peerUpdateCh <- p2p.PeerUpdate{
  762. NodeID: factory.RandomNodeID(t),
  763. Status: p2p.PeerStatusUp,
  764. Channels: p2p.ChannelIDSet{
  765. SnapshotChannel: struct{}{},
  766. ChunkChannel: struct{}{},
  767. LightBlockChannel: struct{}{},
  768. ParamsChannel: struct{}{},
  769. },
  770. }
  771. }
  772. }
  773. }
  774. func handleSnapshotRequests(
  775. ctx context.Context,
  776. t *testing.T,
  777. receivingCh chan p2p.Envelope,
  778. sendingCh chan p2p.Envelope,
  779. closeCh chan struct{},
  780. snapshots []snapshot,
  781. ) {
  782. t.Helper()
  783. for {
  784. select {
  785. case <-ctx.Done():
  786. return
  787. case <-closeCh:
  788. return
  789. case envelope := <-receivingCh:
  790. _, ok := envelope.Message.(*ssproto.SnapshotsRequest)
  791. require.True(t, ok)
  792. for _, snapshot := range snapshots {
  793. sendingCh <- p2p.Envelope{
  794. From: envelope.To,
  795. Message: &ssproto.SnapshotsResponse{
  796. Height: snapshot.Height,
  797. Format: snapshot.Format,
  798. Chunks: snapshot.Chunks,
  799. Hash: snapshot.Hash,
  800. Metadata: snapshot.Metadata,
  801. },
  802. }
  803. }
  804. }
  805. }
  806. }
  807. func handleChunkRequests(
  808. ctx context.Context,
  809. t *testing.T,
  810. receivingCh chan p2p.Envelope,
  811. sendingCh chan p2p.Envelope,
  812. closeCh chan struct{},
  813. chunk []byte,
  814. ) {
  815. t.Helper()
  816. for {
  817. select {
  818. case <-ctx.Done():
  819. return
  820. case <-closeCh:
  821. return
  822. case envelope := <-receivingCh:
  823. msg, ok := envelope.Message.(*ssproto.ChunkRequest)
  824. require.True(t, ok)
  825. sendingCh <- p2p.Envelope{
  826. From: envelope.To,
  827. Message: &ssproto.ChunkResponse{
  828. Height: msg.Height,
  829. Format: msg.Format,
  830. Index: msg.Index,
  831. Chunk: chunk,
  832. Missing: false,
  833. },
  834. }
  835. }
  836. }
  837. }