You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

700 lines
25 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/mock"
  10. "github.com/stretchr/testify/require"
  11. abci "github.com/tendermint/tendermint/abci/types"
  12. tmsync "github.com/tendermint/tendermint/libs/sync"
  13. "github.com/tendermint/tendermint/p2p"
  14. tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
  15. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  16. tmversion "github.com/tendermint/tendermint/proto/tendermint/version"
  17. "github.com/tendermint/tendermint/proxy"
  18. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  19. sm "github.com/tendermint/tendermint/state"
  20. "github.com/tendermint/tendermint/statesync/mocks"
  21. "github.com/tendermint/tendermint/types"
  22. "github.com/tendermint/tendermint/version"
  23. )
  24. var ctx = context.Background()
  25. func TestSyncer_SyncAny(t *testing.T) {
  26. state := sm.State{
  27. ChainID: "chain",
  28. Version: tmstate.Version{
  29. Consensus: tmversion.Consensus{
  30. Block: version.BlockProtocol,
  31. App: 0,
  32. },
  33. Software: version.TMCoreSemVer,
  34. },
  35. LastBlockHeight: 1,
  36. LastBlockID: types.BlockID{Hash: []byte("blockhash")},
  37. LastBlockTime: time.Now(),
  38. LastResultsHash: []byte("last_results_hash"),
  39. AppHash: []byte("app_hash"),
  40. LastValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val1")}},
  41. Validators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val2")}},
  42. NextValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val3")}},
  43. ConsensusParams: *types.DefaultConsensusParams(),
  44. LastHeightConsensusParamsChanged: 1,
  45. }
  46. commit := &types.Commit{BlockID: types.BlockID{Hash: []byte("blockhash")}}
  47. chunks := []*chunk{
  48. {Height: 1, Format: 1, Index: 0, Chunk: []byte{1, 1, 0}},
  49. {Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 1, 1}},
  50. {Height: 1, Format: 1, Index: 2, Chunk: []byte{1, 1, 2}},
  51. }
  52. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  53. stateProvider := &mocks.StateProvider{}
  54. stateProvider.On("AppHash", mock.Anything, uint64(1)).Return(state.AppHash, nil)
  55. stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil)
  56. stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil)
  57. stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil)
  58. connSnapshot := &proxymocks.AppConnSnapshot{}
  59. connQuery := &proxymocks.AppConnQuery{}
  60. peerAID := p2p.NodeID("aa")
  61. peerBID := p2p.NodeID("bb")
  62. rts := setup(t, connSnapshot, connQuery, stateProvider, 3)
  63. // Adding a chunk should error when no sync is in progress
  64. _, err := rts.syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
  65. require.Error(t, err)
  66. // Adding a couple of peers should trigger snapshot discovery messages
  67. rts.syncer.AddPeer(peerAID)
  68. e := <-rts.snapshotOutCh
  69. require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
  70. require.Equal(t, peerAID, e.To)
  71. rts.syncer.AddPeer(peerBID)
  72. e = <-rts.snapshotOutCh
  73. require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
  74. require.Equal(t, peerBID, e.To)
  75. // Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
  76. // format 2, which will be rejected by the ABCI application.
  77. new, err := rts.syncer.AddSnapshot(peerAID, s)
  78. require.NoError(t, err)
  79. require.True(t, new)
  80. new, err = rts.syncer.AddSnapshot(peerBID, s)
  81. require.NoError(t, err)
  82. require.False(t, new)
  83. new, err = rts.syncer.AddSnapshot(peerBID, &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}})
  84. require.NoError(t, err)
  85. require.True(t, new)
  86. // We start a sync, with peers sending back chunks when requested. We first reject the snapshot
  87. // with height 2 format 2, and accept the snapshot at height 1.
  88. connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  89. Snapshot: &abci.Snapshot{
  90. Height: 2,
  91. Format: 2,
  92. Chunks: 3,
  93. Hash: []byte{1},
  94. },
  95. AppHash: []byte("app_hash_2"),
  96. }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  97. connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  98. Snapshot: &abci.Snapshot{
  99. Height: s.Height,
  100. Format: s.Format,
  101. Chunks: s.Chunks,
  102. Hash: s.Hash,
  103. Metadata: s.Metadata,
  104. },
  105. AppHash: []byte("app_hash"),
  106. }).Times(2).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  107. chunkRequests := make(map[uint32]int)
  108. chunkRequestsMtx := tmsync.Mutex{}
  109. var wg sync.WaitGroup
  110. wg.Add(4)
  111. go func() {
  112. for e := range rts.chunkOutCh {
  113. msg, ok := e.Message.(*ssproto.ChunkRequest)
  114. assert.True(t, ok)
  115. assert.EqualValues(t, 1, msg.Height)
  116. assert.EqualValues(t, 1, msg.Format)
  117. assert.LessOrEqual(t, msg.Index, uint32(len(chunks)))
  118. added, err := rts.syncer.AddChunk(chunks[msg.Index])
  119. assert.NoError(t, err)
  120. assert.True(t, added)
  121. chunkRequestsMtx.Lock()
  122. chunkRequests[msg.Index]++
  123. chunkRequestsMtx.Unlock()
  124. wg.Done()
  125. }
  126. }()
  127. // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
  128. // which should cause it to keep the existing chunk 0 and 2, and restart restoration from
  129. // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
  130. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  131. Index: 2, Chunk: []byte{1, 1, 2},
  132. }).Once().Run(func(args mock.Arguments) { time.Sleep(2 * time.Second) }).Return(
  133. &abci.ResponseApplySnapshotChunk{
  134. Result: abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT,
  135. RefetchChunks: []uint32{1},
  136. }, nil)
  137. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  138. Index: 0, Chunk: []byte{1, 1, 0},
  139. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  140. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  141. Index: 1, Chunk: []byte{1, 1, 1},
  142. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  143. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  144. Index: 2, Chunk: []byte{1, 1, 2},
  145. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  146. connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(&abci.ResponseInfo{
  147. AppVersion: 9,
  148. LastBlockHeight: 1,
  149. LastBlockAppHash: []byte("app_hash"),
  150. }, nil)
  151. newState, lastCommit, err := rts.syncer.SyncAny(0)
  152. require.NoError(t, err)
  153. wg.Wait()
  154. chunkRequestsMtx.Lock()
  155. require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)
  156. chunkRequestsMtx.Unlock()
  157. // The syncer should have updated the state app version from the ABCI info response.
  158. expectState := state
  159. expectState.Version.Consensus.App = 9
  160. require.Equal(t, expectState, newState)
  161. require.Equal(t, commit, lastCommit)
  162. connSnapshot.AssertExpectations(t)
  163. connQuery.AssertExpectations(t)
  164. }
  165. func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
  166. stateProvider := &mocks.StateProvider{}
  167. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  168. rts := setup(t, nil, nil, stateProvider, 2)
  169. _, _, err := rts.syncer.SyncAny(0)
  170. require.Equal(t, errNoSnapshots, err)
  171. }
  172. func TestSyncer_SyncAny_abort(t *testing.T) {
  173. stateProvider := &mocks.StateProvider{}
  174. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  175. rts := setup(t, nil, nil, stateProvider, 2)
  176. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  177. peerID := p2p.NodeID("aa")
  178. _, err := rts.syncer.AddSnapshot(peerID, s)
  179. require.NoError(t, err)
  180. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  181. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  182. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  183. _, _, err = rts.syncer.SyncAny(0)
  184. require.Equal(t, errAbort, err)
  185. rts.conn.AssertExpectations(t)
  186. }
  187. func TestSyncer_SyncAny_reject(t *testing.T) {
  188. stateProvider := &mocks.StateProvider{}
  189. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  190. rts := setup(t, nil, nil, stateProvider, 2)
  191. // s22 is tried first, then s12, then s11, then errNoSnapshots
  192. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  193. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  194. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  195. peerID := p2p.NodeID("aa")
  196. _, err := rts.syncer.AddSnapshot(peerID, s22)
  197. require.NoError(t, err)
  198. _, err = rts.syncer.AddSnapshot(peerID, s12)
  199. require.NoError(t, err)
  200. _, err = rts.syncer.AddSnapshot(peerID, s11)
  201. require.NoError(t, err)
  202. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  203. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  204. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  205. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  206. Snapshot: toABCI(s12), AppHash: []byte("app_hash"),
  207. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  208. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  209. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  210. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  211. _, _, err = rts.syncer.SyncAny(0)
  212. require.Equal(t, errNoSnapshots, err)
  213. rts.conn.AssertExpectations(t)
  214. }
  215. func TestSyncer_SyncAny_reject_format(t *testing.T) {
  216. stateProvider := &mocks.StateProvider{}
  217. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  218. rts := setup(t, nil, nil, stateProvider, 2)
  219. // s22 is tried first, which reject s22 and s12, then s11 will abort.
  220. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  221. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  222. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  223. peerID := p2p.NodeID("aa")
  224. _, err := rts.syncer.AddSnapshot(peerID, s22)
  225. require.NoError(t, err)
  226. _, err = rts.syncer.AddSnapshot(peerID, s12)
  227. require.NoError(t, err)
  228. _, err = rts.syncer.AddSnapshot(peerID, s11)
  229. require.NoError(t, err)
  230. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  231. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  232. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  233. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  234. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  235. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  236. _, _, err = rts.syncer.SyncAny(0)
  237. require.Equal(t, errAbort, err)
  238. rts.conn.AssertExpectations(t)
  239. }
  240. func TestSyncer_SyncAny_reject_sender(t *testing.T) {
  241. stateProvider := &mocks.StateProvider{}
  242. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  243. rts := setup(t, nil, nil, stateProvider, 2)
  244. peerAID := p2p.NodeID("aa")
  245. peerBID := p2p.NodeID("bb")
  246. peerCID := p2p.NodeID("cc")
  247. // sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
  248. // submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
  249. // errNoSnapshots is returned.
  250. sa := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  251. sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  252. sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  253. sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  254. _, err := rts.syncer.AddSnapshot(peerAID, sa)
  255. require.NoError(t, err)
  256. _, err = rts.syncer.AddSnapshot(peerBID, sb)
  257. require.NoError(t, err)
  258. _, err = rts.syncer.AddSnapshot(peerCID, sc)
  259. require.NoError(t, err)
  260. _, err = rts.syncer.AddSnapshot(peerBID, sbc)
  261. require.NoError(t, err)
  262. _, err = rts.syncer.AddSnapshot(peerCID, sbc)
  263. require.NoError(t, err)
  264. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  265. Snapshot: toABCI(sbc), AppHash: []byte("app_hash"),
  266. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil)
  267. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  268. Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
  269. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  270. _, _, err = rts.syncer.SyncAny(0)
  271. require.Equal(t, errNoSnapshots, err)
  272. rts.conn.AssertExpectations(t)
  273. }
  274. func TestSyncer_SyncAny_abciError(t *testing.T) {
  275. stateProvider := &mocks.StateProvider{}
  276. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  277. rts := setup(t, nil, nil, stateProvider, 2)
  278. errBoom := errors.New("boom")
  279. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  280. peerID := p2p.NodeID("aa")
  281. _, err := rts.syncer.AddSnapshot(peerID, s)
  282. require.NoError(t, err)
  283. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  284. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  285. }).Once().Return(nil, errBoom)
  286. _, _, err = rts.syncer.SyncAny(0)
  287. require.True(t, errors.Is(err, errBoom))
  288. rts.conn.AssertExpectations(t)
  289. }
  290. func TestSyncer_offerSnapshot(t *testing.T) {
  291. unknownErr := errors.New("unknown error")
  292. boom := errors.New("boom")
  293. testcases := map[string]struct {
  294. result abci.ResponseOfferSnapshot_Result
  295. err error
  296. expectErr error
  297. }{
  298. "accept": {abci.ResponseOfferSnapshot_ACCEPT, nil, nil},
  299. "abort": {abci.ResponseOfferSnapshot_ABORT, nil, errAbort},
  300. "reject": {abci.ResponseOfferSnapshot_REJECT, nil, errRejectSnapshot},
  301. "reject_format": {abci.ResponseOfferSnapshot_REJECT_FORMAT, nil, errRejectFormat},
  302. "reject_sender": {abci.ResponseOfferSnapshot_REJECT_SENDER, nil, errRejectSender},
  303. "unknown": {abci.ResponseOfferSnapshot_UNKNOWN, nil, unknownErr},
  304. "error": {0, boom, boom},
  305. "unknown non-zero": {9, nil, unknownErr},
  306. }
  307. for name, tc := range testcases {
  308. tc := tc
  309. t.Run(name, func(t *testing.T) {
  310. stateProvider := &mocks.StateProvider{}
  311. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  312. rts := setup(t, nil, nil, stateProvider, 2)
  313. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  314. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  315. Snapshot: toABCI(s),
  316. AppHash: []byte("app_hash"),
  317. }).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
  318. err := rts.syncer.offerSnapshot(s)
  319. if tc.expectErr == unknownErr {
  320. require.Error(t, err)
  321. } else {
  322. unwrapped := errors.Unwrap(err)
  323. if unwrapped != nil {
  324. err = unwrapped
  325. }
  326. require.Equal(t, tc.expectErr, err)
  327. }
  328. })
  329. }
  330. }
  331. func TestSyncer_applyChunks_Results(t *testing.T) {
  332. unknownErr := errors.New("unknown error")
  333. boom := errors.New("boom")
  334. testcases := map[string]struct {
  335. result abci.ResponseApplySnapshotChunk_Result
  336. err error
  337. expectErr error
  338. }{
  339. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT, nil, nil},
  340. "abort": {abci.ResponseApplySnapshotChunk_ABORT, nil, errAbort},
  341. "retry": {abci.ResponseApplySnapshotChunk_RETRY, nil, nil},
  342. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT, nil, errRetrySnapshot},
  343. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, nil, errRejectSnapshot},
  344. "unknown": {abci.ResponseApplySnapshotChunk_UNKNOWN, nil, unknownErr},
  345. "error": {0, boom, boom},
  346. "unknown non-zero": {9, nil, unknownErr},
  347. }
  348. for name, tc := range testcases {
  349. tc := tc
  350. t.Run(name, func(t *testing.T) {
  351. stateProvider := &mocks.StateProvider{}
  352. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  353. rts := setup(t, nil, nil, stateProvider, 2)
  354. body := []byte{1, 2, 3}
  355. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
  356. require.NoError(t, err)
  357. _, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body})
  358. require.NoError(t, err)
  359. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  360. Index: 0, Chunk: body,
  361. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err)
  362. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  363. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  364. Index: 0, Chunk: body,
  365. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  366. Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  367. }
  368. err = rts.syncer.applyChunks(chunks)
  369. if tc.expectErr == unknownErr {
  370. require.Error(t, err)
  371. } else {
  372. unwrapped := errors.Unwrap(err)
  373. if unwrapped != nil {
  374. err = unwrapped
  375. }
  376. require.Equal(t, tc.expectErr, err)
  377. }
  378. rts.conn.AssertExpectations(t)
  379. })
  380. }
  381. }
  382. func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
  383. // Discarding chunks via refetch_chunks should work the same for all results
  384. testcases := map[string]struct {
  385. result abci.ResponseApplySnapshotChunk_Result
  386. }{
  387. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  388. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  389. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  390. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  391. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  392. }
  393. for name, tc := range testcases {
  394. tc := tc
  395. t.Run(name, func(t *testing.T) {
  396. stateProvider := &mocks.StateProvider{}
  397. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  398. rts := setup(t, nil, nil, stateProvider, 2)
  399. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
  400. require.NoError(t, err)
  401. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}})
  402. require.True(t, added)
  403. require.NoError(t, err)
  404. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}})
  405. require.True(t, added)
  406. require.NoError(t, err)
  407. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}})
  408. require.True(t, added)
  409. require.NoError(t, err)
  410. // The first two chunks are accepted, before the last one asks for 1 to be refetched
  411. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  412. Index: 0, Chunk: []byte{0},
  413. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  414. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  415. Index: 1, Chunk: []byte{1},
  416. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  417. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  418. Index: 2, Chunk: []byte{2},
  419. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  420. Result: tc.result,
  421. RefetchChunks: []uint32{1},
  422. }, nil)
  423. // Since removing the chunk will cause Next() to block, we spawn a goroutine, then
  424. // check the queue contents, and finally close the queue to end the goroutine.
  425. // We don't really care about the result of applyChunks, since it has separate test.
  426. go func() {
  427. rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
  428. }()
  429. time.Sleep(50 * time.Millisecond)
  430. require.True(t, chunks.Has(0))
  431. require.False(t, chunks.Has(1))
  432. require.True(t, chunks.Has(2))
  433. require.NoError(t, chunks.Close())
  434. })
  435. }
  436. }
  437. func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
  438. // Banning chunks senders via ban_chunk_senders should work the same for all results
  439. testcases := map[string]struct {
  440. result abci.ResponseApplySnapshotChunk_Result
  441. }{
  442. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  443. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  444. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  445. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  446. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  447. }
  448. for name, tc := range testcases {
  449. tc := tc
  450. t.Run(name, func(t *testing.T) {
  451. stateProvider := &mocks.StateProvider{}
  452. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  453. rts := setup(t, nil, nil, stateProvider, 2)
  454. // Set up three peers across two snapshots, and ask for one of them to be banned.
  455. // It should be banned from all snapshots.
  456. peerAID := p2p.NodeID("aa")
  457. peerBID := p2p.NodeID("bb")
  458. peerCID := p2p.NodeID("cc")
  459. s1 := &snapshot{Height: 1, Format: 1, Chunks: 3}
  460. s2 := &snapshot{Height: 2, Format: 1, Chunks: 3}
  461. _, err := rts.syncer.AddSnapshot(peerAID, s1)
  462. require.NoError(t, err)
  463. _, err = rts.syncer.AddSnapshot(peerAID, s2)
  464. require.NoError(t, err)
  465. _, err = rts.syncer.AddSnapshot(peerBID, s1)
  466. require.NoError(t, err)
  467. _, err = rts.syncer.AddSnapshot(peerBID, s2)
  468. require.NoError(t, err)
  469. _, err = rts.syncer.AddSnapshot(peerCID, s1)
  470. require.NoError(t, err)
  471. _, err = rts.syncer.AddSnapshot(peerCID, s2)
  472. require.NoError(t, err)
  473. chunks, err := newChunkQueue(s1, "")
  474. require.NoError(t, err)
  475. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerAID})
  476. require.True(t, added)
  477. require.NoError(t, err)
  478. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerBID})
  479. require.True(t, added)
  480. require.NoError(t, err)
  481. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerCID})
  482. require.True(t, added)
  483. require.NoError(t, err)
  484. // The first two chunks are accepted, before the last one asks for b sender to be rejected
  485. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  486. Index: 0, Chunk: []byte{0}, Sender: "aa",
  487. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  488. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  489. Index: 1, Chunk: []byte{1}, Sender: "bb",
  490. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  491. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  492. Index: 2, Chunk: []byte{2}, Sender: "cc",
  493. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  494. Result: tc.result,
  495. RejectSenders: []string{string(peerBID)},
  496. }, nil)
  497. // On retry, the last chunk will be tried again, so we just accept it then.
  498. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  499. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  500. Index: 2, Chunk: []byte{2}, Sender: "cc",
  501. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  502. }
  503. // We don't really care about the result of applyChunks, since it has separate test.
  504. // However, it will block on e.g. retry result, so we spawn a goroutine that will
  505. // be shut down when the chunk queue closes.
  506. go func() {
  507. rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
  508. }()
  509. time.Sleep(50 * time.Millisecond)
  510. s1peers := rts.syncer.snapshots.GetPeers(s1)
  511. require.Len(t, s1peers, 2)
  512. require.EqualValues(t, "aa", s1peers[0])
  513. require.EqualValues(t, "cc", s1peers[1])
  514. rts.syncer.snapshots.GetPeers(s1)
  515. require.Len(t, s1peers, 2)
  516. require.EqualValues(t, "aa", s1peers[0])
  517. require.EqualValues(t, "cc", s1peers[1])
  518. require.NoError(t, chunks.Close())
  519. })
  520. }
  521. }
  522. func TestSyncer_verifyApp(t *testing.T) {
  523. boom := errors.New("boom")
  524. s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  525. testcases := map[string]struct {
  526. response *abci.ResponseInfo
  527. err error
  528. expectErr error
  529. }{
  530. "verified": {&abci.ResponseInfo{
  531. LastBlockHeight: 3,
  532. LastBlockAppHash: []byte("app_hash"),
  533. AppVersion: 9,
  534. }, nil, nil},
  535. "invalid height": {&abci.ResponseInfo{
  536. LastBlockHeight: 5,
  537. LastBlockAppHash: []byte("app_hash"),
  538. AppVersion: 9,
  539. }, nil, errVerifyFailed},
  540. "invalid hash": {&abci.ResponseInfo{
  541. LastBlockHeight: 3,
  542. LastBlockAppHash: []byte("xxx"),
  543. AppVersion: 9,
  544. }, nil, errVerifyFailed},
  545. "error": {nil, boom, boom},
  546. }
  547. for name, tc := range testcases {
  548. tc := tc
  549. t.Run(name, func(t *testing.T) {
  550. rts := setup(t, nil, nil, nil, 2)
  551. rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(tc.response, tc.err)
  552. version, err := rts.syncer.verifyApp(s)
  553. unwrapped := errors.Unwrap(err)
  554. if unwrapped != nil {
  555. err = unwrapped
  556. }
  557. require.Equal(t, tc.expectErr, err)
  558. if err == nil {
  559. require.Equal(t, tc.response.AppVersion, version)
  560. }
  561. })
  562. }
  563. }
  564. func toABCI(s *snapshot) *abci.Snapshot {
  565. return &abci.Snapshot{
  566. Height: s.Height,
  567. Format: s.Format,
  568. Chunks: s.Chunks,
  569. Hash: s.Hash,
  570. Metadata: s.Metadata,
  571. }
  572. }