diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go index 0e66d6117..94ae4cb93 100644 --- a/state/indexer/sink/psql/psql.go +++ b/state/indexer/sink/psql/psql.go @@ -50,7 +50,9 @@ func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { sqlStmt := sq. Insert(TableEventBlock). Columns("key", "value", "height", "type", "created_at", "chain_id"). - PlaceholderFormat(sq.Dollar) + PlaceholderFormat(sq.Dollar). + Suffix("ON CONFLICT (key,height)"). + Suffix("DO NOTHING") ts := time.Now() // index the reserved block height index @@ -83,12 +85,16 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error { Columns("tx_result", "created_at"). PlaceholderFormat(sq.Dollar). RunWith(es.store). + Suffix("ON CONFLICT (tx_result)"). + Suffix("DO NOTHING"). Suffix("RETURNING \"id\"") sqlStmtEvents := sq. Insert(TableEventTx). Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id"). - PlaceholderFormat(sq.Dollar) + PlaceholderFormat(sq.Dollar). + Suffix("ON CONFLICT (key,hash)"). + Suffix("DO NOTHING") ts := time.Now() for _, tx := range txr { @@ -100,17 +106,24 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error { sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) // execute sqlStmtTxResult db query and retrieve the txid - err = sqlStmtTxResult.QueryRow().Scan(&txid) + r, err := sqlStmtTxResult.Query() if err != nil { return err } + if !r.Next() { + return nil + } + + if err := r.Scan(&txid); err != nil { + return err + } + // index the reserved height and hash indices hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash()) sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID) sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID) - for _, event := range tx.Result.Events { // only index events with a non-empty type if len(event.Type) == 0 { diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go index 591d23edd..0c3cf81d0 100644 --- a/state/indexer/sink/psql/psql_test.go +++ b/state/indexer/sink/psql/psql_test.go @@ -72,6 +72,11 @@ func TestBlockFuncs(t *testing.T) { assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err) require.NoError(t, verifyTimeStamp(TableEventBlock)) + + // try to insert the duplicate block events. + err = indexer.IndexBlockEvents(getTestBlockHeader()) + require.NoError(t, err) + require.NoError(t, teardown(t, pool)) } @@ -104,6 +109,10 @@ func TestTxFuncs(t *testing.T) { assert.Nil(t, r2) assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err) + // try to insert the duplicate tx events. + err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) + require.NoError(t, err) + assert.Nil(t, teardown(t, pool)) } diff --git a/state/indexer/sink/psql/schema.sql b/state/indexer/sink/psql/schema.sql index 36e7327cc..0be9bdfa8 100644 --- a/state/indexer/sink/psql/schema.sql +++ b/state/indexer/sink/psql/schema.sql @@ -6,12 +6,14 @@ CREATE TABLE block_events ( height INTEGER NOT NULL, type block_event_type, created_at TIMESTAMPTZ NOT NULL, - chain_id VARCHAR NOT NULL + chain_id VARCHAR NOT NULL, + UNIQUE (key, height) ); CREATE TABLE tx_results ( id SERIAL PRIMARY KEY, tx_result BYTEA NOT NULL, - created_at TIMESTAMPTZ NOT NULL + created_at TIMESTAMPTZ NOT NULL, + UNIQUE (tx_result) ); CREATE TABLE tx_events ( id SERIAL PRIMARY KEY, @@ -22,6 +24,7 @@ CREATE TABLE tx_events ( tx_result_id SERIAL, created_at TIMESTAMPTZ NOT NULL, chain_id VARCHAR NOT NULL, + UNIQUE (hash, key), FOREIGN KEY (tx_result_id) REFERENCES tx_results(id) ON DELETE CASCADE