Browse Source

abci: Refactor tagging events using list of lists (#3643)

## PR

This PR introduces a fundamental breaking change to the structure of ABCI response and tx tags and the way they're processed. Namely, the SDK can support more complex and aggregated events for distribution and slashing. In addition, block responses can include duplicate keys in events.

    Implement new Event type. An event has a type and a list of KV pairs (ie. list-of-lists). Typical events may look like:

"rewards": [{"amount": "5000uatom", "validator": "...", "recipient": "..."}]
"sender": [{"address": "...", "balance": "100uatom"}]

The events are indexed by {even.type}.{even.attribute[i].key}/.... In this case a client would subscribe or query for rewards.recipient='...'

    ABCI response types and related types now include Events []Event instead of Tags []cmn.KVPair.
    PubSub logic now publishes/matches against map[string][]string instead of map[string]string to support duplicate keys in response events (from #1385). A match is successful if the value is found in the slice of strings.

closes: #1859
closes: #2905

## Commits:

* Implement Event ABCI type and updates responses to use events

* Update messages_test.go

* Update kvstore.go

* Update event_bus.go

* Update subscription.go

* Update pubsub.go

* Update kvstore.go

* Update query logic to handle slice of strings in events

* Update Empty#Matches and unit tests

* Update pubsub logic

* Update EventBus#Publish

* Update kv tx indexer

* Update godocs

* Update ResultEvent to use slice of strings; update RPC

* Update more tests

* Update abci.md

* Check for key in validateAndStringifyEvents

* Fix KV indexer to skip empty keys

* Fix linting errors

* Update CHANGELOG_PENDING.md

* Update docs/spec/abci/abci.md

Co-Authored-By: Federico Kunze <31522760+fedekunze@users.noreply.github.com>

* Update abci/types/types.proto

Co-Authored-By: Ethan Buchman <ethan@coinculture.info>

* Update docs/spec/abci/abci.md

Co-Authored-By: Ethan Buchman <ethan@coinculture.info>

* Update libs/pubsub/query/query.go

Co-Authored-By: Ethan Buchman <ethan@coinculture.info>

* Update match function to match if ANY value matches

* Implement TestSubscribeDuplicateKeys

* Update TestMatches to include multi-key test cases

* Update events.go

* Update Query interface godoc

* Update match godoc

* Add godoc for matchValue

* DRY-up tx indexing

* Return error from PublishWithEvents in EventBus#Publish

* Update PublishEventNewBlockHeader to return an error

* Fix build

* Update events doc in ABCI

* Update ABCI events godoc

* Implement TestEventBusPublishEventTxDuplicateKeys

* Update TestSubscribeDuplicateKeys to be table-driven

* Remove mod file

* Remove markdown from events godoc

* Implement TestTxSearchDeprecatedIndexing test
pull/3722/head v0.32.0-dev0
Alexander Bezobchuk 6 years ago
committed by Anton Kaliaev
parent
commit
ab0835463f
25 changed files with 1409 additions and 555 deletions
  1. +7
    -3
      CHANGELOG_PENDING.md
  2. +11
    -4
      abci/example/kvstore/kvstore.go
  3. +15
    -4
      abci/types/messages_test.go
  4. +609
    -325
      abci/types/types.pb.go
  5. +9
    -4
      abci/types/types.proto
  6. +124
    -0
      abci/types/typespb_test.go
  7. +1
    -1
      blockchain/reactor_test.go
  8. +46
    -8
      docs/spec/abci/abci.md
  9. +1
    -1
      libs/pubsub/example_test.go
  10. +21
    -16
      libs/pubsub/pubsub.go
  11. +58
    -7
      libs/pubsub/pubsub_test.go
  12. +1
    -1
      libs/pubsub/query/empty.go
  13. +4
    -4
      libs/pubsub/query/empty_test.go
  14. +50
    -22
      libs/pubsub/query/query.go
  15. +35
    -23
      libs/pubsub/query/query_test.go
  16. +7
    -7
      libs/pubsub/subscription.go
  17. +1
    -1
      rpc/client/localclient.go
  18. +74
    -15
      rpc/core/events.go
  19. +3
    -3
      rpc/core/types/responses.go
  20. +1
    -2
      state/execution_test.go
  21. +9
    -7
      state/state_test.go
  22. +39
    -20
      state/txindex/kv/kv.go
  23. +115
    -29
      state/txindex/kv/kv_test.go
  24. +39
    -40
      types/event_bus.go
  25. +129
    -8
      types/event_bus_test.go

+ 7
- 3
CHANGELOG_PENDING.md View File

@ -10,14 +10,18 @@
- read more on Modules here: https://github.com/golang/go/wiki/Modules - read more on Modules here: https://github.com/golang/go/wiki/Modules
* CLI/RPC/Config * CLI/RPC/Config
- [rpc] \#3616 Improve `/block_results` response format (`results.DeliverTx` ->
* [rpc] \#3616 Improve `/block_results` response format (`results.DeliverTx` ->
`results.deliver_tx`). See docs for details. `results.deliver_tx`). See docs for details.
* Apps * Apps
* [abci] \#1859 `ResponseCheckTx`, `ResponseDeliverTx`, `ResponseBeginBlock`,
and `ResponseEndBlock` now include `Events` instead of `Tags`. Each `Event`
contains a `type` and a list of `attributes` (list of key-value pairs) allowing
for inclusion of multiple distinct events in each response.
* Go API * Go API
- [libs/db] Removed deprecated `LevelDBBackend` const
* If you have `db_backend` set to `leveldb` in your config file, please
* [libs/db] Removed deprecated `LevelDBBackend` const
* If you have `db_backend` set to `leveldb` in your config file, please
change it to `goleveldb` or `cleveldb`. change it to `goleveldb` or `cleveldb`.
- [p2p] \#3521 Remove NewNetAddressStringWithOptionalID - [p2p] \#3521 Remove NewNetAddressStringWithOptionalID


+ 11
- 4
abci/example/kvstore/kvstore.go View File

@ -84,14 +84,21 @@ func (app *KVStoreApplication) DeliverTx(tx []byte) types.ResponseDeliverTx {
} else { } else {
key, value = tx, tx key, value = tx, tx
} }
app.state.db.Set(prefixKey(key), value) app.state.db.Set(prefixKey(key), value)
app.state.Size += 1 app.state.Size += 1
tags := []cmn.KVPair{
{Key: []byte("app.creator"), Value: []byte("Cosmoshi Netowoko")},
{Key: []byte("app.key"), Value: key},
events := []types.Event{
{
Type: "app",
Attributes: []cmn.KVPair{
{Key: []byte("creator"), Value: []byte("Cosmoshi Netowoko")},
{Key: []byte("key"), Value: key},
},
},
} }
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Tags: tags}
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
} }
func (app *KVStoreApplication) CheckTx(tx []byte) types.ResponseCheckTx { func (app *KVStoreApplication) CheckTx(tx []byte) types.ResponseCheckTx {


+ 15
- 4
abci/types/messages_test.go View File

@ -8,6 +8,7 @@ import (
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
) )
@ -21,8 +22,13 @@ func TestMarshalJSON(t *testing.T) {
Code: 1, Code: 1,
Data: []byte("hello"), Data: []byte("hello"),
GasWanted: 43, GasWanted: 43,
Tags: []cmn.KVPair{
{Key: []byte("pho"), Value: []byte("bo")},
Events: []Event{
{
Type: "testEvent",
Attributes: []cmn.KVPair{
{Key: []byte("pho"), Value: []byte("bo")},
},
},
}, },
} }
b, err = json.Marshal(&r1) b, err = json.Marshal(&r1)
@ -82,8 +88,13 @@ func TestWriteReadMessage2(t *testing.T) {
Data: []byte(phrase), Data: []byte(phrase),
Log: phrase, Log: phrase,
GasWanted: 10, GasWanted: 10,
Tags: []cmn.KVPair{
{Key: []byte("abc"), Value: []byte("def")},
Events: []Event{
{
Type: "testEvent",
Attributes: []cmn.KVPair{
{Key: []byte("abc"), Value: []byte("def")},
},
},
}, },
}, },
// TODO: add the rest // TODO: add the rest


+ 609
- 325
abci/types/types.pb.go
File diff suppressed because it is too large
View File


+ 9
- 4
abci/types/types.proto View File

