You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

570 lines
16 KiB

package v1
import (
"fmt"
"reflect"
"time"
"github.com/tendermint/tendermint/behaviour"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
trySyncIntervalMS = 10
trySendIntervalMS = 10
// ask for best height every 10s
statusUpdateIntervalSeconds = 10
)
var (
// Maximum number of requests that can be pending per peer, i.e. for which requests have been sent but blocks
// have not been received.
maxRequestsPerPeer = 20
// Maximum number of block requests for the reactor, pending or for which blocks have been received.
maxNumRequests = 64
)
type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to
// the consensus machine
SwitchToConsensus(state sm.State, skipWAL bool)
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
initialState sm.State // immutable
state sm.State
blockExec *sm.BlockExecutor
store *store.BlockStore
fastSync bool
stateSynced bool
fsm *BcReactorFSM
blocksSynced uint64
// Receive goroutine forwards messages to this channel to be processed in the context of the poolRoutine.
messagesForFSMCh chan bcReactorMessage
// Switch goroutine may send RemovePeer to the blockchain reactor. This is an error message that is relayed
// to this channel to be processed in the context of the poolRoutine.
errorsForFSMCh chan bcReactorMessage
// This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and
// the switch.
eventsFromFSMCh chan bcFsmMessage
swReporter *behaviour.SwitchReporter
}
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
fastSync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
}
const capacity = 1000
eventsFromFSMCh := make(chan bcFsmMessage, capacity)
messagesForFSMCh := make(chan bcReactorMessage, capacity)
errorsForFSMCh := make(chan bcReactorMessage, capacity)
startHeight := store.Height() + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
bcR := &BlockchainReactor{
initialState: state,
state: state,
blockExec: blockExec,
fastSync: fastSync,
store: store,
messagesForFSMCh: messagesForFSMCh,
eventsFromFSMCh: eventsFromFSMCh,
errorsForFSMCh: errorsForFSMCh,
}
fsm := NewFSM(startHeight, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
// bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
return bcR
}
// bcReactorMessage is used by the reactor to send messages to the FSM.
type bcReactorMessage struct {
event bReactorEvent
data bReactorEventData
}
type bFsmEvent uint
const (
// message type events
peerErrorEv = iota + 1
syncFinishedEv
)
type bFsmEventData struct {
peerID p2p.ID
err error
}
// bcFsmMessage is used by the FSM to send messages to the reactor
type bcFsmMessage struct {
event bFsmEvent
data bFsmEventData
}
// SetLogger implements service.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
bcR.BaseService.Logger = l
bcR.fsm.SetLogger(l)
}
// OnStart implements service.Service.
func (bcR *BlockchainReactor) OnStart() error {
bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
if bcR.fastSync {
go bcR.poolRoutine()
}
return nil
}
// OnStop implements service.Service.
func (bcR *BlockchainReactor) OnStop() {
_ = bcR.Stop()
}
// SwitchToFastSync is called by the state sync reactor when switching to fast sync.
func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error {
bcR.fastSync = true
bcR.initialState = state
bcR.state = state
bcR.stateSynced = true
bcR.fsm = NewFSM(state.LastBlockHeight+1, bcR)
bcR.fsm.SetLogger(bcR.Logger)
go bcR.poolRoutine()
return nil
}
// GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlockchainChannel,
Priority: 10,
SendQueueCapacity: 2000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: bc.MaxMsgSize,
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
peer.Send(BlockchainChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
// bcStatusResponseMessage from the peer and call pool.updatePeer()
}
// sendBlockToPeer loads a block and sends it to the requesting peer.
// If the block doesn't exist a bcNoBlockResponseMessage is sent.
// If all nodes are honest, no node should be requesting for a block that doesn't exist.
func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
pbbi, err := block.ToProto()
if err != nil {
bcR.Logger.Error("Could not send block message to peer", "err", err)
return false
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: pbbi})
if err != nil {
bcR.Logger.Error("unable to marshal msg", "err", err)
return false
}
return src.TrySend(BlockchainChannel, msgBytes)
}
bcR.Logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("unable to marshal msg", "err", err)
return false
}
return src.TrySend(BlockchainChannel, msgBytes)
}
func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcproto.StatusRequest, src p2p.Peer) (queued bool) {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
})
if err != nil {
bcR.Logger.Error("unable to marshal msg", "err", err)
return false
}
return src.TrySend(BlockchainChannel, msgBytes)
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
msgData := bcReactorMessage{
event: peerRemoveEv,
data: bReactorEventData{
peerID: peer.ID(),
err: errSwitchRemovesPeer,
},
}
bcR.errorsForFSMCh <- msgData
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := bc.DecodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("error decoding message",
"src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
_ = bcR.swReporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
if err = bc.ValidateMsg(msg); err != nil {
bcR.Logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
_ = bcR.swReporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
switch msg := msg.(type) {
case *bcproto.BlockRequest:
if queued := bcR.sendBlockToPeer(msg, src); !queued {
// Unfortunately not queued since the queue is full.
bcR.Logger.Error("Could not send block message to peer", "src", src, "height", msg.Height)
}
case *bcproto.StatusRequest:
// Send peer our state.
if queued := bcR.sendStatusResponseToPeer(msg, src); !queued {
// Unfortunately not queued since the queue is full.
bcR.Logger.Error("Could not send status message to peer", "src", src)
}
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("error transition block from protobuf", "err", err)
return
}
msgForFSM := bcReactorMessage{
event: blockResponseEv,
data: bReactorEventData{
peerID: src.ID(),
height: bi.Height,
block: bi,
length: len(msgBytes),
},
}
bcR.Logger.Info("Received", "src", src, "height", bi.Height)
bcR.messagesForFSMCh <- msgForFSM
case *bcproto.NoBlockResponse:
msgForFSM := bcReactorMessage{
event: noBlockResponseEv,
data: bReactorEventData{
peerID: src.ID(),
height: msg.Height,
},
}
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
bcR.messagesForFSMCh <- msgForFSM
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
msgForFSM := bcReactorMessage{
event: statusResponseEv,
data: bReactorEventData{
peerID: src.ID(),
height: msg.Height,
length: len(msgBytes),
},
}
bcR.messagesForFSMCh <- msgForFSM
default:
bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg)))
}
}
// processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel
func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) {
processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
doProcessBlockCh := make(chan struct{}, 1)
lastHundred := time.Now()
lastRate := 0.0
ForLoop:
for {
select {
case <-stopProcessing:
bcR.Logger.Info("finishing block execution")
break ForLoop
case <-processReceivedBlockTicker.C: // try to execute blocks
select {
case doProcessBlockCh <- struct{}{}:
default:
}
case <-doProcessBlockCh:
for {
err := bcR.processBlock()
if err == errMissingBlock {
break
}
// Notify FSM of block processing result.
msgForFSM := bcReactorMessage{
event: processedBlockEv,
data: bReactorEventData{
err: err,
},
}
_ = bcR.fsm.Handle(&msgForFSM)
if err != nil {
break
}
bcR.blocksSynced++
if bcR.blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
height, maxPeerHeight := bcR.fsm.Status()
bcR.Logger.Info("Fast Sync Rate", "height", height,
"max_peer_height", maxPeerHeight, "blocks/s", lastRate)
lastHundred = time.Now()
}
}
}
}
}
// poolRoutine receives and handles messages from the Receive() routine and from the FSM.
func (bcR *BlockchainReactor) poolRoutine() {
bcR.fsm.Start()
sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond)
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
stopProcessing := make(chan struct{}, 1)
go bcR.processBlocksRoutine(stopProcessing)
ForLoop:
for {
select {
case <-sendBlockRequestTicker.C:
if !bcR.fsm.NeedsBlocks() {
continue
}
_ = bcR.fsm.Handle(&bcReactorMessage{
event: makeRequestsEv,
data: bReactorEventData{
maxNumRequests: maxNumRequests}})
case <-statusUpdateTicker.C:
// Ask for status updates.
go bcR.sendStatusRequest()
case msg := <-bcR.messagesForFSMCh:
// Sent from the Receive() routine when status (statusResponseEv) and
// block (blockResponseEv) response events are received
_ = bcR.fsm.Handle(&msg)
case msg := <-bcR.errorsForFSMCh:
// Sent from the switch.RemovePeer() routine (RemovePeerEv) and
// FSM state timer expiry routine (stateTimeoutEv).
_ = bcR.fsm.Handle(&msg)
case msg := <-bcR.eventsFromFSMCh:
switch msg.event {
case syncFinishedEv:
stopProcessing <- struct{}{}
// Sent from the FSM when it enters finished state.
break ForLoop
case peerErrorEv:
// Sent from the FSM when it detects peer error
bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID)
if msg.data.err == errNoPeerResponse {
// Sent from the peer timeout handler routine
_ = bcR.fsm.Handle(&bcReactorMessage{
event: peerRemoveEv,
data: bReactorEventData{
peerID: msg.data.peerID,
err: msg.data.err,
},
})
}
// else {
// For slow peers, or errors due to blocks received from wrong peer
// the FSM had already removed the peers
// }
default:
bcR.Logger.Error("Event from FSM not supported", "type", msg.event)
}
case <-bcR.Quit():
break ForLoop
}
}
}
func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) {
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
_ = bcR.swReporter.Report(behaviour.BadMessage(peerID, err.Error()))
}
}
func (bcR *BlockchainReactor) processBlock() error {
first, second, err := bcR.fsm.FirstTwoBlocks()
if err != nil {
// We need both to sync the first block.
return err
}
chainID := bcR.initialState.ChainID
firstParts := first.MakePartSet(types.BlockPartSizeBytes)
firstPartSetHeader := firstParts.Header()
firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err = bcR.state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
if err != nil {
bcR.Logger.Error("error during commit verification", "err", err,
"first", first.Height, "second", second.Height)
return errBlockVerificationFailure
}
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
if err != nil {
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
return nil
}
// Implements bcRNotifier
// sendStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) sendStatusRequest() {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
panic(err)
}
bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
}
// Implements bcRNotifier
// BlockRequest sends `BlockRequest` height.
func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := bcR.Switch.Peers().Get(peerID)
if peer == nil {
return errNilPeerForBlockRequest
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height})
if err != nil {
return err
}
queued := peer.TrySend(BlockchainChannel, msgBytes)
if !queued {
return errSendQueueFull
}
return nil
}
// Implements bcRNotifier
func (bcR *BlockchainReactor) switchToConsensus() {
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(bcR.state, bcR.blocksSynced > 0 || bcR.stateSynced)
bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv}
}
// else {
// Should only happen during testing.
// }
}
// Implements bcRNotifier
// Called by FSM and pool:
// - pool calls when it detects slow peer or when peer times out
// - FSM calls when:
// - adding a block (addBlock) fails
// - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
bcR.Logger.Info("sendPeerError:", "peer", peerID, "error", err)
msgData := bcFsmMessage{
event: peerErrorEv,
data: bFsmEventData{
peerID: peerID,
err: err,
},
}
bcR.eventsFromFSMCh <- msgData
}
// Implements bcRNotifier
func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
if timer == nil {
panic("nil timer pointer parameter")
}
if *timer == nil {
*timer = time.AfterFunc(timeout, func() {
msg := bcReactorMessage{
event: stateTimeoutEv,
data: bReactorEventData{
stateName: name,
},
}
bcR.errorsForFSMCh <- msg
})
} else {
(*timer).Reset(timeout)
}
}