package mempool
|
|
|
|
import (
|
|
"bytes"
|
|
"container/list"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/spf13/viper"
|
|
|
|
abci "github.com/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/proxy"
|
|
"github.com/tendermint/tendermint/types"
|
|
auto "github.com/tendermint/tmlibs/autofile"
|
|
"github.com/tendermint/tmlibs/clist"
|
|
. "github.com/tendermint/tmlibs/common"
|
|
)
|
|
|
|
/*
|
|
|
|
The mempool pushes new txs onto the proxyAppConn.
|
|
It gets a stream of (req, res) tuples from the proxy.
|
|
The memool stores good txs in a concurrent linked-list.
|
|
|
|
Multiple concurrent go-routines can traverse this linked-list
|
|
safely by calling .NextWait() on each element.
|
|
|
|
So we have several go-routines:
|
|
1. Consensus calling Update() and Reap() synchronously
|
|
2. Many mempool reactor's peer routines calling CheckTx()
|
|
3. Many mempool reactor's peer routines traversing the txs linked list
|
|
4. Another goroutine calling GarbageCollectTxs() periodically
|
|
|
|
To manage these goroutines, there are three methods of locking.
|
|
1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe)
|
|
2. Mutations to the linked-list elements are atomic
|
|
3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx
|
|
|
|
Garbage collection of old elements from mempool.txs is handlde via
|
|
the DetachPrev() call, which makes old elements not reachable by
|
|
peer broadcastTxRoutine() automatically garbage collected.
|
|
|
|
TODO: Better handle abci client errors. (make it automatically handle connection errors)
|
|
|
|
*/
|
|
|
|
const cacheSize = 100000
|
|
|
|
type Mempool struct {
|
|
config *viper.Viper
|
|
|
|
proxyMtx sync.Mutex
|
|
proxyAppConn proxy.AppConnMempool
|
|
txs *clist.CList // concurrent linked-list of good txs
|
|
counter int64 // simple incrementing counter
|
|
height int // the last block Update()'d to
|
|
rechecking int32 // for re-checking filtered txs on Update()
|
|
recheckCursor *clist.CElement // next expected response
|
|
recheckEnd *clist.CElement // re-checking stops here
|
|
|
|
// Keep a cache of already-seen txs.
|
|
// This reduces the pressure on the proxyApp.
|
|
cache *txCache
|
|
|
|
// A log of mempool txs
|
|
wal *auto.AutoFile
|
|
}
|
|
|
|
func NewMempool(config *viper.Viper, proxyAppConn proxy.AppConnMempool) *Mempool {
|
|
mempool := &Mempool{
|
|
config: config,
|
|
proxyAppConn: proxyAppConn,
|
|
txs: clist.New(),
|
|
counter: 0,
|
|
height: 0,
|
|
rechecking: 0,
|
|
recheckCursor: nil,
|
|
recheckEnd: nil,
|
|
|
|
cache: newTxCache(cacheSize),
|
|
}
|
|
mempool.initWAL()
|
|
proxyAppConn.SetResponseCallback(mempool.resCb)
|
|
return mempool
|
|
}
|
|
|
|
func (mem *Mempool) initWAL() {
|
|
walDir := mem.config.GetString("mempool_wal_dir")
|
|
if walDir != "" {
|
|
err := EnsureDir(walDir, 0700)
|
|
if err != nil {
|
|
log.Error("Error ensuring Mempool wal dir", "error", err)
|
|
PanicSanity(err)
|
|
}
|
|
af, err := auto.OpenAutoFile(walDir + "/wal")
|
|
if err != nil {
|
|
log.Error("Error opening Mempool wal file", "error", err)
|
|
PanicSanity(err)
|
|
}
|
|
mem.wal = af
|
|
}
|
|
}
|
|
|
|
// consensus must be able to hold lock to safely update
|
|
func (mem *Mempool) Lock() {
|
|
mem.proxyMtx.Lock()
|
|
}
|
|
|
|
func (mem *Mempool) Unlock() {
|
|
mem.proxyMtx.Unlock()
|
|
}
|
|
|
|
// Number of transactions in the mempool clist
|
|
func (mem *Mempool) Size() int {
|
|
return mem.txs.Len()
|
|
}
|
|
|
|
// Remove all transactions from mempool and cache
|
|
func (mem *Mempool) Flush() {
|
|
mem.proxyMtx.Lock()
|
|
defer mem.proxyMtx.Unlock()
|
|
|
|
mem.cache.Reset()
|
|
|
|
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
|
mem.txs.Remove(e)
|
|
e.DetachPrev()
|
|
}
|
|
}
|
|
|
|
// Return the first element of mem.txs for peer goroutines to call .NextWait() on.
|
|
// Blocks until txs has elements.
|
|
func (mem *Mempool) TxsFrontWait() *clist.CElement {
|
|
return mem.txs.FrontWait()
|
|
}
|
|
|
|
// Try a new transaction in the mempool.
|
|
// Potentially blocking if we're blocking on Update() or Reap().
|
|
// cb: A callback from the CheckTx command.
|
|
// It gets called from another goroutine.
|
|
// CONTRACT: Either cb will get called, or err returned.
|
|
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
|
|
mem.proxyMtx.Lock()
|
|
defer mem.proxyMtx.Unlock()
|
|
|
|
// CACHE
|
|
if mem.cache.Exists(tx) {
|
|
if cb != nil {
|
|
cb(&abci.Response{
|
|
Value: &abci.Response_CheckTx{
|
|
&abci.ResponseCheckTx{
|
|
Code: abci.CodeType_BadNonce, // TODO or duplicate tx
|
|
Log: "Duplicate transaction (ignored)",
|
|
},
|
|
},
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
mem.cache.Push(tx)
|
|
// END CACHE
|
|
|
|
// WAL
|
|
if mem.wal != nil {
|
|
// TODO: Notify administrators when WAL fails
|
|
mem.wal.Write([]byte(tx))
|
|
mem.wal.Write([]byte("\n"))
|
|
}
|
|
// END WAL
|
|
|
|
// NOTE: proxyAppConn may error if tx buffer is full
|
|
if err = mem.proxyAppConn.Error(); err != nil {
|
|
return err
|
|
}
|
|
reqRes := mem.proxyAppConn.CheckTxAsync(tx)
|
|
if cb != nil {
|
|
reqRes.SetCallback(cb)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ABCI callback function
|
|
func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
|
|
if mem.recheckCursor == nil {
|
|
mem.resCbNormal(req, res)
|
|
} else {
|
|
mem.resCbRecheck(req, res)
|
|
}
|
|
}
|
|
|
|
func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
|
|
switch r := res.Value.(type) {
|
|
case *abci.Response_CheckTx:
|
|
if r.CheckTx.Code == abci.CodeType_OK {
|
|
mem.counter++
|
|
memTx := &mempoolTx{
|
|
counter: mem.counter,
|
|
height: int64(mem.height),
|
|
tx: req.GetCheckTx().Tx,
|
|
}
|
|
mem.txs.PushBack(memTx)
|
|
} else {
|
|
// ignore bad transaction
|
|
log.Info("Bad Transaction", "res", r)
|
|
|
|
// remove from cache (it might be good later)
|
|
mem.cache.Remove(req.GetCheckTx().Tx)
|
|
|
|
// TODO: handle other retcodes
|
|
}
|
|
default:
|
|
// ignore other messages
|
|
}
|
|
}
|
|
|
|
func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
|
switch r := res.Value.(type) {
|
|
case *abci.Response_CheckTx:
|
|
memTx := mem.recheckCursor.Value.(*mempoolTx)
|
|
if !bytes.Equal(req.GetCheckTx().Tx, memTx.tx) {
|
|
PanicSanity(Fmt("Unexpected tx response from proxy during recheck\n"+
|
|
"Expected %X, got %X", r.CheckTx.Data, memTx.tx))
|
|
}
|
|
if r.CheckTx.Code == abci.CodeType_OK {
|
|
// Good, nothing to do.
|
|
} else {
|
|
// Tx became invalidated due to newly committed block.
|
|
mem.txs.Remove(mem.recheckCursor)
|
|
mem.recheckCursor.DetachPrev()
|
|
|
|
// remove from cache (it might be good later)
|
|
mem.cache.Remove(req.GetCheckTx().Tx)
|
|
}
|
|
if mem.recheckCursor == mem.recheckEnd {
|
|
mem.recheckCursor = nil
|
|
} else {
|
|
mem.recheckCursor = mem.recheckCursor.Next()
|
|
}
|
|
if mem.recheckCursor == nil {
|
|
// Done!
|
|
atomic.StoreInt32(&mem.rechecking, 0)
|
|
log.Info("Done rechecking txs")
|
|
}
|
|
default:
|
|
// ignore other messages
|
|
}
|
|
}
|
|
|
|
// Get the valid transactions remaining
|
|
// If maxTxs is -1, there is no cap on returned transactions.
|
|
func (mem *Mempool) Reap(maxTxs int) types.Txs {
|
|
mem.proxyMtx.Lock()
|
|
defer mem.proxyMtx.Unlock()
|
|
|
|
for atomic.LoadInt32(&mem.rechecking) > 0 {
|
|
// TODO: Something better?
|
|
time.Sleep(time.Millisecond * 10)
|
|
}
|
|
|
|
txs := mem.collectTxs(maxTxs)
|
|
return txs
|
|
}
|
|
|
|
// maxTxs: -1 means uncapped, 0 means none
|
|
func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
|
|
if maxTxs == 0 {
|
|
return []types.Tx{}
|
|
} else if maxTxs < 0 {
|
|
maxTxs = mem.txs.Len()
|
|
}
|
|
txs := make([]types.Tx, 0, MinInt(mem.txs.Len(), maxTxs))
|
|
for e := mem.txs.Front(); e != nil && len(txs) < maxTxs; e = e.Next() {
|
|
memTx := e.Value.(*mempoolTx)
|
|
txs = append(txs, memTx.tx)
|
|
}
|
|
return txs
|
|
}
|
|
|
|
// Tell mempool that these txs were committed.
|
|
// Mempool will discard these txs.
|
|
// NOTE: this should be called *after* block is committed by consensus.
|
|
// NOTE: unsafe; Lock/Unlock must be managed by caller
|
|
func (mem *Mempool) Update(height int, txs types.Txs) {
|
|
// TODO: check err ?
|
|
mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx
|
|
|
|
// First, create a lookup map of txns in new txs.
|
|
txsMap := make(map[string]struct{})
|
|
for _, tx := range txs {
|
|
txsMap[string(tx)] = struct{}{}
|
|
}
|
|
|
|
// Set height
|
|
mem.height = height
|
|
// Remove transactions that are already in txs.
|
|
goodTxs := mem.filterTxs(txsMap)
|
|
// Recheck mempool txs if any txs were committed in the block
|
|
// NOTE/XXX: in some apps a tx could be invalidated due to EndBlock,
|
|
// so we really still do need to recheck, but this is for debugging
|
|
if mem.config.GetBool("mempool_recheck") &&
|
|
(mem.config.GetBool("mempool_recheck_empty") || len(txs) > 0) {
|
|
log.Info("Recheck txs", "numtxs", len(goodTxs))
|
|
mem.recheckTxs(goodTxs)
|
|
// At this point, mem.txs are being rechecked.
|
|
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
|
|
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
|
|
}
|
|
}
|
|
|
|
func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
|
|
goodTxs := make([]types.Tx, 0, mem.txs.Len())
|
|
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
|
memTx := e.Value.(*mempoolTx)
|
|
// Remove the tx if it's alredy in a block.
|
|
if _, ok := blockTxsMap[string(memTx.tx)]; ok {
|
|
// remove from clist
|
|
mem.txs.Remove(e)
|
|
e.DetachPrev()
|
|
|
|
// NOTE: we don't remove committed txs from the cache.
|
|
continue
|
|
}
|
|
// Good tx!
|
|
goodTxs = append(goodTxs, memTx.tx)
|
|
}
|
|
return goodTxs
|
|
}
|
|
|
|
// NOTE: pass in goodTxs because mem.txs can mutate concurrently.
|
|
func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
|
|
if len(goodTxs) == 0 {
|
|
return
|
|
}
|
|
atomic.StoreInt32(&mem.rechecking, 1)
|
|
mem.recheckCursor = mem.txs.Front()
|
|
mem.recheckEnd = mem.txs.Back()
|
|
|
|
// Push txs to proxyAppConn
|
|
// NOTE: resCb() may be called concurrently.
|
|
for _, tx := range goodTxs {
|
|
mem.proxyAppConn.CheckTxAsync(tx)
|
|
}
|
|
mem.proxyAppConn.FlushAsync()
|
|
}
|
|
|
|
//--------------------------------------------------------------------------------
|
|
|
|
// A transaction that successfully ran
|
|
type mempoolTx struct {
|
|
counter int64 // a simple incrementing counter
|
|
height int64 // height that this tx had been validated in
|
|
tx types.Tx //
|
|
}
|
|
|
|
func (memTx *mempoolTx) Height() int {
|
|
return int(atomic.LoadInt64(&memTx.height))
|
|
}
|
|
|
|
//--------------------------------------------------------------------------------
|
|
|
|
type txCache struct {
|
|
mtx sync.Mutex
|
|
size int
|
|
map_ map[string]struct{}
|
|
list *list.List // to remove oldest tx when cache gets too big
|
|
}
|
|
|
|
func newTxCache(cacheSize int) *txCache {
|
|
return &txCache{
|
|
size: cacheSize,
|
|
map_: make(map[string]struct{}, cacheSize),
|
|
list: list.New(),
|
|
}
|
|
}
|
|
|
|
func (cache *txCache) Reset() {
|
|
cache.mtx.Lock()
|
|
cache.map_ = make(map[string]struct{}, cacheSize)
|
|
cache.list.Init()
|
|
cache.mtx.Unlock()
|
|
}
|
|
|
|
func (cache *txCache) Exists(tx types.Tx) bool {
|
|
cache.mtx.Lock()
|
|
_, exists := cache.map_[string(tx)]
|
|
cache.mtx.Unlock()
|
|
return exists
|
|
}
|
|
|
|
// Returns false if tx is in cache.
|
|
func (cache *txCache) Push(tx types.Tx) bool {
|
|
cache.mtx.Lock()
|
|
defer cache.mtx.Unlock()
|
|
|
|
if _, exists := cache.map_[string(tx)]; exists {
|
|
return false
|
|
}
|
|
|
|
if cache.list.Len() >= cache.size {
|
|
popped := cache.list.Front()
|
|
poppedTx := popped.Value.(types.Tx)
|
|
// NOTE: the tx may have already been removed from the map
|
|
// but deleting a non-existant element is fine
|
|
delete(cache.map_, string(poppedTx))
|
|
cache.list.Remove(popped)
|
|
}
|
|
cache.map_[string(tx)] = struct{}{}
|
|
cache.list.PushBack(tx)
|
|
return true
|
|
}
|
|
|
|
func (cache *txCache) Remove(tx types.Tx) {
|
|
cache.mtx.Lock()
|
|
delete(cache.map_, string(tx))
|
|
cache.mtx.Unlock()
|
|
}
|