You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

639 lines
24 KiB

  1. package statesync
  2. import (
  3. "errors"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/stretchr/testify/mock"
  9. "github.com/stretchr/testify/require"
  10. abci "github.com/tendermint/tendermint/abci/types"
  11. "github.com/tendermint/tendermint/libs/log"
  12. "github.com/tendermint/tendermint/p2p"
  13. p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
  14. "github.com/tendermint/tendermint/proxy"
  15. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/statesync/mocks"
  18. "github.com/tendermint/tendermint/types"
  19. "github.com/tendermint/tendermint/version"
  20. )
  21. // Sets up a basic syncer that can be used to test OfferSnapshot requests
  22. func setupOfferSyncer(t *testing.T) (*syncer, *proxymocks.AppConnSnapshot) {
  23. connQuery := &proxymocks.AppConnQuery{}
  24. connSnapshot := &proxymocks.AppConnSnapshot{}
  25. stateProvider := &mocks.StateProvider{}
  26. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  27. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  28. return syncer, connSnapshot
  29. }
  30. // Sets up a simple peer mock with an ID
  31. func simplePeer(id string) *p2pmocks.Peer {
  32. peer := &p2pmocks.Peer{}
  33. peer.On("ID").Return(p2p.ID(id))
  34. return peer
  35. }
  36. func TestSyncer_SyncAny(t *testing.T) {
  37. state := sm.State{
  38. ChainID: "chain",
  39. Version: sm.Version{
  40. Consensus: version.Consensus{
  41. Block: version.BlockProtocol,
  42. App: 0,
  43. },
  44. Software: version.TMCoreSemVer,
  45. },
  46. LastBlockHeight: 1,
  47. LastBlockID: types.BlockID{Hash: []byte("blockhash")},
  48. LastBlockTime: time.Now(),
  49. LastResultsHash: []byte("last_results_hash"),
  50. AppHash: []byte("app_hash"),
  51. LastValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val1")}},
  52. Validators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val2")}},
  53. NextValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val3")}},
  54. ConsensusParams: *types.DefaultConsensusParams(),
  55. LastHeightConsensusParamsChanged: 1,
  56. }
  57. commit := &types.Commit{BlockID: types.BlockID{Hash: []byte("blockhash")}}
  58. chunks := []*chunk{
  59. {Height: 1, Format: 1, Index: 0, Chunk: []byte{1, 1, 0}},
  60. {Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 1, 1}},
  61. {Height: 1, Format: 1, Index: 2, Chunk: []byte{1, 1, 2}},
  62. }
  63. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  64. stateProvider := &mocks.StateProvider{}
  65. stateProvider.On("AppHash", uint64(1)).Return(state.AppHash, nil)
  66. stateProvider.On("AppHash", uint64(2)).Return([]byte("app_hash_2"), nil)
  67. stateProvider.On("Commit", uint64(1)).Return(commit, nil)
  68. stateProvider.On("State", uint64(1)).Return(state, nil)
  69. connSnapshot := &proxymocks.AppConnSnapshot{}
  70. connQuery := &proxymocks.AppConnQuery{}
  71. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  72. // Adding a chunk should error when no sync is in progress
  73. _, err := syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
  74. require.Error(t, err)
  75. // Adding a couple of peers should trigger snapshot discovery messages
  76. peerA := &p2pmocks.Peer{}
  77. peerA.On("ID").Return(p2p.ID("a"))
  78. peerA.On("Send", SnapshotChannel, cdc.MustMarshalBinaryBare(&snapshotsRequestMessage{})).Return(true)
  79. syncer.AddPeer(peerA)
  80. peerA.AssertExpectations(t)
  81. peerB := &p2pmocks.Peer{}
  82. peerB.On("ID").Return(p2p.ID("b"))
  83. peerB.On("Send", SnapshotChannel, cdc.MustMarshalBinaryBare(&snapshotsRequestMessage{})).Return(true)
  84. syncer.AddPeer(peerB)
  85. peerB.AssertExpectations(t)
  86. // Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
  87. // format 2, which will be rejected by the ABCI application.
  88. new, err := syncer.AddSnapshot(peerA, s)
  89. require.NoError(t, err)
  90. assert.True(t, new)
  91. new, err = syncer.AddSnapshot(peerB, s)
  92. require.NoError(t, err)
  93. assert.False(t, new)
  94. new, err = syncer.AddSnapshot(peerB, &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}})
  95. require.NoError(t, err)
  96. assert.True(t, new)
  97. // We start a sync, with peers sending back chunks when requested. We first reject the snapshot
  98. // with height 2 format 2, and accept the snapshot at height 1.
  99. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  100. Snapshot: &abci.Snapshot{
  101. Height: 2,
  102. Format: 2,
  103. Chunks: 3,
  104. Hash: []byte{1},
  105. },
  106. AppHash: []byte("app_hash_2"),
  107. }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  108. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  109. Snapshot: &abci.Snapshot{
  110. Height: s.Height,
  111. Format: s.Format,
  112. Chunks: s.Chunks,
  113. Hash: s.Hash,
  114. Metadata: s.Metadata,
  115. },
  116. AppHash: []byte("app_hash"),
  117. }).Times(2).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  118. chunkRequests := make(map[uint32]int)
  119. chunkRequestsMtx := sync.Mutex{}
  120. onChunkRequest := func(args mock.Arguments) {
  121. msg := &chunkRequestMessage{}
  122. err := cdc.UnmarshalBinaryBare(args[1].([]byte), &msg)
  123. require.NoError(t, err)
  124. require.EqualValues(t, 1, msg.Height)
  125. require.EqualValues(t, 1, msg.Format)
  126. require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
  127. added, err := syncer.AddChunk(chunks[msg.Index])
  128. require.NoError(t, err)
  129. assert.True(t, added)
  130. chunkRequestsMtx.Lock()
  131. chunkRequests[msg.Index]++
  132. chunkRequestsMtx.Unlock()
  133. }
  134. peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
  135. peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
  136. // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
  137. // which should cause it to keep the existing chunk 0 and 2, and restart restoration from
  138. // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
  139. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  140. Index: 2, Chunk: []byte{1, 1, 2},
  141. }).Once().Run(func(args mock.Arguments) { time.Sleep(2 * time.Second) }).Return(
  142. &abci.ResponseApplySnapshotChunk{
  143. Result: abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT,
  144. RefetchChunks: []uint32{1},
  145. }, nil)
  146. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  147. Index: 0, Chunk: []byte{1, 1, 0},
  148. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  149. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  150. Index: 1, Chunk: []byte{1, 1, 1},
  151. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  152. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  153. Index: 2, Chunk: []byte{1, 1, 2},
  154. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  155. connQuery.On("InfoSync", proxy.RequestInfo).Return(&abci.ResponseInfo{
  156. AppVersion: 9,
  157. LastBlockHeight: 1,
  158. LastBlockAppHash: []byte("app_hash"),
  159. }, nil)
  160. newState, lastCommit, err := syncer.SyncAny(0)
  161. require.NoError(t, err)
  162. chunkRequestsMtx.Lock()
  163. assert.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)
  164. chunkRequestsMtx.Unlock()
  165. // The syncer should have updated the state app version from the ABCI info response.
  166. expectState := state
  167. expectState.Version.Consensus.App = 9
  168. assert.Equal(t, expectState, newState)
  169. assert.Equal(t, commit, lastCommit)
  170. connSnapshot.AssertExpectations(t)
  171. connQuery.AssertExpectations(t)
  172. peerA.AssertExpectations(t)
  173. peerB.AssertExpectations(t)
  174. }
  175. func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
  176. syncer, _ := setupOfferSyncer(t)
  177. _, _, err := syncer.SyncAny(0)
  178. assert.Equal(t, errNoSnapshots, err)
  179. }
  180. func TestSyncer_SyncAny_ABORT(t *testing.T) {
  181. syncer, connSnapshot := setupOfferSyncer(t)
  182. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  183. syncer.AddSnapshot(simplePeer("id"), s)
  184. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  185. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  186. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  187. _, _, err := syncer.SyncAny(0)
  188. assert.Equal(t, errAbort, err)
  189. connSnapshot.AssertExpectations(t)
  190. }
  191. func TestSyncer_SyncAny_reject(t *testing.T) {
  192. syncer, connSnapshot := setupOfferSyncer(t)
  193. // s22 is tried first, then s12, then s11, then errNoSnapshots
  194. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  195. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  196. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  197. syncer.AddSnapshot(simplePeer("id"), s22)
  198. syncer.AddSnapshot(simplePeer("id"), s12)
  199. syncer.AddSnapshot(simplePeer("id"), s11)
  200. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  201. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  202. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  203. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  204. Snapshot: toABCI(s12), AppHash: []byte("app_hash"),
  205. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  206. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  207. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  208. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  209. _, _, err := syncer.SyncAny(0)
  210. assert.Equal(t, errNoSnapshots, err)
  211. connSnapshot.AssertExpectations(t)
  212. }
  213. func TestSyncer_SyncAny_REJECT_FORMAT(t *testing.T) {
  214. syncer, connSnapshot := setupOfferSyncer(t)
  215. // s22 is tried first, which reject s22 and s12, then s11 will abort.
  216. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  217. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  218. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  219. syncer.AddSnapshot(simplePeer("id"), s22)
  220. syncer.AddSnapshot(simplePeer("id"), s12)
  221. syncer.AddSnapshot(simplePeer("id"), s11)
  222. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  223. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  224. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  225. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  226. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  227. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  228. _, _, err := syncer.SyncAny(0)
  229. assert.Equal(t, errAbort, err)
  230. connSnapshot.AssertExpectations(t)
  231. }
  232. func TestSyncer_SyncAny_reject_sender(t *testing.T) {
  233. syncer, connSnapshot := setupOfferSyncer(t)
  234. peerA := simplePeer("a")
  235. peerB := simplePeer("b")
  236. peerC := simplePeer("c")
  237. // sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
  238. // submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
  239. // errNoSnapshots is returned.
  240. sa := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  241. sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  242. sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  243. sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  244. _, err := syncer.AddSnapshot(peerA, sa)
  245. require.NoError(t, err)
  246. _, err = syncer.AddSnapshot(peerB, sb)
  247. require.NoError(t, err)
  248. _, err = syncer.AddSnapshot(peerC, sc)
  249. require.NoError(t, err)
  250. _, err = syncer.AddSnapshot(peerB, sbc)
  251. require.NoError(t, err)
  252. _, err = syncer.AddSnapshot(peerC, sbc)
  253. require.NoError(t, err)
  254. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  255. Snapshot: toABCI(sbc), AppHash: []byte("app_hash"),
  256. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil)
  257. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  258. Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
  259. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  260. _, _, err = syncer.SyncAny(0)
  261. assert.Equal(t, errNoSnapshots, err)
  262. connSnapshot.AssertExpectations(t)
  263. }
  264. func TestSyncer_SyncAny_abciError(t *testing.T) {
  265. syncer, connSnapshot := setupOfferSyncer(t)
  266. errBoom := errors.New("boom")
  267. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  268. syncer.AddSnapshot(simplePeer("id"), s)
  269. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  270. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  271. }).Once().Return(nil, errBoom)
  272. _, _, err := syncer.SyncAny(0)
  273. assert.True(t, errors.Is(err, errBoom))
  274. connSnapshot.AssertExpectations(t)
  275. }
  276. func TestSyncer_offerSnapshot(t *testing.T) {
  277. unknownErr := errors.New("unknown error")
  278. boom := errors.New("boom")
  279. testcases := map[string]struct {
  280. result abci.ResponseOfferSnapshot_Result
  281. err error
  282. expectErr error
  283. }{
  284. "accept": {abci.ResponseOfferSnapshot_ACCEPT, nil, nil},
  285. "abort": {abci.ResponseOfferSnapshot_ABORT, nil, errAbort},
  286. "reject": {abci.ResponseOfferSnapshot_REJECT, nil, errRejectSnapshot},
  287. "reject_format": {abci.ResponseOfferSnapshot_REJECT_FORMAT, nil, errRejectFormat},
  288. "reject_sender": {abci.ResponseOfferSnapshot_REJECT_SENDER, nil, errRejectSender},
  289. "error": {0, boom, boom},
  290. "unknown result": {9, nil, unknownErr},
  291. }
  292. for name, tc := range testcases {
  293. tc := tc
  294. t.Run(name, func(t *testing.T) {
  295. syncer, connSnapshot := setupOfferSyncer(t)
  296. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  297. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  298. Snapshot: toABCI(s),
  299. AppHash: []byte("app_hash"),
  300. }).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
  301. err := syncer.offerSnapshot(s)
  302. if tc.expectErr == unknownErr {
  303. require.Error(t, err)
  304. } else {
  305. unwrapped := errors.Unwrap(err)
  306. if unwrapped != nil {
  307. err = unwrapped
  308. }
  309. assert.Equal(t, tc.expectErr, err)
  310. }
  311. })
  312. }
  313. }
  314. func TestSyncer_applyChunks_Results(t *testing.T) {
  315. unknownErr := errors.New("unknown error")
  316. boom := errors.New("boom")
  317. testcases := map[string]struct {
  318. result abci.ResponseApplySnapshotChunk_Result
  319. err error
  320. expectErr error
  321. }{
  322. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT, nil, nil},
  323. "abort": {abci.ResponseApplySnapshotChunk_ABORT, nil, errAbort},
  324. "retry": {abci.ResponseApplySnapshotChunk_RETRY, nil, nil},
  325. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT, nil, errRetrySnapshot},
  326. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, nil, errRejectSnapshot},
  327. "error": {0, boom, boom},
  328. "unknown result": {9, nil, unknownErr},
  329. }
  330. for name, tc := range testcases {
  331. tc := tc
  332. t.Run(name, func(t *testing.T) {
  333. connQuery := &proxymocks.AppConnQuery{}
  334. connSnapshot := &proxymocks.AppConnSnapshot{}
  335. stateProvider := &mocks.StateProvider{}
  336. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  337. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  338. body := []byte{1, 2, 3}
  339. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
  340. chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body})
  341. require.NoError(t, err)
  342. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  343. Index: 0, Chunk: body,
  344. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err)
  345. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  346. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  347. Index: 0, Chunk: body,
  348. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  349. Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  350. }
  351. err = syncer.applyChunks(chunks)
  352. if tc.expectErr == unknownErr {
  353. require.Error(t, err)
  354. } else {
  355. unwrapped := errors.Unwrap(err)
  356. if unwrapped != nil {
  357. err = unwrapped
  358. }
  359. assert.Equal(t, tc.expectErr, err)
  360. }
  361. connSnapshot.AssertExpectations(t)
  362. })
  363. }
  364. }
  365. func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
  366. // Discarding chunks via refetch_chunks should work the same for all results
  367. testcases := map[string]struct {
  368. result abci.ResponseApplySnapshotChunk_Result
  369. }{
  370. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  371. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  372. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  373. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  374. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  375. }
  376. for name, tc := range testcases {
  377. tc := tc
  378. t.Run(name, func(t *testing.T) {
  379. connQuery := &proxymocks.AppConnQuery{}
  380. connSnapshot := &proxymocks.AppConnSnapshot{}
  381. stateProvider := &mocks.StateProvider{}
  382. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  383. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  384. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
  385. require.NoError(t, err)
  386. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}})
  387. require.True(t, added)
  388. require.NoError(t, err)
  389. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}})
  390. require.True(t, added)
  391. require.NoError(t, err)
  392. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}})
  393. require.True(t, added)
  394. require.NoError(t, err)
  395. // The first two chunks are accepted, before the last one asks for 1 to be refetched
  396. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  397. Index: 0, Chunk: []byte{0},
  398. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  399. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  400. Index: 1, Chunk: []byte{1},
  401. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  402. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  403. Index: 2, Chunk: []byte{2},
  404. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  405. Result: tc.result,
  406. RefetchChunks: []uint32{1},
  407. }, nil)
  408. // Since removing the chunk will cause Next() to block, we spawn a goroutine, then
  409. // check the queue contents, and finally close the queue to end the goroutine.
  410. // We don't really care about the result of applyChunks, since it has separate test.
  411. go func() {
  412. syncer.applyChunks(chunks)
  413. }()
  414. time.Sleep(50 * time.Millisecond)
  415. assert.True(t, chunks.Has(0))
  416. assert.False(t, chunks.Has(1))
  417. assert.True(t, chunks.Has(2))
  418. err = chunks.Close()
  419. require.NoError(t, err)
  420. })
  421. }
  422. }
  423. func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
  424. // Banning chunks senders via ban_chunk_senders should work the same for all results
  425. testcases := map[string]struct {
  426. result abci.ResponseApplySnapshotChunk_Result
  427. }{
  428. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  429. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  430. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  431. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  432. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  433. }
  434. for name, tc := range testcases {
  435. tc := tc
  436. t.Run(name, func(t *testing.T) {
  437. connQuery := &proxymocks.AppConnQuery{}
  438. connSnapshot := &proxymocks.AppConnSnapshot{}
  439. stateProvider := &mocks.StateProvider{}
  440. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  441. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  442. // Set up three peers across two snapshots, and ask for one of them to be banned.
  443. // It should be banned from all snapshots.
  444. peerA := simplePeer("a")
  445. peerB := simplePeer("b")
  446. peerC := simplePeer("c")
  447. s1 := &snapshot{Height: 1, Format: 1, Chunks: 3}
  448. s2 := &snapshot{Height: 2, Format: 1, Chunks: 3}
  449. syncer.AddSnapshot(peerA, s1)
  450. syncer.AddSnapshot(peerA, s2)
  451. syncer.AddSnapshot(peerB, s1)
  452. syncer.AddSnapshot(peerB, s2)
  453. syncer.AddSnapshot(peerC, s1)
  454. syncer.AddSnapshot(peerC, s2)
  455. chunks, err := newChunkQueue(s1, "")
  456. require.NoError(t, err)
  457. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerA.ID()})
  458. require.True(t, added)
  459. require.NoError(t, err)
  460. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerB.ID()})
  461. require.True(t, added)
  462. require.NoError(t, err)
  463. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerC.ID()})
  464. require.True(t, added)
  465. require.NoError(t, err)
  466. // The first two chunks are accepted, before the last one asks for b sender to be rejected
  467. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  468. Index: 0, Chunk: []byte{0}, Sender: "a",
  469. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  470. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  471. Index: 1, Chunk: []byte{1}, Sender: "b",
  472. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  473. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  474. Index: 2, Chunk: []byte{2}, Sender: "c",
  475. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  476. Result: tc.result,
  477. RejectSenders: []string{string(peerB.ID())},
  478. }, nil)
  479. // On retry, the last chunk will be tried again, so we just accept it then.
  480. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  481. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  482. Index: 2, Chunk: []byte{2}, Sender: "c",
  483. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  484. }
  485. // We don't really care about the result of applyChunks, since it has separate test.
  486. // However, it will block on e.g. retry result, so we spawn a goroutine that will
  487. // be shut down when the chunk queue closes.
  488. go func() {
  489. syncer.applyChunks(chunks)
  490. }()
  491. time.Sleep(50 * time.Millisecond)
  492. s1peers := syncer.snapshots.GetPeers(s1)
  493. assert.Len(t, s1peers, 2)
  494. assert.EqualValues(t, "a", s1peers[0].ID())
  495. assert.EqualValues(t, "c", s1peers[1].ID())
  496. syncer.snapshots.GetPeers(s1)
  497. assert.Len(t, s1peers, 2)
  498. assert.EqualValues(t, "a", s1peers[0].ID())
  499. assert.EqualValues(t, "c", s1peers[1].ID())
  500. err = chunks.Close()
  501. require.NoError(t, err)
  502. })
  503. }
  504. }
  505. func TestSyncer_verifyApp(t *testing.T) {
  506. boom := errors.New("boom")
  507. s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  508. testcases := map[string]struct {
  509. response *abci.ResponseInfo
  510. err error
  511. expectErr error
  512. }{
  513. "verified": {&abci.ResponseInfo{
  514. LastBlockHeight: 3,
  515. LastBlockAppHash: []byte("app_hash"),
  516. AppVersion: 9,
  517. }, nil, nil},
  518. "invalid height": {&abci.ResponseInfo{
  519. LastBlockHeight: 5,
  520. LastBlockAppHash: []byte("app_hash"),
  521. AppVersion: 9,
  522. }, nil, errVerifyFailed},
  523. "invalid hash": {&abci.ResponseInfo{
  524. LastBlockHeight: 3,
  525. LastBlockAppHash: []byte("xxx"),
  526. AppVersion: 9,
  527. }, nil, errVerifyFailed},
  528. "error": {nil, boom, boom},
  529. }
  530. for name, tc := range testcases {
  531. tc := tc
  532. t.Run(name, func(t *testing.T) {
  533. connQuery := &proxymocks.AppConnQuery{}
  534. connSnapshot := &proxymocks.AppConnSnapshot{}
  535. stateProvider := &mocks.StateProvider{}
  536. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  537. connQuery.On("InfoSync", proxy.RequestInfo).Return(tc.response, tc.err)
  538. version, err := syncer.verifyApp(s)
  539. unwrapped := errors.Unwrap(err)
  540. if unwrapped != nil {
  541. err = unwrapped
  542. }
  543. assert.Equal(t, tc.expectErr, err)
  544. if err == nil {
  545. assert.Equal(t, tc.response.AppVersion, version)
  546. }
  547. })
  548. }
  549. }
  550. func toABCI(s *snapshot) *abci.Snapshot {
  551. return &abci.Snapshot{
  552. Height: s.Height,
  553. Format: s.Format,
  554. Chunks: s.Chunks,
  555. Hash: s.Hash,
  556. Metadata: s.Metadata,
  557. }
  558. }