diff --git a/blockchain/msgs.go b/blockchain/msgs.go index cd5ef977f..e901ae1e3 100644 --- a/blockchain/msgs.go +++ b/blockchain/msgs.go @@ -1,108 +1,12 @@ package blockchain import ( - "errors" - "fmt" - - "github.com/gogo/protobuf/proto" - bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" "github.com/tendermint/tendermint/types" ) const ( - // NOTE: keep up to date with bcproto.BlockResponse - BlockResponseMessagePrefixSize = 4 - BlockResponseMessageFieldKeySize = 1 - MaxMsgSize = types.MaxBlockSizeBytes + - BlockResponseMessagePrefixSize + - BlockResponseMessageFieldKeySize + MaxMsgSize = types.MaxBlockSizeBytes + + bcproto.BlockResponseMessagePrefixSize + + bcproto.BlockResponseMessageFieldKeySize ) - -// EncodeMsg encodes a Protobuf message -func EncodeMsg(pb proto.Message) ([]byte, error) { - msg := bcproto.Message{} - - switch pb := pb.(type) { - case *bcproto.BlockRequest: - msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb} - case *bcproto.BlockResponse: - msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb} - case *bcproto.NoBlockResponse: - msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb} - case *bcproto.StatusRequest: - msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb} - case *bcproto.StatusResponse: - msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb} - default: - return nil, fmt.Errorf("unknown message type %T", pb) - } - - bz, err := proto.Marshal(&msg) - if err != nil { - return nil, fmt.Errorf("unable to marshal %T: %w", pb, err) - } - - return bz, nil -} - -// DecodeMsg decodes a Protobuf message. -func DecodeMsg(bz []byte) (proto.Message, error) { - pb := &bcproto.Message{} - - err := proto.Unmarshal(bz, pb) - if err != nil { - return nil, err - } - - switch msg := pb.Sum.(type) { - case *bcproto.Message_BlockRequest: - return msg.BlockRequest, nil - case *bcproto.Message_BlockResponse: - return msg.BlockResponse, nil - case *bcproto.Message_NoBlockResponse: - return msg.NoBlockResponse, nil - case *bcproto.Message_StatusRequest: - return msg.StatusRequest, nil - case *bcproto.Message_StatusResponse: - return msg.StatusResponse, nil - default: - return nil, fmt.Errorf("unknown message type %T", msg) - } -} - -// ValidateMsg validates a message. 
-func ValidateMsg(pb proto.Message) error { - if pb == nil { - return errors.New("message cannot be nil") - } - - switch msg := pb.(type) { - case *bcproto.BlockRequest: - if msg.Height < 0 { - return errors.New("negative Height") - } - case *bcproto.BlockResponse: - // validate basic is called later when converting from proto - return nil - case *bcproto.NoBlockResponse: - if msg.Height < 0 { - return errors.New("negative Height") - } - case *bcproto.StatusResponse: - if msg.Base < 0 { - return errors.New("negative Base") - } - if msg.Height < 0 { - return errors.New("negative Height") - } - if msg.Base > msg.Height { - return fmt.Errorf("base %v cannot be greater than height %v", msg.Base, msg.Height) - } - case *bcproto.StatusRequest: - return nil - default: - return fmt.Errorf("unknown message type %T", msg) - } - return nil -} diff --git a/blockchain/v0/pool.go b/blockchain/v0/pool.go index 7541c06f9..bbb5bbf80 100644 --- a/blockchain/v0/pool.go +++ b/blockchain/v0/pool.go @@ -30,6 +30,7 @@ eg, L = latency = 0.1s const ( requestIntervalMS = 2 maxTotalRequesters = 600 + maxPeerErrBuffer = 1000 maxPendingRequests = maxTotalRequesters maxPendingRequestsPerPeer = 20 @@ -180,6 +181,7 @@ func (pool *BlockPool) IsCaughtUp() bool { if len(pool.peers) == 0 { return false } + // NOTE: we use maxPeerHeight - 1 because to sync block H requires block H+1 // to verify the LastCommit. return pool.height >= (pool.maxPeerHeight - 1) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 0935a6218..c77a8b634 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -2,11 +2,12 @@ package v0 import ( "fmt" - "reflect" + "sync" "time" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/p2p" bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" sm "github.com/tendermint/tendermint/state" @@ -14,18 +15,39 @@ import ( "github.com/tendermint/tendermint/types" ) +var ( + _ service.Service = (*Reactor)(nil) + + // ChannelShims contains a map of ChannelDescriptorShim objects, where each + // object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding + // p2p proto.Message the new p2p Channel is responsible for handling. + // + // + // TODO: Remove once p2p refactor is complete. + // ref: https://github.com/tendermint/tendermint/issues/5670 + ChannelShims = map[p2p.ChannelID]*p2p.ChannelDescriptorShim{ + BlockchainChannel: { + MsgType: new(bcproto.Message), + Descriptor: &p2p.ChannelDescriptor{ + ID: byte(BlockchainChannel), + Priority: 5, + SendQueueCapacity: 1000, + RecvBufferCapacity: 50 * 4096, + RecvMessageCapacity: bc.MaxMsgSize, + }, + }, + } +) + const ( - // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) - BlockchainChannel = byte(0x40) + // BlockchainChannel is a channel for blocks and status updates + BlockchainChannel = p2p.ChannelID(0x40) trySyncIntervalMS = 10 - // stop syncing when last block's time is - // within this much of the system time. 
- // stopSyncingDurationMinutes = 10 - // ask for best height every 10s statusUpdateIntervalSeconds = 10 + // check if we should switch to consensus reactor switchToConsensusIntervalSeconds = 1 @@ -34,8 +56,8 @@ const ( ) type consensusReactor interface { - // for when we switch from blockchain reactor and fast sync to - // the consensus machine + // For when we switch from blockchain reactor and fast sync to the consensus + // machine. SwitchToConsensus(state sm.State, skipWAL bool) } @@ -49,339 +71,435 @@ func (e peerError) Error() string { } // BlockchainReactor handles long-term catchup syncing. -type BlockchainReactor struct { - p2p.BaseReactor +type Reactor struct { + service.BaseService // immutable initialState sm.State - blockExec *sm.BlockExecutor - store *store.BlockStore - pool *BlockPool - fastSync bool + blockExec *sm.BlockExecutor + store *store.BlockStore + pool *BlockPool + consReactor consensusReactor + fastSync bool + + blockchainCh *p2p.Channel + peerUpdates *p2p.PeerUpdatesCh + closeCh chan struct{} requestsCh <-chan BlockRequest errorsCh <-chan peerError -} -// NewBlockchainReactor returns new reactor instance. -func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, - fastSync bool) *BlockchainReactor { + // poolWG is used to synchronize the graceful shutdown of the poolRoutine and + // requestRoutine spawned goroutines when stopping the reactor and before + // stopping the p2p Channel(s). + poolWG sync.WaitGroup +} +// NewReactor returns new reactor instance. +func NewReactor( + logger log.Logger, + state sm.State, + blockExec *sm.BlockExecutor, + store *store.BlockStore, + consReactor consensusReactor, + blockchainCh *p2p.Channel, + peerUpdates *p2p.PeerUpdatesCh, + fastSync bool, +) (*Reactor, error) { if state.LastBlockHeight != store.Height() { - panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, - store.Height())) + return nil, fmt.Errorf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()) } - requestsCh := make(chan BlockRequest, maxTotalRequesters) - - const capacity = 1000 // must be bigger than peers count - errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock - startHeight := store.Height() + 1 if startHeight == 1 { startHeight = state.InitialHeight } - pool := NewBlockPool(startHeight, requestsCh, errorsCh) - bcR := &BlockchainReactor{ + requestsCh := make(chan BlockRequest, maxTotalRequesters) + errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count. + + r := &Reactor{ initialState: state, blockExec: blockExec, store: store, - pool: pool, + pool: NewBlockPool(startHeight, requestsCh, errorsCh), + consReactor: consReactor, fastSync: fastSync, requestsCh: requestsCh, errorsCh: errorsCh, + blockchainCh: blockchainCh, + peerUpdates: peerUpdates, + closeCh: make(chan struct{}), } - bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) - return bcR -} -// SetLogger implements service.Service by setting the logger on reactor and pool. -func (bcR *BlockchainReactor) SetLogger(l log.Logger) { - bcR.BaseService.Logger = l - bcR.pool.Logger = l + r.BaseService = *service.NewBaseService(logger, "Blockchain", r) + return r, nil } -// OnStart implements service.Service. 
-func (bcR *BlockchainReactor) OnStart() error { - if bcR.fastSync { - err := bcR.pool.Start() - if err != nil { +// OnStart starts separate go routines for each p2p Channel and listens for +// envelopes on each. In addition, it also listens for peer updates and handles +// messages on that p2p channel accordingly. The caller must be sure to execute +// OnStop to ensure the outbound p2p Channels are closed. +// +// If fastSync is enabled, we also start the pool and the pool processing +// goroutine. If the pool fails to start, an error is returned. +func (r *Reactor) OnStart() error { + if r.fastSync { + if err := r.pool.Start(); err != nil { return err } - go bcR.poolRoutine(false) + + go r.poolRoutine(false) } - return nil -} -// SwitchToFastSync is called by the state sync reactor when switching to fast sync. -func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error { - bcR.fastSync = true - bcR.initialState = state + go r.processBlockchainCh() + go r.processPeerUpdates() - bcR.pool.height = state.LastBlockHeight + 1 - err := bcR.pool.Start() - if err != nil { - return err - } - go bcR.poolRoutine(true) return nil } -// OnStop implements service.Service. -func (bcR *BlockchainReactor) OnStop() { - if bcR.fastSync { - if err := bcR.pool.Stop(); err != nil { - bcR.Logger.Error("Error stopping pool", "err", err) +// OnStop stops the reactor by signaling to all spawned goroutines to exit and +// blocking until they all exit. +func (r *Reactor) OnStop() { + if r.fastSync { + if err := r.pool.Stop(); err != nil { + r.Logger.Error("failed to stop pool", "err", err) } } -} -// GetChannels implements Reactor -func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{ - { - ID: BlockchainChannel, - Priority: 5, - SendQueueCapacity: 1000, - RecvBufferCapacity: 50 * 4096, - RecvMessageCapacity: bc.MaxMsgSize, - }, - } + // wait for the poolRoutine and requestRoutine goroutines to gracefully exit + r.poolWG.Wait() + + // Close closeCh to signal to all spawned goroutines to gracefully exit. All + // p2p Channels should execute Close(). + close(r.closeCh) + + // Wait for all p2p Channels to be closed before returning. This ensures we + // can easily reason about synchronization of all p2p Channels and ensure no + // panics will occur. + <-r.blockchainCh.Done() + <-r.peerUpdates.Done() } -// AddPeer implements Reactor by sending our state to peer. -func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{ - Base: bcR.store.Base(), - Height: bcR.store.Height()}) - if err != nil { - bcR.Logger.Error("could not convert msg to protobuf", "err", err) - return - } +// respondToPeer loads a block and sends it to the requesting peer, if we have it. +// Otherwise, we'll respond saying we do not have it. +func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID p2p.NodeID) { + block := r.store.LoadBlock(msg.Height) + if block != nil { + blockProto, err := block.ToProto() + if err != nil { + r.Logger.Error("failed to convert msg to protobuf", "err", err) + return + } - _ = peer.Send(BlockchainChannel, msgBytes) - // it's OK if send fails. will try later in poolRoutine + r.blockchainCh.Out() <- p2p.Envelope{ + To: peerID, + Message: &bcproto.BlockResponse{Block: blockProto}, + } - // peer is added to the pool once we receive the first - // bcStatusResponseMessage from the peer and call pool.SetPeerRange -} + return + } -// RemovePeer implements Reactor by removing peer from the pool. 
-func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
-	bcR.pool.RemovePeer(peer.ID())
+	r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
+	r.blockchainCh.Out() <- p2p.Envelope{
+		To:      peerID,
+		Message: &bcproto.NoBlockResponse{Height: msg.Height},
+	}
 }
 
-// respondToPeer loads a block and sends it to the requesting peer,
-// if we have it. Otherwise, we'll respond saying we don't have it.
-func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
-	src p2p.Peer) (queued bool) {
+// handleBlockchainMessage handles envelopes sent from peers on the
+// BlockchainChannel. It returns an error only if the Envelope.Message is unknown
+// for this channel. This should never be called outside of handleMessage.
+func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
+	switch msg := envelope.Message.(type) {
+	case *bcproto.BlockRequest:
+		r.respondToPeer(msg, envelope.From)
 
-	block := bcR.store.LoadBlock(msg.Height)
-	if block != nil {
-		bl, err := block.ToProto()
 		if err != nil {
-			bcR.Logger.Error("could not convert msg to protobuf", "err", err)
-			return false
+	case *bcproto.BlockResponse:
+		block, err := types.BlockFromProto(msg.Block)
+		if err != nil {
+			r.Logger.Error("failed to convert block from proto", "err", err)
+			return err
 		}
 
-		msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bl})
-		if err != nil {
-			bcR.Logger.Error("could not marshal msg", "err", err)
-			return false
+		r.pool.AddBlock(envelope.From, block, block.Size())
+
+	case *bcproto.StatusRequest:
+		r.blockchainCh.Out() <- p2p.Envelope{
+			To: envelope.From,
+			Message: &bcproto.StatusResponse{
+				Height: r.store.Height(),
+				Base:   r.store.Base(),
+			},
 		}
 
-		return src.TrySend(BlockchainChannel, msgBytes)
+	case *bcproto.StatusResponse:
+		r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
+
+	case *bcproto.NoBlockResponse:
+		r.Logger.Debug(
+			"peer does not have the requested block",
+			"height", msg.Height,
+			"peer", envelope.From,
+		)
+
+	default:
+		r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From)
+		return fmt.Errorf("received unknown message: %T", msg)
+	}
 
-	bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
+	return nil
+}
 
-	msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
-	if err != nil {
-		bcR.Logger.Error("could not convert msg to protobuf", "err", err)
-		return false
+// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
+// It will handle errors and any possible panics gracefully. A caller can handle
+// any error returned by sending a PeerError on the respective channel.
+func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
+	defer func() {
+		if e := recover(); e != nil {
+			err = fmt.Errorf("panic in processing message: %v", e)
+			r.Logger.Error("recovering from processing message panic", "err", err)
+		}
+	}()
+
+	r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From)
+
+	switch chID {
+	case BlockchainChannel:
+		err = r.handleBlockchainMessage(envelope)
+
+	default:
+		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
 	}
 
-	return src.TrySend(BlockchainChannel, msgBytes)
+	return err
 }
 
-// Receive implements Reactor by handling 4 types of messages (look below).
-// XXX: do not call any methods that can block or incur heavy processing.
-// https://github.com/tendermint/tendermint/issues/2888
-func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
-	logger := bcR.Logger.With("src", src, "chId", chID)
+// processBlockchainCh initiates a blocking process where we listen for and handle
+// envelopes on the BlockchainChannel. Any error encountered during message
+// execution will result in a PeerError being sent on the BlockchainChannel. When
+// the reactor is stopped, we will catch the signal and close the p2p Channel
+// gracefully.
+func (r *Reactor) processBlockchainCh() {
+	defer r.blockchainCh.Close()
 
-	msg, err := bc.DecodeMsg(msgBytes)
-	if err != nil {
-		logger.Error("Error decoding message", "err", err)
-		bcR.Switch.StopPeerForError(src, err)
-		return
+	for {
+		select {
+		case envelope := <-r.blockchainCh.In():
+			if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil {
+				r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID(), "envelope", envelope, "err", err)
+				r.blockchainCh.Error() <- p2p.PeerError{
+					PeerID:   envelope.From,
+					Err:      err,
+					Severity: p2p.PeerErrorSeverityLow,
+				}
+			}
+
+		case <-r.closeCh:
+			r.Logger.Debug("stopped listening on blockchain channel; closing...")
+			return
+		}
 	}
+}
 
-	if err = bc.ValidateMsg(msg); err != nil {
-		logger.Error("Peer sent us invalid msg", "msg", msg, "err", err)
-		bcR.Switch.StopPeerForError(src, err)
+// processPeerUpdate processes a single PeerUpdate, sending our status to a peer
+// that comes up and removing a peer from the pool when it goes down.
+func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
+	r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
+
+	// XXX: Pool#RedoRequest can sometimes give us an empty peer
+	if len(peerUpdate.PeerID) == 0 {
 		return
 	}
 
-	logger.Debug("Receive", "msg", msg)
+	switch peerUpdate.Status {
+	case p2p.PeerStatusNew, p2p.PeerStatusUp:
+		// send a status update to the newly added peer
+		r.blockchainCh.Out() <- p2p.Envelope{
+			To: peerUpdate.PeerID,
+			Message: &bcproto.StatusResponse{
+				Base:   r.store.Base(),
+				Height: r.store.Height(),
+			},
+		}
 
-	switch msg := msg.(type) {
-	case *bcproto.BlockRequest:
-		bcR.respondToPeer(msg, src)
-	case *bcproto.BlockResponse:
-		bi, err := types.BlockFromProto(msg.Block)
-		if err != nil {
-			logger.Error("Block content is invalid", "err", err)
-			bcR.Switch.StopPeerForError(src, err)
+	case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
+		r.pool.RemovePeer(peerUpdate.PeerID)
+	}
+}
+
+// processPeerUpdates initiates a blocking process where we listen for and handle
+// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
+// close the p2p PeerUpdatesCh gracefully.
+func (r *Reactor) processPeerUpdates() {
+	defer r.peerUpdates.Close()
+
+	for {
+		select {
+		case peerUpdate := <-r.peerUpdates.Updates():
+			r.processPeerUpdate(peerUpdate)
+
+		case <-r.closeCh:
+			r.Logger.Debug("stopped listening on peer updates channel; closing...")
 			return
 		}
-		bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
-	case *bcproto.StatusRequest:
-		// Send peer our state.
-		msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
-			Height: bcR.store.Height(),
-			Base:   bcR.store.Base(),
-		})
-		if err != nil {
-			logger.Error("could not convert msg to protobut", "err", err)
+	}
+}
+
+// SwitchToFastSync is called by the state sync reactor when switching to fast
+// sync.
+func (r *Reactor) SwitchToFastSync(state sm.State) error { + r.fastSync = true + r.initialState = state + r.pool.height = state.LastBlockHeight + 1 + + if err := r.pool.Start(); err != nil { + return err + } + + go r.poolRoutine(true) + return nil +} + +func (r *Reactor) requestRoutine() { + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + defer statusUpdateTicker.Stop() + + r.poolWG.Add(1) + defer r.poolWG.Done() + + for { + select { + case <-r.closeCh: + return + + case <-r.pool.Quit(): return + + case request := <-r.requestsCh: + r.blockchainCh.Out() <- p2p.Envelope{ + To: request.PeerID, + Message: &bcproto.BlockRequest{Height: request.Height}, + } + + case pErr := <-r.errorsCh: + r.blockchainCh.Error() <- p2p.PeerError{ + PeerID: pErr.peerID, + Err: pErr.err, + Severity: p2p.PeerErrorSeverityLow, + } + + case <-statusUpdateTicker.C: + r.poolWG.Add(1) + + go func() { + defer r.poolWG.Done() + + r.blockchainCh.Out() <- p2p.Envelope{ + Broadcast: true, + Message: &bcproto.StatusRequest{}, + } + }() } - src.TrySend(BlockchainChannel, msgBytes) - case *bcproto.StatusResponse: - // Got a peer status. Unverified. - bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height) - case *bcproto.NoBlockResponse: - logger.Debug("Peer does not have requested block", "height", msg.Height) - default: - logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } } -// Handle messages from the poolReactor telling the reactor what to do. +// poolRoutine handles messages from the poolReactor telling the reactor what to +// do. +// // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -func (bcR *BlockchainReactor) poolRoutine(stateSynced bool) { +func (r *Reactor) poolRoutine(stateSynced bool) { var ( trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) - statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second) switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second) blocksSynced = uint64(0) - chainID = bcR.initialState.ChainID - state = bcR.initialState + chainID = r.initialState.ChainID + state = r.initialState lastHundred = time.Now() lastRate = 0.0 didProcessCh = make(chan struct{}, 1) ) + defer trySyncTicker.Stop() - defer statusUpdateTicker.Stop() defer switchToConsensusTicker.Stop() - go func() { - for { - select { - - case <-bcR.Quit(): - return - - case <-bcR.pool.Quit(): - return - - case request := <-bcR.requestsCh: - peer := bcR.Switch.Peers().Get(request.PeerID) - if peer == nil { - bcR.Logger.Debug("Can't send request: no peer", "peer_id", request.PeerID) - continue - } - msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: request.Height}) - if err != nil { - bcR.Logger.Error("could not convert BlockRequest to proto", "err", err) - continue - } + go r.requestRoutine() - queued := peer.TrySend(BlockchainChannel, msgBytes) - if !queued { - bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height) - } - - case err := <-bcR.errorsCh: - peer := bcR.Switch.Peers().Get(err.peerID) - if peer != nil { - bcR.Switch.StopPeerForError(peer, err) - } - - case <-statusUpdateTicker.C: - // ask for status updates - go bcR.BroadcastStatusRequest() - } - } - }() + r.poolWG.Add(1) + defer r.poolWG.Done() FOR_LOOP: for { select { - case <-switchToConsensusTicker.C: var ( - height, numPending, lenRequesters = bcR.pool.GetStatus() - outbound, inbound, _ = bcR.Switch.NumPeers() - lastAdvance = bcR.pool.LastAdvance() + height, numPending, 
lenRequesters = r.pool.GetStatus() + lastAdvance = r.pool.LastAdvance() ) - bcR.Logger.Debug("Consensus ticker", - "numPending", numPending, - "total", lenRequesters) + r.Logger.Debug( + "consensus ticker", + "num_pending", numPending, + "total", lenRequesters, + "height", height, + ) switch { - case bcR.pool.IsCaughtUp(): - bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) + case r.pool.IsCaughtUp(): + r.Logger.Info("switching to consensus reactor", "height", height) + case time.Since(lastAdvance) > syncTimeout: - bcR.Logger.Error(fmt.Sprintf("No progress since last advance: %v", lastAdvance)) + r.Logger.Error("no progress since last advance", "last_advance", lastAdvance) + default: - bcR.Logger.Info("Not caught up yet", - "height", height, "max_peer_height", bcR.pool.MaxPeerHeight(), - "num_peers", outbound+inbound, - "timeout_in", syncTimeout-time.Since(lastAdvance)) + r.Logger.Info( + "not caught up yet", + "height", height, + "max_peer_height", r.pool.MaxPeerHeight(), + "timeout_in", syncTimeout-time.Since(lastAdvance), + ) continue } - if err := bcR.pool.Stop(); err != nil { - bcR.Logger.Error("Error stopping pool", "err", err) + if err := r.pool.Stop(); err != nil { + r.Logger.Error("failed to stop pool", "err", err) } - conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - if ok { - conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) + + if r.consReactor != nil { + r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) } break FOR_LOOP - case <-trySyncTicker.C: // chan time + case <-trySyncTicker.C: select { case didProcessCh <- struct{}{}: default: } case <-didProcessCh: - // NOTE: It is a subtle mistake to process more than a single block - // at a time (e.g. 10) here, because we only TrySend 1 request per - // loop. The ratio mismatch can result in starving of blocks, a - // sudden burst of requests and responses, and repeat. - // Consequently, it is better to split these routines rather than - // coupling them as it's written here. TODO uncouple from request - // routine. - - // See if there are any blocks to sync. - first, second := bcR.pool.PeekTwoBlocks() - // bcR.Logger.Info("TrySync peeked", "first", first, "second", second) + // NOTE: It is a subtle mistake to process more than a single block at a + // time (e.g. 10) here, because we only send one BlockRequest per loop + // iteration. The ratio mismatch can result in starving of blocks, i.e. a + // sudden burst of requests and responses, and repeat. Consequently, it is + // better to split these routines rather than coupling them as it is + // written here. + // + // TODO: Uncouple from request routine. + + // see if there are any blocks to sync + first, second := r.pool.PeekTwoBlocks() if first == nil || second == nil { - // We need both to sync the first block. + // we need both to sync the first block continue FOR_LOOP } else { - // Try again quickly next loop. + // try again quickly next loop didProcessCh <- struct{}{} } @@ -391,71 +509,75 @@ FOR_LOOP: firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader} ) - // Finally, verify the first block using the second's commit - // NOTE: we can probably make this more efficient, but note that calling + // Finally, verify the first block using the second's commit. + // + // NOTE: We can probably make this more efficient, but note that calling // first.Hash() doesn't verify the tx contents, so MakePartSet() is // currently necessary. 
err := state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit) if err != nil { err = fmt.Errorf("invalid last commit: %w", err) - bcR.Logger.Error(err.Error(), - "last_commit", second.LastCommit, "block_id", firstID, "height", first.Height) - - peerID := bcR.pool.RedoRequest(first.Height) - peer := bcR.Switch.Peers().Get(peerID) - if peer != nil { - // NOTE: we've already removed the peer's request, but we still need - // to clean up the rest. - bcR.Switch.StopPeerForError(peer, err) + r.Logger.Error( + err.Error(), + "last_commit", second.LastCommit, + "block_id", firstID, + "height", first.Height, + ) + + // NOTE: We've already removed the peer's request, but we still need + // to clean up the rest. + peerID := r.pool.RedoRequest(first.Height) + r.blockchainCh.Error() <- p2p.PeerError{ + PeerID: peerID, + Err: err, + Severity: p2p.PeerErrorSeverityLow, } - peerID2 := bcR.pool.RedoRequest(second.Height) + peerID2 := r.pool.RedoRequest(second.Height) if peerID2 != peerID { - if peer2 := bcR.Switch.Peers().Get(peerID2); peer2 != nil { - bcR.Switch.StopPeerForError(peer2, err) + r.blockchainCh.Error() <- p2p.PeerError{ + PeerID: peerID2, + Err: err, + Severity: p2p.PeerErrorSeverityLow, } } continue FOR_LOOP } else { - bcR.pool.PopRequest() + r.pool.PopRequest() - // TODO: batch saves so we dont persist to disk every block - bcR.store.SaveBlock(first, firstParts, second.LastCommit) + // TODO: batch saves so we do not persist to disk every block + r.store.SaveBlock(first, firstParts, second.LastCommit) - // TODO: same thing for app - but we would need a way to get the hash - // without persisting the state. var err error - state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first) + + // TODO: Same thing for app - but we would need a way to get the hash + // without persisting the state. + state, _, err = r.blockExec.ApplyBlock(state, firstID, first) if err != nil { - // TODO This is bad, are we zombie? - panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) + // TODO: This is bad, are we zombie? + panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } + blocksSynced++ if blocksSynced%100 == 0 { lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - bcR.Logger.Info("Fast Sync Rate", - "height", bcR.pool.height, "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) + r.Logger.Info( + "fast sync rate", + "height", r.pool.height, + "max_peer_height", r.pool.MaxPeerHeight(), + "blocks/s", lastRate, + ) + lastHundred = time.Now() } } + continue FOR_LOOP - case <-bcR.Quit(): + case <-r.closeCh: break FOR_LOOP } } } - -// BroadcastStatusRequest broadcasts `BlockStore` base and height. -func (bcR *BlockchainReactor) BroadcastStatusRequest() { - bm, err := bc.EncodeMsg(&bcproto.StatusRequest{}) - if err != nil { - bcR.Logger.Error("could not convert StatusRequest to proto", "err", err) - return - } - - // We don't care about whenever broadcast is successful or not. 
- _ = bcR.Switch.Broadcast(BlockchainChannel, bm) -} diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index a88b499f4..163853a9a 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -2,71 +2,59 @@ package v0 import ( "fmt" + "math/rand" "os" - "sort" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - dbm "github.com/tendermint/tm-db" - abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/mempool/mock" "github.com/tendermint/tendermint/p2p" + bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" - tmtime "github.com/tendermint/tendermint/types/time" + dbm "github.com/tendermint/tm-db" ) -var config *cfg.Config +var rng = rand.New(rand.NewSource(time.Now().UnixNano())) -func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) { - validators := make([]types.GenesisValidator, numValidators) - privValidators := make([]types.PrivValidator, numValidators) - for i := 0; i < numValidators; i++ { - val, privVal := types.RandValidator(randPower, minPower) - validators[i] = types.GenesisValidator{ - PubKey: val.PubKey, - Power: val.VotingPower, - } - privValidators[i] = privVal - } - sort.Sort(types.PrivValidatorsByAddress(privValidators)) +type reactorTestSuite struct { + reactor *Reactor + app proxy.AppConns - return &types.GenesisDoc{ - GenesisTime: tmtime.Now(), - ChainID: config.ChainID(), - Validators: validators, - }, privValidators -} + peerID p2p.NodeID -type BlockchainReactorPair struct { - reactor *BlockchainReactor - app proxy.AppConns + blockchainChannel *p2p.Channel + blockchainInCh chan p2p.Envelope + blockchainOutCh chan p2p.Envelope + blockchainPeerErrCh chan p2p.PeerError + + peerUpdatesCh chan p2p.PeerUpdate + peerUpdates *p2p.PeerUpdatesCh } -func newBlockchainReactor( - logger log.Logger, +func setup( + t *testing.T, genDoc *types.GenesisDoc, privVals []types.PrivValidator, - maxBlockHeight int64) BlockchainReactorPair { - if len(privVals) != 1 { - panic("only support one validator") - } + maxBlockHeight int64, + chBuf uint, +) *reactorTestSuite { + t.Helper() - app := &testApp{} + require.Len(t, privVals, 1, "only one validator can be supported") + + app := &abci.BaseApplication{} cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc) - err := proxyApp.Start() - if err != nil { - panic(fmt.Errorf("error start app: %w", err)) - } + require.NoError(t, proxyApp.Start()) blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() @@ -74,25 +62,24 @@ func newBlockchainReactor( blockStore := store.NewBlockStore(blockDB) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) - if err != nil { - panic(fmt.Errorf("error constructing state from genesis file: %w", err)) - } + require.NoError(t, err) - // Make the BlockchainReactor itself. - // NOTE we have to create and commit the blocks first because - // pool.height is determined from the store. 
fastSync := true db := dbm.NewMemDB() stateStore = sm.NewStore(db) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), - mock.Mempool{}, sm.EmptyEvidencePool{}) - if err = stateStore.Save(state); err != nil { - panic(err) - } - // let's add some blocks in + blockExec := sm.NewBlockExecutor( + stateStore, + log.TestingLogger(), + proxyApp.Consensus(), + mock.Mempool{}, + sm.EmptyEvidencePool{}, + ) + require.NoError(t, stateStore.Save(state)) + for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil) + if blockHeight > 1 { lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) lastBlock := blockStore.LoadBlock(blockHeight - 1) @@ -105,60 +92,197 @@ func newBlockchainReactor( lastBlock.Header.ChainID, time.Now(), ) - if err != nil { - panic(err) - } - lastCommit = types.NewCommit(vote.Height, vote.Round, - lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()}) + require.NoError(t, err) + + lastCommit = types.NewCommit( + vote.Height, + vote.Round, + lastBlockMeta.BlockID, + []types.CommitSig{vote.CommitSig()}, + ) } thisBlock := makeBlock(blockHeight, state, lastCommit) - thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock) - if err != nil { - panic(fmt.Errorf("error apply block: %w", err)) - } + require.NoError(t, err) blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) + pID := make([]byte, 16) + _, err = rng.Read(pID) + require.NoError(t, err) + + peerUpdatesCh := make(chan p2p.PeerUpdate, chBuf) + + rts := &reactorTestSuite{ + app: proxyApp, + blockchainInCh: make(chan p2p.Envelope, chBuf), + blockchainOutCh: make(chan p2p.Envelope, chBuf), + blockchainPeerErrCh: make(chan p2p.PeerError, chBuf), + peerUpdatesCh: peerUpdatesCh, + peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh), + peerID: p2p.NodeID(fmt.Sprintf("%x", pID)), + } + + rts.blockchainChannel = p2p.NewChannel( + BlockchainChannel, + new(bcproto.Message), + rts.blockchainInCh, + rts.blockchainOutCh, + rts.blockchainPeerErrCh, + ) + + reactor, err := NewReactor( + log.TestingLogger().With("module", "blockchain", "node", rts.peerID), + state.Copy(), + blockExec, + blockStore, + nil, + rts.blockchainChannel, + rts.peerUpdates, + fastSync, + ) + + require.NoError(t, err) + rts.reactor = reactor + + require.NoError(t, rts.reactor.Start()) + require.True(t, rts.reactor.IsRunning()) + + t.Cleanup(func() { + require.NoError(t, rts.reactor.Stop()) + require.NoError(t, rts.app.Stop()) + require.False(t, rts.reactor.IsRunning()) + }) + + return rts +} - return BlockchainReactorPair{bcReactor, proxyApp} +func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite, dropChErr bool) { + // create a mapping for efficient suite lookup by peer ID + suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite) + for _, suite := range suites { + suitesByPeerID[suite.peerID] = suite + } + + // Simulate a router by listening for all outbound envelopes and proxying the + // envelope to the respective peer (suite). 
+ go func() { + for envelope := range primary.blockchainOutCh { + if envelope.Broadcast { + for _, s := range suites { + // broadcast to everyone except source + if s.peerID != primary.peerID { + s.blockchainInCh <- p2p.Envelope{ + From: primary.peerID, + To: s.peerID, + Message: envelope.Message, + } + } + } + } else { + suitesByPeerID[envelope.To].blockchainInCh <- p2p.Envelope{ + From: primary.peerID, + To: envelope.To, + Message: envelope.Message, + } + } + } + }() + + go func() { + for pErr := range primary.blockchainPeerErrCh { + if dropChErr { + primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err) + } else { + primary.peerUpdatesCh <- p2p.PeerUpdate{ + PeerID: pErr.PeerID, + Status: p2p.PeerStatusRemoved, + } + } + } + }() } -func TestNoBlockResponse(t *testing.T) { - config = cfg.ResetTestRoot("blockchain_reactor_test") +func TestReactor_AbruptDisconnect(t *testing.T) { + config := cfg.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) - genDoc, privVals := randGenesisDoc(1, false, 30) - maxBlockHeight := int64(65) + genDoc, privVals := randGenesisDoc(config, 1, false, 30) + maxBlockHeight := int64(64) + testSuites := []*reactorTestSuite{ + setup(t, genDoc, privVals, maxBlockHeight, 0), + setup(t, genDoc, privVals, 0, 0), + } - reactorPairs := make([]BlockchainReactorPair, 2) + require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) - reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + for _, s := range testSuites { + simulateRouter(s, testSuites, true) - p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s + // connect reactor to every other reactor + for _, ss := range testSuites { + if s.peerID != ss.peerID { + s.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: ss.peerID, + } + } + } + } - }, p2p.Connect2Switches) + secondaryPool := testSuites[1].reactor.pool + require.Eventually( + t, + func() bool { + height, _, _ := secondaryPool.GetStatus() + return secondaryPool.MaxPeerHeight() > 0 && height > 0 && height < 10 + }, + 10*time.Second, + 10*time.Millisecond, + "expected node to be partially synced", + ) + + // Remove synced node from the syncing node which should not result in any + // deadlocks or race conditions within the context of poolRoutine. 
+ testSuites[1].peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusDown, + PeerID: testSuites[0].peerID, + } +} - defer func() { - for _, r := range reactorPairs { - err := r.reactor.Stop() - require.NoError(t, err) - err = r.app.Stop() - require.NoError(t, err) +func TestReactor_NoBlockResponse(t *testing.T) { + config := cfg.ResetTestRoot("blockchain_reactor_test") + defer os.RemoveAll(config.RootDir) + + genDoc, privVals := randGenesisDoc(config, 1, false, 30) + maxBlockHeight := int64(65) + testSuites := []*reactorTestSuite{ + setup(t, genDoc, privVals, maxBlockHeight, 0), + setup(t, genDoc, privVals, 0, 0), + } + + require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) + + for _, s := range testSuites { + simulateRouter(s, testSuites, true) + + // connect reactor to every other reactor + for _, ss := range testSuites { + if s.peerID != ss.peerID { + s.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: ss.peerID, + } + } } - }() + } - tests := []struct { + testCases := []struct { height int64 existent bool }{ @@ -168,161 +292,114 @@ func TestNoBlockResponse(t *testing.T) { {100, false}, } - for { - if reactorPairs[1].reactor.pool.IsCaughtUp() { - break - } - - time.Sleep(10 * time.Millisecond) - } - - assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) - - for _, tt := range tests { - block := reactorPairs[1].reactor.store.LoadBlock(tt.height) - if tt.existent { - assert.True(t, block != nil) + secondaryPool := testSuites[1].reactor.pool + require.Eventually( + t, + func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() }, + 10*time.Second, + 10*time.Millisecond, + "expected node to be fully synced", + ) + + for _, tc := range testCases { + block := testSuites[1].reactor.store.LoadBlock(tc.height) + if tc.existent { + require.True(t, block != nil) } else { - assert.True(t, block == nil) + require.Nil(t, block) } } } -// NOTE: This is too hard to test without -// an easy way to add test peer to switch -// or without significant refactoring of the module. -// Alternatively we could actually dial a TCP conn but -// that seems extreme. 
-func TestBadBlockStopsPeer(t *testing.T) { - config = cfg.ResetTestRoot("blockchain_reactor_test") +func TestReactor_BadBlockStopsPeer(t *testing.T) { + config := cfg.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) - genDoc, privVals := randGenesisDoc(1, false, 30) - - maxBlockHeight := int64(148) - - // Other chain needs a different validator set - otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30) - otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight) - defer func() { - err := otherChain.reactor.Stop() - require.Error(t, err) - err = otherChain.app.Stop() - require.NoError(t, err) - }() - - reactorPairs := make([]BlockchainReactorPair, 4) - - reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + maxBlockHeight := int64(48) + genDoc, privVals := randGenesisDoc(config, 1, false, 30) - switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s + testSuites := []*reactorTestSuite{ + setup(t, genDoc, privVals, maxBlockHeight, 1000), // fully synced node + setup(t, genDoc, privVals, 0, 1000), + setup(t, genDoc, privVals, 0, 1000), + setup(t, genDoc, privVals, 0, 1000), + setup(t, genDoc, privVals, 0, 1000), // new node + } - }, p2p.Connect2Switches) + require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) - defer func() { - for _, r := range reactorPairs { - err := r.reactor.Stop() - require.NoError(t, err) + for _, s := range testSuites[:len(testSuites)-1] { + simulateRouter(s, testSuites, true) - err = r.app.Stop() - require.NoError(t, err) - } - }() - - for { - time.Sleep(1 * time.Second) - caughtUp := true - for _, r := range reactorPairs { - if !r.reactor.pool.IsCaughtUp() { - caughtUp = false + // connect reactor to every other reactor except the new node + for _, ss := range testSuites[:len(testSuites)-1] { + if s.peerID != ss.peerID { + s.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: ss.peerID, + } } } - if caughtUp { - break - } } - // at this time, reactors[0-3] is the newest - assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) - - // Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data - // race, but can't be easily avoided. - reactorPairs[3].reactor.store = otherChain.reactor.store - - lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs = append(reactorPairs, lastReactorPair) - - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) - return s + require.Eventually( + t, + func() bool { + caughtUp := true + for _, s := range testSuites[1 : len(testSuites)-1] { + if s.reactor.pool.MaxPeerHeight() == 0 || !s.reactor.pool.IsCaughtUp() { + caughtUp = false + } + } - }, p2p.Connect2Switches)...) 
+ return caughtUp + }, + 10*time.Minute, + 10*time.Millisecond, + "expected all nodes to be fully synced", + ) - for i := 0; i < len(reactorPairs)-1; i++ { - p2p.Connect2Switches(switches, i, len(reactorPairs)-1) + for _, s := range testSuites[:len(testSuites)-1] { + require.Len(t, s.reactor.pool.peers, 3) } - for { - if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { - break - } + // Mark testSuites[3] as an invalid peer which will cause newSuite to disconnect + // from this peer. + otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30) + otherSuite := setup(t, otherGenDoc, otherPrivVals, maxBlockHeight, 0) + testSuites[3].reactor.store = otherSuite.reactor.store - time.Sleep(1 * time.Second) - } + // add a fake peer just so we do not wait for the consensus ticker to timeout + otherSuite.reactor.pool.SetPeerRange("00ff", 10, 10) - assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) -} - -//---------------------------------------------- -// utility funcs + // start the new peer's faux router + newSuite := testSuites[len(testSuites)-1] + simulateRouter(newSuite, testSuites, false) -func makeTxs(height int64) (txs []types.Tx) { - for i := 0; i < 10; i++ { - txs = append(txs, types.Tx([]byte{byte(height), byte(i)})) + // connect all nodes to the new peer + for _, s := range testSuites[:len(testSuites)-1] { + newSuite.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: s.peerID, + } } - return txs -} - -func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block { - block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address) - return block -} - -type testApp struct { - abci.BaseApplication -} - -var _ abci.Application = (*testApp)(nil) - -func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) { - return abci.ResponseInfo{} -} - -func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock { - return abci.ResponseBeginBlock{} -} - -func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { - return abci.ResponseEndBlock{} -} - -func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { - return abci.ResponseDeliverTx{Events: []abci.Event{}} -} - -func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { - return abci.ResponseCheckTx{} -} - -func (app *testApp) Commit() abci.ResponseCommit { - return abci.ResponseCommit{} -} -func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) { - return + // wait for the new peer to catch up and become fully synced + require.Eventually( + t, + func() bool { return newSuite.reactor.pool.MaxPeerHeight() > 0 && newSuite.reactor.pool.IsCaughtUp() }, + 10*time.Minute, + 10*time.Millisecond, + "expected new node to be fully synced", + ) + + require.Eventuallyf( + t, + func() bool { return len(newSuite.reactor.pool.peers) < len(testSuites)-1 }, + 10*time.Minute, + 10*time.Millisecond, + "invalid number of peers; expected < %d, got: %d", + len(testSuites)-1, + len(newSuite.reactor.pool.peers), + ) } diff --git a/blockchain/v0/test_util.go b/blockchain/v0/test_util.go new file mode 100644 index 000000000..f9b119d14 --- /dev/null +++ b/blockchain/v0/test_util.go @@ -0,0 +1,50 @@ +package v0 + +import ( + "sort" + + cfg "github.com/tendermint/tendermint/config" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" + 
tmtime "github.com/tendermint/tendermint/types/time" +) + +func randGenesisDoc( + config *cfg.Config, + numValidators int, + randPower bool, + minPower int64, +) (*types.GenesisDoc, []types.PrivValidator) { + validators := make([]types.GenesisValidator, numValidators) + privValidators := make([]types.PrivValidator, numValidators) + + for i := 0; i < numValidators; i++ { + val, privVal := types.RandValidator(randPower, minPower) + validators[i] = types.GenesisValidator{ + PubKey: val.PubKey, + Power: val.VotingPower, + } + + privValidators[i] = privVal + } + + sort.Sort(types.PrivValidatorsByAddress(privValidators)) + + return &types.GenesisDoc{ + GenesisTime: tmtime.Now(), + ChainID: config.ChainID(), + Validators: validators, + }, privValidators +} + +func makeTxs(height int64) (txs []types.Tx) { + for i := 0; i < 10; i++ { + txs = append(txs, types.Tx([]byte{byte(height), byte(i)})) + } + return txs +} + +func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block { + block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address) + return block +} diff --git a/blockchain/v2/io.go b/blockchain/v2/io.go index 69995e4c5..d1c7c2256 100644 --- a/blockchain/v2/io.go +++ b/blockchain/v2/io.go @@ -3,7 +3,7 @@ package v2 import ( "errors" - bc "github.com/tendermint/tendermint/blockchain" + "github.com/gogo/protobuf/proto" "github.com/tendermint/tendermint/p2p" bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" "github.com/tendermint/tendermint/state" @@ -48,7 +48,15 @@ type consensusReactor interface { } func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error { - msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: height}) + msgProto := &bcproto.Message{ + Sum: &bcproto.Message_BlockRequest{ + BlockRequest: &bcproto.BlockRequest{ + Height: height, + }, + }, + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } @@ -61,7 +69,16 @@ func (sio *switchIO) sendBlockRequest(peer p2p.Peer, height int64) error { } func (sio *switchIO) sendStatusResponse(base int64, height int64, peer p2p.Peer) error { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{Height: height, Base: base}) + msgProto := &bcproto.Message{ + Sum: &bcproto.Message_StatusResponse{ + StatusResponse: &bcproto.StatusResponse{ + Height: height, + Base: base, + }, + }, + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } @@ -83,10 +100,19 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error { return err } - msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bpb}) + msgProto := &bcproto.Message{ + Sum: &bcproto.Message_BlockResponse{ + BlockResponse: &bcproto.BlockResponse{ + Block: bpb, + }, + }, + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } + if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { return errPeerQueueFull } @@ -95,7 +121,15 @@ func (sio *switchIO) sendBlockToPeer(block *types.Block, peer p2p.Peer) error { } func (sio *switchIO) sendBlockNotFound(height int64, peer p2p.Peer) error { - msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: height}) + msgProto := &bcproto.Message{ + Sum: &bcproto.Message_NoBlockResponse{ + NoBlockResponse: &bcproto.NoBlockResponse{ + Height: height, + }, + }, + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } @@ -116,7 +150,13 @@ func (sio *switchIO) trySwitchToConsensus(state state.State, skipWAL bool) bool } func 
(sio *switchIO) sendStatusRequest(peer p2p.Peer) error { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{}) + msgProto := &bcproto.Message{ + Sum: &bcproto.Message_StatusRequest{ + StatusRequest: &bcproto.StatusRequest{}, + }, + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } @@ -129,7 +169,13 @@ func (sio *switchIO) sendStatusRequest(peer p2p.Peer) error { } func (sio *switchIO) broadcastStatusRequest() error { - msgBytes, err := bc.EncodeMsg(&bcproto.StatusRequest{}) + msgProto := &bcproto.Message{ + Sum: &bcproto.Message_StatusRequest{ + StatusRequest: &bcproto.StatusRequest{}, + }, + } + + msgBytes, err := proto.Marshal(msgProto) if err != nil { return err } diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 94f3cc5ac..4b37439a3 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + proto "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/behaviour" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/libs/log" @@ -466,49 +468,54 @@ func (r *BlockchainReactor) Stop() error { func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { logger := r.logger.With("src", src.ID(), "chID", chID) - msg, err := bc.DecodeMsg(msgBytes) - if err != nil { + msgProto := new(bcproto.Message) + + if err := proto.Unmarshal(msgBytes, msgProto); err != nil { logger.Error("error decoding message", "err", err) _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) return } - if err = bc.ValidateMsg(msg); err != nil { - logger.Error("peer sent us invalid msg", "msg", msg, "err", err) + if err := msgProto.Validate(); err != nil { + logger.Error("peer sent us an invalid msg", "msg", msgProto, "err", err) _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) return } - r.logger.Debug("Receive", "msg", msg) + r.logger.Debug("received", "msg", msgProto) - switch msg := msg.(type) { - case *bcproto.StatusRequest: + switch msg := msgProto.Sum.(type) { + case *bcproto.Message_StatusRequest: if err := r.io.sendStatusResponse(r.store.Base(), r.store.Height(), src); err != nil { logger.Error("Could not send status message to src peer") } - case *bcproto.BlockRequest: - block := r.store.LoadBlock(msg.Height) + case *bcproto.Message_BlockRequest: + block := r.store.LoadBlock(msg.BlockRequest.Height) if block != nil { - if err = r.io.sendBlockToPeer(block, src); err != nil { + if err := r.io.sendBlockToPeer(block, src); err != nil { logger.Error("Could not send block message to src peer", "err", err) } } else { - logger.Info("peer asking for a block we don't have", "height", msg.Height) - if err = r.io.sendBlockNotFound(msg.Height, src); err != nil { + logger.Info("peer asking for a block we don't have", "height", msg.BlockRequest.Height) + if err := r.io.sendBlockNotFound(msg.BlockRequest.Height, src); err != nil { logger.Error("Couldn't send block not found msg", "err", err) } } - case *bcproto.StatusResponse: + case *bcproto.Message_StatusResponse: r.mtx.RLock() if r.events != nil { - r.events <- bcStatusResponse{peerID: src.ID(), base: msg.Base, height: msg.Height} + r.events <- bcStatusResponse{ + peerID: src.ID(), + base: msg.StatusResponse.Base, + height: msg.StatusResponse.Height, + } } r.mtx.RUnlock() - case *bcproto.BlockResponse: - bi, err := types.BlockFromProto(msg.Block) + case *bcproto.Message_BlockResponse: + bi, err := types.BlockFromProto(msg.BlockResponse.Block) if err != nil { logger.Error("error 
transitioning block from protobuf", "err", err) _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) @@ -525,10 +532,14 @@ func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } r.mtx.RUnlock() - case *bcproto.NoBlockResponse: + case *bcproto.Message_NoBlockResponse: r.mtx.RLock() if r.events != nil { - r.events <- bcNoBlockResponse{peerID: src.ID(), height: msg.Height, time: time.Now()} + r.events <- bcNoBlockResponse{ + peerID: src.ID(), + height: msg.NoBlockResponse.Height, + time: time.Now(), + } } r.mtx.RUnlock() } diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index fe0154f81..099d311a0 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -9,13 +9,13 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/behaviour" - bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -409,23 +409,37 @@ func TestReactorHelperMode(t *testing.T) { switch ev := step.event.(type) { case bcproto.StatusRequest: old := mockSwitch.numStatusResponse - msg, err := bc.EncodeMsg(&ev) - assert.NoError(t, err) - reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg) + + msgProto := new(bcproto.Message) + require.NoError(t, msgProto.Wrap(&ev)) + + msgBz, err := proto.Marshal(msgProto) + require.NoError(t, err) + + reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz) assert.Equal(t, old+1, mockSwitch.numStatusResponse) case bcproto.BlockRequest: if ev.Height > params.startHeight { old := mockSwitch.numNoBlockResponse - msg, err := bc.EncodeMsg(&ev) - assert.NoError(t, err) - reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg) + + msgProto := new(bcproto.Message) + require.NoError(t, msgProto.Wrap(&ev)) + + msgBz, err := proto.Marshal(msgProto) + require.NoError(t, err) + + reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz) assert.Equal(t, old+1, mockSwitch.numNoBlockResponse) } else { old := mockSwitch.numBlockResponse - msg, err := bc.EncodeMsg(&ev) - assert.NoError(t, err) - assert.NoError(t, err) - reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msg) + + msgProto := new(bcproto.Message) + require.NoError(t, msgProto.Wrap(&ev)) + + msgBz, err := proto.Marshal(msgProto) + require.NoError(t, err) + + reactor.Receive(channelID, mockPeer{id: p2p.NodeID(step.peer)}, msgBz) assert.Equal(t, old+1, mockSwitch.numBlockResponse) } } diff --git a/node/node.go b/node/node.go index 06161cde7..09cdc2896 100644 --- a/node/node.go +++ b/node/node.go @@ -192,7 +192,7 @@ type Node struct { eventBus *types.EventBus // pub/sub for services stateStore sm.Store blockStore *store.BlockStore // store the blockchain to disk - bcReactor p2p.Reactor // for fast-syncing + bcReactor service.Service // for fast-syncing mempoolReactor *mempl.Reactor // for gossipping transactions mempool mempl.Mempool stateSync bool // whether the node should state sync on startup @@ -370,24 +370,41 @@ func createEvidenceReactor( return evidenceReactorShim, evidenceReactor, evidencePool, nil } -func createBlockchainReactor(config *cfg.Config, +func createBlockchainReactor( + logger log.Logger, + config *cfg.Config, state sm.State, blockExec 
diff --git a/node/node.go b/node/node.go
index 06161cde7..09cdc2896 100644
--- a/node/node.go
+++ b/node/node.go
@@ -192,7 +192,7 @@ type Node struct {
 	eventBus         *types.EventBus // pub/sub for services
 	stateStore       sm.Store
 	blockStore       *store.BlockStore // store the blockchain to disk
-	bcReactor        p2p.Reactor       // for fast-syncing
+	bcReactor        service.Service   // for fast-syncing
 	mempoolReactor   *mempl.Reactor    // for gossipping transactions
 	mempool          mempl.Mempool
 	stateSync        bool // whether the node should state sync on startup
@@ -370,24 +370,41 @@ func createEvidenceReactor(
 	return evidenceReactorShim, evidenceReactor, evidencePool, nil
 }
 
-func createBlockchainReactor(config *cfg.Config,
+func createBlockchainReactor(
+	logger log.Logger,
+	config *cfg.Config,
 	state sm.State,
 	blockExec *sm.BlockExecutor,
 	blockStore *store.BlockStore,
+	csReactor *cs.Reactor,
 	fastSync bool,
-	logger log.Logger) (bcReactor p2p.Reactor, err error) {
+) (*p2p.ReactorShim, service.Service, error) {
+
+	logger = logger.With("module", "blockchain")
 
 	switch config.FastSync.Version {
 	case "v0":
-		bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
+		reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
+
+		reactor, err := bcv0.NewReactor(
+			logger, state.Copy(), blockExec, blockStore, csReactor,
+			reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync,
+		)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		return reactorShim, reactor, nil
+
 	case "v2":
-		bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
+		reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
+		reactor.SetLogger(logger)
+
+		return nil, reactor, nil
+
 	default:
-		return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
+		return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
 	}
-
-	bcReactor.SetLogger(logger.With("module", "blockchain"))
-	return bcReactor, nil
 }
 
 func createConsensusReactor(config *cfg.Config,
@@ -747,12 +764,28 @@ func NewNode(config *cfg.Config,
 		sm.BlockExecutorWithMetrics(smMetrics),
 	)
 
-	// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
-	bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
+	csReactor, csState := createConsensusReactor(
+		config, state, blockExec, blockStore, mempool, evPool,
+		privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
+	)
+
+	// Create the blockchain reactor. Note, we do not start fast sync if we're
+	// doing a state sync first.
+	bcReactorShim, bcReactor, err := createBlockchainReactor(
+		logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
+	)
 	if err != nil {
 		return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
 	}
 
+	// TODO: Remove this once the switch is removed.
+	var bcReactorForSwitch p2p.Reactor
+	if bcReactorShim != nil {
+		bcReactorForSwitch = bcReactorShim
+	} else {
+		bcReactorForSwitch = bcReactor.(p2p.Reactor)
+	}
+
 	// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
 	// FIXME We need to update metrics here, since other reactors don't have access to them.
 	if stateSync {
@@ -761,11 +794,6 @@ func NewNode(config *cfg.Config,
 		csMetrics.FastSyncing.Set(1)
 	}
 
-	consensusReactor, consensusState := createConsensusReactor(
-		config, state, blockExec, blockStore, mempool, evPool,
-		privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
-	)
-
 	// Set up state sync reactor, and schedule a sync if requested.
 	// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
 	// we should clean this whole thing up. See:
@@ -791,8 +819,8 @@ func NewNode(config *cfg.Config,
 	p2pLogger := logger.With("module", "p2p")
 	transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
 	sw := createSwitch(
-		config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
-		stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
+		config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch,
+		stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
 	)
 
 	err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -850,8 +878,8 @@ func NewNode(config *cfg.Config,
 		bcReactor:        bcReactor,
 		mempoolReactor:   mempoolReactor,
 		mempool:          mempool,
-		consensusState:   consensusState,
-		consensusReactor: consensusReactor,
+		consensusState:   csState,
+		consensusReactor: csReactor,
 		stateSyncReactor: stateSyncReactor,
 		stateSync:        stateSync,
 		stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
@@ -924,6 +952,13 @@ func (n *Node) OnStart() error {
 
 	n.isListening = true
 
+	if n.config.FastSync.Version == "v0" {
+		// Start the real blockchain reactor separately since the switch uses the shim.
+		if err := n.bcReactor.Start(); err != nil {
+			return err
+		}
+	}
+
 	// Start the real state sync reactor separately since the switch uses the shim.
 	if err := n.stateSyncReactor.Start(); err != nil {
 		return err
@@ -975,6 +1010,13 @@ func (n *Node) OnStop() {
 		n.Logger.Error("Error closing switch", "err", err)
 	}
 
+	if n.config.FastSync.Version == "v0" {
+		// Stop the real blockchain reactor separately since the switch uses the shim.
+		if err := n.bcReactor.Stop(); err != nil {
+			n.Logger.Error("failed to stop the blockchain reactor", "err", err)
+		}
+	}
+
 	// Stop the real state sync reactor separately since the switch uses the shim.
 	if err := n.stateSyncReactor.Stop(); err != nil {
 		n.Logger.Error("failed to stop the state sync reactor", "err", err)
@@ -1286,9 +1328,11 @@ func makeNodeInfo(
 	var bcChannel byte
 	switch config.FastSync.Version {
 	case "v0":
-		bcChannel = bcv0.BlockchainChannel
+		bcChannel = byte(bcv0.BlockchainChannel)
+
 	case "v2":
 		bcChannel = bcv2.BlockchainChannel
+
 	default:
 		return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
 	}
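The shim handling above is the part most callers need to get right: createBlockchainReactor now returns both a *p2p.ReactorShim (only for "v0") and the reactor as a service.Service, and the caller decides which of the two to hand to the legacy switch. Below is a hedged sketch of that selection, written as a hypothetical helper (reactorForSwitch is not a name used in the patch) rather than the inline code NewNode uses.

```go
package nodeutil

import (
	"github.com/tendermint/tendermint/libs/service"
	"github.com/tendermint/tendermint/p2p"
)

// reactorForSwitch mirrors the inline selection in NewNode: when a shim exists
// (fastsync "v0"), the shim is what gets registered with the legacy switch and
// the real reactor is started separately; otherwise (fastsync "v2") the reactor
// itself is assumed to still implement p2p.Reactor.
func reactorForSwitch(shim *p2p.ReactorShim, reactor service.Service) p2p.Reactor {
	if shim != nil {
		return shim
	}
	// Assumption: the non-shim (v2) reactor still satisfies p2p.Reactor.
	return reactor.(p2p.Reactor)
}
```

The type assertion is safe under that stated assumption; once the p2p refactor is complete, the shim branch is expected to disappear.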
diff --git a/proto/tendermint/blockchain/message.go b/proto/tendermint/blockchain/message.go
new file mode 100644
index 000000000..194b48655
--- /dev/null
+++ b/proto/tendermint/blockchain/message.go
@@ -0,0 +1,107 @@
+package blockchain
+
+import (
+	"errors"
+	fmt "fmt"
+
+	proto "github.com/gogo/protobuf/proto"
+)
+
+const (
+	BlockResponseMessagePrefixSize   = 4
+	BlockResponseMessageFieldKeySize = 1
+)
+
+// Wrap implements the p2p Wrapper interface and wraps a blockchain message.
+func (m *Message) Wrap(pb proto.Message) error {
+	switch msg := pb.(type) {
+	case *BlockRequest:
+		m.Sum = &Message_BlockRequest{BlockRequest: msg}
+
+	case *BlockResponse:
+		m.Sum = &Message_BlockResponse{BlockResponse: msg}
+
+	case *NoBlockResponse:
+		m.Sum = &Message_NoBlockResponse{NoBlockResponse: msg}
+
+	case *StatusRequest:
+		m.Sum = &Message_StatusRequest{StatusRequest: msg}
+
+	case *StatusResponse:
+		m.Sum = &Message_StatusResponse{StatusResponse: msg}
+
+	default:
+		return fmt.Errorf("unknown message: %T", msg)
+	}
+
+	return nil
+}
+
+// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
+// message.
+func (m *Message) Unwrap() (proto.Message, error) {
+	switch msg := m.Sum.(type) {
+	case *Message_BlockRequest:
+		return m.GetBlockRequest(), nil
+
+	case *Message_BlockResponse:
+		return m.GetBlockResponse(), nil
+
+	case *Message_NoBlockResponse:
+		return m.GetNoBlockResponse(), nil
+
+	case *Message_StatusRequest:
+		return m.GetStatusRequest(), nil
+
+	case *Message_StatusResponse:
+		return m.GetStatusResponse(), nil
+
+	default:
+		return nil, fmt.Errorf("unknown message: %T", msg)
+	}
+}
+
+// Validate validates the message, returning an error upon failure.
+func (m *Message) Validate() error {
+	if m == nil {
+		return errors.New("message cannot be nil")
+	}
+
+	switch msg := m.Sum.(type) {
+	case *Message_BlockRequest:
+		if m.GetBlockRequest().Height < 0 {
+			return errors.New("negative Height")
+		}
+
+	case *Message_BlockResponse:
+		// validate basic is called later when converting from proto
+		return nil
+
+	case *Message_NoBlockResponse:
+		if m.GetNoBlockResponse().Height < 0 {
+			return errors.New("negative Height")
+		}
+
+	case *Message_StatusResponse:
+		if m.GetStatusResponse().Base < 0 {
+			return errors.New("negative Base")
+		}
+		if m.GetStatusResponse().Height < 0 {
+			return errors.New("negative Height")
+		}
+		if m.GetStatusResponse().Base > m.GetStatusResponse().Height {
+			return fmt.Errorf(
+				"base %v cannot be greater than height %v",
+				m.GetStatusResponse().Base, m.GetStatusResponse().Height,
+			)
+		}
+
+	case *Message_StatusRequest:
+		return nil
+
+	default:
+		return fmt.Errorf("unknown message type: %T", msg)
+	}
+
+	return nil
+}
diff --git a/blockchain/msgs_test.go b/proto/tendermint/blockchain/message_test.go
similarity index 76%
rename from blockchain/msgs_test.go
rename to proto/tendermint/blockchain/message_test.go
index df8efca14..37a0df217 100644
--- a/blockchain/msgs_test.go
+++ b/proto/tendermint/blockchain/message_test.go
@@ -1,19 +1,18 @@
-package blockchain
+package blockchain_test
 
 import (
 	"encoding/hex"
-	"math"
+	math "math"
 	"testing"
 
-	"github.com/gogo/protobuf/proto"
-	"github.com/stretchr/testify/assert"
+	proto "github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/require"
 
 	bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
 	"github.com/tendermint/tendermint/types"
 )
 
-func TestBcBlockRequestMessageValidateBasic(t *testing.T) {
+func TestBlockRequest_Validate(t *testing.T) {
 	testCases := []struct {
 		testName      string
 		requestHeight int64
@@ -27,13 +26,15 @@ func TestBcBlockRequestMessageValidateBasic(t *testing.T) {
 	for _, tc := range testCases {
 		tc := tc
 		t.Run(tc.testName, func(t *testing.T) {
-			request := bcproto.BlockRequest{Height: tc.requestHeight}
-			assert.Equal(t, tc.expectErr, ValidateMsg(&request) != nil, "Validate Basic had an unexpected result")
+			msg := &bcproto.Message{}
+			require.NoError(t, msg.Wrap(&bcproto.BlockRequest{Height: tc.requestHeight}))
+
+			require.Equal(t, tc.expectErr, msg.Validate() != nil)
 		})
 	}
 }
 
-func TestBcNoBlockResponseMessageValidateBasic(t *testing.T) {
+func TestNoBlockResponse_Validate(t *testing.T) {
 	testCases := []struct {
 		testName          string
 		nonResponseHeight int64
@@ -47,18 +48,21 @@ func TestBcNoBlockResponseMessageValidateBasic(t *testing.T) {
 	for _, tc := range testCases {
 		tc := tc
 		t.Run(tc.testName, func(t *testing.T) {
-			nonResponse := bcproto.NoBlockResponse{Height: tc.nonResponseHeight}
-			assert.Equal(t, tc.expectErr, ValidateMsg(&nonResponse) != nil, "Validate Basic had an unexpected result")
+			msg := &bcproto.Message{}
+			require.NoError(t, msg.Wrap(&bcproto.NoBlockResponse{Height: tc.nonResponseHeight}))
+
+			require.Equal(t, tc.expectErr, msg.Validate() != nil)
 		})
 	}
 }
 
-func TestBcStatusRequestMessageValidateBasic(t *testing.T) {
-	request := bcproto.StatusRequest{}
-	assert.NoError(t, ValidateMsg(&request))
+func TestStatusRequest_Validate(t *testing.T) {
+	msg := &bcproto.Message{}
+	require.NoError(t, msg.Wrap(&bcproto.StatusRequest{}))
+	require.NoError(t, msg.Validate())
 }
 
-func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
+func TestStatusResponse_Validate(t *testing.T) {
 	testCases := []struct {
 		testName       string
 		responseHeight int64
@@ -72,13 +76,15 @@ func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
 	for _, tc := range testCases {
 		tc := tc
 		t.Run(tc.testName, func(t *testing.T) {
-			response := bcproto.StatusResponse{Height: tc.responseHeight}
-			assert.Equal(t, tc.expectErr, ValidateMsg(&response) != nil, "Validate Basic had an unexpected result")
+			msg := &bcproto.Message{}
+			require.NoError(t, msg.Wrap(&bcproto.StatusResponse{Height: tc.responseHeight}))
+
+			require.Equal(t, tc.expectErr, msg.Validate() != nil)
 		})
 	}
 }
 
-// nolint:lll // ignore line length in tests
+// nolint:lll
 func TestBlockchainMessageVectors(t *testing.T) {
 	block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil)
 	block.Version.Block = 11 // overwrite updated protocol version
@@ -117,8 +123,8 @@ func TestBlockchainMessageVectors(t *testing.T) {
 	for _, tc := range testCases {
 		tc := tc
 		t.Run(tc.testName, func(t *testing.T) {
-			bz, _ := proto.Marshal(tc.bmsg)
-
+			bz, err := proto.Marshal(tc.bmsg)
+			require.NoError(t, err)
 			require.Equal(t, tc.expBytes, hex.EncodeToString(bz))
 		})
 	}
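The renamed tests above only exercise Wrap and Validate; Unwrap is the other half of the Wrapper contract. A small hypothetical test (not part of the patch) showing the full Wrap/Unwrap round trip under the same package conventions:

```go
package blockchain_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
)

// TestWrapUnwrapRoundTrip wraps a BlockRequest into the Message oneof and
// checks that Unwrap returns the same concrete message.
func TestWrapUnwrapRoundTrip(t *testing.T) {
	msg := &bcproto.Message{}
	require.NoError(t, msg.Wrap(&bcproto.BlockRequest{Height: 1}))

	unwrapped, err := msg.Unwrap()
	require.NoError(t, err)

	br, ok := unwrapped.(*bcproto.BlockRequest)
	require.True(t, ok)
	require.EqualValues(t, 1, br.Height)
}
```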
diff --git a/proto/tendermint/statesync/message.go b/proto/tendermint/statesync/message.go
index 792e7f64c..fe38bda51 100644
--- a/proto/tendermint/statesync/message.go
+++ b/proto/tendermint/statesync/message.go
@@ -8,8 +8,8 @@ import (
 )
 
 // Wrap implements the p2p Wrapper interface and wraps a state sync messages.
-func (m *Message) Wrap(msg proto.Message) error {
-	switch msg := msg.(type) {
+func (m *Message) Wrap(pb proto.Message) error {
+	switch msg := pb.(type) {
 	case *ChunkRequest:
 		m.Sum = &Message_ChunkRequest{ChunkRequest: msg}
 
diff --git a/statesync/reactor.go b/statesync/reactor.go
index ca0be92a3..c6b48411b 100644
--- a/statesync/reactor.go
+++ b/statesync/reactor.go
@@ -349,7 +349,7 @@ func (r *Reactor) processSnapshotCh() {
 		select {
 		case envelope := <-r.snapshotCh.In():
 			if err := r.handleMessage(r.snapshotCh.ID(), envelope); err != nil {
-				r.Logger.Error("failed to process envelope", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
+				r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID(), "envelope", envelope, "err", err)
 				r.snapshotCh.Error() <- p2p.PeerError{
 					PeerID: envelope.From,
 					Err:    err,
@@ -376,7 +376,7 @@ func (r *Reactor) processChunkCh() {
 		select {
 		case envelope := <-r.chunkCh.In():
 			if err := r.handleMessage(r.chunkCh.ID(), envelope); err != nil {
-				r.Logger.Error("failed to process envelope", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
+				r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID(), "envelope", envelope, "err", err)
 				r.chunkCh.Error() <- p2p.PeerError{
 					PeerID: envelope.From,
 					Err:    err,
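Both the blockchain and state sync Message types now satisfy the same p2p Wrapper contract, which is why the parameter rename above (msg to pb) is worthwhile: it keeps the wrapped envelope and the concrete payload clearly separated. Roughly, the interface these methods implement has the shape sketched below; the authoritative definition lives in the p2p package, so treat this as an approximation for reading the diff, not a copy of that code.

```go
package p2pdoc

import proto "github.com/gogo/protobuf/proto"

// Wrapper is, approximately, a proto message that can embed and extract the
// concrete messages carried on a channel.
type Wrapper interface {
	proto.Message

	// Wrap stores the given concrete message inside the wrapper's oneof field.
	Wrap(pb proto.Message) error

	// Unwrap returns the concrete message currently held by the wrapper.
	Unwrap() (proto.Message, error)
}
```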
diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go
index dd3b684af..502aaf970 100644
--- a/test/maverick/node/node.go
+++ b/test/maverick/node/node.go
@@ -233,7 +233,7 @@ type Node struct {
 	eventBus         *types.EventBus // pub/sub for services
 	stateStore       sm.Store
 	blockStore       *store.BlockStore // store the blockchain to disk
-	bcReactor        p2p.Reactor       // for fast-syncing
+	bcReactor        service.Service   // for fast-syncing
 	mempoolReactor   *mempl.Reactor    // for gossipping transactions
 	mempool          mempl.Mempool
 	stateSync        bool // whether the node should state sync on startup
@@ -411,24 +411,41 @@ func createEvidenceReactor(
 	return evidenceReactorShim, evidenceReactor, evidencePool, nil
 }
 
-func createBlockchainReactor(config *cfg.Config,
+func createBlockchainReactor(
+	logger log.Logger,
+	config *cfg.Config,
 	state sm.State,
 	blockExec *sm.BlockExecutor,
 	blockStore *store.BlockStore,
+	csReactor *cs.Reactor,
 	fastSync bool,
-	logger log.Logger) (bcReactor p2p.Reactor, err error) {
+) (*p2p.ReactorShim, service.Service, error) {
+
+	logger = logger.With("module", "blockchain")
 
 	switch config.FastSync.Version {
 	case "v0":
-		bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
+		reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
+
+		reactor, err := bcv0.NewReactor(
+			logger, state.Copy(), blockExec, blockStore, csReactor,
+			reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync,
+		)
+		if err != nil {
+			return nil, nil, err
+		}
+
+		return reactorShim, reactor, nil
+
 	case "v2":
-		bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
+		reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
+		reactor.SetLogger(logger)
+
+		return nil, reactor, nil
+
 	default:
-		return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
+		return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
 	}
-
-	bcReactor.SetLogger(logger.With("module", "blockchain"))
-	return bcReactor, nil
 }
 
 func createConsensusReactor(config *cfg.Config,
@@ -780,12 +797,29 @@ func NewNode(config *cfg.Config,
 		sm.BlockExecutorWithMetrics(smMetrics),
 	)
 
-	// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
-	bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
+	logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
+	csReactor, csState := createConsensusReactor(
+		config, state, blockExec, blockStore, mempool, evPool,
+		privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors,
+	)
+
+	// Create the blockchain reactor. Note, we do not start fast sync if we're
+	// doing a state sync first.
+	bcReactorShim, bcReactor, err := createBlockchainReactor(
+		logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
+	)
 	if err != nil {
 		return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
 	}
 
+	// TODO: Remove this once the switch is removed.
+	var bcReactorForSwitch p2p.Reactor
+	if bcReactorShim != nil {
+		bcReactorForSwitch = bcReactorShim
+	} else {
+		bcReactorForSwitch = bcReactor.(p2p.Reactor)
+	}
+
 	// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
 	// FIXME We need to update metrics here, since other reactors don't have access to them.
 	if stateSync {
@@ -794,11 +828,6 @@ func NewNode(config *cfg.Config,
 		csMetrics.FastSyncing.Set(1)
 	}
 
-	logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors)
-	consensusReactor, consensusState := createConsensusReactor(
-		config, state, blockExec, blockStore, mempool, evPool,
-		privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors)
-
 	// Set up state sync reactor, and schedule a sync if requested.
 	// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
 	// we should clean this whole thing up. See:
@@ -824,8 +853,8 @@ func NewNode(config *cfg.Config,
 	p2pLogger := logger.With("module", "p2p")
 	transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
 	sw := createSwitch(
-		config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
-		stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
+		config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch,
+		stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
 	)
 
 	err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -883,8 +912,8 @@ func NewNode(config *cfg.Config,
 		bcReactor:        bcReactor,
 		mempoolReactor:   mempoolReactor,
 		mempool:          mempool,
-		consensusState:   consensusState,
-		consensusReactor: consensusReactor,
+		consensusState:   csState,
+		consensusReactor: csReactor,
 		stateSyncReactor: stateSyncReactor,
 		stateSync:        stateSync,
 		stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
@@ -957,6 +986,13 @@ func (n *Node) OnStart() error {
 
 	n.isListening = true
 
+	if n.config.FastSync.Version == "v0" {
+		// Start the real blockchain reactor separately since the switch uses the shim.
+		if err := n.bcReactor.Start(); err != nil {
+			return err
+		}
+	}
+
 	// Start the real state sync reactor separately since the switch uses the shim.
 	if err := n.stateSyncReactor.Start(); err != nil {
 		return err
@@ -1008,6 +1044,13 @@ func (n *Node) OnStop() {
 		n.Logger.Error("Error closing switch", "err", err)
 	}
 
+	if n.config.FastSync.Version == "v0" {
+		// Stop the real blockchain reactor separately since the switch uses the shim.
+		if err := n.bcReactor.Stop(); err != nil {
+			n.Logger.Error("failed to stop the blockchain reactor", "err", err)
+		}
+	}
+
 	// Stop the real state sync reactor separately since the switch uses the shim.
 	if err := n.stateSyncReactor.Stop(); err != nil {
 		n.Logger.Error("failed to stop the state sync reactor", "err", err)
@@ -1317,9 +1360,11 @@ func makeNodeInfo(
 	var bcChannel byte
 	switch config.FastSync.Version {
 	case "v0":
-		bcChannel = bcv0.BlockchainChannel
+		bcChannel = byte(bcv0.BlockchainChannel)
+
 	case "v2":
 		bcChannel = bcv2.BlockchainChannel
+
 	default:
 		return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
 	}
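Finally, note the byte conversion in makeNodeInfo: the v0 channel constant is now a p2p.ChannelID, while NodeInfo still advertises raw channel bytes, so the "v0" case converts explicitly and the "v2" case does not. A hedged sketch of that mapping as a standalone helper (blockchainChannelByte is a hypothetical name, not part of the patch):

```go
package main

import (
	"fmt"

	bcv0 "github.com/tendermint/tendermint/blockchain/v0"
	bcv2 "github.com/tendermint/tendermint/blockchain/v2"
)

// blockchainChannelByte returns the legacy channel byte advertised in NodeInfo
// for the configured fastsync version, mirroring the switch in makeNodeInfo.
func blockchainChannelByte(version string) (byte, error) {
	switch version {
	case "v0":
		// v0's constant is a p2p.ChannelID; convert it for the byte-based API.
		return byte(bcv0.BlockchainChannel), nil
	case "v2":
		// v2 still exposes a plain byte constant.
		return bcv2.BlockchainChannel, nil
	default:
		return 0, fmt.Errorf("unknown fastsync version %s", version)
	}
}

func main() {
	ch, err := blockchainChannelByte("v0")
	if err != nil {
		panic(err)
	}
	fmt.Printf("blockchain channel byte: %#x\n", ch)
}
```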