diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 93dd5c5d4..04394bd52 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -23,6 +23,8 @@ program](https://hackerone.com/tendermint). ### FEATURES: +- [types] [\#1571](https://github.com/tendermint/tendermint/issues/1571) Enable subscription to tags emitted from `BeginBlock`/`EndBlock` (@kostko) + ### IMPROVEMENTS: - [config] \#2877 add blocktime_iota to the config.toml (@ackratos) diff --git a/docs/spec/abci/abci.md b/docs/spec/abci/abci.md index f057002ef..b9dc744de 100644 --- a/docs/spec/abci/abci.md +++ b/docs/spec/abci/abci.md @@ -45,7 +45,9 @@ include a `Tags` field in their `Response*`. Each tag is key-value pair denoting something about what happened during the methods execution. Tags can be used to index transactions and blocks according to what happened -during their execution. +during their execution. Note that the set of tags returned for a block from +`BeginBlock` and `EndBlock` are merged. In case both methods return the same +tag, only the value defined in `EndBlock` is used. Keys and values in tags must be UTF-8 encoded strings (e.g. "account.owner": "Bob", "balance": "100.0", diff --git a/state/execution.go b/state/execution.go index 9aa714ebd..b7c38f418 100644 --- a/state/execution.go +++ b/state/execution.go @@ -226,8 +226,9 @@ func execBlockOnProxyApp( commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB) - // Begin block. - _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ + // Begin block + var err error + abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ Hash: block.Hash(), Header: types.TM2PB.Header(&block.Header), LastCommitInfo: commitInfo, @@ -417,8 +418,16 @@ func updateState( // Fire TxEvent for every tx. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again. func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) { - eventBus.PublishEventNewBlock(types.EventDataNewBlock{block}) - eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header}) + eventBus.PublishEventNewBlock(types.EventDataNewBlock{ + Block: block, + ResultBeginBlock: *abciResponses.BeginBlock, + ResultEndBlock: *abciResponses.EndBlock, + }) + eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ + Header: block.Header, + ResultBeginBlock: *abciResponses.BeginBlock, + ResultEndBlock: *abciResponses.EndBlock, + }) for i, tx := range block.Data.Txs { eventBus.PublishEventTx(types.EventDataTx{types.TxResult{ diff --git a/state/store.go b/state/store.go index 0effe38a5..eb850fa7f 100644 --- a/state/store.go +++ b/state/store.go @@ -107,8 +107,9 @@ func saveState(db dbm.DB, state State, key []byte) { // of the various ABCI calls during block processing. // It is persisted to disk for each height before calling Commit. type ABCIResponses struct { - DeliverTx []*abci.ResponseDeliverTx - EndBlock *abci.ResponseEndBlock + DeliverTx []*abci.ResponseDeliverTx + EndBlock *abci.ResponseEndBlock + BeginBlock *abci.ResponseBeginBlock } // NewABCIResponses returns a new ABCIResponses diff --git a/tools/tm-monitor/monitor/node_test.go b/tools/tm-monitor/monitor/node_test.go index 10c2a13f1..0048e48fa 100644 --- a/tools/tm-monitor/monitor/node_test.go +++ b/tools/tm-monitor/monitor/node_test.go @@ -34,7 +34,7 @@ func TestNodeNewBlockReceived(t *testing.T) { n.SendBlocksTo(blockCh) blockHeader := tmtypes.Header{Height: 5} - emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{blockHeader}) + emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{Header: blockHeader}) assert.Equal(t, int64(5), n.Height) assert.Equal(t, blockHeader, <-blockCh) diff --git a/types/event_bus.go b/types/event_bus.go index fbe5ac478..d941e9aa9 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -71,12 +71,48 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error { return nil } +func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string { + result := make(map[string]string) + for _, tag := range tags { + // basic validation + if len(tag.Key) == 0 { + logger.Debug("Got tag with an empty key (skipping)", "tag", tag) + continue + } + result[string(tag.Key)] = string(tag.Value) + } + return result +} + func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error { - return b.Publish(EventNewBlock, data) + // no explicit deadline for publishing events + ctx := context.Background() + + resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...) + tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort())) + + // add predefined tags + logIfTagExists(EventTypeKey, tags, b.Logger) + tags[EventTypeKey] = EventNewBlock + + b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags)) + return nil } func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error { - return b.Publish(EventNewBlockHeader, data) + // no explicit deadline for publishing events + ctx := context.Background() + + resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...) + // TODO: Create StringShort method for Header and use it in logger. + tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header)) + + // add predefined tags + logIfTagExists(EventTypeKey, tags, b.Logger) + tags[EventTypeKey] = EventNewBlockHeader + + b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags)) + return nil } func (b *EventBus) PublishEventVote(data EventDataVote) error { @@ -94,17 +130,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() - tags := make(map[string]string) - - // validate and fill tags from tx result - for _, tag := range data.Result.Tags { - // basic validation - if len(tag.Key) == 0 { - b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx) - continue - } - tags[string(tag.Key)] = string(tag.Value) - } + tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx)) // add predefined tags logIfTagExists(EventTypeKey, tags, b.Logger) diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 4056dacd4..0af11ebd9 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -58,6 +58,90 @@ func TestEventBusPublishEventTx(t *testing.T) { } } +func TestEventBusPublishEventNewBlock(t *testing.T) { + eventBus := NewEventBus() + err := eventBus.Start() + require.NoError(t, err) + defer eventBus.Stop() + + block := MakeBlock(0, []Tx{}, nil, []Evidence{}) + resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}} + resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}} + + txEventsCh := make(chan interface{}) + + // PublishEventNewBlock adds the tm.event tag, so the query below should work + query := "tm.event='NewBlock' AND baz=1 AND foz=2" + err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + for e := range txEventsCh { + edt := e.(EventDataNewBlock) + assert.Equal(t, block, edt.Block) + assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) + assert.Equal(t, resultEndBlock, edt.ResultEndBlock) + close(done) + } + }() + + err = eventBus.PublishEventNewBlock(EventDataNewBlock{ + Block: block, + ResultBeginBlock: resultBeginBlock, + ResultEndBlock: resultEndBlock, + }) + assert.NoError(t, err) + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("did not receive a block after 1 sec.") + } +} + +func TestEventBusPublishEventNewBlockHeader(t *testing.T) { + eventBus := NewEventBus() + err := eventBus.Start() + require.NoError(t, err) + defer eventBus.Stop() + + block := MakeBlock(0, []Tx{}, nil, []Evidence{}) + resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}} + resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}} + + txEventsCh := make(chan interface{}) + + // PublishEventNewBlockHeader adds the tm.event tag, so the query below should work + query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2" + err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + for e := range txEventsCh { + edt := e.(EventDataNewBlockHeader) + assert.Equal(t, block.Header, edt.Header) + assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) + assert.Equal(t, resultEndBlock, edt.ResultEndBlock) + close(done) + } + }() + + err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{ + Header: block.Header, + ResultBeginBlock: resultBeginBlock, + ResultEndBlock: resultEndBlock, + }) + assert.NoError(t, err) + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("did not receive a block header after 1 sec.") + } +} + func TestEventBusPublish(t *testing.T) { eventBus := NewEventBus() err := eventBus.Start() diff --git a/types/events.go b/types/events.go index c33b5978f..b22a1c8b8 100644 --- a/types/events.go +++ b/types/events.go @@ -4,6 +4,7 @@ import ( "fmt" amino "github.com/tendermint/go-amino" + abci "github.com/tendermint/tendermint/abci/types" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ) @@ -56,11 +57,17 @@ func RegisterEventDatas(cdc *amino.Codec) { type EventDataNewBlock struct { Block *Block `json:"block"` + + ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"` + ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"` } // light weight event for benchmarking type EventDataNewBlockHeader struct { Header Header `json:"header"` + + ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"` + ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"` } // All txs fire EventDataTx