
p2p/pex: cleanup to pex internals and peerManager interface (#6476)

Branch: pull/6477/head
Sam Kleinman committed 3 years ago (via GitHub)
Commit 0781ca3f50
8 changed files with 103 additions and 135 deletions
  1. blockchain/v0/reactor_test.go (+1 -1)
  2. evidence/reactor_test.go (+2 -2)
  3. p2p/peermanager.go (+3 -6)
  4. p2p/peermanager_test.go (+35 -35)
  5. p2p/pex/reactor.go (+25 -28)
  6. p2p/pex/reactor_test.go (+5 -4)
  7. p2p/router.go (+31 -58)
  8. p2p/router_test.go (+1 -1)

blockchain/v0/reactor_test.go (+1 -1)

@@ -211,7 +211,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
Status: p2p.PeerStatusDown,
NodeID: rts.nodes[0],
}
require.NoError(t, rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0]))
rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0])
}
func TestReactor_NoBlockResponse(t *testing.T) {


evidence/reactor_test.go (+2 -2)

@@ -256,11 +256,11 @@ func TestReactorMultiDisconnect(t *testing.T) {
// Ensure "disconnecting" the secondary peer from the primary more than once
// is handled gracefully.
require.NoError(t, primary.PeerManager.Disconnected(secondary.NodeID))
primary.PeerManager.Disconnected(secondary.NodeID)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
_, err := primary.PeerManager.TryEvictNext()
require.NoError(t, err)
require.NoError(t, primary.PeerManager.Disconnected(secondary.NodeID))
primary.PeerManager.Disconnected(secondary.NodeID)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
require.Equal(t, secondary.PeerManager.Status(primary.NodeID), p2p.PeerStatusUp)


p2p/peermanager.go (+3 -6)

@@ -684,7 +684,7 @@ func (m *PeerManager) Accepted(peerID NodeID) error {
// peer must already be marked as connected. This is separate from Dialed() and
// Accepted() to allow the router to set up its internal queues before reactors
// start sending messages.
func (m *PeerManager) Ready(peerID NodeID) error {
func (m *PeerManager) Ready(peerID NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()
@@ -695,7 +695,6 @@ func (m *PeerManager) Ready(peerID NodeID) error {
Status: PeerStatusUp,
})
}
return nil
}
// EvictNext returns the next peer to evict (i.e. disconnect). If no evictable
@@ -752,7 +751,7 @@ func (m *PeerManager) TryEvictNext() (NodeID, error) {
// Disconnected unmarks a peer as connected, allowing it to be dialed or
// accepted again as appropriate.
func (m *PeerManager) Disconnected(peerID NodeID) error {
func (m *PeerManager) Disconnected(peerID NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()
@@ -772,7 +771,6 @@ func (m *PeerManager) Disconnected(peerID NodeID) error {
}
m.dialWaker.Wake()
return nil
}
// Errored reports a peer error, causing the peer to be evicted if it's
@@ -783,7 +781,7 @@ func (m *PeerManager) Disconnected(peerID NodeID) error {
//
// FIXME: This will cause the peer manager to immediately try to reconnect to
// the peer, which is probably not always what we want.
func (m *PeerManager) Errored(peerID NodeID, err error) error {
func (m *PeerManager) Errored(peerID NodeID, err error) {
m.mtx.Lock()
defer m.mtx.Unlock()
@@ -792,7 +790,6 @@ func (m *PeerManager) Errored(peerID NodeID, err error) error {
}
m.evictWaker.Wake()
return nil
}
// Advertise returns a list of peer addresses to advertise to a peer.
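
The net effect of this file's changes is that Ready, Disconnected, and Errored become pure notifications with no error return, while Accepted (and Dialed) keep theirs. A minimal sketch of a call site under the new interface, using only signatures visible in the hunks above; handlePeer itself is a hypothetical helper, not code from the repository:

    package example

    import (
        "errors"

        "github.com/tendermint/tendermint/p2p"
    )

    // handlePeer is a hypothetical caller showing the asymmetry after this
    // change: setup calls can still fail, lifecycle notifications cannot.
    func handlePeer(pm *p2p.PeerManager, id p2p.NodeID) error {
        if err := pm.Accepted(id); err != nil {
            return err // Accepted still reports errors
        }
        pm.Ready(id)                                // no error to check anymore
        pm.Errored(id, errors.New("example fault")) // fire-and-forget
        pm.Disconnected(id)
        return nil
    }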


p2p/peermanager_test.go (+35 -35)

@@ -446,7 +446,7 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) {
go func() {
time.Sleep(200 * time.Millisecond)
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
}()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
@@ -558,7 +558,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
// Now, if we disconnect a, we should be allowed to dial d because we have a
// free upgrade slot.
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, d, dial)
@@ -567,7 +567,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) {
// However, if we disconnect b (such that only c and d are connected), we
// should not be allowed to dial e even though there are upgrade slots,
// because there are no lower-scored nodes that can be upgraded.
require.NoError(t, peerManager.Disconnected(b.NodeID))
peerManager.Disconnected(b.NodeID)
added, err = peerManager.Add(e)
require.NoError(t, err)
require.True(t, added)
@@ -979,7 +979,7 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) {
// In the meanwhile, a disconnects and d connects. d is even lower-scored
// than b (1 vs 2), which is currently being upgraded.
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)
@@ -1029,7 +1029,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) {
require.Equal(t, c, dial)
// In the meanwhile, b disconnects.
require.NoError(t, peerManager.Disconnected(b.NodeID))
peerManager.Disconnected(b.NodeID)
// Once c completes the upgrade of b, there is no longer a need to
// evict anything since we're at capacity.
@@ -1198,7 +1198,7 @@ func TestPeerManager_Accepted_Upgrade(t *testing.T) {
evict, err := peerManager.TryEvictNext()
require.NoError(t, err)
require.Equal(t, a.NodeID, evict)
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
// c still cannot get accepted, since it's not scored above b.
require.Error(t, peerManager.Accepted(c.NodeID))
@@ -1269,7 +1269,7 @@ func TestPeerManager_Ready(t *testing.T) {
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
// Marking a as ready should transition it to PeerStatusUp and send an update.
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
require.Equal(t, p2p.PeerUpdate{
NodeID: a.NodeID,
@@ -1281,7 +1281,7 @@
require.NoError(t, err)
require.True(t, added)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
require.NoError(t, peerManager.Ready(b.NodeID))
peerManager.Ready(b.NodeID)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(b.NodeID))
require.Empty(t, sub.Updates())
}
@@ -1297,7 +1297,7 @@ func TestPeerManager_EvictNext(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
// Since there are no peers to evict, EvictNext should block until timeout.
timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
@@ -1307,7 +1307,7 @@ func TestPeerManager_EvictNext(t *testing.T) {
require.Equal(t, context.DeadlineExceeded, err)
// Erroring the peer will return it from EvictNext().
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
peerManager.Errored(a.NodeID, errors.New("foo"))
evict, err := peerManager.EvictNext(timeoutCtx)
require.NoError(t, err)
require.Equal(t, a.NodeID, evict)
@@ -1330,12 +1330,12 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
// Spawn a goroutine to error a peer after a delay.
go func() {
time.Sleep(200 * time.Millisecond)
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
peerManager.Errored(a.NodeID, errors.New("foo"))
}()
// This will block until peer errors above.
@@ -1362,7 +1362,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
// Spawn a goroutine to upgrade to b with a delay.
go func() {
@@ -1400,7 +1400,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
// Spawn a goroutine to upgrade b with a delay.
go func() {
@@ -1432,10 +1432,10 @@ func TestPeerManager_TryEvictNext(t *testing.T) {
// Connecting to a won't evict anything either.
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
// But if a errors it should be evicted.
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
peerManager.Errored(a.NodeID, errors.New("foo"))
evict, err = peerManager.TryEvictNext()
require.NoError(t, err)
require.Equal(t, a.NodeID, evict)
@@ -1445,7 +1445,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) {
require.NoError(t, err)
require.Zero(t, evict)
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
peerManager.Errored(a.NodeID, errors.New("foo"))
evict, err = peerManager.TryEvictNext()
require.NoError(t, err)
require.Zero(t, evict)
@@ -1461,7 +1461,7 @@ func TestPeerManager_Disconnected(t *testing.T) {
defer sub.Close()
// Disconnecting an unknown peer does nothing.
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
require.Empty(t, peerManager.Peers())
require.Empty(t, sub.Updates())
@@ -1470,14 +1470,14 @@ func TestPeerManager_Disconnected(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
require.Empty(t, sub.Updates())
// Disconnecting a ready peer sends a status update.
_, err = peerManager.Add(a)
require.NoError(t, err)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
require.Equal(t, p2p.PeerStatusUp, peerManager.Status(a.NodeID))
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{
@@ -1485,7 +1485,7 @@ func TestPeerManager_Disconnected(t *testing.T) {
Status: p2p.PeerStatusUp,
}, <-sub.Updates())
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
require.Equal(t, p2p.PeerStatusDown, peerManager.Status(a.NodeID))
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{
@@ -1499,7 +1499,7 @@ func TestPeerManager_Disconnected(t *testing.T) {
require.NoError(t, err)
require.Equal(t, a, dial)
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Zero(t, dial)
@@ -1512,7 +1512,7 @@ func TestPeerManager_Errored(t *testing.T) {
require.NoError(t, err)
// Erroring an unknown peer does nothing.
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
peerManager.Errored(a.NodeID, errors.New("foo"))
require.Empty(t, peerManager.Peers())
evict, err := peerManager.TryEvictNext()
require.NoError(t, err)
@@ -1523,19 +1523,19 @@ func TestPeerManager_Errored(t *testing.T) {
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
peerManager.Errored(a.NodeID, errors.New("foo"))
evict, err = peerManager.TryEvictNext()
require.NoError(t, err)
require.Zero(t, evict)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
evict, err = peerManager.TryEvictNext()
require.NoError(t, err)
require.Zero(t, evict)
// However, erroring once connected will evict it.
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
peerManager.Errored(a.NodeID, errors.New("foo"))
evict, err = peerManager.TryEvictNext()
require.NoError(t, err)
require.Equal(t, a.NodeID, evict)
@@ -1560,11 +1560,11 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates())
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
@@ -1577,18 +1577,18 @@ func TestPeerManager_Subscribe(t *testing.T) {
require.NoError(t, peerManager.Dialed(a))
require.Empty(t, sub.Updates())
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
require.NoError(t, peerManager.Errored(a.NodeID, errors.New("foo")))
peerManager.Errored(a.NodeID, errors.New("foo"))
require.Empty(t, sub.Updates())
evict, err := peerManager.TryEvictNext()
require.NoError(t, err)
require.Equal(t, a.NodeID, evict)
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates())
@@ -1617,13 +1617,13 @@ func TestPeerManager_Subscribe_Close(t *testing.T) {
require.NoError(t, peerManager.Accepted(a.NodeID))
require.Empty(t, sub.Updates())
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
require.NotEmpty(t, sub.Updates())
require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}, <-sub.Updates())
// Closing the subscription should not send us the disconnected update.
sub.Close()
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
require.Empty(t, sub.Updates())
}
@@ -1647,7 +1647,7 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.NoError(t, peerManager.Accepted(a.NodeID))
require.NoError(t, peerManager.Ready(a.NodeID))
peerManager.Ready(a.NodeID)
expectUp := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusUp}
require.NotEmpty(t, s1)
@@ -1660,7 +1660,7 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) {
// We now close s2. Disconnecting the peer should only send updates
// on s1 and s3.
s2.Close()
require.NoError(t, peerManager.Disconnected(a.NodeID))
peerManager.Disconnected(a.NodeID)
expectDown := p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}
require.NotEmpty(t, s1)


p2p/pex/reactor.go (+25 -28)

@@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/libs/service"
@@ -24,7 +23,7 @@ var (
// See https://github.com/tendermint/tendermint/issues/6371
const (
// the minimum time one peer can send another request to the same peer
minReceiveRequestInterval = 300 * time.Millisecond
minReceiveRequestInterval = 100 * time.Millisecond
// the maximum amount of addresses that can be included in a response
maxAddresses uint16 = 100
@@ -78,7 +77,7 @@ type ReactorV2 struct {
closeCh chan struct{}
// list of available peers to loop through and send peer requests to
availablePeers *clist.CList
availablePeers map[p2p.NodeID]struct{}
mtx sync.RWMutex
@@ -120,7 +119,7 @@ func NewReactorV2(
pexCh: pexCh,
peerUpdates: peerUpdates,
closeCh: make(chan struct{}),
availablePeers: clist.New(),
availablePeers: make(map[p2p.NodeID]struct{}),
requestsSent: make(map[p2p.NodeID]struct{}),
lastReceivedRequests: make(map[p2p.NodeID]time.Time),
}
@@ -387,11 +386,17 @@ func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (er
// send a request for addresses.
func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received PEX peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
defer r.mtx.Unlock()
switch peerUpdate.Status {
case p2p.PeerStatusUp:
r.availablePeers.PushBack(peerUpdate.NodeID)
r.availablePeers[peerUpdate.NodeID] = struct{}{}
case p2p.PeerStatusDown:
r.removePeer(peerUpdate.NodeID)
delete(r.availablePeers, peerUpdate.NodeID)
delete(r.requestsSent, peerUpdate.NodeID)
delete(r.lastReceivedRequests, peerUpdate.NodeID)
default:
}
}
@@ -407,14 +412,18 @@ func (r *ReactorV2) waitUntilNextRequest() <-chan time.Time {
func (r *ReactorV2) sendRequestForPeers() {
r.mtx.Lock()
defer r.mtx.Unlock()
peer := r.availablePeers.Front()
if peer == nil {
if len(r.availablePeers) == 0 {
// no peers are available
r.Logger.Debug("no available peers to send request to, waiting...")
r.nextRequestTime = time.Now().Add(noAvailablePeersWaitPeriod)
return
}
peerID := peer.Value.(p2p.NodeID)
var peerID p2p.NodeID
// use range to get a random peer.
for peerID = range r.availablePeers {
break
}
// The node accommodates for both pex systems
if r.isLegacyPeer(peerID) {
@@ -429,9 +438,10 @@ func (r *ReactorV2) sendRequestForPeers() {
}
}
// remove the peer from the available peers list and mark it in the requestsSent map
r.availablePeers.Remove(peer)
peer.DetachPrev()
// remove the peer from the available peers list and mark it in the requestsSent map
// WAT(tychoish): do we actually want to do this? doesn't this
// just make churn?
delete(r.availablePeers, peerID)
r.requestsSent[peerID] = struct{}{}
r.calculateNextRequestTime()
@@ -462,7 +472,7 @@ func (r *ReactorV2) calculateNextRequestTime() {
// in. For example if we have 10 peers and we can't send a message to the
// same peer every 500ms, then we can send a request every 50ms. In practice
// we use a safety margin of 2, ergo 100ms
peers := tmmath.MinInt(r.availablePeers.Len(), 50)
peers := tmmath.MinInt(len(r.availablePeers), 50)
baseTime := minReceiveRequestInterval
if peers > 0 {
baseTime = minReceiveRequestInterval * 2 / time.Duration(peers)
@@ -484,20 +494,6 @@ func (r *ReactorV2) calculateNextRequestTime() {
r.nextRequestTime = time.Now().Add(baseTime * time.Duration(r.discoveryRatio))
}
func (r *ReactorV2) removePeer(id p2p.NodeID) {
for e := r.availablePeers.Front(); e != nil; e = e.Next() {
if e.Value == id {
r.availablePeers.Remove(e)
e.DetachPrev()
break
}
}
r.mtx.Lock()
defer r.mtx.Unlock()
delete(r.requestsSent, id)
delete(r.lastReceivedRequests, id)
}
func (r *ReactorV2) markPeerRequest(peer p2p.NodeID) error {
r.mtx.Lock()
defer r.mtx.Unlock()
@@ -521,7 +517,8 @@ func (r *ReactorV2) markPeerResponse(peer p2p.NodeID) error {
delete(r.requestsSent, peer)
// attach to the back of the list so that the peer can be used again for
// future requests
r.availablePeers.PushBack(peer)
r.availablePeers[peer] = struct{}{}
return nil
}
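
The structural change in this file swaps the clist-backed availablePeers list for a map used as a set, which makes removal on PeerStatusDown a constant-time delete instead of a linear scan, and leans on Go's unspecified map iteration order to pick the next peer to query. A small self-contained sketch of that selection pattern; peerSet and pick are illustrative names, not identifiers from the codebase, and map iteration order is unspecified rather than uniformly random, which spreads requests around but is not a strong shuffle:

    package main

    import "fmt"

    type nodeID string

    // peerSet mirrors the map[p2p.NodeID]struct{} idea: membership only,
    // O(1) insert and delete.
    type peerSet map[nodeID]struct{}

    // pick returns an arbitrary element, relying on Go's unspecified map
    // iteration order, and removes it from the set (as sendRequestForPeers
    // does before recording the peer in requestsSent).
    func (s peerSet) pick() (nodeID, bool) {
        for id := range s {
            delete(s, id)
            return id, true
        }
        return "", false
    }

    func main() {
        s := peerSet{"a": {}, "b": {}, "c": {}}
        if id, ok := s.pick(); ok {
            fmt.Println("asked", id, "for peers;", len(s), "still available")
        }
    }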


p2p/pex/reactor_test.go (+5 -4)

@@ -193,10 +193,10 @@ func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
func TestReactorLargePeerStoreInASmallNetwork(t *testing.T) {
testNet := setupNetwork(t, testOptions{
TotalNodes: 10,
MaxPeers: 100,
MaxConnected: 100,
BufferSize: 10,
TotalNodes: 5,
MaxPeers: 50,
MaxConnected: 50,
BufferSize: 5,
})
testNet.connectN(t, 1)
testNet.start(t)
@@ -668,6 +668,7 @@ func (r *reactorTestSuite) requireNumberOfPeers(
nodeIndex, numPeers int,
waitPeriod time.Duration,
) {
t.Helper()
require.Eventuallyf(t, func() bool {
actualNumPeers := len(r.network.Nodes[r.nodes[nodeIndex]].PeerManager.Peers())
return actualNumPeers >= numPeers
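
The added t.Helper() marks requireNumberOfPeers as a test helper, so a failed assertion inside it is reported at the calling test's line rather than inside the helper. A minimal illustration of the same pattern; requireEventuallyTrue is an invented name, not part of the test suite:

    package example

    import (
        "testing"
        "time"

        "github.com/stretchr/testify/require"
    )

    // requireEventuallyTrue is an illustrative helper in the same style as
    // requireNumberOfPeers: t.Helper() makes failures point at the caller.
    func requireEventuallyTrue(t *testing.T, cond func() bool) {
        t.Helper()
        require.Eventually(t, cond, time.Second, 10*time.Millisecond)
    }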


p2p/router.go (+31 -58)

@@ -472,9 +472,7 @@ func (r *Router) routeChannel(
r.logger.Error("peer error, evicting", "peer", peerError.NodeID, "err", peerError.Err)
if err := r.peerManager.Errored(peerError.NodeID, peerError.Err); err != nil {
r.logger.Error("failed to report peer error", "peer", peerError.NodeID, "err", err)
}
r.peerManager.Errored(peerError.NodeID, peerError.Err)
case <-r.stopCh:
return
@@ -574,38 +572,13 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) {
return
}
if err := r.peerManager.Accepted(peerInfo.NodeID); err != nil {
r.logger.Error("failed to accept connection", "peer", peerInfo.NodeID, "err", err)
return
}
r.metrics.Peers.Add(1)
queue := r.queueFactory(queueBufferDefault)
r.peerMtx.Lock()
r.peerQueues[peerInfo.NodeID] = queue
r.peerMtx.Unlock()
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerInfo.NodeID)
r.peerMtx.Unlock()
queue.close()
if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil {
r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err)
} else {
r.metrics.Peers.Add(-1)
}
}()
if err := r.peerManager.Ready(peerInfo.NodeID); err != nil {
r.logger.Error("failed to mark peer as ready", "peer", peerInfo.NodeID, "err", err)
if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil {
r.logger.Error("failed to accept connection",
"op", "incoming/accepted", "peer", peerInfo.NodeID, "err", err)
return
}
r.routePeer(peerInfo.NodeID, conn, queue)
r.routePeer(peerInfo.NodeID, conn)
}
// dialPeers maintains outbound connections to peers by dialing them.
@@ -652,34 +625,13 @@ func (r *Router) dialPeers() {
return
}
if err = r.peerManager.Dialed(address); err != nil {
r.logger.Error("failed to dial peer", "peer", address, "err", err)
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
r.logger.Error("failed to accept connection",
"op", "outgoing/dialing", "peer", address.NodeID, "err", err)
return
}
r.metrics.Peers.Add(1)
peerQueue := r.getOrMakeQueue(peerID)
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerID)
r.peerMtx.Unlock()
peerQueue.close()
if err := r.peerManager.Disconnected(peerID); err != nil {
r.logger.Error("failed to disconnect peer", "peer", address, "err", err)
} else {
r.metrics.Peers.Add(-1)
}
}()
if err := r.peerManager.Ready(peerID); err != nil {
r.logger.Error("failed to mark peer as ready", "peer", address, "err", err)
return
}
r.routePeer(peerID, conn, peerQueue)
r.routePeer(peerID, conn)
}()
}
}
@@ -775,10 +727,31 @@ func (r *Router) handshakePeer(ctx context.Context, conn Connection, expectID No
return peerInfo, peerKey, nil
}
func (r *Router) runWithPeerMutex(fn func() error) error {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
return fn()
}
// routePeer routes inbound and outbound messages between a peer and the reactor
// channels. It will close the given connection and send queue when done, or if
// they are closed elsewhere it will cause this method to shut down and return.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
func (r *Router) routePeer(peerID NodeID, conn Connection) {
r.metrics.Peers.Add(1)
r.peerManager.Ready(peerID)
sendQueue := r.getOrMakeQueue(peerID)
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerID)
r.peerMtx.Unlock()
sendQueue.close()
r.peerManager.Disconnected(peerID)
r.metrics.Peers.Add(-1)
}()
r.logger.Info("peer connected", "peer", peerID, "endpoint", conn)
errCh := make(chan error, 2)
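
After this change routePeer owns the whole per-peer lifecycle: it bumps the peer metric, calls Ready, fetches the send queue, and on exit closes the queue, calls Disconnected, and decrements the metric, so openConnection and dialPeers no longer duplicate that setup and teardown. A simplified sketch of the pattern with stand-in types, not the router's real definitions:

    package main

    import "fmt"

    // manager is a stand-in for the PeerManager notification surface used here.
    type manager struct{}

    func (m *manager) Ready(id string)        { fmt.Println("up:", id) }
    func (m *manager) Disconnected(id string) { fmt.Println("down:", id) }

    // routePeer-style function: one place owns setup and (via defer) teardown,
    // so both the accept path and the dial path just hand it a connection.
    func routePeer(m *manager, id string, serve func()) {
        m.Ready(id)
        defer m.Disconnected(id)
        serve() // route inbound/outbound messages until the connection closes
    }

    func main() {
        routePeer(&manager{}, "node1", func() { fmt.Println("routing for node1") })
    }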


p2p/router_test.go (+1 -1)

@@ -738,7 +738,7 @@ func TestRouter_EvictPeers(t *testing.T) {
Status: p2p.PeerStatusUp,
})
require.NoError(t, peerManager.Errored(peerInfo.NodeID, errors.New("boom")))
peerManager.Errored(peerInfo.NodeID, errors.New("boom"))
p2ptest.RequireUpdate(t, sub, p2p.PeerUpdate{
NodeID: peerInfo.NodeID,

