diff --git a/evidence/pool.go b/evidence/pool.go index fab29be60..b24537776 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + clist "github.com/tendermint/tmlibs/clist" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" @@ -17,6 +18,7 @@ type EvidencePool struct { logger log.Logger evidenceStore *EvidenceStore + evidenceList *clist.CList // concurrent linked-list of evidence // needed to load validators to verify evidence stateDB dbm.DB @@ -24,9 +26,6 @@ type EvidencePool struct { // latest state mtx sync.Mutex state sm.State - - // never close - evidenceChan chan types.Evidence } func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool { @@ -35,21 +34,24 @@ func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool state: sm.LoadState(stateDB), logger: log.NewNopLogger(), evidenceStore: evidenceStore, - evidenceChan: make(chan types.Evidence), + evidenceList: clist.New(), } return evpool } +func (evpool *EvidencePool) EvidenceFront() *clist.CElement { + return evpool.evidenceList.Front() +} + +func (evpool *EvidencePool) EvidenceWaitChan() <-chan struct{} { + return evpool.evidenceList.WaitChan() +} + // SetLogger sets the Logger. func (evpool *EvidencePool) SetLogger(l log.Logger) { evpool.logger = l } -// EvidenceChan returns an unbuffered channel on which new evidence can be received. -func (evpool *EvidencePool) EvidenceChan() <-chan types.Evidence { - return evpool.evidenceChan -} - // PriorityEvidence returns the priority evidence. func (evpool *EvidencePool) PriorityEvidence() []types.Evidence { return evpool.evidenceStore.PriorityEvidence() @@ -81,11 +83,10 @@ func (evpool *EvidencePool) Update(block *types.Block, state sm.State) { evpool.mtx.Unlock() // remove evidence from pending and mark committed - evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence) + evpool.MarkEvidenceAsCommitted(block.Height, block.Evidence.Evidence) } // AddEvidence checks the evidence is valid and adds it to the pool. -// Blocks on the EvidenceChan. func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { // TODO: check if we already have evidence for this @@ -109,14 +110,39 @@ func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", evidence) - // never closes. always safe to send on - evpool.evidenceChan <- evidence + // add evidence to clist + evpool.evidenceList.PushBack(evidence) + return nil } // MarkEvidenceAsCommitted marks all the evidence as committed. -func (evpool *EvidencePool) MarkEvidenceAsCommitted(evidence []types.Evidence) { +func (evpool *EvidencePool) MarkEvidenceAsCommitted(height int64, evidence []types.Evidence) { + blockEvidenceMap := make(map[string]struct{}) for _, ev := range evidence { evpool.evidenceStore.MarkEvidenceAsCommitted(ev) + blockEvidenceMap[ev.String()] = struct{}{} + } + + maxAge := evpool.State().ConsensusParams.EvidenceParams.MaxAge + + // remove committed evidence from the clist + evpool.filterEvidence(height, maxAge, blockEvidenceMap) + +} + +func (evpool *EvidencePool) filterEvidence(height, maxAge int64, blockEvidenceMap map[string]struct{}) { + for e := evpool.evidenceList.Front(); e != nil; e = e.Next() { + ev := e.Value.(types.Evidence) + + // Remove the evidence if it's already in a block + // or if it's now too old. + if _, ok := blockEvidenceMap[ev.String()]; ok || + ev.Height() < height-maxAge { + + // remove from clist + evpool.evidenceList.Remove(e) + e.DetachPrev() + } } } diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 97a29a278..019076234 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -45,7 +45,6 @@ func initializeValidatorState(valAddr []byte, height int64) dbm.DB { } func TestEvidencePool(t *testing.T) { - assert := assert.New(t) valAddr := []byte("val1") height := int64(5) @@ -56,26 +55,25 @@ func TestEvidencePool(t *testing.T) { goodEvidence := types.NewMockGoodEvidence(height, 0, valAddr) badEvidence := types.MockBadEvidence{goodEvidence} + // bad evidence err := pool.AddEvidence(badEvidence) - assert.NotNil(err) + assert.NotNil(t, err) var wg sync.WaitGroup wg.Add(1) go func() { - <-pool.EvidenceChan() + <-pool.EvidenceWaitChan() wg.Done() }() err = pool.AddEvidence(goodEvidence) - assert.Nil(err) + assert.Nil(t, err) wg.Wait() - // if we send it again it wont fire on the chan + assert.Equal(t, 1, pool.evidenceList.Len()) + + // if we send it again, it shouldnt change the size err = pool.AddEvidence(goodEvidence) - assert.Nil(err) - select { - case <-pool.EvidenceChan(): - t.Fatal("unexpected read on EvidenceChan") - default: - } + assert.Nil(t, err) + assert.Equal(t, 1, pool.evidenceList.Len()) } diff --git a/evidence/reactor.go b/evidence/reactor.go index 625663df2..602edc0c9 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -6,9 +6,9 @@ import ( "time" "github.com/tendermint/go-amino" + clist "github.com/tendermint/tmlibs/clist" "github.com/tendermint/tmlibs/log" - cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -16,8 +16,10 @@ import ( const ( EvidenceChannel = byte(0x38) - maxMsgSize = 1048576 // 1MB TODO make it configurable - broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often + maxMsgSize = 1048576 // 1MB TODO make it configurable + + broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often + peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount ) // EvidenceReactor handles evpool evidence broadcasting amongst peers. @@ -47,7 +49,6 @@ func (evR *EvidenceReactor) OnStart() error { if err := evR.BaseReactor.OnStart(); err != nil { return err } - go evR.broadcastRoutine() return nil } @@ -64,14 +65,7 @@ func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) { - // send the peer our high-priority evidence. - // the rest will be sent by the broadcastRoutine - evidences := evR.evpool.PriorityEvidence() - msg := &EvidenceListMessage{evidences} - success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) - if !success { - // TODO: remove peer ? - } + go evR.broadcastEvidenceRoutine(peer) } // RemovePeer implements Reactor. @@ -110,63 +104,80 @@ func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) { evR.eventBus = b } -// Broadcast new evidence to all peers. -// Broadcasts must be non-blocking so routine is always available to read off EvidenceChan. -func (evR *EvidenceReactor) broadcastRoutine() { - ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS) +// Modeled after the mempool routine. +// - Evidence accumulates in a clist. +// - Each peer has a routien that iterates through the clist, +// sending available evidence to the peer. +// - If we're waiting for new evidence and the list is not empty, +// start iterating from the beginning again. +func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) { + var next *clist.CElement for { - select { - case evidence := <-evR.evpool.EvidenceChan(): - // broadcast some new evidence - msg := &EvidenceListMessage{[]types.Evidence{evidence}} - evR.broadcastEvidenceListMsg(msg) - - // TODO: the broadcast here is just doing TrySend. - // We should make sure the send succeeds before marking broadcasted. - evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence) - case <-ticker.C: - // broadcast all pending evidence - msg := &EvidenceListMessage{evR.evpool.PendingEvidence()} - evR.broadcastEvidenceListMsg(msg) - case <-evR.Quit(): - return - } - } -} - -func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) { - // NOTE: we dont send evidence to peers higher than their height, - // because they can't validate it (don't have validators from the height). - // So, for now, only send the `msg` to peers synced to the highest height in the list. - // TODO: send each peer all the evidence below its current height within maxAge - - // might require a routine per peer, like the mempool. - - var maxHeight int64 - for _, ev := range msg.Evidence { - if ev.Height() > maxHeight { - maxHeight = ev.Height() + // This happens because the CElement we were looking at got garbage + // collected (removed). That is, .NextWait() returned nil. Go ahead and + // start from the beginning. + if next == nil { + select { + case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available + if next = evR.evpool.EvidenceFront(); next == nil { + continue + } + case <-peer.Quit(): + return + case <-evR.Quit(): + return + } } - } - for _, peer := range evR.Switch.Peers().List() { - ps, ok := peer.Get(types.PeerStateKey).(PeerState) + ev := next.Value.(types.Evidence) + // make sure the peer is up to date + height := ev.Height() + peerState, ok := peer.Get(types.PeerStateKey).(PeerState) if !ok { evR.Logger.Info("Found peer without PeerState", "peer", peer) + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue + } - // only send to peer if maxHeight < peerHeight < maxHeight + maxAge + // NOTE: We only send evidence to peers where + // peerHeight - maxAge < evidenceHeight < peerHeight maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge - rs := ps.GetRoundState() - if rs.Height >= maxHeight && - rs.Height < maxAge+maxHeight { - peer.TrySend(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + peerHeight := peerState.GetHeight() + if peerHeight < height || + peerHeight > height+maxAge { + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue + } + + // send evidence + msg := &EvidenceListMessage{[]types.Evidence{ev}} + success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg)) + if !success { + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue + } + + afterCh := time.After(time.Second * broadcastEvidenceIntervalS) + select { + case <-afterCh: + // start from the beginning every tick. + // TODO: only do this if we're at the end of the list! + next = nil + case <-next.NextWaitChan(): + // see the start of the for loop for nil check + next = next.Next() + case <-peer.Quit(): + return + case <-evR.Quit(): + return } } } +// PeerState describes the state of a peer. type PeerState interface { - GetRoundState() *cstypes.PeerRoundState + GetHeight() int64 } //----------------------------------------------------------------------------- diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 6b4b24a07..c7034c321 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -14,7 +14,6 @@ import ( "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" - cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -149,10 +148,8 @@ type peerState struct { height int64 } -func (ps peerState) GetRoundState() *cstypes.PeerRoundState { - return &cstypes.PeerRoundState{ - Height: ps.height, - } +func (ps peerState) GetHeight() int64 { + return ps.height } func TestReactorSelectiveBroadcast(t *testing.T) {