diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3b6dffb3a..d03d98112 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -48,3 +48,7 @@ * `Switch#DialPeerWithAddress` now only takes an address - [consensus] \#3067 getBeginBlockValidatorInfo loads validators from stateDB instead of state (@james-ray) - [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie) +- [mempool] \#3322 Remove only valid (Code==0) txs on Update + * `Mempool#Update` and `BlockExecutor#Commit` now accept + `[]*abci.ResponseDeliverTx` - list of `DeliverTx` responses, which should + match `block.Txs` diff --git a/mempool/cache_test.go b/mempool/cache_test.go index ea9f63fd6..539bf1197 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/kvstore" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -66,7 +67,7 @@ func TestCacheAfterUpdate(t *testing.T) { tx := types.Tx{byte(v)} updateTxs = append(updateTxs, tx) } - mempool.Update(int64(tcIndex), updateTxs, nil, nil) + mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil) for _, v := range tc.reAddIndices { tx := types.Tx{byte(v)} diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 2a4bacbbb..c6ff475aa 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -519,6 +519,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { func (mem *CListMempool) Update( height int64, txs types.Txs, + deliverTxResponses []*abci.ResponseDeliverTx, preCheck PreCheckFunc, postCheck PostCheckFunc, ) error { @@ -533,20 +534,37 @@ func (mem *CListMempool) Update( mem.postCheck = postCheck } - // Add committed transactions to cache (if missing). - for _, tx := range txs { - _ = mem.cache.Push(tx) - } + for i, tx := range txs { + if deliverTxResponses[i].Code == abci.CodeTypeOK { + // Add valid committed tx to the cache (if missing). + _ = mem.cache.Push(tx) + + // Remove valid committed tx from the mempool. + if e, ok := mem.txsMap.Load(txKey(tx)); ok { + mem.removeTx(tx, e.(*clist.CElement), false) + } + } else { + // Allow invalid transactions to be resubmitted. + mem.cache.Remove(tx) - // Remove committed transactions. - txsLeft := mem.removeTxs(txs) + // Don't remove invalid tx from the mempool. + // Otherwise evil proposer can drop valid txs. + // Example: + // 100 -> 101 -> 102 + // Block, proposed by evil proposer: + // 101 -> 102 + // Mempool (if you remove txs): + // 100 + // https://github.com/tendermint/tendermint/issues/3322. + } + } // Either recheck non-committed txs to see if they became invalid // or just notify there're some txs left. - if len(txsLeft) > 0 { + if mem.Size() > 0 { if mem.config.Recheck { - mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height) - mem.recheckTxs(txsLeft) + mem.logger.Info("Recheck txs", "numtxs", mem.Size(), "height", height) + mem.recheckTxs() // At this point, mem.txs are being rechecked. // mem.recheckCursor re-scans mem.txs and possibly removes some txs. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. @@ -561,42 +579,22 @@ func (mem *CListMempool) Update( return nil } -func (mem *CListMempool) removeTxs(txs types.Txs) []types.Tx { - // Build a map for faster lookups. - txsMap := make(map[string]struct{}, len(txs)) - for _, tx := range txs { - txsMap[string(tx)] = struct{}{} - } - - txsLeft := make([]types.Tx, 0, mem.txs.Len()) - for e := mem.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - // Remove the tx if it's already in a block. - if _, ok := txsMap[string(memTx.tx)]; ok { - // NOTE: we don't remove committed txs from the cache. - mem.removeTx(memTx.tx, e, false) - - continue - } - txsLeft = append(txsLeft, memTx.tx) +func (mem *CListMempool) recheckTxs() { + if mem.Size() == 0 { + panic("recheckTxs is called, but the mempool is empty") } - return txsLeft -} -// NOTE: pass in txs because mem.txs can mutate concurrently. -func (mem *CListMempool) recheckTxs(txs []types.Tx) { - if len(txs) == 0 { - return - } atomic.StoreInt32(&mem.rechecking, 1) mem.recheckCursor = mem.txs.Front() mem.recheckEnd = mem.txs.Back() // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. - for _, tx := range txs { - mem.proxyAppConn.CheckTxAsync(tx) + for e := mem.txs.Front(); e != nil; e = e.Next() { + memTx := e.Value.(*mempoolTx) + mem.proxyAppConn.CheckTxAsync(memTx.tx) } + mem.proxyAppConn.FlushAsync() } diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 69d4eb68c..3eb4990b6 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -170,22 +170,42 @@ func TestMempoolFilters(t *testing.T) { {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0}, } for tcIndex, tt := range tests { - mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter) + mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) mempool.Flush() } } -func TestMempoolUpdateAddsTxsToCache(t *testing.T) { +func TestMempoolUpdate(t *testing.T) { app := kvstore.NewKVStoreApplication() cc := proxy.NewLocalClientCreator(app) mempool, cleanup := newMempoolWithApp(cc) defer cleanup() - mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil) - err := mempool.CheckTx([]byte{0x01}, nil) - if assert.Error(t, err) { - assert.Equal(t, ErrTxInCache, err) + + // 1. Adds valid txs to the cache + { + mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + err := mempool.CheckTx([]byte{0x01}, nil) + if assert.Error(t, err) { + assert.Equal(t, ErrTxInCache, err) + } + } + + // 2. Removes valid txs from the mempool + { + err := mempool.CheckTx([]byte{0x02}, nil) + require.NoError(t, err) + mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + assert.Zero(t, mempool.Size()) + } + + // 3. Removes invalid transactions from the cache, but leaves them in the mempool (if present) + { + err := mempool.CheckTx([]byte{0x03}, nil) + require.NoError(t, err) + mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil) + assert.Equal(t, 1, mempool.Size()) } } @@ -210,7 +230,7 @@ func TestTxsAvailable(t *testing.T) { // it should fire once now for the new height // since there are still txs left committedTxs, txs := txs[:50], txs[50:] - if err := mempool.Update(1, committedTxs, nil, nil); err != nil { + if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { t.Error(err) } ensureFire(t, mempool.TxsAvailable(), timeoutMS) @@ -222,7 +242,7 @@ func TestTxsAvailable(t *testing.T) { // now call update with all the txs. it should not fire as there are no txs left committedTxs = append(txs, moreTxs...) - if err := mempool.Update(2, committedTxs, nil, nil); err != nil { + if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { t.Error(err) } ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) @@ -281,7 +301,7 @@ func TestSerialReap(t *testing.T) { binary.BigEndian.PutUint64(txBytes, uint64(i)) txs = append(txs, txBytes) } - if err := mempool.Update(0, txs, nil, nil); err != nil { + if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { t.Error(err) } } @@ -462,7 +482,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 1, mempool.TxsBytes()) // 3. zero again after tx is removed by Update - mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil) + mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) assert.EqualValues(t, 0, mempool.TxsBytes()) // 4. zero after Flush @@ -507,7 +527,7 @@ func TestMempoolTxsBytes(t *testing.T) { require.NotEmpty(t, res2.Data) // Pretend like we committed nothing so txBytes gets rechecked and removed. - mempool.Update(1, []types.Tx{}, nil, nil) + mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil) assert.EqualValues(t, 0, mempool.TxsBytes()) } @@ -570,3 +590,11 @@ func checksumFile(p string, t *testing.T) string { require.Nil(t, err, "expecting successful read of %q", p) return checksumIt(data) } + +func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx { + responses := make([]*abci.ResponseDeliverTx, 0, n) + for i := 0; i < n; i++ { + responses = append(responses, &abci.ResponseDeliverTx{Code: code}) + } + return responses +} diff --git a/mempool/mempool.go b/mempool/mempool.go index 1c1873d47..0995c734f 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -43,7 +43,7 @@ type Mempool interface { // Update informs the mempool that the given txs were committed and can be discarded. // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller - Update(blockHeight int64, blockTxs types.Txs, newPreFn PreCheckFunc, newPostFn PostCheckFunc) error + Update(blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, newPreFn PreCheckFunc, newPostFn PostCheckFunc) error // FlushAppConn flushes the mempool connection to ensure async reqResCb calls are // done. E.g. from CheckTx. diff --git a/mock/mempool.go b/mock/mempool.go index 66ad68d30..cebe156ba 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -27,6 +27,7 @@ func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (Mempool) Update( _ int64, _ types.Txs, + _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, _ mempl.PostCheckFunc, ) error { diff --git a/state/execution.go b/state/execution.go index d872145ef..7e49a9ad8 100644 --- a/state/execution.go +++ b/state/execution.go @@ -156,7 +156,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b } // Lock mempool, commit app state, update mempoool. - appHash, err := blockExec.Commit(state, block) + appHash, err := blockExec.Commit(state, block, abciResponses.DeliverTx) if err != nil { return state, fmt.Errorf("Commit failed for application: %v", err) } @@ -188,6 +188,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b func (blockExec *BlockExecutor) Commit( state State, block *types.Block, + deliverTxResponses []*abci.ResponseDeliverTx, ) ([]byte, error) { blockExec.mempool.Lock() defer blockExec.mempool.Unlock() @@ -222,6 +223,7 @@ func (blockExec *BlockExecutor) Commit( err = blockExec.mempool.Update( block.Height, block.Txs, + deliverTxResponses, TxPreCheck(state), TxPostCheck(state), )