From c3f880e75840ead3c78dd8193800650934648806 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 11 Dec 2015 11:57:15 -0500 Subject: [PATCH] fire timeout events in handleTimeout; internalMsgQueue --- consensus/reactor.go | 6 +- consensus/state.go | 213 +++++++++++++++++++++++-------------------- 2 files changed, 117 insertions(+), 102 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 42c5827f1..7b6100a4a 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -171,12 +171,12 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte switch msg := msg.(type) { case *ProposalMessage: ps.SetHasProposal(msg.Proposal) - conR.conS.msgQueue <- msgInfo{msg, peer.Key} + conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key} case *ProposalPOLMessage: ps.ApplyProposalPOLMessage(msg) case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Proof.Index) - conR.conS.msgQueue <- msgInfo{msg, peer.Key} + conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key} default: log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) } @@ -196,7 +196,7 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote, msg.ValidatorIndex) - conR.conS.msgQueue <- msgInfo{msg, peer.Key} + conR.conS.peerMsgQueue <- msgInfo{msg, peer.Key} default: // don't punish (leave room for soft upgrades) diff --git a/consensus/state.go b/consensus/state.go index 1aa3681b1..05e6920bb 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -153,9 +153,7 @@ func (rs *RoundState) StringShort() string { //----------------------------------------------------------------------------- var ( - msgQueueSize = 1000 - tickBufferSize = 0 // I think this will deadlock ... - tockBufferSize = 0 + msgQueueSize = 1000 ) // msgs from the reactor which update the state @@ -192,10 +190,11 @@ type ConsensusState struct { stagedBlock *types.Block // Cache last staged block. stagedState *sm.State // Cache result of staged block. - roundState RoundState // roundState for the receiveRoutine - msgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes) - tickChan chan timeoutInfo // start a timer in the timeoutRoutine - tockChan chan timeoutInfo // receive a timeout + peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes) + internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes + timeoutTicker *time.Ticker // ticker for timeouts + tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine + tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine evsw events.Fireable evc *events.EventCache // set in stageBlock and passed into state @@ -203,13 +202,15 @@ type ConsensusState struct { func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { cs := &ConsensusState{ - proxyAppCtx: proxyAppCtx, - blockStore: blockStore, - mempool: mempool, - newStepCh: make(chan *RoundState, 10), - msgQueue: make(chan msgInfo, msgQueueSize), - tickChan: make(chan timeoutInfo), - tockChan: make(chan timeoutInfo), + proxyAppCtx: proxyAppCtx, + blockStore: blockStore, + mempool: mempool, + newStepCh: make(chan *RoundState, 10), + peerMsgQueue: make(chan msgInfo, msgQueueSize), + internalMsgQueue: make(chan msgInfo, msgQueueSize), + timeoutTicker: new(time.Ticker), + tickChan: make(chan timeoutInfo), + tockChan: make(chan timeoutInfo), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -283,19 +284,11 @@ func (cs *ConsensusState) OnStop() { func (cs *ConsensusState) updateHeight(height int) { cs.Height = height - - cs.roundState.Height = height } func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - cs.Round = round cs.Step = step - - cs.roundState.Round = round - cs.roundState.Step = step } // EnterNewRound(height, 0) at cs.StartTime. @@ -305,11 +298,12 @@ func (cs *ConsensusState) scheduleRound0(height int) { // TODO: this go-routine ... go func() { + // should we use the timeoutRoutine? + // we don't really need an event because we get one in NewRound if 0 < sleepDuration { time.Sleep(sleepDuration) - // TODO: event? } - cs.EnterNewRound(height, 0, false) + cs.EnterNewRound(height, 0) }() } @@ -326,7 +320,6 @@ func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round // timeouts of 0 on the tickChan will be immediately relayed to the tockChan func (cs *ConsensusState) timeoutRoutine() { log.Debug("Starting timeout routine") - var timer <-chan time.Time var ti timeoutInfo for { select { @@ -349,11 +342,15 @@ func (cs *ConsensusState) timeoutRoutine() { ti = newti log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) - timer = time.After(ti.duration) - case <-timer: - // these need to not timeout! - // (we can collapse this into receive routine by managing the timer manually) + cs.timeoutTicker.Stop() + cs.timeoutTicker = time.NewTicker(ti.duration) + case <-cs.timeoutTicker.C: log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + cs.timeoutTicker.Stop() + // go routine here gaurantees timeoutRoutine doesn't block. + // Determinism comes from playback in the receiveRoutine. + // We can eliminate it by merging the timeoutRoutine into receiveRoutine + // and managing the timeouts ourselves with a millisecond ticker go func() { cs.tockChan <- ti }() case <-cs.Quit: return @@ -361,19 +358,36 @@ func (cs *ConsensusState) timeoutRoutine() { } } -// receiveRoutine handles messages which may affect the state. -// It should keep the RoundState and be the only thing that updates it. -// TODO: this round state is still confounded with cs.RoundState +func (cs *ConsensusState) stopTimer() { + cs.timeoutTicker.Stop() +} + +func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { + timeout := time.After(10 * time.Millisecond) + select { + case cs.internalMsgQueue <- mi: + case <-timeout: + log.Debug("Timed out trying to send an internal messge. Launching go-routine") + go func() { cs.internalMsgQueue <- mi }() + } +} + +// receiveRoutine handles messages which may cause state transitions. +// It keeps the RoundState and is the only thing that updates it. +// Updates happen on timeouts, complete proposals, and 2/3 majorities func (cs *ConsensusState) receiveRoutine() { for { - rs := cs.roundState + rs := cs.RoundState var mi msgInfo select { - case mi = <-cs.msgQueue: + case mi = <-cs.peerMsgQueue: // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(mi, rs) + case mi = <-cs.internalMsgQueue: + // handles proposals, block parts, votes + cs.handleMsg(mi, rs) case ti := <-cs.tockChan: // if the timeout is relevant to the rs // go to the next step @@ -384,17 +398,23 @@ func (cs *ConsensusState) receiveRoutine() { } } +// state transitions on complete-proposal, 2/3-any, 2/3-one func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { var err error msg, peerKey := mi.msg, mi.peerKey switch msg := msg.(type) { case *ProposalMessage: + // will not cause transition. + // once proposal is set, we can receive block parts err = cs.SetProposal(msg.Proposal) case *BlockPartMessage: + // if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit + // if we're the only validator, the EnterPrevote may take us through to the next round _, err = cs.AddProposalBlockPart(msg.Height, msg.Part) case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature - added, err := cs.TryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) + // if the vote gives us a 2/3-any or 2/3-one, we transition + added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) if err == ErrAddingVote { // TODO: punish peer } @@ -404,7 +424,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { // the peer is sending us CatchupCommit precommits. // We could make note of this and help filter in broadcastHasVoteMessage(). - // XXX TODO: we want this routine to run in the cnsensus state so how do we call the reactor?! + // XXX TODO: do this // conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex) } default: @@ -423,14 +443,18 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { return } + // the timeout will now cause a state transition + switch ti.step { case RoundStepPropose: - log.Debug("ENTERING PREVOTE") - cs.EnterPrevote(ti.height, ti.round, true) + cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) + cs.EnterPrevote(ti.height, ti.round) case RoundStepPrevoteWait: - cs.EnterPrecommit(ti.height, ti.round, true) + cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) + cs.EnterPrecommit(ti.height, ti.round) case RoundStepPrecommitWait: - cs.EnterNewRound(ti.height, ti.round+1, true) + cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) + cs.EnterNewRound(ti.height, ti.round+1) default: panic(Fmt("Invalid timeout step: %v", ti.step)) } @@ -516,19 +540,17 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) // Enter: `startTime = commitTime+timeoutCommit` from NewHeight(height) // NOTE: cs.StartTime was already set for height. -func (cs *ConsensusState) EnterNewRound(height int, round int, timedOut bool) { +func (cs *ConsensusState) EnterNewRound(height int, round int) { if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != RoundStepNewHeight) { log.Debug(Fmt("EnterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } - if timedOut { - cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - } - if now := time.Now(); cs.StartTime.After(now) { log.Warn("Need to set a buffer and log.Warn() here for sanity.", "startTime", cs.StartTime, "now", now) } + cs.stopTimer() + log.Notice(Fmt("EnterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Increment validators if necessary @@ -592,7 +614,7 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { // else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart), // or else after timeoutPropose if cs.isProposalComplete() { - cs.EnterPrevote(height, cs.Round, false) + cs.EnterPrevote(height, cs.Round) } } @@ -623,14 +645,11 @@ func (cs *ConsensusState) decideProposal(height, round int) { cs.ProposalBlock = block cs.ProposalBlockParts = blockParts - // TODO: can we do better than just launching a go routine? - /*go func() { - cs.msgQueue <- msgInfo{&ProposalMessage{proposal}, ""} - for i := 0; i < blockParts.Total(); i++ { - part := blockParts.GetPart(i) - cs.msgQueue <- msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""} - } - }()*/ + cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""}) + for i := 0; i < blockParts.Total(); i++ { + part := blockParts.GetPart(i) + cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) + } } else { log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err) } @@ -706,7 +725,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Enter: any +2/3 prevotes for future round. // Prevote for LockedBlock if we're locked, or ProposalBlock if valid. // Otherwise vote nil. -func (cs *ConsensusState) EnterPrevote(height int, round int, timedOut bool) { +func (cs *ConsensusState) EnterPrevote(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) { @@ -715,25 +734,27 @@ func (cs *ConsensusState) EnterPrevote(height int, round int, timedOut bool) { } // fire event for how we got here - if timedOut { - cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) - } else if cs.isProposalComplete() { + if cs.isProposalComplete() { cs.evsw.FireEvent(types.EventStringCompleteProposal(), cs.RoundStateEvent()) } else { // we received +2/3 prevotes for a future round // TODO: catchup event? } + cs.stopTimer() + log.Info(Fmt("EnterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + // needs to happen before doPrevote because + // addVote will take us to the next step + // TODO: should we fire our own votes back through the receive Routine? + cs.updateRoundStep(round, RoundStepPrevote) + // Sign and broadcast vote as necessary cs.doPrevote(height, round) // Done EnterPrevote: - cs.updateRoundStep(round, RoundStepPrevote) - log.Debug("wait on new step") cs.newStepCh <- cs.getRoundState() - log.Debug("done new step") // Once `addVote` hits any +2/3 prevotes, we will go to PrevoteWait // (so we have more time to try and collect +2/3 prevotes for a single block) @@ -797,7 +818,7 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, precommit nil otherwise. -func (cs *ConsensusState) EnterPrecommit(height int, round int, timedOut bool) { +func (cs *ConsensusState) EnterPrecommit(height int, round int) { //cs.mtx.Lock() // defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) { @@ -805,15 +826,13 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int, timedOut bool) { return } - if timedOut { - cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - } + cs.stopTimer() log.Info(Fmt("EnterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + cs.updateRoundStep(round, RoundStepPrecommit) defer func() { // Done EnterPrecommit: - cs.updateRoundStep(round, RoundStepPrecommit) cs.newStepCh <- cs.getRoundState() }() @@ -1090,8 +1109,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (ad log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash()) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step - //go cs.EnterPrevote(height, cs.Round, false) - cs.EnterPrevote(height, cs.Round, false) + cs.EnterPrevote(height, cs.Round) } else if cs.Step == RoundStepCommit { // If we're waiting on the proposal block... cs.tryFinalizeCommit(height) @@ -1102,8 +1120,8 @@ func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (ad } // Attempt to add the vote. if its a duplicate signature, dupeout the validator -func (cs *ConsensusState) TryAddVote(valIndex int, vote *types.Vote, peerKey string) (bool, error) { - added, _, err := cs.AddVote(valIndex, vote, peerKey) +func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) (bool, error) { + added, _, err := cs.addVote(valIndex, vote, peerKey) if err != nil { // If the vote height is off, we'll just ignore it, // But if it's a conflicting sig, broadcast evidence tx for slashing. @@ -1130,13 +1148,23 @@ func (cs *ConsensusState) TryAddVote(valIndex int, vote *types.Vote, peerKey str return added, nil } +/* + +// Interface to the state machine from external go routines. +// May block on send if queue is full. +// How do we get added/address/error back? func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { - // cs.mtx.Lock() - // defer cs.mtx.Unlock() + if peerKey == "" { + cs.internalMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} + } else { + cs.peerMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, peerKey} + } - return cs.addVote(valIndex, vote, peerKey) + // TODO: wait for event?! } +*/ + //----------------------------------------------------------------------------- func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { @@ -1152,7 +1180,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string if vote.Height+1 == cs.Height { if !(cs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) { // TODO: give the reason .. - // fmt.Errorf("TryAddVote: Wrong height, not a LastCommit straggler commit.") + // fmt.Errorf("tryAddVote: Wrong height, not a LastCommit straggler commit.") return added, nil, ErrVoteHeightMismatch } added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) @@ -1188,20 +1216,17 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string } if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { // Round-skip over to PrevoteWait or goto Precommit. - //go func() { - cs.EnterNewRound(height, vote.Round, false) + cs.EnterNewRound(height, vote.Round) // if the vote is ahead of us if prevotes.HasTwoThirdsMajority() { - cs.EnterPrecommit(height, vote.Round, false) + cs.EnterPrecommit(height, vote.Round) } else { - cs.EnterPrevote(height, vote.Round, false) + cs.EnterPrevote(height, vote.Round) // if the vote is ahead of us cs.EnterPrevoteWait(height, vote.Round) } - //}() } else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round { // If the proposal is now complete, enter prevote of cs.Round. if cs.isProposalComplete() { - // go - cs.EnterPrevote(height, cs.Round, false) + cs.EnterPrevote(height, cs.Round) } } case types.VoteTypePrecommit: @@ -1209,19 +1234,16 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string log.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort()) hash, _, ok := precommits.TwoThirdsMajority() if ok { - //go func() { if len(hash) == 0 { - cs.EnterNewRound(height, vote.Round+1, false) + cs.EnterNewRound(height, vote.Round+1) } else { - cs.EnterNewRound(height, vote.Round, false) - cs.EnterPrecommit(height, vote.Round, false) + cs.EnterNewRound(height, vote.Round) + cs.EnterPrecommit(height, vote.Round) cs.EnterCommit(height, vote.Round) } - //}() } else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() { - //go func() { - cs.EnterNewRound(height, vote.Round, false) - cs.EnterPrecommit(height, vote.Round, false) + cs.EnterNewRound(height, vote.Round) + cs.EnterPrecommit(height, vote.Round) cs.EnterPrecommitWait(height, vote.Round) //}() } @@ -1293,16 +1315,9 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part if err == nil { // NOTE: store our index in the cs so we don't have to do this every time valIndex, _ := cs.Validators.GetByAddress(cs.privValidator.Address) - _, _, err := cs.addVote(valIndex, vote, "") - log.Notice("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) - // so we fire events for ourself and can run replays - // TODO: can we do better than just launching a go-routine - // XXX: maybe we can use a "personal" channel in the receiveRoutine select - // and fire these there so we don't possibly block on a full msgQueue. - // though if things are really backed up, we could block on the personal channel too - /*go func() { - cs.msgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} - }()*/ + // _, _, err := cs.addVote(valIndex, vote, "") + cs.sendInternalMessage(msgInfo{&VoteMessage{valIndex, vote}, ""}) + log.Notice("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) return vote } else { log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)