Browse Source

indexer: use INSERT ... ON CONFLICT in the psql eventsink insert functions (#6556)

pull/6518/head
JayT106 4 years ago
committed by GitHub
parent
commit
d2b78ec09b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 6 deletions
  1. +17
    -4
      state/indexer/sink/psql/psql.go
  2. +9
    -0
      state/indexer/sink/psql/psql_test.go
  3. +5
    -2
      state/indexer/sink/psql/schema.sql

+ 17
- 4
state/indexer/sink/psql/psql.go View File

@ -50,7 +50,9 @@ func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
sqlStmt := sq. sqlStmt := sq.
Insert(TableEventBlock). Insert(TableEventBlock).
Columns("key", "value", "height", "type", "created_at", "chain_id"). Columns("key", "value", "height", "type", "created_at", "chain_id").
PlaceholderFormat(sq.Dollar)
PlaceholderFormat(sq.Dollar).
Suffix("ON CONFLICT (key,height)").
Suffix("DO NOTHING")
ts := time.Now() ts := time.Now()
// index the reserved block height index // index the reserved block height index
@ -83,12 +85,16 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
Columns("tx_result", "created_at"). Columns("tx_result", "created_at").
PlaceholderFormat(sq.Dollar). PlaceholderFormat(sq.Dollar).
RunWith(es.store). RunWith(es.store).
Suffix("ON CONFLICT (tx_result)").
Suffix("DO NOTHING").
Suffix("RETURNING \"id\"") Suffix("RETURNING \"id\"")
sqlStmtEvents := sq. sqlStmtEvents := sq.
Insert(TableEventTx). Insert(TableEventTx).
Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id"). Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id").
PlaceholderFormat(sq.Dollar)
PlaceholderFormat(sq.Dollar).
Suffix("ON CONFLICT (key,hash)").
Suffix("DO NOTHING")
ts := time.Now() ts := time.Now()
for _, tx := range txr { for _, tx := range txr {
@ -100,17 +106,24 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts)
// execute sqlStmtTxResult db query and retrieve the txid // execute sqlStmtTxResult db query and retrieve the txid
err = sqlStmtTxResult.QueryRow().Scan(&txid)
r, err := sqlStmtTxResult.Query()
if err != nil { if err != nil {
return err return err
} }
if !r.Next() {
return nil
}
if err := r.Scan(&txid); err != nil {
return err
}
// index the reserved height and hash indices // index the reserved height and hash indices
hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash()) hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash())
sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID) sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID)
sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID) sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID)
for _, event := range tx.Result.Events { for _, event := range tx.Result.Events {
// only index events with a non-empty type // only index events with a non-empty type
if len(event.Type) == 0 { if len(event.Type) == 0 {


+ 9
- 0
state/indexer/sink/psql/psql_test.go View File

@ -72,6 +72,11 @@ func TestBlockFuncs(t *testing.T) {
assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err) assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err)
require.NoError(t, verifyTimeStamp(TableEventBlock)) require.NoError(t, verifyTimeStamp(TableEventBlock))
// try to insert the duplicate block events.
err = indexer.IndexBlockEvents(getTestBlockHeader())
require.NoError(t, err)
require.NoError(t, teardown(t, pool)) require.NoError(t, teardown(t, pool))
} }
@ -104,6 +109,10 @@ func TestTxFuncs(t *testing.T) {
assert.Nil(t, r2) assert.Nil(t, r2)
assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err) assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err)
// try to insert the duplicate tx events.
err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
assert.Nil(t, teardown(t, pool)) assert.Nil(t, teardown(t, pool))
} }


+ 5
- 2
state/indexer/sink/psql/schema.sql View File

@ -6,12 +6,14 @@ CREATE TABLE block_events (
height INTEGER NOT NULL, height INTEGER NOT NULL,
type block_event_type, type block_event_type,
created_at TIMESTAMPTZ NOT NULL, created_at TIMESTAMPTZ NOT NULL,
chain_id VARCHAR NOT NULL
chain_id VARCHAR NOT NULL,
UNIQUE (key, height)
); );
CREATE TABLE tx_results ( CREATE TABLE tx_results (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
tx_result BYTEA NOT NULL, tx_result BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL
created_at TIMESTAMPTZ NOT NULL,
UNIQUE (tx_result)
); );
CREATE TABLE tx_events ( CREATE TABLE tx_events (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
@ -22,6 +24,7 @@ CREATE TABLE tx_events (
tx_result_id SERIAL, tx_result_id SERIAL,
created_at TIMESTAMPTZ NOT NULL, created_at TIMESTAMPTZ NOT NULL,
chain_id VARCHAR NOT NULL, chain_id VARCHAR NOT NULL,
UNIQUE (hash, key),
FOREIGN KEY (tx_result_id) FOREIGN KEY (tx_result_id)
REFERENCES tx_results(id) REFERENCES tx_results(id)
ON DELETE CASCADE ON DELETE CASCADE


Loading…
Cancel
Save