
P2P: Evidence Reactor Test Refactor (#6238)

pull/6252/head
Sam Kleinman, 3 years ago, committed by GitHub
commit acbe3f6570
4 changed files with 286 additions and 308 deletions
  1. evidence/reactor_test.go  +259 -306
  2. p2p/p2ptest/network.go    +4 -1
  3. p2p/p2ptest/util.go       +10 -0
  4. p2p/peermanager.go        +13 -1

evidence/reactor_test.go  +259 -306

@@ -2,12 +2,13 @@ package evidence_test
import (
"encoding/hex"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -19,6 +20,7 @@ import (
"github.com/tendermint/tendermint/evidence/mocks"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/p2ptest"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@@ -31,129 +33,178 @@ var (
)
type reactorTestSuite struct {
reactor *evidence.Reactor
pool *evidence.Pool
peerID p2p.NodeID
evidenceChannel *p2p.Channel
evidenceInCh chan p2p.Envelope
evidenceOutCh chan p2p.Envelope
evidencePeerErrCh chan p2p.PeerError
peerUpdatesCh chan p2p.PeerUpdate
peerUpdates *p2p.PeerUpdates
network *p2ptest.Network
logger log.Logger
reactors map[p2p.NodeID]*evidence.Reactor
pools map[p2p.NodeID]*evidence.Pool
evidenceChannels map[p2p.NodeID]*p2p.Channel
peerUpdates map[p2p.NodeID]*p2p.PeerUpdates
peerChans map[p2p.NodeID]chan p2p.PeerUpdate
nodes []*p2ptest.Node
numStateStores int
}
func setup(t *testing.T, logger log.Logger, pool *evidence.Pool, chBuf uint) *reactorTestSuite {
func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite {
t.Helper()
pID := make([]byte, 16)
_, err := rng.Read(pID)
require.NoError(t, err)
peerUpdatesCh := make(chan p2p.PeerUpdate)
numStateStores := len(stateStores)
rts := &reactorTestSuite{
pool: pool,
evidenceInCh: make(chan p2p.Envelope, chBuf),
evidenceOutCh: make(chan p2p.Envelope, chBuf),
evidencePeerErrCh: make(chan p2p.PeerError, chBuf),
peerUpdatesCh: peerUpdatesCh,
peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh),
peerID: p2p.NodeID(fmt.Sprintf("%x", pID)),
numStateStores: numStateStores,
logger: log.TestingLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(t, numStateStores),
reactors: make(map[p2p.NodeID]*evidence.Reactor, numStateStores),
pools: make(map[p2p.NodeID]*evidence.Pool, numStateStores),
peerUpdates: make(map[p2p.NodeID]*p2p.PeerUpdates, numStateStores),
peerChans: make(map[p2p.NodeID]chan p2p.PeerUpdate, numStateStores),
}
rts.evidenceChannel = p2p.NewChannel(
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t,
evidence.EvidenceChannel,
new(tmproto.EvidenceList),
rts.evidenceInCh,
rts.evidenceOutCh,
rts.evidencePeerErrCh,
)
int(chBuf))
require.Len(t, rts.network.RandomNode().PeerManager.Peers(), 0)
rts.reactor = evidence.NewReactor(
logger,
rts.evidenceChannel,
rts.peerUpdates,
pool,
)
idx := 0
evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
for nodeID := range rts.network.Nodes {
logger := rts.logger.With("validator", idx)
evidenceDB := dbm.NewMemDB()
blockStore := &mocks.BlockStore{}
state, _ := stateStores[idx].Load()
blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(func(h int64) *types.BlockMeta {
if h <= state.LastBlockHeight {
return &types.BlockMeta{Header: types.Header{Time: evidenceTime}}
}
return nil
})
rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore)
require.NoError(t, err)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID])
rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
require.NoError(t, rts.reactor.Start())
require.True(t, rts.reactor.IsRunning())
rts.reactors[nodeID] = evidence.NewReactor(logger,
rts.evidenceChannels[nodeID],
rts.peerUpdates[nodeID],
rts.pools[nodeID])
require.NoError(t, rts.reactors[nodeID].Start())
require.True(t, rts.reactors[nodeID].IsRunning())
idx++
}
t.Cleanup(func() {
require.NoError(t, rts.reactor.Stop())
require.False(t, rts.reactor.IsRunning())
for _, r := range rts.reactors {
if r.IsRunning() {
require.NoError(t, r.Stop())
require.False(t, r.IsRunning())
}
}
leaktest.Check(t)
})
return rts
}
func createTestSuites(t *testing.T, stateStores []sm.Store, chBuf uint) []*reactorTestSuite {
t.Helper()
func (rts *reactorTestSuite) start(t *testing.T) {
rts.network.Start(t)
require.Len(t,
rts.network.RandomNode().PeerManager.Peers(),
rts.numStateStores-1,
"network does not have expected number of nodes")
}
numSStores := len(stateStores)
testSuites := make([]*reactorTestSuite, numSStores)
evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
func (rts *reactorTestSuite) waitForEvidence(t *testing.T, evList types.EvidenceList, ids ...p2p.NodeID) {
t.Helper()
for i := 0; i < numSStores; i++ {
logger := log.TestingLogger().With("validator", i)
evidenceDB := dbm.NewMemDB()
blockStore := &mocks.BlockStore{}
blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(
&types.BlockMeta{Header: types.Header{Time: evidenceTime}},
fn := func(pool *evidence.Pool) {
var (
localEvList []types.Evidence
size int64
loops int
)
pool, err := evidence.NewPool(logger, evidenceDB, stateStores[i], blockStore)
require.NoError(t, err)
// wait till we have at least the amount of evidence
// that we expect. if there's more local evidence then
// it doesn't make sense to wait longer and a
// different assertion should catch the resulting error
for len(localEvList) < len(evList) {
// each evidence should not be more than 500 bytes
localEvList, size = pool.PendingEvidence(int64(len(evList) * 500))
if loops == 100 {
t.Log("current wait status:", "|",
"local", len(localEvList), "|",
"waitlist", len(evList), "|",
"size", size)
}
testSuites[i] = setup(t, logger, pool, chBuf)
}
loops++
}
return testSuites
}
// put the reaped evidence in a map so we can quickly check we got everything
evMap := make(map[string]types.Evidence)
for _, e := range localEvList {
evMap[string(e.Hash())] = e
}
func waitForEvidence(t *testing.T, evList types.EvidenceList, suites ...*reactorTestSuite) {
t.Helper()
for i, expectedEv := range evList {
gotEv := evMap[string(expectedEv.Hash())]
require.Equalf(
t,
expectedEv,
gotEv,
"evidence for pool %d in pool does not match; got: %v, expected: %v", i, gotEv, expectedEv,
)
}
}
wg := new(sync.WaitGroup)
if len(ids) == 1 {
// special case: wait synchronously to avoid the extra
// goroutine; if this hits a timeout, the resulting
// stack trace will be clearer.
fn(rts.pools[ids[0]])
return
}
for _, suite := range suites {
wg.Add(1)
wg := sync.WaitGroup{}
go func(s *reactorTestSuite) {
var localEvList []types.Evidence
for id := range rts.pools {
if len(ids) > 0 && !p2ptest.NodeInSlice(id, ids) {
// if an ID list is specified, then we only
// want to wait for those pools that are
// specified in the list, otherwise, wait for
// all pools.
continue
}
currentPoolSize := 0
for currentPoolSize != len(evList) {
// each evidence should not be more than 500 bytes
localEvList, _ = s.pool.PendingEvidence(int64(len(evList) * 500))
currentPoolSize = len(localEvList)
}
wg.Add(1)
go func(id p2p.NodeID) { defer wg.Done(); fn(rts.pools[id]) }(id)
}
wg.Wait()
}
// put the reaped evidence in a map so we can quickly check we got everything
evMap := make(map[string]types.Evidence)
for _, e := range localEvList {
evMap[string(e.Hash())] = e
}
func (rts *reactorTestSuite) assertEvidenceChannelsEmpty(t *testing.T) {
t.Helper()
for i, expectedEv := range evList {
gotEv := evMap[string(expectedEv.Hash())]
require.Equalf(
t,
expectedEv,
gotEv,
"evidence at index %d in pool does not match; got: %v, expected: %v", i, gotEv, expectedEv,
)
}
for id, r := range rts.reactors {
require.NoError(t, r.Stop(), "stopping reactor #%s", id)
r.Wait()
require.False(t, r.IsRunning(), "reactor #%s did not stop", id)
wg.Done()
}(suite)
}
// wait for the evidence in all evidence pools
wg.Wait()
for id, ech := range rts.evidenceChannels {
require.Empty(t, ech.Out, "checking channel #%q", id)
}
}
func createEvidenceList(
@@ -162,7 +213,10 @@ func createEvidenceList(
val types.PrivValidator,
numEvidence int,
) types.EvidenceList {
t.Helper()
evList := make([]types.Evidence, numEvidence)
for i := 0; i < numEvidence; i++ {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(
int64(i+1),
@@ -171,44 +225,15 @@
evidenceChainID,
)
require.NoError(t, pool.AddEvidence(ev))
require.NoError(t, pool.AddEvidence(ev),
"adding evidence it#%d of %d to pool with height %d",
i, numEvidence, pool.State().LastBlockHeight)
evList[i] = ev
}
return evList
}
// simulateRouter will increment the provided WaitGroup and execute a simulated
// router where, for each outbound p2p Envelope from the primary reactor, we
// proxy (send) the Envelope the relevant peer reactor. Done is invoked on the
// WaitGroup when numOut Envelopes are sent (i.e. read from the outbound channel).
func simulateRouter(wg *sync.WaitGroup, primary *reactorTestSuite, suites []*reactorTestSuite, numOut int) {
wg.Add(1)
// create a mapping for efficient suite lookup by peer ID
suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite)
for _, suite := range suites {
suitesByPeerID[suite.peerID] = suite
}
// Simulate a router by listening for all outbound envelopes and proxying the
// envelope to the respective peer (suite).
go func() {
for i := 0; i < numOut; i++ {
envelope := <-primary.evidenceOutCh
other := suitesByPeerID[envelope.To]
other.evidenceInCh <- p2p.Envelope{
From: primary.peerID,
To: envelope.To,
Message: envelope.Message,
}
}
wg.Done()
}()
}
func TestReactorMultiDisconnect(t *testing.T) {
val := types.NewMockPV()
height := int64(numEvidence) + 10
@@ -216,27 +241,29 @@ func TestReactorMultiDisconnect(t *testing.T) {
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 20)
primary := testSuites[0]
secondary := testSuites[1]
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 20)
primary := rts.nodes[0]
secondary := rts.nodes[1]
_ = createEvidenceList(t, primary.pool, val, numEvidence)
_ = createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: secondary.peerID,
}
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
rts.start(t)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusUp)
// Ensure "disconnecting" the secondary peer from the primary more than once
// is handled gracefully.
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
NodeID: secondary.peerID,
}
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
NodeID: secondary.peerID,
}
require.NoError(t, primary.PeerManager.Disconnected(secondary.NodeID))
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
_, err := primary.PeerManager.TryEvictNext()
require.NoError(t, err)
require.NoError(t, primary.PeerManager.Disconnected(secondary.NodeID))
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
require.Equal(t, secondary.PeerManager.Status(primary.NodeID), p2p.PeerStatusUp)
}
// TestReactorBroadcastEvidence creates an environment of multiple peers that
@@ -256,44 +283,45 @@ func TestReactorBroadcastEvidence(t *testing.T) {
stateDBs[i] = initializeValidatorState(t, val, height)
}
// Create a series of test suites where each suite contains a reactor and
rts := setup(t, stateDBs, 0)
rts.start(t)
// Create a series of fixtures where each suite contains a reactor and
// evidence pool. In addition, we mark a primary suite and the rest are
// secondaries where each secondary is added as a peer via a PeerUpdate to the
// primary. As a result, the primary will gossip all evidence to each secondary.
testSuites := createTestSuites(t, stateDBs, 0)
primary := testSuites[0]
secondaries := testSuites[1:]
primary := rts.network.RandomNode()
secondaries := make([]*p2ptest.Node, 0, len(rts.network.NodeIDs())-1)
secondaryIDs := make([]p2p.NodeID, 0, cap(secondaries))
for id := range rts.network.Nodes {
if id == primary.NodeID {
continue
}
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence*len(secondaries))
secondaries = append(secondaries, rts.network.Nodes[id])
secondaryIDs = append(secondaryIDs, id)
}
evList := createEvidenceList(t, primary.pool, val, numEvidence)
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
// Add each secondary suite (node) as a peer to the primary suite (node). This
// will cause the primary to gossip all evidence to the secondaries.
for _, suite := range secondaries {
primary.peerUpdatesCh <- p2p.PeerUpdate{
rts.peerChans[primary.NodeID] <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: suite.peerID,
NodeID: suite.NodeID,
}
}
// Wait till all secondary suites (reactor) received all evidence from the
// primary suite (node).
waitForEvidence(t, evList, secondaries...)
rts.waitForEvidence(t, evList, secondaryIDs...)
for _, suite := range testSuites {
require.Equal(t, numEvidence, int(suite.pool.Size()))
for _, pool := range rts.pools {
require.Equal(t, numEvidence, int(pool.Size()))
}
wg.Wait()
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
}
rts.assertEvidenceChannelsEmpty(t)
}
// TestReactorSelectiveBroadcast tests a context where we have two reactors
@@ -304,47 +332,35 @@ func TestReactorBroadcastEvidence_Lagging(t *testing.T) {
height1 := int64(numEvidence) + 10
height2 := int64(numEvidence) / 2
// stateDB1 is ahead of stateDB2, where stateDB1 has all heights (1-10) and
// stateDB2 only has heights 1-7.
// stateDB1 is ahead of stateDB2, where stateDB1 has all heights (1-20) and
// stateDB2 only has heights 1-5.
stateDB1 := initializeValidatorState(t, val, height1)
stateDB2 := initializeValidatorState(t, val, height2)
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
primary := testSuites[0]
secondaries := testSuites[1:]
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 100)
rts.start(t)
// Simulate a router by listening for all outbound envelopes and proxying the
// envelope to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence*len(secondaries))
primary := rts.nodes[0]
secondary := rts.nodes[1]
// Send a list of valid evidence to the first reactor's, the one that is ahead,
// evidence pool.
evList := createEvidenceList(t, primary.pool, val, numEvidence)
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
// Add each secondary suite (node) as a peer to the primary suite (node). This
// will cause the primary to gossip all evidence to the secondaries.
for _, suite := range secondaries {
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: suite.peerID,
}
rts.peerChans[primary.NodeID] <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: secondary.NodeID,
}
// only ones less than the peer's height should make it through
waitForEvidence(t, evList[:height2+2], secondaries...)
rts.waitForEvidence(t, evList[:height2], secondary.NodeID)
require.Equal(t, numEvidence, int(primary.pool.Size()))
require.Equal(t, int(height2+2), int(secondaries[0].pool.Size()))
require.Equal(t, numEvidence, int(rts.pools[primary.NodeID].Size()))
require.Equal(t, int(height2), int(rts.pools[secondary.NodeID].Size()))
// The primary will continue to send the remaining evidence to the secondaries
// so we wait until it has sent all the envelopes.
wg.Wait()
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
}
rts.assertEvidenceChannelsEmpty(t)
}
func TestReactorBroadcastEvidence_Pending(t *testing.T) {
@@ -354,46 +370,49 @@ func TestReactorBroadcastEvidence_Pending(t *testing.T) {
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
primary := testSuites[0]
secondary := testSuites[1]
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 100)
primary := rts.nodes[0]
secondary := rts.nodes[1]
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence)
// add all evidence to the primary reactor
evList := createEvidenceList(t, primary.pool, val, numEvidence)
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
// Manually add half the evidence to the secondary which will mark them as
// pending.
for i := 0; i < numEvidence/2; i++ {
require.NoError(t, secondary.pool.AddEvidence(evList[i]))
require.NoError(t, rts.pools[secondary.NodeID].AddEvidence(evList[i]))
}
rts.start(t)
// the secondary should have half the evidence as pending
require.Equal(t, uint32(numEvidence/2), secondary.pool.Size())
require.Equal(t, numEvidence/2, int(rts.pools[secondary.NodeID].Size()))
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: secondary.peerID,
// adding the secondary node back in
require.NoError(t, primary.PeerManager.Add(secondary.NodeAddress))
startAt := time.Now()
for {
if time.Since(startAt) > time.Second {
require.Fail(t, "could not reconnect the secondary in less than a second")
}
if primary.PeerManager.Status(secondary.NodeID) == p2p.PeerStatusUp {
break
}
}
// The secondary reactor should have received all the evidence ignoring the
// already pending evidence.
waitForEvidence(t, evList, secondary)
for _, suite := range testSuites {
require.Equal(t, numEvidence, int(suite.pool.Size()))
}
rts.waitForEvidence(t, evList, secondary.NodeID)
wg.Wait()
// check to make sure that all of the evidence has
// propagated
require.Len(t, rts.pools, 2)
assert.EqualValues(t, numEvidence, rts.pools[primary.NodeID].Size(),
"primary node should have all the evidence")
if assert.EqualValues(t, numEvidence, rts.pools[secondary.NodeID].Size(),
"secondary nodes should have caught up") {
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
rts.assertEvidenceChannelsEmpty(t)
}
}
@@ -404,55 +423,52 @@ func TestReactorBroadcastEvidence_Committed(t *testing.T) {
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, 0)
primary := testSuites[0]
secondary := testSuites[1]
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 0)
primary := rts.nodes[0]
secondary := rts.nodes[1]
// add all evidence to the primary reactor
evList := createEvidenceList(t, primary.pool, val, numEvidence)
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
// Manually add half the evidence to the secondary which will mark them as
// pending.
for i := 0; i < numEvidence/2; i++ {
require.NoError(t, secondary.pool.AddEvidence(evList[i]))
require.NoError(t, rts.pools[secondary.NodeID].AddEvidence(evList[i]))
}
// the secondary should have half the evidence as pending
require.Equal(t, uint32(numEvidence/2), secondary.pool.Size())
require.Equal(t, numEvidence/2, int(rts.pools[secondary.NodeID].Size()))
state, err := stateDB2.Load()
require.NoError(t, err)
// update the secondary's pool such that all pending evidence is committed
state.LastBlockHeight++
secondary.pool.Update(state, evList[:numEvidence/2])
rts.pools[secondary.NodeID].Update(state, evList[:numEvidence/2])
// the secondary should have half the evidence as committed
require.Equal(t, uint32(0), secondary.pool.Size())
require.Equal(t, 0, int(rts.pools[secondary.NodeID].Size()))
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence)
// start the network and ensure it's configured
rts.start(t)
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: secondary.peerID,
}
// without the following sleep the test consistently fails;
// likely because the sleep forces a context switch that lets
// the router process other operations.
time.Sleep(2 * time.Millisecond)
// The secondary reactor should have received all the evidence ignoring the
// already committed evidence.
waitForEvidence(t, evList[numEvidence/2:], secondary)
rts.waitForEvidence(t, evList[numEvidence/2:], secondary.NodeID)
require.Equal(t, numEvidence, int(primary.pool.Size()))
require.Equal(t, numEvidence/2, int(secondary.pool.Size()))
require.Len(t, rts.pools, 2)
assert.EqualValues(t, numEvidence, rts.pools[primary.NodeID].Size(),
"primary node should have all the evidence")
if assert.EqualValues(t, numEvidence/2, rts.pools[secondary.NodeID].Size(),
"secondary nodes should have caught up") {
wg.Wait()
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
rts.assertEvidenceChannelsEmpty(t)
}
}
@@ -470,96 +486,33 @@ func TestReactorBroadcastEvidence_FullyConnected(t *testing.T) {
stateDBs[i] = initializeValidatorState(t, val, height)
}
testSuites := createTestSuites(t, stateDBs, 0)
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
for _, suite := range testSuites {
simulateRouter(wg, suite, testSuites, numEvidence*(len(testSuites)-1))
}
rts := setup(t, stateDBs, 0)
rts.start(t)
evList := createEvidenceList(t, testSuites[0].pool, val, numEvidence)
evList := createEvidenceList(t, rts.pools[rts.network.RandomNode().NodeID], val, numEvidence)
// every suite (reactor) connects to every other suite (reactor)
for _, suiteI := range testSuites {
for _, suiteJ := range testSuites {
if suiteI.peerID != suiteJ.peerID {
suiteI.peerUpdatesCh <- p2p.PeerUpdate{
for outerID, outerChan := range rts.peerChans {
for innerID := range rts.peerChans {
if outerID != innerID {
outerChan <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: suiteJ.peerID,
NodeID: innerID,
}
}
}
}
// wait till all suites (reactors) received all evidence from other suites (reactors)
waitForEvidence(t, evList, testSuites...)
rts.waitForEvidence(t, evList)
for _, suite := range testSuites {
require.Equal(t, numEvidence, int(suite.pool.Size()))
for _, pool := range rts.pools {
require.Equal(t, numEvidence, int(pool.Size()))
// commit state so we do not continue to repeat gossiping the same evidence
state := suite.pool.State()
state := pool.State()
state.LastBlockHeight++
suite.pool.Update(state, evList)
}
wg.Wait()
}
func TestReactorBroadcastEvidence_RemovePeer(t *testing.T) {
val := types.NewMockPV()
height := int64(10)
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
testSuites := createTestSuites(t, []sm.Store{stateDB1, stateDB2}, uint(numEvidence))
primary := testSuites[0]
secondary := testSuites[1]
// Simulate a router by listening for all outbound envelopes and proxying the
// envelopes to the respective peer (suite).
wg := new(sync.WaitGroup)
simulateRouter(wg, primary, testSuites, numEvidence/2)
// add all evidence to the primary reactor
evList := createEvidenceList(t, primary.pool, val, numEvidence)
// add the secondary reactor as a peer to the primary reactor
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: secondary.peerID,
}
// have the secondary reactor receive only half the evidence
waitForEvidence(t, evList[:numEvidence/2], secondary)
// disconnect the peer
primary.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusDown,
NodeID: secondary.peerID,
}
// Ensure the secondary only received half of the evidence before being
// disconnected.
require.Equal(t, numEvidence/2, int(secondary.pool.Size()))
wg.Wait()
// The primary reactor should still be attempting to send the remaining half.
//
// NOTE: The channel is buffered (size numEvidence) as to ensure the primary
// reactor will send all envelopes at once before receiving the signal to stop
// gossiping.
for i := 0; i < numEvidence/2; i++ {
<-primary.evidenceOutCh
}
// ensure all channels are drained
for _, suite := range testSuites {
require.Empty(t, suite.evidenceOutCh)
pool.Update(state, evList)
}
}
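
For orientation, the refactored tests above all follow roughly the same fixture flow. The following is a condensed sketch of that flow rather than new code; val, height, and numEvidence are prepared exactly as in the existing test bodies:

// Build one state store per node, then let setup construct the pools,
// reactors, and an in-memory p2ptest network.
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 0)

// Start the network; rts.start also asserts that every node sees its peers.
rts.start(t)
primary := rts.nodes[0]
secondary := rts.nodes[1]

// Seed evidence on the primary and wait for it to gossip to the secondary.
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
rts.waitForEvidence(t, evList, secondary.NodeID)
rts.assertEvidenceChannelsEmpty(t)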


p2p/p2ptest/network.go  +4 -1

@@ -211,7 +211,9 @@ func MakeNode(t *testing.T, network *Network) *Node {
require.Len(t, transport.Endpoints(), 1, "transport not listening on 1 endpoint")
peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{
MinRetryTime: 10 * time.Millisecond,
MinRetryTime: 10 * time.Millisecond,
MaxRetryTime: 100 * time.Millisecond,
RetryTimeJitter: time.Millisecond,
})
require.NoError(t, err)
@@ -274,5 +276,6 @@ func (n *Node) MakePeerUpdates(t *testing.T) *p2p.PeerUpdates {
RequireNoUpdates(t, sub)
sub.Close()
})
return sub
}
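
As used by the evidence tests in this change, the p2ptest network fixture is driven roughly as sketched below; this is assembled from the calls appearing in this diff, with imports matching the evidence test file:

// Three in-memory nodes with the fast retry settings from MakeNode above.
// All helper names here are taken from this diff; the rest is a sketch.
network := p2ptest.MakeNetwork(t, 3)

// One evidence channel per node, keyed by node ID.
channels := network.MakeChannelsNoCleanup(t,
	evidence.EvidenceChannel,
	new(tmproto.EvidenceList),
	0, // per-channel buffer size
)

// Dial everyone and wait until each node is connected to every other node.
network.Start(t)
node := network.RandomNode()
require.Len(t, node.PeerManager.Peers(), 2)
_ = channels[node.NodeID] // the node's *p2p.Channel for the evidence reactor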

p2p/p2ptest/util.go  +10 -0

@@ -2,7 +2,17 @@ package p2ptest
import (
gogotypes "github.com/gogo/protobuf/types"
"github.com/tendermint/tendermint/p2p"
)
// Message is a simple message containing a string-typed Value field.
type Message = gogotypes.StringValue
func NodeInSlice(id p2p.NodeID, ids []p2p.NodeID) bool {
for _, n := range ids {
if id == n {
return true
}
}
return false
}

p2p/peermanager.go  +13 -1

@@ -792,6 +792,19 @@ func (m *PeerManager) Subscribe() *PeerUpdates {
// compounding. Limiting it to 1 means that the subscribers are still
// reasonably in sync. However, this should probably be benchmarked.
peerUpdates := NewPeerUpdates(make(chan PeerUpdate, 1))
m.Register(peerUpdates)
return peerUpdates
}
// Register allows you to inject a custom PeerUpdates instance into the
// PeerManager, rather than relying on the instance constructed by the
// Subscribe method, which wraps the functionality of the Register
// method.
//
// The caller must consume the peer updates from this PeerUpdates
// instance in a timely fashion and close the subscription when done,
// otherwise the PeerManager will halt.
func (m *PeerManager) Register(peerUpdates *PeerUpdates) {
m.mtx.Lock()
m.subscriptions[peerUpdates] = peerUpdates
m.mtx.Unlock()
@@ -805,7 +818,6 @@ func (m *PeerManager) Subscribe() *PeerUpdates {
case <-m.closeCh:
}
}()
return peerUpdates
}
// broadcast broadcasts a peer update to all subscriptions. The caller must
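
The evidence reactor tests above exercise Register by pairing it with a channel they control; a minimal sketch of that pattern follows, assuming an existing *PeerManager named peerManager and a peer ID otherNodeID (both placeholders, not names from this diff):

// peerManager and otherNodeID are placeholders for this sketch.
// The caller owns the channel, so a test can inject synthetic peer
// lifecycle events that the registered subscriber (for example a reactor
// constructed with this PeerUpdates instance) will observe.
peerChan := make(chan p2p.PeerUpdate)
peerUpdates := p2p.NewPeerUpdates(peerChan)
peerManager.Register(peerUpdates)

peerChan <- p2p.PeerUpdate{
	NodeID: otherNodeID,
	Status: p2p.PeerStatusUp,
}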

