Browse Source

consensus/reactor cleanup

pull/96/head
Jae Kwon 10 years ago
parent
commit
7a6b8944ab
2 changed files with 33 additions and 30 deletions
  1. +30
    -27
      consensus/reactor.go
  2. +3
    -3
      consensus/state.go

+ 30
- 27
consensus/reactor.go View File

@ -168,7 +168,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
ps.ApplyProposalPOLMessage(msg) ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage: case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index) ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
_, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
_, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part)
default: default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
@ -342,12 +342,12 @@ OUTER_LOOP:
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index) part := rs.ProposalBlockParts.GetPart(index)
msg := &BlockPartMessage{ msg := &BlockPartMessage{
Height: rs.Height,
Round: rs.Round,
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part, Part: part,
} }
peer.Send(DataChannel, msg) peer.Send(DataChannel, msg)
ps.SetHasProposalBlockPart(rs.Height, rs.Round, index)
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
@ -374,8 +374,8 @@ OUTER_LOOP:
} }
// Send the part // Send the part
msg := &BlockPartMessage{ msg := &BlockPartMessage{
Height: prs.Height,
Round: prs.Round,
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part, Part: part,
} }
peer.Send(DataChannel, msg) peer.Send(DataChannel, msg)
@ -389,13 +389,16 @@ OUTER_LOOP:
} }
// If height and round don't match, sleep. // If height and round don't match, sleep.
if rs.Height != prs.Height || rs.Round != prs.Round {
if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
//log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) //log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
time.Sleep(peerGossipSleepDuration) time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP continue OUTER_LOOP
} }
// By here, height and round match. // By here, height and round match.
// Proposal block parts were already matched and sent if any were wanted.
// (These can match on hash so the round doesn't matter)
// Now consider sending other things, like the Proposal itself.
// Send Proposal && ProposalPOL BitArray? // Send Proposal && ProposalPOL BitArray?
if rs.Proposal != nil && !prs.Proposal { if rs.Proposal != nil && !prs.Proposal {
@ -406,8 +409,9 @@ OUTER_LOOP:
ps.SetHasProposal(rs.Proposal) ps.SetHasProposal(rs.Proposal)
} }
// ProposalPOL. // ProposalPOL.
// Must be in the same channel, sequential.
// That is, peer must receive ProposalMessage first.
// Peer must receive ProposalMessage first.
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound { if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{ msg := &ProposalPOLMessage{
Height: rs.Height, Height: rs.Height,
@ -530,11 +534,13 @@ OUTER_LOOP:
// If peer is lagging by height 1, send LastCommit. // If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && prs.Height == rs.Height-1 { if prs.Height != 0 && prs.Height == rs.Height-1 {
if prs.Round == rs.LastCommit.Round() { if prs.Round == rs.LastCommit.Round() {
// NOTE: We prefer to use prs.Precommits if
// prs.Round matches prs.CatchupCommitRound.
if trySendVote(rs.LastCommit, &prs.Precommits) { if trySendVote(rs.LastCommit, &prs.Precommits) {
continue OUTER_LOOP continue OUTER_LOOP
} }
} else { } else {
ps.SetCatchupCommitRound(prs.Height, rs.LastCommit.Round())
ps.EnsureCatchupCommitRound(prs.Height, rs.LastCommit.Round())
if trySendVote(rs.LastCommit, &prs.CatchupCommit) { if trySendVote(rs.LastCommit, &prs.CatchupCommit) {
continue OUTER_LOOP continue OUTER_LOOP
} }
@ -548,15 +554,8 @@ OUTER_LOOP:
// which contains precommit signatures for prs.Height. // which contains precommit signatures for prs.Height.
validation := conR.blockStore.LoadBlockValidation(prs.Height) validation := conR.blockStore.LoadBlockValidation(prs.Height)
log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation) log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
// Peer's CatchupCommitRound should be -1 or equal to the validation's precommit rounds.
// If not, warn.
if prs.CatchupCommitRound == -1 {
ps.SetCatchupCommitRound(prs.Height, validation.Round())
continue OUTER_LOOP // Get prs := ps.GetRoundState() again.
} else if prs.CatchupCommitRound != int(validation.Round()) {
log.Warn("Peer's CatchupCommitRound during catchup not equal to commit round",
"height", prs.Height, "validation", validation, "prs.CatchupCommitRound", prs.CatchupCommitRound)
} else if trySendPrecommitFromValidation(validation, &prs.CatchupCommit) {
ps.EnsureCatchupCommitRound(prs.Height, validation.Round())
if trySendPrecommitFromValidation(validation, &prs.CatchupCommit) {
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
@ -656,6 +655,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height uint, round uint, index uint
} }
// prs: If given, will also update this PeerRoundState copy. // prs: If given, will also update this PeerRoundState copy.
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint, prs *PeerRoundState) { func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint, prs *PeerRoundState) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -683,9 +684,9 @@ func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint, prs *P
if prs != nil { if prs != nil {
prs.Prevotes = ps.Prevotes prs.Prevotes = ps.Prevotes
prs.Precommits = ps.Precommits prs.Precommits = ps.Precommits
prs.LastCommit = ps.LastCommit
prs.CatchupCommit = ps.CatchupCommit prs.CatchupCommit = ps.CatchupCommit
prs.ProposalPOL = ps.ProposalPOL prs.ProposalPOL = ps.ProposalPOL
prs.LastCommit = ps.LastCommit
} }
} }
@ -726,7 +727,8 @@ func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint)
} }
} }
func (ps *PeerState) SetCatchupCommitRound(height, round uint) {
// NOTE: 'round' is what we know to be the commit round for height.
func (ps *PeerState) EnsureCatchupCommitRound(height, round uint) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -734,12 +736,10 @@ func (ps *PeerState) SetCatchupCommitRound(height, round uint) {
return return
} }
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != int(round) { if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != int(round) {
log.Warn("Conflicting CatchupCommitRound",
"height", height,
"orig", ps.CatchupCommitRound,
"new", round,
)
// TODO think harder
panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
}
if ps.CatchupCommitRound == int(round) {
return // Nothing to do!
} }
ps.CatchupCommitRound = int(round) ps.CatchupCommitRound = int(round)
ps.CatchupCommit = nil ps.CatchupCommit = nil
@ -773,6 +773,9 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun
} }
if psHeight == msg.Height && psRound != msg.Round && int(msg.Round) == psCatchupCommitRound { if psHeight == msg.Height && psRound != msg.Round && int(msg.Round) == psCatchupCommitRound {
// Peer caught up to CatchupCommitRound. // Peer caught up to CatchupCommitRound.
// Preserve psCatchupCommit!
// NOTE: We prefer to use prs.Precommits if
// pr.Round matches pr.CatchupCommitRound.
ps.Precommits = psCatchupCommit ps.Precommits = psCatchupCommit
} }
if psHeight != msg.Height { if psHeight != msg.Height {


+ 3
- 3
consensus/state.go View File

@ -499,7 +499,7 @@ func (cs *ConsensusState) EnterPropose(height uint, round uint) {
// If we already have the proposal + POL, then goto Prevote // If we already have the proposal + POL, then goto Prevote
if cs.isProposalComplete() { if cs.isProposalComplete() {
go cs.EnterPrevote(height, round)
go cs.EnterPrevote(height, cs.Round)
} }
}() }()
@ -965,7 +965,7 @@ func (cs *ConsensusState) SetProposal(proposal *Proposal) error {
} }
// NOTE: block is not necessarily valid. // NOTE: block is not necessarily valid.
func (cs *ConsensusState) AddProposalBlockPart(height uint, round uint, part *types.Part) (added bool, err error) {
func (cs *ConsensusState) AddProposalBlockPart(height uint, part *types.Part) (added bool, err error) {
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
@ -991,7 +991,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height uint, round uint, part *ty
log.Debug("Received complete proposal", "hash", cs.ProposalBlock.Hash()) log.Debug("Received complete proposal", "hash", cs.ProposalBlock.Hash())
if cs.Step == RoundStepPropose && cs.isProposalComplete() { if cs.Step == RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step // Move onto the next step
go cs.EnterPrevote(height, round)
go cs.EnterPrevote(height, cs.Round)
} else if cs.Step == RoundStepCommit { } else if cs.Step == RoundStepCommit {
// If we're waiting on the proposal block... // If we're waiting on the proposal block...
cs.tryFinalizeCommit(height) cs.tryFinalizeCommit(height)


Loading…
Cancel
Save