From 8a1404b70e4c58ccb386175a5747e5dbdb5217d7 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 1 Sep 2020 10:21:11 +0400 Subject: [PATCH] mempool: return an error when WAL fails (#5292) Closes #2319 --- mempool/clist_mempool.go | 41 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 75eabd0a5..6118ad58a 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -24,6 +24,8 @@ import ( // TxKeySize is the size of the transaction key index const TxKeySize = sha256.Size +var newline = []byte("\n") + //-------------------------------------------------------------------------------- // CListMempool is an ordered in-memory pool for transactions before they are @@ -253,7 +255,23 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx } } - // CACHE + // NOTE: writing to the WAL and calling proxy must be done before adding tx + // to the cache. otherwise, if either of them fails, next time CheckTx is + // called with tx, ErrTxInCache will be returned without tx being checked at + // all even once. + if mem.wal != nil { + // TODO: Notify administrators when WAL fails + _, err := mem.wal.Write(append([]byte(tx), newline...)) + if err != nil { + return fmt.Errorf("wal.Write: %w", err) + } + } + + // NOTE: proxyAppConn may error if tx buffer is full + if err := mem.proxyAppConn.Error(); err != nil { + return err + } + if !mem.cache.Push(tx) { // Record a new sender for a tx we've already seen. // Note it's possible a tx is still in the cache but no longer in the mempool @@ -265,31 +283,10 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx // TODO: consider punishing peer for dups, // its non-trivial since invalid txs can become valid, // but they can spam the same tx with little cost to them atm. - } return ErrTxInCache } - // END CACHE - - // WAL - if mem.wal != nil { - // TODO: Notify administrators when WAL fails - _, err := mem.wal.Write([]byte(tx)) - if err != nil { - mem.logger.Error("Error writing to WAL", "err", err) - } - _, err = mem.wal.Write([]byte("\n")) - if err != nil { - mem.logger.Error("Error writing to WAL", "err", err) - } - } - // END WAL - - // NOTE: proxyAppConn may error if tx buffer is full - if err := mem.proxyAppConn.Error(); err != nil { - return err - } reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))