From 8a1a79257e8e8b309cd35bb1fe40bf9b3330fd7d Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Mon, 13 Aug 2018 03:42:33 -0700 Subject: [PATCH] mempool: Keep cache hashmap and linked list in sync (#2188) * mempool: Keep cache hashmap and linked list in sync This removes bugs with the linked list being full, but hashmap empty * address PR comments * switch clist back to list --- CHANGELOG_PENDING.md | 2 ++ mempool/mempool.go | 22 +++++++++++------- mempool/mempool_test.go | 51 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 32389de2b..48b7db0b7 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -18,3 +18,5 @@ IMPROVEMENTS: corrupted WAL files and compose test WAL files (@bradyjoestar) BUG FIXES: +- [mempool] No longer possible to fill up linked list without getting caching +benefits [#2180](https://github.com/tendermint/tendermint/issues/2180) \ No newline at end of file diff --git a/mempool/mempool.go b/mempool/mempool.go index f03ca4e8b..f336585ba 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -488,7 +488,7 @@ type txCache interface { type mapTxCache struct { mtx sync.Mutex size int - map_ map[string]struct{} + map_ map[string]*list.Element list *list.List // to remove oldest tx when cache gets too big } @@ -498,7 +498,7 @@ var _ txCache = (*mapTxCache)(nil) func newMapTxCache(cacheSize int) *mapTxCache { return &mapTxCache{ size: cacheSize, - map_: make(map[string]struct{}, cacheSize), + map_: make(map[string]*list.Element, cacheSize), list: list.New(), } } @@ -506,7 +506,7 @@ func newMapTxCache(cacheSize int) *mapTxCache { // Reset resets the cache to an empty state. func (cache *mapTxCache) Reset() { cache.mtx.Lock() - cache.map_ = make(map[string]struct{}, cache.size) + cache.map_ = make(map[string]*list.Element, cache.size) cache.list.Init() cache.mtx.Unlock() } @@ -524,20 +524,26 @@ func (cache *mapTxCache) Push(tx types.Tx) bool { if cache.list.Len() >= cache.size { popped := cache.list.Front() poppedTx := popped.Value.(types.Tx) - // NOTE: the tx may have already been removed from the map - // but deleting a non-existent element is fine delete(cache.map_, string(poppedTx)) - cache.list.Remove(popped) + if popped != nil { + cache.list.Remove(popped) + } } - cache.map_[string(tx)] = struct{}{} cache.list.PushBack(tx) + cache.map_[string(tx)] = cache.list.Back() return true } // Remove removes the given tx from the cache. func (cache *mapTxCache) Remove(tx types.Tx) { cache.mtx.Lock() - delete(cache.map_, string(tx)) + stx := string(tx) + popped := cache.map_[stx] + delete(cache.map_, stx) + if popped != nil { + cache.list.Remove(popped) + } + cache.mtx.Unlock() } diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index e276f11c8..c29578efd 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -223,6 +223,28 @@ func TestSerialReap(t *testing.T) { reapCheck(600) } +func TestCacheRemove(t *testing.T) { + cache := newMapTxCache(100) + numTxs := 10 + txs := make([][]byte, numTxs) + for i := 0; i < numTxs; i++ { + // probability of collision is 2**-256 + txBytes := make([]byte, 32) + rand.Read(txBytes) + txs[i] = txBytes + cache.Push(txBytes) + // make sure its added to both the linked list and the map + require.Equal(t, i+1, len(cache.map_)) + require.Equal(t, i+1, cache.list.Len()) + } + for i := 0; i < numTxs; i++ { + cache.Remove(txs[i]) + // make sure its removed from both the map and the linked list + require.Equal(t, numTxs-(i+1), len(cache.map_)) + require.Equal(t, numTxs-(i+1), cache.list.Len()) + } +} + func TestMempoolCloseWAL(t *testing.T) { // 1. Create the temporary directory for mempool and WAL testing. rootDir, err := ioutil.TempDir("", "mempool-test") @@ -272,6 +294,35 @@ func TestMempoolCloseWAL(t *testing.T) { require.Equal(t, 1, len(m3), "expecting the wal match in") } +func BenchmarkCacheInsertTime(b *testing.B) { + cache := newMapTxCache(b.N) + txs := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + txs[i] = make([]byte, 8) + binary.BigEndian.PutUint64(txs[i], uint64(i)) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Push(txs[i]) + } +} + +// This benchmark is probably skewed, since we actually will be removing +// txs in parallel, which may cause some overhead due to mutex locking. +func BenchmarkCacheRemoveTime(b *testing.B) { + cache := newMapTxCache(b.N) + txs := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + txs[i] = make([]byte, 8) + binary.BigEndian.PutUint64(txs[i], uint64(i)) + cache.Push(txs[i]) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Remove(txs[i]) + } +} + func checksumIt(data []byte) string { h := md5.New() h.Write(data)