Browse Source

mempool: remove only valid (Code==0) txs on Update (#3625)

* mempool: remove only valid (Code==0) txs on Update

so evil proposers can't drop valid txs in Commit stage.

Also remove invalid (Code!=0) txs from the cache so they can be
resubmitted.

Fixes #3322

@rickyyangz:

In the end of commit stage, we will update mempool to remove all the txs
in current block.

// Update mempool.
err = blockExec.mempool.Update(
	block.Height,
	block.Txs,
	TxPreCheck(state),
	TxPostCheck(state),
)

Assum an account has 3 transactions in the mempool, the sequences are
100, 101 and 102 separately, So an evil proposal can only package the
101 and 102 transactions into its proposal block, and leave 100 still in
mempool, then the two txs will be removed from all validators' mempool
when commit. So the account lost the two valid txs.

@ebuchman:

In the longer term we may want to do something like #2639 so we can
validate txs before we commit the block. But even in this case we'd only
want to run the equivalent of CheckTx, which means the DeliverTx could
still fail even if the CheckTx passes depending on how the app handles
the ABCI Code semantics. So more work will be required around the ABCI
code. See also #2185

* add changelog entry and tests

* improve changelog message

* reformat code
pull/3633/head
Anton Kaliaev 5 years ago
committed by GitHub
parent
commit
27909e5d2a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 84 additions and 50 deletions
  1. +4
    -0
      CHANGELOG_PENDING.md
  2. +2
    -1
      mempool/cache_test.go
  3. +34
    -36
      mempool/clist_mempool.go
  4. +39
    -11
      mempool/clist_mempool_test.go
  5. +1
    -1
      mempool/mempool.go
  6. +1
    -0
      mock/mempool.go
  7. +3
    -1
      state/execution.go

+ 4
- 0
CHANGELOG_PENDING.md View File

@ -48,3 +48,7 @@
* `Switch#DialPeerWithAddress` now only takes an address * `Switch#DialPeerWithAddress` now only takes an address
- [consensus] \#3067 getBeginBlockValidatorInfo loads validators from stateDB instead of state (@james-ray) - [consensus] \#3067 getBeginBlockValidatorInfo loads validators from stateDB instead of state (@james-ray)
- [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie) - [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie)
- [mempool] \#3322 Remove only valid (Code==0) txs on Update
* `Mempool#Update` and `BlockExecutor#Commit` now accept
`[]*abci.ResponseDeliverTx` - list of `DeliverTx` responses, which should
match `block.Txs`

+ 2
- 1
mempool/cache_test.go View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -66,7 +67,7 @@ func TestCacheAfterUpdate(t *testing.T) {
tx := types.Tx{byte(v)} tx := types.Tx{byte(v)}
updateTxs = append(updateTxs, tx) updateTxs = append(updateTxs, tx)
} }
mempool.Update(int64(tcIndex), updateTxs, nil, nil)
mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil)
for _, v := range tc.reAddIndices { for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)} tx := types.Tx{byte(v)}


+ 34
- 36
mempool/clist_mempool.go View File

