|
|
- package mempool
-
- import (
- "crypto/rand"
- "encoding/binary"
- "testing"
- "time"
-
- "github.com/tendermint/abci/example/counter"
- "github.com/tendermint/abci/example/dummy"
- "github.com/tendermint/tmlibs/log"
-
- cfg "github.com/tendermint/tendermint/config"
- "github.com/tendermint/tendermint/proxy"
- "github.com/tendermint/tendermint/types"
- )
-
- func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
- config := cfg.ResetTestRoot("mempool_test")
-
- appConnMem, _ := cc.NewABCIClient()
- appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
- appConnMem.Start()
- mempool := NewMempool(config.Mempool, appConnMem, 0)
- mempool.SetLogger(log.TestingLogger())
- return mempool
- }
-
- func ensureNoFire(t *testing.T, ch <-chan int, timeoutMS int) {
- timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
- select {
- case <-ch:
- t.Fatal("Expected not to fire")
- case <-timer.C:
- }
- }
-
- func ensureFire(t *testing.T, ch <-chan int, timeoutMS int) {
- timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
- select {
- case <-ch:
- case <-timer.C:
- t.Fatal("Expected to fire")
- }
- }
-
- func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
- txs := make(types.Txs, count)
- for i := 0; i < count; i++ {
- txBytes := make([]byte, 20)
- txs[i] = txBytes
- rand.Read(txBytes)
- err := mempool.CheckTx(txBytes, nil)
- if err != nil {
- t.Fatal("Error after CheckTx: %v", err)
- }
- }
- return txs
- }
-
- func TestTxsAvailable(t *testing.T) {
- app := dummy.NewDummyApplication()
- cc := proxy.NewLocalClientCreator(app)
- mempool := newMempoolWithApp(cc)
- mempool.EnableTxsAvailable()
-
- timeoutMS := 500
-
- // with no txs, it shouldnt fire
- ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
-
- // send a bunch of txs, it should only fire once
- txs := checkTxs(t, mempool, 100)
- ensureFire(t, mempool.TxsAvailable(), timeoutMS)
- ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
-
- // call update with half the txs.
- // it should fire once now for the new height
- // since there are still txs left
- committedTxs, txs := txs[:50], txs[50:]
- mempool.Update(1, committedTxs)
- ensureFire(t, mempool.TxsAvailable(), timeoutMS)
- ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
-
- // send a bunch more txs. we already fired for this height so it shouldnt fire again
- moreTxs := checkTxs(t, mempool, 50)
- ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
-
- // now call update with all the txs. it should not fire as there are no txs left
- committedTxs = append(txs, moreTxs...)
- mempool.Update(2, committedTxs)
- ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
-
- // send a bunch more txs, it should only fire once
- checkTxs(t, mempool, 100)
- ensureFire(t, mempool.TxsAvailable(), timeoutMS)
- ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
- }
-
- func TestSerialReap(t *testing.T) {
- app := counter.NewCounterApplication(true)
- app.SetOption("serial", "on")
- cc := proxy.NewLocalClientCreator(app)
-
- mempool := newMempoolWithApp(cc)
- appConnCon, _ := cc.NewABCIClient()
- appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
- if _, err := appConnCon.Start(); err != nil {
- t.Fatalf("Error starting ABCI client: %v", err.Error())
- }
-
- deliverTxsRange := func(start, end int) {
- // Deliver some txs.
- for i := start; i < end; i++ {
-
- // This will succeed
- txBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(txBytes, uint64(i))
- err := mempool.CheckTx(txBytes, nil)
- if err != nil {
- t.Fatal("Error after CheckTx: %v", err)
- }
-
- // This will fail because not serial (incrementing)
- // However, error should still be nil.
- // It just won't show up on Reap().
- err = mempool.CheckTx(txBytes, nil)
- if err != nil {
- t.Fatal("Error after CheckTx: %v", err)
- }
-
- }
- }
-
- reapCheck := func(exp int) {
- txs := mempool.Reap(-1)
- if len(txs) != exp {
- t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs))
- }
- }
-
- updateRange := func(start, end int) {
- txs := make([]types.Tx, 0)
- for i := start; i < end; i++ {
- txBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(txBytes, uint64(i))
- txs = append(txs, txBytes)
- }
- mempool.Update(0, txs)
- }
-
- commitRange := func(start, end int) {
- // Deliver some txs.
- for i := start; i < end; i++ {
- txBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(txBytes, uint64(i))
- res := appConnCon.DeliverTxSync(txBytes)
- if !res.IsOK() {
- t.Errorf("Error committing tx. Code:%v result:%X log:%v",
- res.Code, res.Data, res.Log)
- }
- }
- res := appConnCon.CommitSync()
- if len(res.Data) != 8 {
- t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log)
- }
- }
-
- //----------------------------------------
-
- // Deliver some txs.
- deliverTxsRange(0, 100)
-
- // Reap the txs.
- reapCheck(100)
-
- // Reap again. We should get the same amount
- reapCheck(100)
-
- // Deliver 0 to 999, we should reap 900 new txs
- // because 100 were already counted.
- deliverTxsRange(0, 1000)
-
- // Reap the txs.
- reapCheck(1000)
-
- // Reap again. We should get the same amount
- reapCheck(1000)
-
- // Commit from the conensus AppConn
- commitRange(0, 500)
- updateRange(0, 500)
-
- // We should have 500 left.
- reapCheck(500)
-
- // Deliver 100 invalid txs and 100 valid txs
- deliverTxsRange(900, 1100)
-
- // We should have 600 now.
- reapCheck(600)
- }
|