diff --git a/CHANGELOG.md b/CHANGELOG.md index 47528bce9..64d37e68b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,9 @@ key types that can be used by validators. reset to 0 every time a validator is updated - [p2p] \#2968 Panic on transport error rather than continuing to run but not accept new connections +- [p2p] \#2969 Fix mismatch in peer count between `/net_info` and the prometheus + metrics + ## v0.26.4 diff --git a/p2p/peer_set.go b/p2p/peer_set.go index 257856156..87cf61da0 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -98,13 +98,15 @@ func (ps *PeerSet) Get(peerKey ID) Peer { } // Remove discards peer by its Key, if the peer was previously memoized. -func (ps *PeerSet) Remove(peer Peer) { +// Returns true if the peer was removed, and false if it was not found. +// in the set. +func (ps *PeerSet) Remove(peer Peer) bool { ps.mtx.Lock() defer ps.mtx.Unlock() item := ps.lookup[peer.ID()] if item == nil { - return + return false } index := item.index @@ -116,7 +118,7 @@ func (ps *PeerSet) Remove(peer Peer) { if index == len(ps.list)-1 { ps.list = newList delete(ps.lookup, peer.ID()) - return + return true } // Replace the popped item with the last item in the old list. @@ -127,6 +129,7 @@ func (ps *PeerSet) Remove(peer Peer) { lastPeerItem.index = index ps.list = newList delete(ps.lookup, peer.ID()) + return true } // Size returns the number of unique items in the peerSet. diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 04b877b0d..3eb5357d3 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -60,13 +60,15 @@ func TestPeerSetAddRemoveOne(t *testing.T) { n := len(peerList) // 1. Test removing from the front for i, peerAtFront := range peerList { - peerSet.Remove(peerAtFront) + removed := peerSet.Remove(peerAtFront) + assert.True(t, removed) wantSize := n - i - 1 for j := 0; j < 2; j++ { assert.Equal(t, false, peerSet.Has(peerAtFront.ID()), "#%d Run #%d: failed to remove peer", i, j) assert.Equal(t, wantSize, peerSet.Size(), "#%d Run #%d: failed to remove peer and decrement size", i, j) // Test the route of removing the now non-existent element - peerSet.Remove(peerAtFront) + removed := peerSet.Remove(peerAtFront) + assert.False(t, removed) } } @@ -81,7 +83,8 @@ func TestPeerSetAddRemoveOne(t *testing.T) { // b) In reverse, remove each element for i := n - 1; i >= 0; i-- { peerAtEnd := peerList[i] - peerSet.Remove(peerAtEnd) + removed := peerSet.Remove(peerAtEnd) + assert.True(t, removed) assert.Equal(t, false, peerSet.Has(peerAtEnd.ID()), "#%d: failed to remove item at end", i) assert.Equal(t, i, peerSet.Size(), "#%d: differing sizes after peerSet.Remove(atEndPeer)", i) } @@ -105,7 +108,8 @@ func TestPeerSetAddRemoveMany(t *testing.T) { } for i, peer := range peers { - peerSet.Remove(peer) + removed := peerSet.Remove(peer) + assert.True(t, removed) if peerSet.Has(peer.ID()) { t.Errorf("Failed to remove peer") } diff --git a/p2p/switch.go b/p2p/switch.go index eece71316..0490eebb8 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -211,7 +211,9 @@ func (sw *Switch) OnStop() { // Stop peers for _, p := range sw.peers.List() { p.Stop() - sw.peers.Remove(p) + if sw.peers.Remove(p) { + sw.metrics.Peers.Add(float64(-1)) + } } // Stop reactors @@ -299,8 +301,9 @@ func (sw *Switch) StopPeerGracefully(peer Peer) { } func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { - sw.peers.Remove(peer) - sw.metrics.Peers.Add(float64(-1)) + if sw.peers.Remove(peer) { + sw.metrics.Peers.Add(float64(-1)) + } peer.Stop() for _, reactor := range sw.reactors { reactor.RemovePeer(peer, reason) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index f52e47f06..6c515be02 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -3,10 +3,17 @@ package p2p import ( "bytes" "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "regexp" + "strconv" "sync" "testing" "time" + stdprometheus "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -335,6 +342,54 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { assert.False(p.IsRunning()) } +func TestSwitchStopPeerForError(t *testing.T) { + s := httptest.NewServer(stdprometheus.UninstrumentedHandler()) + defer s.Close() + + scrapeMetrics := func() string { + resp, _ := http.Get(s.URL) + buf, _ := ioutil.ReadAll(resp.Body) + return string(buf) + } + + namespace, subsystem, name := config.TestInstrumentationConfig().Namespace, MetricsSubsystem, "peers" + re := regexp.MustCompile(namespace + `_` + subsystem + `_` + name + ` ([0-9\.]+)`) + peersMetricValue := func() float64 { + matches := re.FindStringSubmatch(scrapeMetrics()) + f, _ := strconv.ParseFloat(matches[1], 64) + return f + } + + p2pMetrics := PrometheusMetrics(namespace) + + // make two connected switches + sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch { + // set metrics on sw1 + if i == 0 { + opt := WithMetrics(p2pMetrics) + opt(sw) + } + return initSwitchFunc(i, sw) + }) + + assert.Equal(t, len(sw1.Peers().List()), 1) + assert.EqualValues(t, 1, peersMetricValue()) + + // send messages to the peer from sw1 + p := sw1.Peers().List()[0] + p.Send(0x1, []byte("here's a message to send")) + + // stop sw2. this should cause the p to fail, + // which results in calling StopPeerForError internally + sw2.Stop() + + // now call StopPeerForError explicitly, eg. from a reactor + sw1.StopPeerForError(p, fmt.Errorf("some err")) + + assert.Equal(t, len(sw1.Peers().List()), 0) + assert.EqualValues(t, 0, peersMetricValue()) +} + func TestSwitchReconnectsToPersistentPeer(t *testing.T) { assert, require := assert.New(t), require.New(t) diff --git a/p2p/test_util.go b/p2p/test_util.go index ac29ecb47..44f27be40 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -184,7 +184,7 @@ func MakeSwitch( // TODO: let the config be passed in? sw := initSwitch(i, NewSwitch(cfg, t, opts...)) - sw.SetLogger(log.TestingLogger()) + sw.SetLogger(log.TestingLogger().With("switch", i)) sw.SetNodeKey(&nodeKey) ni := nodeInfo.(DefaultNodeInfo)