From fe4b53a4637283f97d8104230e6a58d9dfde540a Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 2 Nov 2017 12:06:48 -0600 Subject: [PATCH] EvidencePool --- consensus/state.go | 19 +--- consensus/types/state.go | 3 - evidence/evidence_pool.go | 202 +++++++++++++++++++++++++++++++++ evidence/evidence_pool_test.go | 202 +++++++++++++++++++++++++++++++++ evidence/reactor.go | 138 ++++++++++++++++++++++ evidence/reactor_test.go | 108 ++++++++++++++++++ 6 files changed, 656 insertions(+), 16 deletions(-) create mode 100644 evidence/evidence_pool.go create mode 100644 evidence/evidence_pool_test.go create mode 100644 evidence/reactor.go create mode 100644 evidence/reactor_test.go diff --git a/consensus/state.go b/consensus/state.go index 4785b3621..b26095de7 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -78,6 +78,7 @@ type ConsensusState struct { proxyAppConn proxy.AppConnConsensus blockStore types.BlockStore mempool types.Mempool + evpool types.EvidencePool // internal state mtx sync.Mutex @@ -113,7 +114,7 @@ type ConsensusState struct { } // NewConsensusState returns a new ConsensusState. -func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool) *ConsensusState { +func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool, evpool types.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, proxyAppConn: proxyAppConn, @@ -125,6 +126,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon done: make(chan struct{}), doWALCatchup: true, wal: nilWAL{}, + evpool: evpool, } // set function defaults (may be overwritten before calling Start) cs.decideProposal = cs.defaultDecideProposal @@ -864,7 +866,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Mempool validated transactions txs := cs.mempool.Reap(cs.config.MaxBlockSizeTxs) block, parts := cs.state.MakeBlock(cs.Height, txs, commit) - block.AddEvidence(cs.Evidence) + evidence := cs.evpool.Evidence() + block.AddEvidence(evidence) return block, parts } @@ -1338,7 +1341,7 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error { cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type) return err } - cs.addEvidence(voteErr.DuplicateVoteEvidence) + cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence) return err } else { // Probably an invalid signature / Bad peer. @@ -1350,16 +1353,6 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error { return nil } -func (cs *ConsensusState) addEvidence(ev types.Evidence) { - if cs.Evidence.Has(ev) { - return - } - - cs.Logger.Error("Found conflicting vote. Recording evidence in the RoundState", "evidence", ev) - - cs.Evidence = append(cs.Evidence, ev) -} - //----------------------------------------------------------------------------- func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, err error) { diff --git a/consensus/types/state.go b/consensus/types/state.go index 06194711b..b95131f4e 100644 --- a/consensus/types/state.go +++ b/consensus/types/state.go @@ -74,7 +74,6 @@ type RoundState struct { CommitRound int // LastCommit *types.VoteSet // Last precommits at Height-1 LastValidators *types.ValidatorSet - Evidence types.Evidences } // RoundStateEvent returns the H/R/S of the RoundState as an event. @@ -110,7 +109,6 @@ func (rs *RoundState) StringIndented(indent string) string { %s Votes: %v %s LastCommit: %v %s LastValidators:%v -%s Evidence: %v %s}`, indent, rs.Height, rs.Round, rs.Step, indent, rs.StartTime, @@ -123,7 +121,6 @@ func (rs *RoundState) StringIndented(indent string) string { indent, rs.Votes.StringIndented(indent+" "), indent, rs.LastCommit.StringShort(), indent, rs.LastValidators.StringIndented(indent+" "), - indent, rs.Evidence.String(), indent) } diff --git a/evidence/evidence_pool.go b/evidence/evidence_pool.go new file mode 100644 index 000000000..d84eb13d8 --- /dev/null +++ b/evidence/evidence_pool.go @@ -0,0 +1,202 @@ +package evpool + +import ( + "container/list" + "fmt" + "sync" + "sync/atomic" + + "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/types" +) + +const cacheSize = 100000 + +// EvidencePool maintains a set of valid uncommitted evidence. +type EvidencePool struct { + config *cfg.EvidencePoolConfig + + mtx sync.Mutex + height int // the last block Update()'d to + evidence types.Evidences + // TODO: evidenceCache + + // TODO: need to persist evidence so we never lose it + + logger log.Logger +} + +func NewEvidencePool(config *cfg.EvidencePoolConfig, height int) *EvidencePool { + evpool := &EvidencePool{ + config: config, + height: height, + logger: log.NewNopLogger(), + } + evpool.initWAL() + return evpool +} + +// SetLogger sets the Logger. +func (evpool *EvidencePool) SetLogger(l log.Logger) { + evpool.logger = l +} + +// Evidence returns a copy of the pool's evidence. +func (evpool *EvidencePool) Evidence() types.Evidences { + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + + evCopy := make(types.Evidences, len(evpool.evidence)) + for i, ev := range evpool.evidence { + evCopy[i] = ev + } + return evCopy +} + +// Size returns the number of pieces of evidence in the evpool. +func (evpool *EvidencePool) Size() int { + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + return len(evpool.evidence) +} + +// Flush removes all evidence from the evpool +func (evpool *EvidencePool) Flush() { + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + evpool.evidence = make(types.Evidence) +} + +// AddEvidence checks the evidence is valid and adds it to the pool. +func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + + if evpool.evidence.Has(evidence) { + return fmt.Errorf("Evidence already exists", "evidence", evidence) + } + cs.Logger.Info("Found conflicting vote. Recording evidence", "evidence", ev) + evpool.evidence = append(evpool.evidence, ev) + // TODO: write to disk ? WAL ? + return nil +} + +// Update informs the evpool that the given evidence was committed and can be discarded. +// NOTE: this should be called *after* block is committed by consensus. +func (evpool *EvidencePool) Update(height int, evidence types.Evidences) { + + // First, create a lookup map of txns in new txs. + evMap := make(map[string]struct{}) + for _, ev := range evidence { + evMap[string(evidence.Hash())] = struct{}{} + } + + // Set height + evpool.height = height + + // Remove evidence that is already committed . + goodEvidence := evpool.filterEvidence(evMap) + _ = goodEvidence + +} + +// TODO: +func (evpool *EvidencePool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { + goodTxs := make([]types.Tx, 0, evpool.txs.Len()) + for e := evpool.txs.Front(); e != nil; e = e.Next() { + memTx := e.Value.(*evpoolTx) + // Remove the tx if it's alredy in a block. + if _, ok := blockTxsMap[string(memTx.tx)]; ok { + // remove from clist + evpool.txs.Remove(e) + e.DetachPrev() + + // NOTE: we don't remove committed txs from the cache. + continue + } + // Good tx! + goodTxs = append(goodTxs, memTx.tx) + } + return goodTxs +} + +//-------------------------------------------------------------------------------- + +// evpoolTx is a transaction that successfully ran +type evpoolEvidence struct { + counter int64 // a simple incrementing counter + height int64 // height that this tx had been validated in + evidence types.Evidence // +} + +// Height returns the height for this transaction +func (memTx *evpoolTx) Height() int { + return int(atomic.LoadInt64(&memTx.height)) +} + +//-------------------------------------------------------------------------------- +// TODO: + +// txCache maintains a cache of evidence +type txCache struct { + mtx sync.Mutex + size int + map_ map[string]struct{} + list *list.List // to remove oldest tx when cache gets too big +} + +// newTxCache returns a new txCache. +func newTxCache(cacheSize int) *txCache { + return &txCache{ + size: cacheSize, + map_: make(map[string]struct{}, cacheSize), + list: list.New(), + } +} + +// Reset resets the txCache to empty. +func (cache *txCache) Reset() { + cache.mtx.Lock() + cache.map_ = make(map[string]struct{}, cacheSize) + cache.list.Init() + cache.mtx.Unlock() +} + +// Exists returns true if the given tx is cached. +func (cache *txCache) Exists(tx types.Tx) bool { + cache.mtx.Lock() + _, exists := cache.map_[string(tx)] + cache.mtx.Unlock() + return exists +} + +// Push adds the given tx to the txCache. It returns false if tx is already in the cache. +func (cache *txCache) Push(tx types.Tx) bool { + cache.mtx.Lock() + defer cache.mtx.Unlock() + + if _, exists := cache.map_[string(tx)]; exists { + return false + } + + if cache.list.Len() >= cache.size { + popped := cache.list.Front() + poppedTx := popped.Value.(types.Tx) + // NOTE: the tx may have already been removed from the map + // but deleting a non-existent element is fine + delete(cache.map_, string(poppedTx)) + cache.list.Remove(popped) + } + cache.map_[string(tx)] = struct{}{} + cache.list.PushBack(tx) + return true +} + +// Remove removes the given tx from the cache. +func (cache *txCache) Remove(tx types.Tx) { + cache.mtx.Lock() + delete(cache.map_, string(tx)) + cache.mtx.Unlock() +} diff --git a/evidence/evidence_pool_test.go b/evidence/evidence_pool_test.go new file mode 100644 index 000000000..46401e88b --- /dev/null +++ b/evidence/evidence_pool_test.go @@ -0,0 +1,202 @@ +package mempool + +import ( + "crypto/rand" + "encoding/binary" + "testing" + "time" + + "github.com/tendermint/abci/example/counter" + "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +func newMempoolWithApp(cc proxy.ClientCreator) *Mempool { + config := cfg.ResetTestRoot("mempool_test") + + appConnMem, _ := cc.NewABCIClient() + appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) + appConnMem.Start() + mempool := NewMempool(config.Mempool, appConnMem, 0) + mempool.SetLogger(log.TestingLogger()) + return mempool +} + +func ensureNoFire(t *testing.T, ch <-chan int, timeoutMS int) { + timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) + select { + case <-ch: + t.Fatal("Expected not to fire") + case <-timer.C: + } +} + +func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) { + timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) + select { + case <-ch: + case <-timer.C: + t.Fatal("Expected to fire") + } +} + +func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs { + txs := make(types.Txs, count) + for i := 0; i < count; i++ { + txBytes := make([]byte, 20) + txs[i] = txBytes + rand.Read(txBytes) + err := mempool.CheckTx(txBytes, nil) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } + } + return txs +} + +func TestTxsAvailable(t *testing.T) { + app := dummy.NewDummyApplication() + cc := proxy.NewLocalClientCreator(app) + mempool := newMempoolWithApp(cc) + mempool.EnableTxsAvailable() + + timeoutMS := 500 + + // with no txs, it shouldnt fire + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch of txs, it should only fire once + txs := checkTxs(t, mempool, 100) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // call update with half the txs. + // it should fire once now for the new height + // since there are still txs left + committedTxs, txs := txs[:50], txs[50:] + mempool.Update(1, committedTxs) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch more txs. we already fired for this height so it shouldnt fire again + moreTxs := checkTxs(t, mempool, 50) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // now call update with all the txs. it should not fire as there are no txs left + committedTxs = append(txs, moreTxs...) + mempool.Update(2, committedTxs) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch more txs, it should only fire once + checkTxs(t, mempool, 100) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) +} + +func TestSerialReap(t *testing.T) { + app := counter.NewCounterApplication(true) + app.SetOption("serial", "on") + cc := proxy.NewLocalClientCreator(app) + + mempool := newMempoolWithApp(cc) + appConnCon, _ := cc.NewABCIClient() + appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) + if _, err := appConnCon.Start(); err != nil { + t.Fatalf("Error starting ABCI client: %v", err.Error()) + } + + deliverTxsRange := func(start, end int) { + // Deliver some txs. + for i := start; i < end; i++ { + + // This will succeed + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + err := mempool.CheckTx(txBytes, nil) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } + + // This will fail because not serial (incrementing) + // However, error should still be nil. + // It just won't show up on Reap(). + err = mempool.CheckTx(txBytes, nil) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } + + } + } + + reapCheck := func(exp int) { + txs := mempool.Reap(-1) + if len(txs) != exp { + t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs)) + } + } + + updateRange := func(start, end int) { + txs := make([]types.Tx, 0) + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + txs = append(txs, txBytes) + } + mempool.Update(0, txs) + } + + commitRange := func(start, end int) { + // Deliver some txs. + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + res := appConnCon.DeliverTxSync(txBytes) + if !res.IsOK() { + t.Errorf("Error committing tx. Code:%v result:%X log:%v", + res.Code, res.Data, res.Log) + } + } + res := appConnCon.CommitSync() + if len(res.Data) != 8 { + t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log) + } + } + + //---------------------------------------- + + // Deliver some txs. + deliverTxsRange(0, 100) + + // Reap the txs. + reapCheck(100) + + // Reap again. We should get the same amount + reapCheck(100) + + // Deliver 0 to 999, we should reap 900 new txs + // because 100 were already counted. + deliverTxsRange(0, 1000) + + // Reap the txs. + reapCheck(1000) + + // Reap again. We should get the same amount + reapCheck(1000) + + // Commit from the conensus AppConn + commitRange(0, 500) + updateRange(0, 500) + + // We should have 500 left. + reapCheck(500) + + // Deliver 100 invalid txs and 100 valid txs + deliverTxsRange(900, 1100) + + // We should have 600 now. + reapCheck(600) +} diff --git a/evidence/reactor.go b/evidence/reactor.go new file mode 100644 index 000000000..7554bb4c2 --- /dev/null +++ b/evidence/reactor.go @@ -0,0 +1,138 @@ +package evpool + +import ( + "bytes" + "fmt" + "reflect" + + wire "github.com/tendermint/go-wire" + "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" +) + +const ( + EvidencePoolChannel = byte(0x38) + + maxEvidencePoolMessageSize = 1048576 // 1MB TODO make it configurable + peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount +) + +// EvidencePoolReactor handles evpool evidence broadcasting amongst peers. +type EvidencePoolReactor struct { + p2p.BaseReactor + config *cfg.EvidencePoolConfig + EvidencePool *EvidencePool + evsw types.EventSwitch +} + +// NewEvidencePoolReactor returns a new EvidencePoolReactor with the given config and evpool. +func NewEvidencePoolReactor(config *cfg.EvidencePoolConfig, evpool *EvidencePool) *EvidencePoolReactor { + evR := &EvidencePoolReactor{ + config: config, + EvidencePool: evpool, + } + evR.BaseReactor = *p2p.NewBaseReactor("EvidencePoolReactor", evR) + return evR +} + +// SetLogger sets the Logger on the reactor and the underlying EvidencePool. +func (evR *EvidencePoolReactor) SetLogger(l log.Logger) { + evR.Logger = l + evR.EvidencePool.SetLogger(l) +} + +// GetChannels implements Reactor. +// It returns the list of channels for this reactor. +func (evR *EvidencePoolReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + &p2p.ChannelDescriptor{ + ID: EvidencePoolChannel, + Priority: 5, + }, + } +} + +// AddPeer implements Reactor. +func (evR *EvidencePoolReactor) AddPeer(peer p2p.Peer) { + // send the new peer all current evidence + evidence := evR.evpool.Evidence() + msg := EvidenceMessage{evidence} + success := peer.Send(EvidencePoolChannel, struct{ EvidencePoolMessage }{msg}) + if !success { + // TODO: remove peer ? + } +} + +// RemovePeer implements Reactor. +func (evR *EvidencePoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { +} + +// Receive implements Reactor. +// It adds any received evidence to the evpool. +func (evR *EvidencePoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + _, msg, err := DecodeMessage(msgBytes) + if err != nil { + evR.Logger.Error("Error decoding message", "err", err) + return + } + evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) + + switch msg := msg.(type) { + case *EvidenceMessage: + for _, ev := range msg.Evidence { + err := evR.EvidencePool.AddEvidence(msg.Evidence, nil) + if err != nil { + evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err) + // TODO: punish peer + } else { + // TODO: broadcast good evidence to all peers (except sender? ) + } + } + default: + evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } +} + +// SetEventSwitch implements events.Eventable. +func (evR *EvidencePoolReactor) SetEventSwitch(evsw types.EventSwitch) { + evR.evsw = evsw +} + +//----------------------------------------------------------------------------- +// Messages + +const ( + msgTypeEvidence = byte(0x01) +) + +// EvidencePoolMessage is a message sent or received by the EvidencePoolReactor. +type EvidencePoolMessage interface{} + +var _ = wire.RegisterInterface( + struct{ EvidencePoolMessage }{}, + wire.ConcreteType{&EvidenceMessage{}, msgTypeEvidence}, +) + +// DecodeMessage decodes a byte-array into a EvidencePoolMessage. +func DecodeMessage(bz []byte) (msgType byte, msg EvidencePoolMessage, err error) { + msgType = bz[0] + n := new(int) + r := bytes.NewReader(bz) + msg = wire.ReadBinary(struct{ EvidencePoolMessage }{}, r, maxEvidencePoolMessageSize, n, &err).(struct{ EvidencePoolMessage }).EvidencePoolMessage + return +} + +//------------------------------------- + +// EvidenceMessage contains a list of evidence. +type EvidenceMessage struct { + Evidence types.Evidences +} + +// String returns a string representation of the EvidenceMessage. +func (m *EvidenceMessage) String() string { + return fmt.Sprintf("[EvidenceMessage %v]", m.Evidence) +} diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go new file mode 100644 index 000000000..e488311b0 --- /dev/null +++ b/evidence/reactor_test.go @@ -0,0 +1,108 @@ +package evpool + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/go-kit/kit/log/term" + + "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +// evpoolLogger is a TestingLogger which uses a different +// color for each validator ("validator" key must exist). +func evpoolLogger() log.Logger { + return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "validator" { + return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} + } + } + return term.FgBgColor{} + }) +} + +// connect N evpool reactors through N switches +func makeAndConnectEvidencePoolReactors(config *cfg.Config, N int) []*EvidencePoolReactor { + reactors := make([]*EvidencePoolReactor, N) + logger := evpoolLogger() + for i := 0; i < N; i++ { + app := dummy.NewDummyApplication() + cc := proxy.NewLocalClientCreator(app) + evpool := newEvidencePoolWithApp(cc) + + reactors[i] = NewEvidencePoolReactor(config.EvidencePool, evpool) // so we dont start the consensus states + reactors[i].SetLogger(logger.With("validator", i)) + } + + p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("MEMPOOL", reactors[i]) + return s + + }, p2p.Connect2Switches) + return reactors +} + +// wait for all evidences on all reactors +func waitForTxs(t *testing.T, evidences types.Txs, reactors []*EvidencePoolReactor) { + // wait for the evidences in all evpools + wg := new(sync.WaitGroup) + for i := 0; i < len(reactors); i++ { + wg.Add(1) + go _waitForTxs(t, wg, evidences, i, reactors) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + timer := time.After(TIMEOUT) + select { + case <-timer: + t.Fatal("Timed out waiting for evidences") + case <-done: + } +} + +// wait for all evidences on a single evpool +func _waitForTxs(t *testing.T, wg *sync.WaitGroup, evidences types.Txs, reactorIdx int, reactors []*EvidencePoolReactor) { + + evpool := reactors[reactorIdx].EvidencePool + for evpool.Size() != len(evidences) { + time.Sleep(time.Second) + } + + reapedTxs := evpool.Reap(len(evidences)) + for i, evidence := range evidences { + assert.Equal(t, evidence, reapedTxs[i], fmt.Sprintf("evidences at index %d on reactor %d don't match: %v vs %v", i, reactorIdx, evidence, reapedTxs[i])) + } + wg.Done() +} + +var ( + NUM_TXS = 1000 + TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow +) + +func TestReactorBroadcastTxMessage(t *testing.T) { + config := cfg.TestConfig() + N := 4 + reactors := makeAndConnectEvidencePoolReactors(config, N) + + // send a bunch of evidences to the first reactor's evpool + // and wait for them all to be received in the others + evidences := checkTxs(t, reactors[0].EvidencePool, NUM_TXS) + waitForTxs(t, evidences, reactors) +}