Browse Source

blockchain v0: p2p refactor (#5858)

pull/5928/head
Aleksandr Bezobchuk 3 years ago
committed by GitHub
parent
commit
62d7a5d028
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1103 additions and 675 deletions
  1. +3
    -99
      blockchain/msgs.go
  2. +2
    -0
      blockchain/v0/pool.go
  3. +383
    -261
      blockchain/v0/reactor.go
  4. +291
    -214
      blockchain/v0/reactor_test.go
  5. +50
    -0
      blockchain/v0/test_util.go
  6. +53
    -7
      blockchain/v2/io.go
  7. +29
    -18
      blockchain/v2/reactor.go
  8. +25
    -11
      blockchain/v2/reactor_test.go
  9. +65
    -21
      node/node.go
  10. +107
    -0
      proto/tendermint/blockchain/message.go
  11. +25
    -19
      proto/tendermint/blockchain/message_test.go
  12. +2
    -2
      proto/tendermint/statesync/message.go
  13. +2
    -2
      statesync/reactor.go
  14. +66
    -21
      test/maverick/node/node.go

+ 3
- 99
blockchain/msgs.go View File

@ -1,108 +1,12 @@
package blockchain
import (
"errors"
"fmt"
"github.com/gogo/protobuf/proto"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
"github.com/tendermint/tendermint/types"
)
const (
// NOTE: keep up to date with bcproto.BlockResponse
BlockResponseMessagePrefixSize = 4
BlockResponseMessageFieldKeySize = 1
MaxMsgSize = types.MaxBlockSizeBytes +
BlockResponseMessagePrefixSize +
BlockResponseMessageFieldKeySize
MaxMsgSize = types.MaxBlockSizeBytes +
bcproto.BlockResponseMessagePrefixSize +
bcproto.BlockResponseMessageFieldKeySize
)
// EncodeMsg encodes a Protobuf message
func EncodeMsg(pb proto.Message) ([]byte, error) {
msg := bcproto.Message{}
switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}
bz, err := proto.Marshal(&msg)
if err != nil {
return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
}
return bz, nil
}
// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
return msg.StatusRequest, nil
case *bcproto.Message_StatusResponse:
return msg.StatusResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {
return errors.New("message cannot be nil")
}
switch msg := pb.(type) {
case *bcproto.BlockRequest:
if msg.Height < 0 {
return errors.New("negative Height")
}
case *bcproto.BlockResponse:
// validate basic is called later when converting from proto
return nil
case *bcproto.NoBlockResponse:
if msg.Height < 0 {
return errors.New("negative Height")
}
case *bcproto.StatusResponse:
if msg.Base < 0 {
return errors.New("negative Base")
}
if msg.Height < 0 {
return errors.New("negative Height")
}
if msg.Base > msg.Height {
return fmt.Errorf("base %v cannot be greater than height %v", msg.Base, msg.Height)
}
case *bcproto.StatusRequest:
return nil
default:
return fmt.Errorf("unknown message type %T", msg)
}
return nil
}

+ 2
- 0
blockchain/v0/pool.go View File

