diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 473525a3b..9870367c7 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -52,7 +52,7 @@ type BlockchainReactor struct { timeoutsCh chan string lastBlock *types.Block - evsw events.Fireable + evsw *events.EventSwitch } func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *BlockStore, sync bool) *BlockchainReactor { @@ -263,7 +263,7 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error { } // implements events.Eventable -func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { +func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) { bcR.evsw = evsw } diff --git a/consensus/common_test.go b/consensus/common_test.go index 052c26f80..94b30dd68 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -78,6 +78,7 @@ func decideProposal(cs1 *ConsensusState, cs2 *validatorStub, height, round int) //------------------------------------------------------------------------------- // utils +/* func nilRound(t *testing.T, cs1 *ConsensusState, vss ...*validatorStub) { cs1.mtx.Lock() height, round := cs1.Height, cs1.Round @@ -93,6 +94,7 @@ func nilRound(t *testing.T, cs1 *ConsensusState, vss ...*validatorStub) { waitFor(t, cs1, height, round+1, RoundStepNewRound) } +*/ // NOTE: this switches the propser as far as `perspectiveOf` is concerned, // but for simplicity we return a block it generated. @@ -172,6 +174,17 @@ func signAddVoteToFrom(voteType byte, to *ConsensusState, from *validatorStub, h return vote } +func ensureNoNewStep(stepCh chan interface{}) { + timeout := time.NewTicker(ensureTimeout * time.Second) + select { + case <-timeout.C: + break + case <-stepCh: + panic("We should be stuck waiting for more votes, not moving to the next step") + } +} + +/* func ensureNoNewStep(t *testing.T, cs *ConsensusState) { timeout := time.NewTicker(ensureTimeout * time.Second) select { @@ -202,6 +215,19 @@ func waitFor(t *testing.T, cs *ConsensusState, height int, round int, step Round } } } +*/ + +func incrementHeight(vss ...*validatorStub) { + for _, vs := range vss { + vs.Height += 1 + } +} + +func incrementRound(vss ...*validatorStub) { + for _, vs := range vss { + vs.Round += 1 + } +} func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *validatorStub, blockHash []byte) { prevotes := cs.Votes.Prevotes(round) @@ -220,15 +246,14 @@ func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *valid } } -func incrementHeight(vss ...*validatorStub) { - for _, vs := range vss { - vs.Height += 1 +func validateLastPrecommit(t *testing.T, cs *ConsensusState, privVal *validatorStub, blockHash []byte) { + votes := cs.LastCommit + var vote *types.Vote + if vote = votes.GetByAddress(privVal.Address); vote == nil { + panic("Failed to find precommit from validator") } -} - -func incrementRound(vss ...*validatorStub) { - for _, vs := range vss { - vs.Round += 1 + if !bytes.Equal(vote.BlockHash, blockHash) { + panic(fmt.Sprintf("Expected precommit to be for %X, got %X", blockHash, vote.BlockHash)) } } @@ -298,17 +323,9 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { cs := NewConsensusState(state, proxyAppCtxCon, blockStore, mempool) cs.SetPrivValidator(privVals[0]) - // from the updateToState in NewConsensusState - <-cs.NewStepCh() - evsw := events.NewEventSwitch() - cs.SetFireable(evsw) - evsw.OnStart() - go func() { - for { - <-cs.NewStepCh() - } - }() + cs.SetEventSwitch(evsw) + evsw.Start() // start the transition routines // cs.startRoutines() @@ -322,14 +339,20 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { return cs, vss } -func subscribeToEvent(cs *ConsensusState, eventID string) chan interface{} { - evsw := cs.evsw.(*events.EventSwitch) - // listen for new round - ch := make(chan interface{}, 10) - evsw.AddListenerForEvent("tester", eventID, func(data types.EventData) { - ch <- data - }) - return ch +func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { + voteCh0 := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + voteCh := make(chan interface{}) + go func() { + for { + v := <-voteCh0 + vote := v.(*types.EventDataVote) + // we only fire for our own votes + if bytes.Equal(addr, vote.Address) { + voteCh <- v + } + } + }() + return voteCh } func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidator) { @@ -361,6 +384,6 @@ func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.G } func startTestRound(cs *ConsensusState, height, round int) { - cs.EnterNewRound(height, round) + cs.enterNewRound(height, round) cs.startRoutines(0) } diff --git a/consensus/reactor.go b/consensus/reactor.go index d747b3364..29a9be86d 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -34,7 +34,7 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState fastSync bool - evsw events.Fireable + evsw *events.EventSwitch } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { @@ -50,13 +50,17 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto func (conR *ConsensusReactor) OnStart() error { log.Notice("ConsensusReactor ", "fastSync", conR.fastSync) conR.BaseReactor.OnStart() + + // callbacks for broadcasting new steps and votes to peers + // upon their respective events (ie. uses evsw) + conR.registerEventCallbacks() + if !conR.fastSync { _, err := conR.conS.Start() if err != nil { return err } } - go conR.broadcastNewRoundStepRoutine() return nil } @@ -134,7 +138,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // NOTE: We process these messages even when we're fast_syncing. // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of -// proposals, block parts, and votes are ordered. +// proposals, block parts, and votes are ordered by the receiveRoutine func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes) @@ -213,6 +217,45 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte } } +// Sets our private validator account for signing votes. +func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { + conR.conS.SetPrivValidator(priv) +} + +// implements events.Eventable +func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { + conR.evsw = evsw + conR.conS.SetEventSwitch(evsw) +} + +//-------------------------------------- + +// Listens for new steps and votes, +// broadcasting the result to peers +func (conR *ConsensusReactor) registerEventCallbacks() { + + conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { + rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) + conR.broadcastNewRoundStep(rs) + }) + + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + edv := data.(*types.EventDataVote) + conR.broadcastHasVoteMessage(edv.Vote, edv.Index) + }) +} + +func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) { + + nrsMsg, csMsg := makeRoundStepMessages(rs) + if nrsMsg != nil { + conR.Switch.Broadcast(StateChannel, nrsMsg) + } + if csMsg != nil { + conR.Switch.Broadcast(StateChannel, csMsg) + } +} + // Broadcasts HasVoteMessage to peers that care. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) { msg := &HasVoteMessage{ @@ -239,28 +282,16 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in */ } -// Sets our private validator account for signing votes. -func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { - conR.conS.SetPrivValidator(priv) -} - -// implements events.Eventable -func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { - conR.evsw = evsw - conR.conS.SetFireable(evsw) -} - -//-------------------------------------- - func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { + step := RoundStepType(rs.Step) nrsMsg = &NewRoundStepMessage{ Height: rs.Height, Round: rs.Round, - Step: rs.Step, + Step: step, SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()), LastCommitRound: rs.LastCommit.Round(), } - if rs.Step == RoundStepCommit { + if step == RoundStepCommit { csMsg = &CommitStepMessage{ Height: rs.Height, BlockPartsHeader: rs.ProposalBlockParts.Header(), @@ -270,28 +301,6 @@ func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg * return } -// Listens for changes to the ConsensusState.Step by pulling -// on conR.conS.NewStepCh(). -func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { - for { - // Get RoundState with new Step or quit. - var rs *RoundState - select { - case rs = <-conR.conS.NewStepCh(): - case <-conR.Quit: - return - } - - nrsMsg, csMsg := makeRoundStepMessages(rs) - if nrsMsg != nil { - conR.Switch.Broadcast(StateChannel, nrsMsg) - } - if csMsg != nil { - conR.Switch.Broadcast(StateChannel, csMsg) - } - } -} - func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { rs := conR.conS.GetRoundState() nrsMsg, csMsg := makeRoundStepMessages(rs) diff --git a/consensus/state.go b/consensus/state.go index 0dd0efe6a..bb9bf586b 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -98,19 +98,13 @@ type RoundState struct { } func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState { - return &types.EventDataRoundState{ - CurrentTime: time.Now(), - Height: rs.Height, - Round: rs.Round, - Step: rs.Step.String(), - StartTime: rs.StartTime, - CommitTime: rs.CommitTime, - Proposal: rs.Proposal, - ProposalBlock: rs.ProposalBlock, - LockedRound: rs.LockedRound, - LockedBlock: rs.LockedBlock, - POLRound: rs.Votes.POLRound(), + edrs := &types.EventDataRoundState{ + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), } + edrs.SetRoundState(rs) + return edrs } func (rs *RoundState) String() string { @@ -183,7 +177,6 @@ type ConsensusState struct { blockStore *bc.BlockStore mempool *mempl.Mempool privValidator *types.PrivValidator - newStepCh chan *RoundState mtx sync.Mutex RoundState @@ -197,7 +190,7 @@ type ConsensusState struct { tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine - evsw events.Fireable + evsw *events.EventSwitch evc *events.EventCache // set in stageBlock and passed into state nSteps int // used for testing to limit the number of transitions the state makes @@ -208,7 +201,6 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore proxyAppCtx: proxyAppCtx, blockStore: blockStore, mempool: mempool, - newStepCh: make(chan *RoundState, 10), peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: new(time.Ticker), @@ -227,7 +219,7 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore // Public interface // implements events.Eventable -func (cs *ConsensusState) SetFireable(evsw events.Fireable) { +func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { cs.evsw = evsw } @@ -258,35 +250,33 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { cs.privValidator = priv } -func (cs *ConsensusState) NewStepCh() chan *RoundState { - return cs.newStepCh -} - func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() - // first we start the round (no go routines) + // first we schedule the round (no go routines) // then we start the timeout and receive routines. - // buffered channels means scheduleRound0 will finish. Once it does, - // all further access to the RoundState is through the receiveRoutine + // tickChan is buffered so scheduleRound0 will finish. + // Then all further access to the RoundState is through the receiveRoutine cs.scheduleRound0(cs.Height) - cs.startRoutines(0) // start timeout and receive + cs.startRoutines(0) return nil } +// timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan +// receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions func (cs *ConsensusState) startRoutines(maxSteps int) { - go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan - go cs.receiveRoutine(maxSteps) // serializes processing of proposoals, block parts, votes, and coordinates state transitions + go cs.timeoutRoutine() + go cs.receiveRoutine(maxSteps) } func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() } -/* - The following three functions can be used to send messages into the consensus state - which may cause a state transition -*/ +//------------------------------------------------------------ +// Public interface for passing messages into the consensus state, +// possibly causing a state transition +// TODO: should these return anything or let callers just use events? // May block on send if queue is full. func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { @@ -330,12 +320,12 @@ func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *t cs.SetProposal(proposal, peerKey) for i := 0; i < parts.Total(); i++ { part := parts.GetPart(i) - cs.AddProposalBlockPart(cs.Height, cs.Round, part, peerKey) + cs.AddProposalBlockPart(proposal.Height, proposal.Round, part, peerKey) } return nil // TODO errors } -//---------------------------------------------- +//------------------------------------------------------------ // internal functions for managing the state func (cs *ConsensusState) updateHeight(height int) { @@ -347,11 +337,11 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { cs.Step = step } -// EnterNewRound(height, 0) at cs.StartTime. +// enterNewRound(height, 0) at cs.StartTime. func (cs *ConsensusState) scheduleRound0(height int) { //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) sleepDuration := cs.StartTime.Sub(time.Now()) - cs.scheduleTimeout(sleepDuration, height, 0, 1) + cs.scheduleTimeout(sleepDuration, height, 0, RoundStepNewHeight) } // Attempt to schedule a timeout by sending timeoutInfo on the tickChan. @@ -363,11 +353,14 @@ func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round // send a msg into the receiveRoutine regarding our own proposal, block part, or vote func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { - timeout := time.After(10 * time.Millisecond) select { case cs.internalMsgQueue <- mi: - case <-timeout: - log.Debug("Timed out trying to send an internal messge. Launching go-routine") + default: + // NOTE: using the go-routine means our votes can + // be processed out of order. + // TODO: use CList here for strict determinism and + // attempt push to internalMsgQueue in receiveRoutine + log.Debug("Internal msg queue is full. Using a go-routine") go func() { cs.internalMsgQueue <- mi }() } } @@ -464,7 +457,10 @@ func (cs *ConsensusState) updateToState(state *sm.State) { func (cs *ConsensusState) newStep() { cs.nSteps += 1 - cs.newStepCh <- cs.getRoundState() + // newStep is called by updateToStep in NewConsensusState before the evsw is set! + if cs.evsw != nil { + cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent()) + } } //----------------------------------------- @@ -527,7 +523,7 @@ func (cs *ConsensusState) stopTimer() { // receiveRoutine handles messages which may cause state transitions. // it's argument (n) is the number of messages to process before exiting - use 0 to run forever // It keeps the RoundState and is the only thing that updates it. -// Updates happen on timeouts, complete proposals, and 2/3 majorities +// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities func (cs *ConsensusState) receiveRoutine(maxSteps int) { for { if maxSteps > 0 { @@ -571,25 +567,23 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { // once proposal is set, we can receive block parts err = cs.setProposal(msg.Proposal) case *BlockPartMessage: - // if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit - // if we're the only validator, the EnterPrevote may take us through to the next round + // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit + // if we're the only validator, the enterPrevote may take us through to the next round _, err = cs.addProposalBlockPart(msg.Height, msg.Part) case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition - added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) + err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) if err == ErrAddingVote { // TODO: punish peer } - if added { - // If rs.Height == vote.Height && rs.Round < vote.Round, - // the peer is sending us CatchupCommit precommits. - // We could make note of this and help filter in broadcastHasVoteMessage(). + // NOTE: the vote is broadcast to peers by the reactor listening + // for vote events - // XXX TODO: do this - // conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex) - } + // TODO: If rs.Height == vote.Height && rs.Round < vote.Round, + // the peer is sending us CatchupCommit precommits. + // We could make note of this and help filter in broadcastHasVoteMessage(). default: log.Warn("Unknown msg type", reflect.TypeOf(msg)) } @@ -601,17 +595,6 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) - // if this is a timeout for the new height - if ti.height == rs.Height+1 && ti.round == 0 && ti.step == 1 { - cs.mtx.Lock() - // Increment height. - cs.updateToState(cs.stagedState) - // event fired from EnterNewRound after some updates - cs.EnterNewRound(ti.height, 0) - cs.mtx.Unlock() - return - } - // timeouts must be for current height, round, step if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) { log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) @@ -623,15 +606,19 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { defer cs.mtx.Unlock() switch ti.step { + case RoundStepNewHeight: + // NewRound event fired from enterNewRound. + // Do we want a timeout event too? + cs.enterNewRound(ti.height, 0) case RoundStepPropose: cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) - cs.EnterPrevote(ti.height, ti.round) + cs.enterPrevote(ti.height, ti.round) case RoundStepPrevoteWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.EnterPrecommit(ti.height, ti.round) + cs.enterPrecommit(ti.height, ti.round) case RoundStepPrecommitWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.EnterNewRound(ti.height, ti.round+1) + cs.enterNewRound(ti.height, ti.round+1) default: panic(Fmt("Invalid timeout step: %v", ti.step)) } @@ -640,17 +627,15 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { //----------------------------------------------------------------------------- // State functions -// Many of these functions are capitalized but are not really meant to be used -// by external code as it will cause race conditions with running timeout/receiveRoutine. -// Use AddVote, SetProposal, AddProposalBlockPart instead +// Used internally by handleTimeout and handleMsg to make state transitions // Enter: +2/3 precommits for nil at (height,round-1) // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) // Enter: `startTime = commitTime+timeoutCommit` from NewHeight(height) // NOTE: cs.StartTime was already set for height. -func (cs *ConsensusState) EnterNewRound(height int, round int) { +func (cs *ConsensusState) enterNewRound(height int, round int) { if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != RoundStepNewHeight) { - log.Debug(Fmt("EnterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } @@ -659,7 +644,7 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) { } // cs.stopTimer() - log.Notice(Fmt("EnterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Notice(Fmt("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Increment validators if necessary validators := cs.Validators @@ -686,25 +671,32 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) { cs.evsw.FireEvent(types.EventStringNewRound(), cs.RoundStateEvent()) - // Immediately go to EnterPropose. - cs.EnterPropose(height, round) + // Immediately go to enterPropose. + cs.enterPropose(height, round) } // Enter: from NewRound(height,round). -func (cs *ConsensusState) EnterPropose(height int, round int) { +func (cs *ConsensusState) enterPropose(height int, round int) { // cs.mtx.Lock() // cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) { - log.Debug(Fmt("EnterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } - log.Info(Fmt("EnterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPropose: + // Done enterPropose: cs.updateRoundStep(round, RoundStepPropose) cs.newStep() + + // If we have the whole proposal + POL, then goto Prevote now. + // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), + // or else after timeoutPropose + if cs.isProposalComplete() { + cs.enterPrevote(height, cs.Round) + } }() // This step times out after `timeoutPropose` @@ -716,18 +708,12 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { } if !bytes.Equal(cs.Validators.Proposer().Address, cs.privValidator.Address) { - log.Info("EnterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) + log.Info("enterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) } else { - log.Info("EnterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) + log.Info("enterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) cs.decideProposal(height, round) } - // If we have the whole proposal + POL, then goto Prevote now. - // else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart), - // or else after timeoutPropose - if cs.isProposalComplete() { - cs.EnterPrevote(height, cs.Round) - } } func (cs *ConsensusState) decideProposal(height, round int) { @@ -766,7 +752,7 @@ func (cs *ConsensusState) decideProposal(height, round int) { log.Notice("Signed and sent proposal", "height", height, "round", round, "proposal", proposal) log.Debug(Fmt("Signed and sent proposal block: %v", block)) } else { - log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err) + log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err) } } @@ -801,7 +787,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts validation = cs.LastCommit.MakeValidation() } else { // This shouldn't happen. - log.Error("EnterPropose: Cannot propose anything: No validation for the previous block.") + log.Error("enterPropose: Cannot propose anything: No validation for the previous block.") return } @@ -840,16 +826,16 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Enter: any +2/3 prevotes for future round. // Prevote for LockedBlock if we're locked, or ProposalBlock if valid. // Otherwise vote nil. -func (cs *ConsensusState) EnterPrevote(height int, round int) { +func (cs *ConsensusState) enterPrevote(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) { - log.Debug(Fmt("EnterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } defer func() { - // Done EnterPrevote: + // Done enterPrevote: cs.updateRoundStep(round, RoundStepPrevote) cs.newStep() }() @@ -864,7 +850,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { // cs.stopTimer() - log.Info(Fmt("EnterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Sign and broadcast vote as necessary cs.doPrevote(height, round) @@ -876,14 +862,14 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { func (cs *ConsensusState) doPrevote(height int, round int) { // If a block is locked, prevote that. if cs.LockedBlock != nil { - log.Info("EnterPrevote: Block was locked") + log.Info("enterPrevote: Block was locked") cs.signAddVote(types.VoteTypePrevote, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header()) return } // If ProposalBlock is nil, prevote nil. if cs.ProposalBlock == nil { - log.Warn("EnterPrevote: ProposalBlock is nil") + log.Warn("enterPrevote: ProposalBlock is nil") cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -892,7 +878,7 @@ func (cs *ConsensusState) doPrevote(height int, round int) { err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts) if err != nil { // ProposalBlock is invalid, prevote nil. - log.Warn("EnterPrevote: ProposalBlock is invalid", "error", err) + log.Warn("enterPrevote: ProposalBlock is invalid", "error", err) cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -905,25 +891,25 @@ func (cs *ConsensusState) doPrevote(height int, round int) { } // Enter: any +2/3 prevotes at next round. -func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { +func (cs *ConsensusState) enterPrevoteWait(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevoteWait <= cs.Step) { - log.Debug(Fmt("EnterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } if !cs.Votes.Prevotes(round).HasTwoThirdsAny() { - PanicSanity(Fmt("EnterPrevoteWait(%v/%v), but Prevotes does not have any +2/3 votes", height, round)) + PanicSanity(Fmt("enterPrevoteWait(%v/%v), but Prevotes does not have any +2/3 votes", height, round)) } - log.Info(Fmt("EnterPrevoteWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrevoteWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrevoteWait: + // Done enterPrevoteWait: cs.updateRoundStep(round, RoundStepPrevoteWait) cs.newStep() }() - // After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit() + // After `timeoutPrevote0+timeoutPrevoteDelta*round`, enterPrecommit() cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait) } @@ -933,20 +919,20 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, precommit nil otherwise. -func (cs *ConsensusState) EnterPrecommit(height int, round int) { +func (cs *ConsensusState) enterPrecommit(height int, round int) { //cs.mtx.Lock() // defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) { - log.Debug(Fmt("EnterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } // cs.stopTimer() - log.Info(Fmt("EnterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrecommit: + // Done enterPrecommit: cs.updateRoundStep(round, RoundStepPrecommit) cs.newStep() }() @@ -956,9 +942,9 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If we don't have a polka, we must precommit nil if !ok { if cs.LockedBlock != nil { - log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit while we're locked. Precommitting nil") + log.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit while we're locked. Precommitting nil") } else { - log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit. Precommitting nil.") + log.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit. Precommitting nil.") } cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return @@ -975,9 +961,9 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // +2/3 prevoted nil. Unlock and precommit nil. if len(hash) == 0 { if cs.LockedBlock == nil { - log.Info("EnterPrecommit: +2/3 prevoted for nil.") + log.Info("enterPrecommit: +2/3 prevoted for nil.") } else { - log.Info("EnterPrecommit: +2/3 prevoted for nil. Unlocking") + log.Info("enterPrecommit: +2/3 prevoted for nil. Unlocking") cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil @@ -991,7 +977,7 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If we're already locked on that block, precommit it, and update the LockedRound if cs.LockedBlock.HashesTo(hash) { - log.Info("EnterPrecommit: +2/3 prevoted locked block. Relocking") + log.Info("enterPrecommit: +2/3 prevoted locked block. Relocking") cs.LockedRound = round cs.evsw.FireEvent(types.EventStringRelock(), cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) @@ -1000,10 +986,10 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If +2/3 prevoted for proposal block, stage and precommit it if cs.ProposalBlock.HashesTo(hash) { - log.Info("EnterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) + log.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) // Validate the block. if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { - PanicConsensus(Fmt("EnterPrecommit: +2/3 prevoted for an invalid block: %v", err)) + PanicConsensus(Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) } cs.LockedRound = round cs.LockedBlock = cs.ProposalBlock @@ -1030,41 +1016,41 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { } // Enter: any +2/3 precommits for next round. -func (cs *ConsensusState) EnterPrecommitWait(height int, round int) { +func (cs *ConsensusState) enterPrecommitWait(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommitWait <= cs.Step) { - log.Debug(Fmt("EnterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } if !cs.Votes.Precommits(round).HasTwoThirdsAny() { - PanicSanity(Fmt("EnterPrecommitWait(%v/%v), but Precommits does not have any +2/3 votes", height, round)) + PanicSanity(Fmt("enterPrecommitWait(%v/%v), but Precommits does not have any +2/3 votes", height, round)) } - log.Info(Fmt("EnterPrecommitWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrecommitWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrecommitWait: + // Done enterPrecommitWait: cs.updateRoundStep(round, RoundStepPrecommitWait) cs.newStep() }() - // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound() + // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, enterNewRound() cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait) } // Enter: +2/3 precommits for block -func (cs *ConsensusState) EnterCommit(height int, commitRound int) { +func (cs *ConsensusState) enterCommit(height int, commitRound int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || RoundStepCommit <= cs.Step { - log.Debug(Fmt("EnterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) return } - log.Info(Fmt("EnterCommit(%v/%v). Current: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterCommit(%v/%v). Current: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) defer func() { - // Done Entercommit: + // Done enterCommit: // keep ca.Round the same, it points to the right Precommits set. cs.updateRoundStep(cs.Round, RoundStepCommit) cs.CommitRound = commitRound @@ -1116,29 +1102,29 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) { return } // go - cs.FinalizeCommit(height) + cs.finalizeCommit(height) } // Increment height and goto RoundStepNewHeight -func (cs *ConsensusState) FinalizeCommit(height int) { +func (cs *ConsensusState) finalizeCommit(height int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || cs.Step != RoundStepCommit { - log.Debug(Fmt("FinalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("finalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) return } hash, header, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority() if !ok { - PanicSanity(Fmt("Cannot FinalizeCommit, commit does not have two thirds majority")) + PanicSanity(Fmt("Cannot finalizeCommit, commit does not have two thirds majority")) } if !cs.ProposalBlockParts.HasHeader(header) { PanicSanity(Fmt("Expected ProposalBlockParts header to be commit header")) } if !cs.ProposalBlock.HashesTo(hash) { - PanicSanity(Fmt("Cannot FinalizeCommit, ProposalBlock does not hash to commit hash")) + PanicSanity(Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash")) } if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err)) @@ -1148,7 +1134,8 @@ func (cs *ConsensusState) FinalizeCommit(height int) { // We have the block, so stage/save/commit-vote. cs.saveBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Votes.Precommits(cs.CommitRound)) - // call updateToState from handleTimeout + // NewHeightStep! + cs.updateToState(cs.stagedState) // cs.StartTime is already set. // Schedule Round0 to start soon. @@ -1199,7 +1186,7 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { } // NOTE: block is not necessarily valid. -// This can trigger us to go into EnterPrevote asynchronously (before we timeout of propose) or to attempt to commit +// This can trigger us to go into enterPrevote asynchronously (before we timeout of propose) or to attempt to commit func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { //cs.mtx.Lock() //defer cs.mtx.Unlock() @@ -1223,10 +1210,10 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad var n int var err error cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block) - log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash()) + log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash(), "round", cs.Proposal.Round) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step - cs.EnterPrevote(height, cs.Round) + cs.enterPrevote(height, cs.Round) } else if cs.Step == RoundStepCommit { // If we're waiting on the proposal block... cs.tryFinalizeCommit(height) @@ -1237,14 +1224,14 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad } // Attempt to add the vote. if its a duplicate signature, dupeout the validator -func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) (bool, error) { - added, _, err := cs.addVote(valIndex, vote, peerKey) +func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) error { + _, _, err := cs.addVote(valIndex, vote, peerKey) if err != nil { // If the vote height is off, we'll just ignore it, // But if it's a conflicting sig, broadcast evidence tx for slashing. // If it's otherwise invalid, punish peer. if err == ErrVoteHeightMismatch { - return added, err + return err } else if _, ok := err.(*types.ErrVoteConflictingSignature); ok { log.Warn("Found conflicting vote. Publish evidence") /* TODO @@ -1255,14 +1242,14 @@ func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey str } cs.mempool.BroadcastTx(evidenceTx) // shouldn't need to check returned err */ - return added, err + return err } else { // Probably an invalid signature. Bad peer. log.Warn("Error attempting to add vote", "error", err) - return added, ErrAddingVote + return ErrAddingVote } } - return added, nil + return nil } //----------------------------------------------------------------------------- @@ -1300,7 +1287,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string // First, unlock if prevotes is a valid POL. // >> lockRound < POLRound <= unlockOrChangeLockRound (see spec) // NOTE: If (lockRound < POLRound) but !(POLRound <= unlockOrChangeLockRound), - // we'll still EnterNewRound(H,vote.R) and EnterPrecommit(H,vote.R) to process it + // we'll still enterNewRound(H,vote.R) and enterPrecommit(H,vote.R) to process it // there. if (cs.LockedBlock != nil) && (cs.LockedRound < vote.Round) && (vote.Round <= cs.Round) { hash, _, ok := prevotes.TwoThirdsMajority() @@ -1314,17 +1301,17 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string } if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { // Round-skip over to PrevoteWait or goto Precommit. - cs.EnterNewRound(height, vote.Round) // if the vote is ahead of us + cs.enterNewRound(height, vote.Round) // if the vote is ahead of us if prevotes.HasTwoThirdsMajority() { - cs.EnterPrecommit(height, vote.Round) + cs.enterPrecommit(height, vote.Round) } else { - cs.EnterPrevote(height, vote.Round) // if the vote is ahead of us - cs.EnterPrevoteWait(height, vote.Round) + cs.enterPrevote(height, vote.Round) // if the vote is ahead of us + cs.enterPrevoteWait(height, vote.Round) } } else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round { // If the proposal is now complete, enter prevote of cs.Round. if cs.isProposalComplete() { - cs.EnterPrevote(height, cs.Round) + cs.enterPrevote(height, cs.Round) } } case types.VoteTypePrecommit: @@ -1333,16 +1320,16 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string hash, _, ok := precommits.TwoThirdsMajority() if ok { if len(hash) == 0 { - cs.EnterNewRound(height, vote.Round+1) + cs.enterNewRound(height, vote.Round+1) } else { - cs.EnterNewRound(height, vote.Round) - cs.EnterPrecommit(height, vote.Round) - cs.EnterCommit(height, vote.Round) + cs.enterNewRound(height, vote.Round) + cs.enterPrecommit(height, vote.Round) + cs.enterCommit(height, vote.Round) } } else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() { - cs.EnterNewRound(height, vote.Round) - cs.EnterPrecommit(height, vote.Round) - cs.EnterPrecommitWait(height, vote.Round) + cs.enterNewRound(height, vote.Round) + cs.enterPrecommit(height, vote.Round) + cs.enterPrecommitWait(height, vote.Round) //}() } default: @@ -1376,7 +1363,7 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS // Create a copy of the state for staging stateCopy := cs.state.Copy() - stateCopy.SetFireable(cs.evc) + stateCopy.SetEventCache(cs.evc) // Run the block on the State: // + update validator sets @@ -1412,9 +1399,8 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part } vote, err := cs.signVote(type_, hash, header) if err == nil { - // NOTE: store our index in the cs so we don't have to do this every time + // TODO: store our index in the cs so we don't have to do this every time valIndex, _ := cs.Validators.GetByAddress(cs.privValidator.Address) - // _, _, err := cs.addVote(valIndex, vote, "") cs.sendInternalMessage(msgInfo{&VoteMessage{valIndex, vote}, ""}) log.Notice("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) return vote diff --git a/consensus/state_test.go b/consensus/state_test.go index 682ade2fa..d6e8682a4 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -8,7 +8,6 @@ import ( _ "github.com/tendermint/tendermint/config/tendermint_test" //"github.com/tendermint/tendermint/events" - "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" ) @@ -54,8 +53,8 @@ func TestProposerSelection0(t *testing.T) { cs1, vss := simpleConsensusState(4) height, round := cs1.Height, cs1.Round - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) startTestRound(cs1, height, round) @@ -87,7 +86,7 @@ func TestProposerSelection0(t *testing.T) { func TestProposerSelection2(t *testing.T) { cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) // this time we jump in at round 2 incrementRound(vss[1:]...) @@ -119,7 +118,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { height, round := cs.Height, cs.Round // Listen for propose timeout event - timeoutCh := subscribeToEvent(cs, types.EventStringTimeoutPropose()) + timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) startTestRound(cs, height, round) @@ -143,11 +142,11 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { height, round := cs.Height, cs.Round // Listen for propose timeout event - timeoutCh := subscribeToEvent(cs, types.EventStringTimeoutPropose()) - proposalCh := subscribeToEvent(cs, types.EventStringCompleteProposal()) - //startTestRound(cs, height, round) - cs.EnterNewRound(height, round) + timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + proposalCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + + cs.enterNewRound(height, round) cs.startRoutines(3) <-proposalCh @@ -164,7 +163,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { t.Error("rs.ProposalBlockParts should be set") } - // if we're a validator, EnterPropose should not timeout + // if we're a validator, enterPropose should not timeout ticker := time.NewTicker(timeoutPropose * 2) select { case <-timeoutCh: @@ -179,8 +178,8 @@ func TestBadProposal(t *testing.T) { height, round := cs1.Height, cs1.Round cs2 := vss[1] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - voteCh := subscribeToEvent(cs1, types.EventStringVote()) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2) @@ -234,19 +233,27 @@ func TestFullRound1(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs, types.EventStringVote()) + voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + propCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + newRoundCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + + startTestRound(cs, height, round) + + <-newRoundCh - cs.EnterNewRound(height, round) - cs.startRoutines(5) + // grab proposal + re := <-propCh + propBlockHash := re.(*types.EventDataRoundState).RoundState().(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote - <-voteCh // wait for precommit + validatePrevote(t, cs, round, vss[0], propBlockHash) - propBlockHash := cs.GetRoundState().ProposalBlock.Hash() + <-voteCh // wait for precommit - // the proposed block should be prevoted, precommitted, and locked - validatePrevoteAndPrecommit(t, cs, round, round, vss[0], propBlockHash, propBlockHash) + // we're going to roll right into new height + <-newRoundCh + validateLastPrecommit(t, cs, vss[0], propBlockHash) } // nil is proposed, so prevote and precommit nil @@ -254,9 +261,9 @@ func TestFullRoundNil(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs, types.EventStringVote()) + voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) - cs.EnterPrevote(height, round) + cs.enterPrevote(height, round) cs.startRoutines(4) <-voteCh // prevote @@ -273,8 +280,8 @@ func TestFullRound2(t *testing.T) { cs2 := vss[1] height, round := cs1.Height, cs1.Round - voteCh := subscribeToEvent(cs1, types.EventStringVote()) - newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) // start round and wait for propose and prevote startTestRound(cs1, height, round) @@ -308,43 +315,38 @@ func TestFullRound2(t *testing.T) { // LockSuite // two validators, 4 rounds. -// val1 proposes the first 2 rounds, and is locked in the first. -// val2 proposes the next two. val1 should precommit nil on all (except first where he locks) +// two vals take turns proposing. val1 locks on first one, precommits nil on everything else func TestLockNoPOL(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - cs1.newStepCh = make(chan *RoundState) // so it blocks height := cs1.Height - timeoutChan := make(chan struct{}) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- struct{}{} - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- struct{}{} - }) - cs1.SetFireable(evsw) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) /* Round1 (cs1, B) // B B // B B2 */ // start round and wait for prevote - go cs1.EnterNewRound(height, 0) - waitFor(t, cs1, height, 0, RoundStepPrevote) + cs1.enterNewRound(height, 0) + cs1.startRoutines(0) + + re := <-proposalCh + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + theBlockHash := rs.ProposalBlock.Hash() + + <-voteCh // prevote // we should now be stuck in limbo forever, waiting for more prevotes // prevote arrives from cs2: signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + <-voteCh // prevote - cs1.mtx.Lock() // XXX: sigh - theBlockHash := cs1.ProposalBlock.Hash() - cs1.mtx.Unlock() - - // wait to finish precommit - waitFor(t, cs1, height, 0, RoundStepPrecommit) + <-voteCh // precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) @@ -352,16 +354,20 @@ func TestLockNoPOL(t *testing.T) { // we should now be stuck in limbo forever, waiting for more precommits // lets add one for a different block // NOTE: in practice we should never get to a point where there are precommits for different blocks at the same round - hash := cs1.ProposalBlock.Hash() + hash := make([]byte, len(theBlockHash)) + copy(hash, theBlockHash) hash[0] = byte((hash[0] + 1) % 255) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) + <-voteCh // precommit // (note we're entering precommit for a second time this round) - // but with invalid args. then we EnterPrecommitWait, and the timeout to new round - waitFor(t, cs1, height, 0, RoundStepPrecommitWait) - <-timeoutChan + // but with invalid args. then we enterPrecommitWait, and the timeout to new round + <-timeoutWaitCh + + /// - log.Info("#### ONTO ROUND 2") + <-newRoundCh + log.Notice("#### ONTO ROUND 1") /* Round2 (cs1, B) // B B2 */ @@ -369,27 +375,28 @@ func TestLockNoPOL(t *testing.T) { incrementRound(cs2) // now we're on a new round and not the proposer, so wait for timeout - waitFor(t, cs1, height, 1, RoundStepPropose) - <-timeoutChan + re = <-timeoutProposeCh + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) - if cs1.ProposalBlock != nil { + if rs.ProposalBlock != nil { t.Fatal("Expected proposal block to be nil") } // wait to finish prevote - waitFor(t, cs1, height, 1, RoundStepPrevote) + <-voteCh // we should have prevoted our locked block - validatePrevote(t, cs1, 1, vss[0], cs1.LockedBlock.Hash()) + validatePrevote(t, cs1, 1, vss[0], rs.LockedBlock.Hash()) // add a conflicting prevote from the other validator - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) + <-voteCh // now we're going to enter prevote again, but with invalid args // and then prevote wait, which should timeout. then wait for precommit - waitFor(t, cs1, height, 1, RoundStepPrevoteWait) - <-timeoutChan - waitFor(t, cs1, height, 1, RoundStepPrecommit) + <-timeoutWaitCh + + <-voteCh // precommit // the proposed block should still be locked and our precommit added // we should precommit nil and be locked on the proposal @@ -397,43 +404,44 @@ func TestLockNoPOL(t *testing.T) { // add conflicting precommit from cs2 // NOTE: in practice we should never get to a point where there are precommits for different blocks at the same round - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) + <-voteCh // (note we're entering precommit for a second time this round, but with invalid args - // then we EnterPrecommitWait and timeout into NewRound - waitFor(t, cs1, height, 1, RoundStepPrecommitWait) - <-timeoutChan + // then we enterPrecommitWait and timeout into NewRound + <-timeoutWaitCh - log.Info("#### ONTO ROUND 3") + <-newRoundCh + log.Notice("#### ONTO ROUND 2") /* Round3 (cs2, _) // B, B2 */ incrementRound(cs2) - waitFor(t, cs1, height, 2, RoundStepPropose) + re = <-proposalCh + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) // now we're on a new round and are the proposer - if cs1.ProposalBlock != cs1.LockedBlock { - t.Fatalf("Expected proposal block to be locked block. Got %v, Expected %v", cs1.ProposalBlock, cs1.LockedBlock) + if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { + t.Fatalf("Expected proposal block to be locked block. Got %v, Expected %v", rs.ProposalBlock, rs.LockedBlock) } - // go to prevote, prevote for locked block - waitFor(t, cs1, height, 2, RoundStepPrevote) + <-voteCh // prevote - validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) + validatePrevote(t, cs1, 2, vss[0], rs.LockedBlock.Hash()) - // TODO: quick fastforward to new round, set proposer - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) + <-voteCh - waitFor(t, cs1, height, 2, RoundStepPrevoteWait) - <-timeoutChan - waitFor(t, cs1, height, 2, RoundStepPrecommit) + <-timeoutWaitCh // prevote wait + <-voteCh // precommit - validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but be locked on proposal - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) // NOTE: conflicting precommits at same height + validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but be locked on proposal + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) // NOTE: conflicting precommits at same height + <-voteCh - waitFor(t, cs1, height, 2, RoundStepPrecommitWait) + <-timeoutWaitCh // before we time out into new round, set next proposal block prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) @@ -443,66 +451,46 @@ func TestLockNoPOL(t *testing.T) { incrementRound(cs2) - <-timeoutChan - - log.Info("#### ONTO ROUND 4") + <-newRoundCh + log.Notice("#### ONTO ROUND 3") /* Round4 (cs2, C) // B C // B C */ // now we're on a new round and not the proposer // so set the proposal block - cs1.mtx.Lock() - cs1.Proposal, cs1.ProposalBlock = prop, propBlock - cs1.mtx.Unlock() - - // wait for the proposal go ahead - waitFor(t, cs1, height, 3, RoundStepPropose) - - //log.Debug("waiting for timeout") - // and wait for timeout - // go func() { <-timeoutChan }() + cs1.SetProposalAndBlock(prop, propBlock, propBlock.MakePartSet(), "") - log.Debug("waiting for prevote") - // go to prevote, prevote for locked block (not proposal) - waitFor(t, cs1, height, 3, RoundStepPrevote) + <-proposalCh + <-voteCh // prevote + // prevote for locked block (not proposal) validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header()) + <-voteCh - waitFor(t, cs1, height, 3, RoundStepPrevoteWait) - <-timeoutChan - waitFor(t, cs1, height, 3, RoundStepPrecommit) + <-timeoutWaitCh + <-voteCh validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but locked on proposal signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header()) // NOTE: conflicting precommits at same height + <-voteCh } // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestLockPOLRelock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - - timeoutChan := make(chan *types.EventDataRoundState) - voteChan := make(chan *types.EventDataVote) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringVote(), func(data types.EventData) { - vote := data.(*types.EventDataVote) - // we only fire for our own votes - if bytes.Equal(cs1.privValidator.Address, vote.Address) { - voteChan <- vote - } - }) - cs1.SetFireable(evsw) + + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) + + log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) // everything done from perspective of cs1 @@ -513,58 +501,42 @@ func TestLockPOLRelock(t *testing.T) { */ // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _, _ = <-cs1.NewStepCh(), <-voteChan, <-cs1.NewStepCh() - - theBlockHash := cs1.ProposalBlock.Hash() - - // wait to finish precommit after prevotes done - // we do this in a go routine with another channel since otherwise - // we may get deadlock with EnterPrecommit waiting to send on newStepCh and the final - // signAddVoteToFrom waiting for the cs.mtx.Lock - donePrecommit := make(chan struct{}) - go func() { - <-voteChan - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() + startTestRound(cs1, cs1.Height, 0) + + <-newRoundCh + re := <-proposalCh + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + theBlockHash := rs.ProposalBlock.Hash() + + <-voteCh // prevote + signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + _, _, _ = <-voteCh, <-voteCh, <-voteCh // prevotes + <-voteCh // our precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) - donePrecommitWait := make(chan struct{}) - go func() { - // (note we're entering precommit for a second time this round) - // but with invalid args. then we EnterPrecommitWait, twice (?) - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() // add precommits from the rest signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-donePrecommitWait - - // before we time out into new round, set next proposer - // and next proposal block - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") - } + _, _, _ = <-voteCh, <-voteCh, <-voteCh // precommits + // before we timeout to the new round set the new proposal prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + propBlockParts := propBlock.MakePartSet() + propBlockHash := propBlock.Hash() incrementRound(cs2, cs3, cs4) // timeout to new round - te := <-timeoutChan - if te.Step != RoundStepPrecommitWait.String() { - t.Fatalf("expected to timeout of precommit into new round. got %v", te.Step) - } + <-timeoutWaitCh - log.Info("### ONTO ROUND 2") + //XXX: this isnt gauranteed to get there before the timeoutPropose ... + cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer") + + <-newRoundCh + log.Notice("### ONTO ROUND 1") /* Round2 (cs2, C) // B C C C // C C C _) @@ -573,53 +545,44 @@ func TestLockPOLRelock(t *testing.T) { */ // now we're on a new round and not the proposer - // so set the proposal block - cs1.mtx.Lock() - propBlockHash, propBlockParts := propBlock.Hash(), propBlock.MakePartSet() - cs1.Proposal, cs1.ProposalBlock, cs1.ProposalBlockParts = prop, propBlock, propBlockParts - cs1.mtx.Unlock() - <-cs1.NewStepCh() + // but we should receive the proposal + select { + case <-proposalCh: + case <-timeoutProposeCh: + <-proposalCh + } // go to prevote, prevote for locked block (not proposal), move on - _, _ = <-voteChan, <-cs1.NewStepCh() + <-voteCh validatePrevote(t, cs1, 0, vss[0], theBlockHash) - donePrecommit = make(chan struct{}) - go func() { - // we need this go routine because if we go into PrevoteWait it has to pull on newStepCh - // before the final vote will get added (because it holds the mutex). - select { - case <-cs1.NewStepCh(): // we're in PrevoteWait, go to Precommit - <-voteChan - case <-voteChan: // we went straight to Precommit - } - donePrecommit <- struct{}{} - }() // now lets add prevotes from everyone else for the new block signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + _, _, _ = <-voteCh, <-voteCh, <-voteCh // prevotes + + // now either we go to PrevoteWait or Precommit + select { + case <-timeoutWaitCh: // we're in PrevoteWait, go to Precommit + <-voteCh + case <-voteCh: // we went straight to Precommit + } // we should have unlocked and locked on the new block validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) - donePrecommitWait = make(chan struct{}) - go func() { - // (note we're entering precommit for a second time this round) - // but with invalid args. then we EnterPrecommitWait, - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() signAddVoteToFromMany(types.VoteTypePrecommit, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3) - <-donePrecommitWait + _, _ = <-voteCh, <-voteCh - <-cs1.NewStepCh() - rs := <-cs1.NewStepCh() + be := <-newBlockCh + b := be.(types.EventDataNewBlock) + re = <-newRoundCh + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.Height != 2 { t.Fatal("Expected height to increment") } - if hash, _, ok := rs.LastCommit.TwoThirdsMajority(); !ok || !bytes.Equal(hash, propBlockHash) { - t.Fatal("Expected block to get committed") + if !bytes.Equal(b.Block.Hash(), propBlockHash) { + t.Fatal("Expected new block to be proposal block") } } @@ -627,26 +590,13 @@ func TestLockPOLRelock(t *testing.T) { func TestLockPOLUnlock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - - timeoutChan := make(chan *types.EventDataRoundState) - voteChan := make(chan *types.EventDataVote) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringVote(), func(data types.EventData) { - vote := data.(*types.EventDataVote) - // we only fire for our own votes - if bytes.Equal(cs1.privValidator.Address, vote.Address) { - voteChan <- vote - } - }) - cs1.SetFireable(evsw) + + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // everything done from perspective of cs1 @@ -657,49 +607,41 @@ func TestLockPOLUnlock(t *testing.T) { */ // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _, _ = <-cs1.NewStepCh(), <-voteChan, <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + theBlockHash := rs.ProposalBlock.Hash() - theBlockHash := cs1.ProposalBlock.Hash() + <-voteCh // prevote - donePrecommit := make(chan struct{}) - go func() { - <-voteChan - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + + <-voteCh //precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) - donePrecommitWait := make(chan struct{}) - go func() { - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() // add precommits from the rest signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-donePrecommitWait - - // before we time out into new round, set next proposer - // and next proposal block - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") - } + // before we time out into new round, set next proposal block prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + propBlockParts := propBlock.MakePartSet() incrementRound(cs2, cs3, cs4) // timeout to new round - <-timeoutChan + re = <-timeoutWaitCh + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + lockedBlockHash := rs.LockedBlock.Hash() + + //XXX: this isnt gauranteed to get there before the timeoutPropose ... + cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer") - log.Info("#### ONTO ROUND 2") + <-newRoundCh + log.Notice("#### ONTO ROUND 1") /* Round2 (cs2, C) // B nil nil nil // nil nil nil _ @@ -707,43 +649,29 @@ func TestLockPOLUnlock(t *testing.T) { */ // now we're on a new round and not the proposer, - // so set the proposal block - cs1.mtx.Lock() - cs1.Proposal, cs1.ProposalBlock, cs1.ProposalBlockParts = prop, propBlock, propBlock.MakePartSet() - lockedBlockHash := cs1.LockedBlock.Hash() - cs1.mtx.Unlock() - <-cs1.NewStepCh() + // but we should receive the proposal + select { + case <-proposalCh: + case <-timeoutProposeCh: + <-proposalCh + } // go to prevote, prevote for locked block (not proposal) - _, _ = <-voteChan, <-cs1.NewStepCh() + <-voteCh validatePrevote(t, cs1, 0, vss[0], lockedBlockHash) - - donePrecommit = make(chan struct{}) - go func() { - select { - case <-cs1.NewStepCh(): // we're in PrevoteWait, go to Precommit - <-voteChan - case <-voteChan: // we went straight to Precommit - } - donePrecommit <- struct{}{} - }() - // now lets add prevotes from everyone else for the new block + // now lets add prevotes from everyone else for nil (a polka!) signAddVoteToFromMany(types.VoteTypePrevote, cs1, nil, types.PartSetHeader{}, cs2, cs3, cs4) - <-donePrecommit - // we should have unlocked - // NOTE: we don't lock on nil, so LockedRound is still 0 + // the polka makes us unlock and precommit nil + <-unlockCh + <-voteCh // precommit + + // we should have unlocked and committed nil + // NOTE: since we don't relock on nil, the lock round is 0 validatePrecommit(t, cs1, 1, 0, vss[0], nil, nil) - donePrecommitWait = make(chan struct{}) - go func() { - // the votes will bring us to new round right away - // we should timeout of it - _, _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh(), <-timeoutChan - donePrecommitWait <- struct{}{} - }() signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3) - <-donePrecommitWait + <-newRoundCh } // 4 vals @@ -753,45 +681,35 @@ func TestLockPOLUnlock(t *testing.T) { func TestLockPOLSafety1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - - timeoutChan := make(chan *types.EventDataRoundState) - voteChan := make(chan *types.EventDataVote) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringVote(), func(data types.EventData) { - vote := data.(*types.EventDataVote) - // we only fire for our own votes - if bytes.Equal(cs1.privValidator.Address, vote.Address) { - voteChan <- vote - } - }) - cs1.SetFireable(evsw) + + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _, _ = <-cs1.NewStepCh(), <-voteChan, <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + propBlock := rs.ProposalBlock - propBlock := cs1.ProposalBlock + <-voteCh // prevote - validatePrevote(t, cs1, 0, vss[0], cs1.ProposalBlock.Hash()) + validatePrevote(t, cs1, 0, vss[0], propBlock.Hash()) // the others sign a polka but we don't see it prevotes := signVoteMany(types.VoteTypePrevote, propBlock.Hash(), propBlock.MakePartSet().Header(), cs2, cs3, cs4) // before we time out into new round, set next proposer // and next proposal block - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") - } + /* + _, v1 := cs1.Validators.GetByAddress(vss[0].Address) + v1.VotingPower = 1 + if updated := cs1.Validators.Update(v1); !updated { + t.Fatal("failed to update validator") + }*/ log.Warn("old prop", "hash", fmt.Sprintf("%X", propBlock.Hash())) @@ -799,266 +717,207 @@ func TestLockPOLSafety1(t *testing.T) { signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3, cs4) prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + propBlockHash := propBlock.Hash() + propBlockParts := propBlock.MakePartSet() incrementRound(cs2, cs3, cs4) - log.Info("### ONTO ROUND 2") + //XXX: this isnt gauranteed to get there before the timeoutPropose ... + cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer") + + <-newRoundCh + log.Notice("### ONTO ROUND 1") /*Round2 // we timeout and prevote our lock // a polka happened but we didn't see it! */ // now we're on a new round and not the proposer, - // so set proposal - cs1.mtx.Lock() - propBlockHash, propBlockParts := propBlock.Hash(), propBlock.MakePartSet() - cs1.Proposal, cs1.ProposalBlock, cs1.ProposalBlockParts = prop, propBlock, propBlockParts - cs1.mtx.Unlock() - <-cs1.NewStepCh() - - if cs1.LockedBlock != nil { + // but we should receive the proposal + select { + case re = <-proposalCh: + case <-timeoutProposeCh: + re = <-proposalCh + } + + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + + if rs.LockedBlock != nil { t.Fatal("we should not be locked!") } log.Warn("new prop", "hash", fmt.Sprintf("%X", propBlockHash)) // go to prevote, prevote for proposal block - _, _ = <-voteChan, <-cs1.NewStepCh() + <-voteCh validatePrevote(t, cs1, 1, vss[0], propBlockHash) // now we see the others prevote for it, so we should lock on it - donePrecommit := make(chan struct{}) - go func() { - select { - case <-cs1.NewStepCh(): // we're in PrevoteWait, go to Precommit - <-voteChan - case <-voteChan: // we went straight to Precommit - } - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() - // now lets add prevotes from everyone else for nil signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + + <-voteCh // precommit // we should have precommitted validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) - // now we see precommits for nil - donePrecommitWait := make(chan struct{}) - go func() { - // the votes will bring us to new round - // we should timeut of it and go to prevote - <-cs1.NewStepCh() - <-timeoutChan - donePrecommitWait <- struct{}{} - }() signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3) - <-donePrecommitWait + + <-timeoutWaitCh incrementRound(cs2, cs3, cs4) - log.Info("### ONTO ROUND 3") + <-newRoundCh + + log.Notice("### ONTO ROUND 2") /*Round3 we see the polka from round 1 but we shouldn't unlock! */ // timeout of propose - _, _ = <-cs1.NewStepCh(), <-timeoutChan + <-timeoutProposeCh // finish prevote - _, _ = <-voteChan, <-cs1.NewStepCh() + <-voteCh // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) + newStepCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRoundStep(), 0) + // add prevotes from the earlier round addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) log.Warn("Done adding prevotes!") - ensureNoNewStep(t, cs1) + ensureNoNewStep(newStepCh) } // 4 vals. -// polka P1 at R1, P2 at R2, and P3 at R3, -// we lock on P1 at R1, don't see P2, and unlock using P3 at R3 -// then we should make sure we don't lock using P2 +// polka P0 at R0, P1 at R1, and P2 at R2, +// we lock on P0 at R0, don't see P1, and unlock using P2 at R2 +// then we should make sure we don't lock using P1 + +// What we want: +// dont see P0, lock on P1 at R1, dont unlock using P0 at R2 func TestLockPOLSafety2(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - - timeoutChan := make(chan *types.EventDataRoundState) - voteChan := make(chan *types.EventDataVote) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringVote(), func(data types.EventData) { - vote := data.(*types.EventDataVote) - // we only fire for our own votes - if bytes.Equal(cs1.privValidator.Address, vote.Address) { - voteChan <- vote - } - }) - cs1.SetFireable(evsw) - // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _, _ = <-cs1.NewStepCh(), <-voteChan, <-cs1.NewStepCh() + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) - theBlockHash := cs1.ProposalBlock.Hash() + // the block for R0: gets polkad but we miss it + // (even though we signed it, shhh) + _, propBlock0 := decideProposal(cs1, vss[0], cs1.Height, cs1.Round) + propBlockHash0 := propBlock0.Hash() + propBlockParts0 := propBlock0.MakePartSet() - donePrecommit := make(chan struct{}) - go func() { - <-voteChan - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() - signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + // the others sign a polka but we don't see it + prevotes := signVoteMany(types.VoteTypePrevote, propBlockHash0, propBlockParts0.Header(), cs2, cs3, cs4) - // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) + // the block for round 1 + prop1, propBlock1 := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + propBlockHash1 := propBlock1.Hash() + propBlockParts1 := propBlock1.MakePartSet() - donePrecommitWait := make(chan struct{}) - go func() { - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() - // add precommits from the rest - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-donePrecommitWait - - // before we time out into new round, set next proposer - // and next proposal block - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") - } + incrementRound(cs2, cs3, cs4) - prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + cs1.updateRoundStep(0, RoundStepPrecommitWait) - incrementRound(cs2, cs3, cs4) + log.Notice("### ONTO Round 1") + // jump in at round 1 + height := cs1.Height + startTestRound(cs1, height, 1) + <-newRoundCh - // timeout to new round - <-timeoutChan + cs1.SetProposalAndBlock(prop1, propBlock1, propBlockParts1, "some peer") + <-proposalCh - log.Info("### ONTO Round 2") - /*Round2 - // we timeout and prevote our lock - // a polka happened but we didn't see it! - */ + <-voteCh // prevote - // now we're on a new round and not the proposer, so wait for timeout - _, _ = <-cs1.NewStepCh(), <-timeoutChan - // go to prevote, prevote for locked block - _, _ = <-voteChan, <-cs1.NewStepCh() - validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) + signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash1, propBlockParts1.Header(), cs2, cs3, cs4) - // the others sign a polka but we don't see it - prevotes := signVoteMany(types.VoteTypePrevote, propBlock.Hash(), propBlock.MakePartSet().Header(), cs2, cs3, cs4) + <-voteCh // precommit + // the proposed block should now be locked and our precommit added + validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash1, propBlockHash1) - // once we see prevotes for the next round we'll skip ahead + // add precommits from the rest + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, propBlockHash1, propBlockParts1.Header()) incrementRound(cs2, cs3, cs4) - log.Info("### ONTO Round 3") - /*Round3 - a polka for nil causes us to unlock - */ + // timeout of precommit wait to new round + <-timeoutWaitCh - // these prevotes will send us straight to precommit at the higher round - donePrecommit = make(chan struct{}) - go func() { - select { - case <-cs1.NewStepCh(): // we're in PrevoteWait, go to Precommit - <-voteChan - case <-voteChan: // we went straight to Precommit - } - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() - // now lets add prevotes from everyone else for nil - signAddVoteToFromMany(types.VoteTypePrevote, cs1, nil, types.PartSetHeader{}, cs2, cs3, cs4) - <-donePrecommit - - // we should have unlocked - // NOTE: we don't lock on nil, so LockedRound is still 0 - validatePrecommit(t, cs1, 2, 0, vss[0], nil, nil) - - donePrecommitWait = make(chan struct{}) - go func() { - // the votes will bring us to new round right away - // we should timeut of it and go to prevote - <-cs1.NewStepCh() - // set the proposal block to be that which got a polka in R2 - cs1.mtx.Lock() - cs1.Proposal, cs1.ProposalBlock, cs1.ProposalBlockParts = prop, propBlock, propBlock.MakePartSet() - cs1.mtx.Unlock() - // timeout into prevote, finish prevote - _, _, _ = <-timeoutChan, <-voteChan, <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3) - <-donePrecommitWait + // in round 2 we see the polkad block from round 0 + newProp := types.NewProposal(height, 2, propBlockParts0.Header(), 0) + if err := cs3.SignProposal(chainID, newProp); err != nil { + t.Fatal(err) + } + cs1.SetProposalAndBlock(newProp, propBlock0, propBlockParts0, "some peer") + addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) // add the pol votes - log.Info("### ONTO ROUND 4") - /*Round4 - we see the polka from R2 - make sure we don't lock because of it! + <-newRoundCh + log.Notice("### ONTO Round 2") + /*Round2 + // now we see the polka from round 1, but we shouldnt unlock */ - // new round and not proposer - // (we already timed out and stepped into prevote) - - log.Warn("adding prevotes from round 2") - addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) - - log.Warn("Done adding prevotes!") - - // we should prevote it now - validatePrevote(t, cs1, 3, vss[0], cs1.ProposalBlock.Hash()) + select { + case <-timeoutProposeCh: + <-proposalCh + case <-proposalCh: + } - // but we shouldn't precommit it - precommits := cs1.Votes.Precommits(3) - vote := precommits.GetByIndex(0) - if vote != nil { - t.Fatal("validator precommitted at round 4 based on an old polka") + select { + case <-unlockCh: + t.Fatal("validator unlocked using an old polka") + case <-voteCh: + // prevote our locked block } + validatePrevote(t, cs1, 2, vss[0], propBlockHash1) + } //------------------------------------------------------------------------------------------ // SlashingSuite +// TODO: Slashing +/* func TestSlashingPrevotes(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - cs1.newStepCh = make(chan *RoundState) // so it blocks + + + proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + <-voteCh // prevote + + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait hash := cs1.ProposalBlock.Hash() hash[0] = byte(hash[0]+1) % 255 - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlockParts.Header()) - // pass prevote wait - <-cs1.NewStepCh() + <-timeoutWaitCh // NOTE: we have to send the vote for different block first so we don't just go into precommit round right // away and ignore more prevotes (and thus fail to slash!) // add the conflicting vote - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) // XXX: Check for existence of Dupeout info } @@ -1066,35 +925,39 @@ func TestSlashingPrevotes(t *testing.T) { func TestSlashingPrecommits(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - cs1.newStepCh = make(chan *RoundState) // so it blocks + + + proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + <-voteCh // prevote // add prevote from cs2 - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) - // wait to finish precommit - <-cs1.NewStepCh() + <-voteCh // precommit // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait - hash := cs1.ProposalBlock.Hash() + hash := rs.ProposalBlock.Hash() hash[0] = byte(hash[0]+1) % 255 - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) - - // pass prevote wait - <-cs1.NewStepCh() + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlockParts.Header()) // NOTE: we have to send the vote for different block first so we don't just go into precommit round right // away and ignore more prevotes (and thus fail to slash!) // add precommit from cs2 - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) // XXX: Check for existence of Dupeout info } +*/ //------------------------------------------------------------------------------------------ // CatchupSuite @@ -1107,71 +970,61 @@ func TestSlashingPrecommits(t *testing.T) { func TestHalt1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - timeoutChan := make(chan struct{}) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- struct{}{} - }) - cs1.SetFireable(evsw) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + propBlock := rs.ProposalBlock + propBlockParts := propBlock.MakePartSet() - theBlockHash := cs1.ProposalBlock.Hash() + <-voteCh // prevote - donePrecommit := make(chan struct{}) - go func() { - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() - signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs3, cs4) - <-donePrecommit + signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlock.Hash(), propBlockParts.Header(), cs3, cs4) + <-voteCh // precommit // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) + validatePrecommit(t, cs1, 0, 0, vss[0], propBlock.Hash(), propBlock.Hash()) - donePrecommitWait := make(chan struct{}) - go func() { - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() // add precommits from the rest signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, nil, types.PartSetHeader{}) // didnt receive proposal - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, propBlock.Hash(), propBlockParts.Header()) // we receive this later, but cs3 might receive it earlier and with ours will go to commit! - precommit4 := signVote(cs4, types.VoteTypePrecommit, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-donePrecommitWait + precommit4 := signVote(cs4, types.VoteTypePrecommit, propBlock.Hash(), propBlockParts.Header()) incrementRound(cs2, cs3, cs4) // timeout to new round - <-timeoutChan + <-timeoutWaitCh + re = <-newRoundCh + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) - log.Info("### ONTO ROUND 2") + log.Notice("### ONTO ROUND 1") /*Round2 // we timeout and prevote our lock // a polka happened but we didn't see it! */ // go to prevote, prevote for locked block - _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh() - validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) + <-voteCh // prevote + validatePrevote(t, cs1, 0, vss[0], rs.LockedBlock.Hash()) // now we receive the precommit from the previous round addVoteToFrom(cs1, cs4, precommit4) // receiving that precommit should take us straight to commit - ensureNewStep(t, cs1) - log.Warn("done enter commit!") - - // update to state - ensureNewStep(t, cs1) + <-newBlockCh + re = <-newRoundCh + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) - if cs1.Height != 2 { + if rs.Height != 2 { t.Fatal("expected height to increment") } } diff --git a/events/events.go b/events/events.go index 05f7115f6..647522d0a 100644 --- a/events/events.go +++ b/events/events.go @@ -10,7 +10,7 @@ import ( // reactors and other modules should export // this interface to become eventable type Eventable interface { - SetFireable(Fireable) + SetEventSwitch(evsw *EventSwitch) } // an event switch or cache implements fireable @@ -123,6 +123,16 @@ func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { eventCell.FireEvent(data) } +func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} { + // listen for new round + ch := make(chan interface{}, chanCap) + evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) { + // NOTE: in production, evsw callbacks should be nonblocking. + ch <- data + }) + return ch +} + //----------------------------------------------------------------------------- // eventCell handles keeping track of listener callbacks for a given event. diff --git a/mempool/reactor.go b/mempool/reactor.go index d6140d2d4..13bb20ad3 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -25,7 +25,7 @@ const ( type MempoolReactor struct { p2p.BaseReactor Mempool *Mempool // TODO: un-expose - evsw events.Fireable + evsw *events.EventSwitch } func NewMempoolReactor(mempool *Mempool) *MempoolReactor { @@ -135,7 +135,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { } // implements events.Eventable -func (memR *MempoolReactor) SetFireable(evsw events.Fireable) { +func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) { memR.evsw = evsw } diff --git a/node/node.go b/node/node.go index 85de92dce..539e315dc 100644 --- a/node/node.go +++ b/node/node.go @@ -94,7 +94,7 @@ func NewNode() *Node { // add the event switch to all services // they should all satisfy events.Eventable - SetFireable(eventSwitch, bcReactor, mempoolReactor, consensusReactor) + SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor) // run the profile server profileHost := config.GetString("prof_laddr") @@ -133,9 +133,9 @@ func (n *Node) Stop() { } // Add the event switch to reactors, mempool, etc. -func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { +func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { for _, e := range eventables { - e.SetFireable(evsw) + e.SetEventSwitch(evsw) } } diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index c26b60fe7..40e3abe73 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -206,7 +206,7 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { // rpc.websocket const ( - writeChanCapacity = 20 + writeChanCapacity = 1000 wsWriteTimeoutSeconds = 30 // each write times out after this wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. @@ -289,7 +289,7 @@ func (wsc *WSConnection) readTimeoutRoutine() { } } -// Block trying to write to writeChan until service stops. +// Blocking write to writeChan until service stops. func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { select { case <-wsc.Quit: @@ -298,6 +298,18 @@ func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { } } +// Nonblocking write. +func (wsc *WSConnection) tryWriteRPCResponse(resp RPCResponse) bool { + select { + case <-wsc.Quit: + return false + case wsc.writeChan <- resp: + return true + default: + return false + } +} + // Read from the socket and subscribe to or unsubscribe from events func (wsc *WSConnection) readRoutine() { // Do not close writeChan, to allow writeRPCResponse() to fail. @@ -340,8 +352,9 @@ func (wsc *WSConnection) readRoutine() { } else { log.Notice("Subscribe to event", "id", wsc.id, "event", event) wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) { + // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsc.writeRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + wsc.tryWriteRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) }) continue } diff --git a/state/state.go b/state/state.go index 32a6bf179..9523a6497 100644 --- a/state/state.go +++ b/state/state.go @@ -33,7 +33,7 @@ type State struct { LastValidators *types.ValidatorSet LastAppHash []byte - evc events.Fireable // typically an events.EventCache + evc *events.EventCache } func LoadState(db dbm.DB) *State { @@ -81,8 +81,7 @@ func (s *State) Save() { s.db.Set(stateKey, buf.Bytes()) } -// Implements events.Eventable. Typically uses events.EventCache -func (s *State) SetFireable(evc events.Fireable) { +func (s *State) SetEventCache(evc *events.EventCache) { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/types/block.go b/types/block.go index e574f2893..c3e232030 100644 --- a/types/block.go +++ b/types/block.go @@ -62,8 +62,12 @@ func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockHash } func (b *Block) FillHeader() { - b.LastValidationHash = b.LastValidation.Hash() - b.DataHash = b.Data.Hash() + if b.LastValidationHash == nil { + b.LastValidationHash = b.LastValidation.Hash() + } + if b.DataHash == nil { + b.DataHash = b.Data.Hash() + } } // Computes and returns the block hash. diff --git a/types/events.go b/types/events.go index 3b1598414..436e02de6 100644 --- a/types/events.go +++ b/types/events.go @@ -1,8 +1,6 @@ package types import ( - "time" - "github.com/tendermint/go-wire" ) @@ -17,6 +15,7 @@ func EventStringFork() string { return "Fork" } func EventStringNewBlock() string { return "NewBlock" } func EventStringNewRound() string { return "NewRound" } +func EventStringNewRoundStep() string { return "NewRoundStep" } func EventStringTimeoutPropose() string { return "TimeoutPropose" } func EventStringCompleteProposal() string { return "CompleteProposal" } func EventStringPolka() string { return "Polka" } @@ -72,21 +71,21 @@ type EventDataApp struct { Data []byte `json:"bytes"` } -// We fire the most recent round state that led to the event -// (ie. NewRound will have the previous rounds state) type EventDataRoundState struct { - CurrentTime time.Time `json:"current_time"` - - Height int `json:"height"` - Round int `json:"round"` - Step string `json:"step"` - StartTime time.Time `json:"start_time"` - CommitTime time.Time `json:"commit_time"` - Proposal *Proposal `json:"proposal"` - ProposalBlock *Block `json:"proposal_block"` - LockedRound int `json:"locked_round"` - LockedBlock *Block `json:"locked_block"` - POLRound int `json:"pol_round"` + Height int `json:"height"` + Round int `json:"round"` + Step string `json:"step"` + + // private, not exposed to websockets + rs interface{} +} + +func (edrs *EventDataRoundState) RoundState() interface{} { + return edrs.rs +} + +func (edrs *EventDataRoundState) SetRoundState(rs interface{}) { + edrs.rs = rs } type EventDataVote struct {