diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index 9d955319a..e03745c1a 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -1,8 +1,6 @@ package v0 import ( - "fmt" - "math/rand" "os" "testing" "time" @@ -14,6 +12,7 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/mempool/mock" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/p2ptest" bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -22,39 +21,80 @@ import ( dbm "github.com/tendermint/tm-db" ) -var rng = rand.New(rand.NewSource(time.Now().UnixNano())) - type reactorTestSuite struct { - reactor *Reactor - app proxy.AppConns + network *p2ptest.Network + logger log.Logger + nodes []p2p.NodeID - peerID p2p.NodeID + reactors map[p2p.NodeID]*Reactor + app map[p2p.NodeID]proxy.AppConns - blockchainChannel *p2p.Channel - blockchainInCh chan p2p.Envelope - blockchainOutCh chan p2p.Envelope - blockchainPeerErrCh chan p2p.PeerError + blockchainChannels map[p2p.NodeID]*p2p.Channel + peerChans map[p2p.NodeID]chan p2p.PeerUpdate + peerUpdates map[p2p.NodeID]*p2p.PeerUpdates - peerUpdatesCh chan p2p.PeerUpdate - peerUpdates *p2p.PeerUpdates + fastSync bool } func setup( t *testing.T, genDoc *types.GenesisDoc, - privVals []types.PrivValidator, - maxBlockHeight int64, + privVal types.PrivValidator, + maxBlockHeights []int64, chBuf uint, ) *reactorTestSuite { t.Helper() - require.Len(t, privVals, 1, "only one validator can be supported") + numNodes := len(maxBlockHeights) + require.True(t, numNodes >= 1, + "must specify at least one block height (nodes)") + + rts := &reactorTestSuite{ + logger: log.TestingLogger().With("module", "blockchain", "testCase", t.Name()), + network: p2ptest.MakeNetwork(t, numNodes), + nodes: make([]p2p.NodeID, 0, numNodes), + reactors: make(map[p2p.NodeID]*Reactor, numNodes), + app: make(map[p2p.NodeID]proxy.AppConns, numNodes), + blockchainChannels: make(map[p2p.NodeID]*p2p.Channel, numNodes), + peerChans: make(map[p2p.NodeID]chan p2p.PeerUpdate, numNodes), + peerUpdates: make(map[p2p.NodeID]*p2p.PeerUpdates, numNodes), + fastSync: true, + } + + rts.blockchainChannels = rts.network.MakeChannelsNoCleanup(t, BlockchainChannel, new(bcproto.Message), int(chBuf)) + + i := 0 + for nodeID := range rts.network.Nodes { + rts.addNode(t, nodeID, genDoc, privVal, maxBlockHeights[i]) + i++ + } + + t.Cleanup(func() { + for _, nodeID := range rts.nodes { + rts.peerUpdates[nodeID].Close() + + if rts.reactors[nodeID].IsRunning() { + require.NoError(t, rts.reactors[nodeID].Stop()) + require.NoError(t, rts.app[nodeID].Stop()) + require.False(t, rts.reactors[nodeID].IsRunning()) + } + } + }) + + return rts +} - app := &abci.BaseApplication{} - cc := proxy.NewLocalClientCreator(app) +func (rts *reactorTestSuite) addNode(t *testing.T, + nodeID p2p.NodeID, + genDoc *types.GenesisDoc, + privVal types.PrivValidator, + maxBlockHeight int64, +) { + t.Helper() - proxyApp := proxy.NewAppConns(cc) - require.NoError(t, proxyApp.Start()) + rts.nodes = append(rts.nodes, nodeID) + rts.app[nodeID] = proxy.NewAppConns(proxy.NewLocalClientCreator(&abci.BaseApplication{})) + require.NoError(t, rts.app[nodeID].Start()) blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() @@ -64,14 +104,13 @@ func setup( state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) require.NoError(t, err) - fastSync := true db := dbm.NewMemDB() stateStore = sm.NewStore(db) blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - proxyApp.Consensus(), + rts.app[nodeID].Consensus(), mock.Mempool{}, sm.EmptyEvidencePool{}, ) @@ -88,7 +127,7 @@ func setup( lastBlock.Header.Height, lastBlockMeta.BlockID, state.Validators, - privVals[0], + privVal, lastBlock.Header.ChainID, time.Now(), ) @@ -112,100 +151,31 @@ func setup( blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - pID := make([]byte, 16) - _, err = rng.Read(pID) - require.NoError(t, err) - - peerUpdatesCh := make(chan p2p.PeerUpdate, chBuf) - - rts := &reactorTestSuite{ - app: proxyApp, - blockchainInCh: make(chan p2p.Envelope, chBuf), - blockchainOutCh: make(chan p2p.Envelope, chBuf), - blockchainPeerErrCh: make(chan p2p.PeerError, chBuf), - peerUpdatesCh: peerUpdatesCh, - peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh), - peerID: p2p.NodeID(fmt.Sprintf("%x", pID)), - } - - rts.blockchainChannel = p2p.NewChannel( - BlockchainChannel, - new(bcproto.Message), - rts.blockchainInCh, - rts.blockchainOutCh, - rts.blockchainPeerErrCh, - ) - - reactor, err := NewReactor( - log.TestingLogger().With("module", "blockchain", "node", rts.peerID), + rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) + rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID]) + rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID]) + rts.reactors[nodeID], err = NewReactor( + rts.logger.With("nodeID", nodeID), state.Copy(), blockExec, blockStore, nil, - rts.blockchainChannel, - rts.peerUpdates, - fastSync, - ) - + rts.blockchainChannels[nodeID], + rts.peerUpdates[nodeID], + rts.fastSync) require.NoError(t, err) - rts.reactor = reactor - - require.NoError(t, rts.reactor.Start()) - require.True(t, rts.reactor.IsRunning()) - - t.Cleanup(func() { - require.NoError(t, rts.reactor.Stop()) - require.NoError(t, rts.app.Stop()) - require.False(t, rts.reactor.IsRunning()) - }) - return rts + require.NoError(t, rts.reactors[nodeID].Start()) + require.True(t, rts.reactors[nodeID].IsRunning()) } -func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite, dropChErr bool) { - // create a mapping for efficient suite lookup by peer ID - suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite) - for _, suite := range suites { - suitesByPeerID[suite.peerID] = suite - } - - // Simulate a router by listening for all outbound envelopes and proxying the - // envelope to the respective peer (suite). - go func() { - for envelope := range primary.blockchainOutCh { - if envelope.Broadcast { - for _, s := range suites { - // broadcast to everyone except source - if s.peerID != primary.peerID { - s.blockchainInCh <- p2p.Envelope{ - From: primary.peerID, - To: s.peerID, - Message: envelope.Message, - } - } - } - } else { - suitesByPeerID[envelope.To].blockchainInCh <- p2p.Envelope{ - From: primary.peerID, - To: envelope.To, - Message: envelope.Message, - } - } - } - }() - - go func() { - for pErr := range primary.blockchainPeerErrCh { - if dropChErr { - primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err) - } else { - primary.peerUpdatesCh <- p2p.PeerUpdate{ - NodeID: pErr.NodeID, - Status: p2p.PeerStatusDown, - } - } - } - }() +func (rts *reactorTestSuite) start(t *testing.T) { + t.Helper() + rts.network.Start(t) + require.Len(t, + rts.network.RandomNode().PeerManager.Peers(), + len(rts.nodes)-1, + "network does not have expected number of nodes") } func TestReactor_AbruptDisconnect(t *testing.T) { @@ -214,28 +184,15 @@ func TestReactor_AbruptDisconnect(t *testing.T) { genDoc, privVals := randGenesisDoc(config, 1, false, 30) maxBlockHeight := int64(64) - testSuites := []*reactorTestSuite{ - setup(t, genDoc, privVals, maxBlockHeight, 0), - setup(t, genDoc, privVals, 0, 0), - } - require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) + rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0) - for _, s := range testSuites { - simulateRouter(s, testSuites, true) + require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height()) - // connect reactor to every other reactor - for _, ss := range testSuites { - if s.peerID != ss.peerID { - s.peerUpdatesCh <- p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: ss.peerID, - } - } - } - } + rts.start(t) + + secondaryPool := rts.reactors[rts.nodes[1]].pool - secondaryPool := testSuites[1].reactor.pool require.Eventually( t, func() bool { @@ -249,10 +206,11 @@ func TestReactor_AbruptDisconnect(t *testing.T) { // Remove synced node from the syncing node which should not result in any // deadlocks or race conditions within the context of poolRoutine. - testSuites[1].peerUpdatesCh <- p2p.PeerUpdate{ + rts.peerChans[rts.nodes[1]] <- p2p.PeerUpdate{ Status: p2p.PeerStatusDown, - NodeID: testSuites[0].peerID, + NodeID: rts.nodes[0], } + require.NoError(t, rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0])) } func TestReactor_NoBlockResponse(t *testing.T) { @@ -261,26 +219,12 @@ func TestReactor_NoBlockResponse(t *testing.T) { genDoc, privVals := randGenesisDoc(config, 1, false, 30) maxBlockHeight := int64(65) - testSuites := []*reactorTestSuite{ - setup(t, genDoc, privVals, maxBlockHeight, 0), - setup(t, genDoc, privVals, 0, 0), - } - require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) + rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0) - for _, s := range testSuites { - simulateRouter(s, testSuites, true) + require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height()) - // connect reactor to every other reactor - for _, ss := range testSuites { - if s.peerID != ss.peerID { - s.peerUpdatesCh <- p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: ss.peerID, - } - } - } - } + rts.start(t) testCases := []struct { height int64 @@ -292,7 +236,7 @@ func TestReactor_NoBlockResponse(t *testing.T) { {100, false}, } - secondaryPool := testSuites[1].reactor.pool + secondaryPool := rts.reactors[rts.nodes[1]].pool require.Eventually( t, func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() }, @@ -302,7 +246,7 @@ func TestReactor_NoBlockResponse(t *testing.T) { ) for _, tc := range testCases { - block := testSuites[1].reactor.store.LoadBlock(tc.height) + block := rts.reactors[rts.nodes[1]].store.LoadBlock(tc.height) if tc.existent { require.True(t, block != nil) } else { @@ -323,36 +267,18 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) { maxBlockHeight := int64(48) genDoc, privVals := randGenesisDoc(config, 1, false, 30) - testSuites := []*reactorTestSuite{ - setup(t, genDoc, privVals, maxBlockHeight, 1000), // fully synced node - setup(t, genDoc, privVals, 0, 1000), - setup(t, genDoc, privVals, 0, 1000), - setup(t, genDoc, privVals, 0, 1000), - setup(t, genDoc, privVals, 0, 1000), // new node - } + rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0, 0, 0, 0}, 1000) - require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) + require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height()) - for _, s := range testSuites[:len(testSuites)-1] { - simulateRouter(s, testSuites, true) - - // connect reactor to every other reactor except the new node - for _, ss := range testSuites[:len(testSuites)-1] { - if s.peerID != ss.peerID { - s.peerUpdatesCh <- p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: ss.peerID, - } - } - } - } + rts.start(t) require.Eventually( t, func() bool { caughtUp := true - for _, s := range testSuites[1 : len(testSuites)-1] { - if s.reactor.pool.MaxPeerHeight() == 0 || !s.reactor.pool.IsCaughtUp() { + for _, id := range rts.nodes[1 : len(rts.nodes)-1] { + if rts.reactors[id].pool.MaxPeerHeight() == 0 || !rts.reactors[id].pool.IsCaughtUp() { caughtUp = false } } @@ -364,8 +290,8 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) { "expected all nodes to be fully synced", ) - for _, s := range testSuites[:len(testSuites)-1] { - require.Len(t, s.reactor.pool.peers, 3) + for _, id := range rts.nodes[:len(rts.nodes)-1] { + require.Len(t, rts.reactors[id].pool.peers, 3) } // Mark testSuites[3] as an invalid peer which will cause newSuite to disconnect @@ -374,28 +300,18 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) { // XXX: This causes a potential race condition. // See: https://github.com/tendermint/tendermint/issues/6005 otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30) - otherSuite := setup(t, otherGenDoc, otherPrivVals, maxBlockHeight, 0) - testSuites[3].reactor.store = otherSuite.reactor.store + newNode := rts.network.MakeNode(t) + rts.addNode(t, newNode.NodeID, otherGenDoc, otherPrivVals[0], maxBlockHeight) // add a fake peer just so we do not wait for the consensus ticker to timeout - otherSuite.reactor.pool.SetPeerRange("00ff", 10, 10) - - // start the new peer's faux router - newSuite := testSuites[len(testSuites)-1] - simulateRouter(newSuite, testSuites, false) - - // connect all nodes to the new peer - for _, s := range testSuites[:len(testSuites)-1] { - newSuite.peerUpdatesCh <- p2p.PeerUpdate{ - Status: p2p.PeerStatusUp, - NodeID: s.peerID, - } - } + rts.reactors[newNode.NodeID].pool.SetPeerRange("00ff", 10, 10) // wait for the new peer to catch up and become fully synced require.Eventually( t, - func() bool { return newSuite.reactor.pool.MaxPeerHeight() > 0 && newSuite.reactor.pool.IsCaughtUp() }, + func() bool { + return rts.reactors[newNode.NodeID].pool.MaxPeerHeight() > 0 && rts.reactors[newNode.NodeID].pool.IsCaughtUp() + }, 10*time.Minute, 10*time.Millisecond, "expected new node to be fully synced", @@ -403,11 +319,11 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) { require.Eventuallyf( t, - func() bool { return len(newSuite.reactor.pool.peers) < len(testSuites)-1 }, + func() bool { return len(rts.reactors[newNode.NodeID].pool.peers) < len(rts.nodes)-1 }, 10*time.Minute, 10*time.Millisecond, "invalid number of peers; expected < %d, got: %d", - len(testSuites)-1, - len(newSuite.reactor.pool.peers), + len(rts.nodes)-1, + len(rts.reactors[newNode.NodeID].pool.peers), ) } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index acf9921a4..c9eb4fba3 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -50,7 +50,6 @@ func setup(t *testing.T, cfg *cfg.MempoolConfig, numNodes int, chBuf uint) *reac rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, MempoolChannel, new(protomem.Message), int(chBuf)) - i := 0 for nodeID := range rts.network.Nodes { rts.kvstores[nodeID] = kvstore.NewApplication() cc := proxy.NewLocalClientCreator(rts.kvstores[nodeID]) @@ -77,7 +76,6 @@ func setup(t *testing.T, cfg *cfg.MempoolConfig, numNodes int, chBuf uint) *reac require.NoError(t, rts.reactors[nodeID].Start()) require.True(t, rts.reactors[nodeID].IsRunning()) - i++ } require.Len(t, rts.reactors, numNodes) diff --git a/p2p/p2ptest/network.go b/p2p/p2ptest/network.go index 30e508287..98fad603b 100644 --- a/p2p/p2ptest/network.go +++ b/p2p/p2ptest/network.go @@ -36,7 +36,7 @@ func MakeNetwork(t *testing.T, nodes int) *Network { } for i := 0; i < nodes; i++ { - node := MakeNode(t, network) + node := network.MakeNode(t) network.Nodes[node.NodeID] = node } @@ -197,8 +197,10 @@ type Node struct { Transport *p2p.MemoryTransport } -// MakeNode creates a new Node. -func MakeNode(t *testing.T, network *Network) *Node { +// MakeNode creates a new Node configured for the network with a +// running peer manager, but does not add it to the existing +// network. Callers are responsible for updating peering relationships. +func (n *Network) MakeNode(t *testing.T) *Node { privKey := ed25519.GenPrivKey() nodeID := p2p.NodeIDFromPubKey(privKey.PubKey()) nodeInfo := p2p.NodeInfo{ @@ -207,7 +209,7 @@ func MakeNode(t *testing.T, network *Network) *Node { Moniker: string(nodeID), } - transport := network.memoryNetwork.CreateTransport(nodeID) + transport := n.memoryNetwork.CreateTransport(nodeID) require.Len(t, transport.Endpoints(), 1, "transport not listening on 1 endpoint") peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{ @@ -217,7 +219,7 @@ func MakeNode(t *testing.T, network *Network) *Node { }) require.NoError(t, err) - router, err := p2p.NewRouter(network.logger, nodeInfo, privKey, peerManager, + router, err := p2p.NewRouter(n.logger, nodeInfo, privKey, peerManager, []p2p.Transport{transport}, p2p.RouterOptions{}) require.NoError(t, err) require.NoError(t, router.Start())