@ -30,6 +30,7 @@ eg, L = latency = 0.1s
const (
requestIntervalMS = 2
maxTotalRequesters = 600
maxPeerErrBuffer = 1000
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
@ -180,6 +181,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
if len(pool.peers) == 0 {
return false
}
// NOTE: we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
return pool.height >= (pool.maxPeerHeight - 1)


+ 383
- 261
blockchain/v0/reactor.go View File

@ -2,11 +2,12 @@ package v0
import (
"fmt"
"reflect"
"sync"
"time"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
sm "github.com/tendermint/tendermint/state"
@ -14,18 +15,39 @@ import (
"github.com/tendermint/tendermint/types"
)
var (
_ service.Service = (*Reactor)(nil)
// ChannelShims contains a map of ChannelDescriptorShim objects, where each
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
// p2p proto.Message the new p2p Channel is responsible for handling.
//
//
// TODO: Remove once p2p refactor is complete.
// ref: https://github.com/tendermint/tendermint/issues/5670
ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
BlockchainChannel: {
MsgType: new(bcproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(BlockchainChannel),
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: bc.MaxMsgSize,
},
},
}
)
const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
// BlockchainChannel is a channel for blocks and status updates
BlockchainChannel = p2p.ChannelID(0x40)
trySyncIntervalMS = 10
// stop syncing when last block's time is
// within this much of the system time.
// stopSyncingDurationMinutes = 10
// ask for best height every 10s
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1
@ -34,8 +56,8 @@ const (
)
type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to
// the consensus machine
// For when we switch from blockchain reactor and fast sync to the consensus
// machine.
SwitchToConsensus(state sm.State, skipWAL bool)
}
@ -49,339 +71,435 @@ func (e peerError) Error() string {
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
type Reactor struct {
service.BaseService
// immutable
initialState sm.State
blockExec *sm.BlockExecutor
store *store.BlockStore
pool *BlockPool
fastSync bool
blockExec *sm.BlockExecutor
store *store.BlockStore
pool *BlockPool
consReactor consensusReactor
fastSync bool
blockchainCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
closeCh chan struct{}
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
}
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
fastSync bool) *BlockchainReactor {
// poolWG is used to synchronize the graceful shutdown of the poolRoutine and
// requestRoutine spawned goroutines when stopping the reactor and before
// stopping the p2p Channel(s).
poolWG sync.WaitGroup
}
// NewReactor returns new reactor instance.
func NewReactor(
logger log.Logger,
state sm.State,
blockExec *sm.BlockExecutor,
store *store.BlockStore,
consReactor consensusReactor,
blockchainCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
fastSync bool,
) (*Reactor, error) {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
startHeight := store.Height() + 1
if startHeight == 1 {
startHeight = state.InitialHeight
}
pool := NewBlockPool(startHeight, requestsCh, errorsCh)
bcR := &BlockchainReactor{
requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r := &Reactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
consReactor: consReactor,
fastSync: fastSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
blockchainCh: blockchainCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
}
// SetLogger implements service.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
bcR.BaseService.Logger = l
bcR.pool.Logger = l
r.BaseService = *service.NewBaseService(logger, "Blockchain", r)
return r, nil
}
// OnStart implements service.Service.
func (bcR *BlockchainReactor) OnStart() error {
if bcR.fastSync {
err := bcR.pool.Start()
if err != nil {
// OnStart starts separate go routines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed.
//
// If fastSync is enabled, we also start the pool and the pool processing
// goroutine. If the pool fails to start, an error is returned.
func (r *Reactor) OnStart() error {
if r.fastSync {
if err := r.pool.Start(); err != nil {
return err
}
go bcR.poolRoutine(false)
go r.poolRoutine(false)
}
return nil
}
// SwitchToFastSync is called by the state sync reactor when switching to fast sync.
func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error {
bcR.fastSync = true
bcR.initialState = state
go r.processBlockchainCh()
go r.processPeerUpdates()
bcR.pool.height = state.LastBlockHeight + 1
err := bcR.pool.Start()
if err != nil {
return err
}
go bcR.poolRoutine(true)
return nil
}
// OnStop implements service.Service.
func (bcR *BlockchainReactor) OnStop() {
if bcR.fastSync {
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
if r.fastSync {
if err := r.pool.Stop(); err != nil {
r.Logger.Error("failed to stop pool", "err", err)
}
}
}
// GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlockchainChannel,
Priority: 5,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: bc.MaxMsgSize,
},
}
// wait for the poolRoutine and requestRoutine goroutines to gracefully exit
r.poolWG.Wait()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
// Wait for all p2p Channels to be closed before returning. This ensures we
// can easily reason about synchronization of all p2p Channels and ensure no
// panics will occur.
<-r.blockchainCh.Done()
<-r.peerUpdates.Done()
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID p2p.NodeID) {
block := r.store.LoadBlock(msg.Height)
if block != nil {
blockProto, err := block.ToProto()
if err != nil {
r.Logger.Error("failed to convert msg to protobuf", "err", err)
return
}
_ = peer.Send(BlockchainChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
r.blockchainCh.Out() <- p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: blockProto},
}
// peer is added to the pool once we receive the first
// bcStatusResponseMessage from the peer and call pool.SetPeerRange
}
return
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.pool.RemovePeer(peer.ID())
r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
r.blockchainCh.Out() <- p2p.Envelope{
To: peerID,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
}
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {
// handleBlockchainMessage handles enevelopes sent from peers on the
// BlockchainChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
switch msg := envelope.Message.(type) {
case *bcproto.BlockRequest:
r.respondToPeer(msg, envelope.From)
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
bl, err := block.ToProto()
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
r.Logger.Error("failed to convert block from proto", "err", err)
return err
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
r.pool.AddBlock(envelope.From, block, block.Size())
case *bcproto.StatusRequest:
r.blockchainCh.Out() <- p2p.Envelope{
To: envelope.From,
Message: &bcproto.StatusResponse{
Height: r.store.Height(),
Base: r.store.Base(),
},
}
return src.TrySend(BlockchainChannel, msgBytes)
case *bcproto.StatusResponse:
r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
r.Logger.Debug(
"peer does not have the requested block",
"height", msg.Height,
"peer", envelope.From,
)
default:
r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From)
return fmt.Errorf("received unknown message: %T", msg)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
return nil
}
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
r.Logger.Error("recovering from processing message panic", "err", err)
}
}()
r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
switch chID {
case BlockchainChannel:
err = r.handleBlockchainMessage(envelope)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
}
return src.TrySend(BlockchainChannel, msgBytes)
return err
}
// Receive implements Reactor by handling 4 types of messages (look below).
// XXX: do not call any methods that can block or incur heavy processing.
// https://github.com/tendermint/tendermint/issues/2888
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
logger := bcR.Logger.With("src", src, "chId", chID)
// processBlockchainCh initiates a blocking process where we listen for and handle
// envelopes on the BlockchainChannel. Any error encountered during message
// execution will result in a PeerError being sent on the BlockchainChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processBlockchainCh() {
defer r.blockchainCh.Close()
msg, err := bc.DecodeMsg(msgBytes)
if err != nil {
logger.Error("Error decoding message", "err", err)
bcR.Switch.StopPeerForError(src, err)
return
for {
select {
case envelope := <-r.blockchainCh.In():
if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID(), "envelope", envelope, "err", err)
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
}
}
case <-r.closeCh:
r.Logger.Debug("stopped listening on blockchain channel; closing...")
return
}
}
}
if err = bc.ValidateMsg(msg); err != nil {
logger.Error("Peer sent us invalid msg", "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
// XXX: Pool#RedoRequest can sometimes give us an empty peer
if len(peerUpdate.PeerID) == 0 {
return
}
logger.Debug("Receive", "msg", msg)
switch peerUpdate.Status {
case p2p.PeerStatusNew, p2p.PeerStatusUp:
// send a status update the newly added peer
r.blockchainCh.Out() <- p2p.Envelope{
To: peerUpdate.PeerID,
Message: &bcproto.StatusResponse{
Base: r.store.Base(),
Height: r.store.Height(),
},
}
switch msg := msg.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
logger.Error("Block content is invalid", "err", err)
bcR.Switch.StopPeerForError(src, err)
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
r.pool.RemovePeer(peerUpdate.PeerID)
}
}
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates() {
defer r.peerUpdates.Close()
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
case <-r.closeCh:
r.Logger.Debug("stopped listening on peer updates channel; closing...")
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
case *bcproto.StatusRequest:
// Send peer our state.
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
if err != nil {
logger.Error("could not convert msg to protobut", "err", err)
}
}
// SwitchToFastSync is called by the state sync reactor when switching to fast
// sync.
func (r *Reactor) SwitchToFastSync(state sm.State) error {
r.fastSync = true
r.initialState = state
r.pool.height = state.LastBlockHeight + 1
if err := r.pool.Start(); err != nil {
return err
}
go r.poolRoutine(true)
return nil
}
func (r *Reactor) requestRoutine() {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
r.poolWG.Add(1)
defer r.poolWG.Done()
for {
select {
case <-r.closeCh:
return
case <-r.pool.Quit():
return
case request := <-r.requestsCh:
r.blockchainCh.Out() <- p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}
case pErr := <-r.errorsCh:
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: pErr.peerID,
Err: pErr.err,
Severity: p2p.PeerErrorSeverityLow,
}
case <-statusUpdateTicker.C:
r.poolWG.Add(1)
go func() {
defer r.poolWG.Done()
r.blockchainCh.Out() <- p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}
}()
}
src.TrySend(BlockchainChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
logger.Debug("Peer does not have requested block", "height", msg.Height)
default:
logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
// Handle messages from the poolReactor telling the reactor what to do.
// poolRoutine handles messages from the poolReactor telling the reactor what to
// do.
//
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) {
func (r *Reactor) poolRoutine(stateSynced bool) {
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
blocksSynced = uint64(0)
chainID = bcR.initialState.ChainID
state = bcR.initialState
chainID = r.initialState.ChainID
state = r.initialState
lastHundred = time.Now()
lastRate = 0.0
didProcessCh = make(chan struct{}, 1)
)
defer trySyncTicker.Stop()
defer statusUpdateTicker.Stop()
defer switchToConsensusTicker.Stop()
go func() {
for {
select {
case <-bcR.Quit():
return
case <-bcR.pool.Quit():
return
case request := <-bcR.requestsCh:
peer := bcR.Switch.Peers().Get(request.PeerID)
if peer == nil {
bcR.Logger.Debug("Can't send request: no peer", "peer_id", request.PeerID)
continue
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert BlockRequest to proto", "err", err)
continue
}
go r.requestRoutine()
queued := peer.TrySend(BlockchainChannel, msgBytes)
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
case err := <-bcR.errorsCh:
peer := bcR.Switch.Peers().Get(err.peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, err)
}
case <-statusUpdateTicker.C:
// ask for status updates
go bcR.BroadcastStatusRequest()
}
}
}()
r.poolWG.Add(1)
defer r.poolWG.Done()
FOR_LOOP:
for {
select {
case <-switchToConsensusTicker.C:
var (
height, numPending, lenRequesters = bcR.pool.GetStatus()
outbound, inbound, _ = bcR.Switch.NumPeers()
lastAdvance = bcR.pool.LastAdvance()
height, numPending, lenRequesters = r.pool.GetStatus()
lastAdvance = r.pool.LastAdvance()
)
bcR.Logger.Debug("Consensus ticker",
"numPending", numPending,
"total", lenRequesters)
r.Logger.Debug(
"consensus ticker",
"num_pending", numPending,
"total", lenRequesters,
"height", height,
)
switch {
case bcR.pool.IsCaughtUp():
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
case r.pool.IsCaughtUp():
r.Logger.Info("switching to consensus reactor", "height", height)
case time.Since(lastAdvance) > syncTimeout:
bcR.Logger.Error(fmt.Sprintf("No progress since last advance: %v", lastAdvance))
r.Logger.Error("no progress since last advance", "last_advance", lastAdvance)
default:
bcR.Logger.Info("Not caught up yet",
"height", height, "max_peer_height", bcR.pool.MaxPeerHeight(),
"num_peers", outbound+inbound,
"timeout_in", syncTimeout-time.Since(lastAdvance))
r.Logger.Info(
"not caught up yet",
"height", height,
"max_peer_height", r.pool.MaxPeerHeight(),
"timeout_in", syncTimeout-time.Since(lastAdvance),
)
continue
}
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
if err := r.pool.Stop(); err != nil {
r.Logger.Error("failed to stop pool", "err", err)
}
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
if r.consReactor != nil {
r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
}
break FOR_LOOP
case <-trySyncTicker.C: // chan time
case <-trySyncTicker.C:
select {
case didProcessCh <- struct{}{}:
default:
}
case <-didProcessCh:
// NOTE: It is a subtle mistake to process more than a single block
// at a time (e.g. 10) here, because we only TrySend 1 request per
// loop. The ratio mismatch can result in starving of blocks, a
// sudden burst of requests and responses, and repeat.
// Consequently, it is better to split these routines rather than
// coupling them as it's written here. TODO uncouple from request
// routine.
// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
// bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
// NOTE: It is a subtle mistake to process more than a single block at a
// time (e.g. 10) here, because we only send one BlockRequest per loop
// iteration. The ratio mismatch can result in starving of blocks, i.e. a
// sudden burst of requests and responses, and repeat. Consequently, it is
// better to split these routines rather than coupling them as it is
// written here.
//
// TODO: Uncouple from request routine.
// see if there are any blocks to sync
first, second := r.pool.PeekTwoBlocks()
if first == nil || second == nil {
// We need both to sync the first block.
// we need both to sync the first block
continue FOR_LOOP
} else {
// Try again quickly next loop.
// try again quickly next loop
didProcessCh <- struct{}{}
}
@ -391,71 +509,75 @@ FOR_LOOP:
firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
)
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// Finally, verify the first block using the second's commit.
//
// NOTE: We can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
if err != nil {
err = fmt.Errorf("invalid last commit: %w", err)
bcR.Logger.Error(err.Error(),
"last_commit", second.LastCommit, "block_id", firstID, "height", first.Height)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
// NOTE: we've already removed the peer's request, but we still need
// to clean up the rest.
bcR.Switch.StopPeerForError(peer, err)
r.Logger.Error(
err.Error(),
"last_commit", second.LastCommit,
"block_id", firstID,
"height", first.Height,
)
// NOTE: We've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: peerID,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
}
peerID2 := bcR.pool.RedoRequest(second.Height)
peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID {
if peer2 := bcR.Switch.Peers().Get(peerID2); peer2 != nil {
bcR.Switch.StopPeerForError(peer2, err)
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: peerID2,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
}
}
continue FOR_LOOP
} else {
bcR.pool.PopRequest()
r.pool.PopRequest()
// TODO: batch saves so we dont persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: batch saves so we do not persist to disk every block
r.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: same thing for app - but we would need a way to get the hash
// without persisting the state.
var err error
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
state, _, err = r.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
// TODO: This is bad, are we zombie?
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
blocksSynced++
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate",
"height", bcR.pool.height, "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
r.Logger.Info(
"fast sync rate",
"height", r.pool.height,
"max_peer_height", r.pool.MaxPeerHeight(),
"blocks/s", lastRate,
)
lastHundred = time.Now()
}
}
continue FOR_LOOP
case <-bcR.Quit():
case <-r.closeCh:
break FOR_LOOP
}
}
}
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *BlockchainReactor) BroadcastStatusRequest() {
bm, err := bc.EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert StatusRequest to proto", "err", err)
return
}
// We don't care about whenever broadcast is successful or not.
_ = bcR.Switch.Broadcast(BlockchainChannel, bm)
}

