|
|
@ -77,7 +77,7 @@ type Mempool struct { |
|
|
|
|
|
|
|
// Keep a cache of already-seen txs.
|
|
|
|
// This reduces the pressure on the proxyApp.
|
|
|
|
cache *txCache |
|
|
|
cache txCache |
|
|
|
|
|
|
|
// A log of mempool txs
|
|
|
|
wal *auto.AutoFile |
|
|
@ -97,7 +97,11 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, he |
|
|
|
recheckCursor: nil, |
|
|
|
recheckEnd: nil, |
|
|
|
logger: log.NewNopLogger(), |
|
|
|
cache: newTxCache(config.CacheSize), |
|
|
|
} |
|
|
|
if config.CacheSize > 0 { |
|
|
|
mempool.cache = newMapTxCache(config.CacheSize) |
|
|
|
} else { |
|
|
|
mempool.cache = nopTxCache{} |
|
|
|
} |
|
|
|
proxyAppConn.SetResponseCallback(mempool.resCb) |
|
|
|
return mempool |
|
|
@ -328,11 +332,11 @@ func (mem *Mempool) notifyTxsAvailable() { |
|
|
|
panic("notified txs available but mempool is empty!") |
|
|
|
} |
|
|
|
if mem.txsAvailable != nil && !mem.notifiedTxsAvailable { |
|
|
|
// channel cap is 1, so this will send once
|
|
|
|
select { |
|
|
|
case mem.txsAvailable <- mem.height + 1: |
|
|
|
default: |
|
|
|
} |
|
|
|
|
|
|
|
mem.notifiedTxsAvailable = true |
|
|
|
} |
|
|
|
} |
|
|
@ -448,33 +452,42 @@ func (memTx *mempoolTx) Height() int64 { |
|
|
|
|
|
|
|
//--------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// txCache maintains a cache of transactions.
|
|
|
|
type txCache struct { |
|
|
|
type txCache interface { |
|
|
|
Reset() |
|
|
|
Push(tx types.Tx) bool |
|
|
|
Remove(tx types.Tx) |
|
|
|
} |
|
|
|
|
|
|
|
// mapTxCache maintains a cache of transactions.
|
|
|
|
type mapTxCache struct { |
|
|
|
mtx sync.Mutex |
|
|
|
size int |
|
|
|
map_ map[string]struct{} |
|
|
|
list *list.List // to remove oldest tx when cache gets too big
|
|
|
|
} |
|
|
|
|
|
|
|
// newTxCache returns a new txCache.
|
|
|
|
func newTxCache(cacheSize int) *txCache { |
|
|
|
return &txCache{ |
|
|
|
var _ txCache = (*mapTxCache)(nil) |
|
|
|
|
|
|
|
// newMapTxCache returns a new mapTxCache.
|
|
|
|
func newMapTxCache(cacheSize int) *mapTxCache { |
|
|
|
return &mapTxCache{ |
|
|
|
size: cacheSize, |
|
|
|
map_: make(map[string]struct{}, cacheSize), |
|
|
|
list: list.New(), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Reset resets the txCache to empty.
|
|
|
|
func (cache *txCache) Reset() { |
|
|
|
// Reset resets the cache to an empty state.
|
|
|
|
func (cache *mapTxCache) Reset() { |
|
|
|
cache.mtx.Lock() |
|
|
|
cache.map_ = make(map[string]struct{}, cache.size) |
|
|
|
cache.list.Init() |
|
|
|
cache.mtx.Unlock() |
|
|
|
} |
|
|
|
|
|
|
|
// Push adds the given tx to the txCache. It returns false if tx is already in the cache.
|
|
|
|
func (cache *txCache) Push(tx types.Tx) bool { |
|
|
|
// Push adds the given tx to the cache and returns true. It returns false if tx
|
|
|
|
// is already in the cache.
|
|
|
|
func (cache *mapTxCache) Push(tx types.Tx) bool { |
|
|
|
cache.mtx.Lock() |
|
|
|
defer cache.mtx.Unlock() |
|
|
|
|
|
|
@ -496,8 +509,16 @@ func (cache *txCache) Push(tx types.Tx) bool { |
|
|
|
} |
|
|
|
|
|
|
|
// Remove removes the given tx from the cache.
|
|
|
|
func (cache *txCache) Remove(tx types.Tx) { |
|
|
|
func (cache *mapTxCache) Remove(tx types.Tx) { |
|
|
|
cache.mtx.Lock() |
|
|
|
delete(cache.map_, string(tx)) |
|
|
|
cache.mtx.Unlock() |
|
|
|
} |
|
|
|
|
|
|
|
type nopTxCache struct{} |
|
|
|
|
|
|
|
var _ txCache = (*nopTxCache)(nil) |
|
|
|
|
|
|
|
func (nopTxCache) Reset() {} |
|
|
|
func (nopTxCache) Push(types.Tx) bool { return true } |
|
|
|
func (nopTxCache) Remove(types.Tx) {} |