Browse Source

Draft of consensus/reactor refactor.

pull/96/head
Jae Kwon 9 years ago
parent
commit
7752405945
6 changed files with 247 additions and 237 deletions
  1. +3
    -4
      binary/int.go
  2. +4
    -5
      blockchain/store.go
  3. +131
    -133
      consensus/reactor.go
  4. +20
    -18
      consensus/state.go
  5. +61
    -30
      consensus/vote_set.go
  6. +28
    -47
      types/block.go

+ 3
- 4
binary/int.go View File

@ -158,8 +158,7 @@ func ReadUint64(r io.Reader, n *int64, err *error) uint64 {
// Varint // Varint
func uvarintSize(i_ uint) int {
i := uint64(i_)
func uvarintSize(i uint64) int {
if i == 0 { if i == 0 {
return 0 return 0
} }
@ -193,7 +192,7 @@ func WriteVarint(i int, w io.Writer, n *int64, err *error) {
negate = true negate = true
i = -i i = -i
} }
var size = uvarintSize(uint(i))
var size = uvarintSize(uint64(i))
if negate { if negate {
// e.g. 0xF1 for a single negative byte // e.g. 0xF1 for a single negative byte
WriteUint8(uint8(size+0xF0), w, n, err) WriteUint8(uint8(size+0xF0), w, n, err)
@ -236,7 +235,7 @@ func ReadVarint(r io.Reader, n *int64, err *error) int {
// Uvarint // Uvarint
func WriteUvarint(i uint, w io.Writer, n *int64, err *error) { func WriteUvarint(i uint, w io.Writer, n *int64, err *error) {
var size = uvarintSize(i)
var size = uvarintSize(uint64(i))
WriteUint8(uint8(size), w, n, err) WriteUint8(uint8(size), w, n, err)
if size > 0 { if size > 0 {
buf := make([]byte, 8) buf := make([]byte, 8)


+ 4
- 5
blockchain/store.go View File

@ -101,9 +101,8 @@ func (bs *BlockStore) LoadBlockMeta(height uint) *types.BlockMeta {
return meta return meta
} }
// NOTE: the Precommit-vote heights are for the block at `height-1`
// Since these are included in the subsequent block, the height
// is off by 1.
// The +2/3 and other Precommit-votes for block at `height`.
// This Validation comes from block.LastValidation for `height+1`.
func (bs *BlockStore) LoadBlockValidation(height uint) *types.Validation { func (bs *BlockStore) LoadBlockValidation(height uint) *types.Validation {
var n int64 var n int64
var err error var err error
@ -158,8 +157,8 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
} }
// Save block validation (duplicate and separate from the Block) // Save block validation (duplicate and separate from the Block)
blockValidationBytes := binary.BinaryBytes(block.Validation)
bs.db.Set(calcBlockValidationKey(height), blockValidationBytes)
blockValidationBytes := binary.BinaryBytes(block.LastValidation)
bs.db.Set(calcBlockValidationKey(height-1), blockValidationBytes)
// Save seen validation (seen +2/3 precommits for block) // Save seen validation (seen +2/3 precommits for block)
seenValidationBytes := binary.BinaryBytes(seenValidation) seenValidationBytes := binary.BinaryBytes(seenValidation)


+ 131
- 133
consensus/reactor.go View File

@ -49,8 +49,8 @@ type ConsensusReactor struct {
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor { func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, sync bool) *ConsensusReactor {
conR := &ConsensusReactor{ conR := &ConsensusReactor{
blockStore: blockStore,
quit: make(chan struct{}), quit: make(chan struct{}),
blockStore: blockStore,
conS: consensusState, conS: consensusState,
sync: sync, sync: sync,
} }
@ -119,7 +119,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
go conR.gossipVotesRoutine(peer, peerState) go conR.gossipVotesRoutine(peer, peerState)
// Send our state to peer. // Send our state to peer.
conR.sendNewRoundStep(peer)
conR.sendNewRoundStepMessage(peer)
} }
// Implements Reactor // Implements Reactor
@ -164,18 +164,13 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
case *ProposalMessage: case *ProposalMessage:
ps.SetHasProposal(msg.Proposal) ps.SetHasProposal(msg.Proposal)
err = conR.conS.SetProposal(msg.Proposal) err = conR.conS.SetProposal(msg.Proposal)
case *PartMessage: case *PartMessage:
if msg.Type == partTypeProposalBlock { if msg.Type == partTypeProposalBlock {
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index) ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
_, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part) _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Round, msg.Part)
} else if msg.Type == partTypeProposalPOL {
ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Proof.Index)
_, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
} else { } else {
log.Warn(Fmt("Unknown part type %v", msg.Type)) log.Warn(Fmt("Unknown part type %v", msg.Type))
} }
default: default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
@ -186,15 +181,14 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
vote := msg.Vote vote := msg.Vote
if rs.Height != vote.Height { if rs.Height != vote.Height {
if rs.Height == vote.Height+1 { if rs.Height == vote.Height+1 {
if rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypeCommit {
if rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit {
goto VOTE_PASS // *ducks* goto VOTE_PASS // *ducks*
} }
} }
return // Wrong height. Not necessarily a bad peer. return // Wrong height. Not necessarily a bad peer.
} }
VOTE_PASS: VOTE_PASS:
validatorIndex := msg.ValidatorIndex
address, _ := rs.Validators.GetByIndex(validatorIndex)
address, _ := rs.Validators.GetByIndex(msg.ValidatorIndex)
added, index, err := conR.conS.AddVote(address, vote) added, index, err := conR.conS.AddVote(address, vote)
if err != nil { if err != nil {
// If conflicting sig, broadcast evidence tx for slashing. Else punish peer. // If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
@ -213,8 +207,8 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
// TODO: punish peer // TODO: punish peer
} }
} }
// Initialize Prevotes/Precommits/Commits if needed
ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
// Initialize Prevotes/Precommits if needed
ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size(), _)
ps.SetHasVote(vote, index) ps.SetHasVote(vote, index)
if added { if added {
msg := &HasVoteMessage{ msg := &HasVoteMessage{
@ -247,7 +241,6 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
// reset the state, turn off fast sync, start the consensus-state-machine // reset the state, turn off fast sync, start the consensus-state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
conR.conS.updateToState(state, false) conR.conS.updateToState(state, false)
conR.conS.newStepCh <- conR.conS.getRoundState()
conR.sync = false conR.sync = false
conR.conS.Start() conR.conS.Start()
} }
@ -270,6 +263,7 @@ func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *
Round: rs.Round, Round: rs.Round,
Step: rs.Step, Step: rs.Step,
SecondsSinceStartTime: uint(timeElapsed.Seconds()), SecondsSinceStartTime: uint(timeElapsed.Seconds()),
LastCommitRound: rs.LastCommit.Round(),
} }
// If the step is commit, then also broadcast a CommitStepMessage. // If the step is commit, then also broadcast a CommitStepMessage.
@ -306,7 +300,7 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
} }
} }
func (conR *ConsensusReactor) sendNewRoundStep(peer *p2p.Peer) {
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
rs := conR.conS.GetRoundState() rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
@ -330,8 +324,6 @@ OUTER_LOOP:
prs := ps.GetRoundState() prs := ps.GetRoundState()
// Send proposal Block parts? // Send proposal Block parts?
// NOTE: if we or peer is at RoundStepCommit*, the round
// won't necessarily match, but that's OK.
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) { if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) {
//log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts) //log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok { if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok {
@ -400,21 +392,6 @@ OUTER_LOOP:
continue OUTER_LOOP continue OUTER_LOOP
} }
// Send proposal POL parts?
if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) {
if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok {
msg := &PartMessage{
Height: rs.Height,
Round: rs.Round,
Type: partTypeProposalPOL,
Part: rs.ProposalPOLParts.GetPart(index),
}
peer.Send(DataChannel, msg)
ps.SetHasProposalPOLPart(rs.Height, rs.Round, index)
continue OUTER_LOOP
}
}
// Nothing to do. Sleep. // Nothing to do. Sleep.
time.Sleep(peerGossipSleepDuration) time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP continue OUTER_LOOP
@ -443,18 +420,23 @@ OUTER_LOOP:
sleeping = 0 sleeping = 0
} }
// prsVoteSet: a pointer to a VoteSet field of prs.
// Returns true when useful work was done. // Returns true when useful work was done.
trySendVote := func(height uint, voteSet *VoteSet, peerVoteSet *BitArray) (sent bool) {
trySendVote := func(voteSet *VoteSet, prsVoteSet **BitArray) (sent bool) {
if voteSet == nil { if voteSet == nil {
return false return false
} else if peerVoteSet == nil {
ps.EnsureVoteBitArrays(height, voteSet.Size())
return true
}
if *prsVoteSet == nil {
ps.EnsureVoteBitArrays(voteSet.Height(), voteSet.Size(), prs)
// We could return true here (useful work was done)
// or, we can continue since prsVoteSet is no longer nil.
if *prsVoteSet == nil {
panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
}
} }
// TODO: give priority to our vote. // TODO: give priority to our vote.
if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok {
if index, ok := voteSet.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
vote := voteSet.GetByIndex(index) vote := voteSet.GetByIndex(index)
// NOTE: vote may be a commit.
msg := &VoteMessage{index, vote} msg := &VoteMessage{index, vote}
peer.Send(VoteChannel, msg) peer.Send(VoteChannel, msg)
ps.SetHasVote(vote, index) ps.SetHasVote(vote, index)
@ -463,103 +445,95 @@ OUTER_LOOP:
return false return false
} }
// prsVoteSet: a pointer to a VoteSet field of prs.
// Returns true when useful work was done. // Returns true when useful work was done.
trySendCommitFromValidation := func(blockMeta *types.BlockMeta, validation *types.Validation, peerVoteSet *BitArray) (sent bool) {
trySendPrecommitFromValidation := func(validation *types.Validation, prsVoteSet **BitArray) (sent bool) {
if validation == nil { if validation == nil {
return false return false
} else if peerVoteSet == nil {
ps.EnsureVoteBitArrays(blockMeta.Header.Height, uint(len(validation.Commits)))
return true
}
if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok {
commit := validation.Commits[index]
log.Debug("Picked commit to send", "index", index, "commit", commit)
// Reconstruct vote.
vote := &types.Vote{
Height: prs.Height,
Round: commit.Round,
Type: types.VoteTypeCommit,
BlockHash: blockMeta.Hash,
BlockParts: blockMeta.Parts,
Signature: commit.Signature,
} else if *prsVoteSet == nil {
ps.EnsureVoteBitArrays(validation.Height(), uint(len(validation.Precommits)), prs)
// We could return true here (useful work was done)
// or, we can continue since prsVoteSet is no longer nil.
if *prsVoteSet == nil {
panic("prsVoteSet should not be nil after ps.EnsureVoteBitArrays")
} }
msg := &VoteMessage{index, vote}
}
if index, ok := validation.BitArray().Sub((*prsVoteSet).Copy()).PickRandom(); ok {
precommit := validation.Precommits[index]
log.Debug("Picked precommit to send", "index", index, "precommit", precommit)
msg := &VoteMessage{index, precommit}
peer.Send(VoteChannel, msg) peer.Send(VoteChannel, msg)
ps.SetHasVote(vote, index)
ps.SetHasVote(precommit, index)
return true return true
} }
return false return false
} }
// If height matches, then send LastCommits, Prevotes, Precommits, or Commits.
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height { if rs.Height == prs.Height {
// If there are lastcommits to send...
if prs.Round == 0 && prs.Step == RoundStepNewHeight {
if trySendVote(rs.Height-1, rs.LastCommits, prs.LastCommits) {
// If there are lastCommits to send...
if prs.Step == RoundStepNewHeight {
if trySendVote(rs.LastCommit, prs.LastCommit) {
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
// If there are prevotes to send... // If there are prevotes to send...
if rs.Round == prs.Round && prs.Step <= RoundStepPrevote { if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
if trySendVote(rs.Height, rs.Prevotes, prs.Prevotes) {
if trySendVote(rs.Prevotes, prs.Prevotes) {
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
// If there are precommits to send... // If there are precommits to send...
if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit { if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
if trySendVote(rs.Height, rs.Precommits, prs.Precommits) {
if trySendVote(rs.Precommits, prs.Precommits) {
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
// If there are any commits to send...
if trySendVote(rs.Height, rs.Commits, prs.Commits) {
continue OUTER_LOOP
}
} }
// Catchup logic
if prs.Height != 0 && !prs.HasAllCatchupCommits {
// If peer is lagging by height 1
if rs.Height == prs.Height+1 {
if rs.LastCommits.Size() > 0 {
// Sync peer to rs.LastCommits
if trySendVote(prs.Height, rs.LastCommits, prs.Commits) {
continue OUTER_LOOP
} else {
ps.SetHasAllCatchupCommits(prs.Height)
}
// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && prs.Height == rs.Height-1 {
if prs.Round == rs.LastCommit.Round() {
if trySendVote(rs.LastCommit, prs.Precommits) {
continue OUTER_LOOP
// XXX CONTONUE
} }
}
// If peer is lagging by more than 1, send Validation.
if rs.Height >= prs.Height+2 {
// Load the blockMeta for block at prs.Height
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height)
// Load the block validation for prs.Height+1,
// which contains commit signatures for prs.Height.
validation := conR.blockStore.LoadBlockValidation(prs.Height + 1)
log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height+1, "blockMeta", blockMeta, "validation", validation)
if trySendCommitFromValidation(blockMeta, validation, prs.Commits) {
} else {
ps.SetCatchupCommitRound(prs.Height, rs.LastCommit.Round())
ps.EnsureVoteBitArrays(prs.Height, rs.LastCommit.Size(), prs)
if trySendVote(rs.LastCommit, prs.CatchupCommit) {
continue OUTER_LOOP continue OUTER_LOOP
} else {
ps.SetHasAllCatchupCommits(prs.Height)
} }
} }
} }
// Catchup logic
// If peer is lagging by more than 1, send Validation.
if prs.Height != 0 && prs.Height <= rs.Height-2 {
// Load the block validation for prs.Height,
// which contains precommit signatures for prs.Height.
validation := conR.blockStore.LoadBlockValidation(prs.Height)
log.Debug("Loaded BlockValidation for catch-up", "height", prs.Height, "validation", validation)
// Peer's CommitRound should be -1 or equal to the validation's precommit rounds.
// If not, warn.
if prs.CommitRound == -1 {
ps.SetCommitRound(prs.Height, validation.Round())
continue OUTER_LOOP // Get prs := ps.GetRoundState() again.
} else if prs.CommitRound != validation.Round() {
log.Warn("Peer's CommitRound during catchup not equal to commit round",
"height", prs.Height, "validation", validation, "prs.CommitRound", prs.CommitRound)
} else if trySendPrecommitFromValidation(validation, prs.Commit) {
continue OUTER_LOOP
}
}
if sleeping == 0 { if sleeping == 0 {
// We sent nothing. Sleep... // We sent nothing. Sleep...
sleeping = 1 sleeping = 1
log.Debug("No votes to send, sleeping", "peer", peer, log.Debug("No votes to send, sleeping", "peer", peer,
"localPV", rs.Prevotes.BitArray(), "peerPV", prs.Prevotes, "localPV", rs.Prevotes.BitArray(), "peerPV", prs.Prevotes,
"localPC", rs.Precommits.BitArray(), "peerPC", prs.Precommits,
"localCM", rs.Commits.BitArray(), "peerCM", prs.Commits)
"localPC", rs.Precommits.BitArray(), "peerPC", prs.Precommits)
} else if sleeping == 2 { } else if sleeping == 2 {
// Continued sleep... // Continued sleep...
sleeping = 1 sleeping = 1
@ -585,9 +559,13 @@ type PeerRoundState struct {
ProposalPOLBitArray *BitArray // True bit -> has part ProposalPOLBitArray *BitArray // True bit -> has part
Prevotes *BitArray // All votes peer has for this round Prevotes *BitArray // All votes peer has for this round
Precommits *BitArray // All precommits peer has for this round Precommits *BitArray // All precommits peer has for this round
Commits *BitArray // All commits peer has for this height
LastCommits *BitArray // All commits peer has for last height
HasAllCatchupCommits bool // Used for catch-up
LastCommitRound uint // Round of commit for last height.
LastCommit *BitArray // All commit precommits of commit for last height.
// If peer is leading in height, the round that peer believes commit round is.
// If peer is lagging in height, the round that we believe commit round is.
CatchupCommitRound int
CatchupCommit *BitArray // All commit precommits peer has for this height
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -658,7 +636,8 @@ func (ps *PeerState) SetHasProposalPOLPart(height uint, round uint, index uint)
ps.ProposalPOLBitArray.SetIndex(uint(index), true) ps.ProposalPOLBitArray.SetIndex(uint(index), true)
} }
func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
// prs: If given, will also update this PeerRoundState copy.
func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint, prs *PeerRoundState) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -669,14 +648,22 @@ func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) {
if ps.Precommits == nil { if ps.Precommits == nil {
ps.Precommits = NewBitArray(numValidators) ps.Precommits = NewBitArray(numValidators)
} }
if ps.Commits == nil {
ps.Commits = NewBitArray(numValidators)
if ps.CatchupCommit == nil {
ps.CatchupCommit = NewBitArray(numValidators)
} }
} else if ps.Height == height+1 { } else if ps.Height == height+1 {
if ps.LastCommits == nil {
ps.LastCommits = NewBitArray(numValidators)
if ps.LastCommit == nil {
ps.LastCommit = NewBitArray(numValidators)
} }
} }
// Also, update prs if given.
if prs != nil {
prs.Prevotes = ps.Prevotes
prs.Precommits = ps.Precommits
prs.LastCommit = ps.LastCommit
prs.CatchupCommit = ps.CatchupCommit
}
} }
func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) { func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
@ -687,10 +674,10 @@ func (ps *PeerState) SetHasVote(vote *types.Vote, index uint) {
} }
func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) { func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) {
if ps.Height == height+1 && type_ == types.VoteTypeCommit {
// Special case for LastCommits.
ps.LastCommits.SetIndex(index, true)
log.Debug("SetHasVote", "lastCommits", ps.LastCommits, "index", index)
if ps.Height == height+1 && ps.LastCommitRound == round && type_ == types.VoteTypePrecommit {
// Special case for LastCommit.
ps.LastCommit.SetIndex(index, true)
log.Debug("setHasVote", "LastCommit", ps.LastCommit, "index", index)
return return
} else if ps.Height != height { } else if ps.Height != height {
// Does not apply. // Does not apply.
@ -702,29 +689,33 @@ func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint)
ps.Prevotes.SetIndex(index, true) ps.Prevotes.SetIndex(index, true)
log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index) log.Debug("SetHasVote", "peer", ps.Key, "prevotes", ps.Prevotes, "index", index)
case types.VoteTypePrecommit: case types.VoteTypePrecommit:
if ps.CommitRound == round {
ps.Commit.SetIndex(index, true)
}
ps.Precommits.SetIndex(index, true) ps.Precommits.SetIndex(index, true)
log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index) log.Debug("SetHasVote", "peer", ps.Key, "precommits", ps.Precommits, "index", index)
case types.VoteTypeCommit:
if round < ps.Round {
ps.Prevotes.SetIndex(index, true)
ps.Precommits.SetIndex(index, true)
}
ps.Commits.SetIndex(index, true)
log.Debug("SetHasVote", "peer", ps.Key, "commits", ps.Commits, "index", index)
default: default:
panic("Invalid vote type") panic("Invalid vote type")
} }
} }
// When catching up, this helps keep track of whether
// we should send more commit votes from the block (validation) store
func (ps *PeerState) SetHasAllCatchupCommits(height uint) {
func (ps *PeerState) SetCatchupCommitRound(height, round uint) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
if ps.Height == height {
ps.HasAllCatchupCommits = true
if ps.Height != height {
return
}
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
log.Warn("Conflicting CatchupCommitRound",
"height", height,
"orig", ps.CatchupCommitRound,
"new", round,
)
// TODO think harder
} }
ps.CatchupCommitRound = round
ps.CatchupCommit = nil
} }
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) { func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *RoundState) {
@ -735,6 +726,8 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun
psHeight := ps.Height psHeight := ps.Height
psRound := ps.Round psRound := ps.Round
//psStep := ps.Step //psStep := ps.Step
psCatchupCommitRound := ps.CatchupCommitRound
psCatchupCommit := ps.CatchupCommitRound
startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second) startTime := time.Now().Add(-1 * time.Duration(msg.SecondsSinceStartTime) * time.Second)
ps.Height = msg.Height ps.Height = msg.Height
@ -751,16 +744,22 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun
ps.Prevotes = nil ps.Prevotes = nil
ps.Precommits = nil ps.Precommits = nil
} }
if psHeight == msg.Height && psRound != msg.Round && msg.Round == psCatchupCommitRound {
// Peer caught up to CatchupCommitRound.
ps.Precommits = psCatchupCommit
}
if psHeight != msg.Height { if psHeight != msg.Height {
// Shift Commits to LastCommits
if psHeight+1 == msg.Height {
ps.LastCommits = ps.Commits
// Shift Precommits to LastCommit.
if psHeight+1 == msg.Height && psRound == msg.LastCommitRound {
ps.LastCommitRound = msg.LastCommitRound
ps.LastCommit = ps.Precommits
} else { } else {
ps.LastCommits = nil
ps.LastCommitRound = msg.LastCommitRound
ps.LastCommit = nil
} }
// We'll update the BitArray capacity later. // We'll update the BitArray capacity later.
ps.Commits = nil
ps.HasAllCatchupCommits = false
ps.CatchupCommitRound = -1
ps.CatchupCommit = nil
} }
} }
@ -780,11 +779,7 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
// Special case for LastCommits
if ps.Height == msg.Height+1 && msg.Type == types.VoteTypeCommit {
ps.LastCommits.SetIndex(msg.Index, true)
return
} else if ps.Height != msg.Height {
if ps.Height != msg.Height {
return return
} }
@ -826,15 +821,18 @@ func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
//------------------------------------- //-------------------------------------
// For every height/round/step transition
type NewRoundStepMessage struct { type NewRoundStepMessage struct {
Height uint Height uint
Round uint Round uint
Step RoundStepType Step RoundStepType
SecondsSinceStartTime uint SecondsSinceStartTime uint
LastCommitRound uint
} }
func (m *NewRoundStepMessage) String() string { func (m *NewRoundStepMessage) String() string {
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v]", m.Height, m.Round, m.Step)
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
m.Height, m.Round, m.Step, m.LastCommitRound)
} }
//------------------------------------- //-------------------------------------


