@ -1,3 +1,4 @@
// Package psql implements an event sink backed by a PostgreSQL database.
package psql
import (
@ -24,28 +25,38 @@ const (
DriverName = "postgres"
)
// EventSink is an indexer backend providing the tx/block index services.
// EventSink is an indexer backend providing the tx/block index services. This
// implementation stores records in a PostgreSQL database using the schema
// defined in state/indexer/sink/psql/schema.sql.
type EventSink struct {
store * sql . DB
chainID string
}
func NewEventSink ( connStr string , chainID string ) ( indexer . EventSink , * sql . DB , error ) {
// NewEventSink constructs an event sink associated with the PostgreSQL
// database specified by connStr. Events written to the sink are attributed to
// the specified chainID.
func NewEventSink ( connStr , chainID string ) ( * EventSink , error ) {
db , err := sql . Open ( DriverName , connStr )
if err != nil {
return nil , nil , err
return nil , err
}
return & EventSink {
store : db ,
chainID : chainID ,
} , db , nil
} , nil
}
func ( es * EventSink ) Type ( ) indexer . EventSinkType {
return indexer . PSQL
}
// DB returns the underlying Postgres connection used by the sink.
// This is exported to support testing.
func ( es * EventSink ) DB ( ) * sql . DB { return es . store }
// Type returns the structure type for this sink, which is Postgres.
func ( es * EventSink ) Type ( ) indexer . EventSinkType { return indexer . PSQL }
// IndexBlockEvents indexes the specified block header, part of the
// indexer.EventSink interface.
func ( es * EventSink ) IndexBlockEvents ( h types . EventDataNewBlockHeader ) error {
sqlStmt := sq .
Insert ( TableEventBlock ) .
@ -156,18 +167,22 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
return err
}
// SearchBlockEvents is not implemented by this sink, and reports an error for all queries.
func ( es * EventSink ) SearchBlockEvents ( ctx context . Context , q * query . Query ) ( [ ] int64 , error ) {
return nil , errors . New ( "block search is not supported via the postgres event sink" )
}
// SearchTxEvents is not implemented by this sink, and reports an error for all queries.
func ( es * EventSink ) SearchTxEvents ( ctx context . Context , q * query . Query ) ( [ ] * abci . TxResult , error ) {
return nil , errors . New ( "tx search is not supported via the postgres event sink" )
}
// GetTxByHash is not implemented by this sink, and reports an error for all queries.
func ( es * EventSink ) GetTxByHash ( hash [ ] byte ) ( * abci . TxResult , error ) {
return nil , errors . New ( "getTxByHash is not supported via the postgres event sink" )
}
// HasBlock is not implemented by this sink, and reports an error for all queries.
func ( es * EventSink ) HasBlock ( h int64 ) ( bool , error ) {
return false , errors . New ( "hasBlock is not supported via the postgres event sink" )
}
@ -206,6 +221,5 @@ func indexBlockEvents(
return sqlStmt , nil
}
func ( es * EventSink ) Stop ( ) error {
return es . store . Close ( )
}
// Stop closes the underlying PostgreSQL database.
func ( es * EventSink ) Stop ( ) error { return es . store . Close ( ) }