From 851d2e3bdeb4d5a508a67cd42b36cb0328a3f817 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 5 Oct 2021 16:23:15 -0400 Subject: [PATCH] mempool,rpc: add removetx rpc method (#7047) Addresses one of the concerns with #7041. Provides a mechanism (via the RPC interface) to delete a single transaction, described by its hash, from the mempool. The method returns an error if the transaction cannot be found. Once the transaction is removed it remains in the cache and cannot be resubmitted until the cache is cleared or it expires from the cache. --- CHANGELOG_PENDING.md | 2 ++ internal/consensus/replay_stubs.go | 1 + internal/mempool/cache.go | 12 +++++----- internal/mempool/mempool.go | 4 ++++ internal/mempool/mock/mempool.go | 1 + internal/mempool/tx.go | 15 ------------ internal/mempool/v0/cache_test.go | 4 ++-- internal/mempool/v0/clist_mempool.go | 22 ++++++++++-------- internal/mempool/v0/clist_mempool_test.go | 4 ++-- internal/mempool/v0/reactor.go | 4 ++-- internal/mempool/v1/mempool.go | 28 +++++++++++++++++------ internal/mempool/v1/mempool_test.go | 12 +++++----- internal/mempool/v1/reactor.go | 4 ++-- internal/mempool/v1/tx.go | 21 ++++++++--------- internal/mempool/v1/tx_test.go | 12 +++++----- internal/rpc/core/mempool.go | 4 ++++ internal/rpc/core/routes.go | 1 + light/rpc/client.go | 4 ++++ rpc/client/http/http.go | 8 +++++++ rpc/client/interface.go | 1 + rpc/client/local/local.go | 4 ++++ rpc/openapi/openapi.yaml | 25 ++++++++++++++++++++ types/mempool.go | 10 ++++---- types/tx.go | 12 +++++----- 24 files changed, 137 insertions(+), 78 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index c44c58ef0..d80e3ae61 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -30,6 +30,8 @@ Special thanks to external contributors on this release: ### FEATURES +- [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish) + ### IMPROVEMENTS ### BUG FIXES diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index 361ac6ec8..bc8c11cd9 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -24,6 +24,7 @@ func (emptyMempool) Size() int { return 0 } func (emptyMempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error { return nil } +func (emptyMempool) RemoveTxByKey(txKey types.TxKey) error { return nil } func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (emptyMempool) Update( diff --git a/internal/mempool/cache.go b/internal/mempool/cache.go index 43174f106..3cd45d2bc 100644 --- a/internal/mempool/cache.go +++ b/internal/mempool/cache.go @@ -31,14 +31,14 @@ var _ TxCache = (*LRUTxCache)(nil) type LRUTxCache struct { mtx tmsync.Mutex size int - cacheMap map[[TxKeySize]byte]*list.Element + cacheMap map[types.TxKey]*list.Element list *list.List } func NewLRUTxCache(cacheSize int) *LRUTxCache { return &LRUTxCache{ size: cacheSize, - cacheMap: make(map[[TxKeySize]byte]*list.Element, cacheSize), + cacheMap: make(map[types.TxKey]*list.Element, cacheSize), list: list.New(), } } @@ -53,7 +53,7 @@ func (c *LRUTxCache) Reset() { c.mtx.Lock() defer c.mtx.Unlock() - c.cacheMap = make(map[[TxKeySize]byte]*list.Element, c.size) + c.cacheMap = make(map[types.TxKey]*list.Element, c.size) c.list.Init() } @@ -61,7 +61,7 @@ func (c *LRUTxCache) Push(tx types.Tx) bool { c.mtx.Lock() defer c.mtx.Unlock() - key := TxKey(tx) + key := tx.Key() moved, ok := c.cacheMap[key] if ok { @@ -72,7 +72,7 @@ func (c *LRUTxCache) Push(tx types.Tx) bool { if c.list.Len() >= c.size { front := c.list.Front() if front != nil { - frontKey := front.Value.([TxKeySize]byte) + frontKey := front.Value.(types.TxKey) delete(c.cacheMap, frontKey) c.list.Remove(front) } @@ -88,7 +88,7 @@ func (c *LRUTxCache) Remove(tx types.Tx) { c.mtx.Lock() defer c.mtx.Unlock() - key := TxKey(tx) + key := tx.Key() e := c.cacheMap[key] delete(c.cacheMap, key) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index d679b3506..6e3955dc3 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -32,6 +32,10 @@ type Mempool interface { // its validity and whether it should be added to the mempool. CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error + // RemoveTxByKey removes a transaction, identified by its key, + // from the mempool. + RemoveTxByKey(txKey types.TxKey) error + // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than // maxGas. diff --git a/internal/mempool/mock/mempool.go b/internal/mempool/mock/mempool.go index bf9ce8bd0..8e6f0c7bf 100644 --- a/internal/mempool/mock/mempool.go +++ b/internal/mempool/mock/mempool.go @@ -20,6 +20,7 @@ func (Mempool) Size() int { return 0 } func (Mempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error { return nil } +func (Mempool) RemoveTxByKey(txKey types.TxKey) error { return nil } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (Mempool) Update( diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 860d3d3b4..adafdf85e 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -1,24 +1,9 @@ package mempool import ( - "crypto/sha256" - "github.com/tendermint/tendermint/types" ) -// TxKeySize defines the size of the transaction's key used for indexing. -const TxKeySize = sha256.Size - -// TxKey is the fixed length array key used as an index. -func TxKey(tx types.Tx) [TxKeySize]byte { - return sha256.Sum256(tx) -} - -// TxHashFromBytes returns the hash of a transaction from raw bytes. -func TxHashFromBytes(tx []byte) []byte { - return types.Tx(tx).Hash() -} - // TxInfo are parameters that get passed when attempting to add a tx to the // mempool. type TxInfo struct { diff --git a/internal/mempool/v0/cache_test.go b/internal/mempool/v0/cache_test.go index 389c0806c..5bf2c7603 100644 --- a/internal/mempool/v0/cache_test.go +++ b/internal/mempool/v0/cache_test.go @@ -61,7 +61,7 @@ func TestCacheAfterUpdate(t *testing.T) { require.NotEqual(t, len(tc.txsInCache), counter, "cache larger than expected on testcase %d", tcIndex) - nodeVal := node.Value.([sha256.Size]byte) + nodeVal := node.Value.(types.TxKey) expectedBz := sha256.Sum256([]byte{byte(tc.txsInCache[len(tc.txsInCache)-counter-1])}) // Reference for reading the errors: // >>> sha256('\x00').hexdigest() @@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) { // >>> sha256('\x02').hexdigest() // 'dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986' - require.Equal(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex) + require.EqualValues(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex) counter++ node = node.Next() } diff --git a/internal/mempool/v0/clist_mempool.go b/internal/mempool/v0/clist_mempool.go index f3571ede0..7816730c1 100644 --- a/internal/mempool/v0/clist_mempool.go +++ b/internal/mempool/v0/clist_mempool.go @@ -3,6 +3,7 @@ package v0 import ( "bytes" "context" + "errors" "fmt" "sync" "sync/atomic" @@ -240,7 +241,7 @@ func (mem *CListMempool) CheckTx( // Note it's possible a tx is still in the cache but no longer in the mempool // (eg. after committing a block, txs are removed from mempool but not cache), // so we only record the sender for txs still in the mempool. - if e, ok := mem.txsMap.Load(mempool.TxKey(tx)); ok { + if e, ok := mem.txsMap.Load(tx.Key()); ok { memTx := e.(*clist.CElement).Value.(*mempoolTx) _, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true) // TODO: consider punishing peer for dups, @@ -327,7 +328,7 @@ func (mem *CListMempool) reqResCb( // - resCbFirstTime (lock not held) if tx is valid func (mem *CListMempool) addTx(memTx *mempoolTx) { e := mem.txs.PushBack(memTx) - mem.txsMap.Store(mempool.TxKey(memTx.tx), e) + mem.txsMap.Store(memTx.tx.Key(), e) atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) } @@ -338,7 +339,7 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) { func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { mem.txs.Remove(elem) elem.DetachPrev() - mem.txsMap.Delete(mempool.TxKey(tx)) + mem.txsMap.Delete(tx.Key()) atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) if removeFromCache { @@ -347,13 +348,16 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC } // RemoveTxByKey removes a transaction from the mempool by its TxKey index. -func (mem *CListMempool) RemoveTxByKey(txKey [mempool.TxKeySize]byte, removeFromCache bool) { +func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { if e, ok := mem.txsMap.Load(txKey); ok { memTx := e.(*clist.CElement).Value.(*mempoolTx) if memTx != nil { - mem.removeTx(memTx.tx, e.(*clist.CElement), removeFromCache) + mem.removeTx(memTx.tx, e.(*clist.CElement), false) + return nil } + return errors.New("transaction not found") } + return errors.New("invalid transaction found") } func (mem *CListMempool) isFull(txSize int) error { @@ -409,7 +413,7 @@ func (mem *CListMempool) resCbFirstTime( mem.addTx(memTx) mem.logger.Debug( "added good transaction", - "tx", mempool.TxHashFromBytes(tx), + "tx", types.Tx(tx).Hash(), "res", r, "height", memTx.height, "total", mem.Size(), @@ -419,7 +423,7 @@ func (mem *CListMempool) resCbFirstTime( // ignore bad transaction mem.logger.Debug( "rejected bad transaction", - "tx", mempool.TxHashFromBytes(tx), + "tx", types.Tx(tx).Hash(), "peerID", peerP2PID, "res", r, "err", postCheckErr, @@ -460,7 +464,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { // Good, nothing to do. } else { // Tx became invalidated due to newly committed block. - mem.logger.Debug("tx is no longer valid", "tx", mempool.TxHashFromBytes(tx), "res", r, "err", postCheckErr) + mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr) // NOTE: we remove tx from the cache because it might be good later mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache) } @@ -598,7 +602,7 @@ func (mem *CListMempool) Update( // Mempool after: // 100 // https://github.com/tendermint/tendermint/issues/3322. - if e, ok := mem.txsMap.Load(mempool.TxKey(tx)); ok { + if e, ok := mem.txsMap.Load(tx.Key()); ok { mem.removeTx(tx, e.(*clist.CElement), false) } } diff --git a/internal/mempool/v0/clist_mempool_test.go b/internal/mempool/v0/clist_mempool_test.go index e55942e48..b61a8333e 100644 --- a/internal/mempool/v0/clist_mempool_test.go +++ b/internal/mempool/v0/clist_mempool_test.go @@ -544,9 +544,9 @@ func TestMempoolTxsBytes(t *testing.T) { err = mp.CheckTx(context.Background(), []byte{0x06}, nil, mempool.TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 9, mp.SizeBytes()) - mp.RemoveTxByKey(mempool.TxKey([]byte{0x07}), true) + assert.Error(t, mp.RemoveTxByKey(types.Tx([]byte{0x07}).Key())) assert.EqualValues(t, 9, mp.SizeBytes()) - mp.RemoveTxByKey(mempool.TxKey([]byte{0x06}), true) + assert.NoError(t, mp.RemoveTxByKey(types.Tx([]byte{0x06}).Key())) assert.EqualValues(t, 8, mp.SizeBytes()) } diff --git a/internal/mempool/v0/reactor.go b/internal/mempool/v0/reactor.go index b8aac3b5c..80362a04f 100644 --- a/internal/mempool/v0/reactor.go +++ b/internal/mempool/v0/reactor.go @@ -171,7 +171,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { for _, tx := range protoTxs { if err := r.mempool.CheckTx(context.Background(), types.Tx(tx), nil, txInfo); err != nil { - logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(tx)), "err", err) + logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "err", err) } } @@ -378,7 +378,7 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) } r.Logger.Debug( "gossiped tx to peer", - "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(memTx.tx)), + "tx", fmt.Sprintf("%X", memTx.tx.Hash()), "peer", peerID, ) } diff --git a/internal/mempool/v1/mempool.go b/internal/mempool/v1/mempool.go index 34e524159..a12fbc51b 100644 --- a/internal/mempool/v1/mempool.go +++ b/internal/mempool/v1/mempool.go @@ -3,6 +3,7 @@ package v1 import ( "bytes" "context" + "errors" "fmt" "sync/atomic" "time" @@ -256,7 +257,7 @@ func (txmp *TxMempool) CheckTx( return err } - txHash := mempool.TxKey(tx) + txHash := tx.Key() // We add the transaction to the mempool's cache and if the transaction already // exists, i.e. false is returned, then we check if we've seen this transaction @@ -304,6 +305,19 @@ func (txmp *TxMempool) CheckTx( return nil } +func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { + txmp.Lock() + defer txmp.Unlock() + + // remove the committed transaction from the transaction store and indexes + if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil { + txmp.removeTx(wtx, false) + return nil + } + + return errors.New("transaction not found") +} + // Flush flushes out the mempool. It acquires a read-lock, fetches all the // transactions currently in the transaction store and removes each transaction // from the store and all indexes and finally resets the cache. @@ -451,7 +465,7 @@ func (txmp *TxMempool) Update( } // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(mempool.TxKey(tx)); wtx != nil { + if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil { txmp.removeTx(wtx, false) } } @@ -629,7 +643,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) tx := req.GetCheckTx().Tx wtx := txmp.recheckCursor.Value.(*WrappedTx) if !bytes.Equal(tx, wtx.tx) { - panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), mempool.TxKey(tx))) + panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), types.Tx(tx).Key())) } // Only evaluate transactions that have not been removed. This can happen @@ -647,7 +661,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) txmp.logger.Debug( "existing transaction no longer valid; failed re-CheckTx callback", "priority", wtx.priority, - "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(wtx.tx)), + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err, "code", checkTxRes.CheckTx.Code, ) @@ -784,13 +798,13 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { // the height and time based indexes. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { now := time.Now() - expiredTxs := make(map[[mempool.TxKeySize]byte]*WrappedTx) + expiredTxs := make(map[types.TxKey]*WrappedTx) if txmp.config.TTLNumBlocks > 0 { purgeIdx := -1 for i, wtx := range txmp.heightIndex.txs { if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks { - expiredTxs[mempool.TxKey(wtx.tx)] = wtx + expiredTxs[wtx.tx.Key()] = wtx purgeIdx = i } else { // since the index is sorted, we know no other txs can be be purged @@ -807,7 +821,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { purgeIdx := -1 for i, wtx := range txmp.timestampIndex.txs { if now.Sub(wtx.timestamp) > txmp.config.TTLDuration { - expiredTxs[mempool.TxKey(wtx.tx)] = wtx + expiredTxs[wtx.tx.Key()] = wtx purgeIdx = i } else { // since the index is sorted, we know no other txs can be be purged diff --git a/internal/mempool/v1/mempool_test.go b/internal/mempool/v1/mempool_test.go index 3b7eb02d0..8bed5520a 100644 --- a/internal/mempool/v1/mempool_test.go +++ b/internal/mempool/v1/mempool_test.go @@ -226,10 +226,10 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) - txMap := make(map[[mempool.TxKeySize]byte]testTx) + txMap := make(map[types.TxKey]testTx) priorities := make([]int64, len(tTxs)) for i, tTx := range tTxs { - txMap[mempool.TxKey(tTx.tx)] = tTx + txMap[tTx.tx.Key()] = tTx priorities[i] = tTx.priority } @@ -241,7 +241,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { ensurePrioritized := func(reapedTxs types.Txs) { reapedPriorities := make([]int64, len(reapedTxs)) for i, rTx := range reapedTxs { - reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority + reapedPriorities[i] = txMap[rTx.Key()].priority } require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) @@ -276,10 +276,10 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) - txMap := make(map[[mempool.TxKeySize]byte]testTx) + txMap := make(map[types.TxKey]testTx) priorities := make([]int64, len(tTxs)) for i, tTx := range tTxs { - txMap[mempool.TxKey(tTx.tx)] = tTx + txMap[tTx.tx.Key()] = tTx priorities[i] = tTx.priority } @@ -291,7 +291,7 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { ensurePrioritized := func(reapedTxs types.Txs) { reapedPriorities := make([]int64, len(reapedTxs)) for i, rTx := range reapedTxs { - reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority + reapedPriorities[i] = txMap[rTx.Key()].priority } require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index 75f89d07e..eff5b7ec0 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -178,7 +178,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { for _, tx := range protoTxs { if err := r.mempool.CheckTx(context.Background(), types.Tx(tx), nil, txInfo); err != nil { - logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(tx)), "err", err) + logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "err", err) } } @@ -386,7 +386,7 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) } r.Logger.Debug( "gossiped tx to peer", - "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(memTx.tx)), + "tx", fmt.Sprintf("%X", memTx.tx.Hash()), "peer", peerID, ) } diff --git a/internal/mempool/v1/tx.go b/internal/mempool/v1/tx.go index 15173b91f..c5b7ca82f 100644 --- a/internal/mempool/v1/tx.go +++ b/internal/mempool/v1/tx.go @@ -6,7 +6,6 @@ import ( "github.com/tendermint/tendermint/internal/libs/clist" tmsync "github.com/tendermint/tendermint/internal/libs/sync" - "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/types" ) @@ -17,7 +16,7 @@ type WrappedTx struct { tx types.Tx // hash defines the transaction hash and the primary key used in the mempool - hash [mempool.TxKeySize]byte + hash types.TxKey // height defines the height at which the transaction was validated at height int64 @@ -66,14 +65,14 @@ func (wtx *WrappedTx) Size() int { // need mutative access. type TxStore struct { mtx tmsync.RWMutex - hashTxs map[[mempool.TxKeySize]byte]*WrappedTx // primary index - senderTxs map[string]*WrappedTx // sender is defined by the ABCI application + hashTxs map[types.TxKey]*WrappedTx // primary index + senderTxs map[string]*WrappedTx // sender is defined by the ABCI application } func NewTxStore() *TxStore { return &TxStore{ senderTxs: make(map[string]*WrappedTx), - hashTxs: make(map[[mempool.TxKeySize]byte]*WrappedTx), + hashTxs: make(map[types.TxKey]*WrappedTx), } } @@ -110,7 +109,7 @@ func (txs *TxStore) GetTxBySender(sender string) *WrappedTx { } // GetTxByHash returns a *WrappedTx by the transaction's hash. -func (txs *TxStore) GetTxByHash(hash [mempool.TxKeySize]byte) *WrappedTx { +func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx { txs.mtx.RLock() defer txs.mtx.RUnlock() @@ -119,7 +118,7 @@ func (txs *TxStore) GetTxByHash(hash [mempool.TxKeySize]byte) *WrappedTx { // IsTxRemoved returns true if a transaction by hash is marked as removed and // false otherwise. -func (txs *TxStore) IsTxRemoved(hash [mempool.TxKeySize]byte) bool { +func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool { txs.mtx.RLock() defer txs.mtx.RUnlock() @@ -142,7 +141,7 @@ func (txs *TxStore) SetTx(wtx *WrappedTx) { txs.senderTxs[wtx.sender] = wtx } - txs.hashTxs[mempool.TxKey(wtx.tx)] = wtx + txs.hashTxs[wtx.tx.Key()] = wtx } // RemoveTx removes a *WrappedTx from the transaction store. It deletes all @@ -155,13 +154,13 @@ func (txs *TxStore) RemoveTx(wtx *WrappedTx) { delete(txs.senderTxs, wtx.sender) } - delete(txs.hashTxs, mempool.TxKey(wtx.tx)) + delete(txs.hashTxs, wtx.tx.Key()) wtx.removed = true } // TxHasPeer returns true if a transaction by hash has a given peer ID and false // otherwise. If the transaction does not exist, false is returned. -func (txs *TxStore) TxHasPeer(hash [mempool.TxKeySize]byte, peerID uint16) bool { +func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool { txs.mtx.RLock() defer txs.mtx.RUnlock() @@ -179,7 +178,7 @@ func (txs *TxStore) TxHasPeer(hash [mempool.TxKeySize]byte, peerID uint16) bool // We return true if we've already recorded the given peer for this transaction // and false otherwise. If the transaction does not exist by hash, we return // (nil, false). -func (txs *TxStore) GetOrSetPeerByTxHash(hash [mempool.TxKeySize]byte, peerID uint16) (*WrappedTx, bool) { +func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) { txs.mtx.Lock() defer txs.mtx.Unlock() diff --git a/internal/mempool/v1/tx_test.go b/internal/mempool/v1/tx_test.go index c5d488669..fb4beafab 100644 --- a/internal/mempool/v1/tx_test.go +++ b/internal/mempool/v1/tx_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/internal/mempool" + "github.com/tendermint/tendermint/types" ) func TestTxStore_GetTxBySender(t *testing.T) { @@ -39,7 +39,7 @@ func TestTxStore_GetTxByHash(t *testing.T) { timestamp: time.Now(), } - key := mempool.TxKey(wtx.tx) + key := wtx.tx.Key() res := txs.GetTxByHash(key) require.Nil(t, res) @@ -58,7 +58,7 @@ func TestTxStore_SetTx(t *testing.T) { timestamp: time.Now(), } - key := mempool.TxKey(wtx.tx) + key := wtx.tx.Key() txs.SetTx(wtx) res := txs.GetTxByHash(key) @@ -81,10 +81,10 @@ func TestTxStore_GetOrSetPeerByTxHash(t *testing.T) { timestamp: time.Now(), } - key := mempool.TxKey(wtx.tx) + key := wtx.tx.Key() txs.SetTx(wtx) - res, ok := txs.GetOrSetPeerByTxHash(mempool.TxKey([]byte("test_tx_2")), 15) + res, ok := txs.GetOrSetPeerByTxHash(types.Tx([]byte("test_tx_2")).Key(), 15) require.Nil(t, res) require.False(t, ok) @@ -110,7 +110,7 @@ func TestTxStore_RemoveTx(t *testing.T) { txs.SetTx(wtx) - key := mempool.TxKey(wtx.tx) + key := wtx.tx.Key() res := txs.GetTxByHash(key) require.NotNil(t, res) diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 6cd05348f..c20f02032 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -151,3 +151,7 @@ func (env *Environment) CheckTx(ctx *rpctypes.Context, tx types.Tx) (*coretypes. } return &coretypes.ResultCheckTx{ResponseCheckTx: *res}, nil } + +func (env *Environment) RemoveTx(ctx *rpctypes.Context, txkey types.TxKey) error { + return env.Mempool.RemoveTxByKey(txkey) +} diff --git a/internal/rpc/core/routes.go b/internal/rpc/core/routes.go index 73eaaf14c..fe99d2118 100644 --- a/internal/rpc/core/routes.go +++ b/internal/rpc/core/routes.go @@ -28,6 +28,7 @@ func (env *Environment) GetRoutes() RoutesMap { "block_results": rpc.NewRPCFunc(env.BlockResults, "height", true), "commit": rpc.NewRPCFunc(env.Commit, "height", true), "check_tx": rpc.NewRPCFunc(env.CheckTx, "tx", true), + "remove_tx": rpc.NewRPCFunc(env.RemoveTx, "txkey", false), "tx": rpc.NewRPCFunc(env.Tx, "hash,prove", true), "tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false), "block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false), diff --git a/light/rpc/client.go b/light/rpc/client.go index 4461028cd..dc745542e 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -212,6 +212,10 @@ func (c *Client) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultChe return c.next.CheckTx(ctx, tx) } +func (c *Client) RemoveTx(ctx context.Context, txKey types.TxKey) error { + return c.next.RemoveTx(ctx, txKey) +} + func (c *Client) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, error) { return c.next.NetInfo(ctx) } diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index 65d0f434b..d0c1d5621 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -315,6 +315,14 @@ func (c *baseRPCClient) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.Re return result, nil } +func (c *baseRPCClient) RemoveTx(ctx context.Context, txKey types.TxKey) error { + _, err := c.caller.Call(ctx, "remove_tx", map[string]interface{}{"tx_key": txKey}, nil) + if err != nil { + return err + } + return nil +} + func (c *baseRPCClient) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, error) { result := new(coretypes.ResultNetInfo) _, err := c.caller.Call(ctx, "net_info", map[string]interface{}{}, result) diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 7b6a9bd20..474eb9937 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -146,6 +146,7 @@ type MempoolClient interface { UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) NumUnconfirmedTxs(context.Context) (*coretypes.ResultUnconfirmedTxs, error) CheckTx(context.Context, types.Tx) (*coretypes.ResultCheckTx, error) + RemoveTx(context.Context, types.TxKey) error } // EvidenceClient is used for submitting an evidence of the malicious diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 108510b02..21ca6e6f1 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -116,6 +116,10 @@ func (c *Local) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultChec return c.env.CheckTx(c.ctx, tx) } +func (c *Local) RemoveTx(ctx context.Context, txKey types.TxKey) error { + return c.env.Mempool.RemoveTxByKey(txKey) +} + func (c *Local) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, error) { return c.env.NetInfo(c.ctx) } diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index a32d44986..83d85be8f 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -237,6 +237,31 @@ paths: application/json: schema: $ref: "#/components/schemas/ErrorResponse" + + /remove_tx: + get: + summary: Removes a transaction from the mempool. + tags: + - TxKey + operationId: remove_tx + parameters: + - in: query + name: txKey + required: true + schema: + type: string + example: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + description: The transaction key + responses: + "200": + description: empty response. + "500": + description: empty error. + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" + /subscribe: get: summary: Subscribe for events via WebSocket. diff --git a/types/mempool.go b/types/mempool.go index c739796af..fa0f8a208 100644 --- a/types/mempool.go +++ b/types/mempool.go @@ -1,14 +1,16 @@ package types import ( + "crypto/sha256" "errors" "fmt" ) -var ( - // ErrTxInCache is returned to the client if we saw tx earlier - ErrTxInCache = errors.New("tx already exists in cache") -) +// ErrTxInCache is returned to the client if we saw tx earlier +var ErrTxInCache = errors.New("tx already exists in cache") + +// TxKey is the fixed length array key used as an index. +type TxKey [sha256.Size]byte // ErrTxTooLarge defines an error when a transaction is too big to be sent in a // message to other peers. diff --git a/types/tx.go b/types/tx.go index 92df92f13..19ee41dac 100644 --- a/types/tx.go +++ b/types/tx.go @@ -2,6 +2,7 @@ package types import ( "bytes" + "crypto/sha256" "errors" "fmt" @@ -16,15 +17,14 @@ import ( // Might we want types here ? type Tx []byte +// Key produces a fixed-length key for use in indexing. +func (tx Tx) Key() TxKey { return sha256.Sum256(tx) } + // Hash computes the TMHASH hash of the wire encoded transaction. -func (tx Tx) Hash() []byte { - return tmhash.Sum(tx) -} +func (tx Tx) Hash() []byte { return tmhash.Sum(tx) } // String returns the hex-encoded transaction as a string. -func (tx Tx) String() string { - return fmt.Sprintf("Tx{%X}", []byte(tx)) -} +func (tx Tx) String() string { return fmt.Sprintf("Tx{%X}", []byte(tx)) } // Txs is a slice of Tx. type Txs []Tx