diff --git a/consensus/common_test.go b/consensus/common_test.go index 0c75bc7fb..2646310e9 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -297,6 +297,13 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { // read off the NewHeightStep <-cs.NewStepCh() + // start the reactor routines + // (we should move these to state but the receive routine needs to be able to "broadcast votes" + // --> add good votes to some buffered chan and have a go routine do the broadcast ... + conR := NewConsensusReactor(cs, nil, false) + go conR.receiveRoutine() // serializes processing of proposoals, block parts, votes + go conR.timeoutRoutine() // fires timeouts into the receive routine + for i := 0; i < nValidators; i++ { vss[i] = NewValidatorStub(privVals[i]) } diff --git a/consensus/reactor.go b/consensus/reactor.go index 4fc9fcc0b..a3c595ea4 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -56,8 +56,9 @@ func (conR *ConsensusReactor) OnStart() error { return err } } + go conR.receiveRoutine() // serializes processing of proposoals, block parts, votes + go conR.timeoutRoutine() // fires timeouts into the receive routine go conR.broadcastNewRoundStepRoutine() - go conR.msgProcessor() return nil } @@ -209,6 +210,131 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte } } +// the state machine sends on tickChan to start a new timer. 
+// timers are interrupted and replaced by new ticks from later steps
+// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
+func (conR *ConsensusReactor) timeoutRoutine() {
+	ticker := new(time.Ticker) // zero value: C is nil, so its case never fires until a tick arms a real ticker
+	var ti timeoutInfo
+	// fireTock delivers a tock without blocking this loop: the receive routine also sends on
+	// tickChan, so with unbuffered channels a blocking send here could deadlock both routines.
+	fireTock := func(toi timeoutInfo) {
+		go func() {
+			select {
+			case conR.conS.tockChan <- toi:
+			case <-conR.Quit: // shutting down, drop the tock
+			}
+		}()
+	}
+	log.Debug("starting timeout routine!")
+	for {
+		select {
+		case newti := <-conR.conS.tickChan:
+			log.Debug("Received tick", "new_it", newti.String(), "old_ti", ti.String())
+			// ignore tickers for old height/round/step
+			if newti.height < ti.height ||
+				(newti.height == ti.height && newti.round < ti.round) ||
+				(newti.height == ti.height && newti.round == ti.round && ti.step > 0 && newti.step <= ti.step) {
+				continue
+			}
+			ticker.Stop()
+			ti = newti
+			if ti.duration == time.Duration(0) {
+				fireTock(ti) // for new rounds with no sleep
+			} else {
+				ticker = time.NewTicker(ti.duration)
+			}
+		case <-ticker.C:
+			log.Debug("timed out! firing on tock")
+			ticker.Stop() // one timeout per tick: do not let the ticker repeat
+			fireTock(ti)
+		case <-conR.Quit:
+			return
+		}
+	}
+}
+
+// receiveRoutine handles messages which affect the state. 
+// it should keep the RoundState and be the only thing that updates it
+// XXX: for incremental dev, we store this RoundState unprotected (in cs)
+func (conR *ConsensusReactor) receiveRoutine() {
+	cs := conR.conS
+	for {
+		rs := cs.roundState
+		var mi msgInfo
+
+		select {
+		case mi = <-cs.msgQueue:
+			// handles proposals, block parts, votes
+			// may fire on tickChan
+			conR.handleMsg(mi)
+		case ti := <-cs.tockChan:
+			log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
+			// timeouts must be for current height, round, step
+			if ti.height == rs.Height+1 && ti.round == 0 && ti.step == RoundStepNewHeight {
+				// legit
+				log.Debug("received tock for next height")
+			} else if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) {
+				log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step)
+				continue
+			}
+
+			switch ti.step {
+			case RoundStepPropose:
+				cs.EnterPrevote(ti.height, ti.round, true)
+			case RoundStepPrevote:
+				cs.EnterPrecommit(ti.height, ti.round, true)
+			case RoundStepPrecommit:
+				// If we have +2/3 of precommits for a particular block (or nil),
+				// we already entered commit (or the next round).
+				// So just try to transition to the next round,
+				// which is what we'd do otherwise.
+				cs.EnterNewRound(ti.height, ti.round+1, true)
+			case RoundStepNewRound:
+				// ?
+			case RoundStepNewHeight:
+				/*if ti.round == rs.Round && rs.Step != RoundStepNewHeight {
+					continue
+				}*/
+				cs.EnterNewRound(ti.height, 0, false)
+			}
+
+		case <-conR.Quit:
+			return
+		}
+	}
+}
+
+// handleMsg applies one queued proposal, block part or vote to the consensus state and logs any error.
+func (conR *ConsensusReactor) handleMsg(mi msgInfo) {
+	cs := conR.conS
+	var err error
+	msg, peerKey := mi.msg, mi.peerKey
+	switch msg := msg.(type) {
+	case *ProposalMessage:
+		err = cs.SetProposal(msg.Proposal)
+	case *BlockPartMessage:
+		_, err = cs.AddProposalBlockPart(msg.Height, msg.Part)
+	case *VoteMessage:
+		// attempt to add the vote and dupeout the validator if its a duplicate signature
+		// (assign to the outer err, not a shadow, so the failure is logged below)
+		var added bool
+		added, err = cs.TryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
+		if err == ErrAddingVote {
+			// TODO: punish peer
+		}
+		if added {
+			// If rs.Height == vote.Height && rs.Round < vote.Round,
+			// the peer is sending us CatchupCommit precommits.
+			// We could make note of this and help filter in broadcastHasVoteMessage().
+			conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex)
+		}
+	}
+	if err != nil {
+		log.Debug("error with msg", "error", err)
+	}
+}
+
 // Broadcasts HasVoteMessage to peers that care. 
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) { msg := &HasVoteMessage{ @@ -248,53 +374,6 @@ func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { //-------------------------------------- -// a message coming in from the reactor -type msgInfo struct { - msg ConsensusMessage - peerKey string -} - -func (conR *ConsensusReactor) msgProcessor() { - for { - var mi msgInfo - var err error - select { - case mi = <-conR.conS.msgQueue: - case <-conR.Quit: - return - } - - msg, peerKey := mi.msg, mi.peerKey - switch msg := msg.(type) { - case *ProposalMessage: - err = conR.conS.SetProposal(msg.Proposal) - case *BlockPartMessage: - _, err = conR.conS.AddProposalBlockPart(msg.Height, msg.Part) - case *VoteMessage: - // attempt to add the vote and dupeout the validator if its a duplicate signature - added, err := conR.conS.TryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) - if err == ErrAddingVote { - // TODO: punish peer - } - - if added { - // If rs.Height == vote.Height && rs.Round < vote.Round, - // the peer is sending us CatchupCommit precommits. - // We could make note of this and help filter in broadcastHasVoteMessage(). 
- conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex) - } - - } - if err != nil { - log.Warn("Error in msg processor", "error", err) - } - - // TODO: get Proposer into the Data - conR.evsw.FireEvent(types.EventStringConsensusMessage(), &types.EventDataConsensusMessage{msg}) - - } -} - func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { nrsMsg = &NewRoundStepMessage{ Height: rs.Height, diff --git a/consensus/state.go b/consensus/state.go index 645fb8766..ac319be37 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -151,9 +151,29 @@ func (rs *RoundState) StringShort() string { //----------------------------------------------------------------------------- var ( - msgQueueSize = 1000 + msgQueueSize = 1000 + tickBufferSize = 0 // I think this will deadlock ... + tockBufferSize = 0 ) +// msgs from the reactor which update the state +type msgInfo struct { + msg ConsensusMessage + peerKey string +} + +// internally generated messages which update the state +type timeoutInfo struct { + duration time.Duration + height int + round int + step RoundStepType +} + +func (ti *timeoutInfo) String() string { + return fmt.Sprintf("%v ; %d/%d %v", ti.duration, ti.height, ti.round, ti.step) +} + // Tracks consensus state across block heights and rounds. type ConsensusState struct { BaseService @@ -170,8 +190,10 @@ type ConsensusState struct { stagedBlock *types.Block // Cache last staged block. stagedState *sm.State // Cache result of staged block. 
- // for messages which affect the state (proposals, block parts, votes) - msgQueue chan msgInfo + roundState RoundState // roundState for the receiveRoutine + msgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes) + tickChan chan timeoutInfo // start a timer in the timeoutRoutine + tockChan chan timeoutInfo // receive a timeout evsw events.Fireable evc *events.EventCache // set in stageBlock and passed into state @@ -184,6 +206,8 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore mempool: mempool, newStepCh: make(chan *RoundState, 10), msgQueue: make(chan msgInfo, msgQueueSize), + tickChan: make(chan timeoutInfo, tickBufferSize), + tockChan: make(chan timeoutInfo, tockBufferSize), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -239,7 +263,7 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState { func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() - cs.scheduleRound0(cs.Height) + go cs.scheduleRound0(cs.Height) return nil } @@ -248,17 +272,32 @@ func (cs *ConsensusState) OnStop() { cs.BaseService.OnStop() } +func (cs *ConsensusState) updateHeight(height int) { + cs.Height = height + + cs.roundState.Height = height +} + +func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { + cs.Round = round + cs.Step = step + + cs.roundState.Round = round + cs.roundState.Step = step +} + // EnterNewRound(height, 0) at cs.StartTime. func (cs *ConsensusState) scheduleRound0(height int) { //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) sleepDuration := cs.StartTime.Sub(time.Now()) - go func() { - if 0 < sleepDuration { - time.Sleep(sleepDuration) - // TODO: event? 
- } - cs.EnterNewRound(height, 0, false) - }() + + log.Debug("scheduleRound0 by firing on tick", "height", height) + // if sleepDuration is 0 it will just relay it to tockChan right away + cs.scheduleHeightRoundStep(sleepDuration, height, 0, RoundStepNewHeight) +} + +func (cs *ConsensusState) scheduleHeightRoundStep(duration time.Duration, height, round int, step RoundStepType) { + cs.tickChan <- timeoutInfo{duration, height, round, step} } // Updates ConsensusState and increments height to match that of state. @@ -295,9 +334,8 @@ func (cs *ConsensusState) updateToState(state *sm.State) { } // RoundState fields - cs.Height = height - cs.Round = 0 - cs.Step = RoundStepNewHeight + cs.updateHeight(height) + cs.updateRoundStep(0, RoundStepNewHeight) if cs.CommitTime.IsZero() { // "Now" makes it easier to sync up dev nodes. // We add timeoutCommit to allow transactions @@ -366,8 +404,7 @@ func (cs *ConsensusState) EnterNewRound(height int, round int, timedOut bool) { } // Setup new round - cs.Round = round - cs.Step = RoundStepNewRound + cs.updateRoundStep(round, RoundStepNewRound) cs.Validators = validators if round == 0 { // We've already reset these upon new height, @@ -398,8 +435,7 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { defer func() { // Done EnterPropose: - cs.Round = round - cs.Step = RoundStepPropose + cs.updateRoundStep(round, RoundStepPropose) cs.newStepCh <- cs.getRoundState() // If we have the whole proposal + POL, then goto Prevote now. 
@@ -411,10 +447,8 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { }() // This step times out after `timeoutPropose` - go func() { - time.Sleep(timeoutPropose) - cs.EnterPrevote(height, round, true) - }() + cs.tickChan <- timeoutInfo{timeoutPropose, height, round, RoundStepPropose} + log.Debug("started timer") // Nothing more to do if we're not a validator if cs.privValidator == nil { @@ -456,11 +490,14 @@ func (cs *ConsensusState) decideProposal(height, round int) { cs.ProposalBlock = block cs.ProposalBlockParts = blockParts - cs.msgQueue <- msgInfo{&ProposalMessage{proposal}, ""} - for i := 0; i < blockParts.Total(); i++ { - part := blockParts.GetPart(i) - cs.msgQueue <- msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""} - } + // TODO: can we do better than just launching a go routine? + go func() { + cs.msgQueue <- msgInfo{&ProposalMessage{proposal}, ""} + for i := 0; i < blockParts.Total(); i++ { + part := blockParts.GetPart(i) + cs.msgQueue <- msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""} + } + }() } else { log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err) } @@ -560,8 +597,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int, timedOut bool) { cs.doPrevote(height, round) // Done EnterPrevote: - cs.Round = round - cs.Step = RoundStepPrevote + cs.updateRoundStep(round, RoundStepPrevote) cs.newStepCh <- cs.getRoundState() // Once `addVote` hits any +2/3 prevotes, we will go to PrevoteWait @@ -613,15 +649,11 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { log.Info(Fmt("EnterPrevoteWait(%v/%v). 
Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Done EnterPrevoteWait: - cs.Round = round - cs.Step = RoundStepPrevoteWait + cs.updateRoundStep(round, RoundStepPrevoteWait) cs.newStepCh <- cs.getRoundState() // After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit() - go func() { - time.Sleep(timeoutPrevote0 + timeoutPrevoteDelta*time.Duration(round)) - cs.EnterPrecommit(height, round, true) - }() + cs.tickChan <- timeoutInfo{timeoutPrevote0 + timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevote} } // Enter: +2/3 precomits for block or nil. @@ -646,8 +678,7 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int, timedOut bool) { defer func() { // Done EnterPrecommit: - cs.Round = round - cs.Step = RoundStepPrecommit + cs.updateRoundStep(round, RoundStepPrecommit) cs.newStepCh <- cs.getRoundState() }() @@ -743,19 +774,12 @@ func (cs *ConsensusState) EnterPrecommitWait(height int, round int) { log.Info(Fmt("EnterPrecommitWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Done EnterPrecommitWait: - cs.Round = round - cs.Step = RoundStepPrecommitWait + cs.updateRoundStep(round, RoundStepPrecommitWait) cs.newStepCh <- cs.getRoundState() // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound() - go func() { - time.Sleep(timeoutPrecommit0 + timeoutPrecommitDelta*time.Duration(round)) - // If we have +2/3 of precommits for a particular block (or nil), - // we already entered commit (or the next round). - // So just try to transition to the next round, - // which is what we'd do otherwise. 
- cs.EnterNewRound(height, round+1, true) - }() + cs.tickChan <- timeoutInfo{timeoutPrecommit0 + timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommit} + } // Enter: +2/3 precommits for block @@ -1133,7 +1157,13 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part _, _, err := cs.addVote(valIndex, vote, "") log.Notice("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) // so we fire events for ourself and can run replays - cs.msgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} + // TODO: can we do better than just launching a go-routine + // XXX: maybe we can use a "personal" channel in the receiveRoutine select + // and fire these there so we don't possibly block on a full msgQueue. + // though if things are really backed up, we could block on the personal channel too + go func() { + cs.msgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} + }() return vote } else { log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)