You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

643 lines
17 KiB

package v2
import (
"errors"
"fmt"
"time"
proto "github.com/gogo/protobuf/proto"
bc "github.com/tendermint/tendermint/internal/blockchain"
"github.com/tendermint/tendermint/internal/blockchain/v2/internal/behavior"
cons "github.com/tendermint/tendermint/internal/consensus"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/sync"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
// chBufferSize is the capacity given to every buffered event channel the
// reactor creates (the routine queues and the reactor's own events channel).
const chBufferSize int = 1000
// blockStore is the subset of block storage behavior the reactor depends on.
type blockStore interface {
	// LoadBlock returns the block stored at height. Receive treats a nil
	// result as "we do not have this block".
	LoadBlock(height int64) *types.Block
	// SaveBlock stores a block together with its part set and a commit.
	// NOTE(review): not called in this file; presumably used by the
	// processor when applying blocks — confirm.
	SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
	// Base is reported to peers as the "base" in status responses.
	Base() int64
	// Height is reported to peers as the "height" in status responses.
	Height() int64
}
// BlockchainReactor handles fast sync protocol.
//
// It owns two routines (scheduler and processor) and an event loop (demux)
// that moves events between the p2p layer and those routines.
type BlockchainReactor struct {
	p2p.BaseReactor

	// Sync mode flags.
	fastSync    *sync.AtomicBool // enable fast sync on start when it's been Set; unset once demux sees pcFinished
	stateSynced bool             // set to true when SwitchToFastSync is called by state sync

	// Routines driven by the demux loop.
	scheduler, processor *Routine

	logger log.Logger

	// mtx is held when accessing events, maxPeerHeight and syncHeight.
	mtx                       tmsync.RWMutex
	maxPeerHeight, syncHeight int64
	events                    chan Event // non-nil during a fast sync

	// Collaborators.
	reporter behavior.Reporter
	io       iIO
	store    blockStore

	// Sync progress bookkeeping.
	syncStartTime   time.Time // set by SwitchToFastSync when the sync starts successfully
	syncStartHeight int64     // first height to sync, computed in newReactor
	lastSyncRate    float64   // # blocks sync per sec base on the last 100 blocks
}
// blockApplier applies a block on top of a state and yields the resulting
// state, or an error if the block could not be applied.
type blockApplier interface {
	ApplyBlock(tmState state.State, blockID types.BlockID, block *types.Block) (state.State, error)
}
// newReactor wires up a BlockchainReactor with the given reporter.
//
// The first height to sync is the one after state.LastBlockHeight, except on
// a fresh chain (LastBlockHeight == 0) where state.InitialHeight is used.
// The reactor starts with a no-op logger; syncStartTime and lastSyncRate are
// left at their zero values.
//
// XXX: unify naming in this package around tmState
func newReactor(state state.State, store blockStore, reporter behavior.Reporter,
	blockApplier blockApplier, fastSync bool, metrics *cons.Metrics) *BlockchainReactor {
	startHeight := state.InitialHeight
	if next := state.LastBlockHeight + 1; next != 1 {
		startHeight = next
	}

	sched := newScheduler(startHeight, time.Now())

	// TODO: Fix naming to just newProcesssor
	// newPcState requires a processorContext
	proc := newPcState(newProcessorContext(store, blockApplier, state, metrics))

	reactor := &BlockchainReactor{
		scheduler:       newRoutine("scheduler", sched.handle, chBufferSize),
		processor:       newRoutine("processor", proc.handle, chBufferSize),
		store:           store,
		reporter:        reporter,
		logger:          log.NewNopLogger(),
		fastSync:        sync.NewBool(fastSync),
		syncStartHeight: startHeight,
	}
	return reactor
}
// NewBlockchainReactor creates a new reactor instance.
//
// The reactor is built with a mock behavior reporter; Start swaps it for a
// switch-backed reporter.
func NewBlockchainReactor(
	state state.State,
	blockApplier blockApplier,
	store blockStore,
	fastSync bool,
	metrics *cons.Metrics,
) *BlockchainReactor {
	return newReactor(state, store, behavior.NewMockReporter(), blockApplier, fastSync, metrics)
}
// SetSwitch implements Reactor interface.
//
// It records the switch and derives the reactor's io from it; a nil switch
// clears the io.
func (r *BlockchainReactor) SetSwitch(sw *p2p.Switch) {
	r.Switch = sw
	if sw == nil {
		r.io = nil
		return
	}
	r.io = newSwitchIo(sw)
}
// setMaxPeerHeight raises the recorded maximum peer height to height.
// Values that are not strictly greater than the current maximum are ignored.
func (r *BlockchainReactor) setMaxPeerHeight(height int64) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if height <= r.maxPeerHeight {
		return
	}
	r.maxPeerHeight = height
}
// setSyncHeight records height as the reactor's synced height, under the
// write lock.
func (r *BlockchainReactor) setSyncHeight(height int64) {
	r.mtx.Lock()
	r.syncHeight = height
	r.mtx.Unlock()
}
// SyncHeight returns the height to which the BlockchainReactor has synced.
func (r *BlockchainReactor) SyncHeight() int64 {
	r.mtx.RLock()
	height := r.syncHeight
	r.mtx.RUnlock()
	return height
}
// SetLogger sets the logger of the reactor and propagates it to the
// scheduler and processor routines.
func (r *BlockchainReactor) SetLogger(logger log.Logger) {
	r.logger = logger
	for _, routine := range []*Routine{r.scheduler, r.processor} {
		routine.setLogger(logger)
	}
}
// Start implements cmn.Service interface.
//
// It installs a switch-backed behavior reporter and, if fast sync is enabled,
// begins syncing. An error is returned only if the sync could not be started.
func (r *BlockchainReactor) Start() error {
	r.reporter = behavior.NewSwitchReporter(r.BaseReactor.Switch)

	if !r.fastSync.IsSet() {
		return nil
	}
	if err := r.startSync(nil); err != nil {
		return fmt.Errorf("failed to start fast sync: %w", err)
	}
	return nil
}
// startSync begins a fast sync, signaled by r.events being non-nil. If state is non-nil,
// the scheduler and processor is updated with this state on startup.
//
// It returns an error if a fast sync is already running. The whole setup is
// performed while holding the write lock.
func (r *BlockchainReactor) startSync(state *state.State) error {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if r.events != nil {
		return errors.New("fast sync already in progress")
	}

	events := make(chan Event, chBufferSize)
	r.events = events

	go r.scheduler.start()
	go r.processor.start()

	if state != nil {
		// Both routines must be ready before they can be handed the new state.
		<-r.scheduler.ready()
		<-r.processor.ready()

		reset := bcResetState{state: *state}
		r.scheduler.send(reset)
		r.processor.send(reset)
	}

	go r.demux(events)
	return nil
}
// endSync ends a fast sync: it closes and clears the events channel (if one
// is open) and stops both routines. Safe to call when no sync is running.
func (r *BlockchainReactor) endSync() {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	if r.events != nil {
		close(r.events)
		r.events = nil
	}

	r.scheduler.stop()
	r.processor.stop()
}
// SwitchToFastSync is called by the state sync reactor when switching to fast sync.
//
// It marks the reactor as state-synced, starts a sync seeded with a copy of
// state and, on success, records the sync start time.
func (r *BlockchainReactor) SwitchToFastSync(state state.State) error {
	r.stateSynced = true

	state = state.Copy()
	if err := r.startSync(&state); err != nil {
		return err
	}

	r.syncStartTime = time.Now()
	return nil
}
// Reactor generated ticker events.

