package statesync

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"reflect"
	"runtime/debug"
	"sort"
	"time"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/config"
	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
	"github.com/tendermint/tendermint/light"
	"github.com/tendermint/tendermint/light/provider"
	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/store"
	"github.com/tendermint/tendermint/types"
)
var (
	_ service.Service = (*Reactor)(nil)
	_ p2p.Wrapper     = (*ssproto.Message)(nil)

	// ChannelShims contains a map of ChannelDescriptorShim objects, where each
	// object wraps a reference to a legacy p2p ChannelDescriptor and the
	// corresponding p2p proto.Message the new p2p Channel is responsible for
	// handling.
	//
	// TODO: Remove once p2p refactor is complete.
	// ref: https://github.com/tendermint/tendermint/issues/5670
	ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
		SnapshotChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(SnapshotChannel),
				Priority:            6,
				SendQueueCapacity:   10,
				RecvMessageCapacity: snapshotMsgSize,
				RecvBufferCapacity:  128,
				MaxSendBytes:        400,
			},
		},
		ChunkChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(ChunkChannel),
				Priority:            3,
				SendQueueCapacity:   4,
				RecvMessageCapacity: chunkMsgSize,
				RecvBufferCapacity:  128,
				MaxSendBytes:        400,
			},
		},
		LightBlockChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(LightBlockChannel),
				Priority:            5,
				SendQueueCapacity:   10,
				RecvMessageCapacity: lightBlockMsgSize,
				RecvBufferCapacity:  128,
				MaxSendBytes:        400,
			},
		},
		ParamsChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID:                  byte(ParamsChannel),
				Priority:            2,
				SendQueueCapacity:   10,
				RecvMessageCapacity: paramMsgSize,
				RecvBufferCapacity:  128,
				MaxSendBytes:        400,
			},
		},
	}
)
const (
	// SnapshotChannel exchanges snapshot metadata
	SnapshotChannel = p2p.ChannelID(0x60)

	// ChunkChannel exchanges chunk contents
	ChunkChannel = p2p.ChannelID(0x61)

	// LightBlockChannel exchanges light blocks
	LightBlockChannel = p2p.ChannelID(0x62)

	// ParamsChannel exchanges consensus params
	ParamsChannel = p2p.ChannelID(0x63)

	// recentSnapshots is the number of recent snapshots to send and receive per peer.
	recentSnapshots = 10

	// snapshotMsgSize is the maximum size of a snapshotResponseMessage
	snapshotMsgSize = int(4e6) // ~4MB

	// chunkMsgSize is the maximum size of a chunkResponseMessage
	chunkMsgSize = int(16e6) // ~16MB

	// lightBlockMsgSize is the maximum size of a lightBlockResponseMessage
	lightBlockMsgSize = int(1e7) // ~10MB

	// paramMsgSize is the maximum size of a paramsResponseMessage
	paramMsgSize = int(1e5) // ~100kB

	// lightBlockResponseTimeout is how long the dispatcher waits for a peer to
	// return a light block
	lightBlockResponseTimeout = 10 * time.Second

	// consensusParamsResponseTimeout is the time the p2p state provider waits
	// before performing a secondary call
	consensusParamsResponseTimeout = 5 * time.Second

	// maxLightBlockRequestRetries is the number of retries acceptable before
	// the backfill process aborts
	maxLightBlockRequestRetries = 20
)
// Reactor handles state sync, both restoring snapshots for the local node and
// serving snapshots for other nodes.
type Reactor struct {
	service.BaseService

	chainID       string
	initialHeight int64
	cfg           config.StateSyncConfig
	stateStore    sm.Store
	blockStore    *store.BlockStore

	conn        proxy.AppConnSnapshot
	connQuery   proxy.AppConnQuery
	tempDir     string
	snapshotCh  *p2p.Channel
	chunkCh     *p2p.Channel
	blockCh     *p2p.Channel
	paramsCh    *p2p.Channel
	peerUpdates *p2p.PeerUpdates
	closeCh     chan struct{}

	// dispatcher is used to multiplex light block requests and responses over
	// multiple peers; it is used by the p2p state provider and in reverse sync.
	dispatcher *Dispatcher
	peers      *peerList

	// These fields are only set while a state sync is in progress. They are
	// used to feed received snapshots and chunks into the syncer and to manage
	// incoming and outgoing providers.
	mtx           tmsync.RWMutex
	syncer        *syncer
	providers     map[types.NodeID]*BlockProvider
	stateProvider StateProvider
}
// NewReactor returns a reference to a new state sync reactor, which implements
// the service.Service interface. It accepts a logger, connections for
// snapshots and querying, references to p2p Channels and a channel to listen
// for peer updates on. Note, the reactor will close all p2p Channels when
// stopping.
func NewReactor(
	chainID string,
	initialHeight int64,
	cfg config.StateSyncConfig,
	logger log.Logger,
	conn proxy.AppConnSnapshot,
	connQuery proxy.AppConnQuery,
	snapshotCh, chunkCh, blockCh, paramsCh *p2p.Channel,
	peerUpdates *p2p.PeerUpdates,
	stateStore sm.Store,
	blockStore *store.BlockStore,
	tempDir string,
) *Reactor {
	r := &Reactor{
		chainID:       chainID,
		initialHeight: initialHeight,
		cfg:           cfg,
		conn:          conn,
		connQuery:     connQuery,
		snapshotCh:    snapshotCh,
		chunkCh:       chunkCh,
		blockCh:       blockCh,
		paramsCh:      paramsCh,
		peerUpdates:   peerUpdates,
		closeCh:       make(chan struct{}),
		tempDir:       tempDir,
		stateStore:    stateStore,
		blockStore:    blockStore,
		peers:         newPeerList(),
		dispatcher:    NewDispatcher(blockCh.Out),
		providers:     make(map[types.NodeID]*BlockProvider),
	}

	r.BaseService = *service.NewBaseService(logger, "StateSync", r)
	return r
}
// OnStart starts separate goroutines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. Note, we do not launch a goroutine
// per envelope, so as to avoid having to deal with bounding workers or pools.
// The caller must be sure to execute OnStop to ensure the outbound p2p
// Channels are closed. No error is returned.
func (r *Reactor) OnStart() error {
	go r.processSnapshotCh()
	go r.processChunkCh()
	go r.processBlockCh()
	go r.processParamsCh()
	go r.processPeerUpdates()

	return nil
}
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
	// tell the dispatcher to stop sending any more requests
	r.dispatcher.Close()
	// wait for any remaining requests to complete
	<-r.dispatcher.Done()

	// Close closeCh to signal to all spawned goroutines to gracefully exit. All
	// p2p Channels should execute Close().
	close(r.closeCh)

	// Wait for all p2p Channels to be closed before returning. This ensures we
	// can easily reason about synchronization of all p2p Channels and ensure no
	// panics will occur.
	<-r.snapshotCh.Done()
	<-r.chunkCh.Done()
	<-r.blockCh.Done()
	<-r.paramsCh.Done()
	<-r.peerUpdates.Done()
}
// Sync runs a state sync, fetching snapshots and providing chunks to the
// application. At the close of the operation, Sync will bootstrap the state
// store and persist the commit at that height so that either consensus or
// blocksync can commence. It will then proceed to backfill the necessary
// amount of historical blocks before participating in consensus.
func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
	// We need at least two peers (for cross-referencing of light blocks) before
	// we can begin state sync.
	r.waitForEnoughPeers(ctx, 2)

	r.mtx.Lock()
	if r.syncer != nil {
		r.mtx.Unlock()
		return sm.State{}, errors.New("a state sync is already in progress")
	}

	if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil {
		r.mtx.Unlock()
		return sm.State{}, err
	}

	r.syncer = newSyncer(
		r.cfg,
		r.Logger,
		r.conn,
		r.connQuery,
		r.stateProvider,
		r.snapshotCh.Out,
		r.chunkCh.Out,
		r.tempDir,
	)
	r.mtx.Unlock()

	defer func() {
		r.mtx.Lock()
		// reset syncing objects at the close of Sync
		r.syncer = nil
		r.stateProvider = nil
		r.mtx.Unlock()
	}()

	requestSnapshotsHook := func() {
		// request snapshots from all currently connected peers
		r.snapshotCh.Out <- p2p.Envelope{
			Broadcast: true,
			Message:   &ssproto.SnapshotsRequest{},
		}
	}

	state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook)
	if err != nil {
		return sm.State{}, err
	}

	err = r.stateStore.Bootstrap(state)
	if err != nil {
		return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err)
	}

	err = r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
	if err != nil {
		return sm.State{}, fmt.Errorf("failed to store last seen commit: %w", err)
	}

	err = r.Backfill(ctx, state)
	if err != nil {
		r.Logger.Error("backfill failed. Proceeding optimistically...", "err", err)
	}

	return state, nil
}
// Backfill sequentially fetches, verifies and stores light blocks in reverse
// order. It does not stop verifying blocks until it reaches a block with a
// height and time that is less than or equal to the stopHeight and stopTime.
// The trusted block ID is taken from state.LastBlockID, i.e. the block ID of
// the header at startHeight.
func (r *Reactor) Backfill(ctx context.Context, state sm.State) error {
	params := state.ConsensusParams.Evidence
	stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks
	stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration)
	// ensure that stop height doesn't go below the initial height
	if stopHeight < state.InitialHeight {
		stopHeight = state.InitialHeight
		// this essentially makes stop time a void criterion for termination
		stopTime = state.LastBlockTime
	}
	return r.backfill(
		ctx,
		state.ChainID,
		state.LastBlockHeight,
		stopHeight,
		state.InitialHeight,
		state.LastBlockID,
		stopTime,
	)
}
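// backfill drives the actual backfill process: it fetches light blocks in
// reverse from startHeight down to stopHeight (or until a block older than
// stopTime is reached), verifies each against the trusted hash of the block
// above it, and persists the signed headers and validator sets along the way.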
func (r *Reactor) backfill(
	ctx context.Context,
	chainID string,
	startHeight, stopHeight, initialHeight int64,
	trustedBlockID types.BlockID,
	stopTime time.Time,
) error {
	r.Logger.Info("starting backfill process...", "startHeight", startHeight,
		"stopHeight", stopHeight, "stopTime", stopTime, "trustedBlockID", trustedBlockID)

	const sleepTime = 1 * time.Second
	var (
		lastValidatorSet *types.ValidatorSet
		lastChangeHeight = startHeight
	)

	queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, maxLightBlockRequestRetries)

	// Fetch light blocks across r.cfg.Fetchers concurrent workers. The aim of
	// deploying concurrent workers is to equate the network messaging time with
	// the verification time. Ideally we want the verification process to never
	// have to wait on blocks: if it takes 4s to retrieve a block and 1s to
	// verify it, then the steady state involves four workers.
	for i := 0; i < int(r.cfg.Fetchers); i++ {
		ctxWithCancel, cancel := context.WithCancel(ctx)
		defer cancel()
		go func() {
			for {
				select {
				case height := <-queue.nextHeight():
					// pop the next peer off the list to send a request to
					peer := r.peers.Pop(ctx)
					r.Logger.Debug("fetching next block", "height", height, "peer", peer)

					subCtx, cancel := context.WithTimeout(ctxWithCancel, lightBlockResponseTimeout)
					defer cancel()
					lb, err := func() (*types.LightBlock, error) {
						defer cancel()
						// request the light block with a timeout
						return r.dispatcher.LightBlock(subCtx, height, peer)
					}()
					// once the peer has returned a value, add it back to the
					// peer list to be used again
					r.peers.Append(peer)
					if errors.Is(err, context.Canceled) {
						return
					}
					if err != nil {
						queue.retry(height)
						if errors.Is(err, errNoConnectedPeers) {
							r.Logger.Info("backfill: no connected peers to fetch light blocks from; sleeping...",
								"sleepTime", sleepTime)
							time.Sleep(sleepTime)
						} else {
							// we don't punish the peer, as it might simply not
							// have responded in time
							r.Logger.Info("backfill: error with fetching light block",
								"height", height, "err", err)
						}
						continue
					}
					if lb == nil {
						r.Logger.Info("backfill: peer didn't have block, fetching from another peer", "height", height)
						queue.retry(height)
						// As we are fetching blocks backwards, if this node
						// doesn't have the block it likely doesn't have any
						// prior ones either, so we remove it from the peer list.
						r.peers.Remove(peer)
						continue
					}

					// run a validate basic. This checks that the validator set
					// and commit hashes line up
					err = lb.ValidateBasic(chainID)
					if err != nil || lb.Height != height {
						r.Logger.Info("backfill: fetched light block failed validate basic, removing peer...",
							"err", err, "height", height)
						queue.retry(height)
						r.blockCh.Error <- p2p.PeerError{
							NodeID: peer,
							Err:    fmt.Errorf("received invalid light block: %w", err),
						}
						continue
					}

					// add block to queue to be verified
					queue.add(lightBlockResponse{
						block: lb,
						peer:  peer,
					})
					r.Logger.Debug("backfill: added light block to processing queue", "height", height)

				case <-queue.done():
					return
				}
			}
		}()
	}

	// verify all light blocks
	for {
		select {
		case <-r.closeCh:
			queue.close()
			return nil

		case <-ctx.Done():
			queue.close()
			return nil

		case resp := <-queue.verifyNext():
			// Validate the header hash. We take the last block ID of the
			// previous header (i.e. one height above) as the trusted hash to
			// which we compare. ValidatorsHash and CommitHash have already been
			// checked in ValidateBasic.
			if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) {
				r.Logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID",
					"trustedHash", w, "receivedHash", g, "height", resp.block.Height)
				r.blockCh.Error <- p2p.PeerError{
					NodeID: resp.peer,
					Err:    fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g),
				}
				queue.retry(resp.block.Height)
				continue
			}

			// save the signed headers
			err := r.blockStore.SaveSignedHeader(resp.block.SignedHeader, trustedBlockID)
			if err != nil {
				return err
			}

			// check if there has been a change in the validator set
			if lastValidatorSet != nil && !bytes.Equal(resp.block.Header.ValidatorsHash, resp.block.Header.NextValidatorsHash) {
				// save all the heights for which the last validator set was the same
				err = r.stateStore.SaveValidatorSets(resp.block.Height+1, lastChangeHeight, lastValidatorSet)
				if err != nil {
					return err
				}

				// update the lastChangeHeight
				lastChangeHeight = resp.block.Height
			}

			trustedBlockID = resp.block.LastBlockID
			queue.success(resp.block.Height)
			r.Logger.Info("backfill: verified and stored light block", "height", resp.block.Height)

			lastValidatorSet = resp.block.ValidatorSet

		case <-queue.done():
			if err := queue.error(); err != nil {
				return err
			}

			// save the final batch of validators
			if err := r.stateStore.SaveValidatorSets(queue.terminal.Height, lastChangeHeight, lastValidatorSet); err != nil {
				return err
			}

			r.Logger.Info("successfully completed backfill process", "endHeight", queue.terminal.Height)
			return nil
		}
	}
}
// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
	logger := r.Logger.With("peer", envelope.From)

	switch msg := envelope.Message.(type) {
	case *ssproto.SnapshotsRequest:
		snapshots, err := r.recentSnapshots(recentSnapshots)
		if err != nil {
			logger.Error("failed to fetch snapshots", "err", err)
			return nil
		}

		for _, snapshot := range snapshots {
			logger.Info(
				"advertising snapshot",
				"height", snapshot.Height,
				"format", snapshot.Format,
				"peer", envelope.From,
			)
			r.snapshotCh.Out <- p2p.Envelope{
				To: envelope.From,
				Message: &ssproto.SnapshotsResponse{
					Height:   snapshot.Height,
					Format:   snapshot.Format,
					Chunks:   snapshot.Chunks,
					Hash:     snapshot.Hash,
					Metadata: snapshot.Metadata,
				},
			}
		}

	case *ssproto.SnapshotsResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()

		if r.syncer == nil {
			logger.Debug("received unexpected snapshot; no state sync in progress")
			return nil
		}

		logger.Info("received snapshot", "height", msg.Height, "format", msg.Format)
		_, err := r.syncer.AddSnapshot(envelope.From, &snapshot{
			Height:   msg.Height,
			Format:   msg.Format,
			Chunks:   msg.Chunks,
			Hash:     msg.Hash,
			Metadata: msg.Metadata,
		})
		if err != nil {
			logger.Error(
				"failed to add snapshot",
				"height", msg.Height,
				"format", msg.Format,
				"err", err,
				"channel", r.snapshotCh.ID,
			)
			return nil
		}
		logger.Info("added snapshot", "height", msg.Height, "format", msg.Format)

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage.
func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error {
	switch msg := envelope.Message.(type) {
	case *ssproto.ChunkRequest:
		r.Logger.Debug(
			"received chunk request",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		resp, err := r.conn.LoadSnapshotChunkSync(context.Background(), abci.RequestLoadSnapshotChunk{
			Height: msg.Height,
			Format: msg.Format,
			Chunk:  msg.Index,
		})
		if err != nil {
			r.Logger.Error(
				"failed to load chunk",
				"height", msg.Height,
				"format", msg.Format,
				"chunk", msg.Index,
				"err", err,
				"peer", envelope.From,
			)
			return nil
		}

		r.Logger.Debug(
			"sending chunk",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		r.chunkCh.Out <- p2p.Envelope{
			To: envelope.From,
			Message: &ssproto.ChunkResponse{
				Height:  msg.Height,
				Format:  msg.Format,
				Index:   msg.Index,
				Chunk:   resp.Chunk,
				Missing: resp.Chunk == nil,
			},
		}

	case *ssproto.ChunkResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()

		if r.syncer == nil {
			r.Logger.Debug("received unexpected chunk; no state sync in progress", "peer", envelope.From)
			return nil
		}

		r.Logger.Debug(
			"received chunk; adding to sync",
			"height", msg.Height,
			"format", msg.Format,
			"chunk", msg.Index,
			"peer", envelope.From,
		)
		_, err := r.syncer.AddChunk(&chunk{
			Height: msg.Height,
			Format: msg.Format,
			Index:  msg.Index,
			Chunk:  msg.Chunk,
			Sender: envelope.From,
		})
		if err != nil {
			r.Logger.Error(
				"failed to add chunk",
				"height", msg.Height,
				"format", msg.Format,
				"chunk", msg.Index,
				"err", err,
				"peer", envelope.From,
			)
			return nil
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}
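// handleLightBlockMessage handles envelopes sent from peers on the
// LightBlockChannel. It returns an error only if the Envelope.Message is
// unknown for this channel. This should never be called outside of
// handleMessage.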
func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
	switch msg := envelope.Message.(type) {
	case *ssproto.LightBlockRequest:
		r.Logger.Info("received light block request", "height", msg.Height)
		lb, err := r.fetchLightBlock(msg.Height)
		if err != nil {
			r.Logger.Error("failed to retrieve light block", "err", err, "height", msg.Height)
			return err
		}
		if lb == nil {
			// NOTE: If we don't have the light block, we send a nil light block
			// back to the requesting node, indicating that we don't have it.
			r.blockCh.Out <- p2p.Envelope{
				To: envelope.From,
				Message: &ssproto.LightBlockResponse{
					LightBlock: nil,
				},
			}
			return nil
		}

		lbproto, err := lb.ToProto()
		if err != nil {
			r.Logger.Error("marshaling light block to proto", "err", err)
			return nil
		}

		r.blockCh.Out <- p2p.Envelope{
			To: envelope.From,
			Message: &ssproto.LightBlockResponse{
				LightBlock: lbproto,
			},
		}

	case *ssproto.LightBlockResponse:
		var height int64
		if msg.LightBlock != nil {
			height = msg.LightBlock.SignedHeader.Header.Height
		}
		r.Logger.Info("received light block response", "peer", envelope.From, "height", height)
		if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
			r.Logger.Error("error processing light block response", "err", err, "height", height)
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}
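// handleParamsMessage handles envelopes sent from peers on the ParamsChannel.
// It returns an error only if the Envelope.Message is unknown for this
// channel. This should never be called outside of handleMessage.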
func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error {
	switch msg := envelope.Message.(type) {
	case *ssproto.ParamsRequest:
		r.Logger.Debug("received consensus params request", "height", msg.Height)
		cp, err := r.stateStore.LoadConsensusParams(int64(msg.Height))
		if err != nil {
			r.Logger.Error("failed to fetch requested consensus params", "err", err, "height", msg.Height)
			return nil
		}

		cpproto := cp.ToProto()
		r.paramsCh.Out <- p2p.Envelope{
			To: envelope.From,
			Message: &ssproto.ParamsResponse{
				Height:          msg.Height,
				ConsensusParams: cpproto,
			},
		}

	case *ssproto.ParamsResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()
		r.Logger.Debug("received consensus params response", "height", msg.Height)

		cp := types.ConsensusParamsFromProto(msg.ConsensusParams)

		if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
			select {
			case sp.paramsRecvCh <- cp:
			default:
			}
		} else {
			r.Logger.Debug("received unexpected params response; using RPC state provider", "peer", envelope.From)
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("panic in processing message: %v", e)
			r.Logger.Error(
				"recovering from processing message panic",
				"err", err,
				"stack", string(debug.Stack()),
			)
		}
	}()

	r.Logger.Debug("received message", "message", reflect.TypeOf(envelope.Message), "peer", envelope.From)

	switch chID {
	case SnapshotChannel:
		err = r.handleSnapshotMessage(envelope)
	case ChunkChannel:
		err = r.handleChunkMessage(envelope)
	case LightBlockChannel:
		err = r.handleLightBlockMessage(envelope)
	case ParamsChannel:
		err = r.handleParamsMessage(envelope)
	default:
		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
	}

	return err
}
// processSnapshotCh initiates a blocking process where we listen for and handle
// envelopes on the SnapshotChannel.
func (r *Reactor) processSnapshotCh() {
	r.processCh(r.snapshotCh, "snapshot")
}

// processChunkCh initiates a blocking process where we listen for and handle
// envelopes on the ChunkChannel.
func (r *Reactor) processChunkCh() {
	r.processCh(r.chunkCh, "chunk")
}

// processBlockCh initiates a blocking process where we listen for and handle
// envelopes on the LightBlockChannel.
func (r *Reactor) processBlockCh() {
	r.processCh(r.blockCh, "light block")
}
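// processParamsCh initiates a blocking process where we listen for and handle
// envelopes on the ParamsChannel.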
func (r *Reactor) processParamsCh() {
	r.processCh(r.paramsCh, "consensus params")
}
// processCh routes state sync messages to their respective handlers. Any error
// encountered during message execution will result in a PeerError being sent on
// the respective channel. When the reactor is stopped, we will catch the signal
// and close the p2p Channel gracefully.
func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
	defer ch.Close()

	for {
		select {
		case envelope := <-ch.In:
			if err := r.handleMessage(ch.ID, envelope); err != nil {
				r.Logger.Error(fmt.Sprintf("failed to process %s message", chName),
					"ch_id", ch.ID, "envelope", envelope, "err", err)
				ch.Error <- p2p.PeerError{
					NodeID: envelope.From,
					Err:    err,
				}
			}

		case <-r.closeCh:
			r.Logger.Debug(fmt.Sprintf("stopped listening on %s channel; closing...", chName))
			return
		}
	}
}
// processPeerUpdate processes a PeerUpdate, adding or removing the peer from
// the peer list and, if a state sync is in progress, from the syncer and the
// set of block providers.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
	r.Logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

	switch peerUpdate.Status {
	case p2p.PeerStatusUp:
		r.peers.Append(peerUpdate.NodeID)
	case p2p.PeerStatusDown:
		r.peers.Remove(peerUpdate.NodeID)
	}

	r.mtx.Lock()
	if r.syncer == nil {
		r.mtx.Unlock()
		return
	}
	defer r.mtx.Unlock()

	switch peerUpdate.Status {
	case p2p.PeerStatusUp:
		newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
		r.providers[peerUpdate.NodeID] = newProvider
		r.syncer.AddPeer(peerUpdate.NodeID)
		if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
			// we do this in a separate routine so as not to block whilst waiting
			// for the light client to finish whatever call it is currently
			// executing
			go sp.addProvider(newProvider)
		}

	case p2p.PeerStatusDown:
		delete(r.providers, peerUpdate.NodeID)
		r.syncer.RemovePeer(peerUpdate.NodeID)
	}

	r.Logger.Info("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
}
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
	defer r.peerUpdates.Close()

	for {
		select {
		case peerUpdate := <-r.peerUpdates.Updates():
			r.processPeerUpdate(peerUpdate)

		case <-r.closeCh:
			r.Logger.Debug("stopped listening on peer updates channel; closing...")
			return
		}
	}
}
// recentSnapshots fetches the n most recent snapshots from the app
func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
	resp, err := r.conn.ListSnapshotsSync(context.Background(), abci.RequestListSnapshots{})
	if err != nil {
		return nil, err
	}

	sort.Slice(resp.Snapshots, func(i, j int) bool {
		a := resp.Snapshots[i]
		b := resp.Snapshots[j]

		switch {
		case a.Height > b.Height:
			return true
		case a.Height == b.Height && a.Format > b.Format:
			return true
		default:
			return false
		}
	})

	snapshots := make([]*snapshot, 0, n)
	for i, s := range resp.Snapshots {
		// cap the result at the n most recent snapshots, per the doc comment
		if i >= int(n) {
			break
		}

		snapshots = append(snapshots, &snapshot{
			Height:   s.Height,
			Format:   s.Format,
			Chunks:   s.Chunks,
			Hash:     s.Hash,
			Metadata: s.Metadata,
		})
	}

	return snapshots, nil
}
// fetchLightBlock works out whether the node has a light block at a particular
// height and, if so, returns it so it can be gossiped to peers
func (r *Reactor) fetchLightBlock(height uint64) (*types.LightBlock, error) {
	h := int64(height)

	blockMeta := r.blockStore.LoadBlockMeta(h)
	if blockMeta == nil {
		return nil, nil
	}

	commit := r.blockStore.LoadBlockCommit(h)
	if commit == nil {
		return nil, nil
	}

	vals, err := r.stateStore.LoadValidators(h)
	if err != nil {
		return nil, err
	}
	if vals == nil {
		return nil, nil
	}

	return &types.LightBlock{
		SignedHeader: &types.SignedHeader{
			Header: &blockMeta.Header,
			Commit: commit,
		},
		ValidatorSet: vals,
	}, nil
}
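// waitForEnoughPeers blocks until at least numPeers peers are connected, or
// until the given context is canceled, polling the peer list every 200ms.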
func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) {
	t := time.NewTicker(200 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if r.peers.Len() >= numPeers {
				return
			}
		}
	}
}
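// initStateProvider instantiates the state provider used by state sync from
// the reactor's trust options, using either the p2p-based provider (backed by
// the currently connected peers) or the RPC-based provider, depending on
// configuration. It sets r.stateProvider and is called with r.mtx held.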
func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error {
	var err error
	to := light.TrustOptions{
		Period: r.cfg.TrustPeriod,
		Height: r.cfg.TrustHeight,
		Hash:   r.cfg.TrustHashBytes(),
	}
	spLogger := r.Logger.With("module", "stateprovider")
	spLogger.Info("initializing state provider", "trustPeriod", to.Period,
		"trustHeight", to.Height, "useP2P", r.cfg.UseP2P)

	if r.cfg.UseP2P {
		peers := r.peers.All()
		providers := make([]provider.Provider, len(peers))
		for idx, p := range peers {
			providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
		}

		r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh.Out, spLogger)
		if err != nil {
			return fmt.Errorf("failed to initialize P2P state provider: %w", err)
		}
	} else {
		r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
		if err != nil {
			return fmt.Errorf("failed to initialize RPC state provider: %w", err)
		}
	}
	return nil
}