Browse Source

p2p: fix peer count mismatch #2332 (#2969)

* p2p: test case for peer count mismatch #2332

* p2p: fix peer count mismatch #2332

* changelog

* use httptest.Server to scrape Prometheus metrics
pull/2975/head v0.27.0-rc0
Ethan Buchman 6 years ago
committed by GitHub
parent
commit
a14fd8eba0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 79 additions and 11 deletions
  1. +3
    -0
      CHANGELOG.md
  2. +6
    -3
      p2p/peer_set.go
  3. +8
    -4
      p2p/peer_set_test.go
  4. +6
    -3
      p2p/switch.go
  5. +55
    -0
      p2p/switch_test.go
  6. +1
    -1
      p2p/test_util.go

+ 3
- 0
CHANGELOG.md View File

@ -58,6 +58,9 @@ key types that can be used by validators.
reset to 0 every time a validator is updated
- [p2p] \#2968 Panic on transport error rather than continuing to run but not
accept new connections
- [p2p] \#2969 Fix mismatch in peer count between `/net_info` and the prometheus
metrics
## v0.26.4


+ 6
- 3
p2p/peer_set.go View File

@ -98,13 +98,15 @@ func (ps *PeerSet) Get(peerKey ID) Peer {
}
// Remove discards peer by its Key, if the peer was previously memoized.
func (ps *PeerSet) Remove(peer Peer) {
// Returns true if the peer was removed, and false if it was not found.
// in the set.
func (ps *PeerSet) Remove(peer Peer) bool {
ps.mtx.Lock()
defer ps.mtx.Unlock()
item := ps.lookup[peer.ID()]
if item == nil {
return
return false
}
index := item.index
@ -116,7 +118,7 @@ func (ps *PeerSet) Remove(peer Peer) {
if index == len(ps.list)-1 {
ps.list = newList
delete(ps.lookup, peer.ID())
return
return true
}
// Replace the popped item with the last item in the old list.
@ -127,6 +129,7 @@ func (ps *PeerSet) Remove(peer Peer) {
lastPeerItem.index = index
ps.list = newList
delete(ps.lookup, peer.ID())
return true
}
// Size returns the number of unique items in the peerSet.


+ 8
- 4
p2p/peer_set_test.go View File

@ -60,13 +60,15 @@ func TestPeerSetAddRemoveOne(t *testing.T) {
n := len(peerList)
// 1. Test removing from the front
for i, peerAtFront := range peerList {
peerSet.Remove(peerAtFront)
removed := peerSet.Remove(peerAtFront)
assert.True(t, removed)
wantSize := n - i - 1
for j := 0; j < 2; j++ {
assert.Equal(t, false, peerSet.Has(peerAtFront.ID()), "#%d Run #%d: failed to remove peer", i, j)
assert.Equal(t, wantSize, peerSet.Size(), "#%d Run #%d: failed to remove peer and decrement size", i, j)
// Test the route of removing the now non-existent element
peerSet.Remove(peerAtFront)
removed := peerSet.Remove(peerAtFront)
assert.False(t, removed)
}
}
@ -81,7 +83,8 @@ func TestPeerSetAddRemoveOne(t *testing.T) {
// b) In reverse, remove each element
for i := n - 1; i >= 0; i-- {
peerAtEnd := peerList[i]
peerSet.Remove(peerAtEnd)
removed := peerSet.Remove(peerAtEnd)
assert.True(t, removed)
assert.Equal(t, false, peerSet.Has(peerAtEnd.ID()), "#%d: failed to remove item at end", i)
assert.Equal(t, i, peerSet.Size(), "#%d: differing sizes after peerSet.Remove(atEndPeer)", i)
}
@ -105,7 +108,8 @@ func TestPeerSetAddRemoveMany(t *testing.T) {
}
for i, peer := range peers {
peerSet.Remove(peer)
removed := peerSet.Remove(peer)
assert.True(t, removed)
if peerSet.Has(peer.ID()) {
t.Errorf("Failed to remove peer")
}


+ 6
- 3
p2p/switch.go View File

@ -211,7 +211,9 @@ func (sw *Switch) OnStop() {
// Stop peers
for _, p := range sw.peers.List() {
p.Stop()
sw.peers.Remove(p)
if sw.peers.Remove(p) {
sw.metrics.Peers.Add(float64(-1))
}
}
// Stop reactors
@ -299,8 +301,9 @@ func (sw *Switch) StopPeerGracefully(peer Peer) {
}
func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
sw.peers.Remove(peer)
sw.metrics.Peers.Add(float64(-1))
if sw.peers.Remove(peer) {
sw.metrics.Peers.Add(float64(-1))
}
peer.Stop()
for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason)


+ 55
- 0
p2p/switch_test.go View File

@ -3,10 +3,17 @@ package p2p
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"sync"
"testing"
"time"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -335,6 +342,54 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert.False(p.IsRunning())
}
func TestSwitchStopPeerForError(t *testing.T) {
s := httptest.NewServer(stdprometheus.UninstrumentedHandler())
defer s.Close()
scrapeMetrics := func() string {
resp, _ := http.Get(s.URL)
buf, _ := ioutil.ReadAll(resp.Body)
return string(buf)
}
namespace, subsystem, name := config.TestInstrumentationConfig().Namespace, MetricsSubsystem, "peers"
re := regexp.MustCompile(namespace + `_` + subsystem + `_` + name + ` ([0-9\.]+)`)
peersMetricValue := func() float64 {
matches := re.FindStringSubmatch(scrapeMetrics())
f, _ := strconv.ParseFloat(matches[1], 64)
return f
}
p2pMetrics := PrometheusMetrics(namespace)
// make two connected switches
sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
// set metrics on sw1
if i == 0 {
opt := WithMetrics(p2pMetrics)
opt(sw)
}
return initSwitchFunc(i, sw)
})
assert.Equal(t, len(sw1.Peers().List()), 1)
assert.EqualValues(t, 1, peersMetricValue())
// send messages to the peer from sw1
p := sw1.Peers().List()[0]
p.Send(0x1, []byte("here's a message to send"))
// stop sw2. this should cause the p to fail,
// which results in calling StopPeerForError internally
sw2.Stop()
// now call StopPeerForError explicitly, eg. from a reactor
sw1.StopPeerForError(p, fmt.Errorf("some err"))
assert.Equal(t, len(sw1.Peers().List()), 0)
assert.EqualValues(t, 0, peersMetricValue())
}
func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
assert, require := assert.New(t), require.New(t)


+ 1
- 1
p2p/test_util.go View File

@ -184,7 +184,7 @@ func MakeSwitch(
// TODO: let the config be passed in?
sw := initSwitch(i, NewSwitch(cfg, t, opts...))
sw.SetLogger(log.TestingLogger())
sw.SetLogger(log.TestingLogger().With("switch", i))
sw.SetNodeKey(&nodeKey)
ni := nodeInfo.(DefaultNodeInfo)


Loading…
Cancel
Save