// rTryPrunePeer is the ticker event for cleaning peers; demux sends it to the
// scheduler.
type rTryPrunePeer struct {
	priorityHigh
	time time.Time // when the event was created
}

// String renders the event for logging.
func (e rTryPrunePeer) String() string {
	return "rTryPrunePeer{" + fmt.Sprint(e.time) + "}"
}
// rTrySchedule is the ticker event for scheduling block requests; demux sends
// it to the scheduler.
type rTrySchedule struct {
	priorityHigh
	time time.Time // when the event was created
}

// String renders the event for logging.
func (e rTrySchedule) String() string {
	return "rTrySchedule{" + fmt.Sprint(e.time) + "}"
}
// rProcessBlock is the ticker event for block processing; demux sends it to
// the processor.
type rProcessBlock struct {
	priorityNormal
}

// String renders the event for logging.
func (e rProcessBlock) String() string {
	const name = "rProcessBlock"
	return name
}
// Reactor generated events based on blockchain related messages from peers.

// bcBlockResponse is emitted when a blockResponse message is received from a
// peer.
type bcBlockResponse struct {
	priorityNormal
	time   time.Time    // when the message was received
	peerID types.NodeID // sender of the block
	size   int64        // size of the raw message, in bytes
	block  *types.Block // the decoded block
}