@ -165,7 +165,7 @@ message ResponseQuery {
} }
message ResponseBeginBlock { message ResponseBeginBlock {
repeated common.KVPair tags = 1 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
repeated Event events = 1 [(gogoproto.nullable)=false, (gogoproto.jsontag)="events,omitempty"];
} }
message ResponseCheckTx { message ResponseCheckTx {
@ -175,7 +175,7 @@ message ResponseCheckTx {
string info = 4; // nondeterministic string info = 4; // nondeterministic
int64 gas_wanted = 5; int64 gas_wanted = 5;
int64 gas_used = 6; int64 gas_used = 6;
repeated common.KVPair tags = 7 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
repeated Event events = 7 [(gogoproto.nullable)=false, (gogoproto.jsontag)="events,omitempty"];
string codespace = 8; string codespace = 8;
} }
@ -186,14 +186,14 @@ message ResponseDeliverTx {
string info = 4; // nondeterministic string info = 4; // nondeterministic
int64 gas_wanted = 5; int64 gas_wanted = 5;
int64 gas_used = 6; int64 gas_used = 6;
repeated common.KVPair tags = 7 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
repeated Event events = 7 [(gogoproto.nullable)=false, (gogoproto.jsontag)="events,omitempty"];
string codespace = 8; string codespace = 8;
} }
message ResponseEndBlock { message ResponseEndBlock {
repeated ValidatorUpdate validator_updates = 1 [(gogoproto.nullable)=false]; repeated ValidatorUpdate validator_updates = 1 [(gogoproto.nullable)=false];
ConsensusParams consensus_param_updates = 2; ConsensusParams consensus_param_updates = 2;
repeated common.KVPair tags = 3 [(gogoproto.nullable)=false, (gogoproto.jsontag)="tags,omitempty"];
repeated Event events = 3 [(gogoproto.nullable)=false, (gogoproto.jsontag)="events,omitempty"];
} }
message ResponseCommit { message ResponseCommit {
@ -236,6 +236,11 @@ message LastCommitInfo {
repeated VoteInfo votes = 2 [(gogoproto.nullable)=false]; repeated VoteInfo votes = 2 [(gogoproto.nullable)=false];
} }
message Event {
string type = 1;
repeated common.KVPair attributes = 2 [(gogoproto.nullable)=false, (gogoproto.jsontag)="attributes,omitempty"];
}
//---------------------------------------- //----------------------------------------
// Blockchain Types // Blockchain Types


+ 124
- 0
abci/types/typespb_test.go View File

@ -1703,6 +1703,62 @@ func TestLastCommitInfoMarshalTo(t *testing.T) {
} }
} }
func TestEventProto(t *testing.T) {
seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed))
p := NewPopulatedEvent(popr, false)
dAtA, err := github_com_gogo_protobuf_proto.Marshal(p)
if err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
msg := &Event{}
if err := github_com_gogo_protobuf_proto.Unmarshal(dAtA, msg); err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
littlefuzz := make([]byte, len(dAtA))
copy(littlefuzz, dAtA)
for i := range dAtA {
dAtA[i] = byte(popr.Intn(256))
}
if !p.Equal(msg) {
t.Fatalf("seed = %d, %#v !Proto %#v", seed, msg, p)
}
if len(littlefuzz) > 0 {
fuzzamount := 100
for i := 0; i < fuzzamount; i++ {
littlefuzz[popr.Intn(len(littlefuzz))] = byte(popr.Intn(256))
littlefuzz = append(littlefuzz, byte(popr.Intn(256)))
}
// shouldn't panic
_ = github_com_gogo_protobuf_proto.Unmarshal(littlefuzz, msg)
}
}
func TestEventMarshalTo(t *testing.T) {
seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed))
p := NewPopulatedEvent(popr, false)
size := p.Size()
dAtA := make([]byte, size)
for i := range dAtA {
dAtA[i] = byte(popr.Intn(256))
}
_, err := p.MarshalTo(dAtA)
if err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
msg := &Event{}
if err := github_com_gogo_protobuf_proto.Unmarshal(dAtA, msg); err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
for i := range dAtA {
dAtA[i] = byte(popr.Intn(256))
}
if !p.Equal(msg) {
t.Fatalf("seed = %d, %#v !Proto %#v", seed, msg, p)
}
}
func TestHeaderProto(t *testing.T) { func TestHeaderProto(t *testing.T) {
seed := time.Now().UnixNano() seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed)) popr := math_rand.New(math_rand.NewSource(seed))
@ -2747,6 +2803,24 @@ func TestLastCommitInfoJSON(t *testing.T) {
t.Fatalf("seed = %d, %#v !Json Equal %#v", seed, msg, p) t.Fatalf("seed = %d, %#v !Json Equal %#v", seed, msg, p)
} }
} }
func TestEventJSON(t *testing.T) {
seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed))
p := NewPopulatedEvent(popr, true)
marshaler := github_com_gogo_protobuf_jsonpb.Marshaler{}
jsondata, err := marshaler.MarshalToString(p)
if err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
msg := &Event{}
err = github_com_gogo_protobuf_jsonpb.UnmarshalString(jsondata, msg)
if err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
if !p.Equal(msg) {
t.Fatalf("seed = %d, %#v !Json Equal %#v", seed, msg, p)
}
}
func TestHeaderJSON(t *testing.T) { func TestHeaderJSON(t *testing.T) {
seed := time.Now().UnixNano() seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed)) popr := math_rand.New(math_rand.NewSource(seed))
@ -3749,6 +3823,34 @@ func TestLastCommitInfoProtoCompactText(t *testing.T) {
} }
} }
func TestEventProtoText(t *testing.T) {
seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed))
p := NewPopulatedEvent(popr, true)
dAtA := github_com_gogo_protobuf_proto.MarshalTextString(p)
msg := &Event{}
if err := github_com_gogo_protobuf_proto.UnmarshalText(dAtA, msg); err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
if !p.Equal(msg) {
t.Fatalf("seed = %d, %#v !Proto %#v", seed, msg, p)
}
}
func TestEventProtoCompactText(t *testing.T) {
seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed))
p := NewPopulatedEvent(popr, true)
dAtA := github_com_gogo_protobuf_proto.CompactTextString(p)
msg := &Event{}
if err := github_com_gogo_protobuf_proto.UnmarshalText(dAtA, msg); err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
if !p.Equal(msg) {
t.Fatalf("seed = %d, %#v !Proto %#v", seed, msg, p)
}
}
func TestHeaderProtoText(t *testing.T) { func TestHeaderProtoText(t *testing.T) {
seed := time.Now().UnixNano() seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed)) popr := math_rand.New(math_rand.NewSource(seed))
@ -4661,6 +4763,28 @@ func TestLastCommitInfoSize(t *testing.T) {
} }
} }
func TestEventSize(t *testing.T) {
seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed))
p := NewPopulatedEvent(popr, true)
size2 := github_com_gogo_protobuf_proto.Size(p)
dAtA, err := github_com_gogo_protobuf_proto.Marshal(p)
if err != nil {
t.Fatalf("seed = %d, err = %v", seed, err)
}
size := p.Size()
if len(dAtA) != size {
t.Errorf("seed = %d, size %v != marshalled size %v", seed, size, len(dAtA))
}
if size2 != size {
t.Errorf("seed = %d, size %v != before marshal proto.Size %v", seed, size, size2)
}
size3 := github_com_gogo_protobuf_proto.Size(p)
if size3 != size {
t.Errorf("seed = %d, size %v != after marshal proto.Size %v", seed, size, size3)
}
}
func TestHeaderSize(t *testing.T) { func TestHeaderSize(t *testing.T) {
seed := time.Now().UnixNano() seed := time.Now().UnixNano()
popr := math_rand.New(math_rand.NewSource(seed)) popr := math_rand.New(math_rand.NewSource(seed))


+ 1
- 1
blockchain/reactor_test.go View File

@ -292,7 +292,7 @@ func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
} }
func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
return abci.ResponseDeliverTx{Events: []abci.Event{}}
} }
func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx { func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx {


+ 46
- 8
docs/spec/abci/abci.md View File

@ -38,20 +38,58 @@ Finally, `Query`, `CheckTx`, and `DeliverTx` include a `Codespace string`, whose
intended use is to disambiguate `Code` values returned by different domains of the intended use is to disambiguate `Code` values returned by different domains of the
application. The `Codespace` is a namespace for the `Code`. application. The `Codespace` is a namespace for the `Code`.
## Tags
## Events
Some methods (`CheckTx, BeginBlock, DeliverTx, EndBlock`) Some methods (`CheckTx, BeginBlock, DeliverTx, EndBlock`)
include a `Tags` field in their `Response*`. Each tag is key-value pair denoting
something about what happened during the methods execution.
include an `Events` field in their `Response*`. Each event contains a type and a
list of attributes, which are key-value pairs denoting something about what happened
during the method's execution.
Tags can be used to index transactions and blocks according to what happened
during their execution. Note that the set of tags returned for a block from
Events can be used to index transactions and blocks according to what happened
during their execution. Note that the set of events returned for a block from
`BeginBlock` and `EndBlock` are merged. In case both methods return the same `BeginBlock` and `EndBlock` are merged. In case both methods return the same
tag, only the value defined in `EndBlock` is used. tag, only the value defined in `EndBlock` is used.
Keys and values in tags must be UTF-8 encoded strings (e.g.
"account.owner": "Bob", "balance": "100.0",
"time": "2018-01-02T12:30:00Z")
Each event has a `type` which is meant to categorize the event for a particular
`Response*` or tx. A `Response*` or tx may contain multiple events with duplicate
`type` values, where each distinct entry is meant to categorize attributes for a
particular event. Every key and value in an event's attributes must be UTF-8
encoded strings along with the even type itself.
Example:
```go
abci.ResponseDeliverTx{
// ...
Events: []abci.Event{
{
Type: "validator.provisions",
Attributes: cmn.KVPairs{
cmn.KVPair{Key: []byte("address"), Value: []byte("...")},
cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
cmn.KVPair{Key: []byte("balance"), Value: []byte("...")},
},
},
{
Type: "validator.provisions",
Attributes: cmn.KVPairs{
cmn.KVPair{Key: []byte("address"), Value: []byte("...")},
cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
cmn.KVPair{Key: []byte("balance"), Value: []byte("...")},
},
},
{
Type: "validator.slashed",
Attributes: cmn.KVPairs{
cmn.KVPair{Key: []byte("address"), Value: []byte("...")},
cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
cmn.KVPair{Key: []byte("reason"), Value: []byte("...")},
},
},
// ...
},
}
```
## Determinism ## Determinism


+ 1
- 1
libs/pubsub/example_test.go View File

@ -21,7 +21,7 @@ func TestExample(t *testing.T) {
ctx := context.Background() ctx := context.Background()
subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'")) subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"))
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", map[string]string{"abci.account.name": "John"})
err = s.PublishWithEvents(ctx, "Tombstone", map[string][]string{"abci.account.name": {"John"}})
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Tombstone", subscription.Out()) assertReceive(t, "Tombstone", subscription.Out())
} }

+ 21
- 16
libs/pubsub/pubsub.go View File

@ -26,7 +26,7 @@
// for { // for {
// select { // select {
// case msg <- subscription.Out(): // case msg <- subscription.Out():
// // handle msg.Data() and msg.Tags()
// // handle msg.Data() and msg.Events()
// case <-subscription.Cancelled(): // case <-subscription.Cancelled():
// return subscription.Err() // return subscription.Err()
// } // }
@ -61,9 +61,14 @@ var (
ErrAlreadySubscribed = errors.New("already subscribed") ErrAlreadySubscribed = errors.New("already subscribed")
) )
// Query defines an interface for a query to be used for subscribing.
// Query defines an interface for a query to be used for subscribing. A query
// matches against a map of events. Each key in this map is a composite of the
// even type and an attribute key (e.g. "{eventType}.{eventAttrKey}") and the
// values are the event values that are contained under that relationship. This
// allows event types to repeat themselves with the same set of keys and
// different values.
type Query interface { type Query interface {
Matches(tags map[string]string) bool
Matches(events map[string][]string) bool
String() string String() string
} }
@ -76,12 +81,12 @@ type cmd struct {
clientID string clientID string
// publish // publish
msg interface{}
tags map[string]string
msg interface{}
events map[string][]string
} }
// Server allows clients to subscribe/unsubscribe for messages, publishing // Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without tags, and manages internal state.
// messages with or without events, and manages internal state.
type Server struct { type Server struct {
cmn.BaseService cmn.BaseService
@ -258,15 +263,15 @@ func (s *Server) NumClientSubscriptions(clientID string) int {
// Publish publishes the given message. An error will be returned to the caller // Publish publishes the given message. An error will be returned to the caller
// if the context is canceled. // if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error { func (s *Server) Publish(ctx context.Context, msg interface{}) error {
return s.PublishWithTags(ctx, msg, make(map[string]string))
return s.PublishWithEvents(ctx, msg, make(map[string][]string))
} }
// PublishWithTags publishes the given message with the set of tags. The set is
// matched with clients queries. If there is a match, the message is sent to
// PublishWithEvents publishes the given message with the set of events. The set
// is matched with clients queries. If there is a match, the message is sent to
// the client. // the client.
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]string) error {
func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events map[string][]string) error {
select { select {
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
case s.cmds <- cmd{op: pub, msg: msg, events: events}:
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@ -325,7 +330,7 @@ loop:
case sub: case sub:
state.add(cmd.clientID, cmd.query, cmd.subscription) state.add(cmd.clientID, cmd.query, cmd.subscription)
case pub: case pub:
state.send(cmd.msg, cmd.tags)
state.send(cmd.msg, cmd.events)
} }
} }
} }
@ -392,18 +397,18 @@ func (state *state) removeAll(reason error) {
} }
} }
func (state *state) send(msg interface{}, tags map[string]string) {
func (state *state) send(msg interface{}, events map[string][]string) {
for qStr, clientSubscriptions := range state.subscriptions { for qStr, clientSubscriptions := range state.subscriptions {
q := state.queries[qStr].q q := state.queries[qStr].q
if q.Matches(tags) {
if q.Matches(events) {
for clientID, subscription := range clientSubscriptions { for clientID, subscription := range clientSubscriptions {
if cap(subscription.out) == 0 { if cap(subscription.out) == 0 {
// block on unbuffered channel // block on unbuffered channel
subscription.out <- Message{msg, tags}
subscription.out <- NewMessage(msg, events)
} else { } else {
// don't block on buffered channels // don't block on buffered channels
select { select {
case subscription.out <- Message{msg, tags}:
case subscription.out <- NewMessage(msg, events):
default: default:
state.remove(clientID, qStr, ErrOutOfCapacity) state.remove(clientID, qStr, ErrOutOfCapacity)
} }


+ 58
- 7
libs/pubsub/pubsub_test.go View File

@ -136,24 +136,75 @@ func TestDifferentClients(t *testing.T) {
ctx := context.Background() ctx := context.Background()
subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'")) subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Iceman", map[string]string{"tm.events.type": "NewBlock"})
err = s.PublishWithEvents(ctx, "Iceman", map[string][]string{"tm.events.type": {"NewBlock"}})
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Iceman", subscription1.Out()) assertReceive(t, "Iceman", subscription1.Out())
subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'")) subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"))
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Ultimo", map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
err = s.PublishWithEvents(ctx, "Ultimo", map[string][]string{"tm.events.type": {"NewBlock"}, "abci.account.name": {"Igor"}})
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Ultimo", subscription1.Out()) assertReceive(t, "Ultimo", subscription1.Out())
assertReceive(t, "Ultimo", subscription2.Out()) assertReceive(t, "Ultimo", subscription2.Out())
subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10")) subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"))
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]string{"tm.events.type": "NewRoundStep"})
err = s.PublishWithEvents(ctx, "Valeria Richards", map[string][]string{"tm.events.type": {"NewRoundStep"}})
require.NoError(t, err) require.NoError(t, err)
assert.Zero(t, len(subscription3.Out())) assert.Zero(t, len(subscription3.Out()))
} }
func TestSubscribeDuplicateKeys(t *testing.T) {
ctx := context.Background()
s := pubsub.NewServer()
s.SetLogger(log.TestingLogger())
require.NoError(t, s.Start())
defer s.Stop()
testCases := []struct {
query string
expected interface{}
}{
{
"withdraw.rewards='17'",
"Iceman",
},
{
"withdraw.rewards='22'",
"Iceman",
},
{
"withdraw.rewards='1' AND withdraw.rewards='22'",
"Iceman",
},
{
"withdraw.rewards='100'",
nil,
},
}
for i, tc := range testCases {
sub, err := s.Subscribe(ctx, fmt.Sprintf("client-%d", i), query.MustParse(tc.query))
require.NoError(t, err)
err = s.PublishWithEvents(
ctx,
"Iceman",
map[string][]string{
"transfer.sender": {"foo", "bar", "baz"},
"withdraw.rewards": {"1", "17", "22"},
},
)
require.NoError(t, err)
if tc.expected != nil {
assertReceive(t, tc.expected, sub.Out())
} else {
require.Zero(t, len(sub.Out()))
}
}
}
func TestClientSubscribesTwice(t *testing.T) { func TestClientSubscribesTwice(t *testing.T) {
s := pubsub.NewServer() s := pubsub.NewServer()
s.SetLogger(log.TestingLogger()) s.SetLogger(log.TestingLogger())
@ -165,7 +216,7 @@ func TestClientSubscribesTwice(t *testing.T) {
subscription1, err := s.Subscribe(ctx, clientID, q) subscription1, err := s.Subscribe(ctx, clientID, q)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]string{"tm.events.type": "NewBlock"})
err = s.PublishWithEvents(ctx, "Goblin Queen", map[string][]string{"tm.events.type": {"NewBlock"}})
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Goblin Queen", subscription1.Out()) assertReceive(t, "Goblin Queen", subscription1.Out())
@ -173,7 +224,7 @@ func TestClientSubscribesTwice(t *testing.T) {
require.Error(t, err) require.Error(t, err)
require.Nil(t, subscription2) require.Nil(t, subscription2)
err = s.PublishWithTags(ctx, "Spider-Man", map[string]string{"tm.events.type": "NewBlock"})
err = s.PublishWithEvents(ctx, "Spider-Man", map[string][]string{"tm.events.type": {"NewBlock"}})
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Spider-Man", subscription1.Out()) assertReceive(t, "Spider-Man", subscription1.Out())
} }
@ -312,7 +363,7 @@ func benchmarkNClients(n int, b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)})
s.PublishWithEvents(ctx, "Gamora", map[string][]string{"abci.Account.Owner": {"Ivan"}, "abci.Invoices.Number": {string(i)}})
} }
} }
@ -343,7 +394,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"})
s.PublishWithEvents(ctx, "Gamora", map[string][]string{"abci.Account.Owner": {"Ivan"}, "abci.Invoices.Number": {"1"}})
} }
} }


