package mempool import ( "crypto/rand" "encoding/binary" "testing" "time" "github.com/tendermint/abci/example/counter" "github.com/tendermint/abci/example/dummy" "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) func newMempoolWithApp(cc proxy.ClientCreator) *Mempool { config := cfg.ResetTestRoot("mempool_test") appConnMem, _ := cc.NewABCIClient() appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) appConnMem.Start() mempool := NewMempool(config.Mempool, appConnMem, 0) mempool.SetLogger(log.TestingLogger()) return mempool } func ensureNoFire(t *testing.T, ch <-chan int, timeoutMS int) { timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) select { case <-ch: t.Fatal("Expected not to fire") case <-timer.C: } } func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) { timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) select { case <-ch: case <-timer.C: t.Fatal("Expected to fire") } } func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs { txs := make(types.Txs, count) for i := 0; i < count; i++ { txBytes := make([]byte, 20) txs[i] = txBytes _, err := rand.Read(txBytes) if err != nil { t.Error(err) } if err := mempool.CheckTx(txBytes, nil); err != nil { t.Fatal("Error after CheckTx: %v", err) } } return txs } func TestTxsAvailable(t *testing.T) { app := dummy.NewDummyApplication() cc := proxy.NewLocalClientCreator(app) mempool := newMempoolWithApp(cc) mempool.EnableTxsAvailable() timeoutMS := 500 // with no txs, it shouldnt fire ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch of txs, it should only fire once txs := checkTxs(t, mempool, 100) ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // call update with half the txs. // it should fire once now for the new height // since there are still txs left committedTxs, txs := txs[:50], txs[50:] mempool.Update(1, committedTxs) ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch more txs. we already fired for this height so it shouldnt fire again moreTxs := checkTxs(t, mempool, 50) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // now call update with all the txs. it should not fire as there are no txs left committedTxs = append(txs, moreTxs...) mempool.Update(2, committedTxs) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) // send a bunch more txs, it should only fire once checkTxs(t, mempool, 100) ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) } func TestSerialReap(t *testing.T) { app := counter.NewCounterApplication(true) app.SetOption("serial", "on") cc := proxy.NewLocalClientCreator(app) mempool := newMempoolWithApp(cc) appConnCon, _ := cc.NewABCIClient() appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) if _, err := appConnCon.Start(); err != nil { t.Fatalf("Error starting ABCI client: %v", err.Error()) } deliverTxsRange := func(start, end int) { // Deliver some txs. for i := start; i < end; i++ { // This will succeed txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) err := mempool.CheckTx(txBytes, nil) if err != nil { t.Fatal("Error after CheckTx: %v", err) } // This will fail because not serial (incrementing) // However, error should still be nil. // It just won't show up on Reap(). err = mempool.CheckTx(txBytes, nil) if err != nil { t.Fatal("Error after CheckTx: %v", err) } } } reapCheck := func(exp int) { txs := mempool.Reap(-1) if len(txs) != exp { t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs)) } } updateRange := func(start, end int) { txs := make([]types.Tx, 0) for i := start; i < end; i++ { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) txs = append(txs, txBytes) } mempool.Update(0, txs) } commitRange := func(start, end int) { // Deliver some txs. for i := start; i < end; i++ { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) res := appConnCon.DeliverTxSync(txBytes) if !res.IsOK() { t.Errorf("Error committing tx. Code:%v result:%X log:%v", res.Code, res.Data, res.Log) } } res := appConnCon.CommitSync() if len(res.Data) != 8 { t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log) } } //---------------------------------------- // Deliver some txs. deliverTxsRange(0, 100) // Reap the txs. reapCheck(100) // Reap again. We should get the same amount reapCheck(100) // Deliver 0 to 999, we should reap 900 new txs // because 100 were already counted. deliverTxsRange(0, 1000) // Reap the txs. reapCheck(1000) // Reap again. We should get the same amount reapCheck(1000) // Commit from the conensus AppConn commitRange(0, 500) updateRange(0, 500) // We should have 500 left. reapCheck(500) // Deliver 100 invalid txs and 100 valid txs deliverTxsRange(900, 1100) // We should have 600 now. reapCheck(600) }