Browse Source

consensus: TimeoutTicker, skip TimeoutCommit on HasAll

pull/342/head
Ethan Buchman 8 years ago
parent
commit
faf23aa0d4
5 changed files with 122 additions and 23 deletions
  1. +4
    -1
      consensus/byzantine_test.go
  2. +47
    -2
      consensus/common_test.go
  3. +2
    -2
      consensus/reactor_test.go
  4. +65
    -18
      consensus/state.go
  5. +4
    -0
      types/vote_set.go

+ 4
- 1
consensus/byzantine_test.go View File

@ -29,7 +29,10 @@ func init() {
// Heal partition and ensure A sees the commit // Heal partition and ensure A sees the commit
func TestByzantine(t *testing.T) { func TestByzantine(t *testing.T) {
N := 4 N := 4
css := randConsensusNet(N, "consensus_byzantine_test", crankTimeoutPropose)
css := randConsensusNet(N, "consensus_byzantine_test", crankTimeoutPropose, newMockTickerFunc(false))
// give the byzantine validator a normal ticker
css[0].SetTimeoutTicker(NewTimeoutTicker())
switches := make([]*p2p.Switch, N) switches := make([]*p2p.Switch, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {


+ 47
- 2
consensus/common_test.go View File

@ -257,7 +257,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
return cs, vss return cs, vss
} }
func randConsensusNet(nValidators int, testName string, updateConfig func(cfg.Config)) []*ConsensusState {
func randConsensusNet(nValidators int, testName string, updateConfig func(cfg.Config), tickerFunc func() TimeoutTicker) []*ConsensusState {
genDoc, privVals := randGenesisDoc(nValidators, false, 10) genDoc, privVals := randGenesisDoc(nValidators, false, 10)
css := make([]*ConsensusState, nValidators) css := make([]*ConsensusState, nValidators)
for i := 0; i < nValidators; i++ { for i := 0; i < nValidators; i++ {
@ -268,12 +268,13 @@ func randConsensusNet(nValidators int, testName string, updateConfig func(cfg.Co
updateConfig(thisConfig) updateConfig(thisConfig)
EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], counter.NewCounterApplication(true)) css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], counter.NewCounterApplication(true))
css[i].SetTimeoutTicker(tickerFunc())
} }
return css return css
} }
// nPeers = nValidators + nNotValidator // nPeers = nValidators + nNotValidator
func randConsensusNetWithPeers(nValidators, nPeers int, testName string, updateConfig func(cfg.Config)) []*ConsensusState {
func randConsensusNetWithPeers(nValidators, nPeers int, testName string, updateConfig func(cfg.Config), tickerFunc func() TimeoutTicker) []*ConsensusState {
genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower)) genDoc, privVals := randGenesisDoc(nValidators, false, int64(testMinPower))
css := make([]*ConsensusState, nPeers) css := make([]*ConsensusState, nPeers)
for i := 0; i < nPeers; i++ { for i := 0; i < nPeers; i++ {
@ -294,6 +295,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, updateC
dir, _ := ioutil.TempDir("/tmp", "persistent-dummy") dir, _ := ioutil.TempDir("/tmp", "persistent-dummy")
css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir)) css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, dummy.NewPersistentDummyApplication(dir))
css[i].SetTimeoutTicker(tickerFunc())
} }
return css return css
} }
@ -379,3 +381,46 @@ func crankTimeoutPropose(config cfg.Config) {
config.Set("timeout_propose", 110000) // TODO: crank it to eleventy config.Set("timeout_propose", 110000) // TODO: crank it to eleventy
config.Set("timeout_commit", 1000) config.Set("timeout_commit", 1000)
} }
//------------------------------------
func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker {
return func() TimeoutTicker {
return &mockTicker{
c: make(chan time.Time, 10),
onlyOnce: onlyOnce,
}
}
}
// mock ticker only fires for NewStepRound (timeout commit),
// and only once if onlyOnce=true
type mockTicker struct {
c chan time.Time
onlyOnce bool
fired bool
}
func (m *mockTicker) Stop() {
}
func (m *mockTicker) Reset(ti timeoutInfo) {
if m.onlyOnce && m.fired {
return
}
if ti.Step == RoundStepNewHeight {
m.Fire()
m.fired = true
}
}
func (m *mockTicker) Chan() <-chan time.Time {
return m.c
}
func (m *mockTicker) Fire() {
m.c <- time.Now()
}
//------------------------------------

+ 2
- 2
consensus/reactor_test.go View File

