Browse Source

types: Emit tags from BeginBlock/EndBlock (#2747)

This commit makes both EventNewBlock and EventNewBlockHeader emit tags
on the event bus, so subscribers can use them in queries.
pull/2919/head
Jernej Kos 6 years ago
committed by Ethan Buchman
parent
commit
99b9c9bf60
8 changed files with 152 additions and 21 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +3
    -1
      docs/spec/abci/abci.md
  3. +13
    -4
      state/execution.go
  4. +3
    -2
      state/store.go
  5. +1
    -1
      tools/tm-monitor/monitor/node_test.go
  6. +39
    -13
      types/event_bus.go
  7. +84
    -0
      types/event_bus_test.go
  8. +7
    -0
      types/events.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -23,6 +23,8 @@ program](https://hackerone.com/tendermint).
### FEATURES: ### FEATURES:
- [types] [\#1571](https://github.com/tendermint/tendermint/issues/1571) Enable subscription to tags emitted from `BeginBlock`/`EndBlock` (@kostko)
### IMPROVEMENTS: ### IMPROVEMENTS:
- [config] \#2877 add blocktime_iota to the config.toml (@ackratos) - [config] \#2877 add blocktime_iota to the config.toml (@ackratos)


+ 3
- 1
docs/spec/abci/abci.md View File

@ -45,7 +45,9 @@ include a `Tags` field in their `Response*`. Each tag is key-value pair denoting
something about what happened during the methods execution. something about what happened during the methods execution.
Tags can be used to index transactions and blocks according to what happened Tags can be used to index transactions and blocks according to what happened
during their execution.
during their execution. Note that the set of tags returned for a block from
`BeginBlock` and `EndBlock` are merged. In case both methods return the same
tag, only the value defined in `EndBlock` is used.
Keys and values in tags must be UTF-8 encoded strings (e.g. Keys and values in tags must be UTF-8 encoded strings (e.g.
"account.owner": "Bob", "balance": "100.0", "account.owner": "Bob", "balance": "100.0",


+ 13
- 4
state/execution.go View File

@ -226,8 +226,9 @@ func execBlockOnProxyApp(
commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB) commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
// Begin block.
_, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
// Begin block
var err error
abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
Hash: block.Hash(), Hash: block.Hash(),
Header: types.TM2PB.Header(&block.Header), Header: types.TM2PB.Header(&block.Header),
LastCommitInfo: commitInfo, LastCommitInfo: commitInfo,
@ -417,8 +418,16 @@ func updateState(
// Fire TxEvent for every tx. // Fire TxEvent for every tx.
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) { func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
eventBus.PublishEventNewBlock(types.EventDataNewBlock{
Block: block,
ResultBeginBlock: *abciResponses.BeginBlock,
ResultEndBlock: *abciResponses.EndBlock,
})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: block.Header,
ResultBeginBlock: *abciResponses.BeginBlock,
ResultEndBlock: *abciResponses.EndBlock,
})
for i, tx := range block.Data.Txs { for i, tx := range block.Data.Txs {
eventBus.PublishEventTx(types.EventDataTx{types.TxResult{ eventBus.PublishEventTx(types.EventDataTx{types.TxResult{


+ 3
- 2
state/store.go View File

@ -107,8 +107,9 @@ func saveState(db dbm.DB, state State, key []byte) {
// of the various ABCI calls during block processing. // of the various ABCI calls during block processing.
// It is persisted to disk for each height before calling Commit. // It is persisted to disk for each height before calling Commit.
type ABCIResponses struct { type ABCIResponses struct {
DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock
DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock
BeginBlock *abci.ResponseBeginBlock
} }
// NewABCIResponses returns a new ABCIResponses // NewABCIResponses returns a new ABCIResponses


+ 1
- 1
tools/tm-monitor/monitor/node_test.go View File

@ -34,7 +34,7 @@ func TestNodeNewBlockReceived(t *testing.T) {
n.SendBlocksTo(blockCh) n.SendBlocksTo(blockCh)
blockHeader := tmtypes.Header{Height: 5} blockHeader := tmtypes.Header{Height: 5}
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{blockHeader})
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{Header: blockHeader})
assert.Equal(t, int64(5), n.Height) assert.Equal(t, int64(5), n.Height)
assert.Equal(t, blockHeader, <-blockCh) assert.Equal(t, blockHeader, <-blockCh)


+ 39
- 13
types/event_bus.go View File

@ -71,12 +71,48 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
return nil return nil
} }
func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string {
result := make(map[string]string)
for _, tag := range tags {
// basic validation
if len(tag.Key) == 0 {
logger.Debug("Got tag with an empty key (skipping)", "tag", tag)
continue
}
result[string(tag.Key)] = string(tag.Value)
}
return result
}
func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error { func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
return b.Publish(EventNewBlock, data)
// no explicit deadline for publishing events
ctx := context.Background()
resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort()))
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlock
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
} }
func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error { func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, data)
// no explicit deadline for publishing events
ctx := context.Background()
resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
// TODO: Create StringShort method for Header and use it in logger.
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header))
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlockHeader
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
return nil
} }
func (b *EventBus) PublishEventVote(data EventDataVote) error { func (b *EventBus) PublishEventVote(data EventDataVote) error {
@ -94,17 +130,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
tags := make(map[string]string)
// validate and fill tags from tx result
for _, tag := range data.Result.Tags {
// basic validation
if len(tag.Key) == 0 {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx)
continue
}
tags[string(tag.Key)] = string(tag.Value)
}
tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx))
// add predefined tags // add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger) logIfTagExists(EventTypeKey, tags, b.Logger)


+ 84
- 0
types/event_bus_test.go View File

@ -58,6 +58,90 @@ func TestEventBusPublishEventTx(t *testing.T) {
} }
} }
func TestEventBusPublishEventNewBlock(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()
block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
txEventsCh := make(chan interface{})
// PublishEventNewBlock adds the tm.event tag, so the query below should work
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err)
done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
}()
err = eventBus.PublishEventNewBlock(EventDataNewBlock{
Block: block,
ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock,
})
assert.NoError(t, err)
select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block after 1 sec.")
}
}
func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
defer eventBus.Stop()
block := MakeBlock(0, []Tx{}, nil, []Evidence{})
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
txEventsCh := make(chan interface{})
// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
require.NoError(t, err)
done := make(chan struct{})
go func() {
for e := range txEventsCh {
edt := e.(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}
}()
err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{
Header: block.Header,
ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock,
})
assert.NoError(t, err)
select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block header after 1 sec.")
}
}
func TestEventBusPublish(t *testing.T) { func TestEventBusPublish(t *testing.T) {
eventBus := NewEventBus() eventBus := NewEventBus()
err := eventBus.Start() err := eventBus.Start()


+ 7
- 0
types/events.go View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
abci "github.com/tendermint/tendermint/abci/types"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query" tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
) )
@ -56,11 +57,17 @@ func RegisterEventDatas(cdc *amino.Codec) {
type EventDataNewBlock struct { type EventDataNewBlock struct {
Block *Block `json:"block"` Block *Block `json:"block"`
ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"`
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
} }
// light weight event for benchmarking // light weight event for benchmarking
type EventDataNewBlockHeader struct { type EventDataNewBlockHeader struct {
Header Header `json:"header"` Header Header `json:"header"`
ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"`
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
} }
// All txs fire EventDataTx // All txs fire EventDataTx


Loading…
Cancel
Save