From a52cdbfe435155d39b04b970850bb15f253fb227 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 9 Nov 2017 17:35:46 -0500 Subject: [PATCH] extract tags from DeliverTx/Result and send them along with predefined --- consensus/replay.go | 1 + glide.lock | 2 +- glide.yaml | 2 +- state/execution.go | 1 + state/state_test.go | 4 ++-- state/txindex/kv/kv_test.go | 4 ++-- types/event_bus.go | 14 +++++++++++++- types/events.go | 13 +++++++------ 8 files changed, 28 insertions(+), 13 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index fb1c49a10..38a5eef31 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -392,6 +392,7 @@ func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result { r.Code, r.Data, r.Log, + r.Tags, } } diff --git a/glide.lock b/glide.lock index e12ddb4e2..ccb747599 100644 --- a/glide.lock +++ b/glide.lock @@ -98,7 +98,7 @@ imports: - leveldb/table - leveldb/util - name: github.com/tendermint/abci - version: 76ef8a0697c6179220a74c479b36c27a5b53008a + version: 6b47155e08732f46dafdcef185d23f0ff9ff24a5 subpackages: - client - example/counter diff --git a/glide.yaml b/glide.yaml index 0f07dc2da..19485fb6a 100644 --- a/glide.yaml +++ b/glide.yaml @@ -18,7 +18,7 @@ import: - package: github.com/spf13/viper version: v1.0.0 - package: github.com/tendermint/abci - version: ~0.7.0 + version: 6b47155e08732f46dafdcef185d23f0ff9ff24a5 subpackages: - client - example/dummy diff --git a/state/execution.go b/state/execution.go index 6c74f7a9e..aa4cd9c89 100644 --- a/state/execution.go +++ b/state/execution.go @@ -75,6 +75,7 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p Data: txResult.Data, Code: txResult.Code, Log: txResult.Log, + Tags: txResult.Tags, Error: txError, } txEventPublisher.PublishEventTx(event) diff --git a/state/state_test.go b/state/state_test.go index 7bb43afa2..b60f1546f 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -78,8 +78,8 @@ func TestABCIResponsesSaveLoad(t *testing.T) { // build mock responses block := makeBlock(2, state) abciResponses := NewABCIResponses(block) - abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")} - abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"} + abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo"), Tags: []*abci.KVPair{}} + abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok", Tags: []*abci.KVPair{}} abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{ { PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(), diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 673674b30..c0f1403ec 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -17,7 +17,7 @@ func TestTxIndex(t *testing.T) { indexer := &TxIndex{store: db.NewMemDB()} tx := types.Tx("HELLO WORLD") - txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} + txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} hash := tx.Hash() batch := txindex.NewBatch(1) @@ -34,7 +34,7 @@ func TestTxIndex(t *testing.T) { func benchmarkTxIndex(txsCount int, b *testing.B) { tx := types.Tx("HELLO WORLD") - txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} + txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} dir, err := ioutil.TempDir("", "tx_index_db") if err != nil { diff --git a/types/event_bus.go b/types/event_bus.go index 85ef14485..479ae7350 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -82,7 +82,19 @@ func (b *EventBus) PublishEventVote(vote EventDataVote) error { func (b *EventBus) PublishEventTx(tx EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() - b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())}) + tags := make(map[string]interface{}) + for _, t := range tx.Tags { + // TODO [@melekes]: validate, but where? + if t.ValueString != "" { + tags[t.Key] = t.ValueString + } else { + tags[t.Key] = t.ValueInt + } + } + // predefined tags should come last + tags[EventTypeKey] = EventTx + tags[TxHashKey] = fmt.Sprintf("%X", tx.Tx.Hash()) + b.pubsub.PublishWithTags(ctx, TMEventData{tx}, tags) return nil } diff --git a/types/events.go b/types/events.go index 64b83ec95..c9de20af1 100644 --- a/types/events.go +++ b/types/events.go @@ -110,12 +110,13 @@ type EventDataNewBlockHeader struct { // All txs fire EventDataTx type EventDataTx struct { - Height int `json:"height"` - Tx Tx `json:"tx"` - Data data.Bytes `json:"data"` - Log string `json:"log"` - Code abci.CodeType `json:"code"` - Error string `json:"error"` // this is redundant information for now + Height int `json:"height"` + Tx Tx `json:"tx"` + Data data.Bytes `json:"data"` + Log string `json:"log"` + Code abci.CodeType `json:"code"` + Tags []*abci.KVPair `json:"tags"` + Error string `json:"error"` // this is redundant information for now } type EventDataProposalHeartbeat struct {