You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

642 lines
24 KiB

  1. package statesync
  2. import (
  3. "errors"
  4. "sync"
  5. "testing"
  6. "time"
  7. ""
  8. ""
  9. ""
  10. abci ""
  11. ""
  12. ""
  13. p2pmocks ""
  14. tmstate ""
  15. ssproto ""
  16. tmversion ""
  17. ""
  18. proxymocks ""
  19. sm ""
  20. ""
  21. ""
  22. ""
  23. )
  24. // Sets up a basic syncer that can be used to test OfferSnapshot requests
  25. func setupOfferSyncer(t *testing.T) (*syncer, *proxymocks.AppConnSnapshot) {
  26. connQuery := &proxymocks.AppConnQuery{}
  27. connSnapshot := &proxymocks.AppConnSnapshot{}
  28. stateProvider := &mocks.StateProvider{}
  29. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  30. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  31. return syncer, connSnapshot
  32. }
  33. // Sets up a simple peer mock with an ID
  34. func simplePeer(id string) *p2pmocks.Peer {
  35. peer := &p2pmocks.Peer{}
  36. peer.On("ID").Return(p2p.ID(id))
  37. return peer
  38. }
  39. func TestSyncer_SyncAny(t *testing.T) {
  40. state := sm.State{
  41. ChainID: "chain",
  42. Version: tmstate.Version{
  43. Consensus: tmversion.Consensus{
  44. Block: version.BlockProtocol,
  45. App: 0,
  46. },
  47. Software: version.TMCoreSemVer,
  48. },
  49. LastBlockHeight: 1,
  50. LastBlockID: types.BlockID{Hash: []byte("blockhash")},
  51. LastBlockTime: time.Now(),
  52. LastResultsHash: []byte("last_results_hash"),
  53. AppHash: []byte("app_hash"),
  54. LastValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val1")}},
  55. Validators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val2")}},
  56. NextValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val3")}},
  57. ConsensusParams: *types.DefaultConsensusParams(),
  58. LastHeightConsensusParamsChanged: 1,
  59. }
  60. commit := &types.Commit{BlockID: types.BlockID{Hash: []byte("blockhash")}}
  61. chunks := []*chunk{
  62. {Height: 1, Format: 1, Index: 0, Chunk: []byte{1, 1, 0}},
  63. {Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 1, 1}},
  64. {Height: 1, Format: 1, Index: 2, Chunk: []byte{1, 1, 2}},
  65. }
  66. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  67. stateProvider := &mocks.StateProvider{}
  68. stateProvider.On("AppHash", uint64(1)).Return(state.AppHash, nil)
  69. stateProvider.On("AppHash", uint64(2)).Return([]byte("app_hash_2"), nil)
  70. stateProvider.On("Commit", uint64(1)).Return(commit, nil)
  71. stateProvider.On("State", uint64(1)).Return(state, nil)
  72. connSnapshot := &proxymocks.AppConnSnapshot{}
  73. connQuery := &proxymocks.AppConnQuery{}
  74. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  75. // Adding a chunk should error when no sync is in progress
  76. _, err := syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
  77. require.Error(t, err)
  78. // Adding a couple of peers should trigger snapshot discovery messages
  79. peerA := &p2pmocks.Peer{}
  80. peerA.On("ID").Return(p2p.ID("a"))
  81. peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
  82. syncer.AddPeer(peerA)
  83. peerA.AssertExpectations(t)
  84. peerB := &p2pmocks.Peer{}
  85. peerB.On("ID").Return(p2p.ID("b"))
  86. peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
  87. syncer.AddPeer(peerB)
  88. peerB.AssertExpectations(t)
  89. // Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
  90. // format 2, which will be rejected by the ABCI application.
  91. new, err := syncer.AddSnapshot(peerA, s)
  92. require.NoError(t, err)
  93. assert.True(t, new)
  94. new, err = syncer.AddSnapshot(peerB, s)
  95. require.NoError(t, err)
  96. assert.False(t, new)
  97. new, err = syncer.AddSnapshot(peerB, &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}})
  98. require.NoError(t, err)
  99. assert.True(t, new)
  100. // We start a sync, with peers sending back chunks when requested. We first reject the snapshot
  101. // with height 2 format 2, and accept the snapshot at height 1.
  102. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  103. Snapshot: &abci.Snapshot{
  104. Height: 2,
  105. Format: 2,
  106. Chunks: 3,
  107. Hash: []byte{1},
  108. },
  109. AppHash: []byte("app_hash_2"),
  110. }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  111. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  112. Snapshot: &abci.Snapshot{
  113. Height: s.Height,
  114. Format: s.Format,
  115. Chunks: s.Chunks,
  116. Hash: s.Hash,
  117. Metadata: s.Metadata,
  118. },
  119. AppHash: []byte("app_hash"),
  120. }).Times(2).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  121. chunkRequests := make(map[uint32]int)
  122. chunkRequestsMtx := sync.Mutex{}
  123. onChunkRequest := func(args mock.Arguments) {
  124. pb, err := decodeMsg(args[1].([]byte))
  125. require.NoError(t, err)
  126. msg := pb.(*ssproto.ChunkRequest)
  127. require.EqualValues(t, 1, msg.Height)
  128. require.EqualValues(t, 1, msg.Format)
  129. require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
  130. added, err := syncer.AddChunk(chunks[msg.Index])
  131. require.NoError(t, err)
  132. assert.True(t, added)
  133. chunkRequestsMtx.Lock()
  134. chunkRequests[msg.Index]++
  135. chunkRequestsMtx.Unlock()
  136. }
  137. peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
  138. peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
  139. // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
  140. // which should cause it to keep the existing chunk 0 and 2, and restart restoration from
  141. // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
  142. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  143. Index: 2, Chunk: []byte{1, 1, 2},
  144. }).Once().Run(func(args mock.Arguments) { time.Sleep(2 * time.Second) }).Return(
  145. &abci.ResponseApplySnapshotChunk{
  146. Result: abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT,
  147. RefetchChunks: []uint32{1},
  148. }, nil)
  149. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  150. Index: 0, Chunk: []byte{1, 1, 0},
  151. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  152. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  153. Index: 1, Chunk: []byte{1, 1, 1},
  154. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  155. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  156. Index: 2, Chunk: []byte{1, 1, 2},
  157. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  158. connQuery.On("InfoSync", proxy.RequestInfo).Return(&abci.ResponseInfo{
  159. AppVersion: 9,
  160. LastBlockHeight: 1,
  161. LastBlockAppHash: []byte("app_hash"),
  162. }, nil)
  163. newState, lastCommit, err := syncer.SyncAny(0)
  164. require.NoError(t, err)
  165. chunkRequestsMtx.Lock()
  166. assert.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)
  167. chunkRequestsMtx.Unlock()
  168. // The syncer should have updated the state app version from the ABCI info response.
  169. expectState := state
  170. expectState.Version.Consensus.App = 9
  171. assert.Equal(t, expectState, newState)
  172. assert.Equal(t, commit, lastCommit)
  173. connSnapshot.AssertExpectations(t)
  174. connQuery.AssertExpectations(t)
  175. peerA.AssertExpectations(t)
  176. peerB.AssertExpectations(t)
  177. }
  178. func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
  179. syncer, _ := setupOfferSyncer(t)
  180. _, _, err := syncer.SyncAny(0)
  181. assert.Equal(t, errNoSnapshots, err)
  182. }
  183. func TestSyncer_SyncAny_ABORT(t *testing.T) {
  184. syncer, connSnapshot := setupOfferSyncer(t)
  185. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  186. syncer.AddSnapshot(simplePeer("id"), s)
  187. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  188. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  189. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  190. _, _, err := syncer.SyncAny(0)
  191. assert.Equal(t, errAbort, err)
  192. connSnapshot.AssertExpectations(t)
  193. }
  194. func TestSyncer_SyncAny_reject(t *testing.T) {
  195. syncer, connSnapshot := setupOfferSyncer(t)
  196. // s22 is tried first, then s12, then s11, then errNoSnapshots
  197. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  198. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  199. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  200. syncer.AddSnapshot(simplePeer("id"), s22)
  201. syncer.AddSnapshot(simplePeer("id"), s12)
  202. syncer.AddSnapshot(simplePeer("id"), s11)
  203. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  204. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  205. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  206. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  207. Snapshot: toABCI(s12), AppHash: []byte("app_hash"),
  208. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  209. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  210. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  211. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  212. _, _, err := syncer.SyncAny(0)
  213. assert.Equal(t, errNoSnapshots, err)
  214. connSnapshot.AssertExpectations(t)
  215. }
  216. func TestSyncer_SyncAny_REJECT_FORMAT(t *testing.T) {
  217. syncer, connSnapshot := setupOfferSyncer(t)
  218. // s22 is tried first, which reject s22 and s12, then s11 will abort.
  219. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  220. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  221. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  222. syncer.AddSnapshot(simplePeer("id"), s22)
  223. syncer.AddSnapshot(simplePeer("id"), s12)
  224. syncer.AddSnapshot(simplePeer("id"), s11)
  225. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  226. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  227. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  228. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  229. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  230. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  231. _, _, err := syncer.SyncAny(0)
  232. assert.Equal(t, errAbort, err)
  233. connSnapshot.AssertExpectations(t)
  234. }
  235. func TestSyncer_SyncAny_reject_sender(t *testing.T) {
  236. syncer, connSnapshot := setupOfferSyncer(t)
  237. peerA := simplePeer("a")
  238. peerB := simplePeer("b")
  239. peerC := simplePeer("c")
  240. // sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
  241. // submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
  242. // errNoSnapshots is returned.
  243. sa := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  244. sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  245. sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  246. sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  247. _, err := syncer.AddSnapshot(peerA, sa)
  248. require.NoError(t, err)
  249. _, err = syncer.AddSnapshot(peerB, sb)
  250. require.NoError(t, err)
  251. _, err = syncer.AddSnapshot(peerC, sc)
  252. require.NoError(t, err)
  253. _, err = syncer.AddSnapshot(peerB, sbc)
  254. require.NoError(t, err)
  255. _, err = syncer.AddSnapshot(peerC, sbc)
  256. require.NoError(t, err)
  257. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  258. Snapshot: toABCI(sbc), AppHash: []byte("app_hash"),
  259. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil)
  260. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  261. Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
  262. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  263. _, _, err = syncer.SyncAny(0)
  264. assert.Equal(t, errNoSnapshots, err)
  265. connSnapshot.AssertExpectations(t)
  266. }
  267. func TestSyncer_SyncAny_abciError(t *testing.T) {
  268. syncer, connSnapshot := setupOfferSyncer(t)
  269. errBoom := errors.New("boom")
  270. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  271. syncer.AddSnapshot(simplePeer("id"), s)
  272. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  273. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  274. }).Once().Return(nil, errBoom)
  275. _, _, err := syncer.SyncAny(0)
  276. assert.True(t, errors.Is(err, errBoom))
  277. connSnapshot.AssertExpectations(t)
  278. }
  279. func TestSyncer_offerSnapshot(t *testing.T) {
  280. unknownErr := errors.New("unknown error")
  281. boom := errors.New("boom")
  282. testcases := map[string]struct {
  283. result abci.ResponseOfferSnapshot_Result
  284. err error
  285. expectErr error
  286. }{
  287. "accept": {abci.ResponseOfferSnapshot_ACCEPT, nil, nil},
  288. "abort": {abci.ResponseOfferSnapshot_ABORT, nil, errAbort},
  289. "reject": {abci.ResponseOfferSnapshot_REJECT, nil, errRejectSnapshot},
  290. "reject_format": {abci.ResponseOfferSnapshot_REJECT_FORMAT, nil, errRejectFormat},
  291. "reject_sender": {abci.ResponseOfferSnapshot_REJECT_SENDER, nil, errRejectSender},
  292. "error": {0, boom, boom},
  293. "unknown result": {9, nil, unknownErr},
  294. }
  295. for name, tc := range testcases {
  296. tc := tc
  297. t.Run(name, func(t *testing.T) {
  298. syncer, connSnapshot := setupOfferSyncer(t)
  299. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  300. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  301. Snapshot: toABCI(s),
  302. AppHash: []byte("app_hash"),
  303. }).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
  304. err := syncer.offerSnapshot(s)
  305. if tc.expectErr == unknownErr {
  306. require.Error(t, err)
  307. } else {
  308. unwrapped := errors.Unwrap(err)
  309. if unwrapped != nil {
  310. err = unwrapped
  311. }
  312. assert.Equal(t, tc.expectErr, err)
  313. }
  314. })
  315. }
  316. }
  317. func TestSyncer_applyChunks_Results(t *testing.T) {
  318. unknownErr := errors.New("unknown error")
  319. boom := errors.New("boom")
  320. testcases := map[string]struct {
  321. result abci.ResponseApplySnapshotChunk_Result
  322. err error
  323. expectErr error
  324. }{
  325. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT, nil, nil},
  326. "abort": {abci.ResponseApplySnapshotChunk_ABORT, nil, errAbort},
  327. "retry": {abci.ResponseApplySnapshotChunk_RETRY, nil, nil},
  328. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT, nil, errRetrySnapshot},
  329. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, nil, errRejectSnapshot},
  330. "error": {0, boom, boom},
  331. "unknown result": {9, nil, unknownErr},
  332. }
  333. for name, tc := range testcases {
  334. tc := tc
  335. t.Run(name, func(t *testing.T) {
  336. connQuery := &proxymocks.AppConnQuery{}
  337. connSnapshot := &proxymocks.AppConnSnapshot{}
  338. stateProvider := &mocks.StateProvider{}
  339. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  340. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  341. body := []byte{1, 2, 3}
  342. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
  343. chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body})
  344. require.NoError(t, err)
  345. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  346. Index: 0, Chunk: body,
  347. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err)
  348. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  349. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  350. Index: 0, Chunk: body,
  351. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  352. Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  353. }
  354. err = syncer.applyChunks(chunks)
  355. if tc.expectErr == unknownErr {
  356. require.Error(t, err)
  357. } else {
  358. unwrapped := errors.Unwrap(err)
  359. if unwrapped != nil {
  360. err = unwrapped
  361. }
  362. assert.Equal(t, tc.expectErr, err)
  363. }
  364. connSnapshot.AssertExpectations(t)
  365. })
  366. }
  367. }
  368. func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
  369. // Discarding chunks via refetch_chunks should work the same for all results
  370. testcases := map[string]struct {
  371. result abci.ResponseApplySnapshotChunk_Result
  372. }{
  373. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  374. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  375. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  376. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  377. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  378. }
  379. for name, tc := range testcases {
  380. tc := tc
  381. t.Run(name, func(t *testing.T) {
  382. connQuery := &proxymocks.AppConnQuery{}
  383. connSnapshot := &proxymocks.AppConnSnapshot{}
  384. stateProvider := &mocks.StateProvider{}
  385. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  386. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  387. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
  388. require.NoError(t, err)
  389. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}})
  390. require.True(t, added)
  391. require.NoError(t, err)
  392. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}})
  393. require.True(t, added)
  394. require.NoError(t, err)
  395. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}})
  396. require.True(t, added)
  397. require.NoError(t, err)
  398. // The first two chunks are accepted, before the last one asks for 1 to be refetched
  399. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  400. Index: 0, Chunk: []byte{0},
  401. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  402. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  403. Index: 1, Chunk: []byte{1},
  404. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  405. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  406. Index: 2, Chunk: []byte{2},
  407. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  408. Result: tc.result,
  409. RefetchChunks: []uint32{1},
  410. }, nil)
  411. // Since removing the chunk will cause Next() to block, we spawn a goroutine, then
  412. // check the queue contents, and finally close the queue to end the goroutine.
  413. // We don't really care about the result of applyChunks, since it has separate test.
  414. go func() {
  415. syncer.applyChunks(chunks)
  416. }()
  417. time.Sleep(50 * time.Millisecond)
  418. assert.True(t, chunks.Has(0))
  419. assert.False(t, chunks.Has(1))
  420. assert.True(t, chunks.Has(2))
  421. err = chunks.Close()
  422. require.NoError(t, err)
  423. })
  424. }
  425. }
  426. func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
  427. // Banning chunks senders via ban_chunk_senders should work the same for all results
  428. testcases := map[string]struct {
  429. result abci.ResponseApplySnapshotChunk_Result
  430. }{
  431. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  432. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  433. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  434. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  435. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  436. }
  437. for name, tc := range testcases {
  438. tc := tc
  439. t.Run(name, func(t *testing.T) {
  440. connQuery := &proxymocks.AppConnQuery{}
  441. connSnapshot := &proxymocks.AppConnSnapshot{}
  442. stateProvider := &mocks.StateProvider{}
  443. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  444. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  445. // Set up three peers across two snapshots, and ask for one of them to be banned.
  446. // It should be banned from all snapshots.
  447. peerA := simplePeer("a")
  448. peerB := simplePeer("b")
  449. peerC := simplePeer("c")
  450. s1 := &snapshot{Height: 1, Format: 1, Chunks: 3}
  451. s2 := &snapshot{Height: 2, Format: 1, Chunks: 3}
  452. syncer.AddSnapshot(peerA, s1)
  453. syncer.AddSnapshot(peerA, s2)
  454. syncer.AddSnapshot(peerB, s1)
  455. syncer.AddSnapshot(peerB, s2)
  456. syncer.AddSnapshot(peerC, s1)
  457. syncer.AddSnapshot(peerC, s2)
  458. chunks, err := newChunkQueue(s1, "")
  459. require.NoError(t, err)
  460. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerA.ID()})
  461. require.True(t, added)
  462. require.NoError(t, err)
  463. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerB.ID()})
  464. require.True(t, added)
  465. require.NoError(t, err)
  466. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerC.ID()})
  467. require.True(t, added)
  468. require.NoError(t, err)
  469. // The first two chunks are accepted, before the last one asks for b sender to be rejected
  470. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  471. Index: 0, Chunk: []byte{0}, Sender: "a",
  472. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  473. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  474. Index: 1, Chunk: []byte{1}, Sender: "b",
  475. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  476. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  477. Index: 2, Chunk: []byte{2}, Sender: "c",
  478. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  479. Result: tc.result,
  480. RejectSenders: []string{string(peerB.ID())},
  481. }, nil)
  482. // On retry, the last chunk will be tried again, so we just accept it then.
  483. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  484. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  485. Index: 2, Chunk: []byte{2}, Sender: "c",
  486. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  487. }
  488. // We don't really care about the result of applyChunks, since it has separate test.
  489. // However, it will block on e.g. retry result, so we spawn a goroutine that will
  490. // be shut down when the chunk queue closes.
  491. go func() {
  492. syncer.applyChunks(chunks)
  493. }()
  494. time.Sleep(50 * time.Millisecond)
  495. s1peers := syncer.snapshots.GetPeers(s1)
  496. assert.Len(t, s1peers, 2)
  497. assert.EqualValues(t, "a", s1peers[0].ID())
  498. assert.EqualValues(t, "c", s1peers[1].ID())
  499. syncer.snapshots.GetPeers(s1)
  500. assert.Len(t, s1peers, 2)
  501. assert.EqualValues(t, "a", s1peers[0].ID())
  502. assert.EqualValues(t, "c", s1peers[1].ID())
  503. err = chunks.Close()
  504. require.NoError(t, err)
  505. })
  506. }
  507. }
  508. func TestSyncer_verifyApp(t *testing.T) {
  509. boom := errors.New("boom")
  510. s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  511. testcases := map[string]struct {
  512. response *abci.ResponseInfo
  513. err error
  514. expectErr error
  515. }{
  516. "verified": {&abci.ResponseInfo{
  517. LastBlockHeight: 3,
  518. LastBlockAppHash: []byte("app_hash"),
  519. AppVersion: 9,
  520. }, nil, nil},
  521. "invalid height": {&abci.ResponseInfo{
  522. LastBlockHeight: 5,
  523. LastBlockAppHash: []byte("app_hash"),
  524. AppVersion: 9,
  525. }, nil, errVerifyFailed},
  526. "invalid hash": {&abci.ResponseInfo{
  527. LastBlockHeight: 3,
  528. LastBlockAppHash: []byte("xxx"),
  529. AppVersion: 9,
  530. }, nil, errVerifyFailed},
  531. "error": {nil, boom, boom},
  532. }
  533. for name, tc := range testcases {
  534. tc := tc
  535. t.Run(name, func(t *testing.T) {
  536. connQuery := &proxymocks.AppConnQuery{}
  537. connSnapshot := &proxymocks.AppConnSnapshot{}
  538. stateProvider := &mocks.StateProvider{}
  539. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  540. connQuery.On("InfoSync", proxy.RequestInfo).Return(tc.response, tc.err)
  541. version, err := syncer.verifyApp(s)
  542. unwrapped := errors.Unwrap(err)
  543. if unwrapped != nil {
  544. err = unwrapped
  545. }
  546. assert.Equal(t, tc.expectErr, err)
  547. if err == nil {
  548. assert.Equal(t, tc.response.AppVersion, version)
  549. }
  550. })
  551. }
  552. }
  553. func toABCI(s *snapshot) *abci.Snapshot {
  554. return &abci.Snapshot{
  555. Height: s.Height,
  556. Format: s.Format,
  557. Chunks: s.Chunks,
  558. Hash: s.Hash,
  559. Metadata: s.Metadata,
  560. }
  561. }