From c52de487e77d49a5bf20be719973140645d09a61 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 17 Mar 2021 14:36:56 -0400 Subject: [PATCH] test: refactor mempool reactor to use new p2ptest infrastructure (#6250) --- mempool/reactor_test.go | 406 +++++++++++++++------------------------- 1 file changed, 156 insertions(+), 250 deletions(-) diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 1f1b3be1e..acf9921a4 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -1,8 +1,6 @@ package mempool import ( - "fmt" - "math/rand" "sync" "testing" "time" @@ -15,140 +13,142 @@ import ( "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/p2ptest" protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) -var rng = rand.New(rand.NewSource(time.Now().UnixNano())) - type reactorTestSuite struct { - reactor *Reactor + network *p2ptest.Network + logger log.Logger - peerID p2p.NodeID + reactors map[p2p.NodeID]*Reactor + mempoolChnnels map[p2p.NodeID]*p2p.Channel + mempools map[p2p.NodeID]*CListMempool + kvstores map[p2p.NodeID]*kvstore.Application - mempoolChannel *p2p.Channel - mempoolInCh chan p2p.Envelope - mempoolOutCh chan p2p.Envelope - mempoolPeerErrCh chan p2p.PeerError + peerChans map[p2p.NodeID]chan p2p.PeerUpdate + peerUpdates map[p2p.NodeID]*p2p.PeerUpdates - peerUpdatesCh chan p2p.PeerUpdate - peerUpdates *p2p.PeerUpdates + nodes []p2p.NodeID } -func setup(t *testing.T, cfg *cfg.MempoolConfig, logger log.Logger, chBuf uint) *reactorTestSuite { +func setup(t *testing.T, cfg *cfg.MempoolConfig, numNodes int, chBuf uint) *reactorTestSuite { t.Helper() - pID := make([]byte, 20) - _, err := rng.Read(pID) - require.NoError(t, err) + rts := &reactorTestSuite{ + logger: log.TestingLogger().With("testCase", t.Name()), + network: p2ptest.MakeNetwork(t, numNodes), + reactors: make(map[p2p.NodeID]*Reactor, numNodes), + mempoolChnnels: make(map[p2p.NodeID]*p2p.Channel, numNodes), + mempools: make(map[p2p.NodeID]*CListMempool, numNodes), + kvstores: make(map[p2p.NodeID]*kvstore.Application, numNodes), + peerChans: make(map[p2p.NodeID]chan p2p.PeerUpdate, numNodes), + peerUpdates: make(map[p2p.NodeID]*p2p.PeerUpdates, numNodes), + } - peerID, err := p2p.NewNodeID(fmt.Sprintf("%x", pID)) - require.NoError(t, err) + rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, MempoolChannel, new(protomem.Message), int(chBuf)) - peerUpdatesCh := make(chan p2p.PeerUpdate, chBuf) + i := 0 + for nodeID := range rts.network.Nodes { + rts.kvstores[nodeID] = kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(rts.kvstores[nodeID]) - rts := &reactorTestSuite{ - mempoolInCh: make(chan p2p.Envelope, chBuf), - mempoolOutCh: make(chan p2p.Envelope, chBuf), - mempoolPeerErrCh: make(chan p2p.PeerError, chBuf), - peerUpdatesCh: peerUpdatesCh, - peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh), - peerID: peerID, - } + mempool, memCleanup := newMempoolWithApp(cc) + t.Cleanup(memCleanup) + mempool.SetLogger(rts.logger) + rts.mempools[nodeID] = mempool - rts.mempoolChannel = p2p.NewChannel( - MempoolChannel, - new(protomem.Message), - rts.mempoolInCh, - rts.mempoolOutCh, - rts.mempoolPeerErrCh, - ) + rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) + rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID]) + rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID]) - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mempool, memCleanup := newMempoolWithApp(cc) + rts.reactors[nodeID] = NewReactor( + rts.logger.With("nodeID", nodeID), + cfg, + rts.network.RandomNode().PeerManager, + mempool, + rts.mempoolChnnels[nodeID], + rts.peerUpdates[nodeID], + ) - mempool.SetLogger(logger) + rts.nodes = append(rts.nodes, nodeID) - rts.reactor = NewReactor( - logger, - cfg, - nil, - mempool, - rts.mempoolChannel, - rts.peerUpdates, - ) + require.NoError(t, rts.reactors[nodeID].Start()) + require.True(t, rts.reactors[nodeID].IsRunning()) + i++ + } - require.NoError(t, rts.reactor.Start()) - require.True(t, rts.reactor.IsRunning()) + require.Len(t, rts.reactors, numNodes) t.Cleanup(func() { - memCleanup() - require.NoError(t, rts.reactor.Stop()) - require.False(t, rts.reactor.IsRunning()) + for nodeID := range rts.reactors { + if rts.reactors[nodeID].IsRunning() { + require.NoError(t, rts.reactors[nodeID].Stop()) + require.False(t, rts.reactors[nodeID].IsRunning()) + } + } }) return rts } -func simulateRouter( - wg *sync.WaitGroup, - primary *reactorTestSuite, - suites []*reactorTestSuite, - numOut int, -) { +func (rts *reactorTestSuite) start(t *testing.T) { + t.Helper() + rts.network.Start(t) + require.Len(t, + rts.network.RandomNode().PeerManager.Peers(), + len(rts.nodes)-1, + "network does not have expected number of nodes") +} - wg.Add(1) +func (rts *reactorTestSuite) assertMempoolChannelsDrained(t *testing.T) { + t.Helper() - // create a mapping for efficient suite lookup by peer ID - suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite) - for _, suite := range suites { - suitesByPeerID[suite.peerID] = suite + for id, r := range rts.reactors { + require.NoError(t, r.Stop(), "stopping reactor %s", id) + r.Wait() + require.False(t, r.IsRunning(), "reactor %s did not stop", id) } - // Simulate a router by listening for all outbound envelopes and proxying the - // envelope to the respective peer (suite). - go func() { - for i := 0; i < numOut; i++ { - envelope := <-primary.mempoolOutCh - other := suitesByPeerID[envelope.To] - - other.mempoolInCh <- p2p.Envelope{ - From: primary.peerID, - To: envelope.To, - Message: envelope.Message, - } - } - - wg.Done() - }() + for id, mch := range rts.mempoolChnnels { + require.Empty(t, mch.Out, "checking channel %q", id) + } } -func waitForTxs(t *testing.T, txs types.Txs, suites ...*reactorTestSuite) { +func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs types.Txs, ids ...p2p.NodeID) { t.Helper() - wg := new(sync.WaitGroup) + fn := func(pool *CListMempool) { + for pool.Size() < len(txs) { + time.Sleep(50 * time.Millisecond) + } - for _, suite := range suites { - wg.Add(1) + reapedTxs := pool.ReapMaxTxs(len(txs)) + require.Equal(t, len(txs), len(reapedTxs)) + for i, tx := range txs { + require.Equalf(t, + tx, + reapedTxs[i], + "txs at index %d in reactor mempool mismatch; got: %v, expected: %v", i, tx, reapedTxs[i], + ) + } + } - go func(s *reactorTestSuite) { - mempool := s.reactor.mempool - for mempool.Size() < len(txs) { - time.Sleep(time.Millisecond * 100) - } + if len(ids) == 1 { + fn(rts.reactors[ids[0]].mempool) + return + } - reapedTxs := mempool.ReapMaxTxs(len(txs)) - for i, tx := range txs { - require.Equalf( - t, tx, reapedTxs[i], - "txs at index %d in reactor mempool mismatch; got: %v, expected: %v", i, tx, reapedTxs[i], - ) - } + wg := &sync.WaitGroup{} + for id := range rts.mempools { + if len(ids) > 0 && !p2ptest.NodeInSlice(id, ids) { + continue + } - wg.Done() - }(suite) + wg.Add(1) + func(nid p2p.NodeID) { defer wg.Done(); fn(rts.reactors[nid].mempool) }(id) } wg.Wait() @@ -159,54 +159,25 @@ func TestReactorBroadcastTxs(t *testing.T) { numNodes := 10 config := cfg.TestConfig() - testSuites := make([]*reactorTestSuite, numNodes) - for i := 0; i < len(testSuites); i++ { - logger := log.TestingLogger().With("node", i) - testSuites[i] = setup(t, config.Mempool, logger, 0) - } - - // ignore all peer errors - for _, suite := range testSuites { - go func(s *reactorTestSuite) { - // drop all errors on the mempool channel - for range s.mempoolPeerErrCh { - } - }(suite) - } - - primary := testSuites[0] - secondaries := testSuites[1:] + rts := setup(t, config.Mempool, numNodes, 0) - // Simulate a router by listening for all outbound envelopes and proxying the - // envelopes to the respective peer (suite). - wg := new(sync.WaitGroup) - simulateRouter(wg, primary, testSuites, numTxs*len(secondaries)) + primary := rts.nodes[0] + secondaries := rts.nodes[1:] - txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID) + txs := checkTxs(t, rts.reactors[primary].mempool, numTxs, UnknownPeerID) - // Add each secondary suite (node) as a peer to the primary suite (node). This - // will cause the primary to gossip all mempool txs to the secondaries. - for _, suite := range secondaries { - primary.peerUpdatesCh <- p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: suite.peerID, - } - } + // run the router + rts.start(t) // Wait till all secondary suites (reactor) received all mempool txs from the // primary suite (node). - waitForTxs(t, txs, secondaries...) + rts.waitForTxns(t, txs, secondaries...) - for _, suite := range testSuites { - require.Equal(t, len(txs), suite.reactor.mempool.Size()) + for _, pool := range rts.mempools { + require.Equal(t, len(txs), pool.Size()) } - wg.Wait() - - // ensure all channels are drained - for _, suite := range testSuites { - require.Empty(t, suite.mempoolOutCh) - } + rts.assertMempoolChannelsDrained(t) } // regression test for https://github.com/tendermint/tendermint/issues/5408 @@ -215,14 +186,12 @@ func TestReactorConcurrency(t *testing.T) { numNodes := 2 config := cfg.TestConfig() - testSuites := make([]*reactorTestSuite, numNodes) - for i := 0; i < len(testSuites); i++ { - logger := log.TestingLogger().With("node", i) - testSuites[i] = setup(t, config.Mempool, logger, 0) - } + rts := setup(t, config.Mempool, numNodes, 0) - primary := testSuites[0] - secondary := testSuites[1] + primary := rts.nodes[0] + secondary := rts.nodes[1] + + rts.start(t) var wg sync.WaitGroup @@ -231,37 +200,41 @@ func TestReactorConcurrency(t *testing.T) { // 1. submit a bunch of txs // 2. update the whole mempool - txs := checkTxs(t, primary.reactor.mempool, numTxs, UnknownPeerID) + + txs := checkTxs(t, rts.reactors[primary].mempool, numTxs, UnknownPeerID) go func() { defer wg.Done() - primary.reactor.mempool.Lock() - defer primary.reactor.mempool.Unlock() + mempool := rts.mempools[primary] + + mempool.Lock() + defer mempool.Unlock() deliverTxResponses := make([]*abci.ResponseDeliverTx, len(txs)) for i := range txs { deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} } - err := primary.reactor.mempool.Update(1, txs, deliverTxResponses, nil, nil) - require.NoError(t, err) + require.NoError(t, mempool.Update(1, txs, deliverTxResponses, nil, nil)) }() // 1. submit a bunch of txs // 2. update none - _ = checkTxs(t, secondary.reactor.mempool, numTxs, UnknownPeerID) + _ = checkTxs(t, rts.reactors[secondary].mempool, numTxs, UnknownPeerID) go func() { defer wg.Done() - secondary.reactor.mempool.Lock() - defer secondary.reactor.mempool.Unlock() + mempool := rts.mempools[secondary] - err := secondary.reactor.mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) + mempool.Lock() + defer mempool.Unlock() + + err := mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) require.NoError(t, err) }() // flush the mempool - secondary.reactor.mempool.Flush() + rts.mempools[secondary].Flush() } wg.Wait() @@ -272,42 +245,23 @@ func TestReactorNoBroadcastToSender(t *testing.T) { numNodes := 2 config := cfg.TestConfig() - testSuites := make([]*reactorTestSuite, numNodes) - for i := 0; i < len(testSuites); i++ { - logger := log.TestingLogger().With("node", i) - testSuites[i] = setup(t, config.Mempool, logger, uint(numTxs)) - } - - primary := testSuites[0] - secondary := testSuites[1] + rts := setup(t, config.Mempool, numNodes, uint(numTxs)) - // ignore all peer errors - for _, suite := range testSuites { - go func(s *reactorTestSuite) { - // drop all errors on the mempool channel - for range s.mempoolPeerErrCh { - } - }(suite) - } + primary := rts.nodes[0] + secondary := rts.nodes[1] peerID := uint16(1) - _ = checkTxs(t, primary.reactor.mempool, numTxs, peerID) + _ = checkTxs(t, rts.mempools[primary], numTxs, peerID) - primary.peerUpdatesCh <- p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: secondary.peerID, - } + rts.start(t) time.Sleep(100 * time.Millisecond) require.Eventually(t, func() bool { - return secondary.reactor.mempool.Size() == 0 + return rts.mempools[secondary].Size() == 0 }, time.Minute, 100*time.Millisecond) - // ensure all channels are drained - for _, suite := range testSuites { - require.Empty(t, suite.mempoolOutCh) - } + rts.assertMempoolChannelsDrained(t) } func TestMempoolIDsBasic(t *testing.T) { @@ -329,86 +283,54 @@ func TestReactor_MaxTxBytes(t *testing.T) { numNodes := 2 config := cfg.TestConfig() - testSuites := make([]*reactorTestSuite, numNodes) - for i := 0; i < len(testSuites); i++ { - logger := log.TestingLogger().With("node", i) - testSuites[i] = setup(t, config.Mempool, logger, 0) - } + rts := setup(t, config.Mempool, numNodes, 0) - // ignore all peer errors - for _, suite := range testSuites { - go func(s *reactorTestSuite) { - // drop all errors on the mempool channel - for range s.mempoolPeerErrCh { - } - }(suite) - } - - primary := testSuites[0] - secondary := testSuites[1] - - // Simulate a router by listening for all outbound envelopes and proxying the - // envelopes to the respective peer (suite). - wg := new(sync.WaitGroup) - simulateRouter(wg, primary, testSuites, 1) + primary := rts.nodes[0] + secondary := rts.nodes[1] // Broadcast a tx, which has the max size and ensure it's received by the // second reactor. tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes) - err := primary.reactor.mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) + err := rts.reactors[primary].mempool.CheckTx(tx1, nil, TxInfo{SenderID: UnknownPeerID}) require.NoError(t, err) - primary.peerUpdatesCh <- p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: secondary.peerID, - } + rts.start(t) // Wait till all secondary suites (reactor) received all mempool txs from the // primary suite (node). - waitForTxs(t, []types.Tx{tx1}, secondary) + rts.waitForTxns(t, []types.Tx{tx1}, secondary) - primary.reactor.mempool.Flush() - secondary.reactor.mempool.Flush() + rts.reactors[primary].mempool.Flush() + rts.reactors[secondary].mempool.Flush() // broadcast a tx, which is beyond the max size and ensure it's not sent tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1) - err = primary.reactor.mempool.CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) + err = rts.mempools[primary].CheckTx(tx2, nil, TxInfo{SenderID: UnknownPeerID}) require.Error(t, err) - wg.Wait() - - // ensure all channels are drained - for _, suite := range testSuites { - require.Empty(t, suite.mempoolOutCh) - } + rts.assertMempoolChannelsDrained(t) } func TestDontExhaustMaxActiveIDs(t *testing.T) { config := cfg.TestConfig() - reactor := setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0) - go func() { - // drop all messages on the mempool channel - for range reactor.mempoolOutCh { - } - }() + // we're creating a single node network, but not starting the + // network. + rts := setup(t, config.Mempool, 1, 0) - go func() { - // drop all errors on the mempool channel - for range reactor.mempoolPeerErrCh { - } - }() + nodeID := rts.nodes[0] peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899") require.NoError(t, err) // ensure the reactor does not panic (i.e. exhaust active IDs) for i := 0; i < maxActiveIDs+1; i++ { - reactor.peerUpdatesCh <- p2p.PeerUpdate{ + rts.peerChans[nodeID] <- p2p.PeerUpdate{ Status: p2p.PeerStatusUp, NodeID: peerID, } - reactor.mempoolOutCh <- p2p.Envelope{ + + rts.mempoolChnnels[nodeID].Out <- p2p.Envelope{ To: peerID, Message: &protomem.Txs{ Txs: [][]byte{}, @@ -416,7 +338,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) { } } - require.Empty(t, reactor.mempoolOutCh) + rts.assertMempoolChannelsDrained(t) } func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { @@ -446,32 +368,16 @@ func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { config := cfg.TestConfig() - testSuites := []*reactorTestSuite{ - setup(t, config.Mempool, log.TestingLogger().With("node", 0), 0), - setup(t, config.Mempool, log.TestingLogger().With("node", 1), 0), - } - - primary := testSuites[0] - secondary := testSuites[1] + rts := setup(t, config.Mempool, 2, 0) - // ignore all peer errors - for _, suite := range testSuites { - go func(s *reactorTestSuite) { - // drop all errors on the mempool channel - for range s.mempoolPeerErrCh { - } - }(suite) - } + primary := rts.nodes[0] + secondary := rts.nodes[1] - // connect peer - primary.peerUpdatesCh <- p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: secondary.peerID, - } + rts.start(t) // disconnect peer - primary.peerUpdatesCh <- p2p.PeerUpdate{ + rts.peerChans[primary] <- p2p.PeerUpdate{ Status: p2p.PeerStatusDown, - NodeID: secondary.peerID, + NodeID: secondary, } }