Browse Source

consensus: mv timeoutRoutine into TimeoutTicker

pull/342/head
Ethan Buchman 8 years ago
parent
commit
40b08f2494
4 changed files with 146 additions and 123 deletions
  1. +15
    -11
      consensus/common_test.go
  2. +10
    -8
      consensus/replay.go
  3. +11
    -104
      consensus/state.go
  4. +110
    -0
      consensus/ticker.go

+ 15
- 11
consensus/common_test.go View File

@ -387,40 +387,44 @@ func crankTimeoutPropose(config cfg.Config) {
func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker {
return func() TimeoutTicker {
return &mockTicker{
c: make(chan time.Time, 10),
c: make(chan timeoutInfo, 10),
onlyOnce: onlyOnce,
}
}
}
// mock ticker only fires for NewStepRound (timeout commit),
// mock ticker only fires once
// and only once if onlyOnce=true
type mockTicker struct {
c chan time.Time
c chan timeoutInfo
mtx sync.Mutex
onlyOnce bool
fired bool
}
func (m *mockTicker) Stop() {
func (m *mockTicker) Start() (bool, error) {
return true, nil
}
func (m *mockTicker) Reset(ti timeoutInfo) {
func (m *mockTicker) Stop() bool {
return true
}
func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.onlyOnce && m.fired {
return
}
if ti.Step == RoundStepNewHeight {
m.Fire()
m.c <- ti
m.fired = true
}
}
func (m *mockTicker) Chan() <-chan time.Time {
func (m *mockTicker) Chan() <-chan timeoutInfo {
return m.c
}
func (m *mockTicker) Fire() {
m.c <- time.Now()
}
//------------------------------------

+ 10
- 8
consensus/replay.go View File

@ -246,15 +246,17 @@ func (cs *ConsensusState) startForReplay() {
cs.BaseService.OnStart()
// since we replay tocks we just ignore ticks
go func() {
for {
select {
case <-cs.tickChan:
case <-cs.Quit:
return
// TODO:!
/*
go func() {
for {
select {
case <-cs.tickChan:
case <-cs.Quit:
return
}
}
}
}()
}()*/
}
// console function for parsing input and running commands


+ 11
- 104
consensus/state.go View File

@ -232,12 +232,10 @@ type ConsensusState struct {
RoundState
state *sm.State // State until height-1.
peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes)
internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes
timeoutTicker TimeoutTicker // ticker for timeouts
tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
timeoutParams *TimeoutParams // parameters and functions for timeout intervals
peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes)
internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes
timeoutTicker TimeoutTicker // ticker for timeouts
timeoutParams *TimeoutParams // parameters and functions for timeout intervals
evsw types.EventSwitch
@ -252,40 +250,6 @@ type ConsensusState struct {
setProposal func(proposal *types.Proposal) error
}
func NewTimeoutTicker() TimeoutTicker {
return &timeoutTicker{ticker: new(time.Ticker)}
}
type TimeoutTicker interface {
Chan() <-chan time.Time // on which to receive a timeout
Stop() // stop the timer
Reset(ti timeoutInfo) // reset the timer
}
type timeoutTicker struct {
ticker *time.Ticker
}
func (t *timeoutTicker) Chan() <-chan time.Time {
return t.ticker.C
}
func (t *timeoutTicker) Stop() {
t.ticker.Stop()
}
func (t *timeoutTicker) Reset(ti timeoutInfo) {
t.ticker = time.NewTicker(ti.Duration)
}
func skipTimeoutCommit(ti timeoutInfo) bool {
if ti.Step == RoundStepNewHeight &&
ti.Duration == time.Duration(0) {
return true
}
return false
}
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
cs := &ConsensusState{
config: config,
@ -295,8 +259,6 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
timeoutParams: InitTimeoutParamsFromConfig(config),
}
// set function defaults (may be overwritten before calling Start)
@ -391,7 +353,7 @@ func (cs *ConsensusState) OnStart() error {
// NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid)
go cs.timeoutRoutine()
cs.timeoutTicker.Start()
// we may have lost some votes if the process crashed
// reload from consensus log to catchup
@ -413,13 +375,15 @@ func (cs *ConsensusState) OnStart() error {
// timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan
// receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions
func (cs *ConsensusState) startRoutines(maxSteps int) {
go cs.timeoutRoutine()
cs.timeoutTicker.Start()
go cs.receiveRoutine(maxSteps)
}
func (cs *ConsensusState) OnStop() {
cs.BaseService.OnStop()
cs.timeoutTicker.Stop()
// Make BaseService.Wait() wait until cs.wal.Wait()
if cs.wal != nil && cs.IsRunning() {
cs.wal.Wait()
@ -513,11 +477,9 @@ func (cs *ConsensusState) scheduleRound0(rs *RoundState) {
cs.scheduleTimeout(sleepDuration, rs.Height, 0, RoundStepNewHeight)
}
// Attempt to schedule a timeout by sending timeoutInfo on the tickChan.
// The timeoutRoutine is alwaya available to read from tickChan (it won't block).
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
// Attempt to schedule a timeout (by sending timeoutInfo on the tickChan)
func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round int, step RoundStepType) {
cs.tickChan <- timeoutInfo{duration, height, round, step}
cs.timeoutTicker.ScheduleTimeout(timeoutInfo{duration, height, round, step})
}
// send a msg into the receiveRoutine regarding our own proposal, block part, or vote
@ -634,61 +596,6 @@ func (cs *ConsensusState) newStep() {
//-----------------------------------------
// the main go routines
// the state machine sends on tickChan to start a new timer.
// timers are interupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (cs *ConsensusState) timeoutRoutine() {
log.Debug("Starting timeout routine")
var ti timeoutInfo
for {
select {
case newti := <-cs.tickChan:
log.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// ignore tickers for old height/round/step
if newti.Height < ti.Height {
continue
} else if newti.Height == ti.Height {
if newti.Round < ti.Round {
continue
} else if newti.Round == ti.Round {
if ti.Step > 0 && newti.Step <= ti.Step {
// if we got here because we have all the votes,
// fire the tock now instead of waiting for the timeout
/*if skipTimeoutCommit(newti) {
cs.timeoutTicker.Stop()
go func(t timeoutInfo) { cs.tockChan <- t }(newti)
}*/
continue
}
}
}
ti = newti
// if the newti has duration == 0, we relay to the tockChan immediately (no timeout)
if ti.Duration == time.Duration(0) {
go func(t timeoutInfo) { cs.tockChan <- t }(ti)
continue
}
log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.timeoutTicker.Stop()
cs.timeoutTicker.Reset(ti)
case <-cs.timeoutTicker.Chan():
log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.timeoutTicker.Stop()
// go routine here gaurantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func(t timeoutInfo) { cs.tockChan <- t }(ti)
case <-cs.Quit:
return
}
}
}
// a nice idea but probably more trouble than its worth
func (cs *ConsensusState) stopTimer() {
cs.timeoutTicker.Stop()
@ -720,7 +627,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
cs.wal.Save(mi)
// handles proposals, block parts, votes
cs.handleMsg(mi, rs)
case ti := <-cs.tockChan:
case ti := <-cs.timeoutTicker.Chan(): // tockChan:
cs.wal.Save(ti)
// if the timeout is relevant to the rs
// go to the next step


+ 110
- 0
consensus/ticker.go View File

@ -0,0 +1,110 @@
package consensus
import (
"time"
. "github.com/tendermint/go-common"
)
type TimeoutTicker interface {
Start() (bool, error)
Stop() bool
Chan() <-chan timeoutInfo // on which to receive a timeout
ScheduleTimeout(ti timeoutInfo) // reset the timer
}
type timeoutTicker struct {
BaseService
timer *time.Timer
tickChan chan timeoutInfo
tockChan chan timeoutInfo
}
func NewTimeoutTicker() TimeoutTicker {
tt := &timeoutTicker{
timer: time.NewTimer(0),
tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
}
if !tt.timer.Stop() {
<-tt.timer.C
}
tt.BaseService = *NewBaseService(log, "TimeoutTicker", tt)
return tt
}
func (t *timeoutTicker) OnStart() error {
t.BaseService.OnStart()
go t.timeoutRoutine()
return nil
}
func (t *timeoutTicker) OnStop() {
t.BaseService.OnStop()
}
func (t *timeoutTicker) Chan() <-chan timeoutInfo {
return t.tockChan
}
// The timeoutRoutine is alwaya available to read from tickChan (it won't block).
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti
}
// send on tickChan to start a new timer.
// timers are interupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (t *timeoutTicker) timeoutRoutine() {
log.Debug("Starting timeout routine")
var ti timeoutInfo
for {
select {
case newti := <-t.tickChan:
log.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// ignore tickers for old height/round/step
if newti.Height < ti.Height {
continue
} else if newti.Height == ti.Height {
if newti.Round < ti.Round {
continue
} else if newti.Round == ti.Round {
if ti.Step > 0 && newti.Step <= ti.Step {
continue
}
}
}
ti = newti
// if the newti has duration == 0, we relay to the tockChan immediately (no timeout)
if ti.Duration == time.Duration(0) {
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
continue
}
log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
}
}
t.timer.Reset(ti.Duration)
case <-t.timer.C:
log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
// go routine here gaurantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
case <-t.Quit:
return
}
}
}

Loading…
Cancel
Save