Browse Source

fix: race condition in p2p_switch and pex_reactor (#7015)

Closes https://github.com/tendermint/tendermint/issues/7014
pull/7016/head
lklimek 3 years ago
committed by GitHub
parent
commit
1bd1593f20
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 4 deletions
  1. +1
    -2
      internal/p2p/pex/pex_reactor.go
  2. +1
    -2
      internal/p2p/switch.go
  3. +14
    -0
      libs/cmap/cmap.go
  4. +44
    -0
      libs/cmap/cmap_test.go

+ 1
- 2
internal/p2p/pex/pex_reactor.go View File

@ -349,11 +349,10 @@ func (r *Reactor) receiveRequest(src Peer) error {
// request out for this peer.
func (r *Reactor) RequestAddrs(p Peer) {
id := string(p.ID())
if r.requestsSent.Has(id) {
if _, exists := r.requestsSent.GetOrSet(id, struct{}{}); exists {
return
}
r.Logger.Debug("Request addrs", "from", p)
r.requestsSent.Set(id, struct{}{})
p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{}))
}


+ 1
- 2
internal/p2p/switch.go View File

@ -432,10 +432,9 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
// - ie. if we're getting ErrDuplicatePeer we can stop
// because the addrbook got us the peer back already
func (sw *Switch) reconnectToPeer(addr *NetAddress) {
if sw.reconnecting.Has(string(addr.ID)) {
if _, exists := sw.reconnecting.GetOrSet(string(addr.ID), addr); exists {
return
}
sw.reconnecting.Set(string(addr.ID), addr)
defer sw.reconnecting.Delete(string(addr.ID))
start := time.Now()


+ 14
- 0
libs/cmap/cmap.go View File

@ -22,6 +22,20 @@ func (cm *CMap) Set(key string, value interface{}) {
cm.l.Unlock()
}
// GetOrSet returns the existing value if present. Othewise, it stores `newValue` and returns it.
func (cm *CMap) GetOrSet(key string, newValue interface{}) (value interface{}, alreadyExists bool) {
cm.l.Lock()
defer cm.l.Unlock()
if v, ok := cm.m[key]; ok {
return v, true
}
cm.m[key] = newValue
return newValue, false
}
func (cm *CMap) Get(key string) interface{} {
cm.l.Lock()
val := cm.m[key]


+ 44
- 0
libs/cmap/cmap_test.go View File

@ -3,6 +3,7 @@ package cmap
import (
"fmt"
"strings"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@ -67,3 +68,46 @@ func BenchmarkCMapHas(b *testing.B) {
m.Has(string(rune(i)))
}
}
func TestCMap_GetOrSet_Parallel(t *testing.T) {
tests := []struct {
name string
newValue interface{}
parallelism int
}{
{"test1", "a", 4},
{"test2", "a", 40},
{"test3", "a", 1},
}
//nolint:scopelint
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cm := NewCMap()
wg := sync.WaitGroup{}
wg.Add(tt.parallelism)
for i := 0; i < tt.parallelism; i++ {
go func() {
defer wg.Done()
gotValue, _ := cm.GetOrSet(tt.name, tt.newValue)
assert.EqualValues(t, tt.newValue, gotValue)
}()
}
wg.Wait()
})
}
}
func TestCMap_GetOrSet_Exists(t *testing.T) {
cm := NewCMap()
gotValue, exists := cm.GetOrSet("key", 1000)
assert.False(t, exists)
assert.EqualValues(t, 1000, gotValue)
gotValue, exists = cm.GetOrSet("key", 2000)
assert.True(t, exists)
assert.EqualValues(t, 1000, gotValue)
}

Loading…
Cancel
Save