+ 291
- 214
blockchain/v0/reactor_test.go View File

@ -2,71 +2,59 @@ package v0
import (
"fmt"
"math/rand"
"os"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mempool/mock"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
dbm "github.com/tendermint/tm-db"
)
var config *cfg.Config
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
for i := 0; i < numValidators; i++ {
val, privVal := types.RandValidator(randPower, minPower)
validators[i] = types.GenesisValidator{
PubKey: val.PubKey,
Power: val.VotingPower,
}
privValidators[i] = privVal
}
sort.Sort(types.PrivValidatorsByAddress(privValidators))
type reactorTestSuite struct {
reactor *Reactor
app proxy.AppConns
return &types.GenesisDoc{
GenesisTime: tmtime.Now(),
ChainID: config.ChainID(),
Validators: validators,
}, privValidators
}
peerID p2p.NodeID
type BlockchainReactorPair struct {
reactor *BlockchainReactor
app proxy.AppConns
blockchainChannel *p2p.Channel
blockchainInCh chan p2p.Envelope
blockchainOutCh chan p2p.Envelope
blockchainPeerErrCh chan p2p.PeerError
peerUpdatesCh chan p2p.PeerUpdate
peerUpdates *p2p.PeerUpdatesCh
}
func newBlockchainReactor(
logger log.Logger,
func setup(
t *testing.T,
genDoc *types.GenesisDoc,
privVals []types.PrivValidator,
maxBlockHeight int64) BlockchainReactorPair {
if len(privVals) != 1 {
panic("only support one validator")
}
maxBlockHeight int64,
chBuf uint,
) *reactorTestSuite {
t.Helper()
app := &testApp{}
require.Len(t, privVals, 1, "only one validator can be supported")
app := &abci.BaseApplication{}
cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
if err != nil {
panic(fmt.Errorf("error start app: %w", err))
}
require.NoError(t, proxyApp.Start())
blockDB := dbm.NewMemDB()
stateDB := dbm.NewMemDB()
@ -74,25 +62,24 @@ func newBlockchainReactor(
blockStore := store.NewBlockStore(blockDB)
state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
panic(fmt.Errorf("error constructing state from genesis file: %w", err))
}
require.NoError(t, err)
// Make the BlockchainReactor itself.
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true
db := dbm.NewMemDB()
stateStore = sm.NewStore(db)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
mock.Mempool{}, sm.EmptyEvidencePool{})
if err = stateStore.Save(state); err != nil {
panic(err)
}
// let's add some blocks in
blockExec := sm.NewBlockExecutor(
stateStore,
log.TestingLogger(),
proxyApp.Consensus(),
mock.Mempool{},
sm.EmptyEvidencePool{},
)
require.NoError(t, stateStore.Save(state))
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
@ -105,60 +92,197 @@ func newBlockchainReactor(
lastBlock.Header.ChainID,
time.Now(),
)
if err != nil {
panic(err)
}
lastCommit = types.NewCommit(vote.Height, vote.Round,
lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()})
require.NoError(t, err)
lastCommit = types.NewCommit(
vote.Height,
vote.Round,
lastBlockMeta.BlockID,
[]types.CommitSig{vote.CommitSig()},
)
}
thisBlock := makeBlock(blockHeight, state, lastCommit)
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(fmt.Errorf("error apply block: %w", err))
}
require.NoError(t, err)
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
pID := make([]byte, 16)
_, err = rng.Read(pID)
require.NoError(t, err)
peerUpdatesCh := make(chan p2p.PeerUpdate, chBuf)
rts := &reactorTestSuite{
app: proxyApp,
blockchainInCh: make(chan p2p.Envelope, chBuf),
blockchainOutCh: make(chan p2p.Envelope, chBuf),
blockchainPeerErrCh: make(chan p2p.PeerError, chBuf),
peerUpdatesCh: peerUpdatesCh,
peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh),
peerID: p2p.NodeID(fmt.Sprintf("%x", pID)),
}
rts.blockchainChannel = p2p.NewChannel(
BlockchainChannel,
new(bcproto.Message),
rts.blockchainInCh,
rts.blockchainOutCh,
rts.blockchainPeerErrCh,
)
reactor, err := NewReactor(
log.TestingLogger().With("module", "blockchain", "node", rts.peerID),
state.Copy(),
blockExec,
blockStore,
nil,
rts.blockchainChannel,
rts.peerUpdates,
fastSync,
)
require.NoError(t, err)
rts.reactor = reactor
require.NoError(t, rts.reactor.Start())
require.True(t, rts.reactor.IsRunning())
t.Cleanup(func() {
require.NoError(t, rts.reactor.Stop())
require.NoError(t, rts.app.Stop())
require.False(t, rts.reactor.IsRunning())
})
return rts
}
return BlockchainReactorPair{bcReactor, proxyApp}
func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite, dropChErr bool) {
// create a mapping for efficient suite lookup by peer ID
suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite)
for _, suite := range suites {
suitesByPeerID[suite.peerID] = suite
}
// Simulate a router by listening for all outbound envelopes and proxying the
// envelope to the respective peer (suite).
go func() {
for envelope := range primary.blockchainOutCh {
if envelope.Broadcast {
for _, s := range suites {
// broadcast to everyone except source
if s.peerID != primary.peerID {
s.blockchainInCh <- p2p.Envelope{
From: primary.peerID,
To: s.peerID,
Message: envelope.Message,
}
}
}
} else {
suitesByPeerID[envelope.To].blockchainInCh <- p2p.Envelope{
From: primary.peerID,
To: envelope.To,
Message: envelope.Message,
}
}
}
}()
go func() {
for pErr := range primary.blockchainPeerErrCh {
if dropChErr {
primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err)
} else {
primary.peerUpdatesCh <- p2p.PeerUpdate{
PeerID: pErr.PeerID,
Status: p2p.PeerStatusRemoved,
}
}
}
}()
}
func TestNoBlockResponse(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
func TestReactor_AbruptDisconnect(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(65)
genDoc, privVals := randGenesisDoc(config, 1, false, 30)
maxBlockHeight := int64(64)
testSuites := []*reactorTestSuite{
setup(t, genDoc, privVals, maxBlockHeight, 0),
setup(t, genDoc, privVals, 0, 0),
}
reactorPairs := make([]BlockchainReactorPair, 2)
require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height())
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
for _, s := range testSuites {
simulateRouter(s, testSuites, true)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
// connect reactor to every other reactor
for _, ss := range testSuites {
if s.peerID != ss.peerID {
s.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: ss.peerID,
}
}
}
}
}, p2p.Connect2Switches)
secondaryPool := testSuites[1].reactor.pool
require.Eventually(
t,
func() bool {
height, _, _ := secondaryPool.GetStatus()
return secondaryPool.MaxPeerHeight() > 0 && height > 0 && height < 10
},
10*time.Second,
10*time.Millisecond,
"expected node to be partially synced",
)
// Remove synced node from the syncing node which should not result in any
// deadlocks or race conditions within the context of poolRoutine.
testSuites[1].peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
PeerID: testSuites[0].peerID,
}
}
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
require.NoError(t, err)
err = r.app.Stop()
require.NoError(t, err)
func TestReactor_NoBlockResponse(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(config, 1, false, 30)
maxBlockHeight := int64(65)
testSuites := []*reactorTestSuite{
setup(t, genDoc, privVals, maxBlockHeight, 0),
setup(t, genDoc, privVals, 0, 0),
}
require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height())
for _, s := range testSuites {
simulateRouter(s, testSuites, true)
// connect reactor to every other reactor
for _, ss := range testSuites {
if s.peerID != ss.peerID {
s.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: ss.peerID,
}
}
}
}()
}
tests := []struct {
testCases := []struct {
height int64
existent bool
}{
@ -168,161 +292,114 @@ func TestNoBlockResponse(t *testing.T) {
{100, false},
}
for {
if reactorPairs[1].reactor.pool.IsCaughtUp() {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
for _, tt := range tests {
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
if tt.existent {
assert.True(t, block != nil)
secondaryPool := testSuites[1].reactor.pool
require.Eventually(
t,
func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() },
10*time.Second,
10*time.Millisecond,
"expected node to be fully synced",
)
for _, tc := range testCases {
block := testSuites[1].reactor.store.LoadBlock(tc.height)
if tc.existent {
require.True(t, block != nil)
} else {
assert.True(t, block == nil)
require.Nil(t, block)
}
}
}
// NOTE: This is too hard to test without
// an easy way to add test peer to switch
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
func TestReactor_BadBlockStopsPeer(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(148)
// Other chain needs a different validator set
otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30)
otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight)
defer func() {
err := otherChain.reactor.Stop()
require.Error(t, err)
err = otherChain.app.Stop()
require.NoError(t, err)
}()
reactorPairs := make([]BlockchainReactorPair, 4)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
maxBlockHeight := int64(48)
genDoc, privVals := randGenesisDoc(config, 1, false, 30)
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
testSuites := []*reactorTestSuite{
setup(t, genDoc, privVals, maxBlockHeight, 1000), // fully synced node
setup(t, genDoc, privVals, 0, 1000),
setup(t, genDoc, privVals, 0, 1000),
setup(t, genDoc, privVals, 0, 1000),
setup(t, genDoc, privVals, 0, 1000), // new node
}
}, p2p.Connect2Switches)
require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height())
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
require.NoError(t, err)
for _, s := range testSuites[:len(testSuites)-1] {
simulateRouter(s, testSuites, true)
err = r.app.Stop()
require.NoError(t, err)
}
}()
for {
time.Sleep(1 * time.Second)
caughtUp := true
for _, r := range reactorPairs {
if !r.reactor.pool.IsCaughtUp() {
caughtUp = false
// connect reactor to every other reactor except the new node
for _, ss := range testSuites[:len(testSuites)-1] {
if s.peerID != ss.peerID {
s.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: ss.peerID,
}
}
}
if caughtUp {
break
}
}
// at this time, reactors[0-3] is the newest
assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
// Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data
// race, but can't be easily avoided.
reactorPairs[3].reactor.store = otherChain.reactor.store
lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
require.Eventually(
t,
func() bool {
caughtUp := true
for _, s := range testSuites[1 : len(testSuites)-1] {
if s.reactor.pool.MaxPeerHeight() == 0 || !s.reactor.pool.IsCaughtUp() {
caughtUp = false
}
}
}, p2p.Connect2Switches)...)
return caughtUp
},
10*time.Minute,
10*time.Millisecond,
"expected all nodes to be fully synced",
)
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
for _, s := range testSuites[:len(testSuites)-1] {
require.Len(t, s.reactor.pool.peers, 3)
}
for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}
// Mark testSuites[3] as an invalid peer which will cause newSuite to disconnect
// from this peer.
otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30)
otherSuite := setup(t, otherGenDoc, otherPrivVals, maxBlockHeight, 0)
testSuites[3].reactor.store = otherSuite.reactor.store
time.Sleep(1 * time.Second)
}
// add a fake peer just so we do not wait for the consensus ticker to timeout
otherSuite.reactor.pool.SetPeerRange("00ff", 10, 10)
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
}
//----------------------------------------------
// utility funcs
// start the new peer's faux router
newSuite := testSuites[len(testSuites)-1]
simulateRouter(newSuite, testSuites, false)
func makeTxs(height int64) (txs []types.Tx) {
for i := 0; i < 10; i++ {
txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
// connect all nodes to the new peer
for _, s := range testSuites[:len(testSuites)-1] {
newSuite.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: s.peerID,
}
}
return txs
}
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
return block
}
type testApp struct {
abci.BaseApplication
}
var _ abci.Application = (*testApp)(nil)
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
return abci.ResponseInfo{}
}
func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
return abci.ResponseBeginBlock{}
}
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
return abci.ResponseEndBlock{}
}
func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
return abci.ResponseDeliverTx{Events: []abci.Event{}}
}
func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}
func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
return
// wait for the new peer to catch up and become fully synced
require.Eventually(
t,
func() bool { return newSuite.reactor.pool.MaxPeerHeight() > 0 && newSuite.reactor.pool.IsCaughtUp() },
10*time.Minute,
10*time.Millisecond,
"expected new node to be fully synced",
)
require.Eventuallyf(
t,
func() bool { return len(newSuite.reactor.pool.peers) < len(testSuites)-1 },
10*time.Minute,
10*time.Millisecond,
"invalid number of peers; expected < %d, got: %d",
len(testSuites)-1,
len(newSuite.reactor.pool.peers),
)
}

