You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

693 lines
25 KiB

  1. package statesync
  2. import (
  3. "context"
  4. "errors"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/mock"
  8. "github.com/stretchr/testify/require"
  9. abci "github.com/tendermint/tendermint/abci/types"
  10. tmsync "github.com/tendermint/tendermint/libs/sync"
  11. "github.com/tendermint/tendermint/p2p"
  12. tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
  13. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  14. tmversion "github.com/tendermint/tendermint/proto/tendermint/version"
  15. "github.com/tendermint/tendermint/proxy"
  16. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  17. sm "github.com/tendermint/tendermint/state"
  18. "github.com/tendermint/tendermint/statesync/mocks"
  19. "github.com/tendermint/tendermint/types"
  20. "github.com/tendermint/tendermint/version"
  21. )
  22. var ctx = context.Background()
  23. func TestSyncer_SyncAny(t *testing.T) {
  24. state := sm.State{
  25. ChainID: "chain",
  26. Version: tmstate.Version{
  27. Consensus: tmversion.Consensus{
  28. Block: version.BlockProtocol,
  29. App: 0,
  30. },
  31. Software: version.TMCoreSemVer,
  32. },
  33. LastBlockHeight: 1,
  34. LastBlockID: types.BlockID{Hash: []byte("blockhash")},
  35. LastBlockTime: time.Now(),
  36. LastResultsHash: []byte("last_results_hash"),
  37. AppHash: []byte("app_hash"),
  38. LastValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val1")}},
  39. Validators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val2")}},
  40. NextValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val3")}},
  41. ConsensusParams: *types.DefaultConsensusParams(),
  42. LastHeightConsensusParamsChanged: 1,
  43. }
  44. commit := &types.Commit{BlockID: types.BlockID{Hash: []byte("blockhash")}}
  45. chunks := []*chunk{
  46. {Height: 1, Format: 1, Index: 0, Chunk: []byte{1, 1, 0}},
  47. {Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 1, 1}},
  48. {Height: 1, Format: 1, Index: 2, Chunk: []byte{1, 1, 2}},
  49. }
  50. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  51. stateProvider := &mocks.StateProvider{}
  52. stateProvider.On("AppHash", mock.Anything, uint64(1)).Return(state.AppHash, nil)
  53. stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil)
  54. stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil)
  55. stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil)
  56. connSnapshot := &proxymocks.AppConnSnapshot{}
  57. connQuery := &proxymocks.AppConnQuery{}
  58. peerAID := p2p.NodeID("aa")
  59. peerBID := p2p.NodeID("bb")
  60. rts := setup(t, connSnapshot, connQuery, stateProvider, 3)
  61. // Adding a chunk should error when no sync is in progress
  62. _, err := rts.syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
  63. require.Error(t, err)
  64. // Adding a couple of peers should trigger snapshot discovery messages
  65. rts.syncer.AddPeer(peerAID)
  66. e := <-rts.snapshotOutCh
  67. require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
  68. require.Equal(t, peerAID, e.To)
  69. rts.syncer.AddPeer(peerBID)
  70. e = <-rts.snapshotOutCh
  71. require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
  72. require.Equal(t, peerBID, e.To)
  73. // Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
  74. // format 2, which will be rejected by the ABCI application.
  75. new, err := rts.syncer.AddSnapshot(peerAID, s)
  76. require.NoError(t, err)
  77. require.True(t, new)
  78. new, err = rts.syncer.AddSnapshot(peerBID, s)
  79. require.NoError(t, err)
  80. require.False(t, new)
  81. new, err = rts.syncer.AddSnapshot(peerBID, &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}})
  82. require.NoError(t, err)
  83. require.True(t, new)
  84. // We start a sync, with peers sending back chunks when requested. We first reject the snapshot
  85. // with height 2 format 2, and accept the snapshot at height 1.
  86. connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  87. Snapshot: &abci.Snapshot{
  88. Height: 2,
  89. Format: 2,
  90. Chunks: 3,
  91. Hash: []byte{1},
  92. },
  93. AppHash: []byte("app_hash_2"),
  94. }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  95. connSnapshot.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  96. Snapshot: &abci.Snapshot{
  97. Height: s.Height,
  98. Format: s.Format,
  99. Chunks: s.Chunks,
  100. Hash: s.Hash,
  101. Metadata: s.Metadata,
  102. },
  103. AppHash: []byte("app_hash"),
  104. }).Times(2).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  105. chunkRequests := make(map[uint32]int)
  106. chunkRequestsMtx := tmsync.Mutex{}
  107. go func() {
  108. for e := range rts.chunkOutCh {
  109. msg, ok := e.Message.(*ssproto.ChunkRequest)
  110. require.True(t, ok)
  111. require.EqualValues(t, 1, msg.Height)
  112. require.EqualValues(t, 1, msg.Format)
  113. require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
  114. added, err := rts.syncer.AddChunk(chunks[msg.Index])
  115. require.NoError(t, err)
  116. require.True(t, added)
  117. chunkRequestsMtx.Lock()
  118. chunkRequests[msg.Index]++
  119. chunkRequestsMtx.Unlock()
  120. }
  121. }()
  122. // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
  123. // which should cause it to keep the existing chunk 0 and 2, and restart restoration from
  124. // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
  125. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  126. Index: 2, Chunk: []byte{1, 1, 2},
  127. }).Once().Run(func(args mock.Arguments) { time.Sleep(2 * time.Second) }).Return(
  128. &abci.ResponseApplySnapshotChunk{
  129. Result: abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT,
  130. RefetchChunks: []uint32{1},
  131. }, nil)
  132. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  133. Index: 0, Chunk: []byte{1, 1, 0},
  134. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  135. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  136. Index: 1, Chunk: []byte{1, 1, 1},
  137. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  138. connSnapshot.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  139. Index: 2, Chunk: []byte{1, 1, 2},
  140. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  141. connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(&abci.ResponseInfo{
  142. AppVersion: 9,
  143. LastBlockHeight: 1,
  144. LastBlockAppHash: []byte("app_hash"),
  145. }, nil)
  146. newState, lastCommit, err := rts.syncer.SyncAny(0)
  147. require.NoError(t, err)
  148. time.Sleep(50 * time.Millisecond) // wait for peers to receive requests
  149. chunkRequestsMtx.Lock()
  150. require.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)
  151. chunkRequestsMtx.Unlock()
  152. // The syncer should have updated the state app version from the ABCI info response.
  153. expectState := state
  154. expectState.Version.Consensus.App = 9
  155. require.Equal(t, expectState, newState)
  156. require.Equal(t, commit, lastCommit)
  157. connSnapshot.AssertExpectations(t)
  158. connQuery.AssertExpectations(t)
  159. }
  160. func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
  161. stateProvider := &mocks.StateProvider{}
  162. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  163. rts := setup(t, nil, nil, stateProvider, 2)
  164. _, _, err := rts.syncer.SyncAny(0)
  165. require.Equal(t, errNoSnapshots, err)
  166. }
  167. func TestSyncer_SyncAny_abort(t *testing.T) {
  168. stateProvider := &mocks.StateProvider{}
  169. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  170. rts := setup(t, nil, nil, stateProvider, 2)
  171. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  172. peerID := p2p.NodeID("aa")
  173. _, err := rts.syncer.AddSnapshot(peerID, s)
  174. require.NoError(t, err)
  175. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  176. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  177. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  178. _, _, err = rts.syncer.SyncAny(0)
  179. require.Equal(t, errAbort, err)
  180. rts.conn.AssertExpectations(t)
  181. }
  182. func TestSyncer_SyncAny_reject(t *testing.T) {
  183. stateProvider := &mocks.StateProvider{}
  184. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  185. rts := setup(t, nil, nil, stateProvider, 2)
  186. // s22 is tried first, then s12, then s11, then errNoSnapshots
  187. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  188. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  189. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  190. peerID := p2p.NodeID("aa")
  191. _, err := rts.syncer.AddSnapshot(peerID, s22)
  192. require.NoError(t, err)
  193. _, err = rts.syncer.AddSnapshot(peerID, s12)
  194. require.NoError(t, err)
  195. _, err = rts.syncer.AddSnapshot(peerID, s11)
  196. require.NoError(t, err)
  197. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  198. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  199. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  200. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  201. Snapshot: toABCI(s12), AppHash: []byte("app_hash"),
  202. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  203. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  204. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  205. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  206. _, _, err = rts.syncer.SyncAny(0)
  207. require.Equal(t, errNoSnapshots, err)
  208. rts.conn.AssertExpectations(t)
  209. }
  210. func TestSyncer_SyncAny_reject_format(t *testing.T) {
  211. stateProvider := &mocks.StateProvider{}
  212. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  213. rts := setup(t, nil, nil, stateProvider, 2)
  214. // s22 is tried first, which reject s22 and s12, then s11 will abort.
  215. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  216. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  217. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  218. peerID := p2p.NodeID("aa")
  219. _, err := rts.syncer.AddSnapshot(peerID, s22)
  220. require.NoError(t, err)
  221. _, err = rts.syncer.AddSnapshot(peerID, s12)
  222. require.NoError(t, err)
  223. _, err = rts.syncer.AddSnapshot(peerID, s11)
  224. require.NoError(t, err)
  225. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  226. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  227. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  228. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  229. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  230. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  231. _, _, err = rts.syncer.SyncAny(0)
  232. require.Equal(t, errAbort, err)
  233. rts.conn.AssertExpectations(t)
  234. }
  235. func TestSyncer_SyncAny_reject_sender(t *testing.T) {
  236. stateProvider := &mocks.StateProvider{}
  237. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  238. rts := setup(t, nil, nil, stateProvider, 2)
  239. peerAID := p2p.NodeID("aa")
  240. peerBID := p2p.NodeID("bb")
  241. peerCID := p2p.NodeID("cc")
  242. // sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
  243. // submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
  244. // errNoSnapshots is returned.
  245. sa := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  246. sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  247. sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  248. sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  249. _, err := rts.syncer.AddSnapshot(peerAID, sa)
  250. require.NoError(t, err)
  251. _, err = rts.syncer.AddSnapshot(peerBID, sb)
  252. require.NoError(t, err)
  253. _, err = rts.syncer.AddSnapshot(peerCID, sc)
  254. require.NoError(t, err)
  255. _, err = rts.syncer.AddSnapshot(peerBID, sbc)
  256. require.NoError(t, err)
  257. _, err = rts.syncer.AddSnapshot(peerCID, sbc)
  258. require.NoError(t, err)
  259. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  260. Snapshot: toABCI(sbc), AppHash: []byte("app_hash"),
  261. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil)
  262. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  263. Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
  264. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  265. _, _, err = rts.syncer.SyncAny(0)
  266. require.Equal(t, errNoSnapshots, err)
  267. rts.conn.AssertExpectations(t)
  268. }
  269. func TestSyncer_SyncAny_abciError(t *testing.T) {
  270. stateProvider := &mocks.StateProvider{}
  271. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  272. rts := setup(t, nil, nil, stateProvider, 2)
  273. errBoom := errors.New("boom")
  274. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  275. peerID := p2p.NodeID("aa")
  276. _, err := rts.syncer.AddSnapshot(peerID, s)
  277. require.NoError(t, err)
  278. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  279. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  280. }).Once().Return(nil, errBoom)
  281. _, _, err = rts.syncer.SyncAny(0)
  282. require.True(t, errors.Is(err, errBoom))
  283. rts.conn.AssertExpectations(t)
  284. }
  285. func TestSyncer_offerSnapshot(t *testing.T) {
  286. unknownErr := errors.New("unknown error")
  287. boom := errors.New("boom")
  288. testcases := map[string]struct {
  289. result abci.ResponseOfferSnapshot_Result
  290. err error
  291. expectErr error
  292. }{
  293. "accept": {abci.ResponseOfferSnapshot_ACCEPT, nil, nil},
  294. "abort": {abci.ResponseOfferSnapshot_ABORT, nil, errAbort},
  295. "reject": {abci.ResponseOfferSnapshot_REJECT, nil, errRejectSnapshot},
  296. "reject_format": {abci.ResponseOfferSnapshot_REJECT_FORMAT, nil, errRejectFormat},
  297. "reject_sender": {abci.ResponseOfferSnapshot_REJECT_SENDER, nil, errRejectSender},
  298. "unknown": {abci.ResponseOfferSnapshot_UNKNOWN, nil, unknownErr},
  299. "error": {0, boom, boom},
  300. "unknown non-zero": {9, nil, unknownErr},
  301. }
  302. for name, tc := range testcases {
  303. tc := tc
  304. t.Run(name, func(t *testing.T) {
  305. stateProvider := &mocks.StateProvider{}
  306. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  307. rts := setup(t, nil, nil, stateProvider, 2)
  308. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  309. rts.conn.On("OfferSnapshotSync", ctx, abci.RequestOfferSnapshot{
  310. Snapshot: toABCI(s),
  311. AppHash: []byte("app_hash"),
  312. }).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
  313. err := rts.syncer.offerSnapshot(s)
  314. if tc.expectErr == unknownErr {
  315. require.Error(t, err)
  316. } else {
  317. unwrapped := errors.Unwrap(err)
  318. if unwrapped != nil {
  319. err = unwrapped
  320. }
  321. require.Equal(t, tc.expectErr, err)
  322. }
  323. })
  324. }
  325. }
  326. func TestSyncer_applyChunks_Results(t *testing.T) {
  327. unknownErr := errors.New("unknown error")
  328. boom := errors.New("boom")
  329. testcases := map[string]struct {
  330. result abci.ResponseApplySnapshotChunk_Result
  331. err error
  332. expectErr error
  333. }{
  334. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT, nil, nil},
  335. "abort": {abci.ResponseApplySnapshotChunk_ABORT, nil, errAbort},
  336. "retry": {abci.ResponseApplySnapshotChunk_RETRY, nil, nil},
  337. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT, nil, errRetrySnapshot},
  338. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, nil, errRejectSnapshot},
  339. "unknown": {abci.ResponseApplySnapshotChunk_UNKNOWN, nil, unknownErr},
  340. "error": {0, boom, boom},
  341. "unknown non-zero": {9, nil, unknownErr},
  342. }
  343. for name, tc := range testcases {
  344. tc := tc
  345. t.Run(name, func(t *testing.T) {
  346. stateProvider := &mocks.StateProvider{}
  347. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  348. rts := setup(t, nil, nil, stateProvider, 2)
  349. body := []byte{1, 2, 3}
  350. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
  351. require.NoError(t, err)
  352. _, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body})
  353. require.NoError(t, err)
  354. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  355. Index: 0, Chunk: body,
  356. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err)
  357. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  358. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  359. Index: 0, Chunk: body,
  360. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  361. Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  362. }
  363. err = rts.syncer.applyChunks(chunks)
  364. if tc.expectErr == unknownErr {
  365. require.Error(t, err)
  366. } else {
  367. unwrapped := errors.Unwrap(err)
  368. if unwrapped != nil {
  369. err = unwrapped
  370. }
  371. require.Equal(t, tc.expectErr, err)
  372. }
  373. rts.conn.AssertExpectations(t)
  374. })
  375. }
  376. }
  377. func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
  378. // Discarding chunks via refetch_chunks should work the same for all results
  379. testcases := map[string]struct {
  380. result abci.ResponseApplySnapshotChunk_Result
  381. }{
  382. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  383. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  384. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  385. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  386. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  387. }
  388. for name, tc := range testcases {
  389. tc := tc
  390. t.Run(name, func(t *testing.T) {
  391. stateProvider := &mocks.StateProvider{}
  392. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  393. rts := setup(t, nil, nil, stateProvider, 2)
  394. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
  395. require.NoError(t, err)
  396. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}})
  397. require.True(t, added)
  398. require.NoError(t, err)
  399. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}})
  400. require.True(t, added)
  401. require.NoError(t, err)
  402. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}})
  403. require.True(t, added)
  404. require.NoError(t, err)
  405. // The first two chunks are accepted, before the last one asks for 1 to be refetched
  406. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  407. Index: 0, Chunk: []byte{0},
  408. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  409. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  410. Index: 1, Chunk: []byte{1},
  411. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  412. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  413. Index: 2, Chunk: []byte{2},
  414. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  415. Result: tc.result,
  416. RefetchChunks: []uint32{1},
  417. }, nil)
  418. // Since removing the chunk will cause Next() to block, we spawn a goroutine, then
  419. // check the queue contents, and finally close the queue to end the goroutine.
  420. // We don't really care about the result of applyChunks, since it has separate test.
  421. go func() {
  422. rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
  423. }()
  424. time.Sleep(50 * time.Millisecond)
  425. require.True(t, chunks.Has(0))
  426. require.False(t, chunks.Has(1))
  427. require.True(t, chunks.Has(2))
  428. require.NoError(t, chunks.Close())
  429. })
  430. }
  431. }
  432. func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
  433. // Banning chunks senders via ban_chunk_senders should work the same for all results
  434. testcases := map[string]struct {
  435. result abci.ResponseApplySnapshotChunk_Result
  436. }{
  437. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  438. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  439. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  440. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  441. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  442. }
  443. for name, tc := range testcases {
  444. tc := tc
  445. t.Run(name, func(t *testing.T) {
  446. stateProvider := &mocks.StateProvider{}
  447. stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
  448. rts := setup(t, nil, nil, stateProvider, 2)
  449. // Set up three peers across two snapshots, and ask for one of them to be banned.
  450. // It should be banned from all snapshots.
  451. peerAID := p2p.NodeID("aa")
  452. peerBID := p2p.NodeID("bb")
  453. peerCID := p2p.NodeID("cc")
  454. s1 := &snapshot{Height: 1, Format: 1, Chunks: 3}
  455. s2 := &snapshot{Height: 2, Format: 1, Chunks: 3}
  456. _, err := rts.syncer.AddSnapshot(peerAID, s1)
  457. require.NoError(t, err)
  458. _, err = rts.syncer.AddSnapshot(peerAID, s2)
  459. require.NoError(t, err)
  460. _, err = rts.syncer.AddSnapshot(peerBID, s1)
  461. require.NoError(t, err)
  462. _, err = rts.syncer.AddSnapshot(peerBID, s2)
  463. require.NoError(t, err)
  464. _, err = rts.syncer.AddSnapshot(peerCID, s1)
  465. require.NoError(t, err)
  466. _, err = rts.syncer.AddSnapshot(peerCID, s2)
  467. require.NoError(t, err)
  468. chunks, err := newChunkQueue(s1, "")
  469. require.NoError(t, err)
  470. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerAID})
  471. require.True(t, added)
  472. require.NoError(t, err)
  473. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerBID})
  474. require.True(t, added)
  475. require.NoError(t, err)
  476. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerCID})
  477. require.True(t, added)
  478. require.NoError(t, err)
  479. // The first two chunks are accepted, before the last one asks for b sender to be rejected
  480. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  481. Index: 0, Chunk: []byte{0}, Sender: "aa",
  482. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  483. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  484. Index: 1, Chunk: []byte{1}, Sender: "bb",
  485. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  486. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  487. Index: 2, Chunk: []byte{2}, Sender: "cc",
  488. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  489. Result: tc.result,
  490. RejectSenders: []string{string(peerBID)},
  491. }, nil)
  492. // On retry, the last chunk will be tried again, so we just accept it then.
  493. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  494. rts.conn.On("ApplySnapshotChunkSync", ctx, abci.RequestApplySnapshotChunk{
  495. Index: 2, Chunk: []byte{2}, Sender: "cc",
  496. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  497. }
  498. // We don't really care about the result of applyChunks, since it has separate test.
  499. // However, it will block on e.g. retry result, so we spawn a goroutine that will
  500. // be shut down when the chunk queue closes.
  501. go func() {
  502. rts.syncer.applyChunks(chunks) //nolint:errcheck // purposefully ignore error
  503. }()
  504. time.Sleep(50 * time.Millisecond)
  505. s1peers := rts.syncer.snapshots.GetPeers(s1)
  506. require.Len(t, s1peers, 2)
  507. require.EqualValues(t, "aa", s1peers[0])
  508. require.EqualValues(t, "cc", s1peers[1])
  509. rts.syncer.snapshots.GetPeers(s1)
  510. require.Len(t, s1peers, 2)
  511. require.EqualValues(t, "aa", s1peers[0])
  512. require.EqualValues(t, "cc", s1peers[1])
  513. require.NoError(t, chunks.Close())
  514. })
  515. }
  516. }
  517. func TestSyncer_verifyApp(t *testing.T) {
  518. boom := errors.New("boom")
  519. s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  520. testcases := map[string]struct {
  521. response *abci.ResponseInfo
  522. err error
  523. expectErr error
  524. }{
  525. "verified": {&abci.ResponseInfo{
  526. LastBlockHeight: 3,
  527. LastBlockAppHash: []byte("app_hash"),
  528. AppVersion: 9,
  529. }, nil, nil},
  530. "invalid height": {&abci.ResponseInfo{
  531. LastBlockHeight: 5,
  532. LastBlockAppHash: []byte("app_hash"),
  533. AppVersion: 9,
  534. }, nil, errVerifyFailed},
  535. "invalid hash": {&abci.ResponseInfo{
  536. LastBlockHeight: 3,
  537. LastBlockAppHash: []byte("xxx"),
  538. AppVersion: 9,
  539. }, nil, errVerifyFailed},
  540. "error": {nil, boom, boom},
  541. }
  542. for name, tc := range testcases {
  543. tc := tc
  544. t.Run(name, func(t *testing.T) {
  545. rts := setup(t, nil, nil, nil, 2)
  546. rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(tc.response, tc.err)
  547. version, err := rts.syncer.verifyApp(s)
  548. unwrapped := errors.Unwrap(err)
  549. if unwrapped != nil {
  550. err = unwrapped
  551. }
  552. require.Equal(t, tc.expectErr, err)
  553. if err == nil {
  554. require.Equal(t, tc.response.AppVersion, version)
  555. }
  556. })
  557. }
  558. }
  559. func toABCI(s *snapshot) *abci.Snapshot {
  560. return &abci.Snapshot{
  561. Height: s.Height,
  562. Format: s.Format,
  563. Chunks: s.Chunks,
  564. Hash: s.Hash,
  565. Metadata: s.Metadata,
  566. }
  567. }