diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 1b3aee56f..0e338ecf6 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -13,6 +13,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + memplmock "github.com/tendermint/tendermint/mempool/mock" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -92,7 +93,7 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals // pool.height is determined from the store. fastSync := true blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(), - sm.MockMempool{}, sm.MockEvidencePool{}) + memplmock.Mempool{}, sm.MockEvidencePool{}) // let's add some blocks in for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { diff --git a/consensus/common_test.go b/consensus/common_test.go index 5706f2317..0a24ec4e2 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -268,7 +268,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S proxyAppConnCon := abcicli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0) + mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0) mempool.SetLogger(log.TestingLogger().With("module", "mempool")) if thisConfig.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index e7669b177..65fd31a79 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -11,13 +11,13 @@ import ( "github.com/tendermint/tendermint/abci/example/code" abci "github.com/tendermint/tendermint/abci/types" - sm "github.com/tendermint/tendermint/state" + mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/types" ) // for testing -func assertMempool(txn txNotifier) sm.Mempool { - return txn.(sm.Mempool) +func assertMempool(txn txNotifier) mempl.Mempool { + return txn.(mempl.Mempool) } func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) { diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index dfd6f2508..ebc0cf571 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -136,7 +136,7 @@ func TestReactorWithEvidence(t *testing.T) { proxyAppConnCon := abcicli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewMempool(thisConfig.Mempool, proxyAppConnMem, 0) + mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0) mempool.SetLogger(log.TestingLogger().With("module", "mempool")) if thisConfig.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() diff --git a/consensus/replay.go b/consensus/replay.go index e47d4892a..d44d5e9e8 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -16,11 +16,11 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" + memplmock "github.com/tendermint/tendermint/mempool/mock" ) var crc32c = crc32.MakeTable(crc32.Castagnoli) @@ -442,7 +442,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap block := h.store.LoadBlock(height) meta 
:= h.store.LoadBlockMeta(height) - blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{}) + blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, memplmock.Mempool{}, sm.MockEvidencePool{}) blockExec.SetEventBus(h.eventBus) var err error diff --git a/consensus/replay_file.go b/consensus/replay_file.go index d45f9c5a4..366d1fc6d 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -19,6 +19,7 @@ import ( "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + memplmock "github.com/tendermint/tendermint/mempool/mock" ) const ( @@ -312,7 +313,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo cmn.Exit(fmt.Sprintf("Error on handshake: %v", err)) } - mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{} + mempool, evpool := memplmock.Mempool{}, sm.MockEvidencePool{} blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) consensusState := NewConsensusState(csConfig, state.Copy(), blockExec, diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 86dca7657..368a4d56d 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -21,6 +21,7 @@ import ( "github.com/tendermint/tendermint/crypto" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + memplmock "github.com/tendermint/tendermint/mempool/mock" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -267,7 +268,7 @@ const ( ) var ( - mempool = sm.MockMempool{} + mempool = memplmock.Mempool{} evpool = sm.MockEvidencePool{} ) diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 1a4cfb9ff..6b2c6362c 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -10,12 +10,14 @@ import ( "time" "github.com/pkg/errors" + "github.com/tendermint/tendermint/abci/example/kvstore" bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + memplmock "github.com/tendermint/tendermint/mempool/mock" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -66,7 +68,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { return errors.Wrap(err, "failed to start event bus") } defer eventBus.Stop() - mempool := sm.MockMempool{} + mempool := memplmock.Mempool{} evpool := sm.MockEvidencePool{} blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go new file mode 100644 index 000000000..a4ac90fa6 --- /dev/null +++ b/mempool/clist_mempool.go @@ -0,0 +1,742 @@ +package mempool + +import ( + "bytes" + "container/list" + "crypto/sha256" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + + abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" + auto "github.com/tendermint/tendermint/libs/autofile" + "github.com/tendermint/tendermint/libs/clist" + cmn "github.com/tendermint/tendermint/libs/common" + 
"github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------------------------------- + +// CListMempool is an ordered in-memory pool for transactions before they are +// proposed in a consensus round. Transaction validity is checked using the +// CheckTx abci message before the transaction is added to the pool. The +// mempool uses a concurrent list structure for storing transactions that can +// be efficiently accessed by multiple concurrent readers. +type CListMempool struct { + config *cfg.MempoolConfig + + proxyMtx sync.Mutex + proxyAppConn proxy.AppConnMempool + txs *clist.CList // concurrent linked-list of good txs + preCheck PreCheckFunc + postCheck PostCheckFunc + + // Track whether we're rechecking txs. + // These are not protected by a mutex and are expected to be mutated + // in serial (ie. by abci responses which are called in serial). + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here + + // notify listeners (ie. consensus) when txs are available + notifiedTxsAvailable bool + txsAvailable chan struct{} // fires once for each height, when the mempool is not empty + + // Map for quick access to txs to record sender in CheckTx. + // txsMap: txKey -> CElement + txsMap sync.Map + + // Atomic integers + height int64 // the last block Update()'d to + rechecking int32 // for re-checking filtered txs on Update() + txsBytes int64 // total size of mempool, in bytes + + // Keep a cache of already-seen txs. + // This reduces the pressure on the proxyApp. + cache txCache + + // A log of mempool txs + wal *auto.AutoFile + + logger log.Logger + + metrics *Metrics +} + +// Option sets an optional parameter on the mempool. +type Option func(*CListMempool) + +// NewCListMempool returns a new mempool with the given configuration and connection to an application. +func NewCListMempool( + config *cfg.MempoolConfig, + proxyAppConn proxy.AppConnMempool, + height int64, + options ...Option, +) *CListMempool { + mempool := &CListMempool{ + config: config, + proxyAppConn: proxyAppConn, + txs: clist.New(), + height: height, + rechecking: 0, + recheckCursor: nil, + recheckEnd: nil, + logger: log.NewNopLogger(), + metrics: NopMetrics(), + } + if config.CacheSize > 0 { + mempool.cache = newMapTxCache(config.CacheSize) + } else { + mempool.cache = nopTxCache{} + } + proxyAppConn.SetResponseCallback(mempool.globalCb) + for _, option := range options { + option(mempool) + } + return mempool +} + +// EnableTxsAvailable initializes the TxsAvailable channel, +// ensuring it will trigger once every height when transactions are available. +// NOTE: not thread safe - should only be called once, on startup +func (mem *CListMempool) EnableTxsAvailable() { + mem.txsAvailable = make(chan struct{}, 1) +} + +// SetLogger sets the Logger. +func (mem *CListMempool) SetLogger(l log.Logger) { + mem.logger = l +} + +// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns +// false. This is ran before CheckTx. +func WithPreCheck(f PreCheckFunc) Option { + return func(mem *CListMempool) { mem.preCheck = f } +} + +// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns +// false. This is ran after CheckTx. +func WithPostCheck(f PostCheckFunc) Option { + return func(mem *CListMempool) { mem.postCheck = f } +} + +// WithMetrics sets the metrics. 
+func WithMetrics(metrics *Metrics) Option { + return func(mem *CListMempool) { mem.metrics = metrics } +} + +// InitWAL creates a directory for the WAL file and opens a file itself. +// +// *panics* if can't create directory or open file. +// *not thread safe* +func (mem *CListMempool) InitWAL() { + walDir := mem.config.WalDir() + err := cmn.EnsureDir(walDir, 0700) + if err != nil { + panic(errors.Wrap(err, "Error ensuring WAL dir")) + } + af, err := auto.OpenAutoFile(walDir + "/wal") + if err != nil { + panic(errors.Wrap(err, "Error opening WAL file")) + } + mem.wal = af +} + +// CloseWAL closes and discards the underlying WAL file. +// Any further writes will not be relayed to disk. +func (mem *CListMempool) CloseWAL() { + mem.proxyMtx.Lock() + defer mem.proxyMtx.Unlock() + + if err := mem.wal.Close(); err != nil { + mem.logger.Error("Error closing WAL", "err", err) + } + mem.wal = nil +} + +// Lock locks the mempool. The consensus must be able to hold lock to safely update. +func (mem *CListMempool) Lock() { + mem.proxyMtx.Lock() +} + +// Unlock unlocks the mempool. +func (mem *CListMempool) Unlock() { + mem.proxyMtx.Unlock() +} + +// Size returns the number of transactions in the mempool. +func (mem *CListMempool) Size() int { + return mem.txs.Len() +} + +// TxsBytes returns the total size of all txs in the mempool. +func (mem *CListMempool) TxsBytes() int64 { + return atomic.LoadInt64(&mem.txsBytes) +} + +// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are +// done. E.g. from CheckTx. +func (mem *CListMempool) FlushAppConn() error { + return mem.proxyAppConn.FlushSync() +} + +// Flush removes all transactions from the mempool and cache +func (mem *CListMempool) Flush() { + mem.proxyMtx.Lock() + defer mem.proxyMtx.Unlock() + + mem.cache.Reset() + + for e := mem.txs.Front(); e != nil; e = e.Next() { + mem.txs.Remove(e) + e.DetachPrev() + } + + mem.txsMap = sync.Map{} + _ = atomic.SwapInt64(&mem.txsBytes, 0) +} + +// TxsFront returns the first transaction in the ordered list for peer +// goroutines to call .NextWait() on. +func (mem *CListMempool) TxsFront() *clist.CElement { + return mem.txs.Front() +} + +// TxsWaitChan returns a channel to wait on transactions. It will be closed +// once the mempool is not empty (ie. the internal `mem.txs` has at least one +// element) +func (mem *CListMempool) TxsWaitChan() <-chan struct{} { + return mem.txs.WaitChan() +} + +// CheckTx executes a new transaction against the application to determine its validity +// and whether it should be added to the mempool. +// It blocks if we're waiting on Update() or Reap(). +// cb: A callback from the CheckTx command. +// It gets called from another goroutine. +// CONTRACT: Either cb will get called, or err returned. +func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { + return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID}) +} + +// CheckTxWithInfo performs the same operation as CheckTx, but with extra meta data about the tx. +// Currently this metadata is the peer who sent it, +// used to prevent the tx from being gossiped back to them. 
+func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) { + mem.proxyMtx.Lock() + // use defer to unlock mutex because application (*local client*) might panic + defer mem.proxyMtx.Unlock() + + var ( + memSize = mem.Size() + txsBytes = mem.TxsBytes() + ) + if memSize >= mem.config.Size || + int64(len(tx))+txsBytes > mem.config.MaxTxsBytes { + return ErrMempoolIsFull{ + memSize, mem.config.Size, + txsBytes, mem.config.MaxTxsBytes} + } + + // The size of the corresponding amino-encoded TxMessage + // can't be larger than the maxMsgSize, otherwise we can't + // relay it to peers. + if len(tx) > maxTxSize { + return ErrTxTooLarge + } + + if mem.preCheck != nil { + if err := mem.preCheck(tx); err != nil { + return ErrPreCheck{err} + } + } + + // CACHE + if !mem.cache.Push(tx) { + // Record a new sender for a tx we've already seen. + // Note it's possible a tx is still in the cache but no longer in the mempool + // (eg. after committing a block, txs are removed from mempool but not cache), + // so we only record the sender for txs still in the mempool. + if e, ok := mem.txsMap.Load(txKey(tx)); ok { + memTx := e.(*clist.CElement).Value.(*mempoolTx) + if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded { + // TODO: consider punishing peer for dups, + // its non-trivial since invalid txs can become valid, + // but they can spam the same tx with little cost to them atm. + } + } + + return ErrTxInCache + } + // END CACHE + + // WAL + if mem.wal != nil { + // TODO: Notify administrators when WAL fails + _, err := mem.wal.Write([]byte(tx)) + if err != nil { + mem.logger.Error("Error writing to WAL", "err", err) + } + _, err = mem.wal.Write([]byte("\n")) + if err != nil { + mem.logger.Error("Error writing to WAL", "err", err) + } + } + // END WAL + + // NOTE: proxyAppConn may error if tx buffer is full + if err = mem.proxyAppConn.Error(); err != nil { + return err + } + + reqRes := mem.proxyAppConn.CheckTxAsync(tx) + reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb)) + + return nil +} + +// Global callback that will be called after every ABCI response. +// Having a single global callback avoids needing to set a callback for each request. +// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who), +// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that +// include this information. If we're not in the midst of a recheck, this function will just return, +// so the request specific callback can do the work. +// When rechecking, we don't need the peerID, so the recheck callback happens here. +func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { + if mem.recheckCursor == nil { + return + } + + mem.metrics.RecheckTimes.Add(1) + mem.resCbRecheck(req, res) + + // update metrics + mem.metrics.Size.Set(float64(mem.Size())) +} + +// Request specific callback that should be set on individual reqRes objects +// to incorporate local information when processing the response. +// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them. +// NOTE: alternatively, we could include this information in the ABCI request itself. +// +// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called +// when all other response processing is complete. +// +// Used in CheckTxWithInfo to record PeerID who sent us the tx. 
+func (mem *CListMempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) { + return func(res *abci.Response) { + if mem.recheckCursor != nil { + // this should never happen + panic("recheck cursor is not nil in reqResCb") + } + + mem.resCbFirstTime(tx, peerID, res) + + // update metrics + mem.metrics.Size.Set(float64(mem.Size())) + + // passed in by the caller of CheckTx, eg. the RPC + if externalCb != nil { + externalCb(res) + } + } +} + +// Called from: +// - resCbFirstTime (lock not held) if tx is valid +func (mem *CListMempool) addTx(memTx *mempoolTx) { + e := mem.txs.PushBack(memTx) + mem.txsMap.Store(txKey(memTx.tx), e) + atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) + mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) +} + +// Called from: +// - Update (lock held) if tx was committed +// - resCbRecheck (lock not held) if tx was invalidated +func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { + mem.txs.Remove(elem) + elem.DetachPrev() + mem.txsMap.Delete(txKey(tx)) + atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) + + if removeFromCache { + mem.cache.Remove(tx) + } +} + +// callback, which is called after the app checked the tx for the first time. +// +// The case where the app checks the tx for the second and subsequent times is +// handled by the resCbRecheck callback. +func (mem *CListMempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Response) { + switch r := res.Value.(type) { + case *abci.Response_CheckTx: + var postCheckErr error + if mem.postCheck != nil { + postCheckErr = mem.postCheck(tx, r.CheckTx) + } + if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { + memTx := &mempoolTx{ + height: mem.height, + gasWanted: r.CheckTx.GasWanted, + tx: tx, + } + memTx.senders.Store(peerID, true) + mem.addTx(memTx) + mem.logger.Info("Added good transaction", + "tx", txID(tx), + "res", r, + "height", memTx.height, + "total", mem.Size(), + ) + mem.notifyTxsAvailable() + } else { + // ignore bad transaction + mem.logger.Info("Rejected bad transaction", "tx", txID(tx), "res", r, "err", postCheckErr) + mem.metrics.FailedTxs.Add(1) + // remove from cache (it might be good later) + mem.cache.Remove(tx) + } + default: + // ignore other messages + } +} + +// callback, which is called after the app rechecked the tx. +// +// The case where the app checks the tx for the first time is handled by the +// resCbFirstTime callback. +func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { + switch r := res.Value.(type) { + case *abci.Response_CheckTx: + tx := req.GetCheckTx().Tx + memTx := mem.recheckCursor.Value.(*mempoolTx) + if !bytes.Equal(tx, memTx.tx) { + panic(fmt.Sprintf( + "Unexpected tx response from proxy during recheck\nExpected %X, got %X", + memTx.tx, + tx)) + } + var postCheckErr error + if mem.postCheck != nil { + postCheckErr = mem.postCheck(tx, r.CheckTx) + } + if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { + // Good, nothing to do. + } else { + // Tx became invalidated due to newly committed block. + mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr) + // NOTE: we remove tx from the cache because it might be good later + mem.removeTx(tx, mem.recheckCursor, true) + } + if mem.recheckCursor == mem.recheckEnd { + mem.recheckCursor = nil + } else { + mem.recheckCursor = mem.recheckCursor.Next() + } + if mem.recheckCursor == nil { + // Done! 
+ atomic.StoreInt32(&mem.rechecking, 0) + mem.logger.Info("Done rechecking txs") + + // incase the recheck removed all txs + if mem.Size() > 0 { + mem.notifyTxsAvailable() + } + } + default: + // ignore other messages + } +} + +// TxsAvailable returns a channel which fires once for every height, +// and only when transactions are available in the mempool. +// NOTE: the returned channel may be nil if EnableTxsAvailable was not called. +func (mem *CListMempool) TxsAvailable() <-chan struct{} { + return mem.txsAvailable +} + +func (mem *CListMempool) notifyTxsAvailable() { + if mem.Size() == 0 { + panic("notified txs available but mempool is empty!") + } + if mem.txsAvailable != nil && !mem.notifiedTxsAvailable { + // channel cap is 1, so this will send once + mem.notifiedTxsAvailable = true + select { + case mem.txsAvailable <- struct{}{}: + default: + } + } +} + +// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes bytes total +// with the condition that the total gasWanted must be less than maxGas. +// If both maxes are negative, there is no cap on the size of all returned +// transactions (~ all available transactions). +func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { + mem.proxyMtx.Lock() + defer mem.proxyMtx.Unlock() + + for atomic.LoadInt32(&mem.rechecking) > 0 { + // TODO: Something better? + time.Sleep(time.Millisecond * 10) + } + + var totalBytes int64 + var totalGas int64 + // TODO: we will get a performance boost if we have a good estimate of avg + // size per tx, and set the initial capacity based off of that. + // txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize)) + txs := make([]types.Tx, 0, mem.txs.Len()) + for e := mem.txs.Front(); e != nil; e = e.Next() { + memTx := e.Value.(*mempoolTx) + // Check total size requirement + aminoOverhead := types.ComputeAminoOverhead(memTx.tx, 1) + if maxBytes > -1 && totalBytes+int64(len(memTx.tx))+aminoOverhead > maxBytes { + return txs + } + totalBytes += int64(len(memTx.tx)) + aminoOverhead + // Check total gas requirement. + // If maxGas is negative, skip this check. + // Since newTotalGas < masGas, which + // must be non-negative, it follows that this won't overflow. + newTotalGas := totalGas + memTx.gasWanted + if maxGas > -1 && newTotalGas > maxGas { + return txs + } + totalGas = newTotalGas + txs = append(txs, memTx.tx) + } + return txs +} + +// ReapMaxTxs reaps up to max transactions from the mempool. +// If max is negative, there is no cap on the size of all returned +// transactions (~ all available transactions). +func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { + mem.proxyMtx.Lock() + defer mem.proxyMtx.Unlock() + + if max < 0 { + max = mem.txs.Len() + } + + for atomic.LoadInt32(&mem.rechecking) > 0 { + // TODO: Something better? + time.Sleep(time.Millisecond * 10) + } + + txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max)) + for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() { + memTx := e.Value.(*mempoolTx) + txs = append(txs, memTx.tx) + } + return txs +} + +// Update informs the mempool that the given txs were committed and can be discarded. +// NOTE: this should be called *after* block is committed by consensus. 
+// NOTE: unsafe; Lock/Unlock must be managed by caller +func (mem *CListMempool) Update( + height int64, + txs types.Txs, + preCheck PreCheckFunc, + postCheck PostCheckFunc, +) error { + // Set height + mem.height = height + mem.notifiedTxsAvailable = false + + if preCheck != nil { + mem.preCheck = preCheck + } + if postCheck != nil { + mem.postCheck = postCheck + } + + // Add committed transactions to cache (if missing). + for _, tx := range txs { + _ = mem.cache.Push(tx) + } + + // Remove committed transactions. + txsLeft := mem.removeTxs(txs) + + // Either recheck non-committed txs to see if they became invalid + // or just notify there're some txs left. + if len(txsLeft) > 0 { + if mem.config.Recheck { + mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height) + mem.recheckTxs(txsLeft) + // At this point, mem.txs are being rechecked. + // mem.recheckCursor re-scans mem.txs and possibly removes some txs. + // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. + } else { + mem.notifyTxsAvailable() + } + } + + // Update metrics + mem.metrics.Size.Set(float64(mem.Size())) + + return nil +} + +func (mem *CListMempool) removeTxs(txs types.Txs) []types.Tx { + // Build a map for faster lookups. + txsMap := make(map[string]struct{}, len(txs)) + for _, tx := range txs { + txsMap[string(tx)] = struct{}{} + } + + txsLeft := make([]types.Tx, 0, mem.txs.Len()) + for e := mem.txs.Front(); e != nil; e = e.Next() { + memTx := e.Value.(*mempoolTx) + // Remove the tx if it's already in a block. + if _, ok := txsMap[string(memTx.tx)]; ok { + // NOTE: we don't remove committed txs from the cache. + mem.removeTx(memTx.tx, e, false) + + continue + } + txsLeft = append(txsLeft, memTx.tx) + } + return txsLeft +} + +// NOTE: pass in txs because mem.txs can mutate concurrently. +func (mem *CListMempool) recheckTxs(txs []types.Tx) { + if len(txs) == 0 { + return + } + atomic.StoreInt32(&mem.rechecking, 1) + mem.recheckCursor = mem.txs.Front() + mem.recheckEnd = mem.txs.Back() + + // Push txs to proxyAppConn + // NOTE: globalCb may be called concurrently. + for _, tx := range txs { + mem.proxyAppConn.CheckTxAsync(tx) + } + mem.proxyAppConn.FlushAsync() +} + +//-------------------------------------------------------------------------------- + +// mempoolTx is a transaction that successfully ran +type mempoolTx struct { + height int64 // height that this tx had been validated in + gasWanted int64 // amount of gas this tx states it will require + tx types.Tx // + + // ids of peers who've sent us this tx (as a map for quick lookups). + // senders: PeerID -> bool + senders sync.Map +} + +// Height returns the height for this transaction +func (memTx *mempoolTx) Height() int64 { + return atomic.LoadInt64(&memTx.height) +} + +//-------------------------------------------------------------------------------- + +type txCache interface { + Reset() + Push(tx types.Tx) bool + Remove(tx types.Tx) +} + +// mapTxCache maintains a LRU cache of transactions. This only stores the hash +// of the tx, due to memory concerns. +type mapTxCache struct { + mtx sync.Mutex + size int + map_ map[[sha256.Size]byte]*list.Element + list *list.List +} + +var _ txCache = (*mapTxCache)(nil) + +// newMapTxCache returns a new mapTxCache. +func newMapTxCache(cacheSize int) *mapTxCache { + return &mapTxCache{ + size: cacheSize, + map_: make(map[[sha256.Size]byte]*list.Element, cacheSize), + list: list.New(), + } +} + +// Reset resets the cache to an empty state. 
+func (cache *mapTxCache) Reset() { + cache.mtx.Lock() + cache.map_ = make(map[[sha256.Size]byte]*list.Element, cache.size) + cache.list.Init() + cache.mtx.Unlock() +} + +// Push adds the given tx to the cache and returns true. It returns +// false if tx is already in the cache. +func (cache *mapTxCache) Push(tx types.Tx) bool { + cache.mtx.Lock() + defer cache.mtx.Unlock() + + // Use the tx hash in the cache + txHash := txKey(tx) + if moved, exists := cache.map_[txHash]; exists { + cache.list.MoveToBack(moved) + return false + } + + if cache.list.Len() >= cache.size { + popped := cache.list.Front() + poppedTxHash := popped.Value.([sha256.Size]byte) + delete(cache.map_, poppedTxHash) + if popped != nil { + cache.list.Remove(popped) + } + } + e := cache.list.PushBack(txHash) + cache.map_[txHash] = e + return true +} + +// Remove removes the given tx from the cache. +func (cache *mapTxCache) Remove(tx types.Tx) { + cache.mtx.Lock() + txHash := txKey(tx) + popped := cache.map_[txHash] + delete(cache.map_, txHash) + if popped != nil { + cache.list.Remove(popped) + } + + cache.mtx.Unlock() +} + +type nopTxCache struct{} + +var _ txCache = (*nopTxCache)(nil) + +func (nopTxCache) Reset() {} +func (nopTxCache) Push(types.Tx) bool { return true } +func (nopTxCache) Remove(types.Tx) {} + +//-------------------------------------------------------------------------------- + +// txKey is the fixed length array sha256 hash used as the key in maps. +func txKey(tx types.Tx) [sha256.Size]byte { + return sha256.Sum256(tx) +} + +// txID is the hex encoded hash of the bytes as a types.Tx. +func txID(tx []byte) string { + return fmt.Sprintf("%X", types.Tx(tx).Hash()) +} diff --git a/mempool/mempool_test.go b/mempool/clist_mempool_test.go similarity index 97% rename from mempool/mempool_test.go rename to mempool/clist_mempool_test.go index d5f25396d..e3f98230d 100644 --- a/mempool/mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -32,18 +32,18 @@ import ( // test. type cleanupFunc func() -func newMempoolWithApp(cc proxy.ClientCreator) (*Mempool, cleanupFunc) { +func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) { return newMempoolWithAppAndConfig(cc, cfg.ResetTestRoot("mempool_test")) } -func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *cfg.Config) (*Mempool, cleanupFunc) { +func newMempoolWithAppAndConfig(cc proxy.ClientCreator, config *cfg.Config) (*CListMempool, cleanupFunc) { appConnMem, _ := cc.NewABCIClient() appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) err := appConnMem.Start() if err != nil { panic(err) } - mempool := NewMempool(config.Mempool, appConnMem, 0) + mempool := NewCListMempool(config.Mempool, appConnMem, 0) mempool.SetLogger(log.TestingLogger()) return mempool, func() { os.RemoveAll(config.RootDir) } } @@ -66,7 +66,7 @@ func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { } } -func checkTxs(t *testing.T, mempool *Mempool, count int, peerID uint16) types.Txs { +func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs { txs := make(types.Txs, count) txInfo := TxInfo{PeerID: peerID} for i := 0; i < count; i++ { @@ -348,7 +348,6 @@ func TestMempoolCloseWAL(t *testing.T) { // 1. Create the temporary directory for mempool and WAL testing. rootDir, err := ioutil.TempDir("", "mempool-test") require.Nil(t, err, "expecting successful tmpdir creation") - defer os.RemoveAll(rootDir) // 2. 
Ensure that it doesn't contain any elements -- Sanity check m1, err := filepath.Glob(filepath.Join(rootDir, "*")) @@ -356,13 +355,13 @@ func TestMempoolCloseWAL(t *testing.T) { require.Equal(t, 0, len(m1), "no matches yet") // 3. Create the mempool - wcfg := cfg.DefaultMempoolConfig() - wcfg.RootDir = rootDir - defer os.RemoveAll(wcfg.RootDir) + wcfg := cfg.DefaultConfig() + wcfg.Mempool.RootDir = rootDir app := kvstore.NewKVStoreApplication() cc := proxy.NewLocalClientCreator(app) - appConnMem, _ := cc.NewABCIClient() - mempool := NewMempool(wcfg, appConnMem, 10) + mempool, cleanup := newMempoolWithAppAndConfig(cc, wcfg) + defer cleanup() + mempool.height = 10 mempool.InitWAL() // 4. Ensure that the directory contains the WAL file diff --git a/mempool/doc.go b/mempool/doc.go new file mode 100644 index 000000000..ddd47aa2d --- /dev/null +++ b/mempool/doc.go @@ -0,0 +1,24 @@ +// The mempool pushes new txs onto the proxyAppConn. +// It gets a stream of (req, res) tuples from the proxy. +// The mempool stores good txs in a concurrent linked-list. + +// Multiple concurrent go-routines can traverse this linked-list +// safely by calling .NextWait() on each element. + +// So we have several go-routines: +// 1. Consensus calling Update() and Reap() synchronously +// 2. Many mempool reactor's peer routines calling CheckTx() +// 3. Many mempool reactor's peer routines traversing the txs linked list +// 4. Another goroutine calling GarbageCollectTxs() periodically + +// To manage these goroutines, there are three methods of locking. +// 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe) +// 2. Mutations to the linked-list elements are atomic +// 3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx + +// Garbage collection of old elements from mempool.txs is handlde via +// the DetachPrev() call, which makes old elements not reachable by +// peer broadcastTxRoutine() automatically garbage collected. + +// TODO: Better handle abci client errors. (make it automatically handle connection errors) +package mempool diff --git a/mempool/errors.go b/mempool/errors.go new file mode 100644 index 000000000..ac2a9b3c2 --- /dev/null +++ b/mempool/errors.go @@ -0,0 +1,46 @@ +package mempool + +import ( + "fmt" + + "github.com/pkg/errors" +) + +var ( + // ErrTxInCache is returned to the client if we saw tx earlier + ErrTxInCache = errors.New("Tx already exists in cache") + + // ErrTxTooLarge means the tx is too big to be sent in a message to other peers + ErrTxTooLarge = fmt.Errorf("Tx too large. Max size is %d", maxTxSize) +) + +// ErrMempoolIsFull means Tendermint & an application can't handle that much load +type ErrMempoolIsFull struct { + numTxs int + maxTxs int + + txsBytes int64 + maxTxsBytes int64 +} + +func (e ErrMempoolIsFull) Error() string { + return fmt.Sprintf( + "mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)", + e.numTxs, e.maxTxs, + e.txsBytes, e.maxTxsBytes) +} + +// ErrPreCheck is returned when tx is too big +type ErrPreCheck struct { + Reason error +} + +func (e ErrPreCheck) Error() string { + return e.Reason.Error() +} + +// IsPreCheckError returns true if err is due to pre check failure. 
+func IsPreCheckError(err error) bool { + _, ok := err.(ErrPreCheck) + return ok +} diff --git a/mempool/mempool.go b/mempool/mempool.go index a5b14466a..8f3c0e63a 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -1,26 +1,34 @@ package mempool import ( - "bytes" - "container/list" - "crypto/sha256" "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/pkg/errors" abci "github.com/tendermint/tendermint/abci/types" - cfg "github.com/tendermint/tendermint/config" - auto "github.com/tendermint/tendermint/libs/autofile" - "github.com/tendermint/tendermint/libs/clist" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) +// Mempool defines the mempool interface. +// Updates to the mempool need to be synchronized with committing a block so +// apps can reset their transient state on Commit. +type Mempool interface { + Lock() + Unlock() + + Size() int + + CheckTx(types.Tx, func(*abci.Response)) error + CheckTxWithInfo(types.Tx, func(*abci.Response), TxInfo) error + + ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs + + Update(int64, types.Txs, PreCheckFunc, PostCheckFunc) error + Flush() + FlushAppConn() error + + TxsAvailable() <-chan struct{} + EnableTxsAvailable() +} + // PreCheckFunc is an optional filter executed before CheckTx and rejects // transaction if false is returned. An example would be to ensure that a // transaction doesn't exceeded the block size. @@ -39,72 +47,7 @@ type TxInfo struct { PeerID uint16 } -/* - -The mempool pushes new txs onto the proxyAppConn. -It gets a stream of (req, res) tuples from the proxy. -The mempool stores good txs in a concurrent linked-list. - -Multiple concurrent go-routines can traverse this linked-list -safely by calling .NextWait() on each element. - -So we have several go-routines: -1. Consensus calling Update() and Reap() synchronously -2. Many mempool reactor's peer routines calling CheckTx() -3. Many mempool reactor's peer routines traversing the txs linked list -4. Another goroutine calling GarbageCollectTxs() periodically - -To manage these goroutines, there are three methods of locking. -1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe) -2. Mutations to the linked-list elements are atomic -3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx - -Garbage collection of old elements from mempool.txs is handlde via -the DetachPrev() call, which makes old elements not reachable by -peer broadcastTxRoutine() automatically garbage collected. - -TODO: Better handle abci client errors. (make it automatically handle connection errors) - -*/ - -var ( - // ErrTxInCache is returned to the client if we saw tx earlier - ErrTxInCache = errors.New("Tx already exists in cache") - - // ErrTxTooLarge means the tx is too big to be sent in a message to other peers - ErrTxTooLarge = fmt.Errorf("Tx too large. 
Max size is %d", maxTxSize) -) - -// ErrMempoolIsFull means Tendermint & an application can't handle that much load -type ErrMempoolIsFull struct { - numTxs int - maxTxs int - - txsBytes int64 - maxTxsBytes int64 -} - -func (e ErrMempoolIsFull) Error() string { - return fmt.Sprintf( - "Mempool is full: number of txs %d (max: %d), total txs bytes %d (max: %d)", - e.numTxs, e.maxTxs, - e.txsBytes, e.maxTxsBytes) -} - -// ErrPreCheck is returned when tx is too big -type ErrPreCheck struct { - Reason error -} - -func (e ErrPreCheck) Error() string { - return e.Reason.Error() -} - -// IsPreCheckError returns true if err is due to pre check failure. -func IsPreCheckError(err error) bool { - _, ok := err.(ErrPreCheck) - return ok -} +//-------------------------------------------------------------------------------- // PreCheckAminoMaxBytes checks that the size of the transaction plus the amino // overhead is smaller or equal to the expected maxBytes. @@ -143,718 +86,3 @@ func PostCheckMaxGas(maxGas int64) PostCheckFunc { return nil } } - -// TxID is the hex encoded hash of the bytes as a types.Tx. -func TxID(tx []byte) string { - return fmt.Sprintf("%X", types.Tx(tx).Hash()) -} - -// txKey is the fixed length array sha256 hash used as the key in maps. -func txKey(tx types.Tx) [sha256.Size]byte { - return sha256.Sum256(tx) -} - -// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus -// round. Transaction validity is checked using the CheckTx abci message before the transaction is -// added to the pool. The Mempool uses a concurrent list structure for storing transactions that -// can be efficiently accessed by multiple concurrent readers. -type Mempool struct { - config *cfg.MempoolConfig - - proxyMtx sync.Mutex - proxyAppConn proxy.AppConnMempool - txs *clist.CList // concurrent linked-list of good txs - preCheck PreCheckFunc - postCheck PostCheckFunc - - // Track whether we're rechecking txs. - // These are not protected by a mutex and are expected to be mutated - // in serial (ie. by abci responses which are called in serial). - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - - // notify listeners (ie. consensus) when txs are available - notifiedTxsAvailable bool - txsAvailable chan struct{} // fires once for each height, when the mempool is not empty - - // Map for quick access to txs to record sender in CheckTx. - // txsMap: txKey -> CElement - txsMap sync.Map - - // Atomic integers - height int64 // the last block Update()'d to - rechecking int32 // for re-checking filtered txs on Update() - txsBytes int64 // total size of mempool, in bytes - - // Keep a cache of already-seen txs. - // This reduces the pressure on the proxyApp. - cache txCache - - // A log of mempool txs - wal *auto.AutoFile - - logger log.Logger - - metrics *Metrics -} - -// MempoolOption sets an optional parameter on the Mempool. -type MempoolOption func(*Mempool) - -// NewMempool returns a new Mempool with the given configuration and connection to an application. 
-func NewMempool( - config *cfg.MempoolConfig, - proxyAppConn proxy.AppConnMempool, - height int64, - options ...MempoolOption, -) *Mempool { - mempool := &Mempool{ - config: config, - proxyAppConn: proxyAppConn, - txs: clist.New(), - height: height, - rechecking: 0, - recheckCursor: nil, - recheckEnd: nil, - logger: log.NewNopLogger(), - metrics: NopMetrics(), - } - if config.CacheSize > 0 { - mempool.cache = newMapTxCache(config.CacheSize) - } else { - mempool.cache = nopTxCache{} - } - proxyAppConn.SetResponseCallback(mempool.globalCb) - for _, option := range options { - option(mempool) - } - return mempool -} - -// EnableTxsAvailable initializes the TxsAvailable channel, -// ensuring it will trigger once every height when transactions are available. -// NOTE: not thread safe - should only be called once, on startup -func (mem *Mempool) EnableTxsAvailable() { - mem.txsAvailable = make(chan struct{}, 1) -} - -// SetLogger sets the Logger. -func (mem *Mempool) SetLogger(l log.Logger) { - mem.logger = l -} - -// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran before CheckTx. -func WithPreCheck(f PreCheckFunc) MempoolOption { - return func(mem *Mempool) { mem.preCheck = f } -} - -// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran after CheckTx. -func WithPostCheck(f PostCheckFunc) MempoolOption { - return func(mem *Mempool) { mem.postCheck = f } -} - -// WithMetrics sets the metrics. -func WithMetrics(metrics *Metrics) MempoolOption { - return func(mem *Mempool) { mem.metrics = metrics } -} - -// InitWAL creates a directory for the WAL file and opens a file itself. -// -// *panics* if can't create directory or open file. -// *not thread safe* -func (mem *Mempool) InitWAL() { - walDir := mem.config.WalDir() - err := cmn.EnsureDir(walDir, 0700) - if err != nil { - panic(errors.Wrap(err, "Error ensuring Mempool WAL dir")) - } - af, err := auto.OpenAutoFile(walDir + "/wal") - if err != nil { - panic(errors.Wrap(err, "Error opening Mempool WAL file")) - } - mem.wal = af -} - -// CloseWAL closes and discards the underlying WAL file. -// Any further writes will not be relayed to disk. -func (mem *Mempool) CloseWAL() { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() - - if err := mem.wal.Close(); err != nil { - mem.logger.Error("Error closing WAL", "err", err) - } - mem.wal = nil -} - -// Lock locks the mempool. The consensus must be able to hold lock to safely update. -func (mem *Mempool) Lock() { - mem.proxyMtx.Lock() -} - -// Unlock unlocks the mempool. -func (mem *Mempool) Unlock() { - mem.proxyMtx.Unlock() -} - -// Size returns the number of transactions in the mempool. -func (mem *Mempool) Size() int { - return mem.txs.Len() -} - -// TxsBytes returns the total size of all txs in the mempool. -func (mem *Mempool) TxsBytes() int64 { - return atomic.LoadInt64(&mem.txsBytes) -} - -// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are -// done. E.g. from CheckTx. 
-func (mem *Mempool) FlushAppConn() error { - return mem.proxyAppConn.FlushSync() -} - -// Flush removes all transactions from the mempool and cache -func (mem *Mempool) Flush() { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() - - mem.cache.Reset() - - for e := mem.txs.Front(); e != nil; e = e.Next() { - mem.txs.Remove(e) - e.DetachPrev() - } - - mem.txsMap = sync.Map{} - _ = atomic.SwapInt64(&mem.txsBytes, 0) -} - -// TxsFront returns the first transaction in the ordered list for peer -// goroutines to call .NextWait() on. -func (mem *Mempool) TxsFront() *clist.CElement { - return mem.txs.Front() -} - -// TxsWaitChan returns a channel to wait on transactions. It will be closed -// once the mempool is not empty (ie. the internal `mem.txs` has at least one -// element) -func (mem *Mempool) TxsWaitChan() <-chan struct{} { - return mem.txs.WaitChan() -} - -// CheckTx executes a new transaction against the application to determine its validity -// and whether it should be added to the mempool. -// It blocks if we're waiting on Update() or Reap(). -// cb: A callback from the CheckTx command. -// It gets called from another goroutine. -// CONTRACT: Either cb will get called, or err returned. -func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { - return mem.CheckTxWithInfo(tx, cb, TxInfo{PeerID: UnknownPeerID}) -} - -// CheckTxWithInfo performs the same operation as CheckTx, but with extra meta data about the tx. -// Currently this metadata is the peer who sent it, -// used to prevent the tx from being gossiped back to them. -func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) (err error) { - mem.proxyMtx.Lock() - // use defer to unlock mutex because application (*local client*) might panic - defer mem.proxyMtx.Unlock() - - var ( - memSize = mem.Size() - txsBytes = mem.TxsBytes() - ) - if memSize >= mem.config.Size || - int64(len(tx))+txsBytes > mem.config.MaxTxsBytes { - return ErrMempoolIsFull{ - memSize, mem.config.Size, - txsBytes, mem.config.MaxTxsBytes} - } - - // The size of the corresponding amino-encoded TxMessage - // can't be larger than the maxMsgSize, otherwise we can't - // relay it to peers. - if len(tx) > maxTxSize { - return ErrTxTooLarge - } - - if mem.preCheck != nil { - if err := mem.preCheck(tx); err != nil { - return ErrPreCheck{err} - } - } - - // CACHE - if !mem.cache.Push(tx) { - // Record a new sender for a tx we've already seen. - // Note it's possible a tx is still in the cache but no longer in the mempool - // (eg. after committing a block, txs are removed from mempool but not cache), - // so we only record the sender for txs still in the mempool. - if e, ok := mem.txsMap.Load(txKey(tx)); ok { - memTx := e.(*clist.CElement).Value.(*mempoolTx) - if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded { - // TODO: consider punishing peer for dups, - // its non-trivial since invalid txs can become valid, - // but they can spam the same tx with little cost to them atm. 
- } - } - - return ErrTxInCache - } - // END CACHE - - // WAL - if mem.wal != nil { - // TODO: Notify administrators when WAL fails - _, err := mem.wal.Write([]byte(tx)) - if err != nil { - mem.logger.Error("Error writing to WAL", "err", err) - } - _, err = mem.wal.Write([]byte("\n")) - if err != nil { - mem.logger.Error("Error writing to WAL", "err", err) - } - } - // END WAL - - // NOTE: proxyAppConn may error if tx buffer is full - if err = mem.proxyAppConn.Error(); err != nil { - return err - } - - reqRes := mem.proxyAppConn.CheckTxAsync(tx) - reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb)) - - return nil -} - -// Global callback that will be called after every ABCI response. -// Having a single global callback avoids needing to set a callback for each request. -// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who), -// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that -// include this information. If we're not in the midst of a recheck, this function will just return, -// so the request specific callback can do the work. -// When rechecking, we don't need the peerID, so the recheck callback happens here. -func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) { - if mem.recheckCursor == nil { - return - } - - mem.metrics.RecheckTimes.Add(1) - mem.resCbRecheck(req, res) - - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) -} - -// Request specific callback that should be set on individual reqRes objects -// to incorporate local information when processing the response. -// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them. -// NOTE: alternatively, we could include this information in the ABCI request itself. -// -// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called -// when all other response processing is complete. -// -// Used in CheckTxWithInfo to record PeerID who sent us the tx. -func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) { - return func(res *abci.Response) { - if mem.recheckCursor != nil { - // this should never happen - panic("recheck cursor is not nil in reqResCb") - } - - mem.resCbFirstTime(tx, peerID, res) - - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) - - // passed in by the caller of CheckTx, eg. the RPC - if externalCb != nil { - externalCb(res) - } - } -} - -// Called from: -// - resCbFirstTime (lock not held) if tx is valid -func (mem *Mempool) addTx(memTx *mempoolTx) { - e := mem.txs.PushBack(memTx) - mem.txsMap.Store(txKey(memTx.tx), e) - atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) - mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) -} - -// Called from: -// - Update (lock held) if tx was committed -// - resCbRecheck (lock not held) if tx was invalidated -func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { - mem.txs.Remove(elem) - elem.DetachPrev() - mem.txsMap.Delete(txKey(tx)) - atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) - - if removeFromCache { - mem.cache.Remove(tx) - } -} - -// callback, which is called after the app checked the tx for the first time. -// -// The case where the app checks the tx for the second and subsequent times is -// handled by the resCbRecheck callback. 
-func (mem *Mempool) resCbFirstTime(tx []byte, peerID uint16, res *abci.Response) { - switch r := res.Value.(type) { - case *abci.Response_CheckTx: - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - memTx := &mempoolTx{ - height: mem.height, - gasWanted: r.CheckTx.GasWanted, - tx: tx, - } - memTx.senders.Store(peerID, true) - mem.addTx(memTx) - mem.logger.Info("Added good transaction", - "tx", TxID(tx), - "res", r, - "height", memTx.height, - "total", mem.Size(), - ) - mem.notifyTxsAvailable() - } else { - // ignore bad transaction - mem.logger.Info("Rejected bad transaction", "tx", TxID(tx), "res", r, "err", postCheckErr) - mem.metrics.FailedTxs.Add(1) - // remove from cache (it might be good later) - mem.cache.Remove(tx) - } - default: - // ignore other messages - } -} - -// callback, which is called after the app rechecked the tx. -// -// The case where the app checks the tx for the first time is handled by the -// resCbFirstTime callback. -func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { - switch r := res.Value.(type) { - case *abci.Response_CheckTx: - tx := req.GetCheckTx().Tx - memTx := mem.recheckCursor.Value.(*mempoolTx) - if !bytes.Equal(tx, memTx.tx) { - panic(fmt.Sprintf( - "Unexpected tx response from proxy during recheck\nExpected %X, got %X", - memTx.tx, - tx)) - } - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - // Good, nothing to do. - } else { - // Tx became invalidated due to newly committed block. - mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr) - // NOTE: we remove tx from the cache because it might be good later - mem.removeTx(tx, mem.recheckCursor, true) - } - if mem.recheckCursor == mem.recheckEnd { - mem.recheckCursor = nil - } else { - mem.recheckCursor = mem.recheckCursor.Next() - } - if mem.recheckCursor == nil { - // Done! - atomic.StoreInt32(&mem.rechecking, 0) - mem.logger.Info("Done rechecking txs") - - // incase the recheck removed all txs - if mem.Size() > 0 { - mem.notifyTxsAvailable() - } - } - default: - // ignore other messages - } -} - -// TxsAvailable returns a channel which fires once for every height, -// and only when transactions are available in the mempool. -// NOTE: the returned channel may be nil if EnableTxsAvailable was not called. -func (mem *Mempool) TxsAvailable() <-chan struct{} { - return mem.txsAvailable -} - -func (mem *Mempool) notifyTxsAvailable() { - if mem.Size() == 0 { - panic("notified txs available but mempool is empty!") - } - if mem.txsAvailable != nil && !mem.notifiedTxsAvailable { - // channel cap is 1, so this will send once - mem.notifiedTxsAvailable = true - select { - case mem.txsAvailable <- struct{}{}: - default: - } - } -} - -// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes bytes total -// with the condition that the total gasWanted must be less than maxGas. -// If both maxes are negative, there is no cap on the size of all returned -// transactions (~ all available transactions). -func (mem *Mempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() - - for atomic.LoadInt32(&mem.rechecking) > 0 { - // TODO: Something better? 
- time.Sleep(time.Millisecond * 10) - } - - var totalBytes int64 - var totalGas int64 - // TODO: we will get a performance boost if we have a good estimate of avg - // size per tx, and set the initial capacity based off of that. - // txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize)) - txs := make([]types.Tx, 0, mem.txs.Len()) - for e := mem.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - // Check total size requirement - aminoOverhead := types.ComputeAminoOverhead(memTx.tx, 1) - if maxBytes > -1 && totalBytes+int64(len(memTx.tx))+aminoOverhead > maxBytes { - return txs - } - totalBytes += int64(len(memTx.tx)) + aminoOverhead - // Check total gas requirement. - // If maxGas is negative, skip this check. - // Since newTotalGas < masGas, which - // must be non-negative, it follows that this won't overflow. - newTotalGas := totalGas + memTx.gasWanted - if maxGas > -1 && newTotalGas > maxGas { - return txs - } - totalGas = newTotalGas - txs = append(txs, memTx.tx) - } - return txs -} - -// ReapMaxTxs reaps up to max transactions from the mempool. -// If max is negative, there is no cap on the size of all returned -// transactions (~ all available transactions). -func (mem *Mempool) ReapMaxTxs(max int) types.Txs { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() - - if max < 0 { - max = mem.txs.Len() - } - - for atomic.LoadInt32(&mem.rechecking) > 0 { - // TODO: Something better? - time.Sleep(time.Millisecond * 10) - } - - txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max)) - for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() { - memTx := e.Value.(*mempoolTx) - txs = append(txs, memTx.tx) - } - return txs -} - -// Update informs the mempool that the given txs were committed and can be discarded. -// NOTE: this should be called *after* block is committed by consensus. -// NOTE: unsafe; Lock/Unlock must be managed by caller -func (mem *Mempool) Update( - height int64, - txs types.Txs, - preCheck PreCheckFunc, - postCheck PostCheckFunc, -) error { - // Set height - mem.height = height - mem.notifiedTxsAvailable = false - - if preCheck != nil { - mem.preCheck = preCheck - } - if postCheck != nil { - mem.postCheck = postCheck - } - - // Add committed transactions to cache (if missing). - for _, tx := range txs { - _ = mem.cache.Push(tx) - } - - // Remove committed transactions. - txsLeft := mem.removeTxs(txs) - - // Either recheck non-committed txs to see if they became invalid - // or just notify there're some txs left. - if len(txsLeft) > 0 { - if mem.config.Recheck { - mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height) - mem.recheckTxs(txsLeft) - // At this point, mem.txs are being rechecked. - // mem.recheckCursor re-scans mem.txs and possibly removes some txs. - // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. - } else { - mem.notifyTxsAvailable() - } - } - - // Update metrics - mem.metrics.Size.Set(float64(mem.Size())) - - return nil -} - -func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx { - // Build a map for faster lookups. - txsMap := make(map[string]struct{}, len(txs)) - for _, tx := range txs { - txsMap[string(tx)] = struct{}{} - } - - txsLeft := make([]types.Tx, 0, mem.txs.Len()) - for e := mem.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - // Remove the tx if it's already in a block. - if _, ok := txsMap[string(memTx.tx)]; ok { - // NOTE: we don't remove committed txs from the cache. 
- mem.removeTx(memTx.tx, e, false) - - continue - } - txsLeft = append(txsLeft, memTx.tx) - } - return txsLeft -} - -// NOTE: pass in txs because mem.txs can mutate concurrently. -func (mem *Mempool) recheckTxs(txs []types.Tx) { - if len(txs) == 0 { - return - } - atomic.StoreInt32(&mem.rechecking, 1) - mem.recheckCursor = mem.txs.Front() - mem.recheckEnd = mem.txs.Back() - - // Push txs to proxyAppConn - // NOTE: globalCb may be called concurrently. - for _, tx := range txs { - mem.proxyAppConn.CheckTxAsync(tx) - } - mem.proxyAppConn.FlushAsync() -} - -//-------------------------------------------------------------------------------- - -// mempoolTx is a transaction that successfully ran -type mempoolTx struct { - height int64 // height that this tx had been validated in - gasWanted int64 // amount of gas this tx states it will require - tx types.Tx // - - // ids of peers who've sent us this tx (as a map for quick lookups). - // senders: PeerID -> bool - senders sync.Map -} - -// Height returns the height for this transaction -func (memTx *mempoolTx) Height() int64 { - return atomic.LoadInt64(&memTx.height) -} - -//-------------------------------------------------------------------------------- - -type txCache interface { - Reset() - Push(tx types.Tx) bool - Remove(tx types.Tx) -} - -// mapTxCache maintains a LRU cache of transactions. This only stores the hash -// of the tx, due to memory concerns. -type mapTxCache struct { - mtx sync.Mutex - size int - map_ map[[sha256.Size]byte]*list.Element - list *list.List -} - -var _ txCache = (*mapTxCache)(nil) - -// newMapTxCache returns a new mapTxCache. -func newMapTxCache(cacheSize int) *mapTxCache { - return &mapTxCache{ - size: cacheSize, - map_: make(map[[sha256.Size]byte]*list.Element, cacheSize), - list: list.New(), - } -} - -// Reset resets the cache to an empty state. -func (cache *mapTxCache) Reset() { - cache.mtx.Lock() - cache.map_ = make(map[[sha256.Size]byte]*list.Element, cache.size) - cache.list.Init() - cache.mtx.Unlock() -} - -// Push adds the given tx to the cache and returns true. It returns -// false if tx is already in the cache. -func (cache *mapTxCache) Push(tx types.Tx) bool { - cache.mtx.Lock() - defer cache.mtx.Unlock() - - // Use the tx hash in the cache - txHash := txKey(tx) - if moved, exists := cache.map_[txHash]; exists { - cache.list.MoveToBack(moved) - return false - } - - if cache.list.Len() >= cache.size { - popped := cache.list.Front() - poppedTxHash := popped.Value.([sha256.Size]byte) - delete(cache.map_, poppedTxHash) - if popped != nil { - cache.list.Remove(popped) - } - } - e := cache.list.PushBack(txHash) - cache.map_[txHash] = e - return true -} - -// Remove removes the given tx from the cache. 
-func (cache *mapTxCache) Remove(tx types.Tx) {
-	cache.mtx.Lock()
-	txHash := txKey(tx)
-	popped := cache.map_[txHash]
-	delete(cache.map_, txHash)
-	if popped != nil {
-		cache.list.Remove(popped)
-	}
-
-	cache.mtx.Unlock()
-}
-
-type nopTxCache struct{}
-
-var _ txCache = (*nopTxCache)(nil)
-
-func (nopTxCache) Reset()             {}
-func (nopTxCache) Push(types.Tx) bool { return true }
-func (nopTxCache) Remove(types.Tx)    {}
diff --git a/mempool/mock/mempool.go b/mempool/mock/mempool.go
new file mode 100644
index 000000000..97cc79e6a
--- /dev/null
+++ b/mempool/mock/mempool.go
@@ -0,0 +1,36 @@
+package mock
+
+import (
+	abci "github.com/tendermint/tendermint/abci/types"
+	mempl "github.com/tendermint/tendermint/mempool"
+	"github.com/tendermint/tendermint/types"
+)
+
+// Mempool is an empty implementation of a Mempool, useful for testing.
+type Mempool struct{}
+
+var _ mempl.Mempool = Mempool{}
+
+func (Mempool) Lock()     {}
+func (Mempool) Unlock()   {}
+func (Mempool) Size() int { return 0 }
+func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error {
+	return nil
+}
+func (Mempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response),
+	_ mempl.TxInfo) error {
+	return nil
+}
+func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
+func (Mempool) Update(
+	_ int64,
+	_ types.Txs,
+	_ mempl.PreCheckFunc,
+	_ mempl.PostCheckFunc,
+) error {
+	return nil
+}
+func (Mempool) Flush()                        {}
+func (Mempool) FlushAppConn() error           { return nil }
+func (Mempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
+func (Mempool) EnableTxsAvailable()           {}
diff --git a/mempool/reactor.go b/mempool/reactor.go
index e1376b287..d7581ae68 100644
--- a/mempool/reactor.go
+++ b/mempool/reactor.go
@@ -31,13 +31,25 @@ const (
 	maxActiveIDs = math.MaxUint16
 )
 
+// MempoolWithWait extends the standard Mempool interface to allow the reactor
+// to wait for transactions and iterate over them once there are any. It also
+// includes the ReapMaxTxs function, which is useful for testing.
+//
+// UNSTABLE
+type MempoolWithWait interface {
+	Mempool
+	TxsFront() *clist.CElement
+	TxsWaitChan() <-chan struct{}
+	ReapMaxTxs(n int) types.Txs
+}
+
 // MempoolReactor handles mempool tx broadcasting amongst peers.
 // It maintains a map from peer ID to counter, to prevent gossiping txs to the
 // peers you received it from.
 type MempoolReactor struct {
 	p2p.BaseReactor
 	config  *cfg.MempoolConfig
-	Mempool *Mempool
+	Mempool MempoolWithWait
 	ids     *mempoolIDs
 }
 
@@ -105,7 +117,7 @@ func newMempoolIDs() *mempoolIDs {
 }
 
 // NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
-func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
+func NewMempoolReactor(config *cfg.MempoolConfig, mempool MempoolWithWait) *MempoolReactor {
 	memR := &MempoolReactor{
 		config:  config,
 		Mempool: mempool,
@@ -115,10 +127,18 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac
 	return memR
 }
 
+type mempoolWithLogger interface {
+	SetLogger(log.Logger)
+}
+
 // SetLogger sets the Logger on the reactor and the underlying Mempool.
 func (memR *MempoolReactor) SetLogger(l log.Logger) {
 	memR.Logger = l
-	memR.Mempool.SetLogger(l)
+
+	// set mempoolWithLogger if mempool supports it
+	if mem, ok := memR.Mempool.(mempoolWithLogger); ok {
+		mem.SetLogger(l)
+	}
 }
 
 // OnStart implements p2p.BaseReactor.
@@ -169,7 +189,7 @@ func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
 		peerID := memR.ids.GetForPeer(src)
 		err := memR.Mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{PeerID: peerID})
 		if err != nil {
-			memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err)
+			memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err)
 		}
 		// broadcasting happens from go routines per peer
 	default:
