From 8be32257e797db7c8cda2cb8ef7865dd4308e1e9 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 18 Jan 2016 12:18:09 -0800 Subject: [PATCH] Make subscribeToEvent have capacity 1 --- consensus/common_test.go | 14 +++++- consensus/state_test.go | 100 +++++++++++++++++++-------------------- 2 files changed, 62 insertions(+), 52 deletions(-) diff --git a/consensus/common_test.go b/consensus/common_test.go index 7effed689..c70db4203 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -9,9 +9,9 @@ import ( "time" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" bc "github.com/tendermint/tendermint/blockchain" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/go-events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -339,7 +339,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { } func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { - voteCh0 := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) voteCh := make(chan interface{}) go func() { for { @@ -386,3 +386,13 @@ func startTestRound(cs *ConsensusState, height, round int) { cs.enterNewRound(height, round) cs.startRoutines(0) } + +// NOTE: this is blocking +func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { + // listen for new round + ch := make(chan interface{}, chanCap) + evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) { + ch <- data + }) + return ch +} diff --git a/consensus/state_test.go b/consensus/state_test.go index f4d8f5482..4649220f5 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -53,8 +53,8 @@ func TestProposerSelection0(t *testing.T) { cs1, vss := simpleConsensusState(4) height, round := cs1.Height, cs1.Round - newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) - proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) + proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) startTestRound(cs1, height, round) @@ -86,7 +86,7 @@ func TestProposerSelection0(t *testing.T) { func TestProposerSelection2(t *testing.T) { cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators - newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) // this time we jump in at round 2 incrementRound(vss[1:]...) @@ -118,7 +118,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { height, round := cs.Height, cs.Round // Listen for propose timeout event - timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) + timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1) startTestRound(cs, height, round) @@ -143,8 +143,8 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { // Listen for propose timeout event - timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) - proposalCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) + timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1) + proposalCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1) cs.enterNewRound(height, round) cs.startRoutines(3) @@ -178,8 +178,8 @@ func TestBadProposal(t *testing.T) { height, round := cs1.Height, cs1.Round cs2 := vss[1] - proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) - voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) + voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2) @@ -233,9 +233,9 @@ func TestFullRound1(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) - propCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) - newRoundCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) + propCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1) + newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1) startTestRound(cs, height, round) @@ -261,7 +261,7 @@ func TestFullRoundNil(t *testing.T) { cs, vss := simpleConsensusState(1) height, round := cs.Height, cs.Round - voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) + voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) cs.enterPrevote(height, round) cs.startRoutines(4) @@ -280,8 +280,8 @@ func TestFullRound2(t *testing.T) { cs2 := vss[1] height, round := cs1.Height, cs1.Round - voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) - newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) + voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) + newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1) // start round and wait for propose and prevote startTestRound(cs1, height, round) @@ -321,11 +321,11 @@ func TestLockNoPOL(t *testing.T) { cs2 := vss[1] height := cs1.Height - timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) - voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) - proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) - newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) + timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) + voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) + proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) + newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) /* Round1 (cs1, B) // B B // B B2 @@ -483,12 +483,12 @@ func TestLockPOLRelock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) - proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) - voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) - newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) - newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) + timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) + timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) + proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) + voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) + newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) + newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1) log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) @@ -591,11 +591,11 @@ func TestLockPOLUnlock(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) - timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) - newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) - unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0) + proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) + timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) + timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) + newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) + unlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringUnlock(), 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // everything done from perspective of cs1 @@ -682,10 +682,10 @@ func TestLockPOLSafety1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) - timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) - newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) + proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) + timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) + timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) + newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -780,7 +780,7 @@ func TestLockPOLSafety1(t *testing.T) { // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) - newStepCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRoundStep(), 0) + newStepCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRoundStep(), 1) // add prevotes from the earlier round addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) @@ -801,11 +801,11 @@ func TestLockPOLSafety2(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) - timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) - timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) - newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) - unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0) + proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) + timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) + timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) + newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) + unlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringUnlock(), 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // the block for R0: gets polkad but we miss it @@ -892,9 +892,9 @@ func TestSlashingPrevotes(t *testing.T) { cs2 := vss[1] - proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0) - timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0) - newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1) + proposalCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringCompleteProposal() , 1) + timeoutWaitCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringTimeoutWait() , 1) + newRoundCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -927,9 +927,9 @@ func TestSlashingPrecommits(t *testing.T) { cs2 := vss[1] - proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0) - timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0) - newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1) + proposalCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringCompleteProposal() , 1) + timeoutWaitCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringTimeoutWait() , 1) + newRoundCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringNewRound() , 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote @@ -971,10 +971,10 @@ func TestHalt1(t *testing.T) { cs1, vss := simpleConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] - proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) - timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0) - newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) - newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) + proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) + timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) + newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) + newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) // start round and wait for propose and prevote