Browse Source

consensus: let time.Timer handle non-positive durations

pull/344/merge
Ethan Buchman 8 years ago
parent
commit
55b4bfa1fe
4 changed files with 35 additions and 22 deletions
  1. +2
    -2
      consensus/replay.go
  2. +0
    -3
      consensus/state.go
  3. +3
    -0
      consensus/test_data/build.sh
  4. +30
    -17
      consensus/ticker.go

+ 2
- 2
consensus/replay.go View File

@ -246,9 +246,9 @@ func (cs *ConsensusState) startForReplay() {
// don't want to start full cs // don't want to start full cs
cs.BaseService.OnStart() cs.BaseService.OnStart()
log.Warn("Replay commands are disabled until someone updates them and writes tests")
/* TODO:!
// since we replay tocks we just ignore ticks // since we replay tocks we just ignore ticks
// TODO:!
/*
go func() { go func() {
for { for {
select { select {


+ 0
- 3
consensus/state.go View File

@ -488,9 +488,6 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) {
func (cs *ConsensusState) scheduleRound0(rs *RoundState) { func (cs *ConsensusState) scheduleRound0(rs *RoundState) {
//log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime)
sleepDuration := rs.StartTime.Sub(time.Now()) sleepDuration := rs.StartTime.Sub(time.Now())
if sleepDuration < time.Duration(0) {
sleepDuration = time.Duration(0)
}
cs.scheduleTimeout(sleepDuration, rs.Height, 0, RoundStepNewHeight) cs.scheduleTimeout(sleepDuration, rs.Height, 0, RoundStepNewHeight)
} }


+ 3
- 0
consensus/test_data/build.sh View File

@ -22,6 +22,9 @@ tendermint node --proxy_app=dummy &> /dev/null &
sleep 5 sleep 5
killall tendermint killall tendermint
# /q would print up to and including the match, then quit.
# /Q doesn't include the match.
# http://unix.stackexchange.com/questions/11305/grep-show-all-the-file-up-to-the-match
sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal
reset reset


+ 30
- 17
consensus/ticker.go View File

@ -10,6 +10,9 @@ var (
tickTockBufferSize = 10 tickTockBufferSize = 10
) )
// TimeoutTicker is a timer that schedules timeouts
// conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive.
type TimeoutTicker interface { type TimeoutTicker interface {
Start() (bool, error) Start() (bool, error)
Stop() bool Stop() bool
@ -17,6 +20,11 @@ type TimeoutTicker interface {
ScheduleTimeout(ti timeoutInfo) // reset the timer ScheduleTimeout(ti timeoutInfo) // reset the timer
} }
// timeoutTicker wraps time.Timer,
// scheduling timeouts only for greater height/round/step
// than what it's already seen.
// Timeouts are scheduled along the tickChan,
// and fired on the tockChan.
type timeoutTicker struct { type timeoutTicker struct {
BaseService BaseService
@ -31,9 +39,7 @@ func NewTimeoutTicker() TimeoutTicker {
tickChan: make(chan timeoutInfo, tickTockBufferSize), tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize),
} }
if !tt.timer.Stop() {
<-tt.timer.C
}
tt.stopTimer() // don't want to fire until the first scheduled timeout
tt.BaseService = *NewBaseService(log, "TimeoutTicker", tt) tt.BaseService = *NewBaseService(log, "TimeoutTicker", tt)
return tt return tt
} }
@ -48,6 +54,7 @@ func (t *timeoutTicker) OnStart() error {
func (t *timeoutTicker) OnStop() { func (t *timeoutTicker) OnStop() {
t.BaseService.OnStop() t.BaseService.OnStop()
t.stopTimer()
} }
func (t *timeoutTicker) Chan() <-chan timeoutInfo { func (t *timeoutTicker) Chan() <-chan timeoutInfo {
@ -60,6 +67,20 @@ func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti t.tickChan <- ti
} }
//-------------------------------------------------------------
// stop the timer and drain if necessary
func (t *timeoutTicker) stopTimer() {
// Stop() returns false if it was already fired or was stopped
if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
log.Debug("Timer already stopped")
}
}
}
// send on tickChan to start a new timer. // send on tickChan to start a new timer.
// timers are interupted and replaced by new ticks from later steps // timers are interupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan // timeouts of 0 on the tickChan will be immediately relayed to the tockChan
@ -84,22 +105,14 @@ func (t *timeoutTicker) timeoutRoutine() {
} }
} }
ti = newti
// if the newti has duration == 0, we relay to the tockChan immediately (no timeout)
if ti.Duration == time.Duration(0) {
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
continue
}
// stop the last timer
t.stopTimer()
log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
}
}
// update timeoutInfo and reset timer
// NOTE time.Timer allows duration to be non-positive
ti = newti
t.timer.Reset(ti.Duration) t.timer.Reset(ti.Duration)
log.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
case <-t.timer.C: case <-t.timer.C:
log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
// go routine here gaurantees timeoutRoutine doesn't block. // go routine here gaurantees timeoutRoutine doesn't block.


Loading…
Cancel
Save