You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

640 lines
24 KiB

  1. package statesync
  2. import (
  3. "errors"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/stretchr/testify/mock"
  9. "github.com/stretchr/testify/require"
  10. abci "github.com/tendermint/tendermint/abci/types"
  11. "github.com/tendermint/tendermint/libs/log"
  12. "github.com/tendermint/tendermint/p2p"
  13. p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
  14. ssproto "github.com/tendermint/tendermint/proto/statesync"
  15. "github.com/tendermint/tendermint/proxy"
  16. proxymocks "github.com/tendermint/tendermint/proxy/mocks"
  17. sm "github.com/tendermint/tendermint/state"
  18. "github.com/tendermint/tendermint/statesync/mocks"
  19. "github.com/tendermint/tendermint/types"
  20. "github.com/tendermint/tendermint/version"
  21. )
  22. // Sets up a basic syncer that can be used to test OfferSnapshot requests
  23. func setupOfferSyncer(t *testing.T) (*syncer, *proxymocks.AppConnSnapshot) {
  24. connQuery := &proxymocks.AppConnQuery{}
  25. connSnapshot := &proxymocks.AppConnSnapshot{}
  26. stateProvider := &mocks.StateProvider{}
  27. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  28. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  29. return syncer, connSnapshot
  30. }
  31. // Sets up a simple peer mock with an ID
  32. func simplePeer(id string) *p2pmocks.Peer {
  33. peer := &p2pmocks.Peer{}
  34. peer.On("ID").Return(p2p.ID(id))
  35. return peer
  36. }
  37. func TestSyncer_SyncAny(t *testing.T) {
  38. state := sm.State{
  39. ChainID: "chain",
  40. Version: sm.Version{
  41. Consensus: version.Consensus{
  42. Block: version.BlockProtocol,
  43. App: 0,
  44. },
  45. Software: version.TMCoreSemVer,
  46. },
  47. LastBlockHeight: 1,
  48. LastBlockID: types.BlockID{Hash: []byte("blockhash")},
  49. LastBlockTime: time.Now(),
  50. LastResultsHash: []byte("last_results_hash"),
  51. AppHash: []byte("app_hash"),
  52. LastValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val1")}},
  53. Validators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val2")}},
  54. NextValidators: &types.ValidatorSet{Proposer: &types.Validator{Address: []byte("val3")}},
  55. ConsensusParams: *types.DefaultConsensusParams(),
  56. LastHeightConsensusParamsChanged: 1,
  57. }
  58. commit := &types.Commit{BlockID: types.BlockID{Hash: []byte("blockhash")}}
  59. chunks := []*chunk{
  60. {Height: 1, Format: 1, Index: 0, Chunk: []byte{1, 1, 0}},
  61. {Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 1, 1}},
  62. {Height: 1, Format: 1, Index: 2, Chunk: []byte{1, 1, 2}},
  63. }
  64. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  65. stateProvider := &mocks.StateProvider{}
  66. stateProvider.On("AppHash", uint64(1)).Return(state.AppHash, nil)
  67. stateProvider.On("AppHash", uint64(2)).Return([]byte("app_hash_2"), nil)
  68. stateProvider.On("Commit", uint64(1)).Return(commit, nil)
  69. stateProvider.On("State", uint64(1)).Return(state, nil)
  70. connSnapshot := &proxymocks.AppConnSnapshot{}
  71. connQuery := &proxymocks.AppConnQuery{}
  72. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  73. // Adding a chunk should error when no sync is in progress
  74. _, err := syncer.AddChunk(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{1}})
  75. require.Error(t, err)
  76. // Adding a couple of peers should trigger snapshot discovery messages
  77. peerA := &p2pmocks.Peer{}
  78. peerA.On("ID").Return(p2p.ID("a"))
  79. peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
  80. syncer.AddPeer(peerA)
  81. peerA.AssertExpectations(t)
  82. peerB := &p2pmocks.Peer{}
  83. peerB.On("ID").Return(p2p.ID("b"))
  84. peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
  85. syncer.AddPeer(peerB)
  86. peerB.AssertExpectations(t)
  87. // Both peers report back with snapshots. One of them also returns a snapshot we don't want, in
  88. // format 2, which will be rejected by the ABCI application.
  89. new, err := syncer.AddSnapshot(peerA, s)
  90. require.NoError(t, err)
  91. assert.True(t, new)
  92. new, err = syncer.AddSnapshot(peerB, s)
  93. require.NoError(t, err)
  94. assert.False(t, new)
  95. new, err = syncer.AddSnapshot(peerB, &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1}})
  96. require.NoError(t, err)
  97. assert.True(t, new)
  98. // We start a sync, with peers sending back chunks when requested. We first reject the snapshot
  99. // with height 2 format 2, and accept the snapshot at height 1.
  100. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  101. Snapshot: &abci.Snapshot{
  102. Height: 2,
  103. Format: 2,
  104. Chunks: 3,
  105. Hash: []byte{1},
  106. },
  107. AppHash: []byte("app_hash_2"),
  108. }).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  109. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  110. Snapshot: &abci.Snapshot{
  111. Height: s.Height,
  112. Format: s.Format,
  113. Chunks: s.Chunks,
  114. Hash: s.Hash,
  115. Metadata: s.Metadata,
  116. },
  117. AppHash: []byte("app_hash"),
  118. }).Times(2).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
  119. chunkRequests := make(map[uint32]int)
  120. chunkRequestsMtx := sync.Mutex{}
  121. onChunkRequest := func(args mock.Arguments) {
  122. pb, err := decodeMsg(args[1].([]byte))
  123. require.NoError(t, err)
  124. msg := pb.(*ssproto.ChunkRequest)
  125. require.EqualValues(t, 1, msg.Height)
  126. require.EqualValues(t, 1, msg.Format)
  127. require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
  128. added, err := syncer.AddChunk(chunks[msg.Index])
  129. require.NoError(t, err)
  130. assert.True(t, added)
  131. chunkRequestsMtx.Lock()
  132. chunkRequests[msg.Index]++
  133. chunkRequestsMtx.Unlock()
  134. }
  135. peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
  136. peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
  137. // The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
  138. // which should cause it to keep the existing chunk 0 and 2, and restart restoration from
  139. // beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
  140. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  141. Index: 2, Chunk: []byte{1, 1, 2},
  142. }).Once().Run(func(args mock.Arguments) { time.Sleep(2 * time.Second) }).Return(
  143. &abci.ResponseApplySnapshotChunk{
  144. Result: abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT,
  145. RefetchChunks: []uint32{1},
  146. }, nil)
  147. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  148. Index: 0, Chunk: []byte{1, 1, 0},
  149. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  150. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  151. Index: 1, Chunk: []byte{1, 1, 1},
  152. }).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  153. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  154. Index: 2, Chunk: []byte{1, 1, 2},
  155. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  156. connQuery.On("InfoSync", proxy.RequestInfo).Return(&abci.ResponseInfo{
  157. AppVersion: 9,
  158. LastBlockHeight: 1,
  159. LastBlockAppHash: []byte("app_hash"),
  160. }, nil)
  161. newState, lastCommit, err := syncer.SyncAny(0)
  162. require.NoError(t, err)
  163. chunkRequestsMtx.Lock()
  164. assert.Equal(t, map[uint32]int{0: 1, 1: 2, 2: 1}, chunkRequests)
  165. chunkRequestsMtx.Unlock()
  166. // The syncer should have updated the state app version from the ABCI info response.
  167. expectState := state
  168. expectState.Version.Consensus.App = 9
  169. assert.Equal(t, expectState, newState)
  170. assert.Equal(t, commit, lastCommit)
  171. connSnapshot.AssertExpectations(t)
  172. connQuery.AssertExpectations(t)
  173. peerA.AssertExpectations(t)
  174. peerB.AssertExpectations(t)
  175. }
  176. func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
  177. syncer, _ := setupOfferSyncer(t)
  178. _, _, err := syncer.SyncAny(0)
  179. assert.Equal(t, errNoSnapshots, err)
  180. }
  181. func TestSyncer_SyncAny_ABORT(t *testing.T) {
  182. syncer, connSnapshot := setupOfferSyncer(t)
  183. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  184. syncer.AddSnapshot(simplePeer("id"), s)
  185. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  186. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  187. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  188. _, _, err := syncer.SyncAny(0)
  189. assert.Equal(t, errAbort, err)
  190. connSnapshot.AssertExpectations(t)
  191. }
  192. func TestSyncer_SyncAny_reject(t *testing.T) {
  193. syncer, connSnapshot := setupOfferSyncer(t)
  194. // s22 is tried first, then s12, then s11, then errNoSnapshots
  195. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  196. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  197. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  198. syncer.AddSnapshot(simplePeer("id"), s22)
  199. syncer.AddSnapshot(simplePeer("id"), s12)
  200. syncer.AddSnapshot(simplePeer("id"), s11)
  201. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  202. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  203. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  204. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  205. Snapshot: toABCI(s12), AppHash: []byte("app_hash"),
  206. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  207. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  208. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  209. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  210. _, _, err := syncer.SyncAny(0)
  211. assert.Equal(t, errNoSnapshots, err)
  212. connSnapshot.AssertExpectations(t)
  213. }
  214. func TestSyncer_SyncAny_REJECT_FORMAT(t *testing.T) {
  215. syncer, connSnapshot := setupOfferSyncer(t)
  216. // s22 is tried first, which reject s22 and s12, then s11 will abort.
  217. s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  218. s12 := &snapshot{Height: 1, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
  219. s11 := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  220. syncer.AddSnapshot(simplePeer("id"), s22)
  221. syncer.AddSnapshot(simplePeer("id"), s12)
  222. syncer.AddSnapshot(simplePeer("id"), s11)
  223. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  224. Snapshot: toABCI(s22), AppHash: []byte("app_hash"),
  225. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
  226. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  227. Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
  228. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
  229. _, _, err := syncer.SyncAny(0)
  230. assert.Equal(t, errAbort, err)
  231. connSnapshot.AssertExpectations(t)
  232. }
  233. func TestSyncer_SyncAny_reject_sender(t *testing.T) {
  234. syncer, connSnapshot := setupOfferSyncer(t)
  235. peerA := simplePeer("a")
  236. peerB := simplePeer("b")
  237. peerC := simplePeer("c")
  238. // sbc will be offered first, which will be rejected with reject_sender, causing all snapshots
  239. // submitted by both b and c (i.e. sb, sc, sbc) to be rejected. Finally, sa will reject and
  240. // errNoSnapshots is returned.
  241. sa := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  242. sb := &snapshot{Height: 2, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  243. sc := &snapshot{Height: 3, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  244. sbc := &snapshot{Height: 4, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  245. _, err := syncer.AddSnapshot(peerA, sa)
  246. require.NoError(t, err)
  247. _, err = syncer.AddSnapshot(peerB, sb)
  248. require.NoError(t, err)
  249. _, err = syncer.AddSnapshot(peerC, sc)
  250. require.NoError(t, err)
  251. _, err = syncer.AddSnapshot(peerB, sbc)
  252. require.NoError(t, err)
  253. _, err = syncer.AddSnapshot(peerC, sbc)
  254. require.NoError(t, err)
  255. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  256. Snapshot: toABCI(sbc), AppHash: []byte("app_hash"),
  257. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_SENDER}, nil)
  258. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  259. Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
  260. }).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
  261. _, _, err = syncer.SyncAny(0)
  262. assert.Equal(t, errNoSnapshots, err)
  263. connSnapshot.AssertExpectations(t)
  264. }
  265. func TestSyncer_SyncAny_abciError(t *testing.T) {
  266. syncer, connSnapshot := setupOfferSyncer(t)
  267. errBoom := errors.New("boom")
  268. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
  269. syncer.AddSnapshot(simplePeer("id"), s)
  270. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  271. Snapshot: toABCI(s), AppHash: []byte("app_hash"),
  272. }).Once().Return(nil, errBoom)
  273. _, _, err := syncer.SyncAny(0)
  274. assert.True(t, errors.Is(err, errBoom))
  275. connSnapshot.AssertExpectations(t)
  276. }
  277. func TestSyncer_offerSnapshot(t *testing.T) {
  278. unknownErr := errors.New("unknown error")
  279. boom := errors.New("boom")
  280. testcases := map[string]struct {
  281. result abci.ResponseOfferSnapshot_Result
  282. err error
  283. expectErr error
  284. }{
  285. "accept": {abci.ResponseOfferSnapshot_ACCEPT, nil, nil},
  286. "abort": {abci.ResponseOfferSnapshot_ABORT, nil, errAbort},
  287. "reject": {abci.ResponseOfferSnapshot_REJECT, nil, errRejectSnapshot},
  288. "reject_format": {abci.ResponseOfferSnapshot_REJECT_FORMAT, nil, errRejectFormat},
  289. "reject_sender": {abci.ResponseOfferSnapshot_REJECT_SENDER, nil, errRejectSender},
  290. "error": {0, boom, boom},
  291. "unknown result": {9, nil, unknownErr},
  292. }
  293. for name, tc := range testcases {
  294. tc := tc
  295. t.Run(name, func(t *testing.T) {
  296. syncer, connSnapshot := setupOfferSyncer(t)
  297. s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  298. connSnapshot.On("OfferSnapshotSync", abci.RequestOfferSnapshot{
  299. Snapshot: toABCI(s),
  300. AppHash: []byte("app_hash"),
  301. }).Return(&abci.ResponseOfferSnapshot{Result: tc.result}, tc.err)
  302. err := syncer.offerSnapshot(s)
  303. if tc.expectErr == unknownErr {
  304. require.Error(t, err)
  305. } else {
  306. unwrapped := errors.Unwrap(err)
  307. if unwrapped != nil {
  308. err = unwrapped
  309. }
  310. assert.Equal(t, tc.expectErr, err)
  311. }
  312. })
  313. }
  314. }
  315. func TestSyncer_applyChunks_Results(t *testing.T) {
  316. unknownErr := errors.New("unknown error")
  317. boom := errors.New("boom")
  318. testcases := map[string]struct {
  319. result abci.ResponseApplySnapshotChunk_Result
  320. err error
  321. expectErr error
  322. }{
  323. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT, nil, nil},
  324. "abort": {abci.ResponseApplySnapshotChunk_ABORT, nil, errAbort},
  325. "retry": {abci.ResponseApplySnapshotChunk_RETRY, nil, nil},
  326. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT, nil, errRetrySnapshot},
  327. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT, nil, errRejectSnapshot},
  328. "error": {0, boom, boom},
  329. "unknown result": {9, nil, unknownErr},
  330. }
  331. for name, tc := range testcases {
  332. tc := tc
  333. t.Run(name, func(t *testing.T) {
  334. connQuery := &proxymocks.AppConnQuery{}
  335. connSnapshot := &proxymocks.AppConnSnapshot{}
  336. stateProvider := &mocks.StateProvider{}
  337. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  338. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  339. body := []byte{1, 2, 3}
  340. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, "")
  341. chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: body})
  342. require.NoError(t, err)
  343. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  344. Index: 0, Chunk: body,
  345. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: tc.result}, tc.err)
  346. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  347. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  348. Index: 0, Chunk: body,
  349. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  350. Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  351. }
  352. err = syncer.applyChunks(chunks)
  353. if tc.expectErr == unknownErr {
  354. require.Error(t, err)
  355. } else {
  356. unwrapped := errors.Unwrap(err)
  357. if unwrapped != nil {
  358. err = unwrapped
  359. }
  360. assert.Equal(t, tc.expectErr, err)
  361. }
  362. connSnapshot.AssertExpectations(t)
  363. })
  364. }
  365. }
  366. func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
  367. // Discarding chunks via refetch_chunks should work the same for all results
  368. testcases := map[string]struct {
  369. result abci.ResponseApplySnapshotChunk_Result
  370. }{
  371. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  372. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  373. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  374. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  375. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  376. }
  377. for name, tc := range testcases {
  378. tc := tc
  379. t.Run(name, func(t *testing.T) {
  380. connQuery := &proxymocks.AppConnQuery{}
  381. connSnapshot := &proxymocks.AppConnSnapshot{}
  382. stateProvider := &mocks.StateProvider{}
  383. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  384. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  385. chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, "")
  386. require.NoError(t, err)
  387. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}})
  388. require.True(t, added)
  389. require.NoError(t, err)
  390. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}})
  391. require.True(t, added)
  392. require.NoError(t, err)
  393. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}})
  394. require.True(t, added)
  395. require.NoError(t, err)
  396. // The first two chunks are accepted, before the last one asks for 1 to be refetched
  397. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  398. Index: 0, Chunk: []byte{0},
  399. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  400. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  401. Index: 1, Chunk: []byte{1},
  402. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  403. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  404. Index: 2, Chunk: []byte{2},
  405. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  406. Result: tc.result,
  407. RefetchChunks: []uint32{1},
  408. }, nil)
  409. // Since removing the chunk will cause Next() to block, we spawn a goroutine, then
  410. // check the queue contents, and finally close the queue to end the goroutine.
  411. // We don't really care about the result of applyChunks, since it has separate test.
  412. go func() {
  413. syncer.applyChunks(chunks)
  414. }()
  415. time.Sleep(50 * time.Millisecond)
  416. assert.True(t, chunks.Has(0))
  417. assert.False(t, chunks.Has(1))
  418. assert.True(t, chunks.Has(2))
  419. err = chunks.Close()
  420. require.NoError(t, err)
  421. })
  422. }
  423. }
  424. func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
  425. // Banning chunks senders via ban_chunk_senders should work the same for all results
  426. testcases := map[string]struct {
  427. result abci.ResponseApplySnapshotChunk_Result
  428. }{
  429. "accept": {abci.ResponseApplySnapshotChunk_ACCEPT},
  430. "abort": {abci.ResponseApplySnapshotChunk_ABORT},
  431. "retry": {abci.ResponseApplySnapshotChunk_RETRY},
  432. "retry_snapshot": {abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT},
  433. "reject_snapshot": {abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT},
  434. }
  435. for name, tc := range testcases {
  436. tc := tc
  437. t.Run(name, func(t *testing.T) {
  438. connQuery := &proxymocks.AppConnQuery{}
  439. connSnapshot := &proxymocks.AppConnSnapshot{}
  440. stateProvider := &mocks.StateProvider{}
  441. stateProvider.On("AppHash", mock.Anything).Return([]byte("app_hash"), nil)
  442. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  443. // Set up three peers across two snapshots, and ask for one of them to be banned.
  444. // It should be banned from all snapshots.
  445. peerA := simplePeer("a")
  446. peerB := simplePeer("b")
  447. peerC := simplePeer("c")
  448. s1 := &snapshot{Height: 1, Format: 1, Chunks: 3}
  449. s2 := &snapshot{Height: 2, Format: 1, Chunks: 3}
  450. syncer.AddSnapshot(peerA, s1)
  451. syncer.AddSnapshot(peerA, s2)
  452. syncer.AddSnapshot(peerB, s1)
  453. syncer.AddSnapshot(peerB, s2)
  454. syncer.AddSnapshot(peerC, s1)
  455. syncer.AddSnapshot(peerC, s2)
  456. chunks, err := newChunkQueue(s1, "")
  457. require.NoError(t, err)
  458. added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerA.ID()})
  459. require.True(t, added)
  460. require.NoError(t, err)
  461. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerB.ID()})
  462. require.True(t, added)
  463. require.NoError(t, err)
  464. added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerC.ID()})
  465. require.True(t, added)
  466. require.NoError(t, err)
  467. // The first two chunks are accepted, before the last one asks for b sender to be rejected
  468. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  469. Index: 0, Chunk: []byte{0}, Sender: "a",
  470. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  471. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  472. Index: 1, Chunk: []byte{1}, Sender: "b",
  473. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  474. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  475. Index: 2, Chunk: []byte{2}, Sender: "c",
  476. }).Once().Return(&abci.ResponseApplySnapshotChunk{
  477. Result: tc.result,
  478. RejectSenders: []string{string(peerB.ID())},
  479. }, nil)
  480. // On retry, the last chunk will be tried again, so we just accept it then.
  481. if tc.result == abci.ResponseApplySnapshotChunk_RETRY {
  482. connSnapshot.On("ApplySnapshotChunkSync", abci.RequestApplySnapshotChunk{
  483. Index: 2, Chunk: []byte{2}, Sender: "c",
  484. }).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
  485. }
  486. // We don't really care about the result of applyChunks, since it has separate test.
  487. // However, it will block on e.g. retry result, so we spawn a goroutine that will
  488. // be shut down when the chunk queue closes.
  489. go func() {
  490. syncer.applyChunks(chunks)
  491. }()
  492. time.Sleep(50 * time.Millisecond)
  493. s1peers := syncer.snapshots.GetPeers(s1)
  494. assert.Len(t, s1peers, 2)
  495. assert.EqualValues(t, "a", s1peers[0].ID())
  496. assert.EqualValues(t, "c", s1peers[1].ID())
  497. syncer.snapshots.GetPeers(s1)
  498. assert.Len(t, s1peers, 2)
  499. assert.EqualValues(t, "a", s1peers[0].ID())
  500. assert.EqualValues(t, "c", s1peers[1].ID())
  501. err = chunks.Close()
  502. require.NoError(t, err)
  503. })
  504. }
  505. }
  506. func TestSyncer_verifyApp(t *testing.T) {
  507. boom := errors.New("boom")
  508. s := &snapshot{Height: 3, Format: 1, Chunks: 5, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
  509. testcases := map[string]struct {
  510. response *abci.ResponseInfo
  511. err error
  512. expectErr error
  513. }{
  514. "verified": {&abci.ResponseInfo{
  515. LastBlockHeight: 3,
  516. LastBlockAppHash: []byte("app_hash"),
  517. AppVersion: 9,
  518. }, nil, nil},
  519. "invalid height": {&abci.ResponseInfo{
  520. LastBlockHeight: 5,
  521. LastBlockAppHash: []byte("app_hash"),
  522. AppVersion: 9,
  523. }, nil, errVerifyFailed},
  524. "invalid hash": {&abci.ResponseInfo{
  525. LastBlockHeight: 3,
  526. LastBlockAppHash: []byte("xxx"),
  527. AppVersion: 9,
  528. }, nil, errVerifyFailed},
  529. "error": {nil, boom, boom},
  530. }
  531. for name, tc := range testcases {
  532. tc := tc
  533. t.Run(name, func(t *testing.T) {
  534. connQuery := &proxymocks.AppConnQuery{}
  535. connSnapshot := &proxymocks.AppConnSnapshot{}
  536. stateProvider := &mocks.StateProvider{}
  537. syncer := newSyncer(log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
  538. connQuery.On("InfoSync", proxy.RequestInfo).Return(tc.response, tc.err)
  539. version, err := syncer.verifyApp(s)
  540. unwrapped := errors.Unwrap(err)
  541. if unwrapped != nil {
  542. err = unwrapped
  543. }
  544. assert.Equal(t, tc.expectErr, err)
  545. if err == nil {
  546. assert.Equal(t, tc.response.AppVersion, version)
  547. }
  548. })
  549. }
  550. }
  551. func toABCI(s *snapshot) *abci.Snapshot {
  552. return &abci.Snapshot{
  553. Height: s.Height,
  554. Format: s.Format,
  555. Chunks: s.Chunks,
  556. Hash: s.Hash,
  557. Metadata: s.Metadata,
  558. }
  559. }