diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index c44c58ef0..d80e3ae61 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -30,6 +30,8 @@ Special thanks to external contributors on this release: ### FEATURES +- [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish) + ### IMPROVEMENTS ### BUG FIXES diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index 361ac6ec8..bc8c11cd9 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -24,6 +24,7 @@ func (emptyMempool) Size() int { return 0 } func (emptyMempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error { return nil } +func (emptyMempool) RemoveTxByKey(txKey types.TxKey) error { return nil } func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (emptyMempool) Update( diff --git a/internal/mempool/cache.go b/internal/mempool/cache.go index 43174f106..3cd45d2bc 100644 --- a/internal/mempool/cache.go +++ b/internal/mempool/cache.go @@ -31,14 +31,14 @@ var _ TxCache = (*LRUTxCache)(nil) type LRUTxCache struct { mtx tmsync.Mutex size int - cacheMap map[[TxKeySize]byte]*list.Element + cacheMap map[types.TxKey]*list.Element list *list.List } func NewLRUTxCache(cacheSize int) *LRUTxCache { return &LRUTxCache{ size: cacheSize, - cacheMap: make(map[[TxKeySize]byte]*list.Element, cacheSize), + cacheMap: make(map[types.TxKey]*list.Element, cacheSize), list: list.New(), } } @@ -53,7 +53,7 @@ func (c *LRUTxCache) Reset() { c.mtx.Lock() defer c.mtx.Unlock() - c.cacheMap = make(map[[TxKeySize]byte]*list.Element, c.size) + c.cacheMap = make(map[types.TxKey]*list.Element, c.size) c.list.Init() } @@ -61,7 +61,7 @@ func (c *LRUTxCache) Push(tx types.Tx) bool { c.mtx.Lock() defer c.mtx.Unlock() - key := TxKey(tx) + key := tx.Key() moved, ok := c.cacheMap[key] if ok { @@ -72,7 +72,7 @@ func (c *LRUTxCache) Push(tx types.Tx) bool { if c.list.Len() >= c.size { front := c.list.Front() if front != nil { - frontKey := front.Value.([TxKeySize]byte) + frontKey := front.Value.(types.TxKey) delete(c.cacheMap, frontKey) c.list.Remove(front) } @@ -88,7 +88,7 @@ func (c *LRUTxCache) Remove(tx types.Tx) { c.mtx.Lock() defer c.mtx.Unlock() - key := TxKey(tx) + key := tx.Key() e := c.cacheMap[key] delete(c.cacheMap, key) diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index d679b3506..6e3955dc3 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -32,6 +32,10 @@ type Mempool interface { // its validity and whether it should be added to the mempool. CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error + // RemoveTxByKey removes a transaction, identified by its key, + // from the mempool. + RemoveTxByKey(txKey types.TxKey) error + // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than // maxGas. diff --git a/internal/mempool/mock/mempool.go b/internal/mempool/mock/mempool.go index bf9ce8bd0..8e6f0c7bf 100644 --- a/internal/mempool/mock/mempool.go +++ b/internal/mempool/mock/mempool.go @@ -20,6 +20,7 @@ func (Mempool) Size() int { return 0 } func (Mempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error { return nil } +func (Mempool) RemoveTxByKey(txKey types.TxKey) error { return nil } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (Mempool) Update( diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 860d3d3b4..adafdf85e 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -1,24 +1,9 @@ package mempool import ( - "crypto/sha256" - "github.com/tendermint/tendermint/types" ) -// TxKeySize defines the size of the transaction's key used for indexing. -const TxKeySize = sha256.Size - -// TxKey is the fixed length array key used as an index. -func TxKey(tx types.Tx) [TxKeySize]byte { - return sha256.Sum256(tx) -} - -// TxHashFromBytes returns the hash of a transaction from raw bytes. -func TxHashFromBytes(tx []byte) []byte { - return types.Tx(tx).Hash() -} - // TxInfo are parameters that get passed when attempting to add a tx to the // mempool. type TxInfo struct { diff --git a/internal/mempool/v0/cache_test.go b/internal/mempool/v0/cache_test.go index 389c0806c..5bf2c7603 100644 --- a/internal/mempool/v0/cache_test.go +++ b/internal/mempool/v0/cache_test.go @@ -61,7 +61,7 @@ func TestCacheAfterUpdate(t *testing.T) { require.NotEqual(t, len(tc.txsInCache), counter, "cache larger than expected on testcase %d", tcIndex) - nodeVal := node.Value.([sha256.Size]byte) + nodeVal := node.Value.(types.TxKey) expectedBz := sha256.Sum256([]byte{byte(tc.txsInCache[len(tc.txsInCache)-counter-1])}) // Reference for reading the errors: // >>> sha256('\x00').hexdigest() @@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) { // >>> sha256('\x02').hexdigest() // 'dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986' - require.Equal(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex) + require.EqualValues(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex) counter++ node = node.Next() } diff --git a/internal/mempool/v0/clist_mempool.go b/internal/mempool/v0/clist_mempool.go index f3571ede0..7816730c1 100644 --- a/internal/mempool/v0/clist_mempool.go +++ b/internal/mempool/v0/clist_mempool.go @@ -3,6 +3,7 @@ package v0 import ( "bytes" "context" + "errors" "fmt" "sync" "sync/atomic" @@ -240,7 +241,7 @@ func (mem *CListMempool) CheckTx( // Note it's possible a tx is still in the cache but no longer in the mempool // (eg. after committing a block, txs are removed from mempool but not cache), // so we only record the sender for txs still in the mempool. - if e, ok := mem.txsMap.Load(mempool.TxKey(tx)); ok { + if e, ok := mem.txsMap.Load(tx.Key()); ok { memTx := e.(*clist.CElement).Value.(*mempoolTx) _, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true) // TODO: consider punishing peer for dups, @@ -327,7 +328,7 @@ func (mem *CListMempool) reqResCb( // - resCbFirstTime (lock not held) if tx is valid func (mem *CListMempool) addTx(memTx *mempoolTx) { e := mem.txs.PushBack(memTx) - mem.txsMap.Store(mempool.TxKey(memTx.tx), e) + mem.txsMap.Store(memTx.tx.Key(), e) atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) } @@ -338,7 +339,7 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) { func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { mem.txs.Remove(elem) elem.DetachPrev() - mem.txsMap.Delete(mempool.TxKey(tx)) + mem.txsMap.Delete(tx.Key()) atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) if removeFromCache { @@ -347,13 +348,16 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC } // RemoveTxByKey removes a transaction from the mempool by its TxKey index. -func (mem *CListMempool) RemoveTxByKey(txKey [mempool.TxKeySize]byte, removeFromCache bool) { +func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { if e, ok := mem.txsMap.Load(txKey); ok { memTx := e.(*clist.CElement).Value.(*mempoolTx) if memTx != nil { - mem.removeTx(memTx.tx, e.(*clist.CElement), removeFromCache) + mem.removeTx(memTx.tx, e.(*clist.CElement), false) + return nil } + return errors.New("transaction not found") } + return errors.New("invalid transaction found") } func (mem *CListMempool) isFull(txSize int) error { @@ -409,7 +413,7 @@ func (mem *CListMempool) resCbFirstTime( mem.addTx(memTx) mem.logger.Debug( "added good transaction", - "tx", mempool.TxHashFromBytes(tx), + "tx", types.Tx(tx).Hash(), "res", r, "height", memTx.height, "total", mem.Size(), @@ -419,7 +423,7 @@ func (mem *CListMempool) resCbFirstTime( // ignore bad transaction mem.logger.Debug( "rejected bad transaction", - "tx", mempool.TxHashFromBytes(tx), + "tx", types.Tx(tx).Hash(), "peerID", peerP2PID, "res", r, "err", postCheckErr, @@ -460,7 +464,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { // Good, nothing to do. } else { // Tx became invalidated due to newly committed block. - mem.logger.Debug("tx is no longer valid", "tx", mempool.TxHashFromBytes(tx), "res", r, "err", postCheckErr) + mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr) // NOTE: we remove tx from the cache because it might be good later mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache) } @@ -598,7 +602,7 @@ func (mem *CListMempool) Update( // Mempool after: // 100 // https://github.com/tendermint/tendermint/issues/3322. - if e, ok := mem.txsMap.Load(mempool.TxKey(tx)); ok { + if e, ok := mem.txsMap.Load(tx.Key()); ok { mem.removeTx(tx, e.(*clist.CElement), false) } } diff --git a/internal/mempool/v0/clist_mempool_test.go b/internal/mempool/v0/clist_mempool_test.go index e55942e48..b61a8333e 100644 --- a/internal/mempool/v0/clist_mempool_test.go +++ b/internal/mempool/v0/clist_mempool_test.go @@ -544,9 +544,9 @@ func TestMempoolTxsBytes(t *testing.T) { err = mp.CheckTx(context.Background(), []byte{0x06}, nil, mempool.TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 9, mp.SizeBytes()) - mp.RemoveTxByKey(mempool.TxKey([]byte{0x07}), true) + assert.Error(t, mp.RemoveTxByKey(types.Tx([]byte{0x07}).Key())) assert.EqualValues(t, 9, mp.SizeBytes()) - mp.RemoveTxByKey(mempool.TxKey([]byte{0x06}), true) + assert.NoError(t, mp.RemoveTxByKey(types.Tx([]byte{0x06}).Key())) assert.EqualValues(t, 8, mp.SizeBytes()) } diff --git a/internal/mempool/v0/reactor.go b/internal/mempool/v0/reactor.go index b8aac3b5c..80362a04f 100644 --- a/internal/mempool/v0/reactor.go +++ b/internal/mempool/v0/reactor.go @@ -171,7 +171,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { for _, tx := range protoTxs { if err := r.mempool.CheckTx(context.Background(), types.Tx(tx), nil, txInfo); err != nil { - logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(tx)), "err", err) + logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "err", err) } } @@ -378,7 +378,7 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) } r.Logger.Debug( "gossiped tx to peer", - "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(memTx.tx)), + "tx", fmt.Sprintf("%X", memTx.tx.Hash()), "peer", peerID, ) } diff --git a/internal/mempool/v1/mempool.go b/internal/mempool/v1/mempool.go index 34e524159..a12fbc51b 100644 --- a/internal/mempool/v1/mempool.go +++ b/internal/mempool/v1/mempool.go @@ -3,6 +3,7 @@ package v1 import ( "bytes" "context" + "errors" "fmt" "sync/atomic" "time" @@ -256,7 +257,7 @@ func (txmp *TxMempool) CheckTx( return err } - txHash := mempool.TxKey(tx) + txHash := tx.Key() // We add the transaction to the mempool's cache and if the transaction already // exists, i.e. false is returned, then we check if we've seen this transaction @@ -304,6 +305,19 @@ func (txmp *TxMempool) CheckTx( return nil } +func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { + txmp.Lock() + defer txmp.Unlock() + + // remove the committed transaction from the transaction store and indexes + if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil { + txmp.removeTx(wtx, false) + return nil + } + + return errors.New("transaction not found") +} + // Flush flushes out the mempool. It acquires a read-lock, fetches all the // transactions currently in the transaction store and removes each transaction // from the store and all indexes and finally resets the cache. @@ -451,7 +465,7 @@ func (txmp *TxMempool) Update( } // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(mempool.TxKey(tx)); wtx != nil { + if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil { txmp.removeTx(wtx, false) } } @@ -629,7 +643,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) tx := req.GetCheckTx().Tx wtx := txmp.recheckCursor.Value.(*WrappedTx) if !bytes.Equal(tx, wtx.tx) { - panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), mempool.TxKey(tx))) + panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), types.Tx(tx).Key())) } // Only evaluate transactions that have not been removed. This can happen @@ -647,7 +661,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) txmp.logger.Debug( "existing transaction no longer valid; failed re-CheckTx callback", "priority", wtx.priority, - "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(wtx.tx)), + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err, "code", checkTxRes.CheckTx.Code, ) @@ -784,13 +798,13 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { // the height and time based indexes. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { now := time.Now() - expiredTxs := make(map[[mempool.TxKeySize]byte]*WrappedTx) + expiredTxs := make(map[types.TxKey]*WrappedTx) if txmp.config.TTLNumBlocks > 0 { purgeIdx := -1 for i, wtx := range txmp.heightIndex.txs { if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks { - expiredTxs[mempool.TxKey(wtx.tx)] = wtx + expiredTxs[wtx.tx.Key()] = wtx purgeIdx = i } else { // since the index is sorted, we know no other txs can be be purged @@ -807,7 +821,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { purgeIdx := -1 for i, wtx := range txmp.timestampIndex.txs { if now.Sub(wtx.timestamp) > txmp.config.TTLDuration { - expiredTxs[mempool.TxKey(wtx.tx)] = wtx + expiredTxs[wtx.tx.Key()] = wtx purgeIdx = i } else { // since the index is sorted, we know no other txs can be be purged diff --git a/internal/mempool/v1/mempool_test.go b/internal/mempool/v1/mempool_test.go index 3b7eb02d0..8bed5520a 100644 --- a/internal/mempool/v1/mempool_test.go +++ b/internal/mempool/v1/mempool_test.go @@ -226,10 +226,10 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) - txMap := make(map[[mempool.TxKeySize]byte]testTx) + txMap := make(map[types.TxKey]testTx) priorities := make([]int64, len(tTxs)) for i, tTx := range tTxs { - txMap[mempool.TxKey(tTx.tx)] = tTx + txMap[tTx.tx.Key()] = tTx priorities[i] = tTx.priority } @@ -241,7 +241,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { ensurePrioritized := func(reapedTxs types.Txs) { reapedPriorities := make([]int64, len(reapedTxs)) for i, rTx := range reapedTxs { - reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority + reapedPriorities[i] = txMap[rTx.Key()].priority } require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) @@ -276,10 +276,10 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) - txMap := make(map[[mempool.TxKeySize]byte]testTx) + txMap := make(map[types.TxKey]testTx) priorities := make([]int64, len(tTxs)) for i, tTx := range tTxs { - txMap[mempool.TxKey(tTx.tx)] = tTx + txMap[tTx.tx.Key()] = tTx priorities[i] = tTx.priority } @@ -291,7 +291,7 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) { ensurePrioritized := func(reapedTxs types.Txs) { reapedPriorities := make([]int64, len(reapedTxs)) for i, rTx := range reapedTxs { - reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority + reapedPriorities[i] = txMap[rTx.Key()].priority } require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) diff --git a/internal/mempool/v1/reactor.go b/internal/mempool/v1/reactor.go index 75f89d07e..eff5b7ec0 100644 --- a/internal/mempool/v1/reactor.go +++ b/internal/mempool/v1/reactor.go @@ -178,7 +178,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error { for _, tx := range protoTxs { if err := r.mempool.CheckTx(context.Background(), types.Tx(tx), nil, txInfo); err != nil { - logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(tx)), "err", err) + logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "err", err) } } @@ -386,7 +386,7 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer) } r.Logger.Debug( "gossiped tx to peer", - "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(memTx.tx)), + "tx", fmt.Sprintf("%X", memTx.tx.Hash()), "peer", peerID, ) } diff --git a/internal/mempool/v1/tx.go b/internal/mempool/v1/tx.go index 15173b91f..c5b7ca82f 100644 --- a/internal/mempool/v1/tx.go +++ b/internal/mempool/v1/tx.go @@ -6,7 +6,6 @@ import ( "github.com/tendermint/tendermint/internal/libs/clist" tmsync "github.com/tendermint/tendermint/internal/libs/sync" - "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/types" ) @@ -17,7 +16,7 @@ type WrappedTx struct { tx types.Tx // hash defines the transaction hash and the primary key used in the mempool - hash [mempool.TxKeySize]byte + hash types.TxKey // height defines the height at which the transaction was validated at height int64 @@ -66,14 +65,14 @@ func (wtx *WrappedTx) Size() int { // need mutative access. type TxStore struct { mtx tmsync.RWMutex - hashTxs map[[mempool.TxKeySize]byte]*WrappedTx // primary index - senderTxs map[string]*WrappedTx // sender is defined by the ABCI application + hashTxs map[types.TxKey]*WrappedTx // primary index + senderTxs map[string]*WrappedTx // sender is defined by the ABCI application } func NewTxStore() *TxStore { return &TxStore{ senderTxs: make(map[string]*WrappedTx), - hashTxs: make(map[[mempool.TxKeySize]byte]*WrappedTx), + hashTxs: make(map[types.TxKey]*WrappedTx), } } @@ -110,7 +109,7 @@ func (txs *TxStore) GetTxBySender(sender string) *WrappedTx { } // GetTxByHash returns a *WrappedTx by the transaction's hash. -func (txs *TxStore) GetTxByHash(hash [mempool.TxKeySize]byte) *WrappedTx { +func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx { txs.mtx.RLock() defer txs.mtx.RUnlock() @@ -119,7 +118,7 @@ func (txs *TxStore) GetTxByHash(hash [mempool.TxKeySize]byte) *WrappedTx { // IsTxRemoved returns true if a transaction by hash is marked as removed and // false otherwise. -func (txs *TxStore) IsTxRemoved(hash [mempool.TxKeySize]byte) bool { +func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool { txs.mtx.RLock() defer txs.mtx.RUnlock() @@ -142,7 +141,7 @@ func (txs *TxStore) SetTx(wtx *WrappedTx) { txs.senderTxs[wtx.sender] = wtx } - txs.hashTxs[mempool.TxKey(wtx.tx)] = wtx + txs.hashTxs[wtx.tx.Key()] = wtx } // RemoveTx removes a *WrappedTx from the transaction store. It deletes all @@ -155,13 +154,13 @@ func (txs *TxStore) RemoveTx(wtx *WrappedTx) { delete(txs.senderTxs, wtx.sender) } - delete(txs.hashTxs, mempool.TxKey(wtx.tx)) + delete(txs.hashTxs, wtx.tx.Key()) wtx.removed = true } // TxHasPeer returns true if a transaction by hash has a given peer ID and false // otherwise. If the transaction does not exist, false is returned. -func (txs *TxStore) TxHasPeer(hash [mempool.TxKeySize]byte, peerID uint16) bool { +func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool { txs.mtx.RLock() defer txs.mtx.RUnlock() @@ -179,7 +178,7 @@ func (txs *TxStore) TxHasPeer(hash [mempool.TxKeySize]byte, peerID uint16) bool // We return true if we've already recorded the given peer for this transaction // and false otherwise. If the transaction does not exist by hash, we return // (nil, false). -func (txs *TxStore) GetOrSetPeerByTxHash(hash [mempool.TxKeySize]byte, peerID uint16) (*WrappedTx, bool) { +func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) { txs.mtx.Lock() defer txs.mtx.Unlock() diff --git a/internal/mempool/v1/tx_test.go b/internal/mempool/v1/tx_test.go index c5d488669..fb4beafab 100644 --- a/internal/mempool/v1/tx_test.go +++ b/internal/mempool/v1/tx_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/internal/mempool" + "github.com/tendermint/tendermint/types" ) func TestTxStore_GetTxBySender(t *testing.T) { @@ -39,7 +39,7 @@ func TestTxStore_GetTxByHash(t *testing.T) { timestamp: time.Now(), } - key := mempool.TxKey(wtx.tx) + key := wtx.tx.Key() res := txs.GetTxByHash(key) require.Nil(t, res) @@ -58,7 +58,7 @@ func TestTxStore_SetTx(t *testing.T) { timestamp: time.Now(), } - key := mempool.TxKey(wtx.tx) + key := wtx.tx.Key() txs.SetTx(wtx) res := txs.GetTxByHash(key) @@ -81,10 +81,10 @@ func TestTxStore_GetOrSetPeerByTxHash(t *testing.T) { timestamp: time.Now(), } - key := mempool.TxKey(wtx.tx) + key := wtx.tx.Key() txs.SetTx(wtx) - res, ok := txs.GetOrSetPeerByTxHash(mempool.TxKey([]byte("test_tx_2")), 15) + res, ok := txs.GetOrSetPeerByTxHash(types.Tx([]byte("test_tx_2")).Key(), 15) require.Nil(t, res) require.False(t, ok) @@ -110,7 +110,7 @@ func TestTxStore_RemoveTx(t *testing.T) { txs.SetTx(wtx) - key := mempool.TxKey(wtx.tx) + key := wtx.tx.Key() res := txs.GetTxByHash(key) require.NotNil(t, res) diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 6cd05348f..c20f02032 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -151,3 +151,7 @@ func (env *Environment) CheckTx(ctx *rpctypes.Context, tx types.Tx) (*coretypes. } return &coretypes.ResultCheckTx{ResponseCheckTx: *res}, nil } + +func (env *Environment) RemoveTx(ctx *rpctypes.Context, txkey types.TxKey) error { + return env.Mempool.RemoveTxByKey(txkey) +} diff --git a/internal/rpc/core/routes.go b/internal/rpc/core/routes.go index 73eaaf14c..fe99d2118 100644 --- a/internal/rpc/core/routes.go +++ b/internal/rpc/core/routes.go @@ -28,6 +28,7 @@ func (env *Environment) GetRoutes() RoutesMap { "block_results": rpc.NewRPCFunc(env.BlockResults, "height", true), "commit": rpc.NewRPCFunc(env.Commit, "height", true), "check_tx": rpc.NewRPCFunc(env.CheckTx, "tx", true), + "remove_tx": rpc.NewRPCFunc(env.RemoveTx, "txkey", false), "tx": rpc.NewRPCFunc(env.Tx, "hash,prove", true), "tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false), "block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false), diff --git a/light/rpc/client.go b/light/rpc/client.go index 4461028cd..dc745542e 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -212,6 +212,10 @@ func (c *Client) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultChe return c.next.CheckTx(ctx, tx) } +func (c *Client) RemoveTx(ctx context.Context, txKey types.TxKey) error { + return c.next.RemoveTx(ctx, txKey) +} + func (c *Client) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, error) { return c.next.NetInfo(ctx) } diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index 65d0f434b..d0c1d5621 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -315,6 +315,14 @@ func (c *baseRPCClient) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.Re return result, nil } +func (c *baseRPCClient) RemoveTx(ctx context.Context, txKey types.TxKey) error { + _, err := c.caller.Call(ctx, "remove_tx", map[string]interface{}{"tx_key": txKey}, nil) + if err != nil { + return err + } + return nil +} + func (c *baseRPCClient) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, error) { result := new(coretypes.ResultNetInfo) _, err := c.caller.Call(ctx, "net_info", map[string]interface{}{}, result) diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 7b6a9bd20..474eb9937 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -146,6 +146,7 @@ type MempoolClient interface { UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) NumUnconfirmedTxs(context.Context) (*coretypes.ResultUnconfirmedTxs, error) CheckTx(context.Context, types.Tx) (*coretypes.ResultCheckTx, error) + RemoveTx(context.Context, types.TxKey) error } // EvidenceClient is used for submitting an evidence of the malicious diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 108510b02..21ca6e6f1 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -116,6 +116,10 @@ func (c *Local) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultChec return c.env.CheckTx(c.ctx, tx) } +func (c *Local) RemoveTx(ctx context.Context, txKey types.TxKey) error { + return c.env.Mempool.RemoveTxByKey(txKey) +} + func (c *Local) NetInfo(ctx context.Context) (*coretypes.ResultNetInfo, error) { return c.env.NetInfo(c.ctx) } diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index a32d44986..83d85be8f 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -237,6 +237,31 @@ paths: application/json: schema: $ref: "#/components/schemas/ErrorResponse" + + /remove_tx: + get: + summary: Removes a transaction from the mempool. + tags: + - TxKey + operationId: remove_tx + parameters: + - in: query + name: txKey + required: true + schema: + type: string + example: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + description: The transaction key + responses: + "200": + description: empty response. + "500": + description: empty error. + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" + /subscribe: get: summary: Subscribe for events via WebSocket. diff --git a/types/mempool.go b/types/mempool.go index c739796af..fa0f8a208 100644 --- a/types/mempool.go +++ b/types/mempool.go @@ -1,14 +1,16 @@ package types import ( + "crypto/sha256" "errors" "fmt" ) -var ( - // ErrTxInCache is returned to the client if we saw tx earlier - ErrTxInCache = errors.New("tx already exists in cache") -) +// ErrTxInCache is returned to the client if we saw tx earlier +var ErrTxInCache = errors.New("tx already exists in cache") + +// TxKey is the fixed length array key used as an index. +type TxKey [sha256.Size]byte // ErrTxTooLarge defines an error when a transaction is too big to be sent in a // message to other peers. diff --git a/types/tx.go b/types/tx.go index 92df92f13..19ee41dac 100644 --- a/types/tx.go +++ b/types/tx.go @@ -2,6 +2,7 @@ package types import ( "bytes" + "crypto/sha256" "errors" "fmt" @@ -16,15 +17,14 @@ import ( // Might we want types here ? type Tx []byte +// Key produces a fixed-length key for use in indexing. +func (tx Tx) Key() TxKey { return sha256.Sum256(tx) } + // Hash computes the TMHASH hash of the wire encoded transaction. -func (tx Tx) Hash() []byte { - return tmhash.Sum(tx) -} +func (tx Tx) Hash() []byte { return tmhash.Sum(tx) } // String returns the hex-encoded transaction as a string. -func (tx Tx) String() string { - return fmt.Sprintf("Tx{%X}", []byte(tx)) -} +func (tx Tx) String() string { return fmt.Sprintf("Tx{%X}", []byte(tx)) } // Txs is a slice of Tx. type Txs []Tx