Browse Source

mempool: fix mempool tests timeout (#5988)

pull/5989/head
Aleksandr Bezobchuk 4 years ago
committed by GitHub
parent
commit
642ecc3f5c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 33 deletions
  1. +0
    -5
      mempool/reactor.go
  2. +43
    -28
      mempool/reactor_test.go

+ 0
- 5
mempool/reactor.go View File

@ -224,16 +224,11 @@ func (r *Reactor) processMempoolCh() {
case envelope := <-r.mempoolCh.In(): case envelope := <-r.mempoolCh.In():
if err := r.handleMessage(r.mempoolCh.ID(), envelope); err != nil { if err := r.handleMessage(r.mempoolCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID(), "envelope", envelope, "err", err) r.Logger.Error("failed to process message", "ch_id", r.mempoolCh.ID(), "envelope", envelope, "err", err)
fmt.Println("MESSAGE HANDLER ERROR:", err)
r.mempoolCh.Error() <- p2p.PeerError{ r.mempoolCh.Error() <- p2p.PeerError{
PeerID: envelope.From, PeerID: envelope.From,
Err: err, Err: err,
Severity: p2p.PeerErrorSeverityLow, Severity: p2p.PeerErrorSeverityLow,
} }
fmt.Println("SENT PEER ERROR ON CHANNEL")
} }
case <-r.closeCh: case <-r.closeCh:


+ 43
- 28
mempool/reactor_test.go View File

@ -97,7 +97,6 @@ func simulateRouter(
primary *reactorTestSuite, primary *reactorTestSuite,
suites []*reactorTestSuite, suites []*reactorTestSuite,
numOut int, numOut int,
dropChErr bool,
) { ) {
wg.Add(1) wg.Add(1)
@ -124,19 +123,6 @@ func simulateRouter(
wg.Done() wg.Done()
}() }()
go func() {
for pErr := range primary.mempoolPeerErrCh {
if dropChErr {
primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err)
} else {
primary.peerUpdatesCh <- p2p.PeerUpdate{
PeerID: pErr.PeerID,
Status: p2p.PeerStatusRemoved,
}
}
}
}()
} }
func waitForTxs(t *testing.T, txs types.Txs, suites ...*reactorTestSuite) { func waitForTxs(t *testing.T, txs types.Txs, suites ...*reactorTestSuite) {
@ -179,13 +165,22 @@ func TestReactorBroadcastTxs(t *testing.T) {
testSuites[i] = setup(t, config.Mempool, logger, 0) testSuites[i] = setup(t, config.Mempool, logger, 0)
} }
// ignore all peer errors
for _, suite := range testSuites {
go func(s *reactorTestSuite) {
// drop all errors on the mempool channel
for range s.mempoolPeerErrCh {
}
}(suite)
}
primary := testSuites[0] primary := testSuites[0]
secondaries := testSuites[1:] secondaries := testSuites[1:]
// Simulate a router by listening for all outbound envelopes and proxying the // Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite). // envelopes to the respective peer (suite).
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numTxs*len(secondaries), true)
simulateRouter(wg, primary, testSuites, numTxs*len(secondaries))
txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID) txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID)
@ -286,11 +281,14 @@ func TestReactorNoBroadcastToSender(t *testing.T) {
primary := testSuites[0] primary := testSuites[0]
secondary := testSuites[1] secondary := testSuites[1]
go func() {
// drop all errors on the mempool channel
for range primary.mempoolPeerErrCh {
}
}()
// ignore all peer errors
for _, suite := range testSuites {
go func(s *reactorTestSuite) {
// drop all errors on the mempool channel
for range s.mempoolPeerErrCh {
}
}(suite)
}
peerID := uint16(1) peerID := uint16(1)
_ = checkTxs(t, primary.reactor.mempool, numTxs, peerID) _ = checkTxs(t, primary.reactor.mempool, numTxs, peerID)
@ -337,13 +335,22 @@ func TestReactor_MaxTxBytes(t *testing.T) {
testSuites[i] = setup(t, config.Mempool, logger, 0) testSuites[i] = setup(t, config.Mempool, logger, 0)
} }
// ignore all peer errors
for _, suite := range testSuites {
go func(s *reactorTestSuite) {
// drop all errors on the mempool channel
for range s.mempoolPeerErrCh {
}
}(suite)
}
primary := testSuites[0] primary := testSuites[0]
secondary := testSuites[1] secondary := testSuites[1]
// Simulate a router by listening for all outbound envelopes and proxying the // Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite). // envelopes to the respective peer (suite).
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, 1, true)
simulateRouter(wg, primary, testSuites, 1)
// Broadcast a tx, which has the max size and ensure it's received by the // Broadcast a tx, which has the max size and ensure it's received by the
// second reactor. // second reactor.
@ -439,14 +446,22 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
config := cfg.TestConfig() config := cfg.TestConfig()
primary := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0)
secondary := setup(t, config.Mempool, log.TestingLogger().With("node", 1), 0)
testSuites := []*reactorTestSuite{
setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0),
setup(t, config.Mempool, log.TestingLogger().With("node", 1), 0),
}
go func() {
// drop all errors on the mempool channel
for range primary.mempoolPeerErrCh {
}
}()
primary := testSuites[0]
secondary := testSuites[1]
// ignore all peer errors
for _, suite := range testSuites {
go func(s *reactorTestSuite) {
// drop all errors on the mempool channel
for range s.mempoolPeerErrCh {
}
}(suite)
}
// connect peer // connect peer
primary.peerUpdatesCh <- p2p.PeerUpdate{ primary.peerUpdatesCh <- p2p.PeerUpdate{


Loading…
Cancel
Save