@ -519,6 +519,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
func (mem *CListMempool) Update( func (mem *CListMempool) Update(
height int64, height int64,
txs types.Txs, txs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
preCheck PreCheckFunc, preCheck PreCheckFunc,
postCheck PostCheckFunc, postCheck PostCheckFunc,
) error { ) error {
@ -533,20 +534,37 @@ func (mem *CListMempool) Update(
mem.postCheck = postCheck mem.postCheck = postCheck
} }
// Add committed transactions to cache (if missing).
for _, tx := range txs {
_ = mem.cache.Push(tx)
}
for i, tx := range txs {
if deliverTxResponses[i].Code == abci.CodeTypeOK {
// Add valid committed tx to the cache (if missing).
_ = mem.cache.Push(tx)
// Remove valid committed tx from the mempool.
if e, ok := mem.txsMap.Load(txKey(tx)); ok {
mem.removeTx(tx, e.(*clist.CElement), false)
}
} else {
// Allow invalid transactions to be resubmitted.
mem.cache.Remove(tx)
// Remove committed transactions.
txsLeft := mem.removeTxs(txs)
// Don't remove invalid tx from the mempool.
// Otherwise evil proposer can drop valid txs.
// Example:
// 100 -> 101 -> 102
// Block, proposed by evil proposer:
// 101 -> 102
// Mempool (if you remove txs):
// 100
// https://github.com/tendermint/tendermint/issues/3322.
}
}
// Either recheck non-committed txs to see if they became invalid // Either recheck non-committed txs to see if they became invalid
// or just notify there're some txs left. // or just notify there're some txs left.
if len(txsLeft) > 0 {
if mem.Size() > 0 {
if mem.config.Recheck { if mem.config.Recheck {
mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height)
mem.recheckTxs(txsLeft)
mem.logger.Info("Recheck txs", "numtxs", mem.Size(), "height", height)
mem.recheckTxs()
// At this point, mem.txs are being rechecked. // At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs. // mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
@ -561,42 +579,22 @@ func (mem *CListMempool) Update(
return nil return nil
} }
func (mem *CListMempool) removeTxs(txs types.Txs) []types.Tx {
// Build a map for faster lookups.
txsMap := make(map[string]struct{}, len(txs))
for _, tx := range txs {
txsMap[string(tx)] = struct{}{}
}
txsLeft := make([]types.Tx, 0, mem.txs.Len())
for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx)
// Remove the tx if it's already in a block.
if _, ok := txsMap[string(memTx.tx)]; ok {
// NOTE: we don't remove committed txs from the cache.
mem.removeTx(memTx.tx, e, false)
continue
}
txsLeft = append(txsLeft, memTx.tx)
func (mem *CListMempool) recheckTxs() {
if mem.Size() == 0 {
panic("recheckTxs is called, but the mempool is empty")
} }
return txsLeft
}
// NOTE: pass in txs because mem.txs can mutate concurrently.
func (mem *CListMempool) recheckTxs(txs []types.Tx) {
if len(txs) == 0 {
return
}
atomic.StoreInt32(&mem.rechecking, 1) atomic.StoreInt32(&mem.rechecking, 1)
mem.recheckCursor = mem.txs.Front() mem.recheckCursor = mem.txs.Front()
mem.recheckEnd = mem.txs.Back() mem.recheckEnd = mem.txs.Back()
// Push txs to proxyAppConn // Push txs to proxyAppConn
// NOTE: globalCb may be called concurrently. // NOTE: globalCb may be called concurrently.
for _, tx := range txs {
mem.proxyAppConn.CheckTxAsync(tx)
for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx)
mem.proxyAppConn.CheckTxAsync(memTx.tx)
} }
mem.proxyAppConn.FlushAsync() mem.proxyAppConn.FlushAsync()
} }


+ 39
- 11
mempool/clist_mempool_test.go View File

