You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

257 lines
8.5 KiB

  1. // Package psql implements an event sink backed by a PostgreSQL database.
  2. package psql
  3. import (
  4. "context"
  5. "database/sql"
  6. "errors"
  7. "fmt"
  8. "strings"
  9. "time"
  10. "github.com/gogo/protobuf/proto"
  11. abci "github.com/tendermint/tendermint/abci/types"
  12. "github.com/tendermint/tendermint/internal/state/indexer"
  13. "github.com/tendermint/tendermint/libs/pubsub/query"
  14. "github.com/tendermint/tendermint/types"
  15. )
  // Fixed identifiers used by the sink: the database/sql driver name and the
  // names of the tables that the sink's SQL statements refer to.
  const (
  	// driverName is the driver name handed to database/sql when opening the
  	// connection.
  	driverName = "postgres"

  	// Table names spliced into the SQL statements issued by the sink.
  	tableBlocks     = "blocks"
  	tableTxResults  = "tx_results"
  	tableEvents     = "events"
  	tableAttributes = "attributes"
  )
  // EventSink is an indexer backend that provides the tx/block index services
  // on top of a PostgreSQL database. Records are stored using the schema
  // defined in state/indexer/sink/psql/schema.sql.
  type EventSink struct {
  	// store is the database handle through which all records are written.
  	store *sql.DB

  	// chainID is the chain identifier recorded with each indexed block and
  	// used to look blocks up again when indexing transactions.
  	chainID string
  }
  30. // NewEventSink constructs an event sink associated with the PostgreSQL
  31. // database specified by connStr. Events written to the sink are attributed to
  32. // the specified chainID.
  33. func NewEventSink(connStr, chainID string) (*EventSink, error) {
  34. db, err := sql.Open(driverName, connStr)
  35. if err != nil {
  36. return nil, err
  37. }
  38. return &EventSink{
  39. store: db,
  40. chainID: chainID,
  41. }, nil
  42. }
  43. // DB returns the underlying Postgres connection used by the sink.
  44. // This is exported to support testing.
  45. func (es *EventSink) DB() *sql.DB { return es.store }
  46. // Type returns the structure type for this sink, which is Postgres.
  47. func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL }
  // runInTransaction executes query in a fresh database transaction.
  //
  // If the transaction cannot be started, that error is returned and query is
  // not called. If query reports an error, the transaction is rolled back and
  // the error from query is reported to the caller (a rollback failure is
  // deliberately not reported in its place). Otherwise, the result of
  // committing the transaction is returned.
  //
  // The transaction is also rolled back if query panics, so that a panic does
  // not leave the transaction (and its pooled connection) open; the panic
  // itself continues to propagate to the caller.
  func runInTransaction(db *sql.DB, query func(*sql.Tx) error) error {
  	dbtx, err := db.Begin()
  	if err != nil {
  		return err
  	}

  	// Rolling back a transaction that has already been committed is a no-op
  	// that reports sql.ErrTxDone, so it is safe to defer unconditionally.
  	// This covers both the error return below and a panic inside query.
  	defer func() {
  		_ = dbtx.Rollback() // report the initial error, not the rollback
  	}()

  	if err := query(dbtx); err != nil {
  		return err
  	}
  	return dbtx.Commit()
  }
  // queryWithID runs the given SQL query with args inside tx and reads back a
  // single ID from the first column of the first result row.
  //
  // On success the ID is returned. If the query fails or the result cannot be
  // scanned (including when no row is produced, which surfaces as
  // sql.ErrNoRows), the ID is zero and the error is returned unchanged.
  func queryWithID(tx *sql.Tx, query string, args ...interface{}) (uint32, error) {
  	row := tx.QueryRow(query, args...)

  	var rowID uint32
  	err := row.Scan(&rowID)
  	if err != nil {
  		return 0, err
  	}
  	return rowID, nil
  }
  73. // insertEvents inserts a slice of events and any indexed attributes of those
  74. // events into the database associated with dbtx.
  75. //
  76. // If txID > 0, the event is attributed to the Tendermint transaction with that
  77. // ID; otherwise it is recorded as a block event.
  78. func insertEvents(dbtx *sql.Tx, blockID, txID uint32, evts []abci.Event) error {
  79. // Populate the transaction ID field iff one is defined (> 0).
  80. var txIDArg interface{}
  81. if txID > 0 {
  82. txIDArg = txID
  83. }
  84. // Add each event to the events table, and retrieve its row ID to use when
  85. // adding any attributes the event provides.
  86. for _, evt := range evts {
  87. // Skip events with an empty type.
  88. if evt.Type == "" {
  89. continue
  90. }
  91. eid, err := queryWithID(dbtx, `
  92. INSERT INTO `+tableEvents+` (block_id, tx_id, type) VALUES ($1, $2, $3)
  93. RETURNING rowid;
  94. `, blockID, txIDArg, evt.Type)
  95. if err != nil {
  96. return err
  97. }
  98. // Add any attributes flagged for indexing.
  99. for _, attr := range evt.Attributes {
  100. if !attr.Index {
  101. continue
  102. }
  103. compositeKey := evt.Type + "." + attr.Key
  104. if _, err := dbtx.Exec(`
  105. INSERT INTO `+tableAttributes+` (event_id, key, composite_key, value)
  106. VALUES ($1, $2, $3, $4);
  107. `, eid, attr.Key, compositeKey, attr.Value); err != nil {
  108. return err
  109. }
  110. }
  111. }
  112. return nil
  113. }
  114. // makeIndexedEvent constructs an event from the specified composite key and
  115. // value. If the key has the form "type.name", the event will have a single
  116. // attribute with that name and the value; otherwise the event will have only
  117. // a type and no attributes.
  118. func makeIndexedEvent(compositeKey, value string) abci.Event {
  119. i := strings.Index(compositeKey, ".")
  120. if i < 0 {
  121. return abci.Event{Type: compositeKey}
  122. }
  123. return abci.Event{Type: compositeKey[:i], Attributes: []abci.EventAttribute{
  124. {Key: compositeKey[i+1:], Value: value, Index: true},
  125. }}
  126. }
  127. // IndexBlockEvents indexes the specified block header, part of the
  128. // indexer.EventSink interface.
  129. func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
  130. ts := time.Now().UTC()
  131. return runInTransaction(es.store, func(dbtx *sql.Tx) error {
  132. // Add the block to the blocks table and report back its row ID for use
  133. // in indexing the events for the block.
  134. blockID, err := queryWithID(dbtx, `
  135. INSERT INTO `+tableBlocks+` (height, chain_id, created_at)
  136. VALUES ($1, $2, $3)
  137. ON CONFLICT DO NOTHING
  138. RETURNING rowid;
  139. `, h.Header.Height, es.chainID, ts)
  140. if err == sql.ErrNoRows {
  141. return nil // we already saw this block; quietly succeed
  142. } else if err != nil {
  143. return fmt.Errorf("indexing block header: %w", err)
  144. }
  145. // Insert the special block meta-event for height.
  146. if err := insertEvents(dbtx, blockID, 0, []abci.Event{
  147. makeIndexedEvent(types.BlockHeightKey, fmt.Sprint(h.Header.Height)),
  148. }); err != nil {
  149. return fmt.Errorf("block meta-events: %w", err)
  150. }
  151. // Insert all the block events. Order is important here,
  152. if err := insertEvents(dbtx, blockID, 0, h.ResultBeginBlock.Events); err != nil {
  153. return fmt.Errorf("begin-block events: %w", err)
  154. }
  155. if err := insertEvents(dbtx, blockID, 0, h.ResultEndBlock.Events); err != nil {
  156. return fmt.Errorf("end-block events: %w", err)
  157. }
  158. return nil
  159. })
  160. }
  161. func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error {
  162. ts := time.Now().UTC()
  163. for _, txr := range txrs {
  164. // Encode the result message in protobuf wire format for indexing.
  165. resultData, err := proto.Marshal(txr)
  166. if err != nil {
  167. return fmt.Errorf("marshaling tx_result: %w", err)
  168. }
  169. // Index the hash of the underlying transaction as a hex string.
  170. txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash())
  171. if err := runInTransaction(es.store, func(dbtx *sql.Tx) error {
  172. // Find the block associated with this transaction. The block header
  173. // must have been indexed prior to the transactions belonging to it.
  174. blockID, err := queryWithID(dbtx, `
  175. SELECT rowid FROM `+tableBlocks+` WHERE height = $1 AND chain_id = $2;
  176. `, txr.Height, es.chainID)
  177. if err != nil {
  178. return fmt.Errorf("finding block ID: %w", err)
  179. }
  180. // Insert a record for this tx_result and capture its ID for indexing events.
  181. txID, err := queryWithID(dbtx, `
  182. INSERT INTO `+tableTxResults+` (block_id, index, created_at, tx_hash, tx_result)
  183. VALUES ($1, $2, $3, $4, $5)
  184. ON CONFLICT DO NOTHING
  185. RETURNING rowid;
  186. `, blockID, txr.Index, ts, txHash, resultData)
  187. if err == sql.ErrNoRows {
  188. return nil // we already saw this transaction; quietly succeed
  189. } else if err != nil {
  190. return fmt.Errorf("indexing tx_result: %w", err)
  191. }
  192. // Insert the special transaction meta-events for hash and height.
  193. if err := insertEvents(dbtx, blockID, txID, []abci.Event{
  194. makeIndexedEvent(types.TxHashKey, txHash),
  195. makeIndexedEvent(types.TxHeightKey, fmt.Sprint(txr.Height)),
  196. }); err != nil {
  197. return fmt.Errorf("indexing transaction meta-events: %w", err)
  198. }
  199. // Index any events packaged with the transaction.
  200. if err := insertEvents(dbtx, blockID, txID, txr.Result.Events); err != nil {
  201. return fmt.Errorf("indexing transaction events: %w", err)
  202. }
  203. return nil
  204. }); err != nil {
  205. return err
  206. }
  207. }
  208. return nil
  209. }
  210. // SearchBlockEvents is not implemented by this sink, and reports an error for all queries.
  211. func (es *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) {
  212. return nil, errors.New("block search is not supported via the postgres event sink")
  213. }
  214. // SearchTxEvents is not implemented by this sink, and reports an error for all queries.
  215. func (es *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
  216. return nil, errors.New("tx search is not supported via the postgres event sink")
  217. }
  218. // GetTxByHash is not implemented by this sink, and reports an error for all queries.
  219. func (es *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) {
  220. return nil, errors.New("getTxByHash is not supported via the postgres event sink")
  221. }
  222. // HasBlock is not implemented by this sink, and reports an error for all queries.
  223. func (es *EventSink) HasBlock(h int64) (bool, error) {
  224. return false, errors.New("hasBlock is not supported via the postgres event sink")
  225. }
  226. // Stop closes the underlying PostgreSQL database.
  227. func (es *EventSink) Stop() error { return es.store.Close() }