Browse Source

mempool: Store txs by hash inside of cache (#2234)

* mempool: Store txs by hash inside of cache

This allows for large cachesizes, without fear of the memory
consumption growing rapidly.

* (squash this) rename hashedTx -> txHash
pull/2250/merge
Dev Ojha 6 years ago
committed by Anton Kaliaev
parent
commit
bd531401a0
3 changed files with 25 additions and 14 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +8
    -2
      docs/spec/reactors/mempool/functionality.md
  3. +16
    -12
      mempool/mempool.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -28,6 +28,7 @@ IMPROVEMENTS:
- [docs] Lint documentation with `write-good` and `stop-words`.
- [scripts] Added json2wal tool, which is supposed to help our users restore
corrupted WAL files and compose test WAL files (@bradyjoestar)
- [mempool] Now stores txs by hash inside of the cache, to mitigate memory leakage
- [config] Replace db_path with db_dir from automatically generated configuration files.
Issue reported to Cosmos SDK ([#1712](https://github.com/cosmos/cosmos-sdk/issues/1712))


+ 8
- 2
docs/spec/reactors/mempool/functionality.md View File

@ -32,5 +32,11 @@ What guarantees does it need from the ABCI app?
## Optimizations
Talk about the LRU cache to make sure we don't process any
tx that we have seen before
The implementation within this library also implements a tx cache.
This is so that signatures don't have to be reverified if the tx has
already been seen before.
However, we only store valid txs in the cache, not invalid ones.
This is because invalid txs could become good later.
Txs that are included in a block aren't removed from the cache,
as they still may be getting received over the p2p network.
These txs are stored in the cache by their hash, to mitigate memory concerns.

+ 16
- 12
mempool/mempool.go View File

@ -3,6 +3,7 @@ package mempool
import (
"bytes"
"container/list"
"crypto/sha256"
"fmt"
"sync"
"sync/atomic"
@ -484,11 +485,12 @@ type txCache interface {
Remove(tx types.Tx)
}
// mapTxCache maintains a cache of transactions.
// mapTxCache maintains a cache of transactions. This only stores
// the hash of the tx, due to memory concerns.
type mapTxCache struct {
mtx sync.Mutex
size int
map_ map[string]*list.Element
map_ map[[sha256.Size]byte]*list.Element
list *list.List // to remove oldest tx when cache gets too big
}
@ -498,7 +500,7 @@ var _ txCache = (*mapTxCache)(nil)
func newMapTxCache(cacheSize int) *mapTxCache {
return &mapTxCache{
size: cacheSize,
map_: make(map[string]*list.Element, cacheSize),
map_: make(map[[sha256.Size]byte]*list.Element, cacheSize),
list: list.New(),
}
}
@ -506,7 +508,7 @@ func newMapTxCache(cacheSize int) *mapTxCache {
// Reset resets the cache to an empty state.
func (cache *mapTxCache) Reset() {
cache.mtx.Lock()
cache.map_ = make(map[string]*list.Element, cache.size)
cache.map_ = make(map[[sha256.Size]byte]*list.Element, cache.size)
cache.list.Init()
cache.mtx.Unlock()
}
@ -517,29 +519,31 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
cache.mtx.Lock()
defer cache.mtx.Unlock()
if _, exists := cache.map_[string(tx)]; exists {
// Use the tx hash in the cache
txHash := sha256.Sum256(tx)
if _, exists := cache.map_[txHash]; exists {
return false
}
if cache.list.Len() >= cache.size {
popped := cache.list.Front()
poppedTx := popped.Value.(types.Tx)
delete(cache.map_, string(poppedTx))
poppedTxHash := popped.Value.([sha256.Size]byte)
delete(cache.map_, poppedTxHash)
if popped != nil {
cache.list.Remove(popped)
}
}
cache.list.PushBack(tx)
cache.map_[string(tx)] = cache.list.Back()
cache.list.PushBack(txHash)
cache.map_[txHash] = cache.list.Back()
return true
}
// Remove removes the given tx from the cache.
func (cache *mapTxCache) Remove(tx types.Tx) {
cache.mtx.Lock()
stx := string(tx)
popped := cache.map_[stx]
delete(cache.map_, stx)
txHash := sha256.Sum256(tx)
popped := cache.map_[txHash]
delete(cache.map_, txHash)
if popped != nil {
cache.list.Remove(popped)
}


Loading…
Cancel
Save