@ -170,22 +170,42 @@ func TestMempoolFilters(t *testing.T) {
{10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0}, {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0},
} }
for tcIndex, tt := range tests { for tcIndex, tt := range tests {
mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter)
mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
mempool.Flush() mempool.Flush()
} }
} }
func TestMempoolUpdateAddsTxsToCache(t *testing.T) {
func TestMempoolUpdate(t *testing.T) {
app := kvstore.NewKVStoreApplication() app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc) mempool, cleanup := newMempoolWithApp(cc)
defer cleanup() defer cleanup()
mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil)
err := mempool.CheckTx([]byte{0x01}, nil)
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
// 1. Adds valid txs to the cache
{
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
err := mempool.CheckTx([]byte{0x01}, nil)
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
}
}
// 2. Removes valid txs from the mempool
{
err := mempool.CheckTx([]byte{0x02}, nil)
require.NoError(t, err)
mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
assert.Zero(t, mempool.Size())
}
// 3. Removes invalid transactions from the cache, but leaves them in the mempool (if present)
{
err := mempool.CheckTx([]byte{0x03}, nil)
require.NoError(t, err)
mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil)
assert.Equal(t, 1, mempool.Size())
} }
} }
@ -210,7 +230,7 @@ func TestTxsAvailable(t *testing.T) {
// it should fire once now for the new height // it should fire once now for the new height
// since there are still txs left // since there are still txs left
committedTxs, txs := txs[:50], txs[50:] committedTxs, txs := txs[:50], txs[50:]
if err := mempool.Update(1, committedTxs, nil, nil); err != nil {
if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
t.Error(err) t.Error(err)
} }
ensureFire(t, mempool.TxsAvailable(), timeoutMS) ensureFire(t, mempool.TxsAvailable(), timeoutMS)
@ -222,7 +242,7 @@ func TestTxsAvailable(t *testing.T) {
// now call update with all the txs. it should not fire as there are no txs left // now call update with all the txs. it should not fire as there are no txs left
committedTxs = append(txs, moreTxs...) committedTxs = append(txs, moreTxs...)
if err := mempool.Update(2, committedTxs, nil, nil); err != nil {
if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
t.Error(err) t.Error(err)
} }
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
@ -281,7 +301,7 @@ func TestSerialReap(t *testing.T) {
binary.BigEndian.PutUint64(txBytes, uint64(i)) binary.BigEndian.PutUint64(txBytes, uint64(i))
txs = append(txs, txBytes) txs = append(txs, txBytes)
} }
if err := mempool.Update(0, txs, nil, nil); err != nil {
if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil {
t.Error(err) t.Error(err)
} }
} }
@ -462,7 +482,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 1, mempool.TxsBytes()) assert.EqualValues(t, 1, mempool.TxsBytes())
// 3. zero again after tx is removed by Update // 3. zero again after tx is removed by Update
mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil)
mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
assert.EqualValues(t, 0, mempool.TxsBytes()) assert.EqualValues(t, 0, mempool.TxsBytes())
// 4. zero after Flush // 4. zero after Flush
@ -507,7 +527,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NotEmpty(t, res2.Data) require.NotEmpty(t, res2.Data)
// Pretend like we committed nothing so txBytes gets rechecked and removed. // Pretend like we committed nothing so txBytes gets rechecked and removed.
mempool.Update(1, []types.Tx{}, nil, nil)
mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil)
assert.EqualValues(t, 0, mempool.TxsBytes()) assert.EqualValues(t, 0, mempool.TxsBytes())
} }
@ -570,3 +590,11 @@ func checksumFile(p string, t *testing.T) string {
require.Nil(t, err, "expecting successful read of %q", p) require.Nil(t, err, "expecting successful read of %q", p)
return checksumIt(data) return checksumIt(data)
} }
func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx {
responses := make([]*abci.ResponseDeliverTx, 0, n)
for i := 0; i < n; i++ {
responses = append(responses, &abci.ResponseDeliverTx{Code: code})
}
return responses
}

+ 1
- 1
mempool/mempool.go View File

@ -43,7 +43,7 @@ type Mempool interface {
// Update informs the mempool that the given txs were committed and can be discarded. // Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus. // NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller // NOTE: unsafe; Lock/Unlock must be managed by caller
Update(blockHeight int64, blockTxs types.Txs, newPreFn PreCheckFunc, newPostFn PostCheckFunc) error
Update(blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, newPreFn PreCheckFunc, newPostFn PostCheckFunc) error
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are // FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
// done. E.g. from CheckTx. // done. E.g. from CheckTx.


+ 1
- 0
mock/mempool.go View File

@ -27,6 +27,7 @@ func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (Mempool) Update( func (Mempool) Update(
_ int64, _ int64,
_ types.Txs, _ types.Txs,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc, _ mempl.PreCheckFunc,
_ mempl.PostCheckFunc, _ mempl.PostCheckFunc,
) error { ) error {


+ 3
- 1
state/execution.go View File

@ -156,7 +156,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
} }
// Lock mempool, commit app state, update mempoool. // Lock mempool, commit app state, update mempoool.
appHash, err := blockExec.Commit(state, block)
appHash, err := blockExec.Commit(state, block, abciResponses.DeliverTx)
if err != nil { if err != nil {
return state, fmt.Errorf("Commit failed for application: %v", err) return state, fmt.Errorf("Commit failed for application: %v", err)
} }
@ -188,6 +188,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
func (blockExec *BlockExecutor) Commit( func (blockExec *BlockExecutor) Commit(
state State, state State,
block *types.Block, block *types.Block,
deliverTxResponses []*abci.ResponseDeliverTx,
) ([]byte, error) { ) ([]byte, error) {
blockExec.mempool.Lock() blockExec.mempool.Lock()
defer blockExec.mempool.Unlock() defer blockExec.mempool.Unlock()
@ -222,6 +223,7 @@ func (blockExec *BlockExecutor) Commit(
err = blockExec.mempool.Update( err = blockExec.mempool.Update(
block.Height, block.Height,
block.Txs, block.Txs,
deliverTxResponses,
TxPreCheck(state), TxPreCheck(state),
TxPostCheck(state), TxPostCheck(state),
) )


Loading…
Cancel
Save