- package v1
- import (
- "fmt"
- "reflect"
- "sync/atomic"
- "time"
- "github.com/tendermint/tendermint/behaviour"
- bc "github.com/tendermint/tendermint/blockchain"
- "github.com/tendermint/tendermint/libs/log"
- "github.com/tendermint/tendermint/p2p"
- bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
- sm "github.com/tendermint/tendermint/state"
- "github.com/tendermint/tendermint/store"
- "github.com/tendermint/tendermint/types"
- )
- const (
- // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
- BlockchainChannel = byte(0x40)
- trySyncIntervalMS = 10
- trySendIntervalMS = 10
- // ask for best height every 10s
- statusUpdateIntervalSeconds = 10
- )
- var (
- // Maximum number of requests that can be pending per peer, i.e. for which requests have been sent but blocks
- // have not been received.
- maxRequestsPerPeer = 20
- // Maximum number of block requests for the reactor, pending or for which blocks have been received.
- maxNumRequests = 64
- )
- type consensusReactor interface {
- // for when we switch from blockchain reactor and fast sync to
- // the consensus machine
- SwitchToConsensus(state sm.State, skipWAL bool)
- }
- // BlockchainReactor handles long-term catchup syncing.
- type BlockchainReactor struct {
- p2p.BaseReactor
- initialState sm.State // immutable
- state sm.State
- blockExec *sm.BlockExecutor
- store *store.BlockStore
- fastSync bool
- stateSynced bool
- fsm *BcReactorFSM
- blocksSynced uint64
- // Receive goroutine forwards messages to this channel to be processed in the context of the poolRoutine.
- messagesForFSMCh chan bcReactorMessage
- // Switch goroutine may send RemovePeer to the blockchain reactor. This is an error message that is relayed
- // to this channel to be processed in the context of the poolRoutine.
- errorsForFSMCh chan bcReactorMessage
- // This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and
- // the switch.
- eventsFromFSMCh chan bcFsmMessage
- swReporter *behaviour.SwitchReporter
- // Atomic integer (0 - sync in progress, 1 - finished syncing)
- syncEnded int32
- }
- // NewBlockchainReactor returns new reactor instance.
- func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
- fastSync bool) *BlockchainReactor {
- if state.LastBlockHeight != store.Height() {
- panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
- store.Height()))
- }
- const capacity = 1000
- eventsFromFSMCh := make(chan bcFsmMessage, capacity)
- messagesForFSMCh := make(chan bcReactorMessage, capacity)
- errorsForFSMCh := make(chan bcReactorMessage, capacity)
- startHeight := store.Height() + 1
- if startHeight == 1 {
- startHeight = state.InitialHeight
- }
- bcR := &BlockchainReactor{
- initialState: state,
- state: state,
- blockExec: blockExec,
- fastSync: fastSync,
- store: store,
- messagesForFSMCh: messagesForFSMCh,
- eventsFromFSMCh: eventsFromFSMCh,
- errorsForFSMCh: errorsForFSMCh,
- }
- fsm := NewFSM(startHeight, bcR)
- bcR.fsm = fsm
- bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
- // bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
- return bcR
- }
- // bcReactorMessage is used by the reactor to send messages to the FSM.
- type bcReactorMessage struct {
- event bReactorEvent
- data bReactorEventData
- }
- type bFsmEvent uint
- const (
- // message type events
- peerErrorEv = iota + 1
- syncFinishedEv
- )
- type bFsmEventData struct {
- peerID p2p.ID
- err error
- }
- // bcFsmMessage is used by the FSM to send messages to the reactor
- type bcFsmMessage struct {
- event bFsmEvent
- data bFsmEventData
- }
- // SetLogger implements service.Service by setting the logger on reactor and pool.
- func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
- bcR.BaseService.Logger = l
- bcR.fsm.SetLogger(l)
- }
- // OnStart implements service.Service.
- func (bcR *BlockchainReactor) OnStart() error {
- bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
- if bcR.fastSync {
- go bcR.poolRoutine()
- } else { // if we're not fast syncing, mark it as finished
- bcR.setSyncEnded()
- }
- return nil
- }
- // OnStop implements service.Service.
- func (bcR *BlockchainReactor) OnStop() {
- }
- func (bcR *BlockchainReactor) isSyncEnded() bool {
- return atomic.LoadInt32(&(bcR.syncEnded)) != 0
- }
- func (bcR *BlockchainReactor) setSyncEnded() {
- atomic.StoreInt32(&(bcR.syncEnded), 1)
- }
- // SwitchToFastSync is called by the state sync reactor when switching to fast sync.
- func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error {
- bcR.fastSync = true
- bcR.initialState = state
- bcR.state = state
- bcR.stateSynced = true
- bcR.fsm = NewFSM(state.LastBlockHeight+1, bcR)
- bcR.fsm.SetLogger(bcR.Logger)
- go bcR.poolRoutine()
- return nil
- }
- // GetChannels implements Reactor
- func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
- return []*p2p.ChannelDescriptor{
- {
- ID: BlockchainChannel,
- Priority: 10,
- SendQueueCapacity: 2000,
- RecvBufferCapacity: 50 * 4096,
- RecvMessageCapacity: bc.MaxMsgSize,
- },
- }
- }
- // AddPeer implements Reactor by sending our state to peer.
- func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
- msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
- Base: bcR.store.Base(),
- Height: bcR.store.Height(),
- })
- if err != nil {
- bcR.Logger.Error("could not convert msg to protobuf", "err", err)
- return
- }
- peer.Send(BlockchainChannel, msgBytes)
- // it's OK if send fails. will try later in poolRoutine
- // peer is added to the pool once we receive the first
- // bcStatusResponseMessage from the peer and call pool.updatePeer()
- }
- // sendBlockToPeer loads a block and sends it to the requesting peer.
- // If the block doesn't exist a bcNoBlockResponseMessage is sent.
- // If all nodes are honest, no node should be requesting for a block that doesn't exist.
- func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcproto.BlockRequest,
- src p2p.Peer) (queued bool) {
- block := bcR.store.LoadBlock(msg.Height)
- if block != nil {
- pbbi, err := block.ToProto()
- if err != nil {
- bcR.Logger.Error("Could not send block message to peer", "err", err)
- return false
- }
- msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: pbbi})
- if err != nil {
- bcR.Logger.Error("unable to marshal msg", "err", err)
- return false
- }
- return src.TrySend(BlockchainChannel, msgBytes)
- }
- bcR.Logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height)
- msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
- if err != nil {
- bcR.Logger.Error("unable to marshal msg", "err", err)
- return false
- }
- return src.TrySend(BlockchainChannel, msgBytes)
- }
- func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcproto.StatusRequest, src p2p.Peer) (queued bool) {
- msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
- Base: bcR.store.Base(),
- Height: bcR.store.Height(),
- })
- if err != nil {
- bcR.Logger.Error("unable to marshal msg", "err", err)
- return false
- }
- return src.TrySend(BlockchainChannel, msgBytes)
- }
- // RemovePeer implements Reactor by removing peer from the pool.
- func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
- if bcR.isSyncEnded() {
- return
- }
- msgData := bcReactorMessage{
- event: peerRemoveEv,
- data: bReactorEventData{
- peerID: peer.ID(),
- err: errSwitchRemovesPeer,
- },
- }
- bcR.errorsForFSMCh <- msgData
- }
- // Receive implements Reactor by handling 4 types of messages (look below).
- // XXX: do not call any methods that can block or incur heavy processing.
- // https://github.com/tendermint/tendermint/issues/2888
- func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
- msg, err := bc.DecodeMsg(msgBytes)
- if err != nil {
- bcR.Logger.Error("error decoding message",
- "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
- _ = bcR.swReporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
- return
- }
- if err = bc.ValidateMsg(msg); err != nil {
- bcR.Logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
- _ = bcR.swReporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
- return
- }
- bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
- switch msg := msg.(type) {
- case *bcproto.BlockRequest:
- if queued := bcR.sendBlockToPeer(msg, src); !queued {
- // Unfortunately not queued since the queue is full.
- bcR.Logger.Error("Could not send block message to peer", "src", src, "height", msg.Height)
- }
- case *bcproto.StatusRequest:
- // Send peer our state.
- if queued := bcR.sendStatusResponseToPeer(msg, src); !queued {
- // Unfortunately not queued since the queue is full.
- bcR.Logger.Error("Could not send status message to peer", "src", src)
- }
- case *bcproto.BlockResponse:
- if bcR.isSyncEnded() {
- return
- }
- bi, err := types.BlockFromProto(msg.Block)
- if err != nil {
- bcR.Logger.Error("error transition block from protobuf", "err", err)
- _ = bcR.swReporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
- return
- }
- msgForFSM := bcReactorMessage{
- event: blockResponseEv,
- data: bReactorEventData{
- peerID: src.ID(),
- height: bi.Height,
- block: bi,
- length: len(msgBytes),
- },
- }
- bcR.Logger.Info("Received", "src", src, "height", bi.Height)
- bcR.messagesForFSMCh <- msgForFSM
- case *bcproto.NoBlockResponse:
- if bcR.isSyncEnded() {
- return
- }
- msgForFSM := bcReactorMessage{
- event: noBlockResponseEv,
- data: bReactorEventData{
- peerID: src.ID(),
- height: msg.Height,
- },
- }
- bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
- bcR.messagesForFSMCh <- msgForFSM
- case *bcproto.StatusResponse:
- if bcR.isSyncEnded() {
- return
- }
- // Got a peer status. Unverified.
- msgForFSM := bcReactorMessage{
- event: statusResponseEv,
- data: bReactorEventData{
- peerID: src.ID(),
- height: msg.Height,
- length: len(msgBytes),
- },
- }
- bcR.messagesForFSMCh <- msgForFSM
- default:
- bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg)))
- }
- }
- // processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel
- func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) {
- processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
- defer processReceivedBlockTicker.Stop()
- var (
- doProcessBlockCh = make(chan struct{}, 1)
- lastHundred = time.Now()
- lastRate = 0.0
- )
- ForLoop:
- for {
- select {
- case <-bcR.Quit():
- break ForLoop
- case <-stopProcessing:
- bcR.Logger.Info("finishing block execution")
- break ForLoop
- case <-processReceivedBlockTicker.C: // try to execute blocks
- select {
- case doProcessBlockCh <- struct{}{}:
- default:
- }
- case <-doProcessBlockCh:
- for {
- err := bcR.processBlock()
- if err == errMissingBlock {
- break
- }
- // Notify FSM of block processing result.
- msgForFSM := bcReactorMessage{
- event: processedBlockEv,
- data: bReactorEventData{
- err: err,
- },
- }
- _ = bcR.fsm.Handle(&msgForFSM)
- if err != nil {
- break
- }
- bcR.blocksSynced++
- if bcR.blocksSynced%100 == 0 {
- lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
- height, maxPeerHeight := bcR.fsm.Status()
- bcR.Logger.Info("Fast Sync Rate", "height", height,
- "max_peer_height", maxPeerHeight, "blocks/s", lastRate)
- lastHundred = time.Now()
- }
- }
- }
- }
- }
- // poolRoutine receives and handles messages from the Receive() routine and from the FSM.
- func (bcR *BlockchainReactor) poolRoutine() {
- bcR.fsm.Start()
- sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond)
- statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
- defer sendBlockRequestTicker.Stop()
- // NOTE: statusUpdateTicker can continue to run
- stopProcessing := make(chan struct{}, 1)
- go bcR.processBlocksRoutine(stopProcessing)
- ForLoop:
- for {
- select {
- case <-sendBlockRequestTicker.C:
- if !bcR.fsm.NeedsBlocks() {
- continue
- }
- _ = bcR.fsm.Handle(&bcReactorMessage{
- event: makeRequestsEv,
- data: bReactorEventData{
- maxNumRequests: maxNumRequests}})
- case <-statusUpdateTicker.C:
- // Ask for status updates.
- go bcR.sendStatusRequest()
- case msg := <-bcR.messagesForFSMCh:
- // Sent from the Receive() routine when status (statusResponseEv) and
- // block (blockResponseEv) response events are received
- _ = bcR.fsm.Handle(&msg)
- case msg := <-bcR.errorsForFSMCh:
- // Sent from the switch.RemovePeer() routine (RemovePeerEv) and
- // FSM state timer expiry routine (stateTimeoutEv).
- _ = bcR.fsm.Handle(&msg)
- case msg := <-bcR.eventsFromFSMCh:
- switch msg.event {
- case syncFinishedEv: // Sent from the FSM when it enters finished state.
- stopProcessing <- struct{}{}
- bcR.setSyncEnded()
- case peerErrorEv: // Sent from the FSM when it detects peer error
- bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID)
- if msg.data.err == errNoPeerResponse {
- // Sent from the peer timeout handler routine
- _ = bcR.fsm.Handle(&bcReactorMessage{
- event: peerRemoveEv,
- data: bReactorEventData{
- peerID: msg.data.peerID,
- err: msg.data.err,
- },
- })
- }
- // else {
- // For slow peers, or errors due to blocks received from wrong peer
- // the FSM had already removed the peers
- // }
- default:
- bcR.Logger.Error("Event from FSM not supported", "type", msg.event)
- }
- case <-bcR.Quit():
- break ForLoop
- }
- }
- }
- func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) {
- peer := bcR.Switch.Peers().Get(peerID)
- if peer != nil {
- _ = bcR.swReporter.Report(behaviour.BadMessage(peerID, err.Error()))
- }
- }
- func (bcR *BlockchainReactor) processBlock() error {
- first, second, err := bcR.fsm.FirstTwoBlocks()
- if err != nil {
- // We need both to sync the first block.
- return err
- }
- chainID := bcR.initialState.ChainID
- firstParts := first.MakePartSet(types.BlockPartSizeBytes)
- firstPartSetHeader := firstParts.Header()
- firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
- // Finally, verify the first block using the second's commit
- // NOTE: we can probably make this more efficient, but note that calling
- // first.Hash() doesn't verify the tx contents, so MakePartSet() is
- // currently necessary.
- err = bcR.state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
- if err != nil {
- bcR.Logger.Error("error during commit verification", "err", err,
- "first", first.Height, "second", second.Height)
- return errBlockVerificationFailure
- }
- bcR.store.SaveBlock(first, firstParts, second.LastCommit)
- bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
- if err != nil {
- panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
- }
- return nil
- }
- // sendStatusRequest broadcasts `BlockStore` height.
- func (bcR *BlockchainReactor) sendStatusRequest() {
- msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
- if err != nil {
- panic(err)
- }
- bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
- }
- // BlockRequest sends `BlockRequest` height.
- func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
- peer := bcR.Switch.Peers().Get(peerID)
- if peer == nil {
- return errNilPeerForBlockRequest
- }
- msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height})
- if err != nil {
- return err
- }
- queued := peer.TrySend(BlockchainChannel, msgBytes)
- if !queued {
- return errSendQueueFull
- }
- return nil
- }
- func (bcR *BlockchainReactor) switchToConsensus() {
- conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
- if ok {
- conR.SwitchToConsensus(bcR.state, bcR.blocksSynced > 0 || bcR.stateSynced)
- bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv}
- }
- }
- // Called by FSM and pool:
- // - pool calls when it detects slow peer or when peer times out
- // - FSM calls when:
- // - adding a block (addBlock) fails
- // - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks
- func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
- bcR.Logger.Info("sendPeerError:", "peer", peerID, "error", err)
- msgData := bcFsmMessage{
- event: peerErrorEv,
- data: bFsmEventData{
- peerID: peerID,
- err: err,
- },
- }
- bcR.eventsFromFSMCh <- msgData
- }
- func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
- if timer == nil {
- panic("nil timer pointer parameter")
- }
- if *timer == nil {
- *timer = time.AfterFunc(timeout, func() {
- msg := bcReactorMessage{
- event: stateTimeoutEv,
- data: bReactorEventData{
- stateName: name,
- },
- }
- bcR.errorsForFSMCh <- msg
- })
- } else {
- (*timer).Reset(timeout)
- }
- }