From 07a96a703e040f8e0b450fce8246bcd4d74c9dc9 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 12 Dec 2015 17:22:48 -0500 Subject: [PATCH 1/5] move updateToState back --- consensus/state.go | 42 +++++++++++++++++++----------------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 0dd0efe6a..d83eae446 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -265,28 +265,30 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState { func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() - // first we start the round (no go routines) + // first we schedule the round (no go routines) // then we start the timeout and receive routines. - // buffered channels means scheduleRound0 will finish. Once it does, - // all further access to the RoundState is through the receiveRoutine + // tickChan is buffered so scheduleRound0 will finish. + // Then all further access to the RoundState is through the receiveRoutine cs.scheduleRound0(cs.Height) - cs.startRoutines(0) // start timeout and receive + cs.startRoutines(0) return nil } +// timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan +// receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions func (cs *ConsensusState) startRoutines(maxSteps int) { - go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan - go cs.receiveRoutine(maxSteps) // serializes processing of proposoals, block parts, votes, and coordinates state transitions + go cs.timeoutRoutine() + go cs.receiveRoutine(maxSteps) } func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() } -/* - The following three functions can be used to send messages into the consensus state - which may cause a state transition -*/ +//------------------------------------------------------------ +// Public interface for passing messages into the consensus state, +// possibly causing a state transition +// TODO: should these return anything or let callers just use events? // May block on send if queue is full. func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) { @@ -335,7 +337,7 @@ func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *t return nil // TODO errors } -//---------------------------------------------- +//------------------------------------------------------------ // internal functions for managing the state func (cs *ConsensusState) updateHeight(height int) { @@ -601,17 +603,6 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) - // if this is a timeout for the new height - if ti.height == rs.Height+1 && ti.round == 0 && ti.step == 1 { - cs.mtx.Lock() - // Increment height. - cs.updateToState(cs.stagedState) - // event fired from EnterNewRound after some updates - cs.EnterNewRound(ti.height, 0) - cs.mtx.Unlock() - return - } - // timeouts must be for current height, round, step if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) { log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) @@ -623,6 +614,10 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { defer cs.mtx.Unlock() switch ti.step { + case RoundStepNewHeight: + // NewRound event fired from EnterNewRound. + // Do we want a timeout event too? + cs.EnterNewRound(ti.height, 0) case RoundStepPropose: cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) cs.EnterPrevote(ti.height, ti.round) @@ -1148,7 +1143,8 @@ func (cs *ConsensusState) FinalizeCommit(height int) { // We have the block, so stage/save/commit-vote. cs.saveBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Votes.Precommits(cs.CommitRound)) - // call updateToState from handleTimeout + // NewHeightStep! + cs.updateToState(cs.stagedState) // cs.StartTime is already set. // Schedule Round0 to start soon. From 9dea9539b4acb1d2e8d0af82f9412fd9ee6b5813 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 13 Dec 2015 14:56:05 -0500 Subject: [PATCH 2/5] fix consensus tests --- consensus/common_test.go | 44 ++- consensus/state.go | 4 +- consensus/state_test.go | 830 ++++++++++++++++----------------------- types/block.go | 8 +- 4 files changed, 386 insertions(+), 500 deletions(-) diff --git a/consensus/common_test.go b/consensus/common_test.go index f31f69ae1..40609c6e9 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -203,6 +203,18 @@ func waitFor(t *testing.T, cs *ConsensusState, height int, round int, step Round } } +func incrementHeight(vss ...*validatorStub) { + for _, vs := range vss { + vs.Height += 1 + } +} + +func incrementRound(vss ...*validatorStub) { + for _, vs := range vss { + vs.Round += 1 + } +} + func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *validatorStub, blockHash []byte) { prevotes := cs.Votes.Prevotes(round) var vote *types.Vote @@ -220,15 +232,14 @@ func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *valid } } -func incrementHeight(vss ...*validatorStub) { - for _, vs := range vss { - vs.Height += 1 +func validateLastPrecommit(t *testing.T, cs *ConsensusState, privVal *validatorStub, blockHash []byte) { + votes := cs.LastCommit + var vote *types.Vote + if vote = votes.GetByAddress(privVal.Address); vote == nil { + panic("Failed to find precommit from validator") } -} - -func incrementRound(vss ...*validatorStub) { - for _, vs := range vss { - vs.Round += 1 + if !bytes.Equal(vote.BlockHash, blockHash) { + panic(fmt.Sprintf("Expected precommit to be for %X, got %X", blockHash, vote.BlockHash)) } } @@ -332,6 +343,23 @@ func subscribeToEvent(cs *ConsensusState, eventID string) chan interface{} { return ch } +func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { + + voteCh0 := subscribeToEvent(cs, types.EventStringVote()) + voteCh := make(chan interface{}) + go func() { + for { + v := <-voteCh0 + vote := v.(*types.EventDataVote) + // we only fire for our own votes + if bytes.Equal(addr, vote.Address) { + voteCh <- v + } + } + }() + return voteCh +} + func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidator) { db := dbm.NewMemDB() genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower) diff --git a/consensus/state.go b/consensus/state.go index d83eae446..4f4723615 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -332,7 +332,7 @@ func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *t cs.SetProposal(proposal, peerKey) for i := 0; i < parts.Total(); i++ { part := parts.GetPart(i) - cs.AddProposalBlockPart(cs.Height, cs.Round, part, peerKey) + cs.AddProposalBlockPart(proposal.Height, proposal.Round, part, peerKey) } return nil // TODO errors } @@ -1219,7 +1219,7 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad var n int var err error cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block) - log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash()) + log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash(), "round", cs.Proposal.Round) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step cs.EnterPrevote(height, cs.Round) diff --git a/consensus/state_test.go b/consensus/state_test.go index 682ade2fa..13619965a 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -8,7 +8,7 @@ import ( _ "github.com/tendermint/tendermint/config/tendermint_test" //"github.com/tendermint/tendermint/events" - "github.com/tendermint/tendermint/events" + // "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" ) @@ -235,18 +235,26 @@ func TestFullRound1(t *testing.T) { height, round := cs.Height, cs.Round voteCh := subscribeToEvent(cs, types.EventStringVote()) + propCh := subscribeToEvent(cs, types.EventStringCompleteProposal()) + newRoundCh := subscribeToEvent(cs, types.EventStringNewRound()) - cs.EnterNewRound(height, round) - cs.startRoutines(5) + startTestRound(cs, height, round) + + <-newRoundCh + + // grab proposal + re := <-propCh + propBlockHash := re.(*types.EventDataRoundState).ProposalBlock.Hash() <-voteCh // wait for prevote - <-voteCh // wait for precommit + validatePrevote(t, cs, round, vss[0], propBlockHash) - propBlockHash := cs.GetRoundState().ProposalBlock.Hash() + <-voteCh // wait for precommit - // the proposed block should be prevoted, precommitted, and locked - validatePrevoteAndPrecommit(t, cs, round, round, vss[0], propBlockHash, propBlockHash) + // we're going to roll right into new height + <-newRoundCh + validateLastPrecommit(t, cs, vss[0], propBlockHash) } // nil is proposed, so prevote and precommit nil @@ -313,38 +321,34 @@ func TestFullRound2(t *testing.T) { func TestLockNoPOL(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - cs1.newStepCh = make(chan *RoundState) // so it blocks height := cs1.Height - timeoutChan := make(chan struct{}) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- struct{}{} - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- struct{}{} - }) - cs1.SetFireable(evsw) + timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) + timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) + voteCh := subscribeToEvent(cs1, types.EventStringVote()) + proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) /* Round1 (cs1, B) // B B // B B2 */ // start round and wait for prevote - go cs1.EnterNewRound(height, 0) - waitFor(t, cs1, height, 0, RoundStepPrevote) + cs1.EnterNewRound(height, 0) + cs1.startRoutines(0) + + re := <-proposalCh + rs := re.(*types.EventDataRoundState) + theBlockHash := rs.ProposalBlock.Hash() + + <-voteCh // prevote // we should now be stuck in limbo forever, waiting for more prevotes // prevote arrives from cs2: signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + <-voteCh // prevote - cs1.mtx.Lock() // XXX: sigh - theBlockHash := cs1.ProposalBlock.Hash() - cs1.mtx.Unlock() - - // wait to finish precommit - waitFor(t, cs1, height, 0, RoundStepPrecommit) + <-voteCh // precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) @@ -352,16 +356,20 @@ func TestLockNoPOL(t *testing.T) { // we should now be stuck in limbo forever, waiting for more precommits // lets add one for a different block // NOTE: in practice we should never get to a point where there are precommits for different blocks at the same round - hash := cs1.ProposalBlock.Hash() + hash := make([]byte, len(theBlockHash)) + copy(hash, theBlockHash) hash[0] = byte((hash[0] + 1) % 255) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) + <-voteCh // precommit // (note we're entering precommit for a second time this round) // but with invalid args. then we EnterPrecommitWait, and the timeout to new round - waitFor(t, cs1, height, 0, RoundStepPrecommitWait) - <-timeoutChan + <-timeoutWaitCh + + /// - log.Info("#### ONTO ROUND 2") + <-newRoundCh + log.Notice("#### ONTO ROUND 1") /* Round2 (cs1, B) // B B2 */ @@ -369,27 +377,28 @@ func TestLockNoPOL(t *testing.T) { incrementRound(cs2) // now we're on a new round and not the proposer, so wait for timeout - waitFor(t, cs1, height, 1, RoundStepPropose) - <-timeoutChan + re = <-timeoutProposeCh + rs = re.(*types.EventDataRoundState) - if cs1.ProposalBlock != nil { + if rs.ProposalBlock != nil { t.Fatal("Expected proposal block to be nil") } // wait to finish prevote - waitFor(t, cs1, height, 1, RoundStepPrevote) + <-voteCh // we should have prevoted our locked block - validatePrevote(t, cs1, 1, vss[0], cs1.LockedBlock.Hash()) + validatePrevote(t, cs1, 1, vss[0], rs.LockedBlock.Hash()) // add a conflicting prevote from the other validator - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) + <-voteCh // now we're going to enter prevote again, but with invalid args // and then prevote wait, which should timeout. then wait for precommit - waitFor(t, cs1, height, 1, RoundStepPrevoteWait) - <-timeoutChan - waitFor(t, cs1, height, 1, RoundStepPrecommit) + <-timeoutWaitCh + + <-voteCh // precommit // the proposed block should still be locked and our precommit added // we should precommit nil and be locked on the proposal @@ -397,43 +406,46 @@ func TestLockNoPOL(t *testing.T) { // add conflicting precommit from cs2 // NOTE: in practice we should never get to a point where there are precommits for different blocks at the same round - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) + <-voteCh // (note we're entering precommit for a second time this round, but with invalid args // then we EnterPrecommitWait and timeout into NewRound - waitFor(t, cs1, height, 1, RoundStepPrecommitWait) - <-timeoutChan + <-timeoutWaitCh - log.Info("#### ONTO ROUND 3") + <-newRoundCh + log.Notice("#### ONTO ROUND 2") /* Round3 (cs2, _) // B, B2 */ incrementRound(cs2) - waitFor(t, cs1, height, 2, RoundStepPropose) + re = <-proposalCh + rs = re.(*types.EventDataRoundState) // now we're on a new round and are the proposer - if cs1.ProposalBlock != cs1.LockedBlock { - t.Fatalf("Expected proposal block to be locked block. Got %v, Expected %v", cs1.ProposalBlock, cs1.LockedBlock) + if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { + t.Fatalf("Expected proposal block to be locked block. Got %v, Expected %v", rs.ProposalBlock, rs.LockedBlock) } - // go to prevote, prevote for locked block - waitFor(t, cs1, height, 2, RoundStepPrevote) + <-voteCh // prevote - validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) + // TODO: is the round right?! + validatePrevote(t, cs1, 0, vss[0], rs.LockedBlock.Hash()) // TODO: quick fastforward to new round, set proposer - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) + <-voteCh - waitFor(t, cs1, height, 2, RoundStepPrevoteWait) - <-timeoutChan - waitFor(t, cs1, height, 2, RoundStepPrecommit) + <-timeoutWaitCh // prevote wait + <-voteCh // precommit - validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but be locked on proposal - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) // NOTE: conflicting precommits at same height + validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but be locked on proposal + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) // NOTE: conflicting precommits at same height + <-voteCh - waitFor(t, cs1, height, 2, RoundStepPrecommitWait) + <-timeoutWaitCh // before we time out into new round, set next proposal block prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) @@ -443,66 +455,45 @@ func TestLockNoPOL(t *testing.T) { incrementRound(cs2) - <-timeoutChan - - log.Info("#### ONTO ROUND 4") + <-newRoundCh + log.Notice("#### ONTO ROUND 3") /* Round4 (cs2, C) // B C // B C */ // now we're on a new round and not the proposer // so set the proposal block - cs1.mtx.Lock() - cs1.Proposal, cs1.ProposalBlock = prop, propBlock - cs1.mtx.Unlock() - - // wait for the proposal go ahead - waitFor(t, cs1, height, 3, RoundStepPropose) - - //log.Debug("waiting for timeout") - // and wait for timeout - // go func() { <-timeoutChan }() + cs1.SetProposalAndBlock(prop, propBlock, propBlock.MakePartSet(), "") - log.Debug("waiting for prevote") - // go to prevote, prevote for locked block (not proposal) - waitFor(t, cs1, height, 3, RoundStepPrevote) + <-voteCh // prevote + // prevote for locked block (not proposal) validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header()) + <-voteCh - waitFor(t, cs1, height, 3, RoundStepPrevoteWait) - <-timeoutChan - waitFor(t, cs1, height, 3, RoundStepPrecommit) + <-timeoutWaitCh + <-voteCh validatePrecommit(t, cs1, 2, 0, vss[0], nil, theBlockHash) // precommit nil but locked on proposal signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, propBlock.Hash(), propBlock.MakePartSet().Header()) // NOTE: conflicting precommits at same height + <-voteCh } // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestLockPOLRelock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - - timeoutChan := make(chan *types.EventDataRoundState) - voteChan := make(chan *types.EventDataVote) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringVote(), func(data types.EventData) { - vote := data.(*types.EventDataVote) - // we only fire for our own votes - if bytes.Equal(cs1.privValidator.Address, vote.Address) { - voteChan <- vote - } - }) - cs1.SetFireable(evsw) + + timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) + timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) + proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + voteCh := subscribeToEvent(cs1, types.EventStringVote()) + newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + + log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) // everything done from perspective of cs1 @@ -513,58 +504,42 @@ func TestLockPOLRelock(t *testing.T) { */ // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _, _ = <-cs1.NewStepCh(), <-voteChan, <-cs1.NewStepCh() - - theBlockHash := cs1.ProposalBlock.Hash() - - // wait to finish precommit after prevotes done - // we do this in a go routine with another channel since otherwise - // we may get deadlock with EnterPrecommit waiting to send on newStepCh and the final - // signAddVoteToFrom waiting for the cs.mtx.Lock - donePrecommit := make(chan struct{}) - go func() { - <-voteChan - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() + startTestRound(cs1, cs1.Height, 0) + + <-newRoundCh + re := <-proposalCh + rs := re.(*types.EventDataRoundState) + theBlockHash := rs.ProposalBlock.Hash() + + <-voteCh // prevote + signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + _, _, _ = <-voteCh, <-voteCh, <-voteCh // prevotes + <-voteCh // our precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) - donePrecommitWait := make(chan struct{}) - go func() { - // (note we're entering precommit for a second time this round) - // but with invalid args. then we EnterPrecommitWait, twice (?) - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() // add precommits from the rest signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-donePrecommitWait - - // before we time out into new round, set next proposer - // and next proposal block - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") - } + _, _, _ = <-voteCh, <-voteCh, <-voteCh // precommits + // before we timeout to the new round set the new proposal prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + propBlockParts := propBlock.MakePartSet() + propBlockHash := propBlock.Hash() incrementRound(cs2, cs3, cs4) // timeout to new round - te := <-timeoutChan - if te.Step != RoundStepPrecommitWait.String() { - t.Fatalf("expected to timeout of precommit into new round. got %v", te.Step) - } + <-timeoutWaitCh - log.Info("### ONTO ROUND 2") + //XXX: this isnt gauranteed to get there before the timeoutPropose ... + cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer") + + <-newRoundCh + log.Notice("### ONTO ROUND 1") /* Round2 (cs2, C) // B C C C // C C C _) @@ -573,53 +548,44 @@ func TestLockPOLRelock(t *testing.T) { */ // now we're on a new round and not the proposer - // so set the proposal block - cs1.mtx.Lock() - propBlockHash, propBlockParts := propBlock.Hash(), propBlock.MakePartSet() - cs1.Proposal, cs1.ProposalBlock, cs1.ProposalBlockParts = prop, propBlock, propBlockParts - cs1.mtx.Unlock() - <-cs1.NewStepCh() + // but we should receive the proposal + select { + case <-proposalCh: + case <-timeoutProposeCh: + <-proposalCh + } // go to prevote, prevote for locked block (not proposal), move on - _, _ = <-voteChan, <-cs1.NewStepCh() + <-voteCh validatePrevote(t, cs1, 0, vss[0], theBlockHash) - donePrecommit = make(chan struct{}) - go func() { - // we need this go routine because if we go into PrevoteWait it has to pull on newStepCh - // before the final vote will get added (because it holds the mutex). - select { - case <-cs1.NewStepCh(): // we're in PrevoteWait, go to Precommit - <-voteChan - case <-voteChan: // we went straight to Precommit - } - donePrecommit <- struct{}{} - }() // now lets add prevotes from everyone else for the new block signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + _, _, _ = <-voteCh, <-voteCh, <-voteCh // prevotes + + // now either we go to PrevoteWait or Precommit + select { + case <-timeoutWaitCh: // we're in PrevoteWait, go to Precommit + <-voteCh + case <-voteCh: // we went straight to Precommit + } // we should have unlocked and locked on the new block validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) - donePrecommitWait = make(chan struct{}) - go func() { - // (note we're entering precommit for a second time this round) - // but with invalid args. then we EnterPrecommitWait, - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() signAddVoteToFromMany(types.VoteTypePrecommit, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3) - <-donePrecommitWait + _, _ = <-voteCh, <-voteCh - <-cs1.NewStepCh() - rs := <-cs1.NewStepCh() + be := <-newBlockCh + b := be.(types.EventDataNewBlock) + re = <-newRoundCh + rs = re.(*types.EventDataRoundState) if rs.Height != 2 { t.Fatal("Expected height to increment") } - if hash, _, ok := rs.LastCommit.TwoThirdsMajority(); !ok || !bytes.Equal(hash, propBlockHash) { - t.Fatal("Expected block to get committed") + if !bytes.Equal(b.Block.Hash(), propBlockHash) { + t.Fatal("Expected new block to be proposal block") } } @@ -627,26 +593,13 @@ func TestLockPOLRelock(t *testing.T) { func TestLockPOLUnlock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - - timeoutChan := make(chan *types.EventDataRoundState) - voteChan := make(chan *types.EventDataVote) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringVote(), func(data types.EventData) { - vote := data.(*types.EventDataVote) - // we only fire for our own votes - if bytes.Equal(cs1.privValidator.Address, vote.Address) { - voteChan <- vote - } - }) - cs1.SetFireable(evsw) + + proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) + timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) + newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + unlockCh := subscribeToEvent(cs1, types.EventStringUnlock()) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // everything done from perspective of cs1 @@ -657,49 +610,41 @@ func TestLockPOLUnlock(t *testing.T) { */ // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _, _ = <-cs1.NewStepCh(), <-voteChan, <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + rs := re.(*types.EventDataRoundState) + theBlockHash := rs.ProposalBlock.Hash() - theBlockHash := cs1.ProposalBlock.Hash() + <-voteCh // prevote - donePrecommit := make(chan struct{}) - go func() { - <-voteChan - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + + <-voteCh //precommit // the proposed block should now be locked and our precommit added validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) - donePrecommitWait := make(chan struct{}) - go func() { - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() // add precommits from the rest signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-donePrecommitWait - - // before we time out into new round, set next proposer - // and next proposal block - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") - } + // before we time out into new round, set next proposal block prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + propBlockParts := propBlock.MakePartSet() incrementRound(cs2, cs3, cs4) // timeout to new round - <-timeoutChan + re = <-timeoutWaitCh + rs = re.(*types.EventDataRoundState) + lockedBlockHash := rs.LockedBlock.Hash() + + //XXX: this isnt gauranteed to get there before the timeoutPropose ... + cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer") - log.Info("#### ONTO ROUND 2") + <-newRoundCh + log.Notice("#### ONTO ROUND 1") /* Round2 (cs2, C) // B nil nil nil // nil nil nil _ @@ -707,43 +652,29 @@ func TestLockPOLUnlock(t *testing.T) { */ // now we're on a new round and not the proposer, - // so set the proposal block - cs1.mtx.Lock() - cs1.Proposal, cs1.ProposalBlock, cs1.ProposalBlockParts = prop, propBlock, propBlock.MakePartSet() - lockedBlockHash := cs1.LockedBlock.Hash() - cs1.mtx.Unlock() - <-cs1.NewStepCh() + // but we should receive the proposal + select { + case <-proposalCh: + case <-timeoutProposeCh: + <-proposalCh + } // go to prevote, prevote for locked block (not proposal) - _, _ = <-voteChan, <-cs1.NewStepCh() + <-voteCh validatePrevote(t, cs1, 0, vss[0], lockedBlockHash) - - donePrecommit = make(chan struct{}) - go func() { - select { - case <-cs1.NewStepCh(): // we're in PrevoteWait, go to Precommit - <-voteChan - case <-voteChan: // we went straight to Precommit - } - donePrecommit <- struct{}{} - }() - // now lets add prevotes from everyone else for the new block + // now lets add prevotes from everyone else for nil (a polka!) signAddVoteToFromMany(types.VoteTypePrevote, cs1, nil, types.PartSetHeader{}, cs2, cs3, cs4) - <-donePrecommit - // we should have unlocked - // NOTE: we don't lock on nil, so LockedRound is still 0 + // the polka makes us unlock and precommit nil + <-unlockCh + <-voteCh // precommit + + // we should have unlocked and committed nil + // NOTE: since we don't relock on nil, the lock round is 0 validatePrecommit(t, cs1, 1, 0, vss[0], nil, nil) - donePrecommitWait = make(chan struct{}) - go func() { - // the votes will bring us to new round right away - // we should timeout of it - _, _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh(), <-timeoutChan - donePrecommitWait <- struct{}{} - }() signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3) - <-donePrecommitWait + <-newRoundCh } // 4 vals @@ -753,45 +684,35 @@ func TestLockPOLUnlock(t *testing.T) { func TestLockPOLSafety1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - - timeoutChan := make(chan *types.EventDataRoundState) - voteChan := make(chan *types.EventDataVote) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringVote(), func(data types.EventData) { - vote := data.(*types.EventDataVote) - // we only fire for our own votes - if bytes.Equal(cs1.privValidator.Address, vote.Address) { - voteChan <- vote - } - }) - cs1.SetFireable(evsw) + + proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) + timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) + newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _, _ = <-cs1.NewStepCh(), <-voteChan, <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + rs := re.(*types.EventDataRoundState) + propBlock := rs.ProposalBlock - propBlock := cs1.ProposalBlock + <-voteCh // prevote - validatePrevote(t, cs1, 0, vss[0], cs1.ProposalBlock.Hash()) + validatePrevote(t, cs1, 0, vss[0], propBlock.Hash()) // the others sign a polka but we don't see it prevotes := signVoteMany(types.VoteTypePrevote, propBlock.Hash(), propBlock.MakePartSet().Header(), cs2, cs3, cs4) // before we time out into new round, set next proposer // and next proposal block - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") - } + /* + _, v1 := cs1.Validators.GetByAddress(vss[0].Address) + v1.VotingPower = 1 + if updated := cs1.Validators.Update(v1); !updated { + t.Fatal("failed to update validator") + }*/ log.Warn("old prop", "hash", fmt.Sprintf("%X", propBlock.Hash())) @@ -799,73 +720,65 @@ func TestLockPOLSafety1(t *testing.T) { signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3, cs4) prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + propBlockHash := propBlock.Hash() + propBlockParts := propBlock.MakePartSet() incrementRound(cs2, cs3, cs4) - log.Info("### ONTO ROUND 2") + //XXX: this isnt gauranteed to get there before the timeoutPropose ... + cs1.SetProposalAndBlock(prop, propBlock, propBlockParts, "some peer") + + <-newRoundCh + log.Notice("### ONTO ROUND 1") /*Round2 // we timeout and prevote our lock // a polka happened but we didn't see it! */ // now we're on a new round and not the proposer, - // so set proposal - cs1.mtx.Lock() - propBlockHash, propBlockParts := propBlock.Hash(), propBlock.MakePartSet() - cs1.Proposal, cs1.ProposalBlock, cs1.ProposalBlockParts = prop, propBlock, propBlockParts - cs1.mtx.Unlock() - <-cs1.NewStepCh() - - if cs1.LockedBlock != nil { + // but we should receive the proposal + select { + case re = <-proposalCh: + case <-timeoutProposeCh: + re = <-proposalCh + } + + rs = re.(*types.EventDataRoundState) + + if rs.LockedBlock != nil { t.Fatal("we should not be locked!") } log.Warn("new prop", "hash", fmt.Sprintf("%X", propBlockHash)) // go to prevote, prevote for proposal block - _, _ = <-voteChan, <-cs1.NewStepCh() + <-voteCh validatePrevote(t, cs1, 1, vss[0], propBlockHash) // now we see the others prevote for it, so we should lock on it - donePrecommit := make(chan struct{}) - go func() { - select { - case <-cs1.NewStepCh(): // we're in PrevoteWait, go to Precommit - <-voteChan - case <-voteChan: // we went straight to Precommit - } - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() - // now lets add prevotes from everyone else for nil signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash, propBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + + <-voteCh // precommit // we should have precommitted validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash, propBlockHash) - // now we see precommits for nil - donePrecommitWait := make(chan struct{}) - go func() { - // the votes will bring us to new round - // we should timeut of it and go to prevote - <-cs1.NewStepCh() - <-timeoutChan - donePrecommitWait <- struct{}{} - }() signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3) - <-donePrecommitWait + + <-timeoutWaitCh incrementRound(cs2, cs3, cs4) - log.Info("### ONTO ROUND 3") + <-newRoundCh + + log.Notice("### ONTO ROUND 2") /*Round3 we see the polka from round 1 but we shouldn't unlock! */ // timeout of propose - _, _ = <-cs1.NewStepCh(), <-timeoutChan + <-timeoutProposeCh // finish prevote - _, _ = <-voteChan, <-cs1.NewStepCh() + <-voteCh // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) @@ -875,190 +788,138 @@ func TestLockPOLSafety1(t *testing.T) { log.Warn("Done adding prevotes!") - ensureNoNewStep(t, cs1) + // ensureNoNewStep(t, cs1) + // TODO: subscribe to NewStep ... + } // 4 vals. -// polka P1 at R1, P2 at R2, and P3 at R3, -// we lock on P1 at R1, don't see P2, and unlock using P3 at R3 -// then we should make sure we don't lock using P2 +// polka P0 at R0, P1 at R1, and P2 at R2, +// we lock on P0 at R0, don't see P1, and unlock using P2 at R2 +// then we should make sure we don't lock using P1 + +// What we want: +// dont see P0, lock on P1 at R1, dont unlock using P0 at R2 func TestLockPOLSafety2(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - - timeoutChan := make(chan *types.EventDataRoundState) - voteChan := make(chan *types.EventDataVote) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutPropose(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- data.(*types.EventDataRoundState) - }) - evsw.AddListenerForEvent("tester", types.EventStringVote(), func(data types.EventData) { - vote := data.(*types.EventDataVote) - // we only fire for our own votes - if bytes.Equal(cs1.privValidator.Address, vote.Address) { - voteChan <- vote - } - }) - cs1.SetFireable(evsw) - // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _, _ = <-cs1.NewStepCh(), <-voteChan, <-cs1.NewStepCh() + proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) + timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) + newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + unlockCh := subscribeToEvent(cs1, types.EventStringUnlock()) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) - theBlockHash := cs1.ProposalBlock.Hash() + // the block for R0: gets polkad but we miss it + // (even though we signed it, shhh) + _, propBlock0 := decideProposal(cs1, vss[0], cs1.Height, cs1.Round) + propBlockHash0 := propBlock0.Hash() + propBlockParts0 := propBlock0.MakePartSet() - donePrecommit := make(chan struct{}) - go func() { - <-voteChan - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() - signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs2, cs3, cs4) - <-donePrecommit + // the others sign a polka but we don't see it + prevotes := signVoteMany(types.VoteTypePrevote, propBlockHash0, propBlockParts0.Header(), cs2, cs3, cs4) - // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) + // the block for round 1 + prop1, propBlock1 := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) + propBlockHash1 := propBlock1.Hash() + propBlockParts1 := propBlock1.MakePartSet() - donePrecommitWait := make(chan struct{}) - go func() { - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() - // add precommits from the rest - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-donePrecommitWait + incrementRound(cs2, cs3, cs4) - // before we time out into new round, set next proposer - // and next proposal block - _, v1 := cs1.Validators.GetByAddress(vss[0].Address) - v1.VotingPower = 1 - if updated := cs1.Validators.Update(v1); !updated { - t.Fatal("failed to update validator") - } + cs1.updateRoundStep(0, RoundStepPrecommitWait) - prop, propBlock := decideProposal(cs1, cs2, cs2.Height, cs2.Round+1) - - incrementRound(cs2, cs3, cs4) + log.Notice("### ONTO Round 1") + // jump in at round 1 + height := cs1.Height + startTestRound(cs1, height, 1) + <-newRoundCh - // timeout to new round - <-timeoutChan + cs1.SetProposalAndBlock(prop1, propBlock1, propBlockParts1, "some peer") + <-proposalCh - log.Info("### ONTO Round 2") - /*Round2 - // we timeout and prevote our lock - // a polka happened but we didn't see it! - */ + <-voteCh // prevote - // now we're on a new round and not the proposer, so wait for timeout - _, _ = <-cs1.NewStepCh(), <-timeoutChan - // go to prevote, prevote for locked block - _, _ = <-voteChan, <-cs1.NewStepCh() - validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) + signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlockHash1, propBlockParts1.Header(), cs2, cs3, cs4) - // the others sign a polka but we don't see it - prevotes := signVoteMany(types.VoteTypePrevote, propBlock.Hash(), propBlock.MakePartSet().Header(), cs2, cs3, cs4) + <-voteCh // precommit + // the proposed block should now be locked and our precommit added + validatePrecommit(t, cs1, 1, 1, vss[0], propBlockHash1, propBlockHash1) - // once we see prevotes for the next round we'll skip ahead + // add precommits from the rest + signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs4) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, propBlockHash1, propBlockParts1.Header()) incrementRound(cs2, cs3, cs4) - log.Info("### ONTO Round 3") - /*Round3 - a polka for nil causes us to unlock - */ + // timeout of precommit wait to new round + <-timeoutWaitCh - // these prevotes will send us straight to precommit at the higher round - donePrecommit = make(chan struct{}) - go func() { - select { - case <-cs1.NewStepCh(): // we're in PrevoteWait, go to Precommit - <-voteChan - case <-voteChan: // we went straight to Precommit - } - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() - // now lets add prevotes from everyone else for nil - signAddVoteToFromMany(types.VoteTypePrevote, cs1, nil, types.PartSetHeader{}, cs2, cs3, cs4) - <-donePrecommit - - // we should have unlocked - // NOTE: we don't lock on nil, so LockedRound is still 0 - validatePrecommit(t, cs1, 2, 0, vss[0], nil, nil) - - donePrecommitWait = make(chan struct{}) - go func() { - // the votes will bring us to new round right away - // we should timeut of it and go to prevote - <-cs1.NewStepCh() - // set the proposal block to be that which got a polka in R2 - cs1.mtx.Lock() - cs1.Proposal, cs1.ProposalBlock, cs1.ProposalBlockParts = prop, propBlock, propBlock.MakePartSet() - cs1.mtx.Unlock() - // timeout into prevote, finish prevote - _, _, _ = <-timeoutChan, <-voteChan, <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() - signAddVoteToFromMany(types.VoteTypePrecommit, cs1, nil, types.PartSetHeader{}, cs2, cs3) - <-donePrecommitWait + // in round 2 we see the polkad block from round 0 + newProp := types.NewProposal(height, 2, propBlockParts0.Header(), 0) + if err := cs3.SignProposal(chainID, newProp); err != nil { + t.Fatal(err) + } + cs1.SetProposalAndBlock(newProp, propBlock0, propBlockParts0, "some peer") + addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) // add the pol votes - log.Info("### ONTO ROUND 4") - /*Round4 - we see the polka from R2 - make sure we don't lock because of it! + <-newRoundCh + log.Notice("### ONTO Round 2") + /*Round2 + // now we see the polka from round 1, but we shouldnt unlock */ - // new round and not proposer - // (we already timed out and stepped into prevote) - - log.Warn("adding prevotes from round 2") - - addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) - log.Warn("Done adding prevotes!") - - // we should prevote it now - validatePrevote(t, cs1, 3, vss[0], cs1.ProposalBlock.Hash()) + select { + case <-timeoutProposeCh: + <-proposalCh + case <-proposalCh: + } - // but we shouldn't precommit it - precommits := cs1.Votes.Precommits(3) - vote := precommits.GetByIndex(0) - if vote != nil { - t.Fatal("validator precommitted at round 4 based on an old polka") + select { + case <-unlockCh: + t.Fatal("validator unlocked using an old polka") + case <-voteCh: + // prevote our locked block } + validatePrevote(t, cs1, 2, vss[0], propBlockHash1) + } //------------------------------------------------------------------------------------------ // SlashingSuite +// TODO: Slashing +/* func TestSlashingPrevotes(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - cs1.newStepCh = make(chan *RoundState) // so it blocks + + proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) + newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + <-voteCh // prevote + + rs := re.(*types.EventDataRoundState) // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait hash := cs1.ProposalBlock.Hash() hash[0] = byte(hash[0]+1) % 255 - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlockParts.Header()) - // pass prevote wait - <-cs1.NewStepCh() + <-timeoutWaitCh // NOTE: we have to send the vote for different block first so we don't just go into precommit round right // away and ignore more prevotes (and thus fail to slash!) // add the conflicting vote - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) // XXX: Check for existence of Dupeout info } @@ -1066,35 +927,38 @@ func TestSlashingPrevotes(t *testing.T) { func TestSlashingPrecommits(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - cs1.newStepCh = make(chan *RoundState) // so it blocks + + proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) + newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + <-voteCh // prevote // add prevote from cs2 - signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) - // wait to finish precommit - <-cs1.NewStepCh() + <-voteCh // precommit // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait - hash := cs1.ProposalBlock.Hash() + hash := rs.ProposalBlock.Hash() hash[0] = byte(hash[0]+1) % 255 - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, cs1.ProposalBlockParts.Header()) - - // pass prevote wait - <-cs1.NewStepCh() + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, hash, rs.ProposalBlockParts.Header()) // NOTE: we have to send the vote for different block first so we don't just go into precommit round right // away and ignore more prevotes (and thus fail to slash!) // add precommit from cs2 - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header()) // XXX: Check for existence of Dupeout info } +*/ //------------------------------------------------------------------------------------------ // CatchupSuite @@ -1107,71 +971,61 @@ func TestSlashingPrecommits(t *testing.T) { func TestHalt1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - cs1.newStepCh = make(chan *RoundState) // so it blocks - timeoutChan := make(chan struct{}) - evsw := events.NewEventSwitch() - evsw.OnStart() - evsw.AddListenerForEvent("tester", types.EventStringTimeoutWait(), func(data types.EventData) { - timeoutChan <- struct{}{} - }) - cs1.SetFireable(evsw) + proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) + newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote - go cs1.EnterNewRound(cs1.Height, 0) - _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh() + startTestRound(cs1, cs1.Height, 0) + <-newRoundCh + re := <-proposalCh + rs := re.(*types.EventDataRoundState) + propBlock := rs.ProposalBlock + propBlockParts := propBlock.MakePartSet() - theBlockHash := cs1.ProposalBlock.Hash() + <-voteCh // prevote - donePrecommit := make(chan struct{}) - go func() { - <-cs1.NewStepCh() - donePrecommit <- struct{}{} - }() - signAddVoteToFromMany(types.VoteTypePrevote, cs1, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header(), cs3, cs4) - <-donePrecommit + signAddVoteToFromMany(types.VoteTypePrevote, cs1, propBlock.Hash(), propBlockParts.Header(), cs3, cs4) + <-voteCh // precommit // the proposed block should now be locked and our precommit added - validatePrecommit(t, cs1, 0, 0, vss[0], theBlockHash, theBlockHash) + validatePrecommit(t, cs1, 0, 0, vss[0], propBlock.Hash(), propBlock.Hash()) - donePrecommitWait := make(chan struct{}) - go func() { - <-cs1.NewStepCh() - donePrecommitWait <- struct{}{} - }() // add precommits from the rest signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs2, nil, types.PartSetHeader{}) // didnt receive proposal - signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) + signAddVoteToFrom(types.VoteTypePrecommit, cs1, cs3, propBlock.Hash(), propBlockParts.Header()) // we receive this later, but cs3 might receive it earlier and with ours will go to commit! - precommit4 := signVote(cs4, types.VoteTypePrecommit, cs1.ProposalBlock.Hash(), cs1.ProposalBlockParts.Header()) - <-donePrecommitWait + precommit4 := signVote(cs4, types.VoteTypePrecommit, propBlock.Hash(), propBlockParts.Header()) incrementRound(cs2, cs3, cs4) // timeout to new round - <-timeoutChan + <-timeoutWaitCh + re = <-newRoundCh + rs = re.(*types.EventDataRoundState) - log.Info("### ONTO ROUND 2") + log.Notice("### ONTO ROUND 1") /*Round2 // we timeout and prevote our lock // a polka happened but we didn't see it! */ // go to prevote, prevote for locked block - _, _ = <-cs1.NewStepCh(), <-cs1.NewStepCh() - validatePrevote(t, cs1, 0, vss[0], cs1.LockedBlock.Hash()) + <-voteCh // prevote + validatePrevote(t, cs1, 0, vss[0], rs.LockedBlock.Hash()) // now we receive the precommit from the previous round addVoteToFrom(cs1, cs4, precommit4) // receiving that precommit should take us straight to commit - ensureNewStep(t, cs1) - log.Warn("done enter commit!") - - // update to state - ensureNewStep(t, cs1) + <-newBlockCh + re = <-newRoundCh + rs = re.(*types.EventDataRoundState) - if cs1.Height != 2 { + if rs.Height != 2 { t.Fatal("expected height to increment") } } diff --git a/types/block.go b/types/block.go index 752d6a562..de0eb2caf 100644 --- a/types/block.go +++ b/types/block.go @@ -62,8 +62,12 @@ func (b *Block) ValidateBasic(chainID string, lastBlockHeight int, lastBlockHash } func (b *Block) FillHeader() { - b.LastValidationHash = b.LastValidation.Hash() - b.DataHash = b.Data.Hash() + if b.LastValidationHash == nil { + b.LastValidationHash = b.LastValidation.Hash() + } + if b.DataHash == nil { + b.DataHash = b.Data.Hash() + } } // Computes and returns the block hash. From 4483971776ec29bdbe88197f5008b293e147a7be Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 13 Dec 2015 19:30:15 -0500 Subject: [PATCH 3/5] conR uses events to trigger newstep & hasvote broadcasts --- consensus/common_test.go | 22 +++++---- consensus/reactor.go | 99 ++++++++++++++++++++++------------------ consensus/state.go | 72 +++++++++++++++-------------- consensus/state_test.go | 13 ++---- types/events.go | 26 +++++++---- 5 files changed, 127 insertions(+), 105 deletions(-) diff --git a/consensus/common_test.go b/consensus/common_test.go index 40609c6e9..a9da1cdb6 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -78,6 +78,7 @@ func decideProposal(cs1 *ConsensusState, cs2 *validatorStub, height, round int) //------------------------------------------------------------------------------- // utils +/* func nilRound(t *testing.T, cs1 *ConsensusState, vss ...*validatorStub) { cs1.mtx.Lock() height, round := cs1.Height, cs1.Round @@ -93,6 +94,7 @@ func nilRound(t *testing.T, cs1 *ConsensusState, vss ...*validatorStub) { waitFor(t, cs1, height, round+1, RoundStepNewRound) } +*/ // NOTE: this switches the propser as far as `perspectiveOf` is concerned, // but for simplicity we return a block it generated. @@ -172,6 +174,17 @@ func signAddVoteToFrom(voteType byte, to *ConsensusState, from *validatorStub, h return vote } +func ensureNoNewStep(stepCh chan interface{}) { + timeout := time.NewTicker(ensureTimeout * time.Second) + select { + case <-timeout.C: + break + case <-stepCh: + panic("We should be stuck waiting for more votes, not moving to the next step") + } +} + +/* func ensureNoNewStep(t *testing.T, cs *ConsensusState) { timeout := time.NewTicker(ensureTimeout * time.Second) select { @@ -202,6 +215,7 @@ func waitFor(t *testing.T, cs *ConsensusState, height int, round int, step Round } } } +*/ func incrementHeight(vss ...*validatorStub) { for _, vs := range vss { @@ -309,17 +323,9 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { cs := NewConsensusState(state, proxyAppCtxCon, blockStore, mempool) cs.SetPrivValidator(privVals[0]) - // from the updateToState in NewConsensusState - <-cs.NewStepCh() - evsw := events.NewEventSwitch() cs.SetFireable(evsw) evsw.OnStart() - go func() { - for { - <-cs.NewStepCh() - } - }() // start the transition routines // cs.startRoutines() diff --git a/consensus/reactor.go b/consensus/reactor.go index 7b6100a4a..ab2fb4913 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -50,13 +50,17 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto func (conR *ConsensusReactor) OnStart() error { log.Notice("ConsensusReactor ", "fastSync", conR.fastSync) conR.BaseReactor.OnStart() + + // callbacks for broadcasting new steps and votes to peers + // upon their respective events (ie. uses evsw) + conR.registerEventCallbacks() + if !conR.fastSync { _, err := conR.conS.Start() if err != nil { return err } } - go conR.broadcastNewRoundStepRoutine() return nil } @@ -132,7 +136,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // NOTE: We process these messages even when we're fast_syncing. // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of -// proposals, block parts, and votes are ordered. +// proposals, block parts, and votes are ordered by the receiveRoutine func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes) @@ -211,6 +215,47 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte } } +// Sets our private validator account for signing votes. +func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { + conR.conS.SetPrivValidator(priv) +} + +// implements events.Eventable +func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { + conR.evsw = evsw + conR.conS.SetFireable(evsw) +} + +//-------------------------------------- + +// Listens for new steps and votes, +// broadcasting the result to peers +func (conR *ConsensusReactor) registerEventCallbacks() { + // XXX: should we change SetFireable to just use EventSwitch so we don't need these assertions? + evsw := conR.evsw.(*events.EventSwitch) + + evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { + rs := data.(*types.EventDataRoundState) + conR.broadcastNewRoundStep(rs) + }) + + evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + edv := data.(*types.EventDataVote) + conR.broadcastHasVoteMessage(edv.Vote, edv.Index) + }) +} + +func (conR *ConsensusReactor) broadcastNewRoundStep(rs *types.EventDataRoundState) { + + nrsMsg, csMsg := makeRoundStepMessages(rs) + if nrsMsg != nil { + conR.Switch.Broadcast(StateChannel, nrsMsg) + } + if csMsg != nil { + conR.Switch.Broadcast(StateChannel, csMsg) + } +} + // Broadcasts HasVoteMessage to peers that care. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) { msg := &HasVoteMessage{ @@ -237,62 +282,28 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in */ } -// Sets our private validator account for signing votes. -func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { - conR.conS.SetPrivValidator(priv) -} - -// implements events.Eventable -func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { - conR.evsw = evsw - conR.conS.SetFireable(evsw) -} - -//-------------------------------------- - -func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { +func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { + step := RoundStepType(rs.Step) nrsMsg = &NewRoundStepMessage{ Height: rs.Height, Round: rs.Round, - Step: rs.Step, + Step: step, SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()), - LastCommitRound: rs.LastCommit.Round(), + LastCommitRound: rs.LastCommitRound, } - if rs.Step == RoundStepCommit { + if step == RoundStepCommit { csMsg = &CommitStepMessage{ Height: rs.Height, - BlockPartsHeader: rs.ProposalBlockParts.Header(), - BlockParts: rs.ProposalBlockParts.BitArray(), + BlockPartsHeader: rs.BlockPartsHeader, + BlockParts: rs.BlockParts, } } return } -// Listens for changes to the ConsensusState.Step by pulling -// on conR.conS.NewStepCh(). -func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { - for { - // Get RoundState with new Step or quit. - var rs *RoundState - select { - case rs = <-conR.conS.NewStepCh(): - case <-conR.Quit: - return - } - - nrsMsg, csMsg := makeRoundStepMessages(rs) - if nrsMsg != nil { - conR.Switch.Broadcast(StateChannel, nrsMsg) - } - if csMsg != nil { - conR.Switch.Broadcast(StateChannel, csMsg) - } - } -} - func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { rs := conR.conS.GetRoundState() - nrsMsg, csMsg := makeRoundStepMessages(rs) + nrsMsg, csMsg := makeRoundStepMessages(rs.RoundStateEvent()) if nrsMsg != nil { peer.Send(StateChannel, nrsMsg) } diff --git a/consensus/state.go b/consensus/state.go index 4f4723615..168f2e4ed 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -98,18 +98,26 @@ type RoundState struct { } func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState { + var header types.PartSetHeader + var parts *BitArray + if rs.ProposalBlockParts != nil { + header = rs.ProposalBlockParts.Header() + parts = rs.ProposalBlockParts.BitArray() + } return &types.EventDataRoundState{ - CurrentTime: time.Now(), - Height: rs.Height, - Round: rs.Round, - Step: rs.Step.String(), - StartTime: rs.StartTime, - CommitTime: rs.CommitTime, - Proposal: rs.Proposal, - ProposalBlock: rs.ProposalBlock, - LockedRound: rs.LockedRound, - LockedBlock: rs.LockedBlock, - POLRound: rs.Votes.POLRound(), + CurrentTime: time.Now(), + Height: rs.Height, + Round: rs.Round, + Step: int(rs.Step), + StartTime: rs.StartTime, + CommitTime: rs.CommitTime, + Proposal: rs.Proposal, + ProposalBlock: rs.ProposalBlock, + LockedRound: rs.LockedRound, + LockedBlock: rs.LockedBlock, + POLRound: rs.Votes.POLRound(), + BlockPartsHeader: header, + BlockParts: parts, } } @@ -183,7 +191,6 @@ type ConsensusState struct { blockStore *bc.BlockStore mempool *mempl.Mempool privValidator *types.PrivValidator - newStepCh chan *RoundState mtx sync.Mutex RoundState @@ -208,7 +215,6 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore proxyAppCtx: proxyAppCtx, blockStore: blockStore, mempool: mempool, - newStepCh: make(chan *RoundState, 10), peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: new(time.Ticker), @@ -258,10 +264,6 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { cs.privValidator = priv } -func (cs *ConsensusState) NewStepCh() chan *RoundState { - return cs.newStepCh -} - func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() @@ -466,7 +468,10 @@ func (cs *ConsensusState) updateToState(state *sm.State) { func (cs *ConsensusState) newStep() { cs.nSteps += 1 - cs.newStepCh <- cs.getRoundState() + // newStep is called by updateToStep in NewConsensusState before the evsw is set! + if cs.evsw != nil { + cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent()) + } } //----------------------------------------- @@ -529,7 +534,7 @@ func (cs *ConsensusState) stopTimer() { // receiveRoutine handles messages which may cause state transitions. // it's argument (n) is the number of messages to process before exiting - use 0 to run forever // It keeps the RoundState and is the only thing that updates it. -// Updates happen on timeouts, complete proposals, and 2/3 majorities +// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities func (cs *ConsensusState) receiveRoutine(maxSteps int) { for { if maxSteps > 0 { @@ -579,19 +584,17 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition - added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) + err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) if err == ErrAddingVote { // TODO: punish peer } - if added { - // If rs.Height == vote.Height && rs.Round < vote.Round, - // the peer is sending us CatchupCommit precommits. - // We could make note of this and help filter in broadcastHasVoteMessage(). + // NOTE: the vote is broadcast to peers by the reactor listening + // for vote events - // XXX TODO: do this - // conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex) - } + // TODO: If rs.Height == vote.Height && rs.Round < vote.Round, + // the peer is sending us CatchupCommit precommits. + // We could make note of this and help filter in broadcastHasVoteMessage(). default: log.Warn("Unknown msg type", reflect.TypeOf(msg)) } @@ -1233,14 +1236,14 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad } // Attempt to add the vote. if its a duplicate signature, dupeout the validator -func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) (bool, error) { - added, _, err := cs.addVote(valIndex, vote, peerKey) +func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) error { + _, _, err := cs.addVote(valIndex, vote, peerKey) if err != nil { // If the vote height is off, we'll just ignore it, // But if it's a conflicting sig, broadcast evidence tx for slashing. // If it's otherwise invalid, punish peer. if err == ErrVoteHeightMismatch { - return added, err + return err } else if _, ok := err.(*types.ErrVoteConflictingSignature); ok { log.Warn("Found conflicting vote. Publish evidence") /* TODO @@ -1251,14 +1254,14 @@ func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey str } cs.mempool.BroadcastTx(evidenceTx) // shouldn't need to check returned err */ - return added, err + return err } else { // Probably an invalid signature. Bad peer. log.Warn("Error attempting to add vote", "error", err) - return added, ErrAddingVote + return ErrAddingVote } } - return added, nil + return nil } //----------------------------------------------------------------------------- @@ -1408,9 +1411,8 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part } vote, err := cs.signVote(type_, hash, header) if err == nil { - // NOTE: store our index in the cs so we don't have to do this every time + // TODO: store our index in the cs so we don't have to do this every time valIndex, _ := cs.Validators.GetByAddress(cs.privValidator.Address) - // _, _, err := cs.addVote(valIndex, vote, "") cs.sendInternalMessage(msgInfo{&VoteMessage{valIndex, vote}, ""}) log.Notice("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) return vote diff --git a/consensus/state_test.go b/consensus/state_test.go index 13619965a..459af95e7 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -316,8 +316,7 @@ func TestFullRound2(t *testing.T) { // LockSuite // two validators, 4 rounds. -// val1 proposes the first 2 rounds, and is locked in the first. -// val2 proposes the next two. val1 should precommit nil on all (except first where he locks) +// two vals take turns proposing. val1 locks on first one, precommits nil on everything else func TestLockNoPOL(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] @@ -431,10 +430,8 @@ func TestLockNoPOL(t *testing.T) { <-voteCh // prevote - // TODO: is the round right?! - validatePrevote(t, cs1, 0, vss[0], rs.LockedBlock.Hash()) + validatePrevote(t, cs1, 2, vss[0], rs.LockedBlock.Hash()) - // TODO: quick fastforward to new round, set proposer signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) <-voteCh @@ -783,14 +780,14 @@ func TestLockPOLSafety1(t *testing.T) { // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) + newStepCh := subscribeToEvent(cs1, types.EventStringNewRoundStep()) + // add prevotes from the earlier round addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) log.Warn("Done adding prevotes!") - // ensureNoNewStep(t, cs1) - // TODO: subscribe to NewStep ... - + ensureNoNewStep(newStepCh) } // 4 vals. diff --git a/types/events.go b/types/events.go index 3b1598414..b046e9656 100644 --- a/types/events.go +++ b/types/events.go @@ -3,6 +3,7 @@ package types import ( "time" + "github.com/tendermint/go-common" "github.com/tendermint/go-wire" ) @@ -17,6 +18,7 @@ func EventStringFork() string { return "Fork" } func EventStringNewBlock() string { return "NewBlock" } func EventStringNewRound() string { return "NewRound" } +func EventStringNewRoundStep() string { return "NewRoundStep" } func EventStringTimeoutPropose() string { return "TimeoutPropose" } func EventStringCompleteProposal() string { return "CompleteProposal" } func EventStringPolka() string { return "Polka" } @@ -77,16 +79,20 @@ type EventDataApp struct { type EventDataRoundState struct { CurrentTime time.Time `json:"current_time"` - Height int `json:"height"` - Round int `json:"round"` - Step string `json:"step"` - StartTime time.Time `json:"start_time"` - CommitTime time.Time `json:"commit_time"` - Proposal *Proposal `json:"proposal"` - ProposalBlock *Block `json:"proposal_block"` - LockedRound int `json:"locked_round"` - LockedBlock *Block `json:"locked_block"` - POLRound int `json:"pol_round"` + Height int `json:"height"` + Round int `json:"round"` + Step int `json:"step"` + LastCommitRound int `json:"last_commit_round"` + StartTime time.Time `json:"start_time"` + CommitTime time.Time `json:"commit_time"` + Proposal *Proposal `json:"proposal"` + ProposalBlock *Block `json:"proposal_block"` + LockedRound int `json:"locked_round"` + LockedBlock *Block `json:"locked_block"` + POLRound int `json:"pol_round"` + + BlockPartsHeader PartSetHeader `json:"block_parts_header"` + BlockParts *common.BitArray `json:"block_parts"` } type EventDataVote struct { From 261647a0123c30449f7ba28db3cac95eeec8c3c7 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 13 Dec 2015 19:33:05 -0500 Subject: [PATCH 4/5] Enter* -> enter*. Comments/fixes from Jae --- consensus/common_test.go | 17 +---- consensus/state.go | 155 ++++++++++++++++++++------------------- consensus/state_test.go | 132 ++++++++++++++++++--------------- events/events.go | 10 +++ rpc/server/handlers.go | 19 ++++- 5 files changed, 182 insertions(+), 151 deletions(-) diff --git a/consensus/common_test.go b/consensus/common_test.go index a9da1cdb6..2650b03f3 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -325,7 +325,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { evsw := events.NewEventSwitch() cs.SetFireable(evsw) - evsw.OnStart() + evsw.Start() // start the transition routines // cs.startRoutines() @@ -339,19 +339,8 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { return cs, vss } -func subscribeToEvent(cs *ConsensusState, eventID string) chan interface{} { - evsw := cs.evsw.(*events.EventSwitch) - // listen for new round - ch := make(chan interface{}, 10) - evsw.AddListenerForEvent("tester", eventID, func(data types.EventData) { - ch <- data - }) - return ch -} - func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { - - voteCh0 := subscribeToEvent(cs, types.EventStringVote()) + voteCh0 := cs.evsw.(*events.EventSwitch).SubscribeToEvent(types.EventStringVote(), 0) voteCh := make(chan interface{}) go func() { for { @@ -395,6 +384,6 @@ func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.G } func startTestRound(cs *ConsensusState, height, round int) { - cs.EnterNewRound(height, round) + cs.enterNewRound(height, round) cs.startRoutines(0) } diff --git a/consensus/state.go b/consensus/state.go index 168f2e4ed..eb9b59030 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -351,11 +351,11 @@ func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) { cs.Step = step } -// EnterNewRound(height, 0) at cs.StartTime. +// enterNewRound(height, 0) at cs.StartTime. func (cs *ConsensusState) scheduleRound0(height int) { //log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime) sleepDuration := cs.StartTime.Sub(time.Now()) - cs.scheduleTimeout(sleepDuration, height, 0, 1) + cs.scheduleTimeout(sleepDuration, height, 0, RoundStepNewHeight) } // Attempt to schedule a timeout by sending timeoutInfo on the tickChan. @@ -367,11 +367,14 @@ func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round // send a msg into the receiveRoutine regarding our own proposal, block part, or vote func (cs *ConsensusState) sendInternalMessage(mi msgInfo) { - timeout := time.After(10 * time.Millisecond) select { case cs.internalMsgQueue <- mi: - case <-timeout: - log.Debug("Timed out trying to send an internal messge. Launching go-routine") + default: + // NOTE: using the go-routine means our votes can + // be processed out of order. + // TODO: use CList here for strict determinism and + // attempt push to internalMsgQueue in receiveRoutine + log.Debug("Internal msg queue is full. Using a go-routine") go func() { cs.internalMsgQueue <- mi }() } } @@ -578,8 +581,8 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { // once proposal is set, we can receive block parts err = cs.setProposal(msg.Proposal) case *BlockPartMessage: - // if the proposal is complete, we'll EnterPrevote or tryFinalizeCommit - // if we're the only validator, the EnterPrevote may take us through to the next round + // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit + // if we're the only validator, the enterPrevote may take us through to the next round _, err = cs.addProposalBlockPart(msg.Height, msg.Part) case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature @@ -618,18 +621,18 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { switch ti.step { case RoundStepNewHeight: - // NewRound event fired from EnterNewRound. + // NewRound event fired from enterNewRound. // Do we want a timeout event too? - cs.EnterNewRound(ti.height, 0) + cs.enterNewRound(ti.height, 0) case RoundStepPropose: cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) - cs.EnterPrevote(ti.height, ti.round) + cs.enterPrevote(ti.height, ti.round) case RoundStepPrevoteWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.EnterPrecommit(ti.height, ti.round) + cs.enterPrecommit(ti.height, ti.round) case RoundStepPrecommitWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.EnterNewRound(ti.height, ti.round+1) + cs.enterNewRound(ti.height, ti.round+1) default: panic(Fmt("Invalid timeout step: %v", ti.step)) } @@ -646,9 +649,9 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) // Enter: `startTime = commitTime+timeoutCommit` from NewHeight(height) // NOTE: cs.StartTime was already set for height. -func (cs *ConsensusState) EnterNewRound(height int, round int) { +func (cs *ConsensusState) enterNewRound(height int, round int) { if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != RoundStepNewHeight) { - log.Debug(Fmt("EnterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } @@ -657,7 +660,7 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) { } // cs.stopTimer() - log.Notice(Fmt("EnterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Notice(Fmt("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Increment validators if necessary validators := cs.Validators @@ -684,23 +687,23 @@ func (cs *ConsensusState) EnterNewRound(height int, round int) { cs.evsw.FireEvent(types.EventStringNewRound(), cs.RoundStateEvent()) - // Immediately go to EnterPropose. - cs.EnterPropose(height, round) + // Immediately go to enterPropose. + cs.enterPropose(height, round) } // Enter: from NewRound(height,round). -func (cs *ConsensusState) EnterPropose(height int, round int) { +func (cs *ConsensusState) enterPropose(height int, round int) { // cs.mtx.Lock() // cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) { - log.Debug(Fmt("EnterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } - log.Info(Fmt("EnterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPropose: + // Done enterPropose: cs.updateRoundStep(round, RoundStepPropose) cs.newStep() }() @@ -714,17 +717,17 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { } if !bytes.Equal(cs.Validators.Proposer().Address, cs.privValidator.Address) { - log.Info("EnterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) + log.Info("enterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) } else { - log.Info("EnterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) + log.Info("enterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) cs.decideProposal(height, round) } // If we have the whole proposal + POL, then goto Prevote now. - // else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart), + // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), // or else after timeoutPropose if cs.isProposalComplete() { - cs.EnterPrevote(height, cs.Round) + cs.enterPrevote(height, cs.Round) } } @@ -764,7 +767,7 @@ func (cs *ConsensusState) decideProposal(height, round int) { log.Notice("Signed and sent proposal", "height", height, "round", round, "proposal", proposal) log.Debug(Fmt("Signed and sent proposal block: %v", block)) } else { - log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err) + log.Warn("enterPropose: Error signing proposal", "height", height, "round", round, "error", err) } } @@ -799,7 +802,7 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts validation = cs.LastCommit.MakeValidation() } else { // This shouldn't happen. - log.Error("EnterPropose: Cannot propose anything: No validation for the previous block.") + log.Error("enterPropose: Cannot propose anything: No validation for the previous block.") return } @@ -838,16 +841,16 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Enter: any +2/3 prevotes for future round. // Prevote for LockedBlock if we're locked, or ProposalBlock if valid. // Otherwise vote nil. -func (cs *ConsensusState) EnterPrevote(height int, round int) { +func (cs *ConsensusState) enterPrevote(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) { - log.Debug(Fmt("EnterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } defer func() { - // Done EnterPrevote: + // Done enterPrevote: cs.updateRoundStep(round, RoundStepPrevote) cs.newStep() }() @@ -862,7 +865,7 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { // cs.stopTimer() - log.Info(Fmt("EnterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Sign and broadcast vote as necessary cs.doPrevote(height, round) @@ -874,14 +877,14 @@ func (cs *ConsensusState) EnterPrevote(height int, round int) { func (cs *ConsensusState) doPrevote(height int, round int) { // If a block is locked, prevote that. if cs.LockedBlock != nil { - log.Info("EnterPrevote: Block was locked") + log.Info("enterPrevote: Block was locked") cs.signAddVote(types.VoteTypePrevote, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header()) return } // If ProposalBlock is nil, prevote nil. if cs.ProposalBlock == nil { - log.Warn("EnterPrevote: ProposalBlock is nil") + log.Warn("enterPrevote: ProposalBlock is nil") cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -890,7 +893,7 @@ func (cs *ConsensusState) doPrevote(height int, round int) { err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts) if err != nil { // ProposalBlock is invalid, prevote nil. - log.Warn("EnterPrevote: ProposalBlock is invalid", "error", err) + log.Warn("enterPrevote: ProposalBlock is invalid", "error", err) cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -903,25 +906,25 @@ func (cs *ConsensusState) doPrevote(height int, round int) { } // Enter: any +2/3 prevotes at next round. -func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { +func (cs *ConsensusState) enterPrevoteWait(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevoteWait <= cs.Step) { - log.Debug(Fmt("EnterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } if !cs.Votes.Prevotes(round).HasTwoThirdsAny() { - PanicSanity(Fmt("EnterPrevoteWait(%v/%v), but Prevotes does not have any +2/3 votes", height, round)) + PanicSanity(Fmt("enterPrevoteWait(%v/%v), but Prevotes does not have any +2/3 votes", height, round)) } - log.Info(Fmt("EnterPrevoteWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrevoteWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrevoteWait: + // Done enterPrevoteWait: cs.updateRoundStep(round, RoundStepPrevoteWait) cs.newStep() }() - // After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit() + // After `timeoutPrevote0+timeoutPrevoteDelta*round`, enterPrecommit() cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait) } @@ -931,20 +934,20 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, precommit nil otherwise. -func (cs *ConsensusState) EnterPrecommit(height int, round int) { +func (cs *ConsensusState) enterPrecommit(height int, round int) { //cs.mtx.Lock() // defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) { - log.Debug(Fmt("EnterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } // cs.stopTimer() - log.Info(Fmt("EnterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrecommit: + // Done enterPrecommit: cs.updateRoundStep(round, RoundStepPrecommit) cs.newStep() }() @@ -954,9 +957,9 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If we don't have a polka, we must precommit nil if !ok { if cs.LockedBlock != nil { - log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit while we're locked. Precommitting nil") + log.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit while we're locked. Precommitting nil") } else { - log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit. Precommitting nil.") + log.Info("enterPrecommit: No +2/3 prevotes during enterPrecommit. Precommitting nil.") } cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return @@ -973,9 +976,9 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // +2/3 prevoted nil. Unlock and precommit nil. if len(hash) == 0 { if cs.LockedBlock == nil { - log.Info("EnterPrecommit: +2/3 prevoted for nil.") + log.Info("enterPrecommit: +2/3 prevoted for nil.") } else { - log.Info("EnterPrecommit: +2/3 prevoted for nil. Unlocking") + log.Info("enterPrecommit: +2/3 prevoted for nil. Unlocking") cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil @@ -989,7 +992,7 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If we're already locked on that block, precommit it, and update the LockedRound if cs.LockedBlock.HashesTo(hash) { - log.Info("EnterPrecommit: +2/3 prevoted locked block. Relocking") + log.Info("enterPrecommit: +2/3 prevoted locked block. Relocking") cs.LockedRound = round cs.evsw.FireEvent(types.EventStringRelock(), cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) @@ -998,10 +1001,10 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // If +2/3 prevoted for proposal block, stage and precommit it if cs.ProposalBlock.HashesTo(hash) { - log.Info("EnterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) + log.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", hash) // Validate the block. if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { - PanicConsensus(Fmt("EnterPrecommit: +2/3 prevoted for an invalid block: %v", err)) + PanicConsensus(Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) } cs.LockedRound = round cs.LockedBlock = cs.ProposalBlock @@ -1028,41 +1031,41 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { } // Enter: any +2/3 precommits for next round. -func (cs *ConsensusState) EnterPrecommitWait(height int, round int) { +func (cs *ConsensusState) enterPrecommitWait(height int, round int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommitWait <= cs.Step) { - log.Debug(Fmt("EnterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return } if !cs.Votes.Precommits(round).HasTwoThirdsAny() { - PanicSanity(Fmt("EnterPrecommitWait(%v/%v), but Precommits does not have any +2/3 votes", height, round)) + PanicSanity(Fmt("enterPrecommitWait(%v/%v), but Precommits does not have any +2/3 votes", height, round)) } - log.Info(Fmt("EnterPrecommitWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterPrecommitWait(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { - // Done EnterPrecommitWait: + // Done enterPrecommitWait: cs.updateRoundStep(round, RoundStepPrecommitWait) cs.newStep() }() - // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound() + // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, enterNewRound() cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait) } // Enter: +2/3 precommits for block -func (cs *ConsensusState) EnterCommit(height int, commitRound int) { +func (cs *ConsensusState) enterCommit(height int, commitRound int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || RoundStepCommit <= cs.Step { - log.Debug(Fmt("EnterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("enterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) return } - log.Info(Fmt("EnterCommit(%v/%v). Current: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) + log.Info(Fmt("enterCommit(%v/%v). Current: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) defer func() { - // Done Entercommit: + // Done enterCommit: // keep ca.Round the same, it points to the right Precommits set. cs.updateRoundStep(cs.Round, RoundStepCommit) cs.CommitRound = commitRound @@ -1198,7 +1201,7 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { } // NOTE: block is not necessarily valid. -// This can trigger us to go into EnterPrevote asynchronously (before we timeout of propose) or to attempt to commit +// This can trigger us to go into enterPrevote asynchronously (before we timeout of propose) or to attempt to commit func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { //cs.mtx.Lock() //defer cs.mtx.Unlock() @@ -1225,7 +1228,7 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash(), "round", cs.Proposal.Round) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step - cs.EnterPrevote(height, cs.Round) + cs.enterPrevote(height, cs.Round) } else if cs.Step == RoundStepCommit { // If we're waiting on the proposal block... cs.tryFinalizeCommit(height) @@ -1299,7 +1302,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string // First, unlock if prevotes is a valid POL. // >> lockRound < POLRound <= unlockOrChangeLockRound (see spec) // NOTE: If (lockRound < POLRound) but !(POLRound <= unlockOrChangeLockRound), - // we'll still EnterNewRound(H,vote.R) and EnterPrecommit(H,vote.R) to process it + // we'll still enterNewRound(H,vote.R) and enterPrecommit(H,vote.R) to process it // there. if (cs.LockedBlock != nil) && (cs.LockedRound < vote.Round) && (vote.Round <= cs.Round) { hash, _, ok := prevotes.TwoThirdsMajority() @@ -1313,17 +1316,17 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string } if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { // Round-skip over to PrevoteWait or goto Precommit. - cs.EnterNewRound(height, vote.Round) // if the vote is ahead of us + cs.enterNewRound(height, vote.Round) // if the vote is ahead of us if prevotes.HasTwoThirdsMajority() { - cs.EnterPrecommit(height, vote.Round) + cs.enterPrecommit(height, vote.Round) } else { - cs.EnterPrevote(height, vote.Round) // if the vote is ahead of us - cs.EnterPrevoteWait(height, vote.Round) + cs.enterPrevote(height, vote.Round) // if the vote is ahead of us + cs.enterPrevoteWait(height, vote.Round) } } else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round { // If the proposal is now complete, enter prevote of cs.Round. if cs.isProposalComplete() { - cs.EnterPrevote(height, cs.Round) + cs.enterPrevote(height, cs.Round) } } case types.VoteTypePrecommit: @@ -1332,16 +1335,16 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string hash, _, ok := precommits.TwoThirdsMajority() if ok { if len(hash) == 0 { - cs.EnterNewRound(height, vote.Round+1) + cs.enterNewRound(height, vote.Round+1) } else { - cs.EnterNewRound(height, vote.Round) - cs.EnterPrecommit(height, vote.Round) - cs.EnterCommit(height, vote.Round) + cs.enterNewRound(height, vote.Round) + cs.enterPrecommit(height, vote.Round) + cs.enterCommit(height, vote.Round) } } else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() { - cs.EnterNewRound(height, vote.Round) - cs.EnterPrecommit(height, vote.Round) - cs.EnterPrecommitWait(height, vote.Round) + cs.enterNewRound(height, vote.Round) + cs.enterPrecommit(height, vote.Round) + cs.enterPrecommitWait(height, vote.Round) //}() } default: diff --git a/consensus/state_test.go b/consensus/state_test.go index 459af95e7..d4917ac39 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -8,7 +8,7 @@ import ( _ "github.com/tendermint/tendermint/config/tendermint_test" //"github.com/tendermint/tendermint/events" - // "github.com/tendermint/tendermint/events" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" ) @@ -54,8 +54,9 @@ func TestProposerSelection0(t *testing.T) { cs1, vss := simpleConsensusState(4) height, round := cs1.Height, cs1.Round - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) + evsw := cs1.evsw.(*events.EventSwitch) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) startTestRound(cs1, height, round) @@ -87,7 +88,8 @@ func TestProposerSelection0(t *testing.T) { func TestProposerSelection2(t *testing.T) { cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) // this time we jump in at round 2 incrementRound(vss[1:]...) @@ -118,8 +120,9 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { cs.SetPrivValidator(nil) height, round := cs.Height, cs.Round + evsw := cs.evsw.(*events.EventSwitch) // Listen for propose timeout event - timeoutCh := subscribeToEvent(cs, types.EventStringTimeoutPropose()) + timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) startTestRound(cs, height, round) @@ -143,11 +146,11 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { height, round := cs.Height, cs.Round // Listen for propose timeout event - timeoutCh := subscribeToEvent(cs, types.EventStringTimeoutPropose()) - proposalCh := subscribeToEvent(cs, types.EventStringCompleteProposal()) + evsw := cs.evsw.(*events.EventSwitch) + timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - //startTestRound(cs, height, round) - cs.EnterNewRound(height, round) + cs.enterNewRound(height, round) cs.startRoutines(3) <-proposalCh @@ -164,7 +167,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { t.Error("rs.ProposalBlockParts should be set") } - // if we're a validator, EnterPropose should not timeout + // if we're a validator, enterPropose should not timeout ticker := time.NewTicker(timeoutPropose * 2) select { case <-timeoutCh: @@ -179,8 +182,9 @@ func TestBadProposal(t *testing.T) { height, round := cs1.Height, cs1.Round cs2 := vss[1] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - voteCh := subscribeToEvent(cs1, types.EventStringVote()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2) @@ -234,9 +238,10 @@ func TestFullRound1(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs, types.EventStringVote()) - propCh := subscribeToEvent(cs, types.EventStringCompleteProposal()) - newRoundCh := subscribeToEvent(cs, types.EventStringNewRound()) + evsw := cs.evsw.(*events.EventSwitch) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + propCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) startTestRound(cs, height, round) @@ -262,9 +267,10 @@ func TestFullRoundNil(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribeToEvent(cs, types.EventStringVote()) + evsw := cs.evsw.(*events.EventSwitch) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - cs.EnterPrevote(height, round) + cs.enterPrevote(height, round) cs.startRoutines(4) <-voteCh // prevote @@ -281,8 +287,9 @@ func TestFullRound2(t *testing.T) { cs2 := vss[1] height, round := cs1.Height, cs1.Round - voteCh := subscribeToEvent(cs1, types.EventStringVote()) - newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + evsw := cs1.evsw.(*events.EventSwitch) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) // start round and wait for propose and prevote startTestRound(cs1, height, round) @@ -322,18 +329,19 @@ func TestLockNoPOL(t *testing.T) { cs2 := vss[1] height := cs1.Height - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - voteCh := subscribeToEvent(cs1, types.EventStringVote()) - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) /* Round1 (cs1, B) // B B // B B2 */ // start round and wait for prevote - cs1.EnterNewRound(height, 0) + cs1.enterNewRound(height, 0) cs1.startRoutines(0) re := <-proposalCh @@ -362,7 +370,7 @@ func TestLockNoPOL(t *testing.T) { <-voteCh // precommit // (note we're entering precommit for a second time this round) - // but with invalid args. then we EnterPrecommitWait, and the timeout to new round + // but with invalid args. then we enterPrecommitWait, and the timeout to new round <-timeoutWaitCh /// @@ -409,7 +417,7 @@ func TestLockNoPOL(t *testing.T) { <-voteCh // (note we're entering precommit for a second time this round, but with invalid args - // then we EnterPrecommitWait and timeout into NewRound + // then we enterPrecommitWait and timeout into NewRound <-timeoutWaitCh <-newRoundCh @@ -462,6 +470,7 @@ func TestLockNoPOL(t *testing.T) { // so set the proposal block cs1.SetProposalAndBlock(prop, propBlock, propBlock.MakePartSet(), "") + <-proposalCh <-voteCh // prevote // prevote for locked block (not proposal) @@ -483,12 +492,13 @@ func TestLockPOLRelock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - voteCh := subscribeToEvent(cs1, types.EventStringVote()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + evsw := cs1.evsw.(*events.EventSwitch) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) @@ -591,11 +601,12 @@ func TestLockPOLUnlock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - unlockCh := subscribeToEvent(cs1, types.EventStringUnlock()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // everything done from perspective of cs1 @@ -682,10 +693,11 @@ func TestLockPOLSafety1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -780,7 +792,7 @@ func TestLockPOLSafety1(t *testing.T) { // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) - newStepCh := subscribeToEvent(cs1, types.EventStringNewRoundStep()) + newStepCh := evsw.SubscribeToEvent(types.EventStringNewRoundStep(), 0) // add prevotes from the earlier round addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) @@ -801,11 +813,12 @@ func TestLockPOLSafety2(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutProposeCh := subscribeToEvent(cs1, types.EventStringTimeoutPropose()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - unlockCh := subscribeToEvent(cs1, types.EventStringUnlock()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // the block for R0: gets polkad but we miss it @@ -891,9 +904,10 @@ func TestSlashingPrevotes(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -925,9 +939,10 @@ func TestSlashingPrecommits(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -969,10 +984,11 @@ func TestHalt1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := subscribeToEvent(cs1, types.EventStringCompleteProposal()) - timeoutWaitCh := subscribeToEvent(cs1, types.EventStringTimeoutWait()) - newRoundCh := subscribeToEvent(cs1, types.EventStringNewRound()) - newBlockCh := subscribeToEvent(cs1, types.EventStringNewBlock()) + evsw := cs1.evsw.(*events.EventSwitch) + proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) + newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote diff --git a/events/events.go b/events/events.go index 05f7115f6..e70d39620 100644 --- a/events/events.go +++ b/events/events.go @@ -123,6 +123,16 @@ func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { eventCell.FireEvent(data) } +func (evsw *EventSwitch) SubscribeToEvent(eventID string, chanCap int) chan interface{} { + // listen for new round + ch := make(chan interface{}, chanCap) + evsw.AddListenerForEvent("tester", eventID, func(data types.EventData) { + // NOTE: in production, evsw callbacks should be nonblocking. + ch <- data + }) + return ch +} + //----------------------------------------------------------------------------- // eventCell handles keeping track of listener callbacks for a given event. diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index bdb942ffa..5d004fc5c 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -206,7 +206,7 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { // rpc.websocket const ( - writeChanCapacity = 20 + writeChanCapacity = 1000 wsWriteTimeoutSeconds = 30 // each write times out after this wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. @@ -287,7 +287,7 @@ func (wsc *WSConnection) readTimeoutRoutine() { } } -// Attempt to write response to writeChan and record failures +// Blocking write to writeChan until service stops. func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { select { case wsc.writeChan <- resp: @@ -297,6 +297,18 @@ func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { } } +// Nonblocking write. +func (wsc *WSConnection) tryWriteRPCResponse(resp RPCResponse) bool { + select { + case <-wsc.Quit: + return false + case wsc.writeChan <- resp: + return true + default: + return false + } +} + // Read from the socket and subscribe to or unsubscribe from events func (wsc *WSConnection) readRoutine() { // Do not close writeChan, to allow writeRPCResponse() to fail. @@ -339,8 +351,9 @@ func (wsc *WSConnection) readRoutine() { } else { log.Notice("Subscribe to event", "id", wsc.id, "event", event) wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) { + // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsc.writeRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + wsc.tryWriteRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) }) continue } From b9e143d95651b6d5930c2579e2ab82ebf8fd7dc7 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 14 Dec 2015 00:38:19 -0500 Subject: [PATCH 5/5] Fireable -> EventSwitch; rs in EventDataRoundState; fixes from review --- blockchain/reactor.go | 4 +- consensus/common_test.go | 4 +- consensus/reactor.go | 26 ++++--- consensus/state.go | 59 ++++++---------- consensus/state_test.go | 148 ++++++++++++++++++--------------------- events/events.go | 6 +- mempool/reactor.go | 4 +- node/node.go | 6 +- state/state.go | 5 +- types/events.go | 35 ++++----- 10 files changed, 129 insertions(+), 168 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 0a9778308..bf7a8236c 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -52,7 +52,7 @@ type BlockchainReactor struct { timeoutsCh chan string lastBlock *types.Block - evsw events.Fireable + evsw *events.EventSwitch } func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *BlockStore, sync bool) *BlockchainReactor { @@ -261,7 +261,7 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error { } // implements events.Eventable -func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { +func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) { bcR.evsw = evsw } diff --git a/consensus/common_test.go b/consensus/common_test.go index 2650b03f3..81100adaf 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -324,7 +324,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { cs.SetPrivValidator(privVals[0]) evsw := events.NewEventSwitch() - cs.SetFireable(evsw) + cs.SetEventSwitch(evsw) evsw.Start() // start the transition routines @@ -340,7 +340,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { } func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { - voteCh0 := cs.evsw.(*events.EventSwitch).SubscribeToEvent(types.EventStringVote(), 0) + voteCh0 := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) voteCh := make(chan interface{}) go func() { for { diff --git a/consensus/reactor.go b/consensus/reactor.go index ab2fb4913..a68f917c2 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -34,7 +34,7 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState fastSync bool - evsw events.Fireable + evsw *events.EventSwitch } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { @@ -221,9 +221,9 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { } // implements events.Eventable -func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { +func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { conR.evsw = evsw - conR.conS.SetFireable(evsw) + conR.conS.SetEventSwitch(evsw) } //-------------------------------------- @@ -231,21 +231,19 @@ func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { // Listens for new steps and votes, // broadcasting the result to peers func (conR *ConsensusReactor) registerEventCallbacks() { - // XXX: should we change SetFireable to just use EventSwitch so we don't need these assertions? - evsw := conR.evsw.(*events.EventSwitch) - evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { - rs := data.(*types.EventDataRoundState) + conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { + rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) conR.broadcastNewRoundStep(rs) }) - evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { edv := data.(*types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) } -func (conR *ConsensusReactor) broadcastNewRoundStep(rs *types.EventDataRoundState) { +func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) { nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { @@ -282,20 +280,20 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in */ } -func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { +func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { step := RoundStepType(rs.Step) nrsMsg = &NewRoundStepMessage{ Height: rs.Height, Round: rs.Round, Step: step, SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()), - LastCommitRound: rs.LastCommitRound, + LastCommitRound: rs.LastCommit.Round(), } if step == RoundStepCommit { csMsg = &CommitStepMessage{ Height: rs.Height, - BlockPartsHeader: rs.BlockPartsHeader, - BlockParts: rs.BlockParts, + BlockPartsHeader: rs.ProposalBlockParts.Header(), + BlockParts: rs.ProposalBlockParts.BitArray(), } } return @@ -303,7 +301,7 @@ func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepM func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { rs := conR.conS.GetRoundState() - nrsMsg, csMsg := makeRoundStepMessages(rs.RoundStateEvent()) + nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { peer.Send(StateChannel, nrsMsg) } diff --git a/consensus/state.go b/consensus/state.go index eb9b59030..bb9bf586b 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -98,27 +98,13 @@ type RoundState struct { } func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState { - var header types.PartSetHeader - var parts *BitArray - if rs.ProposalBlockParts != nil { - header = rs.ProposalBlockParts.Header() - parts = rs.ProposalBlockParts.BitArray() - } - return &types.EventDataRoundState{ - CurrentTime: time.Now(), - Height: rs.Height, - Round: rs.Round, - Step: int(rs.Step), - StartTime: rs.StartTime, - CommitTime: rs.CommitTime, - Proposal: rs.Proposal, - ProposalBlock: rs.ProposalBlock, - LockedRound: rs.LockedRound, - LockedBlock: rs.LockedBlock, - POLRound: rs.Votes.POLRound(), - BlockPartsHeader: header, - BlockParts: parts, + edrs := &types.EventDataRoundState{ + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), } + edrs.SetRoundState(rs) + return edrs } func (rs *RoundState) String() string { @@ -204,7 +190,7 @@ type ConsensusState struct { tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine - evsw events.Fireable + evsw *events.EventSwitch evc *events.EventCache // set in stageBlock and passed into state nSteps int // used for testing to limit the number of transitions the state makes @@ -233,7 +219,7 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore // Public interface // implements events.Eventable -func (cs *ConsensusState) SetFireable(evsw events.Fireable) { +func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { cs.evsw = evsw } @@ -641,9 +627,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { //----------------------------------------------------------------------------- // State functions -// Many of these functions are capitalized but are not really meant to be used -// by external code as it will cause race conditions with running timeout/receiveRoutine. -// Use AddVote, SetProposal, AddProposalBlockPart instead +// Used internally by handleTimeout and handleMsg to make state transitions // Enter: +2/3 precommits for nil at (height,round-1) // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) @@ -706,6 +690,13 @@ func (cs *ConsensusState) enterPropose(height int, round int) { // Done enterPropose: cs.updateRoundStep(round, RoundStepPropose) cs.newStep() + + // If we have the whole proposal + POL, then goto Prevote now. + // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), + // or else after timeoutPropose + if cs.isProposalComplete() { + cs.enterPrevote(height, cs.Round) + } }() // This step times out after `timeoutPropose` @@ -723,12 +714,6 @@ func (cs *ConsensusState) enterPropose(height int, round int) { cs.decideProposal(height, round) } - // If we have the whole proposal + POL, then goto Prevote now. - // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), - // or else after timeoutPropose - if cs.isProposalComplete() { - cs.enterPrevote(height, cs.Round) - } } func (cs *ConsensusState) decideProposal(height, round int) { @@ -1117,29 +1102,29 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) { return } // go - cs.FinalizeCommit(height) + cs.finalizeCommit(height) } // Increment height and goto RoundStepNewHeight -func (cs *ConsensusState) FinalizeCommit(height int) { +func (cs *ConsensusState) finalizeCommit(height int) { //cs.mtx.Lock() //defer cs.mtx.Unlock() if cs.Height != height || cs.Step != RoundStepCommit { - log.Debug(Fmt("FinalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) + log.Debug(Fmt("finalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) return } hash, header, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority() if !ok { - PanicSanity(Fmt("Cannot FinalizeCommit, commit does not have two thirds majority")) + PanicSanity(Fmt("Cannot finalizeCommit, commit does not have two thirds majority")) } if !cs.ProposalBlockParts.HasHeader(header) { PanicSanity(Fmt("Expected ProposalBlockParts header to be commit header")) } if !cs.ProposalBlock.HashesTo(hash) { - PanicSanity(Fmt("Cannot FinalizeCommit, ProposalBlock does not hash to commit hash")) + PanicSanity(Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash")) } if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err)) @@ -1378,7 +1363,7 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS // Create a copy of the state for staging stateCopy := cs.state.Copy() - stateCopy.SetFireable(cs.evc) + stateCopy.SetEventCache(cs.evc) // Run the block on the State: // + update validator sets diff --git a/consensus/state_test.go b/consensus/state_test.go index d4917ac39..d6e8682a4 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -8,7 +8,6 @@ import ( _ "github.com/tendermint/tendermint/config/tendermint_test" //"github.com/tendermint/tendermint/events" - "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/types" ) @@ -54,9 +53,8 @@ func TestProposerSelection0(t *testing.T) { cs1, vss := simpleConsensusState(4) height, round := cs1.Height, cs1.Round - evsw := cs1.evsw.(*events.EventSwitch) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) startTestRound(cs1, height, round) @@ -88,8 +86,7 @@ func TestProposerSelection0(t *testing.T) { func TestProposerSelection2(t *testing.T) { cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators - evsw := cs1.evsw.(*events.EventSwitch) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) // this time we jump in at round 2 incrementRound(vss[1:]...) @@ -120,9 +117,8 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { cs.SetPrivValidator(nil) height, round := cs.Height, cs.Round - evsw := cs.evsw.(*events.EventSwitch) // Listen for propose timeout event - timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) + timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) startTestRound(cs, height, round) @@ -146,9 +142,9 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { height, round := cs.Height, cs.Round // Listen for propose timeout event - evsw := cs.evsw.(*events.EventSwitch) - timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) + + timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + proposalCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) cs.enterNewRound(height, round) cs.startRoutines(3) @@ -182,9 +178,8 @@ func TestBadProposal(t *testing.T) { height, round := cs1.Height, cs1.Round cs2 := vss[1] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2) @@ -238,10 +233,9 @@ func TestFullRound1(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - evsw := cs.evsw.(*events.EventSwitch) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - propCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + propCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + newRoundCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) startTestRound(cs, height, round) @@ -249,7 +243,7 @@ func TestFullRound1(t *testing.T) { // grab proposal re := <-propCh - propBlockHash := re.(*types.EventDataRoundState).ProposalBlock.Hash() + propBlockHash := re.(*types.EventDataRoundState).RoundState().(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote validatePrevote(t, cs, round, vss[0], propBlockHash) @@ -267,8 +261,7 @@ func TestFullRoundNil(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - evsw := cs.evsw.(*events.EventSwitch) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) + voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) cs.enterPrevote(height, round) cs.startRoutines(4) @@ -287,9 +280,8 @@ func TestFullRound2(t *testing.T) { cs2 := vss[1] height, round := cs1.Height, cs1.Round - evsw := cs1.evsw.(*events.EventSwitch) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) // start round and wait for propose and prevote startTestRound(cs1, height, round) @@ -329,12 +321,11 @@ func TestLockNoPOL(t *testing.T) { cs2 := vss[1] height := cs1.Height - evsw := cs1.evsw.(*events.EventSwitch) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) /* Round1 (cs1, B) // B B // B B2 @@ -345,7 +336,7 @@ func TestLockNoPOL(t *testing.T) { cs1.startRoutines(0) re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -385,7 +376,7 @@ func TestLockNoPOL(t *testing.T) { // now we're on a new round and not the proposer, so wait for timeout re = <-timeoutProposeCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.ProposalBlock != nil { t.Fatal("Expected proposal block to be nil") @@ -429,7 +420,7 @@ func TestLockNoPOL(t *testing.T) { incrementRound(cs2) re = <-proposalCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { @@ -492,13 +483,12 @@ func TestLockPOLRelock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) @@ -515,7 +505,7 @@ func TestLockPOLRelock(t *testing.T) { <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -586,7 +576,7 @@ func TestLockPOLRelock(t *testing.T) { be := <-newBlockCh b := be.(types.EventDataNewBlock) re = <-newRoundCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.Height != 2 { t.Fatal("Expected height to increment") } @@ -601,12 +591,11 @@ func TestLockPOLUnlock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // everything done from perspective of cs1 @@ -621,7 +610,7 @@ func TestLockPOLUnlock(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -645,7 +634,7 @@ func TestLockPOLUnlock(t *testing.T) { // timeout to new round re = <-timeoutWaitCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) lockedBlockHash := rs.LockedBlock.Hash() //XXX: this isnt gauranteed to get there before the timeoutPropose ... @@ -693,18 +682,17 @@ func TestLockPOLSafety1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) propBlock := rs.ProposalBlock <-voteCh // prevote @@ -752,7 +740,7 @@ func TestLockPOLSafety1(t *testing.T) { re = <-proposalCh } - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.LockedBlock != nil { t.Fatal("we should not be locked!") @@ -792,7 +780,7 @@ func TestLockPOLSafety1(t *testing.T) { // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) - newStepCh := evsw.SubscribeToEvent(types.EventStringNewRoundStep(), 0) + newStepCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRoundStep(), 0) // add prevotes from the earlier round addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) @@ -813,12 +801,11 @@ func TestLockPOLSafety2(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // the block for R0: gets polkad but we miss it @@ -904,10 +891,10 @@ func TestSlashingPrevotes(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) + + proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -916,7 +903,7 @@ func TestSlashingPrevotes(t *testing.T) { re := <-proposalCh <-voteCh // prevote - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait @@ -939,10 +926,10 @@ func TestSlashingPrecommits(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) + + proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -984,18 +971,17 @@ func TestHalt1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - evsw := cs1.evsw.(*events.EventSwitch) - proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) - timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) - newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) - newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0) + proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) + newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState) + rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) propBlock := rs.ProposalBlock propBlockParts := propBlock.MakePartSet() @@ -1018,7 +1004,7 @@ func TestHalt1(t *testing.T) { // timeout to new round <-timeoutWaitCh re = <-newRoundCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) log.Notice("### ONTO ROUND 1") /*Round2 @@ -1036,7 +1022,7 @@ func TestHalt1(t *testing.T) { // receiving that precommit should take us straight to commit <-newBlockCh re = <-newRoundCh - rs = re.(*types.EventDataRoundState) + rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) if rs.Height != 2 { t.Fatal("expected height to increment") diff --git a/events/events.go b/events/events.go index e70d39620..647522d0a 100644 --- a/events/events.go +++ b/events/events.go @@ -10,7 +10,7 @@ import ( // reactors and other modules should export // this interface to become eventable type Eventable interface { - SetFireable(Fireable) + SetEventSwitch(evsw *EventSwitch) } // an event switch or cache implements fireable @@ -123,10 +123,10 @@ func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { eventCell.FireEvent(data) } -func (evsw *EventSwitch) SubscribeToEvent(eventID string, chanCap int) chan interface{} { +func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} { // listen for new round ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent("tester", eventID, func(data types.EventData) { + evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) { // NOTE: in production, evsw callbacks should be nonblocking. ch <- data }) diff --git a/mempool/reactor.go b/mempool/reactor.go index ed8b8b260..5dd187fdc 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -25,7 +25,7 @@ const ( type MempoolReactor struct { p2p.BaseReactor Mempool *Mempool // TODO: un-expose - evsw events.Fireable + evsw *events.EventSwitch } func NewMempoolReactor(mempool *Mempool) *MempoolReactor { @@ -135,7 +135,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { } // implements events.Eventable -func (memR *MempoolReactor) SetFireable(evsw events.Fireable) { +func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) { memR.evsw = evsw } diff --git a/node/node.go b/node/node.go index b63116355..e01f02a8e 100644 --- a/node/node.go +++ b/node/node.go @@ -101,7 +101,7 @@ func NewNode() *Node { // add the event switch to all services // they should all satisfy events.Eventable - SetFireable(eventSwitch, bcReactor, mempoolReactor, consensusReactor) + SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor) // run the profile server profileHost := config.GetString("prof_laddr") @@ -144,9 +144,9 @@ func (n *Node) Stop() { } // Add the event switch to reactors, mempool, etc. -func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { +func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { for _, e := range eventables { - e.SetFireable(evsw) + e.SetEventSwitch(evsw) } } diff --git a/state/state.go b/state/state.go index 32a6bf179..9523a6497 100644 --- a/state/state.go +++ b/state/state.go @@ -33,7 +33,7 @@ type State struct { LastValidators *types.ValidatorSet LastAppHash []byte - evc events.Fireable // typically an events.EventCache + evc *events.EventCache } func LoadState(db dbm.DB) *State { @@ -81,8 +81,7 @@ func (s *State) Save() { s.db.Set(stateKey, buf.Bytes()) } -// Implements events.Eventable. Typically uses events.EventCache -func (s *State) SetFireable(evc events.Fireable) { +func (s *State) SetEventCache(evc *events.EventCache) { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/types/events.go b/types/events.go index b046e9656..436e02de6 100644 --- a/types/events.go +++ b/types/events.go @@ -1,9 +1,6 @@ package types import ( - "time" - - "github.com/tendermint/go-common" "github.com/tendermint/go-wire" ) @@ -74,25 +71,21 @@ type EventDataApp struct { Data []byte `json:"bytes"` } -// We fire the most recent round state that led to the event -// (ie. NewRound will have the previous rounds state) type EventDataRoundState struct { - CurrentTime time.Time `json:"current_time"` - - Height int `json:"height"` - Round int `json:"round"` - Step int `json:"step"` - LastCommitRound int `json:"last_commit_round"` - StartTime time.Time `json:"start_time"` - CommitTime time.Time `json:"commit_time"` - Proposal *Proposal `json:"proposal"` - ProposalBlock *Block `json:"proposal_block"` - LockedRound int `json:"locked_round"` - LockedBlock *Block `json:"locked_block"` - POLRound int `json:"pol_round"` - - BlockPartsHeader PartSetHeader `json:"block_parts_header"` - BlockParts *common.BitArray `json:"block_parts"` + Height int `json:"height"` + Round int `json:"round"` + Step string `json:"step"` + + // private, not exposed to websockets + rs interface{} +} + +func (edrs *EventDataRoundState) RoundState() interface{} { + return edrs.rs +} + +func (edrs *EventDataRoundState) SetRoundState(rs interface{}) { + edrs.rs = rs } type EventDataVote struct {