You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

134 lines
3.8 KiB

package consensus
import (
"time"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
)
var (
tickTockBufferSize = 10
)
// TimeoutTicker is a timer that schedules timeouts
// conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive.
type TimeoutTicker interface {
Start() error
Stop() error
Chan() <-chan timeoutInfo // on which to receive a timeout
ScheduleTimeout(ti timeoutInfo) // reset the timer
SetLogger(log.Logger)
}
// timeoutTicker wraps time.Timer,
// scheduling timeouts only for greater height/round/step
// than what it's already seen.
// Timeouts are scheduled along the tickChan,
// and fired on the tockChan.
type timeoutTicker struct {
cmn.BaseService
timer *time.Timer
tickChan chan timeoutInfo // for scheduling timeouts
tockChan chan timeoutInfo // for notifying about them
}
// NewTimeoutTicker returns a new TimeoutTicker.
func NewTimeoutTicker() TimeoutTicker {
tt := &timeoutTicker{
timer: time.NewTimer(0),
tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
}
tt.BaseService = *cmn.NewBaseService(nil, "TimeoutTicker", tt)
tt.stopTimer() // don't want to fire until the first scheduled timeout
return tt
}
// OnStart implements cmn.Service. It starts the timeout routine.
func (t *timeoutTicker) OnStart() error {
go t.timeoutRoutine()
return nil
}
// OnStop implements cmn.Service. It stops the timeout routine.
func (t *timeoutTicker) OnStop() {
t.BaseService.OnStop()
t.stopTimer()
}
// Chan returns a channel on which timeouts are sent.
func (t *timeoutTicker) Chan() <-chan timeoutInfo {
return t.tockChan
}
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// The timeoutRoutine is alwaya available to read from tickChan, so this won't block.
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti
}
//-------------------------------------------------------------
// stop the timer and drain if necessary
func (t *timeoutTicker) stopTimer() {
// Stop() returns false if it was already fired or was stopped
if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
t.Logger.Debug("Timer already stopped")
}
}
}
// send on tickChan to start a new timer.
// timers are interupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (t *timeoutTicker) timeoutRoutine() {
t.Logger.Debug("Starting timeout routine")
var ti timeoutInfo
for {
select {
case newti := <-t.tickChan:
t.Logger.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// ignore tickers for old height/round/step
if newti.Height < ti.Height {
continue
} else if newti.Height == ti.Height {
if newti.Round < ti.Round {
continue
} else if newti.Round == ti.Round {
if ti.Step > 0 && newti.Step <= ti.Step {
continue
}
}
}
// stop the last timer
t.stopTimer()
// update timeoutInfo and reset timer
// NOTE time.Timer allows duration to be non-positive
ti = newti
t.timer.Reset(ti.Duration)
t.Logger.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
case <-t.timer.C:
t.Logger.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
// go routine here guarantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
case <-t.Quit:
return
}
}
}