|
package mempool
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io/ioutil"
|
|
mrand "math/rand"
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
amino "github.com/tendermint/go-amino"
|
|
|
|
"github.com/tendermint/tendermint/abci/example/counter"
|
|
"github.com/tendermint/tendermint/abci/example/kvstore"
|
|
abciserver "github.com/tendermint/tendermint/abci/server"
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
cfg "github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
tmrand "github.com/tendermint/tendermint/libs/rand"
|
|
"github.com/tendermint/tendermint/libs/service"
|
|
"github.com/tendermint/tendermint/proxy"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
// A cleanupFunc cleans up any config / test files created for a particular
|
|
// test.
|
|
type cleanupFunc func()
|
|
|
|
func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) {
|
|
return newMempoolWithAppAndConfig(cc, cfg.ResetTestRoot("mempool_test"))
|
|
}
|
|
|
|
func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *cfg.Config) (*CListMempool, cleanupFunc) {
|
|
appConnMem, _ := cc.NewABCIClient()
|
|
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
|
|
err := appConnMem.Start()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
mempool := NewCListMempool(config.Mempool, appConnMem, 0)
|
|
mempool.SetLogger(log.TestingLogger())
|
|
return mempool, func() { os.RemoveAll(config.RootDir) }
|
|
}
|
|
|
|
func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
|
|
timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
|
|
select {
|
|
case <-ch:
|
|
t.Fatal("Expected not to fire")
|
|
case <-timer.C:
|
|
}
|
|
}
|
|
|
|
func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) {
|
|
timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond)
|
|
select {
|
|
case <-ch:
|
|
case <-timer.C:
|
|
t.Fatal("Expected to fire")
|
|
}
|
|
}
|
|
|
|
func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs {
|
|
txs := make(types.Txs, count)
|
|
txInfo := TxInfo{SenderID: peerID}
|
|
for i := 0; i < count; i++ {
|
|
txBytes := make([]byte, 20)
|
|
txs[i] = txBytes
|
|
_, err := rand.Read(txBytes)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
if err := mempool.CheckTx(txBytes, nil, txInfo); err != nil {
|
|
// Skip invalid txs.
|
|
// TestMempoolFilters will fail otherwise. It asserts a number of txs
|
|
// returned.
|
|
if IsPreCheckError(err) {
|
|
continue
|
|
}
|
|
t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i)
|
|
}
|
|
}
|
|
return txs
|
|
}
|
|
|
|
func TestReapMaxBytesMaxGas(t *testing.T) {
|
|
app := kvstore.NewApplication()
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
mempool, cleanup := newMempoolWithApp(cc)
|
|
defer cleanup()
|
|
|
|
// Ensure gas calculation behaves as expected
|
|
checkTxs(t, mempool, 1, UnknownPeerID)
|
|
tx0 := mempool.TxsFront().Value.(*mempoolTx)
|
|
// assert that kv store has gas wanted = 1.
|
|
require.Equal(t, app.CheckTx(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1")
|
|
require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly")
|
|
// ensure each tx is 20 bytes long
|
|
require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes")
|
|
mempool.Flush()
|
|
|
|
// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
|
|
// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
|
|
tests := []struct {
|
|
numTxsToCreate int
|
|
maxBytes int64
|
|
maxGas int64
|
|
expectedNumTxs int
|
|
}{
|
|
{20, -1, -1, 20},
|
|
{20, -1, 0, 0},
|
|
{20, -1, 10, 10},
|
|
{20, -1, 30, 20},
|
|
{20, 0, -1, 0},
|
|
{20, 0, 10, 0},
|
|
{20, 10, 10, 0},
|
|
{20, 22, 10, 1},
|
|
{20, 220, -1, 10},
|
|
{20, 220, 5, 5},
|
|
{20, 220, 10, 10},
|
|
{20, 220, 15, 10},
|
|
{20, 20000, -1, 20},
|
|
{20, 20000, 5, 5},
|
|
{20, 20000, 30, 20},
|
|
}
|
|
for tcIndex, tt := range tests {
|
|
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
|
|
got := mempool.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas)
|
|
assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d",
|
|
len(got), tt.expectedNumTxs, tcIndex)
|
|
mempool.Flush()
|
|
}
|
|
}
|
|
|
|
func TestMempoolFilters(t *testing.T) {
|
|
app := kvstore.NewApplication()
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
mempool, cleanup := newMempoolWithApp(cc)
|
|
defer cleanup()
|
|
emptyTxArr := []types.Tx{[]byte{}}
|
|
|
|
nopPreFilter := func(tx types.Tx) error { return nil }
|
|
nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }
|
|
|
|
// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
|
|
// each tx has 20 bytes + amino overhead = 21 bytes, 1 gas
|
|
tests := []struct {
|
|
numTxsToCreate int
|
|
preFilter PreCheckFunc
|
|
postFilter PostCheckFunc
|
|
expectedNumTxs int
|
|
}{
|
|
{10, nopPreFilter, nopPostFilter, 10},
|
|
{10, PreCheckAminoMaxBytes(10), nopPostFilter, 0},
|
|
{10, PreCheckAminoMaxBytes(20), nopPostFilter, 0},
|
|
{10, PreCheckAminoMaxBytes(22), nopPostFilter, 10},
|
|
{10, nopPreFilter, PostCheckMaxGas(-1), 10},
|
|
{10, nopPreFilter, PostCheckMaxGas(0), 0},
|
|
{10, nopPreFilter, PostCheckMaxGas(1), 10},
|
|
{10, nopPreFilter, PostCheckMaxGas(3000), 10},
|
|
{10, PreCheckAminoMaxBytes(10), PostCheckMaxGas(20), 0},
|
|
{10, PreCheckAminoMaxBytes(30), PostCheckMaxGas(20), 10},
|
|
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(1), 10},
|
|
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0},
|
|
}
|
|
for tcIndex, tt := range tests {
|
|
mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
|
|
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
|
|
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
|
|
mempool.Flush()
|
|
}
|
|
}
|
|
|
|
func TestMempoolUpdate(t *testing.T) {
|
|
app := kvstore.NewApplication()
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
mempool, cleanup := newMempoolWithApp(cc)
|
|
defer cleanup()
|
|
|
|
// 1. Adds valid txs to the cache
|
|
{
|
|
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
|
|
err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
|
|
if assert.Error(t, err) {
|
|
assert.Equal(t, ErrTxInCache, err)
|
|
}
|
|
}
|
|
|
|
// 2. Removes valid txs from the mempool
|
|
{
|
|
err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{})
|
|
require.NoError(t, err)
|
|
mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
|
|
assert.Zero(t, mempool.Size())
|
|
}
|
|
|
|
// 3. Removes invalid transactions from the cache and the mempool (if present)
|
|
{
|
|
err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
|
|
require.NoError(t, err)
|
|
mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil)
|
|
assert.Zero(t, mempool.Size())
|
|
|
|
err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
|
|
assert.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
func TestTxsAvailable(t *testing.T) {
|
|
app := kvstore.NewApplication()
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
mempool, cleanup := newMempoolWithApp(cc)
|
|
defer cleanup()
|
|
mempool.EnableTxsAvailable()
|
|
|
|
timeoutMS := 500
|
|
|
|
// with no txs, it shouldnt fire
|
|
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
|
|
// send a bunch of txs, it should only fire once
|
|
txs := checkTxs(t, mempool, 100, UnknownPeerID)
|
|
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
|
|
// call update with half the txs.
|
|
// it should fire once now for the new height
|
|
// since there are still txs left
|
|
committedTxs, txs := txs[:50], txs[50:]
|
|
if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
|
|
t.Error(err)
|
|
}
|
|
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
|
|
// send a bunch more txs. we already fired for this height so it shouldnt fire again
|
|
moreTxs := checkTxs(t, mempool, 50, UnknownPeerID)
|
|
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
|
|
// now call update with all the txs. it should not fire as there are no txs left
|
|
committedTxs = append(txs, moreTxs...) //nolint: gocritic
|
|
if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
|
|
t.Error(err)
|
|
}
|
|
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
|
|
// send a bunch more txs, it should only fire once
|
|
checkTxs(t, mempool, 100, UnknownPeerID)
|
|
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
|
|
}
|
|
|
|
func TestSerialReap(t *testing.T) {
|
|
app := counter.NewApplication(true)
|
|
app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"})
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
|
|
mempool, cleanup := newMempoolWithApp(cc)
|
|
defer cleanup()
|
|
|
|
appConnCon, _ := cc.NewABCIClient()
|
|
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
|
|
err := appConnCon.Start()
|
|
require.Nil(t, err)
|
|
|
|
cacheMap := make(map[string]struct{})
|
|
deliverTxsRange := func(start, end int) {
|
|
// Deliver some txs.
|
|
for i := start; i < end; i++ {
|
|
|
|
// This will succeed
|
|
txBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(txBytes, uint64(i))
|
|
err := mempool.CheckTx(txBytes, nil, TxInfo{})
|
|
_, cached := cacheMap[string(txBytes)]
|
|
if cached {
|
|
require.NotNil(t, err, "expected error for cached tx")
|
|
} else {
|
|
require.Nil(t, err, "expected no err for uncached tx")
|
|
}
|
|
cacheMap[string(txBytes)] = struct{}{}
|
|
|
|
// Duplicates are cached and should return error
|
|
err = mempool.CheckTx(txBytes, nil, TxInfo{})
|
|
require.NotNil(t, err, "Expected error after CheckTx on duplicated tx")
|
|
}
|
|
}
|
|
|
|
reapCheck := func(exp int) {
|
|
txs := mempool.ReapMaxBytesMaxGas(-1, -1)
|
|
require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs)))
|
|
}
|
|
|
|
updateRange := func(start, end int) {
|
|
txs := make([]types.Tx, 0)
|
|
for i := start; i < end; i++ {
|
|
txBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(txBytes, uint64(i))
|
|
txs = append(txs, txBytes)
|
|
}
|
|
if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
commitRange := func(start, end int) {
|
|
// Deliver some txs.
|
|
for i := start; i < end; i++ {
|
|
txBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(txBytes, uint64(i))
|
|
res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes})
|
|
if err != nil {
|
|
t.Errorf("client error committing tx: %v", err)
|
|
}
|
|
if res.IsErr() {
|
|
t.Errorf("error committing tx. Code:%v result:%X log:%v",
|
|
res.Code, res.Data, res.Log)
|
|
}
|
|
}
|
|
res, err := appConnCon.CommitSync()
|
|
if err != nil {
|
|
t.Errorf("client error committing: %v", err)
|
|
}
|
|
if len(res.Data) != 8 {
|
|
t.Errorf("error committing. Hash:%X", res.Data)
|
|
}
|
|
}
|
|
|
|
//----------------------------------------
|
|
|
|
// Deliver some txs.
|
|
deliverTxsRange(0, 100)
|
|
|
|
// Reap the txs.
|
|
reapCheck(100)
|
|
|
|
// Reap again. We should get the same amount
|
|
reapCheck(100)
|
|
|
|
// Deliver 0 to 999, we should reap 900 new txs
|
|
// because 100 were already counted.
|
|
deliverTxsRange(0, 1000)
|
|
|
|
// Reap the txs.
|
|
reapCheck(1000)
|
|
|
|
// Reap again. We should get the same amount
|
|
reapCheck(1000)
|
|
|
|
// Commit from the conensus AppConn
|
|
commitRange(0, 500)
|
|
updateRange(0, 500)
|
|
|
|
// We should have 500 left.
|
|
reapCheck(500)
|
|
|
|
// Deliver 100 invalid txs and 100 valid txs
|
|
deliverTxsRange(900, 1100)
|
|
|
|
// We should have 600 now.
|
|
reapCheck(600)
|
|
}
|
|
|
|
func TestMempoolCloseWAL(t *testing.T) {
|
|
// 1. Create the temporary directory for mempool and WAL testing.
|
|
rootDir, err := ioutil.TempDir("", "mempool-test")
|
|
require.Nil(t, err, "expecting successful tmpdir creation")
|
|
|
|
// 2. Ensure that it doesn't contain any elements -- Sanity check
|
|
m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
|
|
require.Nil(t, err, "successful globbing expected")
|
|
require.Equal(t, 0, len(m1), "no matches yet")
|
|
|
|
// 3. Create the mempool
|
|
wcfg := cfg.DefaultConfig()
|
|
wcfg.Mempool.RootDir = rootDir
|
|
app := kvstore.NewApplication()
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
mempool, cleanup := newMempoolWithAppAndConfig(cc, wcfg)
|
|
defer cleanup()
|
|
mempool.height = 10
|
|
mempool.InitWAL()
|
|
|
|
// 4. Ensure that the directory contains the WAL file
|
|
m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
|
|
require.Nil(t, err, "successful globbing expected")
|
|
require.Equal(t, 1, len(m2), "expecting the wal match in")
|
|
|
|
// 5. Write some contents to the WAL
|
|
mempool.CheckTx(types.Tx([]byte("foo")), nil, TxInfo{})
|
|
walFilepath := mempool.wal.Path
|
|
sum1 := checksumFile(walFilepath, t)
|
|
|
|
// 6. Sanity check to ensure that the written TX matches the expectation.
|
|
require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")
|
|
|
|
// 7. Invoke CloseWAL() and ensure it discards the
|
|
// WAL thus any other write won't go through.
|
|
mempool.CloseWAL()
|
|
mempool.CheckTx(types.Tx([]byte("bar")), nil, TxInfo{})
|
|
sum2 := checksumFile(walFilepath, t)
|
|
require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")
|
|
|
|
// 8. Sanity check to ensure that the WAL file still exists
|
|
m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
|
|
require.Nil(t, err, "successful globbing expected")
|
|
require.Equal(t, 1, len(m3), "expecting the wal match in")
|
|
}
|
|
|
|
// Size of the amino encoded TxMessage is the length of the
|
|
// encoded byte array, plus 1 for the struct field, plus 4
|
|
// for the amino prefix.
|
|
func txMessageSize(tx types.Tx) int {
|
|
return amino.ByteSliceSize(tx) + 1 + 4
|
|
}
|
|
|
|
func TestMempoolMaxMsgSize(t *testing.T) {
|
|
app := kvstore.NewApplication()
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
mempl, cleanup := newMempoolWithApp(cc)
|
|
defer cleanup()
|
|
|
|
maxTxSize := mempl.config.MaxTxBytes
|
|
maxMsgSize := calcMaxMsgSize(maxTxSize)
|
|
|
|
testCases := []struct {
|
|
len int
|
|
err bool
|
|
}{
|
|
// check small txs. no error
|
|
{10, false},
|
|
{1000, false},
|
|
{1000000, false},
|
|
|
|
// check around maxTxSize
|
|
// changes from no error to error
|
|
{maxTxSize - 2, false},
|
|
{maxTxSize - 1, false},
|
|
{maxTxSize, false},
|
|
{maxTxSize + 1, true},
|
|
{maxTxSize + 2, true},
|
|
|
|
// check around maxMsgSize. all error
|
|
{maxMsgSize - 1, true},
|
|
{maxMsgSize, true},
|
|
{maxMsgSize + 1, true},
|
|
}
|
|
|
|
for i, testCase := range testCases {
|
|
caseString := fmt.Sprintf("case %d, len %d", i, testCase.len)
|
|
|
|
tx := tmrand.Bytes(testCase.len)
|
|
err := mempl.CheckTx(tx, nil, TxInfo{})
|
|
msg := &TxMessage{tx}
|
|
encoded := cdc.MustMarshalBinaryBare(msg)
|
|
require.Equal(t, len(encoded), txMessageSize(tx), caseString)
|
|
if !testCase.err {
|
|
require.True(t, len(encoded) <= maxMsgSize, caseString)
|
|
require.NoError(t, err, caseString)
|
|
} else {
|
|
require.True(t, len(encoded) > maxMsgSize, caseString)
|
|
require.Equal(t, err, ErrTxTooLarge{maxTxSize, testCase.len}, caseString)
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func TestMempoolTxsBytes(t *testing.T) {
|
|
app := kvstore.NewApplication()
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
config := cfg.ResetTestRoot("mempool_test")
|
|
config.Mempool.MaxTxsBytes = 10
|
|
mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
|
|
defer cleanup()
|
|
|
|
// 1. zero by default
|
|
assert.EqualValues(t, 0, mempool.TxsBytes())
|
|
|
|
// 2. len(tx) after CheckTx
|
|
err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
|
|
require.NoError(t, err)
|
|
assert.EqualValues(t, 1, mempool.TxsBytes())
|
|
|
|
// 3. zero again after tx is removed by Update
|
|
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
|
|
assert.EqualValues(t, 0, mempool.TxsBytes())
|
|
|
|
// 4. zero after Flush
|
|
err = mempool.CheckTx([]byte{0x02, 0x03}, nil, TxInfo{})
|
|
require.NoError(t, err)
|
|
assert.EqualValues(t, 2, mempool.TxsBytes())
|
|
|
|
mempool.Flush()
|
|
assert.EqualValues(t, 0, mempool.TxsBytes())
|
|
|
|
// 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached.
|
|
err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil, TxInfo{})
|
|
require.NoError(t, err)
|
|
err = mempool.CheckTx([]byte{0x05}, nil, TxInfo{})
|
|
if assert.Error(t, err) {
|
|
assert.IsType(t, ErrMempoolIsFull{}, err)
|
|
}
|
|
|
|
// 6. zero after tx is rechecked and removed due to not being valid anymore
|
|
app2 := counter.NewApplication(true)
|
|
cc = proxy.NewLocalClientCreator(app2)
|
|
mempool, cleanup = newMempoolWithApp(cc)
|
|
defer cleanup()
|
|
|
|
txBytes := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(txBytes, uint64(0))
|
|
|
|
err = mempool.CheckTx(txBytes, nil, TxInfo{})
|
|
require.NoError(t, err)
|
|
assert.EqualValues(t, 8, mempool.TxsBytes())
|
|
|
|
appConnCon, _ := cc.NewABCIClient()
|
|
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
|
|
err = appConnCon.Start()
|
|
require.Nil(t, err)
|
|
defer appConnCon.Stop()
|
|
res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes})
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, 0, res.Code)
|
|
res2, err := appConnCon.CommitSync()
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, res2.Data)
|
|
|
|
// Pretend like we committed nothing so txBytes gets rechecked and removed.
|
|
mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil)
|
|
assert.EqualValues(t, 0, mempool.TxsBytes())
|
|
}
|
|
|
|
// This will non-deterministically catch some concurrency failures like
|
|
// https://github.com/tendermint/tendermint/issues/3509
|
|
// TODO: all of the tests should probably also run using the remote proxy app
|
|
// since otherwise we're not actually testing the concurrency of the mempool here!
|
|
func TestMempoolRemoteAppConcurrency(t *testing.T) {
|
|
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
|
|
app := kvstore.NewApplication()
|
|
cc, server := newRemoteApp(t, sockPath, app)
|
|
defer server.Stop()
|
|
config := cfg.ResetTestRoot("mempool_test")
|
|
mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
|
|
defer cleanup()
|
|
|
|
// generate small number of txs
|
|
nTxs := 10
|
|
txLen := 200
|
|
txs := make([]types.Tx, nTxs)
|
|
for i := 0; i < nTxs; i++ {
|
|
txs[i] = tmrand.Bytes(txLen)
|
|
}
|
|
|
|
// simulate a group of peers sending them over and over
|
|
N := config.Mempool.Size
|
|
maxPeers := 5
|
|
for i := 0; i < N; i++ {
|
|
peerID := mrand.Intn(maxPeers)
|
|
txNum := mrand.Intn(nTxs)
|
|
tx := txs[txNum]
|
|
|
|
// this will err with ErrTxInCache many times ...
|
|
mempool.CheckTx(tx, nil, TxInfo{SenderID: uint16(peerID)})
|
|
}
|
|
err := mempool.FlushAppConn()
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// caller must close server
|
|
func newRemoteApp(
|
|
t *testing.T,
|
|
addr string,
|
|
app abci.Application,
|
|
) (
|
|
clientCreator proxy.ClientCreator,
|
|
server service.Service,
|
|
) {
|
|
clientCreator = proxy.NewRemoteClientCreator(addr, "socket", true)
|
|
|
|
// Start server
|
|
server = abciserver.NewSocketServer(addr, app)
|
|
server.SetLogger(log.TestingLogger().With("module", "abci-server"))
|
|
if err := server.Start(); err != nil {
|
|
t.Fatalf("Error starting socket server: %v", err.Error())
|
|
}
|
|
return clientCreator, server
|
|
}
|
|
func checksumIt(data []byte) string {
|
|
h := sha256.New()
|
|
h.Write(data)
|
|
return fmt.Sprintf("%x", h.Sum(nil))
|
|
}
|
|
|
|
func checksumFile(p string, t *testing.T) string {
|
|
data, err := ioutil.ReadFile(p)
|
|
require.Nil(t, err, "expecting successful read of %q", p)
|
|
return checksumIt(data)
|
|
}
|
|
|
|
func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx {
|
|
responses := make([]*abci.ResponseDeliverTx, 0, n)
|
|
for i := 0; i < n; i++ {
|
|
responses = append(responses, &abci.ResponseDeliverTx{Code: code})
|
|
}
|
|
return responses
|
|
}
|