package mempool

import (
	"crypto/md5"
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/tendermint/tendermint/abci/example/counter"
	"github.com/tendermint/tendermint/abci/example/kvstore"
	abci "github.com/tendermint/tendermint/abci/types"
	cmn "github.com/tendermint/tendermint/libs/common"
	"github.com/tendermint/tendermint/libs/log"

	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"

	"github.com/stretchr/testify/require"
)

// newMempoolWithApp builds a Mempool for tests on top of a freshly created
// and started ABCI client obtained from cc. It panics if the client cannot be
// created or started.
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
	config := cfg.ResetTestRoot("mempool_test")

	appConnMem, err := cc.NewABCIClient()
	if err != nil {
		panic(err)
	}
	appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
	if err := appConnMem.Start(); err != nil {
		panic(err)
	}

	mempool := NewMempool(config.Mempool, appConnMem, 0)
	mempool.SetLogger(log.TestingLogger())
	return mempool
}

// ensureNoFire fails the test if a value arrives on ch before timeoutMS
// milliseconds have elapsed.
func ensureNoFire(t *testing.T, ch <-chan int64, timeoutMS int) {
	t.Helper()
	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
	defer timer.Stop()

	select {
	case <-ch:
		t.Fatal("Expected not to fire")
	case <-timer.C:
	}
}

// ensureFire fails the test unless a value arrives on ch before timeoutMS
// milliseconds have elapsed.
func ensureFire(t *testing.T, ch <-chan int64, timeoutMS int) {
	t.Helper()
	timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
	defer timer.Stop()

	select {
	case <-ch:
	case <-timer.C:
		t.Fatal("Expected to fire")
	}
}

// checkTxs submits count random 20-byte transactions to mempool via CheckTx
// and returns them in submission order. A CheckTx error aborts the test.
func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
	txs := make(types.Txs, count)
	for i := 0; i < count; i++ {
		txBytes := make([]byte, 20)
		if _, err := rand.Read(txBytes); err != nil {
			t.Error(err)
		}
		txs[i] = txBytes
		if err := mempool.CheckTx(txBytes, nil); err != nil {
			t.Fatalf("Error after CheckTx: %v", err)
		}
	}
	return txs
}

// TestTxsAvailable checks when the TxsAvailable channel fires as transactions
// are added to the mempool and as Update is called for successive heights.
func TestTxsAvailable(t *testing.T) {
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	mempool := newMempoolWithApp(cc)
	mempool.EnableTxsAvailable()

	timeoutMS := 500

	// with no txs, it shouldn't fire
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// send a bunch of txs, it should only fire once
	txs := checkTxs(t, mempool, 100)
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// call update with half the txs.
	// it should fire once now for the new height
	// since there are still txs left
	committedTxs, txs := txs[:50], txs[50:]
	if err := mempool.Update(1, committedTxs); err != nil {
		t.Error(err)
	}
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// send a bunch more txs. we already fired for this height so it shouldn't fire again
	moreTxs := checkTxs(t, mempool, 50)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// now call update with all the txs. it should not fire as there are no txs left
	committedTxs = append(txs, moreTxs...)
	if err := mempool.Update(2, committedTxs); err != nil {
		t.Error(err)
	}
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)

	// send a bunch more txs, it should only fire once
	checkTxs(t, mempool, 100)
	ensureFire(t, mempool.TxsAvailable(), timeoutMS)
	ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
}

