Browse Source

consensus reactor code polish, fixed prs BitArray cache invalidation bug

pull/102/head
Jae Kwon 10 years ago
parent
commit
41d04cf5b8
4 changed files with 214 additions and 133 deletions
  1. +154
    -123
      consensus/reactor.go
  2. +17
    -0
      consensus/vote_set.go
  3. +29
    -10
      types/block.go
  4. +14
    -0
      types/vote.go

+ 154
- 123
consensus/reactor.go View File

@ -209,8 +209,8 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
// TODO: punish peer
}
}
ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size(), nil)
ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size(), nil)
ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
ps.SetHasVote(vote, index)
if added {
// If rs.Height == vote.Height && rs.Round < vote.Round,
@ -330,6 +330,7 @@ func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
}
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
log := log.New("peer", peer.Key)
OUTER_LOOP:
for {
@ -435,6 +436,7 @@ OUTER_LOOP:
}
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
log := log.New("peer", peer.Key)
// Simple hack to throttle logs upon sleep.
var sleeping = 0
@ -456,91 +458,42 @@ OUTER_LOOP:
sleeping = 0
}
// prsVoteSet: a pointer to a VoteSet field of prs.
// Returns true when useful work was done.
trySendVote := func(voteSet *VoteSet, prsVoteSet **BitArray) (sent bool) {
if voteSet == nil {
return false
}
if *prsVoteSet == nil {
ps.EnsureVoteBitArrays(voteSet.Height(), voteSet.Size(), prs)
// We could return true here (useful work was done)
// or, we can continue since prsVoteSet is no longer nil.
if *prsVoteSet == nil {
panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
}
}
// TODO: give priority to our vote.
if index, ok := voteSet.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
vote := voteSet.GetByIndex(index)
msg := &VoteMessage{index, vote}
peer.Send(VoteChannel, msg)
ps.SetHasVote(vote, index)
return true
}
return false
}
// prsVoteSet: a pointer to a VoteSet field of prs.
// Returns true when useful work was done.
trySendPrecommitFromValidation := func(validation *types.Validation, prsVoteSet **BitArray) (sent bool) {
if validation == nil {
return false
} else if *prsVoteSet == nil {
ps.EnsureVoteBitArrays(validation.Height(), len(validation.Precommits), prs)
// We could return true here (useful work was done)
// or, we can continue since prsVoteSet is no longer nil.
if *prsVoteSet == nil {
panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
}
}
if index, ok := validation.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
precommit := validation.Precommits[index]
log.Debug("Picked precommit to send", "index", index, "precommit", precommit)
msg := &VoteMessage{index, precommit}
peer.Send(VoteChannel, msg)
ps.SetHasVote(precommit, index)
return true
}
return false
}
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
// If there are lastCommits to send...
if prs.Step == RoundStepNewHeight {
if trySendVote(rs.LastCommit, &prs.LastCommit) {
if ps.PickSendVote(rs.LastCommit) {
continue OUTER_LOOP
}
}
// If there are prevotes to send...
if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
if trySendVote(rs.Votes.Prevotes(rs.Round), &prs.Prevotes) {
if ps.PickSendVote(rs.Votes.Prevotes(rs.Round)) {
continue OUTER_LOOP
}
}
// If there are precommits to send...
if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
if trySendVote(rs.Votes.Precommits(rs.Round), &prs.Precommits) {
if ps.PickSendVote(rs.Votes.Precommits(rs.Round)) {
continue OUTER_LOOP
}
}
// If there are prevotes to send for the last round...
if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrevote {
if trySendVote(rs.Votes.Prevotes(prs.Round), &prs.Prevotes) {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
continue OUTER_LOOP
}
}
// If there are precommits to send for the last round...
if rs.Round == prs.Round+1 && prs.Step <= RoundStepPrecommit {
if trySendVote(rs.Votes.Precommits(prs.Round), &prs.Precommits) {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
continue OUTER_LOOP
}
}
// If there are POLPrevotes to send...
if 0 <= prs.ProposalPOLRound {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if trySendVote(polPrevotes, &prs.ProposalPOL) {
if ps.PickSendVote(polPrevotes) {
continue OUTER_LOOP
}
}
@ -550,17 +503,8 @@ OUTER_LOOP:
// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && prs.Height == rs.Height-1 {
if prs.Round == rs.LastCommit.Round() {
// NOTE: We prefer to use prs.Precommits if
// prs.Round matches prs.CatchupCommitRound.
if trySendVote(rs.LastCommit, &prs.Precommits) {
continue OUTER_LOOP
}
} else {
ps.EnsureCatchupCommitRound(prs.Height, rs.LastCommit.Round())
if trySendVote(rs.LastCommit, &prs.CatchupCommit) {
continue OUTER_LOOP
}
if ps.PickSendVote(rs.LastCommit) {
continue OUTER_LOOP
}
}
@ -571,8 +515,7 @@ OUTER_LOOP:
// which contains precommit signatures for prs.Height.
validation := conR.blockStore.LoadBlockValidation(prs.Height)
log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
ps.EnsureCatchupCommitRound(prs.Height, validation.Round())
if trySendPrecommitFromValidation(validation, &prs.CatchupCommit) {
if ps.PickSendVote(validation) {
continue OUTER_LOOP
}
}
@ -622,14 +565,14 @@ var (
)
type PeerState struct {
Key string
Peer *p2p.Peer
mtx sync.Mutex
PeerRoundState
}
func NewPeerState(peer *p2p.Peer) *PeerState {
return &PeerState{Key: peer.Key}
return &PeerState{Peer: peer}
}
// Returns an atomic snapshot of the PeerRoundState.
@ -671,13 +614,113 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
ps.ProposalBlockParts.SetIndex(index, true)
}
// prs: If given, will also update this PeerRoundState copy.
// Convenience function to send vote to peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
if index, vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{index, vote}
ps.Peer.Send(VoteChannel, msg)
return true
}
return false
}
// votes: Must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (index int, vote *types.Vote, ok bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if votes.Size() == 0 {
return 0, nil, false
}
height, round, type_, size := votes.Height(), votes.Round(), votes.Type(), votes.Size()
// Lazily set data using 'votes'.
if votes.IsCommit() {
ps.ensureCatchupCommitRound(height, round, size)
}
ps.ensureVoteBitArrays(height, size)
psVotes := ps.getVoteBitArray(height, round, type_)
if psVotes == nil {
return 0, nil, false // Not something worth sending
}
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
ps.setHasVote(height, round, type_, index)
return index, votes.GetByIndex(index), true
}
return 0, nil, false
}
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
if ps.Height == height {
if ps.Round == round {
switch type_ {
case types.VoteTypePrevote:
return ps.Prevotes
case types.VoteTypePrecommit:
return ps.Precommits
default:
panic(Fmt("Unexpected vote type %X", type_))
}
}
if ps.CatchupCommitRound == round {
switch type_ {
case types.VoteTypePrevote:
return nil
case types.VoteTypePrecommit:
return ps.CatchupCommit
default:
panic(Fmt("Unexpected vote type %X", type_))
}
}
return nil
}
if ps.Height == height+1 {
if ps.LastCommitRound == round {
switch type_ {
case types.VoteTypePrevote:
return nil
case types.VoteTypePrecommit:
return ps.LastCommit
default:
panic(Fmt("Unexpected vote type %X", type_))
}
}
return nil
}
return nil
}
// NOTE: 'round' is what we know to be the commit round for height.
func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators int) {
if ps.Height != height {
return
}
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
}
if ps.CatchupCommitRound == round {
return // Nothing to do!
}
ps.CatchupCommitRound = round
if round == ps.Round {
ps.CatchupCommit = ps.Precommits
} else {
ps.CatchupCommit = NewBitArray(numValidators)
}
}
// NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int, prs *PeerRoundState) {
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.ensureVoteBitArrays(height, numValidators)
}
func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
if ps.Height == height {
if ps.Prevotes == nil {
ps.Prevotes = NewBitArray(numValidators)
@ -696,15 +739,6 @@ func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int, prs *Pee
ps.LastCommit = NewBitArray(numValidators)
}
}
// Also, update prs if given.
if prs != nil {
prs.Prevotes = ps.Prevotes
prs.Precommits = ps.Precommits
prs.CatchupCommit = ps.CatchupCommit
prs.ProposalPOL = ps.ProposalPOL
prs.LastCommit = ps.LastCommit
}
}
func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
@ -715,51 +749,48 @@ func (ps *PeerState) SetHasVote(vote *types.Vote, index int) {
}
func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
if ps.Height == height+1 && ps.LastCommitRound == round && type_ == types.VoteTypePrecommit {
// Special case for LastCommit.
ps.LastCommit.SetIndex(index, true)
log.Debug("setHasVote", "LastCommit", ps.LastCommit, "index", index)
return
} else if ps.Height != height {
// Does not apply.
return
log := log.New("peer", ps.Peer.Key, "peerRound", ps.Round, "height", height, "round", round)
if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
panic("Invalid vote type") // SANITY
}
// By here, ps.Height is height.
switch type_ {
case types.VoteTypePrevote:
if ps.ProposalPOLRound == round {
ps.ProposalPOL.SetIndex(index, true)
if ps.Height == height {
if ps.Round == round {
switch type_ {
case types.VoteTypePrevote:
ps.Prevotes.SetIndex(index, true)
log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
case types.VoteTypePrecommit:
ps.Precommits.SetIndex(index, true)
log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
}
} else if ps.CatchupCommitRound == round {
switch type_ {
case types.VoteTypePrevote:
case types.VoteTypePrecommit:
ps.CatchupCommit.SetIndex(index, true)
log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
}
} else if ps.ProposalPOLRound == round {
switch type_ {
case types.VoteTypePrevote:
ps.ProposalPOL.SetIndex(index, true)
log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
case types.VoteTypePrecommit:
}
}
ps.Prevotes.SetIndex(index, true)
log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index)
case types.VoteTypePrecommit:
if ps.CatchupCommitRound == round {
ps.CatchupCommit.SetIndex(index, true)
} else if ps.Height == height+1 {
if ps.LastCommitRound == round {
switch type_ {
case types.VoteTypePrevote:
case types.VoteTypePrecommit:
ps.LastCommit.SetIndex(index, true)
log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
}
}
ps.Precommits.SetIndex(index, true)
log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index)
default:
panic("Invalid vote type")
}
}
// NOTE: 'round' is what we know to be the commit round for height.
func (ps *PeerState) EnsureCatchupCommitRound(height, round int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != height {
return
}
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
panic(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
}
if ps.CatchupCommitRound == round {
return // Nothing to do!
} else {
// Does not apply.
}
ps.CatchupCommitRound = round
ps.CatchupCommit = nil
}
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {


+ 17
- 0
consensus/vote_set.go View File

@ -67,6 +67,14 @@ func (voteSet *VoteSet) Round() int {
}
}
func (voteSet *VoteSet) Type() byte {
if voteSet == nil {
return 0x00
} else {
return voteSet.type_
}
}
func (voteSet *VoteSet) Size() int {
if voteSet == nil {
return 0
@ -193,6 +201,15 @@ func (voteSet *VoteSet) HasTwoThirdsMajority() bool {
return voteSet.maj23Exists
}
func (voteSet *VoteSet) IsCommit() bool {
if voteSet == nil {
return false
}
voteSet.mtx.Lock()
defer voteSet.mtx.Unlock()
return len(voteSet.maj23Hash) > 0
}
func (voteSet *VoteSet) HasTwoThirdsAny() bool {
if voteSet == nil {
return false


+ 29
- 10
types/block.go View File

@ -219,6 +219,35 @@ func (v *Validation) Round() int {
return v.FirstPrecommit().Round
}
func (v *Validation) Type() byte {
return VoteTypePrecommit
}
func (v *Validation) Size() int {
return len(v.Precommits)
}
func (v *Validation) BitArray() *BitArray {
if v.bitArray == nil {
v.bitArray = NewBitArray(len(v.Precommits))
for i, precommit := range v.Precommits {
v.bitArray.SetIndex(i, precommit != nil)
}
}
return v.bitArray
}
func (v *Validation) GetByIndex(index int) *Vote {
return v.Precommits[index]
}
func (v *Validation) IsCommit() bool {
if len(v.Precommits) == 0 {
return false
}
return true
}
func (v *Validation) ValidateBasic() error {
if len(v.Precommits) == 0 {
return errors.New("No precommits in validation")
@ -274,16 +303,6 @@ func (v *Validation) StringIndented(indent string) string {
indent, v.hash)
}
func (v *Validation) BitArray() *BitArray {
if v.bitArray == nil {
v.bitArray = NewBitArray(len(v.Precommits))
for i, precommit := range v.Precommits {
v.bitArray.SetIndex(i, precommit != nil)
}
}
return v.bitArray
}
//-----------------------------------------------------------------------------
type Data struct {


+ 14
- 0
types/vote.go View File

@ -69,3 +69,17 @@ func (vote *Vote) String() string {
return fmt.Sprintf("Vote{%v/%02d/%v(%v) %X#%v %v}", vote.Height, vote.Round, vote.Type, typeString, Fingerprint(vote.BlockHash), vote.BlockParts, vote.Signature)
}
//--------------------------------------------------------------------------------
// TODO: Move blocks/Validation to here?
// Common interface between *consensus.VoteSet and types.Validation
type VoteSetReader interface {
Height() int
Round() int
Type() byte
Size() int
BitArray() *BitArray
GetByIndex(int) *Vote
IsCommit() bool
}

Loading…
Cancel
Save