Browse Source

evidence: give each peer a go-routine

pull/1628/head
Ethan Buchman 6 years ago
parent
commit
932381effa
4 changed files with 117 additions and 85 deletions
  1. +40
    -14
      evidence/pool.go
  2. +9
    -11
      evidence/pool_test.go
  3. +66
    -55
      evidence/reactor.go
  4. +2
    -5
      evidence/reactor_test.go

+ 40
- 14
evidence/pool.go View File

@ -4,6 +4,7 @@ import (
"fmt"
"sync"
clist "github.com/tendermint/tmlibs/clist"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
@ -17,6 +18,7 @@ type EvidencePool struct {
logger log.Logger
evidenceStore *EvidenceStore
evidenceList *clist.CList // concurrent linked-list of evidence
// needed to load validators to verify evidence
stateDB dbm.DB
@ -24,9 +26,6 @@ type EvidencePool struct {
// latest state
mtx sync.Mutex
state sm.State
// never close
evidenceChan chan types.Evidence
}
func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool {
@ -35,21 +34,24 @@ func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool
state: sm.LoadState(stateDB),
logger: log.NewNopLogger(),
evidenceStore: evidenceStore,
evidenceChan: make(chan types.Evidence),
evidenceList: clist.New(),
}
return evpool
}
func (evpool *EvidencePool) EvidenceFront() *clist.CElement {
return evpool.evidenceList.Front()
}
func (evpool *EvidencePool) EvidenceWaitChan() <-chan struct{} {
return evpool.evidenceList.WaitChan()
}
// SetLogger sets the Logger.
func (evpool *EvidencePool) SetLogger(l log.Logger) {
evpool.logger = l
}
// EvidenceChan returns an unbuffered channel on which new evidence can be received.
func (evpool *EvidencePool) EvidenceChan() <-chan types.Evidence {
return evpool.evidenceChan
}
// PriorityEvidence returns the priority evidence.
func (evpool *EvidencePool) PriorityEvidence() []types.Evidence {
return evpool.evidenceStore.PriorityEvidence()
@ -81,11 +83,10 @@ func (evpool *EvidencePool) Update(block *types.Block, state sm.State) {
evpool.mtx.Unlock()
// remove evidence from pending and mark committed
evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence)
evpool.MarkEvidenceAsCommitted(block.Height, block.Evidence.Evidence)
}
// AddEvidence checks the evidence is valid and adds it to the pool.
// Blocks on the EvidenceChan.
func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) {
// TODO: check if we already have evidence for this
@ -109,14 +110,39 @@ func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) {
evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", evidence)
// never closes. always safe to send on
evpool.evidenceChan <- evidence
// add evidence to clist
evpool.evidenceList.PushBack(evidence)
return nil
}
// MarkEvidenceAsCommitted marks all the evidence as committed.
func (evpool *EvidencePool) MarkEvidenceAsCommitted(evidence []types.Evidence) {
func (evpool *EvidencePool) MarkEvidenceAsCommitted(height int64, evidence []types.Evidence) {
blockEvidenceMap := make(map[string]struct{})
for _, ev := range evidence {
evpool.evidenceStore.MarkEvidenceAsCommitted(ev)
blockEvidenceMap[ev.String()] = struct{}{}
}
maxAge := evpool.State().ConsensusParams.EvidenceParams.MaxAge
// remove committed evidence from the clist
evpool.filterEvidence(height, maxAge, blockEvidenceMap)
}
func (evpool *EvidencePool) filterEvidence(height, maxAge int64, blockEvidenceMap map[string]struct{}) {
for e := evpool.evidenceList.Front(); e != nil; e = e.Next() {
ev := e.Value.(types.Evidence)
// Remove the evidence if it's already in a block
// or if it's now too old.
if _, ok := blockEvidenceMap[ev.String()]; ok ||
ev.Height() < height-maxAge {
// remove from clist
evpool.evidenceList.Remove(e)
e.DetachPrev()
}
}
}

+ 9
- 11
evidence/pool_test.go View File

@ -45,7 +45,6 @@ func initializeValidatorState(valAddr []byte, height int64) dbm.DB {
}
func TestEvidencePool(t *testing.T) {
assert := assert.New(t)
valAddr := []byte("val1")
height := int64(5)
@ -56,26 +55,25 @@ func TestEvidencePool(t *testing.T) {
goodEvidence := types.NewMockGoodEvidence(height, 0, valAddr)
badEvidence := types.MockBadEvidence{goodEvidence}
// bad evidence
err := pool.AddEvidence(badEvidence)
assert.NotNil(err)
assert.NotNil(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-pool.EvidenceChan()
<-pool.EvidenceWaitChan()
wg.Done()
}()
err = pool.AddEvidence(goodEvidence)
assert.Nil(err)
assert.Nil(t, err)
wg.Wait()
// if we send it again it wont fire on the chan
assert.Equal(t, 1, pool.evidenceList.Len())
// if we send it again, it shouldnt change the size
err = pool.AddEvidence(goodEvidence)
assert.Nil(err)
select {
case <-pool.EvidenceChan():
t.Fatal("unexpected read on EvidenceChan")
default:
}
assert.Nil(t, err)
assert.Equal(t, 1, pool.evidenceList.Len())
}

+ 66
- 55
evidence/reactor.go View File

@ -6,9 +6,9 @@ import (
"time"
"github.com/tendermint/go-amino"
clist "github.com/tendermint/tmlibs/clist"
"github.com/tendermint/tmlibs/log"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
@ -16,8 +16,10 @@ import (
const (
EvidenceChannel = byte(0x38)
maxMsgSize = 1048576 // 1MB TODO make it configurable
broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
maxMsgSize = 1048576 // 1MB TODO make it configurable
broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
)
// EvidenceReactor handles evpool evidence broadcasting amongst peers.
@ -47,7 +49,6 @@ func (evR *EvidenceReactor) OnStart() error {
if err := evR.BaseReactor.OnStart(); err != nil {
return err
}
go evR.broadcastRoutine()
return nil
}
@ -64,14 +65,7 @@ func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
// send the peer our high-priority evidence.
// the rest will be sent by the broadcastRoutine
evidences := evR.evpool.PriorityEvidence()
msg := &EvidenceListMessage{evidences}
success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
if !success {
// TODO: remove peer ?
}
go evR.broadcastEvidenceRoutine(peer)
}
// RemovePeer implements Reactor.
@ -110,63 +104,80 @@ func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b
}
// Broadcast new evidence to all peers.
// Broadcasts must be non-blocking so routine is always available to read off EvidenceChan.
func (evR *EvidenceReactor) broadcastRoutine() {
ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS)
// Modeled after the mempool routine.
// - Evidence accumulates in a clist.
// - Each peer has a routien that iterates through the clist,
// sending available evidence to the peer.
// - If we're waiting for new evidence and the list is not empty,
// start iterating from the beginning again.
func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) {
var next *clist.CElement
for {
select {
case evidence := <-evR.evpool.EvidenceChan():
// broadcast some new evidence
msg := &EvidenceListMessage{[]types.Evidence{evidence}}
evR.broadcastEvidenceListMsg(msg)
// TODO: the broadcast here is just doing TrySend.
// We should make sure the send succeeds before marking broadcasted.
evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
case <-ticker.C:
// broadcast all pending evidence
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
evR.broadcastEvidenceListMsg(msg)
case <-evR.Quit():
return
}
}
}
func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) {
// NOTE: we dont send evidence to peers higher than their height,
// because they can't validate it (don't have validators from the height).
// So, for now, only send the `msg` to peers synced to the highest height in the list.
// TODO: send each peer all the evidence below its current height within maxAge -
// might require a routine per peer, like the mempool.
var maxHeight int64
for _, ev := range msg.Evidence {
if ev.Height() > maxHeight {
maxHeight = ev.Height()
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if next == nil {
select {
case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
if next = evR.evpool.EvidenceFront(); next == nil {
continue
}
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
}
for _, peer := range evR.Switch.Peers().List() {
ps, ok := peer.Get(types.PeerStateKey).(PeerState)
ev := next.Value.(types.Evidence)
// make sure the peer is up to date
height := ev.Height()
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
evR.Logger.Info("Found peer without PeerState", "peer", peer)
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
// only send to peer if maxHeight < peerHeight < maxHeight + maxAge
// NOTE: We only send evidence to peers where
// peerHeight - maxAge < evidenceHeight < peerHeight
maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge
rs := ps.GetRoundState()
if rs.Height >= maxHeight &&
rs.Height < maxAge+maxHeight {
peer.TrySend(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
peerHeight := peerState.GetHeight()
if peerHeight < height ||
peerHeight > height+maxAge {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
// send evidence
msg := &EvidenceListMessage{[]types.Evidence{ev}}
success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
select {
case <-afterCh:
// start from the beginning every tick.
// TODO: only do this if we're at the end of the list!
next = nil
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
}
// PeerState describes the state of a peer.
type PeerState interface {
GetRoundState() *cstypes.PeerRoundState
GetHeight() int64
}
//-----------------------------------------------------------------------------


+ 2
- 5
evidence/reactor_test.go View File

@ -14,7 +14,6 @@ import (
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
@ -149,10 +148,8 @@ type peerState struct {
height int64
}
func (ps peerState) GetRoundState() *cstypes.PeerRoundState {
return &cstypes.PeerRoundState{
Height: ps.height,
}
func (ps peerState) GetHeight() int64 {
return ps.height
}
func TestReactorSelectiveBroadcast(t *testing.T) {


Loading…
Cancel
Save