You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

443 lines
15 KiB

  1. package statesync
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "sync"
  8. "time"
  9. abci "github.com/tendermint/tendermint/abci/types"
  10. "github.com/tendermint/tendermint/libs/log"
  11. "github.com/tendermint/tendermint/p2p"
  12. ssproto "github.com/tendermint/tendermint/proto/statesync"
  13. "github.com/tendermint/tendermint/proxy"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. "github.com/tendermint/tendermint/version"
  17. )
  18. const (
  19. // defaultDiscoveryTime is the time to spend discovering snapshots.
  20. defaultDiscoveryTime = 20 * time.Second
  21. // chunkFetchers is the number of concurrent chunk fetchers to run.
  22. chunkFetchers = 4
  23. // chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
  24. chunkTimeout = 2 * time.Minute
  25. // chunkRequestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
  26. chunkRequestTimeout = 10 * time.Second
  27. )
  28. var (
  29. // errAbort is returned by Sync() when the app answers ABORT to a snapshot offer or to a chunk.
  30. errAbort = errors.New("state sync aborted")
  31. // errRetrySnapshot is returned by Sync() when the app answers RETRY_SNAPSHOT to a chunk; SyncAny() then retries the same snapshot.
  32. errRetrySnapshot = errors.New("retry snapshot")
  33. // errRejectSnapshot is returned by Sync() when the snapshot is rejected.
  34. errRejectSnapshot = errors.New("snapshot was rejected")
  35. // errRejectFormat is returned by Sync() when the snapshot format is rejected.
  36. errRejectFormat = errors.New("snapshot format was rejected")
  37. // errRejectSender is returned by Sync() when the snapshot sender is rejected.
  38. errRejectSender = errors.New("snapshot sender was rejected")
  39. // errVerifyFailed is returned by Sync() when app hash or last height verification fails.
  40. errVerifyFailed = errors.New("verification failed")
  41. // errTimeout is returned by Sync() when we've waited too long to receive a chunk.
  42. errTimeout = errors.New("timed out waiting for chunk")
  43. // errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
  44. errNoSnapshots = errors.New("no suitable snapshots found")
  45. )
  46. // syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
  47. // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
  48. // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
  49. type syncer struct {
  50. logger log.Logger
  51. stateProvider StateProvider // supplies the state and commit that Sync() returns
  52. conn proxy.AppConnSnapshot // ABCI connection used to offer snapshots and apply chunks
  53. connQuery proxy.AppConnQuery // ABCI connection used for the Info query in verifyApp()
  54. snapshots *snapshotPool // snapshots added via AddSnapshot(), together with their peers
  55. tempDir string // handed to newChunkQueue() whenever a chunk queue is created
  56. mtx sync.RWMutex // guards chunks
  57. chunks *chunkQueue // chunk queue of the sync in progress; nil when no sync is running
  58. }
  59. // newSyncer creates a new syncer.
  60. func newSyncer(logger log.Logger, conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery,
  61. stateProvider StateProvider, tempDir string) *syncer {
  62. return &syncer{
  63. logger: logger,
  64. stateProvider: stateProvider,
  65. conn: conn,
  66. connQuery: connQuery,
  67. snapshots: newSnapshotPool(stateProvider),
  68. tempDir: tempDir,
  69. }
  70. }
  71. // AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
  72. // been added to the queue, or an error if there's no sync in progress.
  73. func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
  74. s.mtx.RLock()
  75. defer s.mtx.RUnlock()
  76. if s.chunks == nil {
  77. return false, errors.New("no state sync in progress")
  78. }
  79. added, err := s.chunks.Add(chunk)
  80. if err != nil {
  81. return false, err
  82. }
  83. if added {
  84. s.logger.Debug("Added chunk to queue", "height", chunk.Height, "format", chunk.Format,
  85. "chunk", chunk.Index)
  86. } else {
  87. s.logger.Debug("Ignoring duplicate chunk in queue", "height", chunk.Height, "format", chunk.Format,
  88. "chunk", chunk.Index)
  89. }
  90. return added, nil
  91. }
  92. // AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
  93. // snapshot was accepted and added.
  94. func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) {
  95. added, err := s.snapshots.Add(peer, snapshot)
  96. if err != nil {
  97. return false, err
  98. }
  99. if added {
  100. s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format,
  101. "hash", fmt.Sprintf("%X", snapshot.Hash))
  102. }
  103. return added, nil
  104. }
  105. // AddPeer adds a peer to the pool. For now we just keep it simple and send a single request
  106. // to discover snapshots, later we may want to do retries and stuff.
  107. func (s *syncer) AddPeer(peer p2p.Peer) {
  108. s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
  109. peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
  110. }
  111. // RemovePeer removes a peer from the pool.
  112. func (s *syncer) RemovePeer(peer p2p.Peer) {
  113. s.logger.Debug("Removing peer from sync", "peer", peer.ID())
  114. s.snapshots.RemovePeer(peer.ID())
  115. }
  116. // SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
  117. // snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
  118. // which the caller must use to bootstrap the node.
  119. func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, error) {
  120. if discoveryTime > 0 {
  121. s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
  122. time.Sleep(discoveryTime)
  123. }
  124. // The app may ask us to retry a snapshot restoration, in which case we need to reuse
  125. // the snapshot and chunk queue from the previous loop iteration.
  126. var (
  127. snapshot *snapshot
  128. chunks *chunkQueue
  129. err error
  130. )
  131. for {
  132. // If not nil, we're going to retry restoration of the same snapshot.
  133. if snapshot == nil {
  134. snapshot = s.snapshots.Best()
  135. chunks = nil
  136. }
  137. if snapshot == nil {
  138. if discoveryTime == 0 {
  139. return sm.State{}, nil, errNoSnapshots
  140. }
  141. s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
  142. time.Sleep(discoveryTime)
  143. continue
  144. }
  145. if chunks == nil {
  146. chunks, err = newChunkQueue(snapshot, s.tempDir)
  147. if err != nil {
  148. return sm.State{}, nil, fmt.Errorf("failed to create chunk queue: %w", err)
  149. }
  150. defer chunks.Close() // in case we forget to close it elsewhere
  151. }
  152. newState, commit, err := s.Sync(snapshot, chunks)
  153. switch {
  154. case err == nil:
  155. return newState, commit, nil
  156. case errors.Is(err, errAbort):
  157. return sm.State{}, nil, err
  158. case errors.Is(err, errRetrySnapshot):
  159. chunks.RetryAll()
  160. s.logger.Info("Retrying snapshot", "height", snapshot.Height, "format", snapshot.Format,
  161. "hash", fmt.Sprintf("%X", snapshot.Hash))
  162. continue
  163. case errors.Is(err, errTimeout):
  164. s.snapshots.Reject(snapshot)
  165. s.logger.Error("Timed out waiting for snapshot chunks, rejected snapshot",
  166. "height", snapshot.Height, "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  167. case errors.Is(err, errRejectSnapshot):
  168. s.snapshots.Reject(snapshot)
  169. s.logger.Info("Snapshot rejected", "height", snapshot.Height, "format", snapshot.Format,
  170. "hash", fmt.Sprintf("%X", snapshot.Hash))
  171. case errors.Is(err, errRejectFormat):
  172. s.snapshots.RejectFormat(snapshot.Format)
  173. s.logger.Info("Snapshot format rejected", "format", snapshot.Format)
  174. case errors.Is(err, errRejectSender):
  175. s.logger.Info("Snapshot senders rejected", "height", snapshot.Height, "format", snapshot.Format,
  176. "hash", fmt.Sprintf("%X", snapshot.Hash))
  177. for _, peer := range s.snapshots.GetPeers(snapshot) {
  178. s.snapshots.RejectPeer(peer.ID())
  179. s.logger.Info("Snapshot sender rejected", "peer", peer.ID())
  180. }
  181. default:
  182. return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
  183. }
  184. // Discard snapshot and chunks for next iteration
  185. err = chunks.Close()
  186. if err != nil {
  187. s.logger.Error("Failed to clean up chunk queue", "err", err)
  188. }
  189. snapshot = nil
  190. chunks = nil
  191. }
  192. }
  193. // Sync executes a sync for a specific snapshot, returning the latest state and block commit which
  194. // the caller must use to bootstrap the node.
  195. func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
  196. s.mtx.Lock()
  197. if s.chunks != nil {
  198. s.mtx.Unlock()
  199. return sm.State{}, nil, errors.New("a state sync is already in progress")
  200. }
  201. s.chunks = chunks
  202. s.mtx.Unlock()
  203. defer func() {
  204. s.mtx.Lock()
  205. s.chunks = nil
  206. s.mtx.Unlock()
  207. }()
  208. // Offer snapshot to ABCI app.
  209. err := s.offerSnapshot(snapshot)
  210. if err != nil {
  211. return sm.State{}, nil, err
  212. }
  213. // Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
  214. ctx, cancel := context.WithCancel(context.Background())
  215. defer cancel()
  216. for i := int32(0); i < chunkFetchers; i++ {
  217. go s.fetchChunks(ctx, snapshot, chunks)
  218. }
  219. // Optimistically build new state, so we don't discover any light client failures at the end.
  220. state, err := s.stateProvider.State(snapshot.Height)
  221. if err != nil {
  222. return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
  223. }
  224. commit, err := s.stateProvider.Commit(snapshot.Height)
  225. if err != nil {
  226. return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
  227. }
  228. // Restore snapshot
  229. err = s.applyChunks(chunks)
  230. if err != nil {
  231. return sm.State{}, nil, err
  232. }
  233. // Verify app and update app version
  234. appVersion, err := s.verifyApp(snapshot)
  235. if err != nil {
  236. return sm.State{}, nil, err
  237. }
  238. state.Version.Consensus.App = version.Protocol(appVersion)
  239. // Done! 🎉
  240. s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format,
  241. "hash", fmt.Sprintf("%X", snapshot.Hash))
  242. return state, commit, nil
  243. }
  244. // offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
  245. // response, or nil if the snapshot was accepted.
  246. func (s *syncer) offerSnapshot(snapshot *snapshot) error {
  247. s.logger.Info("Offering snapshot to ABCI app", "height", snapshot.Height,
  248. "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  249. resp, err := s.conn.OfferSnapshotSync(abci.RequestOfferSnapshot{
  250. Snapshot: &abci.Snapshot{
  251. Height: snapshot.Height,
  252. Format: snapshot.Format,
  253. Chunks: snapshot.Chunks,
  254. Hash: snapshot.Hash,
  255. Metadata: snapshot.Metadata,
  256. },
  257. AppHash: snapshot.trustedAppHash,
  258. })
  259. if err != nil {
  260. return fmt.Errorf("failed to offer snapshot: %w", err)
  261. }
  262. switch resp.Result {
  263. case abci.ResponseOfferSnapshot_ACCEPT:
  264. s.logger.Info("Snapshot accepted, restoring", "height", snapshot.Height,
  265. "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  266. return nil
  267. case abci.ResponseOfferSnapshot_ABORT:
  268. return errAbort
  269. case abci.ResponseOfferSnapshot_REJECT:
  270. return errRejectSnapshot
  271. case abci.ResponseOfferSnapshot_REJECT_FORMAT:
  272. return errRejectFormat
  273. case abci.ResponseOfferSnapshot_REJECT_SENDER:
  274. return errRejectSender
  275. default:
  276. return fmt.Errorf("invalid ResponseOfferSnapshot result %v", resp.Result)
  277. }
  278. }
  279. // applyChunks applies chunks to the app. It returns various errors depending on the app's
  280. // response, or nil once the snapshot is fully restored.
  281. func (s *syncer) applyChunks(chunks *chunkQueue) error {
  282. for {
  283. chunk, err := chunks.Next()
  284. if err == errDone {
  285. return nil
  286. } else if err != nil {
  287. return fmt.Errorf("failed to fetch chunk: %w", err)
  288. }
  289. resp, err := s.conn.ApplySnapshotChunkSync(abci.RequestApplySnapshotChunk{
  290. Index: chunk.Index,
  291. Chunk: chunk.Chunk,
  292. Sender: string(chunk.Sender),
  293. })
  294. if err != nil {
  295. return fmt.Errorf("failed to apply chunk %v: %w", chunk.Index, err)
  296. }
  297. s.logger.Info("Applied snapshot chunk to ABCI app", "height", chunk.Height,
  298. "format", chunk.Format, "chunk", chunk.Index, "total", chunks.Size())
  299. // Discard and refetch any chunks as requested by the app
  300. for _, index := range resp.RefetchChunks {
  301. err := chunks.Discard(index)
  302. if err != nil {
  303. return fmt.Errorf("failed to discard chunk %v: %w", index, err)
  304. }
  305. }
  306. // Reject any senders as requested by the app
  307. for _, sender := range resp.RejectSenders {
  308. if sender != "" {
  309. s.snapshots.RejectPeer(p2p.ID(sender))
  310. err := chunks.DiscardSender(p2p.ID(sender))
  311. if err != nil {
  312. return fmt.Errorf("failed to reject sender: %w", err)
  313. }
  314. }
  315. }
  316. switch resp.Result {
  317. case abci.ResponseApplySnapshotChunk_ACCEPT:
  318. case abci.ResponseApplySnapshotChunk_ABORT:
  319. return errAbort
  320. case abci.ResponseApplySnapshotChunk_RETRY:
  321. chunks.Retry(chunk.Index)
  322. case abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT:
  323. return errRetrySnapshot
  324. case abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT:
  325. return errRejectSnapshot
  326. default:
  327. return fmt.Errorf("unknown ResponseApplySnapshotChunk result %v", resp.Result)
  328. }
  329. }
  330. }
  331. // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
  332. // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
  333. func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
  334. for {
  335. index, err := chunks.Allocate()
  336. if err == errDone {
  337. // Keep checking until the context is cancelled (restore is done), in case any
  338. // chunks need to be refetched.
  339. select {
  340. case <-ctx.Done():
  341. return
  342. default:
  343. }
  344. time.Sleep(2 * time.Second)
  345. continue
  346. }
  347. if err != nil {
  348. s.logger.Error("Failed to allocate chunk from queue", "err", err)
  349. return
  350. }
  351. s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
  352. "format", snapshot.Format, "chunk", index, "total", chunks.Size())
  353. ticker := time.NewTicker(chunkRequestTimeout)
  354. defer ticker.Stop()
  355. s.requestChunk(snapshot, index)
  356. select {
  357. case <-chunks.WaitFor(index):
  358. case <-ticker.C:
  359. s.requestChunk(snapshot, index)
  360. case <-ctx.Done():
  361. return
  362. }
  363. ticker.Stop()
  364. }
  365. }
  366. // requestChunk requests a chunk from a peer.
  367. func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
  368. peer := s.snapshots.GetPeer(snapshot)
  369. if peer == nil {
  370. s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height,
  371. "format", snapshot.Format, "hash", snapshot.Hash)
  372. return
  373. }
  374. s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
  375. "format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
  376. peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{
  377. Height: snapshot.Height,
  378. Format: snapshot.Format,
  379. Index: chunk,
  380. }))
  381. }
  382. // verifyApp verifies the sync, checking the app hash and last block height. It returns the
  383. // app version, which should be returned as part of the initial state.
  384. func (s *syncer) verifyApp(snapshot *snapshot) (uint64, error) {
  385. resp, err := s.connQuery.InfoSync(proxy.RequestInfo)
  386. if err != nil {
  387. return 0, fmt.Errorf("failed to query ABCI app for appHash: %w", err)
  388. }
  389. if !bytes.Equal(snapshot.trustedAppHash, resp.LastBlockAppHash) {
  390. s.logger.Error("appHash verification failed",
  391. "expected", fmt.Sprintf("%X", snapshot.trustedAppHash),
  392. "actual", fmt.Sprintf("%X", resp.LastBlockAppHash))
  393. return 0, errVerifyFailed
  394. }
  395. if uint64(resp.LastBlockHeight) != snapshot.Height {
  396. s.logger.Error("ABCI app reported unexpected last block height",
  397. "expected", snapshot.Height, "actual", resp.LastBlockHeight)
  398. return 0, errVerifyFailed
  399. }
  400. s.logger.Info("Verified ABCI app", "height", snapshot.Height,
  401. "appHash", fmt.Sprintf("%X", snapshot.trustedAppHash))
  402. return resp.AppVersion, nil
  403. }