You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

579 lines
16 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "testing"
  8. "time"
  9. // "github.com/fortytw2/leaktest"
  10. "github.com/stretchr/testify/mock"
  11. "github.com/stretchr/testify/require"
  12. dbm "github.com/tendermint/tm-db"
  13. abci "github.com/tendermint/tendermint/abci/types"
  14. "github.com/tendermint/tendermint/internal/p2p"
  15. "github.com/tendermint/tendermint/internal/statesync/mocks"
  16. "github.com/tendermint/tendermint/internal/test/factory"
  17. "github.com/tendermint/tendermint/libs/log"
  18. "github.com/tendermint/tendermint/light/provider"
  19. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  20. tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
  21. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  22. smmocks "github.com/tendermint/tendermint/state/mocks"
  23. "github.com/tendermint/tendermint/store"
  24. "github.com/tendermint/tendermint/types"
  25. )
  26. type reactorTestSuite struct {
  27. reactor *Reactor
  28. syncer *syncer
  29. conn *proxymocks.AppConnSnapshot
  30. connQuery *proxymocks.AppConnQuery
  31. stateProvider *mocks.StateProvider
  32. snapshotChannel *p2p.Channel
  33. snapshotInCh chan p2p.Envelope
  34. snapshotOutCh chan p2p.Envelope
  35. snapshotPeerErrCh chan p2p.PeerError
  36. chunkChannel *p2p.Channel
  37. chunkInCh chan p2p.Envelope
  38. chunkOutCh chan p2p.Envelope
  39. chunkPeerErrCh chan p2p.PeerError
  40. blockChannel *p2p.Channel
  41. blockInCh chan p2p.Envelope
  42. blockOutCh chan p2p.Envelope
  43. blockPeerErrCh chan p2p.PeerError
  44. peerUpdateCh chan p2p.PeerUpdate
  45. peerUpdates *p2p.PeerUpdates
  46. stateStore *smmocks.Store
  47. blockStore *store.BlockStore
  48. }
  49. func setup(
  50. t *testing.T,
  51. conn *proxymocks.AppConnSnapshot,
  52. connQuery *proxymocks.AppConnQuery,
  53. stateProvider *mocks.StateProvider,
  54. chBuf uint,
  55. ) *reactorTestSuite {
  56. t.Helper()
  57. if conn == nil {
  58. conn = &proxymocks.AppConnSnapshot{}
  59. }
  60. if connQuery == nil {
  61. connQuery = &proxymocks.AppConnQuery{}
  62. }
  63. if stateProvider == nil {
  64. stateProvider = &mocks.StateProvider{}
  65. }
  66. rts := &reactorTestSuite{
  67. snapshotInCh: make(chan p2p.Envelope, chBuf),
  68. snapshotOutCh: make(chan p2p.Envelope, chBuf),
  69. snapshotPeerErrCh: make(chan p2p.PeerError, chBuf),
  70. chunkInCh: make(chan p2p.Envelope, chBuf),
  71. chunkOutCh: make(chan p2p.Envelope, chBuf),
  72. chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
  73. blockInCh: make(chan p2p.Envelope, chBuf),
  74. blockOutCh: make(chan p2p.Envelope, chBuf),
  75. blockPeerErrCh: make(chan p2p.PeerError, chBuf),
  76. conn: conn,
  77. connQuery: connQuery,
  78. stateProvider: stateProvider,
  79. }
  80. rts.peerUpdateCh = make(chan p2p.PeerUpdate, chBuf)
  81. rts.peerUpdates = p2p.NewPeerUpdates(rts.peerUpdateCh, int(chBuf))
  82. rts.snapshotChannel = p2p.NewChannel(
  83. SnapshotChannel,
  84. new(ssproto.Message),
  85. rts.snapshotInCh,
  86. rts.snapshotOutCh,
  87. rts.snapshotPeerErrCh,
  88. )
  89. rts.chunkChannel = p2p.NewChannel(
  90. ChunkChannel,
  91. new(ssproto.Message),
  92. rts.chunkInCh,
  93. rts.chunkOutCh,
  94. rts.chunkPeerErrCh,
  95. )
  96. rts.blockChannel = p2p.NewChannel(
  97. LightBlockChannel,
  98. new(ssproto.Message),
  99. rts.blockInCh,
  100. rts.blockOutCh,
  101. rts.blockPeerErrCh,
  102. )
  103. rts.stateStore = &smmocks.Store{}
  104. rts.blockStore = store.NewBlockStore(dbm.NewMemDB())
  105. rts.reactor = NewReactor(
  106. log.TestingLogger(),
  107. conn,
  108. connQuery,
  109. rts.snapshotChannel,
  110. rts.chunkChannel,
  111. rts.blockChannel,
  112. rts.peerUpdates,
  113. rts.stateStore,
  114. rts.blockStore,
  115. "",
  116. )
  117. // override the dispatcher with one with a shorter timeout
  118. rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second)
  119. rts.syncer = newSyncer(
  120. log.NewNopLogger(),
  121. conn,
  122. connQuery,
  123. stateProvider,
  124. rts.snapshotOutCh,
  125. rts.chunkOutCh,
  126. "",
  127. )
  128. require.NoError(t, rts.reactor.Start())
  129. require.True(t, rts.reactor.IsRunning())
  130. t.Cleanup(func() {
  131. require.NoError(t, rts.reactor.Stop())
  132. require.False(t, rts.reactor.IsRunning())
  133. })
  134. return rts
  135. }
  136. func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
  137. rts := setup(t, nil, nil, nil, 2)
  138. rts.chunkInCh <- p2p.Envelope{
  139. From: p2p.NodeID("aa"),
  140. Message: &ssproto.SnapshotsRequest{},
  141. }
  142. response := <-rts.chunkPeerErrCh
  143. require.Error(t, response.Err)
  144. require.Empty(t, rts.chunkOutCh)
  145. require.Contains(t, response.Err.Error(), "received unknown message")
  146. require.Equal(t, p2p.NodeID("aa"), response.NodeID)
  147. }
  148. func TestReactor_ChunkRequest(t *testing.T) {
  149. testcases := map[string]struct {
  150. request *ssproto.ChunkRequest
  151. chunk []byte
  152. expectResponse *ssproto.ChunkResponse
  153. }{
  154. "chunk is returned": {
  155. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  156. []byte{1, 2, 3},
  157. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}},
  158. },
  159. "empty chunk is returned, as empty": {
  160. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  161. []byte{},
  162. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{}},
  163. },
  164. "nil (missing) chunk is returned as missing": {
  165. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  166. nil,
  167. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  168. },
  169. "invalid request": {
  170. &ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
  171. nil,
  172. &ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
  173. },
  174. }
  175. for name, tc := range testcases {
  176. tc := tc
  177. t.Run(name, func(t *testing.T) {
  178. // mock ABCI connection to return local snapshots
  179. conn := &proxymocks.AppConnSnapshot{}
  180. conn.On("LoadSnapshotChunkSync", context.Background(), abci.RequestLoadSnapshotChunk{
  181. Height: tc.request.Height,
  182. Format: tc.request.Format,
  183. Chunk: tc.request.Index,
  184. }).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
  185. rts := setup(t, conn, nil, nil, 2)
  186. rts.chunkInCh <- p2p.Envelope{
  187. From: p2p.NodeID("aa"),
  188. Message: tc.request,
  189. }
  190. response := <-rts.chunkOutCh
  191. require.Equal(t, tc.expectResponse, response.Message)
  192. require.Empty(t, rts.chunkOutCh)
  193. conn.AssertExpectations(t)
  194. })
  195. }
  196. }
  197. func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
  198. rts := setup(t, nil, nil, nil, 2)
  199. rts.snapshotInCh <- p2p.Envelope{
  200. From: p2p.NodeID("aa"),
  201. Message: &ssproto.ChunkRequest{},
  202. }
  203. response := <-rts.snapshotPeerErrCh
  204. require.Error(t, response.Err)
  205. require.Empty(t, rts.snapshotOutCh)
  206. require.Contains(t, response.Err.Error(), "received unknown message")
  207. require.Equal(t, p2p.NodeID("aa"), response.NodeID)
  208. }
  209. func TestReactor_SnapshotsRequest(t *testing.T) {
  210. testcases := map[string]struct {
  211. snapshots []*abci.Snapshot
  212. expectResponses []*ssproto.SnapshotsResponse
  213. }{
  214. "no snapshots": {nil, []*ssproto.SnapshotsResponse{}},
  215. ">10 unordered snapshots": {
  216. []*abci.Snapshot{
  217. {Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1}},
  218. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  219. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  220. {Height: 1, Format: 1, Chunks: 7, Hash: []byte{1, 1}, Metadata: []byte{4}},
  221. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  222. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  223. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  224. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  225. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  226. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  227. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  228. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  229. },
  230. []*ssproto.SnapshotsResponse{
  231. {Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
  232. {Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
  233. {Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
  234. {Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
  235. {Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
  236. {Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
  237. {Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
  238. {Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
  239. {Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
  240. {Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
  241. },
  242. },
  243. }
  244. for name, tc := range testcases {
  245. tc := tc
  246. t.Run(name, func(t *testing.T) {
  247. // mock ABCI connection to return local snapshots
  248. conn := &proxymocks.AppConnSnapshot{}
  249. conn.On("ListSnapshotsSync", context.Background(), abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
  250. Snapshots: tc.snapshots,
  251. }, nil)
  252. rts := setup(t, conn, nil, nil, 100)
  253. rts.snapshotInCh <- p2p.Envelope{
  254. From: p2p.NodeID("aa"),
  255. Message: &ssproto.SnapshotsRequest{},
  256. }
  257. if len(tc.expectResponses) > 0 {
  258. retryUntil(t, func() bool { return len(rts.snapshotOutCh) == len(tc.expectResponses) }, time.Second)
  259. }
  260. responses := make([]*ssproto.SnapshotsResponse, len(tc.expectResponses))
  261. for i := 0; i < len(tc.expectResponses); i++ {
  262. e := <-rts.snapshotOutCh
  263. responses[i] = e.Message.(*ssproto.SnapshotsResponse)
  264. }
  265. require.Equal(t, tc.expectResponses, responses)
  266. require.Empty(t, rts.snapshotOutCh)
  267. })
  268. }
  269. }
  270. func TestReactor_LightBlockResponse(t *testing.T) {
  271. rts := setup(t, nil, nil, nil, 2)
  272. var height int64 = 10
  273. h := factory.MakeRandomHeader()
  274. h.Height = height
  275. blockID := factory.MakeBlockIDWithHash(h.Hash())
  276. vals, pv := factory.RandValidatorSet(1, 10)
  277. vote, err := factory.MakeVote(pv[0], h.ChainID, 0, h.Height, 0, 2,
  278. blockID, factory.DefaultTestTime)
  279. require.NoError(t, err)
  280. sh := &types.SignedHeader{
  281. Header: h,
  282. Commit: &types.Commit{
  283. Height: h.Height,
  284. BlockID: blockID,
  285. Signatures: []types.CommitSig{
  286. vote.CommitSig(),
  287. },
  288. },
  289. }
  290. lb := &types.LightBlock{
  291. SignedHeader: sh,
  292. ValidatorSet: vals,
  293. }
  294. require.NoError(t, rts.blockStore.SaveSignedHeader(sh, blockID))
  295. rts.stateStore.On("LoadValidators", height).Return(vals, nil)
  296. rts.blockInCh <- p2p.Envelope{
  297. From: p2p.NodeID("aa"),
  298. Message: &ssproto.LightBlockRequest{
  299. Height: 10,
  300. },
  301. }
  302. require.Empty(t, rts.blockPeerErrCh)
  303. select {
  304. case response := <-rts.blockOutCh:
  305. require.Equal(t, p2p.NodeID("aa"), response.To)
  306. res, ok := response.Message.(*ssproto.LightBlockResponse)
  307. require.True(t, ok)
  308. receivedLB, err := types.LightBlockFromProto(res.LightBlock)
  309. require.NoError(t, err)
  310. require.Equal(t, lb, receivedLB)
  311. case <-time.After(1 * time.Second):
  312. t.Fatal("expected light block response")
  313. }
  314. }
  315. func TestReactor_Dispatcher(t *testing.T) {
  316. rts := setup(t, nil, nil, nil, 2)
  317. rts.peerUpdateCh <- p2p.PeerUpdate{
  318. NodeID: p2p.NodeID("aa"),
  319. Status: p2p.PeerStatusUp,
  320. }
  321. rts.peerUpdateCh <- p2p.PeerUpdate{
  322. NodeID: p2p.NodeID("bb"),
  323. Status: p2p.PeerStatusUp,
  324. }
  325. closeCh := make(chan struct{})
  326. defer close(closeCh)
  327. chain := buildLightBlockChain(t, 1, 10, time.Now())
  328. go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
  329. dispatcher := rts.reactor.Dispatcher()
  330. providers := dispatcher.Providers(factory.DefaultTestChainID, 5*time.Second)
  331. require.Len(t, providers, 2)
  332. wg := sync.WaitGroup{}
  333. for _, p := range providers {
  334. wg.Add(1)
  335. go func(t *testing.T, p provider.Provider) {
  336. defer wg.Done()
  337. for height := 2; height < 10; height++ {
  338. lb, err := p.LightBlock(context.Background(), int64(height))
  339. require.NoError(t, err)
  340. require.NotNil(t, lb)
  341. require.Equal(t, height, int(lb.Height))
  342. }
  343. }(t, p)
  344. }
  345. ctx, cancel := context.WithCancel(context.Background())
  346. go func() { wg.Wait(); cancel() }()
  347. select {
  348. case <-time.After(time.Second):
  349. // not all of the requests to the dispatcher were responded to
  350. // within the timeout
  351. t.Fail()
  352. case <-ctx.Done():
  353. }
  354. }
  355. func TestReactor_Backfill(t *testing.T) {
  356. // test backfill algorithm with varying failure rates [0, 10]
  357. failureRates := []int{0, 3, 9}
  358. for _, failureRate := range failureRates {
  359. failureRate := failureRate
  360. t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) {
  361. // t.Cleanup(leaktest.Check(t))
  362. rts := setup(t, nil, nil, nil, 21)
  363. var (
  364. startHeight int64 = 20
  365. stopHeight int64 = 10
  366. stopTime = time.Date(2020, 1, 1, 0, 100, 0, 0, time.UTC)
  367. )
  368. peers := []string{"a", "b", "c", "d"}
  369. for _, peer := range peers {
  370. rts.peerUpdateCh <- p2p.PeerUpdate{
  371. NodeID: p2p.NodeID(peer),
  372. Status: p2p.PeerStatusUp,
  373. }
  374. }
  375. trackingHeight := startHeight
  376. rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
  377. mock.AnythingOfType("*types.ValidatorSet")).Return(func(lh, uh int64, vals *types.ValidatorSet) error {
  378. require.Equal(t, trackingHeight, lh)
  379. require.Equal(t, lh, uh)
  380. require.GreaterOrEqual(t, lh, stopHeight)
  381. trackingHeight--
  382. return nil
  383. })
  384. chain := buildLightBlockChain(t, stopHeight-1, startHeight+1, stopTime)
  385. closeCh := make(chan struct{})
  386. defer close(closeCh)
  387. go handleLightBlockRequests(t, chain, rts.blockOutCh,
  388. rts.blockInCh, closeCh, failureRate)
  389. err := rts.reactor.backfill(
  390. context.Background(),
  391. factory.DefaultTestChainID,
  392. startHeight,
  393. stopHeight,
  394. factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
  395. stopTime,
  396. )
  397. if failureRate > 5 {
  398. require.Error(t, err)
  399. } else {
  400. require.NoError(t, err)
  401. for height := startHeight; height <= stopHeight; height++ {
  402. blockMeta := rts.blockStore.LoadBlockMeta(height)
  403. require.NotNil(t, blockMeta)
  404. }
  405. require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1))
  406. require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1))
  407. }
  408. })
  409. }
  410. }
  411. // retryUntil will continue to evaluate fn and will return successfully when true
  412. // or fail when the timeout is reached.
  413. func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) {
  414. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  415. defer cancel()
  416. for {
  417. if fn() {
  418. return
  419. }
  420. require.NoError(t, ctx.Err())
  421. }
  422. }
  423. func handleLightBlockRequests(t *testing.T,
  424. chain map[int64]*types.LightBlock,
  425. receiving chan p2p.Envelope,
  426. sending chan p2p.Envelope,
  427. close chan struct{},
  428. failureRate int) {
  429. requests := 0
  430. for {
  431. select {
  432. case envelope := <-receiving:
  433. if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok {
  434. if requests%10 >= failureRate {
  435. lb, err := chain[int64(msg.Height)].ToProto()
  436. require.NoError(t, err)
  437. sending <- p2p.Envelope{
  438. From: envelope.To,
  439. Message: &ssproto.LightBlockResponse{
  440. LightBlock: lb,
  441. },
  442. }
  443. } else {
  444. switch rand.Intn(3) {
  445. case 0: // send a different block
  446. differntLB, err := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID()).ToProto()
  447. require.NoError(t, err)
  448. sending <- p2p.Envelope{
  449. From: envelope.To,
  450. Message: &ssproto.LightBlockResponse{
  451. LightBlock: differntLB,
  452. },
  453. }
  454. case 1: // send nil block i.e. pretend we don't have it
  455. sending <- p2p.Envelope{
  456. From: envelope.To,
  457. Message: &ssproto.LightBlockResponse{
  458. LightBlock: nil,
  459. },
  460. }
  461. case 2: // don't do anything
  462. }
  463. }
  464. }
  465. case <-close:
  466. return
  467. }
  468. requests++
  469. }
  470. }
  471. func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
  472. chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
  473. lastBlockID := factory.MakeBlockID()
  474. blockTime := startTime.Add(-5 * time.Minute)
  475. for height := fromHeight; height < toHeight; height++ {
  476. chain[height] = mockLB(t, height, blockTime, lastBlockID)
  477. lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
  478. blockTime = blockTime.Add(1 * time.Minute)
  479. }
  480. return chain
  481. }
  482. func mockLB(t *testing.T, height int64, time time.Time,
  483. lastBlockID types.BlockID) *types.LightBlock {
  484. header, err := factory.MakeHeader(&types.Header{
  485. Height: height,
  486. LastBlockID: lastBlockID,
  487. Time: time,
  488. })
  489. require.NoError(t, err)
  490. vals, pv := factory.RandValidatorSet(3, 10)
  491. header.ValidatorsHash = vals.Hash()
  492. lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
  493. voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, vals)
  494. commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, pv, time)
  495. require.NoError(t, err)
  496. return &types.LightBlock{
  497. SignedHeader: &types.SignedHeader{
  498. Header: header,
  499. Commit: commit,
  500. },
  501. ValidatorSet: vals,
  502. }
  503. }