You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

476 lines
15 KiB

  1. package statesync
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "time"
  8. abci "github.com/tendermint/tendermint/abci/types"
  9. "github.com/tendermint/tendermint/libs/log"
  10. tmsync "github.com/tendermint/tendermint/libs/sync"
  11. "github.com/tendermint/tendermint/p2p"
  12. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  13. "github.com/tendermint/tendermint/proxy"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  17. const (
  18. // chunkFetchers is the number of concurrent chunk fetchers to run.
  19. chunkFetchers = 4
  20. // chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
  21. chunkTimeout = 2 * time.Minute
  22. // requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
  23. chunkRequestTimeout = 10 * time.Second
  24. )
  25. var (
  26. // errAbort is returned by Sync() when snapshot restoration is aborted.
  27. errAbort = errors.New("state sync aborted")
  28. // errRetrySnapshot is returned by Sync() when the snapshot should be retried.
  29. errRetrySnapshot = errors.New("retry snapshot")
  30. // errRejectSnapshot is returned by Sync() when the snapshot is rejected.
  31. errRejectSnapshot = errors.New("snapshot was rejected")
  32. // errRejectFormat is returned by Sync() when the snapshot format is rejected.
  33. errRejectFormat = errors.New("snapshot format was rejected")
  34. // errRejectSender is returned by Sync() when the snapshot sender is rejected.
  35. errRejectSender = errors.New("snapshot sender was rejected")
  36. // errVerifyFailed is returned by Sync() when app hash or last height verification fails.
  37. errVerifyFailed = errors.New("verification failed")
  38. // errTimeout is returned by Sync() when we've waited too long to receive a chunk.
  39. errTimeout = errors.New("timed out waiting for chunk")
  40. // errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
  41. errNoSnapshots = errors.New("no suitable snapshots found")
  42. )
  43. // syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
  44. // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
  45. // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
  46. type syncer struct {
  47. logger log.Logger
  48. stateProvider StateProvider
  49. conn proxy.AppConnSnapshot
  50. connQuery proxy.AppConnQuery
  51. snapshots *snapshotPool
  52. snapshotCh chan<- p2p.Envelope
  53. chunkCh chan<- p2p.Envelope
  54. tempDir string
  55. mtx tmsync.RWMutex
  56. chunks *chunkQueue
  57. }
  58. // newSyncer creates a new syncer.
  59. func newSyncer(
  60. logger log.Logger,
  61. conn proxy.AppConnSnapshot,
  62. connQuery proxy.AppConnQuery,
  63. stateProvider StateProvider,
  64. snapshotCh, chunkCh chan<- p2p.Envelope,
  65. tempDir string,
  66. ) *syncer {
  67. return &syncer{
  68. logger: logger,
  69. stateProvider: stateProvider,
  70. conn: conn,
  71. connQuery: connQuery,
  72. snapshots: newSnapshotPool(stateProvider),
  73. snapshotCh: snapshotCh,
  74. chunkCh: chunkCh,
  75. tempDir: tempDir,
  76. }
  77. }
  78. // AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
  79. // been added to the queue, or an error if there's no sync in progress.
  80. func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
  81. s.mtx.RLock()
  82. defer s.mtx.RUnlock()
  83. if s.chunks == nil {
  84. return false, errors.New("no state sync in progress")
  85. }
  86. added, err := s.chunks.Add(chunk)
  87. if err != nil {
  88. return false, err
  89. }
  90. if added {
  91. s.logger.Debug("Added chunk to queue", "height", chunk.Height, "format", chunk.Format,
  92. "chunk", chunk.Index)
  93. } else {
  94. s.logger.Debug("Ignoring duplicate chunk in queue", "height", chunk.Height, "format", chunk.Format,
  95. "chunk", chunk.Index)
  96. }
  97. return added, nil
  98. }
  99. // AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
  100. // snapshot was accepted and added.
  101. func (s *syncer) AddSnapshot(peer p2p.PeerID, snapshot *snapshot) (bool, error) {
  102. added, err := s.snapshots.Add(peer, snapshot)
  103. if err != nil {
  104. return false, err
  105. }
  106. if added {
  107. s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format,
  108. "hash", fmt.Sprintf("%X", snapshot.Hash))
  109. }
  110. return added, nil
  111. }
  112. // AddPeer adds a peer to the pool. For now we just keep it simple and send a
  113. // single request to discover snapshots, later we may want to do retries and stuff.
  114. func (s *syncer) AddPeer(peer p2p.PeerID) {
  115. s.logger.Debug("Requesting snapshots from peer", "peer", peer.String())
  116. s.snapshotCh <- p2p.Envelope{
  117. To: peer,
  118. Message: &ssproto.SnapshotsRequest{},
  119. }
  120. }
  121. // RemovePeer removes a peer from the pool.
  122. func (s *syncer) RemovePeer(peer p2p.PeerID) {
  123. s.logger.Debug("Removing peer from sync", "peer", peer.String())
  124. s.snapshots.RemovePeer(peer)
  125. }
  126. // SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
  127. // snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
  128. // which the caller must use to bootstrap the node.
  129. func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, error) {
  130. if discoveryTime > 0 {
  131. s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
  132. time.Sleep(discoveryTime)
  133. }
  134. // The app may ask us to retry a snapshot restoration, in which case we need to reuse
  135. // the snapshot and chunk queue from the previous loop iteration.
  136. var (
  137. snapshot *snapshot
  138. chunks *chunkQueue
  139. err error
  140. )
  141. for {
  142. // If not nil, we're going to retry restoration of the same snapshot.
  143. if snapshot == nil {
  144. snapshot = s.snapshots.Best()
  145. chunks = nil
  146. }
  147. if snapshot == nil {
  148. if discoveryTime == 0 {
  149. return sm.State{}, nil, errNoSnapshots
  150. }
  151. s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
  152. time.Sleep(discoveryTime)
  153. continue
  154. }
  155. if chunks == nil {
  156. chunks, err = newChunkQueue(snapshot, s.tempDir)
  157. if err != nil {
  158. return sm.State{}, nil, fmt.Errorf("failed to create chunk queue: %w", err)
  159. }
  160. defer chunks.Close() // in case we forget to close it elsewhere
  161. }
  162. newState, commit, err := s.Sync(snapshot, chunks)
  163. switch {
  164. case err == nil:
  165. return newState, commit, nil
  166. case errors.Is(err, errAbort):
  167. return sm.State{}, nil, err
  168. case errors.Is(err, errRetrySnapshot):
  169. chunks.RetryAll()
  170. s.logger.Info("Retrying snapshot", "height", snapshot.Height, "format", snapshot.Format,
  171. "hash", fmt.Sprintf("%X", snapshot.Hash))
  172. continue
  173. case errors.Is(err, errTimeout):
  174. s.snapshots.Reject(snapshot)
  175. s.logger.Error("Timed out waiting for snapshot chunks, rejected snapshot",
  176. "height", snapshot.Height, "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  177. case errors.Is(err, errRejectSnapshot):
  178. s.snapshots.Reject(snapshot)
  179. s.logger.Info("Snapshot rejected", "height", snapshot.Height, "format", snapshot.Format,
  180. "hash", fmt.Sprintf("%X", snapshot.Hash))
  181. case errors.Is(err, errRejectFormat):
  182. s.snapshots.RejectFormat(snapshot.Format)
  183. s.logger.Info("Snapshot format rejected", "format", snapshot.Format)
  184. case errors.Is(err, errRejectSender):
  185. s.logger.Info("Snapshot senders rejected", "height", snapshot.Height, "format", snapshot.Format,
  186. "hash", fmt.Sprintf("%X", snapshot.Hash))
  187. for _, peer := range s.snapshots.GetPeers(snapshot) {
  188. s.snapshots.RejectPeer(peer)
  189. s.logger.Info("Snapshot sender rejected", "peer", peer.String())
  190. }
  191. default:
  192. return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
  193. }
  194. // Discard snapshot and chunks for next iteration
  195. err = chunks.Close()
  196. if err != nil {
  197. s.logger.Error("Failed to clean up chunk queue", "err", err)
  198. }
  199. snapshot = nil
  200. chunks = nil
  201. }
  202. }
  203. // Sync executes a sync for a specific snapshot, returning the latest state and block commit which
  204. // the caller must use to bootstrap the node.
  205. func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
  206. s.mtx.Lock()
  207. if s.chunks != nil {
  208. s.mtx.Unlock()
  209. return sm.State{}, nil, errors.New("a state sync is already in progress")
  210. }
  211. s.chunks = chunks
  212. s.mtx.Unlock()
  213. defer func() {
  214. s.mtx.Lock()
  215. s.chunks = nil
  216. s.mtx.Unlock()
  217. }()
  218. // Offer snapshot to ABCI app.
  219. err := s.offerSnapshot(snapshot)
  220. if err != nil {
  221. return sm.State{}, nil, err
  222. }
  223. // Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
  224. ctx, cancel := context.WithCancel(context.Background())
  225. defer cancel()
  226. for i := int32(0); i < chunkFetchers; i++ {
  227. go s.fetchChunks(ctx, snapshot, chunks)
  228. }
  229. pctx, pcancel := context.WithTimeout(context.Background(), 10*time.Second)
  230. defer pcancel()
  231. // Optimistically build new state, so we don't discover any light client failures at the end.
  232. state, err := s.stateProvider.State(pctx, snapshot.Height)
  233. if err != nil {
  234. return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
  235. }
  236. commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
  237. if err != nil {
  238. return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
  239. }
  240. // Restore snapshot
  241. err = s.applyChunks(chunks)
  242. if err != nil {
  243. return sm.State{}, nil, err
  244. }
  245. // Verify app and update app version
  246. appVersion, err := s.verifyApp(snapshot)
  247. if err != nil {
  248. return sm.State{}, nil, err
  249. }
  250. state.Version.Consensus.App = appVersion
  251. // Done! 🎉
  252. s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format,
  253. "hash", fmt.Sprintf("%X", snapshot.Hash))
  254. return state, commit, nil
  255. }
  256. // offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
  257. // response, or nil if the snapshot was accepted.
  258. func (s *syncer) offerSnapshot(snapshot *snapshot) error {
  259. s.logger.Info("Offering snapshot to ABCI app", "height", snapshot.Height,
  260. "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  261. resp, err := s.conn.OfferSnapshotSync(context.Background(), abci.RequestOfferSnapshot{
  262. Snapshot: &abci.Snapshot{
  263. Height: snapshot.Height,
  264. Format: snapshot.Format,
  265. Chunks: snapshot.Chunks,
  266. Hash: snapshot.Hash,
  267. Metadata: snapshot.Metadata,
  268. },
  269. AppHash: snapshot.trustedAppHash,
  270. })
  271. if err != nil {
  272. return fmt.Errorf("failed to offer snapshot: %w", err)
  273. }
  274. switch resp.Result {
  275. case abci.ResponseOfferSnapshot_ACCEPT:
  276. s.logger.Info("Snapshot accepted, restoring", "height", snapshot.Height,
  277. "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  278. return nil
  279. case abci.ResponseOfferSnapshot_ABORT:
  280. return errAbort
  281. case abci.ResponseOfferSnapshot_REJECT:
  282. return errRejectSnapshot
  283. case abci.ResponseOfferSnapshot_REJECT_FORMAT:
  284. return errRejectFormat
  285. case abci.ResponseOfferSnapshot_REJECT_SENDER:
  286. return errRejectSender
  287. default:
  288. return fmt.Errorf("unknown ResponseOfferSnapshot result %v", resp.Result)
  289. }
  290. }
  291. // applyChunks applies chunks to the app. It returns various errors depending on the app's
  292. // response, or nil once the snapshot is fully restored.
  293. func (s *syncer) applyChunks(chunks *chunkQueue) error {
  294. for {
  295. chunk, err := chunks.Next()
  296. if err == errDone {
  297. return nil
  298. } else if err != nil {
  299. return fmt.Errorf("failed to fetch chunk: %w", err)
  300. }
  301. resp, err := s.conn.ApplySnapshotChunkSync(context.Background(), abci.RequestApplySnapshotChunk{
  302. Index: chunk.Index,
  303. Chunk: chunk.Chunk,
  304. Sender: chunk.Sender.String(),
  305. })
  306. if err != nil {
  307. return fmt.Errorf("failed to apply chunk %v: %w", chunk.Index, err)
  308. }
  309. s.logger.Info("Applied snapshot chunk to ABCI app", "height", chunk.Height,
  310. "format", chunk.Format, "chunk", chunk.Index, "total", chunks.Size())
  311. // Discard and refetch any chunks as requested by the app
  312. for _, index := range resp.RefetchChunks {
  313. err := chunks.Discard(index)
  314. if err != nil {
  315. return fmt.Errorf("failed to discard chunk %v: %w", index, err)
  316. }
  317. }
  318. // Reject any senders as requested by the app
  319. for _, sender := range resp.RejectSenders {
  320. if sender != "" {
  321. peerID, err := p2p.PeerIDFromString(sender)
  322. if err != nil {
  323. return err
  324. }
  325. s.snapshots.RejectPeer(peerID)
  326. if err := chunks.DiscardSender(peerID); err != nil {
  327. return fmt.Errorf("failed to reject sender: %w", err)
  328. }
  329. }
  330. }
  331. switch resp.Result {
  332. case abci.ResponseApplySnapshotChunk_ACCEPT:
  333. case abci.ResponseApplySnapshotChunk_ABORT:
  334. return errAbort
  335. case abci.ResponseApplySnapshotChunk_RETRY:
  336. chunks.Retry(chunk.Index)
  337. case abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT:
  338. return errRetrySnapshot
  339. case abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT:
  340. return errRejectSnapshot
  341. default:
  342. return fmt.Errorf("unknown ResponseApplySnapshotChunk result %v", resp.Result)
  343. }
  344. }
  345. }
  346. // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
  347. // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
  348. func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
  349. for {
  350. index, err := chunks.Allocate()
  351. if err == errDone {
  352. // Keep checking until the context is cancelled (restore is done), in case any
  353. // chunks need to be refetched.
  354. select {
  355. case <-ctx.Done():
  356. return
  357. default:
  358. }
  359. time.Sleep(2 * time.Second)
  360. continue
  361. }
  362. if err != nil {
  363. s.logger.Error("Failed to allocate chunk from queue", "err", err)
  364. return
  365. }
  366. s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
  367. "format", snapshot.Format, "chunk", index, "total", chunks.Size())
  368. ticker := time.NewTicker(chunkRequestTimeout)
  369. defer ticker.Stop()
  370. s.requestChunk(snapshot, index)
  371. select {
  372. case <-chunks.WaitFor(index):
  373. case <-ticker.C:
  374. s.requestChunk(snapshot, index)
  375. case <-ctx.Done():
  376. return
  377. }
  378. ticker.Stop()
  379. }
  380. }
  381. // requestChunk requests a chunk from a peer.
  382. func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
  383. peer := s.snapshots.GetPeer(snapshot)
  384. if peer == nil {
  385. s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height,
  386. "format", snapshot.Format, "hash", snapshot.Hash)
  387. return
  388. }
  389. s.logger.Debug(
  390. "Requesting snapshot chunk",
  391. "height", snapshot.Height,
  392. "format", snapshot.Format,
  393. "chunk", chunk,
  394. "peer", peer.String(),
  395. )
  396. s.chunkCh <- p2p.Envelope{
  397. To: peer,
  398. Message: &ssproto.ChunkRequest{
  399. Height: snapshot.Height,
  400. Format: snapshot.Format,
  401. Index: chunk,
  402. },
  403. }
  404. }
  405. // verifyApp verifies the sync, checking the app hash and last block height. It returns the
  406. // app version, which should be returned as part of the initial state.
  407. func (s *syncer) verifyApp(snapshot *snapshot) (uint64, error) {
  408. resp, err := s.connQuery.InfoSync(context.Background(), proxy.RequestInfo)
  409. if err != nil {
  410. return 0, fmt.Errorf("failed to query ABCI app for appHash: %w", err)
  411. }
  412. if !bytes.Equal(snapshot.trustedAppHash, resp.LastBlockAppHash) {
  413. s.logger.Error("appHash verification failed",
  414. "expected", fmt.Sprintf("%X", snapshot.trustedAppHash),
  415. "actual", fmt.Sprintf("%X", resp.LastBlockAppHash))
  416. return 0, errVerifyFailed
  417. }
  418. if uint64(resp.LastBlockHeight) != snapshot.Height {
  419. s.logger.Error(
  420. "ABCI app reported unexpected last block height",
  421. "expected", snapshot.Height,
  422. "actual", resp.LastBlockHeight,
  423. )
  424. return 0, errVerifyFailed
  425. }
  426. s.logger.Info("Verified ABCI app", "height", snapshot.Height, "appHash", fmt.Sprintf("%X", snapshot.trustedAppHash))
  427. return resp.AppVersion, nil
  428. }