diff --git a/consensus/common_test.go b/consensus/common_test.go index 7b1653f34..ae9b97af6 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -387,40 +387,44 @@ func crankTimeoutPropose(config cfg.Config) { func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker { return func() TimeoutTicker { return &mockTicker{ - c: make(chan time.Time, 10), + c: make(chan timeoutInfo, 10), onlyOnce: onlyOnce, } } } -// mock ticker only fires for NewStepRound (timeout commit), +// mock ticker only fires once // and only once if onlyOnce=true type mockTicker struct { - c chan time.Time + c chan timeoutInfo + mtx sync.Mutex onlyOnce bool fired bool } -func (m *mockTicker) Stop() { +func (m *mockTicker) Start() (bool, error) { + return true, nil } -func (m *mockTicker) Reset(ti timeoutInfo) { +func (m *mockTicker) Stop() bool { + return true +} + +func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) { + m.mtx.Lock() + defer m.mtx.Unlock() if m.onlyOnce && m.fired { return } if ti.Step == RoundStepNewHeight { - m.Fire() + m.c <- ti m.fired = true } } -func (m *mockTicker) Chan() <-chan time.Time { +func (m *mockTicker) Chan() <-chan timeoutInfo { return m.c } -func (m *mockTicker) Fire() { - m.c <- time.Now() -} - //------------------------------------ diff --git a/consensus/replay.go b/consensus/replay.go index d2cdf4aba..b69b4384f 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -246,15 +246,17 @@ func (cs *ConsensusState) startForReplay() { cs.BaseService.OnStart() // since we replay tocks we just ignore ticks - go func() { - for { - select { - case <-cs.tickChan: - case <-cs.Quit: - return + // TODO:! + /* + go func() { + for { + select { + case <-cs.tickChan: + case <-cs.Quit: + return + } } - } - }() + }()*/ } // console function for parsing input and running commands diff --git a/consensus/state.go b/consensus/state.go index 0a4c77bc5..31a2e2c9f 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -232,12 +232,10 @@ type ConsensusState struct { RoundState state *sm.State // State until height-1. - peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes) - internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes - timeoutTicker TimeoutTicker // ticker for timeouts - tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine - tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine - timeoutParams *TimeoutParams // parameters and functions for timeout intervals + peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes) + internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes + timeoutTicker TimeoutTicker // ticker for timeouts + timeoutParams *TimeoutParams // parameters and functions for timeout intervals evsw types.EventSwitch @@ -252,40 +250,6 @@ type ConsensusState struct { setProposal func(proposal *types.Proposal) error } -func NewTimeoutTicker() TimeoutTicker { - return &timeoutTicker{ticker: new(time.Ticker)} -} - -type TimeoutTicker interface { - Chan() <-chan time.Time // on which to receive a timeout - Stop() // stop the timer - Reset(ti timeoutInfo) // reset the timer -} - -type timeoutTicker struct { - ticker *time.Ticker -} - -func (t *timeoutTicker) Chan() <-chan time.Time { - return t.ticker.C -} - -func (t *timeoutTicker) Stop() { - t.ticker.Stop() -} - -func (t *timeoutTicker) Reset(ti timeoutInfo) { - t.ticker = time.NewTicker(ti.Duration) -} - -func skipTimeoutCommit(ti timeoutInfo) bool { - if ti.Step == RoundStepNewHeight && - ti.Duration == time.Duration(0) { - return true - } - return false -} - func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, @@ -295,8 +259,6 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: NewTimeoutTicker(), - tickChan: make(chan timeoutInfo, tickTockBufferSize), - tockChan: make(chan timeoutInfo, tickTockBufferSize), timeoutParams: InitTimeoutParamsFromConfig(config), } // set function defaults (may be overwritten before calling Start) @@ -391,7 +353,7 @@ func (cs *ConsensusState) OnStart() error { // NOTE: we will get a build up of garbage go routines // firing on the tockChan until the receiveRoutine is started // to deal with them (by that point, at most one will be valid) - go cs.timeoutRoutine() + cs.timeoutTicker.Start() // we may have lost some votes if the process crashed // reload from consensus log to catchup @@ -413,13 +375,15 @@ func (cs *ConsensusState) OnStart() error { // timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan // receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions func (cs *ConsensusState) startRoutines(maxSteps int) { - go cs.timeoutRoutine() + cs.timeoutTicker.Start() go cs.receiveRoutine(maxSteps) } func (cs *ConsensusState) OnStop() { cs.BaseService.OnStop() + cs.timeoutTicker.Stop() + // Make BaseService.Wait() wait until cs.wal.Wait() if cs.wal != nil && cs.IsRunning() { cs.wal.Wait() @@ -513,11 +477,9 @@ func (cs *ConsensusState) scheduleRound0(rs *RoundState) { cs.scheduleTimeout(sleepDuration, rs.Height, 0, RoundStepNewHeight) } -// Attempt to schedule a timeout by sending timeoutInfo on the tickChan. -// The timeoutRoutine is alwaya available to read from tickChan (it won't block). -// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step. +// Attempt to schedule a timeout (by sending timeoutInfo on the tickChan) func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round int, step RoundStepType) { - cs.tickChan <- timeoutInfo{duration, height, round, step} + cs.timeoutTicker.ScheduleTimeout(timeoutInfo{duration, height, round, step}) } // send a msg into the receiveRoutine regarding our own proposal, block part, or vote @@ -634,61 +596,6 @@ func (cs *ConsensusState) newStep() { //----------------------------------------- // the main go routines -// the state machine sends on tickChan to start a new timer. -// timers are interupted and replaced by new ticks from later steps -// timeouts of 0 on the tickChan will be immediately relayed to the tockChan -func (cs *ConsensusState) timeoutRoutine() { - log.Debug("Starting timeout routine") - var ti timeoutInfo - for { - select { - case newti := <-cs.tickChan: - log.Debug("Received tick", "old_ti", ti, "new_ti", newti) - - // ignore tickers for old height/round/step - if newti.Height < ti.Height { - continue - } else if newti.Height == ti.Height { - if newti.Round < ti.Round { - continue - } else if newti.Round == ti.Round { - if ti.Step > 0 && newti.Step <= ti.Step { - // if we got here because we have all the votes, - // fire the tock now instead of waiting for the timeout - /*if skipTimeoutCommit(newti) { - cs.timeoutTicker.Stop() - go func(t timeoutInfo) { cs.tockChan <- t }(newti) - }*/ - continue - } - } - } - - ti = newti - - // if the newti has duration == 0, we relay to the tockChan immediately (no timeout) - if ti.Duration == time.Duration(0) { - go func(t timeoutInfo) { cs.tockChan <- t }(ti) - continue - } - - log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) - cs.timeoutTicker.Stop() - cs.timeoutTicker.Reset(ti) - case <-cs.timeoutTicker.Chan(): - log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) - cs.timeoutTicker.Stop() - // go routine here gaurantees timeoutRoutine doesn't block. - // Determinism comes from playback in the receiveRoutine. - // We can eliminate it by merging the timeoutRoutine into receiveRoutine - // and managing the timeouts ourselves with a millisecond ticker - go func(t timeoutInfo) { cs.tockChan <- t }(ti) - case <-cs.Quit: - return - } - } -} - // a nice idea but probably more trouble than its worth func (cs *ConsensusState) stopTimer() { cs.timeoutTicker.Stop() @@ -720,7 +627,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { cs.wal.Save(mi) // handles proposals, block parts, votes cs.handleMsg(mi, rs) - case ti := <-cs.tockChan: + case ti := <-cs.timeoutTicker.Chan(): // tockChan: cs.wal.Save(ti) // if the timeout is relevant to the rs // go to the next step diff --git a/consensus/ticker.go b/consensus/ticker.go new file mode 100644 index 000000000..e4f389fa6 --- /dev/null +++ b/consensus/ticker.go @@ -0,0 +1,110 @@ +package consensus + +import ( + "time" + + . "github.com/tendermint/go-common" +) + +type TimeoutTicker interface { + Start() (bool, error) + Stop() bool + Chan() <-chan timeoutInfo // on which to receive a timeout + ScheduleTimeout(ti timeoutInfo) // reset the timer +} + +type timeoutTicker struct { + BaseService + + timer *time.Timer + tickChan chan timeoutInfo + tockChan chan timeoutInfo +} + +func NewTimeoutTicker() TimeoutTicker { + tt := &timeoutTicker{ + timer: time.NewTimer(0), + tickChan: make(chan timeoutInfo, tickTockBufferSize), + tockChan: make(chan timeoutInfo, tickTockBufferSize), + } + if !tt.timer.Stop() { + <-tt.timer.C + } + tt.BaseService = *NewBaseService(log, "TimeoutTicker", tt) + return tt +} + +func (t *timeoutTicker) OnStart() error { + t.BaseService.OnStart() + + go t.timeoutRoutine() + + return nil +} + +func (t *timeoutTicker) OnStop() { + t.BaseService.OnStop() +} + +func (t *timeoutTicker) Chan() <-chan timeoutInfo { + return t.tockChan +} + +// The timeoutRoutine is alwaya available to read from tickChan (it won't block). +// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step. +func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) { + t.tickChan <- ti +} + +// send on tickChan to start a new timer. +// timers are interupted and replaced by new ticks from later steps +// timeouts of 0 on the tickChan will be immediately relayed to the tockChan +func (t *timeoutTicker) timeoutRoutine() { + log.Debug("Starting timeout routine") + var ti timeoutInfo + for { + select { + case newti := <-t.tickChan: + log.Debug("Received tick", "old_ti", ti, "new_ti", newti) + + // ignore tickers for old height/round/step + if newti.Height < ti.Height { + continue + } else if newti.Height == ti.Height { + if newti.Round < ti.Round { + continue + } else if newti.Round == ti.Round { + if ti.Step > 0 && newti.Step <= ti.Step { + continue + } + } + } + + ti = newti + + // if the newti has duration == 0, we relay to the tockChan immediately (no timeout) + if ti.Duration == time.Duration(0) { + go func(toi timeoutInfo) { t.tockChan <- toi }(ti) + continue + } + + log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) + if !t.timer.Stop() { + select { + case <-t.timer.C: + default: + } + } + t.timer.Reset(ti.Duration) + case <-t.timer.C: + log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) + // go routine here gaurantees timeoutRoutine doesn't block. + // Determinism comes from playback in the receiveRoutine. + // We can eliminate it by merging the timeoutRoutine into receiveRoutine + // and managing the timeouts ourselves with a millisecond ticker + go func(toi timeoutInfo) { t.tockChan <- toi }(ti) + case <-t.Quit: + return + } + } +}