Browse Source

p2p: Peerbehaviour follow up (#3653) (#3663)

* Move peer behaviour into it's own package
* refactor wip
* Adjust API and fix tests
* remove unused test struct
* Better error message
* Restructure:
    + Now behaviour is it's own package, we don't need to include
    PeerBehaviour in every type.
    + Split up behaviours and reporters into seperate files

* doc string fixes
* Fix minor typos
* Update behaviour/reporter.go
Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>

* Update behaviour/reporter.go
Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>
pull/3720/head
Sean Braithwaite 6 years ago
committed by Alexander Simmerl
parent
commit
6a7d4182b4
5 changed files with 319 additions and 272 deletions
  1. +49
    -0
      behaviour/peer_behaviour.go
  2. +84
    -0
      behaviour/reporter.go
  3. +186
    -0
      behaviour/reporter_test.go
  4. +0
    -92
      p2p/peer_behaviour.go
  5. +0
    -180
      p2p/peer_behaviour_test.go

+ 49
- 0
behaviour/peer_behaviour.go View File

@ -0,0 +1,49 @@
package behaviour
import (
"github.com/tendermint/tendermint/p2p"
)
// PeerBehaviour is a struct describing a behaviour a peer performed.
// `peerID` identifies the peer and reason characterizes the specific
// behaviour performed by the peer.
type PeerBehaviour struct {
peerID p2p.ID
reason interface{}
}
type badMessage struct {
explanation string
}
// BadMessage returns a badMessage PeerBehaviour.
func BadMessage(peerID p2p.ID, explanation string) PeerBehaviour {
return PeerBehaviour{peerID: peerID, reason: badMessage{explanation}}
}
type messageOutOfOrder struct {
explanation string
}
// MessageOutOfOrder returns a messagOutOfOrder PeerBehaviour.
func MessageOutOfOrder(peerID p2p.ID, explanation string) PeerBehaviour {
return PeerBehaviour{peerID: peerID, reason: badMessage{explanation}}
}
type consensusVote struct {
explanation string
}
// ConsensusVote returns a consensusVote PeerBehaviour.
func ConsensusVote(peerID p2p.ID, explanation string) PeerBehaviour {
return PeerBehaviour{peerID: peerID, reason: consensusVote{explanation}}
}
type blockPart struct {
explanation string
}
// BlockPart returns blockPart PeerBehaviour.
func BlockPart(peerID p2p.ID, explanation string) PeerBehaviour {
return PeerBehaviour{peerID: peerID, reason: blockPart{explanation}}
}

+ 84
- 0
behaviour/reporter.go View File

@ -0,0 +1,84 @@
package behaviour
import (
"errors"
"sync"
"github.com/tendermint/tendermint/p2p"
)
// Reporter provides an interface for reactors to report the behaviour
// of peers synchronously to other components.
type Reporter interface {
Report(behaviour PeerBehaviour) error
}
// SwitchReporter reports peer behaviour to an internal Switch.
type SwitchReporter struct {
sw *p2p.Switch
}
// NewSwitchReporter return a new SwitchReporter instance which wraps the Switch.
func NewSwitcReporter(sw *p2p.Switch) *SwitchReporter {
return &SwitchReporter{
sw: sw,
}
}
// Report reports the behaviour of a peer to the Switch.
func (spbr *SwitchReporter) Report(behaviour PeerBehaviour) error {
peer := spbr.sw.Peers().Get(behaviour.peerID)
if peer == nil {
return errors.New("peer not found")
}
switch reason := behaviour.reason.(type) {
case consensusVote, blockPart:
spbr.sw.MarkPeerAsGood(peer)
case badMessage:
spbr.sw.StopPeerForError(peer, reason.explanation)
case messageOutOfOrder:
spbr.sw.StopPeerForError(peer, reason.explanation)
default:
return errors.New("unknown reason reported")
}
return nil
}
// MockReporter is a concrete implementation of the Reporter
// interface used in reactor tests to ensure reactors report the correct
// behaviour in manufactured scenarios.
type MockReporter struct {
mtx sync.RWMutex
pb map[p2p.ID][]PeerBehaviour
}
// NewMockReporter returns a Reporter which records all reported
// behaviours in memory.
func NewMockReporter() *MockReporter {
return &MockReporter{
pb: map[p2p.ID][]PeerBehaviour{},
}
}
// Report stores the PeerBehaviour produced by the peer identified by peerID.
func (mpbr *MockReporter) Report(behaviour PeerBehaviour) {
mpbr.mtx.Lock()
defer mpbr.mtx.Unlock()
mpbr.pb[behaviour.peerID] = append(mpbr.pb[behaviour.peerID], behaviour)
}
// GetBehaviours returns all behaviours reported on the peer identified by peerID.
func (mpbr *MockReporter) GetBehaviours(peerID p2p.ID) []PeerBehaviour {
mpbr.mtx.RLock()
defer mpbr.mtx.RUnlock()
if items, ok := mpbr.pb[peerID]; ok {
result := make([]PeerBehaviour, len(items))
copy(result, items)
return result
} else {
return []PeerBehaviour{}
}
}

+ 186
- 0
behaviour/reporter_test.go View File

@ -0,0 +1,186 @@
package behaviour_test
import (
"sync"
"testing"
bh "github.com/tendermint/tendermint/behaviour"
"github.com/tendermint/tendermint/p2p"
)
// TestMockReporter tests the MockReporter's ability to store reported
// peer behaviour in memory indexed by the peerID.
func TestMockReporter(t *testing.T) {
var peerID p2p.ID = "MockPeer"
pr := bh.NewMockReporter()
behaviours := pr.GetBehaviours(peerID)
if len(behaviours) != 0 {
t.Error("Expected to have no behaviours reported")
}
badMessage := bh.BadMessage(peerID, "bad message")
pr.Report(badMessage)
behaviours = pr.GetBehaviours(peerID)
if len(behaviours) != 1 {
t.Error("Expected the peer have one reported behaviour")
}
if behaviours[0] != badMessage {
t.Error("Expected Bad Message to have been reported")
}
}
type scriptItem struct {
peerID p2p.ID
behaviour bh.PeerBehaviour
}
// equalBehaviours returns true if a and b contain the same PeerBehaviours with
// the same freequencies and otherwise false.
func equalBehaviours(a []bh.PeerBehaviour, b []bh.PeerBehaviour) bool {
aHistogram := map[bh.PeerBehaviour]int{}
bHistogram := map[bh.PeerBehaviour]int{}
for _, behaviour := range a {
aHistogram[behaviour] += 1
}
for _, behaviour := range b {
bHistogram[behaviour] += 1
}
if len(aHistogram) != len(bHistogram) {
return false
}
for _, behaviour := range a {
if aHistogram[behaviour] != bHistogram[behaviour] {
return false
}
}
for _, behaviour := range b {
if bHistogram[behaviour] != aHistogram[behaviour] {
return false
}
}
return true
}
// TestEqualPeerBehaviours tests that equalBehaviours can tell that two slices
// of peer behaviours can be compared for the behaviours they contain and the
// freequencies that those behaviours occur.
func TestEqualPeerBehaviours(t *testing.T) {
var (
peerID p2p.ID = "MockPeer"
consensusVote = bh.ConsensusVote(peerID, "voted")
blockPart = bh.BlockPart(peerID, "blocked")
equals = []struct {
left []bh.PeerBehaviour
right []bh.PeerBehaviour
}{
// Empty sets
{[]bh.PeerBehaviour{}, []bh.PeerBehaviour{}},
// Single behaviours
{[]bh.PeerBehaviour{consensusVote}, []bh.PeerBehaviour{consensusVote}},
// Equal Frequencies
{[]bh.PeerBehaviour{consensusVote, consensusVote},
[]bh.PeerBehaviour{consensusVote, consensusVote}},
// Equal frequencies different orders
{[]bh.PeerBehaviour{consensusVote, blockPart},
[]bh.PeerBehaviour{blockPart, consensusVote}},
}
unequals = []struct {
left []bh.PeerBehaviour
right []bh.PeerBehaviour
}{
// Comparing empty sets to non empty sets
{[]bh.PeerBehaviour{}, []bh.PeerBehaviour{consensusVote}},
// Different behaviours
{[]bh.PeerBehaviour{consensusVote}, []bh.PeerBehaviour{blockPart}},
// Same behaviour with different frequencies
{[]bh.PeerBehaviour{consensusVote},
[]bh.PeerBehaviour{consensusVote, consensusVote}},
}
)
for _, test := range equals {
if !equalBehaviours(test.left, test.right) {
t.Errorf("Expected %#v and %#v to be equal", test.left, test.right)
}
}
for _, test := range unequals {
if equalBehaviours(test.left, test.right) {
t.Errorf("Expected %#v and %#v to be unequal", test.left, test.right)
}
}
}
// TestPeerBehaviourConcurrency constructs a scenario in which
// multiple goroutines are using the same MockReporter instance.
// This test reproduces the conditions in which MockReporter will
// be used within a Reactor `Receive` method tests to ensure thread safety.
func TestMockPeerBehaviourReporterConcurrency(t *testing.T) {
var (
behaviourScript = []struct {
peerID p2p.ID
behaviours []bh.PeerBehaviour
}{
{"1", []bh.PeerBehaviour{bh.ConsensusVote("1", "")}},
{"2", []bh.PeerBehaviour{bh.ConsensusVote("2", ""), bh.ConsensusVote("2", ""), bh.ConsensusVote("2", "")}},
{"3", []bh.PeerBehaviour{bh.BlockPart("3", ""), bh.ConsensusVote("3", ""), bh.BlockPart("3", ""), bh.ConsensusVote("3", "")}},
{"4", []bh.PeerBehaviour{bh.ConsensusVote("4", ""), bh.ConsensusVote("4", ""), bh.ConsensusVote("4", ""), bh.ConsensusVote("4", "")}},
{"5", []bh.PeerBehaviour{bh.BlockPart("5", ""), bh.ConsensusVote("5", ""), bh.BlockPart("5", ""), bh.ConsensusVote("5", "")}},
}
)
var receiveWg sync.WaitGroup
pr := bh.NewMockReporter()
scriptItems := make(chan scriptItem)
done := make(chan int)
numConsumers := 3
for i := 0; i < numConsumers; i++ {
receiveWg.Add(1)
go func() {
defer receiveWg.Done()
for {
select {
case pb := <-scriptItems:
pr.Report(pb.behaviour)
case <-done:
return
}
}
}()
}
var sendingWg sync.WaitGroup
sendingWg.Add(1)
go func() {
defer sendingWg.Done()
for _, item := range behaviourScript {
for _, reason := range item.behaviours {
scriptItems <- scriptItem{item.peerID, reason}
}
}
}()
sendingWg.Wait()
for i := 0; i < numConsumers; i++ {
done <- 1
}
receiveWg.Wait()
for _, items := range behaviourScript {
reported := pr.GetBehaviours(items.peerID)
if !equalBehaviours(reported, items.behaviours) {
t.Errorf("Expected peer %s to have behaved \nExpected: %#v \nGot %#v \n",
items.peerID, items.behaviours, reported)
}
}
}

+ 0
- 92
p2p/peer_behaviour.go View File

@ -1,92 +0,0 @@
package p2p
import (
"errors"
"sync"
)
// PeerBehaviour are types of reportable behaviours about peers.
type PeerBehaviour int
const (
PeerBehaviourBadMessage = iota
PeerBehaviourMessageOutOfOrder
PeerBehaviourVote
PeerBehaviourBlockPart
)
// PeerBehaviourReporter provides an interface for reactors to report the behaviour
// of peers synchronously to other components.
type PeerBehaviourReporter interface {
Report(peerID ID, behaviour PeerBehaviour) error
}
// SwitchPeerBehaviouReporter reports peer behaviour to an internal Switch
type SwitchPeerBehaviourReporter struct {
sw *Switch
}
// Return a new SwitchPeerBehaviourReporter instance which wraps the Switch.
func NewSwitchPeerBehaviourReporter(sw *Switch) *SwitchPeerBehaviourReporter {
return &SwitchPeerBehaviourReporter{
sw: sw,
}
}
// Report reports the behaviour of a peer to the Switch
func (spbr *SwitchPeerBehaviourReporter) Report(peerID ID, behaviour PeerBehaviour) error {
peer := spbr.sw.Peers().Get(peerID)
if peer == nil {
return errors.New("Peer not found")
}
switch behaviour {
case PeerBehaviourVote, PeerBehaviourBlockPart:
spbr.sw.MarkPeerAsGood(peer)
case PeerBehaviourBadMessage:
spbr.sw.StopPeerForError(peer, "Bad message")
case PeerBehaviourMessageOutOfOrder:
spbr.sw.StopPeerForError(peer, "Message out of order")
default:
return errors.New("Unknown behaviour")
}
return nil
}
// MockPeerBehaviourReporter serves a mock concrete implementation of the
// PeerBehaviourReporter interface used in reactor tests to ensure reactors
// report the correct behaviour in manufactured scenarios.
type MockPeerBehaviourReporter struct {
mtx sync.RWMutex
pb map[ID][]PeerBehaviour
}
// NewMockPeerBehaviourReporter returns a PeerBehaviourReporter which records all reported
// behaviours in memory.
func NewMockPeerBehaviourReporter() *MockPeerBehaviourReporter {
return &MockPeerBehaviourReporter{
pb: map[ID][]PeerBehaviour{},
}
}
// Report stores the PeerBehaviour produced by the peer identified by peerID.
func (mpbr *MockPeerBehaviourReporter) Report(peerID ID, behaviour PeerBehaviour) {
mpbr.mtx.Lock()
defer mpbr.mtx.Unlock()
mpbr.pb[peerID] = append(mpbr.pb[peerID], behaviour)
}
// GetBehaviours returns all behaviours reported on the peer identified by peerID.
func (mpbr *MockPeerBehaviourReporter) GetBehaviours(peerID ID) []PeerBehaviour {
mpbr.mtx.RLock()
defer mpbr.mtx.RUnlock()
if items, ok := mpbr.pb[peerID]; ok {
result := make([]PeerBehaviour, len(items))
copy(result, items)
return result
} else {
return []PeerBehaviour{}
}
}

+ 0
- 180
p2p/peer_behaviour_test.go View File

@ -1,180 +0,0 @@
package p2p_test
import (
"sync"
"testing"
"github.com/tendermint/tendermint/p2p"
)
// TestMockPeerBehaviour tests the MockPeerBehaviour' ability to store reported
// peer behaviour in memory indexed by the peerID
func TestMockPeerBehaviourReporter(t *testing.T) {
var peerID p2p.ID = "MockPeer"
pr := p2p.NewMockPeerBehaviourReporter()
behaviours := pr.GetBehaviours(peerID)
if len(behaviours) != 0 {
t.Error("Expected to have no behaviours reported")
}
pr.Report(peerID, p2p.PeerBehaviourBadMessage)
behaviours = pr.GetBehaviours(peerID)
if len(behaviours) != 1 {
t.Error("Expected the peer have one reported behaviour")
}
if behaviours[0] != p2p.PeerBehaviourBadMessage {
t.Error("Expected PeerBehaviourBadMessage to have been reported")
}
}
type scriptedBehaviours struct {
PeerID p2p.ID
Behaviours []p2p.PeerBehaviour
}
type scriptItem struct {
PeerID p2p.ID
Behaviour p2p.PeerBehaviour
}
// equalBehaviours returns true if a and b contain the same PeerBehaviours with
// the same freequency and otherwise false.
func equalBehaviours(a []p2p.PeerBehaviour, b []p2p.PeerBehaviour) bool {
aHistogram := map[p2p.PeerBehaviour]int{}
bHistogram := map[p2p.PeerBehaviour]int{}
for _, behaviour := range a {
aHistogram[behaviour] += 1
}
for _, behaviour := range b {
bHistogram[behaviour] += 1
}
if len(aHistogram) != len(bHistogram) {
return false
}
for _, behaviour := range a {
if aHistogram[behaviour] != bHistogram[behaviour] {
return false
}
}
for _, behaviour := range b {
if bHistogram[behaviour] != aHistogram[behaviour] {
return false
}
}
return true
}
// TestEqualPeerBehaviours tests that equalBehaviours can tell that two slices
// of peer behaviours can be compared for the behaviours they contain and the
// freequencies that those behaviours occur.
func TestEqualPeerBehaviours(t *testing.T) {
equals := []struct {
left []p2p.PeerBehaviour
right []p2p.PeerBehaviour
}{
// Empty sets
{[]p2p.PeerBehaviour{}, []p2p.PeerBehaviour{}},
// Single behaviours
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote}, []p2p.PeerBehaviour{p2p.PeerBehaviourVote}},
// Equal Frequencies
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote},
[]p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}},
// Equal frequencies different orders
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourBlockPart},
[]p2p.PeerBehaviour{p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote}},
}
for _, test := range equals {
if !equalBehaviours(test.left, test.right) {
t.Errorf("Expected %#v and %#v to be equal", test.left, test.right)
}
}
unequals := []struct {
left []p2p.PeerBehaviour
right []p2p.PeerBehaviour
}{
// Comparing empty sets to non empty sets
{[]p2p.PeerBehaviour{}, []p2p.PeerBehaviour{p2p.PeerBehaviourVote}},
// Different behaviours
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote}, []p2p.PeerBehaviour{p2p.PeerBehaviourBlockPart}},
// Same behaviour with different frequencies
{[]p2p.PeerBehaviour{p2p.PeerBehaviourVote},
[]p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}},
}
for _, test := range unequals {
if equalBehaviours(test.left, test.right) {
t.Errorf("Expected %#v and %#v to be unequal", test.left, test.right)
}
}
}
// TestPeerBehaviourConcurrency constructs a scenario in which
// multiple goroutines are using the same MockPeerBehaviourReporter instance.
// This test reproduces the conditions in which MockPeerBehaviourReporter will
// be used within a Reactor Receive method tests to ensure thread safety.
func TestMockPeerBehaviourReporterConcurrency(t *testing.T) {
behaviourScript := []scriptedBehaviours{
{"1", []p2p.PeerBehaviour{p2p.PeerBehaviourVote}},
{"2", []p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote, p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}},
{"3", []p2p.PeerBehaviour{p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote, p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote}},
{"4", []p2p.PeerBehaviour{p2p.PeerBehaviourVote, p2p.PeerBehaviourVote, p2p.PeerBehaviourVote, p2p.PeerBehaviourVote}},
{"5", []p2p.PeerBehaviour{p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote, p2p.PeerBehaviourBlockPart, p2p.PeerBehaviourVote}},
}
var receiveWg sync.WaitGroup
pr := p2p.NewMockPeerBehaviourReporter()
scriptItems := make(chan scriptItem)
done := make(chan int)
numConsumers := 3
for i := 0; i < numConsumers; i++ {
receiveWg.Add(1)
go func() {
defer receiveWg.Done()
for {
select {
case pb := <-scriptItems:
pr.Report(pb.PeerID, pb.Behaviour)
case <-done:
return
}
}
}()
}
var sendingWg sync.WaitGroup
sendingWg.Add(1)
go func() {
defer sendingWg.Done()
for _, item := range behaviourScript {
for _, reason := range item.Behaviours {
scriptItems <- scriptItem{item.PeerID, reason}
}
}
}()
sendingWg.Wait()
for i := 0; i < numConsumers; i++ {
done <- 1
}
receiveWg.Wait()
for _, items := range behaviourScript {
reported := pr.GetBehaviours(items.PeerID)
if !equalBehaviours(reported, items.Behaviours) {
t.Errorf("Expected peer %s to have behaved \nExpected: %#v \nGot %#v \n",
items.PeerID, items.Behaviours, reported)
}
}
}

Loading…
Cancel
Save