package statesync

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"reflect"
	"runtime/debug"
	"sort"
	"time"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/config"
	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/internal/proxy"
	sm "github.com/tendermint/tendermint/internal/state"
	"github.com/tendermint/tendermint/internal/store"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
	"github.com/tendermint/tendermint/light"
	"github.com/tendermint/tendermint/light/provider"
	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
	"github.com/tendermint/tendermint/types"
)

var (
	_ service.Service = (*Reactor)(nil)
	_ p2p.Wrapper     = (*ssproto.Message)(nil)

	// ChannelShims contains a map of ChannelDescriptorShim objects, where each
	// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
	// p2p proto.Message the new p2p Channel is responsible for handling.
	//
	// TODO: Remove once p2p refactor is complete.
	// ref: https://github.com/tendermint/tendermint/issues/5670
	ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
		SnapshotChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(SnapshotChannel),
				Priority:            6,
				SendQueueCapacity:   10,
				RecvMessageCapacity: snapshotMsgSize,
				RecvBufferCapacity:  128,
				MaxSendBytes:        400,
			},
		},
		ChunkChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(ChunkChannel),
				Priority:            3,
				SendQueueCapacity:   4,
				RecvMessageCapacity: chunkMsgSize,
				RecvBufferCapacity:  128,
				MaxSendBytes:        400,
			},
		},
		LightBlockChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(LightBlockChannel),
				Priority:            5,
				SendQueueCapacity:   10,
				RecvMessageCapacity: lightBlockMsgSize,
				RecvBufferCapacity:  128,
				MaxSendBytes:        400,
			},
		},
		ParamsChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(ParamsChannel),
				Priority:            2,
				SendQueueCapacity:   10,
				RecvMessageCapacity: paramMsgSize,
				RecvBufferCapacity:  128,
				MaxSendBytes:        400,
			},
		},
	}
)

const (
	// SnapshotChannel exchanges snapshot metadata
	SnapshotChannel = p2p.ChannelID(0x60)

	// ChunkChannel exchanges chunk contents
	ChunkChannel = p2p.ChannelID(0x61)

	// LightBlockChannel exchanges light blocks
	LightBlockChannel = p2p.ChannelID(0x62)

	// ParamsChannel exchanges consensus params
	ParamsChannel = p2p.ChannelID(0x63)

	// recentSnapshots is the number of recent snapshots to send and receive per peer.
	recentSnapshots = 10

	// snapshotMsgSize is the maximum size of a snapshotResponseMessage
	snapshotMsgSize = int(4e6) // ~4MB

	// chunkMsgSize is the maximum size of a chunkResponseMessage
	chunkMsgSize = int(16e6) // ~16MB

	// lightBlockMsgSize is the maximum size of a lightBlockResponseMessage
	lightBlockMsgSize = int(1e7) // ~10MB

	// paramMsgSize is the maximum size of a paramsResponseMessage
	paramMsgSize = int(1e5) // ~100kB

	// lightBlockResponseTimeout is how long the dispatcher waits for a peer to
	// return a light block
	lightBlockResponseTimeout = 10 * time.Second

	// consensusParamsResponseTimeout is the time the p2p state provider waits
	// before performing a secondary call
	consensusParamsResponseTimeout = 5 * time.Second

	// maxLightBlockRequestRetries is the number of retries acceptable before
	// the backfill process aborts
	maxLightBlockRequestRetries = 20
)

// Metricer defines an interface used for the RPC sync info query; see
// statesync.metrics for details.
type Metricer interface {
	TotalSnapshots() int64
	ChunkProcessAvgTime() time.Duration
	SnapshotHeight() int64
	SnapshotChunksCount() int64
	SnapshotChunksTotal() int64
	BackFilledBlocks() int64
	BackFillBlocksTotal() int64
}
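
// Reactor satisfies the Metricer interface; the corresponding methods are
// defined at the end of this file.
var _ Metricer = (*Reactor)(nil)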

// Reactor handles state sync, both restoring snapshots for the local node and
// serving snapshots for other nodes.
type Reactor struct {
	service.BaseService

	chainID       string
	initialHeight int64
	cfg           config.StateSyncConfig
	stateStore    sm.Store
	blockStore    *store.BlockStore

	conn      proxy.AppConnSnapshot
	connQuery proxy.AppConnQuery
	tempDir   string

	snapshotCh  *p2p.Channel
	chunkCh     *p2p.Channel
	blockCh     *p2p.Channel
	paramsCh    *p2p.Channel
	peerUpdates *p2p.PeerUpdates
	closeCh     chan struct{}

	// Dispatcher is used to multiplex light block requests and responses over multiple
	// peers used by the p2p state provider and in reverse sync.
	dispatcher *Dispatcher
	peers      *peerList

	// These will only be set when a state sync is in progress. They are used to
	// feed received snapshots and chunks into the syncer and to manage incoming
	// and outgoing providers.
	mtx           tmsync.RWMutex
	syncer        *syncer
	providers     map[types.NodeID]*BlockProvider
	stateProvider StateProvider

	metrics            *Metrics
	backfillBlockTotal int64
	backfilledBlocks   int64
}

// NewReactor returns a reference to a new state sync reactor, which implements
// the service.Service interface. It accepts a logger, connections for snapshots
// and querying, references to p2p Channels and a channel to listen for peer
// updates on. Note, the reactor will close all p2p Channels when stopping.
func NewReactor(
	chainID string,
	initialHeight int64,
	cfg config.StateSyncConfig,
	logger log.Logger,
	conn proxy.AppConnSnapshot,
	connQuery proxy.AppConnQuery,
	snapshotCh, chunkCh, blockCh, paramsCh *p2p.Channel,
	peerUpdates *p2p.PeerUpdates,
	stateStore sm.Store,
	blockStore *store.BlockStore,
	tempDir string,
	ssMetrics *Metrics,
) *Reactor {
	r := &Reactor{
		chainID:       chainID,
		initialHeight: initialHeight,
		cfg:           cfg,
		conn:          conn,
		connQuery:     connQuery,
		snapshotCh:    snapshotCh,
		chunkCh:       chunkCh,
		blockCh:       blockCh,
		paramsCh:      paramsCh,
		peerUpdates:   peerUpdates,
		closeCh:       make(chan struct{}),
		tempDir:       tempDir,
		stateStore:    stateStore,
		blockStore:    blockStore,
		peers:         newPeerList(),
		dispatcher:    NewDispatcher(blockCh.Out),
		providers:     make(map[types.NodeID]*BlockProvider),
		metrics:       ssMetrics,
	}

	r.BaseService = *service.NewBaseService(logger, "StateSync", r)

	return r
}
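
// A minimal wiring sketch (a sketch only: the channel, store, and proxy
// values are assumed to be constructed elsewhere by the node setup code, and
// the variable names below are illustrative, not part of this package):
//
//	reactor := NewReactor(
//		genDoc.ChainID, genDoc.InitialHeight, *cfg.StateSync, logger,
//		connSnapshot, connQuery,
//		snapshotCh, chunkCh, blockCh, paramsCh,
//		peerUpdates, stateStore, blockStore, cfg.DBDir(), ssMetrics,
//	)
//	if err := reactor.Start(); err != nil {
//		// handle startup error
//	}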

// OnStart starts separate goroutines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. Note, we do not launch a goroutine
// per envelope, so as to avoid having to bound workers or pools. The caller
// must be sure to execute OnStop to ensure the outbound p2p Channels are
// closed. No error is returned.
func (r *Reactor) OnStart() error {
	go r.processSnapshotCh()
	go r.processChunkCh()
	go r.processBlockCh()
	go r.processParamsCh()
	go r.processPeerUpdates()

	return nil
}

// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
	// tell the dispatcher to stop sending any more requests
	r.dispatcher.Close()

	// wait for any remaining requests to complete
	<-r.dispatcher.Done()

	// Close closeCh to signal to all spawned goroutines to gracefully exit. All
	// p2p Channels should execute Close().
	close(r.closeCh)

	// Wait for all p2p Channels to be closed before returning. This ensures we
	// can easily reason about synchronization of all p2p Channels and ensure no
	// panics will occur.
	<-r.snapshotCh.Done()
	<-r.chunkCh.Done()
	<-r.blockCh.Done()
	<-r.paramsCh.Done()
	<-r.peerUpdates.Done()
}

// Sync runs a state sync, fetching snapshots and providing chunks to the
// application. At the close of the operation, Sync will bootstrap the state
// store and persist the commit at that height so that either consensus or
// blocksync can commence. It will then proceed to backfill the necessary amount
// of historical blocks before participating in consensus.
func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
	// We need at least two peers (for cross-referencing of light blocks) before we can
	// begin state sync
	if err := r.waitForEnoughPeers(ctx, 2); err != nil {
		return sm.State{}, err
	}

	r.mtx.Lock()
	if r.syncer != nil {
		r.mtx.Unlock()
		return sm.State{}, errors.New("a state sync is already in progress")
	}

	if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil {
		r.mtx.Unlock()
		return sm.State{}, err
	}

	r.syncer = newSyncer(
		r.cfg,
		r.Logger,
		r.conn,
		r.connQuery,
		r.stateProvider,
		r.snapshotCh.Out,
		r.chunkCh.Out,
		r.snapshotCh.Done(),
		r.tempDir,
		r.metrics,
	)
	r.mtx.Unlock()

	defer func() {
		r.mtx.Lock()
		// reset syncing objects at the close of Sync
		r.syncer = nil
		r.stateProvider = nil
		r.mtx.Unlock()
	}()

	requestSnapshotsHook := func() {
		// request snapshots from all currently connected peers
		msg := p2p.Envelope{
			Broadcast: true,
			Message:   &ssproto.SnapshotsRequest{},
		}

		select {
		case <-ctx.Done():
		case <-r.closeCh:
		case r.snapshotCh.Out <- msg:
		}
	}

	state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook)
	if err != nil {
		return sm.State{}, err
	}

	err = r.stateStore.Bootstrap(state)
	if err != nil {
		return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err)
	}

	err = r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
	if err != nil {
		return sm.State{}, fmt.Errorf("failed to store last seen commit: %w", err)
	}

	err = r.Backfill(ctx, state)
	if err != nil {
		r.Logger.Error("backfill failed. Proceeding optimistically...", "err", err)
	}

	return state, nil
}

// Backfill sequentially fetches, verifies and stores light blocks in reverse
// order. It does not stop verifying blocks until reaching a block with a height
// and time that is less than or equal to the stopHeight and stopTime. The
// trustedBlockID should be of the header at startHeight.
func (r *Reactor) Backfill(ctx context.Context, state sm.State) error {
	params := state.ConsensusParams.Evidence
	stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks
	stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration)
	// ensure that stop height doesn't go below the initial height
	if stopHeight < state.InitialHeight {
		stopHeight = state.InitialHeight
		// this essentially makes stop time a void criterion for termination
		stopTime = state.LastBlockTime
	}
	return r.backfill(
		ctx,
		state.ChainID,
		state.LastBlockHeight,
		stopHeight,
		state.InitialHeight,
		state.LastBlockID,
		stopTime,
	)
}
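
// As a worked example of the bounds above (heights illustrative, not from a
// real network): with LastBlockHeight = 1000, MaxAgeNumBlocks = 200 and
// InitialHeight = 1, backfill targets stopHeight = 800, i.e. 201 light blocks
// (heights 1000 down to 800, inclusive), and continues below 800 only while
// the verified blocks are still newer than stopTime.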

func (r *Reactor) backfill(
	ctx context.Context,
	chainID string,
	startHeight, stopHeight, initialHeight int64,
	trustedBlockID types.BlockID,
	stopTime time.Time,
) error {
	r.Logger.Info("starting backfill process...", "startHeight", startHeight,
		"stopHeight", stopHeight, "stopTime", stopTime, "trustedBlockID", trustedBlockID)

	r.backfillBlockTotal = startHeight - stopHeight + 1
	r.metrics.BackFillBlocksTotal.Set(float64(r.backfillBlockTotal))

	const sleepTime = 1 * time.Second
	var (
		lastValidatorSet *types.ValidatorSet
		lastChangeHeight = startHeight
	)

	queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, maxLightBlockRequestRetries)

	// Fetch light blocks across cfg.Fetchers workers. The aim of deploying
	// concurrent workers is to equate the network messaging time with the
	// verification time. Ideally we want the verification process to never
	// have to be waiting on blocks. If it takes 4s to retrieve a block and 1s
	// to verify it, then steady state involves four workers.
	for i := 0; i < int(r.cfg.Fetchers); i++ {
		ctxWithCancel, cancel := context.WithCancel(ctx)
		defer cancel()
		go func() {
			for {
				select {
				case height := <-queue.nextHeight():
					// pop the next peer off the list to send a request to
					peer := r.peers.Pop(ctx)
					r.Logger.Debug("fetching next block", "height", height, "peer", peer)
					subCtx, cancel := context.WithTimeout(ctxWithCancel, lightBlockResponseTimeout)
					defer cancel()
					lb, err := func() (*types.LightBlock, error) {
						defer cancel()
						// request the light block with a timeout
						return r.dispatcher.LightBlock(subCtx, height, peer)
					}()
					// once the peer has returned a value, add it back to the peer list to be used again
					r.peers.Append(peer)
					if errors.Is(err, context.Canceled) {
						return
					}
					if err != nil {
						queue.retry(height)
						if errors.Is(err, errNoConnectedPeers) {
							r.Logger.Info("backfill: no connected peers to fetch light blocks from; sleeping...",
								"sleepTime", sleepTime)
							time.Sleep(sleepTime)
						} else {
							// we don't punish the peer as it might just have not responded in time
							r.Logger.Info("backfill: error with fetching light block",
								"height", height, "err", err)
						}
						continue
					}
					if lb == nil {
						r.Logger.Info("backfill: peer didn't have block, fetching from another peer", "height", height)
						queue.retry(height)
						// As we are fetching blocks backwards, if this peer doesn't have the block
						// it likely doesn't have any prior ones, thus we remove it from the peer list.
						r.peers.Remove(peer)
						continue
					}

					// run a ValidateBasic. This checks that the validator set
					// and commit hashes line up
					err = lb.ValidateBasic(chainID)
					if err != nil || lb.Height != height {
						r.Logger.Info("backfill: fetched light block failed validate basic, removing peer...",
							"err", err, "height", height)
						queue.retry(height)
						r.blockCh.Error <- p2p.PeerError{
							NodeID: peer,
							Err:    fmt.Errorf("received invalid light block: %w", err),
						}
						continue
					}

					// add block to queue to be verified
					queue.add(lightBlockResponse{
						block: lb,
						peer:  peer,
					})
					r.Logger.Debug("backfill: added light block to processing queue", "height", height)

				case <-queue.done():
					return
				}
			}
		}()
	}

	// verify all light blocks
	for {
		select {
		case <-r.closeCh:
			queue.close()
			return nil

		case <-ctx.Done():
			queue.close()
			return nil

		case resp := <-queue.verifyNext():
			// validate the header hash. We take the last block id of the
			// previous header (i.e. one height above) as the trusted hash which
			// we equate to. ValidatorsHash and CommitHash have already been
			// checked in the `ValidateBasic`
			if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) {
				r.Logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID",
					"trustedHash", w, "receivedHash", g, "height", resp.block.Height)
				r.blockCh.Error <- p2p.PeerError{
					NodeID: resp.peer,
					Err:    fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g),
				}
				queue.retry(resp.block.Height)
				continue
			}

			// save the signed headers
			err := r.blockStore.SaveSignedHeader(resp.block.SignedHeader, trustedBlockID)
			if err != nil {
				return err
			}

			// check if there has been a change in the validator set
			if lastValidatorSet != nil && !bytes.Equal(resp.block.Header.ValidatorsHash, resp.block.Header.NextValidatorsHash) {
				// save all the heights that the last validator set was the same
				err = r.stateStore.SaveValidatorSets(resp.block.Height+1, lastChangeHeight, lastValidatorSet)
				if err != nil {
					return err
				}

				// update the lastChangeHeight
				lastChangeHeight = resp.block.Height
			}

			trustedBlockID = resp.block.LastBlockID
			queue.success(resp.block.Height)
			r.Logger.Info("backfill: verified and stored light block", "height", resp.block.Height)

			lastValidatorSet = resp.block.ValidatorSet

			r.backfilledBlocks++
			r.metrics.BackFilledBlocks.Add(1)

			// The block height might be less than the stopHeight because the
			// stopTime condition hasn't been fulfilled.
			if resp.block.Height < stopHeight {
				r.backfillBlockTotal++
				r.metrics.BackFillBlocksTotal.Set(float64(r.backfillBlockTotal))
			}

		case <-queue.done():
			if err := queue.error(); err != nil {
				return err
			}

			// save the final batch of validators
			if err := r.stateStore.SaveValidatorSets(queue.terminal.Height, lastChangeHeight, lastValidatorSet); err != nil {
				return err
			}

			r.Logger.Info("successfully completed backfill process", "endHeight", queue.terminal.Height)
			return nil
		}
	}
}

// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
	logger := r.Logger.With("peer", envelope.From)

	switch msg := envelope.Message.(type) {
	case *ssproto.SnapshotsRequest:
		snapshots, err := r.recentSnapshots(recentSnapshots)
		if err != nil {
			logger.Error("failed to fetch snapshots", "err", err)
			return nil
		}

		for _, snapshot := range snapshots {
			logger.Info(
				"advertising snapshot",
				"height", snapshot.Height,
				"format", snapshot.Format,
				"peer", envelope.From,
			)
			r.snapshotCh.Out <- p2p.Envelope{
				To: envelope.From,
				Message: &ssproto.SnapshotsResponse{
					Height:   snapshot.Height,
					Format:   snapshot.Format,
					Chunks:   snapshot.Chunks,
					Hash:     snapshot.Hash,
					Metadata: snapshot.Metadata,
				},
			}
		}

	case *ssproto.SnapshotsResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()

		if r.syncer == nil {
			logger.Debug("received unexpected snapshot; no state sync in progress")
			return nil
		}

		logger.Info("received snapshot", "height", msg.Height, "format", msg.Format)
		_, err := r.syncer.AddSnapshot(envelope.From, &snapshot{
			Height:   msg.Height,
			Format:   msg.Format,
			Chunks:   msg.Chunks,
			Hash:     msg.Hash,
			Metadata: msg.Metadata,
		})
		if err != nil {
			logger.Error(
				"failed to add snapshot",
				"height", msg.Height,
				"format", msg.Format,
				"err", err,
				"channel", r.snapshotCh.ID,
			)
			return nil
		}
		logger.Info("added snapshot", "height", msg.Height, "format", msg.Format)

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}

// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage.
func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error {
	switch msg := envelope.Message.(type) {
	case *ssproto.ChunkRequest:
		r.Logger.Debug(
			"received chunk request",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		resp, err := r.conn.LoadSnapshotChunkSync(context.Background(), abci.RequestLoadSnapshotChunk{
			Height: msg.Height,
			Format: msg.Format,
			Chunk:  msg.Index,
		})
		if err != nil {
			r.Logger.Error(
				"failed to load chunk",
				"height", msg.Height,
				"format", msg.Format,
				"chunk", msg.Index,
				"err", err,
				"peer", envelope.From,
			)
			return nil
		}

		r.Logger.Debug(
			"sending chunk",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		r.chunkCh.Out <- p2p.Envelope{
			To: envelope.From,
			Message: &ssproto.ChunkResponse{
				Height:  msg.Height,
				Format:  msg.Format,
				Index:   msg.Index,
				Chunk:   resp.Chunk,
				Missing: resp.Chunk == nil,
			},
		}

	case *ssproto.ChunkResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()

		if r.syncer == nil {
			r.Logger.Debug("received unexpected chunk; no state sync in progress", "peer", envelope.From)
			return nil
		}

		r.Logger.Debug(
			"received chunk; adding to sync",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		_, err := r.syncer.AddChunk(&chunk{
			Height: msg.Height,
			Format: msg.Format,
			Index:  msg.Index,
			Chunk:  msg.Chunk,
			Sender: envelope.From,
		})
		if err != nil {
			r.Logger.Error(
				"failed to add chunk",
				"height", msg.Height,
				"format", msg.Format,
				"chunk", msg.Index,
				"err", err,
				"peer", envelope.From,
			)
			return nil
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}
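
// handleLightBlockMessage handles envelopes sent from peers on the
// LightBlockChannel. It returns an error only if the Envelope.Message is
// unknown for this channel. This should never be called outside of
// handleMessage.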
func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
	switch msg := envelope.Message.(type) {
	case *ssproto.LightBlockRequest:
		r.Logger.Info("received light block request", "height", msg.Height)
		lb, err := r.fetchLightBlock(msg.Height)
		if err != nil {
			r.Logger.Error("failed to retrieve light block", "err", err, "height", msg.Height)
			return err
		}
		if lb == nil {
			// NOTE: If we don't have the light block we send a nil light block
			// back to the requesting node, indicating that we don't have it.
			r.blockCh.Out <- p2p.Envelope{
				To: envelope.From,
				Message: &ssproto.LightBlockResponse{
					LightBlock: nil,
				},
			}
			return nil
		}

		lbproto, err := lb.ToProto()
		if err != nil {
			r.Logger.Error("marshaling light block to proto", "err", err)
			return nil
		}

		r.blockCh.Out <- p2p.Envelope{
			To: envelope.From,
			Message: &ssproto.LightBlockResponse{
				LightBlock: lbproto,
			},
		}

	case *ssproto.LightBlockResponse:
		var height int64
		if msg.LightBlock != nil {
			height = msg.LightBlock.SignedHeader.Header.Height
		}
		r.Logger.Info("received light block response", "peer", envelope.From, "height", height)
		if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
			r.Logger.Error("error processing light block response", "err", err, "height", height)
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}
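
// handleParamsMessage handles envelopes sent from peers on the ParamsChannel.
// It returns an error only if the Envelope.Message is unknown for this
// channel. This should never be called outside of handleMessage.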
func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error {
	switch msg := envelope.Message.(type) {
	case *ssproto.ParamsRequest:
		r.Logger.Debug("received consensus params request", "height", msg.Height)
		cp, err := r.stateStore.LoadConsensusParams(int64(msg.Height))
		if err != nil {
			r.Logger.Error("failed to fetch requested consensus params", "err", err, "height", msg.Height)
			return nil
		}

		cpproto := cp.ToProto()
		r.paramsCh.Out <- p2p.Envelope{
			To: envelope.From,
			Message: &ssproto.ParamsResponse{
				Height:          msg.Height,
				ConsensusParams: cpproto,
			},
		}

	case *ssproto.ParamsResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()
		r.Logger.Debug("received consensus params response", "height", msg.Height)

		cp := types.ConsensusParamsFromProto(msg.ConsensusParams)

		if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
			select {
			case sp.paramsRecvCh <- cp:
			default:
			}
		} else {
			r.Logger.Debug("received unexpected params response; using RPC state provider", "peer", envelope.From)
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}

// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("panic in processing message: %v", e)
			r.Logger.Error(
				"recovering from processing message panic",
				"err", err,
				"stack", string(debug.Stack()),
			)
		}
	}()

	r.Logger.Debug("received message", "message", reflect.TypeOf(envelope.Message), "peer", envelope.From)

	switch chID {
	case SnapshotChannel:
		err = r.handleSnapshotMessage(envelope)

	case ChunkChannel:
		err = r.handleChunkMessage(envelope)

	case LightBlockChannel:
		err = r.handleLightBlockMessage(envelope)

	case ParamsChannel:
		err = r.handleParamsMessage(envelope)

	default:
		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
	}

	return err
}

// processSnapshotCh initiates a blocking process where we listen for and handle
// envelopes on the SnapshotChannel.
func (r *Reactor) processSnapshotCh() {
	r.processCh(r.snapshotCh, "snapshot")
}

// processChunkCh initiates a blocking process where we listen for and handle
// envelopes on the ChunkChannel.
func (r *Reactor) processChunkCh() {
	r.processCh(r.chunkCh, "chunk")
}

// processBlockCh initiates a blocking process where we listen for and handle
// envelopes on the LightBlockChannel.
func (r *Reactor) processBlockCh() {
	r.processCh(r.blockCh, "light block")
}
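
// processParamsCh initiates a blocking process where we listen for and handle
// envelopes on the ParamsChannel.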
func (r *Reactor) processParamsCh() {
	r.processCh(r.paramsCh, "consensus params")
}

// processCh routes state sync messages to their respective handlers. Any error
// encountered during message execution will result in a PeerError being sent on
// the respective channel. When the reactor is stopped, we will catch the signal
// and close the p2p Channel gracefully.
func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
	defer ch.Close()

	for {
		select {
		case envelope := <-ch.In:
			if err := r.handleMessage(ch.ID, envelope); err != nil {
				r.Logger.Error(fmt.Sprintf("failed to process %s message", chName),
					"ch_id", ch.ID, "envelope", envelope, "err", err)
				ch.Error <- p2p.PeerError{
					NodeID: envelope.From,
					Err:    err,
				}
			}

		case <-r.closeCh:
			r.Logger.Debug(fmt.Sprintf("stopped listening on %s channel; closing...", chName))
			return
		}
	}
}

// processPeerUpdate processes a PeerUpdate, adding or removing the peer from
// the peer list and, when a state sync is in progress, from the syncer and the
// set of block providers.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
	r.Logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

	switch peerUpdate.Status {
	case p2p.PeerStatusUp:
		r.peers.Append(peerUpdate.NodeID)
	case p2p.PeerStatusDown:
		r.peers.Remove(peerUpdate.NodeID)
	}

	r.mtx.Lock()
	if r.syncer == nil {
		r.mtx.Unlock()
		return
	}
	defer r.mtx.Unlock()

	switch peerUpdate.Status {
	case p2p.PeerStatusUp:
		newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
		r.providers[peerUpdate.NodeID] = newProvider
		r.syncer.AddPeer(peerUpdate.NodeID)
		if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
			// we do this in a separate routine to not block whilst waiting for the light client to finish
			// whatever call it's currently executing
			go sp.addProvider(newProvider)
		}

	case p2p.PeerStatusDown:
		delete(r.providers, peerUpdate.NodeID)
		r.syncer.RemovePeer(peerUpdate.NodeID)
	}

	r.Logger.Info("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
}

// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
	defer r.peerUpdates.Close()

	for {
		select {
		case peerUpdate := <-r.peerUpdates.Updates():
			r.processPeerUpdate(peerUpdate)

		case <-r.closeCh:
			r.Logger.Debug("stopped listening on peer updates channel; closing...")
			return
		}
	}
}

// recentSnapshots fetches the n most recent snapshots from the app
func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
	resp, err := r.conn.ListSnapshotsSync(context.Background(), abci.RequestListSnapshots{})
	if err != nil {
		return nil, err
	}

	sort.Slice(resp.Snapshots, func(i, j int) bool {
		a := resp.Snapshots[i]
		b := resp.Snapshots[j]

		switch {
		case a.Height > b.Height:
			return true
		case a.Height == b.Height && a.Format > b.Format:
			return true
		default:
			return false
		}
	})

	snapshots := make([]*snapshot, 0, n)
	for i, s := range resp.Snapshots {
		if uint32(i) >= n {
			break
		}

		snapshots = append(snapshots, &snapshot{
			Height:   s.Height,
			Format:   s.Format,
			Chunks:   s.Chunks,
			Hash:     s.Hash,
			Metadata: s.Metadata,
		})
	}

	return snapshots, nil
}

// fetchLightBlock works out whether the node has a light block at a particular
// height and, if so, returns it so it can be gossiped to peers.
func (r *Reactor) fetchLightBlock(height uint64) (*types.LightBlock, error) {
	h := int64(height)

	blockMeta := r.blockStore.LoadBlockMeta(h)
	if blockMeta == nil {
		return nil, nil
	}

	commit := r.blockStore.LoadBlockCommit(h)
	if commit == nil {
		return nil, nil
	}

	vals, err := r.stateStore.LoadValidators(h)
	if err != nil {
		return nil, err
	}
	if vals == nil {
		return nil, nil
	}

	return &types.LightBlock{
		SignedHeader: &types.SignedHeader{
			Header: &blockMeta.Header,
			Commit: commit,
		},
		ValidatorSet: vals,
	}, nil
}
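
// waitForEnoughPeers blocks until at least numPeers peers are connected, the
// context is canceled, or the reactor shuts down, polling the peer list every
// 100ms and logging progress once a minute.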
func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) error {
	startAt := time.Now()
	t := time.NewTicker(100 * time.Millisecond)
	defer t.Stop()
	logT := time.NewTicker(time.Minute)
	defer logT.Stop()
	var iter int
	for r.peers.Len() < numPeers {
		iter++
		select {
		case <-ctx.Done():
			return fmt.Errorf("operation canceled while waiting for peers after %s", time.Since(startAt))
		case <-r.closeCh:
			return fmt.Errorf("shutdown while waiting for peers after %s", time.Since(startAt))
		case <-t.C:
			continue
		case <-logT.C:
			r.Logger.Info("waiting for sufficient peers to start statesync",
				"duration", time.Since(startAt).String(),
				"target", numPeers,
				"peers", r.peers.Len(),
				"iters", iter,
			)
			continue
		}
	}
	return nil
}
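
// initStateProvider constructs the state provider used to verify snapshots:
// a p2p-backed provider when cfg.UseP2P is set, otherwise an RPC-backed one.
// Both require light client trust options (period, height, hash). Called by
// Sync with r.mtx held, since it sets r.stateProvider.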
func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error {
	var err error
	to := light.TrustOptions{
		Period: r.cfg.TrustPeriod,
		Height: r.cfg.TrustHeight,
		Hash:   r.cfg.TrustHashBytes(),
	}
	spLogger := r.Logger.With("module", "stateprovider")
	spLogger.Info("initializing state provider", "trustPeriod", to.Period,
		"trustHeight", to.Height, "useP2P", r.cfg.UseP2P)

	if r.cfg.UseP2P {
		if err := r.waitForEnoughPeers(ctx, 2); err != nil {
			return err
		}

		peers := r.peers.All()
		providers := make([]provider.Provider, len(peers))
		for idx, p := range peers {
			providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
		}

		r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh.Out, spLogger)
		if err != nil {
			return fmt.Errorf("failed to initialize P2P state provider: %w", err)
		}
	} else {
		r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
		if err != nil {
			return fmt.Errorf("failed to initialize RPC state provider: %w", err)
		}
	}
	return nil
}
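
// The methods below implement the Metricer interface, exposing snapshot and
// backfill progress for the RPC sync info query. Each takes the read lock,
// since the syncer only exists while a state sync is in progress.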

func (r *Reactor) TotalSnapshots() int64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.syncer != nil && r.syncer.snapshots != nil {
		return int64(len(r.syncer.snapshots.snapshots))
	}
	return 0
}

func (r *Reactor) ChunkProcessAvgTime() time.Duration {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.syncer != nil {
		return time.Duration(r.syncer.avgChunkTime)
	}
	return time.Duration(0)
}

func (r *Reactor) SnapshotHeight() int64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.syncer != nil {
		return r.syncer.lastSyncedSnapshotHeight
	}
	return 0
}

func (r *Reactor) SnapshotChunksCount() int64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.syncer != nil && r.syncer.chunks != nil {
		return int64(r.syncer.chunks.numChunksReturned())
	}
	return 0
}

func (r *Reactor) SnapshotChunksTotal() int64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.syncer != nil && r.syncer.processingSnapshot != nil {
		return int64(r.syncer.processingSnapshot.Chunks)
	}
	return 0
}

func (r *Reactor) BackFilledBlocks() int64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	return r.backfilledBlocks
}

func (r *Reactor) BackFillBlocksTotal() int64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	return r.backfillBlockTotal
}