+ 50
- 0
blockchain/v0/test_util.go View File

@ -0,0 +1,50 @@
package v0
import (
"sort"
cfg "github.com/tendermint/tendermint/config"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
func randGenesisDoc(
config *cfg.Config,
numValidators int,
randPower bool,
minPower int64,
) (*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
for i := 0; i < numValidators; i++ {
val, privVal := types.RandValidator(randPower, minPower)
validators[i] = types.GenesisValidator{
PubKey: val.PubKey,
Power: val.VotingPower,
}
privValidators[i] = privVal
}
sort.Sort(types.PrivValidatorsByAddress(privValidators))
return &types.GenesisDoc{
GenesisTime: tmtime.Now(),
ChainID: config.ChainID(),
Validators: validators,
}, privValidators
}
func makeTxs(height int64) (txs []types.Tx) {
for i := 0; i < 10; i++ {
txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
}
return txs
}
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
return block
}

+ 53
- 7
blockchain/v2/io.go View File

@ -3,7 +3,7 @@ package v2
import (
"errors"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/p2p"
bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
"github.com/tendermint/tendermint/state"
@ -48,7 +48,15 @@ type consensusReactor interface {
}
func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error {
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height})
msgProto := &bcproto.Message{
Sum: &bcproto.Message_BlockRequest{
BlockRequest: &bcproto.BlockRequest{
Height: height,
},
},
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
@ -61,7 +69,16 @@ func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error {
}
func (sio *switchIO) sendStatusResponse(base int64, height int64, peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base})
msgProto := &bcproto.Message{
Sum: &bcproto.Message_StatusResponse{
StatusResponse: &bcproto.StatusResponse{
Height: height,
Base: base,
},
},
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
@ -83,10 +100,19 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error {
return err
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bpb})
msgProto := &bcproto.Message{
Sum: &bcproto.Message_BlockResponse{
BlockResponse: &bcproto.BlockResponse{
Block: bpb,
},
},
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return errPeerQueueFull
}
@ -95,7 +121,15 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error {
}
func (sio *switchIO) sendBlockNotFound(height int64, peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height})
msgProto := &bcproto.Message{
Sum: &bcproto.Message_NoBlockResponse{
NoBlockResponse: &bcproto.NoBlockResponse{
Height: height,
},
},
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
@ -116,7 +150,13 @@ func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool
}
func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
msgProto := &bcproto.Message{
Sum: &bcproto.Message_StatusRequest{
StatusRequest: &bcproto.StatusRequest{},
},
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}
@ -129,7 +169,13 @@ func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error {
}
func (sio *switchIO) broadcastStatusRequest() error {
msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{})
msgProto := &bcproto.Message{
Sum: &bcproto.Message_StatusRequest{
StatusRequest: &bcproto.StatusRequest{},
},
}
msgBytes, err := proto.Marshal(msgProto)
if err != nil {
return err
}