+ 1
- 1
libs/pubsub/query/empty.go View File

@ -5,7 +5,7 @@ type Empty struct {
} }
// Matches always returns true. // Matches always returns true.
func (Empty) Matches(tags map[string]string) bool {
func (Empty) Matches(tags map[string][]string) bool {
return true return true
} }


+ 4
- 4
libs/pubsub/query/empty_test.go View File

@ -10,8 +10,8 @@ import (
func TestEmptyQueryMatchesAnything(t *testing.T) { func TestEmptyQueryMatchesAnything(t *testing.T) {
q := query.Empty{} q := query.Empty{}
assert.True(t, q.Matches(map[string]string{}))
assert.True(t, q.Matches(map[string]string{"Asher": "Roth"}))
assert.True(t, q.Matches(map[string]string{"Route": "66"}))
assert.True(t, q.Matches(map[string]string{"Route": "66", "Billy": "Blue"}))
assert.True(t, q.Matches(map[string][]string{}))
assert.True(t, q.Matches(map[string][]string{"Asher": {"Roth"}}))
assert.True(t, q.Matches(map[string][]string{"Route": {"66"}}))
assert.True(t, q.Matches(map[string][]string{"Route": {"66"}, "Billy": {"Blue"}}))
} }

+ 50
- 22
libs/pubsub/query/query.go View File

@ -148,12 +148,14 @@ func (q *Query) Conditions() []Condition {
return conditions return conditions
} }
// Matches returns true if the query matches the given set of tags, false otherwise.
// Matches returns true if the query matches against any event in the given set
// of events, false otherwise. For each event, a match exists if the query is
// matched against *any* value in a slice of values.
// //
// For example, query "name=John" matches tags = {"name": "John"}. More
// examples could be found in parser_test.go and query_test.go.
func (q *Query) Matches(tags map[string]string) bool {
if len(tags) == 0 {
// For example, query "name=John" matches events = {"name": ["John", "Eric"]}.
// More examples could be found in parser_test.go and query_test.go.
func (q *Query) Matches(events map[string][]string) bool {
if len(events) == 0 {
return false return false
} }
@ -162,7 +164,8 @@ func (q *Query) Matches(tags map[string]string) bool {
var tag string var tag string
var op Operator var op Operator
// tokens must be in the following order: tag ("tx.gas") -> operator ("=") -> operand ("7")
// tokens must be in the following order:
// tag ("tx.gas") -> operator ("=") -> operand ("7")
for _, token := range q.parser.Tokens() { for _, token := range q.parser.Tokens() {
switch token.pegRule { switch token.pegRule {
@ -188,7 +191,7 @@ func (q *Query) Matches(tags map[string]string) bool {
// see if the triplet (tag, operator, operand) matches any tag // see if the triplet (tag, operator, operand) matches any tag
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" } // "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), tags) {
if !match(tag, op, reflect.ValueOf(valueWithoutSingleQuotes), events) {
return false return false
} }
case rulenumber: case rulenumber:
@ -198,7 +201,7 @@ func (q *Query) Matches(tags map[string]string) bool {
if err != nil { if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number)) panic(fmt.Sprintf("got %v while trying to parse %s as float64 (should never happen if the grammar is correct)", err, number))
} }
if !match(tag, op, reflect.ValueOf(value), tags) {
if !match(tag, op, reflect.ValueOf(value), events) {
return false return false
} }
} else { } else {
@ -206,7 +209,7 @@ func (q *Query) Matches(tags map[string]string) bool {
if err != nil { if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number)) panic(fmt.Sprintf("got %v while trying to parse %s as int64 (should never happen if the grammar is correct)", err, number))
} }
if !match(tag, op, reflect.ValueOf(value), tags) {
if !match(tag, op, reflect.ValueOf(value), events) {
return false return false
} }
} }
@ -215,7 +218,7 @@ func (q *Query) Matches(tags map[string]string) bool {
if err != nil { if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end])) panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / RFC3339 (should never happen if the grammar is correct)", err, buffer[begin:end]))
} }
if !match(tag, op, reflect.ValueOf(value), tags) {
if !match(tag, op, reflect.ValueOf(value), events) {
return false return false
} }
case ruledate: case ruledate:
@ -223,7 +226,7 @@ func (q *Query) Matches(tags map[string]string) bool {
if err != nil { if err != nil {
panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end])) panic(fmt.Sprintf("got %v while trying to parse %s as time.Time / '2006-01-02' (should never happen if the grammar is correct)", err, buffer[begin:end]))
} }
if !match(tag, op, reflect.ValueOf(value), tags) {
if !match(tag, op, reflect.ValueOf(value), events) {
return false return false
} }
} }
@ -232,34 +235,53 @@ func (q *Query) Matches(tags map[string]string) bool {
return true return true
} }
// match returns true if the given triplet (tag, operator, operand) matches any tag.
// match returns true if the given triplet (tag, operator, operand) matches any
// value in an event for that key.
// //
// First, it looks up the tag in tags and if it finds one, tries to compare the
// value from it to the operand using the operator.
// First, it looks up the key in the events and if it finds one, tries to compare
// all the values from it to the operand using the operator.
// //
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
func match(tag string, op Operator, operand reflect.Value, tags map[string]string) bool {
// "tx.gas", "=", "7", {"tx": [{"gas": 7, "ID": "4AE393495334"}]}
func match(tag string, op Operator, operand reflect.Value, events map[string][]string) bool {
// look up the tag from the query in tags // look up the tag from the query in tags
value, ok := tags[tag]
values, ok := events[tag]
if !ok { if !ok {
return false return false
} }
for _, value := range values {
// return true if any value in the set of the event's values matches
if matchValue(value, op, operand) {
return true
}
}
return false
}
// matchValue will attempt to match a string value against an operation an
// operand. A boolean is returned representing the match result. It will panic
// if an error occurs or if the operand is invalid.
func matchValue(value string, op Operator, operand reflect.Value) bool {
switch operand.Kind() { switch operand.Kind() {
case reflect.Struct: // time case reflect.Struct: // time
operandAsTime := operand.Interface().(time.Time) operandAsTime := operand.Interface().(time.Time)
// try our best to convert value from tags to time.Time // try our best to convert value from tags to time.Time
var ( var (
v time.Time v time.Time
err error err error
) )
if strings.ContainsAny(value, "T") { if strings.ContainsAny(value, "T") {
v, err = time.Parse(TimeLayout, value) v, err = time.Parse(TimeLayout, value)
} else { } else {
v, err = time.Parse(DateLayout, value) v, err = time.Parse(DateLayout, value)
} }
if err != nil { if err != nil {
panic(fmt.Sprintf("Failed to convert value %v from tag to time.Time: %v", value, err))
panic(fmt.Sprintf("failed to convert value %v from tag to time.Time: %v", value, err))
} }
switch op { switch op {
case OpLessEqual: case OpLessEqual:
return v.Before(operandAsTime) || v.Equal(operandAsTime) return v.Before(operandAsTime) || v.Equal(operandAsTime)
@ -272,14 +294,17 @@ func match(tag string, op Operator, operand reflect.Value, tags map[string]strin
case OpEqual: case OpEqual:
return v.Equal(operandAsTime) return v.Equal(operandAsTime)
} }
case reflect.Float64: case reflect.Float64:
operandFloat64 := operand.Interface().(float64) operandFloat64 := operand.Interface().(float64)
var v float64 var v float64
// try our best to convert value from tags to float64 // try our best to convert value from tags to float64
v, err := strconv.ParseFloat(value, 64) v, err := strconv.ParseFloat(value, 64)
if err != nil { if err != nil {
panic(fmt.Sprintf("Failed to convert value %v from tag to float64: %v", value, err))
panic(fmt.Sprintf("failed to convert value %v from tag to float64: %v", value, err))
} }
switch op { switch op {
case OpLessEqual: case OpLessEqual:
return v <= operandFloat64 return v <= operandFloat64
@ -292,6 +317,7 @@ func match(tag string, op Operator, operand reflect.Value, tags map[string]strin
case OpEqual: case OpEqual:
return v == operandFloat64 return v == operandFloat64
} }
case reflect.Int64: case reflect.Int64:
operandInt := operand.Interface().(int64) operandInt := operand.Interface().(int64)
var v int64 var v int64
@ -299,7 +325,7 @@ func match(tag string, op Operator, operand reflect.Value, tags map[string]strin
if strings.ContainsAny(value, ".") { if strings.ContainsAny(value, ".") {
v1, err := strconv.ParseFloat(value, 64) v1, err := strconv.ParseFloat(value, 64)
if err != nil { if err != nil {
panic(fmt.Sprintf("Failed to convert value %v from tag to float64: %v", value, err))
panic(fmt.Sprintf("failed to convert value %v from tag to float64: %v", value, err))
} }
v = int64(v1) v = int64(v1)
} else { } else {
@ -307,7 +333,7 @@ func match(tag string, op Operator, operand reflect.Value, tags map[string]strin
// try our best to convert value from tags to int64 // try our best to convert value from tags to int64
v, err = strconv.ParseInt(value, 10, 64) v, err = strconv.ParseInt(value, 10, 64)
if err != nil { if err != nil {
panic(fmt.Sprintf("Failed to convert value %v from tag to int64: %v", value, err))
panic(fmt.Sprintf("failed to convert value %v from tag to int64: %v", value, err))
} }
} }
switch op { switch op {
@ -322,6 +348,7 @@ func match(tag string, op Operator, operand reflect.Value, tags map[string]strin
case OpEqual: case OpEqual:
return v == operandInt return v == operandInt
} }
case reflect.String: case reflect.String:
switch op { switch op {
case OpEqual: case OpEqual:
@ -329,8 +356,9 @@ func match(tag string, op Operator, operand reflect.Value, tags map[string]strin
case OpContains: case OpContains:
return strings.Contains(value, operand.String()) return strings.Contains(value, operand.String())
} }
default: default:
panic(fmt.Sprintf("Unknown kind of operand %v", operand.Kind()))
panic(fmt.Sprintf("unknown kind of operand %v", operand.Kind()))
} }
return false return false


+ 35
- 23
libs/pubsub/query/query_test.go View File

@ -19,30 +19,40 @@ func TestMatches(t *testing.T) {
testCases := []struct { testCases := []struct {
s string s string
tags map[string]string
events map[string][]string
err bool err bool
matches bool matches bool
}{ }{
{"tm.events.type='NewBlock'", map[string]string{"tm.events.type": "NewBlock"}, false, true},
{"tx.gas > 7", map[string]string{"tx.gas": "8"}, false, true},
{"tx.gas > 7 AND tx.gas < 9", map[string]string{"tx.gas": "8"}, false, true},
{"body.weight >= 3.5", map[string]string{"body.weight": "3.5"}, false, true},
{"account.balance < 1000.0", map[string]string{"account.balance": "900"}, false, true},
{"apples.kg <= 4", map[string]string{"apples.kg": "4.0"}, false, true},
{"body.weight >= 4.5", map[string]string{"body.weight": fmt.Sprintf("%v", float32(4.5))}, false, true},
{"oranges.kg < 4 AND watermellons.kg > 10", map[string]string{"oranges.kg": "3", "watermellons.kg": "12"}, false, true},
{"peaches.kg < 4", map[string]string{"peaches.kg": "5"}, false, false},
{"tx.date > DATE 2017-01-01", map[string]string{"tx.date": time.Now().Format(query.DateLayout)}, false, true},
{"tx.date = DATE 2017-01-01", map[string]string{"tx.date": txDate}, false, true},
{"tx.date = DATE 2018-01-01", map[string]string{"tx.date": txDate}, false, false},
{"tx.time >= TIME 2013-05-03T14:45:00Z", map[string]string{"tx.time": time.Now().Format(query.TimeLayout)}, false, true},
{"tx.time = TIME 2013-05-03T14:45:00Z", map[string]string{"tx.time": txTime}, false, false},
{"abci.owner.name CONTAINS 'Igor'", map[string]string{"abci.owner.name": "Igor,Ivan"}, false, true},
{"abci.owner.name CONTAINS 'Igor'", map[string]string{"abci.owner.name": "Pavel,Ivan"}, false, false},
{"tm.events.type='NewBlock'", map[string][]string{"tm.events.type": {"NewBlock"}}, false, true},
{"tx.gas > 7", map[string][]string{"tx.gas": {"8"}}, false, true},
{"tx.gas > 7 AND tx.gas < 9", map[string][]string{"tx.gas": {"8"}}, false, true},
{"body.weight >= 3.5", map[string][]string{"body.weight": {"3.5"}}, false, true},
{"account.balance < 1000.0", map[string][]string{"account.balance": {"900"}}, false, true},
{"apples.kg <= 4", map[string][]string{"apples.kg": {"4.0"}}, false, true},
{"body.weight >= 4.5", map[string][]string{"body.weight": {fmt.Sprintf("%v", float32(4.5))}}, false, true},
{"oranges.kg < 4 AND watermellons.kg > 10", map[string][]string{"oranges.kg": {"3"}, "watermellons.kg": {"12"}}, false, true},
{"peaches.kg < 4", map[string][]string{"peaches.kg": {"5"}}, false, false},
{"tx.date > DATE 2017-01-01", map[string][]string{"tx.date": {time.Now().Format(query.DateLayout)}}, false, true},
{"tx.date = DATE 2017-01-01", map[string][]string{"tx.date": {txDate}}, false, true},
{"tx.date = DATE 2018-01-01", map[string][]string{"tx.date": {txDate}}, false, false},
{"tx.time >= TIME 2013-05-03T14:45:00Z", map[string][]string{"tx.time": {time.Now().Format(query.TimeLayout)}}, false, true},
{"tx.time = TIME 2013-05-03T14:45:00Z", map[string][]string{"tx.time": {txTime}}, false, false},
{"abci.owner.name CONTAINS 'Igor'", map[string][]string{"abci.owner.name": {"Igor,Ivan"}}, false, true},
{"abci.owner.name CONTAINS 'Igor'", map[string][]string{"abci.owner.name": {"Pavel,Ivan"}}, false, false},
{"abci.owner.name = 'Igor'", map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, false, true},
{"abci.owner.name = 'Ivan'", map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, false, true},
{"abci.owner.name = 'Ivan' AND abci.owner.name = 'Igor'", map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, false, true},
{"abci.owner.name = 'Ivan' AND abci.owner.name = 'John'", map[string][]string{"abci.owner.name": {"Igor", "Ivan"}}, false, false},
{"tm.events.type='NewBlock'", map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, false, true},
{"app.name = 'fuzzed'", map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, false, true},
{"tm.events.type='NewBlock' AND app.name = 'fuzzed'", map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, false, true},
{"tm.events.type='NewHeader' AND app.name = 'fuzzed'", map[string][]string{"tm.events.type": {"NewBlock"}, "app.name": {"fuzzed"}}, false, false},
} }
for _, tc := range testCases { for _, tc := range testCases {
@ -51,10 +61,12 @@ func TestMatches(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
} }
require.NotNil(t, q, "Query '%s' should not be nil", tc.s)
if tc.matches { if tc.matches {
assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
assert.True(t, q.Matches(tc.events), "Query '%s' should match %v", tc.s, tc.events)
} else { } else {
assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
assert.False(t, q.Matches(tc.events), "Query '%s' should not match %v", tc.s, tc.events)
} }
} }
} }


+ 7
- 7
libs/pubsub/subscription.go View File

@ -70,12 +70,12 @@ func (s *Subscription) cancel(err error) {
// Message glues data and tags together. // Message glues data and tags together.
type Message struct { type Message struct {
data interface{}
tags map[string]string
data interface{}
events map[string][]string
} }
func NewMessage(data interface{}, tags map[string]string) Message {
return Message{data, tags}
func NewMessage(data interface{}, events map[string][]string) Message {
return Message{data, events}
} }
// Data returns an original data published. // Data returns an original data published.
@ -83,7 +83,7 @@ func (msg Message) Data() interface{} {
return msg.data return msg.data
} }
// Tags returns tags, which matched the client's query.
func (msg Message) Tags() map[string]string {
return msg.tags
// Events returns events, which matched the client's query.
func (msg Message) Events() map[string][]string {
return msg.events
} }

+ 1
- 1
rpc/client/localclient.go View File

@ -182,7 +182,7 @@ func (c *Local) eventsRoutine(sub types.Subscription, subscriber string, q tmpub
for { for {
select { select {
case msg := <-sub.Out(): case msg := <-sub.Out():
result := ctypes.ResultEvent{Query: q.String(), Data: msg.Data(), Tags: msg.Tags()}
result := ctypes.ResultEvent{Query: q.String(), Data: msg.Data(), Events: msg.Events()}
if cap(outc) == 0 { if cap(outc) == 0 {
outc <- result outc <- result
} else { } else {


+ 74
- 15
rpc/core/events.go View File

@ -22,26 +22,83 @@ import (
// string (escaped with single quotes), number, date or time. // string (escaped with single quotes), number, date or time.
// //
// Examples: // Examples:
// tm.event = 'NewBlock' # new blocks
// tm.event = 'CompleteProposal' # node got a complete proposal
// tm.event = 'NewBlock' # new blocks
// tm.event = 'CompleteProposal' # node got a complete proposal
// tm.event = 'Tx' AND tx.hash = 'XYZ' # single transaction // tm.event = 'Tx' AND tx.hash = 'XYZ' # single transaction
// tm.event = 'Tx' AND tx.height = 5 # all txs of the fifth block
// tx.height = 5 # all txs of the fifth block
// tm.event = 'Tx' AND tx.height = 5 # all txs of the fifth block
// tx.height = 5 # all txs of the fifth block
// //
// Tendermint provides a few predefined keys: tm.event, tx.hash and tx.height. // Tendermint provides a few predefined keys: tm.event, tx.hash and tx.height.
// Note for transactions, you can define additional keys by providing tags with
// Note for transactions, you can define additional keys by providing events with
// DeliverTx response. // DeliverTx response.
// //
// DeliverTx{
// Tags: []*KVPair{
// "agent.name": "K",
// }
// }
// import (
// abci "github.com/tendermint/tendermint/abci/types"
// "github.com/tendermint/tendermint/libs/pubsub/query"
// )
// //
// tm.event = 'Tx' AND agent.name = 'K'
// tm.event = 'Tx' AND account.created_at >= TIME 2013-05-03T14:45:00Z
// tm.event = 'Tx' AND contract.sign_date = DATE 2017-01-01
// tm.event = 'Tx' AND account.owner CONTAINS 'Igor'
// abci.ResponseDeliverTx{
// Events: []abci.Event{
// {
// Type: "rewards.withdraw",
// Attributes: cmn.KVPairs{
// cmn.KVPair{Key: []byte("address"), Value: []byte("AddrA")},
// cmn.KVPair{Key: []byte("source"), Value: []byte("SrcX")},
// cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
// cmn.KVPair{Key: []byte("balance"), Value: []byte("...")},
// },
// },
// {
// Type: "rewards.withdraw",
// Attributes: cmn.KVPairs{
// cmn.KVPair{Key: []byte("address"), Value: []byte("AddrB")},
// cmn.KVPair{Key: []byte("source"), Value: []byte("SrcY")},
// cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
// cmn.KVPair{Key: []byte("balance"), Value: []byte("...")},
// },
// },
// {
// Type: "transfer",
// Attributes: cmn.KVPairs{
// cmn.KVPair{Key: []byte("sender"), Value: []byte("AddrC")},
// cmn.KVPair{Key: []byte("recipient"), Value: []byte("AddrD")},
// cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
// },
// },
// },
// }
//
// All events are indexed by a composite key of the form {eventType}.{evenAttrKey}.
// In the above examples, the following keys would be indexed:
// - rewards.withdraw.address
// - rewards.withdraw.source
// - rewards.withdraw.amount
// - rewards.withdraw.balance
// - transfer.sender
// - transfer.recipient
// - transfer.amount
//
// Multiple event types with duplicate keys are allowed and are meant to
// categorize unique and distinct events. In the above example, all events
// indexed under the key `rewards.withdraw.address` will have the following
// values stored and queryable:
//
// - AddrA
// - AddrB
//
// To create a query for txs where address AddrA withdrew rewards:
// query.MustParse("tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA'")
//
// To create a query for txs where address AddrA withdrew rewards from source Y:
// query.MustParse("tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA' AND rewards.withdraw.source = 'Y'")
//
// To create a query for txs where AddrA transferred funds:
// query.MustParse("tm.event = 'Tx' AND transfer.sender = 'AddrA'")
//
// The following queries would return no results:
// query.MustParse("tm.event = 'Tx' AND transfer.sender = 'AddrZ'")
// query.MustParse("tm.event = 'Tx' AND rewards.withdraw.address = 'AddrZ'")
// query.MustParse("tm.event = 'Tx' AND rewards.withdraw.source = 'W'")
// //
// See list of all possible events here // See list of all possible events here
// https://godoc.org/github.com/tendermint/tendermint/types#pkg-constants // https://godoc.org/github.com/tendermint/tendermint/types#pkg-constants
@ -106,8 +163,10 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to parse query") return nil, errors.Wrap(err, "failed to parse query")
} }
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel() defer cancel()
sub, err := eventBus.Subscribe(subCtx, addr, q) sub, err := eventBus.Subscribe(subCtx, addr, q)
if err != nil { if err != nil {
return nil, err return nil, err
@ -117,7 +176,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
for { for {
select { select {
case msg := <-sub.Out(): case msg := <-sub.Out():
resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}
resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()}
ctx.WSConn.TryWriteRPCResponse( ctx.WSConn.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse( rpctypes.NewRPCSuccessResponse(
ctx.WSConn.Codec(), ctx.WSConn.Codec(),


+ 3
- 3
rpc/core/types/responses.go View File

@ -205,7 +205,7 @@ type (
// Event data from a subscription // Event data from a subscription
type ResultEvent struct { type ResultEvent struct {
Query string `json:"query"`
Data types.TMEventData `json:"data"`
Tags map[string]string `json:"tags"`
Query string `json:"query"`
Data types.TMEventData `json:"data"`
Events map[string][]string `json:"events"`
} }

+ 1
- 2
state/execution_test.go View File

@ -13,7 +13,6 @@ import (
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/secp256k1" "github.com/tendermint/tendermint/crypto/secp256k1"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mock" "github.com/tendermint/tendermint/mock"
@ -455,7 +454,7 @@ func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
} }
func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
return abci.ResponseDeliverTx{Events: []abci.Event{}}
} }
func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx { func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx {


+ 9
- 7
state/state_test.go View File

@ -93,8 +93,8 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
// Build mock responses. // Build mock responses.
block := makeBlock(state, 2) block := makeBlock(state, 2)
abciResponses := NewABCIResponses(block) abciResponses := NewABCIResponses(block)
abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo"), Tags: nil}
abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok", Tags: nil}
abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo"), Events: nil}
abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok", Events: nil}
abciResponses.EndBlock = &abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{ abciResponses.EndBlock = &abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{
types.TM2PB.NewValidatorUpdate(ed25519.GenPrivKey().PubKey(), 10), types.TM2PB.NewValidatorUpdate(ed25519.GenPrivKey().PubKey(), 10),
}} }}
@ -134,11 +134,13 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
2: { 2: {
[]*abci.ResponseDeliverTx{ []*abci.ResponseDeliverTx{
{Code: 383}, {Code: 383},
{Data: []byte("Gotcha!"),
Tags: []cmn.KVPair{
{Key: []byte("a"), Value: []byte("1")},
{Key: []byte("build"), Value: []byte("stuff")},
}},
{
Data: []byte("Gotcha!"),
Events: []abci.Event{
{Type: "type1", Attributes: []cmn.KVPair{{Key: []byte("a"), Value: []byte("1")}}},
{Type: "type2", Attributes: []cmn.KVPair{{Key: []byte("build"), Value: []byte("stuff")}}},
},
},
}, },
types.ABCIResults{ types.ABCIResults{
{383, nil}, {383, nil},


+ 39
- 20
state/txindex/kv/kv.go View File

@ -10,9 +10,9 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -75,7 +75,10 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
return txResult, nil return txResult, nil
} }
// AddBatch indexes a batch of transactions using the given list of tags.
// AddBatch indexes a batch of transactions using the given list of events. Each
// key that indexed from the tx's events is a composite of the event type and
// the respective attribute's key delimited by a "." (eg. "account.number").
// Any event with an empty type is not indexed.
func (txi *TxIndex) AddBatch(b *txindex.Batch) error { func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
storeBatch := txi.store.NewBatch() storeBatch := txi.store.NewBatch()
defer storeBatch.Close() defer storeBatch.Close()
@ -83,12 +86,8 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
for _, result := range b.Ops { for _, result := range b.Ops {
hash := result.Tx.Hash() hash := result.Tx.Hash()
// index tx by tags
for _, tag := range result.Result.Tags {
if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
storeBatch.Set(keyForTag(tag, result), hash)
}
}
// index tx by events
txi.indexEvents(result, hash, storeBatch)
// index tx by height // index tx by height
if txi.indexAllTags || cmn.StringInSlice(types.TxHeightKey, txi.tagsToIndex) { if txi.indexAllTags || cmn.StringInSlice(types.TxHeightKey, txi.tagsToIndex) {
@ -107,19 +106,18 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
return nil return nil
} }
// Index indexes a single transaction using the given list of tags.
// Index indexes a single transaction using the given list of events. Each key
// that indexed from the tx's events is a composite of the event type and the
// respective attribute's key delimited by a "." (eg. "account.number").
// Any event with an empty type is not indexed.
func (txi *TxIndex) Index(result *types.TxResult) error { func (txi *TxIndex) Index(result *types.TxResult) error {
b := txi.store.NewBatch() b := txi.store.NewBatch()
defer b.Close() defer b.Close()
hash := result.Tx.Hash() hash := result.Tx.Hash()
// index tx by tags
for _, tag := range result.Result.Tags {
if txi.indexAllTags || cmn.StringInSlice(string(tag.Key), txi.tagsToIndex) {
b.Set(keyForTag(tag, result), hash)
}
}
// index tx by events
txi.indexEvents(result, hash, b)
// index tx by height // index tx by height
if txi.indexAllTags || cmn.StringInSlice(types.TxHeightKey, txi.tagsToIndex) { if txi.indexAllTags || cmn.StringInSlice(types.TxHeightKey, txi.tagsToIndex) {
@ -131,12 +129,33 @@ func (txi *TxIndex) Index(result *types.TxResult) error {
if err != nil { if err != nil {
return err return err
} }
b.Set(hash, rawBytes)
b.Set(hash, rawBytes)
b.Write() b.Write()
return nil return nil
} }
func (txi *TxIndex) indexEvents(result *types.TxResult, hash []byte, store dbm.SetDeleter) {
for _, event := range result.Result.Events {
// only index events with a non-empty type
if len(event.Type) == 0 {
continue
}
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
continue
}
compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
if txi.indexAllTags || cmn.StringInSlice(compositeTag, txi.tagsToIndex) {
store.Set(keyForEvent(compositeTag, attr.Value, result), hash)
}
}
}
}
// Search performs a search using the given query. It breaks the query into // Search performs a search using the given query. It breaks the query into
// conditions (like "tx.height > 5"). For each condition, it queries the DB // conditions (like "tx.height > 5"). For each condition, it queries the DB
// index. One special use cases here: (1) if "tx.hash" is found, it returns tx // index. One special use cases here: (1) if "tx.hash" is found, it returns tx
@ -343,7 +362,7 @@ func (txi *TxIndex) match(c query.Condition, startKeyBz []byte) (hashes [][]byte
} }
} else if c.Op == query.OpContains { } else if c.Op == query.OpContains {
// XXX: startKey does not apply here. // XXX: startKey does not apply here.
// For example, if startKey = "account.owner/an/" and search query = "accoutn.owner CONTAINS an"
// For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an"
// we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/" // we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/"
it := dbm.IteratePrefix(txi.store, startKey(c.Tag)) it := dbm.IteratePrefix(txi.store, startKey(c.Tag))
defer it.Close() defer it.Close()
@ -420,10 +439,10 @@ func extractValueFromKey(key []byte) string {
return parts[1] return parts[1]
} }
func keyForTag(tag cmn.KVPair, result *types.TxResult) []byte {
func keyForEvent(key string, value []byte, result *types.TxResult) []byte {
return []byte(fmt.Sprintf("%s/%s/%d/%d", return []byte(fmt.Sprintf("%s/%s/%d/%d",
tag.Key,
tag.Value,
key,
value,
result.Height, result.Height,
result.Index, result.Index,
)) ))


+ 115
- 29
state/txindex/kv/kv_test.go View File

@ -21,7 +21,15 @@ func TestTxIndex(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB()) indexer := NewTxIndex(db.NewMemDB())
tx := types.Tx("HELLO WORLD") tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: nil}}
txResult := &types.TxResult{
Height: 1,
Index: 0,
Tx: tx,
Result: abci.ResponseDeliverTx{
Data: []byte{0},
Code: abci.CodeTypeOK, Log: "", Events: nil,
},
}
hash := tx.Hash() hash := tx.Hash()
batch := txindex.NewBatch(1) batch := txindex.NewBatch(1)
@ -36,7 +44,15 @@ func TestTxIndex(t *testing.T) {
assert.Equal(t, txResult, loadedTxResult) assert.Equal(t, txResult, loadedTxResult)
tx2 := types.Tx("BYE BYE WORLD") tx2 := types.Tx("BYE BYE WORLD")
txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Tags: nil}}
txResult2 := &types.TxResult{
Height: 1,
Index: 0,
Tx: tx2,
Result: abci.ResponseDeliverTx{
Data: []byte{0},
Code: abci.CodeTypeOK, Log: "", Events: nil,
},
}
hash2 := tx2.Hash() hash2 := tx2.Hash()
err = indexer.Index(txResult2) err = indexer.Index(txResult2)
@ -51,10 +67,10 @@ func TestTxSearch(t *testing.T) {
allowedTags := []string{"account.number", "account.owner", "account.date"} allowedTags := []string{"account.number", "account.owner", "account.date"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags)) indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
txResult := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("1")},
{Key: []byte("account.owner"), Value: []byte("Ivan")},
{Key: []byte("not_allowed"), Value: []byte("Vlad")},
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number"), Value: []byte("1")}}},
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("owner"), Value: []byte("Ivan")}}},
{Type: "", Attributes: []cmn.KVPair{{Key: []byte("not_allowed"), Value: []byte("Vlad")}}},
}) })
hash := txResult.Tx.Hash() hash := txResult.Tx.Hash()
@ -108,13 +124,82 @@ func TestTxSearch(t *testing.T) {
} }
} }
func TestTxSearchDeprecatedIndexing(t *testing.T) {
allowedTags := []string{"account.number", "sender"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
// index tx using events indexing (composite key)
txResult1 := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number"), Value: []byte("1")}}},
})
hash1 := txResult1.Tx.Hash()
err := indexer.Index(txResult1)
require.NoError(t, err)
// index tx also using deprecated indexing (tag as key)
txResult2 := txResultWithEvents(nil)
txResult2.Tx = types.Tx("HELLO WORLD 2")
hash2 := txResult2.Tx.Hash()
b := indexer.store.NewBatch()
rawBytes, err := cdc.MarshalBinaryBare(txResult2)
require.NoError(t, err)
depKey := []byte(fmt.Sprintf("%s/%s/%d/%d",
"sender",
"addr1",
txResult2.Height,
txResult2.Index,
))
b.Set(depKey, hash2)
b.Set(keyForHeight(txResult2), hash2)
b.Set(hash2, rawBytes)
b.Write()
testCases := []struct {
q string
results []*types.TxResult
}{
// search by hash
{fmt.Sprintf("tx.hash = '%X'", hash1), []*types.TxResult{txResult1}},
// search by hash
{fmt.Sprintf("tx.hash = '%X'", hash2), []*types.TxResult{txResult2}},
// search by exact match (one tag)
{"account.number = 1", []*types.TxResult{txResult1}},
{"account.number >= 1 AND account.number <= 5", []*types.TxResult{txResult1}},
// search by range (lower bound)
{"account.number >= 1", []*types.TxResult{txResult1}},
// search by range (upper bound)
{"account.number <= 5", []*types.TxResult{txResult1}},
// search using not allowed tag
{"not_allowed = 'boom'", []*types.TxResult{}},
// search for not existing tx result
{"account.number >= 2 AND account.number <= 5", []*types.TxResult{}},
// search using not existing tag
{"account.date >= TIME 2013-05-03T14:45:00Z", []*types.TxResult{}},
// search by deprecated tag
{"sender = 'addr1'", []*types.TxResult{txResult2}},
}
for _, tc := range testCases {
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(query.MustParse(tc.q))
require.NoError(t, err)
require.Equal(t, results, tc.results)
})
}
}
func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
allowedTags := []string{"account.number"} allowedTags := []string{"account.number"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags)) indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
txResult := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("1")},
{Key: []byte("account.number"), Value: []byte("2")},
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number"), Value: []byte("1")}}},
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number"), Value: []byte("2")}}},
}) })
err := indexer.Index(txResult) err := indexer.Index(txResult)
@ -132,9 +217,10 @@ func TestTxSearchMultipleTxs(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags)) indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
// indexed first, but bigger height (to test the order of transactions) // indexed first, but bigger height (to test the order of transactions)
txResult := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("1")},
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number"), Value: []byte("1")}}},
}) })
txResult.Tx = types.Tx("Bob's account") txResult.Tx = types.Tx("Bob's account")
txResult.Height = 2 txResult.Height = 2
txResult.Index = 1 txResult.Index = 1
@ -142,8 +228,8 @@ func TestTxSearchMultipleTxs(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// indexed second, but smaller height (to test the order of transactions) // indexed second, but smaller height (to test the order of transactions)
txResult2 := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("2")},
txResult2 := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number"), Value: []byte("2")}}},
}) })
txResult2.Tx = types.Tx("Alice's account") txResult2.Tx = types.Tx("Alice's account")
txResult2.Height = 1 txResult2.Height = 1
@ -153,8 +239,8 @@ func TestTxSearchMultipleTxs(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// indexed third (to test the order of transactions) // indexed third (to test the order of transactions)
txResult3 := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number"), Value: []byte("3")},
txResult3 := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number"), Value: []byte("3")}}},
}) })
txResult3.Tx = types.Tx("Jack's account") txResult3.Tx = types.Tx("Jack's account")
txResult3.Height = 1 txResult3.Height = 1
@ -164,8 +250,8 @@ func TestTxSearchMultipleTxs(t *testing.T) {
// indexed fourth (to test we don't include txs with similar tags) // indexed fourth (to test we don't include txs with similar tags)
// https://github.com/tendermint/tendermint/issues/2908 // https://github.com/tendermint/tendermint/issues/2908
txResult4 := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.number.id"), Value: []byte("1")},
txResult4 := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number.id"), Value: []byte("1")}}},
}) })
txResult4.Tx = types.Tx("Mike's account") txResult4.Tx = types.Tx("Mike's account")
txResult4.Height = 2 txResult4.Height = 2
@ -183,9 +269,9 @@ func TestTxSearchMultipleTxs(t *testing.T) {
func TestIndexAllTags(t *testing.T) { func TestIndexAllTags(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB(), IndexAllTags()) indexer := NewTxIndex(db.NewMemDB(), IndexAllTags())
txResult := txResultWithTags([]cmn.KVPair{
{Key: []byte("account.owner"), Value: []byte("Ivan")},
{Key: []byte("account.number"), Value: []byte("1")},
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("owner"), Value: []byte("Ivan")}}},
{Type: "account", Attributes: []cmn.KVPair{{Key: []byte("number"), Value: []byte("1")}}},
}) })
err := indexer.Index(txResult) err := indexer.Index(txResult)
@ -202,17 +288,17 @@ func TestIndexAllTags(t *testing.T) {
assert.Equal(t, []*types.TxResult{txResult}, results) assert.Equal(t, []*types.TxResult{txResult}, results)
} }
func txResultWithTags(tags []cmn.KVPair) *types.TxResult {
func txResultWithEvents(events []abci.Event) *types.TxResult {
tx := types.Tx("HELLO WORLD") tx := types.Tx("HELLO WORLD")
return &types.TxResult{ return &types.TxResult{
Height: 1, Height: 1,
Index: 0, Index: 0,
Tx: tx, Tx: tx,
Result: abci.ResponseDeliverTx{ Result: abci.ResponseDeliverTx{
Data: []byte{0},
Code: abci.CodeTypeOK,
Log: "",
Tags: tags,
Data: []byte{0},
Code: abci.CodeTypeOK,
Log: "",
Events: events,
}, },
} }
} }
@ -236,10 +322,10 @@ func benchmarkTxIndex(txsCount int64, b *testing.B) {
Index: txIndex, Index: txIndex,
Tx: tx, Tx: tx,
Result: abci.ResponseDeliverTx{ Result: abci.ResponseDeliverTx{
Data: []byte{0},
Code: abci.CodeTypeOK,
Log: "",
Tags: []cmn.KVPair{},
Data: []byte{0},
Code: abci.CodeTypeOK,
Log: "",
Events: []abci.Event{},
}, },
} }
if err := batch.Add(txResult); err != nil { if err := batch.Add(txResult); err != nil {


+ 39
- 40
types/event_bus.go View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
@ -90,20 +91,32 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
func (b *EventBus) Publish(eventType string, eventData TMEventData) error { func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
b.pubsub.PublishWithTags(ctx, eventData, map[string]string{EventTypeKey: eventType})
return nil
}
func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string {
result := make(map[string]string)
for _, tag := range tags {
// basic validation
if len(tag.Key) == 0 {
logger.Debug("Got tag with an empty key (skipping)", "tag", tag)
return b.pubsub.PublishWithEvents(ctx, eventData, map[string][]string{EventTypeKey: {eventType}})
}
// validateAndStringifyEvents takes a slice of event objects and creates a
// map of stringified events where each key is composed of the event
// type and each of the event's attributes keys in the form of
// "{event.Type}.{attribute.Key}" and the value is each attribute's value.
func (b *EventBus) validateAndStringifyEvents(events []types.Event, logger log.Logger) map[string][]string {
result := make(map[string][]string)
for _, event := range events {
if len(event.Type) == 0 {
logger.Debug("Got an event with an empty type (skipping)", "event", event)
continue continue
} }
result[string(tag.Key)] = string(tag.Value)
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
logger.Debug("Got an event attribute with an empty key(skipping)", "event", event)
continue
}
compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
result[compositeTag] = append(result[compositeTag], string(attr.Value))
}
} }
return result return result
} }
@ -111,14 +124,13 @@ func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort()))
resultEvents := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...)
events := b.validateAndStringifyEvents(resultEvents, b.Logger.With("block", data.Block.StringShort()))
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlock
// add predefined new block event
events[EventTypeKey] = append(events[EventTypeKey], EventNewBlock)
b.pubsub.PublishWithTags(ctx, data, tags)
_ = b.pubsub.PublishWithEvents(ctx, data, events)
return nil return nil
} }
@ -126,16 +138,14 @@ func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) erro
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
resultTags := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...)
// TODO: Create StringShort method for Header and use it in logger. // TODO: Create StringShort method for Header and use it in logger.
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header))
events := b.validateAndStringifyEvents(resultTags, b.Logger.With("header", data.Header))
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlockHeader
// add predefined new block header event
events[EventTypeKey] = append(events[EventTypeKey], EventNewBlockHeader)
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
return b.pubsub.PublishWithEvents(ctx, data, events)
} }
func (b *EventBus) PublishEventVote(data EventDataVote) error { func (b *EventBus) PublishEventVote(data EventDataVote) error {
@ -153,19 +163,14 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx))
events := b.validateAndStringifyEvents(data.Result.Events, b.Logger.With("tx", data.Tx))
// add predefined tags // add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventTx
logIfTagExists(TxHashKey, tags, b.Logger)
tags[TxHashKey] = fmt.Sprintf("%X", data.Tx.Hash())
logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = fmt.Sprintf("%d", data.Height)
events[EventTypeKey] = append(events[EventTypeKey], EventTx)
events[TxHashKey] = append(events[TxHashKey], fmt.Sprintf("%X", data.Tx.Hash()))
events[TxHeightKey] = append(events[TxHeightKey], fmt.Sprintf("%d", data.Height))
b.pubsub.PublishWithTags(ctx, data, tags)
_ = b.pubsub.PublishWithEvents(ctx, data, events)
return nil return nil
} }
@ -209,12 +214,6 @@ func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpd
return b.Publish(EventValidatorSetUpdates, data) return b.Publish(EventValidatorSetUpdates, data)
} }
func logIfTagExists(tag string, tags map[string]string, logger log.Logger) {
if value, ok := tags[tag]; ok {
logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value)
}
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
type NopEventBus struct{} type NopEventBus struct{}


