Browse Source

mempool: reserve IDs in InitPeer instead of AddPeer

rc1/v0.32.12
Tess Rinearson 4 years ago
committed by Anton Kaliaev
parent
commit
ffb0278d95
No known key found for this signature in database GPG Key ID: 7B6881D965918214
5 changed files with 29 additions and 6 deletions
  1. +1
    -1
      consensus/reactor.go
  2. +1
    -1
      evidence/reactor.go
  3. +8
    -3
      mempool/reactor.go
  4. +18
    -0
      mempool/reactor_test.go
  5. +1
    -1
      p2p/pex/pex_reactor.go

+ 1
- 1
consensus/reactor.go View File

@ -57,7 +57,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options
metrics: NopMetrics(),
}
conR.updateFastSyncingMetric()
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
for _, option := range options {
option(conR)


+ 1
- 1
evidence/reactor.go View File

@ -34,7 +34,7 @@ func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
evR := &EvidenceReactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
}


+ 8
- 3
mempool/reactor.go View File

@ -47,7 +47,7 @@ type mempoolIDs struct {
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
}
// Reserve searches for the next unused ID and assignes it to the
// Reserve searches for the next unused ID and assigns it to the
// peer.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
ids.mtx.Lock()
@ -110,10 +110,16 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
mempool: mempool,
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR)
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
return memR
}
// InitPeer implements Reactor by creating a state for the peer.
func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
memR.ids.ReserveForPeer(peer)
return peer
}
// SetLogger sets the Logger on the reactor and the underlying mempool.
func (memR *Reactor) SetLogger(l log.Logger) {
memR.Logger = l
@ -142,7 +148,6 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
memR.ids.ReserveForPeer(peer)
go memR.broadcastTxRoutine(peer)
}


+ 18
- 0
mempool/reactor_test.go View File

@ -223,3 +223,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
ids.ReserveForPeer(peer)
})
}
func TestDontExhaustMaxActiveIDs(t *testing.T) {
config := cfg.TestConfig()
const N = 1
reactors := makeAndConnectReactors(config, N)
defer func() {
for _, r := range reactors {
r.Stop()
}
}()
reactor := reactors[0]
for i := 0; i < maxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
reactor.AddPeer(peer)
}
}

+ 1
- 1
p2p/pex/pex_reactor.go View File

@ -130,7 +130,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
lastReceivedRequests: cmn.NewCMap(),
crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo),
}
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
r.BaseReactor = *p2p.NewBaseReactor("PEX", r)
return r
}


Loading…
Cancel
Save