diff --git a/consensus/common_test.go b/consensus/common_test.go index a9da1cdb6..2650b03f3 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -325,7 +325,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { evsw := events.NewEventSwitch() cs.SetFireable(evsw) - evsw.OnStart() + evsw.Start() // start the transition routines // cs.startRoutines() @@ -339,19 +339,8 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { return cs, vss } -func subscribeToEvent(cs *ConsensusState, eventID string) chan interface{} { - evsw := cs.evsw.(*events.EventSwitch) - // listen for new round - ch := make(chan interface{}, 10) - evsw.AddListenerForEvent("tester", eventID, func(data types.EventData) { - ch <- data - }) - return ch -} - func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { - - voteCh0 := subscribeToEvent(cs, types.EventStringVote()) + voteCh0 := cs.evsw.(*events.EventSwitch).SubscribeToEvent(types.EventStringVote(), 0) voteCh := make(chan interface{}) go func() { for { @@ -395,6 +384,6 @@ func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.G } func startTestRound(cs *ConsensusState, height, round int) { - cs.EnterNewRound(height, round) + cs.enterNewRound(height, round) cs.startRoutines(0) } diff --git a/consensus/state.go b/consensus/state.go index 168f2e4ed..eb9b59030 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -351,11 +351,11 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { cs.Step = step } -// EnterNewRound(height, 0) at cs.StartTime. +// enterNewRound(height, 0) at cs.StartTime. func (cs *ConsensusState) scheduleRound0(height int) { //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) sleepDuration := cs.StartTime.Sub(time.Now()) - cs.scheduleTimeout(sleepDuration, height, 0, 1) + cs.scheduleTimeout(sleepDuration, height, 0, RoundStepNewHeight) } // Attempt to schedule a timeout by sending timeoutInfo on the tickChan. @@ -367,11 +367,14 @@ func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round // send a msg into the receiveRoutine regarding our own proposal, block part, or vote func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { - timeout := time.After(10 * time.Millisecond) select { case cs.internalMsgQueue <- mi: - case <-timeout: - log.Debug("Timed out trying to send an internal messge. Launching go-routine") + default: + // NOTE: using the go-routine means our votes can + // be processed out of order. + // TODO: use CList here for strict determinism and + // attempt push to internalMsgQueue in receiveRoutine + log.Debug("Internal msg queue is full. Using a go-routine") go func() { cs.internalMsgQueue <- mi }() } } @@ -578,8 +581,8 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { // once proposal is set, we can receive block parts err = cs.setProposal(msg.Proposal) case *BlockPartMessage: - // if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit - // if we're the only validator, the EnterPrevote may take us through to the next round + // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit + // if we're the only validator, the enterPrevote may take us through to the next round _, err = cs.addProposalBlockPart(msg.Height, msg.Part) case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature @@ -618,18 +621,18 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { switch ti.step { case RoundStepNewHeight: - // NewRound event fired from EnterNewRound. + // NewRound event fired from enterNewRound. // Do we want a timeout event too? - cs.EnterNewRound(ti.height, 0) + cs.enterNewRound(ti.height, 0) case RoundStepPropose: cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) - cs.EnterPrevote(ti.height, ti.round) + cs.enterPrevote(ti.height, ti.round) case RoundStepPrevoteWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.EnterPrecommit(ti.height, ti.round) + cs.enterPrecommit(ti.height, ti.round) case RoundStepPrecommitWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.EnterNewRound(ti.height, ti.round+1) + cs.enterNewRound(ti.height, ti.round+1) default: panic(Fmt("Invalid timeout step: %v", ti.step)) } @@ -646,9 +649,9 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) // Enter: `startTime = commitTime+timeoutCommit` from NewHeight(height) // NOTE: cs.StartTime was already set for height. -func (cs *ConsensusState) EnterNewRound(height int, round int) { +func (cs *ConsensusState) enterNewRound(height int, round int) { if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != RoundStepNewHeight) { - log.Debug(Fmt("EnterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } @@ -657,7 +660,7 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) { } // cs.stopTimer() - log.Notice(Fmt("EnterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Notice(Fmt("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Increment validators if necessary validators := cs.Validators @@ -684,23 +687,23 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) { cs.evsw.FireEvent(types.EventStringNewRound(), cs.RoundStateEvent()) - // Immediately go to EnterPropose. - cs.EnterPropose(height, round) + // Immediately go to enterPropose. + cs.enterPropose(height, round) } // Enter: from NewRound(height,round). -func (cs *ConsensusState) EnterPropose(height int, round int) { +func (cs *ConsensusState) enterPropose(height int, round int) { // cs.mtx.Lock() // cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) { - log.Debug(Fmt("EnterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } - log.Info(Fmt("EnterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPropose: + // Done enterPropose: cs.updateRoundStep(round, RoundStepPropose) cs.newStep() }() @@ -714,17 +717,17 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { } if !bytes.Equal(cs.Validators.Proposer().Address, cs.privValidator.Address) { - log.Info("EnterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) + log.Info("enterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) } else { - log.Info("EnterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) + log.Info("enterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) cs.decideProposal(height, round) } // If we have the whole proposal + POL, then goto Prevote now. - // else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart), + // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), // or else after timeoutPropose if cs.isProposalComplete() { - cs.EnterPrevote(height, cs.Round) + cs.enterPrevote(height, cs.Round) } } @@ -764,7 +767,7 @@ func (cs *ConsensusState) decideProposal(height, round int) { log.Notice("Signed and sent proposal", "height", height, "round", round, "proposal", proposal) log.Debug(Fmt("Signed and sent proposal block: %v", block)) } else { - log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err) + log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err) } } @@ -799,7 +802,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts validation = cs.LastCommit.MakeValidation() } else { // This shouldn't happen. - log.Error("EnterPropose: Cannot propose anything: No validation for the previous block.") + log.Error("enterPropose: Cannot propose anything: No validation for the previous block.") return } @@ -838,16 +841,16 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Enter: any +2/3 prevotes for future round. // Prevote for LockedBlock if we're locked, or ProposalBlock if valid. // Otherwise vote nil. -func (cs *ConsensusState) EnterPrevote(height int, round int) { +func (cs *ConsensusState) enterPrevote(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) { - log.Debug(Fmt("EnterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } defer func() { - // Done EnterPrevote: + // Done enterPrevote: cs.updateRoundStep(round, RoundStepPrevote) cs.newStep() }() @@ -862,7 +865,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { // cs.stopTimer() - log.Info(Fmt("EnterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Sign and broadcast vote as necessary cs.doPrevote(height, round) @@ -874,14 +877,14 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { func (cs *ConsensusState) doPrevote(height int, round int) { // If a block is locked, prevote that. if cs.LockedBlock != nil { - log.Info("EnterPrevote: Block was locked") + log.Info("enterPrevote: Block was locked") cs.signAddVote(types.VoteTypePrevote, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header()) return } // If ProposalBlock is nil, prevote nil. if cs.ProposalBlock == nil { - log.Warn("EnterPrevote: ProposalBlock is nil") + log.Warn("enterPrevote: ProposalBlock is nil") cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -890,7 +893,7 @@ func (cs *ConsensusState) doPrevote(height int, round int) { err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts) if err != nil { // ProposalBlock is invalid, prevote nil. - log.Warn("EnterPrevote: ProposalBlock is invalid", "error", err) + log.Warn("enterPrevote: ProposalBlock is invalid", "error", err) cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -903,25 +906,25 @@ func (cs *ConsensusState) doPrevote(height int, round int) { } // Enter: any +2/3 prevotes at next round. -func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { +func (cs *ConsensusState) enterPrevoteWait(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevoteWait <= cs.Step) { - log.Debug(Fmt("EnterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } if !cs.Votes.Prevotes(round).HasTwoThirdsAny() { - PanicSanity(Fmt("EnterPrevoteWait(%v/%v), but Prevotes does not have any +2/3 votes", height, round)) + PanicSanity(Fmt("enterPrevoteWait(%v/%v), but Prevotes does not have any +2/3 votes", height, round)) } - log.Info(Fmt("EnterPrevoteWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrevoteWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrevoteWait: + // Done enterPrevoteWait: cs.updateRoundStep(round, RoundStepPrevoteWait) cs.newStep() }() - // After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit() + // After `timeoutPrevote0+timeoutPrevoteDelta*round`, enterPrecommit() cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait) } @@ -931,20 +934,20 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, precommit nil otherwise. -func (cs *ConsensusState) EnterPrecommit(height int, round int) { +func (cs *ConsensusState) enterPrecommit(height int, round int) { //cs.mtx.Lock() // defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) { - log.Debug(Fmt("EnterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } // cs.stopTimer() - log.Info(Fmt("EnterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrecommit: + // Done enterPrecommit: cs.updateRoundStep(round, RoundStepPrecommit) cs.newStep() }() @@ -954,9 +957,9 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If we don't have a polka, we must precommit nil if !ok { if cs.LockedBlock != nil { - log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit while we're locked. Precommitting nil") + log.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit while we're locked. Precommitting nil") } else { - log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit. Precommitting nil.") + log.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit. Precommitting nil.") } cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return @@ -973,9 +976,9 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // +2/3 prevoted nil. Unlock and precommit nil. if len(hash) == 0 { if cs.LockedBlock == nil { - log.Info("EnterPrecommit: +2/3 prevoted for nil.") + log.Info("enterPrecommit: +2/3 prevoted for nil.") } else { - log.Info("EnterPrecommit: +2/3 prevoted for nil. Unlocking") + log.Info("enterPrecommit: +2/3 prevoted for nil. Unlocking") cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil @@ -989,7 +992,7 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If we're already locked on that block, precommit it, and update the LockedRound if cs.LockedBlock.HashesTo(hash) { - log.Info("EnterPrecommit: +2/3 prevoted locked block. Relocking") + log.Info("enterPrecommit: +2/3 prevoted locked block. Relocking") cs.LockedRound = round cs.evsw.FireEvent(types.EventStringRelock(), cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) @@ -998,10 +1001,10 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If +2/3 prevoted for proposal block, stage and precommit it if cs.ProposalBlock.HashesTo(hash) { - log.Info("EnterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) + log.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) // Validate the block. if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { - PanicConsensus(Fmt("EnterPrecommit: +2/3 prevoted for an invalid block: %v", err)) + PanicConsensus(Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) } cs.LockedRound = round cs.LockedBlock = cs.ProposalBlock @@ -1028,41 +1031,41 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { } // Enter: any +2/3 precommits for next round. -func (cs *ConsensusState) EnterPrecommitWait(height int, round int) { +func (cs *ConsensusState) enterPrecommitWait(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommitWait <= cs.Step) { - log.Debug(Fmt("EnterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } if !cs.Votes.Precommits(round).HasTwoThirdsAny() { - PanicSanity(Fmt("EnterPrecommitWait(%v/%v), but Precommits does not have any +2/3 votes", height, round)) + PanicSanity(Fmt("enterPrecommitWait(%v/%v), but Precommits does not have any +2/3 votes", height, round)) } - log.Info(Fmt("EnterPrecommitWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrecommitWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrecommitWait: + // Done enterPrecommitWait: cs.updateRoundStep(round, RoundStepPrecommitWait) cs.newStep() }() - // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound() + // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, enterNewRound() cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait) } // Enter: +2/3 precommits for block -func (cs *ConsensusState) EnterCommit(height int, commitRound int) { +func (cs *ConsensusState) enterCommit(height int, commitRound int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || RoundStepCommit <= cs.Step { - log.Debug(Fmt("EnterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) return } - log.Info(Fmt("EnterCommit(%v/%v). Current: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterCommit(%v/%v). Current: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) defer func() { - // Done Entercommit: + // Done enterCommit: // keep ca.Round the same, it points to the right Precommits set. cs.updateRoundStep(cs.Round, RoundStepCommit) cs.CommitRound = commitRound @@ -1198,7 +1201,7 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { } // NOTE: block is not necessarily valid. -// This can trigger us to go into EnterPrevote asynchronously (before we timeout of propose) or to attempt to commit +// This can trigger us to go into enterPrevote asynchronously (before we timeout of propose) or to attempt to commit func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { //cs.mtx.Lock() //defer cs.mtx.Unlock() @@ -1225,7 +1228,7 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash(), "round", cs.Proposal.Round) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step - cs.EnterPrevote(height, cs.Round) + cs.enterPrevote(height, cs.Round) } else if cs.Step == RoundStepCommit { // If we're waiting on the proposal block... cs.tryFinalizeCommit(height) @@ -1299,7 +1302,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string // First, unlock if prevotes is a valid POL. // >> lockRound < POLRound <= unlockOrChangeLockRound (see spec) // NOTE: If (lockRound < POLRound) but !(POLRound <= unlockOrChangeLockRound), - // we'll still EnterNewRound(H,vote.R) and EnterPrecommit(H,vote.R) to process it + // we'll still enterNewRound(H,vote.R) and enterPrecommit(H,vote.R) to process it // there. if (cs.LockedBlock != nil) && (cs.LockedRound < vote.Round) && (vote.Round <= cs.Round) { hash, _, ok := prevotes.TwoThirdsMajority() @@ -1313,17 +1316,17 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string } if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { // Round-skip over to PrevoteWait or goto Precommit. - cs.EnterNewRound(height, vote.Round) // if the vote is ahead of us + cs.enterNewRound(height, vote.Round) // if the vote is ahead of us if prevotes.HasTwoThirdsMajority() { - cs.EnterPrecommit(height, vote.Round) + cs.enterPrecommit(height, vote.Round) } else { - cs.EnterPrevote(height, vote.Round) // if the vote is ahead of us - cs.EnterPrevoteWait(height, vote.Round) + cs.enterPrevote(height, vote.Round) // if the vote is ahead of us + cs.enterPrevoteWait(height, vote.Round) } } else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round { // If the proposal is now complete, enter prevote of cs.Round. if cs.isProposalComplete() { - cs.EnterPrevote(height, cs.Round) + cs.enterPrevote(height, cs.Round) } } case types.VoteTypePrecommit: @@ -1332,16 +1335,16 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string hash, _, ok := precommits.TwoThirdsMajority() if ok { if len(hash) == 0 { - cs.EnterNewRound(height, vote.Round+1) + cs.enterNewRound(height, vote.Round+1) } else { - cs.EnterNewRound(height, vote.Round) - cs.EnterPrecommit(height, vote.Round) - cs.EnterCommit(height, vote.Round) + cs.enterNewRound(height, vote.Round) + cs.enterPrecommit(height, vote.Round) + cs.enterCommit(height, vote.Round) } } else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() { - cs.EnterNewRound(height, vote.Round) - cs.EnterPrecommit(height, vote.Round) - cs.EnterPrecommitWait(height, vote.Round) + cs.enterNewRound(height, vote.Round) + cs.enterPrecommit(height, vote.Round) + cs.enterPrecommitWait(height, vote.Round) //}() } default: diff --git a/consensus/state_test.go b/consensus/state_test.go index 459af95e7..d4917ac39 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -8,7 +8,7 @@ import ( _ "github.com/tendermint/tendermint/config/tendermint_test" //"github.com/tendermint/tendermint/events" - // "github.com/tendermint/tendermint/events" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" ) @@ -54,8 +54,9 @@ func TestProposerSelection0(t *testing.T) { cs1, vss := simpleConsensusState(4) height, round := cs1.Height, cs1.Round - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + evsw := cs1.evsw.(*events.EventSwitch) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) startTestRound(cs1, height, round) @@ -87,7 +88,8 @@ func TestProposerSelection0(t *testing.T) { func TestProposerSelection2(t *testing.T) { cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) // this time we jump in at round 2 incrementRound(vss[1:]...) @@ -118,8 +120,9 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { cs.SetPrivValidator(nil) height, round := cs.Height, cs.Round + evsw := cs.evsw.(*events.EventSwitch) // Listen for propose timeout event - timeoutCh := subscribeToEvent(cs, types.EventStringTimeoutPropose()) + timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) startTestRound(cs, height, round) @@ -143,11 +146,11 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { height, round := cs.Height, cs.Round // Listen for propose timeout event - timeoutCh := subscribeToEvent(cs, types.EventStringTimeoutPropose()) - proposalCh := subscribeToEvent(cs, types.EventStringCompleteProposal()) + evsw := cs.evsw.(*events.EventSwitch) + timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - //startTestRound(cs, height, round) - cs.EnterNewRound(height, round) + cs.enterNewRound(height, round) cs.startRoutines(3) <-proposalCh @@ -164,7 +167,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { t.Error("rs.ProposalBlockParts should be set") } - // if we're a validator, EnterPropose should not timeout + // if we're a validator, enterPropose should not timeout ticker := time.NewTicker(timeoutPropose * 2) select { case <-timeoutCh: @@ -179,8 +182,9 @@ func TestBadProposal(t *testing.T) { height, round := cs1.Height, cs1.Round cs2 := vss[1] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - voteCh := subscribeToEvent(cs1, types.EventStringVote()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2) @@ -234,9 +238,10 @@ func TestFullRound1(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs, types.EventStringVote()) - propCh := subscribeToEvent(cs, types.EventStringCompleteProposal()) - newRoundCh := subscribeToEvent(cs, types.EventStringNewRound()) + evsw := cs.evsw.(*events.EventSwitch) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + propCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) startTestRound(cs, height, round) @@ -262,9 +267,10 @@ func TestFullRoundNil(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs, types.EventStringVote()) + evsw := cs.evsw.(*events.EventSwitch) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - cs.EnterPrevote(height, round) + cs.enterPrevote(height, round) cs.startRoutines(4) <-voteCh // prevote @@ -281,8 +287,9 @@ func TestFullRound2(t *testing.T) { cs2 := vss[1] height, round := cs1.Height, cs1.Round - voteCh := subscribeToEvent(cs1, types.EventStringVote()) - newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + evsw := cs1.evsw.(*events.EventSwitch) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) // start round and wait for propose and prevote startTestRound(cs1, height, round) @@ -322,18 +329,19 @@ func TestLockNoPOL(t *testing.T) { cs2 := vss[1] height := cs1.Height - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - voteCh := subscribeToEvent(cs1, types.EventStringVote()) - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) /* Round1 (cs1, B) // B B // B B2 */ // start round and wait for prevote - cs1.EnterNewRound(height, 0) + cs1.enterNewRound(height, 0) cs1.startRoutines(0) re := <-proposalCh @@ -362,7 +370,7 @@ func TestLockNoPOL(t *testing.T) { <-voteCh // precommit // (note we're entering precommit for a second time this round) - // but with invalid args. then we EnterPrecommitWait, and the timeout to new round + // but with invalid args. then we enterPrecommitWait, and the timeout to new round <-timeoutWaitCh /// @@ -409,7 +417,7 @@ func TestLockNoPOL(t *testing.T) { <-voteCh // (note we're entering precommit for a second time this round, but with invalid args - // then we EnterPrecommitWait and timeout into NewRound + // then we enterPrecommitWait and timeout into NewRound <-timeoutWaitCh <-newRoundCh @@ -462,6 +470,7 @@ func TestLockNoPOL(t *testing.T) { // so set the proposal block cs1.SetProposalAndBlock(prop, propBlock, propBlock.MakePartSet(), "") + <-proposalCh <-voteCh // prevote // prevote for locked block (not proposal) @@ -483,12 +492,13 @@ func TestLockPOLRelock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - voteCh := subscribeToEvent(cs1, types.EventStringVote()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + evsw := cs1.evsw.(*events.EventSwitch) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) @@ -591,11 +601,12 @@ func TestLockPOLUnlock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - unlockCh := subscribeToEvent(cs1, types.EventStringUnlock()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // everything done from perspective of cs1 @@ -682,10 +693,11 @@ func TestLockPOLSafety1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -780,7 +792,7 @@ func TestLockPOLSafety1(t *testing.T) { // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) - newStepCh := subscribeToEvent(cs1, types.EventStringNewRoundStep()) + newStepCh := evsw.SubscribeToEvent(types.EventStringNewRoundStep(), 0) // add prevotes from the earlier round addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) @@ -801,11 +813,12 @@ func TestLockPOLSafety2(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - unlockCh := subscribeToEvent(cs1, types.EventStringUnlock()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // the block for R0: gets polkad but we miss it @@ -891,9 +904,10 @@ func TestSlashingPrevotes(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -925,9 +939,10 @@ func TestSlashingPrecommits(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -969,10 +984,11 @@ func TestHalt1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote diff --git a/events/events.go b/events/events.go index 05f7115f6..e70d39620 100644 --- a/events/events.go +++ b/events/events.go @@ -123,6 +123,16 @@ func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { eventCell.FireEvent(data) } +func (evsw *EventSwitch) SubscribeToEvent(eventID string, chanCap int) chan interface{} { + // listen for new round + ch := make(chan interface{}, chanCap) + evsw.AddListenerForEvent("tester", eventID, func(data types.EventData) { + // NOTE: in production, evsw callbacks should be nonblocking. + ch <- data + }) + return ch +} + //----------------------------------------------------------------------------- // eventCell handles keeping track of listener callbacks for a given event. diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index bdb942ffa..5d004fc5c 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -206,7 +206,7 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { // rpc.websocket const ( - writeChanCapacity = 20 + writeChanCapacity = 1000 wsWriteTimeoutSeconds = 30 // each write times out after this wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. @@ -287,7 +287,7 @@ func (wsc *WSConnection) readTimeoutRoutine() { } } -// Attempt to write response to writeChan and record failures +// Blocking write to writeChan until service stops. func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { select { case wsc.writeChan <- resp: @@ -297,6 +297,18 @@ func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { } } +// Nonblocking write. +func (wsc *WSConnection) tryWriteRPCResponse(resp RPCResponse) bool { + select { + case <-wsc.Quit: + return false + case wsc.writeChan <- resp: + return true + default: + return false + } +} + // Read from the socket and subscribe to or unsubscribe from events func (wsc *WSConnection) readRoutine() { // Do not close writeChan, to allow writeRPCResponse() to fail. @@ -339,8 +351,9 @@ func (wsc *WSConnection) readRoutine() { } else { log.Notice("Subscribe to event", "id", wsc.id, "event", event) wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) { + // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsc.writeRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + wsc.tryWriteRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) }) continue }