Browse Source

fire timeout events in handleTimeout; internalMsgQueue

pull/169/head
Ethan Buchman 9 years ago
parent
commit
c3f880e758
2 changed files with 117 additions and 102 deletions
  1. +3
    -3
      consensus/reactor.go
  2. +114
    -99
      consensus/state.go

+ 3
- 3
consensus/reactor.go View File

@ -171,12 +171,12 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.msgQueue <- msgInfo{msg, peer.Key}
conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index)
conR.conS.msgQueue <- msgInfo{msg, peer.Key}
conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key}
default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
@ -196,7 +196,7 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote, msg.ValidatorIndex)
conR.conS.msgQueue <- msgInfo{msg, peer.Key}
conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key}
default:
// don't punish (leave room for soft upgrades)


+ 114
- 99
consensus/state.go View File

@ -153,9 +153,7 @@ func (rs *RoundState) StringShort() string {
//-----------------------------------------------------------------------------
var (
msgQueueSize = 1000
tickBufferSize = 0 // I think this will deadlock ...
tockBufferSize = 0
msgQueueSize = 1000
)
// msgs from the reactor which update the state
@ -192,10 +190,11 @@ type ConsensusState struct {
stagedBlock *types.Block // Cache last staged block.
stagedState *sm.State // Cache result of staged block.
roundState RoundState // roundState for the receiveRoutine
msgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes)
tickChan chan timeoutInfo // start a timer in the timeoutRoutine
tockChan chan timeoutInfo // receive a timeout
peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes)
internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes
timeoutTicker *time.Ticker // ticker for timeouts
tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
evsw events.Fireable
evc *events.EventCache // set in stageBlock and passed into state
@ -203,13 +202,15 @@ type ConsensusState struct {
func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
cs := &ConsensusState{
proxyAppCtx: proxyAppCtx,
blockStore: blockStore,
mempool: mempool,
newStepCh: make(chan *RoundState, 10),
msgQueue: make(chan msgInfo, msgQueueSize),
tickChan: make(chan timeoutInfo),
tockChan: make(chan timeoutInfo),
proxyAppCtx: proxyAppCtx,
blockStore: blockStore,
mempool: mempool,
newStepCh: make(chan *RoundState, 10),
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: new(time.Ticker),
tickChan: make(chan timeoutInfo),
tockChan: make(chan timeoutInfo),
}
cs.updateToState(state)
// Don't call scheduleRound0 yet.
@ -283,19 +284,11 @@ func (cs *ConsensusState) OnStop() {
func (cs *ConsensusState) updateHeight(height int) {
cs.Height = height
cs.roundState.Height = height
}
func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Round = round
cs.Step = step
cs.roundState.Round = round
cs.roundState.Step = step
}
// EnterNewRound(height, 0) at cs.StartTime.
@ -305,11 +298,12 @@ func (cs *ConsensusState) scheduleRound0(height int) {
// TODO: this go-routine ...
go func() {
// should we use the timeoutRoutine?
// we don't really need an event because we get one in NewRound
if 0 < sleepDuration {
time.Sleep(sleepDuration)
// TODO: event?
}
cs.EnterNewRound(height, 0, false)
cs.EnterNewRound(height, 0)
}()
}
@ -326,7 +320,6 @@ func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (cs *ConsensusState) timeoutRoutine() {
log.Debug("Starting timeout routine")
var timer <-chan time.Time
var ti timeoutInfo
for {
select {
@ -349,11 +342,15 @@ func (cs *ConsensusState) timeoutRoutine() {
ti = newti
log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
timer = time.After(ti.duration)
case <-timer:
// these need to not timeout!
// (we can collapse this into receive routine by managing the timer manually)
cs.timeoutTicker.Stop()
cs.timeoutTicker = time.NewTicker(ti.duration)
case <-cs.timeoutTicker.C:
log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
cs.timeoutTicker.Stop()
// go routine here gaurantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func() { cs.tockChan <- ti }()
case <-cs.Quit:
return
@ -361,19 +358,36 @@ func (cs *ConsensusState) timeoutRoutine() {
}
}
// receiveRoutine handles messages which may affect the state.
// It should keep the RoundState and be the only thing that updates it.
// TODO: this round state is still confounded with cs.RoundState
func (cs *ConsensusState) stopTimer() {
cs.timeoutTicker.Stop()
}
func (cs *ConsensusState) sendInternalMessage(mi msgInfo) {
timeout := time.After(10 * time.Millisecond)
select {
case cs.internalMsgQueue <- mi:
case <-timeout:
log.Debug("Timed out trying to send an internal messge. Launching go-routine")
go func() { cs.internalMsgQueue <- mi }()
}
}
// receiveRoutine handles messages which may cause state transitions.
// It keeps the RoundState and is the only thing that updates it.
// Updates happen on timeouts, complete proposals, and 2/3 majorities
func (cs *ConsensusState) receiveRoutine() {
for {
rs := cs.roundState
rs := cs.RoundState
var mi msgInfo
select {
case mi = <-cs.msgQueue:
case mi = <-cs.peerMsgQueue:
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi, rs)
case mi = <-cs.internalMsgQueue:
// handles proposals, block parts, votes
cs.handleMsg(mi, rs)
case ti := <-cs.tockChan:
// if the timeout is relevant to the rs
// go to the next step
@ -384,17 +398,23 @@ func (cs *ConsensusState) receiveRoutine() {
}
}
// state transitions on complete-proposal, 2/3-any, 2/3-one
func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
var err error
msg, peerKey := mi.msg, mi.peerKey
switch msg := msg.(type) {
case *ProposalMessage:
// will not cause transition.
// once proposal is set, we can receive block parts
err = cs.SetProposal(msg.Proposal)
case *BlockPartMessage:
// if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit
// if we're the only validator, the EnterPrevote may take us through to the next round
_, err = cs.AddProposalBlockPart(msg.Height, msg.Part)
case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
added, err := cs.TryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
// if the vote gives us a 2/3-any or 2/3-one, we transition
added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
if err == ErrAddingVote {
// TODO: punish peer
}
@ -404,7 +424,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
// the peer is sending us CatchupCommit precommits.
// We could make note of this and help filter in broadcastHasVoteMessage().
// XXX TODO: we want this routine to run in the cnsensus state so how do we call the reactor?!
// XXX TODO: do this
// conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex)
}
default:
@ -423,14 +443,18 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
return
}
// the timeout will now cause a state transition
switch ti.step {
case RoundStepPropose:
log.Debug("ENTERING PREVOTE")
cs.EnterPrevote(ti.height, ti.round, true)
cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent())
cs.EnterPrevote(ti.height, ti.round)
case RoundStepPrevoteWait:
cs.EnterPrecommit(ti.height, ti.round, true)
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.EnterPrecommit(ti.height, ti.round)
case RoundStepPrecommitWait:
cs.EnterNewRound(ti.height, ti.round+1, true)
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.EnterNewRound(ti.height, ti.round+1)
default:
panic(Fmt("Invalid timeout step: %v", ti.step))
}
@ -516,19 +540,17 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
// Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1)
// Enter: `startTime = commitTime+timeoutCommit` from NewHeight(height)
// NOTE: cs.StartTime was already set for height.
func (cs *ConsensusState) EnterNewRound(height int, round int, timedOut bool) {
func (cs *ConsensusState) EnterNewRound(height int, round int) {
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != RoundStepNewHeight) {
log.Debug(Fmt("EnterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
}
if timedOut {
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
}
if now := time.Now(); cs.StartTime.After(now) {
log.Warn("Need to set a buffer and log.Warn() here for sanity.", "startTime", cs.StartTime, "now", now)
}
cs.stopTimer()
log.Notice(Fmt("EnterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
// Increment validators if necessary
@ -592,7 +614,7 @@ func (cs *ConsensusState) EnterPropose(height int, round int) {
// else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
// or else after timeoutPropose
if cs.isProposalComplete() {
cs.EnterPrevote(height, cs.Round, false)
cs.EnterPrevote(height, cs.Round)
}
}
@ -623,14 +645,11 @@ func (cs *ConsensusState) decideProposal(height, round int) {
cs.ProposalBlock = block
cs.ProposalBlockParts = blockParts
// TODO: can we do better than just launching a go routine?
/*go func() {
cs.msgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
for i := 0; i < blockParts.Total(); i++ {
part := blockParts.GetPart(i)
cs.msgQueue <- msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}
}
}()*/
cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
for i := 0; i < blockParts.Total(); i++ {
part := blockParts.GetPart(i)
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
}
} else {
log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err)
}
@ -706,7 +725,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
// Enter: any +2/3 prevotes for future round.
// Prevote for LockedBlock if we're locked, or ProposalBlock if valid.
// Otherwise vote nil.
func (cs *ConsensusState) EnterPrevote(height int, round int, timedOut bool) {
func (cs *ConsensusState) EnterPrevote(height int, round int) {
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) {
@ -715,25 +734,27 @@ func (cs *ConsensusState) EnterPrevote(height int, round int, timedOut bool) {
}
// fire event for how we got here
if timedOut {
cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent())
} else if cs.isProposalComplete() {
if cs.isProposalComplete() {
cs.evsw.FireEvent(types.EventStringCompleteProposal(), cs.RoundStateEvent())
} else {
// we received +2/3 prevotes for a future round
// TODO: catchup event?
}
cs.stopTimer()
log.Info(Fmt("EnterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
// needs to happen before doPrevote because
// addVote will take us to the next step
// TODO: should we fire our own votes back through the receive Routine?
cs.updateRoundStep(round, RoundStepPrevote)
// Sign and broadcast vote as necessary
cs.doPrevote(height, round)
// Done EnterPrevote:
cs.updateRoundStep(round, RoundStepPrevote)
log.Debug("wait on new step")
cs.newStepCh <- cs.getRoundState()
log.Debug("done new step")
// Once `addVote` hits any +2/3 prevotes, we will go to PrevoteWait
// (so we have more time to try and collect +2/3 prevotes for a single block)
@ -797,7 +818,7 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) {
// Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round)
// else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil,
// else, precommit nil otherwise.
func (cs *ConsensusState) EnterPrecommit(height int, round int, timedOut bool) {
func (cs *ConsensusState) EnterPrecommit(height int, round int) {
//cs.mtx.Lock()
// defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) {
@ -805,15 +826,13 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int, timedOut bool) {
return
}
if timedOut {
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
}
cs.stopTimer()
log.Info(Fmt("EnterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
cs.updateRoundStep(round, RoundStepPrecommit)
defer func() {
// Done EnterPrecommit:
cs.updateRoundStep(round, RoundStepPrecommit)
cs.newStepCh <- cs.getRoundState()
}()
@ -1090,8 +1109,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (ad
log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash())
if cs.Step == RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
//go cs.EnterPrevote(height, cs.Round, false)
cs.EnterPrevote(height, cs.Round, false)
cs.EnterPrevote(height, cs.Round)
} else if cs.Step == RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(height)
@ -1102,8 +1120,8 @@ func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (ad
}
// Attempt to add the vote. if its a duplicate signature, dupeout the validator
func (cs *ConsensusState) TryAddVote(valIndex int, vote *types.Vote, peerKey string) (bool, error) {
added, _, err := cs.AddVote(valIndex, vote, peerKey)
func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) (bool, error) {
added, _, err := cs.addVote(valIndex, vote, peerKey)
if err != nil {
// If the vote height is off, we'll just ignore it,
// But if it's a conflicting sig, broadcast evidence tx for slashing.
@ -1130,13 +1148,23 @@ func (cs *ConsensusState) TryAddVote(valIndex int, vote *types.Vote, peerKey str
return added, nil
}
/*
// Interface to the state machine from external go routines.
// May block on send if queue is full.
// How do we get added/address/error back?
func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) {
// cs.mtx.Lock()
// defer cs.mtx.Unlock()
if peerKey == "" {
cs.internalMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""}
} else {
cs.peerMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, peerKey}
}
return cs.addVote(valIndex, vote, peerKey)
// TODO: wait for event?!
}
*/
//-----------------------------------------------------------------------------
func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) {
@ -1152,7 +1180,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
if vote.Height+1 == cs.Height {
if !(cs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
// TODO: give the reason ..
// fmt.Errorf("TryAddVote: Wrong height, not a LastCommit straggler commit.")
// fmt.Errorf("tryAddVote: Wrong height, not a LastCommit straggler commit.")
return added, nil, ErrVoteHeightMismatch
}
added, address, err = cs.LastCommit.AddByIndex(valIndex, vote)
@ -1188,20 +1216,17 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
}
if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() {
// Round-skip over to PrevoteWait or goto Precommit.
//go func() {
cs.EnterNewRound(height, vote.Round, false)
cs.EnterNewRound(height, vote.Round) // if the vote is ahead of us
if prevotes.HasTwoThirdsMajority() {
cs.EnterPrecommit(height, vote.Round, false)
cs.EnterPrecommit(height, vote.Round)
} else {
cs.EnterPrevote(height, vote.Round, false)
cs.EnterPrevote(height, vote.Round) // if the vote is ahead of us
cs.EnterPrevoteWait(height, vote.Round)
}
//}()
} else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round {
// If the proposal is now complete, enter prevote of cs.Round.
if cs.isProposalComplete() {
// go
cs.EnterPrevote(height, cs.Round, false)
cs.EnterPrevote(height, cs.Round)
}
}
case types.VoteTypePrecommit:
@ -1209,19 +1234,16 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
log.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort())
hash, _, ok := precommits.TwoThirdsMajority()
if ok {
//go func() {
if len(hash) == 0 {
cs.EnterNewRound(height, vote.Round+1, false)
cs.EnterNewRound(height, vote.Round+1)
} else {
cs.EnterNewRound(height, vote.Round, false)
cs.EnterPrecommit(height, vote.Round, false)
cs.EnterNewRound(height, vote.Round)
cs.EnterPrecommit(height, vote.Round)
cs.EnterCommit(height, vote.Round)
}
//}()
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
//go func() {
cs.EnterNewRound(height, vote.Round, false)
cs.EnterPrecommit(height, vote.Round, false)
cs.EnterNewRound(height, vote.Round)
cs.EnterPrecommit(height, vote.Round)
cs.EnterPrecommitWait(height, vote.Round)
//}()
}
@ -1293,16 +1315,9 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
if err == nil {
// NOTE: store our index in the cs so we don't have to do this every time
valIndex, _ := cs.Validators.GetByAddress(cs.privValidator.Address)
_, _, err := cs.addVote(valIndex, vote, "")
log.Notice("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
// so we fire events for ourself and can run replays
// TODO: can we do better than just launching a go-routine
// XXX: maybe we can use a "personal" channel in the receiveRoutine select
// and fire these there so we don't possibly block on a full msgQueue.
// though if things are really backed up, we could block on the personal channel too
/*go func() {
cs.msgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""}
}()*/
// _, _, err := cs.addVote(valIndex, vote, "")
cs.sendInternalMessage(msgInfo{&VoteMessage{valIndex, vote}, ""})
log.Notice("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
return vote
} else {
log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)


Loading…
Cancel
Save