From 09fe44a6b3b7b98d268a2a3fd11ec419a7b605f2 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 11 Apr 2016 18:08:28 -0400 Subject: [PATCH] test for mempool commit/update concurrency bug --- consensus/common_test.go | 9 ++-- consensus/mempool_test.go | 110 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 4 deletions(-) create mode 100644 consensus/mempool_test.go diff --git a/consensus/common_test.go b/consensus/common_test.go index fad0969bf..c942ac978 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -16,6 +16,7 @@ import ( sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" tmspcli "github.com/tendermint/tmsp/client" + tmsp "github.com/tendermint/tmsp/types" "github.com/tendermint/tmsp/example/counter" ) @@ -302,17 +303,17 @@ func fixedConsensusState() *ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) privValidatorFile := config.GetString("priv_validator_file") privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - return newConsensusState(state, privValidator) + return newConsensusState(state, privValidator, counter.NewCounterApplication(true)) } -func newConsensusState(state *sm.State, pv *types.PrivValidator) *ConsensusState { +func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { // Get BlockStore blockDB := dbm.NewMemDB() blockStore := bc.NewBlockStore(blockDB) // one for mempool, one for consensus - mtx, app := new(sync.Mutex), counter.NewCounterApplication(false) + mtx := new(sync.Mutex) proxyAppConnMem := tmspcli.NewLocalClient(mtx, app) proxyAppConnCon := tmspcli.NewLocalClient(mtx, app) @@ -335,7 +336,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { vss := make([]*validatorStub, nValidators) - cs := newConsensusState(state, privVals[0]) + cs := newConsensusState(state, privVals[0], counter.NewCounterApplication(true)) for i := 0; i < nValidators; i++ { vss[i] = NewValidatorStub(privVals[i]) diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go new file mode 100644 index 000000000..0bf789dfd --- /dev/null +++ b/consensus/mempool_test.go @@ -0,0 +1,110 @@ +package consensus + +import ( + "encoding/binary" + // "math/rand" + "testing" + "time" + + "github.com/tendermint/tendermint/config/tendermint_test" + "github.com/tendermint/tendermint/types" + tmsp "github.com/tendermint/tmsp/types" + + . "github.com/tendermint/go-common" +) + +func init() { + tendermint_test.ResetConfig("consensus_mempool_test") +} + +func TestTxConcurrentWithCommit(t *testing.T) { + + state, privVals := randGenesisState(1, false, 10) + cs := newConsensusState(state, privVals[0], NewCounterApplication()) + height, round := cs.Height, cs.Round + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + + appendTxsRange := func(start, end int) { + // Append some txs. + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + err := cs.mempool.CheckTx(txBytes, nil) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } + // time.Sleep(time.Microsecond * time.Duration(rand.Int63n(3000))) + } + } + + NTxs := 10000 + go appendTxsRange(0, NTxs) + + startTestRound(cs, height, round) + ticker := time.NewTicker(time.Second * 5) + for nTxs := 0; nTxs < NTxs; { + select { + case b := <-newBlockCh: + nTxs += b.(types.EventDataNewBlock).Block.Header.NumTxs + case <-ticker.C: + t.Fatal("Timed out waiting to commit blocks with transactions") + } + } +} + +// CounterApplication that maintains a mempool state and resets it upon commit +type CounterApplication struct { + txCount int + mempoolTxCount int +} + +func NewCounterApplication() *CounterApplication { + return &CounterApplication{} +} + +func (app *CounterApplication) Info() string { + return Fmt("txs:%v", app.txCount) +} + +func (app *CounterApplication) SetOption(key string, value string) (log string) { + return "" +} + +func (app *CounterApplication) AppendTx(tx []byte) tmsp.Result { + return runTx(tx, &app.txCount) +} + +func (app *CounterApplication) CheckTx(tx []byte) tmsp.Result { + return runTx(tx, &app.mempoolTxCount) +} + +func runTx(tx []byte, countPtr *int) tmsp.Result { + count := *countPtr + tx8 := make([]byte, 8) + copy(tx8[len(tx8)-len(tx):], tx) + txValue := binary.BigEndian.Uint64(tx8) + if txValue != uint64(count) { + return tmsp.Result{ + Code: tmsp.CodeType_BadNonce, + Data: nil, + Log: Fmt("Invalid nonce. Expected %v, got %v", count, txValue), + } + } + *countPtr += 1 + return tmsp.OK +} + +func (app *CounterApplication) Commit() tmsp.Result { + app.mempoolTxCount = app.txCount + if app.txCount == 0 { + return tmsp.OK + } else { + hash := make([]byte, 8) + binary.BigEndian.PutUint64(hash, uint64(app.txCount)) + return tmsp.NewResultOK(hash, "") + } +} + +func (app *CounterApplication) Query(query []byte) tmsp.Result { + return tmsp.NewResultOK(nil, Fmt("Query is not supported")) +}