diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index c77a8b634..f055d03b9 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -208,10 +208,12 @@ func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID p2p.NodeID) { } } -// handleBlockchainMessage handles enevelopes sent from peers on the +// handleBlockchainMessage handles envelopes sent from peers on the // BlockchainChannel. It returns an error only if the Envelope.Message is unknown // for this channel. This should never be called outside of handleMessage. func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error { + logger := r.Logger.With("peer", envelope.From) + switch msg := envelope.Message.(type) { case *bcproto.BlockRequest: r.respondToPeer(msg, envelope.From) @@ -219,7 +221,7 @@ func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error { case *bcproto.BlockResponse: block, err := types.BlockFromProto(msg.Block) if err != nil { - r.Logger.Error("failed to convert block from proto", "err", err) + logger.Error("failed to convert block from proto", "err", err) return err } @@ -238,16 +240,10 @@ func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error { r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height) case *bcproto.NoBlockResponse: - r.Logger.Debug( - "peer does not have the requested block", - "height", msg.Height, - "peer", envelope.From, - ) + logger.Debug("peer does not have the requested block", "height", msg.Height) default: - r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From) return fmt.Errorf("received unknown message: %T", msg) - } return nil @@ -304,12 +300,11 @@ func (r *Reactor) processBlockchainCh() { } } -// processPeerUpdate processes a PeerUpdate, returning an error upon failing to -// handle the PeerUpdate or if a panic is recovered. +// processPeerUpdate processes a PeerUpdate. func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status) - // XXX: Pool#RedoRequest can sometimes give us an empty peer + // XXX: Pool#RedoRequest can sometimes give us an empty peer. if len(peerUpdate.PeerID) == 0 { return } diff --git a/evidence/reactor.go b/evidence/reactor.go index ef1230c57..643d9915a 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -48,21 +48,6 @@ const ( broadcastEvidenceIntervalS = 10 ) -type closer struct { - closeOnce sync.Once - doneCh chan struct{} -} - -func newCloser() *closer { - return &closer{doneCh: make(chan struct{})} -} - -func (c *closer) close() { - c.closeOnce.Do(func() { - close(c.doneCh) - }) -} - // Reactor handles evpool evidence broadcasting amongst peers. 
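
The handlers above, and the evidence and mempool handlers later in this patch, follow two conventions worth seeing in isolation: a type switch that returns an error only for unknown messages, and a `handleMessage` wrapper that converts panics into returned errors. A minimal, self-contained sketch; the types here are toy stand-ins, not the real p2p API:

```go
package main

import "fmt"

// Toy stand-ins for p2p.Envelope and two message types; illustrative only.
type envelope struct{ message interface{} }

type blockRequest struct{ height int64 }
type noBlockResponse struct{ height int64 }

// handle mirrors the handleBlockchainMessage shape: dispatch on the concrete
// message type and error only on unknown messages.
func handle(e envelope) error {
	switch msg := e.message.(type) {
	case *blockRequest:
		fmt.Println("peer requested block at height", msg.height)

	case *noBlockResponse:
		fmt.Println("peer does not have block at height", msg.height)

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}

// handleSafe mirrors handleMessage: a named return plus recover() turns a
// panic while processing one message into an ordinary error.
func handleSafe(e envelope) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic in processing message: %v", r)
		}
	}()

	return handle(e)
}

func main() {
	fmt.Println(handleSafe(envelope{message: &blockRequest{height: 10}})) // <nil>
	fmt.Println(handleSafe(envelope{message: "garbage"}))                 // received unknown message: string
}
```
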
type Reactor struct { service.BaseService @@ -76,7 +61,7 @@ type Reactor struct { peerWG sync.WaitGroup mtx tmsync.Mutex - peerRoutines map[p2p.NodeID]*closer + peerRoutines map[p2p.NodeID]*tmsync.Closer } // NewReactor returns a reference to a new evidence reactor, which implements the @@ -93,7 +78,7 @@ func NewReactor( evidenceCh: evidenceCh, peerUpdates: peerUpdates, closeCh: make(chan struct{}), - peerRoutines: make(map[p2p.NodeID]*closer), + peerRoutines: make(map[p2p.NodeID]*tmsync.Closer), } r.BaseService = *service.NewBaseService(logger, "Evidence", r) @@ -121,7 +106,7 @@ func (r *Reactor) OnStart() error { func (r *Reactor) OnStop() { r.mtx.Lock() for _, c := range r.peerRoutines { - c.close() + c.Close() } r.mtx.Unlock() @@ -140,7 +125,7 @@ func (r *Reactor) OnStop() { <-r.peerUpdates.Done() } -// handleEvidenceMessage handles enevelopes sent from peers on the EvidenceChannel. +// handleEvidenceMessage handles envelopes sent from peers on the EvidenceChannel. // It returns an error only if the Envelope.Message is unknown for this channel // or if the given evidence is invalid. This should never be called outside of // handleMessage. @@ -149,8 +134,6 @@ func (r *Reactor) handleEvidenceMessage(envelope p2p.Envelope) error { switch msg := envelope.Message.(type) { case *tmproto.EvidenceList: - logger.Debug("received evidence list", "num_evidence", len(msg.Evidence)) - // TODO: Refactor the Evidence type to not contain a list since we only ever // send and receive one piece of evidence at a time. Or potentially consider // batching evidence. @@ -189,6 +172,8 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err } }() + r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From) + switch chID { case EvidenceChannel: err = r.handleEvidenceMessage(envelope) @@ -224,10 +209,9 @@ func (r *Reactor) processEvidenceCh() { } } -// processPeerUpdate processes a PeerUpdate, returning an error upon failing to -// handle the PeerUpdate or if a panic is recovered. For new or live peers it -// will check if an evidence broadcasting goroutine needs to be started. For -// down or removed peers, it will check if an evidence broadcasting goroutine +// processPeerUpdate processes a PeerUpdate. For new or live peers it will check +// if an evidence broadcasting goroutine needs to be started. For down or +// removed peers, it will check if an evidence broadcasting goroutine // exists and signal that it should exit. // // FIXME: The peer may be behind in which case it would simply ignore the @@ -258,7 +242,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { // safely, and finally start the goroutine to broadcast evidence to that peer. _, ok := r.peerRoutines[peerUpdate.PeerID] if !ok { - closer := newCloser() + closer := tmsync.NewCloser() r.peerRoutines[peerUpdate.PeerID] = closer r.peerWG.Add(1) @@ -272,7 +256,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { // from the map of peer evidence broadcasting goroutines. closer, ok := r.peerRoutines[peerUpdate.PeerID] if ok { - closer.close() + closer.Close() } } } @@ -306,7 +290,7 @@ func (r *Reactor) processPeerUpdates() { // that the peer has already received or may not be ready for. 
//
// REF: https://github.com/tendermint/tendermint/issues/4727
-func (r *Reactor) broadcastEvidenceLoop(peerID p2p.NodeID, closer *closer) {
+func (r *Reactor) broadcastEvidenceLoop(peerID p2p.NodeID, closer *tmsync.Closer) {
 	var next *clist.CElement
 
 	defer func() {
@@ -332,7 +316,7 @@ func (r *Reactor) broadcastEvidenceLoop(peerID p2p.NodeID, closer *closer) {
 				continue
 			}
 
-		case <-closer.doneCh:
+		case <-closer.Done():
 			// The peer is marked for removal via a PeerUpdate as the doneCh was
 			// explicitly closed to signal we should exit.
 			return
@@ -370,7 +354,7 @@ func (r *Reactor) broadcastEvidenceLoop(peerID p2p.NodeID, closer *closer) {
 		case <-next.NextWaitChan():
 			next = next.Next()
 
-		case <-closer.doneCh:
+		case <-closer.Done():
 			// The peer is marked for removal via a PeerUpdate as the doneCh was
 			// explicitly closed to signal we should exit.
 			return
diff --git a/libs/sync/deadlock.go b/libs/sync/deadlock.go
index 637d6fbb1..38627b7cb 100644
--- a/libs/sync/deadlock.go
+++ b/libs/sync/deadlock.go
@@ -15,3 +15,31 @@ type Mutex struct {
 type RWMutex struct {
 	deadlock.RWMutex
 }
+
+// Closer implements a primitive to close a channel that signals process
+// termination while allowing a caller to call Close multiple times safely. It
+// should be used in cases where guarantees cannot be made about when and how
+// many times closure is executed.
+type Closer struct {
+	closeOnce deadlock.Once
+	doneCh    chan struct{}
+}
+
+// NewCloser returns a reference to a new Closer.
+func NewCloser() *Closer {
+	return &Closer{doneCh: make(chan struct{})}
+}
+
+// Done returns the internal done channel, allowing the caller to block or wait
+// for the Closer to be terminated/closed.
+func (c *Closer) Done() <-chan struct{} {
+	return c.doneCh
+}
+
+// Close gracefully closes the Closer. A caller should only call Close once, but
+// it is safe to call it multiple times.
+func (c *Closer) Close() {
+	c.closeOnce.Do(func() {
+		close(c.doneCh)
+	})
+}
diff --git a/libs/sync/sync.go b/libs/sync/sync.go
index a0880e7de..32fe29e53 100644
--- a/libs/sync/sync.go
+++ b/libs/sync/sync.go
@@ -13,3 +13,31 @@ type Mutex struct {
 type RWMutex struct {
 	sync.RWMutex
 }
+
+// Closer implements a primitive to close a channel that signals process
+// termination while allowing a caller to call Close multiple times safely. It
+// should be used in cases where guarantees cannot be made about when and how
+// many times closure is executed.
+type Closer struct {
+	closeOnce sync.Once
+	doneCh    chan struct{}
+}
+
+// NewCloser returns a reference to a new Closer.
+func NewCloser() *Closer {
+	return &Closer{doneCh: make(chan struct{})}
+}
+
+// Done returns the internal done channel, allowing the caller to block or wait
+// for the Closer to be terminated/closed.
+func (c *Closer) Done() <-chan struct{} {
+	return c.doneCh
+}
+
+// Close gracefully closes the Closer. A caller should only call Close once, but
+// it is safe to call it multiple times.
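
A short usage sketch of the Closer introduced above (illustrative, not part of the patch): any number of goroutines may race to signal shutdown, and every Close after the first is a no-op rather than the panic a bare close(ch) would cause:

```go
package main

import (
	"fmt"
	"sync"

	tmsync "github.com/tendermint/tendermint/libs/sync"
)

func main() {
	closer := tmsync.NewCloser()

	// A worker that runs until the closer is closed.
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-closer.Done()
		fmt.Println("worker: shutdown signalled")
	}()

	// Many callers may race to signal shutdown; only the first Close
	// actually closes the channel.
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			closer.Close()
		}()
	}

	wg.Wait()
	<-done
}
```
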
+func (c *Closer) Close() { + c.closeOnce.Do(func() { + close(c.doneCh) + }) +} diff --git a/mempool/ids.go b/mempool/ids.go new file mode 100644 index 000000000..2c30ef3d0 --- /dev/null +++ b/mempool/ids.go @@ -0,0 +1,75 @@ +package mempool + +import ( + "fmt" + + tmsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/p2p" +) + +type mempoolIDs struct { + mtx tmsync.RWMutex + peerMap map[p2p.NodeID]uint16 + nextID uint16 // assumes that a node will never have over 65536 active peers + activeIDs map[uint16]struct{} // used to check if a given peerID key is used +} + +func newMempoolIDs() *mempoolIDs { + return &mempoolIDs{ + peerMap: make(map[p2p.NodeID]uint16), + + // reserve UnknownPeerID for mempoolReactor.BroadcastTx + activeIDs: map[uint16]struct{}{UnknownPeerID: {}}, + nextID: 1, + } +} + +// ReserveForPeer searches for the next unused ID and assigns it to the provided +// peer. +func (ids *mempoolIDs) ReserveForPeer(peerID p2p.NodeID) { + ids.mtx.Lock() + defer ids.mtx.Unlock() + + curID := ids.nextPeerID() + ids.peerMap[peerID] = curID + ids.activeIDs[curID] = struct{}{} +} + +// Reclaim returns the ID reserved for the peer back to unused pool. +func (ids *mempoolIDs) Reclaim(peerID p2p.NodeID) { + ids.mtx.Lock() + defer ids.mtx.Unlock() + + removedID, ok := ids.peerMap[peerID] + if ok { + delete(ids.activeIDs, removedID) + delete(ids.peerMap, peerID) + } +} + +// GetForPeer returns an ID reserved for the peer. +func (ids *mempoolIDs) GetForPeer(peerID p2p.NodeID) uint16 { + ids.mtx.RLock() + defer ids.mtx.RUnlock() + + return ids.peerMap[peerID] +} + +// nextPeerID returns the next unused peer ID to use. We assume that the mutex +// is already held. +func (ids *mempoolIDs) nextPeerID() uint16 { + if len(ids.activeIDs) == maxActiveIDs { + panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs)) + } + + _, idExists := ids.activeIDs[ids.nextID] + for idExists { + ids.nextID++ + _, idExists = ids.activeIDs[ids.nextID] + } + + curID := ids.nextID + ids.nextID++ + + return curID +} diff --git a/mempool/reactor.go b/mempool/reactor.go index e969aeb69..4cbbe1c1f 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -4,21 +4,29 @@ import ( "errors" "fmt" "math" + "sync" "time" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" tmsync "github.com/tendermint/tendermint/libs/sync" "github.com/tendermint/tendermint/p2p" protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" "github.com/tendermint/tendermint/types" ) +var ( + _ service.Service = (*Reactor)(nil) + _ p2p.Wrapper = (*protomem.Message)(nil) +) + const ( - MempoolChannel = byte(0x30) + MempoolChannel = p2p.ChannelID(0x30) - peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount + // peerCatchupSleepIntervalMS defines how much time to sleep if a peer is behind + peerCatchupSleepIntervalMS = 100 // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) @@ -27,291 +35,367 @@ const ( maxActiveIDs = math.MaxUint16 ) -// Reactor handles mempool tx broadcasting amongst peers. -// It maintains a map from peer ID to counter, to prevent gossiping txs to the -// peers you received it from. +// PeerManager defines the interface contract required for getting necessary +// peer information. 
This should eventually be replaced with a message-oriented +// approach utilizing the p2p stack. +type PeerManager interface { + GetHeight(p2p.NodeID) (int64, error) +} + +// Reactor implements a service that contains mempool of txs that are broadcasted +// amongst peers. It maintains a map from peer ID to counter, to prevent gossiping +// txs to the peers you received it from. type Reactor struct { - p2p.BaseReactor + service.BaseService + config *cfg.MempoolConfig mempool *CListMempool ids *mempoolIDs -} -type mempoolIDs struct { - mtx tmsync.RWMutex - peerMap map[p2p.NodeID]uint16 - nextID uint16 // assumes that a node will never have over 65536 active peers - activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter -} + // XXX: Currently, this is the only way to get information about a peer. Ideally, + // we rely on message-oriented communication to get necessary peer data. + // ref: https://github.com/tendermint/tendermint/issues/5670 + peerMgr PeerManager -// Reserve searches for the next unused ID and assigns it to the -// peer. -func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { - ids.mtx.Lock() - defer ids.mtx.Unlock() + mempoolCh *p2p.Channel + peerUpdates *p2p.PeerUpdatesCh + closeCh chan struct{} - curID := ids.nextPeerID() - ids.peerMap[peer.ID()] = curID - ids.activeIDs[curID] = struct{}{} + // peerWG is used to coordinate graceful termination of all peer broadcasting + // goroutines. + peerWG sync.WaitGroup + + mtx tmsync.Mutex + peerRoutines map[p2p.NodeID]*tmsync.Closer } -// nextPeerID returns the next unused peer ID to use. -// This assumes that ids's mutex is already locked. -func (ids *mempoolIDs) nextPeerID() uint16 { - if len(ids.activeIDs) == maxActiveIDs { - panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs)) +// NewReactor returns a reference to a new reactor. +func NewReactor( + logger log.Logger, + config *cfg.MempoolConfig, + peerMgr PeerManager, + mempool *CListMempool, + mempoolCh *p2p.Channel, + peerUpdates *p2p.PeerUpdatesCh, +) *Reactor { + + r := &Reactor{ + config: config, + peerMgr: peerMgr, + mempool: mempool, + ids: newMempoolIDs(), + mempoolCh: mempoolCh, + peerUpdates: peerUpdates, + closeCh: make(chan struct{}), + peerRoutines: make(map[p2p.NodeID]*tmsync.Closer), } - _, idExists := ids.activeIDs[ids.nextID] - for idExists { - ids.nextID++ - _, idExists = ids.activeIDs[ids.nextID] - } - curID := ids.nextID - ids.nextID++ - return curID + r.BaseService = *service.NewBaseService(logger, "Mempool", r) + return r } -// Reclaim returns the ID reserved for the peer back to unused pool. -func (ids *mempoolIDs) Reclaim(peer p2p.Peer) { - ids.mtx.Lock() - defer ids.mtx.Unlock() +// GetChannelShims returns a map of ChannelDescriptorShim objects, where each +// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding +// p2p proto.Message the new p2p Channel is responsible for handling. +// +// +// TODO: Remove once p2p refactor is complete. 
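
As a worked example of the RecvMessageCapacity computed by the function below (a sketch assuming the default test config is acceptable):

```go
package main

import (
	"fmt"

	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/mempool"
	protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
)

func main() {
	config := cfg.TestConfig().Mempool

	// RecvMessageCapacity is derived from the wire size of a Message holding
	// a single tx of MaxTxBytes, i.e. MaxTxBytes plus the proto framing.
	shim := mempool.GetChannelShims(config)[mempool.MempoolChannel]
	fmt.Println("max tx bytes:      ", config.MaxTxBytes)
	fmt.Println("recv msg capacity: ", shim.Descriptor.RecvMessageCapacity)

	// The same computation from first principles:
	batchMsg := protomem.Message{
		Sum: &protomem.Message_Txs{
			Txs: &protomem.Txs{Txs: [][]byte{make([]byte, config.MaxTxBytes)}},
		},
	}
	fmt.Println("batchMsg.Size():   ", batchMsg.Size())
}
```
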
+// ref: https://github.com/tendermint/tendermint/issues/5670
+func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDescriptorShim {
+	largestTx := make([]byte, config.MaxTxBytes)
+	batchMsg := protomem.Message{
+		Sum: &protomem.Message_Txs{
+			Txs: &protomem.Txs{Txs: [][]byte{largestTx}},
+		},
+	}
 
-	removedID, ok := ids.peerMap[peer.ID()]
-	if ok {
-		delete(ids.activeIDs, removedID)
-		delete(ids.peerMap, peer.ID())
+	return map[p2p.ChannelID]*p2p.ChannelDescriptorShim{
+		MempoolChannel: {
+			MsgType: new(protomem.Message),
+			Descriptor: &p2p.ChannelDescriptor{
+				ID:                  byte(MempoolChannel),
+				Priority:            5,
+				RecvMessageCapacity: batchMsg.Size(),
+			},
+		},
 	}
 }
 
-// GetForPeer returns an ID reserved for the peer.
-func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
-	ids.mtx.RLock()
-	defer ids.mtx.RUnlock()
+// OnStart starts separate goroutines for each p2p Channel and listens for
+// envelopes on each. It also listens for peer updates and handles messages on
+// that p2p channel accordingly. The caller must be sure to execute OnStop to
+// ensure the outbound p2p Channels are closed.
+func (r *Reactor) OnStart() error {
+	if !r.config.Broadcast {
+		r.Logger.Info("tx broadcasting is disabled")
+	}
 
-	return ids.peerMap[peer.ID()]
-}
+	go r.processMempoolCh()
+	go r.processPeerUpdates()
 
-func newMempoolIDs() *mempoolIDs {
-	return &mempoolIDs{
-		peerMap:   make(map[p2p.NodeID]uint16),
-		activeIDs: map[uint16]struct{}{0: {}},
-		nextID:    1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx
-	}
+	return nil
 }
 
-// NewReactor returns a new Reactor with the given config and mempool.
-func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
-	memR := &Reactor{
-		config:  config,
-		mempool: mempool,
-		ids:     newMempoolIDs(),
+// OnStop stops the reactor by signaling to all spawned goroutines to exit and
+// blocking until they all exit.
+func (r *Reactor) OnStop() {
+	r.mtx.Lock()
+	for _, c := range r.peerRoutines {
+		c.Close()
 	}
-	memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
-	return memR
-}
+	r.mtx.Unlock()
 
-// InitPeer implements Reactor by creating a state for the peer.
-func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
-	memR.ids.ReserveForPeer(peer)
-	return peer
-}
+	// wait for all spawned peer tx broadcasting goroutines to gracefully exit
+	r.peerWG.Wait()
 
-// SetLogger sets the Logger on the reactor and the underlying mempool.
-func (memR *Reactor) SetLogger(l log.Logger) {
-	memR.Logger = l
-	memR.mempool.SetLogger(l)
+	// Close closeCh to signal to all spawned goroutines to gracefully exit. All
+	// p2p Channels should execute Close().
+	close(r.closeCh)
+
+	// Wait for all p2p Channels to be closed before returning. This ensures we
+	// can easily reason about synchronization of all p2p Channels and ensure no
+	// panics will occur.
+	<-r.mempoolCh.Done()
+	<-r.peerUpdates.Done()
 }
 
-// OnStart implements p2p.BaseReactor.
-func (memR *Reactor) OnStart() error {
-	if !memR.config.Broadcast {
-		memR.Logger.Info("Tx broadcasting is disabled")
+// handleMempoolMessage handles envelopes sent from peers on the MempoolChannel.
+// For every tx in the message, we execute CheckTx. It returns an error if an
+// empty set of txs is sent in an envelope or if we receive an unexpected
+// message type.
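
The SenderID used by handleMempoolMessage below comes from the new mempool/ids.go above. A hedged, test-style sketch of the ID lifecycle; it mirrors TestMempoolIDsBasic later in this diff, and the zero-value assertion after Reclaim is an addition that relies on GetForPeer's map-lookup semantics:

```go
package mempool

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/p2p"
)

func TestMempoolIDsLifecycleSketch(t *testing.T) {
	ids := newMempoolIDs()

	peerID, err := p2p.NewNodeID("00ffaa")
	require.NoError(t, err)

	// IDs start at 1 because 0 is reserved for UnknownPeerID.
	ids.ReserveForPeer(peerID)
	require.EqualValues(t, 1, ids.GetForPeer(peerID))

	// After Reclaim the peer is unknown again; GetForPeer falls back to the
	// zero value, i.e. UnknownPeerID.
	ids.Reclaim(peerID)
	require.EqualValues(t, 0, ids.GetForPeer(peerID))

	// A re-reserved peer gets a fresh ID: nextID only moves forward, skipping
	// IDs still marked active and wrapping via uint16 overflow.
	ids.ReserveForPeer(peerID)
	require.EqualValues(t, 2, ids.GetForPeer(peerID))
}
```
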
+func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { + logger := r.Logger.With("peer", envelope.From) + + switch msg := envelope.Message.(type) { + case *protomem.Txs: + protoTxs := msg.GetTxs() + if len(protoTxs) == 0 { + return errors.New("empty txs received from peer") + } + + txInfo := TxInfo{SenderID: r.ids.GetForPeer(envelope.From)} + if len(envelope.From) != 0 { + txInfo.SenderP2PID = envelope.From + } + + for _, tx := range protoTxs { + if err := r.mempool.CheckTx(types.Tx(tx), nil, txInfo); err != nil { + logger.Error("checktx failed for tx", "tx", txID(tx), "err", err) + } + } + + default: + return fmt.Errorf("received unknown message: %T", msg) } + return nil } -// GetChannels implements Reactor by returning the list of channels for this -// reactor. -func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { - largestTx := make([]byte, memR.config.MaxTxBytes) - batchMsg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: [][]byte{largestTx}}, - }, - } +// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. +// It will handle errors and any possible panics gracefully. A caller can handle +// any error returned by sending a PeerError on the respective channel. +func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("panic in processing message: %v", e) + } + }() - return []*p2p.ChannelDescriptor{ - { - ID: MempoolChannel, - Priority: 5, - RecvMessageCapacity: batchMsg.Size(), - }, + r.Logger.Debug("received message", "peer", envelope.From) + + switch chID { + case MempoolChannel: + err = r.handleMempoolMessage(envelope) + + default: + err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) } + + return err } -// AddPeer implements Reactor. -// It starts a broadcast routine ensuring all txs are forwarded to the given peer. -func (memR *Reactor) AddPeer(peer p2p.Peer) { - if memR.config.Broadcast { - go memR.broadcastTxRoutine(peer) +// processMempoolCh implements a blocking event loop where we listen for p2p +// Envelope messages from the mempoolCh. +func (r *Reactor) processMempoolCh() { + defer r.mempoolCh.Close() + + for { + select { + case envelope := <-r.mempoolCh.In(): + if err := r.handleMessage(r.mempoolCh.ID(), envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID(), "envelope", envelope, "err", err) + r.mempoolCh.Error() <- p2p.PeerError{ + PeerID: envelope.From, + Err: err, + Severity: p2p.PeerErrorSeverityLow, + } + } + + case <-r.closeCh: + r.Logger.Debug("stopped listening on mempool channel; closing...") + return + } } } -// RemovePeer implements Reactor. -func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { - memR.ids.Reclaim(peer) - // broadcast routine checks if peer is gone and returns -} +// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we +// check if the reactor is running and if we've already started a tx broadcasting +// goroutine or not. If not, we start one for the newly added peer. For down or +// removed peers, we remove the peer from the mempool peer ID set and signal to +// stop the tx broadcasting goroutine. 
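
The bookkeeping this comment describes is the same pattern the evidence reactor uses. Distilled into a standalone sketch; the tracker type is illustrative, and only tmsync.Closer comes from this PR:

```go
package main

import (
	"fmt"
	"sync"

	tmsync "github.com/tendermint/tendermint/libs/sync"
)

type peerID string

// tracker distills the peerRoutines bookkeeping shared by the evidence and
// mempool reactors.
type tracker struct {
	mtx      sync.Mutex
	wg       sync.WaitGroup
	routines map[peerID]*tmsync.Closer
}

// up starts at most one goroutine per live peer.
func (t *tracker) up(id peerID, run func(done <-chan struct{})) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	if _, ok := t.routines[id]; ok {
		return // a routine for this peer is already running
	}

	closer := tmsync.NewCloser()
	t.routines[id] = closer
	t.wg.Add(1)

	go func() {
		// mirror the reactors: deregister, then mark the waitgroup as done
		defer func() {
			t.mtx.Lock()
			delete(t.routines, id)
			t.mtx.Unlock()
			t.wg.Done()
		}()

		run(closer.Done())
	}()
}

// down signals the peer's goroutine to exit; a no-op if none exists.
func (t *tracker) down(id peerID) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	if c, ok := t.routines[id]; ok {
		c.Close()
	}
}

func main() {
	t := &tracker{routines: make(map[peerID]*tmsync.Closer)}

	t.up("peer-1", func(done <-chan struct{}) {
		<-done
		fmt.Println("peer-1 routine exiting")
	})

	t.down("peer-1") // PeerStatusDown
	t.wg.Wait()      // OnStop blocks here until every routine has exited
}
```
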
+func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { + r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status) + + r.mtx.Lock() + defer r.mtx.Unlock() + + switch peerUpdate.Status { + case p2p.PeerStatusUp: + // Do not allow starting new tx broadcast loops after reactor shutdown + // has been initiated. This can happen after we've manually closed all + // peer broadcast loops and closed r.closeCh, but the router still sends + // in-flight peer updates. + if !r.IsRunning() { + return + } -// Receive implements Reactor. -// It adds any received transactions to the mempool. -// XXX: do not call any methods that can block or incur heavy processing. -// https://github.com/tendermint/tendermint/issues/2888 -func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := memR.decodeMsg(msgBytes) - if err != nil { - memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - memR.Switch.StopPeerForError(src, err) - return - } - memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) + if r.config.Broadcast { + // Check if we've already started a goroutine for this peer, if not we create + // a new done channel so we can explicitly close the goroutine if the peer + // is later removed, we increment the waitgroup so the reactor can stop + // safely, and finally start the goroutine to broadcast txs to that peer. + _, ok := r.peerRoutines[peerUpdate.PeerID] + if !ok { + closer := tmsync.NewCloser() - txInfo := TxInfo{SenderID: memR.ids.GetForPeer(src)} - if src != nil { - txInfo.SenderP2PID = src.ID() - } - for _, tx := range msg.Txs { - err = memR.mempool.CheckTx(tx, nil, txInfo) - if err != nil { - memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err) + r.peerRoutines[peerUpdate.PeerID] = closer + r.peerWG.Add(1) + + r.ids.ReserveForPeer(peerUpdate.PeerID) + + // start a broadcast routine ensuring all txs are forwarded to the peer + go r.broadcastTxRoutine(peerUpdate.PeerID, closer) + } + } + + case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned: + r.ids.Reclaim(peerUpdate.PeerID) + + // Check if we've started a tx broadcasting goroutine for this peer. + // If we have, we signal to terminate the goroutine via the channel's closure. + // This will internally decrement the peer waitgroup and remove the peer + // from the map of peer tx broadcasting goroutines. + closer, ok := r.peerRoutines[peerUpdate.PeerID] + if ok { + closer.Close() } } - // broadcasting happens from go routines per peer } -// PeerState describes the state of a peer. -type PeerState interface { - GetHeight() int64 +// processPeerUpdates initiates a blocking process where we listen for and handle +// PeerUpdate messages. When the reactor is stopped, we will catch the signal and +// close the p2p PeerUpdatesCh gracefully. +func (r *Reactor) processPeerUpdates() { + defer r.peerUpdates.Close() + + for { + select { + case peerUpdate := <-r.peerUpdates.Updates(): + r.processPeerUpdate(peerUpdate) + + case <-r.closeCh: + r.Logger.Debug("stopped listening on peer updates channel; closing...") + return + } + } } -// Send new mempool txs to peer. 
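
The rewritten routine below still walks the mempool's concurrent list the way the old one did. A minimal sketch of that traversal using the real libs/clist API (the early exit in the default case is illustrative; the reactor keeps waiting instead):

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/libs/clist"
)

func main() {
	l := clist.New()
	l.PushBack("tx-1")
	l.PushBack("tx-2")

	// Walk the list the way broadcastTxRoutine does: take Front, then block
	// on NextWaitChan until a successor exists. In the reactor this select
	// also includes closer.Done() and closeCh so the walk can be aborted.
	next := l.Front()
	for next != nil {
		fmt.Println("gossip:", next.Value.(string))

		select {
		case <-next.NextWaitChan():
			// NextWaitChan fires once a next element exists (or this element
			// was removed, in which case Next returns nil and the reactor
			// restarts from Front).
			next = next.Next()
		default:
			// Tail of the list and nothing new yet; exit the sketch here.
			next = nil
		}
	}
}
```
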
-func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { - peerID := memR.ids.GetForPeer(peer) +func (r *Reactor) broadcastTxRoutine(peerID p2p.NodeID, closer *tmsync.Closer) { + peerMempoolID := r.ids.GetForPeer(peerID) var next *clist.CElement + // remove the peer ID from the map of routines and mark the waitgroup as done + defer func() { + r.mtx.Lock() + delete(r.peerRoutines, peerID) + r.mtx.Unlock() + + r.peerWG.Done() + + if e := recover(); e != nil { + r.Logger.Error("recovering from broadcasting mempool loop", "err", e) + } + }() + for { - // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time - if !memR.IsRunning() || !peer.IsRunning() { + if !r.IsRunning() { return } + // This happens because the CElement we were looking at got garbage // collected (removed). That is, .NextWait() returned nil. Go ahead and // start from the beginning. if next == nil { select { - case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available - if next = memR.mempool.TxsFront(); next == nil { + case <-r.mempool.TxsWaitChan(): // wait until a tx is available + if next = r.mempool.TxsFront(); next == nil { continue } - case <-peer.Quit(): + + case <-closer.Done(): + // The peer is marked for removal via a PeerUpdate as the doneCh was + // explicitly closed to signal we should exit. return - case <-memR.Quit(): + + case <-r.closeCh: + // The reactor has signaled that we are stopped and thus we should + // implicitly exit this peer's goroutine. return } } - // Make sure the peer is up to date. - peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { - // Peer does not have a state yet. We set it in the consensus reactor, but - // when we add peer in Switch, the order we call reactors#AddPeer is - // different every time due to us using a map. Sometimes other reactors - // will be initialized before the consensus reactor. We should wait a few - // milliseconds and retry. - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } - - // Allow for a lag of 1 block. memTx := next.Value.(*mempoolTx) - if peerState.GetHeight() < memTx.Height()-1 { - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue + + if r.peerMgr != nil { + height, err := r.peerMgr.GetHeight(peerID) + if err != nil { + r.Logger.Error("failed to get peer height", "err", err) + } else if height > 0 && height < memTx.Height()-1 { + // allow for a lag of one block + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue + } } - // NOTE: Transaction batching was disabled due to + // NOTE: Transaction batching was disabled due to: // https://github.com/tendermint/tendermint/issues/5796 - if _, ok := memTx.senders.Load(peerID); !ok { - msg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, + if _, ok := memTx.senders.Load(peerMempoolID); !ok { + // Send the mempool tx to the corresponding peer. Note, the peer may be + // behind and thus would not be able to process the mempool tx correctly. 
+ r.mempoolCh.Out() <- p2p.Envelope{ + To: peerID, + Message: &protomem.Txs{ + Txs: [][]byte{memTx.tx}, }, } - bz, err := msg.Marshal() - if err != nil { - panic(err) - } - success := peer.Send(MempoolChannel, bz) - if !success { - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } + r.Logger.Debug("gossiped tx to peer", "tx", txID(memTx.tx), "peer", peerID) } select { case <-next.NextWaitChan(): // see the start of the for loop for nil check next = next.Next() - case <-peer.Quit(): - return - case <-memR.Quit(): - return - } - } -} - -//----------------------------------------------------------------------------- -// Messages - -func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) { - msg := protomem.Message{} - err := msg.Unmarshal(bz) - if err != nil { - return TxsMessage{}, err - } - - var message TxsMessage - if i, ok := msg.Sum.(*protomem.Message_Txs); ok { - txs := i.Txs.GetTxs() - - if len(txs) == 0 { - return message, errors.New("empty TxsMessage") - } - - decoded := make([]types.Tx, len(txs)) - for j, tx := range txs { - decoded[j] = types.Tx(tx) - } + case <-closer.Done(): + // The peer is marked for removal via a PeerUpdate as the doneCh was + // explicitly closed to signal we should exit. + return - message = TxsMessage{ - Txs: decoded, + case <-r.closeCh: + // The reactor has signaled that we are stopped and thus we should + // implicitly exit this peer's goroutine. + return } - return message, nil } - return message, fmt.Errorf("msg type: %T is not supported", msg) -} - -//------------------------------------- - -// TxsMessage is a Message containing transactions. -type TxsMessage struct { - Txs []types.Tx -} - -// String returns a string representation of the TxsMessage. -func (m *TxsMessage) String() string { - return fmt.Sprintf("[TxsMessage %v]", m.Txs) } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index bc51bfd9b..564dcd3b5 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -1,16 +1,13 @@ package mempool import ( - "encoding/hex" - "errors" - "net" + "fmt" + "math/rand" "sync" "testing" "time" "github.com/fortytw2/leaktest" - "github.com/go-kit/kit/log/term" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/kvstore" @@ -19,370 +16,413 @@ import ( "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/p2p/mock" - memproto "github.com/tendermint/tendermint/proto/tendermint/mempool" + protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) -const ( - numTxs = 1000 - timeout = 120 * time.Second // ridiculously high because CircleCI is slow -) +var rng = rand.New(rand.NewSource(time.Now().UnixNano())) + +type reactorTestSuite struct { + reactor *Reactor + + peerID p2p.NodeID -type peerState struct { - height int64 + mempoolChannel *p2p.Channel + mempoolInCh chan p2p.Envelope + mempoolOutCh chan p2p.Envelope + mempoolPeerErrCh chan p2p.PeerError + + peerUpdatesCh chan p2p.PeerUpdate + peerUpdates *p2p.PeerUpdatesCh } -func (ps peerState) GetHeight() int64 { - return ps.height +func setup(t *testing.T, cfg *cfg.MempoolConfig, logger log.Logger, chBuf uint) *reactorTestSuite { + t.Helper() + + pID := make([]byte, 16) + _, err := rng.Read(pID) + require.NoError(t, err) + + peerID, err := p2p.NewNodeID(fmt.Sprintf("%x", pID)) + 
require.NoError(t, err) + + peerUpdatesCh := make(chan p2p.PeerUpdate, chBuf) + + rts := &reactorTestSuite{ + mempoolInCh: make(chan p2p.Envelope, chBuf), + mempoolOutCh: make(chan p2p.Envelope, chBuf), + mempoolPeerErrCh: make(chan p2p.PeerError, chBuf), + peerUpdatesCh: peerUpdatesCh, + peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh), + peerID: peerID, + } + + rts.mempoolChannel = p2p.NewChannel( + MempoolChannel, + new(protomem.Message), + rts.mempoolInCh, + rts.mempoolOutCh, + rts.mempoolPeerErrCh, + ) + + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, memCleanup := newMempoolWithApp(cc) + + mempool.SetLogger(logger) + + rts.reactor = NewReactor( + logger, + cfg, + nil, + mempool, + rts.mempoolChannel, + rts.peerUpdates, + ) + + require.NoError(t, rts.reactor.Start()) + require.True(t, rts.reactor.IsRunning()) + + t.Cleanup(func() { + memCleanup() + require.NoError(t, rts.reactor.Stop()) + require.False(t, rts.reactor.IsRunning()) + }) + + return rts } -// Send a bunch of txs to the first reactor's mempool and wait for them all to -// be received in the others. -func TestReactorBroadcastTxsMessage(t *testing.T) { - config := cfg.TestConfig() - // if there were more than two reactors, the order of transactions could not be - // asserted in waitForTxsOnReactors (due to transactions gossiping). If we - // replace Connect2Switches (full mesh) with a func, which connects first - // reactor to others and nothing else, this test should also pass with >2 reactors. - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) +func simulateRouter(wg *sync.WaitGroup, primary *reactorTestSuite, suites []*reactorTestSuite, numOut int) { + wg.Add(1) + + // create a mapping for efficient suite lookup by peer ID + suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite) + for _, suite := range suites { + suitesByPeerID[suite.peerID] = suite + } + + // Simulate a router by listening for all outbound envelopes and proxying the + // envelope to the respective peer (suite). 
+ go func() { + for i := 0; i < numOut; i++ { + envelope := <-primary.mempoolOutCh + other := suitesByPeerID[envelope.To] + + other.mempoolInCh <- p2p.Envelope{ + From: primary.peerID, + To: envelope.To, + Message: envelope.Message, } } + + wg.Done() }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) +} + +func waitForTxs(t *testing.T, txs types.Txs, suites ...*reactorTestSuite) { + t.Helper() + + wg := new(sync.WaitGroup) + + for _, suite := range suites { + wg.Add(1) + + go func(s *reactorTestSuite) { + mempool := s.reactor.mempool + for mempool.Size() < len(txs) { + time.Sleep(time.Millisecond * 100) + } + + reapedTxs := mempool.ReapMaxTxs(len(txs)) + for i, tx := range txs { + require.Equalf( + t, tx, reapedTxs[i], + "txs at index %d in reactor mempool mismatch; got: %v, expected: %v", i, tx, reapedTxs[i], + ) + } + + wg.Done() + }(suite) + } + + wg.Wait() +} + +func TestReactorBroadcastTxs(t *testing.T) { + numTxs := 1000 + numNodes := 10 + config := cfg.TestConfig() + + testSuites := make([]*reactorTestSuite, numNodes) + for i := 0; i < len(testSuites); i++ { + logger := log.TestingLogger().With("node", i) + testSuites[i] = setup(t, config.Mempool, logger, 0) + } + + primary := testSuites[0] + secondaries := testSuites[1:] + + // Simulate a router by listening for all outbound envelopes and proxying the + // envelopes to the respective peer (suite). + wg := new(sync.WaitGroup) + simulateRouter(wg, primary, testSuites, numTxs*len(secondaries)) + + txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID) + + // Add each secondary suite (node) as a peer to the primary suite (node). This + // will cause the primary to gossip all mempool txs to the secondaries. + for _, suite := range secondaries { + primary.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: suite.peerID, } } - txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID) - waitForTxsOnReactors(t, txs, reactors) + // Wait till all secondary suites (reactor) received all mempool txs from the + // primary suite (node). + waitForTxs(t, txs, secondaries...) + + for _, suite := range testSuites { + require.Equal(t, len(txs), suite.reactor.mempool.Size()) + } + + wg.Wait() + + // ensure all channels are drained + for _, suite := range testSuites { + require.Empty(t, suite.mempoolOutCh) + } } // regression test for https://github.com/tendermint/tendermint/issues/5408 func TestReactorConcurrency(t *testing.T) { + numTxs := 5 + numNodes := 2 config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } + + testSuites := make([]*reactorTestSuite, numNodes) + for i := 0; i < len(testSuites); i++ { + logger := log.TestingLogger().With("node", i) + testSuites[i] = setup(t, config.Mempool, logger, 0) } - var wg sync.WaitGroup - const numTxs = 5 + primary := testSuites[0] + secondary := testSuites[1] + + var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(2) // 1. submit a bunch of txs // 2. 
update the whole mempool - txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID) + txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID) go func() { defer wg.Done() - reactors[0].mempool.Lock() - defer reactors[0].mempool.Unlock() + primary.reactor.mempool.Lock() + defer primary.reactor.mempool.Unlock() deliverTxResponses := make([]*abci.ResponseDeliverTx, len(txs)) for i := range txs { deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} } - err := reactors[0].mempool.Update(1, txs, deliverTxResponses, nil, nil) - assert.NoError(t, err) + + err := primary.reactor.mempool.Update(1, txs, deliverTxResponses, nil, nil) + require.NoError(t, err) }() // 1. submit a bunch of txs // 2. update none - _ = checkTxs(t, reactors[1].mempool, numTxs, UnknownPeerID) + _ = checkTxs(t, secondary.reactor.mempool, numTxs, UnknownPeerID) go func() { defer wg.Done() - reactors[1].mempool.Lock() - defer reactors[1].mempool.Unlock() - err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) - assert.NoError(t, err) + secondary.reactor.mempool.Lock() + defer secondary.reactor.mempool.Unlock() + + err := secondary.reactor.mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) + require.NoError(t, err) }() - // 1. flush the mempool - reactors[1].mempool.Flush() + // flush the mempool + secondary.reactor.mempool.Flush() } wg.Wait() } -// Send a bunch of txs to the first reactor's mempool, claiming it came from peer -// ensure peer gets no txs. func TestReactorNoBroadcastToSender(t *testing.T) { + numTxs := 1000 + numNodes := 2 config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } + + testSuites := make([]*reactorTestSuite, numNodes) + for i := 0; i < len(testSuites); i++ { + logger := log.TestingLogger().With("node", i) + testSuites[i] = setup(t, config.Mempool, logger, uint(numTxs)) } - const peerID = 1 - checkTxs(t, reactors[0].mempool, numTxs, peerID) - ensureNoTxs(t, reactors[peerID], 100*time.Millisecond) -} + primary := testSuites[0] + secondary := testSuites[1] -func TestReactor_MaxTxBytes(t *testing.T) { - config := cfg.TestConfig() + peerID := uint16(1) + _ = checkTxs(t, primary.reactor.mempool, numTxs, peerID) - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } + primary.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: secondary.peerID, } - // Broadcast a tx, which has the max size - // => ensure it's received by the second reactor. 
- tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes) - err := reactors[0].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) - require.NoError(t, err) - waitForTxsOnReactors(t, []types.Tx{tx1}, reactors) + time.Sleep(100 * time.Millisecond) - reactors[0].mempool.Flush() - reactors[1].mempool.Flush() + require.Eventually(t, func() bool { + return secondary.reactor.mempool.Size() == 0 + }, time.Minute, 100*time.Millisecond) - // Broadcast a tx, which is beyond the max size - // => ensure it's not sent - tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1) - err = reactors[0].mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) - require.Error(t, err) + // ensure all channels are drained + for _, suite := range testSuites { + require.Empty(t, suite.mempoolOutCh) + } } -func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } +func TestMempoolIDsBasic(t *testing.T) { + ids := newMempoolIDs() - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() + peerID, err := p2p.NewNodeID("00ffaa") + require.NoError(t, err) - // stop peer - sw := reactors[1].Switch - sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) + ids.ReserveForPeer(peerID) + require.EqualValues(t, 1, ids.GetForPeer(peerID)) + ids.Reclaim(peerID) - // check that we are not leaking any go-routines - // i.e. broadcastTxRoutine finishes when peer is stopped - leaktest.CheckTimeout(t, 10*time.Second)() + ids.ReserveForPeer(peerID) + require.EqualValues(t, 2, ids.GetForPeer(peerID)) + ids.Reclaim(peerID) } -func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - +func TestReactor_MaxTxBytes(t *testing.T) { + numNodes := 2 config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - // stop reactors - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } + testSuites := make([]*reactorTestSuite, numNodes) + for i := 0; i < len(testSuites); i++ { + logger := log.TestingLogger().With("node", i) + testSuites[i] = setup(t, config.Mempool, logger, 0) } - // check that we are not leaking any go-routines - // i.e. broadcastTxRoutine finishes when reactor is stopped - leaktest.CheckTimeout(t, 10*time.Second)() -} + primary := testSuites[0] + secondary := testSuites[1] -func TestMempoolIDsBasic(t *testing.T) { - ids := newMempoolIDs() + // Simulate a router by listening for all outbound envelopes and proxying the + // envelopes to the respective peer (suite). + wg := new(sync.WaitGroup) + simulateRouter(wg, primary, testSuites, 1) - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + // Broadcast a tx, which has the max size and ensure it's received by the + // second reactor. + tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes) + err := primary.reactor.mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) + require.NoError(t, err) - ids.ReserveForPeer(peer) - assert.EqualValues(t, 1, ids.GetForPeer(peer)) - ids.Reclaim(peer) + primary.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: secondary.peerID, + } - ids.ReserveForPeer(peer) - assert.EqualValues(t, 2, ids.GetForPeer(peer)) - ids.Reclaim(peer) -} + // Wait till all secondary suites (reactor) received all mempool txs from the + // primary suite (node). 
+ waitForTxs(t, []types.Tx{tx1}, secondary) -func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { - if testing.Short() { - return - } + primary.reactor.mempool.Flush() + secondary.reactor.mempool.Flush() - // 0 is already reserved for UnknownPeerID - ids := newMempoolIDs() + // broadcast a tx, which is beyond the max size and ensure it's not sent + tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1) + err = primary.reactor.mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) + require.Error(t, err) - for i := 0; i < maxActiveIDs-1; i++ { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) - } + wg.Wait() - assert.Panics(t, func() { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) - }) + // ensure all channels are drained + for _, suite := range testSuites { + require.Empty(t, suite.mempoolOutCh) + } } func TestDontExhaustMaxActiveIDs(t *testing.T) { config := cfg.TestConfig() - const N = 1 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } + reactor := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0) + + go func() { + for range reactor.mempoolOutCh { } }() - reactor := reactors[0] + peerID, err := p2p.NewNodeID("00ffaa") + require.NoError(t, err) + + // ensure the reactor does not panic (i.e. exhaust active IDs) for i := 0; i < maxActiveIDs+1; i++ { - peer := mock.NewPeer(nil) - reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3}) - reactor.AddPeer(peer) + reactor.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: peerID, + } + reactor.mempoolOutCh <- p2p.Envelope{ + To: peerID, + Message: &protomem.Txs{ + Txs: [][]byte{}, + }, + } } -} -// mempoolLogger is a TestingLogger which uses a different -// color for each validator ("validator" key must exist). 
-func mempoolLogger() log.Logger { - return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { - for i := 0; i < len(keyvals)-1; i += 2 { - if keyvals[i] == "validator" { - return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} - } - } - return term.FgBgColor{} - }) + require.Empty(t, reactor.mempoolOutCh) } -// connect N mempool reactors through N switches -func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { - reactors := make([]*Reactor, n) - logger := mempoolLogger() - for i := 0; i < n; i++ { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mempool, cleanup := newMempoolWithApp(cc) - defer cleanup() - - reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states - reactors[i].SetLogger(logger.With("validator", i)) +func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") } - p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("MEMPOOL", reactors[i]) - return s + // 0 is already reserved for UnknownPeerID + ids := newMempoolIDs() - }, p2p.Connect2Switches) - return reactors -} + peerID, err := p2p.NewNodeID("00ffaa") + require.NoError(t, err) -func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) { - // wait for the txs in all mempools - wg := new(sync.WaitGroup) - for i, reactor := range reactors { - wg.Add(1) - go func(r *Reactor, reactorIndex int) { - defer wg.Done() - waitForTxsOnReactor(t, txs, r, reactorIndex) - }(reactor, i) + for i := 0; i < maxActiveIDs-1; i++ { + ids.ReserveForPeer(peerID) } - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - timer := time.After(timeout) - select { - case <-timer: - t.Fatal("Timed out waiting for txs") - case <-done: - } + require.Panics(t, func() { + ids.ReserveForPeer(peerID) + }) } -func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) { - mempool := reactor.mempool - for mempool.Size() < len(txs) { - time.Sleep(time.Millisecond * 100) +func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") } - reapedTxs := mempool.ReapMaxTxs(len(txs)) - for i, tx := range txs { - assert.Equalf(t, tx, reapedTxs[i], - "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) - } -} + config := cfg.TestConfig() -// ensure no txs on reactor after some timeout -func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) { - time.Sleep(timeout) // wait for the txs in all mempools - assert.Zero(t, reactor.mempool.Size()) -} + primary := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0) + secondary := setup(t, config.Mempool, log.TestingLogger().With("node", 1), 0) -func TestMempoolVectors(t *testing.T) { - testCases := []struct { - testName string - tx []byte - expBytes string - }{ - {"tx 1", []byte{123}, "0a030a017b"}, - {"tx 2", []byte("proto encoding in mempool"), "0a1b0a1970726f746f20656e636f64696e6720696e206d656d706f6f6c"}, + // connect peer + primary.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: secondary.peerID, } - for _, tc := range testCases { - tc := tc - - msg := memproto.Message{ - Sum: &memproto.Message_Txs{ - Txs: &memproto.Txs{Txs: [][]byte{tc.tx}}, - }, - } - bz, err := msg.Marshal() - require.NoError(t, err, tc.testName) - - require.Equal(t, tc.expBytes, hex.EncodeToString(bz), 
tc.testName) + // disconnect peer + primary.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusDown, + PeerID: secondary.peerID, } + + // check that we are not leaking any go-routines + // i.e. broadcastTxRoutine finishes when peer is stopped + leaktest.CheckTimeout(t, 10*time.Second)() } diff --git a/node/node.go b/node/node.go index 09cdc2896..a6d4c56d2 100644 --- a/node/node.go +++ b/node/node.go @@ -319,9 +319,16 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { return bytes.Equal(pubKey.Address(), addr) } -func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, - state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.CListMempool) { +func createMempoolReactor( + config *cfg.Config, + proxyApp proxy.AppConns, + state sm.State, + memplMetrics *mempl.Metrics, + peerMgr *p2p.PeerManager, + logger log.Logger, +) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) { + logger = logger.With("module", "mempool") mempool := mempl.NewCListMempool( config.Mempool, proxyApp.Mempool(), @@ -330,14 +337,24 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, mempl.WithPreCheck(sm.TxPreCheck(state)), mempl.WithPostCheck(sm.TxPostCheck(state)), ) - mempoolLogger := logger.With("module", "mempool") - mempoolReactor := mempl.NewReactor(config.Mempool, mempool) - mempoolReactor.SetLogger(mempoolLogger) + + mempool.SetLogger(logger) + + reactorShim := p2p.NewReactorShim(logger, "MempoolShim", mempl.GetChannelShims(config.Mempool)) + reactor := mempl.NewReactor( + logger, + config.Mempool, + peerMgr, + mempool, + reactorShim.GetChannel(mempl.MempoolChannel), + reactorShim.PeerUpdates, + ) if config.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() } - return mempoolReactor, mempool + + return reactorShim, reactor, mempool } func createEvidenceReactor( @@ -513,7 +530,7 @@ func createSwitch(config *cfg.Config, transport p2p.Transport, p2pMetrics *p2p.Metrics, peerFilters []p2p.PeerFilterFunc, - mempoolReactor *mempl.Reactor, + mempoolReactor *p2p.ReactorShim, bcReactor p2p.Reactor, stateSyncReactor *p2p.ReactorShim, consensusReactor *cs.Reactor, @@ -744,10 +761,11 @@ func NewNode(config *cfg.Config, logNodeStartupInfo(state, pubKey, logger, consensusLogger) - csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) + // TODO: Fetch and provide real options and do proper p2p bootstrapping. 
+ peerMgr := p2p.NewPeerManager(p2p.PeerManagerOptions{}) - // Make MempoolReactor - mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger) + csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) + mpReactorShim, mpReactor, mempool := createMempoolReactor(config, proxyApp, state, memplMetrics, peerMgr, logger) evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger) if err != nil { @@ -819,7 +837,7 @@ func NewNode(config *cfg.Config, p2pLogger := logger.With("module", "p2p") transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) sw := createSwitch( - config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch, + config, transport, p2pMetrics, peerFilters, mpReactorShim, bcReactorForSwitch, stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger, ) @@ -876,7 +894,7 @@ func NewNode(config *cfg.Config, stateStore: stateStore, blockStore: blockStore, bcReactor: bcReactor, - mempoolReactor: mempoolReactor, + mempoolReactor: mpReactor, mempool: mempool, consensusState: csState, consensusReactor: csReactor, @@ -964,6 +982,11 @@ func (n *Node) OnStart() error { return err } + // Start the real mempool reactor separately since the switch uses the shim. + if err := n.mempoolReactor.Start(); err != nil { + return err + } + // Start the real evidence reactor separately since the switch uses the shim. if err := n.evidenceReactor.Start(); err != nil { return err @@ -1022,6 +1045,11 @@ func (n *Node) OnStop() { n.Logger.Error("failed to stop the state sync reactor", "err", err) } + // Stop the real mempool reactor separately since the switch uses the shim. + if err := n.mempoolReactor.Stop(); err != nil { + n.Logger.Error("failed to stop the mempool reactor", "err", err) + } + // Stop the real evidence reactor separately since the switch uses the shim. if err := n.evidenceReactor.Stop(); err != nil { n.Logger.Error("failed to stop the evidence reactor", "err", err) @@ -1352,7 +1380,7 @@ func makeNodeInfo( cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel, - mempl.MempoolChannel, + byte(mempl.MempoolChannel), byte(evidence.EvidenceChannel), byte(statesync.SnapshotChannel), byte(statesync.ChunkChannel), diff --git a/proto/tendermint/blockchain/message.go b/proto/tendermint/blockchain/message.go index 194b48655..6143a64e7 100644 --- a/proto/tendermint/blockchain/message.go +++ b/proto/tendermint/blockchain/message.go @@ -12,7 +12,7 @@ const ( BlockResponseMessageFieldKeySize = 1 ) -// Wrap implements the p2p Wrapper interface and wraps a blockchain messages. +// Wrap implements the p2p Wrapper interface and wraps a blockchain message. func (m *Message) Wrap(pb proto.Message) error { switch msg := pb.(type) { case *BlockRequest: diff --git a/proto/tendermint/mempool/message.go b/proto/tendermint/mempool/message.go new file mode 100644 index 000000000..64a79bc81 --- /dev/null +++ b/proto/tendermint/mempool/message.go @@ -0,0 +1,32 @@ +package mempool + +import ( + fmt "fmt" + + proto "github.com/gogo/protobuf/proto" +) + +// Wrap implements the p2p Wrapper interface and wraps a mempool message. +func (m *Message) Wrap(pb proto.Message) error { + switch msg := pb.(type) { + case *Txs: + m.Sum = &Message_Txs{Txs: msg} + + default: + return fmt.Errorf("unknown message: %T", msg) + } + + return nil +} + +// Unwrap implements the p2p Wrapper interface and unwraps a wrapped mempool +// message. 
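
Stepping back to the node wiring above, a condensed sketch of how the shim and the real reactor fit together (the CListMempool construction is elided since it needs a proxy-app connection):

```go
package node

import (
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	mempl "github.com/tendermint/tendermint/mempool"
	"github.com/tendermint/tendermint/p2p"
)

// wireMempool condenses createMempoolReactor: the legacy switch only ever
// sees the shim, while the real reactor speaks p2p Channels.
func wireMempool(
	logger log.Logger,
	config *cfg.Config,
	peerMgr *p2p.PeerManager,
	mempool *mempl.CListMempool,
) (*p2p.ReactorShim, *mempl.Reactor) {
	shim := p2p.NewReactorShim(logger, "MempoolShim", mempl.GetChannelShims(config.Mempool))

	reactor := mempl.NewReactor(
		logger,
		config.Mempool,
		peerMgr,
		mempool,
		shim.GetChannel(mempl.MempoolChannel),
		shim.PeerUpdates,
	)

	// The shim goes to the switch (sw.AddReactor("MEMPOOL", shim)); the real
	// reactor is started and stopped separately in Node.OnStart/OnStop.
	return shim, reactor
}
```
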
+func (m *Message) Unwrap() (proto.Message, error) { + switch msg := m.Sum.(type) { + case *Message_Txs: + return m.GetTxs(), nil + + default: + return nil, fmt.Errorf("unknown message: %T", msg) + } +} diff --git a/statesync/reactor.go b/statesync/reactor.go index c6b48411b..30cf5c09e 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -150,24 +150,25 @@ func (r *Reactor) OnStop() { <-r.peerUpdates.Done() } -// handleSnapshotMessage handles enevelopes sent from peers on the +// handleSnapshotMessage handles envelopes sent from peers on the // SnapshotChannel. It returns an error only if the Envelope.Message is unknown // for this channel. This should never be called outside of handleMessage. func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { + logger := r.Logger.With("peer", envelope.From) + switch msg := envelope.Message.(type) { case *ssproto.SnapshotsRequest: snapshots, err := r.recentSnapshots(recentSnapshots) if err != nil { - r.Logger.Error("failed to fetch snapshots", "err", err) + logger.Error("failed to fetch snapshots", "err", err) return nil } for _, snapshot := range snapshots { - r.Logger.Debug( + logger.Debug( "advertising snapshot", "height", snapshot.Height, "format", snapshot.Format, - "peer", envelope.From, ) r.snapshotCh.Out() <- p2p.Envelope{ To: envelope.From, @@ -186,16 +187,11 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { defer r.mtx.RUnlock() if r.syncer == nil { - r.Logger.Debug("received unexpected snapshot; no state sync in progress") + logger.Debug("received unexpected snapshot; no state sync in progress") return nil } - r.Logger.Debug( - "received snapshot", - "height", msg.Height, - "format", msg.Format, - "peer", envelope.From, - ) + logger.Debug("received snapshot", "height", msg.Height, "format", msg.Format) _, err := r.syncer.AddSnapshot(envelope.From, &snapshot{ Height: msg.Height, Format: msg.Format, @@ -204,7 +200,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { Metadata: msg.Metadata, }) if err != nil { - r.Logger.Error( + logger.Error( "failed to add snapshot", "height", msg.Height, "format", msg.Format, @@ -215,14 +211,13 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { } default: - r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From) return fmt.Errorf("received unknown message: %T", msg) } return nil } -// handleChunkMessage handles enevelopes sent from peers on the ChunkChannel. +// handleChunkMessage handles envelopes sent from peers on the ChunkChannel. // It returns an error only if the Envelope.Message is unknown for this channel. // This should never be called outside of handleMessage. 
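
Before moving on to the statesync changes, a round-trip sketch for the mempool Wrapper above (illustrative usage, not part of the patch):

```go
package main

import (
	"fmt"

	protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
)

func main() {
	// Wrap a concrete Txs message into the channel's envelope type...
	var msg protomem.Message
	if err := msg.Wrap(&protomem.Txs{Txs: [][]byte{[]byte("tx-1")}}); err != nil {
		panic(err)
	}

	// ...and unwrap it back out, as the shim does on receive.
	inner, err := msg.Unwrap()
	if err != nil {
		panic(err)
	}

	txs := inner.(*protomem.Txs)
	fmt.Printf("round-tripped %d tx(s): %q\n", len(txs.Txs), txs.Txs[0])
}
```
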
func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { @@ -306,7 +301,6 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { } default: - r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From) return fmt.Errorf("received unknown message: %T", msg) } @@ -323,6 +317,8 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err } }() + r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From) + switch chID { case SnapshotChannel: err = r.handleSnapshotMessage(envelope) diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 502aaf970..14937a8ce 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -360,9 +360,16 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { return bytes.Equal(pubKey.Address(), addr) } -func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, - state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.Reactor, *mempl.CListMempool) { +func createMempoolReactor( + config *cfg.Config, + proxyApp proxy.AppConns, + state sm.State, + memplMetrics *mempl.Metrics, + peerMgr *p2p.PeerManager, + logger log.Logger, +) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) { + logger = logger.With("module", "mempool") mempool := mempl.NewCListMempool( config.Mempool, proxyApp.Mempool(), @@ -371,14 +378,24 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, mempl.WithPreCheck(sm.TxPreCheck(state)), mempl.WithPostCheck(sm.TxPostCheck(state)), ) - mempoolLogger := logger.With("module", "mempool") - mempoolReactor := mempl.NewReactor(config.Mempool, mempool) - mempoolReactor.SetLogger(mempoolLogger) + + mempool.SetLogger(logger) + + reactorShim := p2p.NewReactorShim(logger, "MempoolShim", mempl.GetChannelShims(config.Mempool)) + reactor := mempl.NewReactor( + logger, + config.Mempool, + peerMgr, + mempool, + reactorShim.GetChannel(mempl.MempoolChannel), + reactorShim.PeerUpdates, + ) if config.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() } - return mempoolReactor, mempool + + return reactorShim, reactor, mempool } func createEvidenceReactor( @@ -556,7 +573,7 @@ func createSwitch(config *cfg.Config, transport p2p.Transport, p2pMetrics *p2p.Metrics, peerFilters []p2p.PeerFilterFunc, - mempoolReactor *mempl.Reactor, + mempoolReactor *p2p.ReactorShim, bcReactor p2p.Reactor, stateSyncReactor *p2p.ReactorShim, consensusReactor *cs.Reactor, @@ -777,10 +794,11 @@ func NewNode(config *cfg.Config, logNodeStartupInfo(state, pubKey, logger, consensusLogger) - csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) + // TODO: Fetch and provide real options and do proper p2p bootstrapping. 
+ peerMgr := p2p.NewPeerManager(p2p.PeerManagerOptions{}) - // Make MempoolReactor - mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger) + csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) + mpReactorShim, mpReactor, mempool := createMempoolReactor(config, proxyApp, state, memplMetrics, peerMgr, logger) evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger) if err != nil { @@ -853,7 +871,7 @@ func NewNode(config *cfg.Config, p2pLogger := logger.With("module", "p2p") transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) sw := createSwitch( - config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch, + config, transport, p2pMetrics, peerFilters, mpReactorShim, bcReactorForSwitch, stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger, ) @@ -910,7 +928,7 @@ func NewNode(config *cfg.Config, stateStore: stateStore, blockStore: blockStore, bcReactor: bcReactor, - mempoolReactor: mempoolReactor, + mempoolReactor: mpReactor, mempool: mempool, consensusState: csState, consensusReactor: csReactor, @@ -998,6 +1016,11 @@ func (n *Node) OnStart() error { return err } + // Start the real mempool reactor separately since the switch uses the shim. + if err := n.mempoolReactor.Start(); err != nil { + return err + } + // Start the real evidence reactor separately since the switch uses the shim. if err := n.evidenceReactor.Start(); err != nil { return err @@ -1056,6 +1079,11 @@ func (n *Node) OnStop() { n.Logger.Error("failed to stop the state sync reactor", "err", err) } + // Stop the real mempool reactor separately since the switch uses the shim. + if err := n.mempoolReactor.Stop(); err != nil { + n.Logger.Error("failed to stop the mempool reactor", "err", err) + } + // Stop the real evidence reactor separately since the switch uses the shim. if err := n.evidenceReactor.Stop(); err != nil { n.Logger.Error("failed to stop the evidence reactor", "err", err) @@ -1384,7 +1412,7 @@ func makeNodeInfo( cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel, - mempl.MempoolChannel, + byte(mempl.MempoolChannel), byte(evidence.EvidenceChannel), byte(statesync.SnapshotChannel), byte(statesync.ChunkChannel),
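
Finally, since both node constructors currently pass a freshly created PeerManager, note that the mempool Reactor only needs GetHeight from it. A hypothetical stub satisfying the interface, e.g. for tests (the test suite above passes nil, which skips the lag check entirely):

```go
package mempool

import (
	"fmt"

	"github.com/tendermint/tendermint/p2p"
)

// stubPeerManager is a hypothetical fixed-height PeerManager. When a reactor
// is constructed with it instead of nil, broadcastTxRoutine applies the
// one-block-lag throttle based on the heights recorded here.
type stubPeerManager struct {
	heights map[p2p.NodeID]int64
}

func (s stubPeerManager) GetHeight(id p2p.NodeID) (int64, error) {
	h, ok := s.heights[id]
	if !ok {
		return 0, fmt.Errorf("no height recorded for peer %s", id)
	}
	return h, nil
}

// compile-time check that the stub satisfies the interface
var _ PeerManager = stubPeerManager{}
```
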