// String renders the event for logging. It dereferences block, so block must
// be non-nil.
func (e bcBlockResponse) String() string {
	blk := e.block
	return fmt.Sprintf("bcBlockResponse{%d#%X (size: %d bytes) from %v at %v}",
		blk.Height, blk.Hash(), e.size, e.peerID, e.time)
}
// bcNoBlockResponse is emitted when a peer answers a block request with a
// noBlockResponse message.
type bcNoBlockResponse struct {
	priorityNormal
	time   time.Time    // when the message was received
	peerID types.NodeID // peer that lacks the block
	height int64        // height that was requested
}

// String renders the event for logging.
func (e bcNoBlockResponse) String() string {
	const format = "bcNoBlockResponse{%v has no block at height %d at %v}"
	return fmt.Sprintf(format, e.peerID, e.height, e.time)
}
// bcStatusResponse is emitted when a statusResponse message is received from
// a peer.
type bcStatusResponse struct {
	priorityNormal
	time   time.Time
	peerID types.NodeID // peer reporting its status
	base   int64        // base reported by the peer
	height int64        // height reported by the peer
}

// String renders the event for logging.
func (e bcStatusResponse) String() string {
	const format = "bcStatusResponse{%v is at height %d (base: %d) at %v}"
	return fmt.Sprintf(format, e.peerID, e.height, e.base, e.time)
}
// bcAddNewPeer is emitted by AddPeer when a new peer is connected.
type bcAddNewPeer struct {
	priorityNormal
	peerID types.NodeID // the newly connected peer
}

// String renders the event for logging.
func (e bcAddNewPeer) String() string {
	id := e.peerID
	return fmt.Sprintf("bcAddNewPeer{%v}", id)
}
// bcRemovePeer is emitted by RemovePeer when an existing peer is removed.
type bcRemovePeer struct {
	priorityHigh
	peerID types.NodeID // the removed peer
	reason interface{}  // why the peer was removed, as passed to RemovePeer
}

// String renders the event for logging.
func (e bcRemovePeer) String() string {
	const format = "bcRemovePeer{%v due to %v}"
	return fmt.Sprintf(format, e.peerID, e.reason)
}
// bcResetState resets the scheduler and processor state, e.g. following a
// switch from state syncing. startSync sends it to both routines.
type bcResetState struct {
	priorityHigh
	state state.State // the state to reset to
}

