diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index ba69d0cc1..5ca052d09 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" cmn "github.com/tendermint/tendermint/libs/common" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -49,7 +50,7 @@ func TestByzantine(t *testing.T) { switches[i].SetLogger(p2pLogger.With("validator", i)) } - eventChans := make([]chan interface{}, N) + eventSubs := make([]*tmpubsub.Subscription, N) reactors := make([]p2p.Reactor, N) for i := 0; i < N; i++ { // make first val byzantine @@ -68,8 +69,8 @@ func TestByzantine(t *testing.T) { eventBus := css[i].eventBus eventBus.SetLogger(logger.With("module", "events", "validator", i)) - eventChans[i] = make(chan interface{}, 1) - err := eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i]) + var err error + eventSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) require.NoError(t, err) conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states @@ -134,7 +135,7 @@ func TestByzantine(t *testing.T) { p2p.Connect2Switches(switches, ind1, ind2) // wait for someone in the big partition (B) to make a block - <-eventChans[ind2] + <-eventSubs[ind2].Out() t.Log("A block has been committed. Healing partition") p2p.Connect2Switches(switches, ind0, ind1) @@ -146,7 +147,7 @@ func TestByzantine(t *testing.T) { wg.Add(2) for i := 1; i < N-1; i++ { go func(j int) { - <-eventChans[j] + <-eventSubs[j].Out() wg.Done() }(i) } diff --git a/consensus/common_test.go b/consensus/common_test.go index e6e64c252..49834d6c4 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -25,6 +25,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/privval" @@ -220,18 +221,17 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo // genesis func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { - voteCh0 := make(chan interface{}) - err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote, voteCh0) + voteCh0Sub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote) if err != nil { panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote)) } voteCh := make(chan interface{}) go func() { - for v := range voteCh0 { - vote := v.(types.EventDataVote) + for msgAndTags := range voteCh0Sub.Out() { + vote := msgAndTags.Msg.(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Vote.ValidatorAddress) { - voteCh <- v + voteCh <- msgAndTags.Msg } } }() @@ -311,7 +311,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { //------------------------------------------------------------------------------- -func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration, +func ensureNoNewEvent(ch <-chan tmpubsub.MsgAndTags, timeout time.Duration, errorMessage string) { select { case <-time.After(timeout): @@ -321,28 +321,28 @@ func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration, } } -func ensureNoNewEventOnChannel(ch <-chan interface{}) { +func ensureNoNewEventOnChannel(ch <-chan tmpubsub.MsgAndTags) { ensureNoNewEvent( ch, ensureTimeout, "We should be stuck waiting, not receiving new event on the channel") } -func ensureNoNewRoundStep(stepCh <-chan interface{}) { +func ensureNoNewRoundStep(stepCh <-chan tmpubsub.MsgAndTags) { ensureNoNewEvent( stepCh, ensureTimeout, "We should be stuck waiting, not receiving NewRoundStep event") } -func ensureNoNewUnlock(unlockCh <-chan interface{}) { +func ensureNoNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags) { ensureNoNewEvent( unlockCh, ensureTimeout, "We should be stuck waiting, not receiving Unlock event") } -func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) { +func ensureNoNewTimeout(stepCh <-chan tmpubsub.MsgAndTags, timeout int64) { timeoutDuration := time.Duration(timeout*5) * time.Nanosecond ensureNoNewEvent( stepCh, @@ -351,7 +351,7 @@ func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) { } func ensureNewEvent( - ch <-chan interface{}, + ch <-chan tmpubsub.MsgAndTags, height int64, round int, timeout time.Duration, @@ -361,7 +361,7 @@ func ensureNewEvent( case <-time.After(timeout): panic(errorMessage) case ev := <-ch: - rs, ok := ev.(types.EventDataRoundState) + rs, ok := ev.Msg.(types.EventDataRoundState) if !ok { panic( fmt.Sprintf( @@ -378,12 +378,12 @@ func ensureNewEvent( } } -func ensureNewRound(roundCh <-chan interface{}, height int64, round int) { +func ensureNewRound(roundCh <-chan tmpubsub.MsgAndTags, height int64, round int) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewRound event") case ev := <-roundCh: - rs, ok := ev.(types.EventDataNewRound) + rs, ok := ev.Msg.(types.EventDataNewRound) if !ok { panic( fmt.Sprintf( @@ -399,18 +399,18 @@ func ensureNewRound(roundCh <-chan interface{}, height int64, round int) { } } -func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) { +func ensureNewTimeout(timeoutCh <-chan tmpubsub.MsgAndTags, height int64, round int, timeout int64) { timeoutDuration := time.Duration(timeout*3) * time.Nanosecond ensureNewEvent(timeoutCh, height, round, timeoutDuration, "Timeout expired while waiting for NewTimeout event") } -func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) { +func ensureNewProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewProposal event") case ev := <-proposalCh: - rs, ok := ev.(types.EventDataCompleteProposal) + rs, ok := ev.Msg.(types.EventDataCompleteProposal) if !ok { panic( fmt.Sprintf( @@ -426,17 +426,17 @@ func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) { } } -func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) { +func ensureNewValidBlock(validBlockCh <-chan tmpubsub.MsgAndTags, height int64, round int) { ensureNewEvent(validBlockCh, height, round, ensureTimeout, "Timeout expired while waiting for NewValidBlock event") } -func ensureNewBlock(blockCh <-chan interface{}, height int64) { +func ensureNewBlock(blockCh <-chan tmpubsub.MsgAndTags, height int64) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewBlock event") case ev := <-blockCh: - block, ok := ev.(types.EventDataNewBlock) + block, ok := ev.Msg.(types.EventDataNewBlock) if !ok { panic(fmt.Sprintf("expected a *types.EventDataNewBlock, "+ "got %v. wrong subscription channel?", @@ -448,12 +448,12 @@ func ensureNewBlock(blockCh <-chan interface{}, height int64) { } } -func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cmn.HexBytes) { +func ensureNewBlockHeader(blockCh <-chan tmpubsub.MsgAndTags, height int64, blockHash cmn.HexBytes) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewBlockHeader event") case ev := <-blockCh: - blockHeader, ok := ev.(types.EventDataNewBlockHeader) + blockHeader, ok := ev.Msg.(types.EventDataNewBlockHeader) if !ok { panic(fmt.Sprintf("expected a *types.EventDataNewBlockHeader, "+ "got %v. wrong subscription channel?", @@ -468,42 +468,17 @@ func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cm } } -func ensureNewUnlock(unlockCh <-chan interface{}, height int64, round int) { +func ensureNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags, height int64, round int) { ensureNewEvent(unlockCh, height, round, ensureTimeout, "Timeout expired while waiting for NewUnlock event") } -func ensureVote(voteCh <-chan interface{}, height int64, round int, - voteType types.SignedMsgType) { - select { - case <-time.After(ensureTimeout): - panic("Timeout expired while waiting for NewVote event") - case v := <-voteCh: - edv, ok := v.(types.EventDataVote) - if !ok { - panic(fmt.Sprintf("expected a *types.Vote, "+ - "got %v. wrong subscription channel?", - reflect.TypeOf(v))) - } - vote := edv.Vote - if vote.Height != height { - panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height)) - } - if vote.Round != round { - panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round)) - } - if vote.Type != voteType { - panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type)) - } - } -} - -func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) { +func ensureProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int, propId types.BlockID) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewProposal event") case ev := <-proposalCh: - rs, ok := ev.(types.EventDataCompleteProposal) + rs, ok := ev.Msg.(types.EventDataCompleteProposal) if !ok { panic( fmt.Sprintf( @@ -530,7 +505,32 @@ func ensurePrevote(voteCh <-chan interface{}, height int64, round int) { ensureVote(voteCh, height, round, types.PrevoteType) } -func ensureNewEventOnChannel(ch <-chan interface{}) { +func ensureVote(voteCh <-chan interface{}, height int64, round int, + voteType types.SignedMsgType) { + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewVote event") + case v := <-voteCh: + edv, ok := v.(types.EventDataVote) + if !ok { + panic(fmt.Sprintf("expected a *types.Vote, "+ + "got %v. wrong subscription channel?", + reflect.TypeOf(v))) + } + vote := edv.Vote + if vote.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height)) + } + if vote.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round)) + } + if vote.Type != voteType { + panic(fmt.Sprintf("expected type %v, got %v", voteType, vote.Type)) + } + } +} + +func ensureNewEventOnChannel(ch <-chan tmpubsub.MsgAndTags) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for new activity on the channel") diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index bb4bf6eb9..c6dd3779d 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -118,7 +118,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { ticker := time.NewTicker(time.Second * 30) select { case b := <-newBlockCh: - evt := b.(types.EventDataNewBlock) + evt := b.Msg.(types.EventDataNewBlock) nTxs += int(evt.Block.Header.NumTxs) case <-ticker.C: panic("Timed out waiting to commit blocks with transactions") diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 28e245aec..9c6df4261 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -21,6 +21,7 @@ import ( cfg "github.com/tendermint/tendermint/config" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" @@ -34,9 +35,14 @@ func init() { //---------------------------------------------- // in-process testnets -func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*ConsensusReactor, []chan interface{}, []*types.EventBus) { +func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ( + []*ConsensusReactor, + []*tmpubsub.Subscription, + []*types.EventBus, +) { + var err error reactors := make([]*ConsensusReactor, N) - eventChans := make([]chan interface{}, N) + eventSubs := make([]*tmpubsub.Subscription, N) eventBuses := make([]*types.EventBus, N) for i := 0; i < N; i++ { /*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info") @@ -48,8 +54,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus eventBuses[i] = css[i].eventBus reactors[i].SetEventBus(eventBuses[i]) - eventChans[i] = make(chan interface{}, 1) - err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i]) + eventSubs[i], err = eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) require.NoError(t, err) } // make connected switches and start all reactors @@ -67,7 +72,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus s := reactors[i].conS.GetState() reactors[i].SwitchToConsensus(s, 0) } - return reactors, eventChans, eventBuses + return reactors, eventSubs, eventBuses } func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) { @@ -87,11 +92,11 @@ func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuse func TestReactorBasic(t *testing.T) { N := 4 css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) - reactors, eventChans, eventBuses := startConsensusNet(t, css, N) + reactors, eventSubs, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(j int) { - <-eventChans[j] + <-eventSubs[j].Out() }, css) } @@ -163,20 +168,20 @@ func TestReactorWithEvidence(t *testing.T) { css[i] = cs } - reactors, eventChans, eventBuses := startConsensusNet(t, css, nValidators) + reactors, eventSubs, eventBuses := startConsensusNet(t, css, nValidators) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block with no evidence timeoutWaitGroup(t, nValidators, func(j int) { - blockI := <-eventChans[j] - block := blockI.(types.EventDataNewBlock).Block + blockI := <-eventSubs[j].Out() + block := blockI.Msg.(types.EventDataNewBlock).Block assert.True(t, len(block.Evidence.Evidence) == 0) }, css) // second block should have evidence timeoutWaitGroup(t, nValidators, func(j int) { - blockI := <-eventChans[j] - block := blockI.(types.EventDataNewBlock).Block + blockI := <-eventSubs[j].Out() + block := blockI.Msg.(types.EventDataNewBlock).Block assert.True(t, len(block.Evidence.Evidence) > 0) }, css) } @@ -221,7 +226,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { func(c *cfg.Config) { c.Consensus.CreateEmptyBlocks = false }) - reactors, eventChans, eventBuses := startConsensusNet(t, css, N) + reactors, eventSubs, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // send a tx @@ -231,7 +236,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(j int) { - <-eventChans[j] + <-eventSubs[j].Out() }, css) } @@ -239,12 +244,12 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { func TestReactorRecordsVotesAndBlockParts(t *testing.T) { N := 4 css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) - reactors, eventChans, eventBuses := startConsensusNet(t, css, N) + reactors, eventSubs, eventBuses := startConsensusNet(t, css, N) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(j int) { - <-eventChans[j] + <-eventSubs[j].Out() }, css) // Get peer @@ -263,7 +268,7 @@ func TestReactorVotingPowerChange(t *testing.T) { nVals := 4 logger := log.TestingLogger() css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentKVStore) - reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals) + reactors, eventSubs, eventBuses := startConsensusNet(t, css, nVals) defer stopConsensusNet(logger, reactors, eventBuses) // map of active validators @@ -275,7 +280,7 @@ func TestReactorVotingPowerChange(t *testing.T) { // wait till everyone makes block 1 timeoutWaitGroup(t, nVals, func(j int) { - <-eventChans[j] + <-eventSubs[j].Out() }, css) //--------------------------------------------------------------------------- @@ -286,10 +291,10 @@ func TestReactorVotingPowerChange(t *testing.T) { updateValidatorTx := kvstore.MakeValSetChangeTx(val1PubKeyABCI, 25) previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower() - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx) + waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower()) @@ -298,10 +303,10 @@ func TestReactorVotingPowerChange(t *testing.T) { updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2) previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower() - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx) + waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower()) @@ -310,10 +315,10 @@ func TestReactorVotingPowerChange(t *testing.T) { updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 26) previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower() - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) - waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css, updateValidatorTx) + waitForAndValidateBlockWithTx(t, nVals, activeVals, eventSubs, css, updateValidatorTx) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) + waitForAndValidateBlock(t, nVals, activeVals, eventSubs, css) if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower()) @@ -327,7 +332,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { logger := log.TestingLogger() - reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers) + reactors, eventSubs, eventBuses := startConsensusNet(t, css, nPeers) defer stopConsensusNet(logger, reactors, eventBuses) // map of active validators @@ -339,7 +344,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { // wait till everyone makes block 1 timeoutWaitGroup(t, nPeers, func(j int) { - <-eventChans[j] + <-eventSubs[j].Out() }, css) //--------------------------------------------------------------------------- @@ -352,22 +357,22 @@ func TestReactorValidatorSetChanges(t *testing.T) { // wait till everyone makes block 2 // ensure the commit includes all validators // send newValTx to change vals in block 3 - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx1) + waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, newValidatorTx1) // wait till everyone makes block 3. // it includes the commit for block 2, which is by the original validator set - waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx1) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, newValidatorTx1) // wait till everyone makes block 4. // it includes the commit for block 3, which is by the original validator set - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css) // the commits for block 4 should be with the updated validator set activeVals[string(newValidatorPubKey1.Address())] = struct{}{} // wait till everyone makes block 5 // it includes the commit for block 4, which should have the updated validator set - waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css) //--------------------------------------------------------------------------- logger.Info("---------------------------- Testing changing the voting power of one validator") @@ -377,10 +382,10 @@ func TestReactorValidatorSetChanges(t *testing.T) { updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower() - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1) - waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, updateValidatorTx1) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) - waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) + waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, updateValidatorTx1) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, updateValidatorTx1) + waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css) if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower { t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower()) @@ -397,12 +402,12 @@ func TestReactorValidatorSetChanges(t *testing.T) { newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3) newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3) - waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, newValidatorTx2, newValidatorTx3) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, newValidatorTx2, newValidatorTx3) + waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css) activeVals[string(newValidatorPubKey2.Address())] = struct{}{} activeVals[string(newValidatorPubKey3.Address())] = struct{}{} - waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css) //--------------------------------------------------------------------------- logger.Info("---------------------------- Testing removing two validators at once") @@ -410,12 +415,12 @@ func TestReactorValidatorSetChanges(t *testing.T) { removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3) - waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3) - waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) + waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css, removeValidatorTx2, removeValidatorTx3) + waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventSubs, css, removeValidatorTx2, removeValidatorTx3) + waitForAndValidateBlock(t, nPeers, activeVals, eventSubs, css) delete(activeVals, string(newValidatorPubKey2.Address())) delete(activeVals, string(newValidatorPubKey3.Address())) - waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) + waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventSubs, css) } // Check we can make blocks with skip_timeout_commit=false @@ -427,23 +432,30 @@ func TestReactorWithTimeoutCommit(t *testing.T) { css[i].config.SkipTimeoutCommit = false } - reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1) + reactors, eventSubs, eventBuses := startConsensusNet(t, css, N-1) defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // wait till everyone makes the first new block timeoutWaitGroup(t, N-1, func(j int) { - <-eventChans[j] + <-eventSubs[j].Out() }, css) } -func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { +func waitForAndValidateBlock( + t *testing.T, + n int, + activeVals map[string]struct{}, + eventSubs []*tmpubsub.Subscription, + css []*ConsensusState, + txs ...[]byte, +) { timeoutWaitGroup(t, n, func(j int) { css[j].Logger.Debug("waitForAndValidateBlock") - newBlockI, ok := <-eventChans[j] + newBlockI, ok := <-eventSubs[j].Out() if !ok { return } - newBlock := newBlockI.(types.EventDataNewBlock).Block + newBlock := newBlockI.Msg.(types.EventDataNewBlock).Block css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) assert.Nil(t, err) @@ -454,17 +466,24 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{} }, css) } -func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { +func waitForAndValidateBlockWithTx( + t *testing.T, + n int, + activeVals map[string]struct{}, + eventSubs []*tmpubsub.Subscription, + css []*ConsensusState, + txs ...[]byte, +) { timeoutWaitGroup(t, n, func(j int) { ntxs := 0 BLOCK_TX_LOOP: for { css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs) - newBlockI, ok := <-eventChans[j] + newBlockI, ok := <-eventSubs[j].Out() if !ok { return } - newBlock := newBlockI.(types.EventDataNewBlock).Block + newBlock := newBlockI.Msg.(types.EventDataNewBlock).Block css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) assert.Nil(t, err) @@ -485,18 +504,24 @@ func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]st }, css) } -func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState) { +func waitForBlockWithUpdatedValsAndValidateIt( + t *testing.T, + n int, + updatedVals map[string]struct{}, + eventSubs []*tmpubsub.Subscription, + css []*ConsensusState, +) { timeoutWaitGroup(t, n, func(j int) { var newBlock *types.Block LOOP: for { css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt") - newBlockI, ok := <-eventChans[j] + newBlockI, ok := <-eventSubs[j].Out() if !ok { return } - newBlock = newBlockI.(types.EventDataNewBlock).Block + newBlock = newBlockI.Msg.(types.EventDataNewBlock).Block if newBlock.LastCommit.Size() == len(updatedVals) { css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height) break LOOP diff --git a/consensus/replay.go b/consensus/replay.go index 21fef6b29..bdf9fc931 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -17,6 +17,7 @@ import ( dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -42,7 +43,7 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli) // Unmarshal and apply a single message to the consensus state as if it were // received in receiveRoutine. Lines that start with "#" are ignored. // NOTE: receiveRoutine should not be running. -func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error { +func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepSub *tmpubsub.Subscription) error { // Skip meta messages which exist for demarcating boundaries. if _, ok := msg.Msg.(EndHeightMessage); ok { return nil @@ -54,15 +55,17 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks ticker := time.After(time.Second * 2) - if newStepCh != nil { + if newStepSub != nil { select { - case mi := <-newStepCh: - m2 := mi.(types.EventDataRoundState) + case mi := <-newStepSub.Out(): + m2 := mi.Msg.(types.EventDataRoundState) if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) } + case <-newStepSub.Cancelled(): + return fmt.Errorf("Failed to read off newStepSub.Out(). newStepSub was cancelled") case <-ticker: - return fmt.Errorf("Failed to read off newStepCh") + return fmt.Errorf("Failed to read off newStepSub.Out()") } } case msgInfo: diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 3e92bad67..fa1e472d6 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -16,6 +16,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -51,25 +52,13 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { cs.startForReplay() // ensure all new step events are regenerated as expected - newStepCh := make(chan interface{}, 1) ctx := context.Background() - err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh) + newStepSub, err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep) if err != nil { return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep) } - defer func() { - // drain newStepCh to make sure we don't block - LOOP: - for { - select { - case <-newStepCh: - default: - break LOOP - } - } - cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) - }() + defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) // just open the file for reading, no need to use wal fp, err := os.OpenFile(file, os.O_RDONLY, 0600) @@ -94,7 +83,8 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { return err } - if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil { + // TODO: pass a subscription + if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil { return err } @@ -132,7 +122,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.S } // go back count steps by resetting the state and running (pb.count - count) steps -func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { +func (pb *playback) replayReset(count int, newStepSub *tmpubsub.Subscription) error { pb.cs.Stop() pb.cs.Wait() @@ -162,7 +152,7 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { } else if err != nil { return err } - if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil { + if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil { return err } pb.count++ @@ -226,27 +216,16 @@ func (pb *playback) replayConsoleLoop() int { ctx := context.Background() // ensure all new step events are regenerated as expected - newStepCh := make(chan interface{}, 1) - err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh) + newStepSub, err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep) if err != nil { cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)) } - defer func() { - // drain newStepCh to make sure we don't block - LOOP: - for { - select { - case <-newStepCh: - default: - break LOOP - } - } - pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) - }() + defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) if len(tokens) == 1 { - if err := pb.replayReset(1, newStepCh); err != nil { + // TODO: pass a subscription + if err := pb.replayReset(1, newStepSub); err != nil { pb.cs.Logger.Error("Replay reset error", "err", err) } } else { @@ -256,7 +235,8 @@ func (pb *playback) replayConsoleLoop() int { } else if i > pb.count { fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) } else { - if err := pb.replayReset(i, newStepCh); err != nil { + // TODO: pass a subscription + if err := pb.replayReset(i, newStepSub); err != nil { pb.cs.Logger.Error("Replay reset error", "err", err) } } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index e7269254c..f0232283b 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -70,13 +70,14 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, // in the WAL itself. Assuming the consensus state is running, replay of any // WAL, including the empty one, should eventually be followed by a new // block, or else something is wrong. - newBlockCh := make(chan interface{}, 1) - err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh) + newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) require.NoError(t, err) select { - case <-newBlockCh: + case <-newBlockSub.Out(): + case <-newBlockSub.Cancelled(): + t.Fatal("newBlockSub was cancelled") case <-time.After(60 * time.Second): - t.Fatalf("Timed out waiting for new block (see trace above)") + t.Fatal("Timed out waiting for new block (see trace above)") } } diff --git a/consensus/state_test.go b/consensus/state_test.go index 153f51e16..1a363cffa 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -1565,11 +1565,11 @@ func TestStateOutputVoteStats(t *testing.T) { } // subscribe subscribes test client to the given query and returns a channel with cap = 1. -func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} { - out := make(chan interface{}, 1) - err := eventBus.Subscribe(context.Background(), testSubscriber, q, out) +func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.MsgAndTags { + sub, err := eventBus.Subscribe(context.Background(), testSubscriber, q) if err != nil { panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q)) } - return out + // XXX: return a subscription? + return sub.Out() } diff --git a/rpc/core/events.go b/rpc/core/events.go index e4fd20417..36536bc83 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -101,16 +101,30 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() - ch := make(chan interface{}) - err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch) + sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q) if err != nil { return nil, err } go func() { - for event := range ch { - tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)} - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), tmResult)) + for { + select { + case event := <-sub.Out(): + tmResult := &ctypes.ResultEvent{query, event.Msg.(tmtypes.TMEventData)} + wsCtx.TryWriteRPCResponse( + rpctypes.NewRPCSuccessResponse( + wsCtx.Codec(), + rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), + tmResult, + )) + case <-sub.Cancelled(): + wsCtx.TryWriteRPCResponse( + rpctypes.RPCServerError(rpctypes.JSONRPCStringID( + fmt.Sprintf("%v#event", wsCtx.Request.ID)), + fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()), + )) + return + } } }() diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index ff6b029cf..16d03ae0c 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -169,26 +169,14 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // Subscribe to tx being committed in block. ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() - deliverTxResCh := make(chan interface{}, 1) q := types.EventQueryTxFor(tx) - err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) + deliverTxSub, err := eventBus.Subscribe(ctx, "mempool", q) if err != nil { err = errors.Wrap(err, "failed to subscribe to tx") logger.Error("Error on broadcast_tx_commit", "err", err) return nil, err } - defer func() { - // drain deliverTxResCh to make sure we don't block - LOOP: - for { - select { - case <-deliverTxResCh: - default: - break LOOP - } - } - eventBus.Unsubscribe(context.Background(), "mempool", q) - }() + defer eventBus.Unsubscribe(context.Background(), "mempool", q) // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) @@ -213,17 +201,22 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // TODO: configurable? var deliverTxTimeout = rpcserver.WriteTimeout / 2 select { - case deliverTxResMsg, ok := <-deliverTxResCh: // The tx was included in a block. - if !ok { - return nil, errors.New("Error on broadcastTxCommit: expected DeliverTxResult, got nil. Did the Tendermint stop?") - } - deliverTxRes := deliverTxResMsg.(types.EventDataTx) + case deliverTxResMsg := <-deliverTxSub.Out(): // The tx was included in a block. + deliverTxRes := deliverTxResMsg.Msg.(types.EventDataTx) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: deliverTxRes.Result, Hash: tx.Hash(), Height: deliverTxRes.Height, }, nil + case <-deliverTxSub.Cancelled(): + err = errors.New("deliverTxSub was cancelled. Did the Tendermint stop?") + logger.Error("Error on broadcastTxCommit", "err", err) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: *checkTxRes, + DeliverTx: abci.ResponseDeliverTx{}, + Hash: tx.Hash(), + }, err case <-time.After(deliverTxTimeout): err = errors.New("Timed out waiting for tx to be included in a block") logger.Error("Error on broadcastTxCommit", "err", err) diff --git a/state/execution_test.go b/state/execution_test.go index 041fb558e..1f1ee5e54 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -317,8 +317,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { defer eventBus.Stop() blockExec.SetEventBus(eventBus) - updatesCh := make(chan interface{}, 1) - err = eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates, updatesCh) + updatesSub, err := eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates) require.NoError(t, err) block := makeBlock(state, 1) @@ -342,13 +341,15 @@ func TestEndBlockValidatorUpdates(t *testing.T) { // test we threw an event select { - case e := <-updatesCh: - event, ok := e.(types.EventDataValidatorSetUpdates) + case e := <-updatesSub.Out(): + event, ok := e.Msg.(types.EventDataValidatorSetUpdates) require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", e) if assert.NotEmpty(t, event.ValidatorUpdates) { assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey) assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower) } + case <-updatesSub.Cancelled(): + t.Fatal("updatesSub was cancelled.") case <-time.After(1 * time.Second): t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.") } diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 088252f5e..c48f01bae 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -31,35 +31,44 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService // OnStart implements cmn.Service by subscribing for all transactions // and indexing them by tags. func (is *IndexerService) OnStart() error { - blockHeadersCh := make(chan interface{}) - if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil { + blockHeadersSub, err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader) + if err != nil { return err } - txsCh := make(chan interface{}) - if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil { + txsSub, err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx) + if err != nil { return err } go func() { for { - e, ok := <-blockHeadersCh - if !ok { - return - } - header := e.(types.EventDataNewBlockHeader).Header - batch := NewBatch(header.NumTxs) - for i := int64(0); i < header.NumTxs; i++ { - e, ok := <-txsCh - if !ok { - is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i) - return + select { + case msgAndTags := <-blockHeadersSub.Out(): + header := msgAndTags.Msg.(types.EventDataNewBlockHeader).Header + batch := NewBatch(header.NumTxs) + for i := int64(0); i < header.NumTxs; i++ { + select { + case msgAndTags := <-txsSub.Out(): + txResult := msgAndTags.Msg.(types.EventDataTx).TxResult + batch.Add(&txResult) + case <-txsSub.Cancelled(): + is.Logger.Error("Failed to index a block. txsSub was cancelled. Did the Tendermint stop?", + "err", txsSub.Err(), + "height", header.Height, + "numTxs", header.NumTxs, + "numProcessed", i, + ) + return + } } - txResult := e.(types.EventDataTx).TxResult - batch.Add(&txResult) + is.idr.AddBatch(batch) + is.Logger.Info("Indexed block", "height", header.Height) + case <-blockHeadersSub.Cancelled(): + is.Logger.Error("Failed to index a block. blockHeadersSub was cancelled. Did the Tendermint stop?", + "reason", blockHeadersSub.Err()) + return } - is.idr.AddBatch(batch) - is.Logger.Info("Indexed block", "height", header.Height) } }() return nil diff --git a/types/event_bus.go b/types/event_bus.go index 055cbd3fe..6b4e815ea 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -12,7 +12,7 @@ import ( const defaultCapacity = 0 type EventBusSubscriber interface { - Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error + Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (*tmpubsub.Subscription, error) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error UnsubscribeAll(ctx context.Context, subscriber string) error } @@ -52,8 +52,8 @@ func (b *EventBus) OnStop() { b.pubsub.Stop() } -func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { - return b.pubsub.Subscribe(ctx, subscriber, query, out) +func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (*tmpubsub.Subscription, error) { + return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...) } func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 6845927be..85f06237f 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -24,23 +24,20 @@ func TestEventBusPublishEventTx(t *testing.T) { tx := Tx("foo") result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}} - txEventsCh := make(chan interface{}) - // PublishEventTx adds all these 3 tags, so the query below should work query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash()) - err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) + txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) require.NoError(t, err) done := make(chan struct{}) go func() { - for e := range txEventsCh { - edt := e.(EventDataTx) - assert.Equal(t, int64(1), edt.Height) - assert.Equal(t, uint32(0), edt.Index) - assert.Equal(t, tx, edt.Tx) - assert.Equal(t, result, edt.Result) - close(done) - } + e := <-txEventsSub.Out() + edt := e.Msg.(EventDataTx) + assert.Equal(t, int64(1), edt.Height) + assert.Equal(t, uint32(0), edt.Index) + assert.Equal(t, tx, edt.Tx) + assert.Equal(t, result, edt.Result) + close(done) }() err = eventBus.PublishEventTx(EventDataTx{TxResult{ @@ -68,22 +65,19 @@ func TestEventBusPublishEventNewBlock(t *testing.T) { resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}} resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}} - txEventsCh := make(chan interface{}) - // PublishEventNewBlock adds the tm.event tag, so the query below should work query := "tm.event='NewBlock' AND baz=1 AND foz=2" - err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) + txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) require.NoError(t, err) done := make(chan struct{}) go func() { - for e := range txEventsCh { - edt := e.(EventDataNewBlock) - assert.Equal(t, block, edt.Block) - assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) - assert.Equal(t, resultEndBlock, edt.ResultEndBlock) - close(done) - } + e := <-txEventsSub.Out() + edt := e.Msg.(EventDataNewBlock) + assert.Equal(t, block, edt.Block) + assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) + assert.Equal(t, resultEndBlock, edt.ResultEndBlock) + close(done) }() err = eventBus.PublishEventNewBlock(EventDataNewBlock{ @@ -110,22 +104,19 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) { resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}} resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}} - txEventsCh := make(chan interface{}) - // PublishEventNewBlockHeader adds the tm.event tag, so the query below should work query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2" - err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) + txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) require.NoError(t, err) done := make(chan struct{}) go func() { - for e := range txEventsCh { - edt := e.(EventDataNewBlockHeader) - assert.Equal(t, block.Header, edt.Header) - assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) - assert.Equal(t, resultEndBlock, edt.ResultEndBlock) - close(done) - } + e := <-txEventsSub.Out() + edt := e.Msg.(EventDataNewBlockHeader) + assert.Equal(t, block.Header, edt.Header) + assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) + assert.Equal(t, resultEndBlock, edt.ResultEndBlock) + close(done) }() err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{ @@ -148,18 +139,18 @@ func TestEventBusPublish(t *testing.T) { require.NoError(t, err) defer eventBus.Stop() - eventsCh := make(chan interface{}) - err = eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, eventsCh) + eventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}) require.NoError(t, err) const numEventsExpected = 14 done := make(chan struct{}) go func() { numEvents := 0 - for range eventsCh { + for range eventsSub.Out() { numEvents++ if numEvents >= numEventsExpected { close(done) + return } } }() @@ -243,15 +234,22 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes q := EventQueryNewBlock for i := 0; i < numClients; i++ { - ch := make(chan interface{}) - go func() { - for range ch { - } - }() if randQueries { q = randQuery() } - eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q, ch) + sub, err := eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q) + if err != nil { + b.Fatal(err) + } + go func() { + for { + select { + case <-sub.Out(): + case <-sub.Cancelled(): + return + } + } + }() } eventType := EventNewBlock