You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

253 lines
6.9 KiB

package evidence
import (
"fmt"
"time"
clist "github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
const (
EvidenceChannel = byte(0x38)
maxMsgSize = 1048576 // 1MB TODO make it configurable
// broadcast all uncommitted evidence this often. This sets when the reactor
// goes back to the start of the list and begins sending the evidence again.
// Most evidence should be committed in the very next block that is why we wait
// just over the block production rate before sending evidence again.
broadcastEvidenceIntervalS = 10
// If a message fails wait this much before sending it again
peerRetryMessageIntervalMS = 100
)
// Reactor handles evpool evidence broadcasting amongst peers.
type Reactor struct {
p2p.BaseReactor
evpool *Pool
eventBus *types.EventBus
}
// NewReactor returns a new Reactor with the given config and evpool.
func NewReactor(evpool *Pool) *Reactor {
evR := &Reactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
}
// SetLogger sets the Logger on the reactor and the underlying Evidence.
func (evR *Reactor) SetLogger(l log.Logger) {
evR.Logger = l
evR.evpool.SetLogger(l)
}
// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: EvidenceChannel,
Priority: 5,
RecvMessageCapacity: maxMsgSize,
},
}
}
// AddPeer implements Reactor.
func (evR *Reactor) AddPeer(peer p2p.Peer) {
go evR.broadcastEvidenceRoutine(peer)
}
// Receive implements Reactor.
// It adds any received evidence to the evpool.
// XXX: do not call any methods that can block or incur heavy processing.
// https://github.com/tendermint/tendermint/issues/2888
func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
evis, err := decodeMsg(msgBytes)
if err != nil {
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
evR.Switch.StopPeerForError(src, err)
return
}
for _, ev := range evis {
err := evR.evpool.AddEvidence(ev)
switch err.(type) {
case *types.ErrInvalidEvidence:
evR.Logger.Error(err.Error())
// punish peer
evR.Switch.StopPeerForError(src, err)
return
case nil:
default:
// continue to the next piece of evidence
evR.Logger.Error("Evidence has not been added", "evidence", evis, "err", err)
}
}
}
// SetEventBus implements events.Eventable.
func (evR *Reactor) SetEventBus(b *types.EventBus) {
evR.eventBus = b
}
// Modeled after the mempool routine.
// - Evidence accumulates in a clist.
// - Each peer has a routine that iterates through the clist,
// sending available evidence to the peer.
// - If we're waiting for new evidence and the list is not empty,
// start iterating from the beginning again.
func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
var next *clist.CElement
for {
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if next == nil {
select {
case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
if next = evR.evpool.EvidenceFront(); next == nil {
continue
}
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
ev := next.Value.(types.Evidence)
evis := evR.prepareEvidenceMessage(peer, ev)
if len(evis) > 0 {
msgBytes, err := encodeMsg(evis)
if err != nil {
panic(err)
}
evR.Logger.Debug("Gossiping evidence to peer", "ev", ev, "peer", peer.ID())
success := peer.Send(EvidenceChannel, msgBytes)
if !success {
time.Sleep(peerRetryMessageIntervalMS * time.Millisecond)
continue
}
}
afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
select {
case <-afterCh:
// start from the beginning every tick.
// TODO: only do this if we're at the end of the list!
next = nil
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.Quit():
return
case <-evR.Quit():
return
}
}
}
// Returns the message to send to the peer, or nil if the evidence is invalid for the peer.
// If message is nil, we should sleep and try again.
func (evR Reactor) prepareEvidenceMessage(
peer p2p.Peer,
ev types.Evidence,
) (evis []types.Evidence) {
// make sure the peer is up to date
evHeight := ev.Height()
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
// Peer does not have a state yet. We set it in the consensus reactor, but
// when we add peer in Switch, the order we call reactors#AddPeer is
// different every time due to us using a map. Sometimes other reactors
// will be initialized before the consensus reactor. We should wait a few
// milliseconds and retry.
return nil
}
// NOTE: We only send evidence to peers where
// peerHeight - maxAge < evidenceHeight < peerHeight
var (
peerHeight = peerState.GetHeight()
params = evR.evpool.State().ConsensusParams.Evidence
ageNumBlocks = peerHeight - evHeight
)
if peerHeight <= evHeight { // peer is behind. sleep while he catches up
return nil
} else if ageNumBlocks > params.MaxAgeNumBlocks { // evidence is too old relative to the peer, skip
// NOTE: if evidence is too old for an honest peer, then we're behind and
// either it already got committed or it never will!
evR.Logger.Info("Not sending peer old evidence",
"peerHeight", peerHeight,
"evHeight", evHeight,
"maxAgeNumBlocks", params.MaxAgeNumBlocks,
"lastBlockTime", evR.evpool.State().LastBlockTime,
"maxAgeDuration", params.MaxAgeDuration,
"peer", peer,
)
return nil
}
// send evidence
return []types.Evidence{ev}
}
// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64
}
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func encodeMsg(evis []types.Evidence) ([]byte, error) {
evi := make([]tmproto.Evidence, len(evis))
for i := 0; i < len(evis); i++ {
ev, err := types.EvidenceToProto(evis[i])
if err != nil {
return nil, err
}
evi[i] = *ev
}
epl := tmproto.EvidenceList{
Evidence: evi,
}
return epl.Marshal()
}
// decodemsg takes an array of bytes
// returns an array of evidence
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
lm := tmproto.EvidenceList{}
if err := lm.Unmarshal(bz); err != nil {
return nil, err
}
evis = make([]types.Evidence, len(lm.Evidence))
for i := 0; i < len(lm.Evidence); i++ {
ev, err := types.EvidenceFromProto(&lm.Evidence[i])
if err != nil {
return nil, err
}
evis[i] = ev
}
for i, ev := range evis {
if err := ev.ValidateBasic(); err != nil {
return nil, fmt.Errorf("invalid evidence (#%d): %v", i, err)
}
}
return evis, nil
}