From 1bd1593f20f41a3fb8deae71592229d77a2706b9 Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Tue, 28 Sep 2021 15:32:14 +0200 Subject: [PATCH] fix: race condition in p2p_switch and pex_reactor (#7015) Closes https://github.com/tendermint/tendermint/issues/7014 --- internal/p2p/pex/pex_reactor.go | 3 +-- internal/p2p/switch.go | 3 +-- libs/cmap/cmap.go | 14 +++++++++++ libs/cmap/cmap_test.go | 44 +++++++++++++++++++++++++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/internal/p2p/pex/pex_reactor.go b/internal/p2p/pex/pex_reactor.go index 049dbd9f1..9eb58c054 100644 --- a/internal/p2p/pex/pex_reactor.go +++ b/internal/p2p/pex/pex_reactor.go @@ -349,11 +349,10 @@ func (r *Reactor) receiveRequest(src Peer) error { // request out for this peer. func (r *Reactor) RequestAddrs(p Peer) { id := string(p.ID()) - if r.requestsSent.Has(id) { + if _, exists := r.requestsSent.GetOrSet(id, struct{}{}); exists { return } r.Logger.Debug("Request addrs", "from", p) - r.requestsSent.Set(id, struct{}{}) p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{})) } diff --git a/internal/p2p/switch.go b/internal/p2p/switch.go index eeb93a994..ea1272354 100644 --- a/internal/p2p/switch.go +++ b/internal/p2p/switch.go @@ -432,10 +432,9 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { // - ie. if we're getting ErrDuplicatePeer we can stop // because the addrbook got us the peer back already func (sw *Switch) reconnectToPeer(addr *NetAddress) { - if sw.reconnecting.Has(string(addr.ID)) { + if _, exists := sw.reconnecting.GetOrSet(string(addr.ID), addr); exists { return } - sw.reconnecting.Set(string(addr.ID), addr) defer sw.reconnecting.Delete(string(addr.ID)) start := time.Now() diff --git a/libs/cmap/cmap.go b/libs/cmap/cmap.go index 539870363..5aa82e807 100644 --- a/libs/cmap/cmap.go +++ b/libs/cmap/cmap.go @@ -22,6 +22,20 @@ func (cm *CMap) Set(key string, value interface{}) { cm.l.Unlock() } +// GetOrSet returns the existing value if present. Othewise, it stores `newValue` and returns it. +func (cm *CMap) GetOrSet(key string, newValue interface{}) (value interface{}, alreadyExists bool) { + + cm.l.Lock() + defer cm.l.Unlock() + + if v, ok := cm.m[key]; ok { + return v, true + } + + cm.m[key] = newValue + return newValue, false +} + func (cm *CMap) Get(key string) interface{} { cm.l.Lock() val := cm.m[key] diff --git a/libs/cmap/cmap_test.go b/libs/cmap/cmap_test.go index bab78da96..68a052bdb 100644 --- a/libs/cmap/cmap_test.go +++ b/libs/cmap/cmap_test.go @@ -3,6 +3,7 @@ package cmap import ( "fmt" "strings" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -67,3 +68,46 @@ func BenchmarkCMapHas(b *testing.B) { m.Has(string(rune(i))) } } + +func TestCMap_GetOrSet_Parallel(t *testing.T) { + + tests := []struct { + name string + newValue interface{} + parallelism int + }{ + {"test1", "a", 4}, + {"test2", "a", 40}, + {"test3", "a", 1}, + } + + //nolint:scopelint + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cm := NewCMap() + + wg := sync.WaitGroup{} + wg.Add(tt.parallelism) + for i := 0; i < tt.parallelism; i++ { + go func() { + defer wg.Done() + gotValue, _ := cm.GetOrSet(tt.name, tt.newValue) + assert.EqualValues(t, tt.newValue, gotValue) + }() + } + wg.Wait() + }) + } +} + +func TestCMap_GetOrSet_Exists(t *testing.T) { + cm := NewCMap() + + gotValue, exists := cm.GetOrSet("key", 1000) + assert.False(t, exists) + assert.EqualValues(t, 1000, gotValue) + + gotValue, exists = cm.GetOrSet("key", 2000) + assert.True(t, exists) + assert.EqualValues(t, 1000, gotValue) +}