@ -18,16 +18,55 @@ import (
"github.com/tendermint/tendermint/types"
)
var (
timeoutPropose0 = 3000 * time . Millisecond // Wait this long for a proposal
timeoutProposeDelta = 0500 * time . Millisecond // timeoutProposeN is timeoutPropose0 + timeoutProposeDelta*N
timeoutPrevote0 = 1000 * time . Millisecond // After any +2/3 prevotes received, wait this long for stragglers.
timeoutPrevoteDelta = 0500 * time . Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N
timeoutPrecommit0 = 1000 * time . Millisecond // After any +2/3 precommits received, wait this long for stragglers.
timeoutPrecommitDelta = 0500 * time . Millisecond // timeoutPrecommitN is timeoutPrecommit0 + timeoutPrecommitDelta*N
timeoutCommit = 1000 * time . Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight.
//-----------------------------------------------------------------------------
// Timeout Parameters
// All in milliseconds
type TimeoutParams struct {
Propose0 int
ProposeDelta int
Prevote0 int
PrevoteDelta int
Precommit0 int
PrecommitDelta int
Commit0 int
}
)
// Wait this long for a proposal
func ( tp * TimeoutParams ) Propose ( round int ) time . Duration {
return time . Duration ( tp . Propose0 + tp . ProposeDelta * round ) * time . Millisecond
}
// After receiving any +2/3 prevote, wait this long for stragglers
func ( tp * TimeoutParams ) Prevote ( round int ) time . Duration {
return time . Duration ( tp . Prevote0 + tp . PrevoteDelta * round ) * time . Millisecond
}
// After receiving any +2/3 precommits, wait this long for stragglers
func ( tp * TimeoutParams ) Precommit ( round int ) time . Duration {
return time . Duration ( tp . Precommit0 + tp . PrecommitDelta * round ) * time . Millisecond
}
// After receiving +2/3 precommits for a single block (a commit), wait this long for stragglers in the next height's RoundStepNewHeight
func ( tp * TimeoutParams ) Commit ( t time . Time ) time . Time {
return t . Add ( time . Duration ( tp . Commit0 ) * time . Millisecond )
}
// Initialize parameters from config file
func InitTimeoutParamsFromConfig ( ) * TimeoutParams {
return & TimeoutParams {
Propose0 : config . GetInt ( "timeout_propose" ) ,
ProposeDelta : config . GetInt ( "timeout_propose_delta" ) ,
Prevote0 : config . GetInt ( "timeout_prevote" ) ,
PrevoteDelta : config . GetInt ( "timeout_prevote_delta" ) ,
Precommit0 : config . GetInt ( "timeout_precommit" ) ,
PrecommitDelta : config . GetInt ( "timeout_precommit_delta" ) ,
Commit0 : config . GetInt ( "timeout_commit" ) ,
}
}
//-----------------------------------------------------------------------------
// Errors
var (
ErrInvalidProposalSignature = errors . New ( "Error invalid proposal signature" )
@ -188,6 +227,7 @@ type ConsensusState struct {
timeoutTicker * time . Ticker // ticker for timeouts
tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
timeoutParams * TimeoutParams // parameters and functions for timeout intervals
evsw * events . EventSwitch
@ -206,6 +246,7 @@ func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore *
timeoutTicker : new ( time . Ticker ) ,
tickChan : make ( chan timeoutInfo , tickTockBufferSize ) ,
tockChan : make ( chan timeoutInfo , tickTockBufferSize ) ,
timeoutParams : InitTimeoutParamsFromConfig ( ) ,
}
cs . updateToState ( state )
// Don't call scheduleRound0 yet.
@ -278,13 +319,16 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
func ( cs * ConsensusState ) OnStop ( ) {
cs . QuitService . OnStop ( )
if cs . wal != nil && cs . IsRunning ( ) {
cs . wal . Wait ( )
}
}
// Open file to log all consensus messages and timeouts for deterministic accountability
func ( cs * ConsensusState ) OpenWAL ( file string ) ( err error ) {
cs . mtx . Lock ( )
defer cs . mtx . Unlock ( )
wal , err := NewWAL ( file )
wal , err := NewWAL ( file , config . GetBool ( "cswal_light" ) )
if err != nil {
return err
}
@ -453,9 +497,9 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
// to be gathered for the first block.
// And alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs . StartTime = time . Now ( ) . Add ( timeoutCommit )
cs . StartTime = cs . timeoutParams . Commit ( time . Now ( ) )
} else {
cs . StartTime = cs . CommitTime . Add ( timeoutCommit )
cs . StartTime = cs . timeoutParams . Commit ( cs . CommitTime )
}
cs . CommitTime = time . Time { }
cs . Validators = validators
@ -575,6 +619,19 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
// go to the next step
cs . handleTimeout ( ti , rs )
case <- cs . Quit :
// drain the internalMsgQueue in case we eg. signed a proposal but it didn't hit the wal
FLUSH :
for {
select {
case mi = <- cs . internalMsgQueue :
cs . wal . Save ( mi )
cs . handleMsg ( mi , rs )
default :
break FLUSH
}
}
// close wal now that we're done writing to it
if cs . wal != nil {
cs . wal . Close ( )
@ -598,8 +655,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
err = cs . setProposal ( msg . Proposal )
case * BlockPartMessage :
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
// if we're the only validator, the enterPrevote may take us through to the next round
_ , err = cs . addProposalBlockPart ( msg . Height , msg . Part )
if err != nil && msg . Round != cs . Round {
err = nil
}
case * VoteMessage :
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
@ -618,7 +677,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
log . Warn ( "Unknown msg type" , reflect . TypeOf ( msg ) )
}
if err != nil {
log . Error ( "Error with msg" , "type" , reflect . TypeOf ( msg ) , "error" , err , "msg" , msg )
log . Error ( "Error with msg" , "type" , reflect . TypeOf ( msg ) , "peer" , peerKey , " error" , err , "msg" , msg )
}
}
@ -726,8 +785,8 @@ func (cs *ConsensusState) enterPropose(height int, round int) {
}
} ( )
// This step times out after `timeoutPropose`
cs . scheduleTimeout ( timeoutPropose0 + timeoutProposeDelta * time . Duration ( round ) , height , round , RoundStepPropose )
// If we don't get the proposal and all block parts quick enough, enterPrevote
cs . scheduleTimeout ( cs . timeoutParams . Propose ( round ) , height , round , RoundStepPropose )
// Nothing more to do if we're not a validator
if cs . privValidator == nil {
@ -818,8 +877,19 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
return
}
maxBlockSize := config . GetInt ( "block_size" )
// Mempool validated transactions
txs := cs . mempool . Reap ( )
// if block_size < 0, no txs will be included
var txs [ ] types . Tx
if maxBlockSize >= 0 {
txs = cs . mempool . Reap ( )
}
// Cap the number of txs in a block
if maxBlockSize > 0 && maxBlockSize < len ( txs ) {
txs = txs [ : maxBlockSize ]
}
block = & types . Block {
Header : & types . Header {
@ -928,8 +998,8 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) {
cs . newStep ( )
} ( )
// After `timeoutPrevote0+timeoutPrevoteDelta*round`, enterPrecommit()
cs . scheduleTimeout ( timeoutPrevote0 + timeoutPrevoteDelta * time . Duration ( round ) , height , round , RoundStepPrevoteWait )
// Wait for some more prevotes; enterPrecommit
cs . scheduleTimeout ( cs . timeoutParams . Prevote ( round ) , height , round , RoundStepPrevoteWait )
}
// Enter: +2/3 precomits for block or nil.
@ -978,9 +1048,9 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
// +2/3 prevoted nil. Unlock and precommit nil.
if len ( hash ) == 0 {
if cs . LockedBlock == nil {
log . Info ( "enterPrecommit: +2/3 prevoted for nil." )
log . Notice ( "enterPrecommit: +2/3 prevoted for nil." )
} else {
log . Info ( "enterPrecommit: +2/3 prevoted for nil. Unlocking" )
log . Notice ( "enterPrecommit: +2/3 prevoted for nil. Unlocking" )
cs . LockedRound = 0
cs . LockedBlock = nil
cs . LockedBlockParts = nil
@ -994,7 +1064,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
// If we're already locked on that block, precommit it, and update the LockedRound
if cs . LockedBlock . HashesTo ( hash ) {
log . Info ( "enterPrecommit: +2/3 prevoted locked block. Relocking" )
log . Notice ( "enterPrecommit: +2/3 prevoted locked block. Relocking" )
cs . LockedRound = round
cs . evsw . FireEvent ( types . EventStringRelock ( ) , cs . RoundStateEvent ( ) )
cs . signAddVote ( types . VoteTypePrecommit , hash , partsHeader )
@ -1003,7 +1073,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
// If +2/3 prevoted for proposal block, stage and precommit it
if cs . ProposalBlock . HashesTo ( hash ) {
log . Info ( "enterPrecommit: +2/3 prevoted proposal block. Locking" , "hash" , hash )
log . Notice ( "enterPrecommit: +2/3 prevoted proposal block. Locking" , "hash" , hash )
// Validate the block.
if err := cs . state . ValidateBlock ( cs . ProposalBlock ) ; err != nil {
PanicConsensus ( Fmt ( "enterPrecommit: +2/3 prevoted for an invalid block: %v" , err ) )
@ -1049,8 +1119,8 @@ func (cs *ConsensusState) enterPrecommitWait(height int, round int) {
cs . newStep ( )
} ( )
// After `timeoutPrecommit0+timeoutPrecommitDelta*round`, enterNewRound()
cs . scheduleTimeout ( timeoutPrecommit0 + timeoutPrecommitDelta * time . Duration ( round ) , height , round , RoundStepPrecommitWait )
// Wait for some more precommits; enterNewRound
cs . scheduleTimeout ( cs . timeoutParams . Precommit ( round ) , height , round , RoundStepPrecommitWait )
}
@ -1064,7 +1134,7 @@ func (cs *ConsensusState) enterCommit(height int, commitRound int) {
defer func ( ) {
// Done enterCommit:
// keep ca.Round the same, it points to the right Precommits set.
// keep cs.Round the same, commitRound points to the right Precommits set.
cs . updateRoundStep ( cs . Round , RoundStepCommit )
cs . CommitRound = commitRound
cs . newStep ( )
@ -1232,7 +1302,7 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error {
}
// NOTE: block is not necessarily valid.
// This can trigger us to go into enterPrevote asynchronously (before we timeout of propose) or to attempt to commit
// Asynchronously triggers either enterPrevote (before we timeout of propose) or tryFinalizeCommit, once we have the full block.
func ( cs * ConsensusState ) addProposalBlockPart ( height int , part * types . Part ) ( added bool , err error ) {
// Blocks might be reused, so round mismatch is OK
if cs . Height != height {