diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 50d60f62b..3f8eb17f9 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -18,6 +18,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - Go API - [privval] [\#4744](https://github.com/tendermint/tendermint/pull/4744) Remove deprecated `OldFilePV` (@melekes) + - [mempool] [\#4759](https://github.com/tendermint/tendermint/pull/4759) Modify `Mempool#InitWAL` to return an error (@melekes) - Blockchain Protocol @@ -32,6 +33,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - nostrip: don't strip debugging symbols nor DWARF tables. - cleveldb: use cleveldb as db backend instead of goleveldb. - race: pass -race to go build and enable data race detection. +- [mempool] [\#4759](https://github.com/tendermint/tendermint/pull/4759) Allow ReapX and CheckTx functions to run in parallel (@melekes) ### BUG FIXES: diff --git a/libs/os/os.go b/libs/os/os.go index b56726c94..4773feef0 100644 --- a/libs/os/os.go +++ b/libs/os/os.go @@ -46,7 +46,7 @@ func EnsureDir(dir string, mode os.FileMode) error { if _, err := os.Stat(dir); os.IsNotExist(err) { err := os.MkdirAll(dir, mode) if err != nil { - return fmt.Errorf("could not create directory %v. %v", dir, err) + return fmt.Errorf("could not create directory %v: %w", dir, err) } } return nil diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 7732032a4..f55d62329 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -7,9 +7,6 @@ import ( "fmt" "sync" "sync/atomic" - "time" - - "github.com/pkg/errors" abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" @@ -32,9 +29,8 @@ import ( // be efficiently accessed by multiple concurrent readers. 
type CListMempool struct { // Atomic integers - height int64 // the last block Update()'d to - txsBytes int64 // total size of mempool, in bytes - rechecking int32 // for re-checking filtered txs on Update() + height int64 // the last block Update()'d to + txsBytes int64 // total size of mempool, in bytes // notify listeners (ie. consensus) when txs are available notifiedTxsAvailable bool @@ -42,15 +38,19 @@ type CListMempool struct { config *cfg.MempoolConfig - proxyMtx sync.Mutex + // Exclusive mutex for Update method to prevent concurrent execution of + // CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods. + updateMtx sync.RWMutex + preCheck PreCheckFunc + postCheck PostCheckFunc + + wal *auto.AutoFile // a log of mempool txs + txs *clist.CList // concurrent linked-list of good txs proxyAppConn proxy.AppConnMempool - txs *clist.CList // concurrent linked-list of good txs - preCheck PreCheckFunc - postCheck PostCheckFunc // Track whether we're rechecking txs. - // These are not protected by a mutex and are expected to be mutated - // in serial (ie. by abci responses which are called in serial). + // These are not protected by a mutex and are expected to be mutated in + // serial (ie. by abci responses which are called in serial). recheckCursor *clist.CElement // next expected response recheckEnd *clist.CElement // re-checking stops here @@ -62,9 +62,6 @@ type CListMempool struct { // This reduces the pressure on the proxyApp. cache txCache - // A log of mempool txs - wal *auto.AutoFile - logger log.Logger metrics *Metrics @@ -87,7 +84,6 @@ func NewCListMempool( proxyAppConn: proxyAppConn, txs: clist.New(), height: height, - rechecking: 0, recheckCursor: nil, recheckEnd: nil, logger: log.NewNopLogger(), @@ -132,55 +128,64 @@ func WithMetrics(metrics *Metrics) CListMempoolOption { return func(mem *CListMempool) { mem.metrics = metrics } } -// *panics* if can't create directory or open file. 
-// *not thread safe* -func (mem *CListMempool) InitWAL() { - walDir := mem.config.WalDir() - err := tmos.EnsureDir(walDir, 0700) - if err != nil { - panic(errors.Wrap(err, "Error ensuring WAL dir")) +func (mem *CListMempool) InitWAL() error { + var ( + walDir = mem.config.WalDir() + walFile = walDir + "/wal" + ) + + const perm = 0700 + if err := tmos.EnsureDir(walDir, perm); err != nil { + return err } - af, err := auto.OpenAutoFile(walDir + "/wal") + + af, err := auto.OpenAutoFile(walFile) if err != nil { - panic(errors.Wrap(err, "Error opening WAL file")) + return fmt.Errorf("can't open autofile %s: %w", walFile, err) } + mem.wal = af + return nil } func (mem *CListMempool) CloseWAL() { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() - if err := mem.wal.Close(); err != nil { mem.logger.Error("Error closing WAL", "err", err) } mem.wal = nil } +// Safe for concurrent use by multiple goroutines. func (mem *CListMempool) Lock() { - mem.proxyMtx.Lock() + mem.updateMtx.Lock() } +// Safe for concurrent use by multiple goroutines. func (mem *CListMempool) Unlock() { - mem.proxyMtx.Unlock() + mem.updateMtx.Unlock() } +// Safe for concurrent use by multiple goroutines. func (mem *CListMempool) Size() int { return mem.txs.Len() } +// Safe for concurrent use by multiple goroutines. func (mem *CListMempool) TxsBytes() int64 { return atomic.LoadInt64(&mem.txsBytes) } +// Lock() must be held by the caller during execution. func (mem *CListMempool) FlushAppConn() error { return mem.proxyAppConn.FlushSync() } +// XXX: Unsafe! Calling Flush may leave mempool in inconsistent state. 
func (mem *CListMempool) Flush() { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + mem.updateMtx.RLock() + defer mem.updateMtx.RUnlock() + _ = atomic.SwapInt64(&mem.txsBytes, 0) mem.cache.Reset() for e := mem.txs.Front(); e != nil; e = e.Next() { @@ -188,13 +193,17 @@ func (mem *CListMempool) Flush() { e.DetachPrev() } - mem.txsMap = sync.Map{} - _ = atomic.SwapInt64(&mem.txsBytes, 0) + mem.txsMap.Range(func(key, _ interface{}) bool { + mem.txsMap.Delete(key) + return true + }) } // TxsFront returns the first transaction in the ordered list for peer // goroutines to call .NextWait() on. // FIXME: leaking implementation details! +// +// Safe for concurrent use by multiple goroutines. func (mem *CListMempool) TxsFront() *clist.CElement { return mem.txs.Front() } @@ -202,6 +211,8 @@ func (mem *CListMempool) TxsFront() *clist.CElement { // TxsWaitChan returns a channel to wait on transactions. It will be closed // once the mempool is not empty (ie. the internal `mem.txs` has at least one // element) +// +// Safe for concurrent use by multiple goroutines. func (mem *CListMempool) TxsWaitChan() <-chan struct{} { return mem.txs.WaitChan() } @@ -210,21 +221,17 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { // cb: A callback from the CheckTx command. // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. -func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) { - mem.proxyMtx.Lock() +// +// Safe for concurrent use by multiple goroutines. 
+func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error { + mem.updateMtx.RLock() // use defer to unlock mutex because application (*local client*) might panic - defer mem.proxyMtx.Unlock() + defer mem.updateMtx.RUnlock() - var ( - memSize = mem.Size() - txsBytes = mem.TxsBytes() - txSize = len(tx) - ) - if memSize >= mem.config.Size || - int64(txSize)+txsBytes > mem.config.MaxTxsBytes { - return ErrMempoolIsFull{ - memSize, mem.config.Size, - txsBytes, mem.config.MaxTxsBytes} + txSize := len(tx) + + if err := mem.isFull(txSize); err != nil { + return err } // The size of the corresponding amino-encoded TxMessage @@ -274,7 +281,7 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx // END WAL // NOTE: proxyAppConn may error if tx buffer is full - if err = mem.proxyAppConn.Error(); err != nil { + if err := mem.proxyAppConn.Error(); err != nil { return err } @@ -290,7 +297,9 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx // and peerID is not included in the ABCI request, so we have to set request-specific callbacks that // include this information. If we're not in the midst of a recheck, this function will just return, // so the request specific callback can do the work. -// When rechecking, we don't need the peerID, so the recheck callback happens here. +// +// When rechecking, we don't need the peerID, so the recheck callback happens +// here. 
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { if mem.recheckCursor == nil { return @@ -359,6 +368,22 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC } } +func (mem *CListMempool) isFull(txSize int) error { + var ( + memSize = mem.Size() + txsBytes = mem.TxsBytes() + ) + + if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes { + return ErrMempoolIsFull{ + memSize, mem.config.Size, + txsBytes, mem.config.MaxTxsBytes, + } + } + + return nil +} + // callback, which is called after the app checked the tx for the first time. // // The case where the app checks the tx for the second and subsequent times is @@ -376,6 +401,15 @@ func (mem *CListMempool) resCbFirstTime( postCheckErr = mem.postCheck(tx, r.CheckTx) } if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { + // Check mempool isn't full again to reduce the chance of exceeding the + // limits. + if err := mem.isFull(len(tx)); err != nil { + // remove from cache (mempool might have a space later) + mem.cache.Remove(tx) + mem.logger.Error(err.Error()) + return + } + memTx := &mempoolTx{ height: mem.height, gasWanted: r.CheckTx.GasWanted, @@ -437,7 +471,6 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { } if mem.recheckCursor == nil { // Done! - atomic.StoreInt32(&mem.rechecking, 0) mem.logger.Info("Done rechecking txs") // incase the recheck removed all txs @@ -450,6 +483,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { } } +// Safe for concurrent use by multiple goroutines. func (mem *CListMempool) TxsAvailable() <-chan struct{} { return mem.txsAvailable } @@ -468,17 +502,15 @@ func (mem *CListMempool) notifyTxsAvailable() { } } +// Safe for concurrent use by multiple goroutines. 
func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + mem.updateMtx.RLock() + defer mem.updateMtx.RUnlock() - for atomic.LoadInt32(&mem.rechecking) > 0 { - // TODO: Something better? - time.Sleep(time.Millisecond * 10) - } - - var totalBytes int64 - var totalGas int64 + var ( + totalBytes int64 + totalGas int64 + ) // TODO: we will get a performance boost if we have a good estimate of avg // size per tx, and set the initial capacity based off of that. // txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize)) @@ -505,19 +537,15 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { return txs } +// Safe for concurrent use by multiple goroutines. func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + mem.updateMtx.RLock() + defer mem.updateMtx.RUnlock() if max < 0 { max = mem.txs.Len() } - for atomic.LoadInt32(&mem.rechecking) > 0 { - // TODO: Something better? - time.Sleep(time.Millisecond * 10) - } - txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max)) for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() { memTx := e.Value.(*mempoolTx) @@ -526,6 +554,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { return txs } +// Lock() must be held by the caller during execution. func (mem *CListMempool) Update( height int64, txs types.Txs, @@ -593,7 +622,6 @@ func (mem *CListMempool) recheckTxs() { panic("recheckTxs is called, but the mempool is empty") } - atomic.StoreInt32(&mem.rechecking, 1) mem.recheckCursor = mem.txs.Front() mem.recheckEnd = mem.txs.Back() diff --git a/mempool/doc.go b/mempool/doc.go index ddd47aa2d..7e6363e12 100644 --- a/mempool/doc.go +++ b/mempool/doc.go @@ -6,19 +6,18 @@ // safely by calling .NextWait() on each element. // So we have several go-routines: -// 1. Consensus calling Update() and Reap() synchronously +// 1. 
Consensus calling Update() and ReapMaxBytesMaxGas() synchronously // 2. Many mempool reactor's peer routines calling CheckTx() // 3. Many mempool reactor's peer routines traversing the txs linked list -// 4. Another goroutine calling GarbageCollectTxs() periodically // To manage these goroutines, there are three methods of locking. // 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe) // 2. Mutations to the linked-list elements are atomic -// 3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx +// 3. CheckTx() and/or ReapMaxBytesMaxGas() calls can be paused upon Update(), protected by .updateMtx -// Garbage collection of old elements from mempool.txs is handlde via -// the DetachPrev() call, which makes old elements not reachable by -// peer broadcastTxRoutine() automatically garbage collected. +// Garbage collection of old elements from mempool.txs is handled via the +// DetachPrev() call, which makes old elements not reachable by peer +// broadcastTxRoutine(). // TODO: Better handle abci client errors. (make it automatically handle connection errors) package mempool diff --git a/mempool/mempool.go b/mempool/mempool.go index 97919ab99..68eec8674 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -37,7 +37,7 @@ type Mempool interface { // Update informs the mempool that the given txs were committed and can be discarded. // NOTE: this should be called *after* block is committed by consensus. - // NOTE: unsafe; Lock/Unlock must be managed by caller + // NOTE: Lock/Unlock must be managed by caller Update( blockHeight int64, blockTxs types.Txs, @@ -48,6 +48,7 @@ type Mempool interface { // FlushAppConn flushes the mempool connection to ensure async reqResCb calls are // done. E.g. from CheckTx. 
+ // NOTE: Lock/Unlock must be managed by caller FlushAppConn() error // Flush removes all transactions from the mempool and cache @@ -68,8 +69,9 @@ type Mempool interface { // TxsBytes returns the total size of all txs in the mempool. TxsBytes() int64 - // InitWAL creates a directory for the WAL file and opens a file itself. - InitWAL() + // InitWAL creates a directory for the WAL file and opens a file itself. If + // there is an error, it will be of type *PathError. + InitWAL() error // CloseWAL closes and discards the underlying WAL file. // Any further writes will not be relayed to disk. diff --git a/mock/mempool.go b/mock/mempool.go index 8c5b6e38f..be690efaa 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -38,5 +38,5 @@ func (Mempool) TxsBytes() int64 { return 0 } func (Mempool) TxsFront() *clist.CElement { return nil } func (Mempool) TxsWaitChan() <-chan struct{} { return nil } -func (Mempool) InitWAL() {} -func (Mempool) CloseWAL() {} +func (Mempool) InitWAL() error { return nil } +func (Mempool) CloseWAL() {} diff --git a/node/node.go b/node/node.go index 06125c762..744422d36 100644 --- a/node/node.go +++ b/node/node.go @@ -773,7 +773,10 @@ func (n *Node) OnStart() error { n.isListening = true if n.config.Mempool.WalEnabled() { - n.mempool.InitWAL() // no need to have the mempool wal during tests + err = n.mempool.InitWAL() + if err != nil { + return fmt.Errorf("init mempool WAL: %w", err) + } } // Start the switch (the P2P server).