From 7fe3e78a38e668649afe6cc745c99fc119d81714 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 31 Aug 2021 15:35:07 -0700 Subject: [PATCH] Update the psql indexer schema and implementation (#6868) Update the schema and implementation of the Postgres event indexer to improve certain types of queries against the index. These changes address the use cases raised by #6843, and are partly inspired by the prototype schema in that issue. In the old schema, events were flattened, making it difficult to find all the events associated with a particular block or transaction. In addition, events with no key/value attributes were entirely lost, since entries were generated only for attributes. To address these issues, this new schema records blocks, transactions, events, and attributes in separate tables, and provides views that join these tables to give a more convenient query surface for block and transaction events. - All events for a given block can be queried from the `block_events` view. - All events for a given transaction can be queried from the `tx_events` view. - Multiple events for the same key can be indexed for both blocks and transactions. The tests have been reworked, but all of the existing test cases for the old schema still pass with the new implementation. Various other minor cleanups are included, ADR-065 is also updated to reflect the updated schema. --- .../adr-065-custom-event-indexing.md | 290 ++++++----- state/indexer/indexer_service_test.go | 2 +- state/indexer/sink/psql/psql.go | 286 +++++----- state/indexer/sink/psql/psql_test.go | 490 +++++++++--------- state/indexer/sink/psql/schema.sql | 109 +++- 5 files changed, 634 insertions(+), 543 deletions(-) diff --git a/docs/architecture/adr-065-custom-event-indexing.md b/docs/architecture/adr-065-custom-event-indexing.md index e6a3fdead..b5c86ecfa 100644 --- a/docs/architecture/adr-065-custom-event-indexing.md +++ b/docs/architecture/adr-065-custom-event-indexing.md @@ -24,6 +24,7 @@ - April 1, 2021: Initial Draft (@alexanderbez) - April 28, 2021: Specify search capabilities are only supported through the KV indexer (@marbar3778) - May 19, 2021: Update the SQL schema and the eventsink interface (@jayt106) +- Aug 30, 2021: Update the SQL schema and the psql implementation (@creachadair) ## Status @@ -145,163 +146,190 @@ The postgres eventsink will not support `tx_search`, `block_search`, `GetTxByHas ```sql -- Table Definition ---------------------------------------------- -CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', ''); +-- The blocks table records metadata about each block. +-- The block record does not include its events or transactions (see tx_results). +CREATE TABLE blocks ( + rowid BIGSERIAL PRIMARY KEY, -CREATE TABLE block_events ( - id SERIAL PRIMARY KEY, - key VARCHAR NOT NULL, - value VARCHAR NOT NULL, - height INTEGER NOT NULL, - type block_event_type, - created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL + height BIGINT NOT NULL, + chain_id VARCHAR NOT NULL, + + -- When this block header was logged into the sink, in UTC. + created_at TIMESTAMPTZ NOT NULL, + + UNIQUE (height, chain_id) ); +-- Index blocks by height and chain, since we need to resolve block IDs when +-- indexing transaction records and transaction events. +CREATE INDEX idx_blocks_height_chain ON blocks(height, chain_id); + +-- The tx_results table records metadata about transaction results. Note that +-- the events from a transaction are stored separately. CREATE TABLE tx_results ( - id SERIAL PRIMARY KEY, - tx_result BYTEA NOT NULL, - created_at TIMESTAMPTZ NOT NULL + rowid BIGSERIAL PRIMARY KEY, + + -- The block to which this transaction belongs. + block_id BIGINT NOT NULL REFERENCES blocks(rowid), + -- The sequential index of the transaction within the block. + index INTEGER NOT NULL, + -- When this result record was logged into the sink, in UTC. + created_at TIMESTAMPTZ NOT NULL, + -- The hex-encoded hash of the transaction. + tx_hash VARCHAR NOT NULL, + -- The protobuf wire encoding of the TxResult message. + tx_result BYTEA NOT NULL, + + UNIQUE (block_id, index) ); -CREATE TABLE tx_events ( - id SERIAL PRIMARY KEY, - key VARCHAR NOT NULL, - value VARCHAR NOT NULL, - height INTEGER NOT NULL, - hash VARCHAR NOT NULL, - tx_result_id SERIAL, - created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL, - FOREIGN KEY (tx_result_id) - REFERENCES tx_results(id) - ON DELETE CASCADE +-- The events table records events. All events (both block and transaction) are +-- associated with a block ID; transaction events also have a transaction ID. +CREATE TABLE events ( + rowid BIGSERIAL PRIMARY KEY, + + -- The block and transaction this event belongs to. + -- If tx_id is NULL, this is a block event. + block_id BIGINT NOT NULL REFERENCES blocks(rowid), + tx_id BIGINT NULL REFERENCES tx_results(rowid), + + -- The application-defined type label for the event. + type VARCHAR NOT NULL ); --- Indices ------------------------------------------------------- +-- The attributes table records event attributes. +CREATE TABLE attributes ( + event_id BIGINT NOT NULL REFERENCES events(rowid), + key VARCHAR NOT NULL, -- bare key + composite_key VARCHAR NOT NULL, -- composed type.key + value VARCHAR NULL, -CREATE INDEX idx_block_events_key_value ON block_events(key, value); -CREATE INDEX idx_tx_events_key_value ON tx_events(key, value); -CREATE INDEX idx_tx_events_hash ON tx_events(hash); + UNIQUE (event_id, key) +); + +-- A joined view of events and their attributes. Events that do not have any +-- attributes are represented as a single row with empty key and value fields. +CREATE VIEW event_attributes AS + SELECT block_id, tx_id, type, key, composite_key, value + FROM events LEFT JOIN attributes ON (events.rowid = attributes.event_id); + +-- A joined view of all block events (those having tx_id NULL). +CREATE VIEW block_events AS + SELECT blocks.rowid as block_id, height, chain_id, type, key, composite_key, value + FROM blocks JOIN event_attributes ON (blocks.rowid = event_attributes.block_id) + WHERE event_attributes.tx_id IS NULL; + +-- A joined view of all transaction events. +CREATE VIEW tx_events AS + SELECT height, index, chain_id, type, key, composite_key, value, tx_results.created_at + FROM blocks JOIN tx_results ON (blocks.rowid = tx_results.block_id) + JOIN event_attributes ON (tx_results.rowid = event_attributes.tx_id) + WHERE event_attributes.tx_id IS NOT NULL; ``` The `PSQLEventSink` will implement the `EventSink` interface as follows (some details omitted for brevity): - ```go -func NewPSQLEventSink(connStr string, chainID string) (*PSQLEventSink, error) { - db, err := sql.Open("postgres", connStr) - if err != nil { - return nil, err - } - - // ... +func NewEventSink(connStr, chainID string) (*EventSink, error) { + db, err := sql.Open(driverName, connStr) + // ... + + return &EventSink{ + store: db, + chainID: chainID, + }, nil } -func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { - sqlStmt := sq.Insert("block_events").Columns("key", "value", "height", "type", "created_at", "chain_id") - - // index the reserved block height index - ts := time.Now() - sqlStmt = sqlStmt.Values(types.BlockHeightKey, h.Header.Height, h.Header.Height, "", ts, es.chainID) - - for _, event := range h.ResultBeginBlock.Events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index iff the event specified index:true and it's not a reserved event - compositeKey := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) - if compositeKey == types.BlockHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) - } - - if attr.GetIndex() { - sqlStmt = sqlStmt.Values(compositeKey, string(attr.Value), h.Header.Height, BlockEventTypeBeginBlock, ts, es.chainID) - } - } - } - - // index end_block events... - // execute sqlStmt db query... +func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { + ts := time.Now().UTC() + + return runInTransaction(es.store, func(tx *sql.Tx) error { + // Add the block to the blocks table and report back its row ID for use + // in indexing the events for the block. + blockID, err := queryWithID(tx, ` +INSERT INTO blocks (height, chain_id, created_at) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, h.Header.Height, es.chainID, ts) + // ... + + // Insert the special block meta-event for height. + if err := insertEvents(tx, blockID, 0, []abci.Event{ + makeIndexedEvent(types.BlockHeightKey, fmt.Sprint(h.Header.Height)), + }); err != nil { + return fmt.Errorf("block meta-events: %w", err) + } + // Insert all the block events. Order is important here, + if err := insertEvents(tx, blockID, 0, h.ResultBeginBlock.Events); err != nil { + return fmt.Errorf("begin-block events: %w", err) + } + if err := insertEvents(tx, blockID, 0, h.ResultEndBlock.Events); err != nil { + return fmt.Errorf("end-block events: %w", err) + } + return nil + }) } -func (es *PSQLEventSink) IndexTxEvents(txr []*abci.TxResult) error { - sqlStmtEvents := sq.Insert("tx_events").Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id") - sqlStmtTxResult := sq.Insert("tx_results").Columns("tx_result", "created_at") - - ts := time.Now() - for _, tx := range txr { - // store the tx result - txBz, err := proto.Marshal(tx) - if err != nil { - return err - } - - sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) - - // execute sqlStmtTxResult db query... - var txID uint32 - err = sqlStmtTxResult.QueryRow().Scan(&txID) - if err != nil { +func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error { + ts := time.Now().UTC() + + for _, txr := range txrs { + // Encode the result message in protobuf wire format for indexing. + resultData, err := proto.Marshal(txr) + // ... + + // Index the hash of the underlying transaction as a hex string. + txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash()) + + if err := runInTransaction(es.store, func(tx *sql.Tx) error { + // Find the block associated with this transaction. + blockID, err := queryWithID(tx, ` +SELECT rowid FROM blocks WHERE height = $1 AND chain_id = $2; +`, txr.Height, es.chainID) + // ... + + // Insert a record for this tx_result and capture its ID for indexing events. + txID, err := queryWithID(tx, ` +INSERT INTO tx_results (block_id, index, created_at, tx_hash, tx_result) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, blockID, txr.Index, ts, txHash, resultData) + // ... + + // Insert the special transaction meta-events for hash and height. + if err := insertEvents(tx, blockID, txID, []abci.Event{ + makeIndexedEvent(types.TxHashKey, txHash), + makeIndexedEvent(types.TxHeightKey, fmt.Sprint(txr.Height)), + }); err != nil { + return fmt.Errorf("indexing transaction meta-events: %w", err) + } + // Index any events packaged with the transaction. + if err := insertEvents(tx, blockID, txID, txr.Result.Events); err != nil { + return fmt.Errorf("indexing transaction events: %w", err) + } + return nil + + }); err != nil { return err } - - // index the reserved height and hash indices - hash := types.Tx(tx.Tx).Hash() - sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txID, ts, es.chainID) - sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, tx.Height, tx.Height, hash, txID, ts, es.chainID) - - for _, event := range result.Result.Events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index if `index: true` is set - compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) - - // ensure event does not conflict with a reserved prefix key - if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) - } - - if attr.GetIndex() { - sqlStmtEvents = sqlStmtEvents.Values(compositeKey, string(attr.Value), tx.Height, hash, txID, ts, es.chainID) - } - } - } - } - - // execute sqlStmtEvents db query... + } + return nil } -func (es *PSQLEventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) { - return nil, errors.New("block search is not supported via the postgres event sink") -} +// SearchBlockEvents is not implemented by this sink, and reports an error for all queries. +func (es *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) -func (es *PSQLEventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { - return nil, errors.New("tx search is not supported via the postgres event sink") -} +// SearchTxEvents is not implemented by this sink, and reports an error for all queries. +func (es *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) -func (es *PSQLEventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) { - return nil, errors.New("getTxByHash is not supported via the postgres event sink") -} +// GetTxByHash is not implemented by this sink, and reports an error for all queries. +func (es *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) -func (es *PSQLEventSink) HasBlock(h int64) (bool, error) { - return false, errors.New("hasBlock is not supported via the postgres event sink") -} +// HasBlock is not implemented by this sink, and reports an error for all queries. +func (es *EventSink) HasBlock(h int64) (bool, error) ``` ### Configuration diff --git a/state/indexer/indexer_service_test.go b/state/indexer/indexer_service_test.go index 457ed065a..4d12cc86f 100644 --- a/state/indexer/indexer_service_test.go +++ b/state/indexer/indexer_service_test.go @@ -139,7 +139,7 @@ func setupDB(t *testing.T) (*dockertest.Pool, error) { assert.Nil(t, err) resource, err = pool.RunWithOptions(&dockertest.RunOptions{ - Repository: psql.DriverName, + Repository: "postgres", Tag: "13", Env: []string{ "POSTGRES_USER=" + user, diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go index 8bd378f4a..e452ed406 100644 --- a/state/indexer/sink/psql/psql.go +++ b/state/indexer/sink/psql/psql.go @@ -6,23 +6,22 @@ import ( "database/sql" "errors" "fmt" + "strings" "time" - sq "github.com/Masterminds/squirrel" - proto "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/proto" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" ) -var _ indexer.EventSink = (*EventSink)(nil) - const ( - TableEventBlock = "block_events" - TableEventTx = "tx_events" - TableResultTx = "tx_results" - DriverName = "postgres" + tableBlocks = "blocks" + tableTxResults = "tx_results" + tableEvents = "events" + tableAttributes = "attributes" + driverName = "postgres" ) // EventSink is an indexer backend providing the tx/block index services. This @@ -37,7 +36,7 @@ type EventSink struct { // database specified by connStr. Events written to the sink are attributed to // the specified chainID. func NewEventSink(connStr, chainID string) (*EventSink, error) { - db, err := sql.Open(DriverName, connStr) + db, err := sql.Open(driverName, connStr) if err != nil { return nil, err } @@ -55,116 +54,183 @@ func (es *EventSink) DB() *sql.DB { return es.store } // Type returns the structure type for this sink, which is Postgres. func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL } -// IndexBlockEvents indexes the specified block header, part of the -// indexer.EventSink interface. -func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { - sqlStmt := sq. - Insert(TableEventBlock). - Columns("key", "value", "height", "type", "created_at", "chain_id"). - PlaceholderFormat(sq.Dollar). - Suffix("ON CONFLICT (key,height)"). - Suffix("DO NOTHING") - - ts := time.Now() - // index the reserved block height index - sqlStmt = sqlStmt. - Values(types.BlockHeightKey, fmt.Sprint(h.Header.Height), h.Header.Height, "", ts, es.chainID) - - // index begin_block events - sqlStmt, err := indexBlockEvents( - sqlStmt, h.ResultBeginBlock.Events, types.EventTypeBeginBlock, h.Header.Height, ts, es.chainID) +// runInTransaction executes query in a fresh database transaction. +// If query reports an error, the transaction is rolled back and the +// error from query is reported to the caller. +// Otherwise, the result of committing the transaction is returned. +func runInTransaction(db *sql.DB, query func(*sql.Tx) error) error { + dbtx, err := db.Begin() if err != nil { return err } - - // index end_block events - sqlStmt, err = indexBlockEvents( - sqlStmt, h.ResultEndBlock.Events, types.EventTypeEndBlock, h.Header.Height, ts, es.chainID) - if err != nil { + if err := query(dbtx); err != nil { + _ = dbtx.Rollback() // report the initial error, not the rollback return err } + return dbtx.Commit() +} - _, err = sqlStmt.RunWith(es.store).Exec() - return err +// queryWithID executes the specified SQL query with the given arguments, +// expecting a single-row, single-column result containing an ID. If the query +// succeeds, the ID from the result is returned. +func queryWithID(tx *sql.Tx, query string, args ...interface{}) (uint32, error) { + var id uint32 + if err := tx.QueryRow(query, args...).Scan(&id); err != nil { + return 0, err + } + return id, nil } -func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error { - // index the tx result - var txid uint32 - sqlStmtTxResult := sq. - Insert(TableResultTx). - Columns("tx_result", "created_at"). - PlaceholderFormat(sq.Dollar). - RunWith(es.store). - Suffix("ON CONFLICT (tx_result)"). - Suffix("DO NOTHING"). - Suffix("RETURNING \"id\"") - - sqlStmtEvents := sq. - Insert(TableEventTx). - Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id"). - PlaceholderFormat(sq.Dollar). - Suffix("ON CONFLICT (key,hash)"). - Suffix("DO NOTHING") - - ts := time.Now() - for _, tx := range txr { - txBz, err := proto.Marshal(tx) - if err != nil { - return err - } +// insertEvents inserts a slice of events and any indexed attributes of those +// events into the database associated with dbtx. +// +// If txID > 0, the event is attributed to the Tendermint transaction with that +// ID; otherwise it is recorded as a block event. +func insertEvents(dbtx *sql.Tx, blockID, txID uint32, evts []abci.Event) error { + // Populate the transaction ID field iff one is defined (> 0). + var txIDArg interface{} + if txID > 0 { + txIDArg = txID + } - sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) + // Add each event to the events table, and retrieve its row ID to use when + // adding any attributes the event provides. + for _, evt := range evts { + // Skip events with an empty type. + if evt.Type == "" { + continue + } - // execute sqlStmtTxResult db query and retrieve the txid - r, err := sqlStmtTxResult.Query() + eid, err := queryWithID(dbtx, ` +INSERT INTO `+tableEvents+` (block_id, tx_id, type) VALUES ($1, $2, $3) + RETURNING rowid; +`, blockID, txIDArg, evt.Type) if err != nil { return err } - defer r.Close() - if !r.Next() { - return nil + // Add any attributes flagged for indexing. + for _, attr := range evt.Attributes { + if !attr.Index { + continue + } + compositeKey := evt.Type + "." + attr.Key + if _, err := dbtx.Exec(` +INSERT INTO `+tableAttributes+` (event_id, key, composite_key, value) + VALUES ($1, $2, $3, $4); +`, eid, attr.Key, compositeKey, attr.Value); err != nil { + return err + } } + } + return nil +} - if err := r.Scan(&txid); err != nil { - return err +// makeIndexedEvent constructs an event from the specified composite key and +// value. If the key has the form "type.name", the event will have a single +// attribute with that name and the value; otherwise the event will have only +// a type and no attributes. +func makeIndexedEvent(compositeKey, value string) abci.Event { + i := strings.Index(compositeKey, ".") + if i < 0 { + return abci.Event{Type: compositeKey} + } + return abci.Event{Type: compositeKey[:i], Attributes: []abci.EventAttribute{ + {Key: compositeKey[i+1:], Value: value, Index: true}, + }} +} + +// IndexBlockEvents indexes the specified block header, part of the +// indexer.EventSink interface. +func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { + ts := time.Now().UTC() + + return runInTransaction(es.store, func(dbtx *sql.Tx) error { + // Add the block to the blocks table and report back its row ID for use + // in indexing the events for the block. + blockID, err := queryWithID(dbtx, ` +INSERT INTO `+tableBlocks+` (height, chain_id, created_at) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, h.Header.Height, es.chainID, ts) + if err == sql.ErrNoRows { + return nil // we already saw this block; quietly succeed + } else if err != nil { + return fmt.Errorf("indexing block header: %w", err) } - // index the reserved height and hash indices - hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash()) + // Insert the special block meta-event for height. + if err := insertEvents(dbtx, blockID, 0, []abci.Event{ + makeIndexedEvent(types.BlockHeightKey, fmt.Sprint(h.Header.Height)), + }); err != nil { + return fmt.Errorf("block meta-events: %w", err) + } + // Insert all the block events. Order is important here, + if err := insertEvents(dbtx, blockID, 0, h.ResultBeginBlock.Events); err != nil { + return fmt.Errorf("begin-block events: %w", err) + } + if err := insertEvents(dbtx, blockID, 0, h.ResultEndBlock.Events); err != nil { + return fmt.Errorf("end-block events: %w", err) + } + return nil + }) +} - sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID) - sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID) - for _, event := range tx.Result.Events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } +func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error { + ts := time.Now().UTC() - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } + for _, txr := range txrs { + // Encode the result message in protobuf wire format for indexing. + resultData, err := proto.Marshal(txr) + if err != nil { + return fmt.Errorf("marshaling tx_result: %w", err) + } - // index if `index: true` is set - compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key) + // Index the hash of the underlying transaction as a hex string. + txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash()) + + if err := runInTransaction(es.store, func(dbtx *sql.Tx) error { + // Find the block associated with this transaction. The block header + // must have been indexed prior to the transactions belonging to it. + blockID, err := queryWithID(dbtx, ` +SELECT rowid FROM `+tableBlocks+` WHERE height = $1 AND chain_id = $2; +`, txr.Height, es.chainID) + if err != nil { + return fmt.Errorf("finding block ID: %w", err) + } - // ensure event does not conflict with a reserved prefix key - if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) - } + // Insert a record for this tx_result and capture its ID for indexing events. + txID, err := queryWithID(dbtx, ` +INSERT INTO `+tableTxResults+` (block_id, index, created_at, tx_hash, tx_result) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT DO NOTHING + RETURNING rowid; +`, blockID, txr.Index, ts, txHash, resultData) + if err == sql.ErrNoRows { + return nil // we already saw this transaction; quietly succeed + } else if err != nil { + return fmt.Errorf("indexing tx_result: %w", err) + } - if attr.GetIndex() { - sqlStmtEvents = sqlStmtEvents.Values(compositeTag, attr.Value, tx.Height, hash, txid, ts, es.chainID) - } + // Insert the special transaction meta-events for hash and height. + if err := insertEvents(dbtx, blockID, txID, []abci.Event{ + makeIndexedEvent(types.TxHashKey, txHash), + makeIndexedEvent(types.TxHeightKey, fmt.Sprint(txr.Height)), + }); err != nil { + return fmt.Errorf("indexing transaction meta-events: %w", err) } + // Index any events packaged with the transaction. + if err := insertEvents(dbtx, blockID, txID, txr.Result.Events); err != nil { + return fmt.Errorf("indexing transaction events: %w", err) + } + return nil + + }); err != nil { + return err } } - - // execute sqlStmtEvents db query... - _, err := sqlStmtEvents.RunWith(es.store).Exec() - return err + return nil } // SearchBlockEvents is not implemented by this sink, and reports an error for all queries. @@ -187,39 +253,5 @@ func (es *EventSink) HasBlock(h int64) (bool, error) { return false, errors.New("hasBlock is not supported via the postgres event sink") } -func indexBlockEvents( - sqlStmt sq.InsertBuilder, - events []abci.Event, - ty string, - height int64, - ts time.Time, - chainID string, -) (sq.InsertBuilder, error) { - for _, event := range events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { - continue - } - - // index iff the event specified index:true and it's not a reserved event - compositeKey := fmt.Sprintf("%s.%s", event.Type, attr.Key) - if compositeKey == types.BlockHeightKey { - return sqlStmt, fmt.Errorf( - "event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) - } - - if attr.GetIndex() { - sqlStmt = sqlStmt.Values(compositeKey, attr.Value, height, ty, ts, chainID) - } - } - } - return sqlStmt, nil -} - // Stop closes the underlying PostgreSQL database. func (es *EventSink) Stop() error { return es.store.Close() } diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go index 35ad7eea3..e8a1ce833 100644 --- a/state/indexer/sink/psql/psql_test.go +++ b/state/indexer/sink/psql/psql_test.go @@ -3,195 +3,261 @@ package psql import ( "context" "database/sql" - "errors" + "flag" "fmt" "io/ioutil" + "log" "os" + "os/signal" "testing" "time" - sq "github.com/Masterminds/squirrel" - schema "github.com/adlio/schema" - proto "github.com/gogo/protobuf/proto" - _ "github.com/lib/pq" - dockertest "github.com/ory/dockertest" + "github.com/adlio/schema" + "github.com/gogo/protobuf/proto" + "github.com/ory/dockertest" "github.com/ory/dockertest/docker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" + + // Register the Postgres database driver. + _ "github.com/lib/pq" ) -var db *sql.DB -var resource *dockertest.Resource -var chainID = "test-chainID" +// Verify that the type satisfies the EventSink interface. +var _ indexer.EventSink = (*EventSink)(nil) var ( + doPauseAtExit = flag.Bool("pause-at-exit", false, + "If true, pause the test until interrupted at shutdown, to allow debugging") + + // A hook that test cases can call to obtain the shared database instance + // used for testing the sink. This is initialized in TestMain (see below). + testDB func() *sql.DB +) + +const ( user = "postgres" password = "secret" port = "5432" dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable" dbName = "postgres" + chainID = "test-chainID" + + viewBlockEvents = "block_events" + viewTxEvents = "tx_events" ) -func TestType(t *testing.T) { - pool, err := setupDB(t) - require.NoError(t, err) +func TestMain(m *testing.M) { + flag.Parse() - psqlSink := &EventSink{store: db, chainID: chainID} - assert.Equal(t, indexer.PSQL, psqlSink.Type()) - require.NoError(t, teardown(t, pool)) -} + // Set up docker and start a container running PostgreSQL. + pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL")) + if err != nil { + log.Fatalf("Creating docker pool: %v", err) + } -func TestBlockFuncs(t *testing.T) { - pool, err := setupDB(t) - require.NoError(t, err) + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "postgres", + Tag: "13", + Env: []string{ + "POSTGRES_USER=" + user, + "POSTGRES_PASSWORD=" + password, + "POSTGRES_DB=" + dbName, + "listen_addresses = '*'", + }, + ExposedPorts: []string{port}, + }, func(config *docker.HostConfig) { + // set AutoRemove to true so that stopped container goes away by itself + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{ + Name: "no", + } + }) + if err != nil { + log.Fatalf("Starting docker pool: %v", err) + } - indexer := &EventSink{store: db, chainID: chainID} - require.NoError(t, indexer.IndexBlockEvents(getTestBlockHeader())) + if *doPauseAtExit { + log.Print("Pause at exit is enabled, containers will not expire") + } else { + const expireSeconds = 60 + _ = resource.Expire(expireSeconds) + log.Printf("Container expiration set to %d seconds", expireSeconds) + } - r, err := verifyBlock(1) - assert.True(t, r) - require.NoError(t, err) + // Connect to the database, clear any leftover data, and install the + // indexing schema. + conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName) + var db *sql.DB - r, err = verifyBlock(2) - assert.False(t, r) - require.NoError(t, err) + if err := pool.Retry(func() error { + sink, err := NewEventSink(conn, chainID) + if err != nil { + return err + } + db = sink.DB() // set global for test use + return db.Ping() + }); err != nil { + log.Fatalf("Connecting to database: %v", err) + } - r, err = indexer.HasBlock(1) - assert.False(t, r) - assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err) + if err := resetDatabase(db); err != nil { + log.Fatalf("Flushing database: %v", err) + } - r, err = indexer.HasBlock(2) - assert.False(t, r) - assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err) + sm, err := readSchema() + if err != nil { + log.Fatalf("Reading schema: %v", err) + } else if err := schema.NewMigrator().Apply(db, sm); err != nil { + log.Fatalf("Applying schema: %v", err) + } - r2, err := indexer.SearchBlockEvents(context.TODO(), nil) - assert.Nil(t, r2) - assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err) + // Set up the hook for tests to get the shared database handle. + testDB = func() *sql.DB { return db } - require.NoError(t, verifyTimeStamp(TableEventBlock)) + // Run the selected test cases. + code := m.Run() - // try to insert the duplicate block events. - err = indexer.IndexBlockEvents(getTestBlockHeader()) - require.NoError(t, err) + // Clean up and shut down the database container. + if *doPauseAtExit { + log.Print("Testing complete, pausing for inspection. Send SIGINT to resume teardown") + waitForInterrupt() + log.Print("(resuming)") + } + log.Print("Shutting down database") + if err := pool.Purge(resource); err != nil { + log.Printf("WARNING: Purging pool failed: %v", err) + } + if err := db.Close(); err != nil { + log.Printf("WARNING: Closing database failed: %v", err) + } - require.NoError(t, teardown(t, pool)) + os.Exit(code) } -func TestTxFuncs(t *testing.T) { - pool, err := setupDB(t) - assert.Nil(t, err) - - indexer := &EventSink{store: db, chainID: chainID} +func TestType(t *testing.T) { + psqlSink := &EventSink{store: testDB(), chainID: chainID} + assert.Equal(t, indexer.PSQL, psqlSink.Type()) +} - txResult := txResultWithEvents([]abci.Event{ - {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}}, - {Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}}, - {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}}, - }) - err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) - require.NoError(t, err) +func TestIndexing(t *testing.T) { + t.Run("IndexBlockEvents", func(t *testing.T) { + indexer := &EventSink{store: testDB(), chainID: chainID} + require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader())) - tx, err := verifyTx(types.Tx(txResult.Tx).Hash()) - require.NoError(t, err) - assert.Equal(t, txResult, tx) + verifyBlock(t, 1) + verifyBlock(t, 2) - require.NoError(t, verifyTimeStamp(TableEventTx)) - require.NoError(t, verifyTimeStamp(TableResultTx)) + verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(1) }) + verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(2) }) - tx, err = indexer.GetTxByHash(types.Tx(txResult.Tx).Hash()) - assert.Nil(t, tx) - assert.Equal(t, errors.New("getTxByHash is not supported via the postgres event sink"), err) + verifyNotImplemented(t, "block search", func() (bool, error) { + v, err := indexer.SearchBlockEvents(context.Background(), nil) + return v != nil, err + }) - r2, err := indexer.SearchTxEvents(context.TODO(), nil) - assert.Nil(t, r2) - assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err) + require.NoError(t, verifyTimeStamp(tableBlocks)) - // try to insert the duplicate tx events. - err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) - require.NoError(t, err) + // Attempting to reindex the same events should gracefully succeed. + require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader())) + }) - assert.Nil(t, teardown(t, pool)) + t.Run("IndexTxEvents", func(t *testing.T) { + indexer := &EventSink{store: testDB(), chainID: chainID} + + txResult := txResultWithEvents([]abci.Event{ + makeIndexedEvent("account.number", "1"), + makeIndexedEvent("account.owner", "Ivan"), + makeIndexedEvent("account.owner", "Yulieta"), + + {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}}, + }) + require.NoError(t, indexer.IndexTxEvents([]*abci.TxResult{txResult})) + + txr, err := loadTxResult(types.Tx(txResult.Tx).Hash()) + require.NoError(t, err) + assert.Equal(t, txResult, txr) + + require.NoError(t, verifyTimeStamp(tableTxResults)) + require.NoError(t, verifyTimeStamp(viewTxEvents)) + + verifyNotImplemented(t, "getTxByHash", func() (bool, error) { + txr, err := indexer.GetTxByHash(types.Tx(txResult.Tx).Hash()) + return txr != nil, err + }) + verifyNotImplemented(t, "tx search", func() (bool, error) { + txr, err := indexer.SearchTxEvents(context.Background(), nil) + return txr != nil, err + }) + + // try to insert the duplicate tx events. + err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) + require.NoError(t, err) + }) } func TestStop(t *testing.T) { - pool, err := setupDB(t) - require.NoError(t, err) - - indexer := &EventSink{store: db} + indexer := &EventSink{store: testDB()} require.NoError(t, indexer.Stop()) - - defer db.Close() - require.NoError(t, pool.Purge(resource)) } -func getTestBlockHeader() types.EventDataNewBlockHeader { +// newTestBlockHeader constructs a fresh copy of a block header containing +// known test values to exercise the indexer. +func newTestBlockHeader() types.EventDataNewBlockHeader { return types.EventDataNewBlockHeader{ Header: types.Header{Height: 1}, ResultBeginBlock: abci.ResponseBeginBlock{ Events: []abci.Event{ - { - Type: "begin_event", - Attributes: []abci.EventAttribute{ - { - Key: "proposer", - Value: "FCAA001", - Index: true, - }, - }, - }, + makeIndexedEvent("begin_event.proposer", "FCAA001"), + makeIndexedEvent("thingy.whatzit", "O.O"), }, }, ResultEndBlock: abci.ResponseEndBlock{ Events: []abci.Event{ - { - Type: "end_event", - Attributes: []abci.EventAttribute{ - { - Key: "foo", - Value: "100", - Index: true, - }, - }, - }, + makeIndexedEvent("end_event.foo", "100"), + makeIndexedEvent("thingy.whatzit", "-.O"), }, }, } } +// readSchema loads the indexing database schema file func readSchema() ([]*schema.Migration, error) { - - filename := "schema.sql" + const filename = "schema.sql" contents, err := ioutil.ReadFile(filename) if err != nil { return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err) } - mg := &schema.Migration{} - mg.ID = time.Now().Local().String() + " db schema" - mg.Script = string(contents) - return append([]*schema.Migration{}, mg), nil + return []*schema.Migration{{ + ID: time.Now().Local().String() + " db schema", + Script: string(contents), + }}, nil } -func resetDB(t *testing.T) { - q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results" - _, err := db.Exec(q) - - require.NoError(t, err) - - q = "DROP TYPE IF EXISTS block_event_type" - _, err = db.Exec(q) - require.NoError(t, err) +// resetDB drops all the data from the test database. +func resetDatabase(db *sql.DB) error { + _, err := db.Exec(`DROP TABLE IF EXISTS blocks,tx_results,events,attributes CASCADE;`) + if err != nil { + return fmt.Errorf("dropping tables: %v", err) + } + _, err = db.Exec(`DROP VIEW IF EXISTS event_attributes,block_events,tx_events CASCADE;`) + if err != nil { + return fmt.Errorf("dropping views: %v", err) + } + return nil } +// txResultWithEvents constructs a fresh transaction result with fixed values +// for testing, that includes the specified events. func txResultWithEvents(events []abci.Event) *abci.TxResult { - tx := types.Tx("HELLO WORLD") return &abci.TxResult{ Height: 1, Index: 0, - Tx: tx, + Tx: types.Tx("HELLO WORLD"), Result: abci.ResponseDeliverTx{ Data: []byte{0}, Code: abci.CodeTypeOK, @@ -201,166 +267,78 @@ func txResultWithEvents(events []abci.Event) *abci.TxResult { } } -func verifyTx(hash []byte) (*abci.TxResult, error) { - join := fmt.Sprintf("%s ON %s.id = tx_result_id", TableEventTx, TableResultTx) - sqlStmt := sq. - Select("tx_result", fmt.Sprintf("%s.id", TableResultTx), "tx_result_id", "hash", "chain_id"). - Distinct().From(TableResultTx). - InnerJoin(join). - Where(fmt.Sprintf("hash = $1 AND chain_id = '%s'", chainID), fmt.Sprintf("%X", hash)) - - rows, err := sqlStmt.RunWith(db).Query() - if err != nil { - return nil, err +func loadTxResult(hash []byte) (*abci.TxResult, error) { + hashString := fmt.Sprintf("%X", hash) + var resultData []byte + if err := testDB().QueryRow(` +SELECT tx_result FROM `+tableTxResults+` WHERE tx_hash = $1; +`, hashString).Scan(&resultData); err != nil { + return nil, fmt.Errorf("lookup transaction for hash %q failed: %v", hashString, err) } - defer rows.Close() - - if rows.Next() { - var txResult []byte - var txResultID, txid int - var h, cid string - err = rows.Scan(&txResult, &txResultID, &txid, &h, &cid) - if err != nil { - return nil, nil - } - - msg := new(abci.TxResult) - err = proto.Unmarshal(txResult, msg) - if err != nil { - return nil, err - } - - return msg, err + txr := new(abci.TxResult) + if err := proto.Unmarshal(resultData, txr); err != nil { + return nil, fmt.Errorf("unmarshaling txr: %v", err) } - // No result - return nil, nil + return txr, nil } -func verifyTimeStamp(tb string) error { - - // We assume the tx indexing time would not exceed 2 second from now - sqlStmt := sq. - Select(fmt.Sprintf("%s.created_at", tb)). - Distinct().From(tb). - Where(fmt.Sprintf("%s.created_at >= $1", tb), time.Now().Add(-2*time.Second)) - - rows, err := sqlStmt.RunWith(db).Query() - if err != nil { - return err - } - - defer rows.Close() - - if rows.Next() { - var ts string - return rows.Scan(&ts) - } - - return errors.New("no result") +func verifyTimeStamp(tableName string) error { + return testDB().QueryRow(fmt.Sprintf(` +SELECT DISTINCT %[1]s.created_at + FROM %[1]s + WHERE %[1]s.created_at >= $1; +`, tableName), time.Now().Add(-2*time.Second)).Err() } -func verifyBlock(h int64) (bool, error) { - sqlStmt := sq. - Select("height"). - Distinct(). - From(TableEventBlock). - Where(fmt.Sprintf("height = %d", h)) - rows, err := sqlStmt.RunWith(db).Query() - if err != nil { - return false, err - } - - defer rows.Close() - - if !rows.Next() { - return false, nil +func verifyBlock(t *testing.T, height int64) { + // Check that the blocks table contains an entry for this height. + if err := testDB().QueryRow(` +SELECT height FROM `+tableBlocks+` WHERE height = $1; +`, height).Err(); err == sql.ErrNoRows { + t.Errorf("No block found for height=%d", height) + } else if err != nil { + t.Fatalf("Database query failed: %v", err) } - sqlStmt = sq. - Select("type, height", "chain_id"). - Distinct(). - From(TableEventBlock). - Where(fmt.Sprintf("height = %d AND type = '%s' AND chain_id = '%s'", h, types.EventTypeBeginBlock, chainID)) - - rows, err = sqlStmt.RunWith(db).Query() - if err != nil { - return false, err + // Verify the presence of begin_block and end_block events. + if err := testDB().QueryRow(` +SELECT type, height, chain_id FROM `+viewBlockEvents+` + WHERE height = $1 AND type = $2 AND chain_id = $3; +`, height, types.EventTypeBeginBlock, chainID).Err(); err == sql.ErrNoRows { + t.Errorf("No %q event found for height=%d", types.EventTypeBeginBlock, height) + } else if err != nil { + t.Fatalf("Database query failed: %v", err) } - defer rows.Close() - if !rows.Next() { - return false, nil + if err := testDB().QueryRow(` +SELECT type, height, chain_id FROM `+viewBlockEvents+` + WHERE height = $1 AND type = $2 AND chain_id = $3; +`, height, types.EventTypeEndBlock, chainID).Err(); err == sql.ErrNoRows { + t.Errorf("No %q event found for height=%d", types.EventTypeEndBlock, height) + } else if err != nil { + t.Fatalf("Database query failed: %v", err) } - - sqlStmt = sq. - Select("type, height"). - Distinct(). - From(TableEventBlock). - Where(fmt.Sprintf("height = %d AND type = '%s'", h, types.EventTypeEndBlock)) - rows, err = sqlStmt.RunWith(db).Query() - - if err != nil { - return false, err - } - defer rows.Close() - - return rows.Next(), nil } -func setupDB(t *testing.T) (*dockertest.Pool, error) { +// verifyNotImplemented calls f and verifies that it returns both a +// false-valued flag and a non-nil error whose string matching the expected +// "not supported" message with label prefixed. +func verifyNotImplemented(t *testing.T, label string, f func() (bool, error)) { t.Helper() - pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL")) - - require.NoError(t, err) + t.Logf("Verifying that %q reports it is not implemented", label) - resource, err = pool.RunWithOptions(&dockertest.RunOptions{ - Repository: DriverName, - Tag: "13", - Env: []string{ - "POSTGRES_USER=" + user, - "POSTGRES_PASSWORD=" + password, - "POSTGRES_DB=" + dbName, - "listen_addresses = '*'", - }, - ExposedPorts: []string{port}, - }, func(config *docker.HostConfig) { - // set AutoRemove to true so that stopped container goes away by itself - config.AutoRemove = true - config.RestartPolicy = docker.RestartPolicy{ - Name: "no", - } - }) - - require.NoError(t, err) - - // Set the container to expire in a minute to avoid orphaned containers - // hanging around - _ = resource.Expire(60) - - conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName) - - require.NoError(t, pool.Retry(func() error { - sink, err := NewEventSink(conn, chainID) - if err != nil { - return err - } - db = sink.DB() // set global for test use - return db.Ping() - })) - - resetDB(t) - - sm, err := readSchema() - assert.Nil(t, err) - assert.Nil(t, schema.NewMigrator().Apply(db, sm)) - return pool, nil + want := label + " is not supported via the postgres event sink" + ok, err := f() + assert.False(t, ok) + require.NotNil(t, err) + assert.Equal(t, want, err.Error()) } -func teardown(t *testing.T, pool *dockertest.Pool) error { - t.Helper() - // When you're done, kill and remove the container - assert.Nil(t, pool.Purge(resource)) - return db.Close() +// waitForInterrupt blocks until a SIGINT is received by the process. +func waitForInterrupt() { + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt) + <-ch } diff --git a/state/indexer/sink/psql/schema.sql b/state/indexer/sink/psql/schema.sql index 0563136e2..1091cd4c3 100644 --- a/state/indexer/sink/psql/schema.sql +++ b/state/indexer/sink/psql/schema.sql @@ -1,32 +1,85 @@ -CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', ''); -CREATE TABLE block_events ( - id SERIAL PRIMARY KEY, - key VARCHAR NOT NULL, - value VARCHAR NOT NULL, - height INTEGER NOT NULL, - type block_event_type, - created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL, - UNIQUE (key, height) +/* + This file defines the database schema for the PostgresQL ("psql") event sink + implementation in Tendermint. The operator must create a database and install + this schema before using the database to index events. + */ + +-- The blocks table records metadata about each block. +-- The block record does not include its events or transactions (see tx_results). +CREATE TABLE blocks ( + rowid BIGSERIAL PRIMARY KEY, + + height BIGINT NOT NULL, + chain_id VARCHAR NOT NULL, + + -- When this block header was logged into the sink, in UTC. + created_at TIMESTAMPTZ NOT NULL, + + UNIQUE (height, chain_id) ); + +-- Index blocks by height and chain, since we need to resolve block IDs when +-- indexing transaction records and transaction events. +CREATE INDEX idx_blocks_height_chain ON blocks(height, chain_id); + +-- The tx_results table records metadata about transaction results. Note that +-- the events from a transaction are stored separately. CREATE TABLE tx_results ( - id SERIAL PRIMARY KEY, - tx_result BYTEA NOT NULL, - created_at TIMESTAMPTZ NOT NULL, - UNIQUE (tx_result) + rowid BIGSERIAL PRIMARY KEY, + + -- The block to which this transaction belongs. + block_id BIGINT NOT NULL REFERENCES blocks(rowid), + -- The sequential index of the transaction within the block. + index INTEGER NOT NULL, + -- When this result record was logged into the sink, in UTC. + created_at TIMESTAMPTZ NOT NULL, + -- The hex-encoded hash of the transaction. + tx_hash VARCHAR NOT NULL, + -- The protobuf wire encoding of the TxResult message. + tx_result BYTEA NOT NULL, + + UNIQUE (block_id, index) ); -CREATE TABLE tx_events ( - id SERIAL PRIMARY KEY, - key VARCHAR NOT NULL, - value VARCHAR NOT NULL, - height INTEGER NOT NULL, - hash VARCHAR NOT NULL, - tx_result_id SERIAL, - created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL, - UNIQUE (hash, key), - FOREIGN KEY (tx_result_id) REFERENCES tx_results(id) ON DELETE CASCADE + +-- The events table records events. All events (both block and transaction) are +-- associated with a block ID; transaction events also have a transaction ID. +CREATE TABLE events ( + rowid BIGSERIAL PRIMARY KEY, + + -- The block and transaction this event belongs to. + -- If tx_id is NULL, this is a block event. + block_id BIGINT NOT NULL REFERENCES blocks(rowid), + tx_id BIGINT NULL REFERENCES tx_results(rowid), + + -- The application-defined type label for the event. + type VARCHAR NOT NULL ); -CREATE INDEX idx_block_events_key_value ON block_events(key, value); -CREATE INDEX idx_tx_events_key_value ON tx_events(key, value); -CREATE INDEX idx_tx_events_hash ON tx_events(hash); + +-- The attributes table records event attributes. +CREATE TABLE attributes ( + event_id BIGINT NOT NULL REFERENCES events(rowid), + key VARCHAR NOT NULL, -- bare key + composite_key VARCHAR NOT NULL, -- composed type.key + value VARCHAR NULL, + + UNIQUE (event_id, key) +); + +-- A joined view of events and their attributes. Events that do not have any +-- attributes are represented as a single row with empty key and value fields. +CREATE VIEW event_attributes AS + SELECT block_id, tx_id, type, key, composite_key, value + FROM events LEFT JOIN attributes ON (events.rowid = attributes.event_id); + +-- A joined view of all block events (those having tx_id NULL). +CREATE VIEW block_events AS + SELECT blocks.rowid as block_id, height, chain_id, type, key, composite_key, value + FROM blocks JOIN event_attributes ON (blocks.rowid = event_attributes.block_id) + WHERE event_attributes.tx_id IS NULL; + +-- A joined view of all transaction events. +CREATE VIEW tx_events AS + SELECT height, index, chain_id, type, key, composite_key, value, tx_results.created_at + FROM blocks JOIN tx_results ON (blocks.rowid = tx_results.block_id) + JOIN event_attributes ON (tx_results.rowid = event_attributes.tx_id) + WHERE event_attributes.tx_id IS NOT NULL;