You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

702 lines
25 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/mock"
  10. "github.com/stretchr/testify/require"
  11. abci "github.com/tendermint/tendermint/abci/types"
  12. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  13. "github.com/tendermint/tendermint/internal/statesync/mocks"
  14. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  15. "github.com/tendermint/tendermint/proxy"
  16. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  17. sm "github.com/tendermint/tendermint/state"
  18. "github.com/tendermint/tendermint/types"
  19. "github.com/tendermint/tendermint/version"
  20. )
  21. var ctx = context.Background()
  22. func TestSyncer_SyncAny(t *testing.T) {
  23. state := sm.State{
  24. ChainID: "chain",
  25. Version: sm.Version{
  26. Consensus: version.Consensus{
  27. Block: version.BlockProtocol,
  28. App: 0,
  29. },
  30. Software: version.TMVersion,
  31. },
  32. LastBlockHeight: 1,
  33. LastBlockID: types.BlockID{Hash: []byte("blockhash")},
  34. LastBlockTime: time.Now(),
  35. LastResultsHash: []byte("last_results_hash"),
  36. AppHash: []byte("app_hash"),
  37. LastValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val1")}},
  38. Validators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val2")}},
  39. NextValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val3")}},
  40. ConsensusParams: *types.DefaultConsensusParams(),
  41. LastHeightConsensusParamsChanged: 1,
  42. }
  43. commit := &types.Commit{BlockID: types.BlockID{Hash: []byte("blockhash")}}
  44. chunks := []*chunk{
  45. {Height: 1, Format: 1, Index: 0, Chunk: []byte{1, 1, 0}},
  46. {Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 1, 1}},
  47. {Height: 1, Format: 1, Index: 2, Chunk: []byte{1, 1, 2}},
  48. }
  49. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  50. stateProvider := &mocks.StateProvider{}
  51. stateProvider.On("AppHash", mock.Anything, uint64(1)).Return(state.AppHash, nil)
  52. stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil)
  53. stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil)
  54. stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil)
  55. connSnapshot := &proxymocks.AppConnSnapshot{}
  56. connQuery := &proxymocks.AppConnQuery{}
  57. peerAID := types.NodeID("aa")
  58. peerBID := types.NodeID("bb")
  59. peerCID := types.NodeID("cc")
  60. rts := setup(t, connSnapshot, connQuery, stateProvider, 3)
  61. // Adding a chunk should error when no sync is in progress
  62. _, err := rts.syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
  63. require.Error(t, err)
  64. // Adding a couple of peers should trigger snapshot discovery messages
  65. rts.syncer.AddPeer(peerAID)
  66. e := <-rts.snapshotOutCh
  67. require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
  68. require.Equal(t, peerAID, e.To)
  69. rts.syncer.AddPeer(peerBID)
  70. e = <-rts.snapshotOutCh
  71. require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
  72. require.Equal(t, peerBID, e.To)
  73. // Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
  74. // format 2, which will be rejected by the ABCI application.
  75. new, err := rts.syncer.AddSnapshot(peerAID, s)
  76. require.NoError(t, err)
  77. require.True(t, new)
  78. new, err = rts.syncer.AddSnapshot(peerBID, s)
  79. require.NoError(t, err)
  80. require.False(t, new)
  81. s2 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}}
  82. new, err = rts.syncer.AddSnapshot(peerBID, s2)
  83. require.NoError(t, err)
  84. require.True(t, new)
  85. new, err = rts.syncer.AddSnapshot(peerCID, s2)
  86. require.NoError(t, err)
  87. require.False(t, new)
  88. // We start a sync, with peers sending back chunks when requested. We first reject the snapshot
  89. // with height 2 format 2, and accept the snapshot at height 1.
  90. connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  91. Snapshot: &abci.Snapshot{
  92. Height: 2,
  93. Format: 2,
  94. Chunks: 3,
  95. Hash: []byte{1},
  96. },
  97. AppHash: []byte("app_hash_2"),
  98. }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  99. connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  100. Snapshot: &abci.Snapshot{
  101. Height: s.Height,
  102. Format: s.Format,
  103. Chunks: s.Chunks,
  104. Hash: s.Hash,
  105. Metadata: s.Metadata,
  106. },
  107. AppHash: []byte("app_hash"),
  108. }).Times(2).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  109. chunkRequests := make(map[uint32]int)
  110. chunkRequestsMtx := tmsync.Mutex{}
  111. var wg sync.WaitGroup
  112. wg.Add(4)
  113. go func() {
  114. for e := range rts.chunkOutCh {
  115. msg, ok := e.Message.(*ssproto.ChunkRequest)
  116. assert.True(t, ok)
  117. assert.EqualValues(t, 1, msg.Height)
  118. assert.EqualValues(t, 1, msg.Format)
  119. assert.LessOrEqual(t, msg.Index, uint32(len(chunks)))
  120. added, err := rts.syncer.AddChunk(chunks[msg.Index])
  121. assert.NoError(t, err)
  122. assert.True(t, added)
  123. chunkRequestsMtx.Lock()
  124. chunkRequests[msg.Index]++
  125. chunkRequestsMtx.Unlock()
  126. wg.Done()
  127. }
  128. }()
  129. // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
  130. // which should cause it to keep the existing chunk 0 and 2, and restart restoration from
  131. // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
  132. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  133. Index: 2, Chunk: []byte{1, 1, 2},
  134. }).Once().Run(func(args mock.Arguments) { time.Sleep(2 * time.Second) }).Return(
  135. &abci.ResponseApplySnapshotChunk{
  136. Result: abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT,
  137. RefetchChunks: []uint32{1},
  138. }, nil)
  139. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  140. Index: 0, Chunk: []byte{1, 1, 0},
  141. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  142. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  143. Index: 1, Chunk: []byte{1, 1, 1},
  144. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  145. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  146. Index: 2, Chunk: []byte{1, 1, 2},
  147. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  148. connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(&abci.ResponseInfo{
  149. AppVersion: 9,
  150. LastBlockHeight: 1,
  151. LastBlockAppHash: []byte("app_hash"),
  152. }, nil)
  153. newState, lastCommit, err := rts.syncer.SyncAny(ctx, 0, func() {})
  154. require.NoError(t, err)
  155. wg.Wait()
  156. chunkRequestsMtx.Lock()
  157. require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)
  158. chunkRequestsMtx.Unlock()
  159. // The syncer should have updated the state app version from the ABCI info response.
  160. expectState := state
  161. expectState.Version.Consensus.App = 9
  162. require.Equal(t, expectState, newState)
  163. require.Equal(t, commit, lastCommit)
  164. connSnapshot.AssertExpectations(t)
  165. connQuery.AssertExpectations(t)
  166. }
  167. func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
  168. stateProvider := &mocks.StateProvider{}
  169. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  170. rts := setup(t, nil, nil, stateProvider, 2)
  171. _, _, err := rts.syncer.SyncAny(ctx, 0, func() {})
  172. require.Equal(t, errNoSnapshots, err)
  173. }
  174. func TestSyncer_SyncAny_abort(t *testing.T) {
  175. stateProvider := &mocks.StateProvider{}
  176. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  177. rts := setup(t, nil, nil, stateProvider, 2)
  178. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  179. peerID := types.NodeID("aa")
  180. _, err := rts.syncer.AddSnapshot(peerID, s)
  181. require.NoError(t, err)
  182. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  183. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  184. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  185. _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
  186. require.Equal(t, errAbort, err)
  187. rts.conn.AssertExpectations(t)
  188. }
  189. func TestSyncer_SyncAny_reject(t *testing.T) {
  190. stateProvider := &mocks.StateProvider{}
  191. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  192. rts := setup(t, nil, nil, stateProvider, 2)
  193. // s22 is tried first, then s12, then s11, then errNoSnapshots
  194. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  195. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  196. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  197. peerID := types.NodeID("aa")
  198. _, err := rts.syncer.AddSnapshot(peerID, s22)
  199. require.NoError(t, err)
  200. _, err = rts.syncer.AddSnapshot(peerID, s12)
  201. require.NoError(t, err)
  202. _, err = rts.syncer.AddSnapshot(peerID, s11)
  203. require.NoError(t, err)
  204. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  205. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  206. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  207. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  208. Snapshot: toABCI(s12), AppHash: []byte("app_hash"),
  209. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  210. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  211. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  212. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  213. _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
  214. require.Equal(t, errNoSnapshots, err)
  215. rts.conn.AssertExpectations(t)
  216. }
  217. func TestSyncer_SyncAny_reject_format(t *testing.T) {
  218. stateProvider := &mocks.StateProvider{}
  219. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  220. rts := setup(t, nil, nil, stateProvider, 2)
  221. // s22 is tried first, which reject s22 and s12, then s11 will abort.
  222. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  223. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  224. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  225. peerID := types.NodeID("aa")
  226. _, err := rts.syncer.AddSnapshot(peerID, s22)
  227. require.NoError(t, err)
  228. _, err = rts.syncer.AddSnapshot(peerID, s12)
  229. require.NoError(t, err)
  230. _, err = rts.syncer.AddSnapshot(peerID, s11)
  231. require.NoError(t, err)
  232. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  233. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  234. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  235. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  236. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  237. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  238. _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
  239. require.Equal(t, errAbort, err)
  240. rts.conn.AssertExpectations(t)
  241. }
  242. func TestSyncer_SyncAny_reject_sender(t *testing.T) {
  243. stateProvider := &mocks.StateProvider{}
  244. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  245. rts := setup(t, nil, nil, stateProvider, 2)
  246. peerAID := types.NodeID("aa")
  247. peerBID := types.NodeID("bb")
  248. peerCID := types.NodeID("cc")
  249. // sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
  250. // submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
  251. // errNoSnapshots is returned.
  252. sa := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  253. sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  254. sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  255. sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  256. _, err := rts.syncer.AddSnapshot(peerAID, sa)
  257. require.NoError(t, err)
  258. _, err = rts.syncer.AddSnapshot(peerBID, sb)
  259. require.NoError(t, err)
  260. _, err = rts.syncer.AddSnapshot(peerCID, sc)
  261. require.NoError(t, err)
  262. _, err = rts.syncer.AddSnapshot(peerBID, sbc)
  263. require.NoError(t, err)
  264. _, err = rts.syncer.AddSnapshot(peerCID, sbc)
  265. require.NoError(t, err)
  266. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  267. Snapshot: toABCI(sbc), AppHash: []byte("app_hash"),
  268. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil)
  269. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  270. Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
  271. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  272. _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
  273. require.Equal(t, errNoSnapshots, err)
  274. rts.conn.AssertExpectations(t)
  275. }
  276. func TestSyncer_SyncAny_abciError(t *testing.T) {
  277. stateProvider := &mocks.StateProvider{}
  278. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  279. rts := setup(t, nil, nil, stateProvider, 2)
  280. errBoom := errors.New("boom")
  281. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  282. peerID := types.NodeID("aa")
  283. _, err := rts.syncer.AddSnapshot(peerID, s)
  284. require.NoError(t, err)
  285. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  286. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  287. }).Once().Return(nil, errBoom)
  288. _, _, err = rts.syncer.SyncAny(ctx, 0, func() {})
  289. require.True(t, errors.Is(err, errBoom))
  290. rts.conn.AssertExpectations(t)
  291. }
  292. func TestSyncer_offerSnapshot(t *testing.T) {
  293. unknownErr := errors.New("unknown error")
  294. boom := errors.New("boom")
  295. testcases := map[string]struct {
  296. result abci.ResponseOfferSnapshot_Result
  297. err error
  298. expectErr error
  299. }{
  300. "accept": {abci.ResponseOfferSnapshot_ACCEPT, nil, nil},
  301. "abort": {abci.ResponseOfferSnapshot_ABORT, nil, errAbort},
  302. "reject": {abci.ResponseOfferSnapshot_REJECT, nil, errRejectSnapshot},
  303. "reject_format": {abci.ResponseOfferSnapshot_REJECT_FORMAT, nil, errRejectFormat},
  304. "reject_sender": {abci.ResponseOfferSnapshot_REJECT_SENDER, nil, errRejectSender},
  305. "unknown": {abci.ResponseOfferSnapshot_UNKNOWN, nil, unknownErr},
  306. "error": {0, boom, boom},
  307. "unknown non-zero": {9, nil, unknownErr},
  308. }
  309. for name, tc := range testcases {
  310. tc := tc
  311. t.Run(name, func(t *testing.T) {
  312. stateProvider := &mocks.StateProvider{}
  313. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  314. rts := setup(t, nil, nil, stateProvider, 2)
  315. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  316. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  317. Snapshot: toABCI(s),
  318. AppHash: []byte("app_hash"),
  319. }).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
  320. err := rts.syncer.offerSnapshot(ctx, s)
  321. if tc.expectErr == unknownErr {
  322. require.Error(t, err)
  323. } else {
  324. unwrapped := errors.Unwrap(err)
  325. if unwrapped != nil {
  326. err = unwrapped
  327. }
  328. require.Equal(t, tc.expectErr, err)
  329. }
  330. })
  331. }
  332. }
  333. func TestSyncer_applyChunks_Results(t *testing.T) {
  334. unknownErr := errors.New("unknown error")
  335. boom := errors.New("boom")
  336. testcases := map[string]struct {
  337. result abci.ResponseApplySnapshotChunk_Result
  338. err error
  339. expectErr error
  340. }{
  341. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT, nil, nil},
  342. "abort": {abci.ResponseApplySnapshotChunk_ABORT, nil, errAbort},
  343. "retry": {abci.ResponseApplySnapshotChunk_RETRY, nil, nil},
  344. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT, nil, errRetrySnapshot},
  345. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, nil, errRejectSnapshot},
  346. "unknown": {abci.ResponseApplySnapshotChunk_UNKNOWN, nil, unknownErr},
  347. "error": {0, boom, boom},
  348. "unknown non-zero": {9, nil, unknownErr},
  349. }
  350. for name, tc := range testcases {
  351. tc := tc
  352. t.Run(name, func(t *testing.T) {
  353. stateProvider := &mocks.StateProvider{}
  354. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  355. rts := setup(t, nil, nil, stateProvider, 2)
  356. body := []byte{1, 2, 3}
  357. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
  358. require.NoError(t, err)
  359. _, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body})
  360. require.NoError(t, err)
  361. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  362. Index: 0, Chunk: body,
  363. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err)
  364. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  365. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  366. Index: 0, Chunk: body,
  367. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  368. Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  369. }
  370. err = rts.syncer.applyChunks(ctx, chunks)
  371. if tc.expectErr == unknownErr {
  372. require.Error(t, err)
  373. } else {
  374. unwrapped := errors.Unwrap(err)
  375. if unwrapped != nil {
  376. err = unwrapped
  377. }
  378. require.Equal(t, tc.expectErr, err)
  379. }
  380. rts.conn.AssertExpectations(t)
  381. })
  382. }
  383. }
  384. func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
  385. // Discarding chunks via refetch_chunks should work the same for all results
  386. testcases := map[string]struct {
  387. result abci.ResponseApplySnapshotChunk_Result
  388. }{
  389. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  390. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  391. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  392. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  393. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  394. }
  395. for name, tc := range testcases {
  396. tc := tc
  397. t.Run(name, func(t *testing.T) {
  398. stateProvider := &mocks.StateProvider{}
  399. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  400. rts := setup(t, nil, nil, stateProvider, 2)
  401. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
  402. require.NoError(t, err)
  403. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}})
  404. require.True(t, added)
  405. require.NoError(t, err)
  406. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}})
  407. require.True(t, added)
  408. require.NoError(t, err)
  409. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}})
  410. require.True(t, added)
  411. require.NoError(t, err)
  412. // The first two chunks are accepted, before the last one asks for 1 to be refetched
  413. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  414. Index: 0, Chunk: []byte{0},
  415. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  416. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  417. Index: 1, Chunk: []byte{1},
  418. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  419. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  420. Index: 2, Chunk: []byte{2},
  421. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  422. Result: tc.result,
  423. RefetchChunks: []uint32{1},
  424. }, nil)
  425. // Since removing the chunk will cause Next() to block, we spawn a goroutine, then
  426. // check the queue contents, and finally close the queue to end the goroutine.
  427. // We don't really care about the result of applyChunks, since it has separate test.
  428. go func() {
  429. rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error
  430. }()
  431. time.Sleep(50 * time.Millisecond)
  432. require.True(t, chunks.Has(0))
  433. require.False(t, chunks.Has(1))
  434. require.True(t, chunks.Has(2))
  435. require.NoError(t, chunks.Close())
  436. })
  437. }
  438. }
  439. func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
  440. // Banning chunks senders via ban_chunk_senders should work the same for all results
  441. testcases := map[string]struct {
  442. result abci.ResponseApplySnapshotChunk_Result
  443. }{
  444. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  445. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  446. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  447. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  448. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  449. }
  450. for name, tc := range testcases {
  451. tc := tc
  452. t.Run(name, func(t *testing.T) {
  453. stateProvider := &mocks.StateProvider{}
  454. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  455. rts := setup(t, nil, nil, stateProvider, 2)
  456. // Set up three peers across two snapshots, and ask for one of them to be banned.
  457. // It should be banned from all snapshots.
  458. peerAID := types.NodeID("aa")
  459. peerBID := types.NodeID("bb")
  460. peerCID := types.NodeID("cc")
  461. s1 := &snapshot{Height: 1, Format: 1, Chunks: 3}
  462. s2 := &snapshot{Height: 2, Format: 1, Chunks: 3}
  463. _, err := rts.syncer.AddSnapshot(peerAID, s1)
  464. require.NoError(t, err)
  465. _, err = rts.syncer.AddSnapshot(peerAID, s2)
  466. require.NoError(t, err)
  467. _, err = rts.syncer.AddSnapshot(peerBID, s1)
  468. require.NoError(t, err)
  469. _, err = rts.syncer.AddSnapshot(peerBID, s2)
  470. require.NoError(t, err)
  471. _, err = rts.syncer.AddSnapshot(peerCID, s1)
  472. require.NoError(t, err)
  473. _, err = rts.syncer.AddSnapshot(peerCID, s2)
  474. require.NoError(t, err)
  475. chunks, err := newChunkQueue(s1, "")
  476. require.NoError(t, err)
  477. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerAID})
  478. require.True(t, added)
  479. require.NoError(t, err)
  480. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerBID})
  481. require.True(t, added)
  482. require.NoError(t, err)
  483. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerCID})
  484. require.True(t, added)
  485. require.NoError(t, err)
  486. // The first two chunks are accepted, before the last one asks for b sender to be rejected
  487. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  488. Index: 0, Chunk: []byte{0}, Sender: "aa",
  489. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  490. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  491. Index: 1, Chunk: []byte{1}, Sender: "bb",
  492. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  493. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  494. Index: 2, Chunk: []byte{2}, Sender: "cc",
  495. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  496. Result: tc.result,
  497. RejectSenders: []string{string(peerBID)},
  498. }, nil)
  499. // On retry, the last chunk will be tried again, so we just accept it then.
  500. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  501. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  502. Index: 2, Chunk: []byte{2}, Sender: "cc",
  503. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  504. }
  505. // We don't really care about the result of applyChunks, since it has separate test.
  506. // However, it will block on e.g. retry result, so we spawn a goroutine that will
  507. // be shut down when the chunk queue closes.
  508. go func() {
  509. rts.syncer.applyChunks(ctx, chunks) //nolint:errcheck // purposefully ignore error
  510. }()
  511. time.Sleep(50 * time.Millisecond)
  512. s1peers := rts.syncer.snapshots.GetPeers(s1)
  513. require.Len(t, s1peers, 2)
  514. require.EqualValues(t, "aa", s1peers[0])
  515. require.EqualValues(t, "cc", s1peers[1])
  516. rts.syncer.snapshots.GetPeers(s1)
  517. require.Len(t, s1peers, 2)
  518. require.EqualValues(t, "aa", s1peers[0])
  519. require.EqualValues(t, "cc", s1peers[1])
  520. require.NoError(t, chunks.Close())
  521. })
  522. }
  523. }
  524. func TestSyncer_verifyApp(t *testing.T) {
  525. boom := errors.New("boom")
  526. s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  527. testcases := map[string]struct {
  528. response *abci.ResponseInfo
  529. err error
  530. expectErr error
  531. }{
  532. "verified": {&abci.ResponseInfo{
  533. LastBlockHeight: 3,
  534. LastBlockAppHash: []byte("app_hash"),
  535. AppVersion: 9,
  536. }, nil, nil},
  537. "invalid height": {&abci.ResponseInfo{
  538. LastBlockHeight: 5,
  539. LastBlockAppHash: []byte("app_hash"),
  540. AppVersion: 9,
  541. }, nil, errVerifyFailed},
  542. "invalid hash": {&abci.ResponseInfo{
  543. LastBlockHeight: 3,
  544. LastBlockAppHash: []byte("xxx"),
  545. AppVersion: 9,
  546. }, nil, errVerifyFailed},
  547. "error": {nil, boom, boom},
  548. }
  549. for name, tc := range testcases {
  550. tc := tc
  551. t.Run(name, func(t *testing.T) {
  552. rts := setup(t, nil, nil, nil, 2)
  553. rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(tc.response, tc.err)
  554. version, err := rts.syncer.verifyApp(s)
  555. unwrapped := errors.Unwrap(err)
  556. if unwrapped != nil {
  557. err = unwrapped
  558. }
  559. require.Equal(t, tc.expectErr, err)
  560. if err == nil {
  561. require.Equal(t, tc.response.AppVersion, version)
  562. }
  563. })
  564. }
  565. }
  566. func toABCI(s *snapshot) *abci.Snapshot {
  567. return &abci.Snapshot{
  568. Height: s.Height,
  569. Format: s.Format,
  570. Chunks: s.Chunks,
  571. Hash: s.Hash,
  572. Metadata: s.Metadata,
  573. }
  574. }