Browse Source

RunAction* directly on ConsensusState.

pull/9/head
Jae Kwon 10 years ago
parent
commit
5ffe406f16
6 changed files with 235 additions and 148 deletions
  1. +37
    -96
      consensus/consensus.go
  2. +3
    -0
      consensus/part_set.go
  3. +3
    -1
      consensus/proposal.go
  4. +73
    -39
      consensus/state.go
  5. +115
    -12
      consensus/state_test.go
  6. +4
    -0
      state/validator_set.go

+ 37
- 96
consensus/consensus.go View File

@ -296,8 +296,8 @@ func (conR *ConsensusReactor) stepTransitionRoutine() {
case RoundStepPropose:
// Wake up when it's time to vote.
time.Sleep(time.Duration(roundDeadlinePrevote-elapsedRatio) * roundDuration)
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionVote}
case RoundStepVote:
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrevote}
case RoundStepPrevote:
// Wake up when it's time to precommit.
time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration)
conR.doActionCh <- RoundAction{rs.Height, rs.Round, RoundActionPrecommit}
@ -323,8 +323,7 @@ func (conR *ConsensusReactor) stepTransitionRoutine() {
round := roundAction.Round
action := roundAction.Action
rs := conR.conS.GetRoundState()
setStepAndBroadcast := func(step RoundStep) {
conR.conS.SetStep(step)
broadcastNewRoundStep := func(step RoundStep) {
// Broadcast NewRoundStepMessage
msg := &NewRoundStepMessage{
Height: height,
@ -344,24 +343,43 @@ func (conR *ConsensusReactor) stepTransitionRoutine() {
// Run step
if action == RoundActionPropose && rs.Step == RoundStepStart {
conR.runStepPropose(rs)
setStepAndBroadcast(RoundStepPropose)
} else if action == RoundActionVote && rs.Step <= RoundStepPropose {
conR.runStepPrevote(rs)
setStepAndBroadcast(RoundStepVote)
} else if action == RoundActionPrecommit && rs.Step <= RoundStepVote {
conR.runStepPrecommit(rs)
setStepAndBroadcast(RoundStepPrecommit)
conR.conS.RunActionPropose(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepPropose)
} else if action == RoundActionPrevote && rs.Step <= RoundStepPropose {
hash := conR.conS.RunActionPrevote(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepPrevote)
conR.signAndBroadcastVote(rs, &Vote{
Height: rs.Height,
Round: rs.Round,
Type: VoteTypePrevote,
BlockHash: hash,
})
} else if action == RoundActionPrecommit && rs.Step <= RoundStepPrevote {
hash := conR.conS.RunActionPrecommit(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepPrecommit)
if len(hash) > 0 {
conR.signAndBroadcastVote(rs, &Vote{
Height: rs.Height,
Round: rs.Round,
Type: VoteTypePrecommit,
BlockHash: hash,
})
}
} else if action == RoundActionCommit && rs.Step <= RoundStepPrecommit {
committed := conR.runStepCommit(rs)
if committed {
setStepAndBroadcast(RoundStepCommit)
hash := conR.conS.RunActionCommit(rs.Height, rs.Round)
if len(hash) > 0 {
broadcastNewRoundStep(RoundStepCommit)
conR.signAndBroadcastVote(rs, &Vote{
Height: rs.Height,
Round: rs.Round,
Type: VoteTypeCommit,
BlockHash: hash,
})
} else {
// runStepCommit() already set the round to the next round,
// so the step is already RoundStepStart (same height).
conR.conS.SetupRound(rs.Round + 1)
}
} else if action == RoundActionFinalize && rs.Step == RoundStepCommit {
conR.runStepFinalize(rs)
conR.conS.RunActionFinalize(rs.Height, rs.Round)
// Height has been incremented, step is now RoundStepStart.
} else {
// This shouldn't happen now, but if an external source pushes
@ -454,7 +472,7 @@ OUTER_LOOP:
}
// If there are prevotes to send...
if prs.Step <= RoundStepVote {
if prs.Step <= RoundStepPrevote {
index, ok := rs.Prevotes.BitArray().Sub(prs.Prevotes).PickRandom()
if ok {
valId, val := rs.Validators.GetByIndex(uint32(index))
@ -518,83 +536,6 @@ func (conR *ConsensusReactor) signAndBroadcastVote(rs *RoundState, vote *Vote) {
}
}
//-------------------------------------
func (conR *ConsensusReactor) runStepPropose(rs *RoundState) {
conR.conS.MakeProposal()
}
func (conR *ConsensusReactor) runStepPrevote(rs *RoundState) {
// If we have a locked block, we must vote for that.
// NOTE: a locked block is already valid.
if rs.LockedBlock != nil {
conR.signAndBroadcastVote(rs, &Vote{
Height: rs.Height,
Round: rs.Round,
Type: VoteTypePrevote,
BlockHash: rs.LockedBlock.Hash(),
})
}
// Try staging proposed block.
// If Block is nil, an error is returned.
err := conR.conS.stageBlock(rs.ProposalBlock)
if err != nil {
// Prevote nil
conR.signAndBroadcastVote(rs, &Vote{
Height: rs.Height,
Round: rs.Round,
Type: VoteTypePrevote,
BlockHash: nil,
})
} else {
// Prevote block
conR.signAndBroadcastVote(rs, &Vote{
Height: rs.Height,
Round: rs.Round,
Type: VoteTypePrevote,
BlockHash: rs.ProposalBlock.Hash(),
})
}
}
func (conR *ConsensusReactor) runStepPrecommit(rs *RoundState) {
// If we see a 2/3 majority of votes for a block, lock.
hash := conR.conS.LockOrUnlock(rs.Height, rs.Round)
if len(hash) > 0 {
// Precommit block
conR.signAndBroadcastVote(rs, &Vote{
Height: rs.Height,
Round: rs.Round,
Type: VoteTypePrecommit,
BlockHash: hash,
})
}
}
func (conR *ConsensusReactor) runStepCommit(rs *RoundState) bool {
// If we see a 2/3 majority of votes for a block, lock.
block := conR.conS.TryCommit(rs.Height, rs.Round)
if block == nil {
// Couldn't commit, try next round.
conR.conS.SetupRound(rs.Round + 1)
return false
} else {
// Commit block.
conR.signAndBroadcastVote(rs, &Vote{
Height: rs.Height,
Round: rs.Round,
Type: VoteTypePrecommit,
BlockHash: block.Hash(),
})
return true
}
}
func (conR *ConsensusReactor) runStepFinalize(rs *RoundState) {
// This actually updates the height and sets up round 0.
conR.conS.FinalizeCommit()
}
//-----------------------------------------------------------------------------
// Read only when returned by PeerState.GetRoundState().


+ 3
- 0
consensus/part_set.go View File

@ -141,6 +141,9 @@ func (ps *PartSet) BitArray() BitArray {
}
func (ps *PartSet) RootHash() []byte {
if ps == nil {
return nil
}
return ps.rootHash
}


+ 3
- 1
consensus/proposal.go View File

@ -23,8 +23,10 @@ type Proposal struct {
Signature
}
func NewProposal(height uint32, round uint16, blockPartsTotal uint16, blockPartsHash []byte,
func NewProposal(height uint32, round uint16,
blockPartsTotal uint16, blockPartsHash []byte,
polPartsTotal uint16, polPartsHash []byte) *Proposal {
return &Proposal{
Height: height,
Round: round,


+ 73
- 39
consensus/state.go View File

@ -20,7 +20,7 @@ type RoundActionType uint8
const (
RoundStepStart = RoundStep(0x00) // Round started.
RoundStepPropose = RoundStep(0x01) // Did propose, gossip proposal.
RoundStepVote = RoundStep(0x02) // Did vote, gossip votes.
RoundStepPrevote = RoundStep(0x02) // Did prevote, gossip prevotes.
RoundStepPrecommit = RoundStep(0x03) // Did precommit, gossip precommits.
RoundStepCommit = RoundStep(0x04) // Did commit, gossip commits.
@ -28,11 +28,11 @@ const (
// we progress to the next round, skipping RoundStepCommit.
//
// If a block was committed, we goto RoundStepCommit,
// then wait "finalizeDuration" to gather more commit votes,
// then wait "finalizeDuration" to gather more commits,
// then we progress to the next height at round 0.
RoundActionPropose = RoundActionType(0x00) // Goto RoundStepPropose
RoundActionVote = RoundActionType(0x01) // Goto RoundStepVote
RoundActionPrevote = RoundActionType(0x01) // Goto RoundStepPrevote
RoundActionPrecommit = RoundActionType(0x02) // Goto RoundStepPrecommit
RoundActionCommit = RoundActionType(0x03) // Goto RoundStepCommit or RoundStepStart next round
RoundActionFinalize = RoundActionType(0x04) // Goto RoundStepStart next height
@ -135,14 +135,14 @@ func (cs *ConsensusState) GetRoundState() *RoundState {
func (cs *ConsensusState) updateToState(state *state.State) {
// Sanity check state.
stateHeight := state.Height
if stateHeight > 0 && stateHeight != cs.Height+1 {
Panicf("updateToState() expected state height of %v but found %v", cs.Height+1, stateHeight)
if cs.Height > 0 && cs.Height != state.Height {
Panicf("updateToState() expected state height of %v but found %v",
cs.Height, state.Height)
}
// Reset fields based on state.
height := state.Height
validators := state.BondedValidators
height := state.Height + 1 // next desired block height
cs.Height = height
cs.Round = 0
cs.Step = RoundStepStart
@ -202,16 +202,6 @@ func (cs *ConsensusState) setupRound(round uint16) {
cs.Precommits.AddFromCommits(cs.Commits)
}
func (cs *ConsensusState) SetStep(step RoundStep) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Step < step {
cs.Step = step
} else {
panic("step regression")
}
}
func (cs *ConsensusState) SetPrivValidator(priv *PrivValidator) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
@ -243,9 +233,13 @@ func (cs *ConsensusState) SetProposal(proposal *Proposal) error {
return nil
}
func (cs *ConsensusState) MakeProposal() {
func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
return
}
cs.Step = RoundStepPropose
if cs.PrivValidator == nil || cs.Validators.Proposer().Id != cs.PrivValidator.Id {
return
@ -262,11 +256,19 @@ func (cs *ConsensusState) MakeProposal() {
block = cs.LockedBlock
pol = cs.LockedPOL
} else {
// We need to create a proposal.
// If we don't have enough commits from the last height,
// we can't do anything.
if !cs.LastCommits.HasTwoThirdsMajority() {
return
var validation Validation
if cs.Height == 1 {
// We're creating a proposal for the first block.
// The validation is empty.
} else {
// We need to create a proposal.
// If we don't have enough commits from the last height,
// we can't do anything.
if !cs.LastCommits.HasTwoThirdsMajority() {
return
} else {
validation = cs.LastCommits.MakeValidation()
}
}
txs, state := cs.mempool.GetProposalTxs() // TODO: cache state
block = &Block{
@ -277,7 +279,7 @@ func (cs *ConsensusState) MakeProposal() {
LastBlockHash: cs.state.BlockHash,
StateHash: state.Hash(),
},
Validation: cs.LastCommits.MakeValidation(),
Validation: validation,
Data: Data{
Txs: txs,
},
@ -288,10 +290,13 @@ func (cs *ConsensusState) MakeProposal() {
blockPartSet = NewPartSetFromData(BinaryBytes(block))
if pol != nil {
polPartSet = NewPartSetFromData(BinaryBytes(pol))
} else {
}
// Make proposal
proposal := NewProposal(cs.Height, cs.Round, blockPartSet.Total(), blockPartSet.RootHash(),
proposal := NewProposal(cs.Height, cs.Round,
blockPartSet.Total(), blockPartSet.RootHash(),
polPartSet.Total(), polPartSet.RootHash())
cs.PrivValidator.Sign(proposal)
@ -358,6 +363,29 @@ func (cs *ConsensusState) AddProposalPOLPart(height uint32, round uint16, part *
return true, nil
}
func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) []byte {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
return nil
}
cs.Step = RoundStepPrevote
// If a block is locked, prevote that.
if cs.LockedBlock != nil {
return cs.LockedBlock.Hash()
}
// Try staging proposed block.
err := cs.stageBlock(cs.ProposalBlock)
if err != nil {
// Prevote nil.
return nil
} else {
// Prevote block.
return cs.ProposalBlock.Hash()
}
}
func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) {
switch vote.Type {
case VoteTypePrevote:
@ -376,16 +404,16 @@ func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) {
}
}
// Lock the ProposalBlock if we have enough votes for it,
// or unlock an existing lock if +2/3 of votes were nil.
// Lock the ProposalBlock if we have enough prevotes for it,
// or unlock an existing lock if +2/3 of prevotes were nil.
// Returns a blockhash if a block was locked.
func (cs *ConsensusState) LockOrUnlock(height uint32, round uint16) []byte {
func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) []byte {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
return nil
}
cs.Step = RoundStepPrecommit
if hash, _, ok := cs.Prevotes.TwoThirdsMajority(); ok {
@ -393,21 +421,21 @@ func (cs *ConsensusState) LockOrUnlock(height uint32, round uint16) []byte {
cs.LockedPOL = cs.Prevotes.MakePOL()
if len(hash) == 0 {
// +2/3 voted nil. Just unlock.
// +2/3 prevoted nil. Just unlock.
cs.LockedBlock = nil
return nil
} else if cs.ProposalBlock.HashesTo(hash) {
// +2/3 voted for proposal block
// +2/3 prevoted for proposal block
// Validate the block.
// See note on ZombieValidators to see why.
if cs.stageBlock(cs.ProposalBlock) != nil {
log.Warning("+2/3 voted for an invalid block.")
if err := cs.stageBlock(cs.ProposalBlock); err != nil {
log.Warning("+2/3 prevoted for an invalid block: %v", err)
return nil
}
cs.LockedBlock = cs.ProposalBlock
return hash
} else if cs.LockedBlock.HashesTo(hash) {
// +2/3 voted for already locked block
// +2/3 prevoted for already locked block
// cs.LockedBlock = cs.LockedBlock
return hash
} else {
@ -425,14 +453,14 @@ func (cs *ConsensusState) LockOrUnlock(height uint32, round uint16) []byte {
// If successful, saves the block and state and resets mempool,
// and returns the committed block.
// Commit is not finalized until FinalizeCommit() is called.
// This allows us to stay at this height and gather more commit votes.
func (cs *ConsensusState) TryCommit(height uint32, round uint16) *Block {
// This allows us to stay at this height and gather more commits.
func (cs *ConsensusState) RunActionCommit(height uint32, round uint16) []byte {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
return nil
}
cs.Step = RoundStepCommit
if hash, commitTime, ok := cs.Precommits.TwoThirdsMajority(); ok {
@ -468,7 +496,7 @@ func (cs *ConsensusState) TryCommit(height uint32, round uint16) *Block {
// Update mempool.
cs.mempool.ResetForBlockAndState(block, cs.stagedState)
return block
return block.Hash()
}
return nil
@ -476,7 +504,13 @@ func (cs *ConsensusState) TryCommit(height uint32, round uint16) *Block {
// After TryCommit(), if successful, must call this in order to
// update the RoundState.
func (cs *ConsensusState) FinalizeCommit() {
func (cs *ConsensusState) RunActionFinalize(height uint32, round uint16) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || cs.Round != round {
return
}
// What was staged becomes committed.
cs.updateToState(cs.stagedState)
}


+ 115
- 12
consensus/state_test.go View File

@ -58,7 +58,7 @@ func TestSetupRound(t *testing.T) {
// Add a vote, precommit, and commit by val0.
voteTypes := []byte{VoteTypePrevote, VoteTypePrecommit, VoteTypeCommit}
for _, voteType := range voteTypes {
vote := &Vote{Height: 0, Round: 0, Type: voteType} // nil vote
vote := &Vote{Height: 1, Round: 0, Type: voteType} // nil vote
privAccounts[0].Sign(vote)
cs.AddVote(vote)
}
@ -66,13 +66,13 @@ func TestSetupRound(t *testing.T) {
// Ensure that vote appears in RoundState.
rs0 := cs.GetRoundState()
if vote := rs0.Prevotes.Get(0); vote == nil || vote.Type != VoteTypePrevote {
t.Errorf("Expected to find prevote %v, not there", vote)
t.Errorf("Expected to find prevote but got %v", vote)
}
if vote := rs0.Precommits.Get(0); vote == nil || vote.Type != VoteTypePrecommit {
t.Errorf("Expected to find precommit %v, not there", vote)
t.Errorf("Expected to find precommit but got %v", vote)
}
if vote := rs0.Commits.Get(0); vote == nil || vote.Type != VoteTypeCommit {
t.Errorf("Expected to find commit %v, not there", vote)
t.Errorf("Expected to find commit but got %v", vote)
}
// Setup round 1 (next round)
@ -81,13 +81,13 @@ func TestSetupRound(t *testing.T) {
// Now the commit should be copied over to prevotes and precommits.
rs1 := cs.GetRoundState()
if vote := rs1.Prevotes.Get(0); vote == nil || vote.Type != VoteTypeCommit {
t.Errorf("Expected to find commit %v, not there", vote)
t.Errorf("Expected to find commit but got %v", vote)
}
if vote := rs1.Precommits.Get(0); vote == nil || vote.Type != VoteTypeCommit {
t.Errorf("Expected to find commit %v, not there", vote)
t.Errorf("Expected to find commit but got %v", vote)
}
if vote := rs1.Commits.Get(0); vote == nil || vote.Type != VoteTypeCommit {
t.Errorf("Expected to find commit %v, not there", vote)
t.Errorf("Expected to find commit but got %v", vote)
}
// Setup round 1 (should fail)
@ -102,22 +102,125 @@ func TestSetupRound(t *testing.T) {
}
func TestMakeProposalNoPrivValidator(t *testing.T) {
func TestRunActionProposeNoPrivValidator(t *testing.T) {
cs, _ := makeConsensusState()
cs.MakeProposal()
cs.RunActionPropose(1, 0)
rs := cs.GetRoundState()
if rs.Proposal != nil {
t.Error("Expected to make no proposal, since no privValidator")
}
}
func TestMakeProposalEmptyMempool(t *testing.T) {
func TestRunActionPropose(t *testing.T) {
cs, privAccounts := makeConsensusState()
priv := NewPrivValidator(privAccounts[0], db_.NewMemDB())
cs.SetPrivValidator(priv)
cs.MakeProposal()
cs.RunActionPropose(1, 0)
rs := cs.GetRoundState()
t.Log(rs.Proposal)
// Check that Proposal, ProposalBlock, ProposalBlockPartSet are set.
if rs.Proposal == nil {
t.Error("rs.Proposal should be set")
}
if rs.ProposalBlock == nil {
t.Error("rs.ProposalBlock should be set")
}
if rs.ProposalBlockPartSet.Total() == 0 {
t.Error("rs.ProposalBlockPartSet should be set")
}
}
func checkRoundState(t *testing.T, cs *ConsensusState,
height uint32, round uint16, step RoundStep) {
rs := cs.GetRoundState()
if rs.Height != height {
t.Errorf("cs.RoundState.Height should be %v, got %v", height, rs.Height)
}
if rs.Round != round {
t.Errorf("cs.RoundState.Round should be %v, got %v", round, rs.Round)
}
if rs.Step != step {
t.Errorf("cs.RoundState.Step should be %v, got %v", step, rs.Step)
}
}
func TestRunActionPrecommit(t *testing.T) {
cs, privAccounts := makeConsensusState()
priv := NewPrivValidator(privAccounts[0], db_.NewMemDB())
cs.SetPrivValidator(priv)
blockHash := cs.RunActionPrecommit(1, 0)
if blockHash != nil {
t.Errorf("RunActionPrecommit should fail without a proposal")
}
cs.RunActionPropose(1, 0)
// Test RunActionPrecommit failures:
blockHash = cs.RunActionPrecommit(1, 1)
if blockHash != nil {
t.Errorf("RunActionPrecommit should fail for wrong round")
}
blockHash = cs.RunActionPrecommit(2, 0)
if blockHash != nil {
t.Errorf("RunActionPrecommit should fail for wrong height")
}
blockHash = cs.RunActionPrecommit(1, 0)
if blockHash != nil {
t.Errorf("RunActionPrecommit should fail, not enough prevotes")
}
// Add at least +2/3 prevotes.
for i := 0; i < 7; i++ {
vote := &Vote{
Height: 1,
Round: uint16(i),
Type: VoteTypePrevote,
BlockHash: cs.ProposalBlock.Hash(),
}
privAccounts[i].Sign(vote)
cs.AddVote(vote)
}
// Test RunActionPrecommit success:
blockHash = cs.RunActionPrecommit(1, 0)
if len(blockHash) == 0 {
t.Errorf("RunActionPrecommit should have succeeded")
}
checkRoundState(t, cs, 1, 0, RoundStepPrecommit)
// Test RunActionCommit failures:
blockHash = cs.RunActionCommit(1, 1)
if blockHash != nil {
t.Errorf("RunActionCommit should fail for wrong round")
}
blockHash = cs.RunActionCommit(2, 0)
if blockHash != nil {
t.Errorf("RunActionCommit should fail for wrong height")
}
blockHash = cs.RunActionCommit(1, 0)
if blockHash != nil {
t.Errorf("RunActionCommit should fail, not enough commits")
}
// Add at least +2/3 precommits.
for i := 0; i < 7; i++ {
vote := &Vote{
Height: 1,
Round: uint16(i),
Type: VoteTypePrecommit,
BlockHash: cs.ProposalBlock.Hash(),
}
privAccounts[i].Sign(vote)
cs.AddVote(vote)
}
// Test RunActionCommit success:
blockHash = cs.RunActionCommit(1, 0)
if len(blockHash) == 0 {
t.Errorf("RunActionCommit should have succeeded")
}
checkRoundState(t, cs, 1, 0, RoundStepCommit)
}

+ 4
- 0
state/validator_set.go View File

@ -127,6 +127,10 @@ func (vset *ValidatorSet) Iterate(fn func(val *Validator) bool) {
})
}
func (vset *ValidatorSet) String() string {
return vset.StringWithIndent("")
}
func (vset *ValidatorSet) StringWithIndent(indent string) string {
valStrings := []string{}
vset.Iterate(func(val *Validator) bool {


Loading…
Cancel
Save