You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

601 lines
17 KiB

package consensus
import (
"errors"
"fmt"
"sync"
"time"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/blocks"
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/state"
)
// RoundStep identifies the step of the consensus round state machine
// that a RoundState is currently in.
type RoundStep uint8

// RoundActionType identifies an action that moves the round state machine
// to another step.
type RoundActionType uint8

// Round steps.
//
// If a block could not be committed at a given round,
// we progress to the next round, skipping RoundStepCommit.
//
// If a block was committed, we goto RoundStepCommit,
// then wait "finalizeDuration" to gather more commits,
// then we progress to the next height at round 0.
// TODO: document how RoundStepCommit transcends all rounds.
const (
	RoundStepStart      RoundStep = 0x00 // Round started.
	RoundStepPropose    RoundStep = 0x01 // Did propose, gossip proposal.
	RoundStepPrevote    RoundStep = 0x02 // Did prevote, gossip prevotes.
	RoundStepPrecommit  RoundStep = 0x03 // Did precommit, gossip precommits.
	RoundStepCommit     RoundStep = 0x10 // Did commit, gossip commits.
	RoundStepCommitWait RoundStep = 0x11 // Found +2/3 commits, wait more.
)

// Round actions; each one names the step it transitions to.
const (
	RoundActionPropose    RoundActionType = 0x00 // Goto RoundStepPropose
	RoundActionPrevote    RoundActionType = 0x01 // Goto RoundStepPrevote
	RoundActionPrecommit  RoundActionType = 0x02 // Goto RoundStepPrecommit
	RoundActionTryCommit  RoundActionType = 0x10 // Goto RoundStepCommit or RoundStepStart next round
	RoundActionCommitWait RoundActionType = 0x11 // Goto RoundStepCommitWait
	RoundActionFinalize   RoundActionType = 0x12 // Goto RoundStepStart next height
)
// ErrInvalidProposalSignature is returned by SetProposal when the round's
// proposer does not verify the proposal's signature.
var ErrInvalidProposalSignature = errors.New("Error invalid proposal signature")

