You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

703 lines
25 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/stretchr/testify/mock"
  10. "github.com/stretchr/testify/require"
  11. abci "github.com/tendermint/tendermint/abci/types"
  12. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  13. "github.com/tendermint/tendermint/p2p"
  14. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  15. "github.com/tendermint/tendermint/proxy"
  16. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  17. sm "github.com/tendermint/tendermint/state"
  18. "github.com/tendermint/tendermint/statesync/mocks"
  19. "github.com/tendermint/tendermint/types"
  20. "github.com/tendermint/tendermint/version"
  21. )
// ctx is the background context shared by the tests in this file; it is passed
// as the context argument when registering expectations on the mocked ABCI
// connections.
var ctx = context.Background()
  23. func TestSyncer_SyncAny(t *testing.T) {
  24. state := sm.State{
  25. ChainID: "chain",
  26. Version: sm.Version{
  27. Consensus: version.Consensus{
  28. Block: version.BlockProtocol,
  29. App: 0,
  30. },
  31. Software: version.TMVersion,
  32. },
  33. LastBlockHeight: 1,
  34. LastBlockID: types.BlockID{Hash: []byte("blockhash")},
  35. LastBlockTime: time.Now(),
  36. LastResultsHash: []byte("last_results_hash"),
  37. AppHash: []byte("app_hash"),
  38. LastValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val1")}},
  39. Validators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val2")}},
  40. NextValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val3")}},
  41. ConsensusParams: *types.DefaultConsensusParams(),
  42. LastHeightConsensusParamsChanged: 1,
  43. }
  44. commit := &types.Commit{BlockID: types.BlockID{Hash: []byte("blockhash")}}
  45. chunks := []*chunk{
  46. {Height: 1, Format: 1, Index: 0, Chunk: []byte{1, 1, 0}},
  47. {Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 1, 1}},
  48. {Height: 1, Format: 1, Index: 2, Chunk: []byte{1, 1, 2}},
  49. }
  50. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  51. stateProvider := &mocks.StateProvider{}
  52. stateProvider.On("AppHash", mock.Anything, uint64(1)).Return(state.AppHash, nil)
  53. stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil)
  54. stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil)
  55. stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil)
  56. connSnapshot := &proxymocks.AppConnSnapshot{}
  57. connQuery := &proxymocks.AppConnQuery{}
  58. peerAID := p2p.NodeID("aa")
  59. peerBID := p2p.NodeID("bb")
  60. peerCID := p2p.NodeID("cc")
  61. rts := setup(t, connSnapshot, connQuery, stateProvider, 3)
  62. // Adding a chunk should error when no sync is in progress
  63. _, err := rts.syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
  64. require.Error(t, err)
  65. // Adding a couple of peers should trigger snapshot discovery messages
  66. rts.syncer.AddPeer(peerAID)
  67. e := <-rts.snapshotOutCh
  68. require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
  69. require.Equal(t, peerAID, e.To)
  70. rts.syncer.AddPeer(peerBID)
  71. e = <-rts.snapshotOutCh
  72. require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
  73. require.Equal(t, peerBID, e.To)
  74. // Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
  75. // format 2, which will be rejected by the ABCI application.
  76. new, err := rts.syncer.AddSnapshot(peerAID, s)
  77. require.NoError(t, err)
  78. require.True(t, new)
  79. new, err = rts.syncer.AddSnapshot(peerBID, s)
  80. require.NoError(t, err)
  81. require.False(t, new)
  82. s2 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}}
  83. new, err = rts.syncer.AddSnapshot(peerBID, s2)
  84. require.NoError(t, err)
  85. require.True(t, new)
  86. new, err = rts.syncer.AddSnapshot(peerCID, s2)
  87. require.NoError(t, err)
  88. require.False(t, new)
  89. // We start a sync, with peers sending back chunks when requested. We first reject the snapshot
  90. // with height 2 format 2, and accept the snapshot at height 1.
  91. connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  92. Snapshot: &abci.Snapshot{
  93. Height: 2,
  94. Format: 2,
  95. Chunks: 3,
  96. Hash: []byte{1},
  97. },
  98. AppHash: []byte("app_hash_2"),
  99. }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  100. connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  101. Snapshot: &abci.Snapshot{
  102. Height: s.Height,
  103. Format: s.Format,
  104. Chunks: s.Chunks,
  105. Hash: s.Hash,
  106. Metadata: s.Metadata,
  107. },
  108. AppHash: []byte("app_hash"),
  109. }).Times(2).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  110. chunkRequests := make(map[uint32]int)
  111. chunkRequestsMtx := tmsync.Mutex{}
  112. var wg sync.WaitGroup
  113. wg.Add(4)
  114. go func() {
  115. for e := range rts.chunkOutCh {
  116. msg, ok := e.Message.(*ssproto.ChunkRequest)
  117. assert.True(t, ok)
  118. assert.EqualValues(t, 1, msg.Height)
  119. assert.EqualValues(t, 1, msg.Format)
  120. assert.LessOrEqual(t, msg.Index, uint32(len(chunks)))
  121. added, err := rts.syncer.AddChunk(chunks[msg.Index])
  122. assert.NoError(t, err)
  123. assert.True(t, added)
  124. chunkRequestsMtx.Lock()
  125. chunkRequests[msg.Index]++
  126. chunkRequestsMtx.Unlock()
  127. wg.Done()
  128. }
  129. }()
  130. // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
  131. // which should cause it to keep the existing chunk 0 and 2, and restart restoration from
  132. // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
  133. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  134. Index: 2, Chunk: []byte{1, 1, 2},
  135. }).Once().Run(func(args mock.Arguments) { time.Sleep(2 * time.Second) }).Return(
  136. &abci.ResponseApplySnapshotChunk{
  137. Result: abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT,
  138. RefetchChunks: []uint32{1},
  139. }, nil)
  140. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  141. Index: 0, Chunk: []byte{1, 1, 0},
  142. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  143. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  144. Index: 1, Chunk: []byte{1, 1, 1},
  145. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  146. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  147. Index: 2, Chunk: []byte{1, 1, 2},
  148. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  149. connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(&abci.ResponseInfo{
  150. AppVersion: 9,
  151. LastBlockHeight: 1,
  152. LastBlockAppHash: []byte("app_hash"),
  153. }, nil)
  154. newState, lastCommit, err := rts.syncer.SyncAny(0, func() {})
  155. require.NoError(t, err)
  156. wg.Wait()
  157. chunkRequestsMtx.Lock()
  158. require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)
  159. chunkRequestsMtx.Unlock()
  160. // The syncer should have updated the state app version from the ABCI info response.
  161. expectState := state
  162. expectState.Version.Consensus.App = 9
  163. require.Equal(t, expectState, newState)
  164. require.Equal(t, commit, lastCommit)
  165. connSnapshot.AssertExpectations(t)
  166. connQuery.AssertExpectations(t)
  167. }
  168. func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
  169. stateProvider := &mocks.StateProvider{}
  170. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  171. rts := setup(t, nil, nil, stateProvider, 2)
  172. _, _, err := rts.syncer.SyncAny(0, func() {})
  173. require.Equal(t, errNoSnapshots, err)
  174. }
  175. func TestSyncer_SyncAny_abort(t *testing.T) {
  176. stateProvider := &mocks.StateProvider{}
  177. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  178. rts := setup(t, nil, nil, stateProvider, 2)
  179. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  180. peerID := p2p.NodeID("aa")
  181. _, err := rts.syncer.AddSnapshot(peerID, s)
  182. require.NoError(t, err)
  183. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  184. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  185. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  186. _, _, err = rts.syncer.SyncAny(0, func() {})
  187. require.Equal(t, errAbort, err)
  188. rts.conn.AssertExpectations(t)
  189. }
  190. func TestSyncer_SyncAny_reject(t *testing.T) {
  191. stateProvider := &mocks.StateProvider{}
  192. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  193. rts := setup(t, nil, nil, stateProvider, 2)
  194. // s22 is tried first, then s12, then s11, then errNoSnapshots
  195. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  196. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  197. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  198. peerID := p2p.NodeID("aa")
  199. _, err := rts.syncer.AddSnapshot(peerID, s22)
  200. require.NoError(t, err)
  201. _, err = rts.syncer.AddSnapshot(peerID, s12)
  202. require.NoError(t, err)
  203. _, err = rts.syncer.AddSnapshot(peerID, s11)
  204. require.NoError(t, err)
  205. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  206. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  207. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  208. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  209. Snapshot: toABCI(s12), AppHash: []byte("app_hash"),
  210. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  211. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  212. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  213. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  214. _, _, err = rts.syncer.SyncAny(0, func() {})
  215. require.Equal(t, errNoSnapshots, err)
  216. rts.conn.AssertExpectations(t)
  217. }
  218. func TestSyncer_SyncAny_reject_format(t *testing.T) {
  219. stateProvider := &mocks.StateProvider{}
  220. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  221. rts := setup(t, nil, nil, stateProvider, 2)
  222. // s22 is tried first, which reject s22 and s12, then s11 will abort.
  223. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  224. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  225. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  226. peerID := p2p.NodeID("aa")
  227. _, err := rts.syncer.AddSnapshot(peerID, s22)
  228. require.NoError(t, err)
  229. _, err = rts.syncer.AddSnapshot(peerID, s12)
  230. require.NoError(t, err)
  231. _, err = rts.syncer.AddSnapshot(peerID, s11)
  232. require.NoError(t, err)
  233. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  234. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  235. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  236. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  237. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  238. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  239. _, _, err = rts.syncer.SyncAny(0, func() {})
  240. require.Equal(t, errAbort, err)
  241. rts.conn.AssertExpectations(t)
  242. }
  243. func TestSyncer_SyncAny_reject_sender(t *testing.T) {
  244. stateProvider := &mocks.StateProvider{}
  245. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  246. rts := setup(t, nil, nil, stateProvider, 2)
  247. peerAID := p2p.NodeID("aa")
  248. peerBID := p2p.NodeID("bb")
  249. peerCID := p2p.NodeID("cc")
  250. // sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
  251. // submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
  252. // errNoSnapshots is returned.
  253. sa := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  254. sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  255. sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  256. sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  257. _, err := rts.syncer.AddSnapshot(peerAID, sa)
  258. require.NoError(t, err)
  259. _, err = rts.syncer.AddSnapshot(peerBID, sb)
  260. require.NoError(t, err)
  261. _, err = rts.syncer.AddSnapshot(peerCID, sc)
  262. require.NoError(t, err)
  263. _, err = rts.syncer.AddSnapshot(peerBID, sbc)
  264. require.NoError(t, err)
  265. _, err = rts.syncer.AddSnapshot(peerCID, sbc)
  266. require.NoError(t, err)
  267. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  268. Snapshot: toABCI(sbc), AppHash: []byte("app_hash"),
  269. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil)
  270. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  271. Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
  272. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  273. _, _, err = rts.syncer.SyncAny(0, func() {})
  274. require.Equal(t, errNoSnapshots, err)
  275. rts.conn.AssertExpectations(t)
  276. }
  277. func TestSyncer_SyncAny_abciError(t *testing.T) {
  278. stateProvider := &mocks.StateProvider{}
  279. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  280. rts := setup(t, nil, nil, stateProvider, 2)
  281. errBoom := errors.New("boom")
  282. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  283. peerID := p2p.NodeID("aa")
  284. _, err := rts.syncer.AddSnapshot(peerID, s)
  285. require.NoError(t, err)
  286. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  287. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  288. }).Once().Return(nil, errBoom)
  289. _, _, err = rts.syncer.SyncAny(0, func() {})
  290. require.True(t, errors.Is(err, errBoom))
  291. rts.conn.AssertExpectations(t)
  292. }
  293. func TestSyncer_offerSnapshot(t *testing.T) {
  294. unknownErr := errors.New("unknown error")
  295. boom := errors.New("boom")
  296. testcases := map[string]struct {
  297. result abci.ResponseOfferSnapshot_Result
  298. err error
  299. expectErr error
  300. }{
  301. "accept": {abci.ResponseOfferSnapshot_ACCEPT, nil, nil},
  302. "abort": {abci.ResponseOfferSnapshot_ABORT, nil, errAbort},
  303. "reject": {abci.ResponseOfferSnapshot_REJECT, nil, errRejectSnapshot},
  304. "reject_format": {abci.ResponseOfferSnapshot_REJECT_FORMAT, nil, errRejectFormat},
  305. "reject_sender": {abci.ResponseOfferSnapshot_REJECT_SENDER, nil, errRejectSender},
  306. "unknown": {abci.ResponseOfferSnapshot_UNKNOWN, nil, unknownErr},
  307. "error": {0, boom, boom},
  308. "unknown non-zero": {9, nil, unknownErr},
  309. }
  310. for name, tc := range testcases {
  311. tc := tc
  312. t.Run(name, func(t *testing.T) {
  313. stateProvider := &mocks.StateProvider{}
  314. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  315. rts := setup(t, nil, nil, stateProvider, 2)
  316. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  317. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  318. Snapshot: toABCI(s),
  319. AppHash: []byte("app_hash"),
  320. }).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
  321. err := rts.syncer.offerSnapshot(s)
  322. if tc.expectErr == unknownErr {
  323. require.Error(t, err)
  324. } else {
  325. unwrapped := errors.Unwrap(err)
  326. if unwrapped != nil {
  327. err = unwrapped
  328. }
  329. require.Equal(t, tc.expectErr, err)
  330. }
  331. })
  332. }
  333. }
  334. func TestSyncer_applyChunks_Results(t *testing.T) {
  335. unknownErr := errors.New("unknown error")
  336. boom := errors.New("boom")
  337. testcases := map[string]struct {
  338. result abci.ResponseApplySnapshotChunk_Result
  339. err error
  340. expectErr error
  341. }{
  342. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT, nil, nil},
  343. "abort": {abci.ResponseApplySnapshotChunk_ABORT, nil, errAbort},
  344. "retry": {abci.ResponseApplySnapshotChunk_RETRY, nil, nil},
  345. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT, nil, errRetrySnapshot},
  346. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, nil, errRejectSnapshot},
  347. "unknown": {abci.ResponseApplySnapshotChunk_UNKNOWN, nil, unknownErr},
  348. "error": {0, boom, boom},
  349. "unknown non-zero": {9, nil, unknownErr},
  350. }
  351. for name, tc := range testcases {
  352. tc := tc
  353. t.Run(name, func(t *testing.T) {
  354. stateProvider := &mocks.StateProvider{}
  355. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  356. rts := setup(t, nil, nil, stateProvider, 2)
  357. body := []byte{1, 2, 3}
  358. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
  359. require.NoError(t, err)
  360. _, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body})
  361. require.NoError(t, err)
  362. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  363. Index: 0, Chunk: body,
  364. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err)
  365. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  366. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  367. Index: 0, Chunk: body,
  368. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  369. Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  370. }
  371. err = rts.syncer.applyChunks(chunks)
  372. if tc.expectErr == unknownErr {
  373. require.Error(t, err)
  374. } else {
  375. unwrapped := errors.Unwrap(err)
  376. if unwrapped != nil {
  377. err = unwrapped
  378. }
  379. require.Equal(t, tc.expectErr, err)
  380. }
  381. rts.conn.AssertExpectations(t)
  382. })
  383. }
  384. }
  385. func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
  386. // Discarding chunks via refetch_chunks should work the same for all results
  387. testcases := map[string]struct {
  388. result abci.ResponseApplySnapshotChunk_Result
  389. }{
  390. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  391. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  392. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  393. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  394. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  395. }
  396. for name, tc := range testcases {
  397. tc := tc
  398. t.Run(name, func(t *testing.T) {
  399. stateProvider := &mocks.StateProvider{}
  400. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  401. rts := setup(t, nil, nil, stateProvider, 2)
  402. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
  403. require.NoError(t, err)
  404. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}})
  405. require.True(t, added)
  406. require.NoError(t, err)
  407. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}})
  408. require.True(t, added)
  409. require.NoError(t, err)
  410. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}})
  411. require.True(t, added)
  412. require.NoError(t, err)
  413. // The first two chunks are accepted, before the last one asks for 1 to be refetched
  414. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  415. Index: 0, Chunk: []byte{0},
  416. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  417. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  418. Index: 1, Chunk: []byte{1},
  419. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  420. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  421. Index: 2, Chunk: []byte{2},
  422. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  423. Result: tc.result,
  424. RefetchChunks: []uint32{1},
  425. }, nil)
  426. // Since removing the chunk will cause Next() to block, we spawn a goroutine, then
  427. // check the queue contents, and finally close the queue to end the goroutine.
  428. // We don't really care about the result of applyChunks, since it has separate test.
  429. go func() {
  430. rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
  431. }()
  432. time.Sleep(50 * time.Millisecond)
  433. require.True(t, chunks.Has(0))
  434. require.False(t, chunks.Has(1))
  435. require.True(t, chunks.Has(2))
  436. require.NoError(t, chunks.Close())
  437. })
  438. }
  439. }
  440. func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
  441. // Banning chunks senders via ban_chunk_senders should work the same for all results
  442. testcases := map[string]struct {
  443. result abci.ResponseApplySnapshotChunk_Result
  444. }{
  445. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  446. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  447. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  448. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  449. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  450. }
  451. for name, tc := range testcases {
  452. tc := tc
  453. t.Run(name, func(t *testing.T) {
  454. stateProvider := &mocks.StateProvider{}
  455. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  456. rts := setup(t, nil, nil, stateProvider, 2)
  457. // Set up three peers across two snapshots, and ask for one of them to be banned.
  458. // It should be banned from all snapshots.
  459. peerAID := p2p.NodeID("aa")
  460. peerBID := p2p.NodeID("bb")
  461. peerCID := p2p.NodeID("cc")
  462. s1 := &snapshot{Height: 1, Format: 1, Chunks: 3}
  463. s2 := &snapshot{Height: 2, Format: 1, Chunks: 3}
  464. _, err := rts.syncer.AddSnapshot(peerAID, s1)
  465. require.NoError(t, err)
  466. _, err = rts.syncer.AddSnapshot(peerAID, s2)
  467. require.NoError(t, err)
  468. _, err = rts.syncer.AddSnapshot(peerBID, s1)
  469. require.NoError(t, err)
  470. _, err = rts.syncer.AddSnapshot(peerBID, s2)
  471. require.NoError(t, err)
  472. _, err = rts.syncer.AddSnapshot(peerCID, s1)
  473. require.NoError(t, err)
  474. _, err = rts.syncer.AddSnapshot(peerCID, s2)
  475. require.NoError(t, err)
  476. chunks, err := newChunkQueue(s1, "")
  477. require.NoError(t, err)
  478. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerAID})
  479. require.True(t, added)
  480. require.NoError(t, err)
  481. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerBID})
  482. require.True(t, added)
  483. require.NoError(t, err)
  484. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerCID})
  485. require.True(t, added)
  486. require.NoError(t, err)
  487. // The first two chunks are accepted, before the last one asks for b sender to be rejected
  488. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  489. Index: 0, Chunk: []byte{0}, Sender: "aa",
  490. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  491. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  492. Index: 1, Chunk: []byte{1}, Sender: "bb",
  493. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  494. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  495. Index: 2, Chunk: []byte{2}, Sender: "cc",
  496. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  497. Result: tc.result,
  498. RejectSenders: []string{string(peerBID)},
  499. }, nil)
  500. // On retry, the last chunk will be tried again, so we just accept it then.
  501. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  502. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  503. Index: 2, Chunk: []byte{2}, Sender: "cc",
  504. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  505. }
  506. // We don't really care about the result of applyChunks, since it has separate test.
  507. // However, it will block on e.g. retry result, so we spawn a goroutine that will
  508. // be shut down when the chunk queue closes.
  509. go func() {
  510. rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
  511. }()
  512. time.Sleep(50 * time.Millisecond)
  513. s1peers := rts.syncer.snapshots.GetPeers(s1)
  514. require.Len(t, s1peers, 2)
  515. require.EqualValues(t, "aa", s1peers[0])
  516. require.EqualValues(t, "cc", s1peers[1])
  517. rts.syncer.snapshots.GetPeers(s1)
  518. require.Len(t, s1peers, 2)
  519. require.EqualValues(t, "aa", s1peers[0])
  520. require.EqualValues(t, "cc", s1peers[1])
  521. require.NoError(t, chunks.Close())
  522. })
  523. }
  524. }
  525. func TestSyncer_verifyApp(t *testing.T) {
  526. boom := errors.New("boom")
  527. s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  528. testcases := map[string]struct {
  529. response *abci.ResponseInfo
  530. err error
  531. expectErr error
  532. }{
  533. "verified": {&abci.ResponseInfo{
  534. LastBlockHeight: 3,
  535. LastBlockAppHash: []byte("app_hash"),
  536. AppVersion: 9,
  537. }, nil, nil},
  538. "invalid height": {&abci.ResponseInfo{
  539. LastBlockHeight: 5,
  540. LastBlockAppHash: []byte("app_hash"),
  541. AppVersion: 9,
  542. }, nil, errVerifyFailed},
  543. "invalid hash": {&abci.ResponseInfo{
  544. LastBlockHeight: 3,
  545. LastBlockAppHash: []byte("xxx"),
  546. AppVersion: 9,
  547. }, nil, errVerifyFailed},
  548. "error": {nil, boom, boom},
  549. }
  550. for name, tc := range testcases {
  551. tc := tc
  552. t.Run(name, func(t *testing.T) {
  553. rts := setup(t, nil, nil, nil, 2)
  554. rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(tc.response, tc.err)
  555. version, err := rts.syncer.verifyApp(s)
  556. unwrapped := errors.Unwrap(err)
  557. if unwrapped != nil {
  558. err = unwrapped
  559. }
  560. require.Equal(t, tc.expectErr, err)
  561. if err == nil {
  562. require.Equal(t, tc.response.AppVersion, version)
  563. }
  564. })
  565. }
  566. }
  567. func toABCI(s *snapshot) *abci.Snapshot {
  568. return &abci.Snapshot{
  569. Height: s.Height,
  570. Format: s.Format,
  571. Chunks: s.Chunks,
  572. Hash: s.Hash,
  573. Metadata: s.Metadata,
  574. }
  575. }