package consensus
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/libs/service"
|
|
)
|
|
|
|
// tickTockBufferSize is the buffer capacity used for both the tickChan and
// the tockChan of a timeoutTicker.
var tickTockBufferSize = 10
|
|
|
|
// TimeoutTicker is a timer that schedules timeouts
|
|
// conditional on the height/round/step in the timeoutInfo.
|
|
// The timeoutInfo.Duration may be non-positive.
|
|
type TimeoutTicker interface {
|
|
Start(context.Context) error
|
|
Stop() error
|
|
IsRunning() bool
|
|
Chan() <-chan timeoutInfo // on which to receive a timeout
|
|
ScheduleTimeout(ti timeoutInfo) // reset the timer
|
|
}
|
|
|
|
// timeoutTicker wraps time.Timer,
|
|
// scheduling timeouts only for greater height/round/step
|
|
// than what it's already seen.
|
|
// Timeouts are scheduled along the tickChan,
|
|
// and fired on the tockChan.
|
|
type timeoutTicker struct {
|
|
service.BaseService
|
|
logger log.Logger
|
|
|
|
timer *time.Timer
|
|
tickChan chan timeoutInfo // for scheduling timeouts
|
|
tockChan chan timeoutInfo // for notifying about them
|
|
}
|
|
|
|
// NewTimeoutTicker returns a new TimeoutTicker.
|
|
func NewTimeoutTicker(logger log.Logger) TimeoutTicker {
|
|
tt := &timeoutTicker{
|
|
logger: logger,
|
|
timer: time.NewTimer(0),
|
|
tickChan: make(chan timeoutInfo, tickTockBufferSize),
|
|
tockChan: make(chan timeoutInfo, tickTockBufferSize),
|
|
}
|
|
tt.BaseService = *service.NewBaseService(logger, "TimeoutTicker", tt)
|
|
tt.stopTimer() // don't want to fire until the first scheduled timeout
|
|
return tt
|
|
}
|
|
|
|
// OnStart implements service.Service. It starts the timeout routine.
|
|
func (t *timeoutTicker) OnStart(ctx context.Context) error {
|
|
go t.timeoutRoutine(ctx)
|
|
|
|
return nil
|
|
}
|
|
|
|
// OnStop implements service.Service. It stops the timeout routine.
|
|
func (t *timeoutTicker) OnStop() { t.stopTimer() }
|
|
|
|
// Chan returns a channel on which timeouts are sent.
|
|
func (t *timeoutTicker) Chan() <-chan timeoutInfo {
|
|
return t.tockChan
|
|
}
|
|
|
|
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
|
|
// The timeoutRoutine is always available to read from tickChan, so this won't block.
|
|
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
|
|
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
|
|
t.tickChan <- ti
|
|
}
|
|
|
|
//-------------------------------------------------------------
|
|
|
|
// stop the timer and drain if necessary
|
|
func (t *timeoutTicker) stopTimer() {
|
|
// Stop() returns false if it was already fired or was stopped
|
|
if !t.timer.Stop() {
|
|
select {
|
|
case <-t.timer.C:
|
|
default:
|
|
t.logger.Debug("Timer already stopped")
|
|
}
|
|
}
|
|
}
|
|
|
|
// send on tickChan to start a new timer.
|
|
// timers are interupted and replaced by new ticks from later steps
|
|
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
|
|
func (t *timeoutTicker) timeoutRoutine(ctx context.Context) {
|
|
t.logger.Debug("Starting timeout routine")
|
|
var ti timeoutInfo
|
|
for {
|
|
select {
|
|
case newti := <-t.tickChan:
|
|
t.logger.Debug("Received tick", "old_ti", ti, "new_ti", newti)
|
|
|
|
// ignore tickers for old height/round/step
|
|
if newti.Height < ti.Height {
|
|
continue
|
|
} else if newti.Height == ti.Height {
|
|
if newti.Round < ti.Round {
|
|
continue
|
|
} else if newti.Round == ti.Round {
|
|
if ti.Step > 0 && newti.Step <= ti.Step {
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// stop the last timer
|
|
t.stopTimer()
|
|
|
|
// update timeoutInfo and reset timer
|
|
// NOTE time.Timer allows duration to be non-positive
|
|
ti = newti
|
|
t.timer.Reset(ti.Duration)
|
|
t.logger.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
|
|
case <-t.timer.C:
|
|
t.logger.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
|
|
// go routine here guarantees timeoutRoutine doesn't block.
|
|
// Determinism comes from playback in the receiveRoutine.
|
|
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
|
|
// and managing the timeouts ourselves with a millisecond ticker
|
|
go func(toi timeoutInfo) {
|
|
select {
|
|
case t.tockChan <- toi:
|
|
case <-ctx.Done():
|
|
}
|
|
}(ti)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|