You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

259 lines
8.6 KiB

  1. // Package psql implements an event sink backed by a PostgreSQL database.
  2. package psql
  3. import (
  4. "context"
  5. "database/sql"
  6. "errors"
  7. "fmt"
  8. "strings"
  9. "time"
  10. "github.com/gogo/protobuf/proto"
  11. abci "github.com/tendermint/tendermint/abci/types"
  12. "github.com/tendermint/tendermint/internal/pubsub/query"
  13. "github.com/tendermint/tendermint/internal/state/indexer"
  14. "github.com/tendermint/tendermint/types"
  15. )
  16. const (
  17. tableBlocks = "blocks"
  18. tableTxResults = "tx_results"
  19. tableEvents = "events"
  20. tableAttributes = "attributes"
  21. driverName = "postgres"
  22. )
  23. // EventSink is an indexer backend providing the tx/block index services. This
  24. // implementation stores records in a PostgreSQL database using the schema
  25. // defined in state/indexer/sink/psql/schema.sql.
  26. type EventSink struct {
  27. store *sql.DB
  28. chainID string
  29. }
  30. // NewEventSink constructs an event sink associated with the PostgreSQL
  31. // database specified by connStr. Events written to the sink are attributed to
  32. // the specified chainID.
  33. func NewEventSink(connStr, chainID string) (*EventSink, error) {
  34. db, err := sql.Open(driverName, connStr)
  35. if err != nil {
  36. return nil, err
  37. } else if err := db.Ping(); err != nil {
  38. return nil, err
  39. }
  40. return &EventSink{
  41. store: db,
  42. chainID: chainID,
  43. }, nil
  44. }
  45. // DB returns the underlying Postgres connection used by the sink.
  46. // This is exported to support testing.
  47. func (es *EventSink) DB() *sql.DB { return es.store }
  48. // Type returns the structure type for this sink, which is Postgres.
  49. func (es *EventSink) Type() indexer.EventSinkType { return indexer.PSQL }
  50. // runInTransaction executes query in a fresh database transaction.
  51. // If query reports an error, the transaction is rolled back and the
  52. // error from query is reported to the caller.
  53. // Otherwise, the result of committing the transaction is returned.
  54. func runInTransaction(db *sql.DB, query func(*sql.Tx) error) error {
  55. dbtx, err := db.Begin()
  56. if err != nil {
  57. return err
  58. }
  59. if err := query(dbtx); err != nil {
  60. _ = dbtx.Rollback() // report the initial error, not the rollback
  61. return err
  62. }
  63. return dbtx.Commit()
  64. }
  65. // queryWithID executes the specified SQL query with the given arguments,
  66. // expecting a single-row, single-column result containing an ID. If the query
  67. // succeeds, the ID from the result is returned.
  68. func queryWithID(tx *sql.Tx, query string, args ...interface{}) (uint32, error) {
  69. var id uint32
  70. if err := tx.QueryRow(query, args...).Scan(&id); err != nil {
  71. return 0, err
  72. }
  73. return id, nil
  74. }
  75. // insertEvents inserts a slice of events and any indexed attributes of those
  76. // events into the database associated with dbtx.
  77. //
  78. // If txID > 0, the event is attributed to the Tendermint transaction with that
  79. // ID; otherwise it is recorded as a block event.
  80. func insertEvents(dbtx *sql.Tx, blockID, txID uint32, evts []abci.Event) error {
  81. // Populate the transaction ID field iff one is defined (> 0).
  82. var txIDArg interface{}
  83. if txID > 0 {
  84. txIDArg = txID
  85. }
  86. // Add each event to the events table, and retrieve its row ID to use when
  87. // adding any attributes the event provides.
  88. for _, evt := range evts {
  89. // Skip events with an empty type.
  90. if evt.Type == "" {
  91. continue
  92. }
  93. eid, err := queryWithID(dbtx, `
  94. INSERT INTO `+tableEvents+` (block_id, tx_id, type) VALUES ($1, $2, $3)
  95. RETURNING rowid;
  96. `, blockID, txIDArg, evt.Type)
  97. if err != nil {
  98. return err
  99. }
  100. // Add any attributes flagged for indexing.
  101. for _, attr := range evt.Attributes {
  102. if !attr.Index {
  103. continue
  104. }
  105. compositeKey := evt.Type + "." + attr.Key
  106. if _, err := dbtx.Exec(`
  107. INSERT INTO `+tableAttributes+` (event_id, key, composite_key, value)
  108. VALUES ($1, $2, $3, $4);
  109. `, eid, attr.Key, compositeKey, attr.Value); err != nil {
  110. return err
  111. }
  112. }
  113. }
  114. return nil
  115. }
  116. // makeIndexedEvent constructs an event from the specified composite key and
  117. // value. If the key has the form "type.name", the event will have a single
  118. // attribute with that name and the value; otherwise the event will have only
  119. // a type and no attributes.
  120. func makeIndexedEvent(compositeKey, value string) abci.Event {
  121. i := strings.Index(compositeKey, ".")
  122. if i < 0 {
  123. return abci.Event{Type: compositeKey}
  124. }
  125. return abci.Event{Type: compositeKey[:i], Attributes: []abci.EventAttribute{
  126. {Key: compositeKey[i+1:], Value: value, Index: true},
  127. }}
  128. }
  129. // IndexBlockEvents indexes the specified block header, part of the
  130. // indexer.EventSink interface.
  131. func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
  132. ts := time.Now().UTC()
  133. return runInTransaction(es.store, func(dbtx *sql.Tx) error {
  134. // Add the block to the blocks table and report back its row ID for use
  135. // in indexing the events for the block.
  136. blockID, err := queryWithID(dbtx, `
  137. INSERT INTO `+tableBlocks+` (height, chain_id, created_at)
  138. VALUES ($1, $2, $3)
  139. ON CONFLICT DO NOTHING
  140. RETURNING rowid;
  141. `, h.Header.Height, es.chainID, ts)
  142. if err == sql.ErrNoRows {
  143. return nil // we already saw this block; quietly succeed
  144. } else if err != nil {
  145. return fmt.Errorf("indexing block header: %w", err)
  146. }
  147. // Insert the special block meta-event for height.
  148. if err := insertEvents(dbtx, blockID, 0, []abci.Event{
  149. makeIndexedEvent(types.BlockHeightKey, fmt.Sprint(h.Header.Height)),
  150. }); err != nil {
  151. return fmt.Errorf("block meta-events: %w", err)
  152. }
  153. // Insert all the block events. Order is important here,
  154. if err := insertEvents(dbtx, blockID, 0, h.ResultBeginBlock.Events); err != nil {
  155. return fmt.Errorf("begin-block events: %w", err)
  156. }
  157. if err := insertEvents(dbtx, blockID, 0, h.ResultEndBlock.Events); err != nil {
  158. return fmt.Errorf("end-block events: %w", err)
  159. }
  160. return nil
  161. })
  162. }
  163. func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error {
  164. ts := time.Now().UTC()
  165. for _, txr := range txrs {
  166. // Encode the result message in protobuf wire format for indexing.
  167. resultData, err := proto.Marshal(txr)
  168. if err != nil {
  169. return fmt.Errorf("marshaling tx_result: %w", err)
  170. }
  171. // Index the hash of the underlying transaction as a hex string.
  172. txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash())
  173. if err := runInTransaction(es.store, func(dbtx *sql.Tx) error {
  174. // Find the block associated with this transaction. The block header
  175. // must have been indexed prior to the transactions belonging to it.
  176. blockID, err := queryWithID(dbtx, `
  177. SELECT rowid FROM `+tableBlocks+` WHERE height = $1 AND chain_id = $2;
  178. `, txr.Height, es.chainID)
  179. if err != nil {
  180. return fmt.Errorf("finding block ID: %w", err)
  181. }
  182. // Insert a record for this tx_result and capture its ID for indexing events.
  183. txID, err := queryWithID(dbtx, `
  184. INSERT INTO `+tableTxResults+` (block_id, index, created_at, tx_hash, tx_result)
  185. VALUES ($1, $2, $3, $4, $5)
  186. ON CONFLICT DO NOTHING
  187. RETURNING rowid;
  188. `, blockID, txr.Index, ts, txHash, resultData)
  189. if err == sql.ErrNoRows {
  190. return nil // we already saw this transaction; quietly succeed
  191. } else if err != nil {
  192. return fmt.Errorf("indexing tx_result: %w", err)
  193. }
  194. // Insert the special transaction meta-events for hash and height.
  195. if err := insertEvents(dbtx, blockID, txID, []abci.Event{
  196. makeIndexedEvent(types.TxHashKey, txHash),
  197. makeIndexedEvent(types.TxHeightKey, fmt.Sprint(txr.Height)),
  198. }); err != nil {
  199. return fmt.Errorf("indexing transaction meta-events: %w", err)
  200. }
  201. // Index any events packaged with the transaction.
  202. if err := insertEvents(dbtx, blockID, txID, txr.Result.Events); err != nil {
  203. return fmt.Errorf("indexing transaction events: %w", err)
  204. }
  205. return nil
  206. }); err != nil {
  207. return err
  208. }
  209. }
  210. return nil
  211. }
  212. // SearchBlockEvents is not implemented by this sink, and reports an error for all queries.
  213. func (es *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) {
  214. return nil, errors.New("block search is not supported via the postgres event sink")
  215. }
  216. // SearchTxEvents is not implemented by this sink, and reports an error for all queries.
  217. func (es *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
  218. return nil, errors.New("tx search is not supported via the postgres event sink")
  219. }
  220. // GetTxByHash is not implemented by this sink, and reports an error for all queries.
  221. func (es *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) {
  222. return nil, errors.New("getTxByHash is not supported via the postgres event sink")
  223. }
  224. // HasBlock is not implemented by this sink, and reports an error for all queries.
  225. func (es *EventSink) HasBlock(h int64) (bool, error) {
  226. return false, errors.New("hasBlock is not supported via the postgres event sink")
  227. }
  228. // Stop closes the underlying PostgreSQL database.
  229. func (es *EventSink) Stop() error { return es.store.Close() }