From 5662bd12a8bc375014c3a2e739390913f6840e77 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 23 Feb 2022 15:22:40 -0800 Subject: [PATCH] rpc: implement the ADR 075 /events method (#7965) This method implements the eventlog extension interface to expose ABCI metadata to the log for query processing. Only the types that have ABCI events need to implement this. - Add an event log to the environment - Add a sketch of the handler method - Add an /events RPCFunc to the route map - Implement query logic - Subscribe to pubsub if confingured, handle termination --- internal/rpc/core/env.go | 38 +++++++++++ internal/rpc/core/events.go | 131 ++++++++++++++++++++++++++++++++++++ internal/rpc/core/routes.go | 7 +- light/proxy/routes.go | 14 ++++ rpc/coretypes/responses.go | 64 ++++++++++++++++++ types/events.go | 34 ++++++++++ types/events_test.go | 16 +++++ 7 files changed, 303 insertions(+), 1 deletion(-) diff --git a/internal/rpc/core/env.go b/internal/rpc/core/env.go index 48c368213..0e2cef230 100644 --- a/internal/rpc/core/env.go +++ b/internal/rpc/core/env.go @@ -16,10 +16,12 @@ import ( "github.com/tendermint/tendermint/internal/blocksync" "github.com/tendermint/tendermint/internal/consensus" "github.com/tendermint/tendermint/internal/eventbus" + "github.com/tendermint/tendermint/internal/eventlog" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/proxy" tmpubsub "github.com/tendermint/tendermint/internal/pubsub" + "github.com/tendermint/tendermint/internal/pubsub/query" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/internal/statesync" @@ -93,6 +95,7 @@ type Environment struct { GenDoc *types.GenesisDoc // cache the genesis structure EventSinks []indexer.EventSink EventBus *eventbus.EventBus // thread safe + EventLog *eventlog.Log Mempool mempool.Mempool StateSyncMetricer statesync.Metricer @@ -239,6 +242,41 @@ func (env *Environment) StartService(ctx context.Context, conf *config.Config) ( cfg.WriteTimeout = conf.RPC.TimeoutBroadcastTxCommit + 1*time.Second } + // If the event log is enabled, subscribe to all events published to the + // event bus, and forward them to the event log. + if lg := env.EventLog; lg != nil { + // TODO(creachadair): This is kind of a hack, ideally we'd share the + // observer with the indexer, but it's tricky to plumb them together. + // For now, use a "normal" subscription with a big buffer allowance. + // The event log should always be able to keep up. + const subscriberID = "event-log-subscriber" + sub, err := env.EventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{ + ClientID: subscriberID, + Query: query.All, + Limit: 1 << 16, // essentially "no limit" + }) + if err != nil { + return nil, fmt.Errorf("event log subscribe: %w", err) + } + go func() { + // N.B. Use background for unsubscribe, ctx is already terminated. + defer env.EventBus.UnsubscribeAll(context.Background(), subscriberID) // nolint:errcheck + for { + msg, err := sub.Next(ctx) + if err != nil { + env.Logger.Error("Subscription terminated", "err", err) + return + } + etype, ok := eventlog.FindType(msg.Events()) + if ok { + _ = lg.Add(etype, msg.Data()) + } + } + }() + + env.Logger.Info("Event log subscription enabled") + } + // We may expose the RPC over both TCP and a Unix-domain socket. listeners := make([]net.Listener, len(listenAddrs)) for i, listenAddr := range listenAddrs { diff --git a/internal/rpc/core/events.go b/internal/rpc/core/events.go index 045d20f4d..1a26d9b31 100644 --- a/internal/rpc/core/events.go +++ b/internal/rpc/core/events.go @@ -6,6 +6,9 @@ import ( "fmt" "time" + "github.com/tendermint/tendermint/internal/eventlog" + "github.com/tendermint/tendermint/internal/eventlog/cursor" + "github.com/tendermint/tendermint/internal/jsontypes" tmpubsub "github.com/tendermint/tendermint/internal/pubsub" tmquery "github.com/tendermint/tendermint/internal/pubsub/query" "github.com/tendermint/tendermint/rpc/coretypes" @@ -126,3 +129,131 @@ func (env *Environment) UnsubscribeAll(ctx context.Context) (*coretypes.ResultUn } return &coretypes.ResultUnsubscribe{}, nil } + +// Events applies a query to the event log. If an event log is not enabled, +// Events reports an error. Otherwise, it filters the current contents of the +// log to return matching events. +// +// Events returns up to maxItems of the newest eligible event items. An item is +// eligible if it is older than before (or before is zero), it is newer than +// after (or after is zero), and its data matches the filter. A nil filter +// matches all event data. +// +// If before is zero and no eligible event items are available, Events waits +// for up to waitTime for a matching item to become available. The wait is +// terminated early if ctx ends. +// +// If maxItems ≤ 0, a default positive number of events is chosen. The values +// of maxItems and waitTime may be capped to sensible internal maxima without +// reporting an error to the caller. +func (env *Environment) Events(ctx context.Context, + filter *coretypes.EventFilter, + maxItems int, + before, after cursor.Cursor, + waitTime time.Duration, +) (*coretypes.ResultEvents, error) { + if env.EventLog == nil { + return nil, errors.New("the event log is not enabled") + } + + // Parse and validate parameters. + if maxItems <= 0 { + maxItems = 10 + } else if maxItems > 100 { + maxItems = 100 + } + + const maxWaitTime = 30 * time.Second + if waitTime > maxWaitTime { + waitTime = maxWaitTime + } + + query := tmquery.All + if filter != nil && filter.Query != "" { + q, err := tmquery.New(filter.Query) + if err != nil { + return nil, fmt.Errorf("invalid filter query: %w", err) + } + query = q + } + + var info eventlog.Info + var items []*eventlog.Item + var err error + accept := func(itm *eventlog.Item) error { + // N.B. We accept up to one item more than requested, so we can tell how + // to set the "more" flag in the response. + if len(items) > maxItems { + return eventlog.ErrStopScan + } + if cursorInRange(itm.Cursor, before, after) && query.Matches(itm.Events) { + items = append(items, itm) + } + return nil + } + + if waitTime > 0 && before.IsZero() { + ctx, cancel := context.WithTimeout(ctx, waitTime) + defer cancel() + + // Long poll. The loop here is because new items may not match the query, + // and we want to keep waiting until we have relevant results (or time out). + cur := after + for len(items) == 0 { + info, err = env.EventLog.WaitScan(ctx, cur, accept) + if err != nil { + // Don't report a timeout as a request failure. + if errors.Is(err, context.DeadlineExceeded) { + err = nil + } + break + } + cur = info.Newest + } + } else { + // Quick poll, return only what is already available. + info, err = env.EventLog.Scan(accept) + } + if err != nil { + return nil, err + } + + more := len(items) > maxItems + if more { + items = items[:len(items)-1] + } + enc, err := marshalItems(items) + if err != nil { + return nil, err + } + return &coretypes.ResultEvents{ + Items: enc, + More: more, + Oldest: cursorString(info.Oldest), + Newest: cursorString(info.Newest), + }, nil +} + +func cursorString(c cursor.Cursor) string { + if c.IsZero() { + return "" + } + return c.String() +} + +func cursorInRange(c, before, after cursor.Cursor) bool { + return (before.IsZero() || c.Before(before)) && (after.IsZero() || after.Before(c)) +} + +func marshalItems(items []*eventlog.Item) ([]*coretypes.EventItem, error) { + out := make([]*coretypes.EventItem, len(items)) + for i, itm := range items { + v, err := jsontypes.Marshal(itm.Data) + if err != nil { + return nil, fmt.Errorf("encoding event data: %w", err) + } + out[i] = &coretypes.EventItem{Cursor: itm.Cursor.String(), Event: itm.Type} + out[i].Data = v + } + return out, nil +} diff --git a/internal/rpc/core/routes.go b/internal/rpc/core/routes.go index 84e8e199a..945798ed7 100644 --- a/internal/rpc/core/routes.go +++ b/internal/rpc/core/routes.go @@ -2,7 +2,9 @@ package core import ( "context" + "time" + "github.com/tendermint/tendermint/internal/eventlog/cursor" "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/rpc/coretypes" rpc "github.com/tendermint/tendermint/rpc/jsonrpc/server" @@ -28,7 +30,9 @@ func NewRoutesMap(svc RPCService, opts *RouteOptions) RoutesMap { opts = new(RouteOptions) } out := RoutesMap{ - // subscribe/unsubscribe are reserved for websocket events. + // Event subscription. Note that subscribe, unsubscribe, and + // unsubscribe_all are only available via the websocket endpoint. + "events": rpc.NewRPCFunc(svc.Events, "filter", "maxItems", "before", "after", "waitTime"), "subscribe": rpc.NewWSRPCFunc(svc.Subscribe, "query"), "unsubscribe": rpc.NewWSRPCFunc(svc.Unsubscribe, "query"), "unsubscribe_all": rpc.NewWSRPCFunc(svc.UnsubscribeAll), @@ -94,6 +98,7 @@ type RPCService interface { Commit(ctx context.Context, heightPtr *int64) (*coretypes.ResultCommit, error) ConsensusParams(ctx context.Context, heightPtr *int64) (*coretypes.ResultConsensusParams, error) DumpConsensusState(ctx context.Context) (*coretypes.ResultDumpConsensusState, error) + Events(ctx context.Context, filter *coretypes.EventFilter, maxItems int, before, after cursor.Cursor, waitTime time.Duration) (*coretypes.ResultEvents, error) Genesis(ctx context.Context) (*coretypes.ResultGenesis, error) GenesisChunked(ctx context.Context, chunk uint) (*coretypes.ResultGenesisChunk, error) GetConsensusState(ctx context.Context) (*coretypes.ResultConsensusState, error) diff --git a/light/proxy/routes.go b/light/proxy/routes.go index 24e7e18c2..bf1ecc5a9 100644 --- a/light/proxy/routes.go +++ b/light/proxy/routes.go @@ -2,7 +2,10 @@ package proxy import ( "context" + "errors" + "time" + "github.com/tendermint/tendermint/internal/eventlog/cursor" tmbytes "github.com/tendermint/tendermint/libs/bytes" lrpc "github.com/tendermint/tendermint/light/rpc" rpcclient "github.com/tendermint/tendermint/rpc/client" @@ -27,6 +30,17 @@ func (p proxyService) GetConsensusState(ctx context.Context) (*coretypes.ResultC return p.ConsensusState(ctx) } +// TODO(creachadair): Remove this once the RPC clients support the new method. +// This is just a placeholder to let things build during development. +func (proxyService) Events(ctx context.Context, + filter *coretypes.EventFilter, + maxItems int, + before, after cursor.Cursor, + waitTime time.Duration, +) (*coretypes.ResultEvents, error) { + return nil, errors.New("the /events method is not implemented") +} + func (p proxyService) Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) { return p.SubscribeWS(ctx, query) } diff --git a/rpc/coretypes/responses.go b/rpc/coretypes/responses.go index c4b1ddd8f..9f86742de 100644 --- a/rpc/coretypes/responses.go +++ b/rpc/coretypes/responses.go @@ -356,3 +356,67 @@ type Evidence struct { func (e Evidence) MarshalJSON() ([]byte, error) { return jsontypes.Marshal(e.Value) } func (e *Evidence) UnmarshalJSON(data []byte) error { return jsontypes.Unmarshal(data, &e.Value) } + +// RequestEvents is the argument for the "/events" RPC endpoint. +type RequestEvents struct { + // Optional filter spec. If nil or empty, all items are eligible. + Filter *EventFilter `json:"filter"` + + // The maximum number of eligible items to return. + // If zero or negative, the server will report a default number. + MaxItems int `json:"max_items"` + + // Return only items after this cursor. If empty, the limit is just + // before the the beginning of the event log. + After string `json:"after"` + + // Return only items before this cursor. If empty, the limit is just + // after the head of the event log. + Before string `json:"before"` + + // Wait for up to this long for events to be available. + WaitTime time.Duration `json:"wait_time"` +} + +// An EventFilter specifies which events are selected by an /events request. +type EventFilter struct { + Query string `json:"query"` +} + +// ResultEvents is the response from the "/events" RPC endpoint. +type ResultEvents struct { + // The items matching the request parameters, from newest + // to oldest, if any were available within the timeout. + Items []*EventItem `json:"items"` + + // This is true if there is at least one older matching item + // available in the log that was not returned. + More bool `json:"more"` + + // The cursor of the oldest item in the log at the time of this reply, + // or "" if the log is empty. + Oldest string `json:"oldest"` + + // The cursor of the newest item in the log at the time of this reply, + // or "" if the log is empty. + Newest string `json:"newest"` +} + +type EventItem struct { + // The cursor of this item. + Cursor string `json:"cursor"` + + // The event label of this item (for example, "Vote"). + Event string `json:"event,omitempty"` + + // The encoded event data for this item. The content is a JSON object with + // the following structure: + // + // { + // "type": "type-tag", + // "value": + // } + // + // The known type tags are defined by the tendermint/types package. + Data json.RawMessage `json:"data"` +} diff --git a/types/events.go b/types/events.go index 4ddfd0ba7..d20ecfa93 100644 --- a/types/events.go +++ b/types/events.go @@ -92,7 +92,13 @@ var ( // ENCODING / DECODING // EventData is satisfied by types that can be published as event data. +// +// Implementations of this interface that contain ABCI event metadata should +// also implement the eventlog.ABCIEventer extension interface to expose those +// metadata to the event log machinery. Event data that do not contain ABCI +// metadata can safely omit this. type EventData interface { + // The value must support encoding as a type-tagged JSON object. jsontypes.Tagged } @@ -125,6 +131,9 @@ type EventDataNewBlock struct { // TypeTag implements the required method of jsontypes.Tagged. func (EventDataNewBlock) TypeTag() string { return "tendermint/event/NewBlock" } +// ABCIEvents implements the eventlog.ABCIEventer interface. +func (e EventDataNewBlock) ABCIEvents() []abci.Event { return e.ResultFinalizeBlock.Events } + type EventDataNewBlockHeader struct { Header Header `json:"header"` @@ -135,6 +144,9 @@ type EventDataNewBlockHeader struct { // TypeTag implements the required method of jsontypes.Tagged. func (EventDataNewBlockHeader) TypeTag() string { return "tendermint/event/NewBlockHeader" } +// ABCIEvents implements the eventlog.ABCIEventer interface. +func (e EventDataNewBlockHeader) ABCIEvents() []abci.Event { return e.ResultFinalizeBlock.Events } + type EventDataNewEvidence struct { Evidence Evidence `json:"evidence"` @@ -152,6 +164,15 @@ type EventDataTx struct { // TypeTag implements the required method of jsontypes.Tagged. func (EventDataTx) TypeTag() string { return "tendermint/event/Tx" } +// ABCIEvents implements the eventlog.ABCIEventer interface. +func (e EventDataTx) ABCIEvents() []abci.Event { + base := []abci.Event{ + eventWithAttr(TxHashKey, fmt.Sprintf("%X", Tx(e.Tx).Hash())), + eventWithAttr(TxHeightKey, fmt.Sprintf("%d", e.Height)), + } + return append(base, e.Result.Events...) +} + // NOTE: This goes into the replay WAL type EventDataRoundState struct { Height int64 `json:"height,string"` @@ -298,3 +319,16 @@ type BlockEventPublisher interface { type TxEventPublisher interface { PublishEventTx(context.Context, EventDataTx) error } + +// eventWithAttr constructs a single abci.Event with a single attribute. +// The type of the event and the name of the attribute are obtained by +// splitting the event type on period (e.g., "foo.bar"). +func eventWithAttr(etype, value string) abci.Event { + parts := strings.SplitN(etype, ".", 2) + return abci.Event{ + Type: parts[0], + Attributes: []abci.EventAttribute{{ + Key: parts[1], Value: value, + }}, + } +} diff --git a/types/events_test.go b/types/events_test.go index bd4bde264..c3f27c70f 100644 --- a/types/events_test.go +++ b/types/events_test.go @@ -7,6 +7,22 @@ import ( "github.com/stretchr/testify/assert" ) +// Verify that the event data types satisfy their shared interface. +var ( + _ EventData = EventDataBlockSyncStatus{} + _ EventData = EventDataCompleteProposal{} + _ EventData = EventDataNewBlock{} + _ EventData = EventDataNewBlockHeader{} + _ EventData = EventDataNewEvidence{} + _ EventData = EventDataNewRound{} + _ EventData = EventDataRoundState{} + _ EventData = EventDataStateSyncStatus{} + _ EventData = EventDataTx{} + _ EventData = EventDataValidatorSetUpdates{} + _ EventData = EventDataVote{} + _ EventData = EventDataString("") +) + func TestQueryTxFor(t *testing.T) { tx := Tx("foo") assert.Equal(t,