You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

947 lines
27 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/fortytw2/leaktest"
  10. "github.com/stretchr/testify/mock"
  11. "github.com/stretchr/testify/require"
  12. dbm "github.com/tendermint/tm-db"
  13. abci "github.com/tendermint/tendermint/abci/types"
  14. "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/internal/p2p"
  16. "github.com/tendermint/tendermint/internal/proxy"
  17. proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks"
  18. smmocks "github.com/tendermint/tendermint/internal/state/mocks"
  19. "github.com/tendermint/tendermint/internal/statesync/mocks"
  20. "github.com/tendermint/tendermint/internal/store"
  21. "github.com/tendermint/tendermint/internal/test/factory"
  22. "github.com/tendermint/tendermint/libs/log"
  23. "github.com/tendermint/tendermint/light/provider"
  24. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  25. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  26. "github.com/tendermint/tendermint/types"
  27. )
  28. var m = PrometheusMetrics(config.TestConfig().Instrumentation.Namespace)
  29. const testAppVersion = 9
  30. type reactorTestSuite struct {
  31. reactor *Reactor
  32. syncer *syncer
  33. conn *proxymocks.AppConnSnapshot
  34. connQuery *proxymocks.AppConnQuery
  35. stateProvider *mocks.StateProvider
  36. snapshotChannel *p2p.Channel
  37. snapshotInCh chan p2p.Envelope
  38. snapshotOutCh chan p2p.Envelope
  39. snapshotPeerErrCh chan p2p.PeerError
  40. chunkChannel *p2p.Channel
  41. chunkInCh chan p2p.Envelope
  42. chunkOutCh chan p2p.Envelope
  43. chunkPeerErrCh chan p2p.PeerError
  44. blockChannel *p2p.Channel
  45. blockInCh chan p2p.Envelope
  46. blockOutCh chan p2p.Envelope
  47. blockPeerErrCh chan p2p.PeerError
  48. paramsChannel *p2p.Channel
  49. paramsInCh chan p2p.Envelope
  50. paramsOutCh chan p2p.Envelope
  51. paramsPeerErrCh chan p2p.PeerError
  52. peerUpdateCh chan p2p.PeerUpdate
  53. peerUpdates *p2p.PeerUpdates
  54. stateStore *smmocks.Store
  55. blockStore *store.BlockStore
  56. }
  57. func setup(
  58. ctx context.Context,
  59. t *testing.T,
  60. conn *proxymocks.AppConnSnapshot,
  61. connQuery *proxymocks.AppConnQuery,
  62. stateProvider *mocks.StateProvider,
  63. chBuf uint,
  64. ) *reactorTestSuite {
  65. t.Helper()
  66. if conn == nil {
  67. conn = &proxymocks.AppConnSnapshot{}
  68. }
  69. if connQuery == nil {
  70. connQuery = &proxymocks.AppConnQuery{}
  71. }
  72. if stateProvider == nil {
  73. stateProvider = &mocks.StateProvider{}
  74. }
  75. rts := &reactorTestSuite{
  76. snapshotInCh: make(chan p2p.Envelope, chBuf),
  77. snapshotOutCh: make(chan p2p.Envelope, chBuf),
  78. snapshotPeerErrCh: make(chan p2p.PeerError, chBuf),
  79. chunkInCh: make(chan p2p.Envelope, chBuf),
  80. chunkOutCh: make(chan p2p.Envelope, chBuf),
  81. chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
  82. blockInCh: make(chan p2p.Envelope, chBuf),
  83. blockOutCh: make(chan p2p.Envelope, chBuf),
  84. blockPeerErrCh: make(chan p2p.PeerError, chBuf),
  85. paramsInCh: make(chan p2p.Envelope, chBuf),
  86. paramsOutCh: make(chan p2p.Envelope, chBuf),
  87. paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
  88. conn: conn,
  89. connQuery: connQuery,
  90. stateProvider: stateProvider,
  91. }
  92. rts.peerUpdateCh = make(chan p2p.PeerUpdate, chBuf)
  93. rts.peerUpdates = p2p.NewPeerUpdates(rts.peerUpdateCh, int(chBuf))
  94. rts.snapshotChannel = p2p.NewChannel(
  95. SnapshotChannel,
  96. new(ssproto.Message),
  97. rts.snapshotInCh,
  98. rts.snapshotOutCh,
  99. rts.snapshotPeerErrCh,
  100. )
  101. rts.chunkChannel = p2p.NewChannel(
  102. ChunkChannel,
  103. new(ssproto.Message),
  104. rts.chunkInCh,
  105. rts.chunkOutCh,
  106. rts.chunkPeerErrCh,
  107. )
  108. rts.blockChannel = p2p.NewChannel(
  109. LightBlockChannel,
  110. new(ssproto.Message),
  111. rts.blockInCh,
  112. rts.blockOutCh,
  113. rts.blockPeerErrCh,
  114. )
  115. rts.paramsChannel = p2p.NewChannel(
  116. ParamsChannel,
  117. new(ssproto.Message),
  118. rts.paramsInCh,
  119. rts.paramsOutCh,
  120. rts.paramsPeerErrCh,
  121. )
  122. rts.stateStore = &smmocks.Store{}
  123. rts.blockStore = store.NewBlockStore(dbm.NewMemDB())
  124. cfg := config.DefaultStateSyncConfig()
  125. chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
  126. switch desc.ID {
  127. case SnapshotChannel:
  128. return rts.snapshotChannel, nil
  129. case ChunkChannel:
  130. return rts.chunkChannel, nil
  131. case LightBlockChannel:
  132. return rts.blockChannel, nil
  133. case ParamsChannel:
  134. return rts.paramsChannel, nil
  135. default:
  136. return nil, fmt.Errorf("invalid channel; %v", desc.ID)
  137. }
  138. }
  139. logger := log.NewNopLogger()
  140. var err error
  141. rts.reactor, err = NewReactor(
  142. ctx,
  143. factory.DefaultTestChainID,
  144. 1,
  145. *cfg,
  146. logger.With("component", "reactor"),
  147. conn,
  148. connQuery,
  149. chCreator,
  150. rts.peerUpdates,
  151. rts.stateStore,
  152. rts.blockStore,
  153. "",
  154. m,
  155. nil, // eventbus can be nil
  156. )
  157. require.NoError(t, err)
  158. rts.syncer = newSyncer(
  159. *cfg,
  160. logger.With("component", "syncer"),
  161. conn,
  162. connQuery,
  163. stateProvider,
  164. rts.snapshotChannel,
  165. rts.chunkChannel,
  166. "",
  167. rts.reactor.metrics,
  168. )
  169. ctx, cancel := context.WithCancel(ctx)
  170. require.NoError(t, rts.reactor.Start(ctx))
  171. require.True(t, rts.reactor.IsRunning())
  172. t.Cleanup(cancel)
  173. t.Cleanup(rts.reactor.Wait)
  174. t.Cleanup(leaktest.Check(t))
  175. return rts
  176. }
  177. func TestReactor_Sync(t *testing.T) {
  178. ctx, cancel := context.WithCancel(context.Background())
  179. defer cancel()
  180. const snapshotHeight = 7
  181. rts := setup(ctx, t, nil, nil, nil, 2)
  182. chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
  183. // app accepts any snapshot
  184. rts.conn.On("OfferSnapshot", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
  185. Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  186. // app accepts every chunk
  187. rts.conn.On("ApplySnapshotChunk", ctx, mock.AnythingOfType("types.RequestApplySnapshotChunk")).
  188. Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  189. // app query returns valid state app hash
  190. rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
  191. AppVersion: testAppVersion,
  192. LastBlockHeight: snapshotHeight,
  193. LastBlockAppHash: chain[snapshotHeight+1].AppHash,
  194. }, nil)
  195. // store accepts state and validator sets
  196. rts.stateStore.On("Bootstrap", mock.AnythingOfType("state.State")).Return(nil)
  197. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  198. mock.AnythingOfType("*types.ValidatorSet")).Return(nil)
  199. closeCh := make(chan struct{})
  200. defer close(closeCh)
  201. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh,
  202. rts.blockInCh, closeCh, 0)
  203. go graduallyAddPeers(t, rts.peerUpdateCh, closeCh, 1*time.Second)
  204. go handleSnapshotRequests(t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
  205. {
  206. Height: uint64(snapshotHeight),
  207. Format: 1,
  208. Chunks: 1,
  209. },
  210. })
  211. go handleChunkRequests(t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
  212. go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh)
  213. // update the config to use the p2p provider
  214. rts.reactor.cfg.UseP2P = true
  215. rts.reactor.cfg.TrustHeight = 1
  216. rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
  217. rts.reactor.cfg.DiscoveryTime = 1 * time.Second
  218. // Run state sync
  219. _, err := rts.reactor.Sync(ctx)
  220. require.NoError(t, err)
  221. }
  222. func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
  223. ctx, cancel := context.WithCancel(context.Background())
  224. defer cancel()
  225. rts := setup(ctx, t, nil, nil, nil, 2)
  226. rts.chunkInCh <- p2p.Envelope{
  227. From: types.NodeID("aa"),
  228. Message: &ssproto.SnapshotsRequest{},
  229. }
  230. response := <-rts.chunkPeerErrCh
  231. require.Error(t, response.Err)
  232. require.Empty(t, rts.chunkOutCh)
  233. require.Contains(t, response.Err.Error(), "received unknown message")
  234. require.Equal(t, types.NodeID("aa"), response.NodeID)
  235. }
  236. func TestReactor_ChunkRequest(t *testing.T) {
  237. testcases := map[string]struct {
  238. request *ssproto.ChunkRequest
  239. chunk []byte
  240. expectResponse *ssproto.ChunkResponse
  241. }{
  242. "chunk is returned": {
  243. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  244. []byte{1, 2, 3},
  245. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}},
  246. },
  247. "empty chunk is returned, as empty": {
  248. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  249. []byte{},
  250. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{}},
  251. },
  252. "nil (missing) chunk is returned as missing": {
  253. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  254. nil,
  255. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  256. },
  257. "invalid request": {
  258. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  259. nil,
  260. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  261. },
  262. }
  263. bctx, bcancel := context.WithCancel(context.Background())
  264. defer bcancel()
  265. for name, tc := range testcases {
  266. t.Run(name, func(t *testing.T) {
  267. ctx, cancel := context.WithCancel(bctx)
  268. defer cancel()
  269. // mock ABCI connection to return local snapshots
  270. conn := &proxymocks.AppConnSnapshot{}
  271. conn.On("LoadSnapshotChunk", mock.Anything, abci.RequestLoadSnapshotChunk{
  272. Height: tc.request.Height,
  273. Format: tc.request.Format,
  274. Chunk: tc.request.Index,
  275. }).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
  276. rts := setup(ctx, t, conn, nil, nil, 2)
  277. rts.chunkInCh <- p2p.Envelope{
  278. From: types.NodeID("aa"),
  279. Message: tc.request,
  280. }
  281. response := <-rts.chunkOutCh
  282. require.Equal(t, tc.expectResponse, response.Message)
  283. require.Empty(t, rts.chunkOutCh)
  284. conn.AssertExpectations(t)
  285. })
  286. }
  287. }
  288. func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
  289. ctx, cancel := context.WithCancel(context.Background())
  290. defer cancel()
  291. rts := setup(ctx, t, nil, nil, nil, 2)
  292. rts.snapshotInCh <- p2p.Envelope{
  293. From: types.NodeID("aa"),
  294. Message: &ssproto.ChunkRequest{},
  295. }
  296. response := <-rts.snapshotPeerErrCh
  297. require.Error(t, response.Err)
  298. require.Empty(t, rts.snapshotOutCh)
  299. require.Contains(t, response.Err.Error(), "received unknown message")
  300. require.Equal(t, types.NodeID("aa"), response.NodeID)
  301. }
  302. func TestReactor_SnapshotsRequest(t *testing.T) {
  303. testcases := map[string]struct {
  304. snapshots []*abci.Snapshot
  305. expectResponses []*ssproto.SnapshotsResponse
  306. }{
  307. "no snapshots": {nil, []*ssproto.SnapshotsResponse{}},
  308. ">10 unordered snapshots": {
  309. []*abci.Snapshot{
  310. {Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1}},
  311. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  312. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  313. {Height: 1, Format: 1, Chunks: 7, Hash: []byte{1, 1}, Metadata: []byte{4}},
  314. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  315. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  316. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  317. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  318. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  319. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  320. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  321. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  322. },
  323. []*ssproto.SnapshotsResponse{
  324. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  325. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  326. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  327. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  328. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  329. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  330. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  331. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  332. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  333. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  334. },
  335. },
  336. }
  337. ctx, cancel := context.WithCancel(context.Background())
  338. defer cancel()
  339. for name, tc := range testcases {
  340. tc := tc
  341. t.Run(name, func(t *testing.T) {
  342. ctx, cancel := context.WithCancel(ctx)
  343. defer cancel()
  344. // mock ABCI connection to return local snapshots
  345. conn := &proxymocks.AppConnSnapshot{}
  346. conn.On("ListSnapshots", mock.Anything, abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
  347. Snapshots: tc.snapshots,
  348. }, nil)
  349. rts := setup(ctx, t, conn, nil, nil, 100)
  350. rts.snapshotInCh <- p2p.Envelope{
  351. From: types.NodeID("aa"),
  352. Message: &ssproto.SnapshotsRequest{},
  353. }
  354. if len(tc.expectResponses) > 0 {
  355. retryUntil(ctx, t, func() bool { return len(rts.snapshotOutCh) == len(tc.expectResponses) }, time.Second)
  356. }
  357. responses := make([]*ssproto.SnapshotsResponse, len(tc.expectResponses))
  358. for i := 0; i < len(tc.expectResponses); i++ {
  359. e := <-rts.snapshotOutCh
  360. responses[i] = e.Message.(*ssproto.SnapshotsResponse)
  361. }
  362. require.Equal(t, tc.expectResponses, responses)
  363. require.Empty(t, rts.snapshotOutCh)
  364. })
  365. }
  366. }
  367. func TestReactor_LightBlockResponse(t *testing.T) {
  368. ctx, cancel := context.WithCancel(context.Background())
  369. defer cancel()
  370. rts := setup(ctx, t, nil, nil, nil, 2)
  371. var height int64 = 10
  372. // generates a random header
  373. h := factory.MakeHeader(t, &types.Header{})
  374. h.Height = height
  375. blockID := factory.MakeBlockIDWithHash(h.Hash())
  376. vals, pv := factory.ValidatorSet(ctx, t, 1, 10)
  377. vote, err := factory.MakeVote(ctx, pv[0], h.ChainID, 0, h.Height, 0, 2,
  378. blockID, factory.DefaultTestTime)
  379. require.NoError(t, err)
  380. sh := &types.SignedHeader{
  381. Header: h,
  382. Commit: &types.Commit{
  383. Height: h.Height,
  384. BlockID: blockID,
  385. Signatures: []types.CommitSig{
  386. vote.CommitSig(),
  387. },
  388. },
  389. }
  390. lb := &types.LightBlock{
  391. SignedHeader: sh,
  392. ValidatorSet: vals,
  393. }
  394. require.NoError(t, rts.blockStore.SaveSignedHeader(sh, blockID))
  395. rts.stateStore.On("LoadValidators", height).Return(vals, nil)
  396. rts.blockInCh <- p2p.Envelope{
  397. From: types.NodeID("aa"),
  398. Message: &ssproto.LightBlockRequest{
  399. Height: 10,
  400. },
  401. }
  402. require.Empty(t, rts.blockPeerErrCh)
  403. select {
  404. case response := <-rts.blockOutCh:
  405. require.Equal(t, types.NodeID("aa"), response.To)
  406. res, ok := response.Message.(*ssproto.LightBlockResponse)
  407. require.True(t, ok)
  408. receivedLB, err := types.LightBlockFromProto(res.LightBlock)
  409. require.NoError(t, err)
  410. require.Equal(t, lb, receivedLB)
  411. case <-time.After(1 * time.Second):
  412. t.Fatal("expected light block response")
  413. }
  414. }
  415. func TestReactor_BlockProviders(t *testing.T) {
  416. ctx, cancel := context.WithCancel(context.Background())
  417. defer cancel()
  418. rts := setup(ctx, t, nil, nil, nil, 2)
  419. rts.peerUpdateCh <- p2p.PeerUpdate{
  420. NodeID: types.NodeID("aa"),
  421. Status: p2p.PeerStatusUp,
  422. Channels: p2p.ChannelIDSet{
  423. SnapshotChannel: struct{}{},
  424. ChunkChannel: struct{}{},
  425. LightBlockChannel: struct{}{},
  426. ParamsChannel: struct{}{},
  427. },
  428. }
  429. rts.peerUpdateCh <- p2p.PeerUpdate{
  430. NodeID: types.NodeID("bb"),
  431. Status: p2p.PeerStatusUp,
  432. Channels: p2p.ChannelIDSet{
  433. SnapshotChannel: struct{}{},
  434. ChunkChannel: struct{}{},
  435. LightBlockChannel: struct{}{},
  436. ParamsChannel: struct{}{},
  437. },
  438. }
  439. closeCh := make(chan struct{})
  440. defer close(closeCh)
  441. chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
  442. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  443. peers := rts.reactor.peers.All()
  444. require.Len(t, peers, 2)
  445. providers := make([]provider.Provider, len(peers))
  446. for idx, peer := range peers {
  447. providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
  448. }
  449. wg := sync.WaitGroup{}
  450. for _, p := range providers {
  451. wg.Add(1)
  452. go func(t *testing.T, p provider.Provider) {
  453. defer wg.Done()
  454. for height := 2; height < 10; height++ {
  455. lb, err := p.LightBlock(ctx, int64(height))
  456. require.NoError(t, err)
  457. require.NotNil(t, lb)
  458. require.Equal(t, height, int(lb.Height))
  459. }
  460. }(t, p)
  461. }
  462. go func() { wg.Wait(); cancel() }()
  463. select {
  464. case <-time.After(time.Second):
  465. // not all of the requests to the dispatcher were responded to
  466. // within the timeout
  467. t.Fail()
  468. case <-ctx.Done():
  469. }
  470. }
  471. func TestReactor_StateProviderP2P(t *testing.T) {
  472. ctx, cancel := context.WithCancel(context.Background())
  473. defer cancel()
  474. rts := setup(ctx, t, nil, nil, nil, 2)
  475. // make syncer non nil else test won't think we are state syncing
  476. rts.reactor.syncer = rts.syncer
  477. peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
  478. peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
  479. rts.peerUpdateCh <- p2p.PeerUpdate{
  480. NodeID: peerA,
  481. Status: p2p.PeerStatusUp,
  482. }
  483. rts.peerUpdateCh <- p2p.PeerUpdate{
  484. NodeID: peerB,
  485. Status: p2p.PeerStatusUp,
  486. }
  487. closeCh := make(chan struct{})
  488. defer close(closeCh)
  489. chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
  490. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  491. go handleConsensusParamsRequest(ctx, t, rts.paramsOutCh, rts.paramsInCh, closeCh)
  492. rts.reactor.cfg.UseP2P = true
  493. rts.reactor.cfg.TrustHeight = 1
  494. rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
  495. for _, p := range []types.NodeID{peerA, peerB} {
  496. if !rts.reactor.peers.Contains(p) {
  497. rts.reactor.peers.Append(p)
  498. }
  499. }
  500. require.True(t, rts.reactor.peers.Len() >= 2, "peer network not configured")
  501. ictx, cancel := context.WithTimeout(ctx, time.Second)
  502. defer cancel()
  503. rts.reactor.mtx.Lock()
  504. err := rts.reactor.initStateProvider(ictx, factory.DefaultTestChainID, 1)
  505. rts.reactor.mtx.Unlock()
  506. require.NoError(t, err)
  507. rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
  508. actx, cancel := context.WithTimeout(ctx, time.Second)
  509. defer cancel()
  510. appHash, err := rts.reactor.stateProvider.AppHash(actx, 5)
  511. require.NoError(t, err)
  512. require.Len(t, appHash, 32)
  513. state, err := rts.reactor.stateProvider.State(actx, 5)
  514. require.NoError(t, err)
  515. require.Equal(t, appHash, state.AppHash)
  516. require.Equal(t, types.DefaultConsensusParams(), &state.ConsensusParams)
  517. commit, err := rts.reactor.stateProvider.Commit(actx, 5)
  518. require.NoError(t, err)
  519. require.Equal(t, commit.BlockID, state.LastBlockID)
  520. added, err := rts.reactor.syncer.AddSnapshot(peerA, &snapshot{
  521. Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1},
  522. })
  523. require.NoError(t, err)
  524. require.True(t, added)
  525. }
  526. func TestReactor_Backfill(t *testing.T) {
  527. ctx, cancel := context.WithCancel(context.Background())
  528. defer cancel()
  529. // test backfill algorithm with varying failure rates [0, 10]
  530. failureRates := []int{0, 2, 9}
  531. for _, failureRate := range failureRates {
  532. failureRate := failureRate
  533. t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) {
  534. ctx, cancel := context.WithCancel(ctx)
  535. defer cancel()
  536. t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
  537. rts := setup(ctx, t, nil, nil, nil, 21)
  538. var (
  539. startHeight int64 = 20
  540. stopHeight int64 = 10
  541. stopTime = time.Date(2020, 1, 1, 0, 100, 0, 0, time.UTC)
  542. )
  543. peers := []string{"a", "b", "c", "d"}
  544. for _, peer := range peers {
  545. rts.peerUpdateCh <- p2p.PeerUpdate{
  546. NodeID: types.NodeID(peer),
  547. Status: p2p.PeerStatusUp,
  548. Channels: p2p.ChannelIDSet{
  549. SnapshotChannel: struct{}{},
  550. ChunkChannel: struct{}{},
  551. LightBlockChannel: struct{}{},
  552. ParamsChannel: struct{}{},
  553. },
  554. }
  555. }
  556. trackingHeight := startHeight
  557. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  558. mock.AnythingOfType("*types.ValidatorSet")).Return(func(lh, uh int64, vals *types.ValidatorSet) error {
  559. require.Equal(t, trackingHeight, lh)
  560. require.Equal(t, lh, uh)
  561. require.GreaterOrEqual(t, lh, stopHeight)
  562. trackingHeight--
  563. return nil
  564. })
  565. chain := buildLightBlockChain(ctx, t, stopHeight-1, startHeight+1, stopTime)
  566. closeCh := make(chan struct{})
  567. defer close(closeCh)
  568. go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh,
  569. rts.blockInCh, closeCh, failureRate)
  570. err := rts.reactor.backfill(
  571. ctx,
  572. factory.DefaultTestChainID,
  573. startHeight,
  574. stopHeight,
  575. 1,
  576. factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
  577. stopTime,
  578. )
  579. if failureRate > 3 {
  580. require.Error(t, err)
  581. require.NotEqual(t, rts.reactor.backfilledBlocks, rts.reactor.backfillBlockTotal)
  582. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal)
  583. } else {
  584. require.NoError(t, err)
  585. for height := startHeight; height <= stopHeight; height++ {
  586. blockMeta := rts.blockStore.LoadBlockMeta(height)
  587. require.NotNil(t, blockMeta)
  588. }
  589. require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1))
  590. require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1))
  591. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfilledBlocks)
  592. require.Equal(t, startHeight-stopHeight+1, rts.reactor.backfillBlockTotal)
  593. }
  594. require.Equal(t, rts.reactor.backfilledBlocks, rts.reactor.BackFilledBlocks())
  595. require.Equal(t, rts.reactor.backfillBlockTotal, rts.reactor.BackFillBlocksTotal())
  596. })
  597. }
  598. }
  599. // retryUntil will continue to evaluate fn and will return successfully when true
  600. // or fail when the timeout is reached.
  601. func retryUntil(ctx context.Context, t *testing.T, fn func() bool, timeout time.Duration) {
  602. ctx, cancel := context.WithTimeout(ctx, timeout)
  603. defer cancel()
  604. for {
  605. if fn() {
  606. return
  607. }
  608. require.NoError(t, ctx.Err())
  609. }
  610. }
  611. func handleLightBlockRequests(
  612. ctx context.Context,
  613. t *testing.T,
  614. chain map[int64]*types.LightBlock,
  615. receiving chan p2p.Envelope,
  616. sending chan p2p.Envelope,
  617. close chan struct{},
  618. failureRate int) {
  619. requests := 0
  620. errorCount := 0
  621. for {
  622. select {
  623. case <-ctx.Done():
  624. return
  625. case envelope := <-receiving:
  626. if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok {
  627. if requests%10 >= failureRate {
  628. lb, err := chain[int64(msg.Height)].ToProto()
  629. require.NoError(t, err)
  630. sending <- p2p.Envelope{
  631. From: envelope.To,
  632. Message: &ssproto.LightBlockResponse{
  633. LightBlock: lb,
  634. },
  635. }
  636. } else {
  637. switch errorCount % 3 {
  638. case 0: // send a different block
  639. vals, pv := factory.ValidatorSet(ctx, t, 3, 10)
  640. _, _, lb := mockLB(ctx, t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID(), vals, pv)
  641. differntLB, err := lb.ToProto()
  642. require.NoError(t, err)
  643. sending <- p2p.Envelope{
  644. From: envelope.To,
  645. Message: &ssproto.LightBlockResponse{
  646. LightBlock: differntLB,
  647. },
  648. }
  649. case 1: // send nil block i.e. pretend we don't have it
  650. sending <- p2p.Envelope{
  651. From: envelope.To,
  652. Message: &ssproto.LightBlockResponse{
  653. LightBlock: nil,
  654. },
  655. }
  656. case 2: // don't do anything
  657. }
  658. errorCount++
  659. }
  660. }
  661. case <-close:
  662. return
  663. }
  664. requests++
  665. }
  666. }
  667. func handleConsensusParamsRequest(
  668. ctx context.Context,
  669. t *testing.T,
  670. receiving, sending chan p2p.Envelope,
  671. closeCh chan struct{},
  672. ) {
  673. t.Helper()
  674. params := types.DefaultConsensusParams()
  675. paramsProto := params.ToProto()
  676. for {
  677. select {
  678. case <-ctx.Done():
  679. return
  680. case envelope := <-receiving:
  681. if ctx.Err() != nil {
  682. return
  683. }
  684. t.Log("received consensus params request")
  685. msg, ok := envelope.Message.(*ssproto.ParamsRequest)
  686. require.True(t, ok)
  687. sending <- p2p.Envelope{
  688. From: envelope.To,
  689. Message: &ssproto.ParamsResponse{
  690. Height: msg.Height,
  691. ConsensusParams: paramsProto,
  692. },
  693. }
  694. case <-closeCh:
  695. return
  696. }
  697. }
  698. }
  699. func buildLightBlockChain(ctx context.Context, t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
  700. t.Helper()
  701. chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
  702. lastBlockID := factory.MakeBlockID()
  703. blockTime := startTime.Add(time.Duration(fromHeight-toHeight) * time.Minute)
  704. vals, pv := factory.ValidatorSet(ctx, t, 3, 10)
  705. for height := fromHeight; height < toHeight; height++ {
  706. vals, pv, chain[height] = mockLB(ctx, t, height, blockTime, lastBlockID, vals, pv)
  707. lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
  708. blockTime = blockTime.Add(1 * time.Minute)
  709. }
  710. return chain
  711. }
  712. func mockLB(ctx context.Context, t *testing.T, height int64, time time.Time, lastBlockID types.BlockID,
  713. currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator,
  714. ) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) {
  715. t.Helper()
  716. header := factory.MakeHeader(t, &types.Header{
  717. Height: height,
  718. LastBlockID: lastBlockID,
  719. Time: time,
  720. })
  721. header.Version.App = testAppVersion
  722. nextVals, nextPrivVals := factory.ValidatorSet(ctx, t, 3, 10)
  723. header.ValidatorsHash = currentVals.Hash()
  724. header.NextValidatorsHash = nextVals.Hash()
  725. header.ConsensusHash = types.DefaultConsensusParams().HashConsensusParams()
  726. lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
  727. voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, currentVals)
  728. commit, err := factory.MakeCommit(ctx, lastBlockID, height, 0, voteSet, currentPrivVals, time)
  729. require.NoError(t, err)
  730. return nextVals, nextPrivVals, &types.LightBlock{
  731. SignedHeader: &types.SignedHeader{
  732. Header: header,
  733. Commit: commit,
  734. },
  735. ValidatorSet: currentVals,
  736. }
  737. }
  738. // graduallyAddPeers delivers a new randomly-generated peer update on peerUpdateCh once
  739. // per interval, until closeCh is closed. Each peer update is assigned a random node ID.
  740. func graduallyAddPeers(
  741. t *testing.T,
  742. peerUpdateCh chan p2p.PeerUpdate,
  743. closeCh chan struct{},
  744. interval time.Duration,
  745. ) {
  746. ticker := time.NewTicker(interval)
  747. for {
  748. select {
  749. case <-ticker.C:
  750. peerUpdateCh <- p2p.PeerUpdate{
  751. NodeID: factory.RandomNodeID(t),
  752. Status: p2p.PeerStatusUp,
  753. Channels: p2p.ChannelIDSet{
  754. SnapshotChannel: struct{}{},
  755. ChunkChannel: struct{}{},
  756. LightBlockChannel: struct{}{},
  757. ParamsChannel: struct{}{},
  758. },
  759. }
  760. case <-closeCh:
  761. return
  762. }
  763. }
  764. }
  765. func handleSnapshotRequests(
  766. t *testing.T,
  767. receivingCh chan p2p.Envelope,
  768. sendingCh chan p2p.Envelope,
  769. closeCh chan struct{},
  770. snapshots []snapshot,
  771. ) {
  772. t.Helper()
  773. for {
  774. select {
  775. case envelope := <-receivingCh:
  776. _, ok := envelope.Message.(*ssproto.SnapshotsRequest)
  777. require.True(t, ok)
  778. for _, snapshot := range snapshots {
  779. sendingCh <- p2p.Envelope{
  780. From: envelope.To,
  781. Message: &ssproto.SnapshotsResponse{
  782. Height: snapshot.Height,
  783. Format: snapshot.Format,
  784. Chunks: snapshot.Chunks,
  785. Hash: snapshot.Hash,
  786. Metadata: snapshot.Metadata,
  787. },
  788. }
  789. }
  790. case <-closeCh:
  791. return
  792. }
  793. }
  794. }
  795. func handleChunkRequests(
  796. t *testing.T,
  797. receivingCh chan p2p.Envelope,
  798. sendingCh chan p2p.Envelope,
  799. closeCh chan struct{},
  800. chunk []byte,
  801. ) {
  802. t.Helper()
  803. for {
  804. select {
  805. case envelope := <-receivingCh:
  806. msg, ok := envelope.Message.(*ssproto.ChunkRequest)
  807. require.True(t, ok)
  808. sendingCh <- p2p.Envelope{
  809. From: envelope.To,
  810. Message: &ssproto.ChunkResponse{
  811. Height: msg.Height,
  812. Format: msg.Format,
  813. Index: msg.Index,
  814. Chunk: chunk,
  815. Missing: false,
  816. },
  817. }
  818. case <-closeCh:
  819. return
  820. }
  821. }
  822. }