From 5ffe406f16b8d7efbe13910f8879e194df6d3396 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 21 Oct 2014 01:18:46 -0700 Subject: [PATCH] RunAction* directly on ConsensusState. --- consensus/consensus.go | 133 +++++++++++----------------------------- consensus/part_set.go | 3 + consensus/proposal.go | 4 +- consensus/state.go | 112 +++++++++++++++++++++------------ consensus/state_test.go | 127 ++++++++++++++++++++++++++++++++++---- state/validator_set.go | 4 ++ 6 files changed, 235 insertions(+), 148 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index 51c285f9b..d3a669079 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -296,8 +296,8 @@ func (conR *ConsensusReactor) stepTransitionRoutine() { case RoundStepPropose: // Wake up when it's time to vote. time.Sleep(time.Duration(roundDeadlinePrevote-elapsedRatio) * roundDuration) - conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionVote} - case RoundStepVote: + conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote} + case RoundStepPrevote: // Wake up when it's time to precommit. time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration) conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit} @@ -323,8 +323,7 @@ func (conR *ConsensusReactor) stepTransitionRoutine() { round := roundAction.Round action := roundAction.Action rs := conR.conS.GetRoundState() - setStepAndBroadcast := func(step RoundStep) { - conR.conS.SetStep(step) + broadcastNewRoundStep := func(step RoundStep) { // Broadcast NewRoundStepMessage msg := &NewRoundStepMessage{ Height: height, @@ -344,24 +343,43 @@ func (conR *ConsensusReactor) stepTransitionRoutine() { // Run step if action == RoundActionPropose && rs.Step == RoundStepStart { - conR.runStepPropose(rs) - setStepAndBroadcast(RoundStepPropose) - } else if action == RoundActionVote && rs.Step <= RoundStepPropose { - conR.runStepPrevote(rs) - setStepAndBroadcast(RoundStepVote) - } else if action == RoundActionPrecommit && rs.Step <= RoundStepVote { - conR.runStepPrecommit(rs) - setStepAndBroadcast(RoundStepPrecommit) + conR.conS.RunActionPropose(rs.Height, rs.Round) + broadcastNewRoundStep(RoundStepPropose) + } else if action == RoundActionPrevote && rs.Step <= RoundStepPropose { + hash := conR.conS.RunActionPrevote(rs.Height, rs.Round) + broadcastNewRoundStep(RoundStepPrevote) + conR.signAndBroadcastVote(rs, &Vote{ + Height: rs.Height, + Round: rs.Round, + Type: VoteTypePrevote, + BlockHash: hash, + }) + } else if action == RoundActionPrecommit && rs.Step <= RoundStepPrevote { + hash := conR.conS.RunActionPrecommit(rs.Height, rs.Round) + broadcastNewRoundStep(RoundStepPrecommit) + if len(hash) > 0 { + conR.signAndBroadcastVote(rs, &Vote{ + Height: rs.Height, + Round: rs.Round, + Type: VoteTypePrecommit, + BlockHash: hash, + }) + } } else if action == RoundActionCommit && rs.Step <= RoundStepPrecommit { - committed := conR.runStepCommit(rs) - if committed { - setStepAndBroadcast(RoundStepCommit) + hash := conR.conS.RunActionCommit(rs.Height, rs.Round) + if len(hash) > 0 { + broadcastNewRoundStep(RoundStepCommit) + conR.signAndBroadcastVote(rs, &Vote{ + Height: rs.Height, + Round: rs.Round, + Type: VoteTypeCommit, + BlockHash: hash, + }) } else { - // runStepCommit() already set the round to the next round, - // so the step is already RoundStepStart (same height). + conR.conS.SetupRound(rs.Round + 1) } } else if action == RoundActionFinalize && rs.Step == RoundStepCommit { - conR.runStepFinalize(rs) + conR.conS.RunActionFinalize(rs.Height, rs.Round) // Height has been incremented, step is now RoundStepStart. } else { // This shouldn't happen now, but if an external source pushes @@ -454,7 +472,7 @@ OUTER_LOOP: } // If there are prevotes to send... - if prs.Step <= RoundStepVote { + if prs.Step <= RoundStepPrevote { index, ok := rs.Prevotes.BitArray().Sub(prs.Prevotes).PickRandom() if ok { valId, val := rs.Validators.GetByIndex(uint32(index)) @@ -518,83 +536,6 @@ func (conR *ConsensusReactor) signAndBroadcastVote(rs *RoundState, vote *Vote) { } } -//------------------------------------- - -func (conR *ConsensusReactor) runStepPropose(rs *RoundState) { - conR.conS.MakeProposal() -} - -func (conR *ConsensusReactor) runStepPrevote(rs *RoundState) { - // If we have a locked block, we must vote for that. - // NOTE: a locked block is already valid. - if rs.LockedBlock != nil { - conR.signAndBroadcastVote(rs, &Vote{ - Height: rs.Height, - Round: rs.Round, - Type: VoteTypePrevote, - BlockHash: rs.LockedBlock.Hash(), - }) - } - // Try staging proposed block. - // If Block is nil, an error is returned. - err := conR.conS.stageBlock(rs.ProposalBlock) - if err != nil { - // Prevote nil - conR.signAndBroadcastVote(rs, &Vote{ - Height: rs.Height, - Round: rs.Round, - Type: VoteTypePrevote, - BlockHash: nil, - }) - } else { - // Prevote block - conR.signAndBroadcastVote(rs, &Vote{ - Height: rs.Height, - Round: rs.Round, - Type: VoteTypePrevote, - BlockHash: rs.ProposalBlock.Hash(), - }) - } -} - -func (conR *ConsensusReactor) runStepPrecommit(rs *RoundState) { - // If we see a 2/3 majority of votes for a block, lock. - hash := conR.conS.LockOrUnlock(rs.Height, rs.Round) - if len(hash) > 0 { - // Precommit block - conR.signAndBroadcastVote(rs, &Vote{ - Height: rs.Height, - Round: rs.Round, - Type: VoteTypePrecommit, - BlockHash: hash, - }) - } -} - -func (conR *ConsensusReactor) runStepCommit(rs *RoundState) bool { - // If we see a 2/3 majority of votes for a block, lock. - block := conR.conS.TryCommit(rs.Height, rs.Round) - if block == nil { - // Couldn't commit, try next round. - conR.conS.SetupRound(rs.Round + 1) - return false - } else { - // Commit block. - conR.signAndBroadcastVote(rs, &Vote{ - Height: rs.Height, - Round: rs.Round, - Type: VoteTypePrecommit, - BlockHash: block.Hash(), - }) - return true - } -} - -func (conR *ConsensusReactor) runStepFinalize(rs *RoundState) { - // This actually updates the height and sets up round 0. - conR.conS.FinalizeCommit() -} - //----------------------------------------------------------------------------- // Read only when returned by PeerState.GetRoundState(). diff --git a/consensus/part_set.go b/consensus/part_set.go index 9b10e02b1..7b8a0705f 100644 --- a/consensus/part_set.go +++ b/consensus/part_set.go @@ -141,6 +141,9 @@ func (ps *PartSet) BitArray() BitArray { } func (ps *PartSet) RootHash() []byte { + if ps == nil { + return nil + } return ps.rootHash } diff --git a/consensus/proposal.go b/consensus/proposal.go index ab87281a3..53b7a73e7 100644 --- a/consensus/proposal.go +++ b/consensus/proposal.go @@ -23,8 +23,10 @@ type Proposal struct { Signature } -func NewProposal(height uint32, round uint16, blockPartsTotal uint16, blockPartsHash []byte, +func NewProposal(height uint32, round uint16, + blockPartsTotal uint16, blockPartsHash []byte, polPartsTotal uint16, polPartsHash []byte) *Proposal { + return &Proposal{ Height: height, Round: round, diff --git a/consensus/state.go b/consensus/state.go index 938650526..ab2a1a9bd 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -20,7 +20,7 @@ type RoundActionType uint8 const ( RoundStepStart = RoundStep(0x00) // Round started. RoundStepPropose = RoundStep(0x01) // Did propose, gossip proposal. - RoundStepVote = RoundStep(0x02) // Did vote, gossip votes. + RoundStepPrevote = RoundStep(0x02) // Did prevote, gossip prevotes. RoundStepPrecommit = RoundStep(0x03) // Did precommit, gossip precommits. RoundStepCommit = RoundStep(0x04) // Did commit, gossip commits. @@ -28,11 +28,11 @@ const ( // we progress to the next round, skipping RoundStepCommit. // // If a block was committed, we goto RoundStepCommit, - // then wait "finalizeDuration" to gather more commit votes, + // then wait "finalizeDuration" to gather more commits, // then we progress to the next height at round 0. RoundActionPropose = RoundActionType(0x00) // Goto RoundStepPropose - RoundActionVote = RoundActionType(0x01) // Goto RoundStepVote + RoundActionPrevote = RoundActionType(0x01) // Goto RoundStepPrevote RoundActionPrecommit = RoundActionType(0x02) // Goto RoundStepPrecommit RoundActionCommit = RoundActionType(0x03) // Goto RoundStepCommit or RoundStepStart next round RoundActionFinalize = RoundActionType(0x04) // Goto RoundStepStart next height @@ -135,14 +135,14 @@ func (cs *ConsensusState) GetRoundState() *RoundState { func (cs *ConsensusState) updateToState(state *state.State) { // Sanity check state. - stateHeight := state.Height - if stateHeight > 0 && stateHeight != cs.Height+1 { - Panicf("updateToState() expected state height of %v but found %v", cs.Height+1, stateHeight) + if cs.Height > 0 && cs.Height != state.Height { + Panicf("updateToState() expected state height of %v but found %v", + cs.Height, state.Height) } // Reset fields based on state. - height := state.Height validators := state.BondedValidators + height := state.Height + 1 // next desired block height cs.Height = height cs.Round = 0 cs.Step = RoundStepStart @@ -202,16 +202,6 @@ func (cs *ConsensusState) setupRound(round uint16) { cs.Precommits.AddFromCommits(cs.Commits) } -func (cs *ConsensusState) SetStep(step RoundStep) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - if cs.Step < step { - cs.Step = step - } else { - panic("step regression") - } -} - func (cs *ConsensusState) SetPrivValidator(priv *PrivValidator) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -243,9 +233,13 @@ func (cs *ConsensusState) SetProposal(proposal *Proposal) error { return nil } -func (cs *ConsensusState) MakeProposal() { +func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { cs.mtx.Lock() defer cs.mtx.Unlock() + if cs.Height != height || cs.Round != round { + return + } + cs.Step = RoundStepPropose if cs.PrivValidator == nil || cs.Validators.Proposer().Id != cs.PrivValidator.Id { return @@ -262,11 +256,19 @@ func (cs *ConsensusState) MakeProposal() { block = cs.LockedBlock pol = cs.LockedPOL } else { - // We need to create a proposal. - // If we don't have enough commits from the last height, - // we can't do anything. - if !cs.LastCommits.HasTwoThirdsMajority() { - return + var validation Validation + if cs.Height == 1 { + // We're creating a proposal for the first block. + // The validation is empty. + } else { + // We need to create a proposal. + // If we don't have enough commits from the last height, + // we can't do anything. + if !cs.LastCommits.HasTwoThirdsMajority() { + return + } else { + validation = cs.LastCommits.MakeValidation() + } } txs, state := cs.mempool.GetProposalTxs() // TODO: cache state block = &Block{ @@ -277,7 +279,7 @@ func (cs *ConsensusState) MakeProposal() { LastBlockHash: cs.state.BlockHash, StateHash: state.Hash(), }, - Validation: cs.LastCommits.MakeValidation(), + Validation: validation, Data: Data{ Txs: txs, }, @@ -288,10 +290,13 @@ func (cs *ConsensusState) MakeProposal() { blockPartSet = NewPartSetFromData(BinaryBytes(block)) if pol != nil { polPartSet = NewPartSetFromData(BinaryBytes(pol)) + } else { + } // Make proposal - proposal := NewProposal(cs.Height, cs.Round, blockPartSet.Total(), blockPartSet.RootHash(), + proposal := NewProposal(cs.Height, cs.Round, + blockPartSet.Total(), blockPartSet.RootHash(), polPartSet.Total(), polPartSet.RootHash()) cs.PrivValidator.Sign(proposal) @@ -358,6 +363,29 @@ func (cs *ConsensusState) AddProposalPOLPart(height uint32, round uint16, part * return true, nil } +func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) []byte { + cs.mtx.Lock() + defer cs.mtx.Unlock() + if cs.Height != height || cs.Round != round { + return nil + } + cs.Step = RoundStepPrevote + + // If a block is locked, prevote that. + if cs.LockedBlock != nil { + return cs.LockedBlock.Hash() + } + // Try staging proposed block. + err := cs.stageBlock(cs.ProposalBlock) + if err != nil { + // Prevote nil. + return nil + } else { + // Prevote block. + return cs.ProposalBlock.Hash() + } +} + func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) { switch vote.Type { case VoteTypePrevote: @@ -376,16 +404,16 @@ func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) { } } -// Lock the ProposalBlock if we have enough votes for it, -// or unlock an existing lock if +2/3 of votes were nil. +// Lock the ProposalBlock if we have enough prevotes for it, +// or unlock an existing lock if +2/3 of prevotes were nil. // Returns a blockhash if a block was locked. -func (cs *ConsensusState) LockOrUnlock(height uint32, round uint16) []byte { +func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) []byte { cs.mtx.Lock() defer cs.mtx.Unlock() - if cs.Height != height || cs.Round != round { return nil } + cs.Step = RoundStepPrecommit if hash, _, ok := cs.Prevotes.TwoThirdsMajority(); ok { @@ -393,21 +421,21 @@ func (cs *ConsensusState) LockOrUnlock(height uint32, round uint16) []byte { cs.LockedPOL = cs.Prevotes.MakePOL() if len(hash) == 0 { - // +2/3 voted nil. Just unlock. + // +2/3 prevoted nil. Just unlock. cs.LockedBlock = nil return nil } else if cs.ProposalBlock.HashesTo(hash) { - // +2/3 voted for proposal block + // +2/3 prevoted for proposal block // Validate the block. // See note on ZombieValidators to see why. - if cs.stageBlock(cs.ProposalBlock) != nil { - log.Warning("+2/3 voted for an invalid block.") + if err := cs.stageBlock(cs.ProposalBlock); err != nil { + log.Warning("+2/3 prevoted for an invalid block: %v", err) return nil } cs.LockedBlock = cs.ProposalBlock return hash } else if cs.LockedBlock.HashesTo(hash) { - // +2/3 voted for already locked block + // +2/3 prevoted for already locked block // cs.LockedBlock = cs.LockedBlock return hash } else { @@ -425,14 +453,14 @@ func (cs *ConsensusState) LockOrUnlock(height uint32, round uint16) []byte { // If successful, saves the block and state and resets mempool, // and returns the committed block. // Commit is not finalized until FinalizeCommit() is called. -// This allows us to stay at this height and gather more commit votes. -func (cs *ConsensusState) TryCommit(height uint32, round uint16) *Block { +// This allows us to stay at this height and gather more commits. +func (cs *ConsensusState) RunActionCommit(height uint32, round uint16) []byte { cs.mtx.Lock() defer cs.mtx.Unlock() - if cs.Height != height || cs.Round != round { return nil } + cs.Step = RoundStepCommit if hash, commitTime, ok := cs.Precommits.TwoThirdsMajority(); ok { @@ -468,7 +496,7 @@ func (cs *ConsensusState) TryCommit(height uint32, round uint16) *Block { // Update mempool. cs.mempool.ResetForBlockAndState(block, cs.stagedState) - return block + return block.Hash() } return nil @@ -476,7 +504,13 @@ func (cs *ConsensusState) TryCommit(height uint32, round uint16) *Block { // After TryCommit(), if successful, must call this in order to // update the RoundState. -func (cs *ConsensusState) FinalizeCommit() { +func (cs *ConsensusState) RunActionFinalize(height uint32, round uint16) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + if cs.Height != height || cs.Round != round { + return + } + // What was staged becomes committed. cs.updateToState(cs.stagedState) } diff --git a/consensus/state_test.go b/consensus/state_test.go index 5e97b2d18..7f505a705 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -58,7 +58,7 @@ func TestSetupRound(t *testing.T) { // Add a vote, precommit, and commit by val0. voteTypes := []byte{VoteTypePrevote, VoteTypePrecommit, VoteTypeCommit} for _, voteType := range voteTypes { - vote := &Vote{Height: 0, Round: 0, Type: voteType} // nil vote + vote := &Vote{Height: 1, Round: 0, Type: voteType} // nil vote privAccounts[0].Sign(vote) cs.AddVote(vote) } @@ -66,13 +66,13 @@ func TestSetupRound(t *testing.T) { // Ensure that vote appears in RoundState. rs0 := cs.GetRoundState() if vote := rs0.Prevotes.Get(0); vote == nil || vote.Type != VoteTypePrevote { - t.Errorf("Expected to find prevote %v, not there", vote) + t.Errorf("Expected to find prevote but got %v", vote) } if vote := rs0.Precommits.Get(0); vote == nil || vote.Type != VoteTypePrecommit { - t.Errorf("Expected to find precommit %v, not there", vote) + t.Errorf("Expected to find precommit but got %v", vote) } if vote := rs0.Commits.Get(0); vote == nil || vote.Type != VoteTypeCommit { - t.Errorf("Expected to find commit %v, not there", vote) + t.Errorf("Expected to find commit but got %v", vote) } // Setup round 1 (next round) @@ -81,13 +81,13 @@ func TestSetupRound(t *testing.T) { // Now the commit should be copied over to prevotes and precommits. rs1 := cs.GetRoundState() if vote := rs1.Prevotes.Get(0); vote == nil || vote.Type != VoteTypeCommit { - t.Errorf("Expected to find commit %v, not there", vote) + t.Errorf("Expected to find commit but got %v", vote) } if vote := rs1.Precommits.Get(0); vote == nil || vote.Type != VoteTypeCommit { - t.Errorf("Expected to find commit %v, not there", vote) + t.Errorf("Expected to find commit but got %v", vote) } if vote := rs1.Commits.Get(0); vote == nil || vote.Type != VoteTypeCommit { - t.Errorf("Expected to find commit %v, not there", vote) + t.Errorf("Expected to find commit but got %v", vote) } // Setup round 1 (should fail) @@ -102,22 +102,125 @@ func TestSetupRound(t *testing.T) { } -func TestMakeProposalNoPrivValidator(t *testing.T) { +func TestRunActionProposeNoPrivValidator(t *testing.T) { cs, _ := makeConsensusState() - cs.MakeProposal() + cs.RunActionPropose(1, 0) rs := cs.GetRoundState() if rs.Proposal != nil { t.Error("Expected to make no proposal, since no privValidator") } } -func TestMakeProposalEmptyMempool(t *testing.T) { +func TestRunActionPropose(t *testing.T) { cs, privAccounts := makeConsensusState() priv := NewPrivValidator(privAccounts[0], db_.NewMemDB()) cs.SetPrivValidator(priv) - cs.MakeProposal() + cs.RunActionPropose(1, 0) rs := cs.GetRoundState() - t.Log(rs.Proposal) + // Check that Proposal, ProposalBlock, ProposalBlockPartSet are set. + if rs.Proposal == nil { + t.Error("rs.Proposal should be set") + } + if rs.ProposalBlock == nil { + t.Error("rs.ProposalBlock should be set") + } + if rs.ProposalBlockPartSet.Total() == 0 { + t.Error("rs.ProposalBlockPartSet should be set") + } +} + +func checkRoundState(t *testing.T, cs *ConsensusState, + height uint32, round uint16, step RoundStep) { + rs := cs.GetRoundState() + if rs.Height != height { + t.Errorf("cs.RoundState.Height should be %v, got %v", height, rs.Height) + } + if rs.Round != round { + t.Errorf("cs.RoundState.Round should be %v, got %v", round, rs.Round) + } + if rs.Step != step { + t.Errorf("cs.RoundState.Step should be %v, got %v", step, rs.Step) + } +} + +func TestRunActionPrecommit(t *testing.T) { + cs, privAccounts := makeConsensusState() + priv := NewPrivValidator(privAccounts[0], db_.NewMemDB()) + cs.SetPrivValidator(priv) + + blockHash := cs.RunActionPrecommit(1, 0) + if blockHash != nil { + t.Errorf("RunActionPrecommit should fail without a proposal") + } + + cs.RunActionPropose(1, 0) + + // Test RunActionPrecommit failures: + blockHash = cs.RunActionPrecommit(1, 1) + if blockHash != nil { + t.Errorf("RunActionPrecommit should fail for wrong round") + } + blockHash = cs.RunActionPrecommit(2, 0) + if blockHash != nil { + t.Errorf("RunActionPrecommit should fail for wrong height") + } + blockHash = cs.RunActionPrecommit(1, 0) + if blockHash != nil { + t.Errorf("RunActionPrecommit should fail, not enough prevotes") + } + + // Add at least +2/3 prevotes. + for i := 0; i < 7; i++ { + vote := &Vote{ + Height: 1, + Round: uint16(i), + Type: VoteTypePrevote, + BlockHash: cs.ProposalBlock.Hash(), + } + privAccounts[i].Sign(vote) + cs.AddVote(vote) + } + + // Test RunActionPrecommit success: + blockHash = cs.RunActionPrecommit(1, 0) + if len(blockHash) == 0 { + t.Errorf("RunActionPrecommit should have succeeded") + } + checkRoundState(t, cs, 1, 0, RoundStepPrecommit) + + // Test RunActionCommit failures: + blockHash = cs.RunActionCommit(1, 1) + if blockHash != nil { + t.Errorf("RunActionCommit should fail for wrong round") + } + blockHash = cs.RunActionCommit(2, 0) + if blockHash != nil { + t.Errorf("RunActionCommit should fail for wrong height") + } + blockHash = cs.RunActionCommit(1, 0) + if blockHash != nil { + t.Errorf("RunActionCommit should fail, not enough commits") + } + + // Add at least +2/3 precommits. + for i := 0; i < 7; i++ { + vote := &Vote{ + Height: 1, + Round: uint16(i), + Type: VoteTypePrecommit, + BlockHash: cs.ProposalBlock.Hash(), + } + privAccounts[i].Sign(vote) + cs.AddVote(vote) + } + + // Test RunActionCommit success: + blockHash = cs.RunActionCommit(1, 0) + if len(blockHash) == 0 { + t.Errorf("RunActionCommit should have succeeded") + } + checkRoundState(t, cs, 1, 0, RoundStepCommit) + } diff --git a/state/validator_set.go b/state/validator_set.go index ef7dac98b..ae9e0fd17 100644 --- a/state/validator_set.go +++ b/state/validator_set.go @@ -127,6 +127,10 @@ func (vset *ValidatorSet) Iterate(fn func(val *Validator) bool) { }) } +func (vset *ValidatorSet) String() string { + return vset.StringWithIndent("") +} + func (vset *ValidatorSet) StringWithIndent(indent string) string { valStrings := []string{} vset.Iterate(func(val *Validator) bool {