Browse Source

p2p: PeerBehaviour implementation (#3539) (#3552)

* p2p: initial implementation of peer behaviour

* [p2p] re-use newMockPeer

* p2p: add inline docs for peer_behaviour interface

* [p2p] Align PeerBehaviour interface (#3558)

* [p2p] make switchedPeerHebaviour private

* [p2p] make storePeerHebaviour private

* [p2p] Add CHANGELOG_PENDING entry for PeerBehaviour

* [p2p] Adjustment naming for PeerBehaviour

* [p2p] Add coarse lock around storedPeerBehaviour

* [p2p]: Fix non-pointer methods in storedPeerBehaviour

    + Structs with embeded locks must specify all methods with pointer
     receivers to avoid creating a copy of the embeded lock.

* [p2p] Thorough refactoring based on comments in #3552

    + Decouple PeerBehaviour interface from Peer by parametrizing
    methods with `p2p.ID` instead of `p2p.Peer`
    + Setter methods wrapped in a write lock
    + Getter methods wrapped in a read lock
    + Getter methods on storedPeerBehaviour now take a `p2p.ID`
    + Getter methods return a copy of underlying stored behaviours
    + Add doc strings to public types and methods

* [p2p] make structs public

* [p2p] Test empty StoredPeerBehaviour

* [p2p] typo fix

* [p2p] add TestStoredPeerBehaviourConcurrency

    + Add a test which uses StoredPeerBehaviour in multiple goroutines
      to ensure thread-safety.

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour_test.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* Update p2p/peer_behaviour.go

Co-Authored-By: brapse <brapse@gmail.com>

* [p2p] field ordering convention

* p2p: peer behaviour refactor

    + Change naming of reporting behaviour to `Report`
    + Remove the responsibility of distinguishing between the categories
    of good and bad behaviour and instead focus on reporting behaviour.

* p2p: rename PeerReporter -> PeerBehaviourReporter
pull/3673/head
Sean Braithwaite 5 years ago
committed by Ethan Buchman
parent
commit
86cf8ee3f9
3 changed files with 209 additions and 0 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +92
    -0
      p2p/peer_behaviour.go
  3. +115
    -0
      p2p/peer_behaviour_test.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -31,6 +31,8 @@
- [node] \#2659 Add `node.Mempool()` method, which allows you to access mempool
### IMPROVEMENTS:
- [p2p] [\#3463](https://github.com/tendermint/tendermint/pull/3463) Do not log "Can't add peer's address to addrbook" error for a private peer
- [p2p] [\#3552](https://github.com/tendermint/tendermint/pull/3552) Add PeerBehaviour Interface (@brapse)
- [rpc] [\#3534](https://github.com/tendermint/tendermint/pull/3534) Add support for batched requests/responses in JSON RPC
- [cli] \#3585 Add option to not clear address book with unsafe reset (@climber73)
- [cli] [\#3160](https://github.com/tendermint/tendermint/issues/3160) Add `-config=<path-to-config>` option to `testnet` cmd (@gregdhill)


+ 92
- 0
p2p/peer_behaviour.go View File

@ -0,0 +1,92 @@
package p2p
import (
"errors"
"sync"
)
// PeerBehaviour are types of reportable behaviours about peers.
type PeerBehaviour int
const (
PeerBehaviourBadMessage = iota
PeerBehaviourMessageOutOfOrder
PeerBehaviourVote
PeerBehaviourBlockPart
)
// PeerBehaviourReporter provides an interface for reactors to report the behaviour
// of peers synchronously to other components.
type PeerBehaviourReporter interface {
Report(peerID ID, behaviour PeerBehaviour) error
}
// SwitchPeerBehaviouReporter reports peer behaviour to an internal Switch
type SwitchPeerBehaviourReporter struct {
sw *Switch
}
// Return a new SwitchPeerBehaviourReporter instance which wraps the Switch.
func NewSwitchPeerBehaviourReporter(sw *Switch) *SwitchPeerBehaviourReporter {
return &SwitchPeerBehaviourReporter{
sw: sw,
}
}
// Report reports the behaviour of a peer to the Switch
func (spbr *SwitchPeerBehaviourReporter) Report(peerID ID, behaviour PeerBehaviour) error {
peer := spbr.sw.Peers().Get(peerID)
if peer == nil {
return errors.New("Peer not found")
}
switch behaviour {
case PeerBehaviourVote, PeerBehaviourBlockPart:
spbr.sw.MarkPeerAsGood(peer)
case PeerBehaviourBadMessage:
spbr.sw.StopPeerForError(peer, "Bad message")
case PeerBehaviourMessageOutOfOrder:
spbr.sw.StopPeerForError(peer, "Message out of order")
default:
return errors.New("Unknown behaviour")
}
return nil
}
// MockPeerBehaviourReporter serves a mock concrete implementation of the
// PeerBehaviourReporter interface used in reactor tests to ensure reactors
// report the correct behaviour in manufactured scenarios.
type MockPeerBehaviourReporter struct {
mtx sync.RWMutex
pb map[ID][]PeerBehaviour
}
// NewMockPeerBehaviourReporter returns a PeerBehaviourReporter which records all reported
// behaviours in memory.
func NewMockPeerBehaviourReporter() *MockPeerBehaviourReporter {
return &MockPeerBehaviourReporter{
pb: map[ID][]PeerBehaviour{},
}
}
// Report stores the PeerBehaviour produced by the peer identified by peerID.
func (mpbr *MockPeerBehaviourReporter) Report(peerID ID, behaviour PeerBehaviour) {
mpbr.mtx.Lock()
defer mpbr.mtx.Unlock()
mpbr.pb[peerID] = append(mpbr.pb[peerID], behaviour)
}
// GetBehaviours returns all behaviours reported on the peer identified by peerID.
func (mpbr *MockPeerBehaviourReporter) GetBehaviours(peerID ID) []PeerBehaviour {
mpbr.mtx.RLock()
defer mpbr.mtx.RUnlock()
if items, ok := mpbr.pb[peerID]; ok {
result := make([]PeerBehaviour, len(items))
copy(result, items)
return result
} else {
return []PeerBehaviour{}
}
}

+ 115
- 0
p2p/peer_behaviour_test.go View File

@ -0,0 +1,115 @@
package p2p
import (
"net"
"sync"
"testing"
)
// TestMockPeerBehaviour tests the MockPeerBehaviour' ability to store reported
// peer behaviour in memory indexed by the peerID
func TestMockPeerBehaviourReporter(t *testing.T) {
peer := newMockPeer(net.IP{127, 0, 0, 1})
pr := NewMockPeerBehaviourReporter()
behaviours := pr.GetBehaviours(peer.ID())
if len(behaviours) != 0 {
t.Errorf("Expected to have no behaviours reported")
}
pr.Report(peer.ID(), PeerBehaviourBadMessage)
behaviours = pr.GetBehaviours(peer.ID())
if len(behaviours) != 1 {
t.Errorf("Expected the peer have one reported behaviour")
}
if behaviours[0] != PeerBehaviourBadMessage {
t.Errorf("Expected PeerBehaviourBadMessage to have been reported")
}
}
type scriptedBehaviours struct {
PeerID ID
Behaviours []PeerBehaviour
}
type scriptItem struct {
PeerID ID
Behaviour PeerBehaviour
}
func equalBehaviours(a []PeerBehaviour, b []PeerBehaviour) bool {
if len(a) != len(b) {
return false
}
same := make([]PeerBehaviour, len(a))
for i, aBehaviour := range a {
for _, bBehaviour := range b {
if aBehaviour == bBehaviour {
same[i] = aBehaviour
}
}
}
return len(same) == len(a)
}
// TestPeerBehaviourConcurrency constructs a scenario in which
// multiple goroutines are using the same MockPeerBehaviourReporter instance.
// This test reproduces the conditions in which MockPeerBehaviourReporter will
// be used within a Reactor Receive method tests to ensure thread safety.
func TestMockPeerBehaviourReporterConcurrency(t *testing.T) {
behaviourScript := []scriptedBehaviours{
{"1", []PeerBehaviour{PeerBehaviourVote}},
{"2", []PeerBehaviour{PeerBehaviourVote, PeerBehaviourVote, PeerBehaviourVote, PeerBehaviourVote}},
{"3", []PeerBehaviour{PeerBehaviourBlockPart, PeerBehaviourVote, PeerBehaviourBlockPart, PeerBehaviourVote}},
{"4", []PeerBehaviour{PeerBehaviourVote, PeerBehaviourVote, PeerBehaviourVote, PeerBehaviourVote}},
{"5", []PeerBehaviour{PeerBehaviourBlockPart, PeerBehaviourVote, PeerBehaviourBlockPart, PeerBehaviourVote}},
}
var receiveWg sync.WaitGroup
pr := NewMockPeerBehaviourReporter()
scriptItems := make(chan scriptItem)
done := make(chan int)
numConsumers := 3
for i := 0; i < numConsumers; i++ {
receiveWg.Add(1)
go func() {
defer receiveWg.Done()
for {
select {
case pb := <-scriptItems:
pr.Report(pb.PeerID, pb.Behaviour)
case <-done:
return
}
}
}()
}
var sendingWg sync.WaitGroup
sendingWg.Add(1)
go func() {
defer sendingWg.Done()
for _, item := range behaviourScript {
for _, reason := range item.Behaviours {
scriptItems <- scriptItem{item.PeerID, reason}
}
}
}()
sendingWg.Wait()
for i := 0; i < numConsumers; i++ {
done <- 1
}
receiveWg.Wait()
for _, items := range behaviourScript {
if !equalBehaviours(pr.GetBehaviours(items.PeerID), items.Behaviours) {
t.Errorf("Expected peer %s to have behaved \n", items.PeerID)
}
}
}

Loading…
Cancel
Save