diff --git a/mempool/reactor.go b/mempool/reactor.go index 151ef8ca8..f6ae9dc9c 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -224,16 +224,11 @@ func (r *Reactor) processMempoolCh() { case envelope := <-r.mempoolCh.In(): if err := r.handleMessage(r.mempoolCh.ID(), envelope); err != nil { r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID(), "envelope", envelope, "err", err) - - fmt.Println("MESSAGE HANDLER ERROR:", err) - r.mempoolCh.Error() <- p2p.PeerError{ PeerID: envelope.From, Err: err, Severity: p2p.PeerErrorSeverityLow, } - - fmt.Println("SENT PEER ERROR ON CHANNEL") } case <-r.closeCh: diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index bcb6bcfef..c878a5dca 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -97,7 +97,6 @@ func simulateRouter( primary *reactorTestSuite, suites []*reactorTestSuite, numOut int, - dropChErr bool, ) { wg.Add(1) @@ -124,19 +123,6 @@ func simulateRouter( wg.Done() }() - - go func() { - for pErr := range primary.mempoolPeerErrCh { - if dropChErr { - primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err) - } else { - primary.peerUpdatesCh <- p2p.PeerUpdate{ - PeerID: pErr.PeerID, - Status: p2p.PeerStatusRemoved, - } - } - } - }() } func waitForTxs(t *testing.T, txs types.Txs, suites ...*reactorTestSuite) { @@ -179,13 +165,22 @@ func TestReactorBroadcastTxs(t *testing.T) { testSuites[i] = setup(t, config.Mempool, logger, 0) } + // ignore all peer errors + for _, suite := range testSuites { + go func(s *reactorTestSuite) { + // drop all errors on the mempool channel + for range s.mempoolPeerErrCh { + } + }(suite) + } + primary := testSuites[0] secondaries := testSuites[1:] // Simulate a router by listening for all outbound envelopes and proxying the // envelopes to the respective peer (suite). wg := new(sync.WaitGroup) - simulateRouter(wg, primary, testSuites, numTxs*len(secondaries), true) + simulateRouter(wg, primary, testSuites, numTxs*len(secondaries)) txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID) @@ -286,11 +281,14 @@ func TestReactorNoBroadcastToSender(t *testing.T) { primary := testSuites[0] secondary := testSuites[1] - go func() { - // drop all errors on the mempool channel - for range primary.mempoolPeerErrCh { - } - }() + // ignore all peer errors + for _, suite := range testSuites { + go func(s *reactorTestSuite) { + // drop all errors on the mempool channel + for range s.mempoolPeerErrCh { + } + }(suite) + } peerID := uint16(1) _ = checkTxs(t, primary.reactor.mempool, numTxs, peerID) @@ -337,13 +335,22 @@ func TestReactor_MaxTxBytes(t *testing.T) { testSuites[i] = setup(t, config.Mempool, logger, 0) } + // ignore all peer errors + for _, suite := range testSuites { + go func(s *reactorTestSuite) { + // drop all errors on the mempool channel + for range s.mempoolPeerErrCh { + } + }(suite) + } + primary := testSuites[0] secondary := testSuites[1] // Simulate a router by listening for all outbound envelopes and proxying the // envelopes to the respective peer (suite). wg := new(sync.WaitGroup) - simulateRouter(wg, primary, testSuites, 1, true) + simulateRouter(wg, primary, testSuites, 1) // Broadcast a tx, which has the max size and ensure it's received by the // second reactor. @@ -439,14 +446,22 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { config := cfg.TestConfig() - primary := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0) - secondary := setup(t, config.Mempool, log.TestingLogger().With("node", 1), 0) + testSuites := []*reactorTestSuite{ + setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0), + setup(t, config.Mempool, log.TestingLogger().With("node", 1), 0), + } - go func() { - // drop all errors on the mempool channel - for range primary.mempoolPeerErrCh { - } - }() + primary := testSuites[0] + secondary := testSuites[1] + + // ignore all peer errors + for _, suite := range testSuites { + go func(s *reactorTestSuite) { + // drop all errors on the mempool channel + for range s.mempoolPeerErrCh { + } + }(suite) + } // connect peer primary.peerUpdatesCh <- p2p.PeerUpdate{