diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 57ac36421..a7fab7af9 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -9,6 +9,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi ### BREAKING CHANGES - CLI/RPC/Config + - [pubsub/events] \#6634 The `ResultEvent.Events` field is now of type `[]abci.Event` preserving event order instead of `map[string][]string`. (@alexanderbez) - [config] \#5598 The `test_fuzz` and `test_fuzz_config` P2P settings have been removed. (@erikgrinaker) - [config] \#5728 `fast_sync = "v1"` is no longer supported (@melekes) - [cli] \#5772 `gen_node_key` prints JSON-encoded `NodeKey` rather than ID and does not save it to `node_key.json` (@melekes) @@ -33,6 +34,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - P2P Protocol - Go API + - [pubsub] \#6634 The `Query#Matches` method along with other pubsub methods, now accepts a `[]abci.Event` instead of `map[string][]string`. (@alexanderbez) - [p2p] \#6618 Move `p2p.NodeInfo` into `types` to support use of the SDK. (@tychoish) - [p2p] \#6583 Make `p2p.NodeID` and `p2p.NetAddress` exported types to support their use in the RPC layer. (@tychoish) - [node] \#6540 Reduce surface area of the `node` package by making most of the implementation details private. (@tychoish) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 612b83efd..fdef87ab8 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -371,7 +371,7 @@ func (r *Reactor) broadcastHasVoteMessage(vote *types.Vote) { func (r *Reactor) subscribeToBroadcastEvents() { err := r.state.evsw.AddListenerForEvent( listenerIDConsensus, - types.EventNewRoundStep, + types.EventNewRoundStepValue, func(data tmevents.EventData) { r.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) select { @@ -386,7 +386,7 @@ func (r *Reactor) subscribeToBroadcastEvents() { err = r.state.evsw.AddListenerForEvent( listenerIDConsensus, - types.EventValidBlock, + types.EventValidBlockValue, func(data tmevents.EventData) { r.broadcastNewValidBlockMessage(data.(*cstypes.RoundState)) }, @@ -397,7 +397,7 @@ func (r *Reactor) subscribeToBroadcastEvents() { err = r.state.evsw.AddListenerForEvent( listenerIDConsensus, - types.EventVote, + types.EventVoteValue, func(data tmevents.EventData) { r.broadcastHasVoteMessage(data.(*types.Vote)) }, diff --git a/internal/consensus/state.go b/internal/consensus/state.go index e0ec6927a..eece8d7ca 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -730,7 +730,7 @@ func (cs *State) newStep() { cs.Logger.Error("failed publishing new round step", "err", err) } - cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState) + cs.evsw.FireEvent(types.EventNewRoundStepValue, &cs.RoundState) } } @@ -1560,7 +1560,7 @@ func (cs *State) enterCommit(height int64, commitRound int32) { logger.Error("failed publishing valid block", "err", err) } - cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState) + cs.evsw.FireEvent(types.EventValidBlockValue, &cs.RoundState) } } } @@ -2020,7 +2020,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err return added, err } - cs.evsw.FireEvent(types.EventVote, vote) + cs.evsw.FireEvent(types.EventVoteValue, vote) // if we can skip timeoutCommit and have all the votes now, if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() { @@ -2049,7 +2049,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err if err := cs.eventBus.PublishEventVote(types.EventDataVote{Vote: vote}); err != nil { return added, err } - cs.evsw.FireEvent(types.EventVote, vote) + cs.evsw.FireEvent(types.EventVoteValue, vote) switch vote.Type { case tmproto.PrevoteType: @@ -2103,7 +2103,7 @@ func (cs *State) addVote(vote *types.Vote, peerID types.NodeID) (added bool, err cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader) } - cs.evsw.FireEvent(types.EventValidBlock, &cs.RoundState) + cs.evsw.FireEvent(types.EventValidBlockValue, &cs.RoundState) if err := cs.eventBus.PublishEventValidBlock(cs.RoundStateEvent()); err != nil { return added, err } diff --git a/libs/events/events.go b/libs/events/events.go index 21bfce3e5..146a9cfa7 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -32,7 +32,7 @@ type Eventable interface { // // FireEvent fires an event with the given name and data. type Fireable interface { - FireEvent(event string, data EventData) + FireEvent(eventValue string, data EventData) } // EventSwitch is the interface for synchronous pubsub, where listeners @@ -46,7 +46,7 @@ type EventSwitch interface { service.Service Fireable - AddListenerForEvent(listenerID, event string, cb EventCallback) error + AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error RemoveListenerForEvent(event string, listenerID string) RemoveListener(listenerID string) } @@ -74,27 +74,29 @@ func (evsw *eventSwitch) OnStart() error { func (evsw *eventSwitch) OnStop() {} -func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) error { +func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error { // Get/Create eventCell and listener. evsw.mtx.Lock() - eventCell := evsw.eventCells[event] + + eventCell := evsw.eventCells[eventValue] if eventCell == nil { eventCell = newEventCell() - evsw.eventCells[event] = eventCell + evsw.eventCells[eventValue] = eventCell } + listener := evsw.listeners[listenerID] if listener == nil { listener = newEventListener(listenerID) evsw.listeners[listenerID] = listener } + evsw.mtx.Unlock() - // Add event and listener. - if err := listener.AddEvent(event); err != nil { + if err := listener.AddEvent(eventValue); err != nil { return err } - eventCell.AddListener(listenerID, cb) + eventCell.AddListener(listenerID, cb) return nil } diff --git a/libs/pubsub/example_test.go b/libs/pubsub/example_test.go index 6abd5de5c..fd4a94382 100644 --- a/libs/pubsub/example_test.go +++ b/libs/pubsub/example_test.go @@ -6,8 +6,8 @@ import ( "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/pubsub/query" ) @@ -15,8 +15,9 @@ import ( func TestExample(t *testing.T) { s := pubsub.NewServer() s.SetLogger(log.TestingLogger()) - err := s.Start() - require.NoError(t, err) + + require.NoError(t, s.Start()) + t.Cleanup(func() { if err := s.Stop(); err != nil { t.Error(err) @@ -24,9 +25,18 @@ func TestExample(t *testing.T) { }) ctx := context.Background() + subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'")) require.NoError(t, err) - err = s.PublishWithEvents(ctx, "Tombstone", map[string][]string{"abci.account.name": {"John"}}) + + events := []abci.Event{ + { + Type: "abci.account", + Attributes: []abci.EventAttribute{{Key: "name", Value: "John"}}, + }, + } + err = s.PublishWithEvents(ctx, "Tombstone", events) require.NoError(t, err) + assertReceive(t, "Tombstone", subscription.Out()) } diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 7b08319bb..54a030fe8 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -39,6 +39,7 @@ import ( "errors" "fmt" + "github.com/tendermint/tendermint/abci/types" tmsync "github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/libs/service" @@ -70,7 +71,7 @@ var ( // allows event types to repeat themselves with the same set of keys and // different values. type Query interface { - Matches(events map[string][]string) (bool, error) + Matches(events []types.Event) (bool, error) String() string } @@ -102,7 +103,7 @@ type cmd struct { // publish msg interface{} - events map[string][]string + events []types.Event } // Server allows clients to subscribe/unsubscribe for messages, publishing @@ -314,13 +315,13 @@ func (s *Server) NumClientSubscriptions(clientID string) int { // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { - return s.PublishWithEvents(ctx, msg, make(map[string][]string)) + return s.PublishWithEvents(ctx, msg, []types.Event{}) } // PublishWithEvents publishes the given message with the set of events. The set // is matched with clients queries. If there is a match, the message is sent to // the client. -func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events map[string][]string) error { +func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events []types.Event) error { select { case s.cmds <- cmd{op: pub, msg: msg, events: events}: return nil @@ -473,7 +474,7 @@ func (state *state) removeAll(reason error) { } } -func (state *state) send(msg interface{}, events map[string][]string) error { +func (state *state) send(msg interface{}, events []types.Event) error { for qStr, clientSubscriptions := range state.subscriptions { if sub, ok := clientSubscriptions[qStr]; ok && sub.id == qStr { continue diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index 88c52cf25..525415493 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/pubsub" @@ -35,8 +36,8 @@ func TestSubscribe(t *testing.T) { subscription, err := s.Subscribe(ctx, clientID, query.Empty{}) require.NoError(t, err) - assert.Equal(t, 1, s.NumClients()) - assert.Equal(t, 1, s.NumClientSubscriptions(clientID)) + require.Equal(t, 1, s.NumClients()) + require.Equal(t, 1, s.NumClientSubscriptions(clientID)) err = s.Publish(ctx, "Ka-Zar") require.NoError(t, err) @@ -47,13 +48,13 @@ func TestSubscribe(t *testing.T) { defer close(published) err := s.Publish(ctx, "Quicksilver") - assert.NoError(t, err) + require.NoError(t, err) err = s.Publish(ctx, "Asylum") - assert.NoError(t, err) + require.NoError(t, err) err = s.Publish(ctx, "Ivan") - assert.NoError(t, err) + require.NoError(t, err) }() select { @@ -77,11 +78,11 @@ func TestSubscribeWithCapacity(t *testing.T) { }) ctx := context.Background() - assert.Panics(t, func() { + require.Panics(t, func() { _, err = s.Subscribe(ctx, clientID, query.Empty{}, -1) require.NoError(t, err) }) - assert.Panics(t, func() { + require.Panics(t, func() { _, err = s.Subscribe(ctx, clientID, query.Empty{}, 0) require.NoError(t, err) }) @@ -112,10 +113,10 @@ func TestSubscribeUnbuffered(t *testing.T) { defer close(published) err := s.Publish(ctx, "Ultron") - assert.NoError(t, err) + require.NoError(t, err) err = s.Publish(ctx, "Darkhawk") - assert.NoError(t, err) + require.NoError(t, err) }() select { @@ -152,8 +153,8 @@ func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) { func TestDifferentClients(t *testing.T) { s := pubsub.NewServer() s.SetLogger(log.TestingLogger()) - err := s.Start() - require.NoError(t, err) + + require.NoError(t, s.Start()) t.Cleanup(func() { if err := s.Stop(); err != nil { t.Error(err) @@ -161,10 +162,18 @@ func TestDifferentClients(t *testing.T) { }) ctx := context.Background() + subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'")) require.NoError(t, err) - err = s.PublishWithEvents(ctx, "Iceman", map[string][]string{"tm.events.type": {"NewBlock"}}) - require.NoError(t, err) + + events := []abci.Event{ + { + Type: "tm.events", + Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}}, + }, + } + + require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events)) assertReceive(t, "Iceman", subscription1.Out()) subscription2, err := s.Subscribe( @@ -173,12 +182,19 @@ func TestDifferentClients(t *testing.T) { query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ) require.NoError(t, err) - err = s.PublishWithEvents( - ctx, - "Ultimo", - map[string][]string{"tm.events.type": {"NewBlock"}, "abci.account.name": {"Igor"}}, - ) - require.NoError(t, err) + + events = []abci.Event{ + { + Type: "tm.events", + Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}}, + }, + { + Type: "abci.account", + Attributes: []abci.EventAttribute{{Key: "name", Value: "Igor"}}, + }, + } + + require.NoError(t, s.PublishWithEvents(ctx, "Ultimo", events)) assertReceive(t, "Ultimo", subscription1.Out()) assertReceive(t, "Ultimo", subscription2.Out()) @@ -188,16 +204,25 @@ func TestDifferentClients(t *testing.T) { query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ) require.NoError(t, err) - err = s.PublishWithEvents(ctx, "Valeria Richards", map[string][]string{"tm.events.type": {"NewRoundStep"}}) - require.NoError(t, err) - assert.Zero(t, len(subscription3.Out())) + + events = []abci.Event{ + { + Type: "tm.events", + Attributes: []abci.EventAttribute{{Key: "type", Value: "NewRoundStep"}}, + }, + } + + require.NoError(t, s.PublishWithEvents(ctx, "Valeria Richards", events)) + require.Zero(t, len(subscription3.Out())) } func TestSubscribeDuplicateKeys(t *testing.T) { ctx := context.Background() s := pubsub.NewServer() s.SetLogger(log.TestingLogger()) + require.NoError(t, s.Start()) + t.Cleanup(func() { if err := s.Stop(); err != nil { t.Error(err) @@ -230,15 +255,26 @@ func TestSubscribeDuplicateKeys(t *testing.T) { sub, err := s.Subscribe(ctx, fmt.Sprintf("client-%d", i), query.MustParse(tc.query)) require.NoError(t, err) - err = s.PublishWithEvents( - ctx, - "Iceman", - map[string][]string{ - "transfer.sender": {"foo", "bar", "baz"}, - "withdraw.rewards": {"1", "17", "22"}, + events := []abci.Event{ + { + Type: "transfer", + Attributes: []abci.EventAttribute{ + {Key: "sender", Value: "foo"}, + {Key: "sender", Value: "bar"}, + {Key: "sender", Value: "baz"}, + }, }, - ) - require.NoError(t, err) + { + Type: "withdraw", + Attributes: []abci.EventAttribute{ + {Key: "rewards", Value: "1"}, + {Key: "rewards", Value: "17"}, + {Key: "rewards", Value: "22"}, + }, + }, + } + + require.NoError(t, s.PublishWithEvents(ctx, "Iceman", events)) if tc.expected != nil { assertReceive(t, tc.expected, sub.Out()) @@ -264,16 +300,22 @@ func TestClientSubscribesTwice(t *testing.T) { subscription1, err := s.Subscribe(ctx, clientID, q) require.NoError(t, err) - err = s.PublishWithEvents(ctx, "Goblin Queen", map[string][]string{"tm.events.type": {"NewBlock"}}) - require.NoError(t, err) + + events := []abci.Event{ + { + Type: "tm.events", + Attributes: []abci.EventAttribute{{Key: "type", Value: "NewBlock"}}, + }, + } + + require.NoError(t, s.PublishWithEvents(ctx, "Goblin Queen", events)) assertReceive(t, "Goblin Queen", subscription1.Out()) subscription2, err := s.Subscribe(ctx, clientID, q) require.Error(t, err) require.Nil(t, subscription2) - err = s.PublishWithEvents(ctx, "Spider-Man", map[string][]string{"tm.events.type": {"NewBlock"}}) - require.NoError(t, err) + require.NoError(t, s.PublishWithEvents(ctx, "Spider-Man", events)) assertReceive(t, "Spider-Man", subscription1.Out()) } @@ -298,7 +340,7 @@ func TestUnsubscribe(t *testing.T) { err = s.Publish(ctx, "Nick Fury") require.NoError(t, err) - assert.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe") + require.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe") assertCanceled(t, subscription, pubsub.ErrUnsubscribed) } @@ -325,9 +367,9 @@ func TestClientUnsubscribesTwice(t *testing.T) { err = s.Unsubscribe(ctx, pubsub.UnsubscribeArgs{ Subscriber: clientID, Query: query.MustParse("tm.events.type='NewBlock'")}) - assert.Equal(t, pubsub.ErrSubscriptionNotFound, err) + require.Equal(t, pubsub.ErrSubscriptionNotFound, err) err = s.UnsubscribeAll(ctx, clientID) - assert.Equal(t, pubsub.ErrSubscriptionNotFound, err) + require.Equal(t, pubsub.ErrSubscriptionNotFound, err) } func TestResubscribe(t *testing.T) { @@ -376,8 +418,8 @@ func TestUnsubscribeAll(t *testing.T) { err = s.Publish(ctx, "Nick Fury") require.NoError(t, err) - assert.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll") - assert.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll") + require.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll") + require.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll") assertCanceled(t, subscription1, pubsub.ErrUnsubscribed) assertCanceled(t, subscription2, pubsub.ErrUnsubscribed) @@ -387,7 +429,7 @@ func TestBufferCapacity(t *testing.T) { s := pubsub.NewServer(pubsub.BufferCapacity(2)) s.SetLogger(log.TestingLogger()) - assert.Equal(t, 2, s.BufferCapacity()) + require.Equal(t, 2, s.BufferCapacity()) ctx := context.Background() err := s.Publish(ctx, "Nighthawk") @@ -397,9 +439,10 @@ func TestBufferCapacity(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() + err = s.Publish(ctx, "Ironclad") if assert.Error(t, err) { - assert.Equal(t, context.DeadlineExceeded, err) + require.Equal(t, context.DeadlineExceeded, err) } } @@ -447,12 +490,18 @@ func benchmarkNClients(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - err = s.PublishWithEvents( - ctx, - "Gamora", - map[string][]string{"abci.Account.Owner": {"Ivan"}, "abci.Invoices.Number": {string(rune(i))}}, - ) - require.NoError(b, err) + events := []abci.Event{ + { + Type: "abci.Account", + Attributes: []abci.EventAttribute{{Key: "Owner", Value: "Ivan"}}, + }, + { + Type: "abci.Invoices", + Attributes: []abci.EventAttribute{{Key: "Number", Value: string(rune(i))}}, + }, + } + + require.NoError(b, s.PublishWithEvents(ctx, "Gamora", events)) } } @@ -487,10 +536,20 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { b.ReportAllocs() b.ResetTimer() + for i := 0; i < b.N; i++ { - err = s.PublishWithEvents(ctx, "Gamora", map[string][]string{"abci.Account.Owner": {"Ivan"}, - "abci.Invoices.Number": {"1"}}) - require.NoError(b, err) + events := []abci.Event{ + { + Type: "abci.Account", + Attributes: []abci.EventAttribute{{Key: "Owner", Value: "Ivan"}}, + }, + { + Type: "abci.Invoices", + Attributes: []abci.EventAttribute{{Key: "Number", Value: "1"}}, + }, + } + + require.NoError(b, s.PublishWithEvents(ctx, "Gamora", events)) } } @@ -499,7 +558,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, msgAndArgs ...interface{}) { select { case actual := <-ch: - assert.Equal(t, expected, actual.Data(), msgAndArgs...) + require.Equal(t, expected, actual.Data(), msgAndArgs...) case <-time.After(1 * time.Second): t.Errorf("expected to receive %v from the channel, got nothing after 1s", expected) debug.PrintStack() @@ -508,6 +567,6 @@ func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, func assertCanceled(t *testing.T, subscription *pubsub.Subscription, err error) { _, ok := <-subscription.Canceled() - assert.False(t, ok) - assert.Equal(t, err, subscription.Err()) + require.False(t, ok) + require.Equal(t, err, subscription.Err()) } diff --git a/libs/pubsub/query/empty.go b/libs/pubsub/query/empty.go index b86b8d4e8..dd6b3f3b2 100644 --- a/libs/pubsub/query/empty.go +++ b/libs/pubsub/query/empty.go @@ -1,11 +1,15 @@ package query +import ( + "github.com/tendermint/tendermint/abci/types" +) + // Empty query matches any set of events. type Empty struct { } // Matches always returns true. -func (Empty) Matches(tags map[string][]string) (bool, error) { +func (Empty) Matches(events []types.Event) (bool, error) { return true, nil } diff --git a/libs/pubsub/query/empty_test.go b/libs/pubsub/query/empty_test.go index 1b6ef2828..4bb3067d6 100644 --- a/libs/pubsub/query/empty_test.go +++ b/libs/pubsub/query/empty_test.go @@ -3,8 +3,8 @@ package query_test import ( "testing" - "github.com/stretchr/testify/assert" - + "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" ) @@ -12,17 +12,44 @@ func TestEmptyQueryMatchesAnything(t *testing.T) { q := query.Empty{} testCases := []struct { - query map[string][]string + events []abci.Event }{ - {map[string][]string{}}, - {map[string][]string{"Asher": {"Roth"}}}, - {map[string][]string{"Route": {"66"}}}, - {map[string][]string{"Route": {"66"}, "Billy": {"Blue"}}}, + { + []abci.Event{}, + }, + { + []abci.Event{ + { + Type: "Asher", + Attributes: []abci.EventAttribute{{Key: "Roth"}}, + }, + }, + }, + { + []abci.Event{ + { + Type: "Route", + Attributes: []abci.EventAttribute{{Key: "66"}}, + }, + }, + }, + { + []abci.Event{ + { + Type: "Route", + Attributes: []abci.EventAttribute{{Key: "66"}}, + }, + { + Type: "Billy", + Attributes: []abci.EventAttribute{{Key: "Blue"}}, + }, + }, + }, } for _, tc := range testCases { - match, err := q.Matches(tc.query) - assert.Nil(t, err) - assert.True(t, match) + match, err := q.Matches(tc.events) + require.Nil(t, err) + require.True(t, match) } } diff --git a/libs/pubsub/query/query.go b/libs/pubsub/query/query.go index cf6903ccf..7b1dfe0f9 100644 --- a/libs/pubsub/query/query.go +++ b/libs/pubsub/query/query.go @@ -15,6 +15,8 @@ import ( "strconv" "strings" "time" + + "github.com/tendermint/tendermint/abci/types" ) var ( @@ -198,11 +200,13 @@ func (q *Query) Conditions() ([]Condition, error) { // // For example, query "name=John" matches events = {"name": ["John", "Eric"]}. // More examples could be found in parser_test.go and query_test.go. -func (q *Query) Matches(events map[string][]string) (bool, error) { - if len(events) == 0 { +func (q *Query) Matches(rawEvents []types.Event) (bool, error) { + if len(rawEvents) == 0 { return false, nil } + events := flattenEvents(rawEvents) + var ( eventAttr string op Operator @@ -500,3 +504,24 @@ func matchValue(value string, op Operator, operand reflect.Value) (bool, error) return false, nil } + +func flattenEvents(events []types.Event) map[string][]string { + flattened := make(map[string][]string) + + for _, event := range events { + if len(event.Type) == 0 { + continue + } + + for _, attr := range event.Attributes { + if len(attr.Key) == 0 { + continue + } + + compositeEvent := fmt.Sprintf("%s.%s", event.Type, attr.Key) + flattened[compositeEvent] = append(flattened[compositeEvent], attr.Value) + } + } + + return flattened +} diff --git a/libs/pubsub/query/query_test.go b/libs/pubsub/query/query_test.go index d511e7fab..87f61aafe 100644 --- a/libs/pubsub/query/query_test.go +++ b/libs/pubsub/query/query_test.go @@ -2,15 +2,38 @@ package query_test import ( "fmt" + "strings" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" ) +func expandEvents(flattenedEvents map[string][]string) []abci.Event { + events := make([]abci.Event, len(flattenedEvents)) + + for composite, values := range flattenedEvents { + tokens := strings.Split(composite, ".") + + attrs := make([]abci.EventAttribute, len(values)) + for i, v := range values { + attrs[i] = abci.EventAttribute{ + Key: tokens[len(tokens)-1], + Value: v, + } + } + + events = append(events, abci.Event{ + Type: strings.Join(tokens[:len(tokens)-1], "."), + Attributes: attrs, + }) + } + + return events +} + func TestMatches(t *testing.T) { var ( txDate = "2017-01-01" @@ -159,21 +182,23 @@ func TestMatches(t *testing.T) { } require.NotNil(t, q, "Query '%s' should not be nil", tc.s) + rawEvents := expandEvents(tc.events) + if tc.matches { - match, err := q.Matches(tc.events) - assert.Nil(t, err, "Query '%s' should not error on match %v", tc.s, tc.events) - assert.True(t, match, "Query '%s' should match %v", tc.s, tc.events) + match, err := q.Matches(rawEvents) + require.Nil(t, err, "Query '%s' should not error on match %v", tc.s, tc.events) + require.True(t, match, "Query '%s' should match %v", tc.s, tc.events) } else { - match, err := q.Matches(tc.events) - assert.Equal(t, tc.matchErr, err != nil, "Unexpected error for query '%s' match %v", tc.s, tc.events) - assert.False(t, match, "Query '%s' should not match %v", tc.s, tc.events) + match, err := q.Matches(rawEvents) + require.Equal(t, tc.matchErr, err != nil, "Unexpected error for query '%s' match %v", tc.s, tc.events) + require.False(t, match, "Query '%s' should not match %v", tc.s, tc.events) } } } func TestMustParse(t *testing.T) { - assert.Panics(t, func() { query.MustParse("=") }) - assert.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) + require.Panics(t, func() { query.MustParse("=") }) + require.NotPanics(t, func() { query.MustParse("tm.events.type='NewBlock'") }) } func TestConditions(t *testing.T) { @@ -217,6 +242,6 @@ func TestConditions(t *testing.T) { c, err := q.Conditions() require.NoError(t, err) - assert.Equal(t, tc.conditions, c) + require.Equal(t, tc.conditions, c) } } diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index 34eab997f..40b84711e 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/google/uuid" + "github.com/tendermint/tendermint/abci/types" tmsync "github.com/tendermint/tendermint/internal/libs/sync" ) @@ -89,10 +90,10 @@ func (s *Subscription) cancel(err error) { type Message struct { subID string data interface{} - events map[string][]string + events []types.Event } -func NewMessage(subID string, data interface{}, events map[string][]string) Message { +func NewMessage(subID string, data interface{}, events []types.Event) Message { return Message{ subID: subID, data: data, @@ -108,4 +109,4 @@ func (msg Message) SubscriptionID() string { return msg.subID } func (msg Message) Data() interface{} { return msg.data } // Events returns events, which matched the client's query. -func (msg Message) Events() map[string][]string { return msg.events } +func (msg Message) Events() []types.Event { return msg.events } diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index 226fc2b7d..4acd0fee9 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -44,8 +44,7 @@ func TestHeaderEvents(t *testing.T) { }) } - evtTyp := types.EventNewBlockHeader - evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) + evt, err := client.WaitForOneEvent(c, types.EventNewBlockHeaderValue, waitForEventTimeout) require.Nil(t, err, "%d: %+v", i, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok, "%d: %#v", i, evt) @@ -75,7 +74,7 @@ func TestBlockEvents(t *testing.T) { const subscriber = "TestBlockEvents" - eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlock).String()) + eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlockValue).String()) require.NoError(t, err) t.Cleanup(func() { if err := c.UnsubscribeAll(context.Background(), subscriber); err != nil { @@ -146,7 +145,7 @@ func testTxEventsSent(t *testing.T, broadcastMethod string) { }() // and wait for confirmation - evt, err := client.WaitForOneEvent(c, types.EventTx, waitForEventTimeout) + evt, err := client.WaitForOneEvent(c, types.EventTxValue, waitForEventTimeout) require.Nil(t, err) // and make sure it has the proper info @@ -176,12 +175,12 @@ func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) { // on Subscribe _, err := c.Subscribe(ctx, "TestHeaderEvents", - types.QueryForEvent(types.EventNewBlockHeader).String()) + types.QueryForEvent(types.EventNewBlockHeaderValue).String()) assert.Error(t, err) // on Unsubscribe err = c.Unsubscribe(ctx, "TestHeaderEvents", - types.QueryForEvent(types.EventNewBlockHeader).String()) + types.QueryForEvent(types.EventNewBlockHeaderValue).String()) assert.Error(t, err) // on UnsubscribeAll diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 78579b8a3..49598e814 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -57,17 +57,18 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error { // when the timeout duration has expired. // // This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) { +func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (types.TMEventData, error) { const subscriber = "helpers" ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() // register for the next event of this type - eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String()) + eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(eventValue).String()) if err != nil { return nil, fmt.Errorf("failed to subscribe: %w", err) } - // make sure to unregister after the test is over + + // make sure to un-register after the test is over defer func() { if deferErr := c.UnsubscribeAll(ctx, subscriber); deferErr != nil { panic(err) diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index d6d4b0983..7bf5eea09 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -261,8 +261,8 @@ type ( // Event data from a subscription type ResultEvent struct { - SubscriptionID string `json:"subscription_id"` - Query string `json:"query"` - Data types.TMEventData `json:"data"` - Events map[string][]string `json:"events"` + SubscriptionID string `json:"subscription_id"` + Query string `json:"query"` + Data types.TMEventData `json:"data"` + Events []abci.Event `json:"events"` } diff --git a/types/event_bus.go b/types/event_bus.go index 094fd6748..b4ea4568b 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -3,6 +3,7 @@ package types import ( "context" "fmt" + "strings" "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" @@ -100,47 +101,31 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error return b.pubsub.UnsubscribeAll(ctx, subscriber) } -func (b *EventBus) Publish(eventType string, eventData TMEventData) error { +func (b *EventBus) Publish(eventValue string, eventData TMEventData) error { // no explicit deadline for publishing events ctx := context.Background() - return b.pubsub.PublishWithEvents(ctx, eventData, map[string][]string{EventTypeKey: {eventType}}) -} - -// validateAndStringifyEvents takes a slice of event objects and creates a -// map of stringified events where each key is composed of the event -// type and each of the event's attributes keys in the form of -// "{event.Type}.{attribute.Key}" and the value is each attribute's value. -func (b *EventBus) validateAndStringifyEvents(events []types.Event, logger log.Logger) map[string][]string { - result := make(map[string][]string) - for _, event := range events { - if len(event.Type) == 0 { - logger.Debug("Got an event with an empty type (skipping)", "event", event) - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - logger.Debug("Got an event attribute with an empty key(skipping)", "event", event) - continue - } - - compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key) - result[compositeTag] = append(result[compositeTag], attr.Value) - } + + tokens := strings.Split(EventTypeKey, ".") + event := types.Event{ + Type: tokens[0], + Attributes: []types.EventAttribute{ + { + Key: tokens[1], + Value: eventValue, + }, + }, } - return result + return b.pubsub.PublishWithEvents(ctx, eventData, []types.Event{event}) } func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error { // no explicit deadline for publishing events ctx := context.Background() + events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) - resultEvents := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) - events := b.validateAndStringifyEvents(resultEvents, b.Logger.With("block", data.Block.StringShort())) - - // add predefined new block event - events[EventTypeKey] = append(events[EventTypeKey], EventNewBlock) + // add Tendermint-reserved new block event + events = append(events, EventNewBlock) return b.pubsub.PublishWithEvents(ctx, data, events) } @@ -148,27 +133,24 @@ func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error { func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error { // no explicit deadline for publishing events ctx := context.Background() + events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) - resultTags := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) - // TODO: Create StringShort method for Header and use it in logger. - events := b.validateAndStringifyEvents(resultTags, b.Logger.With("header", data.Header)) - - // add predefined new block header event - events[EventTypeKey] = append(events[EventTypeKey], EventNewBlockHeader) + // add Tendermint-reserved new block header event + events = append(events, EventNewBlockHeader) return b.pubsub.PublishWithEvents(ctx, data, events) } func (b *EventBus) PublishEventNewEvidence(evidence EventDataNewEvidence) error { - return b.Publish(EventNewEvidence, evidence) + return b.Publish(EventNewEvidenceValue, evidence) } func (b *EventBus) PublishEventVote(data EventDataVote) error { - return b.Publish(EventVote, data) + return b.Publish(EventVoteValue, data) } func (b *EventBus) PublishEventValidBlock(data EventDataRoundState) error { - return b.Publish(EventValidBlock, data) + return b.Publish(EventValidBlockValue, data) } // PublishEventTx publishes tx event with events from Result. Note it will add @@ -177,55 +159,74 @@ func (b *EventBus) PublishEventValidBlock(data EventDataRoundState) error { func (b *EventBus) PublishEventTx(data EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() - - events := b.validateAndStringifyEvents(data.Result.Events, b.Logger.With("tx", data.Tx)) - - // add predefined compositeKeys - events[EventTypeKey] = append(events[EventTypeKey], EventTx) - events[TxHashKey] = append(events[TxHashKey], fmt.Sprintf("%X", Tx(data.Tx).Hash())) - events[TxHeightKey] = append(events[TxHeightKey], fmt.Sprintf("%d", data.Height)) + events := data.Result.Events + + // add Tendermint-reserved events + events = append(events, EventTx) + + tokens := strings.Split(TxHashKey, ".") + events = append(events, types.Event{ + Type: tokens[0], + Attributes: []types.EventAttribute{ + { + Key: tokens[1], + Value: fmt.Sprintf("%X", Tx(data.Tx).Hash()), + }, + }, + }) + + tokens = strings.Split(TxHeightKey, ".") + events = append(events, types.Event{ + Type: tokens[0], + Attributes: []types.EventAttribute{ + { + Key: tokens[1], + Value: fmt.Sprintf("%d", data.Height), + }, + }, + }) return b.pubsub.PublishWithEvents(ctx, data, events) } func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error { - return b.Publish(EventNewRoundStep, data) + return b.Publish(EventNewRoundStepValue, data) } func (b *EventBus) PublishEventTimeoutPropose(data EventDataRoundState) error { - return b.Publish(EventTimeoutPropose, data) + return b.Publish(EventTimeoutProposeValue, data) } func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error { - return b.Publish(EventTimeoutWait, data) + return b.Publish(EventTimeoutWaitValue, data) } func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error { - return b.Publish(EventNewRound, data) + return b.Publish(EventNewRoundValue, data) } func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error { - return b.Publish(EventCompleteProposal, data) + return b.Publish(EventCompleteProposalValue, data) } func (b *EventBus) PublishEventPolka(data EventDataRoundState) error { - return b.Publish(EventPolka, data) + return b.Publish(EventPolkaValue, data) } func (b *EventBus) PublishEventUnlock(data EventDataRoundState) error { - return b.Publish(EventUnlock, data) + return b.Publish(EventUnlockValue, data) } func (b *EventBus) PublishEventRelock(data EventDataRoundState) error { - return b.Publish(EventRelock, data) + return b.Publish(EventRelockValue, data) } func (b *EventBus) PublishEventLock(data EventDataRoundState) error { - return b.Publish(EventLock, data) + return b.Publish(EventLockValue, data) } func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error { - return b.Publish(EventValidatorSetUpdates, data) + return b.Publish(EventValidatorSetUpdatesValue, data) } //----------------------------------------------------------------------------- diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 981395718..e8a1667c6 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -342,7 +342,7 @@ func TestEventBusPublish(t *testing.T) { } }() - err = eventBus.Publish(EventNewBlockHeader, EventDataNewBlockHeader{}) + err = eventBus.Publish(EventNewBlockHeaderValue, EventDataNewBlockHeader{}) require.NoError(t, err) err = eventBus.PublishEventNewBlock(EventDataNewBlock{}) require.NoError(t, err) @@ -447,16 +447,16 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes }() } - eventType := EventNewBlock + eventValue := EventNewBlockValue b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { if randEvents { - eventType = randEvent() + eventValue = randEventValue() } - err := eventBus.Publish(eventType, EventDataString("Gamora")) + err := eventBus.Publish(eventValue, EventDataString("Gamora")) if err != nil { b.Error(err) } @@ -464,20 +464,21 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes } var events = []string{ - EventNewBlock, - EventNewBlockHeader, - EventNewRound, - EventNewRoundStep, - EventTimeoutPropose, - EventCompleteProposal, - EventPolka, - EventUnlock, - EventLock, - EventRelock, - EventTimeoutWait, - EventVote} - -func randEvent() string { + EventNewBlockValue, + EventNewBlockHeaderValue, + EventNewRoundValue, + EventNewRoundStepValue, + EventTimeoutProposeValue, + EventCompleteProposalValue, + EventPolkaValue, + EventUnlockValue, + EventLockValue, + EventRelockValue, + EventTimeoutWaitValue, + EventVoteValue, +} + +func randEventValue() string { return events[mrand.Intn(len(events))] } diff --git a/types/events.go b/types/events.go index 684160d6f..9df8e8507 100644 --- a/types/events.go +++ b/types/events.go @@ -2,6 +2,7 @@ package types import ( "fmt" + "strings" abci "github.com/tendermint/tendermint/abci/types" tmjson "github.com/tendermint/tendermint/libs/json" @@ -16,26 +17,69 @@ const ( // after a block has been committed. // These are also used by the tx indexer for async indexing. // All of this data can be fetched through the rpc. - EventNewBlock = "NewBlock" - EventNewBlockHeader = "NewBlockHeader" - EventNewEvidence = "NewEvidence" - EventTx = "Tx" - EventValidatorSetUpdates = "ValidatorSetUpdates" + EventNewBlockValue = "NewBlock" + EventNewBlockHeaderValue = "NewBlockHeader" + EventNewEvidenceValue = "NewEvidence" + EventTxValue = "Tx" + EventValidatorSetUpdatesValue = "ValidatorSetUpdates" // Internal consensus events. // These are used for testing the consensus state machine. // They can also be used to build real-time consensus visualizers. - EventCompleteProposal = "CompleteProposal" - EventLock = "Lock" - EventNewRound = "NewRound" - EventNewRoundStep = "NewRoundStep" - EventPolka = "Polka" - EventRelock = "Relock" - EventTimeoutPropose = "TimeoutPropose" - EventTimeoutWait = "TimeoutWait" - EventUnlock = "Unlock" - EventValidBlock = "ValidBlock" - EventVote = "Vote" + EventCompleteProposalValue = "CompleteProposal" + EventLockValue = "Lock" + EventNewRoundValue = "NewRound" + EventNewRoundStepValue = "NewRoundStep" + EventPolkaValue = "Polka" + EventRelockValue = "Relock" + EventTimeoutProposeValue = "TimeoutPropose" + EventTimeoutWaitValue = "TimeoutWait" + EventUnlockValue = "Unlock" + EventValidBlockValue = "ValidBlock" + EventVoteValue = "Vote" +) + +// Pre-populated ABCI Tendermint-reserved events +var ( + EventNewBlock = abci.Event{ + Type: strings.Split(EventTypeKey, ".")[0], + Attributes: []abci.EventAttribute{ + { + Key: strings.Split(EventTypeKey, ".")[1], + Value: EventNewBlockValue, + }, + }, + } + + EventNewBlockHeader = abci.Event{ + Type: strings.Split(EventTypeKey, ".")[0], + Attributes: []abci.EventAttribute{ + { + Key: strings.Split(EventTypeKey, ".")[1], + Value: EventNewBlockHeaderValue, + }, + }, + } + + EventNewEvidence = abci.Event{ + Type: strings.Split(EventTypeKey, ".")[0], + Attributes: []abci.EventAttribute{ + { + Key: strings.Split(EventTypeKey, ".")[1], + Value: EventNewEvidenceValue, + }, + }, + } + + EventTx = abci.Event{ + Type: strings.Split(EventTypeKey, ".")[0], + Attributes: []abci.EventAttribute{ + { + Key: strings.Split(EventTypeKey, ".")[1], + Value: EventTxValue, + }, + }, + } ) // ENCODING / DECODING @@ -147,30 +191,30 @@ const ( ) var ( - EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal) - EventQueryLock = QueryForEvent(EventLock) - EventQueryNewBlock = QueryForEvent(EventNewBlock) - EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader) - EventQueryNewEvidence = QueryForEvent(EventNewEvidence) - EventQueryNewRound = QueryForEvent(EventNewRound) - EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep) - EventQueryPolka = QueryForEvent(EventPolka) - EventQueryRelock = QueryForEvent(EventRelock) - EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose) - EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait) - EventQueryTx = QueryForEvent(EventTx) - EventQueryUnlock = QueryForEvent(EventUnlock) - EventQueryValidatorSetUpdates = QueryForEvent(EventValidatorSetUpdates) - EventQueryValidBlock = QueryForEvent(EventValidBlock) - EventQueryVote = QueryForEvent(EventVote) + EventQueryCompleteProposal = QueryForEvent(EventCompleteProposalValue) + EventQueryLock = QueryForEvent(EventLockValue) + EventQueryNewBlock = QueryForEvent(EventNewBlockValue) + EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeaderValue) + EventQueryNewEvidence = QueryForEvent(EventNewEvidenceValue) + EventQueryNewRound = QueryForEvent(EventNewRoundValue) + EventQueryNewRoundStep = QueryForEvent(EventNewRoundStepValue) + EventQueryPolka = QueryForEvent(EventPolkaValue) + EventQueryRelock = QueryForEvent(EventRelockValue) + EventQueryTimeoutPropose = QueryForEvent(EventTimeoutProposeValue) + EventQueryTimeoutWait = QueryForEvent(EventTimeoutWaitValue) + EventQueryTx = QueryForEvent(EventTxValue) + EventQueryUnlock = QueryForEvent(EventUnlockValue) + EventQueryValidatorSetUpdates = QueryForEvent(EventValidatorSetUpdatesValue) + EventQueryValidBlock = QueryForEvent(EventValidBlockValue) + EventQueryVote = QueryForEvent(EventVoteValue) ) func EventQueryTxFor(tx Tx) tmpubsub.Query { - return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) + return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTxValue, TxHashKey, tx.Hash())) } -func QueryForEvent(eventType string) tmpubsub.Query { - return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType)) +func QueryForEvent(eventValue string) tmpubsub.Query { + return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventValue)) } // BlockEventPublisher publishes all block related events diff --git a/types/events_test.go b/types/events_test.go index 12f75b74d..dcd998ace 100644 --- a/types/events_test.go +++ b/types/events_test.go @@ -18,10 +18,10 @@ func TestQueryTxFor(t *testing.T) { func TestQueryForEvent(t *testing.T) { assert.Equal(t, "tm.event='NewBlock'", - QueryForEvent(EventNewBlock).String(), + QueryForEvent(EventNewBlockValue).String(), ) assert.Equal(t, "tm.event='NewEvidence'", - QueryForEvent(EventNewEvidence).String(), + QueryForEvent(EventNewEvidenceValue).String(), ) }