// String renders the event for logging.
func (e bcResetState) String() string {
	const format = "bcResetState{%v}"
	return fmt.Sprintf(format, e.state)
}
// demux is the reactor's event loop for one fast sync run. It:
//   - paces periodic work (block processing, peer pruning, scheduling and
//     status requests) with tickers that feed one-slot channels through
//     non-blocking sends, so ticks are dropped rather than queued up;
//   - forwards peer events received on events to the scheduler;
//   - relays events between the scheduler and processor routines.
//
// The loop returns when events is closed, or when the processor reports
// pcFinished; in the latter case it tries to switch to consensus, ends the
// sync and unsets fastSync.
//
// Takes the channel as a parameter to avoid race conditions on r.events.
func (r *BlockchainReactor) demux(events <-chan Event) {
	var lastHundred = time.Now()

	var (
		processBlockFreq = 20 * time.Millisecond
		doProcessBlockCh = make(chan struct{}, 1)
		doProcessBlockTk = time.NewTicker(processBlockFreq)
	)
	defer doProcessBlockTk.Stop()

	var (
		prunePeerFreq = 1 * time.Second
		doPrunePeerCh = make(chan struct{}, 1)
		doPrunePeerTk = time.NewTicker(prunePeerFreq)
	)
	defer doPrunePeerTk.Stop()

	var (
		scheduleFreq = 20 * time.Millisecond
		doScheduleCh = make(chan struct{}, 1)
		doScheduleTk = time.NewTicker(scheduleFreq)
	)
	defer doScheduleTk.Stop()

	var (
		statusFreq = 10 * time.Second
		doStatusCh = make(chan struct{}, 1)
		doStatusTk = time.NewTicker(statusFreq)
	)
	defer doStatusTk.Stop()
	doStatusCh <- struct{}{} // immediately broadcast to get status of existing peers

	// Memoize the scSchedulerFail error to avoid printing it every scheduleFreq.
	var scSchedulerFailErr error

	// XXX: Extract timers to make testing atemporal
	for {
		select {
		// Pacers: send at most per frequency but don't saturate
		case <-doProcessBlockTk.C:
			select {
			case doProcessBlockCh <- struct{}{}:
			default:
			}
		case <-doPrunePeerTk.C:
			select {
			case doPrunePeerCh <- struct{}{}:
			default:
			}
		case <-doScheduleTk.C:
			select {
			case doScheduleCh <- struct{}{}:
			default:
			}
		case <-doStatusTk.C:
			select {
			case doStatusCh <- struct{}{}:
			default:
			}

		// Tickers: perform tasks periodically
		case <-doScheduleCh:
			r.scheduler.send(rTrySchedule{time: time.Now()})
		case <-doPrunePeerCh:
			r.scheduler.send(rTryPrunePeer{time: time.Now()})
		case <-doProcessBlockCh:
			r.processor.send(rProcessBlock{})
		case <-doStatusCh:
			if err := r.io.broadcastStatusRequest(); err != nil {
				r.logger.Error("Error broadcasting status request", "err", err)
			}

		// Events from peers. Closing the channel signals event loop termination.
		case event, ok := <-events:
			if !ok {
				r.logger.Info("Stopping event processing")
				return
			}
			switch event := event.(type) {
			case bcStatusResponse:
				r.setMaxPeerHeight(event.height)
				r.scheduler.send(event)
			case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse:
				r.scheduler.send(event)
			default:
				r.logger.Error("Received unexpected event", "event", fmt.Sprintf("%T", event))
			}

		// Incremental events from scheduler
		case event := <-r.scheduler.next():
			switch event := event.(type) {
			case scBlockReceived:
				r.processor.send(event)
			case scPeerError:
				r.processor.send(event)
				if err := r.reporter.Report(behavior.BadMessage(event.peerID, "scPeerError")); err != nil {
					r.logger.Error("Error reporting peer", "err", err)
				}
			case scBlockRequest:
				peer := r.Switch.Peers().Get(event.peerID)
				if peer == nil {
					r.logger.Error("Wanted to send block request, but no such peer", "peerID", event.peerID)
					continue
				}
				if err := r.io.sendBlockRequest(peer, event.height); err != nil {
					r.logger.Error("Error sending block request", "err", err)
				}
			case scFinishedEv:
				r.processor.send(event)
				r.scheduler.stop()
			case scSchedulerFail:
				if scSchedulerFailErr != event.reason {
					r.logger.Error("Scheduler failure", "err", event.reason.Error())
					scSchedulerFailErr = event.reason
				}
			case scPeersPruned:
				// Remove peers from the processor.
				for _, peerID := range event.peers {
					r.processor.send(scPeerError{peerID: peerID, reason: errors.New("peer was pruned")})
				}
				r.logger.Debug("Pruned peers", "count", len(event.peers))
			case noOpEvent:
			default:
				r.logger.Error("Received unexpected scheduler event", "event", fmt.Sprintf("%T", event))
			}

		// Incremental events from processor
		case event := <-r.processor.next():
			switch event := event.(type) {
			case pcBlockProcessed:
				r.setSyncHeight(event.height)

				// lastSyncRate is read by GetRemainingSyncTime under r.mtx, so
				// it must be updated under the write lock. Snapshot the values
				// to log so the logger is not called while holding the lock.
				r.mtx.Lock()
				reportRate := (r.syncHeight-r.syncStartHeight)%100 == 0
				if reportRate {
					newSyncRate := 100 / time.Since(lastHundred).Seconds()
					if r.lastSyncRate == 0 {
						r.lastSyncRate = newSyncRate
					} else {
						// Exponential moving average, weighted towards history.
						r.lastSyncRate = 0.9*r.lastSyncRate + 0.1*newSyncRate
					}
				}
				height, maxPeerHeight, rate := r.syncHeight, r.maxPeerHeight, r.lastSyncRate
				r.mtx.Unlock()

				if reportRate {
					r.logger.Info("Fast Sync Rate", "height", height,
						"max_peer_height", maxPeerHeight, "blocks/s", rate)
					lastHundred = time.Now()
				}
				r.scheduler.send(event)
			case pcBlockVerificationFailure:
				r.scheduler.send(event)
			case pcFinished:
				r.logger.Info("Fast sync complete, switching to consensus")
				if !r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0 || r.stateSynced) {
					r.logger.Error("Failed to switch to consensus reactor")
				}
				r.endSync()
				r.fastSync.UnSet()
				return
			case noOpEvent:
			default:
				r.logger.Error("Received unexpected processor event", "event", fmt.Sprintf("%T", event))
			}

		// Terminal event from scheduler
		case err := <-r.scheduler.final():
			switch err {
			case nil:
				r.logger.Info("Scheduler stopped")
			default:
				r.logger.Error("Scheduler aborted with error", "err", err)
			}

		// Terminal event from processor
		case err := <-r.processor.final():
			switch err {
			case nil:
				r.logger.Info("Processor stopped")
			default:
				r.logger.Error("Processor aborted with error", "err", err)
			}
		}
	}
}
// Stop implements cmn.Service interface.
//
// It ends any fast sync in progress, logging before and after. It always
// returns nil.
func (r *BlockchainReactor) Stop() error {
	lg := r.logger

	lg.Info("reactor stopping")
	r.endSync()
	lg.Info("reactor stopped")

	return nil
}
// Receive implements Reactor by handling different message types.
//
// Messages that fail to decode or validate are reported as bad messages from
// src and dropped. Otherwise:
//   - status and block requests are answered directly from the block store;
//   - status, block and no-block responses are turned into events and
//     enqueued for the demux loop, but only while a fast sync is running
//     (r.events non-nil); outside a sync they are ignored.
//
// XXX: do not call any methods that can block or incur heavy processing.
// https://github.com/tendermint/tendermint/issues/2888
func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
	logger := r.logger.With("src", src.ID(), "chID", chID)

	msgProto := new(bcproto.Message)

	if err := proto.Unmarshal(msgBytes, msgProto); err != nil {
		logger.Error("error decoding message", "err", err)
		_ = r.reporter.Report(behavior.BadMessage(src.ID(), err.Error()))
		return
	}

	if err := msgProto.Validate(); err != nil {
		logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err)
		_ = r.reporter.Report(behavior.BadMessage(src.ID(), err.Error()))
		return
	}

	// Use the scoped logger so the entry carries the source peer and channel.
	logger.Debug("received", "msg", msgProto)

	switch msg := msgProto.Sum.(type) {
	case *bcproto.Message_StatusRequest:
		if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src); err != nil {
			logger.Error("Could not send status message to src peer", "err", err)
		}

	case *bcproto.Message_BlockRequest:
		block := r.store.LoadBlock(msg.BlockRequest.Height)
		if block != nil {
			if err := r.io.sendBlockToPeer(block, src); err != nil {
				logger.Error("Could not send block message to src peer", "err", err)
			}
		} else {
			logger.Info("peer asking for a block we don't have", "height", msg.BlockRequest.Height)
			if err := r.io.sendBlockNotFound(msg.BlockRequest.Height, src); err != nil {
				logger.Error("Couldn't send block not found msg", "err", err)
			}
		}

	case *bcproto.Message_StatusResponse:
		r.mtx.RLock()
		if r.events != nil {
			r.events <- bcStatusResponse{
				peerID: src.ID(),
				base:   msg.StatusResponse.Base,
				height: msg.StatusResponse.Height,
				// Stamp the event like the other peer responses so its
				// String() output does not show the zero time.
				time: time.Now(),
			}
		}
		r.mtx.RUnlock()

	case *bcproto.Message_BlockResponse:
		bi, err := types.BlockFromProto(msg.BlockResponse.Block)
		if err != nil {
			logger.Error("error transitioning block from protobuf", "err", err)
			_ = r.reporter.Report(behavior.BadMessage(src.ID(), err.Error()))
			return
		}
		r.mtx.RLock()
		if r.events != nil {
			r.events <- bcBlockResponse{
				peerID: src.ID(),
				block:  bi,
				size:   int64(len(msgBytes)),
				time:   time.Now(),
			}
		}
		r.mtx.RUnlock()

	case *bcproto.Message_NoBlockResponse:
		r.mtx.RLock()
		if r.events != nil {
			r.events <- bcNoBlockResponse{
				peerID: src.ID(),
				height: msg.NoBlockResponse.Height,
				time:   time.Now(),
			}
		}
		r.mtx.RUnlock()
	}
}
// AddPeer implements Reactor interface.
//
// It sends our own status to the new peer, asks the peer for its status and,
// if a fast sync is running (r.events non-nil), enqueues a bcAddNewPeer event.
// Send failures are logged and do not prevent the event from being enqueued.
func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
	// Log the peer's ID value: peer.ID is a method, so it must be called.
	if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), peer); err != nil {
		r.logger.Error("could not send our status to the new peer", "peer", peer.ID(), "err", err)
	}

	if err := r.io.sendStatusRequest(peer); err != nil {
		r.logger.Error("could not send status request to the new peer", "peer", peer.ID(), "err", err)
	}

	r.mtx.RLock()
	defer r.mtx.RUnlock()
	if r.events != nil {
		r.events <- bcAddNewPeer{peerID: peer.ID()}
	}
}
// RemovePeer implements Reactor interface.
//
// While a fast sync is running (r.events non-nil) it enqueues a bcRemovePeer
// event carrying the peer's ID and the removal reason; otherwise it does
// nothing.
func (r *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	if r.events == nil {
		return
	}
	removed := bcRemovePeer{peerID: peer.ID(), reason: reason}
	r.events <- removed
}
// GetChannels implements Reactor. It returns the single descriptor for the
// blockchain channel.
func (r *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
	blockchainCh := &p2p.ChannelDescriptor{
		ID:                  BlockchainChannel,
		Priority:            5,
		SendQueueCapacity:   2000,
		RecvBufferCapacity:  1024,
		RecvMessageCapacity: bc.MaxMsgSize,
	}
	return []*p2p.ChannelDescriptor{blockchainCh}
}
// GetMaxPeerBlockHeight returns the highest block height reported so far by
// any peer in a status response.
func (r *BlockchainReactor) GetMaxPeerBlockHeight() int64 {
	r.mtx.RLock()
	height := r.maxPeerHeight
	r.mtx.RUnlock()
	return height
}
// GetTotalSyncedTime returns the time elapsed since the recorded sync start
// time. It returns zero when fast sync is not set or no start time has been
// recorded.
func (r *BlockchainReactor) GetTotalSyncedTime() time.Duration {
	if r.fastSync.IsSet() && !r.syncStartTime.IsZero() {
		return time.Since(r.syncStartTime)
	}
	return 0
}
// GetRemainingSyncTime estimates how long the fast sync still needs, by
// dividing the number of outstanding blocks by the last measured sync rate.
//
// It returns zero when fast sync is not set, when the sync rate is not yet
// known (below 0.001 blocks/s), or when no blocks remain. The result is never
// negative.
func (r *BlockchainReactor) GetRemainingSyncTime() time.Duration {
	if !r.fastSync.IsSet() {
		return 0
	}

	r.mtx.RLock()
	defer r.mtx.RUnlock()

	targetSyncs := r.maxPeerHeight - r.syncStartHeight
	currentSyncs := r.syncHeight - r.syncStartHeight + 1
	if currentSyncs < 0 || r.lastSyncRate < 0.001 {
		return 0
	}

	// Once the reactor has caught up with (or passed) the best known peer
	// height the difference is zero or negative; report "nothing remaining"
	// rather than a negative duration.
	outstanding := targetSyncs - currentSyncs
	if outstanding <= 0 {
		return 0
	}

	remain := float64(outstanding) / r.lastSyncRate
	return time.Duration(int64(remain * float64(time.Second)))
}