+ 29
- 18
blockchain/v2/reactor.go View File

@ -5,6 +5,8 @@ import (
"fmt"
"time"
proto "github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/behaviour"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/libs/log"
@ -466,49 +468,54 @@ func (r *BlockchainReactor) Stop() error {
func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
logger := r.logger.With("src", src.ID(), "chID", chID)
msg, err := bc.DecodeMsg(msgBytes)
if err != nil {
msgProto := new(bcproto.Message)
if err := proto.Unmarshal(msgBytes, msgProto); err != nil {
logger.Error("error decoding message", "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
if err = bc.ValidateMsg(msg); err != nil {
logger.Error("peer sent us invalid msg", "msg", msg, "err", err)
if err := msgProto.Validate(); err != nil {
logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
r.logger.Debug("Receive", "msg", msg)
r.logger.Debug("received", "msg", msgProto)
switch msg := msg.(type) {
case *bcproto.StatusRequest:
switch msg := msgProto.Sum.(type) {
case *bcproto.Message_StatusRequest:
if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src); err != nil {
logger.Error("Could not send status message to src peer")
}
case *bcproto.BlockRequest:
block := r.store.LoadBlock(msg.Height)
case *bcproto.Message_BlockRequest:
block := r.store.LoadBlock(msg.BlockRequest.Height)
if block != nil {
if err = r.io.sendBlockToPeer(block, src); err != nil {
if err := r.io.sendBlockToPeer(block, src); err != nil {
logger.Error("Could not send block message to src peer", "err", err)
}
} else {
logger.Info("peer asking for a block we don't have", "height", msg.Height)
if err = r.io.sendBlockNotFound(msg.Height, src); err != nil {
logger.Info("peer asking for a block we don't have", "height", msg.BlockRequest.Height)
if err := r.io.sendBlockNotFound(msg.BlockRequest.Height, src); err != nil {
logger.Error("Couldn't send block not found msg", "err", err)
}
}
case *bcproto.StatusResponse:
case *bcproto.Message_StatusResponse:
r.mtx.RLock()
if r.events != nil {
r.events <- bcStatusResponse{peerID: src.ID(), base: msg.Base, height: msg.Height}
r.events <- bcStatusResponse{
peerID: src.ID(),
base: msg.StatusResponse.Base,
height: msg.StatusResponse.Height,
}
}
r.mtx.RUnlock()
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
case *bcproto.Message_BlockResponse:
bi, err := types.BlockFromProto(msg.BlockResponse.Block)
if err != nil {
logger.Error("error transitioning block from protobuf", "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
@ -525,10 +532,14 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}
r.mtx.RUnlock()
case *bcproto.NoBlockResponse:
case *bcproto.Message_NoBlockResponse:
r.mtx.RLock()
if r.events != nil {
r.events <- bcNoBlockResponse{peerID: src.ID(), height: msg.Height, time: time.Now()}
r.events <- bcNoBlockResponse{
peerID: src.ID(),
height: msg.NoBlockResponse.Height,
time: time.Now(),
}
}
r.mtx.RUnlock()
}


+ 25
- 11
blockchain/v2/reactor_test.go View File

@ -9,13 +9,13 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/behaviour"
bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@ -409,23 +409,37 @@ func TestReactorHelperMode(t *testing.T) {
switch ev := step.event.(type) {
case bcproto.StatusRequest:
old := mockSwitch.numStatusResponse
msg, err := bc.EncodeMsg(&ev)
assert.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
msgProto := new(bcproto.Message)
require.NoError(t, msgProto.Wrap(&ev))
msgBz, err := proto.Marshal(msgProto)
require.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
assert.Equal(t, old+1, mockSwitch.numStatusResponse)
case bcproto.BlockRequest:
if ev.Height > params.startHeight {
old := mockSwitch.numNoBlockResponse
msg, err := bc.EncodeMsg(&ev)
assert.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
msgProto := new(bcproto.Message)
require.NoError(t, msgProto.Wrap(&ev))
msgBz, err := proto.Marshal(msgProto)
require.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
assert.Equal(t, old+1, mockSwitch.numNoBlockResponse)
} else {
old := mockSwitch.numBlockResponse
msg, err := bc.EncodeMsg(&ev)
assert.NoError(t, err)
assert.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg)
msgProto := new(bcproto.Message)
require.NoError(t, msgProto.Wrap(&ev))
msgBz, err := proto.Marshal(msgProto)
require.NoError(t, err)
reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz)
assert.Equal(t, old+1, mockSwitch.numBlockResponse)
}
}


+ 65
- 21
node/node.go View File

@ -192,7 +192,7 @@ type Node struct {
eventBus *types.EventBus // pub/sub for services
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor p2p.Reactor // for fast-syncing
bcReactor service.Service // for fast-syncing
mempoolReactor *mempl.Reactor // for gossipping transactions
mempool mempl.Mempool
stateSync bool // whether the node should state sync on startup
@ -370,24 +370,41 @@ func createEvidenceReactor(
return evidenceReactorShim, evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(config *cfg.Config,
func createBlockchainReactor(
logger log.Logger,
config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
csReactor *cs.Reactor,
fastSync bool,
logger log.Logger) (bcReactor p2p.Reactor, err error) {
) (*p2p.ReactorShim, service.Service, error) {
logger = logger.With("module", "blockchain")
switch config.FastSync.Version {
case "v0":
bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync,
)
if err != nil {
return nil, nil, err
}
return reactorShim, reactor, nil
case "v2":
bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactor.SetLogger(logger)
return nil, reactor, nil
default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}
bcReactor.SetLogger(logger.With("module", "blockchain"))
return bcReactor, nil
}
func createConsensusReactor(config *cfg.Config,
@ -747,12 +764,28 @@ func NewNode(config *cfg.Config,
sm.BlockExecutorWithMetrics(smMetrics),
)
// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
csReactor, csState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
)
// Create the blockchain reactor. Note, we do not start fast sync if we're
// doing a state sync first.
bcReactorShim, bcReactor, err := createBlockchainReactor(
logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
)
if err != nil {
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
}
// TODO: Remove this once the switch is removed.
var bcReactorForSwitch p2p.Reactor
if bcReactorShim != nil {
bcReactorForSwitch = bcReactorShim
} else {
bcReactorForSwitch = bcReactor.(p2p.Reactor)
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
if stateSync {
@ -761,11 +794,6 @@ func NewNode(config *cfg.Config,
csMetrics.FastSyncing.Set(1)
}
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
)
// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
// we should clean this whole thing up. See:
@ -791,8 +819,8 @@ func NewNode(config *cfg.Config,
p2pLogger := logger.With("module", "p2p")
transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch,
stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@ -850,8 +878,8 @@ func NewNode(config *cfg.Config,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
mempool: mempool,
consensusState: consensusState,
consensusReactor: consensusReactor,
consensusState: csState,
consensusReactor: csReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
@ -924,6 +952,13 @@ func (n *Node) OnStart() error {
n.isListening = true
if n.config.FastSync.Version == "v0" {
// Start the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Start(); err != nil {
return err
}
}
// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(); err != nil {
return err
@ -975,6 +1010,13 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing switch", "err", err)
}
if n.config.FastSync.Version == "v0" {
// Stop the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
}
}
// Stop the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the state sync reactor", "err", err)
@ -1286,9 +1328,11 @@ func makeNodeInfo(
var bcChannel byte
switch config.FastSync.Version {
case "v0":
bcChannel = bcv0.BlockchainChannel
bcChannel = byte(bcv0.BlockchainChannel)
case "v2":
bcChannel = bcv2.BlockchainChannel
default:
return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}


+ 107
- 0
proto/tendermint/blockchain/message.go View File

@ -0,0 +1,107 @@
package blockchain
import (
"errors"
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
)
const (
BlockResponseMessagePrefixSize = 4
BlockResponseMessageFieldKeySize = 1
)
// Wrap implements the p2p Wrapper interface and wraps a blockchain messages.
func (m *Message) Wrap(pb proto.Message) error {
switch msg := pb.(type) {
case *BlockRequest:
m.Sum = &Message_BlockRequest{BlockRequest: msg}
case *BlockResponse:
m.Sum = &Message_BlockResponse{BlockResponse: msg}
case *NoBlockResponse:
m.Sum = &Message_NoBlockResponse{NoBlockResponse: msg}
case *StatusRequest:
m.Sum = &Message_StatusRequest{StatusRequest: msg}
case *StatusResponse:
m.Sum = &Message_StatusResponse{StatusResponse: msg}
default:
return fmt.Errorf("unknown message: %T", msg)
}
return nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_BlockRequest:
return m.GetBlockRequest(), nil
case *Message_BlockResponse:
return m.GetBlockResponse(), nil
case *Message_NoBlockResponse:
return m.GetNoBlockResponse(), nil
case *Message_StatusRequest:
return m.GetStatusRequest(), nil
case *Message_StatusResponse:
return m.GetStatusResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}
// Validate validates the message returning an error upon failure.
func (m *Message) Validate() error {
if m == nil {
return errors.New("message cannot be nil")
}
switch msg := m.Sum.(type) {
case *Message_BlockRequest:
if m.GetBlockRequest().Height < 0 {
return errors.New("negative Height")
}
case *Message_BlockResponse:
// validate basic is called later when converting from proto
return nil
case *Message_NoBlockResponse:
if m.GetNoBlockResponse().Height < 0 {
return errors.New("negative Height")
}
case *Message_StatusResponse:
if m.GetStatusResponse().Base < 0 {
return errors.New("negative Base")
}
if m.GetStatusResponse().Height < 0 {
return errors.New("negative Height")
}
if m.GetStatusResponse().Base > m.GetStatusResponse().Height {
return fmt.Errorf(
"base %v cannot be greater than height %v",
m.GetStatusResponse().Base, m.GetStatusResponse().Height,
)
}
case *Message_StatusRequest:
return nil
default:
return fmt.Errorf("unknown message type: %T", msg)
}
return nil
}

blockchain/msgs_test.go → proto/tendermint/blockchain/message_test.go View File


+ 2
- 2
proto/tendermint/statesync/message.go View File

@ -8,8 +8,8 @@ import (
)
// Wrap implements the p2p Wrapper interface and wraps a state sync messages.
func (m *Message) Wrap(msg proto.Message) error {
switch msg := msg.(type) {
func (m *Message) Wrap(pb proto.Message) error {
switch msg := pb.(type) {
case *ChunkRequest:
m.Sum = &Message_ChunkRequest{ChunkRequest: msg}


+ 2
- 2
statesync/reactor.go View File

@ -349,7 +349,7 @@ func (r *Reactor) processSnapshotCh() {
select {
case envelope := <-r.snapshotCh.In():
if err := r.handleMessage(r.snapshotCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process envelope", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
r.snapshotCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
@ -376,7 +376,7 @@ func (r *Reactor) processChunkCh() {
select {
case envelope := <-r.chunkCh.In():
if err := r.handleMessage(r.chunkCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process envelope", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
r.chunkCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,


+ 66
- 21
test/maverick/node/node.go View File

@ -233,7 +233,7 @@ type Node struct {
eventBus *types.EventBus // pub/sub for services
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor p2p.Reactor // for fast-syncing
bcReactor service.Service // for fast-syncing
mempoolReactor *mempl.Reactor // for gossipping transactions
mempool mempl.Mempool
stateSync bool // whether the node should state sync on startup
@ -411,24 +411,41 @@ func createEvidenceReactor(
return evidenceReactorShim, evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(config *cfg.Config,
func createBlockchainReactor(
logger log.Logger,
config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
csReactor *cs.Reactor,
fastSync bool,
logger log.Logger) (bcReactor p2p.Reactor, err error) {
) (*p2p.ReactorShim, service.Service, error) {
logger = logger.With("module", "blockchain")
switch config.FastSync.Version {
case "v0":
bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync,
)
if err != nil {
return nil, nil, err
}
return reactorShim, reactor, nil
case "v2":
bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactor.SetLogger(logger)
return nil, reactor, nil
default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}
bcReactor.SetLogger(logger.With("module", "blockchain"))
return bcReactor, nil
}
func createConsensusReactor(config *cfg.Config,
@ -780,12 +797,29 @@ func NewNode(config *cfg.Config,
sm.BlockExecutorWithMetrics(smMetrics),
)
// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
csReactor, csState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors,
)
// Create the blockchain reactor. Note, we do not start fast sync if we're
// doing a state sync first.
bcReactorShim, bcReactor, err := createBlockchainReactor(
logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
)
if err != nil {
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
}
// TODO: Remove this once the switch is removed.
var bcReactorForSwitch p2p.Reactor
if bcReactorShim != nil {
bcReactorForSwitch = bcReactorShim
} else {
bcReactorForSwitch = bcReactor.(p2p.Reactor)
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
if stateSync {
@ -794,11 +828,6 @@ func NewNode(config *cfg.Config,
csMetrics.FastSyncing.Set(1)
}
logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors)
// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
// we should clean this whole thing up. See:
@ -824,8 +853,8 @@ func NewNode(config *cfg.Config,
p2pLogger := logger.With("module", "p2p")
transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch,
stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@ -883,8 +912,8 @@ func NewNode(config *cfg.Config,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
mempool: mempool,
consensusState: consensusState,
consensusReactor: consensusReactor,
consensusState: csState,
consensusReactor: csReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
@ -957,6 +986,13 @@ func (n *Node) OnStart() error {
n.isListening = true
if n.config.FastSync.Version == "v0" {
// Start the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Start(); err != nil {
return err
}
}
// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(); err != nil {
return err
@ -1008,6 +1044,13 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing switch", "err", err)
}
if n.config.FastSync.Version == "v0" {
// Stop the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
}
}
// Stop the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the state sync reactor", "err", err)
@ -1317,9 +1360,11 @@ func makeNodeInfo(
var bcChannel byte
switch config.FastSync.Version {
case "v0":
bcChannel = bcv0.BlockchainChannel
bcChannel = byte(bcv0.BlockchainChannel)
case "v2":
bcChannel = bcv2.BlockchainChannel
default:
return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}


Loading…
Cancel
Save