diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 6be23c8b0..9d0caf774 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -29,7 +29,10 @@ func init() { // Heal partition and ensure A sees the commit func TestByzantine(t *testing.T) { N := 4 - css := randConsensusNet(N, "consensus_byzantine_test", crankTimeoutPropose) + css := randConsensusNet(N, "consensus_byzantine_test", crankTimeoutPropose, newMockTickerFunc(false)) + + // give the byzantine validator a normal ticker + css[0].SetTimeoutTicker(NewTimeoutTicker()) switches := make([]*p2p.Switch, N) for i := 0; i < N; i++ { diff --git a/consensus/common_test.go b/consensus/common_test.go index cf5df2baf..7b1653f34 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -257,7 +257,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { return cs, vss } -func randConsensusNet(nValidators int, testName string, updateConfig func(cfg.Config)) []*ConsensusState { +func randConsensusNet(nValidators int, testName string, updateConfig func(cfg.Config), tickerFunc func() TimeoutTicker) []*ConsensusState { genDoc, privVals := randGenesisDoc(nValidators, false, 10) css := make([]*ConsensusState, nValidators) for i := 0; i < nValidators; i++ { @@ -268,12 +268,13 @@ func randConsensusNet(nValidators int, testName string, updateConfig func(cfg.Co updateConfig(thisConfig) EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], counter.NewCounterApplication(true)) + css[i].SetTimeoutTicker(tickerFunc()) } return css } // nPeers = nValidators + nNotValidator -func randConsensusNetWithPeers(nValidators, nPeers int, testName string, updateConfig func(cfg.Config)) []*ConsensusState { +func randConsensusNetWithPeers(nValidators, nPeers int, testName string, updateConfig func(cfg.Config), tickerFunc func() TimeoutTicker) []*ConsensusState { genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower)) css := make([]*ConsensusState, nPeers) for i := 0; i < nPeers; i++ { @@ -294,6 +295,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, updateC dir, _ := ioutil.TempDir("/tmp", "persistent-dummy") css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir)) + css[i].SetTimeoutTicker(tickerFunc()) } return css } @@ -379,3 +381,46 @@ func crankTimeoutPropose(config cfg.Config) { config.Set("timeout_propose", 110000) // TODO: crank it to eleventy config.Set("timeout_commit", 1000) } + +//------------------------------------ + +func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker { + return func() TimeoutTicker { + return &mockTicker{ + c: make(chan time.Time, 10), + onlyOnce: onlyOnce, + } + } +} + +// mock ticker only fires for NewStepRound (timeout commit), +// and only once if onlyOnce=true +type mockTicker struct { + c chan time.Time + + onlyOnce bool + fired bool +} + +func (m *mockTicker) Stop() { +} + +func (m *mockTicker) Reset(ti timeoutInfo) { + if m.onlyOnce && m.fired { + return + } + if ti.Step == RoundStepNewHeight { + m.Fire() + m.fired = true + } +} + +func (m *mockTicker) Chan() <-chan time.Time { + return m.c +} + +func (m *mockTicker) Fire() { + m.c <- time.Now() +} + +//------------------------------------ diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 30d50ab7c..ca9aea899 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -24,7 +24,7 @@ func init() { // Ensure a testnet makes blocks func TestReactor(t *testing.T) { N := 4 - css := randConsensusNet(N, "consensus_reactor_test", crankTimeoutPropose) + css := randConsensusNet(N, "consensus_reactor_test", crankTimeoutPropose, newMockTickerFunc(true)) reactors := make([]*ConsensusReactor, N) eventChans := make([]chan interface{}, N) for i := 0; i < N; i++ { @@ -58,7 +58,7 @@ func TestReactor(t *testing.T) { func TestValidatorSetChanges(t *testing.T) { nPeers := 8 nVals := 4 - css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", crankTimeoutPropose) + css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", crankTimeoutPropose, newMockTickerFunc(true)) reactors := make([]*ConsensusReactor, nPeers) eventChans := make([]chan interface{}, nPeers) for i := 0; i < nPeers; i++ { diff --git a/consensus/state.go b/consensus/state.go index 2bf94b7e8..10eff9c98 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -234,7 +234,7 @@ type ConsensusState struct { peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes) internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes - timeoutTicker *time.Ticker // ticker for timeouts + timeoutTicker TimeoutTicker // ticker for timeouts tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine timeoutParams *TimeoutParams // parameters and functions for timeout intervals @@ -252,6 +252,40 @@ type ConsensusState struct { setProposal func(proposal *types.Proposal) error } +func NewTimeoutTicker() TimeoutTicker { + return &timeoutTicker{ticker: new(time.Ticker)} +} + +type TimeoutTicker interface { + Chan() <-chan time.Time // on which to receive a timeout + Stop() // stop the timer + Reset(ti timeoutInfo) // reset the timer +} + +type timeoutTicker struct { + ticker *time.Ticker +} + +func (t *timeoutTicker) Chan() <-chan time.Time { + return t.ticker.C +} + +func (t *timeoutTicker) Stop() { + t.ticker.Stop() +} + +func (t *timeoutTicker) Reset(ti timeoutInfo) { + t.ticker = time.NewTicker(ti.Duration) +} + +func skipTimeoutCommit(ti timeoutInfo) bool { + if ti.Step == RoundStepNewHeight && + ti.Duration == time.Duration(0) { + return true + } + return false +} + func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, @@ -260,7 +294,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap mempool: mempool, peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), - timeoutTicker: new(time.Ticker), + timeoutTicker: NewTimeoutTicker(), tickChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), timeoutParams: InitTimeoutParamsFromConfig(config), @@ -321,6 +355,13 @@ func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) { cs.privValidator = priv } +// Set the local timer +func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.timeoutTicker = timeoutTicker +} + func (cs *ConsensusState) LoadCommit(height int) *types.Commit { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -562,7 +603,6 @@ func (cs *ConsensusState) updateToState(state *sm.State) { } else { cs.StartTime = cs.timeoutParams.Commit(cs.CommitTime) } - cs.CommitTime = time.Time{} cs.Validators = validators cs.Proposal = nil cs.ProposalBlock = nil @@ -613,6 +653,12 @@ func (cs *ConsensusState) timeoutRoutine() { continue } else if newti.Round == ti.Round { if ti.Step > 0 && newti.Step <= ti.Step { + // if we got here because we have all the votes, + // fire the tock now instead of waiting for the timeout + if skipTimeoutCommit(newti) { + cs.timeoutTicker.Stop() + go func(t timeoutInfo) { cs.tockChan <- t }(newti) + } continue } } @@ -628,8 +674,8 @@ func (cs *ConsensusState) timeoutRoutine() { log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.timeoutTicker.Stop() - cs.timeoutTicker = time.NewTicker(ti.Duration) - case <-cs.timeoutTicker.C: + cs.timeoutTicker.Reset(ti) + case <-cs.timeoutTicker.Chan(): log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.timeoutTicker.Stop() // go routine here gaurantees timeoutRoutine doesn't block. @@ -681,17 +727,9 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { cs.handleTimeout(ti, rs) case <-cs.Quit: - // drain the internalMsgQueue in case we eg. signed a proposal but it didn't hit the wal - FLUSH: - for { - select { - case mi = <-cs.internalMsgQueue: - cs.wal.Save(mi) - cs.handleMsg(mi, rs) - default: - break FLUSH - } - } + // NOTE: the internalMsgQueue may have signed messages from our + // priv_val that haven't hit the WAL, but its ok because + // priv_val tracks LastSig // close wal now that we're done writing to it if cs.wal != nil { @@ -758,7 +796,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { switch ti.Step { case RoundStepNewHeight: // NewRound event fired from enterNewRound. - // XXX: should we fire timeout here? + // XXX: should we fire timeout here (for timeout commit)? cs.enterNewRound(ti.Height, 0) case RoundStepPropose: types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent()) @@ -1171,6 +1209,7 @@ func (cs *ConsensusState) enterCommit(height int, commitRound int) { // keep cs.Round the same, commitRound points to the right Precommits set. cs.updateRoundStep(cs.Round, RoundStepCommit) cs.CommitRound = commitRound + cs.CommitTime = time.Now() cs.newStep() // Maybe finalize immediately. @@ -1416,6 +1455,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, log.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height) // A precommit for the previous height? + // These come in while we wait timeoutCommit if vote.Height+1 == cs.Height { if !(cs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) { // TODO: give the reason .. @@ -1426,7 +1466,15 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, if added { log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) types.FireEventVote(cs.evsw, types.EventDataVote{vote}) + + if cs.LastCommit.HasAll() { + // if we have all the votes now, + // schedule the timeoutCommit to happen right away + // NOTE: this won't apply if only one validator + cs.scheduleTimeout(time.Duration(0), cs.Height, 0, RoundStepNewHeight) + } } + return } @@ -1487,7 +1535,6 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool, cs.enterNewRound(height, vote.Round) cs.enterPrecommit(height, vote.Round) cs.enterPrecommitWait(height, vote.Round) - //}() } default: PanicSanity(Fmt("Unexpected vote type %X", vote.Type)) // Should not happen. diff --git a/types/vote_set.go b/types/vote_set.go index 10b1aa094..a1a2bf394 100644 --- a/types/vote_set.go +++ b/types/vote_set.go @@ -377,6 +377,10 @@ func (voteSet *VoteSet) HasTwoThirdsAny() bool { return voteSet.sum > voteSet.valSet.TotalVotingPower()*2/3 } +func (voteSet *VoteSet) HasAll() bool { + return voteSet.sum == voteSet.valSet.TotalVotingPower() +} + // Returns either a blockhash (or nil) that received +2/3 majority. // If there exists no such majority, returns (nil, PartSetHeader{}, false). func (voteSet *VoteSet) TwoThirdsMajority() (blockID BlockID, ok bool) {