// consensusStateKey is the byte key "consensusState".
// NOTE(review): presumably the storage key for persisted consensus state;
// it is not referenced in this part of the file — confirm against its users.
var consensusStateKey = []byte("consensusState")
// RoundState is the per-height, per-round working state of consensus:
// where we are (height/round/step), the proposal being considered, any
// locked block, and the vote sets collected so far.
// Immutable when returned from ConsensusState.GetRoundState()
type RoundState struct {
Height uint32 // Height we are working on
// Round being worked on at Height; reset to 0 by updateToState and
// advanced by setupRound.
Round uint16
// Step last entered by one of the RunAction* methods (RoundStepStart
// right after a height or round is set up).
Step RoundStep
// StartTime is computed in updateToState as the previous CommitTime (or,
// if that is zero, the state's BlockTime) plus finalizeDuration.
StartTime time.Time
CommitTime time.Time // Time when +2/3 commits were found
// Validators is taken from the state's BondedValidators; setupRound
// replaces it with a copy whose accum was incremented once per round.
Validators *state.ValidatorSet
// Proposal and the block/POL it refers to. The part sets are created from
// the proposal's metadata; the decoded values are set once all parts arrive.
Proposal *Proposal
ProposalBlock *Block
ProposalBlockPartSet *PartSet
ProposalPOL *POL
ProposalPOLPartSet *PartSet
// Block we are locked on (nil when not locked); survives round changes
// and is cleared only by updateToState or by an unlock in RunActionPrecommit.
LockedBlock *Block
LockedBlockPartSet *PartSet
LockedPOL *POL // Rarely needed, so no LockedPOLPartSet.
// Vote sets for the current height. Prevotes and Precommits are recreated
// for every round; Commits is created once per height.
Prevotes *VoteSet
Precommits *VoteSet
Commits *VoteSet
// LastCommits is the Commits vote set carried over from the previous height.
LastCommits *VoteSet
// PrivValidator signs our proposals and votes; nil means we never sign.
PrivValidator *PrivValidator
}
// String returns the multi-line representation of rs produced by
// StringWithIndent with an empty indent.
func (rs *RoundState) String() string {
	const noIndent = ""
	return rs.StringWithIndent(noIndent)
}
// StringWithIndent returns a multi-line dump of rs in which every line after
// the first is prefixed with indent. Nested values that render over several
// lines (validators and the prevote/precommit/commit vote sets) receive the
// indent extended by one space; everything else is shown via Description()
// or its default %v formatting.
func (rs *RoundState) StringWithIndent(indent string) string {
	// Each %s at the start of a layout line consumes one copy of indent.
	const layout = `RoundState{
%s H:%v R:%v S:%v
%s StartTime: %v
%s CommitTime: %v
%s Validators: %v
%s Proposal: %v
%s ProposalBlock: %v %v
%s ProposalPOL: %v %v
%s LockedBlock: %v %v
%s LockedPOL: %v
%s Prevotes: %v
%s Precommits: %v
%s Commits: %v
%s LastCommits: %v
%s}`
	// Indent handed down to nested multi-line values.
	nested := indent + " "
	return fmt.Sprintf(layout,
		indent, rs.Height, rs.Round, rs.Step,
		indent, rs.StartTime,
		indent, rs.CommitTime,
		indent, rs.Validators.StringWithIndent(nested),
		indent, rs.Proposal,
		indent, rs.ProposalBlockPartSet.Description(), rs.ProposalBlock.Description(),
		indent, rs.ProposalPOLPartSet.Description(), rs.ProposalPOL.Description(),
		indent, rs.LockedBlockPartSet.Description(), rs.LockedBlock.Description(),
		indent, rs.LockedPOL.Description(),
		indent, rs.Prevotes.StringWithIndent(nested),
		indent, rs.Precommits.StringWithIndent(nested),
		indent, rs.Commits.StringWithIndent(nested),
		indent, rs.LastCommits.Description(),
		indent)
}
// Description returns a one-line summary of rs in the form
// "RS{height/round/step startTime}", with the step printed in hex.
func (rs *RoundState) Description() string {
	const layout = `RS{%v/%v/%X %v}`
	return fmt.Sprintf(layout, rs.Height, rs.Round, rs.Step, rs.StartTime)
}
//-------------------------------------
// Tracks consensus state across block heights and rounds.
//
// The embedded RoundState holds the working state for the current
// height/round; the remaining fields hold the last committed state and a
// cache of the most recently staged block.
type ConsensusState struct {
// blockStore receives the block saved by RunActionCommit.
blockStore *BlockStore
// mempool supplies proposal transactions and is reset after a commit.
mempool *mempool.Mempool
// mtx is locked by the exported methods in this file, with the exception
// of AddVote; the unexported helpers expect the caller to hold it.
mtx sync.Mutex
RoundState
state *state.State // State until height-1.
stagedBlock *Block // Cache last staged block.
stagedState *state.State // Cache result of staged block.
}
// NewConsensusState builds a ConsensusState backed by blockStore and mempool
// and initialises its round state from state via updateToState, i.e. it is
// ready to work on height state.Height+1.
func NewConsensusState(state *state.State, blockStore *BlockStore, mempool *mempool.Mempool) *ConsensusState {
	cs := new(ConsensusState)
	cs.blockStore = blockStore
	cs.mempool = mempool
	cs.updateToState(state)
	return cs
}
// GetRoundState returns a pointer to a copy of the current RoundState taken
// while holding cs.mtx. The copy is shallow: its pointer fields still refer
// to the same objects as the live state.
func (cs *ConsensusState) GetRoundState() *RoundState {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	snapshot := cs.RoundState
	return &snapshot
}
// updateToState resets the round state so that consensus works on the block
// following state (height state.Height+1, round 0, RoundStepStart).
//
// It panics if a height is already in progress and state is not at exactly
// that height. The caller is expected to hold cs.mtx (or to own cs
// exclusively, as NewConsensusState does).
func (cs *ConsensusState) updateToState(state *state.State) {
	// Sanity check: once a height is in progress, the new state must be the
	// one produced by committing that height.
	if cs.Height > 0 && state.Height != cs.Height {
		Panicf("updateToState() expected state height of %v but found %v",
			cs.Height, state.Height)
	}

	bonded := state.BondedValidators
	nextHeight := state.Height + 1 // next desired block height

	// The new height starts finalizeDuration after the last commit time, or
	// after the block time when no commit time was recorded. This must be
	// computed before CommitTime is cleared.
	if !cs.CommitTime.IsZero() {
		cs.StartTime = cs.CommitTime.Add(finalizeDuration)
	} else {
		cs.StartTime = state.BlockTime.Add(finalizeDuration)
	}
	cs.CommitTime = time.Time{}

	// Position and validator set.
	cs.Height, cs.Round, cs.Step = nextHeight, 0, RoundStepStart
	cs.Validators = bonded

	// Forget any proposal and lock from the previous height.
	cs.Proposal = nil
	cs.ProposalBlock, cs.ProposalBlockPartSet = nil, nil
	cs.ProposalPOL, cs.ProposalPOLPartSet = nil, nil
	cs.LockedBlock, cs.LockedBlockPartSet = nil, nil
	cs.LockedPOL = nil

	// Fresh vote sets; the old commits are kept as LastCommits before the
	// Commits field is replaced.
	cs.LastCommits = cs.Commits
	cs.Prevotes = NewVoteSet(nextHeight, 0, VoteTypePrevote, bonded)
	cs.Precommits = NewVoteSet(nextHeight, 0, VoteTypePrecommit, bonded)
	cs.Commits = NewVoteSet(nextHeight, 0, VoteTypeCommit, bonded)

	// Committed state and staging cache.
	cs.state = state
	cs.stagedBlock, cs.stagedState = nil, nil

	// Update the round if we need to.
	if round := calcRound(cs.StartTime); round > 0 {
		cs.setupRound(round)
	}
}
// SetupRound advances the round state to round while holding cs.mtx.
// It panics unless round is strictly greater than the current round.
func (cs *ConsensusState) SetupRound(round uint16) {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	if round <= cs.Round {
		Panicf("ConsensusState round %v not lower than desired round %v", cs.Round, round)
	}
	cs.setupRound(round)
}
// setupRound moves the round state to round at the current height: the
// validator set is copied and its accum incremented once per skipped round,
// the proposal fields are cleared, and new prevote/precommit vote sets are
// created and seeded from the commits already collected. Locked* fields and
// Commits are left untouched. The caller must hold cs.mtx.
func (cs *ConsensusState) setupRound(round uint16) {
	// Increment all the way to round, on a copy so the previous set is not
	// mutated.
	advanced := cs.Validators.Copy()
	for r := cs.Round; r < round; r++ {
		advanced.IncrementAccum()
	}

	cs.Round, cs.Step = round, RoundStepStart
	cs.Validators = advanced

	// A new round starts without a proposal.
	cs.Proposal = nil
	cs.ProposalBlock, cs.ProposalBlockPartSet = nil, nil
	cs.ProposalPOL, cs.ProposalPOLPartSet = nil, nil

	prevotes := NewVoteSet(cs.Height, round, VoteTypePrevote, advanced)
	cs.Prevotes = prevotes
	prevotes.AddFromCommits(cs.Commits)

	precommits := NewVoteSet(cs.Height, round, VoteTypePrecommit, advanced)
	cs.Precommits = precommits
	precommits.AddFromCommits(cs.Commits)
}
// SetPrivValidator installs priv as the validator identity used to sign
// proposals and votes. It takes cs.mtx for the duration of the assignment.
func (cs *ConsensusState) SetPrivValidator(priv *PrivValidator) {
	cs.mtx.Lock()
	cs.PrivValidator = priv
	cs.mtx.Unlock()
}
//-----------------------------------------------------------------------------
// RunActionPropose enters RoundStepPropose for height/round and, if we are
// the proposer for this round, creates and signs a proposal.
//
// Unlike the other RunAction* methods it returns silently (no panic) when
// height/round do not match the current round state. Nothing is proposed
// when there is no PrivValidator, when we are not the round's proposer, or
// when a new block is needed past height 1 but LastCommits lacks a +2/3
// majority.
//
// The proposed block is the locked block if one exists; otherwise a new
// block is built from the mempool's proposal transactions.
func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	if height != cs.Height || round != cs.Round {
		return
	}
	cs.Step = RoundStepPropose

	// Only the designated proposer continues past this point.
	if cs.PrivValidator == nil {
		return
	}
	if cs.Validators.Proposer().Id != cs.PrivValidator.Id {
		return
	}

	var (
		block        *Block
		blockPartSet *PartSet
		polPartSet   *PartSet
	)
	// The POL is the locked POL either way; when no block is locked it is,
	// if it exists, a PoUnlock.
	pol := cs.LockedPOL

	// Decide on the block.
	if cs.LockedBlock != nil {
		// If we're locked onto a block, just choose that.
		block, blockPartSet = cs.LockedBlock, cs.LockedBlockPartSet
	} else {
		// We need to create a proposal. For the first block the validation
		// stays empty; afterwards it is built from last height's commits,
		// and without enough of them we can't do anything.
		var validation Validation
		if cs.Height != 1 {
			if !cs.LastCommits.HasTwoThirdsMajority() {
				return
			}
			validation = cs.LastCommits.MakeValidation()
		}

		txs, state := cs.mempool.GetProposalTxs() // TODO: cache state
		block = &Block{
			Header: Header{
				Network:       Config.Network,
				Height:        cs.Height,
				Time:          time.Now(),
				LastBlockHash: cs.state.BlockHash,
				StateHash:     state.Hash(),
			},
			Validation: validation,
			Data: Data{
				Txs: txs,
			},
		}
		blockPartSet = NewPartSetFromData(BinaryBytes(block))
	}

	if pol != nil {
		polPartSet = NewPartSetFromData(BinaryBytes(pol))
	}

	// Make proposal.
	// NOTE(review): polPartSet is nil when there is no POL, so Total() and
	// RootHash() are presumably safe on a nil *PartSet — confirm.
	proposal := NewProposal(cs.Height, cs.Round,
		blockPartSet.Total(), blockPartSet.RootHash(),
		polPartSet.Total(), polPartSet.RootHash())
	cs.PrivValidator.Sign(proposal)

	// Set fields.
	cs.Proposal = proposal
	cs.ProposalBlock, cs.ProposalBlockPartSet = block, blockPartSet
	cs.ProposalPOL, cs.ProposalPOLPartSet = pol, polPartSet
}
// RunActionPrevote enters RoundStepPrevote for height/round and returns the
// prevote we signed, if any. It panics if height/round do not match the
// current round state.
//
// The locked block is prevoted if there is one; otherwise the proposal block
// is prevoted provided it exists and can be staged. In every other case nil
// is returned and no vote is signed. signAddVote may also return nil (no
// PrivValidator, or not in the validator set).
func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) *Vote {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	if height != cs.Height || round != cs.Round {
		Panicf("RunActionPrevote(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
	}
	cs.Step = RoundStepPrevote

	switch {
	case cs.LockedBlock != nil:
		// If a block is locked, prevote that.
		return cs.signAddVote(VoteTypePrevote, cs.LockedBlock.Hash())
	case cs.ProposalBlock == nil:
		// If ProposalBlock is nil, prevote nil.
		return nil
	}

	// Try staging proposed block; prevote nil if it is invalid.
	if err := cs.stageBlock(cs.ProposalBlock); err != nil {
		return nil
	}
	// Prevote block.
	return cs.signAddVote(VoteTypePrevote, cs.ProposalBlock.Hash())
}
// RunActionPrecommit enters RoundStepPrecommit for height/round. It locks
// the ProposalBlock if we have +2/3 prevotes for it, or unlocks an existing
// lock if +2/3 of prevotes were nil (or were for a block we do not have).
//
// Returns the signed precommit vote when a block is (or stays) locked and
// signAddVote produced a vote; returns nil otherwise, including when the
// prevotes have no +2/3 majority or the proposal block fails staging.
// Panics if height/round do not match the current round state.
func (cs *ConsensusState) RunActionPrecommit(height uint32, round uint16) *Vote {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	if height != cs.Height || round != cs.Round {
		Panicf("RunActionPrecommit(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
	}
	cs.Step = RoundStepPrecommit

	hash, ok := cs.Prevotes.TwoThirdsMajority()
	if !ok {
		return nil
	}

	// Remember this POL. (hash may be nil)
	cs.LockedPOL = cs.Prevotes.MakePOL()

	switch {
	case len(hash) == 0:
		// +2/3 prevoted nil. Just unlock.
		cs.LockedBlock, cs.LockedBlockPartSet = nil, nil
		return nil

	case cs.ProposalBlock.HashesTo(hash):
		// +2/3 prevoted for proposal block.
		// Validate the block.
		// See note on ZombieValidators to see why.
		if err := cs.stageBlock(cs.ProposalBlock); err != nil {
			log.Warning("+2/3 prevoted for an invalid block: %v", err)
			return nil
		}
		cs.LockedBlock, cs.LockedBlockPartSet = cs.ProposalBlock, cs.ProposalBlockPartSet
		return cs.signAddVote(VoteTypePrecommit, hash)

	case cs.LockedBlock.HashesTo(hash):
		// +2/3 prevoted for already locked block.
		return cs.signAddVote(VoteTypePrecommit, hash)

	default:
		// We don't have the block that hashes to hash.
		// Unlock if we're locked.
		cs.LockedBlock, cs.LockedBlockPartSet = nil, nil
		return nil
	}
}
// RunActionCommit enters RoundStepCommit for height/round and commits a
// block if we have +2/3 precommits for it and we have the block (as the
// locked block or the proposal block).
//
// On success the block is saved to the block store, the staged state is
// saved, the mempool is reset, and the result of signing a commit vote is
// returned (which may be nil if we are not a signing validator). Returns nil
// when there is no +2/3 precommit majority, when we do not have the block,
// or when the block fails staging. Panics if height/round do not match the
// current round state.
//
// The commit is not finalized until RunActionFinalize is called; this allows
// us to stay at this height and gather more commits.
func (cs *ConsensusState) RunActionCommit(height uint32, round uint16) *Vote {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	if height != cs.Height || round != cs.Round {
		Panicf("RunActionCommit(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
	}
	cs.Step = RoundStepCommit

	hash, ok := cs.Precommits.TwoThirdsMajority()
	if !ok {
		return nil
	}

	// There are some strange cases that shouldn't happen
	// (unless voters are duplicitous).
	// For example, the hash may not be the one that was
	// proposed this round. These cases should be identified
	// and warn the administrator. We should err on the side of
	// caution and not, for example, sign a block.
	// TODO: Identify these strange cases.
	var (
		block        *Block
		blockPartSet *PartSet
	)
	switch {
	case cs.LockedBlock.HashesTo(hash):
		block, blockPartSet = cs.LockedBlock, cs.LockedBlockPartSet
	case cs.ProposalBlock.HashesTo(hash):
		block, blockPartSet = cs.ProposalBlock, cs.ProposalBlockPartSet
	default:
		// We don't have the block being committed.
		return nil
	}

	// The proposal must be valid.
	if err := cs.stageBlock(block); err != nil {
		log.Warning("Network is commiting an invalid proposal? %v", err)
		return nil
	}

	// Keep block in cs.Proposal*
	if !cs.ProposalBlock.HashesTo(hash) {
		cs.ProposalBlock, cs.ProposalBlockPartSet = block, blockPartSet
	}

	// Persist the block and the state that results from it, then let the
	// mempool drop what the block already contains.
	cs.blockStore.SaveBlock(block)
	cs.stagedState.Save()
	cs.mempool.ResetForBlockAndState(block, cs.stagedState)

	return cs.signAddVote(VoteTypeCommit, block.Hash())
}
// RunActionCommitWait enters RoundStepCommitWait for height/round and
// records the current time as CommitTime. It panics if height/round do not
// match the current round state, or if Commits does not have a +2/3
// majority.
func (cs *ConsensusState) RunActionCommitWait(height uint32, round uint16) {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	if height != cs.Height || round != cs.Round {
		Panicf("RunActionCommitWait(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
	}
	cs.Step = RoundStepCommitWait

	if !cs.Commits.HasTwoThirdsMajority() {
		panic("RunActionCommitWait() expects +2/3 commits")
	}
	cs.CommitTime = time.Now()
}
// RunActionFinalize moves on to the next height by making the staged state
// the committed one via updateToState. It panics if height/round do not
// match the current round state.
func (cs *ConsensusState) RunActionFinalize(height uint32, round uint16) {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	if height != cs.Height || round != cs.Round {
		Panicf("RunActionFinalize(%v/%v), expected %v/%v", height, round, cs.Height, cs.Round)
	}

	// What was staged becomes committed.
	committed := cs.stagedState
	cs.updateToState(committed)
}
//-----------------------------------------------------------------------------
// SetProposal records proposal as the proposal for the current round and
// prepares empty part sets for its block and POL from the proposal's
// metadata.
//
// The proposal is ignored (nil error) when one is already set or when its
// height/round differ from the current ones. ErrInvalidProposalSignature is
// returned when the round's proposer does not verify it.
func (cs *ConsensusState) SetProposal(proposal *Proposal) error {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	switch {
	case cs.Proposal != nil:
		// Already have one.
		return nil
	case proposal.Height != cs.Height, proposal.Round != cs.Round:
		// Not for this height/round.
		return nil
	case !cs.Validators.Proposer().Verify(proposal):
		// Verify signature.
		return ErrInvalidProposalSignature
	}

	cs.Proposal = proposal
	cs.ProposalBlockPartSet = NewPartSetFromMetadata(proposal.BlockPartsTotal, proposal.BlockPartsHash)
	cs.ProposalPOLPartSet = NewPartSetFromMetadata(proposal.POLPartsTotal, proposal.POLPartsHash)
	return nil
}
// AddProposalBlockPart adds part to the part set of the current proposal
// block and, once the set is complete, decodes the block into
// cs.ProposalBlock.
//
// height must equal the current height; round is not checked because blocks
// might be reused across rounds. The part is ignored (false, nil) when the
// height differs or when no proposal block part set is expected, i.e. no
// proposal has been set yet.
//
// Returns whether the part was added, and any error from adding the part or
// from decoding the completed block.
//
// NOTE: block is not necessarily valid.
func (cs *ConsensusState) AddProposalBlockPart(height uint32, round uint16, part *Part) (added bool, err error) {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	// Blocks might be reused, so round mismatch is OK
	if cs.Height != height {
		return false, nil
	}

	// We're not expecting a block part.
	if cs.ProposalBlockPartSet == nil {
		return false, nil // TODO: bad peer? Return error?
	}

	added, err = cs.ProposalBlockPartSet.AddPart(part)
	if err != nil {
		return added, err
	}
	if added && cs.ProposalBlockPartSet.IsComplete() {
		// All parts are in: decode the block. A decode error is reported to
		// the caller through the named result.
		var n int64
		cs.ProposalBlock = ReadBlock(cs.ProposalBlockPartSet.GetReader(), &n, &err)
		return true, err
	}
	// Report what AddPart said instead of claiming success unconditionally.
	return added, nil
}
// AddProposalPOLPart adds part to the part set of the current proposal POL
// and, once the set is complete, decodes the POL into cs.ProposalPOL.
//
// Both height and round must match the current round state. The part is
// ignored (false, nil) when they differ or when no proposal POL part set is
// expected, i.e. no proposal has been set yet.
//
// Returns whether the part was added, and any error from adding the part or
// from decoding the completed POL.
//
// NOTE: POL is not necessarily valid.
func (cs *ConsensusState) AddProposalPOLPart(height uint32, round uint16, part *Part) (added bool, err error) {
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	if cs.Height != height || cs.Round != round {
		return false, nil
	}

	// We're not expecting a POL part.
	if cs.ProposalPOLPartSet == nil {
		return false, nil // TODO: bad peer? Return error?
	}

	added, err = cs.ProposalPOLPartSet.AddPart(part)
	if err != nil {
		return added, err
	}
	if added && cs.ProposalPOLPartSet.IsComplete() {
		// All parts are in: decode the POL. A decode error is reported to
		// the caller through the named result.
		var n int64
		cs.ProposalPOL = ReadPOL(cs.ProposalPOLPartSet.GetReader(), &n, &err)
		return true, err
	}
	// Report what AddPart said instead of claiming success unconditionally.
	return added, nil
}
// AddVote routes vote to the vote set matching its type and returns that
// set's result. A commit vote is additionally offered to the prevote and
// precommit sets, whose results are deliberately ignored. It panics on an
// unknown vote type.
//
// AddVote does not take cs.mtx itself; signAddVote calls it while the
// RunAction* methods already hold the lock.
func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) {
	switch vote.Type {
	case VoteTypeCommit:
		// Commits checks for height match.
		_, _ = cs.Prevotes.Add(vote)
		_, _ = cs.Precommits.Add(vote)
		return cs.Commits.Add(vote)
	case VoteTypePrecommit:
		// Precommits checks for height+round match.
		return cs.Precommits.Add(vote)
	case VoteTypePrevote:
		// Prevotes checks for height+round match.
		return cs.Prevotes.Add(vote)
	default:
		panic("Unknown vote type")
	}
}
// stageBlock applies block to a copy of the committed state and, on success,
// caches both the block and the resulting state in stagedBlock/stagedState.
// Staging the block that is already cached (same pointer) is a no-op.
//
// It panics on a nil block and returns the error from AppendBlock if the
// block cannot be applied, in which case the cache is left unchanged.
func (cs *ConsensusState) stageBlock(block *Block) error {
	if block == nil {
		panic("Cannot stage nil block")
	}

	// Already staged?
	if block == cs.stagedBlock {
		return nil
	}

	// Commit block onto a copy of the state so the committed state stays
	// untouched.
	// NOTE: Basic validation is done in state.AppendBlock().
	staging := cs.state.Copy()
	if err := staging.AppendBlock(block, true); err != nil {
		return err
	}

	cs.stagedBlock = block
	cs.stagedState = staging
	return nil
}
// signAddVote builds a vote of the given type for hash at the current
// height and round, signs it with the PrivValidator, adds it through AddVote
// and returns it. The result of AddVote is ignored.
//
// Returns nil without signing when there is no PrivValidator or when its Id
// is not in the current validator set. The caller must hold cs.mtx.
func (cs *ConsensusState) signAddVote(type_ byte, hash []byte) *Vote {
	if cs.PrivValidator == nil {
		return nil
	}
	if !cs.Validators.HasId(cs.PrivValidator.Id) {
		return nil
	}

	vote := &Vote{
		Height:    cs.Height,
		Round:     cs.Round,
		Type:      type_,
		BlockHash: hash,
	}
	cs.PrivValidator.Sign(vote)
	cs.AddVote(vote)
	return vote
}