diff --git a/consensus/reactor.go b/consensus/reactor.go index 971a677bb..a6ea168b9 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -209,8 +209,8 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // TODO: punish peer } } - ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size(), nil) - ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size(), nil) + ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size()) + ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size()) ps.SetHasVote(vote, index) if added { // If rs.Height == vote.Height && rs.Round < vote.Round, @@ -330,6 +330,7 @@ func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { } func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) { + log := log.New("peer", peer.Key) OUTER_LOOP: for { @@ -435,6 +436,7 @@ OUTER_LOOP: } func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) { + log := log.New("peer", peer.Key) // Simple hack to throttle logs upon sleep. var sleeping = 0 @@ -456,91 +458,42 @@ OUTER_LOOP: sleeping = 0 } - // prsVoteSet: a pointer to a VoteSet field of prs. - // Returns true when useful work was done. - trySendVote := func(voteSet *VoteSet, prsVoteSet **BitArray) (sent bool) { - if voteSet == nil { - return false - } - if *prsVoteSet == nil { - ps.EnsureVoteBitArrays(voteSet.Height(), voteSet.Size(), prs) - // We could return true here (useful work was done) - // or, we can continue since prsVoteSet is no longer nil. - if *prsVoteSet == nil { - panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays") - } - } - // TODO: give priority to our vote. - if index, ok := voteSet.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok { - vote := voteSet.GetByIndex(index) - msg := &VoteMessage{index, vote} - peer.Send(VoteChannel, msg) - ps.SetHasVote(vote, index) - return true - } - return false - } - - // prsVoteSet: a pointer to a VoteSet field of prs. - // Returns true when useful work was done. - trySendPrecommitFromValidation := func(validation *types.Validation, prsVoteSet **BitArray) (sent bool) { - if validation == nil { - return false - } else if *prsVoteSet == nil { - ps.EnsureVoteBitArrays(validation.Height(), len(validation.Precommits), prs) - // We could return true here (useful work was done) - // or, we can continue since prsVoteSet is no longer nil. - if *prsVoteSet == nil { - panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays") - } - } - if index, ok := validation.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok { - precommit := validation.Precommits[index] - log.Debug("Picked precommit to send", "index", index, "precommit", precommit) - msg := &VoteMessage{index, precommit} - peer.Send(VoteChannel, msg) - ps.SetHasVote(precommit, index) - return true - } - return false - } - // If height matches, then send LastCommit, Prevotes, Precommits. if rs.Height == prs.Height { // If there are lastCommits to send... if prs.Step == RoundStepNewHeight { - if trySendVote(rs.LastCommit, &prs.LastCommit) { + if ps.PickSendVote(rs.LastCommit) { continue OUTER_LOOP } } // If there are prevotes to send... if rs.Round == prs.Round && prs.Step <= RoundStepPrevote { - if trySendVote(rs.Votes.Prevotes(rs.Round), &prs.Prevotes) { + if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) { continue OUTER_LOOP } } // If there are precommits to send... if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit { - if trySendVote(rs.Votes.Precommits(rs.Round), &prs.Precommits) { + if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) { continue OUTER_LOOP } } // If there are prevotes to send for the last round... if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote { - if trySendVote(rs.Votes.Prevotes(prs.Round), &prs.Prevotes) { + if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { continue OUTER_LOOP } } // If there are precommits to send for the last round... if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit { - if trySendVote(rs.Votes.Precommits(prs.Round), &prs.Precommits) { + if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { continue OUTER_LOOP } } // If there are POLPrevotes to send... if 0 <= prs.ProposalPOLRound { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if trySendVote(polPrevotes, &prs.ProposalPOL) { + if ps.PickSendVote(polPrevotes) { continue OUTER_LOOP } } @@ -550,17 +503,8 @@ OUTER_LOOP: // Special catchup logic. // If peer is lagging by height 1, send LastCommit. if prs.Height != 0 && prs.Height == rs.Height-1 { - if prs.Round == rs.LastCommit.Round() { - // NOTE: We prefer to use prs.Precommits if - // prs.Round matches prs.CatchupCommitRound. - if trySendVote(rs.LastCommit, &prs.Precommits) { - continue OUTER_LOOP - } - } else { - ps.EnsureCatchupCommitRound(prs.Height, rs.LastCommit.Round()) - if trySendVote(rs.LastCommit, &prs.CatchupCommit) { - continue OUTER_LOOP - } + if ps.PickSendVote(rs.LastCommit) { + continue OUTER_LOOP } } @@ -571,8 +515,7 @@ OUTER_LOOP: // which contains precommit signatures for prs.Height. validation := conR.blockStore.LoadBlockValidation(prs.Height) log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation) - ps.EnsureCatchupCommitRound(prs.Height, validation.Round()) - if trySendPrecommitFromValidation(validation, &prs.CatchupCommit) { + if ps.PickSendVote(validation) { continue OUTER_LOOP } } @@ -622,14 +565,14 @@ var ( ) type PeerState struct { - Key string + Peer *p2p.Peer mtx sync.Mutex PeerRoundState } func NewPeerState(peer *p2p.Peer) *PeerState { - return &PeerState{Key: peer.Key} + return &PeerState{Peer: peer} } // Returns an atomic snapshot of the PeerRoundState. @@ -671,13 +614,113 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) { ps.ProposalBlockParts.SetIndex(index, true) } -// prs: If given, will also update this PeerRoundState copy. +// Convenience function to send vote to peer. +// Returns true if vote was sent. +func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) { + if index, vote, ok := ps.PickVoteToSend(votes); ok { + msg := &VoteMessage{index, vote} + ps.Peer.Send(VoteChannel, msg) + return true + } + return false +} + +// votes: Must be the correct Size() for the Height(). +func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + if votes.Size() == 0 { + return 0, nil, false + } + + height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size() + + // Lazily set data using 'votes'. + if votes.IsCommit() { + ps.ensureCatchupCommitRound(height, round, size) + } + ps.ensureVoteBitArrays(height, size) + + psVotes := ps.getVoteBitArray(height, round, type_) + if psVotes == nil { + return 0, nil, false // Not something worth sending + } + if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok { + ps.setHasVote(height, round, type_, index) + return index, votes.GetByIndex(index), true + } + return 0, nil, false +} + +func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { + if ps.Height == height { + if ps.Round == round { + switch type_ { + case types.VoteTypePrevote: + return ps.Prevotes + case types.VoteTypePrecommit: + return ps.Precommits + default: + panic(Fmt("Unexpected vote type %X", type_)) + } + } + if ps.CatchupCommitRound == round { + switch type_ { + case types.VoteTypePrevote: + return nil + case types.VoteTypePrecommit: + return ps.CatchupCommit + default: + panic(Fmt("Unexpected vote type %X", type_)) + } + } + return nil + } + if ps.Height == height+1 { + if ps.LastCommitRound == round { + switch type_ { + case types.VoteTypePrevote: + return nil + case types.VoteTypePrecommit: + return ps.LastCommit + default: + panic(Fmt("Unexpected vote type %X", type_)) + } + } + return nil + } + return nil +} + +// NOTE: 'round' is what we know to be the commit round for height. +func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) { + if ps.Height != height { + return + } + if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round { + panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round)) + } + if ps.CatchupCommitRound == round { + return // Nothing to do! + } + ps.CatchupCommitRound = round + if round == ps.Round { + ps.CatchupCommit = ps.Precommits + } else { + ps.CatchupCommit = NewBitArray(numValidators) + } +} + // NOTE: It's important to make sure that numValidators actually matches // what the node sees as the number of validators for height. -func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int, prs *PeerRoundState) { +func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) { ps.mtx.Lock() defer ps.mtx.Unlock() + ps.ensureVoteBitArrays(height, numValidators) +} +func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) { if ps.Height == height { if ps.Prevotes == nil { ps.Prevotes = NewBitArray(numValidators) @@ -696,15 +739,6 @@ func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int, prs *Pee ps.LastCommit = NewBitArray(numValidators) } } - - // Also, update prs if given. - if prs != nil { - prs.Prevotes = ps.Prevotes - prs.Precommits = ps.Precommits - prs.CatchupCommit = ps.CatchupCommit - prs.ProposalPOL = ps.ProposalPOL - prs.LastCommit = ps.LastCommit - } } func (ps *PeerState) SetHasVote(vote *types.Vote, index int) { @@ -715,51 +749,48 @@ func (ps *PeerState) SetHasVote(vote *types.Vote, index int) { } func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { - if ps.Height == height+1 && ps.LastCommitRound == round && type_ == types.VoteTypePrecommit { - // Special case for LastCommit. - ps.LastCommit.SetIndex(index, true) - log.Debug("setHasVote", "LastCommit", ps.LastCommit, "index", index) - return - } else if ps.Height != height { - // Does not apply. - return + log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round) + if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit { + panic("Invalid vote type") // SANITY } - // By here, ps.Height is height. - switch type_ { - case types.VoteTypePrevote: - if ps.ProposalPOLRound == round { - ps.ProposalPOL.SetIndex(index, true) + if ps.Height == height { + if ps.Round == round { + switch type_ { + case types.VoteTypePrevote: + ps.Prevotes.SetIndex(index, true) + log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index) + case types.VoteTypePrecommit: + ps.Precommits.SetIndex(index, true) + log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index) + } + } else if ps.CatchupCommitRound == round { + switch type_ { + case types.VoteTypePrevote: + case types.VoteTypePrecommit: + ps.CatchupCommit.SetIndex(index, true) + log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index) + } + } else if ps.ProposalPOLRound == round { + switch type_ { + case types.VoteTypePrevote: + ps.ProposalPOL.SetIndex(index, true) + log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index) + case types.VoteTypePrecommit: + } } - ps.Prevotes.SetIndex(index, true) - log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index) - case types.VoteTypePrecommit: - if ps.CatchupCommitRound == round { - ps.CatchupCommit.SetIndex(index, true) + } else if ps.Height == height+1 { + if ps.LastCommitRound == round { + switch type_ { + case types.VoteTypePrevote: + case types.VoteTypePrecommit: + ps.LastCommit.SetIndex(index, true) + log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) + } } - ps.Precommits.SetIndex(index, true) - log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index) - default: - panic("Invalid vote type") - } -} - -// NOTE: 'round' is what we know to be the commit round for height. -func (ps *PeerState) EnsureCatchupCommitRound(height, round int) { - ps.mtx.Lock() - defer ps.mtx.Unlock() - - if ps.Height != height { - return - } - if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round { - panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round)) - } - if ps.CatchupCommitRound == round { - return // Nothing to do! + } else { + // Does not apply. } - ps.CatchupCommitRound = round - ps.CatchupCommit = nil } func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) { diff --git a/consensus/vote_set.go b/consensus/vote_set.go index c5c0ae0ce..073e0c200 100644 --- a/consensus/vote_set.go +++ b/consensus/vote_set.go @@ -67,6 +67,14 @@ func (voteSet *VoteSet) Round() int { } } +func (voteSet *VoteSet) Type() byte { + if voteSet == nil { + return 0x00 + } else { + return voteSet.type_ + } +} + func (voteSet *VoteSet) Size() int { if voteSet == nil { return 0 @@ -193,6 +201,15 @@ func (voteSet *VoteSet) HasTwoThirdsMajority() bool { return voteSet.maj23Exists } +func (voteSet *VoteSet) IsCommit() bool { + if voteSet == nil { + return false + } + voteSet.mtx.Lock() + defer voteSet.mtx.Unlock() + return len(voteSet.maj23Hash) > 0 +} + func (voteSet *VoteSet) HasTwoThirdsAny() bool { if voteSet == nil { return false diff --git a/types/block.go b/types/block.go index a3c9a2207..7b52d4b25 100644 --- a/types/block.go +++ b/types/block.go @@ -219,6 +219,35 @@ func (v *Validation) Round() int { return v.FirstPrecommit().Round } +func (v *Validation) Type() byte { + return VoteTypePrecommit +} + +func (v *Validation) Size() int { + return len(v.Precommits) +} + +func (v *Validation) BitArray() *BitArray { + if v.bitArray == nil { + v.bitArray = NewBitArray(len(v.Precommits)) + for i, precommit := range v.Precommits { + v.bitArray.SetIndex(i, precommit != nil) + } + } + return v.bitArray +} + +func (v *Validation) GetByIndex(index int) *Vote { + return v.Precommits[index] +} + +func (v *Validation) IsCommit() bool { + if len(v.Precommits) == 0 { + return false + } + return true +} + func (v *Validation) ValidateBasic() error { if len(v.Precommits) == 0 { return errors.New("No precommits in validation") @@ -274,16 +303,6 @@ func (v *Validation) StringIndented(indent string) string { indent, v.hash) } -func (v *Validation) BitArray() *BitArray { - if v.bitArray == nil { - v.bitArray = NewBitArray(len(v.Precommits)) - for i, precommit := range v.Precommits { - v.bitArray.SetIndex(i, precommit != nil) - } - } - return v.bitArray -} - //----------------------------------------------------------------------------- type Data struct { diff --git a/types/vote.go b/types/vote.go index 4fca422d6..6bc62f11f 100644 --- a/types/vote.go +++ b/types/vote.go @@ -69,3 +69,17 @@ func (vote *Vote) String() string { return fmt.Sprintf("Vote{%v/%02d/%v(%v) %X#%v %v}", vote.Height, vote.Round, vote.Type, typeString, Fingerprint(vote.BlockHash), vote.BlockParts, vote.Signature) } + +//-------------------------------------------------------------------------------- +// TODO: Move blocks/Validation to here? + +// Common interface between *consensus.VoteSet and types.Validation +type VoteSetReader interface { + Height() int + Round() int + Type() byte + Size() int + BitArray() *BitArray + GetByIndex(int) *Vote + IsCommit() bool +}