diff --git a/consensus/reactor.go b/consensus/reactor.go index 4fb510da7..0337c9ff6 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -57,7 +57,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options metrics: NopMetrics(), } conR.updateFastSyncingMetric() - conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR) + conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) for _, option := range options { option(conR) diff --git a/evidence/reactor.go b/evidence/reactor.go index 1a2e52693..1ca18b588 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -34,7 +34,7 @@ func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor { evR := &EvidenceReactor{ evpool: evpool, } - evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR) + evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR) return evR } diff --git a/mempool/reactor.go b/mempool/reactor.go index 3388d4e4e..8acb02878 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -47,7 +47,7 @@ type mempoolIDs struct { activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter } -// Reserve searches for the next unused ID and assignes it to the +// Reserve searches for the next unused ID and assigns it to the // peer. func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { ids.mtx.Lock() @@ -110,10 +110,16 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor { mempool: mempool, ids: newMempoolIDs(), } - memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR) + memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) return memR } +// InitPeer implements Reactor by creating a state for the peer. +func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer { + memR.ids.ReserveForPeer(peer) + return peer +} + // SetLogger sets the Logger on the reactor and the underlying mempool. func (memR *Reactor) SetLogger(l log.Logger) { memR.Logger = l @@ -142,7 +148,6 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { - memR.ids.ReserveForPeer(peer) go memR.broadcastTxRoutine(peer) } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index dff4c0d68..f732eca1e 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -223,3 +223,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { ids.ReserveForPeer(peer) }) } + +func TestDontExhaustMaxActiveIDs(t *testing.T) { + config := cfg.TestConfig() + const N = 1 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() + reactor := reactors[0] + + for i := 0; i < maxActiveIDs+1; i++ { + peer := mock.NewPeer(nil) + reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3}) + reactor.AddPeer(peer) + } +} diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 46d09d0cb..140c13f38 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -130,7 +130,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { lastReceivedRequests: cmn.NewCMap(), crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo), } - r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r) + r.BaseReactor = *p2p.NewBaseReactor("PEX", r) return r }