Browse Source

mempool: return an error when WAL fails (#5292)

Closes #2319
pull/5315/head
Anton Kaliaev 4 years ago
committed by GitHub
parent
commit
8a1404b70e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 22 deletions
  1. +19
    -22
      mempool/clist_mempool.go

+ 19
- 22
mempool/clist_mempool.go View File

@ -24,6 +24,8 @@ import (
// TxKeySize is the size of the transaction key index // TxKeySize is the size of the transaction key index
const TxKeySize = sha256.Size const TxKeySize = sha256.Size
var newline = []byte("\n")
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
// CListMempool is an ordered in-memory pool for transactions before they are // CListMempool is an ordered in-memory pool for transactions before they are
@ -253,7 +255,23 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
} }
} }
// CACHE
// NOTE: writing to the WAL and calling proxy must be done before adding tx
// to the cache. otherwise, if either of them fails, next time CheckTx is
// called with tx, ErrTxInCache will be returned without tx being checked at
// all even once.
if mem.wal != nil {
// TODO: Notify administrators when WAL fails
_, err := mem.wal.Write(append([]byte(tx), newline...))
if err != nil {
return fmt.Errorf("wal.Write: %w", err)
}
}
// NOTE: proxyAppConn may error if tx buffer is full
if err := mem.proxyAppConn.Error(); err != nil {
return err
}
if !mem.cache.Push(tx) { if !mem.cache.Push(tx) {
// Record a new sender for a tx we've already seen. // Record a new sender for a tx we've already seen.
// Note it's possible a tx is still in the cache but no longer in the mempool // Note it's possible a tx is still in the cache but no longer in the mempool
@ -265,31 +283,10 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
// TODO: consider punishing peer for dups, // TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid, // its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm. // but they can spam the same tx with little cost to them atm.
} }
return ErrTxInCache return ErrTxInCache
} }
// END CACHE
// WAL
if mem.wal != nil {
// TODO: Notify administrators when WAL fails
_, err := mem.wal.Write([]byte(tx))
if err != nil {
mem.logger.Error("Error writing to WAL", "err", err)
}
_, err = mem.wal.Write([]byte("\n"))
if err != nil {
mem.logger.Error("Error writing to WAL", "err", err)
}
}
// END WAL
// NOTE: proxyAppConn may error if tx buffer is full
if err := mem.proxyAppConn.Error(); err != nil {
return err
}
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))


Loading…
Cancel
Save