@ -41,8 +41,8 @@ var (
// msgs from the reactor which may update the state
type msgInfo struct {
Msg Consensus Message ` json:"msg" `
PeerID p2p . ID ` json:"peer_key" `
Msg Message ` json:"msg" `
PeerID p2p . ID ` json:"peer_key" `
}
// internally generated messages which may update the state
@ -67,11 +67,11 @@ type evidencePool interface {
AddEvidence ( types . Evidence ) error
}
// Consensus State handles execution of the consensus algorithm.
// State handles execution of the consensus algorithm.
// It processes votes and proposals, and upon reaching agreement,
// commits blocks to the chain and executes them against the application.
// The internal state machine receives input from peers, the internal validator, and from a timer.
type Consensus State struct {
type State struct {
cmn . BaseService
// config details
@ -135,11 +135,11 @@ type ConsensusState struct {
metrics * Metrics
}
// StateOption sets an optional parameter on the Consensus State.
type StateOption func ( * Consensus State)
// StateOption sets an optional parameter on the State.
type StateOption func ( * State )
// NewConsensus State returns a new Consensus State.
func NewConsensus State (
// NewState returns a new State.
func NewState (
config * cfg . ConsensusConfig ,
state sm . State ,
blockExec * sm . BlockExecutor ,
@ -147,8 +147,8 @@ func NewConsensusState(
txNotifier txNotifier ,
evpool evidencePool ,
options ... StateOption ,
) * Consensus State {
cs := & Consensus State{
) * State {
cs := & State {
config : config ,
blockExec : blockExec ,
blockStore : blockStore ,
@ -174,7 +174,7 @@ func NewConsensusState(
// Don't call scheduleRound0 yet.
// We do that upon Start().
cs . reconstructLastCommit ( state )
cs . BaseService = * cmn . NewBaseService ( nil , "Consensus State" , cs )
cs . BaseService = * cmn . NewBaseService ( nil , "State" , cs )
for _ , option := range options {
option ( cs )
}
@ -185,30 +185,30 @@ func NewConsensusState(
// Public interface
// SetLogger implements Service.
func ( cs * Consensus State) SetLogger ( l log . Logger ) {
func ( cs * State ) SetLogger ( l log . Logger ) {
cs . BaseService . Logger = l
cs . timeoutTicker . SetLogger ( l )
}
// SetEventBus sets event bus.
func ( cs * Consensus State) SetEventBus ( b * types . EventBus ) {
func ( cs * State ) SetEventBus ( b * types . EventBus ) {
cs . eventBus = b
cs . blockExec . SetEventBus ( b )
}
// StateMetrics sets the metrics.
func StateMetrics ( metrics * Metrics ) StateOption {
return func ( cs * Consensus State) { cs . metrics = metrics }
return func ( cs * State ) { cs . metrics = metrics }
}
// String returns a string.
func ( cs * Consensus State) String ( ) string {
func ( cs * State ) String ( ) string {
// better not to access shared variables
return fmt . Sprintf ( "ConsensusState" ) //(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step)
}
// GetState returns a copy of the chain state.
func ( cs * Consensus State) GetState ( ) sm . State {
func ( cs * State ) GetState ( ) sm . State {
cs . mtx . RLock ( )
defer cs . mtx . RUnlock ( )
return cs . state . Copy ( )
@ -216,14 +216,14 @@ func (cs *ConsensusState) GetState() sm.State {
// GetLastHeight returns the last height committed.
// If there were no blocks, returns 0.
func ( cs * Consensus State) GetLastHeight ( ) int64 {
func ( cs * State ) GetLastHeight ( ) int64 {
cs . mtx . RLock ( )
defer cs . mtx . RUnlock ( )
return cs . RoundState . Height - 1
}
// GetRoundState returns a shallow copy of the internal consensus state.
func ( cs * Consensus State) GetRoundState ( ) * cstypes . RoundState {
func ( cs * State ) GetRoundState ( ) * cstypes . RoundState {
cs . mtx . RLock ( )
rs := cs . RoundState // copy
cs . mtx . RUnlock ( )
@ -231,42 +231,42 @@ func (cs *ConsensusState) GetRoundState() *cstypes.RoundState {
}
// GetRoundStateJSON returns a json of RoundState, marshalled using go-amino.
func ( cs * Consensus State) GetRoundStateJSON ( ) ( [ ] byte , error ) {
func ( cs * State ) GetRoundStateJSON ( ) ( [ ] byte , error ) {
cs . mtx . RLock ( )
defer cs . mtx . RUnlock ( )
return cdc . MarshalJSON ( cs . RoundState )
}
// GetRoundStateSimpleJSON returns a json of RoundStateSimple, marshalled using go-amino.
func ( cs * Consensus State) GetRoundStateSimpleJSON ( ) ( [ ] byte , error ) {
func ( cs * State ) GetRoundStateSimpleJSON ( ) ( [ ] byte , error ) {
cs . mtx . RLock ( )
defer cs . mtx . RUnlock ( )
return cdc . MarshalJSON ( cs . RoundState . RoundStateSimple ( ) )
}
// GetValidators returns a copy of the current validators.
func ( cs * Consensus State) GetValidators ( ) ( int64 , [ ] * types . Validator ) {
func ( cs * State ) GetValidators ( ) ( int64 , [ ] * types . Validator ) {
cs . mtx . RLock ( )
defer cs . mtx . RUnlock ( )
return cs . state . LastBlockHeight , cs . state . Validators . Copy ( ) . Validators
}
// SetPrivValidator sets the private validator account for signing votes.
func ( cs * Consensus State) SetPrivValidator ( priv types . PrivValidator ) {
func ( cs * State ) SetPrivValidator ( priv types . PrivValidator ) {
cs . mtx . Lock ( )
cs . privValidator = priv
cs . mtx . Unlock ( )
}
// SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing.
func ( cs * Consensus State) SetTimeoutTicker ( timeoutTicker TimeoutTicker ) {
func ( cs * State ) SetTimeoutTicker ( timeoutTicker TimeoutTicker ) {
cs . mtx . Lock ( )
cs . timeoutTicker = timeoutTicker
cs . mtx . Unlock ( )
}
// LoadCommit loads the commit for a given height.
func ( cs * Consensus State) LoadCommit ( height int64 ) * types . Commit {
func ( cs * State ) LoadCommit ( height int64 ) * types . Commit {
cs . mtx . RLock ( )
defer cs . mtx . RUnlock ( )
if height == cs . blockStore . Height ( ) {
@ -277,7 +277,7 @@ func (cs *ConsensusState) LoadCommit(height int64) *types.Commit {
// OnStart implements cmn.Service.
// It loads the latest state via the WAL, and starts the timeout and receive routines.
func ( cs * Consensus State) OnStart ( ) error {
func ( cs * State ) OnStart ( ) error {
if err := cs . evsw . Start ( ) ; err != nil {
return err
}
@ -288,7 +288,7 @@ func (cs *ConsensusState) OnStart() error {
walFile := cs . config . WalFile ( )
wal , err := cs . OpenWAL ( walFile )
if err != nil {
cs . Logger . Error ( "Error loading Consensus State wal" , "err" , err . Error ( ) )
cs . Logger . Error ( "Error loading State wal" , "err" , err . Error ( ) )
return err
}
cs . wal = wal
@ -324,7 +324,7 @@ go run scripts/json2wal/main.go wal.json $WALFILE # rebuild the file without cor
return err
}
cs . Logger . Error ( "Error on catchup replay. Proceeding to start Consensus State anyway" , "err" , err . Error ( ) )
cs . Logger . Error ( "Error on catchup replay. Proceeding to start State anyway" , "err" , err . Error ( ) )
// NOTE: if we ever do return an error here,
// make sure to stop the timeoutTicker
}
@ -342,7 +342,7 @@ go run scripts/json2wal/main.go wal.json $WALFILE # rebuild the file without cor
// timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan
// receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions
func ( cs * Consensus State) startRoutines ( maxSteps int ) {
func ( cs * State ) startRoutines ( maxSteps int ) {
err := cs . timeoutTicker . Start ( )
if err != nil {
cs . Logger . Error ( "Error starting timeout ticker" , "err" , err )
@ -352,7 +352,7 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
}
// OnStop implements cmn.Service.
func ( cs * Consensus State) OnStop ( ) {
func ( cs * State ) OnStop ( ) {
cs . evsw . Stop ( )
cs . timeoutTicker . Stop ( )
// WAL is stopped in receiveRoutine.
@ -361,12 +361,12 @@ func (cs *ConsensusState) OnStop() {
// Wait waits for the the main routine to return.
// NOTE: be sure to Stop() the event switch and drain
// any event channels or this may deadlock
func ( cs * Consensus State) Wait ( ) {
func ( cs * State ) Wait ( ) {
<- cs . done
}
// OpenWAL opens a file to log all consensus messages and timeouts for deterministic accountability
func ( cs * Consensus State) OpenWAL ( walFile string ) ( WAL , error ) {
func ( cs * State ) OpenWAL ( walFile string ) ( WAL , error ) {
wal , err := NewWAL ( walFile )
if err != nil {
cs . Logger . Error ( "Failed to open WAL for consensus state" , "wal" , walFile , "err" , err )
@ -387,7 +387,7 @@ func (cs *ConsensusState) OpenWAL(walFile string) (WAL, error) {
// TODO: should these return anything or let callers just use events?
// AddVote inputs a vote.
func ( cs * Consensus State) AddVote ( vote * types . Vote , peerID p2p . ID ) ( added bool , err error ) {
func ( cs * State ) AddVote ( vote * types . Vote , peerID p2p . ID ) ( added bool , err error ) {
if peerID == "" {
cs . internalMsgQueue <- msgInfo { & VoteMessage { vote } , "" }
} else {
@ -399,7 +399,7 @@ func (cs *ConsensusState) AddVote(vote *types.Vote, peerID p2p.ID) (added bool,
}
// SetProposal inputs a proposal.
func ( cs * Consensus State) SetProposal ( proposal * types . Proposal , peerID p2p . ID ) error {
func ( cs * State ) SetProposal ( proposal * types . Proposal , peerID p2p . ID ) error {
if peerID == "" {
cs . internalMsgQueue <- msgInfo { & ProposalMessage { proposal } , "" }
@ -412,7 +412,7 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerID p2p.ID) e
}
// AddProposalBlockPart inputs a part of the proposal block.
func ( cs * Consensus State) AddProposalBlockPart ( height int64 , round int , part * types . Part , peerID p2p . ID ) error {
func ( cs * State ) AddProposalBlockPart ( height int64 , round int , part * types . Part , peerID p2p . ID ) error {
if peerID == "" {
cs . internalMsgQueue <- msgInfo { & BlockPartMessage { height , round , part } , "" }
@ -425,11 +425,12 @@ func (cs *ConsensusState) AddProposalBlockPart(height int64, round int, part *ty
}
// SetProposalAndBlock inputs the proposal and all block parts.
func ( cs * Consensus State) SetProposalAndBlock (
func ( cs * State ) SetProposalAndBlock (
proposal * types . Proposal ,
block * types . Block ,
parts * types . PartSet ,
peerID p2p . ID ) error {
peerID p2p . ID ,
) error {
if err := cs . SetProposal ( proposal , peerID ) ; err != nil {
return err
}
@ -445,30 +446,30 @@ func (cs *ConsensusState) SetProposalAndBlock(
//------------------------------------------------------------
// internal functions for managing the state
func ( cs * Consensus State) updateHeight ( height int64 ) {
func ( cs * State ) updateHeight ( height int64 ) {
cs . metrics . Height . Set ( float64 ( height ) )
cs . Height = height
}
func ( cs * Consensus State) updateRoundStep ( round int , step cstypes . RoundStepType ) {
func ( cs * State ) updateRoundStep ( round int , step cstypes . RoundStepType ) {
cs . Round = round
cs . Step = step
}
// enterNewRound(height, 0) at cs.StartTime.
func ( cs * Consensus State) scheduleRound0 ( rs * cstypes . RoundState ) {
func ( cs * State ) scheduleRound0 ( rs * cstypes . RoundState ) {
//cs.Logger.Info("scheduleRound0", "now", tmtime.Now(), "startTime", cs.StartTime)
sleepDuration := rs . StartTime . Sub ( tmtime . Now ( ) )
cs . scheduleTimeout ( sleepDuration , rs . Height , 0 , cstypes . RoundStepNewHeight )
}
// Attempt to schedule a timeout (by sending timeoutInfo on the tickChan)
func ( cs * Consensus State) scheduleTimeout ( duration time . Duration , height int64 , round int , step cstypes . RoundStepType ) {
func ( cs * State ) scheduleTimeout ( duration time . Duration , height int64 , round int , step cstypes . RoundStepType ) {
cs . timeoutTicker . ScheduleTimeout ( timeoutInfo { duration , height , round , step } )
}
// send a msg into the receiveRoutine regarding our own proposal, block part, or vote
func ( cs * Consensus State) sendInternalMessage ( mi msgInfo ) {
func ( cs * State ) sendInternalMessage ( mi msgInfo ) {
select {
case cs . internalMsgQueue <- mi :
default :
@ -483,7 +484,7 @@ func (cs *ConsensusState) sendInternalMessage(mi msgInfo) {
// Reconstruct LastCommit from SeenCommit, which we saved along with the block,
// (which happens even before saving the state)
func ( cs * Consensus State) reconstructLastCommit ( state sm . State ) {
func ( cs * State ) reconstructLastCommit ( state sm . State ) {
if state . LastBlockHeight == 0 {
return
}
@ -495,9 +496,9 @@ func (cs *ConsensusState) reconstructLastCommit(state sm.State) {
cs . LastCommit = lastPrecommits
}
// Updates Consensus State and increments height to match that of state.
// Updates State and increments height to match that of state.
// The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight.
func ( cs * Consensus State) updateToState ( state sm . State ) {
func ( cs * State ) updateToState ( state sm . State ) {
if cs . CommitRound > - 1 && 0 < cs . Height && cs . Height != state . LastBlockHeight {
panic ( fmt . Sprintf ( "updateToState() expected state height of %v but found %v" ,
cs . Height , state . LastBlockHeight ) )
@ -574,11 +575,11 @@ func (cs *ConsensusState) updateToState(state sm.State) {
cs . newStep ( )
}
func ( cs * Consensus State) newStep ( ) {
func ( cs * State ) newStep ( ) {
rs := cs . RoundStateEvent ( )
cs . wal . Write ( rs )
cs . nSteps ++
// newStep is called by updateToState in NewConsensus State before the eventBus is set!
// newStep is called by updateToState in NewState before the eventBus is set!
if cs . eventBus != nil {
cs . eventBus . PublishEventNewRoundStep ( rs )
cs . evsw . FireEvent ( types . EventNewRoundStep , & cs . RoundState )
@ -592,9 +593,9 @@ func (cs *ConsensusState) newStep() {
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
// It keeps the RoundState and is the only thing that updates it.
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities.
// Consensus State must be locked before any internal state is updated.
func ( cs * Consensus State) receiveRoutine ( maxSteps int ) {
onExit := func ( cs * Consensus State) {
// State must be locked before any internal state is updated.
func ( cs * State ) receiveRoutine ( maxSteps int ) {
onExit := func ( cs * State ) {
// NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but its ok because
// priv_val tracks LastSig
@ -669,7 +670,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
}
// state transitions on complete-proposal, 2/3-any, 2/3-one
func ( cs * Consensus State) handleMsg ( mi msgInfo ) {
func ( cs * State ) handleMsg ( mi msgInfo ) {
cs . mtx . Lock ( )
defer cs . mtx . Unlock ( )
@ -736,7 +737,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
}
}
func ( cs * Consensus State) handleTimeout ( ti timeoutInfo , rs cstypes . RoundState ) {
func ( cs * State ) handleTimeout ( ti timeoutInfo , rs cstypes . RoundState ) {
cs . Logger . Debug ( "Received tock" , "timeout" , ti . Duration , "height" , ti . Height , "round" , ti . Round , "step" , ti . Step )
// timeouts must be for current height, round, step
@ -772,7 +773,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
}
func ( cs * Consensus State) handleTxsAvailable ( ) {
func ( cs * State ) handleTxsAvailable ( ) {
cs . mtx . Lock ( )
defer cs . mtx . Unlock ( )
@ -806,7 +807,7 @@ func (cs *ConsensusState) handleTxsAvailable() {
// Enter: +2/3 precommits for nil at (height,round-1)
// Enter: +2/3 prevotes any or +2/3 precommits for block or any from (height, round)
// NOTE: cs.StartTime was already set for height.
func ( cs * Consensus State) enterNewRound ( height int64 , round int ) {
func ( cs * State ) enterNewRound ( height int64 , round int ) {
logger := cs . Logger . With ( "height" , height , "round" , round )
if cs . Height != height || round < cs . Round || ( cs . Round == round && cs . Step != cstypes . RoundStepNewHeight ) {
@ -870,7 +871,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
// needProofBlock returns true on the first height (so the genesis app hash is signed right away)
// and where the last block (height-1) caused the app hash to change
func ( cs * Consensus State) needProofBlock ( height int64 ) bool {
func ( cs * State ) needProofBlock ( height int64 ) bool {
if height == 1 {
return true
}
@ -883,7 +884,7 @@ func (cs *ConsensusState) needProofBlock(height int64) bool {
// Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ):
// after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval
// Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool
func ( cs * Consensus State) enterPropose ( height int64 , round int ) {
func ( cs * State ) enterPropose ( height int64 , round int ) {
logger := cs . Logger . With ( "height" , height , "round" , round )
if cs . Height != height || round < cs . Round || ( cs . Round == round && cstypes . RoundStepPropose <= cs . Step ) {
@ -944,11 +945,11 @@ func (cs *ConsensusState) enterPropose(height int64, round int) {
}
}
func ( cs * Consensus State) isProposer ( address [ ] byte ) bool {
func ( cs * State ) isProposer ( address [ ] byte ) bool {
return bytes . Equal ( cs . Validators . GetProposer ( ) . Address , address )
}
func ( cs * Consensus State) defaultDecideProposal ( height int64 , round int ) {
func ( cs * State ) defaultDecideProposal ( height int64 , round int ) {
var block * types . Block
var blockParts * types . PartSet
@ -988,7 +989,7 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) {
// Returns true if the proposal block is complete &&
// (if POLRound was proposed, we have +2/3 prevotes from there).
func ( cs * Consensus State) isProposalComplete ( ) bool {
func ( cs * State ) isProposalComplete ( ) bool {
if cs . Proposal == nil || cs . ProposalBlock == nil {
return false
}
@ -1007,7 +1008,7 @@ func (cs *ConsensusState) isProposalComplete() bool {
// is returned for convenience so we can log the proposal block.
// Returns nil block upon error.
// NOTE: keep it side-effect free for clarity.
func ( cs * Consensus State) createProposalBlock ( ) ( block * types . Block , blockParts * types . PartSet ) {
func ( cs * State ) createProposalBlock ( ) ( block * types . Block , blockParts * types . PartSet ) {
var commit * types . Commit
switch {
case cs . Height == 1 :
@ -1031,7 +1032,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
// Enter: proposal block and POL is ready.
// Prevote for LockedBlock if we're locked, or ProposalBlock if valid.
// Otherwise vote nil.
func ( cs * Consensus State) enterPrevote ( height int64 , round int ) {
func ( cs * State ) enterPrevote ( height int64 , round int ) {
if cs . Height != height || round < cs . Round || ( cs . Round == round && cstypes . RoundStepPrevote <= cs . Step ) {
cs . Logger . Debug ( fmt . Sprintf (
"enterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v" ,
@ -1058,7 +1059,7 @@ func (cs *ConsensusState) enterPrevote(height int64, round int) {
// (so we have more time to try and collect +2/3 prevotes for a single block)
}
func ( cs * Consensus State) defaultDoPrevote ( height int64 , round int ) {
func ( cs * State ) defaultDoPrevote ( height int64 , round int ) {
logger := cs . Logger . With ( "height" , height , "round" , round )
// If a block is locked, prevote that.
@ -1092,7 +1093,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int64, round int) {
}
// Enter: any +2/3 prevotes at next round.
func ( cs * Consensus State) enterPrevoteWait ( height int64 , round int ) {
func ( cs * State ) enterPrevoteWait ( height int64 , round int ) {
logger := cs . Logger . With ( "height" , height , "round" , round )
if cs . Height != height || round < cs . Round || ( cs . Round == round && cstypes . RoundStepPrevoteWait <= cs . Step ) {
@ -1126,7 +1127,7 @@ func (cs *ConsensusState) enterPrevoteWait(height int64, round int) {
// Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round)
// else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil,
// else, precommit nil otherwise.
func ( cs * Consensus State) enterPrecommit ( height int64 , round int ) {
func ( cs * State ) enterPrecommit ( height int64 , round int ) {
logger := cs . Logger . With ( "height" , height , "round" , round )
if cs . Height != height || round < cs . Round || ( cs . Round == round && cstypes . RoundStepPrecommit <= cs . Step ) {
@ -1228,7 +1229,7 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) {
}
// Enter: any +2/3 precommits for next round.
func ( cs * Consensus State) enterPrecommitWait ( height int64 , round int ) {
func ( cs * State ) enterPrecommitWait ( height int64 , round int ) {
logger := cs . Logger . With ( "height" , height , "round" , round )
if cs . Height != height || round < cs . Round || ( cs . Round == round && cs . TriggeredTimeoutPrecommit ) {
@ -1256,7 +1257,7 @@ func (cs *ConsensusState) enterPrecommitWait(height int64, round int) {
}
// Enter: +2/3 precommits for block
func ( cs * Consensus State) enterCommit ( height int64 , commitRound int ) {
func ( cs * State ) enterCommit ( height int64 , commitRound int ) {
logger := cs . Logger . With ( "height" , height , "commitRound" , commitRound )
if cs . Height != height || cstypes . RoundStepCommit <= cs . Step {
@ -1320,7 +1321,7 @@ func (cs *ConsensusState) enterCommit(height int64, commitRound int) {
}
// If we have the block AND +2/3 commits for it, finalize.
func ( cs * Consensus State) tryFinalizeCommit ( height int64 ) {
func ( cs * State ) tryFinalizeCommit ( height int64 ) {
logger := cs . Logger . With ( "height" , height )
if cs . Height != height {
@ -1349,7 +1350,7 @@ func (cs *ConsensusState) tryFinalizeCommit(height int64) {
}
// Increment height and goto cstypes.RoundStepNewHeight
func ( cs * Consensus State) finalizeCommit ( height int64 ) {
func ( cs * State ) finalizeCommit ( height int64 ) {
if cs . Height != height || cs . Step != cstypes . RoundStepCommit {
cs . Logger . Debug ( fmt . Sprintf (
"finalizeCommit(%v): Invalid args. Current step: %v/%v/%v" ,
@ -1409,12 +1410,13 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// complains about replaying for heights where an #ENDHEIGHT entry already
// exists.
//
// Either way, the Consensus State should not be resumed until we
// Either way, the State should not be resumed until we
// successfully call ApplyBlock (ie. later here, or in Handshake after
// restart).
endMsg := EndHeightMessage { height }
if err := cs . wal . WriteSync ( endMsg ) ; err != nil { // NOTE: fsync
panic ( fmt . Sprintf ( "Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node" , endMsg , err ) )
panic ( fmt . Sprintf ( "Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node" ,
endMsg , err ) )
}
fail . Fail ( ) // XXX
@ -1458,7 +1460,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// * cs.StartTime is set to when we will start round0.
}
func ( cs * Consensus State) recordMetrics ( height int64 , block * types . Block ) {
func ( cs * State ) recordMetrics ( height int64 , block * types . Block ) {
cs . metrics . Validators . Set ( float64 ( cs . Validators . Size ( ) ) )
cs . metrics . ValidatorsPower . Set ( float64 ( cs . Validators . TotalVotingPower ( ) ) )
missingValidators := 0
@ -1499,7 +1501,7 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
//-----------------------------------------------------------------------------
func ( cs * Consensus State) defaultSetProposal ( proposal * types . Proposal ) error {
func ( cs * State ) defaultSetProposal ( proposal * types . Proposal ) error {
// Already have one
// TODO: possibly catch double proposals
if cs . Proposal != nil {
@ -1536,7 +1538,7 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {
// NOTE: block is not necessarily valid.
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit,
// once we have the full block.
func ( cs * Consensus State) addProposalBlockPart ( msg * BlockPartMessage , peerID p2p . ID ) ( added bool , err error ) {
func ( cs * State ) addProposalBlockPart ( msg * BlockPartMessage , peerID p2p . ID ) ( added bool , err error ) {
height , round , part := msg . Height , msg . Round , msg . Part
// Blocks might be reused, so round mismatch is OK
@ -1606,7 +1608,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p
}
// Attempt to add the vote. if its a duplicate signature, dupeout the validator
func ( cs * Consensus State) tryAddVote ( vote * types . Vote , peerID p2p . ID ) ( bool , error ) {
func ( cs * State ) tryAddVote ( vote * types . Vote , peerID p2p . ID ) ( bool , error ) {
added , err := cs . addVote ( vote , peerID )
if err != nil {
// If the vote height is off, we'll just ignore it,
@ -1644,7 +1646,7 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, err
//-----------------------------------------------------------------------------
func ( cs * Consensus State) addVote (
func ( cs * State ) addVote (
vote * types . Vote ,
peerID p2p . ID ) ( added bool , err error ) {
cs . Logger . Debug (
@ -1656,7 +1658,8 @@ func (cs *ConsensusState) addVote(
"valIndex" ,
vote . ValidatorIndex ,
"csHeight" ,
cs . Height )
cs . Height ,
)
// A precommit for the previous height?
// These come in while we wait timeoutCommit
@ -1742,7 +1745,7 @@ func (cs *ConsensusState) addVote(
} else {
cs . Logger . Info (
"Valid block we don't know about. Set ProposalBlock=nil" ,
"proposal" , cs . ProposalBlock . Hash ( ) , "blockId " , blockID . Hash )
"proposal" , cs . ProposalBlock . Hash ( ) , "blockID " , blockID . Hash )
// We're getting the wrong block.
cs . ProposalBlock = nil
}
@ -1802,10 +1805,11 @@ func (cs *ConsensusState) addVote(
return added , err
}
func ( cs * Consensus State) signVote (
type_ types . SignedMsgType ,
func ( cs * State ) signVote (
msgType types . SignedMsgType ,
hash [ ] byte ,
header types . PartSetHeader ) ( * types . Vote , error ) {
header types . PartSetHeader ,
) ( * types . Vote , error ) {
// Flush the WAL. Otherwise, we may not recompute the same vote to sign,
// and the privValidator will refuse to sign anything.
cs . wal . FlushAndSync ( )
@ -1819,14 +1823,14 @@ func (cs *ConsensusState) signVote(
Height : cs . Height ,
Round : cs . Round ,
Timestamp : cs . voteTime ( ) ,
Type : type_ ,
Type : msgType ,
BlockID : types . BlockID { Hash : hash , PartsHeader : header } ,
}
err := cs . privValidator . SignVote ( cs . state . ChainID , vote )
return vote , err
}
func ( cs * Consensus State) voteTime ( ) time . Time {
func ( cs * State ) voteTime ( ) time . Time {
now := tmtime . Now ( )
minVoteTime := now
// TODO: We should remove next line in case we don't vote for v in case cs.ProposalBlock == nil,
@ -1846,12 +1850,12 @@ func (cs *ConsensusState) voteTime() time.Time {
}
// sign the vote and publish on internalMsgQueue
func ( cs * Consensus State) signAddVote ( type_ types . SignedMsgType , hash [ ] byte , header types . PartSetHeader ) * types . Vote {
func ( cs * State ) signAddVote ( msgType types . SignedMsgType , hash [ ] byte , header types . PartSetHeader ) * types . Vote {
// if we don't have a key or we're not in the validator set, do nothing
if cs . privValidator == nil || ! cs . Validators . HasAddress ( cs . privValidator . GetPubKey ( ) . Address ( ) ) {
return nil
}
vote , err := cs . signVote ( type_ , hash , header )
vote , err := cs . signVote ( msgType , hash , header )
if err == nil {
cs . sendInternalMessage ( msgInfo { & VoteMessage { vote } , "" } )
cs . Logger . Info ( "Signed and pushed vote" , "height" , cs . Height , "round" , cs . Round , "vote" , vote , "err" , err )