diff --git a/consensus/state.go b/consensus/state.go index 05e6920bb..21f7f346f 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -153,16 +153,17 @@ func (rs *RoundState) StringShort() string { //----------------------------------------------------------------------------- var ( - msgQueueSize = 1000 + msgQueueSize = 1000 + tickTockBufferSize = 10 ) -// msgs from the reactor which update the state +// msgs from the reactor which may update the state type msgInfo struct { msg ConsensusMessage peerKey string } -// internally generated messages which update the state +// internally generated messages which may update the state type timeoutInfo struct { duration time.Duration height int @@ -198,6 +199,8 @@ type ConsensusState struct { evsw events.Fireable evc *events.EventCache // set in stageBlock and passed into state + + nSteps int // used for testing to limit the number of transitions the state makes } func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { @@ -209,8 +212,8 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: new(time.Ticker), - tickChan: make(chan timeoutInfo), - tockChan: make(chan timeoutInfo), + tickChan: make(chan timeoutInfo, tickTockBufferSize), + tockChan: make(chan timeoutInfo, tickTockBufferSize), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -220,27 +223,16 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore return cs } -// Reconstruct LastCommit from SeenValidation, which we saved along with the block, -// (which happens even before saving the state) -func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { - if state.LastBlockHeight == 0 { - return - } - lastPrecommits := types.NewVoteSet(state.LastBlockHeight, 0, types.VoteTypePrecommit, state.LastValidators) - seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight) - for idx, precommit := range seenValidation.Precommits { - if precommit == nil { - continue - } - added, _, err := lastPrecommits.AddByIndex(idx, precommit) - if !added || err != nil { - PanicCrisis(Fmt("Failed to reconstruct LastCommit: %v", err)) - } - } - if !lastPrecommits.HasTwoThirdsMajority() { - PanicSanity("Failed to reconstruct LastCommit: Does not have +2/3 maj") - } - cs.LastCommit = lastPrecommits +//---------------------------------------- +// Public interface + +// implements events.Eventable +func (cs *ConsensusState) SetFireable(evsw events.Fireable) { + cs.evsw = evsw +} + +func (cs *ConsensusState) String() string { + return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) } func (cs *ConsensusState) GetState() *sm.State { @@ -260,6 +252,12 @@ func (cs *ConsensusState) getRoundState() *RoundState { return &rs } +func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.privValidator = priv +} + func (cs *ConsensusState) NewStepCh() chan *RoundState { return cs.newStepCh } @@ -267,21 +265,77 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState { func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() - cs.startRoutines() - go cs.scheduleRound0(cs.Height) + // first we start the round (no go routines) + // then we start the timeout and receive routines. + // buffered channels means scheduleRound0 will finish. Once it does, + // all further access to the RoundState is through the receiveRoutine + cs.scheduleRound0(cs.Height) + cs.startRoutines(0) // start timeout and receive return nil } -func (cs *ConsensusState) startRoutines() { - go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan - go cs.receiveRoutine() // serializes processing of proposoals, block parts, votes and coordinates state transitions +func (cs *ConsensusState) startRoutines(maxSteps int) { + go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan + go cs.receiveRoutine(maxSteps) // serializes processing of proposoals, block parts, votes, and coordinates state transitions } func (cs *ConsensusState) OnStop() { - // It's asynchronous so, there's not much to stop. - cs.BaseService.OnStop() + cs.QuitService.OnStop() } +/* + The following three functions can be used to send messages into the consensus state + which may cause a state transition +*/ + +// May block on send if queue is full. +func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { + if peerKey == "" { + cs.internalMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} + } else { + cs.peerMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, peerKey} + } + + // TODO: wait for event?! + return false, nil, nil +} + +func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string) error { + + if peerKey == "" { + cs.internalMsgQueue <- msgInfo{&ProposalMessage{proposal}, ""} + } else { + cs.peerMsgQueue <- msgInfo{&ProposalMessage{proposal}, peerKey} + } + + // TODO: wait for event?! + return nil +} + +func (cs *ConsensusState) AddProposalBlockPart(height, round int, part *types.Part, peerKey string) error { + + if peerKey == "" { + cs.internalMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, ""} + } else { + cs.peerMsgQueue <- msgInfo{&BlockPartMessage{height, round, part}, peerKey} + } + + // TODO: wait for event?! + return nil +} + +func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerKey string) error { + cs.SetProposal(proposal, peerKey) + for i := 0; i < parts.Total(); i++ { + part := parts.GetPart(i) + cs.AddProposalBlockPart(cs.Height, cs.Round, part, peerKey) + } + return nil // TODO errors +} + +//---------------------------------------------- +// internal functions for managing the state + func (cs *ConsensusState) updateHeight(height int) { cs.Height = height } @@ -295,17 +349,7 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { func (cs *ConsensusState) scheduleRound0(height int) { //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) sleepDuration := cs.StartTime.Sub(time.Now()) - - // TODO: this go-routine ... - go func() { - // should we use the timeoutRoutine? - // we don't really need an event because we get one in NewRound - if 0 < sleepDuration { - time.Sleep(sleepDuration) - } - cs.EnterNewRound(height, 0) - }() - + cs.scheduleTimeout(sleepDuration, height, 0, 1) } // Attempt to schedule a timeout by sending timeoutInfo on the tickChan. @@ -315,6 +359,115 @@ func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round cs.tickChan <- timeoutInfo{duration, height, round, step} } +// send a msg into the receiveRoutine regarding our own proposal, block part, or vote +func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { + timeout := time.After(10 * time.Millisecond) + select { + case cs.internalMsgQueue <- mi: + case <-timeout: + log.Debug("Timed out trying to send an internal messge. Launching go-routine") + go func() { cs.internalMsgQueue <- mi }() + } +} + +// Reconstruct LastCommit from SeenValidation, which we saved along with the block, +// (which happens even before saving the state) +func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { + if state.LastBlockHeight == 0 { + return + } + lastPrecommits := types.NewVoteSet(state.LastBlockHeight, 0, types.VoteTypePrecommit, state.LastValidators) + seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight) + for idx, precommit := range seenValidation.Precommits { + if precommit == nil { + continue + } + added, _, err := lastPrecommits.AddByIndex(idx, precommit) + if !added || err != nil { + PanicCrisis(Fmt("Failed to reconstruct LastCommit: %v", err)) + } + } + if !lastPrecommits.HasTwoThirdsMajority() { + PanicSanity("Failed to reconstruct LastCommit: Does not have +2/3 maj") + } + cs.LastCommit = lastPrecommits +} + +// Updates ConsensusState and increments height to match that of state. +// The round becomes 0 and cs.Step becomes RoundStepNewHeight. +func (cs *ConsensusState) updateToState(state *sm.State) { + if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { + PanicSanity(Fmt("updateToState() expected state height of %v but found %v", + cs.Height, state.LastBlockHeight)) + } + if cs.state != nil && cs.state.LastBlockHeight+1 != cs.Height { + // This might happen when someone else is mutating cs.state. + // Someone forgot to pass in state.Copy() somewhere?! + PanicSanity(Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v", + cs.state.LastBlockHeight+1, cs.Height)) + } + + // If state isn't further out than cs.state, just ignore. + // This happens when SwitchToConsensus() is called in the reactor. + // We don't want to reset e.g. the Votes. + if cs.state != nil && (state.LastBlockHeight <= cs.state.LastBlockHeight) { + log.Notice("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1) + return + } + + // Reset fields based on state. + validators := state.Validators + height := state.LastBlockHeight + 1 // next desired block height + lastPrecommits := (*types.VoteSet)(nil) + if cs.CommitRound > -1 && cs.Votes != nil { + if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() { + PanicSanity("updateToState(state) called but last Precommit round didn't have +2/3") + } + lastPrecommits = cs.Votes.Precommits(cs.CommitRound) + } + + // RoundState fields + cs.updateHeight(height) + cs.updateRoundStep(0, RoundStepNewHeight) + if cs.CommitTime.IsZero() { + // "Now" makes it easier to sync up dev nodes. + // We add timeoutCommit to allow transactions + // to be gathered for the first block. + // And alternative solution that relies on clocks: + // cs.StartTime = state.LastBlockTime.Add(timeoutCommit) + cs.StartTime = time.Now().Add(timeoutCommit) + } else { + cs.StartTime = cs.CommitTime.Add(timeoutCommit) + } + cs.CommitTime = time.Time{} + cs.Validators = validators + cs.Proposal = nil + cs.ProposalBlock = nil + cs.ProposalBlockParts = nil + cs.LockedRound = 0 + cs.LockedBlock = nil + cs.LockedBlockParts = nil + cs.Votes = NewHeightVoteSet(height, validators) + cs.CommitRound = -1 + cs.LastCommit = lastPrecommits + cs.LastValidators = state.LastValidators + + cs.state = state + cs.stagedBlock = nil + cs.stagedState = nil + + // Finally, broadcast RoundState + cs.newStep() +} + +func (cs *ConsensusState) newStep() { + cs.nSteps += 1 + cs.newStepCh <- cs.getRoundState() +} + +//----------------------------------------- +// the main go routines + // the state machine sends on tickChan to start a new timer. // timers are interupted and replaced by new ticks from later steps // timeouts of 0 on the tickChan will be immediately relayed to the tockChan @@ -341,6 +494,12 @@ func (cs *ConsensusState) timeoutRoutine() { ti = newti + // if the newti has duration == 0, we relay to the tockChan immediately (no timeout) + if ti.duration == time.Duration(0) { + go func(t timeoutInfo) { cs.tockChan <- t }(ti) + continue + } + log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) cs.timeoutTicker.Stop() cs.timeoutTicker = time.NewTicker(ti.duration) @@ -351,32 +510,31 @@ func (cs *ConsensusState) timeoutRoutine() { // Determinism comes from playback in the receiveRoutine. // We can eliminate it by merging the timeoutRoutine into receiveRoutine // and managing the timeouts ourselves with a millisecond ticker - go func() { cs.tockChan <- ti }() + go func(t timeoutInfo) { cs.tockChan <- t }(ti) case <-cs.Quit: return } } } +// a nice idea but probably more trouble than its worth func (cs *ConsensusState) stopTimer() { cs.timeoutTicker.Stop() } -func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { - timeout := time.After(10 * time.Millisecond) - select { - case cs.internalMsgQueue <- mi: - case <-timeout: - log.Debug("Timed out trying to send an internal messge. Launching go-routine") - go func() { cs.internalMsgQueue <- mi }() - } -} - // receiveRoutine handles messages which may cause state transitions. +// it's argument (n) is the number of messages to process before exiting - use 0 to run forever // It keeps the RoundState and is the only thing that updates it. // Updates happen on timeouts, complete proposals, and 2/3 majorities -func (cs *ConsensusState) receiveRoutine() { +func (cs *ConsensusState) receiveRoutine(maxSteps int) { for { + if maxSteps > 0 { + if cs.nSteps >= maxSteps { + log.Warn("reached max steps. exiting receive routine") + cs.nSteps = 0 + return + } + } rs := cs.RoundState var mi msgInfo @@ -400,17 +558,20 @@ func (cs *ConsensusState) receiveRoutine() { // state transitions on complete-proposal, 2/3-any, 2/3-one func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + var err error msg, peerKey := mi.msg, mi.peerKey switch msg := msg.(type) { case *ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts - err = cs.SetProposal(msg.Proposal) + err = cs.setProposal(msg.Proposal) case *BlockPartMessage: // if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit // if we're the only validator, the EnterPrevote may take us through to the next round - _, err = cs.AddProposalBlockPart(msg.Height, msg.Part) + _, err = cs.addProposalBlockPart(msg.Height, msg.Part) case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition @@ -437,6 +598,18 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + + // if this is a timeout for the new height + if ti.height == rs.Height+1 && ti.round == 0 && ti.step == 1 { + cs.mtx.Lock() + // Increment height. + cs.updateToState(cs.stagedState) + // event fired from EnterNewRound after some updates + cs.EnterNewRound(ti.height, 0) + cs.mtx.Unlock() + return + } + // timeouts must be for current height, round, step if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) { log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) @@ -444,6 +617,8 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { } // the timeout will now cause a state transition + cs.mtx.Lock() + defer cs.mtx.Unlock() switch ti.step { case RoundStepPropose: @@ -461,80 +636,11 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { } -// Updates ConsensusState and increments height to match that of state. -// The round becomes 0 and cs.Step becomes RoundStepNewHeight. -func (cs *ConsensusState) updateToState(state *sm.State) { - if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { - PanicSanity(Fmt("updateToState() expected state height of %v but found %v", - cs.Height, state.LastBlockHeight)) - } - if cs.state != nil && cs.state.LastBlockHeight+1 != cs.Height { - // This might happen when someone else is mutating cs.state. - // Someone forgot to pass in state.Copy() somewhere?! - PanicSanity(Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v", - cs.state.LastBlockHeight+1, cs.Height)) - } - - // If state isn't further out than cs.state, just ignore. - // This happens when SwitchToConsensus() is called in the reactor. - // We don't want to reset e.g. the Votes. - if cs.state != nil && (state.LastBlockHeight <= cs.state.LastBlockHeight) { - log.Notice("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1) - return - } - - // Reset fields based on state. - validators := state.Validators - height := state.LastBlockHeight + 1 // next desired block height - lastPrecommits := (*types.VoteSet)(nil) - if cs.CommitRound > -1 && cs.Votes != nil { - if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() { - PanicSanity("updateToState(state) called but last Precommit round didn't have +2/3") - } - lastPrecommits = cs.Votes.Precommits(cs.CommitRound) - } - - // RoundState fields - cs.updateHeight(height) - cs.updateRoundStep(0, RoundStepNewHeight) - if cs.CommitTime.IsZero() { - // "Now" makes it easier to sync up dev nodes. - // We add timeoutCommit to allow transactions - // to be gathered for the first block. - // And alternative solution that relies on clocks: - // cs.StartTime = state.LastBlockTime.Add(timeoutCommit) - cs.StartTime = time.Now().Add(timeoutCommit) - } else { - cs.StartTime = cs.CommitTime.Add(timeoutCommit) - } - cs.CommitTime = time.Time{} - cs.Validators = validators - cs.Proposal = nil - cs.ProposalBlock = nil - cs.ProposalBlockParts = nil - cs.LockedRound = 0 - cs.LockedBlock = nil - cs.LockedBlockParts = nil - cs.Votes = NewHeightVoteSet(height, validators) - cs.CommitRound = -1 - cs.LastCommit = lastPrecommits - cs.LastValidators = state.LastValidators - - cs.state = state - cs.stagedBlock = nil - cs.stagedState = nil - - // Finally, broadcast RoundState - cs.newStepCh <- cs.getRoundState() -} - -func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - cs.privValidator = priv -} - //----------------------------------------------------------------------------- +// State functions +// Many of these functions are capitalized but are not really meant to be used +// by external code as it will cause race conditions with running timeout/receiveRoutine. +// Use AddVote, SetProposal, AddProposalBlockPart instead // Enter: +2/3 precommits for nil at (height,round-1) // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) @@ -549,7 +655,7 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) { if now := time.Now(); cs.StartTime.After(now) { log.Warn("Need to set a buffer and log.Warn() here for sanity.", "startTime", cs.StartTime, "now", now) } - cs.stopTimer() + // cs.stopTimer() log.Notice(Fmt("EnterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) @@ -608,7 +714,7 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { // Done EnterPropose: cs.updateRoundStep(round, RoundStepPropose) - cs.newStepCh <- cs.getRoundState() + cs.newStep() // If we have the whole proposal + POL, then goto Prevote now. // else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart), @@ -638,18 +744,21 @@ func (cs *ConsensusState) decideProposal(height, round int) { proposal := types.NewProposal(height, round, blockParts.Header(), cs.Votes.POLRound()) err := cs.privValidator.SignProposal(cs.state.ChainID, proposal) if err == nil { - log.Notice("Signed and set proposal", "height", height, "round", round, "proposal", proposal) - log.Debug(Fmt("Signed and set proposal block: %v", block)) // Set fields + /* fields set by setProposal and addBlockPart cs.Proposal = proposal cs.ProposalBlock = block cs.ProposalBlockParts = blockParts + */ + // send proposal and block parts on internal msg queue cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""}) for i := 0; i < blockParts.Total(); i++ { part := blockParts.GetPart(i) cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) } + log.Notice("Signed and sent proposal", "height", height, "round", round, "proposal", proposal) + log.Debug(Fmt("Signed and sent proposal block: %v", block)) } else { log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err) } @@ -741,7 +850,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { // TODO: catchup event? } - cs.stopTimer() + // cs.stopTimer() log.Info(Fmt("EnterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) @@ -754,7 +863,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { cs.doPrevote(height, round) // Done EnterPrevote: - cs.newStepCh <- cs.getRoundState() + cs.newStep() // Once `addVote` hits any +2/3 prevotes, we will go to PrevoteWait // (so we have more time to try and collect +2/3 prevotes for a single block) @@ -806,7 +915,7 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { // Done EnterPrevoteWait: cs.updateRoundStep(round, RoundStepPrevoteWait) - cs.newStepCh <- cs.getRoundState() + cs.newStep() // After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit() cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait) @@ -826,14 +935,14 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { return } - cs.stopTimer() + // cs.stopTimer() log.Info(Fmt("EnterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) cs.updateRoundStep(round, RoundStepPrecommit) defer func() { // Done EnterPrecommit: - cs.newStepCh <- cs.getRoundState() + cs.newStep() }() hash, partsHeader, ok := cs.Votes.Prevotes(round).TwoThirdsMajority() @@ -929,7 +1038,7 @@ func (cs *ConsensusState) EnterPrecommitWait(height int, round int) { // Done EnterPrecommitWait: cs.updateRoundStep(round, RoundStepPrecommitWait) - cs.newStepCh <- cs.getRoundState() + cs.newStep() // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound() cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait) @@ -951,7 +1060,7 @@ func (cs *ConsensusState) EnterCommit(height int, commitRound int) { // keep ca.Round the same, it points to the right Precommits set. cs.Step = RoundStepCommit cs.CommitRound = commitRound - cs.newStepCh <- cs.getRoundState() + cs.newStep() // Maybe finalize immediately. cs.tryFinalizeCommit(height) @@ -1030,11 +1139,11 @@ func (cs *ConsensusState) FinalizeCommit(height int) { log.Info(Fmt("Finalizing commit of block: %v", cs.ProposalBlock)) // We have the block, so stage/save/commit-vote. cs.saveBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Votes.Precommits(cs.CommitRound)) - // Increment height. - cs.updateToState(cs.stagedState) + + // call updateToState from handleTimeout + // cs.StartTime is already set. // Schedule Round0 to start soon. - // go cs.scheduleRound0(height + 1) // By here, @@ -1046,7 +1155,7 @@ func (cs *ConsensusState) FinalizeCommit(height int) { //----------------------------------------------------------------------------- -func (cs *ConsensusState) SetProposal(proposal *types.Proposal) error { +func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { //cs.mtx.Lock() //defer cs.mtx.Unlock() @@ -1083,7 +1192,7 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal) error { // NOTE: block is not necessarily valid. // This can trigger us to go into EnterPrevote asynchronously (before we timeout of propose) or to attempt to commit -func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (added bool, err error) { +func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { //cs.mtx.Lock() //defer cs.mtx.Unlock() @@ -1148,34 +1257,11 @@ func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey str return added, nil } -/* - -// Interface to the state machine from external go routines. -// May block on send if queue is full. -// How do we get added/address/error back? -func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { - if peerKey == "" { - cs.internalMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""} - } else { - cs.peerMsgQueue <- msgInfo{&VoteMessage{valIndex, vote}, peerKey} - } - - // TODO: wait for event?! -} - -*/ - //----------------------------------------------------------------------------- func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { log.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height) - defer func() { - if added { - cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) - } - }() - // A precommit for the previous height? if vote.Height+1 == cs.Height { if !(cs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) { @@ -1186,6 +1272,8 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) if added { log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) + cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + } return } @@ -1195,6 +1283,8 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string height := cs.Height added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey) if added { + cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + switch vote.Type { case types.VoteTypePrevote: prevotes := cs.Votes.Prevotes(vote.Round) @@ -1307,6 +1397,7 @@ func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSet return vote, err } +// signs the vote, publishes on internalMsgQueue func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) { return nil @@ -1360,14 +1451,7 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe } -// implements events.Eventable -func (cs *ConsensusState) SetFireable(evsw events.Fireable) { - cs.evsw = evsw -} - -func (cs *ConsensusState) String() string { - return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) -} +//--------------------------------------------------------- func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int { if h1 < h2 {