You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

425 lines
15 KiB

  1. # ADR 065: Custom Event Indexing
  2. - [ADR 065: Custom Event Indexing](#adr-065-custom-event-indexing)
  3. - [Changelog](#changelog)
  4. - [Status](#status)
  5. - [Context](#context)
  6. - [Alternative Approaches](#alternative-approaches)
  7. - [Decision](#decision)
  8. - [Detailed Design](#detailed-design)
  9. - [EventSink](#eventsink)
  10. - [Supported Sinks](#supported-sinks)
  11. - [`KVEventSink`](#kveventsink)
  12. - [`PSQLEventSink`](#psqleventsink)
  13. - [Configuration](#configuration)
  14. - [Future Improvements](#future-improvements)
  15. - [Consequences](#consequences)
  16. - [Positive](#positive)
  17. - [Negative](#negative)
  18. - [Neutral](#neutral)
  19. - [References](#references)
  20. ## Changelog
  21. - April 1, 2021: Initial Draft (@alexanderbez)
  22. - April 28, 2021: Specify search capabilities are only supported through the KV indexer (@marbar3778)
  23. - May 19, 2021: Update the SQL schema and the eventsink interface (@jayt106)
  24. - Aug 30, 2021: Update the SQL schema and the psql implementation (@creachadair)
  25. - Oct 5, 2021: Clarify goals and implementation changes (@creachadair)
  26. ## Status
  27. Accepted
  28. ## Context
  29. Currently, Tendermint Core supports block and transaction event indexing through
  30. the `tx_index.indexer` configuration. Events are captured in transactions and
  31. are indexed via a `TxIndexer` type. Events are captured in blocks, specifically
  32. from `BeginBlock` and `EndBlock` application responses, and are indexed via a
  33. `BlockIndexer` type. Both of these types are managed by a single `IndexerService`
  34. which is responsible for consuming events and sending those events off to be
  35. indexed by the respective type.
  36. In addition to indexing, Tendermint Core also supports the ability to query for
  37. both indexed transaction and block events via Tendermint's RPC layer. The ability
  38. to query for these indexed events facilitates a great multitude of upstream client
  39. and application capabilities, e.g. block explorers, IBC relayers, and auxiliary
  40. data availability and indexing services.
  41. Currently, Tendermint only supports indexing via a `kv` indexer, which is supported
  42. by an underlying embedded key/value store database. The `kv` indexer implements
  43. its own indexing and query mechanisms. While the former is somewhat trivial,
  44. providing a rich and flexible query layer is not as trivial and has caused many
  45. issues and UX concerns for upstream clients and applications.
  46. The fragile nature of the proprietary `kv` query engine and the potential
  47. performance and scaling issues that arise when a large number of consumers are
  48. introduced, motivate the need for a more robust and flexible indexing and query
  49. solution.
  50. ## Alternative Approaches
  51. With regards to alternative approaches to a more robust solution, the only serious
  52. contender that was considered was to transition to using [SQLite](https://www.sqlite.org/index.html).
  53. While the approach would work, it locks us into a specific query language and
  54. storage layer, so in some ways it's only a bit better than our current approach.
  55. In addition, the implementation would require the introduction of CGO into the
  56. Tendermint Core stack, whereas right now CGO is only introduced depending on
  57. the database used.
  58. ## Decision
  59. We will adopt a similar approach to that of the Cosmos SDK's `KVStore` state
  60. listening described in [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md).
  61. We will implement the following changes:
  62. - Introduce a new interface, `EventSink`, that all data sinks must implement.
  63. - Augment the existing `tx_index.indexer` configuration to now accept a series
  64. of one or more indexer types, i.e., sinks.
  65. - Combine the current `TxIndexer` and `BlockIndexer` into a single `KVEventSink`
  66. that implements the `EventSink` interface.
  67. - Introduce an additional `EventSink` implementation that is backed by
  68. [PostgreSQL](https://www.postgresql.org/).
  69. - Implement the necessary schemas to support both block and transaction event indexing.
  70. - Update `IndexerService` to use a series of `EventSinks`.
  71. In addition:
  72. - The Postgres indexer implementation will _not_ implement the proprietary `kv`
  73. query language. Users wishing to write queries against the Postgres indexer
  74. will connect to the underlying DBMS directly and use SQL queries based on the
  75. indexing schema.
  76. Future custom indexer implementations will not be required to support the
  77. proprietary query language either.
  78. - For now, the existing `kv` indexer will be left in place with its current
  79. query support, but will be marked as deprecated in a subsequent release, and
  80. the documentation will be updated to encourage users who need to query the
  81. event index to migrate to the Postgres indexer.
  82. - In the future we may remove the `kv` indexer entirely, or replace it with a
  83. different implementation; that decision is deferred as future work.
  84. - In the future, we may remove the index query endpoints from the RPC service
  85. entirely; that decision is deferred as future work, but recommended.
  86. ## Detailed Design
  87. ### EventSink
  88. We introduce the `EventSink` interface type that all supported sinks must implement.
  89. The interface is defined as follows:
  90. ```go
  91. type EventSink interface {
  92. IndexBlockEvents(types.EventDataNewBlockHeader) error
  93. IndexTxEvents([]*abci.TxResult) error
  94. SearchBlockEvents(context.Context, *query.Query) ([]int64, error)
  95. SearchTxEvents(context.Context, *query.Query) ([]*abci.TxResult, error)
  96. GetTxByHash([]byte) (*abci.TxResult, error)
  97. HasBlock(int64) (bool, error)
  98. Type() EventSinkType
  99. Stop() error
  100. }
  101. ```
  102. The `IndexerService` will accept a list of one or more `EventSink` types. During
  103. the `OnStart` method it will call the appropriate APIs on each `EventSink` to
  104. index both block and transaction events.
  105. ### Supported Sinks
  106. We will initially support two `EventSink` types out of the box.
  107. #### `KVEventSink`
  108. This type of `EventSink` is a combination of the `TxIndexer` and `BlockIndexer`
  109. indexers, both of which are backed by a single embedded key/value database.
  110. A bulk of the existing business logic will remain the same, but the existing APIs
  111. mapped to the new `EventSink` API. Both types will be removed in favor of a single
  112. `KVEventSink` type.
  113. The `KVEventSink` will be the only `EventSink` enabled by default, so from a UX
  114. perspective, operators should not notice a difference apart from a configuration
  115. change.
  116. We omit `EventSink` implementation details as it should be fairly straightforward
  117. to map the existing business logic to the new APIs.
  118. #### `PSQLEventSink`
  119. This type of `EventSink` indexes block and transaction events into a [PostgreSQL](https://www.postgresql.org/).
  120. database. We define and automatically migrate the following schema when the
  121. `IndexerService` starts.
  122. The postgres eventsink will not support `tx_search`, `block_search`, `GetTxByHash` and `HasBlock`.
  123. ```sql
  124. -- Table Definition ----------------------------------------------
  125. -- The blocks table records metadata about each block.
  126. -- The block record does not include its events or transactions (see tx_results).
  127. CREATE TABLE blocks (
  128. rowid BIGSERIAL PRIMARY KEY,
  129. height BIGINT NOT NULL,
  130. chain_id VARCHAR NOT NULL,
  131. -- When this block header was logged into the sink, in UTC.
  132. created_at TIMESTAMPTZ NOT NULL,
  133. UNIQUE (height, chain_id)
  134. );
  135. -- Index blocks by height and chain, since we need to resolve block IDs when
  136. -- indexing transaction records and transaction events.
  137. CREATE INDEX idx_blocks_height_chain ON blocks(height, chain_id);
  138. -- The tx_results table records metadata about transaction results. Note that
  139. -- the events from a transaction are stored separately.
  140. CREATE TABLE tx_results (
  141. rowid BIGSERIAL PRIMARY KEY,
  142. -- The block to which this transaction belongs.
  143. block_id BIGINT NOT NULL REFERENCES blocks(rowid),
  144. -- The sequential index of the transaction within the block.
  145. index INTEGER NOT NULL,
  146. -- When this result record was logged into the sink, in UTC.
  147. created_at TIMESTAMPTZ NOT NULL,
  148. -- The hex-encoded hash of the transaction.
  149. tx_hash VARCHAR NOT NULL,
  150. -- The protobuf wire encoding of the TxResult message.
  151. tx_result BYTEA NOT NULL,
  152. UNIQUE (block_id, index)
  153. );
  154. -- The events table records events. All events (both block and transaction) are
  155. -- associated with a block ID; transaction events also have a transaction ID.
  156. CREATE TABLE events (
  157. rowid BIGSERIAL PRIMARY KEY,
  158. -- The block and transaction this event belongs to.
  159. -- If tx_id is NULL, this is a block event.
  160. block_id BIGINT NOT NULL REFERENCES blocks(rowid),
  161. tx_id BIGINT NULL REFERENCES tx_results(rowid),
  162. -- The application-defined type label for the event.
  163. type VARCHAR NOT NULL
  164. );
  165. -- The attributes table records event attributes.
  166. CREATE TABLE attributes (
  167. event_id BIGINT NOT NULL REFERENCES events(rowid),
  168. key VARCHAR NOT NULL, -- bare key
  169. composite_key VARCHAR NOT NULL, -- composed type.key
  170. value VARCHAR NULL,
  171. UNIQUE (event_id, key)
  172. );
  173. -- A joined view of events and their attributes. Events that do not have any
  174. -- attributes are represented as a single row with empty key and value fields.
  175. CREATE VIEW event_attributes AS
  176. SELECT block_id, tx_id, type, key, composite_key, value
  177. FROM events LEFT JOIN attributes ON (events.rowid = attributes.event_id);
  178. -- A joined view of all block events (those having tx_id NULL).
  179. CREATE VIEW block_events AS
  180. SELECT blocks.rowid as block_id, height, chain_id, type, key, composite_key, value
  181. FROM blocks JOIN event_attributes ON (blocks.rowid = event_attributes.block_id)
  182. WHERE event_attributes.tx_id IS NULL;
  183. -- A joined view of all transaction events.
  184. CREATE VIEW tx_events AS
  185. SELECT height, index, chain_id, type, key, composite_key, value, tx_results.created_at
  186. FROM blocks JOIN tx_results ON (blocks.rowid = tx_results.block_id)
  187. JOIN event_attributes ON (tx_results.rowid = event_attributes.tx_id)
  188. WHERE event_attributes.tx_id IS NOT NULL;
  189. ```
  190. The `PSQLEventSink` will implement the `EventSink` interface as follows
  191. (some details omitted for brevity):
  192. ```go
  193. func NewEventSink(connStr, chainID string) (*EventSink, error) {
  194. db, err := sql.Open(driverName, connStr)
  195. // ...
  196. return &EventSink{
  197. store: db,
  198. chainID: chainID,
  199. }, nil
  200. }
  201. func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
  202. ts := time.Now().UTC()
  203. return runInTransaction(es.store, func(tx *sql.Tx) error {
  204. // Add the block to the blocks table and report back its row ID for use
  205. // in indexing the events for the block.
  206. blockID, err := queryWithID(tx, `
  207. INSERT INTO blocks (height, chain_id, created_at)
  208. VALUES ($1, $2, $3)
  209. ON CONFLICT DO NOTHING
  210. RETURNING rowid;
  211. `, h.Header.Height, es.chainID, ts)
  212. // ...
  213. // Insert the special block meta-event for height.
  214. if err := insertEvents(tx, blockID, 0, []abci.Event{
  215. makeIndexedEvent(types.BlockHeightKey, fmt.Sprint(h.Header.Height)),
  216. }); err != nil {
  217. return fmt.Errorf("block meta-events: %w", err)
  218. }
  219. // Insert all the block events. Order is important here,
  220. if err := insertEvents(tx, blockID, 0, h.ResultBeginBlock.Events); err != nil {
  221. return fmt.Errorf("begin-block events: %w", err)
  222. }
  223. if err := insertEvents(tx, blockID, 0, h.ResultEndBlock.Events); err != nil {
  224. return fmt.Errorf("end-block events: %w", err)
  225. }
  226. return nil
  227. })
  228. }
  229. func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error {
  230. ts := time.Now().UTC()
  231. for _, txr := range txrs {
  232. // Encode the result message in protobuf wire format for indexing.
  233. resultData, err := proto.Marshal(txr)
  234. // ...
  235. // Index the hash of the underlying transaction as a hex string.
  236. txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash())
  237. if err := runInTransaction(es.store, func(tx *sql.Tx) error {
  238. // Find the block associated with this transaction.
  239. blockID, err := queryWithID(tx, `
  240. SELECT rowid FROM blocks WHERE height = $1 AND chain_id = $2;
  241. `, txr.Height, es.chainID)
  242. // ...
  243. // Insert a record for this tx_result and capture its ID for indexing events.
  244. txID, err := queryWithID(tx, `
  245. INSERT INTO tx_results (block_id, index, created_at, tx_hash, tx_result)
  246. VALUES ($1, $2, $3, $4, $5)
  247. ON CONFLICT DO NOTHING
  248. RETURNING rowid;
  249. `, blockID, txr.Index, ts, txHash, resultData)
  250. // ...
  251. // Insert the special transaction meta-events for hash and height.
  252. if err := insertEvents(tx, blockID, txID, []abci.Event{
  253. makeIndexedEvent(types.TxHashKey, txHash),
  254. makeIndexedEvent(types.TxHeightKey, fmt.Sprint(txr.Height)),
  255. }); err != nil {
  256. return fmt.Errorf("indexing transaction meta-events: %w", err)
  257. }
  258. // Index any events packaged with the transaction.
  259. if err := insertEvents(tx, blockID, txID, txr.Result.Events); err != nil {
  260. return fmt.Errorf("indexing transaction events: %w", err)
  261. }
  262. return nil
  263. }); err != nil {
  264. return err
  265. }
  266. }
  267. return nil
  268. }
  269. // SearchBlockEvents is not implemented by this sink, and reports an error for all queries.
  270. func (es *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error)
  271. // SearchTxEvents is not implemented by this sink, and reports an error for all queries.
  272. func (es *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error)
  273. // GetTxByHash is not implemented by this sink, and reports an error for all queries.
  274. func (es *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error)
  275. // HasBlock is not implemented by this sink, and reports an error for all queries.
  276. func (es *EventSink) HasBlock(h int64) (bool, error)
  277. ```
  278. ### Configuration
  279. The current `tx_index.indexer` configuration would be changed to accept a list
  280. of supported `EventSink` types instead of a single value.
  281. Example:
  282. ```toml
  283. [tx_index]
  284. indexer = [
  285. "kv",
  286. "psql"
  287. ]
  288. ```
  289. If the `indexer` list contains the `null` indexer, then no indexers will be used
  290. regardless of what other values may exist.
  291. Additional configuration parameters might be required depending on what event
  292. sinks are supplied to `tx_index.indexer`. The `psql` will require an additional
  293. connection configuration.
  294. ```toml
  295. [tx_index]
  296. indexer = [
  297. "kv",
  298. "psql"
  299. ]
  300. pqsql_conn = "postgresql://<user>:<password>@<host>:<port>/<db>?<opts>"
  301. ```
  302. Any invalid or misconfigured `tx_index` configuration should yield an error as
  303. early as possible.
  304. ## Future Improvements
  305. Although not technically required to maintain feature parity with the current
  306. existing Tendermint indexer, it would be beneficial for operators to have a method
  307. of performing a "re-index". Specifically, Tendermint operators could invoke an
  308. RPC method that allows the Tendermint node to perform a re-indexing of all block
  309. and transaction events between two given heights, H<sub>1</sub> and H<sub>2</sub>,
  310. so long as the block store contains the blocks and transaction results for all
  311. the heights specified in a given range.
  312. ## Consequences
  313. ### Positive
  314. - A more robust and flexible indexing and query engine for indexing and search
  315. block and transaction events.
  316. - The ability to not have to support a custom indexing and query engine beyond
  317. the legacy `kv` type.
  318. - The ability to offload/proxy indexing and querying to the underling sink.
  319. - Scalability and reliability that essentially comes "for free" from the underlying
  320. sink, if it supports it.
  321. ### Negative
  322. - The need to support multiple and potentially a growing set of custom `EventSink`
  323. types.
  324. ### Neutral
  325. ## References
  326. - [Cosmos SDK ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md)
  327. - [PostgreSQL](https://www.postgresql.org/)
  328. - [SQLite](https://www.sqlite.org/index.html)