+ 129
- 8
types/event_bus_test.go View File

@ -22,10 +22,15 @@ func TestEventBusPublishEventTx(t *testing.T) {
defer eventBus.Stop() defer eventBus.Stop()
tx := Tx("foo") tx := Tx("foo")
result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
result := abci.ResponseDeliverTx{
Data: []byte("bar"),
Events: []abci.Event{
{Type: "testType", Attributes: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}},
},
}
// PublishEventTx adds all these 3 tags, so the query below should work // PublishEventTx adds all these 3 tags, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash())
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND testType.baz=1", tx.Hash())
txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err) require.NoError(t, err)
@ -62,11 +67,19 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
defer eventBus.Stop() defer eventBus.Stop()
block := MakeBlock(0, []Tx{}, nil, []Evidence{}) block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
resultBeginBlock := abci.ResponseBeginBlock{
Events: []abci.Event{
{Type: "testType", Attributes: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}},
},
}
resultEndBlock := abci.ResponseEndBlock{
Events: []abci.Event{
{Type: "testType", Attributes: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}},
},
}
// PublishEventNewBlock adds the tm.event tag, so the query below should work // PublishEventNewBlock adds the tm.event tag, so the query below should work
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
query := "tm.event='NewBlock' AND testType.baz=1 AND testType.foz=2"
blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err) require.NoError(t, err)
@ -94,6 +107,106 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
} }
} }
func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()
tx := Tx("foo")
result := abci.ResponseDeliverTx{
Data: []byte("bar"),
Events: []abci.Event{
{
Type: "transfer",
Attributes: []cmn.KVPair{
{Key: []byte("sender"), Value: []byte("foo")},
{Key: []byte("recipient"), Value: []byte("bar")},
{Key: []byte("amount"), Value: []byte("5")},
},
},
{
Type: "transfer",
Attributes: []cmn.KVPair{
{Key: []byte("sender"), Value: []byte("baz")},
{Key: []byte("recipient"), Value: []byte("cat")},
{Key: []byte("amount"), Value: []byte("13")},
},
},
{
Type: "withdraw.rewards",
Attributes: []cmn.KVPair{
{Key: []byte("address"), Value: []byte("bar")},
{Key: []byte("source"), Value: []byte("iceman")},
{Key: []byte("amount"), Value: []byte("33")},
},
},
},
}
testCases := []struct {
query string
expectResults bool
}{
{
"tm.event='Tx' AND tx.height=1 AND transfer.sender='DoesNotExist'",
false,
},
{
"tm.event='Tx' AND tx.height=1 AND transfer.sender='foo'",
true,
},
{
"tm.event='Tx' AND tx.height=1 AND transfer.sender='baz'",
true,
},
{
"tm.event='Tx' AND tx.height=1 AND transfer.sender='foo' AND transfer.sender='baz'",
true,
},
{
"tm.event='Tx' AND tx.height=1 AND transfer.sender='foo' AND transfer.sender='DoesNotExist'",
false,
},
}
for i, tc := range testCases {
sub, err := eventBus.Subscribe(context.Background(), fmt.Sprintf("client-%d", i), tmquery.MustParse(tc.query))
require.NoError(t, err)
done := make(chan struct{})
go func() {
msg := <-sub.Out()
data := msg.Data().(EventDataTx)
assert.Equal(t, int64(1), data.Height)
assert.Equal(t, uint32(0), data.Index)
assert.Equal(t, tx, data.Tx)
assert.Equal(t, result, data.Result)
close(done)
}()
err = eventBus.PublishEventTx(EventDataTx{TxResult{
Height: 1,
Index: 0,
Tx: tx,
Result: result,
}})
assert.NoError(t, err)
select {
case <-done:
if !tc.expectResults {
require.Fail(t, "unexpected transaction result(s) from subscription")
}
case <-time.After(1 * time.Second):
if tc.expectResults {
require.Fail(t, "failed to receive a transaction after 1 second")
}
}
}
}
func TestEventBusPublishEventNewBlockHeader(t *testing.T) { func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
eventBus := NewEventBus() eventBus := NewEventBus()
err := eventBus.Start() err := eventBus.Start()
@ -101,11 +214,19 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
defer eventBus.Stop() defer eventBus.Stop()
block := MakeBlock(0, []Tx{}, nil, []Evidence{}) block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
resultBeginBlock := abci.ResponseBeginBlock{
Events: []abci.Event{
{Type: "testType", Attributes: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}},
},
}
resultEndBlock := abci.ResponseEndBlock{
Events: []abci.Event{
{Type: "testType", Attributes: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}},
},
}
// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work // PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
query := "tm.event='NewBlockHeader' AND testType.baz=1 AND testType.foz=2"
headersSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) headersSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err) require.NoError(t, err)


Loading…
Cancel
Save