
p2p: simple peer scoring (#6277)

Sam Kleinman, 3 years ago (committed by GitHub)
commit 91506bf25d (pull/6299/head)
8 changed files with 118 additions and 27 deletions
  1. blockchain/v0/reactor_test.go (+1 -1)
  2. consensus/reactor.go (+10 -7)
  3. evidence/reactor_test.go (+1 -1)
  4. mempool/reactor_test.go (+1 -1)
  5. p2p/peermanager.go (+65 -13)
  6. p2p/peermanager_test.go (+36 -0)
  7. p2p/shim.go (+3 -3)
  8. statesync/reactor_test.go (+1 -1)
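
Taken together, these changes let a reactor feed peer-quality signals back to the PeerManager: the reactor calls SendUpdate with PeerStatusGood or PeerStatusBad on its PeerUpdates subscription, a goroutine started in Register drains those updates into processPeerEvent, which adjusts the peer's MutableScore, and Score() folds that into peer selection. Below is a minimal sketch of that round trip using only API that appears in this diff and its test (NewPeerManager, Add, Subscribe, SendUpdate, Scores); the package name, selfID, and peerID are placeholders, and the import paths assume the repository layout at this commit.

package example

import (
	"time"

	dbm "github.com/tendermint/tm-db"

	"github.com/tendermint/tendermint/p2p"
)

// reportGoodPeer sketches the reactor-to-manager scoring loop introduced by
// this commit. Illustrative only; error handling is minimal.
func reportGoodPeer(selfID, peerID p2p.NodeID) error {
	peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
	if err != nil {
		return err
	}
	defer peerManager.Close()

	// Make the peer known to the manager, as the new test does.
	if err := peerManager.Add(p2p.NodeAddress{NodeID: peerID, Protocol: "memory"}); err != nil {
		return err
	}

	// Subscribe registers the PeerUpdates with the manager; the goroutine
	// started in Register drains SendUpdate calls into processPeerEvent.
	peerUpdates := peerManager.Subscribe()
	defer peerUpdates.Close()

	// A reactor reports that the peer has been useful.
	peerUpdates.SendUpdate(p2p.PeerUpdate{
		NodeID: peerID,
		Status: p2p.PeerStatusGood,
	})

	// Updates are applied asynchronously; the new TestPeerScoring sleeps
	// briefly for the same reason before asserting on Scores().
	time.Sleep(time.Millisecond)
	_ = peerManager.Scores()[peerID]
	return nil
}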

blockchain/v0/reactor_test.go (+1 -1)

@@ -152,7 +152,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T,
	}

	rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
-	rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID])
+	rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
	rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
	rts.reactors[nodeID], err = NewReactor(
		rts.logger.With("nodeID", nodeID),


consensus/reactor.go (+10 -7)

@@ -1377,18 +1377,21 @@ func (r *Reactor) peerStatsRoutine() {
			switch msg.Msg.(type) {
			case *VoteMessage:
-				if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 { // nolint: staticcheck
-					// TODO: Handle peer quality via the peer manager.
-					// r.Switch.MarkPeerAsGood(peer)
+				if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
+					r.peerUpdates.SendUpdate(p2p.PeerUpdate{
+						NodeID: msg.PeerID,
+						Status: p2p.PeerStatusGood,
+					})
				}

			case *BlockPartMessage:
-				if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 { // nolint: staticcheck
-					// TODO: Handle peer quality via the peer manager.
-					// r.Switch.MarkPeerAsGood(peer)
+				if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
+					r.peerUpdates.SendUpdate(p2p.PeerUpdate{
+						NodeID: msg.PeerID,
+						Status: p2p.PeerStatusGood,
+					})
				}
			}

		case <-r.closeCh:
			return
		}
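
Only PeerStatusGood is reported here; the bad-peer path is symmetric, since processPeerEvent (added in p2p/peermanager.go below) decrements MutableScore on PeerStatusBad. A hypothetical helper, not part of this commit, assuming the same p2p package used above:

// markPeerBad is a hypothetical counterpart to the PeerStatusGood updates
// above (not part of this commit): a reactor that detects misbehavior can
// push PeerStatusBad, which the PeerManager turns into a score decrement.
func markPeerBad(pu *p2p.PeerUpdates, peerID p2p.NodeID) {
	pu.SendUpdate(p2p.PeerUpdate{
		NodeID: peerID,
		Status: p2p.PeerStatusBad,
	})
}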


evidence/reactor_test.go (+1 -1)

@@ -86,7 +86,7 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite {
		require.NoError(t, err)

		rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
-		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID])
+		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
		rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
		rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])


mempool/reactor_test.go (+1 -1)

@@ -60,7 +60,7 @@ func setup(t *testing.T, cfg *cfg.MempoolConfig, numNodes int, chBuf uint) *reac
		rts.mempools[nodeID] = mempool

		rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
-		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID])
+		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
		rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
		rts.reactors[nodeID] = NewReactor(


p2p/peermanager.go (+65 -13)

@@ -33,13 +33,15 @@ type PeerStatus string

const (
	PeerStatusUp   PeerStatus = "up"   // connected and ready
	PeerStatusDown PeerStatus = "down" // disconnected
+	PeerStatusGood PeerStatus = "good" // peer observed as good
+	PeerStatusBad  PeerStatus = "bad"  // peer observed as bad
)

// PeerScore is a numeric score assigned to a peer (higher is better).
type PeerScore uint8

const (
-	PeerScorePersistent PeerScore = 100           // persistent peers
+	PeerScorePersistent PeerScore = math.MaxUint8 // persistent peers
)

// PeerUpdate is a peer update event sent via PeerUpdates.
@@ -51,24 +53,35 @@ type PeerUpdate struct {

// PeerUpdates is a peer update subscription with notifications about peer
// events (currently just status changes).
type PeerUpdates struct {
-	updatesCh chan PeerUpdate
-	closeCh   chan struct{}
-	closeOnce sync.Once
+	routerUpdatesCh  chan PeerUpdate
+	reactorUpdatesCh chan PeerUpdate
+	closeCh          chan struct{}
+	closeOnce        sync.Once
}

// NewPeerUpdates creates a new PeerUpdates subscription. It is primarily for
// internal use, callers should typically use PeerManager.Subscribe(). The
// subscriber must call Close() when done.
-func NewPeerUpdates(updatesCh chan PeerUpdate) *PeerUpdates {
+func NewPeerUpdates(updatesCh chan PeerUpdate, buf int) *PeerUpdates {
	return &PeerUpdates{
-		updatesCh: updatesCh,
-		closeCh:   make(chan struct{}),
+		reactorUpdatesCh: updatesCh,
+		routerUpdatesCh:  make(chan PeerUpdate, buf),
+		closeCh:          make(chan struct{}),
	}
}

// Updates returns a channel for consuming peer updates.
func (pu *PeerUpdates) Updates() <-chan PeerUpdate {
-	return pu.updatesCh
+	return pu.reactorUpdatesCh
}

+// SendUpdate pushes information about a peer into the routing layer,
+// presumably from a peer.
+func (pu *PeerUpdates) SendUpdate(update PeerUpdate) {
+	select {
+	case <-pu.closeCh:
+	case pu.routerUpdatesCh <- update:
+	}
+}

// Close closes the peer updates subscription.
@@ -791,7 +804,7 @@ func (m *PeerManager) Subscribe() *PeerUpdates {
	// to the next subscriptions. This also prevents tail latencies from
	// compounding. Limiting it to 1 means that the subscribers are still
	// reasonably in sync. However, this should probably be benchmarked.
-	peerUpdates := NewPeerUpdates(make(chan PeerUpdate, 1))
+	peerUpdates := NewPeerUpdates(make(chan PeerUpdate, 1), 1)
	m.Register(peerUpdates)
	return peerUpdates
}
@@ -809,6 +822,19 @@ func (m *PeerManager) Register(peerUpdates *PeerUpdates) {
	m.subscriptions[peerUpdates] = peerUpdates
	m.mtx.Unlock()

+	go func() {
+		for {
+			select {
+			case <-peerUpdates.closeCh:
+				return
+			case <-m.closeCh:
+				return
+			case pu := <-peerUpdates.routerUpdatesCh:
+				m.processPeerEvent(pu)
+			}
+		}
+	}()

	go func() {
		select {
		case <-peerUpdates.Done():
@@ -820,6 +846,22 @@ func (m *PeerManager) Register(peerUpdates *PeerUpdates) {
	}()
}

+func (m *PeerManager) processPeerEvent(pu PeerUpdate) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()

+	if _, ok := m.store.peers[pu.NodeID]; !ok {
+		m.store.peers[pu.NodeID] = &peerInfo{}
+	}

+	switch pu.Status {
+	case PeerStatusBad:
+		m.store.peers[pu.NodeID].MutableScore--
+	case PeerStatusGood:
+		m.store.peers[pu.NodeID].MutableScore++
+	}
+}

// broadcast broadcasts a peer update to all subscriptions. The caller must
// already hold the mutex lock, to make sure updates are sent in the same order
// as the PeerManager processes them, but this means subscribers must be
@@ -837,7 +879,7 @@ func (m *PeerManager) broadcast(peerUpdate PeerUpdate) {
		default:
		}

		select {
-		case sub.updatesCh <- peerUpdate:
+		case sub.reactorUpdatesCh <- peerUpdate:
		case <-sub.closeCh:
		}
	}
@@ -1149,6 +1191,8 @@ type peerInfo struct {
	Persistent bool
	Height     int64
+	FixedScore PeerScore // mainly for tests
+	MutableScore int64 // updated by router
}

// peerInfoFromProto converts a Protobuf PeerInfo message to a peerInfo,
@@ -1205,14 +1249,22 @@ func (p *peerInfo) Copy() peerInfo {

// Score calculates a score for the peer. Higher-scored peers will be
// preferred over lower scores.
func (p *peerInfo) Score() PeerScore {
-	var score PeerScore
+	if p.FixedScore > 0 {
+		return p.FixedScore
+	}

	if p.Persistent {
-		score += PeerScorePersistent
+		return PeerScorePersistent
	}

-	return score
+	if p.MutableScore <= 0 {
+		return 0
+	}

+	if p.MutableScore >= math.MaxUint8 {
+		return PeerScore(math.MaxUint8)
+	}

+	return PeerScore(p.MutableScore)
}
// Validate validates the peer info.
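
The new Score() gives FixedScore absolute precedence, pins persistent peers to the maximum, and otherwise clamps the router-maintained MutableScore into the uint8 range. The following standalone sketch (clampedScore is an illustrative mirror of the method above, not code from this commit) makes the precedence and clamping explicit:

// clampedScore mirrors peerInfo.Score() above: FixedScore wins outright,
// persistent peers pin to 255 (math.MaxUint8, the new PeerScorePersistent),
// and otherwise MutableScore is clamped into [0, 255].
func clampedScore(fixed uint8, persistent bool, mutable int64) uint8 {
	if fixed > 0 {
		return fixed
	}
	if persistent {
		return 255
	}
	if mutable <= 0 {
		return 0
	}
	if mutable >= 255 {
		return 255
	}
	return uint8(mutable)
}

// For example: clampedScore(0, true, -5) == 255, clampedScore(0, false, -5) == 0,
// and clampedScore(0, false, 300) == 255.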


p2p/peermanager_test.go (+36 -0)

@@ -1615,3 +1615,39 @@ func TestPeerManager_SetHeight_GetHeight(t *testing.T) {
	require.Zero(t, peerManager.GetHeight(a.NodeID))
	require.Zero(t, peerManager.GetHeight(b.NodeID))
}

+func TestPeerScoring(t *testing.T) {
+	// create a mock peer manager
+	db := dbm.NewMemDB()
+	peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
+	require.NoError(t, err)
+	defer peerManager.Close()

+	// create a fake node
+	id := p2p.NodeID(strings.Repeat("a1", 20))
+	require.NoError(t, peerManager.Add(p2p.NodeAddress{NodeID: id, Protocol: "memory"}))

+	// update the manager and make sure it's correct
+	pu := peerManager.Subscribe()
+	require.EqualValues(t, 0, peerManager.Scores()[id])

+	// add a bunch of good status updates and watch things increase.
+	for i := 1; i < 10; i++ {
+		pu.SendUpdate(p2p.PeerUpdate{
+			NodeID: id,
+			Status: p2p.PeerStatusGood,
+		})
+		time.Sleep(time.Millisecond) // force a context switch
+		require.EqualValues(t, i, peerManager.Scores()[id])
+	}

+	// watch the corresponding decreases respond to updates
+	for i := 8; i >= 0; i-- {
+		pu.SendUpdate(p2p.PeerUpdate{
+			NodeID: id,
+			Status: p2p.PeerStatusBad,
+		})
+		time.Sleep(time.Millisecond) // force a context switch
+		require.EqualValues(t, i, peerManager.Scores()[id])
+	}
+}

p2p/shim.go (+3 -3)

@@ -64,7 +64,7 @@ func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*C
	rs := &ReactorShim{
		Name:        name,
-		PeerUpdates: NewPeerUpdates(make(chan PeerUpdate)),
+		PeerUpdates: NewPeerUpdates(make(chan PeerUpdate), 0),
		Channels:    channels,
	}
@@ -230,7 +230,7 @@ func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
// handle adding a peer.
func (rs *ReactorShim) AddPeer(peer Peer) {
	select {
-	case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp}:
+	case rs.PeerUpdates.reactorUpdatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp}:
		rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(), "status", PeerStatusUp)

	case <-rs.PeerUpdates.Done():
@@ -249,7 +249,7 @@ func (rs *ReactorShim) AddPeer(peer Peer) {
// handle removing a peer.
func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) {
	select {
-	case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusDown}:
+	case rs.PeerUpdates.reactorUpdatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusDown}:
		rs.Logger.Debug(
			"sent peer update",
			"reactor", rs.Name,


statesync/reactor_test.go (+1 -1)

@@ -62,7 +62,7 @@ func setup(
		chunkInCh:      make(chan p2p.Envelope, chBuf),
		chunkOutCh:     make(chan p2p.Envelope, chBuf),
		chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
-		peerUpdates:    p2p.NewPeerUpdates(make(chan p2p.PeerUpdate)),
+		peerUpdates:    p2p.NewPeerUpdates(make(chan p2p.PeerUpdate), int(chBuf)),
		conn:           conn,
		connQuery:      connQuery,
		stateProvider:  stateProvider,

