Browse Source

breakup some long lines; add more comments to consensus reactor

pull/576/head
Ethan Buchman 7 years ago
parent
commit
5f6b996d22
2 changed files with 187 additions and 111 deletions
  1. +175
    -107
      consensus/reactor.go
  2. +12
    -4
      p2p/peer.go

+ 175
- 107
consensus/reactor.go View File

@ -9,10 +9,12 @@ import (
"time" "time"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
. "github.com/tendermint/tmlibs/common"
) )
const ( const (
@ -26,6 +28,7 @@ const (
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// ConsensusReactor defines a reactor for the consensus service.
type ConsensusReactor struct { type ConsensusReactor struct {
p2p.BaseReactor // BaseService + p2p.Switch p2p.BaseReactor // BaseService + p2p.Switch
@ -34,6 +37,7 @@ type ConsensusReactor struct {
evsw types.EventSwitch evsw types.EventSwitch
} }
// NewConsensusReactor returns a new ConsensusReactor with the given consensusState.
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
conR := &ConsensusReactor{ conR := &ConsensusReactor{
conS: consensusState, conS: consensusState,
@ -43,6 +47,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens
return conR return conR
} }
// OnStart implements BaseService.
func (conR *ConsensusReactor) OnStart() error { func (conR *ConsensusReactor) OnStart() error {
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync) conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync)
conR.BaseReactor.OnStart() conR.BaseReactor.OnStart()
@ -66,8 +71,8 @@ func (conR *ConsensusReactor) OnStop() {
conR.conS.Stop() conR.conS.Stop()
} }
// Switch from the fast_sync to the consensus:
// reset the state, turn off fast_sync, start the consensus-state-machine
// SwitchToConsensus switches from fast_sync mode to consensus mode.
// It resets the state, turns off fast_sync, and starts the consensus state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
conR.Logger.Info("SwitchToConsensus") conR.Logger.Info("SwitchToConsensus")
conR.conS.reconstructLastCommit(state) conR.conS.reconstructLastCommit(state)
@ -183,7 +188,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID) votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID)
// Respond with a VoteSetBitsMessage showing which votes we have. // Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have) // (and consequently shows which we don't have)
var ourVotes *BitArray
var ourVotes *cmn.BitArray
switch msg.Type { switch msg.Type {
case types.VoteTypePrevote: case types.VoteTypePrevote:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
@ -201,7 +206,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
Votes: ourVotes, Votes: ourVotes,
}}) }})
default: default:
conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
case DataChannel: case DataChannel:
@ -219,7 +224,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
default: default:
conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
case VoteChannel: case VoteChannel:
@ -241,7 +246,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
default: default:
// don't punish (leave room for soft upgrades) // don't punish (leave room for soft upgrades)
conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
case VoteSetBitsChannel: case VoteSetBitsChannel:
@ -257,7 +262,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
cs.mtx.Unlock() cs.mtx.Unlock()
if height == msg.Height { if height == msg.Height {
var ourVotes *BitArray
var ourVotes *cmn.BitArray
switch msg.Type { switch msg.Type {
case types.VoteTypePrevote: case types.VoteTypePrevote:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
@ -273,11 +278,11 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
} }
default: default:
// don't punish (leave room for soft upgrades) // don't punish (leave room for soft upgrades)
conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
default: default:
conR.Logger.Error(Fmt("Unknown chId %X", chID))
conR.Logger.Error(cmn.Fmt("Unknown chId %X", chID))
} }
if err != nil { if err != nil {
@ -285,7 +290,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
} }
} }
// implements events.Eventable
// SetEventSwitch implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
conR.evsw = evsw conR.evsw = evsw
conR.conS.SetEventSwitch(evsw) conR.conS.SetEventSwitch(evsw)
@ -406,43 +411,9 @@ OUTER_LOOP:
// If the peer is on a previous height, help catch up. // If the peer is on a previous height, help catch up.
if (0 < prs.Height) && (prs.Height < rs.Height) { if (0 < prs.Height) && (prs.Height < rs.Height) {
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
logger.Error("Failed to load block meta", "peer height", prs.Height, "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height(), "pv", conR.conS.privValidator)
time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP
} else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
"peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP
}
// Load the part
part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
if part == nil {
logger.Error("Could not load part", "index", index,
"peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP
}
// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
} else {
//logger.Info("No parts to send in catch-up, sleeping")
time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP
}
heightLogger := logger.With("height", prs.Height)
conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
continue OUTER_LOOP
} }
// If height and round don't match, sleep. // If height and round don't match, sleep.
@ -489,6 +460,49 @@ OUTER_LOOP:
} }
} }
func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundState,
prs *PeerRoundState, ps *PeerState, peer *p2p.Peer) {
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
logger.Error("Failed to load block meta",
"ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height())
time.Sleep(conR.conS.config.PeerGossipSleep())
return
} else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
"blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
// Load the part
part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
if part == nil {
logger.Error("Could not load part", "index", index,
"blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
return
} else {
//logger.Info("No parts to send in catch-up, sleeping")
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
}
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) { func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer) logger := conR.Logger.With("peer", peer)
@ -517,35 +531,9 @@ OUTER_LOOP:
// If height matches, then send LastCommit, Prevotes, Precommits. // If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height { if rs.Height == prs.Height {
// If there are lastCommits to send...
if prs.Step == RoundStepNewHeight {
if ps.PickSendVote(rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
}
}
// If there are prevotes to send...
if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "height", prs.Height, "round", prs.Round)
continue OUTER_LOOP
}
}
// If there are precommits to send...
if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
logger.Debug("Picked rs.Precommits(prs.Round) to send", "height", prs.Height, "round", prs.Round)
continue OUTER_LOOP
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "height", prs.Height, "round", prs.ProposalPOLRound)
continue OUTER_LOOP
}
}
heightLogger := logger.With("height", prs.Height)
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
continue OUTER_LOOP
} }
} }
@ -587,6 +575,42 @@ OUTER_LOOP:
} }
} }
func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *RoundState, prs *PeerRoundState, ps *PeerState) bool {
// If there are lastCommits to send...
if prs.Step == RoundStepNewHeight {
if ps.PickSendVote(rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send")
return true
}
}
// If there are prevotes to send...
if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are precommits to send...
if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
}
}
}
return false
}
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes // NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening. // into play for liveness when there's a signature DDoS attack happening.
func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) { func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) {
@ -675,11 +699,15 @@ OUTER_LOOP:
} }
} }
// String returns a string representation of the ConsensusReactor.
// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
// TODO: improve!
func (conR *ConsensusReactor) String() string { func (conR *ConsensusReactor) String() string {
// better not to access shared variables // better not to access shared variables
return "ConsensusReactor" // conR.StringIndented("") return "ConsensusReactor" // conR.StringIndented("")
} }
// StringIndented returns an indented string representation of the ConsensusReactor
func (conR *ConsensusReactor) StringIndented(indent string) string { func (conR *ConsensusReactor) StringIndented(indent string) string {
s := "ConsensusReactor{\n" s := "ConsensusReactor{\n"
s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
@ -693,7 +721,8 @@ func (conR *ConsensusReactor) StringIndented(indent string) string {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Read only when returned by PeerState.GetRoundState().
// PeerRoundState contains the known state of a peer.
// NOTE: Read-only when returned by PeerState.GetRoundState().
type PeerRoundState struct { type PeerRoundState struct {
Height int // Height peer is at Height int // Height peer is at
Round int // Round peer is at, -1 if unknown. Round int // Round peer is at, -1 if unknown.
@ -701,21 +730,23 @@ type PeerRoundState struct {
StartTime time.Time // Estimated start of round 0 at this height StartTime time.Time // Estimated start of round 0 at this height
Proposal bool // True if peer has proposal for this round Proposal bool // True if peer has proposal for this round
ProposalBlockPartsHeader types.PartSetHeader // ProposalBlockPartsHeader types.PartSetHeader //
ProposalBlockParts *BitArray //
ProposalBlockParts *cmn.BitArray //
ProposalPOLRound int // Proposal's POL round. -1 if none. ProposalPOLRound int // Proposal's POL round. -1 if none.
ProposalPOL *BitArray // nil until ProposalPOLMessage received.
Prevotes *BitArray // All votes peer has for this round
Precommits *BitArray // All precommits peer has for this round
ProposalPOL *cmn.BitArray // nil until ProposalPOLMessage received.
Prevotes *cmn.BitArray // All votes peer has for this round
Precommits *cmn.BitArray // All precommits peer has for this round
LastCommitRound int // Round of commit for last height. -1 if none. LastCommitRound int // Round of commit for last height. -1 if none.
LastCommit *BitArray // All commit precommits of commit for last height.
LastCommit *cmn.BitArray // All commit precommits of commit for last height.
CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none. CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none.
CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound
CatchupCommit *cmn.BitArray // All commit precommits peer has for this height & CatchupCommitRound
} }
// String returns a string representation of the PeerRoundState
func (prs PeerRoundState) String() string { func (prs PeerRoundState) String() string {
return prs.StringIndented("") return prs.StringIndented("")
} }
// StringIndented returns a string representation of the PeerRoundState
func (prs PeerRoundState) StringIndented(indent string) string { func (prs PeerRoundState) StringIndented(indent string) string {
return fmt.Sprintf(`PeerRoundState{ return fmt.Sprintf(`PeerRoundState{
%s %v/%v/%v @%v %s %v/%v/%v @%v
@ -743,6 +774,8 @@ var (
ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime") ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
) )
// PeerState contains the known state of a peer, including its connection
// and threadsafe access to its PeerRoundState.
type PeerState struct { type PeerState struct {
Peer *p2p.Peer Peer *p2p.Peer
@ -750,6 +783,7 @@ type PeerState struct {
PeerRoundState PeerRoundState
} }
// NewPeerState returns a new PeerState for the given Peer
func NewPeerState(peer *p2p.Peer) *PeerState { func NewPeerState(peer *p2p.Peer) *PeerState {
return &PeerState{ return &PeerState{
Peer: peer, Peer: peer,
@ -762,7 +796,7 @@ func NewPeerState(peer *p2p.Peer) *PeerState {
} }
} }
// Returns an atomic snapshot of the PeerRoundState.
// GetRoundState returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState. // There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *PeerRoundState { func (ps *PeerState) GetRoundState() *PeerRoundState {
ps.mtx.Lock() ps.mtx.Lock()
@ -772,7 +806,7 @@ func (ps *PeerState) GetRoundState() *PeerRoundState {
return &prs return &prs
} }
// Returns an atomic snapshot of the PeerRoundState's height
// GetHeight returns an atomic snapshot of the PeerRoundState's height
// used by the mempool to ensure peers are caught up before broadcasting new txs // used by the mempool to ensure peers are caught up before broadcasting new txs
func (ps *PeerState) GetHeight() int { func (ps *PeerState) GetHeight() int {
ps.mtx.Lock() ps.mtx.Lock()
@ -780,6 +814,7 @@ func (ps *PeerState) GetHeight() int {
return ps.PeerRoundState.Height return ps.PeerRoundState.Height
} }
// SetHasProposal sets the given proposal as known for the peer.
func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -793,11 +828,12 @@ func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
ps.Proposal = true ps.Proposal = true
ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total)
ps.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total)
ps.ProposalPOLRound = proposal.POLRound ps.ProposalPOLRound = proposal.POLRound
ps.ProposalPOL = nil // Nil until ProposalPOLMessage received. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
} }
// SetHasProposalBlockPart sets the given block part index as known for the peer.
func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) { func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -809,9 +845,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
ps.ProposalBlockParts.SetIndex(index, true) ps.ProposalBlockParts.SetIndex(index, true)
} }
// PickVoteToSend sends vote to peer.
// PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent. // Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok { if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote} msg := &VoteMessage{vote}
return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
@ -819,7 +855,9 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
return false return false
} }
// votes: Must be the correct Size() for the Height().
// PickVoteToSend picks a vote to send to the peer.
// Returns true if a vote was picked.
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) { func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -847,9 +885,9 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote
return nil, false return nil, false
} }
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *cmn.BitArray {
if !types.IsVoteTypeValid(type_) { if !types.IsVoteTypeValid(type_) {
PanicSanity("Invalid vote type")
cmn.PanicSanity("Invalid vote type")
} }
if ps.Height == height { if ps.Height == height {
@ -902,7 +940,7 @@ func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators i
NOTE: This is wrong, 'round' could change. NOTE: This is wrong, 'round' could change.
e.g. if orig round is not the same as block LastCommit round. e.g. if orig round is not the same as block LastCommit round.
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round { if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
cmn.PanicSanity(cmn.Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
} }
*/ */
if ps.CatchupCommitRound == round { if ps.CatchupCommitRound == round {
@ -912,10 +950,12 @@ func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators i
if round == ps.Round { if round == ps.Round {
ps.CatchupCommit = ps.Precommits ps.CatchupCommit = ps.Precommits
} else { } else {
ps.CatchupCommit = NewBitArray(numValidators)
ps.CatchupCommit = cmn.NewBitArray(numValidators)
} }
} }
// EnsureVoteVitArrays ensures the bit-arrays have been allocated for tracking
// what votes this peer has received.
// NOTE: It's important to make sure that numValidators actually matches // NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height. // what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) { func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
@ -927,24 +967,25 @@ func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) { func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
if ps.Height == height { if ps.Height == height {
if ps.Prevotes == nil { if ps.Prevotes == nil {
ps.Prevotes = NewBitArray(numValidators)
ps.Prevotes = cmn.NewBitArray(numValidators)
} }
if ps.Precommits == nil { if ps.Precommits == nil {
ps.Precommits = NewBitArray(numValidators)
ps.Precommits = cmn.NewBitArray(numValidators)
} }
if ps.CatchupCommit == nil { if ps.CatchupCommit == nil {
ps.CatchupCommit = NewBitArray(numValidators)
ps.CatchupCommit = cmn.NewBitArray(numValidators)
} }
if ps.ProposalPOL == nil { if ps.ProposalPOL == nil {
ps.ProposalPOL = NewBitArray(numValidators)
ps.ProposalPOL = cmn.NewBitArray(numValidators)
} }
} else if ps.Height == height+1 { } else if ps.Height == height+1 {
if ps.LastCommit == nil { if ps.LastCommit == nil {
ps.LastCommit = NewBitArray(numValidators)
ps.LastCommit = cmn.NewBitArray(numValidators)
} }
} }
} }
// SetHasVote sets the given vote as known by the peer
func (ps *PeerState) SetHasVote(vote *types.Vote) { func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -960,6 +1001,7 @@ func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
ps.getVoteBitArray(height, round, type_).SetIndex(index, true) ps.getVoteBitArray(height, round, type_).SetIndex(index, true)
} }
// ApplyNewRoundStepMessage updates the peer state for the new round.
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1013,6 +1055,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
} }
} }
// ApplyCommitStepMessage updates the peer state for the new commit.
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) { func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1025,6 +1068,7 @@ func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
ps.ProposalBlockParts = msg.BlockParts ps.ProposalBlockParts = msg.BlockParts
} }
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1041,6 +1085,7 @@ func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
ps.ProposalPOL = msg.ProposalPOL ps.ProposalPOL = msg.ProposalPOL
} }
// ApplyHasVoteMessage updates the peer state for the new vote.
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1052,12 +1097,12 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
} }
// The peer has responded with a bitarray of votes that it has
// of the corresponding BlockID.
// ourVotes: BitArray of votes we have for msg.BlockID
// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
// it claims to have for the corresponding BlockID.
// `ourVotes` is a BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height), // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes. // we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *BitArray) {
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *cmn.BitArray) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1073,10 +1118,12 @@ func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *
} }
} }
// String returns a string representation of the PeerState
func (ps *PeerState) String() string { func (ps *PeerState) String() string {
return ps.StringIndented("") return ps.StringIndented("")
} }
// StringIndented returns a string representation of the PeerState
func (ps *PeerState) StringIndented(indent string) string { func (ps *PeerState) StringIndented(indent string) string {
return fmt.Sprintf(`PeerState{ return fmt.Sprintf(`PeerState{
%s Key %v %s Key %v
@ -1102,6 +1149,7 @@ const (
msgTypeVoteSetBits = byte(0x17) msgTypeVoteSetBits = byte(0x17)
) )
// ConsensusMessage is a message that can be sent and received on the ConsensusReactor
type ConsensusMessage interface{} type ConsensusMessage interface{}
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
@ -1117,17 +1165,20 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits}, wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits},
) )
// DecodeMessage decodes the given bytes into a ConsensusMessage.
// TODO: check for unnecessary extra bytes at the end. // TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
msgType = bz[0] msgType = bz[0]
n := new(int) n := new(int)
r := bytes.NewReader(bz) r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err)
msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage
return return
} }
//------------------------------------- //-------------------------------------
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
// For every height/round/step transition // For every height/round/step transition
type NewRoundStepMessage struct { type NewRoundStepMessage struct {
Height int Height int
@ -1137,6 +1188,7 @@ type NewRoundStepMessage struct {
LastCommitRound int LastCommitRound int
} }
// String returns a string representation.
func (m *NewRoundStepMessage) String() string { func (m *NewRoundStepMessage) String() string {
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]", return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
m.Height, m.Round, m.Step, m.LastCommitRound) m.Height, m.Round, m.Step, m.LastCommitRound)
@ -1144,62 +1196,73 @@ func (m *NewRoundStepMessage) String() string {
//------------------------------------- //-------------------------------------
// CommitStepMessage is sent when a block is committed.
type CommitStepMessage struct { type CommitStepMessage struct {
Height int Height int
BlockPartsHeader types.PartSetHeader BlockPartsHeader types.PartSetHeader
BlockParts *BitArray
BlockParts *cmn.BitArray
} }
// String returns a string representation.
func (m *CommitStepMessage) String() string { func (m *CommitStepMessage) String() string {
return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts) return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
} }
//------------------------------------- //-------------------------------------
// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct { type ProposalMessage struct {
Proposal *types.Proposal Proposal *types.Proposal
} }
// String returns a string representation.
func (m *ProposalMessage) String() string { func (m *ProposalMessage) String() string {
return fmt.Sprintf("[Proposal %v]", m.Proposal) return fmt.Sprintf("[Proposal %v]", m.Proposal)
} }
//------------------------------------- //-------------------------------------
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct { type ProposalPOLMessage struct {
Height int Height int
ProposalPOLRound int ProposalPOLRound int
ProposalPOL *BitArray
ProposalPOL *cmn.BitArray
} }
// String returns a string representation.
func (m *ProposalPOLMessage) String() string { func (m *ProposalPOLMessage) String() string {
return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL) return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
} }
//------------------------------------- //-------------------------------------
// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct { type BlockPartMessage struct {
Height int Height int
Round int Round int
Part *types.Part Part *types.Part
} }
// String returns a string representation.
func (m *BlockPartMessage) String() string { func (m *BlockPartMessage) String() string {
return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part) return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
} }
//------------------------------------- //-------------------------------------
// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct { type VoteMessage struct {
Vote *types.Vote Vote *types.Vote
} }
// String returns a string representation.
func (m *VoteMessage) String() string { func (m *VoteMessage) String() string {
return fmt.Sprintf("[Vote %v]", m.Vote) return fmt.Sprintf("[Vote %v]", m.Vote)
} }
//------------------------------------- //-------------------------------------
// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct { type HasVoteMessage struct {
Height int Height int
Round int Round int
@ -1207,12 +1270,14 @@ type HasVoteMessage struct {
Index int Index int
} }
// String returns a string representation.
func (m *HasVoteMessage) String() string { func (m *HasVoteMessage) String() string {
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index) return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
} }
//------------------------------------- //-------------------------------------
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct { type VoteSetMaj23Message struct {
Height int Height int
Round int Round int
@ -1220,20 +1285,23 @@ type VoteSetMaj23Message struct {
BlockID types.BlockID BlockID types.BlockID
} }
// String returns a string representation.
func (m *VoteSetMaj23Message) String() string { func (m *VoteSetMaj23Message) String() string {
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID) return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
} }
//------------------------------------- //-------------------------------------
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
type VoteSetBitsMessage struct { type VoteSetBitsMessage struct {
Height int Height int
Round int Round int
Type byte Type byte
BlockID types.BlockID BlockID types.BlockID
Votes *BitArray
Votes *cmn.BitArray
} }
// String returns a string representation.
func (m *VoteSetBitsMessage) String() string { func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
} }

+ 12
- 4
p2p/peer.go View File

@ -59,7 +59,9 @@ func DefaultPeerConfig() *PeerConfig {
} }
} }
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn, err := dial(addr, config) conn, err := dial(addr, config)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error creating peer") return nil, errors.Wrap(err, "Error creating peer")
@ -73,11 +75,15 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []
return peer, nil return peer, nil
} }
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
} }
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn := rawConn conn := rawConn
// Fuzz connection // Fuzz connection
@ -278,7 +284,9 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
return conn, nil return conn, nil
} }
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
onReceive := func(chID byte, msgBytes []byte) { onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID] reactor := reactorsByCh[chID]
if reactor == nil { if reactor == nil {


Loading…
Cancel
Save