diff --git a/consensus/common_test.go b/consensus/common_test.go index cecc01509..282d09b45 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -219,18 +219,18 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo } // genesis -func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.MsgAndTags { +func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.Message { voteCh0Sub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote) if err != nil { panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote)) } - ch := make(chan tmpubsub.MsgAndTags) + ch := make(chan tmpubsub.Message) go func() { - for mt := range voteCh0Sub.Out() { - vote := mt.Msg().(types.EventDataVote) + for msg := range voteCh0Sub.Out() { + vote := msg.Data().(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Vote.ValidatorAddress) { - ch <- mt + ch <- msg } } }() @@ -310,7 +310,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { //------------------------------------------------------------------------------- -func ensureNoNewEvent(ch <-chan tmpubsub.MsgAndTags, timeout time.Duration, +func ensureNoNewEvent(ch <-chan tmpubsub.Message, timeout time.Duration, errorMessage string) { select { case <-time.After(timeout): @@ -320,28 +320,28 @@ func ensureNoNewEvent(ch <-chan tmpubsub.MsgAndTags, timeout time.Duration, } } -func ensureNoNewEventOnChannel(ch <-chan tmpubsub.MsgAndTags) { +func ensureNoNewEventOnChannel(ch <-chan tmpubsub.Message) { ensureNoNewEvent( ch, ensureTimeout, "We should be stuck waiting, not receiving new event on the channel") } -func ensureNoNewRoundStep(stepCh <-chan tmpubsub.MsgAndTags) { +func ensureNoNewRoundStep(stepCh <-chan tmpubsub.Message) { ensureNoNewEvent( stepCh, ensureTimeout, "We should be stuck waiting, not receiving NewRoundStep event") } -func ensureNoNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags) { +func ensureNoNewUnlock(unlockCh <-chan tmpubsub.Message) { ensureNoNewEvent( unlockCh, ensureTimeout, "We should be stuck waiting, not receiving Unlock event") } -func ensureNoNewTimeout(stepCh <-chan tmpubsub.MsgAndTags, timeout int64) { +func ensureNoNewTimeout(stepCh <-chan tmpubsub.Message, timeout int64) { timeoutDuration := time.Duration(timeout*5) * time.Nanosecond ensureNoNewEvent( stepCh, @@ -349,15 +349,15 @@ func ensureNoNewTimeout(stepCh <-chan tmpubsub.MsgAndTags, timeout int64) { "We should be stuck waiting, not receiving NewTimeout event") } -func ensureNewEvent(ch <-chan tmpubsub.MsgAndTags, height int64, round int, timeout time.Duration, errorMessage string) { +func ensureNewEvent(ch <-chan tmpubsub.Message, height int64, round int, timeout time.Duration, errorMessage string) { select { case <-time.After(timeout): panic(errorMessage) - case mt := <-ch: - roundStateEvent, ok := mt.Msg().(types.EventDataRoundState) + case msg := <-ch: + roundStateEvent, ok := msg.Data().(types.EventDataRoundState) if !ok { panic(fmt.Sprintf("expected a EventDataRoundState, got %T. Wrong subscription channel?", - mt.Msg())) + msg.Data())) } if roundStateEvent.Height != height { panic(fmt.Sprintf("expected height %v, got %v", height, roundStateEvent.Height)) @@ -369,40 +369,25 @@ func ensureNewEvent(ch <-chan tmpubsub.MsgAndTags, height int64, round int, time } } -func ensureNewRound(roundCh <-chan tmpubsub.MsgAndTags, height int64, round int) { - select { - case <-time.After(ensureTimeout): - panic("Timeout expired while waiting for NewRound event") - case mt := <-roundCh: - newRoundEvent, ok := mt.Msg().(types.EventDataNewRound) - if !ok { - panic(fmt.Sprintf("expected a EventDataNewRound, got %T. Wrong subscription channel?", - mt.Msg())) - } - if newRoundEvent.Height != height { - panic(fmt.Sprintf("expected height %v, got %v", height, newRoundEvent.Height)) - } - if newRoundEvent.Round != round { - panic(fmt.Sprintf("expected round %v, got %v", round, newRoundEvent.Round)) - } - } +func ensureNewRoundStep(stepCh <-chan tmpubsub.Message, height int64, round int) { + ensureNewEvent(stepCh, height, round, ensureTimeout, "Timeout expired while waiting for NewStep event") } -func ensureNewTimeout(timeoutCh <-chan tmpubsub.MsgAndTags, height int64, round int, timeout int64) { +func ensureNewTimeout(timeoutCh <-chan tmpubsub.Message, height int64, round int, timeout int64) { timeoutDuration := time.Duration(timeout*3) * time.Nanosecond ensureNewEvent(timeoutCh, height, round, timeoutDuration, "Timeout expired while waiting for NewTimeout event") } -func ensureNewProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int) { +func ensureNewProposal(proposalCh <-chan tmpubsub.Message, height int64, round int) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewProposal event") - case mt := <-proposalCh: - proposalEvent, ok := mt.Msg().(types.EventDataCompleteProposal) + case msg := <-proposalCh: + proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal) if !ok { panic(fmt.Sprintf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?", - mt.Msg())) + msg.Data())) } if proposalEvent.Height != height { panic(fmt.Sprintf("expected height %v, got %v", height, proposalEvent.Height)) @@ -413,20 +398,20 @@ func ensureNewProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, roun } } -func ensureNewValidBlock(validBlockCh <-chan tmpubsub.MsgAndTags, height int64, round int) { +func ensureNewValidBlock(validBlockCh <-chan tmpubsub.Message, height int64, round int) { ensureNewEvent(validBlockCh, height, round, ensureTimeout, "Timeout expired while waiting for NewValidBlock event") } -func ensureNewBlock(blockCh <-chan tmpubsub.MsgAndTags, height int64) { +func ensureNewBlock(blockCh <-chan tmpubsub.Message, height int64) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewBlock event") - case mt := <-blockCh: - blockEvent, ok := mt.Msg().(types.EventDataNewBlock) + case msg := <-blockCh: + blockEvent, ok := msg.Data().(types.EventDataNewBlock) if !ok { panic(fmt.Sprintf("expected a EventDataNewBlock, got %T. Wrong subscription channel?", - mt.Msg())) + msg.Data())) } if blockEvent.Block.Height != height { panic(fmt.Sprintf("expected height %v, got %v", height, blockEvent.Block.Height)) @@ -434,15 +419,15 @@ func ensureNewBlock(blockCh <-chan tmpubsub.MsgAndTags, height int64) { } } -func ensureNewBlockHeader(blockCh <-chan tmpubsub.MsgAndTags, height int64, blockHash cmn.HexBytes) { +func ensureNewBlockHeader(blockCh <-chan tmpubsub.Message, height int64, blockHash cmn.HexBytes) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewBlockHeader event") - case mt := <-blockCh: - blockHeaderEvent, ok := mt.Msg().(types.EventDataNewBlockHeader) + case msg := <-blockCh: + blockHeaderEvent, ok := msg.Data().(types.EventDataNewBlockHeader) if !ok { panic(fmt.Sprintf("expected a EventDataNewBlockHeader, got %T. Wrong subscription channel?", - mt.Msg())) + msg.Data())) } if blockHeaderEvent.Header.Height != height { panic(fmt.Sprintf("expected height %v, got %v", height, blockHeaderEvent.Header.Height)) @@ -453,20 +438,20 @@ func ensureNewBlockHeader(blockCh <-chan tmpubsub.MsgAndTags, height int64, bloc } } -func ensureNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags, height int64, round int) { +func ensureNewUnlock(unlockCh <-chan tmpubsub.Message, height int64, round int) { ensureNewEvent(unlockCh, height, round, ensureTimeout, "Timeout expired while waiting for NewUnlock event") } -func ensureProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int, propID types.BlockID) { +func ensureProposal(proposalCh <-chan tmpubsub.Message, height int64, round int, propID types.BlockID) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewProposal event") - case mt := <-proposalCh: - proposalEvent, ok := mt.Msg().(types.EventDataCompleteProposal) + case msg := <-proposalCh: + proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal) if !ok { panic(fmt.Sprintf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?", - mt.Msg())) + msg.Data())) } if proposalEvent.Height != height { panic(fmt.Sprintf("expected height %v, got %v", height, proposalEvent.Height)) @@ -480,24 +465,24 @@ func ensureProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round i } } -func ensurePrecommit(voteCh <-chan tmpubsub.MsgAndTags, height int64, round int) { +func ensurePrecommit(voteCh <-chan tmpubsub.Message, height int64, round int) { ensureVote(voteCh, height, round, types.PrecommitType) } -func ensurePrevote(voteCh <-chan tmpubsub.MsgAndTags, height int64, round int) { +func ensurePrevote(voteCh <-chan tmpubsub.Message, height int64, round int) { ensureVote(voteCh, height, round, types.PrevoteType) } -func ensureVote(voteCh <-chan tmpubsub.MsgAndTags, height int64, round int, +func ensureVote(voteCh <-chan tmpubsub.Message, height int64, round int, voteType types.SignedMsgType) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for NewVote event") - case mt := <-voteCh: - voteEvent, ok := mt.Msg().(types.EventDataVote) + case msg := <-voteCh: + voteEvent, ok := msg.Data().(types.EventDataVote) if !ok { panic(fmt.Sprintf("expected a EventDataVote, got %T. Wrong subscription channel?", - mt.Msg())) + msg.Data())) } vote := voteEvent.Vote if vote.Height != height { @@ -512,7 +497,7 @@ func ensureVote(voteCh <-chan tmpubsub.MsgAndTags, height int64, round int, } } -func ensureNewEventOnChannel(ch <-chan tmpubsub.MsgAndTags) { +func ensureNewEventOnChannel(ch <-chan tmpubsub.Message) { select { case <-time.After(ensureTimeout): panic("Timeout expired while waiting for new activity on the channel") diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 31fd6c248..e72f68e1f 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -117,8 +117,8 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { for nTxs := 0; nTxs < NTxs; { ticker := time.NewTicker(time.Second * 30) select { - case mt := <-newBlockCh: - blockEvent := mt.Msg().(types.EventDataNewBlock) + case msg := <-newBlockCh: + blockEvent := msg.Data().(types.EventDataNewBlock) nTxs += int(blockEvent.Block.Header.NumTxs) case <-ticker.C: panic("Timed out waiting to commit blocks with transactions") diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 382ff0cc5..4b9e53d83 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -172,15 +172,15 @@ func TestReactorWithEvidence(t *testing.T) { // wait till everyone makes the first new block with no evidence timeoutWaitGroup(t, nValidators, func(j int) { - mt := <-eventSubs[j].Out() - block := mt.Msg().(types.EventDataNewBlock).Block + msg := <-eventSubs[j].Out() + block := msg.Data().(types.EventDataNewBlock).Block assert.True(t, len(block.Evidence.Evidence) == 0) }, css) // second block should have evidence timeoutWaitGroup(t, nValidators, func(j int) { - mt := <-eventSubs[j].Out() - block := mt.Msg().(types.EventDataNewBlock).Block + msg := <-eventSubs[j].Out() + block := msg.Data().(types.EventDataNewBlock).Block assert.True(t, len(block.Evidence.Evidence) > 0) }, css) } @@ -450,8 +450,8 @@ func waitForAndValidateBlock( ) { timeoutWaitGroup(t, n, func(j int) { css[j].Logger.Debug("waitForAndValidateBlock") - mt := <-eventSubs[j].Out() - newBlock := mt.Msg().(types.EventDataNewBlock).Block + msg := <-eventSubs[j].Out() + newBlock := msg.Data().(types.EventDataNewBlock).Block css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) assert.Nil(t, err) @@ -475,8 +475,8 @@ func waitForAndValidateBlockWithTx( BLOCK_TX_LOOP: for { css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs) - mt := <-eventSubs[j].Out() - newBlock := mt.Msg().(types.EventDataNewBlock).Block + msg := <-eventSubs[j].Out() + newBlock := msg.Data().(types.EventDataNewBlock).Block css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) assert.Nil(t, err) @@ -510,8 +510,8 @@ func waitForBlockWithUpdatedValsAndValidateIt( LOOP: for { css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt") - mt := <-eventSubs[j].Out() - newBlock = mt.Msg().(types.EventDataNewBlock).Block + msg := <-eventSubs[j].Out() + newBlock = msg.Data().(types.EventDataNewBlock).Block if newBlock.LastCommit.Size() == len(updatedVals) { css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height) break LOOP diff --git a/consensus/replay.go b/consensus/replay.go index f3f50cec4..8fcb9edd8 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -56,8 +56,8 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepSub typ ticker := time.After(time.Second * 2) if newStepSub != nil { select { - case mt := <-newStepSub.Out(): - m2 := mt.Msg().(types.EventDataRoundState) + case stepMsg := <-newStepSub.Out(): + m2 := stepMsg.Data().(types.EventDataRoundState) if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) } diff --git a/consensus/state_test.go b/consensus/state_test.go index 1a363cffa..99f5b93f4 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -1565,7 +1565,7 @@ func TestStateOutputVoteStats(t *testing.T) { } // subscribe subscribes test client to the given query and returns a channel with cap = 1. -func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.MsgAndTags { +func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Message { sub, err := eventBus.Subscribe(context.Background(), testSubscriber, q) if err != nil { panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q)) diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index f4fe7eb7f..d5318c464 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -25,8 +25,8 @@ // // for { // select { -// case msgAndTags <- subscription.Out(): -// // handle msg and tags +// case msg <- subscription.Out(): +// // handle msg.Data() and msg.Tags() // case <-subscription.Cancelled(): // return subscription.Err() // } @@ -170,7 +170,7 @@ func (s *Server) subscribe(ctx context.Context, clientID string, query Query, ou } subscription := &Subscription{ - out: make(chan MsgAndTags, outCapacity), + out: make(chan Message, outCapacity), cancelled: make(chan struct{}), } select { @@ -389,11 +389,11 @@ func (state *state) send(msg interface{}, tags TagMap) { for clientID, subscription := range clientSubscriptions { if cap(subscription.out) == 0 { // block on unbuffered channel - subscription.out <- MsgAndTags{msg, tags} + subscription.out <- Message{msg, tags} } else { // don't block on buffered channels select { - case subscription.out <- MsgAndTags{msg, tags}: + case subscription.out <- Message{msg, tags}: default: state.remove(clientID, qStr, ErrOutOfCapacity) } diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index 63a9c6e3e..973d6ba06 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -303,10 +303,10 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { /// HELPERS /////////////////////////////////////////////////////////////////////////////// -func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.MsgAndTags, msgAndArgs ...interface{}) { +func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, msgAndArgs ...interface{}) { select { case actual := <-ch: - assert.Equal(t, expected, actual.Msg(), msgAndArgs...) + assert.Equal(t, expected, actual.Data(), msgAndArgs...) case <-time.After(1 * time.Second): t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) debug.PrintStack() diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index db4f00d56..e852e4906 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -20,7 +20,7 @@ var ( // 2) channel which is closed if a client is too slow or choose to unsubscribe // 3) err indicating the reason for (2) type Subscription struct { - out chan MsgAndTags + out chan Message cancelled chan struct{} mtx sync.RWMutex @@ -30,7 +30,7 @@ type Subscription struct { // Out returns a channel onto which messages and tags are published. // Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from // receiving a nil message. -func (s *Subscription) Out() <-chan MsgAndTags { +func (s *Subscription) Out() <-chan Message { return s.out } @@ -53,18 +53,18 @@ func (s *Subscription) Err() error { return s.err } -// MsgAndTags glues a message and tags together. -type MsgAndTags struct { - msg interface{} +// Message glues data and tags together. +type Message struct { + data interface{} tags TagMap } -// Msg returns a message. -func (mt MsgAndTags) Msg() interface{} { - return mt.msg +// Data returns an original data published. +func (msg Message) Data() interface{} { + return msg.data } -// Tags returns tags. -func (mt MsgAndTags) Tags() TagMap { - return mt.tags +// Tags returns tags, which matched the client's query. +func (msg Message) Tags() TagMap { + return msg.tags } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 404c5d7b5..6b205393e 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -71,8 +71,8 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type defer c.UnsubscribeAll(ctx, subscriber) select { - case mt := <-sub.Out(): - return mt.Msg().(types.TMEventData), nil + case msg := <-sub.Out(): + return msg.Data().(types.TMEventData), nil case <-sub.Cancelled(): return nil, errors.New("subscription was cancelled") case <-ctx.Done(): diff --git a/rpc/core/events.go b/rpc/core/events.go index 92a8a7799..2ab44d386 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -109,8 +109,8 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri go func() { for { select { - case mt := <-sub.Out(): - resultEvent := &ctypes.ResultEvent{query, mt.Msg().(tmtypes.TMEventData)} + case msg := <-sub.Out(): + resultEvent := &ctypes.ResultEvent{query, msg.Data().(tmtypes.TMEventData)} wsCtx.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( wsCtx.Codec(), diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 6b55c46e7..90f0d4120 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -201,8 +201,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // TODO: configurable? var deliverTxTimeout = rpcserver.WriteTimeout / 2 select { - case mt := <-deliverTxSub.Out(): // The tx was included in a block. - deliverTxRes := mt.Msg().(types.EventDataTx) + case msg := <-deliverTxSub.Out(): // The tx was included in a block. + deliverTxRes := msg.Data().(types.EventDataTx) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: deliverTxRes.Result, diff --git a/state/execution_test.go b/state/execution_test.go index 7faca096e..6146d3ba7 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -341,9 +341,9 @@ func TestEndBlockValidatorUpdates(t *testing.T) { // test we threw an event select { - case mt := <-updatesSub.Out(): - event, ok := mt.Msg().(types.EventDataValidatorSetUpdates) - require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", mt.Msg()) + case msg := <-updatesSub.Out(): + event, ok := msg.Data().(types.EventDataValidatorSetUpdates) + require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", msg.Data()) if assert.NotEmpty(t, event.ValidatorUpdates) { assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey) assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower) diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 468753968..468b28f8b 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -44,13 +44,13 @@ func (is *IndexerService) OnStart() error { go func() { for { select { - case mt := <-blockHeadersSub.Out(): - header := mt.Msg().(types.EventDataNewBlockHeader).Header + case msg := <-blockHeadersSub.Out(): + header := msg.Data().(types.EventDataNewBlockHeader).Header batch := NewBatch(header.NumTxs) for i := int64(0); i < header.NumTxs; i++ { select { - case mt2 := <-txsSub.Out(): - txResult := mt2.Msg().(types.EventDataTx).TxResult + case msg2 := <-txsSub.Out(): + txResult := msg2.Data().(types.EventDataTx).TxResult batch.Add(&txResult) case <-txsSub.Cancelled(): is.Logger.Error("Failed to index a block. txsSub was cancelled. Did the Tendermint stop?", diff --git a/types/event_bus.go b/types/event_bus.go index 67a91e159..2f017854b 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -18,7 +18,7 @@ type EventBusSubscriber interface { } type Subscription interface { - Out() <-chan tmpubsub.MsgAndTags + Out() <-chan tmpubsub.Message Cancelled() <-chan struct{} Err() error } diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 180301210..be3f2315f 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -31,8 +31,8 @@ func TestEventBusPublishEventTx(t *testing.T) { done := make(chan struct{}) go func() { - mt := <-txsSub.Out() - edt := mt.Msg().(EventDataTx) + msg := <-txsSub.Out() + edt := msg.Data().(EventDataTx) assert.Equal(t, int64(1), edt.Height) assert.Equal(t, uint32(0), edt.Index) assert.Equal(t, tx, edt.Tx) @@ -72,8 +72,8 @@ func TestEventBusPublishEventNewBlock(t *testing.T) { done := make(chan struct{}) go func() { - mt := <-blocksSub.Out() - edt := mt.Msg().(EventDataNewBlock) + msg := <-blocksSub.Out() + edt := msg.Data().(EventDataNewBlock) assert.Equal(t, block, edt.Block) assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) assert.Equal(t, resultEndBlock, edt.ResultEndBlock) @@ -111,8 +111,8 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) { done := make(chan struct{}) go func() { - mt := <-headersSub.Out() - edt := mt.Msg().(EventDataNewBlockHeader) + msg := <-headersSub.Out() + edt := msg.Data().(EventDataNewBlockHeader) assert.Equal(t, block.Header, edt.Header) assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) assert.Equal(t, resultEndBlock, edt.ResultEndBlock)