Browse Source

mempool: Keep cache hashmap and linked list in sync (#2188)

* mempool: Keep cache hashmap and linked list in sync

This removes bugs with the linked list being full, but hashmap empty

* address PR comments

* switch clist back to list
pull/2148/head
Dev Ojha 6 years ago
committed by Anton Kaliaev
parent
commit
8a1a79257e
3 changed files with 67 additions and 8 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +14
    -8
      mempool/mempool.go
  3. +51
    -0
      mempool/mempool_test.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -18,3 +18,5 @@ IMPROVEMENTS:
corrupted WAL files and compose test WAL files (@bradyjoestar) corrupted WAL files and compose test WAL files (@bradyjoestar)
BUG FIXES: BUG FIXES:
- [mempool] No longer possible to fill up linked list without getting caching
benefits [#2180](https://github.com/tendermint/tendermint/issues/2180)

+ 14
- 8
mempool/mempool.go View File

@ -488,7 +488,7 @@ type txCache interface {
type mapTxCache struct { type mapTxCache struct {
mtx sync.Mutex mtx sync.Mutex
size int size int
map_ map[string]struct{}
map_ map[string]*list.Element
list *list.List // to remove oldest tx when cache gets too big list *list.List // to remove oldest tx when cache gets too big
} }
@ -498,7 +498,7 @@ var _ txCache = (*mapTxCache)(nil)
func newMapTxCache(cacheSize int) *mapTxCache { func newMapTxCache(cacheSize int) *mapTxCache {
return &mapTxCache{ return &mapTxCache{
size: cacheSize, size: cacheSize,
map_: make(map[string]struct{}, cacheSize),
map_: make(map[string]*list.Element, cacheSize),
list: list.New(), list: list.New(),
} }
} }
@ -506,7 +506,7 @@ func newMapTxCache(cacheSize int) *mapTxCache {
// Reset resets the cache to an empty state. // Reset resets the cache to an empty state.
func (cache *mapTxCache) Reset() { func (cache *mapTxCache) Reset() {
cache.mtx.Lock() cache.mtx.Lock()
cache.map_ = make(map[string]struct{}, cache.size)
cache.map_ = make(map[string]*list.Element, cache.size)
cache.list.Init() cache.list.Init()
cache.mtx.Unlock() cache.mtx.Unlock()
} }
@ -524,20 +524,26 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
if cache.list.Len() >= cache.size { if cache.list.Len() >= cache.size {
popped := cache.list.Front() popped := cache.list.Front()
poppedTx := popped.Value.(types.Tx) poppedTx := popped.Value.(types.Tx)
// NOTE: the tx may have already been removed from the map
// but deleting a non-existent element is fine
delete(cache.map_, string(poppedTx)) delete(cache.map_, string(poppedTx))
cache.list.Remove(popped)
if popped != nil {
cache.list.Remove(popped)
}
} }
cache.map_[string(tx)] = struct{}{}
cache.list.PushBack(tx) cache.list.PushBack(tx)
cache.map_[string(tx)] = cache.list.Back()
return true return true
} }
// Remove removes the given tx from the cache. // Remove removes the given tx from the cache.
func (cache *mapTxCache) Remove(tx types.Tx) { func (cache *mapTxCache) Remove(tx types.Tx) {
cache.mtx.Lock() cache.mtx.Lock()
delete(cache.map_, string(tx))
stx := string(tx)
popped := cache.map_[stx]
delete(cache.map_, stx)
if popped != nil {
cache.list.Remove(popped)
}
cache.mtx.Unlock() cache.mtx.Unlock()
} }


+ 51
- 0
mempool/mempool_test.go View File

@ -223,6 +223,28 @@ func TestSerialReap(t *testing.T) {
reapCheck(600) reapCheck(600)
} }
func TestCacheRemove(t *testing.T) {
cache := newMapTxCache(100)
numTxs := 10
txs := make([][]byte, numTxs)
for i := 0; i < numTxs; i++ {
// probability of collision is 2**-256
txBytes := make([]byte, 32)
rand.Read(txBytes)
txs[i] = txBytes
cache.Push(txBytes)
// make sure its added to both the linked list and the map
require.Equal(t, i+1, len(cache.map_))
require.Equal(t, i+1, cache.list.Len())
}
for i := 0; i < numTxs; i++ {
cache.Remove(txs[i])
// make sure its removed from both the map and the linked list
require.Equal(t, numTxs-(i+1), len(cache.map_))
require.Equal(t, numTxs-(i+1), cache.list.Len())
}
}
func TestMempoolCloseWAL(t *testing.T) { func TestMempoolCloseWAL(t *testing.T) {
// 1. Create the temporary directory for mempool and WAL testing. // 1. Create the temporary directory for mempool and WAL testing.
rootDir, err := ioutil.TempDir("", "mempool-test") rootDir, err := ioutil.TempDir("", "mempool-test")
@ -272,6 +294,35 @@ func TestMempoolCloseWAL(t *testing.T) {
require.Equal(t, 1, len(m3), "expecting the wal match in") require.Equal(t, 1, len(m3), "expecting the wal match in")
} }
func BenchmarkCacheInsertTime(b *testing.B) {
cache := newMapTxCache(b.N)
txs := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
txs[i] = make([]byte, 8)
binary.BigEndian.PutUint64(txs[i], uint64(i))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Push(txs[i])
}
}
// This benchmark is probably skewed, since we actually will be removing
// txs in parallel, which may cause some overhead due to mutex locking.
func BenchmarkCacheRemoveTime(b *testing.B) {
cache := newMapTxCache(b.N)
txs := make([][]byte, b.N)
for i := 0; i < b.N; i++ {
txs[i] = make([]byte, 8)
binary.BigEndian.PutUint64(txs[i], uint64(i))
cache.Push(txs[i])
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Remove(txs[i])
}
}
func checksumIt(data []byte) string { func checksumIt(data []byte) string {
h := md5.New() h := md5.New()
h.Write(data) h.Write(data)


Loading…
Cancel
Save