You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

445 lines
15 KiB

  1. package statesync
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "time"
  8. abci "github.com/tendermint/tendermint/abci/types"
  9. "github.com/tendermint/tendermint/libs/log"
  10. tmsync "github.com/tendermint/tendermint/libs/sync"
  11. "github.com/tendermint/tendermint/p2p"
  12. ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
  13. "github.com/tendermint/tendermint/proxy"
  14. sm "github.com/tendermint/tendermint/state"
  15. "github.com/tendermint/tendermint/types"
  16. )
  const (
  	// defaultDiscoveryTime is the time to spend discovering snapshots.
  	defaultDiscoveryTime = 20 * time.Second

  	// chunkFetchers is the number of concurrent chunk fetcher goroutines to run per sync.
  	chunkFetchers = 4

  	// chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
  	chunkTimeout = 2 * time.Minute

  	// chunkRequestTimeout is how long a fetcher waits for a requested chunk before
  	// re-requesting it, possibly from a different peer.
  	chunkRequestTimeout = 10 * time.Second
  )
  // Sentinel errors used by the syncer. SyncAny() matches on them with errors.Is() to decide
  // whether to abort, retry the same snapshot, or reject it and move on to another one.
  var (
  	// errAbort is returned by Sync() when snapshot restoration is aborted.
  	errAbort = errors.New("state sync aborted")

  	// errRetrySnapshot is returned by Sync() when the snapshot should be retried.
  	errRetrySnapshot = errors.New("retry snapshot")

  	// errRejectSnapshot is returned by Sync() when the snapshot is rejected.
  	errRejectSnapshot = errors.New("snapshot was rejected")

  	// errRejectFormat is returned by Sync() when the snapshot format is rejected.
  	errRejectFormat = errors.New("snapshot format was rejected")

  	// errRejectSender is returned by Sync() when the snapshot sender is rejected.
  	errRejectSender = errors.New("snapshot sender was rejected")

  	// errVerifyFailed is returned by Sync() when app hash or last height verification fails.
  	errVerifyFailed = errors.New("verification failed")

  	// errTimeout is returned by Sync() when we've waited too long to receive a chunk.
  	errTimeout = errors.New("timed out waiting for chunk")

  	// errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
  	errNoSnapshots = errors.New("no suitable snapshots found")
  )
  45. // syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
  46. // sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
  47. // snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
  48. type syncer struct {
  49. logger log.Logger
  50. stateProvider StateProvider
  51. conn proxy.AppConnSnapshot
  52. connQuery proxy.AppConnQuery
  53. snapshots *snapshotPool
  54. tempDir string
  55. mtx tmsync.RWMutex
  56. chunks *chunkQueue
  57. }
  58. // newSyncer creates a new syncer.
  59. func newSyncer(logger log.Logger, conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery,
  60. stateProvider StateProvider, tempDir string) *syncer {
  61. return &syncer{
  62. logger: logger,
  63. stateProvider: stateProvider,
  64. conn: conn,
  65. connQuery: connQuery,
  66. snapshots: newSnapshotPool(stateProvider),
  67. tempDir: tempDir,
  68. }
  69. }
  70. // AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
  71. // been added to the queue, or an error if there's no sync in progress.
  72. func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
  73. s.mtx.RLock()
  74. defer s.mtx.RUnlock()
  75. if s.chunks == nil {
  76. return false, errors.New("no state sync in progress")
  77. }
  78. added, err := s.chunks.Add(chunk)
  79. if err != nil {
  80. return false, err
  81. }
  82. if added {
  83. s.logger.Debug("Added chunk to queue", "height", chunk.Height, "format", chunk.Format,
  84. "chunk", chunk.Index)
  85. } else {
  86. s.logger.Debug("Ignoring duplicate chunk in queue", "height", chunk.Height, "format", chunk.Format,
  87. "chunk", chunk.Index)
  88. }
  89. return added, nil
  90. }
  91. // AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
  92. // snapshot was accepted and added.
  93. func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) {
  94. added, err := s.snapshots.Add(peer, snapshot)
  95. if err != nil {
  96. return false, err
  97. }
  98. if added {
  99. s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format,
  100. "hash", fmt.Sprintf("%X", snapshot.Hash))
  101. }
  102. return added, nil
  103. }
  104. // AddPeer adds a peer to the pool. For now we just keep it simple and send a single request
  105. // to discover snapshots, later we may want to do retries and stuff.
  106. func (s *syncer) AddPeer(peer p2p.Peer) {
  107. s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
  108. peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
  109. }
  110. // RemovePeer removes a peer from the pool.
  111. func (s *syncer) RemovePeer(peer p2p.Peer) {
  112. s.logger.Debug("Removing peer from sync", "peer", peer.ID())
  113. s.snapshots.RemovePeer(peer.ID())
  114. }
  115. // SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
  116. // snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
  117. // which the caller must use to bootstrap the node.
  118. func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, error) {
  119. if discoveryTime > 0 {
  120. s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
  121. time.Sleep(discoveryTime)
  122. }
  123. // The app may ask us to retry a snapshot restoration, in which case we need to reuse
  124. // the snapshot and chunk queue from the previous loop iteration.
  125. var (
  126. snapshot *snapshot
  127. chunks *chunkQueue
  128. err error
  129. )
  130. for {
  131. // If not nil, we're going to retry restoration of the same snapshot.
  132. if snapshot == nil {
  133. snapshot = s.snapshots.Best()
  134. chunks = nil
  135. }
  136. if snapshot == nil {
  137. if discoveryTime == 0 {
  138. return sm.State{}, nil, errNoSnapshots
  139. }
  140. s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
  141. time.Sleep(discoveryTime)
  142. continue
  143. }
  144. if chunks == nil {
  145. chunks, err = newChunkQueue(snapshot, s.tempDir)
  146. if err != nil {
  147. return sm.State{}, nil, fmt.Errorf("failed to create chunk queue: %w", err)
  148. }
  149. defer chunks.Close() // in case we forget to close it elsewhere
  150. }
  151. newState, commit, err := s.Sync(snapshot, chunks)
  152. switch {
  153. case err == nil:
  154. return newState, commit, nil
  155. case errors.Is(err, errAbort):
  156. return sm.State{}, nil, err
  157. case errors.Is(err, errRetrySnapshot):
  158. chunks.RetryAll()
  159. s.logger.Info("Retrying snapshot", "height", snapshot.Height, "format", snapshot.Format,
  160. "hash", fmt.Sprintf("%X", snapshot.Hash))
  161. continue
  162. case errors.Is(err, errTimeout):
  163. s.snapshots.Reject(snapshot)
  164. s.logger.Error("Timed out waiting for snapshot chunks, rejected snapshot",
  165. "height", snapshot.Height, "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  166. case errors.Is(err, errRejectSnapshot):
  167. s.snapshots.Reject(snapshot)
  168. s.logger.Info("Snapshot rejected", "height", snapshot.Height, "format", snapshot.Format,
  169. "hash", fmt.Sprintf("%X", snapshot.Hash))
  170. case errors.Is(err, errRejectFormat):
  171. s.snapshots.RejectFormat(snapshot.Format)
  172. s.logger.Info("Snapshot format rejected", "format", snapshot.Format)
  173. case errors.Is(err, errRejectSender):
  174. s.logger.Info("Snapshot senders rejected", "height", snapshot.Height, "format", snapshot.Format,
  175. "hash", fmt.Sprintf("%X", snapshot.Hash))
  176. for _, peer := range s.snapshots.GetPeers(snapshot) {
  177. s.snapshots.RejectPeer(peer.ID())
  178. s.logger.Info("Snapshot sender rejected", "peer", peer.ID())
  179. }
  180. default:
  181. return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
  182. }
  183. // Discard snapshot and chunks for next iteration
  184. err = chunks.Close()
  185. if err != nil {
  186. s.logger.Error("Failed to clean up chunk queue", "err", err)
  187. }
  188. snapshot = nil
  189. chunks = nil
  190. }
  191. }
  192. // Sync executes a sync for a specific snapshot, returning the latest state and block commit which
  193. // the caller must use to bootstrap the node.
  194. func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
  195. s.mtx.Lock()
  196. if s.chunks != nil {
  197. s.mtx.Unlock()
  198. return sm.State{}, nil, errors.New("a state sync is already in progress")
  199. }
  200. s.chunks = chunks
  201. s.mtx.Unlock()
  202. defer func() {
  203. s.mtx.Lock()
  204. s.chunks = nil
  205. s.mtx.Unlock()
  206. }()
  207. // Offer snapshot to ABCI app.
  208. err := s.offerSnapshot(snapshot)
  209. if err != nil {
  210. return sm.State{}, nil, err
  211. }
  212. // Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
  213. ctx, cancel := context.WithCancel(context.Background())
  214. defer cancel()
  215. for i := int32(0); i < chunkFetchers; i++ {
  216. go s.fetchChunks(ctx, snapshot, chunks)
  217. }
  218. pctx, pcancel := context.WithTimeout(context.Background(), 10*time.Second)
  219. defer pcancel()
  220. // Optimistically build new state, so we don't discover any light client failures at the end.
  221. state, err := s.stateProvider.State(pctx, snapshot.Height)
  222. if err != nil {
  223. return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
  224. }
  225. commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
  226. if err != nil {
  227. return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
  228. }
  229. // Restore snapshot
  230. err = s.applyChunks(chunks)
  231. if err != nil {
  232. return sm.State{}, nil, err
  233. }
  234. // Verify app and update app version
  235. appVersion, err := s.verifyApp(snapshot)
  236. if err != nil {
  237. return sm.State{}, nil, err
  238. }
  239. state.Version.Consensus.App = appVersion
  240. // Done! 🎉
  241. s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format,
  242. "hash", fmt.Sprintf("%X", snapshot.Hash))
  243. return state, commit, nil
  244. }
  245. // offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
  246. // response, or nil if the snapshot was accepted.
  247. func (s *syncer) offerSnapshot(snapshot *snapshot) error {
  248. s.logger.Info("Offering snapshot to ABCI app", "height", snapshot.Height,
  249. "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  250. resp, err := s.conn.OfferSnapshotSync(abci.RequestOfferSnapshot{
  251. Snapshot: &abci.Snapshot{
  252. Height: snapshot.Height,
  253. Format: snapshot.Format,
  254. Chunks: snapshot.Chunks,
  255. Hash: snapshot.Hash,
  256. Metadata: snapshot.Metadata,
  257. },
  258. AppHash: snapshot.trustedAppHash,
  259. })
  260. if err != nil {
  261. return fmt.Errorf("failed to offer snapshot: %w", err)
  262. }
  263. switch resp.Result {
  264. case abci.ResponseOfferSnapshot_ACCEPT:
  265. s.logger.Info("Snapshot accepted, restoring", "height", snapshot.Height,
  266. "format", snapshot.Format, "hash", fmt.Sprintf("%X", snapshot.Hash))
  267. return nil
  268. case abci.ResponseOfferSnapshot_ABORT:
  269. return errAbort
  270. case abci.ResponseOfferSnapshot_REJECT:
  271. return errRejectSnapshot
  272. case abci.ResponseOfferSnapshot_REJECT_FORMAT:
  273. return errRejectFormat
  274. case abci.ResponseOfferSnapshot_REJECT_SENDER:
  275. return errRejectSender
  276. default:
  277. return fmt.Errorf("unknown ResponseOfferSnapshot result %v", resp.Result)
  278. }
  279. }
  280. // applyChunks applies chunks to the app. It returns various errors depending on the app's
  281. // response, or nil once the snapshot is fully restored.
  282. func (s *syncer) applyChunks(chunks *chunkQueue) error {
  283. for {
  284. chunk, err := chunks.Next()
  285. if err == errDone {
  286. return nil
  287. } else if err != nil {
  288. return fmt.Errorf("failed to fetch chunk: %w", err)
  289. }
  290. resp, err := s.conn.ApplySnapshotChunkSync(abci.RequestApplySnapshotChunk{
  291. Index: chunk.Index,
  292. Chunk: chunk.Chunk,
  293. Sender: string(chunk.Sender),
  294. })
  295. if err != nil {
  296. return fmt.Errorf("failed to apply chunk %v: %w", chunk.Index, err)
  297. }
  298. s.logger.Info("Applied snapshot chunk to ABCI app", "height", chunk.Height,
  299. "format", chunk.Format, "chunk", chunk.Index, "total", chunks.Size())
  300. // Discard and refetch any chunks as requested by the app
  301. for _, index := range resp.RefetchChunks {
  302. err := chunks.Discard(index)
  303. if err != nil {
  304. return fmt.Errorf("failed to discard chunk %v: %w", index, err)
  305. }
  306. }
  307. // Reject any senders as requested by the app
  308. for _, sender := range resp.RejectSenders {
  309. if sender != "" {
  310. s.snapshots.RejectPeer(p2p.ID(sender))
  311. err := chunks.DiscardSender(p2p.ID(sender))
  312. if err != nil {
  313. return fmt.Errorf("failed to reject sender: %w", err)
  314. }
  315. }
  316. }
  317. switch resp.Result {
  318. case abci.ResponseApplySnapshotChunk_ACCEPT:
  319. case abci.ResponseApplySnapshotChunk_ABORT:
  320. return errAbort
  321. case abci.ResponseApplySnapshotChunk_RETRY:
  322. chunks.Retry(chunk.Index)
  323. case abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT:
  324. return errRetrySnapshot
  325. case abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT:
  326. return errRejectSnapshot
  327. default:
  328. return fmt.Errorf("unknown ResponseApplySnapshotChunk result %v", resp.Result)
  329. }
  330. }
  331. }
  332. // fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
  333. // will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
  334. func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
  335. for {
  336. index, err := chunks.Allocate()
  337. if err == errDone {
  338. // Keep checking until the context is cancelled (restore is done), in case any
  339. // chunks need to be refetched.
  340. select {
  341. case <-ctx.Done():
  342. return
  343. default:
  344. }
  345. time.Sleep(2 * time.Second)
  346. continue
  347. }
  348. if err != nil {
  349. s.logger.Error("Failed to allocate chunk from queue", "err", err)
  350. return
  351. }
  352. s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
  353. "format", snapshot.Format, "chunk", index, "total", chunks.Size())
  354. ticker := time.NewTicker(chunkRequestTimeout)
  355. defer ticker.Stop()
  356. s.requestChunk(snapshot, index)
  357. select {
  358. case <-chunks.WaitFor(index):
  359. case <-ticker.C:
  360. s.requestChunk(snapshot, index)
  361. case <-ctx.Done():
  362. return
  363. }
  364. ticker.Stop()
  365. }
  366. }
  367. // requestChunk requests a chunk from a peer.
  368. func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
  369. peer := s.snapshots.GetPeer(snapshot)
  370. if peer == nil {
  371. s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height,
  372. "format", snapshot.Format, "hash", snapshot.Hash)
  373. return
  374. }
  375. s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
  376. "format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
  377. peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{
  378. Height: snapshot.Height,
  379. Format: snapshot.Format,
  380. Index: chunk,
  381. }))
  382. }
  383. // verifyApp verifies the sync, checking the app hash and last block height. It returns the
  384. // app version, which should be returned as part of the initial state.
  385. func (s *syncer) verifyApp(snapshot *snapshot) (uint64, error) {
  386. resp, err := s.connQuery.InfoSync(proxy.RequestInfo)
  387. if err != nil {
  388. return 0, fmt.Errorf("failed to query ABCI app for appHash: %w", err)
  389. }
  390. if !bytes.Equal(snapshot.trustedAppHash, resp.LastBlockAppHash) {
  391. s.logger.Error("appHash verification failed",
  392. "expected", fmt.Sprintf("%X", snapshot.trustedAppHash),
  393. "actual", fmt.Sprintf("%X", resp.LastBlockAppHash))
  394. return 0, errVerifyFailed
  395. }
  396. if uint64(resp.LastBlockHeight) != snapshot.Height {
  397. s.logger.Error("ABCI app reported unexpected last block height",
  398. "expected", snapshot.Height, "actual", resp.LastBlockHeight)
  399. return 0, errVerifyFailed
  400. }
  401. s.logger.Info("Verified ABCI app", "height", snapshot.Height,
  402. "appHash", fmt.Sprintf("%X", snapshot.trustedAppHash))
  403. return resp.AppVersion, nil
  404. }