You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

584 lines
16 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "testing"
  8. "time"
  9. // "github.com/fortytw2/leaktest"
  10. "github.com/stretchr/testify/mock"
  11. "github.com/stretchr/testify/require"
  12. dbm "github.com/tendermint/tm-db"
  13. abci "github.com/tendermint/tendermint/abci/types"
  14. "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/internal/p2p"
  16. "github.com/tendermint/tendermint/internal/statesync/mocks"
  17. "github.com/tendermint/tendermint/internal/test/factory"
  18. "github.com/tendermint/tendermint/libs/log"
  19. "github.com/tendermint/tendermint/light/provider"
  20. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  21. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  22. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  23. smmocks "github.com/tendermint/tendermint/state/mocks"
  24. "github.com/tendermint/tendermint/store"
  25. "github.com/tendermint/tendermint/types"
  26. )
  27. type reactorTestSuite struct {
  28. reactor *Reactor
  29. syncer *syncer
  30. conn *proxymocks.AppConnSnapshot
  31. connQuery *proxymocks.AppConnQuery
  32. stateProvider *mocks.StateProvider
  33. snapshotChannel *p2p.Channel
  34. snapshotInCh chan p2p.Envelope
  35. snapshotOutCh chan p2p.Envelope
  36. snapshotPeerErrCh chan p2p.PeerError
  37. chunkChannel *p2p.Channel
  38. chunkInCh chan p2p.Envelope
  39. chunkOutCh chan p2p.Envelope
  40. chunkPeerErrCh chan p2p.PeerError
  41. blockChannel *p2p.Channel
  42. blockInCh chan p2p.Envelope
  43. blockOutCh chan p2p.Envelope
  44. blockPeerErrCh chan p2p.PeerError
  45. peerUpdateCh chan p2p.PeerUpdate
  46. peerUpdates *p2p.PeerUpdates
  47. stateStore *smmocks.Store
  48. blockStore *store.BlockStore
  49. }
  50. func setup(
  51. t *testing.T,
  52. conn *proxymocks.AppConnSnapshot,
  53. connQuery *proxymocks.AppConnQuery,
  54. stateProvider *mocks.StateProvider,
  55. chBuf uint,
  56. ) *reactorTestSuite {
  57. t.Helper()
  58. if conn == nil {
  59. conn = &proxymocks.AppConnSnapshot{}
  60. }
  61. if connQuery == nil {
  62. connQuery = &proxymocks.AppConnQuery{}
  63. }
  64. if stateProvider == nil {
  65. stateProvider = &mocks.StateProvider{}
  66. }
  67. rts := &reactorTestSuite{
  68. snapshotInCh: make(chan p2p.Envelope, chBuf),
  69. snapshotOutCh: make(chan p2p.Envelope, chBuf),
  70. snapshotPeerErrCh: make(chan p2p.PeerError, chBuf),
  71. chunkInCh: make(chan p2p.Envelope, chBuf),
  72. chunkOutCh: make(chan p2p.Envelope, chBuf),
  73. chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
  74. blockInCh: make(chan p2p.Envelope, chBuf),
  75. blockOutCh: make(chan p2p.Envelope, chBuf),
  76. blockPeerErrCh: make(chan p2p.PeerError, chBuf),
  77. conn: conn,
  78. connQuery: connQuery,
  79. stateProvider: stateProvider,
  80. }
  81. rts.peerUpdateCh = make(chan p2p.PeerUpdate, chBuf)
  82. rts.peerUpdates = p2p.NewPeerUpdates(rts.peerUpdateCh, int(chBuf))
  83. rts.snapshotChannel = p2p.NewChannel(
  84. SnapshotChannel,
  85. new(ssproto.Message),
  86. rts.snapshotInCh,
  87. rts.snapshotOutCh,
  88. rts.snapshotPeerErrCh,
  89. )
  90. rts.chunkChannel = p2p.NewChannel(
  91. ChunkChannel,
  92. new(ssproto.Message),
  93. rts.chunkInCh,
  94. rts.chunkOutCh,
  95. rts.chunkPeerErrCh,
  96. )
  97. rts.blockChannel = p2p.NewChannel(
  98. LightBlockChannel,
  99. new(ssproto.Message),
  100. rts.blockInCh,
  101. rts.blockOutCh,
  102. rts.blockPeerErrCh,
  103. )
  104. rts.stateStore = &smmocks.Store{}
  105. rts.blockStore = store.NewBlockStore(dbm.NewMemDB())
  106. cfg := config.DefaultStateSyncConfig()
  107. rts.reactor = NewReactor(
  108. *cfg,
  109. log.TestingLogger(),
  110. conn,
  111. connQuery,
  112. rts.snapshotChannel,
  113. rts.chunkChannel,
  114. rts.blockChannel,
  115. rts.peerUpdates,
  116. rts.stateStore,
  117. rts.blockStore,
  118. "",
  119. )
  120. // override the dispatcher with one with a shorter timeout
  121. rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second)
  122. rts.syncer = newSyncer(
  123. *cfg,
  124. log.NewNopLogger(),
  125. conn,
  126. connQuery,
  127. stateProvider,
  128. rts.snapshotOutCh,
  129. rts.chunkOutCh,
  130. "",
  131. )
  132. require.NoError(t, rts.reactor.Start())
  133. require.True(t, rts.reactor.IsRunning())
  134. t.Cleanup(func() {
  135. require.NoError(t, rts.reactor.Stop())
  136. require.False(t, rts.reactor.IsRunning())
  137. })
  138. return rts
  139. }
  140. func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
  141. rts := setup(t, nil, nil, nil, 2)
  142. rts.chunkInCh <- p2p.Envelope{
  143. From: p2p.NodeID("aa"),
  144. Message: &ssproto.SnapshotsRequest{},
  145. }
  146. response := <-rts.chunkPeerErrCh
  147. require.Error(t, response.Err)
  148. require.Empty(t, rts.chunkOutCh)
  149. require.Contains(t, response.Err.Error(), "received unknown message")
  150. require.Equal(t, p2p.NodeID("aa"), response.NodeID)
  151. }
  152. func TestReactor_ChunkRequest(t *testing.T) {
  153. testcases := map[string]struct {
  154. request *ssproto.ChunkRequest
  155. chunk []byte
  156. expectResponse *ssproto.ChunkResponse
  157. }{
  158. "chunk is returned": {
  159. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  160. []byte{1, 2, 3},
  161. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}},
  162. },
  163. "empty chunk is returned, as empty": {
  164. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  165. []byte{},
  166. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{}},
  167. },
  168. "nil (missing) chunk is returned as missing": {
  169. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  170. nil,
  171. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  172. },
  173. "invalid request": {
  174. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  175. nil,
  176. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  177. },
  178. }
  179. for name, tc := range testcases {
  180. tc := tc
  181. t.Run(name, func(t *testing.T) {
  182. // mock ABCI connection to return local snapshots
  183. conn := &proxymocks.AppConnSnapshot{}
  184. conn.On("LoadSnapshotChunkSync", context.Background(), abci.RequestLoadSnapshotChunk{
  185. Height: tc.request.Height,
  186. Format: tc.request.Format,
  187. Chunk: tc.request.Index,
  188. }).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
  189. rts := setup(t, conn, nil, nil, 2)
  190. rts.chunkInCh <- p2p.Envelope{
  191. From: p2p.NodeID("aa"),
  192. Message: tc.request,
  193. }
  194. response := <-rts.chunkOutCh
  195. require.Equal(t, tc.expectResponse, response.Message)
  196. require.Empty(t, rts.chunkOutCh)
  197. conn.AssertExpectations(t)
  198. })
  199. }
  200. }
  201. func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
  202. rts := setup(t, nil, nil, nil, 2)
  203. rts.snapshotInCh <- p2p.Envelope{
  204. From: p2p.NodeID("aa"),
  205. Message: &ssproto.ChunkRequest{},
  206. }
  207. response := <-rts.snapshotPeerErrCh
  208. require.Error(t, response.Err)
  209. require.Empty(t, rts.snapshotOutCh)
  210. require.Contains(t, response.Err.Error(), "received unknown message")
  211. require.Equal(t, p2p.NodeID("aa"), response.NodeID)
  212. }
  213. func TestReactor_SnapshotsRequest(t *testing.T) {
  214. testcases := map[string]struct {
  215. snapshots []*abci.Snapshot
  216. expectResponses []*ssproto.SnapshotsResponse
  217. }{
  218. "no snapshots": {nil, []*ssproto.SnapshotsResponse{}},
  219. ">10 unordered snapshots": {
  220. []*abci.Snapshot{
  221. {Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1}},
  222. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  223. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  224. {Height: 1, Format: 1, Chunks: 7, Hash: []byte{1, 1}, Metadata: []byte{4}},
  225. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  226. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  227. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  228. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  229. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  230. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  231. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  232. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  233. },
  234. []*ssproto.SnapshotsResponse{
  235. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  236. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  237. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  238. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  239. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  240. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  241. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  242. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  243. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  244. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  245. },
  246. },
  247. }
  248. for name, tc := range testcases {
  249. tc := tc
  250. t.Run(name, func(t *testing.T) {
  251. // mock ABCI connection to return local snapshots
  252. conn := &proxymocks.AppConnSnapshot{}
  253. conn.On("ListSnapshotsSync", context.Background(), abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
  254. Snapshots: tc.snapshots,
  255. }, nil)
  256. rts := setup(t, conn, nil, nil, 100)
  257. rts.snapshotInCh <- p2p.Envelope{
  258. From: p2p.NodeID("aa"),
  259. Message: &ssproto.SnapshotsRequest{},
  260. }
  261. if len(tc.expectResponses) > 0 {
  262. retryUntil(t, func() bool { return len(rts.snapshotOutCh) == len(tc.expectResponses) }, time.Second)
  263. }
  264. responses := make([]*ssproto.SnapshotsResponse, len(tc.expectResponses))
  265. for i := 0; i < len(tc.expectResponses); i++ {
  266. e := <-rts.snapshotOutCh
  267. responses[i] = e.Message.(*ssproto.SnapshotsResponse)
  268. }
  269. require.Equal(t, tc.expectResponses, responses)
  270. require.Empty(t, rts.snapshotOutCh)
  271. })
  272. }
  273. }
  274. func TestReactor_LightBlockResponse(t *testing.T) {
  275. rts := setup(t, nil, nil, nil, 2)
  276. var height int64 = 10
  277. h := factory.MakeRandomHeader()
  278. h.Height = height
  279. blockID := factory.MakeBlockIDWithHash(h.Hash())
  280. vals, pv := factory.RandValidatorSet(1, 10)
  281. vote, err := factory.MakeVote(pv[0], h.ChainID, 0, h.Height, 0, 2,
  282. blockID, factory.DefaultTestTime)
  283. require.NoError(t, err)
  284. sh := &types.SignedHeader{
  285. Header: h,
  286. Commit: &types.Commit{
  287. Height: h.Height,
  288. BlockID: blockID,
  289. Signatures: []types.CommitSig{
  290. vote.CommitSig(),
  291. },
  292. },
  293. }
  294. lb := &types.LightBlock{
  295. SignedHeader: sh,
  296. ValidatorSet: vals,
  297. }
  298. require.NoError(t, rts.blockStore.SaveSignedHeader(sh, blockID))
  299. rts.stateStore.On("LoadValidators", height).Return(vals, nil)
  300. rts.blockInCh <- p2p.Envelope{
  301. From: p2p.NodeID("aa"),
  302. Message: &ssproto.LightBlockRequest{
  303. Height: 10,
  304. },
  305. }
  306. require.Empty(t, rts.blockPeerErrCh)
  307. select {
  308. case response := <-rts.blockOutCh:
  309. require.Equal(t, p2p.NodeID("aa"), response.To)
  310. res, ok := response.Message.(*ssproto.LightBlockResponse)
  311. require.True(t, ok)
  312. receivedLB, err := types.LightBlockFromProto(res.LightBlock)
  313. require.NoError(t, err)
  314. require.Equal(t, lb, receivedLB)
  315. case <-time.After(1 * time.Second):
  316. t.Fatal("expected light block response")
  317. }
  318. }
  319. func TestReactor_Dispatcher(t *testing.T) {
  320. rts := setup(t, nil, nil, nil, 2)
  321. rts.peerUpdateCh <- p2p.PeerUpdate{
  322. NodeID: p2p.NodeID("aa"),
  323. Status: p2p.PeerStatusUp,
  324. }
  325. rts.peerUpdateCh <- p2p.PeerUpdate{
  326. NodeID: p2p.NodeID("bb"),
  327. Status: p2p.PeerStatusUp,
  328. }
  329. closeCh := make(chan struct{})
  330. defer close(closeCh)
  331. chain := buildLightBlockChain(t, 1, 10, time.Now())
  332. go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  333. dispatcher := rts.reactor.Dispatcher()
  334. providers := dispatcher.Providers(factory.DefaultTestChainID, 5*time.Second)
  335. require.Len(t, providers, 2)
  336. wg := sync.WaitGroup{}
  337. for _, p := range providers {
  338. wg.Add(1)
  339. go func(t *testing.T, p provider.Provider) {
  340. defer wg.Done()
  341. for height := 2; height < 10; height++ {
  342. lb, err := p.LightBlock(context.Background(), int64(height))
  343. require.NoError(t, err)
  344. require.NotNil(t, lb)
  345. require.Equal(t, height, int(lb.Height))
  346. }
  347. }(t, p)
  348. }
  349. ctx, cancel := context.WithCancel(context.Background())
  350. go func() { wg.Wait(); cancel() }()
  351. select {
  352. case <-time.After(time.Second):
  353. // not all of the requests to the dispatcher were responded to
  354. // within the timeout
  355. t.Fail()
  356. case <-ctx.Done():
  357. }
  358. }
  359. func TestReactor_Backfill(t *testing.T) {
  360. // test backfill algorithm with varying failure rates [0, 10]
  361. failureRates := []int{0, 3, 9}
  362. for _, failureRate := range failureRates {
  363. failureRate := failureRate
  364. t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) {
  365. // t.Cleanup(leaktest.Check(t))
  366. rts := setup(t, nil, nil, nil, 21)
  367. var (
  368. startHeight int64 = 20
  369. stopHeight int64 = 10
  370. stopTime = time.Date(2020, 1, 1, 0, 100, 0, 0, time.UTC)
  371. )
  372. peers := []string{"a", "b", "c", "d"}
  373. for _, peer := range peers {
  374. rts.peerUpdateCh <- p2p.PeerUpdate{
  375. NodeID: p2p.NodeID(peer),
  376. Status: p2p.PeerStatusUp,
  377. }
  378. }
  379. trackingHeight := startHeight
  380. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  381. mock.AnythingOfType("*types.ValidatorSet")).Return(func(lh, uh int64, vals *types.ValidatorSet) error {
  382. require.Equal(t, trackingHeight, lh)
  383. require.Equal(t, lh, uh)
  384. require.GreaterOrEqual(t, lh, stopHeight)
  385. trackingHeight--
  386. return nil
  387. })
  388. chain := buildLightBlockChain(t, stopHeight-1, startHeight+1, stopTime)
  389. closeCh := make(chan struct{})
  390. defer close(closeCh)
  391. go handleLightBlockRequests(t, chain, rts.blockOutCh,
  392. rts.blockInCh, closeCh, failureRate)
  393. err := rts.reactor.backfill(
  394. context.Background(),
  395. factory.DefaultTestChainID,
  396. startHeight,
  397. stopHeight,
  398. factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
  399. stopTime,
  400. )
  401. if failureRate > 5 {
  402. require.Error(t, err)
  403. } else {
  404. require.NoError(t, err)
  405. for height := startHeight; height <= stopHeight; height++ {
  406. blockMeta := rts.blockStore.LoadBlockMeta(height)
  407. require.NotNil(t, blockMeta)
  408. }
  409. require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1))
  410. require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1))
  411. }
  412. })
  413. }
  414. }
  415. // retryUntil will continue to evaluate fn and will return successfully when true
  416. // or fail when the timeout is reached.
  417. func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) {
  418. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  419. defer cancel()
  420. for {
  421. if fn() {
  422. return
  423. }
  424. require.NoError(t, ctx.Err())
  425. }
  426. }
  427. func handleLightBlockRequests(t *testing.T,
  428. chain map[int64]*types.LightBlock,
  429. receiving chan p2p.Envelope,
  430. sending chan p2p.Envelope,
  431. close chan struct{},
  432. failureRate int) {
  433. requests := 0
  434. for {
  435. select {
  436. case envelope := <-receiving:
  437. if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok {
  438. if requests%10 >= failureRate {
  439. lb, err := chain[int64(msg.Height)].ToProto()
  440. require.NoError(t, err)
  441. sending <- p2p.Envelope{
  442. From: envelope.To,
  443. Message: &ssproto.LightBlockResponse{
  444. LightBlock: lb,
  445. },
  446. }
  447. } else {
  448. switch rand.Intn(3) {
  449. case 0: // send a different block
  450. differntLB, err := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID()).ToProto()
  451. require.NoError(t, err)
  452. sending <- p2p.Envelope{
  453. From: envelope.To,
  454. Message: &ssproto.LightBlockResponse{
  455. LightBlock: differntLB,
  456. },
  457. }
  458. case 1: // send nil block i.e. pretend we don't have it
  459. sending <- p2p.Envelope{
  460. From: envelope.To,
  461. Message: &ssproto.LightBlockResponse{
  462. LightBlock: nil,
  463. },
  464. }
  465. case 2: // don't do anything
  466. }
  467. }
  468. }
  469. case <-close:
  470. return
  471. }
  472. requests++
  473. }
  474. }
  475. func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
  476. chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
  477. lastBlockID := factory.MakeBlockID()
  478. blockTime := startTime.Add(-5 * time.Minute)
  479. for height := fromHeight; height < toHeight; height++ {
  480. chain[height] = mockLB(t, height, blockTime, lastBlockID)
  481. lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
  482. blockTime = blockTime.Add(1 * time.Minute)
  483. }
  484. return chain
  485. }
  486. func mockLB(t *testing.T, height int64, time time.Time,
  487. lastBlockID types.BlockID) *types.LightBlock {
  488. header, err := factory.MakeHeader(&types.Header{
  489. Height: height,
  490. LastBlockID: lastBlockID,
  491. Time: time,
  492. })
  493. require.NoError(t, err)
  494. vals, pv := factory.RandValidatorSet(3, 10)
  495. header.ValidatorsHash = vals.Hash()
  496. lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
  497. voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, vals)
  498. commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, pv, time)
  499. require.NoError(t, err)
  500. return &types.LightBlock{
  501. SignedHeader: &types.SignedHeader{
  502. Header: header,
  503. Commit: commit,
  504. },
  505. ValidatorSet: vals,
  506. }
  507. }