package consensus
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
cstypes "github.com/tendermint/tendermint/internal/consensus/types"
|
|
"github.com/tendermint/tendermint/libs/bits"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
tmtime "github.com/tendermint/tendermint/libs/time"
|
|
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
var (
|
|
ErrPeerStateHeightRegression = errors.New("peer state height regression")
|
|
ErrPeerStateInvalidStartTime = errors.New("peer state invalid startTime")
|
|
)
|
|
|
|
// peerStateStats holds internal statistics for a peer.
|
|
type peerStateStats struct {
|
|
Votes int `json:"votes,string"`
|
|
BlockParts int `json:"block_parts,string"`
|
|
}
|
|
|
|
func (pss peerStateStats) String() string {
|
|
return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", pss.Votes, pss.BlockParts)
|
|
}
|
|
|
|
// PeerState contains the known state of a peer, including its connection and
|
|
// threadsafe access to its PeerRoundState.
|
|
// NOTE: THIS GETS DUMPED WITH rpc/core/consensus.go.
|
|
// Be mindful of what you Expose.
|
|
type PeerState struct {
|
|
peerID types.NodeID
|
|
logger log.Logger
|
|
|
|
// NOTE: Modify below using setters, never directly.
|
|
mtx sync.RWMutex
|
|
running bool
|
|
PRS cstypes.PeerRoundState `json:"round_state"`
|
|
Stats *peerStateStats `json:"stats"`
|
|
|
|
broadcastWG sync.WaitGroup
|
|
}
|
|
|
|
// NewPeerState returns a new PeerState for the given node ID.
|
|
func NewPeerState(logger log.Logger, peerID types.NodeID) *PeerState {
|
|
return &PeerState{
|
|
peerID: peerID,
|
|
logger: logger,
|
|
PRS: cstypes.PeerRoundState{
|
|
Round: -1,
|
|
ProposalPOLRound: -1,
|
|
LastCommitRound: -1,
|
|
CatchupCommitRound: -1,
|
|
},
|
|
Stats: &peerStateStats{},
|
|
}
|
|
}
|
|
|
|
// SetRunning sets the running state of the peer.
|
|
func (ps *PeerState) SetRunning(v bool) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
ps.running = v
|
|
}
|
|
|
|
// IsRunning returns true if a PeerState is considered running where multiple
|
|
// broadcasting goroutines exist for the peer.
|
|
func (ps *PeerState) IsRunning() bool {
|
|
ps.mtx.RLock()
|
|
defer ps.mtx.RUnlock()
|
|
|
|
return ps.running
|
|
}
|
|
|
|
// GetRoundState returns a shallow copy of the PeerRoundState. There's no point
|
|
// in mutating it since it won't change PeerState.
|
|
func (ps *PeerState) GetRoundState() *cstypes.PeerRoundState {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
prs := ps.PRS.Copy()
|
|
return &prs
|
|
}
|
|
|
|
// ToJSON returns a json of PeerState.
|
|
func (ps *PeerState) ToJSON() ([]byte, error) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
return json.Marshal(ps)
|
|
}
|
|
|
|
// GetHeight returns an atomic snapshot of the PeerRoundState's height used by
|
|
// the mempool to ensure peers are caught up before broadcasting new txs.
|
|
func (ps *PeerState) GetHeight() int64 {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
return ps.PRS.Height
|
|
}
|
|
|
|
// SetHasProposal sets the given proposal as known for the peer.
|
|
func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
if ps.PRS.Height != proposal.Height || ps.PRS.Round != proposal.Round {
|
|
return
|
|
}
|
|
|
|
if ps.PRS.Proposal {
|
|
return
|
|
}
|
|
|
|
ps.PRS.Proposal = true
|
|
|
|
// ps.PRS.ProposalBlockParts is set due to NewValidBlockMessage
|
|
if ps.PRS.ProposalBlockParts != nil {
|
|
return
|
|
}
|
|
|
|
ps.PRS.ProposalBlockPartSetHeader = proposal.BlockID.PartSetHeader
|
|
ps.PRS.ProposalBlockParts = bits.NewBitArray(int(proposal.BlockID.PartSetHeader.Total))
|
|
ps.PRS.ProposalPOLRound = proposal.POLRound
|
|
ps.PRS.ProposalPOL = nil // Nil until ProposalPOLMessage received.
|
|
}
|
|
|
|
// InitProposalBlockParts initializes the peer's proposal block parts header
|
|
// and bit array.
|
|
func (ps *PeerState) InitProposalBlockParts(partSetHeader types.PartSetHeader) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
if ps.PRS.ProposalBlockParts != nil {
|
|
return
|
|
}
|
|
|
|
ps.PRS.ProposalBlockPartSetHeader = partSetHeader
|
|
ps.PRS.ProposalBlockParts = bits.NewBitArray(int(partSetHeader.Total))
|
|
}
|
|
|
|
// SetHasProposalBlockPart sets the given block part index as known for the peer.
|
|
func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index int) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
if ps.PRS.Height != height || ps.PRS.Round != round {
|
|
return
|
|
}
|
|
|
|
ps.PRS.ProposalBlockParts.SetIndex(index, true)
|
|
}
|
|
|
|
// PickVoteToSend picks a vote to send to the peer. It will return true if a
|
|
// vote was picked.
|
|
//
|
|
// NOTE: `votes` must be the correct Size() for the Height().
|
|
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (*types.Vote, bool) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
if votes.Size() == 0 {
|
|
return nil, false
|
|
}
|
|
|
|
var (
|
|
height = votes.GetHeight()
|
|
round = votes.GetRound()
|
|
votesType = tmproto.SignedMsgType(votes.Type())
|
|
size = votes.Size()
|
|
)
|
|
|
|
// lazily set data using 'votes'
|
|
if votes.IsCommit() {
|
|
ps.ensureCatchupCommitRound(height, round, size)
|
|
}
|
|
|
|
ps.ensureVoteBitArrays(height, size)
|
|
|
|
psVotes := ps.getVoteBitArray(height, round, votesType)
|
|
if psVotes == nil {
|
|
return nil, false // not something worth sending
|
|
}
|
|
|
|
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
|
|
vote := votes.GetByIndex(int32(index))
|
|
if vote != nil {
|
|
return vote, true
|
|
}
|
|
}
|
|
|
|
return nil, false
|
|
}
|
|
|
|
func (ps *PeerState) getVoteBitArray(height int64, round int32, votesType tmproto.SignedMsgType) *bits.BitArray {
|
|
if !types.IsVoteTypeValid(votesType) {
|
|
return nil
|
|
}
|
|
|
|
if ps.PRS.Height == height {
|
|
if ps.PRS.Round == round {
|
|
switch votesType {
|
|
case tmproto.PrevoteType:
|
|
return ps.PRS.Prevotes
|
|
|
|
case tmproto.PrecommitType:
|
|
return ps.PRS.Precommits
|
|
}
|
|
}
|
|
|
|
if ps.PRS.CatchupCommitRound == round {
|
|
switch votesType {
|
|
case tmproto.PrevoteType:
|
|
return nil
|
|
|
|
case tmproto.PrecommitType:
|
|
return ps.PRS.CatchupCommit
|
|
}
|
|
}
|
|
|
|
if ps.PRS.ProposalPOLRound == round {
|
|
switch votesType {
|
|
case tmproto.PrevoteType:
|
|
return ps.PRS.ProposalPOL
|
|
|
|
case tmproto.PrecommitType:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
if ps.PRS.Height == height+1 {
|
|
if ps.PRS.LastCommitRound == round {
|
|
switch votesType {
|
|
case tmproto.PrevoteType:
|
|
return nil
|
|
|
|
case tmproto.PrecommitType:
|
|
return ps.PRS.LastCommit
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// 'round': A round for which we have a +2/3 commit.
|
|
func (ps *PeerState) ensureCatchupCommitRound(height int64, round int32, numValidators int) {
|
|
if ps.PRS.Height != height {
|
|
return
|
|
}
|
|
|
|
/*
|
|
NOTE: This is wrong, 'round' could change.
|
|
e.g. if orig round is not the same as block LastCommit round.
|
|
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
|
|
panic(fmt.Sprintf(
|
|
"Conflicting CatchupCommitRound. Height: %v,
|
|
Orig: %v,
|
|
New: %v",
|
|
height,
|
|
ps.CatchupCommitRound,
|
|
round))
|
|
}
|
|
*/
|
|
|
|
if ps.PRS.CatchupCommitRound == round {
|
|
return // Nothing to do!
|
|
}
|
|
|
|
ps.PRS.CatchupCommitRound = round
|
|
if round == ps.PRS.Round {
|
|
ps.PRS.CatchupCommit = ps.PRS.Precommits
|
|
} else {
|
|
ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
|
|
}
|
|
}
|
|
|
|
// EnsureVoteBitArrays ensures the bit-arrays have been allocated for tracking
|
|
// what votes this peer has received.
|
|
// NOTE: It's important to make sure that numValidators actually matches
|
|
// what the node sees as the number of validators for height.
|
|
func (ps *PeerState) EnsureVoteBitArrays(height int64, numValidators int) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
ps.ensureVoteBitArrays(height, numValidators)
|
|
}
|
|
|
|
func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
|
|
if ps.PRS.Height == height {
|
|
if ps.PRS.Prevotes == nil {
|
|
ps.PRS.Prevotes = bits.NewBitArray(numValidators)
|
|
}
|
|
if ps.PRS.Precommits == nil {
|
|
ps.PRS.Precommits = bits.NewBitArray(numValidators)
|
|
}
|
|
if ps.PRS.CatchupCommit == nil {
|
|
ps.PRS.CatchupCommit = bits.NewBitArray(numValidators)
|
|
}
|
|
if ps.PRS.ProposalPOL == nil {
|
|
ps.PRS.ProposalPOL = bits.NewBitArray(numValidators)
|
|
}
|
|
} else if ps.PRS.Height == height+1 {
|
|
if ps.PRS.LastCommit == nil {
|
|
ps.PRS.LastCommit = bits.NewBitArray(numValidators)
|
|
}
|
|
}
|
|
}
|
|
|
|
// RecordVote increments internal votes related statistics for this peer.
|
|
// It returns the total number of added votes.
|
|
func (ps *PeerState) RecordVote() int {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
ps.Stats.Votes++
|
|
|
|
return ps.Stats.Votes
|
|
}
|
|
|
|
// VotesSent returns the number of blocks for which peer has been sending us
|
|
// votes.
|
|
func (ps *PeerState) VotesSent() int {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
return ps.Stats.Votes
|
|
}
|
|
|
|
// RecordBlockPart increments internal block part related statistics for this peer.
|
|
// It returns the total number of added block parts.
|
|
func (ps *PeerState) RecordBlockPart() int {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
ps.Stats.BlockParts++
|
|
return ps.Stats.BlockParts
|
|
}
|
|
|
|
// BlockPartsSent returns the number of useful block parts the peer has sent us.
|
|
func (ps *PeerState) BlockPartsSent() int {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
return ps.Stats.BlockParts
|
|
}
|
|
|
|
// SetHasVote sets the given vote as known by the peer
|
|
func (ps *PeerState) SetHasVote(vote *types.Vote) {
|
|
if vote == nil {
|
|
return
|
|
}
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
|
|
}
|
|
|
|
func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.SignedMsgType, index int32) {
|
|
logger := ps.logger.With(
|
|
"peerH/R", fmt.Sprintf("%d/%d", ps.PRS.Height, ps.PRS.Round),
|
|
"H/R", fmt.Sprintf("%d/%d", height, round),
|
|
)
|
|
|
|
logger.Debug("setHasVote", "type", voteType, "index", index)
|
|
|
|
// NOTE: some may be nil BitArrays -> no side effects
|
|
psVotes := ps.getVoteBitArray(height, round, voteType)
|
|
if psVotes != nil {
|
|
psVotes.SetIndex(int(index), true)
|
|
}
|
|
}
|
|
|
|
// ApplyNewRoundStepMessage updates the peer state for the new round.
|
|
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
// ignore duplicates or decreases
|
|
if CompareHRS(msg.Height, msg.Round, msg.Step, ps.PRS.Height, ps.PRS.Round, ps.PRS.Step) <= 0 {
|
|
return
|
|
}
|
|
|
|
var (
|
|
psHeight = ps.PRS.Height
|
|
psRound = ps.PRS.Round
|
|
psCatchupCommitRound = ps.PRS.CatchupCommitRound
|
|
psCatchupCommit = ps.PRS.CatchupCommit
|
|
startTime = tmtime.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
|
|
)
|
|
|
|
ps.PRS.Height = msg.Height
|
|
ps.PRS.Round = msg.Round
|
|
ps.PRS.Step = msg.Step
|
|
ps.PRS.StartTime = startTime
|
|
|
|
if psHeight != msg.Height || psRound != msg.Round {
|
|
ps.PRS.Proposal = false
|
|
ps.PRS.ProposalBlockPartSetHeader = types.PartSetHeader{}
|
|
ps.PRS.ProposalBlockParts = nil
|
|
ps.PRS.ProposalPOLRound = -1
|
|
ps.PRS.ProposalPOL = nil
|
|
|
|
// we'll update the BitArray capacity later
|
|
ps.PRS.Prevotes = nil
|
|
ps.PRS.Precommits = nil
|
|
}
|
|
|
|
if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
|
|
// Peer caught up to CatchupCommitRound.
|
|
// Preserve psCatchupCommit!
|
|
// NOTE: We prefer to use prs.Precommits if
|
|
// pr.Round matches pr.CatchupCommitRound.
|
|
ps.PRS.Precommits = psCatchupCommit
|
|
}
|
|
|
|
if psHeight != msg.Height {
|
|
// shift Precommits to LastCommit
|
|
if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
|
|
ps.PRS.LastCommitRound = msg.LastCommitRound
|
|
ps.PRS.LastCommit = ps.PRS.Precommits
|
|
} else {
|
|
ps.PRS.LastCommitRound = msg.LastCommitRound
|
|
ps.PRS.LastCommit = nil
|
|
}
|
|
|
|
// we'll update the BitArray capacity later
|
|
ps.PRS.CatchupCommitRound = -1
|
|
ps.PRS.CatchupCommit = nil
|
|
}
|
|
}
|
|
|
|
// ApplyNewValidBlockMessage updates the peer state for the new valid block.
|
|
func (ps *PeerState) ApplyNewValidBlockMessage(msg *NewValidBlockMessage) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
if ps.PRS.Height != msg.Height {
|
|
return
|
|
}
|
|
|
|
if ps.PRS.Round != msg.Round && !msg.IsCommit {
|
|
return
|
|
}
|
|
|
|
ps.PRS.ProposalBlockPartSetHeader = msg.BlockPartSetHeader
|
|
ps.PRS.ProposalBlockParts = msg.BlockParts
|
|
}
|
|
|
|
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
|
|
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
if ps.PRS.Height != msg.Height {
|
|
return
|
|
}
|
|
if ps.PRS.ProposalPOLRound != msg.ProposalPOLRound {
|
|
return
|
|
}
|
|
|
|
// TODO: Merge onto existing ps.PRS.ProposalPOL?
|
|
// We might have sent some prevotes in the meantime.
|
|
ps.PRS.ProposalPOL = msg.ProposalPOL
|
|
}
|
|
|
|
// ApplyHasVoteMessage updates the peer state for the new vote.
|
|
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
if ps.PRS.Height != msg.Height {
|
|
return
|
|
}
|
|
|
|
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
|
|
}
|
|
|
|
// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
|
|
// it claims to have for the corresponding BlockID.
|
|
// `ourVotes` is a BitArray of votes we have for msg.BlockID
|
|
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
|
|
// we conservatively overwrite ps's votes w/ msg.Votes.
|
|
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *bits.BitArray) {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
|
|
votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
|
|
if votes != nil {
|
|
if ourVotes == nil {
|
|
votes.Update(msg.Votes)
|
|
} else {
|
|
otherVotes := votes.Sub(ourVotes)
|
|
hasVotes := otherVotes.Or(msg.Votes)
|
|
votes.Update(hasVotes)
|
|
}
|
|
}
|
|
}
|
|
|
|
// String returns a string representation of the PeerState
|
|
func (ps *PeerState) String() string {
|
|
return ps.StringIndented("")
|
|
}
|
|
|
|
// StringIndented returns a string representation of the PeerState
|
|
func (ps *PeerState) StringIndented(indent string) string {
|
|
ps.mtx.Lock()
|
|
defer ps.mtx.Unlock()
|
|
return fmt.Sprintf(`PeerState{
|
|
%s Key %v
|
|
%s RoundState %v
|
|
%s Stats %v
|
|
%s}`,
|
|
indent, ps.peerID,
|
|
indent, ps.PRS.StringIndented(indent+" "),
|
|
indent, ps.Stats,
|
|
indent,
|
|
)
|
|
}
|