From e2a103a3152977ae2cee0cfd783e01899b0c1828 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Wed, 27 Oct 2021 15:45:17 +0200 Subject: [PATCH] mempool: port reactor tests from legacy implementation (#7162) --- internal/mempool/v1/mempool_test.go | 10 + internal/mempool/v1/reactor_test.go | 273 ++++++++++++++++++++++++++++ 2 files changed, 283 insertions(+) diff --git a/internal/mempool/v1/mempool_test.go b/internal/mempool/v1/mempool_test.go index 72a72861c..89c1a6685 100644 --- a/internal/mempool/v1/mempool_test.go +++ b/internal/mempool/v1/mempool_test.go @@ -118,6 +118,16 @@ func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx return txs } +func convertTex(in []testTx) types.Txs { + out := make([]types.Tx, len(in)) + + for idx := range in { + out[idx] = in[idx].tx + } + + return out +} + func TestTxMempool_TxsAvailable(t *testing.T) { txmp := setup(t, 0) txmp.EnableTxsAvailable() diff --git a/internal/mempool/v1/reactor_test.go b/internal/mempool/v1/reactor_test.go index 2a92a11c5..317f7ce73 100644 --- a/internal/mempool/v1/reactor_test.go +++ b/internal/mempool/v1/reactor_test.go @@ -1,18 +1,24 @@ package v1 import ( + "context" "os" "strings" "sync" "testing" + "time" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/kvstore" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" tmsync "github.com/tendermint/tendermint/internal/libs/sync" + "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/p2ptest" "github.com/tendermint/tendermint/libs/log" + tmrand "github.com/tendermint/tendermint/libs/rand" + protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" "github.com/tendermint/tendermint/types" ) @@ -100,6 +106,47 @@ func (rts *reactorTestSuite) start(t *testing.T) { "network does not have expected number of nodes") } +func (rts *reactorTestSuite) assertMempoolChannelsDrained(t *testing.T) { + t.Helper() + + rts.stop(t) + + for _, mch := range rts.mempoolChannels { + require.Empty(t, mch.Out, "checking channel %q (len=%d)", mch.ID, len(mch.Out)) + } +} + +func (rts *reactorTestSuite) stop(t *testing.T) { + for id, r := range rts.reactors { + require.NoError(t, r.Stop(), "stopping reactor %s", id) + r.Wait() + require.False(t, r.IsRunning(), "reactor %s did not stop", id) + } +} + +func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs []types.Tx, ids ...types.NodeID) { + t.Helper() + + // ensure that the transactions get fully broadcast to the + // rest of the network + wg := &sync.WaitGroup{} + for name, pool := range rts.mempools { + if !p2ptest.NodeInSlice(name, ids) { + continue + } + + wg.Add(1) + go func(pool *TxMempool) { + defer wg.Done() + require.Eventually(t, func() bool { return len(txs) == pool.Size() }, + time.Minute, + 100*time.Millisecond, + ) + }(pool) + } + wg.Wait() +} + func TestReactorBroadcastDoesNotPanic(t *testing.T) { numNodes := 2 rts := setupReactors(t, numNodes, 0) @@ -142,3 +189,229 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { primaryReactor.peerWG.Wait() wg.Wait() } + +func TestReactorBroadcastTxs(t *testing.T) { + numTxs := 1000 + numNodes := 10 + + rts := setupReactors(t, numNodes, 0) + + primary := rts.nodes[0] + secondaries := rts.nodes[1:] + + txs := checkTxs(t, rts.reactors[primary].mempool, numTxs, mempool.UnknownPeerID) + + // run the router + rts.start(t) + + // Wait till all secondary suites (reactor) received all mempool txs from the + // primary suite (node). + rts.waitForTxns(t, convertTex(txs), secondaries...) + + rts.stop(t) +} + +// regression test for https://github.com/tendermint/tendermint/issues/5408 +func TestReactorConcurrency(t *testing.T) { + numTxs := 5 + numNodes := 2 + + rts := setupReactors(t, numNodes, 0) + + primary := rts.nodes[0] + secondary := rts.nodes[1] + + rts.start(t) + + var wg sync.WaitGroup + + for i := 0; i < 1000; i++ { + wg.Add(2) + + // 1. submit a bunch of txs + // 2. update the whole mempool + + txs := checkTxs(t, rts.reactors[primary].mempool, numTxs, mempool.UnknownPeerID) + go func() { + defer wg.Done() + + mempool := rts.mempools[primary] + + mempool.Lock() + defer mempool.Unlock() + + deliverTxResponses := make([]*abci.ResponseDeliverTx, len(txs)) + for i := range txs { + deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} + } + + require.NoError(t, mempool.Update(1, convertTex(txs), deliverTxResponses, nil, nil)) + }() + + // 1. submit a bunch of txs + // 2. update none + _ = checkTxs(t, rts.reactors[secondary].mempool, numTxs, mempool.UnknownPeerID) + go func() { + defer wg.Done() + + mempool := rts.mempools[secondary] + + mempool.Lock() + defer mempool.Unlock() + + err := mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) + require.NoError(t, err) + }() + + // flush the mempool + rts.mempools[secondary].Flush() + } + + wg.Wait() +} + +func TestReactorNoBroadcastToSender(t *testing.T) { + numTxs := 1000 + numNodes := 2 + + rts := setupReactors(t, numNodes, uint(numTxs)) + + primary := rts.nodes[0] + secondary := rts.nodes[1] + + peerID := uint16(1) + _ = checkTxs(t, rts.mempools[primary], numTxs, peerID) + + rts.start(t) + + time.Sleep(100 * time.Millisecond) + + require.Eventually(t, func() bool { + return rts.mempools[secondary].Size() == 0 + }, time.Minute, 100*time.Millisecond) + + rts.assertMempoolChannelsDrained(t) +} + +func TestReactor_MaxTxBytes(t *testing.T) { + numNodes := 2 + cfg := config.TestConfig() + + rts := setupReactors(t, numNodes, 0) + + primary := rts.nodes[0] + secondary := rts.nodes[1] + + // Broadcast a tx, which has the max size and ensure it's received by the + // second reactor. + tx1 := tmrand.Bytes(cfg.Mempool.MaxTxBytes) + err := rts.reactors[primary].mempool.CheckTx( + context.Background(), + tx1, + nil, + mempool.TxInfo{ + SenderID: mempool.UnknownPeerID, + }, + ) + require.NoError(t, err) + + rts.start(t) + + rts.reactors[primary].mempool.Flush() + rts.reactors[secondary].mempool.Flush() + + // broadcast a tx, which is beyond the max size and ensure it's not sent + tx2 := tmrand.Bytes(cfg.Mempool.MaxTxBytes + 1) + err = rts.mempools[primary].CheckTx(context.Background(), tx2, nil, mempool.TxInfo{SenderID: mempool.UnknownPeerID}) + require.Error(t, err) + + rts.assertMempoolChannelsDrained(t) +} + +func TestDontExhaustMaxActiveIDs(t *testing.T) { + // we're creating a single node network, but not starting the + // network. + rts := setupReactors(t, 1, mempool.MaxActiveIDs+1) + + nodeID := rts.nodes[0] + + peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") + require.NoError(t, err) + + // ensure the reactor does not panic (i.e. exhaust active IDs) + for i := 0; i < mempool.MaxActiveIDs+1; i++ { + rts.peerChans[nodeID] <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + NodeID: peerID, + } + + rts.mempoolChannels[nodeID].Out <- p2p.Envelope{ + To: peerID, + Message: &protomem.Txs{ + Txs: [][]byte{}, + }, + } + } + + require.Eventually( + t, + func() bool { + for _, mch := range rts.mempoolChannels { + if len(mch.Out) > 0 { + return false + } + } + + return true + }, + time.Minute, + 10*time.Millisecond, + ) + + rts.assertMempoolChannelsDrained(t) +} + +func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + // 0 is already reserved for UnknownPeerID + ids := mempool.NewMempoolIDs() + + peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") + require.NoError(t, err) + + for i := 0; i < mempool.MaxActiveIDs-1; i++ { + ids.ReserveForPeer(peerID) + } + + require.Panics(t, func() { + ids.ReserveForPeer(peerID) + }) +} + +func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + rts := setupReactors(t, 2, 0) + + primary := rts.nodes[0] + secondary := rts.nodes[1] + + rts.start(t) + + // disconnect peer + rts.peerChans[primary] <- p2p.PeerUpdate{ + Status: p2p.PeerStatusDown, + NodeID: secondary, + } + time.Sleep(500 * time.Millisecond) + + txs := checkTxs(t, rts.reactors[primary].mempool, 4, mempool.UnknownPeerID) + require.Equal(t, 4, len(txs)) + require.Equal(t, 4, rts.mempools[primary].Size()) + require.Equal(t, 0, rts.mempools[secondary].Size()) +}