From c2585b5525ecb3cfa785d8923480ab1f55be8201 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 20 Nov 2017 00:52:02 +0000 Subject: [PATCH] evidence_pool.go -> pool.go. remove old test files --- evidence/evidence_pool_test.go | 202 ------------------------- evidence/{evidence_pool.go => pool.go} | 0 evidence/reactor_test.go | 108 ------------- 3 files changed, 310 deletions(-) delete mode 100644 evidence/evidence_pool_test.go rename evidence/{evidence_pool.go => pool.go} (100%) delete mode 100644 evidence/reactor_test.go diff --git a/evidence/evidence_pool_test.go b/evidence/evidence_pool_test.go deleted file mode 100644 index fba5941c6..000000000 --- a/evidence/evidence_pool_test.go +++ /dev/null @@ -1,202 +0,0 @@ -package evpool - -import ( - "crypto/rand" - "encoding/binary" - "testing" - "time" - - "github.com/tendermint/abci/example/counter" - "github.com/tendermint/abci/example/dummy" - "github.com/tendermint/tmlibs/log" - - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/types" -) - -func newMempoolWithApp(cc proxy.ClientCreator) *Mempool { - config := cfg.ResetTestRoot("mempool_test") - - appConnMem, _ := cc.NewABCIClient() - appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) - appConnMem.Start() - mempool := NewMempool(config.Mempool, appConnMem, 0) - mempool.SetLogger(log.TestingLogger()) - return mempool -} - -func ensureNoFire(t *testing.T, ch <-chan int, timeoutMS int) { - timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) - select { - case <-ch: - t.Fatal("Expected not to fire") - case <-timer.C: - } -} - -func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) { - timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) - select { - case <-ch: - case <-timer.C: - t.Fatal("Expected to fire") - } -} - -func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs { - txs := make(types.Txs, count) - for i := 0; i < count; i++ { - txBytes := make([]byte, 20) - txs[i] = txBytes - rand.Read(txBytes) - err := mempool.CheckTx(txBytes, nil) - if err != nil { - t.Fatal("Error after CheckTx: %v", err) - } - } - return txs -} - -func TestTxsAvailable(t *testing.T) { - app := dummy.NewDummyApplication() - cc := proxy.NewLocalClientCreator(app) - mempool := newMempoolWithApp(cc) - mempool.EnableTxsAvailable() - - timeoutMS := 500 - - // with no txs, it shouldnt fire - ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - - // send a bunch of txs, it should only fire once - txs := checkTxs(t, mempool, 100) - ensureFire(t, mempool.TxsAvailable(), timeoutMS) - ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - - // call update with half the txs. - // it should fire once now for the new height - // since there are still txs left - committedTxs, txs := txs[:50], txs[50:] - mempool.Update(1, committedTxs) - ensureFire(t, mempool.TxsAvailable(), timeoutMS) - ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - - // send a bunch more txs. we already fired for this height so it shouldnt fire again - moreTxs := checkTxs(t, mempool, 50) - ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - - // now call update with all the txs. it should not fire as there are no txs left - committedTxs = append(txs, moreTxs...) - mempool.Update(2, committedTxs) - ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) - - // send a bunch more txs, it should only fire once - checkTxs(t, mempool, 100) - ensureFire(t, mempool.TxsAvailable(), timeoutMS) - ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) -} - -func TestSerialReap(t *testing.T) { - app := counter.NewCounterApplication(true) - app.SetOption("serial", "on") - cc := proxy.NewLocalClientCreator(app) - - mempool := newMempoolWithApp(cc) - appConnCon, _ := cc.NewABCIClient() - appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) - if _, err := appConnCon.Start(); err != nil { - t.Fatalf("Error starting ABCI client: %v", err.Error()) - } - - deliverTxsRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - - // This will succeed - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := mempool.CheckTx(txBytes, nil) - if err != nil { - t.Fatal("Error after CheckTx: %v", err) - } - - // This will fail because not serial (incrementing) - // However, error should still be nil. - // It just won't show up on Reap(). - err = mempool.CheckTx(txBytes, nil) - if err != nil { - t.Fatal("Error after CheckTx: %v", err) - } - - } - } - - reapCheck := func(exp int) { - txs := mempool.Reap(-1) - if len(txs) != exp { - t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs)) - } - } - - updateRange := func(start, end int) { - txs := make([]types.Tx, 0) - for i := start; i < end; i++ { - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - txs = append(txs, txBytes) - } - mempool.Update(0, txs) - } - - commitRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - res := appConnCon.DeliverTxSync(txBytes) - if !res.IsOK() { - t.Errorf("Error committing tx. Code:%v result:%X log:%v", - res.Code, res.Data, res.Log) - } - } - res := appConnCon.CommitSync() - if len(res.Data) != 8 { - t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log) - } - } - - //---------------------------------------- - - // Deliver some txs. - deliverTxsRange(0, 100) - - // Reap the txs. - reapCheck(100) - - // Reap again. We should get the same amount - reapCheck(100) - - // Deliver 0 to 999, we should reap 900 new txs - // because 100 were already counted. - deliverTxsRange(0, 1000) - - // Reap the txs. - reapCheck(1000) - - // Reap again. We should get the same amount - reapCheck(1000) - - // Commit from the conensus AppConn - commitRange(0, 500) - updateRange(0, 500) - - // We should have 500 left. - reapCheck(500) - - // Deliver 100 invalid txs and 100 valid txs - deliverTxsRange(900, 1100) - - // We should have 600 now. - reapCheck(600) -} diff --git a/evidence/evidence_pool.go b/evidence/pool.go similarity index 100% rename from evidence/evidence_pool.go rename to evidence/pool.go diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go deleted file mode 100644 index e488311b0..000000000 --- a/evidence/reactor_test.go +++ /dev/null @@ -1,108 +0,0 @@ -package evpool - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/go-kit/kit/log/term" - - "github.com/tendermint/abci/example/dummy" - "github.com/tendermint/tmlibs/log" - - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/types" -) - -// evpoolLogger is a TestingLogger which uses a different -// color for each validator ("validator" key must exist). -func evpoolLogger() log.Logger { - return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { - for i := 0; i < len(keyvals)-1; i += 2 { - if keyvals[i] == "validator" { - return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} - } - } - return term.FgBgColor{} - }) -} - -// connect N evpool reactors through N switches -func makeAndConnectEvidencePoolReactors(config *cfg.Config, N int) []*EvidencePoolReactor { - reactors := make([]*EvidencePoolReactor, N) - logger := evpoolLogger() - for i := 0; i < N; i++ { - app := dummy.NewDummyApplication() - cc := proxy.NewLocalClientCreator(app) - evpool := newEvidencePoolWithApp(cc) - - reactors[i] = NewEvidencePoolReactor(config.EvidencePool, evpool) // so we dont start the consensus states - reactors[i].SetLogger(logger.With("validator", i)) - } - - p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("MEMPOOL", reactors[i]) - return s - - }, p2p.Connect2Switches) - return reactors -} - -// wait for all evidences on all reactors -func waitForTxs(t *testing.T, evidences types.Txs, reactors []*EvidencePoolReactor) { - // wait for the evidences in all evpools - wg := new(sync.WaitGroup) - for i := 0; i < len(reactors); i++ { - wg.Add(1) - go _waitForTxs(t, wg, evidences, i, reactors) - } - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - timer := time.After(TIMEOUT) - select { - case <-timer: - t.Fatal("Timed out waiting for evidences") - case <-done: - } -} - -// wait for all evidences on a single evpool -func _waitForTxs(t *testing.T, wg *sync.WaitGroup, evidences types.Txs, reactorIdx int, reactors []*EvidencePoolReactor) { - - evpool := reactors[reactorIdx].EvidencePool - for evpool.Size() != len(evidences) { - time.Sleep(time.Second) - } - - reapedTxs := evpool.Reap(len(evidences)) - for i, evidence := range evidences { - assert.Equal(t, evidence, reapedTxs[i], fmt.Sprintf("evidences at index %d on reactor %d don't match: %v vs %v", i, reactorIdx, evidence, reapedTxs[i])) - } - wg.Done() -} - -var ( - NUM_TXS = 1000 - TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow -) - -func TestReactorBroadcastTxMessage(t *testing.T) { - config := cfg.TestConfig() - N := 4 - reactors := makeAndConnectEvidencePoolReactors(config, N) - - // send a bunch of evidences to the first reactor's evpool - // and wait for them all to be received in the others - evidences := checkTxs(t, reactors[0].EvidencePool, NUM_TXS) - waitForTxs(t, evidences, reactors) -}