diff --git a/consensus/consensus.go b/consensus/consensus.go index 01ba2b498..17b5d8a79 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -92,6 +92,12 @@ func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, //----------------------------------------------------------------------------- +type RoundAction struct { + Height uint32 // The block height for which consensus is reaching for. + Round uint16 // The round number at given height. + Action RoundActionType // Action to perform. +} + type ConsensusReactor struct { sw *p2p.Switch quit chan struct{} @@ -241,9 +247,9 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } if added { // Maybe send HasVotesMessage + // TODO optimize. It would be better to just acks for each vote! voteAddCounter++ if voteAddCounter%hasVotesThreshold == 0 { - // TODO optimize. msg := &HasVotesMessage{ Height: rs.Height, Round: rs.Round, @@ -274,56 +280,50 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } } -//------------------------------------- - -type RoundAction struct { - Height uint32 // The block height for which consensus is reaching for. - Round uint16 // The round number at given height. - Action RoundActionType // Action to perform. -} +//-------------------------------------- // Source of all round state transitions (and votes). -// It can be preemptively woken up via a message to -// doActionCh. func (conR *ConsensusReactor) stepTransitionRoutine() { + // Schedule the next action by pushing a RoundAction{} to conR.doActionCh + // when it is due. scheduleNextAction := func() { - // Figure out which height/round/step we're at, - // then schedule an action for when it is due. rs := conR.conS.GetRoundState() _, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) - switch rs.Step { - case RoundStepStart: - // It's a new RoundState. - if elapsedRatio < 0 { - // startTime is in the future. - time.Sleep(time.Duration(-1.0*elapsedRatio) * roundDuration) - } - conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose} - case RoundStepPropose: - // Wake up when it's time to vote. - time.Sleep(time.Duration(roundDeadlinePrevote-elapsedRatio) * roundDuration) - conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote} - case RoundStepPrevote: - // Wake up when it's time to precommit. - time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration) - conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit} - case RoundStepPrecommit: - // Wake up when the round is over. - time.Sleep(time.Duration(1.0-elapsedRatio) * roundDuration) - conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionNextRound} - case RoundStepCommit: - panic("Should not happen: RoundStepCommit waits until +2/3 commits.") - case RoundStepCommitWait: - // Wake up when it's time to finalize commit. - if rs.CommitTime.IsZero() { - panic("RoundStepCommitWait requires rs.CommitTime") + go func() { + switch rs.Step { + case RoundStepStart: + // It's a new RoundState. + if elapsedRatio < 0 { + // startTime is in the future. + time.Sleep(time.Duration(-1.0*elapsedRatio) * roundDuration) + } + conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPropose} + case RoundStepPropose: + // Wake up when it's time to vote. + time.Sleep(time.Duration(roundDeadlinePrevote-elapsedRatio) * roundDuration) + conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote} + case RoundStepPrevote: + // Wake up when it's time to precommit. + time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration) + conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit} + case RoundStepPrecommit: + // Wake up when the round is over. + time.Sleep(time.Duration(1.0-elapsedRatio) * roundDuration) + conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionNextRound} + case RoundStepCommit: + panic("Should not happen: RoundStepCommit waits until +2/3 commits.") + case RoundStepCommitWait: + // Wake up when it's time to finalize commit. + if rs.CommitTime.IsZero() { + panic("RoundStepCommitWait requires rs.CommitTime") + } + time.Sleep(rs.CommitTime.Sub(time.Now()) + finalizeDuration) + conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionFinalize} + default: + panic("Should not happen") } - time.Sleep(rs.CommitTime.Sub(time.Now()) + finalizeDuration) - conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionFinalize} - default: - panic("Should not happen") - } + }() } scheduleNextAction() @@ -435,7 +435,7 @@ ACTION_LOOP: if rs.Step >= RoundStepCommitWait { continue ACTION_LOOP } - // First we must commit. + // Commit first we haven't already. if rs.Step < RoundStepCommit { // NOTE: Duplicated in RoundActionCommit. hash := conR.conS.RunActionCommit(rs.Height) @@ -451,7 +451,7 @@ ACTION_LOOP: panic("This shouldn't happen") } } - // Now wait for more commit votes. + // Wait for more commit votes. conR.conS.RunActionCommitWait(rs.Height) scheduleNextAction() continue ACTION_LOOP @@ -474,7 +474,16 @@ ACTION_LOOP: } } -//------------------------------------- +func (conR *ConsensusReactor) signAndBroadcastVote(rs *RoundState, vote *Vote) { + if rs.PrivValidator != nil { + rs.PrivValidator.Sign(vote) + conR.conS.AddVote(vote) + msg := p2p.TypedMessage{msgTypeVote, vote} + conR.sw.Broadcast(VoteCh, msg) + } +} + +//-------------------------------------- func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) { @@ -540,8 +549,6 @@ OUTER_LOOP: } } -//------------------------------------- - func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) { OUTER_LOOP: for { @@ -621,16 +628,6 @@ OUTER_LOOP: } } -// Signs a vote document and broadcasts it. -func (conR *ConsensusReactor) signAndBroadcastVote(rs *RoundState, vote *Vote) { - if rs.PrivValidator != nil { - rs.PrivValidator.Sign(vote) - conR.conS.AddVote(vote) - msg := p2p.TypedMessage{msgTypeVote, vote} - conR.sw.Broadcast(VoteCh, msg) - } -} - //----------------------------------------------------------------------------- // Read only when returned by PeerState.GetRoundState(). diff --git a/consensus/state.go b/consensus/state.go index 68688e98c..107133fc3 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -222,30 +222,7 @@ func (cs *ConsensusState) SetPrivValidator(priv *PrivValidator) { cs.PrivValidator = priv } -func (cs *ConsensusState) SetProposal(proposal *Proposal) error { - cs.mtx.Lock() - defer cs.mtx.Unlock() - - // Already have one - if cs.Proposal != nil { - return nil - } - - // Invalid. - if proposal.Height != cs.Height || proposal.Round != cs.Round { - return nil - } - - // Verify signature - if !cs.Validators.Proposer().Verify(proposal) { - return ErrInvalidProposalSignature - } - - cs.Proposal = proposal - cs.ProposalBlockPartSet = NewPartSetFromMetadata(proposal.BlockPartsTotal, proposal.BlockPartsHash) - cs.ProposalPOLPartSet = NewPartSetFromMetadata(proposal.POLPartsTotal, proposal.POLPartsHash) - return nil -} +//----------------------------------------------------------------------------- func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { cs.mtx.Lock() @@ -321,61 +298,6 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { cs.ProposalPOLPartSet = polPartSet } -// NOTE: block is not necessarily valid. -func (cs *ConsensusState) AddProposalBlockPart(height uint32, round uint16, part *Part) (added bool, err error) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - - // Blocks might be reused, so round mismatch is OK - if cs.Height != height { - return false, nil - } - - // We're not expecting a block part. - if cs.ProposalBlockPartSet != nil { - return false, nil // TODO: bad peer? Return error? - } - - added, err = cs.ProposalBlockPartSet.AddPart(part) - if err != nil { - return added, err - } - if added && cs.ProposalBlockPartSet.IsComplete() { - var n int64 - var err error - cs.ProposalBlock = ReadBlock(cs.ProposalBlockPartSet.GetReader(), &n, &err) - return true, err - } - return true, nil -} - -// NOTE: POL is not necessarily valid. -func (cs *ConsensusState) AddProposalPOLPart(height uint32, round uint16, part *Part) (added bool, err error) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - - if cs.Height != height || cs.Round != round { - return false, nil - } - - // We're not expecting a POL part. - if cs.ProposalPOLPartSet != nil { - return false, nil // TODO: bad peer? Return error? - } - - added, err = cs.ProposalPOLPartSet.AddPart(part) - if err != nil { - return added, err - } - if added && cs.ProposalPOLPartSet.IsComplete() { - var n int64 - var err error - cs.ProposalPOL = ReadPOL(cs.ProposalPOLPartSet.GetReader(), &n, &err) - return true, err - } - return true, nil -} - func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) []byte { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -399,24 +321,6 @@ func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) []byte { } } -func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) { - switch vote.Type { - case VoteTypePrevote: - // Prevotes checks for height+round match. - return cs.Prevotes.Add(vote) - case VoteTypePrecommit: - // Precommits checks for height+round match. - return cs.Precommits.Add(vote) - case VoteTypeCommit: - // Commits checks for height match. - cs.Prevotes.Add(vote) - cs.Precommits.Add(vote) - return cs.Commits.Add(vote) - default: - panic("Unknown vote type") - } -} - // Lock the ProposalBlock if we have enough prevotes for it, // or unlock an existing lock if +2/3 of prevotes were nil. // Returns a blockhash if a block was locked. @@ -428,7 +332,7 @@ func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) []byte } cs.Step = RoundStepPrecommit - if hash, _, ok := cs.Prevotes.TwoThirdsMajority(); ok { + if hash, ok := cs.Prevotes.TwoThirdsMajority(); ok { // Remember this POL. (hash may be nil) cs.LockedPOL = cs.Prevotes.MakePOL() @@ -477,7 +381,7 @@ func (cs *ConsensusState) RunActionCommit(height uint32) []byte { } cs.Step = RoundStepCommit - if hash, _, ok := cs.Precommits.TwoThirdsMajority(); ok { + if hash, ok := cs.Precommits.TwoThirdsMajority(); ok { // There are some strange cases that shouldn't happen // (unless voters are duplicitous). @@ -534,9 +438,8 @@ func (cs *ConsensusState) RunActionCommitWait(height uint32) { } cs.Step = RoundStepCommitWait - if _, commitTime, ok := cs.Commits.TwoThirdsMajority(); ok { - // Remember the commitTime. - cs.CommitTime = commitTime + if cs.Commits.HasTwoThirdsMajority() { + cs.CommitTime = time.Now() } else { panic("RunActionCommitWait() expects +2/3 commits") } @@ -553,6 +456,106 @@ func (cs *ConsensusState) RunActionFinalize(height uint32) { cs.updateToState(cs.stagedState) } +//----------------------------------------------------------------------------- + +func (cs *ConsensusState) SetProposal(proposal *Proposal) error { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + // Already have one + if cs.Proposal != nil { + return nil + } + + // Invalid. + if proposal.Height != cs.Height || proposal.Round != cs.Round { + return nil + } + + // Verify signature + if !cs.Validators.Proposer().Verify(proposal) { + return ErrInvalidProposalSignature + } + + cs.Proposal = proposal + cs.ProposalBlockPartSet = NewPartSetFromMetadata(proposal.BlockPartsTotal, proposal.BlockPartsHash) + cs.ProposalPOLPartSet = NewPartSetFromMetadata(proposal.POLPartsTotal, proposal.POLPartsHash) + return nil +} + +// NOTE: block is not necessarily valid. +func (cs *ConsensusState) AddProposalBlockPart(height uint32, round uint16, part *Part) (added bool, err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + // Blocks might be reused, so round mismatch is OK + if cs.Height != height { + return false, nil + } + + // We're not expecting a block part. + if cs.ProposalBlockPartSet != nil { + return false, nil // TODO: bad peer? Return error? + } + + added, err = cs.ProposalBlockPartSet.AddPart(part) + if err != nil { + return added, err + } + if added && cs.ProposalBlockPartSet.IsComplete() { + var n int64 + var err error + cs.ProposalBlock = ReadBlock(cs.ProposalBlockPartSet.GetReader(), &n, &err) + return true, err + } + return true, nil +} + +// NOTE: POL is not necessarily valid. +func (cs *ConsensusState) AddProposalPOLPart(height uint32, round uint16, part *Part) (added bool, err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + if cs.Height != height || cs.Round != round { + return false, nil + } + + // We're not expecting a POL part. + if cs.ProposalPOLPartSet != nil { + return false, nil // TODO: bad peer? Return error? + } + + added, err = cs.ProposalPOLPartSet.AddPart(part) + if err != nil { + return added, err + } + if added && cs.ProposalPOLPartSet.IsComplete() { + var n int64 + var err error + cs.ProposalPOL = ReadPOL(cs.ProposalPOLPartSet.GetReader(), &n, &err) + return true, err + } + return true, nil +} + +func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) { + switch vote.Type { + case VoteTypePrevote: + // Prevotes checks for height+round match. + return cs.Prevotes.Add(vote) + case VoteTypePrecommit: + // Precommits checks for height+round match. + return cs.Precommits.Add(vote) + case VoteTypeCommit: + // Commits checks for height match. + cs.Prevotes.Add(vote) + cs.Precommits.Add(vote) + return cs.Commits.Add(vote) + default: + panic("Unknown vote type") + } +} + func (cs *ConsensusState) stageBlock(block *Block) error { // Already staged? diff --git a/consensus/vote_set.go b/consensus/vote_set.go index 9a8d449f6..1d9fd7c73 100644 --- a/consensus/vote_set.go +++ b/consensus/vote_set.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" "sync" - "time" . "github.com/tendermint/tendermint/blocks" . "github.com/tendermint/tendermint/common" @@ -23,14 +22,14 @@ type VoteSet struct { round uint16 type_ byte - mtx sync.Mutex - vset *state.ValidatorSet - votes map[uint64]*Vote - votesBitArray BitArray - votesByBlockHash map[string]uint64 - totalVotes uint64 - twoThirdsMajority []byte - twoThirdsCommitTime time.Time + mtx sync.Mutex + vset *state.ValidatorSet + votes map[uint64]*Vote + votesBitArray BitArray + votesByBlockHash map[string]uint64 + totalVotes uint64 + twoThirdsMajority []byte + twoThirdsExists bool } // Constructs a new VoteSet struct used to accumulate votes for each round. @@ -105,7 +104,7 @@ func (vs *VoteSet) addVote(vote *Vote) (bool, error) { if totalBlockHashVotes > vs.vset.TotalVotingPower()*2/3 && (totalBlockHashVotes-val.VotingPower) <= vs.vset.TotalVotingPower()*2/3 { vs.twoThirdsMajority = vote.BlockHash - vs.twoThirdsCommitTime = time.Now() + vs.twoThirdsExists = true } return true, nil @@ -149,18 +148,19 @@ func (vs *VoteSet) HasTwoThirdsMajority() bool { } vs.mtx.Lock() defer vs.mtx.Unlock() - return !vs.twoThirdsCommitTime.IsZero() + return vs.twoThirdsExists } // Returns either a blockhash (or nil) that received +2/3 majority. // If there exists no such majority, returns (nil, false). -func (vs *VoteSet) TwoThirdsMajority() (hash []byte, commitTime time.Time, ok bool) { +func (vs *VoteSet) TwoThirdsMajority() (hash []byte, ok bool) { vs.mtx.Lock() defer vs.mtx.Unlock() - if vs.twoThirdsCommitTime.IsZero() { - return nil, time.Time{}, false + if vs.twoThirdsExists { + return vs.twoThirdsMajority, true + } else { + return nil, false } - return vs.twoThirdsMajority, vs.twoThirdsCommitTime, true } func (vs *VoteSet) MakePOL() *POL { @@ -169,7 +169,7 @@ func (vs *VoteSet) MakePOL() *POL { } vs.mtx.Lock() defer vs.mtx.Unlock() - if vs.twoThirdsCommitTime.IsZero() { + if !vs.twoThirdsExists { return nil } majHash := vs.twoThirdsMajority // hash may be nil. diff --git a/consensus/vote_set_test.go b/consensus/vote_set_test.go index 50f77370e..79bfe4fd6 100644 --- a/consensus/vote_set_test.go +++ b/consensus/vote_set_test.go @@ -20,8 +20,8 @@ func TestAddVote(t *testing.T) { if voteSet.BitArray().GetIndex(0) { t.Errorf("Expected BitArray.GetIndex(0) to be false") } - hash, commitTime, ok := voteSet.TwoThirdsMajority() - if hash != nil || !commitTime.IsZero() || ok { + hash, ok := voteSet.TwoThirdsMajority() + if hash != nil || ok { t.Errorf("There should be no 2/3 majority") } @@ -35,8 +35,8 @@ func TestAddVote(t *testing.T) { if !voteSet.BitArray().GetIndex(0) { t.Errorf("Expected BitArray.GetIndex(0) to be true") } - hash, commitTime, ok = voteSet.TwoThirdsMajority() - if hash != nil || !commitTime.IsZero() || ok { + hash, ok = voteSet.TwoThirdsMajority() + if hash != nil || ok { t.Errorf("There should be no 2/3 majority") } } @@ -50,8 +50,8 @@ func Test2_3Majority(t *testing.T) { privAccounts[i].Sign(vote) voteSet.Add(vote) } - hash, commitTime, ok := voteSet.TwoThirdsMajority() - if hash != nil || !commitTime.IsZero() || ok { + hash, ok := voteSet.TwoThirdsMajority() + if hash != nil || ok { t.Errorf("There should be no 2/3 majority") } @@ -59,8 +59,8 @@ func Test2_3Majority(t *testing.T) { vote.BlockHash = CRandBytes(32) privAccounts[6].Sign(vote) voteSet.Add(vote) - hash, commitTime, ok = voteSet.TwoThirdsMajority() - if hash != nil || !commitTime.IsZero() || ok { + hash, ok = voteSet.TwoThirdsMajority() + if hash != nil || ok { t.Errorf("There should be no 2/3 majority") } @@ -68,8 +68,8 @@ func Test2_3Majority(t *testing.T) { vote.BlockHash = nil privAccounts[7].Sign(vote) voteSet.Add(vote) - hash, commitTime, ok = voteSet.TwoThirdsMajority() - if hash != nil || commitTime.IsZero() || !ok { + hash, ok = voteSet.TwoThirdsMajority() + if hash != nil || !ok { t.Errorf("There should be 2/3 majority for nil") } @@ -128,8 +128,8 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) { privAccounts[i].Sign(vote) voteSet.Add(vote) } - hash, commitTime, ok := voteSet.TwoThirdsMajority() - if hash != nil || !commitTime.IsZero() || ok { + hash, ok := voteSet.TwoThirdsMajority() + if hash != nil || ok { t.Errorf("There should be no 2/3 majority") } @@ -174,8 +174,8 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) { } // We should have 2/3 majority - hash, commitTime, ok = voteSet.TwoThirdsMajority() - if hash != nil || commitTime.IsZero() || !ok { + hash, ok = voteSet.TwoThirdsMajority() + if hash != nil || !ok { t.Errorf("There should be 2/3 majority for nil") }