Browse Source

mempool: allow ReapX and CheckTx functions to run in parallel

allow ReapX and CheckTx functions to run in parallel, making it not possible to block certain proposers from creating a new block.

Closes: #2972
pull/4816/head
Anton Kaliaev 5 years ago
committed by GitHub
parent
commit
52784f67d0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 116 additions and 82 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +2
    -2
      consensus/replay_stubs.go
  3. +1
    -1
      libs/os/os.go
  4. +95
    -67
      mempool/clist_mempool.go
  5. +5
    -6
      mempool/doc.go
  6. +5
    -3
      mempool/mempool.go
  7. +2
    -2
      mempool/mock/mempool.go
  8. +4
    -1
      node/node.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -23,6 +23,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [crypto] [\#4721](https://github.com/tendermint/tendermint/pull/4721) Remove `SimpleHashFromMap()` and `SimpleProofsFromMap()` (@erikgrinaker)
- [privval] [\#4744](https://github.com/tendermint/tendermint/pull/4744) Remove deprecated `OldFilePV` (@melekes)
- [mempool] [\#4759](https://github.com/tendermint/tendermint/pull/4759) Modify `Mempool#InitWAL` to return an error (@melekes)
- [types] \#4798 Simplify `VerifyCommitTrusting` func + remove extra validation (@melekes)
- Blockchain Protocol
@ -45,6 +46,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- nostrip: don't strip debugging symbols nor DWARF tables.
- cleveldb: use cleveldb as db backend instead of goleveldb.
- race: pass -race to go build and enable data race detection.
- [mempool] [\#4759](https://github.com/tendermint/tendermint/pull/4759) Allow ReapX and CheckTx functions to run in parallel (@melekes)
- [state] [\#4781](https://github.com/tendermint/tendermint/pull/4781) Export `InitStateVersion` for the initial state version (@erikgrinaker)
- [p2p/conn] \#4795 Return err on `signChallenge()` instead of panic


+ 2
- 2
consensus/replay_stubs.go View File

@ -41,8 +41,8 @@ func (emptyMempool) TxsBytes() int64 { return 0 }
func (emptyMempool) TxsFront() *clist.CElement { return nil }
func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }
func (emptyMempool) InitWAL() {}
func (emptyMempool) CloseWAL() {}
func (emptyMempool) InitWAL() error { return nil }
func (emptyMempool) CloseWAL() {}
//-----------------------------------------------------------------------------


+ 1
- 1
libs/os/os.go View File

@ -46,7 +46,7 @@ func EnsureDir(dir string, mode os.FileMode) error {
if _, err := os.Stat(dir); os.IsNotExist(err) {
err := os.MkdirAll(dir, mode)
if err != nil {
return fmt.Errorf("could not create directory %v. %v", dir, err)
return fmt.Errorf("could not create directory %v: %w", dir, err)
}
}
return nil


+ 95
- 67
mempool/clist_mempool.go View File

@ -7,9 +7,6 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
@ -32,9 +29,8 @@ import (
// be efficiently accessed by multiple concurrent readers.
type CListMempool struct {
// Atomic integers
height int64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes
rechecking int32 // for re-checking filtered txs on Update()
height int64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes
// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
@ -42,15 +38,19 @@ type CListMempool struct {
config *cfg.MempoolConfig
proxyMtx sync.Mutex
// Exclusive mutex for Update method to prevent concurrent execution of
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
updateMtx sync.RWMutex
preCheck PreCheckFunc
postCheck PostCheckFunc
wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
preCheck PreCheckFunc
postCheck PostCheckFunc
// Track whether we're rechecking txs.
// These are not protected by a mutex and are expected to be mutated
// in serial (ie. by abci responses which are called in serial).
// These are not protected by a mutex and are expected to be mutated in
// serial (ie. by abci responses which are called in serial).
recheckCursor *clist.CElement // next expected response
recheckEnd *clist.CElement // re-checking stops here
@ -62,9 +62,6 @@ type CListMempool struct {
// This reduces the pressure on the proxyApp.
cache txCache
// A log of mempool txs
wal *auto.AutoFile
logger log.Logger
metrics *Metrics
@ -87,7 +84,6 @@ func NewCListMempool(
proxyAppConn: proxyAppConn,
txs: clist.New(),
height: height,
rechecking: 0,
recheckCursor: nil,
recheckEnd: nil,
logger: log.NewNopLogger(),
@ -132,55 +128,64 @@ func WithMetrics(metrics *Metrics) CListMempoolOption {
return func(mem *CListMempool) { mem.metrics = metrics }
}
// *panics* if can't create directory or open file.
// *not thread safe*
func (mem *CListMempool) InitWAL() {
walDir := mem.config.WalDir()
err := tmos.EnsureDir(walDir, 0700)
if err != nil {
panic(errors.Wrap(err, "Error ensuring WAL dir"))
func (mem *CListMempool) InitWAL() error {
var (
walDir = mem.config.WalDir()
walFile = walDir + "/wal"
)
const perm = 0700
if err := tmos.EnsureDir(walDir, perm); err != nil {
return err
}
af, err := auto.OpenAutoFile(walDir + "/wal")
af, err := auto.OpenAutoFile(walFile)
if err != nil {
panic(errors.Wrap(err, "Error opening WAL file"))
return fmt.Errorf("can't open autofile %s: %w", walFile, err)
}
mem.wal = af
return nil
}
func (mem *CListMempool) CloseWAL() {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
if err := mem.wal.Close(); err != nil {
mem.logger.Error("Error closing WAL", "err", err)
}
mem.wal = nil
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Lock() {
mem.proxyMtx.Lock()
mem.updateMtx.Lock()
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Unlock() {
mem.proxyMtx.Unlock()
mem.updateMtx.Unlock()
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Size() int {
return mem.txs.Len()
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) TxsBytes() int64 {
return atomic.LoadInt64(&mem.txsBytes)
}
// Lock() must be help by the caller during execution.
func (mem *CListMempool) FlushAppConn() error {
return mem.proxyAppConn.FlushSync()
}
// XXX: Unsafe! Calling Flush may leave mempool in inconsistent state.
func (mem *CListMempool) Flush() {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.updateMtx.RLock()
defer mem.updateMtx.RUnlock()
_ = atomic.SwapInt64(&mem.txsBytes, 0)
mem.cache.Reset()
for e := mem.txs.Front(); e != nil; e = e.Next() {
@ -188,13 +193,17 @@ func (mem *CListMempool) Flush() {
e.DetachPrev()
}
mem.txsMap = sync.Map{}
_ = atomic.SwapInt64(&mem.txsBytes, 0)
mem.txsMap.Range(func(key, _ interface{}) bool {
mem.txsMap.Delete(key)
return true
})
}
// TxsFront returns the first transaction in the ordered list for peer
// goroutines to call .NextWait() on.
// FIXME: leaking implementation details!
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) TxsFront() *clist.CElement {
return mem.txs.Front()
}
@ -202,6 +211,8 @@ func (mem *CListMempool) TxsFront() *clist.CElement {
// TxsWaitChan returns a channel to wait on transactions. It will be closed
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
// element)
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
return mem.txs.WaitChan()
}
@ -210,21 +221,17 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
// cb: A callback from the CheckTx command.
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) {
mem.proxyMtx.Lock()
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {
mem.updateMtx.RLock()
// use defer to unlock mutex because application (*local client*) might panic
defer mem.proxyMtx.Unlock()
defer mem.updateMtx.RUnlock()
var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
txSize = len(tx)
)
if memSize >= mem.config.Size ||
int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize, mem.config.Size,
txsBytes, mem.config.MaxTxsBytes}
txSize := len(tx)
if err := mem.isFull(txSize); err != nil {
return err
}
// The size of the corresponding amino-encoded TxMessage
@ -274,7 +281,7 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
// END WAL
// NOTE: proxyAppConn may error if tx buffer is full
if err = mem.proxyAppConn.Error(); err != nil {
if err := mem.proxyAppConn.Error(); err != nil {
return err
}
@ -290,7 +297,9 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
// include this information. If we're not in the midst of a recheck, this function will just return,
// so the request specific callback can do the work.
// When rechecking, we don't need the peerID, so the recheck callback happens here.
//
// When rechecking, we don't need the peerID, so the recheck callback happens
// here.
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
if mem.recheckCursor == nil {
return
@ -359,6 +368,22 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC
}
}
func (mem *CListMempool) isFull(txSize int) error {
var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)
if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize, mem.config.Size,
txsBytes, mem.config.MaxTxsBytes,
}
}
return nil
}
// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
@ -376,6 +401,15 @@ func (mem *CListMempool) resCbFirstTime(
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
// Check mempool isn't full again to reduce the chance of exceeding the
// limits.
if err := mem.isFull(len(tx)); err != nil {
// remove from cache (mempool might have a space later)
mem.cache.Remove(tx)
mem.logger.Error(err.Error())
return
}
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
@ -437,7 +471,6 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
}
if mem.recheckCursor == nil {
// Done!
atomic.StoreInt32(&mem.rechecking, 0)
mem.logger.Info("Done rechecking txs")
// incase the recheck removed all txs
@ -450,6 +483,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
}
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) TxsAvailable() <-chan struct{} {
return mem.txsAvailable
}
@ -468,17 +502,15 @@ func (mem *CListMempool) notifyTxsAvailable() {
}
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.updateMtx.RLock()
defer mem.updateMtx.RUnlock()
for atomic.LoadInt32(&mem.rechecking) > 0 {
// TODO: Something better?
time.Sleep(time.Millisecond * 10)
}
var totalBytes int64
var totalGas int64
var (
totalBytes int64
totalGas int64
)
// TODO: we will get a performance boost if we have a good estimate of avg
// size per tx, and set the initial capacity based off of that.
// txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize))
@ -505,19 +537,15 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
return txs
}
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.updateMtx.RLock()
defer mem.updateMtx.RUnlock()
if max < 0 {
max = mem.txs.Len()
}
for atomic.LoadInt32(&mem.rechecking) > 0 {
// TODO: Something better?
time.Sleep(time.Millisecond * 10)
}
txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max))
for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
memTx := e.Value.(*mempoolTx)
@ -526,6 +554,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
return txs
}
// Lock() must be help by the caller during execution.
func (mem *CListMempool) Update(
height int64,
txs types.Txs,
@ -593,7 +622,6 @@ func (mem *CListMempool) recheckTxs() {
panic("recheckTxs is called, but the mempool is empty")
}
atomic.StoreInt32(&mem.rechecking, 1)
mem.recheckCursor = mem.txs.Front()
mem.recheckEnd = mem.txs.Back()


+ 5
- 6
mempool/doc.go View File

@ -6,19 +6,18 @@
// safely by calling .NextWait() on each element.
// So we have several go-routines:
// 1. Consensus calling Update() and Reap() synchronously
// 1. Consensus calling Update() and ReapMaxBytesMaxGas() synchronously
// 2. Many mempool reactor's peer routines calling CheckTx()
// 3. Many mempool reactor's peer routines traversing the txs linked list
// 4. Another goroutine calling GarbageCollectTxs() periodically
// To manage these goroutines, there are three methods of locking.
// 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe)
// 2. Mutations to the linked-list elements are atomic
// 3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx
// 3. CheckTx() and/or ReapMaxBytesMaxGas() calls can be paused upon Update(), protected by .updateMtx
// Garbage collection of old elements from mempool.txs is handlde via
// the DetachPrev() call, which makes old elements not reachable by
// peer broadcastTxRoutine() automatically garbage collected.
// Garbage collection of old elements from mempool.txs is handlde via the
// DetachPrev() call, which makes old elements not reachable by peer
// broadcastTxRoutine().
// TODO: Better handle abci client errors. (make it automatically handle connection errors)
package mempool

+ 5
- 3
mempool/mempool.go View File

@ -37,7 +37,7 @@ type Mempool interface {
// Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
// NOTE: Lock/Unlock must be managed by caller
Update(
blockHeight int64,
blockTxs types.Txs,
@ -48,6 +48,7 @@ type Mempool interface {
// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
// done. E.g. from CheckTx.
// NOTE: Lock/Unlock must be managed by caller
FlushAppConn() error
// Flush removes all transactions from the mempool and cache
@ -68,8 +69,9 @@ type Mempool interface {
// TxsBytes returns the total size of all txs in the mempool.
TxsBytes() int64
// InitWAL creates a directory for the WAL file and opens a file itself.
InitWAL()
// InitWAL creates a directory for the WAL file and opens a file itself. If
// there is an error, it will be of type *PathError.
InitWAL() error
// CloseWAL closes and discards the underlying WAL file.
// Any further writes will not be relayed to disk.


+ 2
- 2
mempool/mock/mempool.go View File

@ -38,5 +38,5 @@ func (Mempool) TxsBytes() int64 { return 0 }
func (Mempool) TxsFront() *clist.CElement { return nil }
func (Mempool) TxsWaitChan() <-chan struct{} { return nil }
func (Mempool) InitWAL() {}
func (Mempool) CloseWAL() {}
func (Mempool) InitWAL() error { return nil }
func (Mempool) CloseWAL() {}

+ 4
- 1
node/node.go View File

@ -875,7 +875,10 @@ func (n *Node) OnStart() error {
n.isListening = true
if n.config.Mempool.WalEnabled() {
n.mempool.InitWAL() // no need to have the mempool wal during tests
err = n.mempool.InitWAL()
if err != nil {
return fmt.Errorf("init mempool WAL: %w", err)
}
}
// Start the switch (the P2P server).


Loading…
Cancel
Save