Browse Source

rpc: implement the ADR 075 /events method (#7965)

This method implements the eventlog extension interface to expose ABCI metadata
to the log for query processing. Only the types that have ABCI events need to
implement this.

- Add an event log to the environment
- Add a sketch of the handler method
- Add an /events RPCFunc to the route map
- Implement query logic
- Subscribe to pubsub if confingured, handle termination
pull/7981/head
M. J. Fromberger 2 years ago
committed by GitHub
parent
commit
5662bd12a8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 303 additions and 1 deletions
  1. +38
    -0
      internal/rpc/core/env.go
  2. +131
    -0
      internal/rpc/core/events.go
  3. +6
    -1
      internal/rpc/core/routes.go
  4. +14
    -0
      light/proxy/routes.go
  5. +64
    -0
      rpc/coretypes/responses.go
  6. +34
    -0
      types/events.go
  7. +16
    -0
      types/events_test.go

+ 38
- 0
internal/rpc/core/env.go View File

@ -16,10 +16,12 @@ import (
"github.com/tendermint/tendermint/internal/blocksync"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
"github.com/tendermint/tendermint/internal/pubsub/query"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/statesync"
@ -93,6 +95,7 @@ type Environment struct {
GenDoc *types.GenesisDoc // cache the genesis structure
EventSinks []indexer.EventSink
EventBus *eventbus.EventBus // thread safe
EventLog *eventlog.Log
Mempool mempool.Mempool
StateSyncMetricer statesync.Metricer
@ -239,6 +242,41 @@ func (env *Environment) StartService(ctx context.Context, conf *config.Config) (
cfg.WriteTimeout = conf.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
// If the event log is enabled, subscribe to all events published to the
// event bus, and forward them to the event log.
if lg := env.EventLog; lg != nil {
// TODO(creachadair): This is kind of a hack, ideally we'd share the
// observer with the indexer, but it's tricky to plumb them together.
// For now, use a "normal" subscription with a big buffer allowance.
// The event log should always be able to keep up.
const subscriberID = "event-log-subscriber"
sub, err := env.EventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: subscriberID,
Query: query.All,
Limit: 1 << 16, // essentially "no limit"
})
if err != nil {
return nil, fmt.Errorf("event log subscribe: %w", err)
}
go func() {
// N.B. Use background for unsubscribe, ctx is already terminated.
defer env.EventBus.UnsubscribeAll(context.Background(), subscriberID) // nolint:errcheck
for {
msg, err := sub.Next(ctx)
if err != nil {
env.Logger.Error("Subscription terminated", "err", err)
return
}
etype, ok := eventlog.FindType(msg.Events())
if ok {
_ = lg.Add(etype, msg.Data())
}
}
}()
env.Logger.Info("Event log subscription enabled")
}
// We may expose the RPC over both TCP and a Unix-domain socket.
listeners := make([]net.Listener, len(listenAddrs))
for i, listenAddr := range listenAddrs {


+ 131
- 0
internal/rpc/core/events.go View File

@ -6,6 +6,9 @@ import (
"fmt"
"time"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/internal/jsontypes"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
tmquery "github.com/tendermint/tendermint/internal/pubsub/query"
"github.com/tendermint/tendermint/rpc/coretypes"
@ -126,3 +129,131 @@ func (env *Environment) UnsubscribeAll(ctx context.Context) (*coretypes.ResultUn
}
return &coretypes.ResultUnsubscribe{}, nil
}
// Events applies a query to the event log. If an event log is not enabled,
// Events reports an error. Otherwise, it filters the current contents of the
// log to return matching events.
//
// Events returns up to maxItems of the newest eligible event items. An item is
// eligible if it is older than before (or before is zero), it is newer than
// after (or after is zero), and its data matches the filter. A nil filter
// matches all event data.
//
// If before is zero and no eligible event items are available, Events waits
// for up to waitTime for a matching item to become available. The wait is
// terminated early if ctx ends.
//
// If maxItems ≤ 0, a default positive number of events is chosen. The values
// of maxItems and waitTime may be capped to sensible internal maxima without
// reporting an error to the caller.
func (env *Environment) Events(ctx context.Context,
filter *coretypes.EventFilter,
maxItems int,
before, after cursor.Cursor,
waitTime time.Duration,
) (*coretypes.ResultEvents, error) {
if env.EventLog == nil {
return nil, errors.New("the event log is not enabled")
}
// Parse and validate parameters.
if maxItems <= 0 {
maxItems = 10
} else if maxItems > 100 {
maxItems = 100
}
const maxWaitTime = 30 * time.Second
if waitTime > maxWaitTime {
waitTime = maxWaitTime
}
query := tmquery.All
if filter != nil && filter.Query != "" {
q, err := tmquery.New(filter.Query)
if err != nil {
return nil, fmt.Errorf("invalid filter query: %w", err)
}
query = q
}
var info eventlog.Info
var items []*eventlog.Item
var err error
accept := func(itm *eventlog.Item) error {
// N.B. We accept up to one item more than requested, so we can tell how
// to set the "more" flag in the response.
if len(items) > maxItems {
return eventlog.ErrStopScan
}
if cursorInRange(itm.Cursor, before, after) && query.Matches(itm.Events) {
items = append(items, itm)
}
return nil
}
if waitTime > 0 && before.IsZero() {
ctx, cancel := context.WithTimeout(ctx, waitTime)
defer cancel()
// Long poll. The loop here is because new items may not match the query,
// and we want to keep waiting until we have relevant results (or time out).
cur := after
for len(items) == 0 {
info, err = env.EventLog.WaitScan(ctx, cur, accept)
if err != nil {
// Don't report a timeout as a request failure.
if errors.Is(err, context.DeadlineExceeded) {
err = nil
}
break
}
cur = info.Newest
}
} else {
// Quick poll, return only what is already available.
info, err = env.EventLog.Scan(accept)
}
if err != nil {
return nil, err
}
more := len(items) > maxItems
if more {
items = items[:len(items)-1]
}
enc, err := marshalItems(items)
if err != nil {
return nil, err
}
return &coretypes.ResultEvents{
Items: enc,
More: more,
Oldest: cursorString(info.Oldest),
Newest: cursorString(info.Newest),
}, nil
}
func cursorString(c cursor.Cursor) string {
if c.IsZero() {
return ""
}
return c.String()
}
func cursorInRange(c, before, after cursor.Cursor) bool {
return (before.IsZero() || c.Before(before)) && (after.IsZero() || after.Before(c))
}
func marshalItems(items []*eventlog.Item) ([]*coretypes.EventItem, error) {
out := make([]*coretypes.EventItem, len(items))
for i, itm := range items {
v, err := jsontypes.Marshal(itm.Data)
if err != nil {
return nil, fmt.Errorf("encoding event data: %w", err)
}
out[i] = &coretypes.EventItem{Cursor: itm.Cursor.String(), Event: itm.Type}
out[i].Data = v
}
return out, nil
}

+ 6
- 1
internal/rpc/core/routes.go View File

@ -2,7 +2,9 @@ package core
import (
"context"
"time"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/rpc/coretypes"
rpc "github.com/tendermint/tendermint/rpc/jsonrpc/server"
@ -28,7 +30,9 @@ func NewRoutesMap(svc RPCService, opts *RouteOptions) RoutesMap {
opts = new(RouteOptions)
}
out := RoutesMap{
// subscribe/unsubscribe are reserved for websocket events.
// Event subscription. Note that subscribe, unsubscribe, and
// unsubscribe_all are only available via the websocket endpoint.
"events": rpc.NewRPCFunc(svc.Events, "filter", "maxItems", "before", "after", "waitTime"),
"subscribe": rpc.NewWSRPCFunc(svc.Subscribe, "query"),
"unsubscribe": rpc.NewWSRPCFunc(svc.Unsubscribe, "query"),
"unsubscribe_all": rpc.NewWSRPCFunc(svc.UnsubscribeAll),
@ -94,6 +98,7 @@ type RPCService interface {
Commit(ctx context.Context, heightPtr *int64) (*coretypes.ResultCommit, error)
ConsensusParams(ctx context.Context, heightPtr *int64) (*coretypes.ResultConsensusParams, error)
DumpConsensusState(ctx context.Context) (*coretypes.ResultDumpConsensusState, error)
Events(ctx context.Context, filter *coretypes.EventFilter, maxItems int, before, after cursor.Cursor, waitTime time.Duration) (*coretypes.ResultEvents, error)
Genesis(ctx context.Context) (*coretypes.ResultGenesis, error)
GenesisChunked(ctx context.Context, chunk uint) (*coretypes.ResultGenesisChunk, error)
GetConsensusState(ctx context.Context) (*coretypes.ResultConsensusState, error)


+ 14
- 0
light/proxy/routes.go View File

@ -2,7 +2,10 @@ package proxy
import (
"context"
"errors"
"time"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
lrpc "github.com/tendermint/tendermint/light/rpc"
rpcclient "github.com/tendermint/tendermint/rpc/client"
@ -27,6 +30,17 @@ func (p proxyService) GetConsensusState(ctx context.Context) (*coretypes.ResultC
return p.ConsensusState(ctx)
}
// TODO(creachadair): Remove this once the RPC clients support the new method.
// This is just a placeholder to let things build during development.
func (proxyService) Events(ctx context.Context,
filter *coretypes.EventFilter,
maxItems int,
before, after cursor.Cursor,
waitTime time.Duration,
) (*coretypes.ResultEvents, error) {
return nil, errors.New("the /events method is not implemented")
}
func (p proxyService) Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) {
return p.SubscribeWS(ctx, query)
}


+ 64
- 0
rpc/coretypes/responses.go View File

@ -356,3 +356,67 @@ type Evidence struct {
func (e Evidence) MarshalJSON() ([]byte, error) { return jsontypes.Marshal(e.Value) }
func (e *Evidence) UnmarshalJSON(data []byte) error { return jsontypes.Unmarshal(data, &e.Value) }
// RequestEvents is the argument for the "/events" RPC endpoint.
type RequestEvents struct {
// Optional filter spec. If nil or empty, all items are eligible.
Filter *EventFilter `json:"filter"`
// The maximum number of eligible items to return.
// If zero or negative, the server will report a default number.
MaxItems int `json:"max_items"`
// Return only items after this cursor. If empty, the limit is just
// before the the beginning of the event log.
After string `json:"after"`
// Return only items before this cursor. If empty, the limit is just
// after the head of the event log.
Before string `json:"before"`
// Wait for up to this long for events to be available.
WaitTime time.Duration `json:"wait_time"`
}
// An EventFilter specifies which events are selected by an /events request.
type EventFilter struct {
Query string `json:"query"`
}
// ResultEvents is the response from the "/events" RPC endpoint.
type ResultEvents struct {
// The items matching the request parameters, from newest
// to oldest, if any were available within the timeout.
Items []*EventItem `json:"items"`
// This is true if there is at least one older matching item
// available in the log that was not returned.
More bool `json:"more"`
// The cursor of the oldest item in the log at the time of this reply,
// or "" if the log is empty.
Oldest string `json:"oldest"`
// The cursor of the newest item in the log at the time of this reply,
// or "" if the log is empty.
Newest string `json:"newest"`
}
type EventItem struct {
// The cursor of this item.
Cursor string `json:"cursor"`
// The event label of this item (for example, "Vote").
Event string `json:"event,omitempty"`
// The encoded event data for this item. The content is a JSON object with
// the following structure:
//
// {
// "type": "type-tag",
// "value": <json-encoded-value>
// }
//
// The known type tags are defined by the tendermint/types package.
Data json.RawMessage `json:"data"`
}

+ 34
- 0
types/events.go View File

@ -92,7 +92,13 @@ var (
// ENCODING / DECODING
// EventData is satisfied by types that can be published as event data.
//
// Implementations of this interface that contain ABCI event metadata should
// also implement the eventlog.ABCIEventer extension interface to expose those
// metadata to the event log machinery. Event data that do not contain ABCI
// metadata can safely omit this.
type EventData interface {
// The value must support encoding as a type-tagged JSON object.
jsontypes.Tagged
}
@ -125,6 +131,9 @@ type EventDataNewBlock struct {
// TypeTag implements the required method of jsontypes.Tagged.
func (EventDataNewBlock) TypeTag() string { return "tendermint/event/NewBlock" }
// ABCIEvents implements the eventlog.ABCIEventer interface.
func (e EventDataNewBlock) ABCIEvents() []abci.Event { return e.ResultFinalizeBlock.Events }
type EventDataNewBlockHeader struct {
Header Header `json:"header"`
@ -135,6 +144,9 @@ type EventDataNewBlockHeader struct {
// TypeTag implements the required method of jsontypes.Tagged.
func (EventDataNewBlockHeader) TypeTag() string { return "tendermint/event/NewBlockHeader" }
// ABCIEvents implements the eventlog.ABCIEventer interface.
func (e EventDataNewBlockHeader) ABCIEvents() []abci.Event { return e.ResultFinalizeBlock.Events }
type EventDataNewEvidence struct {
Evidence Evidence `json:"evidence"`
@ -152,6 +164,15 @@ type EventDataTx struct {
// TypeTag implements the required method of jsontypes.Tagged.
func (EventDataTx) TypeTag() string { return "tendermint/event/Tx" }
// ABCIEvents implements the eventlog.ABCIEventer interface.
func (e EventDataTx) ABCIEvents() []abci.Event {
base := []abci.Event{
eventWithAttr(TxHashKey, fmt.Sprintf("%X", Tx(e.Tx).Hash())),
eventWithAttr(TxHeightKey, fmt.Sprintf("%d", e.Height)),
}
return append(base, e.Result.Events...)
}
// NOTE: This goes into the replay WAL
type EventDataRoundState struct {
Height int64 `json:"height,string"`
@ -298,3 +319,16 @@ type BlockEventPublisher interface {
type TxEventPublisher interface {
PublishEventTx(context.Context, EventDataTx) error
}
// eventWithAttr constructs a single abci.Event with a single attribute.
// The type of the event and the name of the attribute are obtained by
// splitting the event type on period (e.g., "foo.bar").
func eventWithAttr(etype, value string) abci.Event {
parts := strings.SplitN(etype, ".", 2)
return abci.Event{
Type: parts[0],
Attributes: []abci.EventAttribute{{
Key: parts[1], Value: value,
}},
}
}

+ 16
- 0
types/events_test.go View File

@ -7,6 +7,22 @@ import (
"github.com/stretchr/testify/assert"
)
// Verify that the event data types satisfy their shared interface.
var (
_ EventData = EventDataBlockSyncStatus{}
_ EventData = EventDataCompleteProposal{}
_ EventData = EventDataNewBlock{}
_ EventData = EventDataNewBlockHeader{}
_ EventData = EventDataNewEvidence{}
_ EventData = EventDataNewRound{}
_ EventData = EventDataRoundState{}
_ EventData = EventDataStateSyncStatus{}
_ EventData = EventDataTx{}
_ EventData = EventDataValidatorSetUpdates{}
_ EventData = EventDataVote{}
_ EventData = EventDataString("")
)
func TestQueryTxFor(t *testing.T) {
tx := Tx("foo")
assert.Equal(t,


Loading…
Cancel
Save