diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 564dcd3b5..3fe4d8241 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -93,7 +93,14 @@ func setup(t *testing.T, cfg *cfg.MempoolConfig, logger log.Logger, chBuf uint) return rts } -func simulateRouter(wg *sync.WaitGroup, primary *reactorTestSuite, suites []*reactorTestSuite, numOut int) { +func simulateRouter( + wg *sync.WaitGroup, + primary *reactorTestSuite, + suites []*reactorTestSuite, + numOut int, + dropChErr bool, +) { + wg.Add(1) // create a mapping for efficient suite lookup by peer ID @@ -118,6 +125,19 @@ func simulateRouter(wg *sync.WaitGroup, primary *reactorTestSuite, suites []*rea wg.Done() }() + + go func() { + for pErr := range primary.mempoolPeerErrCh { + if dropChErr { + primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err) + } else { + primary.peerUpdatesCh <- p2p.PeerUpdate{ + PeerID: pErr.PeerID, + Status: p2p.PeerStatusRemoved, + } + } + } + }() } func waitForTxs(t *testing.T, txs types.Txs, suites ...*reactorTestSuite) { @@ -166,7 +186,7 @@ func TestReactorBroadcastTxs(t *testing.T) { // Simulate a router by listening for all outbound envelopes and proxying the // envelopes to the respective peer (suite). wg := new(sync.WaitGroup) - simulateRouter(wg, primary, testSuites, numTxs*len(secondaries)) + simulateRouter(wg, primary, testSuites, numTxs*len(secondaries), true) txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID) @@ -318,7 +338,7 @@ func TestReactor_MaxTxBytes(t *testing.T) { // Simulate a router by listening for all outbound envelopes and proxying the // envelopes to the respective peer (suite). wg := new(sync.WaitGroup) - simulateRouter(wg, primary, testSuites, 1) + simulateRouter(wg, primary, testSuites, 1, true) // Broadcast a tx, which has the max size and ensure it's received by the // second reactor. @@ -356,10 +376,17 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) { reactor := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0) go func() { + // drop all messages on the mempool channel for range reactor.mempoolOutCh { } }() + go func() { + // drop all errors on the mempool channel + for range reactor.mempoolPeerErrCh { + } + }() + peerID, err := p2p.NewNodeID("00ffaa") require.NoError(t, err) @@ -410,6 +437,12 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { primary := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0) secondary := setup(t, config.Mempool, log.TestingLogger().With("node", 1), 0) + go func() { + // drop all errors on the mempool channel + for range primary.mempoolPeerErrCh { + } + }() + // connect peer primary.peerUpdatesCh <- p2p.PeerUpdate{ Status: p2p.PeerStatusUp,