You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

509 lines
16 KiB

  1. package statesync
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "time"
  8. abci "github.com/tendermint/tendermint/abci/types"
  9. "github.com/tendermint/tendermint/config"
  10. "github.com/tendermint/tendermint/libs/log"
  11. tmsync "github.com/tendermint/tendermint/libs/sync"
  12. "github.com/tendermint/tendermint/light"
  13. "github.com/tendermint/tendermint/p2p"
  14. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  15. "github.com/tendermint/tendermint/proxy"
  16. sm "github.com/tendermint/tendermint/state"
  17. "github.com/tendermint/tendermint/types"
  18. )
  19. const (
  20. // chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
  21. chunkTimeout = 2 * time.Minute
  22. // minimumDiscoveryTime is the lowest allowable time for a
  23. // SyncAny discovery time.
  24. minimumDiscoveryTime = 5 * time.Second
  25. )
  26. var (
  27. // errAbort is returned by Sync() when snapshot restoration is aborted.
  28. errAbort = errors.New("state sync aborted")
  29. // errRetrySnapshot is returned by Sync() when the snapshot should be retried.
  30. errRetrySnapshot = errors.New("retry snapshot")
  31. // errRejectSnapshot is returned by Sync() when the snapshot is rejected.
  32. errRejectSnapshot = errors.New("snapshot was rejected")
  33. // errRejectFormat is returned by Sync() when the snapshot format is rejected.
  34. errRejectFormat = errors.New("snapshot format was rejected")
  35. // errRejectSender is returned by Sync() when the snapshot sender is rejected.
  36. errRejectSender = errors.New("snapshot sender was rejected")
  37. // errVerifyFailed is returned by Sync() when app hash or last height verification fails.
  38. errVerifyFailed = errors.New("verification failed")
  39. // errTimeout is returned by Sync() when we've waited too long to receive a chunk.
  40. errTimeout = errors.New("timed out waiting for chunk")
  41. // errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
  42. errNoSnapshots = errors.New("no suitable snapshots found")
  43. )
  44. // syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
  45. // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
  46. // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
  47. type syncer struct {
  48. logger log.Logger
  49. stateProvider StateProvider
  50. conn proxy.AppConnSnapshot
  51. connQuery proxy.AppConnQuery
  52. snapshots *snapshotPool
  53. tempDir string
  54. chunkFetchers int32
  55. retryTimeout time.Duration
  56. mtx tmsync.RWMutex
  57. chunks *chunkQueue
  58. }
  59. // newSyncer creates a new syncer.
  60. func newSyncer(
  61. cfg config.StateSyncConfig,
  62. logger log.Logger,
  63. conn proxy.AppConnSnapshot,
  64. connQuery proxy.AppConnQuery,
  65. stateProvider StateProvider,
  66. tempDir string,
  67. ) *syncer {
  68. return &syncer{
  69. logger: logger,
  70. stateProvider: stateProvider,
  71. conn: conn,
  72. connQuery: connQuery,
  73. snapshots: newSnapshotPool(),
  74. tempDir: tempDir,
  75. chunkFetchers: cfg.ChunkFetchers,
  76. retryTimeout: cfg.ChunkRequestTimeout,
  77. }
  78. }
  79. // AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
  80. // been added to the queue, or an error if there's no sync in progress.
  81. func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
  82. s.mtx.RLock()
  83. defer s.mtx.RUnlock()
  84. if s.chunks == nil {
  85. return false, errors.New("no state sync in progress")
  86. }
  87. added, err := s.chunks.Add(chunk)
  88. if err != nil {
  89. return false, err
  90. }
  91. if added {
  92. s.logger.Debug("Added chunk to queue", "height", chunk.Height, "format", chunk.Format,
  93. "chunk", chunk.Index)
  94. } else {
  95. s.logger.Debug("Ignoring duplicate chunk in queue", "height", chunk.Height, "format", chunk.Format,
  96. "chunk", chunk.Index)
  97. }
  98. return added, nil
  99. }
  100. // AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
  101. // snapshot was accepted and added.
  102. func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) {
  103. added, err := s.snapshots.Add(peer, snapshot)
  104. if err != nil {
  105. return false, err
  106. }
  107. if added {
  108. s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format,
  109. "hash", snapshot.Hash)
  110. }
  111. return added, nil
  112. }
  113. // AddPeer adds a peer to the pool. For now we just keep it simple and send a single request
  114. // to discover snapshots, later we may want to do retries and stuff.
  115. func (s *syncer) AddPeer(peer p2p.Peer) {
  116. s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
  117. peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
  118. }
  119. // RemovePeer removes a peer from the pool.
  120. func (s *syncer) RemovePeer(peer p2p.Peer) {
  121. s.logger.Debug("Removing peer from sync", "peer", peer.ID())
  122. s.snapshots.RemovePeer(peer.ID())
  123. }
  124. // SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
  125. // snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
  126. // which the caller must use to bootstrap the node.
  127. func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) {
  128. if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
  129. discoveryTime = 5 * minimumDiscoveryTime
  130. }
  131. if discoveryTime > 0 {
  132. s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
  133. time.Sleep(discoveryTime)
  134. }
  135. // The app may ask us to retry a snapshot restoration, in which case we need to reuse
  136. // the snapshot and chunk queue from the previous loop iteration.
  137. var (
  138. snapshot *snapshot
  139. chunks *chunkQueue
  140. err error
  141. )
  142. for {
  143. // If not nil, we're going to retry restoration of the same snapshot.
  144. if snapshot == nil {
  145. snapshot = s.snapshots.Best()
  146. chunks = nil
  147. }
  148. if snapshot == nil {
  149. if discoveryTime == 0 {
  150. return sm.State{}, nil, errNoSnapshots
  151. }
  152. retryHook()
  153. s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
  154. time.Sleep(discoveryTime)
  155. continue
  156. }
  157. if chunks == nil {
  158. chunks, err = newChunkQueue(snapshot, s.tempDir)
  159. if err != nil {
  160. return sm.State{}, nil, fmt.Errorf("failed to create chunk queue: %w", err)
  161. }
  162. defer chunks.Close() // in case we forget to close it elsewhere
  163. }
  164. newState, commit, err := s.Sync(snapshot, chunks)
  165. switch {
  166. case err == nil:
  167. return newState, commit, nil
  168. case errors.Is(err, errAbort):
  169. return sm.State{}, nil, err
  170. case errors.Is(err, errRetrySnapshot):
  171. chunks.RetryAll()
  172. s.logger.Info("Retrying snapshot", "height", snapshot.Height, "format", snapshot.Format,
  173. "hash", snapshot.Hash)
  174. continue
  175. case errors.Is(err, errTimeout):
  176. s.snapshots.Reject(snapshot)
  177. s.logger.Error("Timed out waiting for snapshot chunks, rejected snapshot",
  178. "height", snapshot.Height, "format", snapshot.Format, "hash", snapshot.Hash)
  179. case errors.Is(err, errRejectSnapshot):
  180. s.snapshots.Reject(snapshot)
  181. s.logger.Info("Snapshot rejected", "height", snapshot.Height, "format", snapshot.Format,
  182. "hash", snapshot.Hash)
  183. case errors.Is(err, errRejectFormat):
  184. s.snapshots.RejectFormat(snapshot.Format)
  185. s.logger.Info("Snapshot format rejected", "format", snapshot.Format)
  186. case errors.Is(err, errRejectSender):
  187. s.logger.Info("Snapshot senders rejected", "height", snapshot.Height, "format", snapshot.Format,
  188. "hash", snapshot.Hash)
  189. for _, peer := range s.snapshots.GetPeers(snapshot) {
  190. s.snapshots.RejectPeer(peer.ID())
  191. s.logger.Info("Snapshot sender rejected", "peer", peer.ID())
  192. }
  193. case errors.Is(err, context.DeadlineExceeded):
  194. s.logger.Info("Timed out validating snapshot, rejecting", "height", snapshot.Height, "err", err)
  195. s.snapshots.Reject(snapshot)
  196. default:
  197. return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
  198. }
  199. // Discard snapshot and chunks for next iteration
  200. err = chunks.Close()
  201. if err != nil {
  202. s.logger.Error("Failed to clean up chunk queue", "err", err)
  203. }
  204. snapshot = nil
  205. chunks = nil
  206. }
  207. }
  208. // Sync executes a sync for a specific snapshot, returning the latest state and block commit which
  209. // the caller must use to bootstrap the node.
  210. func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
  211. s.mtx.Lock()
  212. if s.chunks != nil {
  213. s.mtx.Unlock()
  214. return sm.State{}, nil, errors.New("a state sync is already in progress")
  215. }
  216. s.chunks = chunks
  217. s.mtx.Unlock()
  218. defer func() {
  219. s.mtx.Lock()
  220. s.chunks = nil
  221. s.mtx.Unlock()
  222. }()
  223. hctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
  224. defer cancel()
  225. appHash, err := s.stateProvider.AppHash(hctx, snapshot.Height)
  226. if err != nil {
  227. s.logger.Info("failed to fetch and verify app hash", "err", err)
  228. if err == light.ErrNoWitnesses {
  229. return sm.State{}, nil, err
  230. }
  231. return sm.State{}, nil, errRejectSnapshot
  232. }
  233. snapshot.trustedAppHash = appHash
  234. // Offer snapshot to ABCI app.
  235. err = s.offerSnapshot(snapshot)
  236. if err != nil {
  237. return sm.State{}, nil, err
  238. }
  239. // Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
  240. fetchCtx, cancel := context.WithCancel(context.TODO())
  241. defer cancel()
  242. for i := int32(0); i < s.chunkFetchers; i++ {
  243. go s.fetchChunks(fetchCtx, snapshot, chunks)
  244. }
  245. pctx, pcancel := context.WithTimeout(context.TODO(), 30*time.Second)
  246. defer pcancel()
  247. // Optimistically build new state, so we don't discover any light client failures at the end.
  248. state, err := s.stateProvider.State(pctx, snapshot.Height)
  249. if err != nil {
  250. s.logger.Info("failed to fetch and verify tendermint state", "err", err)
  251. if err == light.ErrNoWitnesses {
  252. return sm.State{}, nil, err
  253. }
  254. return sm.State{}, nil, errRejectSnapshot
  255. }
  256. commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
  257. if err != nil {
  258. s.logger.Info("failed to fetch and verify commit", "err", err)
  259. if err == light.ErrNoWitnesses {
  260. return sm.State{}, nil, err
  261. }
  262. return sm.State{}, nil, errRejectSnapshot
  263. }
  264. // Restore snapshot
  265. err = s.applyChunks(chunks)
  266. if err != nil {
  267. return sm.State{}, nil, err
  268. }
  269. // Verify app and app version
  270. if err := s.verifyApp(snapshot, state.Version.Consensus.App); err != nil {
  271. return sm.State{}, nil, err
  272. }
  273. // Done! 🎉
  274. s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format,
  275. "hash", snapshot.Hash)
  276. return state, commit, nil
  277. }
  278. // offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
  279. // response, or nil if the snapshot was accepted.
  280. func (s *syncer) offerSnapshot(snapshot *snapshot) error {
  281. s.logger.Info("Offering snapshot to ABCI app", "height", snapshot.Height,
  282. "format", snapshot.Format, "hash", snapshot.Hash)
  283. resp, err := s.conn.OfferSnapshotSync(abci.RequestOfferSnapshot{
  284. Snapshot: &abci.Snapshot{
  285. Height: snapshot.Height,
  286. Format: snapshot.Format,
  287. Chunks: snapshot.Chunks,
  288. Hash: snapshot.Hash,
  289. Metadata: snapshot.Metadata,
  290. },
  291. AppHash: snapshot.trustedAppHash,
  292. })
  293. if err != nil {
  294. return fmt.Errorf("failed to offer snapshot: %w", err)
  295. }
  296. switch resp.Result {
  297. case abci.ResponseOfferSnapshot_ACCEPT:
  298. s.logger.Info("Snapshot accepted, restoring", "height", snapshot.Height,
  299. "format", snapshot.Format, "hash", snapshot.Hash)
  300. return nil
  301. case abci.ResponseOfferSnapshot_ABORT:
  302. return errAbort
  303. case abci.ResponseOfferSnapshot_REJECT:
  304. return errRejectSnapshot
  305. case abci.ResponseOfferSnapshot_REJECT_FORMAT:
  306. return errRejectFormat
  307. case abci.ResponseOfferSnapshot_REJECT_SENDER:
  308. return errRejectSender
  309. default:
  310. return fmt.Errorf("unknown ResponseOfferSnapshot result %v", resp.Result)
  311. }
  312. }
  313. // applyChunks applies chunks to the app. It returns various errors depending on the app's
  314. // response, or nil once the snapshot is fully restored.
  315. func (s *syncer) applyChunks(chunks *chunkQueue) error {
  316. for {
  317. chunk, err := chunks.Next()
  318. if err == errDone {
  319. return nil
  320. } else if err != nil {
  321. return fmt.Errorf("failed to fetch chunk: %w", err)
  322. }
  323. resp, err := s.conn.ApplySnapshotChunkSync(abci.RequestApplySnapshotChunk{
  324. Index: chunk.Index,
  325. Chunk: chunk.Chunk,
  326. Sender: string(chunk.Sender),
  327. })
  328. if err != nil {
  329. return fmt.Errorf("failed to apply chunk %v: %w", chunk.Index, err)
  330. }
  331. s.logger.Info("Applied snapshot chunk to ABCI app", "height", chunk.Height,
  332. "format", chunk.Format, "chunk", chunk.Index, "total", chunks.Size())
  333. // Discard and refetch any chunks as requested by the app
  334. for _, index := range resp.RefetchChunks {
  335. err := chunks.Discard(index)
  336. if err != nil {
  337. return fmt.Errorf("failed to discard chunk %v: %w", index, err)
  338. }
  339. }
  340. // Reject any senders as requested by the app
  341. for _, sender := range resp.RejectSenders {
  342. if sender != "" {
  343. s.snapshots.RejectPeer(p2p.ID(sender))
  344. err := chunks.DiscardSender(p2p.ID(sender))
  345. if err != nil {
  346. return fmt.Errorf("failed to reject sender: %w", err)
  347. }
  348. }
  349. }
  350. switch resp.Result {
  351. case abci.ResponseApplySnapshotChunk_ACCEPT:
  352. case abci.ResponseApplySnapshotChunk_ABORT:
  353. return errAbort
  354. case abci.ResponseApplySnapshotChunk_RETRY:
  355. chunks.Retry(chunk.Index)
  356. case abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT:
  357. return errRetrySnapshot
  358. case abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT:
  359. return errRejectSnapshot
  360. default:
  361. return fmt.Errorf("unknown ResponseApplySnapshotChunk result %v", resp.Result)
  362. }
  363. }
  364. }
  365. // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
  366. // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
  367. func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
  368. var (
  369. next = true
  370. index uint32
  371. err error
  372. )
  373. for {
  374. if next {
  375. index, err = chunks.Allocate()
  376. if errors.Is(err, errDone) {
  377. // Keep checking until the context is canceled (restore is done), in case any
  378. // chunks need to be refetched.
  379. select {
  380. case <-ctx.Done():
  381. return
  382. default:
  383. }
  384. time.Sleep(2 * time.Second)
  385. continue
  386. }
  387. if err != nil {
  388. s.logger.Error("Failed to allocate chunk from queue", "err", err)
  389. return
  390. }
  391. }
  392. s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
  393. "format", snapshot.Format, "chunk", index, "total", chunks.Size())
  394. ticker := time.NewTicker(s.retryTimeout)
  395. defer ticker.Stop()
  396. s.requestChunk(snapshot, index)
  397. select {
  398. case <-chunks.WaitFor(index):
  399. next = true
  400. case <-ticker.C:
  401. next = false
  402. case <-ctx.Done():
  403. return
  404. }
  405. ticker.Stop()
  406. }
  407. }
  408. // requestChunk requests a chunk from a peer.
  409. func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
  410. peer := s.snapshots.GetPeer(snapshot)
  411. if peer == nil {
  412. s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height,
  413. "format", snapshot.Format, "hash", snapshot.Hash)
  414. return
  415. }
  416. s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
  417. "format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
  418. peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{
  419. Height: snapshot.Height,
  420. Format: snapshot.Format,
  421. Index: chunk,
  422. }))
  423. }
  424. // verifyApp verifies the sync, checking the app hash, last block height and app version
  425. func (s *syncer) verifyApp(snapshot *snapshot, appVersion uint64) error {
  426. resp, err := s.connQuery.InfoSync(proxy.RequestInfo)
  427. if err != nil {
  428. return fmt.Errorf("failed to query ABCI app for appHash: %w", err)
  429. }
  430. // sanity check that the app version in the block matches the application's own record
  431. // of its version
  432. if resp.AppVersion != appVersion {
  433. // An error here most likely means that the app hasn't inplemented state sync
  434. // or the Info call correctly
  435. return fmt.Errorf("app version mismatch. Expected: %d, got: %d",
  436. appVersion, resp.AppVersion)
  437. }
  438. if !bytes.Equal(snapshot.trustedAppHash, resp.LastBlockAppHash) {
  439. s.logger.Error("appHash verification failed",
  440. "expected", snapshot.trustedAppHash,
  441. "actual", resp.LastBlockAppHash)
  442. return errVerifyFailed
  443. }
  444. if uint64(resp.LastBlockHeight) != snapshot.Height {
  445. s.logger.Error(
  446. "ABCI app reported unexpected last block height",
  447. "expected", snapshot.Height,
  448. "actual", resp.LastBlockHeight,
  449. )
  450. return errVerifyFailed
  451. }
  452. s.logger.Info("Verified ABCI app", "height", snapshot.Height, "appHash", snapshot.trustedAppHash)
  453. return nil
  454. }