diff --git a/node/node.go b/node/node.go
index c0a4d736e..49fd17a19 100644
--- a/node/node.go
+++ b/node/node.go
@@ -157,9 +157,10 @@ type Node struct {
 	// services
 	eventBus         *types.EventBus // pub/sub for services
 	stateDB          dbm.DB
-	blockStore       *bc.BlockStore        // store the blockchain to disk
-	bcReactor        *bc.BlockchainReactor // for fast-syncing
-	mempoolReactor   *mempl.MempoolReactor // for gossipping transactions
+	blockStore       *bc.BlockStore        // store the blockchain to disk
+	bcReactor        *bc.BlockchainReactor // for fast-syncing
+	mempoolReactor   *mempl.MempoolReactor // for gossipping transactions
+	mempool          *mempl.CListMempool
 	consensusState   *cs.ConsensusState     // latest consensus state
 	consensusReactor *cs.ConsensusReactor   // for participating in the consensus
 	evidencePool     *evidence.EvidencePool // tracking evidence
@@ -320,7 +321,7 @@ func NewNode(config *cfg.Config,
 	csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
 
 	// Make MempoolReactor
-	mempool := mempl.NewMempool(
+	mempool := mempl.NewCListMempool(
 		config.Mempool,
 		proxyApp.Mempool(),
 		state.LastBlockHeight,
@@ -329,7 +330,6 @@ func NewNode(config *cfg.Config,
 		mempl.WithPostCheck(sm.TxPostCheck(state)),
 	)
 	mempoolLogger := logger.With("module", "mempool")
-	mempool.SetLogger(mempoolLogger)
 	if config.Mempool.WalEnabled() {
 		mempool.InitWAL() // no need to have the mempool wal during tests
 	}
@@ -534,6 +534,7 @@ func NewNode(config *cfg.Config,
 		blockStore:       blockStore,
 		bcReactor:        bcReactor,
 		mempoolReactor:   mempoolReactor,
+		mempool:          mempool,
 		consensusState:   consensusState,
 		consensusReactor: consensusReactor,
 		evidencePool:     evidencePool,
@@ -617,7 +618,7 @@ func (n *Node) OnStop() {
 
 	// stop mempool WAL
 	if n.config.Mempool.WalEnabled() {
-		n.mempoolReactor.Mempool.CloseWAL()
+		n.mempool.CloseWAL()
 	}
 
 	if err := n.transport.Close(); err != nil {
@@ -652,7 +653,7 @@ func (n *Node) ConfigureRPC() {
 	rpccore.SetStateDB(n.stateDB)
 	rpccore.SetBlockStore(n.blockStore)
 	rpccore.SetConsensusState(n.consensusState)
-	rpccore.SetMempool(n.mempoolReactor.Mempool)
+	rpccore.SetMempool(n.mempool)
 	rpccore.SetEvidencePool(n.evidencePool)
 	rpccore.SetP2PPeers(n.sw)
 	rpccore.SetP2PTransport(n)
@@ -804,6 +805,11 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor {
 	return n.mempoolReactor
 }
 
+// Mempool returns the Node's Mempool.
+func (n *Node) Mempool() *mempl.CListMempool {
+	return n.mempool
+}
+
 // EvidencePool returns the Node's EvidencePool.
 func (n *Node) EvidencePool() *evidence.EvidencePool {
 	return n.evidencePool
diff --git a/node/node_test.go b/node/node_test.go
index a2725d845..6971ddd34 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -224,7 +224,7 @@ func TestCreateProposalBlock(t *testing.T) {
 
 	// Make Mempool
 	memplMetrics := mempl.PrometheusMetrics("node_test")
-	mempool := mempl.NewMempool(
+	mempool := mempl.NewCListMempool(
 		config.Mempool,
 		proxyApp.Mempool(),
 		state.LastBlockHeight,
diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go
index ba9bc3af7..de3ec06e9 100644
--- a/rpc/client/rpc_test.go
+++ b/rpc/client/rpc_test.go
@@ -249,7 +249,8 @@ func TestAppCalls(t *testing.T) {
 
 func TestBroadcastTxSync(t *testing.T) {
 	require := require.New(t)
-	mempool := node.MempoolReactor().Mempool
+	// TODO (melekes): use mempool which is set on RPC rather than getting it from node
+	mempool := node.Mempool()
 	initMempoolSize := mempool.Size()
 
 	for i, c := range GetClients() {
@@ -269,7 +270,7 @@ func TestBroadcastTxSync(t *testing.T) {
 
 func TestBroadcastTxCommit(t *testing.T) {
 	require := require.New(t)
-	mempool := node.MempoolReactor().Mempool
+	mempool := node.Mempool()
 	for i, c := range GetClients() {
 		_, _, tx := MakeTxKV()
 		bres, err := c.BroadcastTxCommit(tx)
@@ -284,7 +285,7 @@ func TestBroadcastTxCommit(t *testing.T) {
 
 func TestUnconfirmedTxs(t *testing.T) {
 	_, _, tx := MakeTxKV()
-	mempool := node.MempoolReactor().Mempool
+	mempool := node.Mempool()
 	_ = mempool.CheckTx(tx, nil)
 
 	for i, c := range GetClients() {
@@ -305,7 +306,7 @@ func TestUnconfirmedTxs(t *testing.T) {
 
 func TestNumUnconfirmedTxs(t *testing.T) {
 	_, _, tx := MakeTxKV()
-	mempool := node.MempoolReactor().Mempool
+	mempool := node.Mempool()
 	_ = mempool.CheckTx(tx, nil)
 	mempoolSize := mempool.Size()
diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go
index ad8afdefc..ab2316a38 100644
--- a/rpc/core/pipe.go
+++ b/rpc/core/pipe.go
@@ -49,6 +49,14 @@ type peers interface {
 	Peers() p2p.IPeerSet
 }
 
+// Mempool extends the standard Mempool interface to allow getting
+// total txs size. ReapMaxTxs is used by UnconfirmedTxs to reap N transactions.
+type Mempool interface {
+	mempl.Mempool
+	ReapMaxTxs(n int) types.Txs
+	TxsBytes() int64
+}
+
 //----------------------------------------------
 // These package level globals come with setters
 // that are expected to be called only once, on startup
@@ -72,7 +80,7 @@ var (
 	txIndexer        txindex.TxIndexer
 	consensusReactor *consensus.ConsensusReactor
 	eventBus         *types.EventBus // thread safe
-	mempool          *mempl.Mempool
+	mempool          Mempool
 
 	logger log.Logger
 
@@ -87,7 +95,7 @@ func SetBlockStore(bs sm.BlockStore) {
 	blockStore = bs
 }
 
-func SetMempool(mem *mempl.Mempool) {
+func SetMempool(mem Mempool) {
 	mempool = mem
 }
 
diff --git a/state/execution.go b/state/execution.go
index 3a11ecca4..e97efd7bd 100644
--- a/state/execution.go
+++ b/state/execution.go
@@ -8,6 +8,7 @@ import (
 	dbm "github.com/tendermint/tendermint/libs/db"
 	"github.com/tendermint/tendermint/libs/fail"
 	"github.com/tendermint/tendermint/libs/log"
+	mempl "github.com/tendermint/tendermint/mempool"
 	"github.com/tendermint/tendermint/proxy"
 	"github.com/tendermint/tendermint/types"
 )
@@ -30,7 +31,7 @@ type BlockExecutor struct {
 
 	// manage the mempool lock during commit
 	// and update both with block results after commit.
-	mempool Mempool
+	mempool mempl.Mempool
 	evpool  EvidencePool
 
 	logger log.Logger
@@ -48,7 +49,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
 
 // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
 // Call SetEventBus to provide one.
-func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
+func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, mempool mempl.Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
 	res := &BlockExecutor{
 		db:       db,
 		proxyApp: proxyApp,
diff --git a/state/execution_test.go b/state/execution_test.go
index a9fdfe270..b15ea0cb0 100644
--- a/state/execution_test.go
+++ b/state/execution_test.go
@@ -16,10 +16,10 @@ import (
 	cmn "github.com/tendermint/tendermint/libs/common"
 	dbm "github.com/tendermint/tendermint/libs/db"
 	"github.com/tendermint/tendermint/libs/log"
-	tmtime "github.com/tendermint/tendermint/types/time"
-
+	memplmock "github.com/tendermint/tendermint/mempool/mock"
 	"github.com/tendermint/tendermint/proxy"
 	"github.com/tendermint/tendermint/types"
+	tmtime "github.com/tendermint/tendermint/types/time"
 )
 
 var (
@@ -38,7 +38,7 @@ func TestApplyBlock(t *testing.T) {
 	state, stateDB := state(1, 1)
 
 	blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
-		MockMempool{}, MockEvidencePool{})
+		memplmock.Mempool{}, MockEvidencePool{})
 
 	block := makeBlock(state, 1)
 	blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()}
@@ -310,7 +310,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
 
 	state, stateDB := state(1, 1)
 
-	blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), MockMempool{}, MockEvidencePool{})
+	blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), memplmock.Mempool{}, MockEvidencePool{})
 
 	eventBus := types.NewEventBus()
 	err = eventBus.Start()
@@ -367,7 +367,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
 	defer proxyApp.Stop()
 
 	state, stateDB := state(1, 1)
-	blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), MockMempool{}, MockEvidencePool{})
+	blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), memplmock.Mempool{}, MockEvidencePool{})
 
 	block := makeBlock(state, 1)
 	blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()}
diff --git a/state/services.go b/state/services.go
index 07d12c5a1..98f6afce3 100644
--- a/state/services.go
+++ b/state/services.go
@@ -1,8 +1,6 @@
 package state
 
 import (
-	abci "github.com/tendermint/tendermint/abci/types"
-	"github.com/tendermint/tendermint/mempool"
 	"github.com/tendermint/tendermint/types"
 )
 
@@ -11,57 +9,6 @@ import (
 // NOTE: Interfaces used by RPC must be thread safe!
 //------------------------------------------------------
 
-//------------------------------------------------------
-// mempool
-
-// Mempool defines the mempool interface as used by the ConsensusState.
-// Updates to the mempool need to be synchronized with committing a block
-// so apps can reset their transient state on Commit
-type Mempool interface {
-	Lock()
-	Unlock()
-
-	Size() int
-	CheckTx(types.Tx, func(*abci.Response)) error
-	CheckTxWithInfo(types.Tx, func(*abci.Response), mempool.TxInfo) error
-	ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs
-	Update(int64, types.Txs, mempool.PreCheckFunc, mempool.PostCheckFunc) error
-	Flush()
-	FlushAppConn() error
-
-	TxsAvailable() <-chan struct{}
-	EnableTxsAvailable()
-}
-
-// MockMempool is an empty implementation of a Mempool, useful for testing.
-type MockMempool struct{}
-
-var _ Mempool = MockMempool{}
-
-func (MockMempool) Lock()     {}
-func (MockMempool) Unlock()   {}
-func (MockMempool) Size() int { return 0 }
-func (MockMempool) CheckTx(_ types.Tx, _ func(*abci.Response)) error {
-	return nil
-}
-func (MockMempool) CheckTxWithInfo(_ types.Tx, _ func(*abci.Response),
-	_ mempool.TxInfo) error {
-	return nil
-}
-func (MockMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
-func (MockMempool) Update(
-	_ int64,
-	_ types.Txs,
-	_ mempool.PreCheckFunc,
-	_ mempool.PostCheckFunc,
-) error {
-	return nil
-}
-func (MockMempool) Flush()                        {}
-func (MockMempool) FlushAppConn() error           { return nil }
-func (MockMempool) TxsAvailable() <-chan struct{} { return make(chan struct{}) }
-func (MockMempool) EnableTxsAvailable()           {}
-
 //------------------------------------------------------
 // blockstore
 
@@ -96,7 +43,7 @@ type EvidencePool interface {
 	IsCommitted(types.Evidence) bool
 }
 
-// MockMempool is an empty implementation of a Mempool, useful for testing.
+// MockEvidencePool is an empty implementation of an EvidencePool, useful for testing.
 type MockEvidencePool struct{}
 
 func (m MockEvidencePool) PendingEvidence(int64) []types.Evidence { return nil }