+ 20
- 18
consensus/state.go View File

@ -56,7 +56,7 @@ Consensus State Machine Overview:
* NewHeight: * NewHeight:
* Upon entering NewHeight, * Upon entering NewHeight,
* Move Precommits to LastPrecommits and increment height.
* Move Precommits to LastCommit and increment height.
* Wait until `CommitTime+timeoutCommit` to receive straggler commits. --> Then, goto NewRound round 0 * Wait until `CommitTime+timeoutCommit` to receive straggler commits. --> Then, goto NewRound round 0
* Proof of Safety: * Proof of Safety:
@ -184,7 +184,7 @@ type RoundState struct {
LockedBlock *types.Block LockedBlock *types.Block
LockedBlockParts *types.PartSet LockedBlockParts *types.PartSet
Votes *HeightVoteSet Votes *HeightVoteSet
LastPrecommits *VoteSet // Last precommits for Height-1
LastCommit *VoteSet // Last precommits for Height-1
} }
func (rs *RoundState) String() string { func (rs *RoundState) String() string {
@ -201,7 +201,7 @@ func (rs *RoundState) StringIndented(indent string) string {
%s ProposalBlock: %v %v %s ProposalBlock: %v %v
%s LockedBlock: %v %v %s LockedBlock: %v %v
%s Votes: %v %s Votes: %v
%s LastPrecommits: %v
%s LastCommit: %v
%s}`, %s}`,
indent, rs.Height, rs.Round, rs.Step, indent, rs.Height, rs.Round, rs.Step,
indent, rs.StartTime, indent, rs.StartTime,
@ -211,7 +211,7 @@ func (rs *RoundState) StringIndented(indent string) string {
indent, rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort(), indent, rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort(),
indent, rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort(), indent, rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort(),
indent, rs.Votes.StringIndented(indent+" "), indent, rs.Votes.StringIndented(indent+" "),
indent, rs.LastPrecommits.StringShort(),
indent, rs.LastCommit.StringShort(),
indent) indent)
} }
@ -252,13 +252,13 @@ func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReacto
} }
cs.updateToState(state, true) cs.updateToState(state, true)
cs.maybeRebond() cs.maybeRebond()
cs.reconstructLastPrecommits(state)
cs.reconstructLastCommit(state)
return cs return cs
} }
// Reconstruct LastPrecommits from SeenValidation, which we saved along with the block,
// Reconstruct LastCommit from SeenValidation, which we saved along with the block,
// (which happens even before saving the state) // (which happens even before saving the state)
func (cs *ConsensusState) reconstructLastPrecommits(state *sm.State) {
func (cs *ConsensusState) reconstructLastCommit(state *sm.State) {
if state.LastBlockHeight == 0 { if state.LastBlockHeight == 0 {
return return
} }
@ -275,13 +275,13 @@ func (cs *ConsensusState) reconstructLastPrecommits(state *sm.State) {
} }
added, _, err := lastPrecommits.AddByIndex(uint(idx), precommitVote) added, _, err := lastPrecommits.AddByIndex(uint(idx), precommitVote)
if !added || err != nil { if !added || err != nil {
panic(Fmt("Failed to reconstruct LastPrecommits: %v", err))
panic(Fmt("Failed to reconstruct LastCommit: %v", err))
} }
} }
if !lastPrecommits.HasTwoThirdsMajority() { if !lastPrecommits.HasTwoThirdsMajority() {
panic("Failed to reconstruct LastPrecommits: Does not have +2/3 maj")
panic("Failed to reconstruct LastCommit: Does not have +2/3 maj")
} }
cs.LastPrecommits = lastPrecommits
cs.LastCommit = lastPrecommits
} }
func (cs *ConsensusState) GetState() *sm.State { func (cs *ConsensusState) GetState() *sm.State {
@ -373,11 +373,14 @@ func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) {
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil cs.LockedBlockParts = nil
cs.Votes = NewHeightVoteSet(height, validators) cs.Votes = NewHeightVoteSet(height, validators)
cs.LastPrecommits = lastPrecommits
cs.LastCommit = lastPrecommits
cs.state = state cs.state = state
cs.stagedBlock = nil cs.stagedBlock = nil
cs.stagedState = nil cs.stagedState = nil
// Finally, broadcast RoundState
cs.newStepCh <- cs.getRoundState()
} }
// If we're unbonded, broadcast RebondTx. // If we're unbonded, broadcast RebondTx.
@ -528,11 +531,11 @@ func (cs *ConsensusState) createProposalBlock() (*types.Block, *types.PartSet) {
var validation *types.Validation var validation *types.Validation
if cs.Height == 1 { if cs.Height == 1 {
// We're creating a proposal for the first block. // We're creating a proposal for the first block.
// The validation is empty.
// The validation is empty, but not nil.
validation = &types.Validation{} validation = &types.Validation{}
} else if cs.LastPrecommits.HasTwoThirdsMajority() {
// Make the validation from LastPrecommits
validation = cs.LastPrecommits.MakeValidation()
} else if cs.LastCommit.HasTwoThirdsMajority() {
// Make the validation from LastCommit
validation = cs.LastCommit.MakeValidation()
} else { } else {
// This shouldn't happen. // This shouldn't happen.
log.Error("EnterPropose: Cannot propose anything: No validation for the previous block.") log.Error("EnterPropose: Cannot propose anything: No validation for the previous block.")
@ -871,7 +874,6 @@ func (cs *ConsensusState) FinalizeCommit(height uint) {
// * cs.Height has been increment to height+1 // * cs.Height has been increment to height+1
// * cs.Step is now RoundStepNewHeight // * cs.Step is now RoundStepNewHeight
// * cs.StartTime is set to when we should start round0. // * cs.StartTime is set to when we should start round0.
cs.newStepCh <- cs.getRoundState()
// Start round 0 when cs.StartTime. // Start round 0 when cs.StartTime.
go cs.scheduleRound0(height) go cs.scheduleRound0(height)
return return
@ -954,9 +956,9 @@ func (cs *ConsensusState) AddVote(address []byte, vote *types.Vote) (added bool,
func (cs *ConsensusState) addVote(address []byte, vote *types.Vote) (added bool, index uint, err error) { func (cs *ConsensusState) addVote(address []byte, vote *types.Vote) (added bool, index uint, err error) {
// A precommit for the previous height? // A precommit for the previous height?
if vote.Height+1 == cs.Height && vote.Type == types.VoteTypePrecommit { if vote.Height+1 == cs.Height && vote.Type == types.VoteTypePrecommit {
added, index, err = cs.LastPrecommits.AddByAddress(address, vote)
added, index, err = cs.LastCommit.AddByAddress(address, vote)
if added { if added {
log.Debug(Fmt("Added to lastPrecommits: %v", cs.LastPrecommits.StringShort()))
log.Debug(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
} }
return return
} }


+ 61
- 30
consensus/vote_set.go View File

@ -59,6 +59,14 @@ func (voteSet *VoteSet) Height() uint {
} }
} }
func (voteSet *VoteSet) Round() uint {
if voteSet == nil {
return 0
} else {
return voteSet.round
}
}
func (voteSet *VoteSet) Size() uint { func (voteSet *VoteSet) Size() uint {
if voteSet == nil { if voteSet == nil {
return 0 return 0
@ -206,36 +214,6 @@ func (voteSet *VoteSet) TwoThirdsMajority() (hash []byte, parts types.PartSetHea
} }
} }
func (voteSet *VoteSet) MakeValidation() *types.Validation {
if voteSet.type_ != types.VoteTypePrecommit {
panic("Cannot MakeValidation() unless VoteSet.Type is types.VoteTypePrecommit")
}
voteSet.mtx.Lock()
defer voteSet.mtx.Unlock()
if len(voteSet.maj23Hash) == 0 {
panic("Cannot MakeValidation() unless a blockhash has +2/3")
}
precommits := make([]types.Precommit, voteSet.valSet.Size())
voteSet.valSet.Iterate(func(valIndex uint, val *sm.Validator) bool {
vote := voteSet.votes[valIndex]
if vote == nil {
return false
}
if !bytes.Equal(vote.BlockHash, voteSet.maj23Hash) {
return false
}
if !vote.BlockParts.Equals(voteSet.maj23Parts) {
return false
}
precommits[valIndex] = types.Precommit{val.Address, vote.Signature}
return false
})
return &types.Validation{
Round: voteSet.round,
Precommits: precommits,
}
}
func (voteSet *VoteSet) String() string { func (voteSet *VoteSet) String() string {
return voteSet.StringIndented("") return voteSet.StringIndented("")
} }
@ -269,3 +247,56 @@ func (voteSet *VoteSet) StringShort() string {
return fmt.Sprintf(`VoteSet{H:%v R:%v T:%v +2/3:%v %v}`, return fmt.Sprintf(`VoteSet{H:%v R:%v T:%v +2/3:%v %v}`,
voteSet.height, voteSet.round, voteSet.type_, voteSet.maj23Exists, voteSet.votesBitArray) voteSet.height, voteSet.round, voteSet.type_, voteSet.maj23Exists, voteSet.votesBitArray)
} }
//--------------------------------------------------------------------------------
// Validation
func (voteSet *VoteSet) MakeValidation() *types.Validation {
if voteSet.type_ != types.VoteTypePrecommit {
panic("Cannot MakeValidation() unless VoteSet.Type is types.VoteTypePrecommit")
}
voteSet.mtx.Lock()
defer voteSet.mtx.Unlock()
if len(voteSet.maj23Hash) == 0 {
panic("Cannot MakeValidation() unless a blockhash has +2/3")
}
precommits := make([]types.Precommit, voteSet.valSet.Size())
voteSet.valSet.Iterate(func(valIndex uint, val *sm.Validator) bool {
vote := voteSet.votes[valIndex]
if vote == nil {
return false
}
if !bytes.Equal(vote.BlockHash, voteSet.maj23Hash) {
return false
}
if !vote.BlockParts.Equals(voteSet.maj23Parts) {
return false
}
precommits[valIndex] = types.Precommit{val.Address, vote.Signature}
return false
})
return &types.Validation{
Round: voteSet.round,
Precommits: precommits,
}
}
// XXX
func VoteSetFromValidation(validation *types.Validation) *VoteSet {
lastPrecommits := NewVoteSet(state.LastBlockHeight, 0, types.VoteTypePrecommit, state.LastBondedValidators)
seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight)
for idx, precommit := range seenValidation.Precommits {
precommitVote := &types.Vote{
Height: state.LastBlockHeight,
Round: seenValidation.Round,
Type: types.VoteTypePrecommit,
BlockHash: state.LastBlockHash,
BlockParts: state.LastBlockParts,
Signature: precommit.Signature,
}
added, _, err := lastPrecommits.AddByIndex(uint(idx), precommitVote)
if !added || err != nil {
panic(Fmt("Failed to reconstruct LastPrecommits: %v", err))
}
}
}

+ 28
- 47
types/block.go View File

@ -15,9 +15,9 @@ import (
) )
type Block struct { type Block struct {
*Header `json:"header"`
*Validation `json:"validation"`
*Data `json:"data"`
*Header `json:"header"`
*Data `json:"data"`
LastValidation *Validation `json:"last_validation"`
} }
// Basic validation that doesn't involve state data. // Basic validation that doesn't involve state data.
@ -46,7 +46,7 @@ func (b *Block) ValidateBasic(chainID string, lastBlockHeight uint, lastBlockHas
} }
*/ */
if b.Header.Height != 1 { if b.Header.Height != 1 {
if err := b.Validation.ValidateBasic(); err != nil {
if err := b.LastValidation.ValidateBasic(); err != nil {
return err return err
} }
} }
@ -58,12 +58,12 @@ func (b *Block) ValidateBasic(chainID string, lastBlockHeight uint, lastBlockHas
// If the block is incomplete (e.g. missing Header.StateHash) // If the block is incomplete (e.g. missing Header.StateHash)
// then the hash is nil, to prevent the usage of that hash. // then the hash is nil, to prevent the usage of that hash.
func (b *Block) Hash() []byte { func (b *Block) Hash() []byte {
if b.Header == nil || b.Validation == nil || b.Data == nil {
if b.Header == nil || b.Data == nil || b.LastValidation == nil {
return nil return nil
} }
hashHeader := b.Header.Hash() hashHeader := b.Header.Hash()
hashValidation := b.Validation.Hash()
hashData := b.Data.Hash() hashData := b.Data.Hash()
hashLastValidation := b.LastValidation.Hash()
// If hashHeader is nil, required fields are missing. // If hashHeader is nil, required fields are missing.
if len(hashHeader) == 0 { if len(hashHeader) == 0 {
@ -71,7 +71,7 @@ func (b *Block) Hash() []byte {
} }
// Merkle hash from subhashes. // Merkle hash from subhashes.
hashes := [][]byte{hashHeader, hashValidation, hashData}
hashes := [][]byte{hashHeader, hashData, hashLastValidation}
return merkle.SimpleHashFromHashes(hashes) return merkle.SimpleHashFromHashes(hashes)
} }
@ -106,8 +106,8 @@ func (b *Block) StringIndented(indent string) string {
%s %v %s %v
%s}#%X`, %s}#%X`,
indent, b.Header.StringIndented(indent+" "), indent, b.Header.StringIndented(indent+" "),
indent, b.Validation.StringIndented(indent+" "),
indent, b.Data.StringIndented(indent+" "), indent, b.Data.StringIndented(indent+" "),
indent, b.LastValidation.StringIndented(indent+" "),
indent, b.Hash()) indent, b.Hash())
} }
@ -174,56 +174,39 @@ func (h *Header) StringIndented(indent string) string {
indent, h.Hash()) indent, h.Hash())
} }
//-----------------------------------------------------------------------------
type Precommit struct {
Address []byte `json:"address"`
Signature account.SignatureEd25519 `json:"signature"`
}
func (pc Precommit) IsZero() bool {
return pc.Signature.IsZero()
}
func (pc Precommit) String() string {
return fmt.Sprintf("Precommit{A:%X %X}", pc.Address, Fingerprint(pc.Signature))
}
//------------------------------------- //-------------------------------------
// NOTE: The Precommits are in order of address to preserve the bonded ValidatorSet order.
// Any peer with a block can gossip precommits by index with a peer without recalculating the
// active ValidatorSet.
// NOTE: Validation is empty for height 1, but never nil.
type Validation struct { type Validation struct {
Round uint `json:"round"` // Round for all precommits
Precommits []Precommit `json:"precommits"` // Precommits (or nil) of all active validators in address order.
// NOTE: The Precommits are in order of address to preserve the bonded ValidatorSet order.
// Any peer with a block can gossip precommits by index with a peer without recalculating the
// active ValidatorSet.
Precommits []*Vote `json:"precommits"`
// Volatile // Volatile
hash []byte hash []byte
bitArray *BitArray bitArray *BitArray
} }
func (v *Validation) Height() uint {
if len(v.Precommits) == 0 {
return 0
}
return v.Precommits[0].Height
}
func (v *Validation) Round() uint {
if len(v.Precommits) == 0 {
return 0
}
return v.Precommits[0].Round
}
func (v *Validation) ValidateBasic() error { func (v *Validation) ValidateBasic() error {
if len(v.Precommits) == 0 { if len(v.Precommits) == 0 {
return errors.New("No precommits in validation") return errors.New("No precommits in validation")
} }
lastAddress := []byte{}
for i := 0; i < len(v.Precommits); i++ {
precommit := v.Precommits[i]
if precommit.IsZero() {
if len(precommit.Address) > 0 {
return errors.New("Zero precommits should not have an address")
}
} else {
if len(precommit.Address) == 0 {
return errors.New("Nonzero precommits should have an address")
}
if len(lastAddress) > 0 && bytes.Compare(lastAddress, precommit.Address) != -1 {
return errors.New("Invalid precommit order")
}
lastAddress = precommit.Address
}
}
// TODO Additional validation?
return nil return nil
} }
@ -248,10 +231,8 @@ func (v *Validation) StringIndented(indent string) string {
precommitStrings[i] = precommit.String() precommitStrings[i] = precommit.String()
} }
return fmt.Sprintf(`Validation{ return fmt.Sprintf(`Validation{
%s Round: %v
%s Precommits: %v %s Precommits: %v
%s}#%X`, %s}#%X`,
indent, v.Round,
indent, strings.Join(precommitStrings, "\n"+indent+" "), indent, strings.Join(precommitStrings, "\n"+indent+" "),
indent, v.hash) indent, v.hash)
} }


Loading…
Cancel
Save