You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

257 lines
6.7 KiB

package evidence
import (
clist ""
ep ""
tmproto ""
const (
EvidenceChannel = byte(0x38)
maxMsgSize = 1048576 // 1MB TODO make it configurable
broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
// Reactor handles evpool evidence broadcasting amongst peers.
type Reactor struct {
evpool *Pool
eventBus *types.EventBus
// NewReactor returns a new Reactor with the given config and evpool.
func NewReactor(evpool *Pool) *Reactor {
evR := &Reactor{
evpool: evpool,
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
// SetLogger sets the Logger on the reactor and the underlying Evidence.
func (evR *Reactor) SetLogger(l log.Logger) {
evR.Logger = l
// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
ID: EvidenceChannel,
Priority: 5,
RecvMessageCapacity: maxMsgSize,
// AddPeer implements Reactor.
func (evR *Reactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
evis, err := decodeMsg(msgBytes)
if err != nil {
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err, "bytes", msgBytes)
evR.Switch.StopPeerForError(src, err)
for _, ev := range evis {
err := evR.evpool.AddEvidence(ev)
switch err.(type) {
case ErrInvalidEvidence:
evR.Logger.Error("Evidence is not valid", "evidence", evis, "err", err)
// punish peer
evR.Switch.StopPeerForError(src, err)
case nil:
evR.Logger.Error("Evidence has not been added", "evidence", evis, "err", err)
// SetEventBus implements events.Eventable.
func (evR *Reactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b
// Modeled after the mempool routine.
// - Evidence accumulates in a clist.
// - Each peer has a routine that iterates through the clist,
// sending available evidence to the peer.
// - If we're waiting for new evidence and the list is not empty,
// start iterating from the beginning again.
func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
var next *clist.CElement
for {
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if next == nil {
select {
case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
if next = evR.evpool.EvidenceFront(); next == nil {
case <-peer.Quit():
case <-evR.Quit():
ev := next.Value.(types.Evidence)
evis, retry := evR.checkSendEvidenceMessage(peer, ev)
if len(evis) > 0 {
msgBytes, err := encodeMsg(evis)
if err != nil {
success := peer.Send(EvidenceChannel, msgBytes)
retry = !success
if retry {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
select {
case <-afterCh:
// start from the beginning every tick.
// TODO: only do this if we're at the end of the list!
next = nil
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.Quit():
case <-evR.Quit():
// Returns the message to send the peer, or nil if the evidence is invalid for the peer.
// If message is nil, return true if we should sleep and try again.
func (evR Reactor) checkSendEvidenceMessage(
peer p2p.Peer,
ev types.Evidence,
) (evis []types.Evidence, retry bool) {
// make sure the peer is up to date
evHeight := ev.Height()
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
// Peer does not have a state yet. We set it in the consensus reactor, but
// when we add peer in Switch, the order we call reactors#AddPeer is
// different every time due to us using a map. Sometimes other reactors
// will be initialized before the consensus reactor. We should wait a few
// milliseconds and retry.
return nil, true
// NOTE: We only send evidence to peers where
// peerHeight - maxAge < evidenceHeight < peerHeight
// and
// lastBlockTime - maxDuration < evidenceTime
var (
peerHeight = peerState.GetHeight()
params = evR.evpool.State().ConsensusParams.Evidence
ageDuration = evR.evpool.State().LastBlockTime.Sub(ev.Time())
ageNumBlocks = peerHeight - evHeight
if peerHeight < evHeight { // peer is behind. sleep while he catches up
return nil, true
} else if ageNumBlocks > params.MaxAgeNumBlocks &&
ageDuration > params.MaxAgeDuration { // evidence is too old, skip
// NOTE: if evidence is too old for an honest peer, then we're behind and
// either it already got committed or it never will!
evR.Logger.Info("Not sending peer old evidence",
"peerHeight", peerHeight,
"evHeight", evHeight,
"maxAgeNumBlocks", params.MaxAgeNumBlocks,
"lastBlockTime", evR.evpool.State().LastBlockTime,
"evTime", ev.Time(),
"maxAgeDuration", params.MaxAgeDuration,
"peer", peer,
return nil, false
// send evidence
return []types.Evidence{ev}, false
// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func encodeMsg(evis []types.Evidence) ([]byte, error) {
evi := make([]*tmproto.Evidence, len(evis))
for i := 0; i < len(evis); i++ {
ev, err := types.EvidenceToProto(evis[i])
if err != nil {
return nil, err
evi[i] = ev
epl := ep.List{
Evidence: evi,
return proto.Marshal(&epl)
// decodemsg takes an array of bytes
// returns an array of evidence
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
lm := ep.List{}
proto.Unmarshal(bz, &lm)
evis = make([]types.Evidence, len(lm.Evidence))
for i := 0; i < len(lm.Evidence); i++ {
ev, err := types.EvidenceFromProto(lm.Evidence[i])
if err != nil {
return nil, err
evis[i] = ev
for i, ev := range evis {
if err := ev.ValidateBasic(); err != nil {
return nil, fmt.Errorf("invalid evidence (#%d): %v", i, err)
return evis, nil