@ -24,7 +24,7 @@ func init() {
// Ensure a testnet makes blocks // Ensure a testnet makes blocks
func TestReactor(t *testing.T) { func TestReactor(t *testing.T) {
N := 4 N := 4
css := randConsensusNet(N, "consensus_reactor_test", crankTimeoutPropose)
css := randConsensusNet(N, "consensus_reactor_test", crankTimeoutPropose, newMockTickerFunc(true))
reactors := make([]*ConsensusReactor, N) reactors := make([]*ConsensusReactor, N)
eventChans := make([]chan interface{}, N) eventChans := make([]chan interface{}, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
@ -58,7 +58,7 @@ func TestReactor(t *testing.T) {
func TestValidatorSetChanges(t *testing.T) { func TestValidatorSetChanges(t *testing.T) {
nPeers := 8 nPeers := 8
nVals := 4 nVals := 4
css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", crankTimeoutPropose)
css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", crankTimeoutPropose, newMockTickerFunc(true))
reactors := make([]*ConsensusReactor, nPeers) reactors := make([]*ConsensusReactor, nPeers)
eventChans := make([]chan interface{}, nPeers) eventChans := make([]chan interface{}, nPeers)
for i := 0; i < nPeers; i++ { for i := 0; i < nPeers; i++ {


+ 65
- 18
consensus/state.go View File

@ -234,7 +234,7 @@ type ConsensusState struct {
peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes) peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes)
internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes
timeoutTicker *time.Ticker // ticker for timeouts
timeoutTicker TimeoutTicker // ticker for timeouts
tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
timeoutParams *TimeoutParams // parameters and functions for timeout intervals timeoutParams *TimeoutParams // parameters and functions for timeout intervals
@ -252,6 +252,40 @@ type ConsensusState struct {
setProposal func(proposal *types.Proposal) error setProposal func(proposal *types.Proposal) error
} }
func NewTimeoutTicker() TimeoutTicker {
return &timeoutTicker{ticker: new(time.Ticker)}
}
type TimeoutTicker interface {
Chan() <-chan time.Time // on which to receive a timeout
Stop() // stop the timer
Reset(ti timeoutInfo) // reset the timer
}
type timeoutTicker struct {
ticker *time.Ticker
}
func (t *timeoutTicker) Chan() <-chan time.Time {
return t.ticker.C
}
func (t *timeoutTicker) Stop() {
t.ticker.Stop()
}
func (t *timeoutTicker) Reset(ti timeoutInfo) {
t.ticker = time.NewTicker(ti.Duration)
}
func skipTimeoutCommit(ti timeoutInfo) bool {
if ti.Step == RoundStepNewHeight &&
ti.Duration == time.Duration(0) {
return true
}
return false
}
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
cs := &ConsensusState{ cs := &ConsensusState{
config: config, config: config,
@ -260,7 +294,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap
mempool: mempool, mempool: mempool,
peerMsgQueue: make(chan msgInfo, msgQueueSize), peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: new(time.Ticker),
timeoutTicker: NewTimeoutTicker(),
tickChan: make(chan timeoutInfo, tickTockBufferSize), tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize),
timeoutParams: InitTimeoutParamsFromConfig(config), timeoutParams: InitTimeoutParamsFromConfig(config),
@ -321,6 +355,13 @@ func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) {
cs.privValidator = priv cs.privValidator = priv
} }
// Set the local timer
func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.timeoutTicker = timeoutTicker
}
func (cs *ConsensusState) LoadCommit(height int) *types.Commit { func (cs *ConsensusState) LoadCommit(height int) *types.Commit {
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
@ -562,7 +603,6 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
} else { } else {
cs.StartTime = cs.timeoutParams.Commit(cs.CommitTime) cs.StartTime = cs.timeoutParams.Commit(cs.CommitTime)
} }
cs.CommitTime = time.Time{}
cs.Validators = validators cs.Validators = validators
cs.Proposal = nil cs.Proposal = nil
cs.ProposalBlock = nil cs.ProposalBlock = nil
@ -613,6 +653,12 @@ func (cs *ConsensusState) timeoutRoutine() {
continue continue
} else if newti.Round == ti.Round { } else if newti.Round == ti.Round {
if ti.Step > 0 && newti.Step <= ti.Step { if ti.Step > 0 && newti.Step <= ti.Step {
// if we got here because we have all the votes,
// fire the tock now instead of waiting for the timeout
if skipTimeoutCommit(newti) {
cs.timeoutTicker.Stop()
go func(t timeoutInfo) { cs.tockChan <- t }(newti)
}
continue continue
} }
} }
@ -628,8 +674,8 @@ func (cs *ConsensusState) timeoutRoutine() {
log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.timeoutTicker.Stop() cs.timeoutTicker.Stop()
cs.timeoutTicker = time.NewTicker(ti.Duration)
case <-cs.timeoutTicker.C:
cs.timeoutTicker.Reset(ti)
case <-cs.timeoutTicker.Chan():
log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.timeoutTicker.Stop() cs.timeoutTicker.Stop()
// go routine here gaurantees timeoutRoutine doesn't block. // go routine here gaurantees timeoutRoutine doesn't block.
@ -681,17 +727,9 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
cs.handleTimeout(ti, rs) cs.handleTimeout(ti, rs)
case <-cs.Quit: case <-cs.Quit:
// drain the internalMsgQueue in case we eg. signed a proposal but it didn't hit the wal
FLUSH:
for {
select {
case mi = <-cs.internalMsgQueue:
cs.wal.Save(mi)
cs.handleMsg(mi, rs)
default:
break FLUSH
}
}
// NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but its ok because
// priv_val tracks LastSig
// close wal now that we're done writing to it // close wal now that we're done writing to it
if cs.wal != nil { if cs.wal != nil {
@ -758,7 +796,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
switch ti.Step { switch ti.Step {
case RoundStepNewHeight: case RoundStepNewHeight:
// NewRound event fired from enterNewRound. // NewRound event fired from enterNewRound.
// XXX: should we fire timeout here?
// XXX: should we fire timeout here (for timeout commit)?
cs.enterNewRound(ti.Height, 0) cs.enterNewRound(ti.Height, 0)
case RoundStepPropose: case RoundStepPropose:
types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent()) types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent())
@ -1171,6 +1209,7 @@ func (cs *ConsensusState) enterCommit(height int, commitRound int) {
// keep cs.Round the same, commitRound points to the right Precommits set. // keep cs.Round the same, commitRound points to the right Precommits set.
cs.updateRoundStep(cs.Round, RoundStepCommit) cs.updateRoundStep(cs.Round, RoundStepCommit)
cs.CommitRound = commitRound cs.CommitRound = commitRound
cs.CommitTime = time.Now()
cs.newStep() cs.newStep()
// Maybe finalize immediately. // Maybe finalize immediately.
@ -1416,6 +1455,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool,
log.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height) log.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height)
// A precommit for the previous height? // A precommit for the previous height?
// These come in while we wait timeoutCommit
if vote.Height+1 == cs.Height { if vote.Height+1 == cs.Height {
if !(cs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) { if !(cs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
// TODO: give the reason .. // TODO: give the reason ..
@ -1426,7 +1466,15 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool,
if added { if added {
log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
types.FireEventVote(cs.evsw, types.EventDataVote{vote}) types.FireEventVote(cs.evsw, types.EventDataVote{vote})
if cs.LastCommit.HasAll() {
// if we have all the votes now,
// schedule the timeoutCommit to happen right away
// NOTE: this won't apply if only one validator
cs.scheduleTimeout(time.Duration(0), cs.Height, 0, RoundStepNewHeight)
}
} }
return return
} }
@ -1487,7 +1535,6 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool,
cs.enterNewRound(height, vote.Round) cs.enterNewRound(height, vote.Round)
cs.enterPrecommit(height, vote.Round) cs.enterPrecommit(height, vote.Round)
cs.enterPrecommitWait(height, vote.Round) cs.enterPrecommitWait(height, vote.Round)
//}()
} }
default: default:
PanicSanity(Fmt("Unexpected vote type %X", vote.Type)) // Should not happen. PanicSanity(Fmt("Unexpected vote type %X", vote.Type)) // Should not happen.


+ 4
- 0
types/vote_set.go View File

@ -377,6 +377,10 @@ func (voteSet *VoteSet) HasTwoThirdsAny() bool {
return voteSet.sum > voteSet.valSet.TotalVotingPower()*2/3 return voteSet.sum > voteSet.valSet.TotalVotingPower()*2/3
} }
func (voteSet *VoteSet) HasAll() bool {
return voteSet.sum == voteSet.valSet.TotalVotingPower()
}
// Returns either a blockhash (or nil) that received +2/3 majority. // Returns either a blockhash (or nil) that received +2/3 majority.
// If there exists no such majority, returns (nil, PartSetHeader{}, false). // If there exists no such majority, returns (nil, PartSetHeader{}, false).
func (voteSet *VoteSet) TwoThirdsMajority() (blockID BlockID, ok bool) { func (voteSet *VoteSet) TwoThirdsMajority() (blockID BlockID, ok bool) {


Loading…
Cancel
Save