// TestSerialReap drives the mempool against the counter application with the
// "serial" option on, interleaving CheckTx, Reap, DeliverTx/Commit and Update,
// and verifies the number of reaped transactions after each step.
func TestSerialReap(t *testing.T) {
	app := counter.NewCounterApplication(true)
	app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"})
	cc := proxy.NewLocalClientCreator(app)

	mempool := newMempoolWithApp(cc)
	appConnCon, err := cc.NewABCIClient()
	require.Nil(t, err)
	appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
	err = appConnCon.Start()
	require.Nil(t, err)

	// cacheMap records every tx already submitted through CheckTx.
	cacheMap := make(map[string]struct{})

	// encodeTx returns i as an 8-byte big-endian transaction.
	encodeTx := func(i int) []byte {
		txBytes := make([]byte, 8)
		binary.BigEndian.PutUint64(txBytes, uint64(i))
		return txBytes
	}

	deliverTxsRange := func(start, end int) {
		// Submit txs [start, end) to the mempool.
		for i := start; i < end; i++ {
			txBytes := encodeTx(i)

			// The first submission of a tx must succeed; a tx seen before
			// must be rejected.
			err := mempool.CheckTx(txBytes, nil)
			_, cached := cacheMap[string(txBytes)]
			if cached {
				require.NotNil(t, err, "expected error for cached tx")
			} else {
				require.Nil(t, err, "expected no err for uncached tx")
			}
			cacheMap[string(txBytes)] = struct{}{}

			// Duplicates are cached and should return error
			err = mempool.CheckTx(txBytes, nil)
			require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
		}
	}

	reapCheck := func(exp int) {
		txs := mempool.Reap(-1)
		require.Equal(t, exp, len(txs), cmn.Fmt("Expected to reap %v txs but got %v", exp, len(txs)))
	}

	updateRange := func(start, end int) {
		txs := make([]types.Tx, 0)
		for i := start; i < end; i++ {
			txs = append(txs, encodeTx(i))
		}
		if err := mempool.Update(0, txs); err != nil {
			t.Error(err)
		}
	}

	commitRange := func(start, end int) {
		// Deliver txs [start, end) on the consensus connection, then commit.
		for i := start; i < end; i++ {
			res, err := appConnCon.DeliverTxSync(encodeTx(i))
			if err != nil {
				t.Errorf("Client error committing tx: %v", err)
			}
			if res.IsErr() {
				t.Errorf("Error committing tx. Code:%v result:%X log:%v",
					res.Code, res.Data, res.Log)
			}
		}
		res, err := appConnCon.CommitSync()
		if err != nil {
			t.Errorf("Client error committing: %v", err)
		}
		if len(res.Data) != 8 {
			t.Errorf("Error committing. Hash:%X", res.Data)
		}
	}

	//----------------------------------------

	// Deliver some txs.
	deliverTxsRange(0, 100)

	// Reap the txs.
	reapCheck(100)

	// Reap again. We should get the same amount
	reapCheck(100)

	// Deliver 0 to 999, we should reap 900 new txs
	// because 100 were already counted.
	deliverTxsRange(0, 1000)

	// Reap the txs.
	reapCheck(1000)

	// Reap again. We should get the same amount
	reapCheck(1000)

	// Commit from the consensus AppConn
	commitRange(0, 500)
	updateRange(0, 500)

	// We should have 500 left.
	reapCheck(500)

	// Deliver 100 invalid txs and 100 valid txs
	deliverTxsRange(900, 1100)

	// We should have 600 now.
	reapCheck(600)
}

// TestMempoolCloseWAL verifies that InitWAL creates a WAL file, that CheckTx
// appends the tx to it, and that after CloseWAL further CheckTx calls leave
// the file unchanged.
func TestMempoolCloseWAL(t *testing.T) {
	// 1. Create the temporary directory for mempool and WAL testing.
	rootDir, err := ioutil.TempDir("", "mempool-test")
	require.Nil(t, err, "expecting successful tmpdir creation")
	defer os.RemoveAll(rootDir)

	// 2. Ensure that it doesn't contain any elements -- Sanity check
	m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 0, len(m1), "no matches yet")

	// 3. Create the mempool
	wcfg := cfg.DefaultMempoolConfig()
	wcfg.RootDir = rootDir
	app := kvstore.NewKVStoreApplication()
	cc := proxy.NewLocalClientCreator(app)
	appConnMem, err := cc.NewABCIClient()
	require.Nil(t, err, "expecting successful ABCI client creation")
	mempool := NewMempool(wcfg, appConnMem, 10)
	mempool.InitWAL()

	// 4. Ensure that the directory contains the WAL file
	m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 1, len(m2), "expecting the wal match in")

	// 5. Write some contents to the WAL.
	// The CheckTx result is logged rather than asserted: this test only
	// inspects the WAL file contents.
	if err := mempool.CheckTx(types.Tx([]byte("foo")), nil); err != nil {
		t.Logf("CheckTx(foo) returned: %v", err)
	}
	walFilepath := mempool.wal.Path
	sum1 := checksumFile(walFilepath, t)

	// 6. Sanity check to ensure that the written TX matches the expectation.
	require.Equal(t, checksumIt([]byte("foo\n")), sum1, "foo with a newline should be written")

	// 7. Invoke CloseWAL() and ensure it discards the
	// WAL thus any other write won't go through.
	require.True(t, mempool.CloseWAL(), "CloseWAL should CloseWAL")
	if err := mempool.CheckTx(types.Tx([]byte("bar")), nil); err != nil {
		t.Logf("CheckTx(bar) returned: %v", err)
	}
	sum2 := checksumFile(walFilepath, t)
	require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")

	// 8. Second CloseWAL should do nothing
	require.False(t, mempool.CloseWAL(), "second CloseWAL should be a no-op and return false")

	// 9. Sanity check to ensure that the WAL file still exists
	m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
	require.Nil(t, err, "successful globbing expected")
	require.Equal(t, 1, len(m3), "expecting the wal match in")
}

// checksumIt returns the lowercase hex-encoded MD5 digest of data.
func checksumIt(data []byte) string {
	return fmt.Sprintf("%x", md5.Sum(data))
}

// checksumFile reads the file at path p and returns the hex-encoded MD5
// digest of its contents; a read failure aborts the test.
func checksumFile(p string, t *testing.T) string {
	t.Helper()
	data, err := ioutil.ReadFile(p)
	require.Nil(t, err, "expecting successful read of %q", p)
	return checksumIt(data)
}