@ -3,195 +3,261 @@ package psql
import (
"context"
"database/sql"
"errors "
"flag "
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"testing"
"time"
sq "github.com/Masterminds/squirrel"
schema "github.com/adlio/schema"
proto "github.com/gogo/protobuf/proto"
_ "github.com/lib/pq"
dockertest "github.com/ory/dockertest"
"github.com/adlio/schema"
"github.com/gogo/protobuf/proto"
"github.com/ory/dockertest"
"github.com/ory/dockertest/docker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
// Register the Postgres database driver.
_ "github.com/lib/pq"
)
var db * sql . DB
var resource * dockertest . Resource
var chainID = "test-chainID"
// Verify that the type satisfies the EventSink interface.
var _ indexer . EventSink = ( * EventSink ) ( nil )
var (
doPauseAtExit = flag . Bool ( "pause-at-exit" , false ,
"If true, pause the test until interrupted at shutdown, to allow debugging" )
// A hook that test cases can call to obtain the shared database instance
// used for testing the sink. This is initialized in TestMain (see below).
testDB func ( ) * sql . DB
)
const (
user = "postgres"
password = "secret"
port = "5432"
dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable"
dbName = "postgres"
chainID = "test-chainID"
viewBlockEvents = "block_events"
viewTxEvents = "tx_events"
)
func TestType ( t * testing . T ) {
pool , err := setupDB ( t )
require . NoError ( t , err )
func TestMain ( m * testing . M ) {
flag . Parse ( )
psqlSink := & EventSink { store : db , chainID : chainID }
assert . Equal ( t , indexer . PSQL , psqlSink . Type ( ) )
require . NoError ( t , teardown ( t , pool ) )
}
// Set up docker and start a container running PostgreSQL.
pool , err := dockertest . NewPool ( os . Getenv ( "DOCKER_URL" ) )
if err != nil {
log . Fatalf ( "Creating docker pool: %v" , err )
}
func TestBlockFuncs ( t * testing . T ) {
pool , err := setupDB ( t )
require . NoError ( t , err )
resource , err := pool . RunWithOptions ( & dockertest . RunOptions {
Repository : "postgres" ,
Tag : "13" ,
Env : [ ] string {
"POSTGRES_USER=" + user ,
"POSTGRES_PASSWORD=" + password ,
"POSTGRES_DB=" + dbName ,
"listen_addresses = '*'" ,
} ,
ExposedPorts : [ ] string { port } ,
} , func ( config * docker . HostConfig ) {
// set AutoRemove to true so that stopped container goes away by itself
config . AutoRemove = true
config . RestartPolicy = docker . RestartPolicy {
Name : "no" ,
}
} )
if err != nil {
log . Fatalf ( "Starting docker pool: %v" , err )
}
indexer := & EventSink { store : db , chainID : chainID }
require . NoError ( t , indexer . IndexBlockEvents ( getTestBlockHeader ( ) ) )
if * doPauseAtExit {
log . Print ( "Pause at exit is enabled, containers will not expire" )
} else {
const expireSeconds = 60
_ = resource . Expire ( expireSeconds )
log . Printf ( "Container expiration set to %d seconds" , expireSeconds )
}
r , err := verifyBlock ( 1 )
assert . True ( t , r )
require . NoError ( t , err )
// Connect to the database, clear any leftover data, and install the
// indexing schema.
conn := fmt . Sprintf ( dsn , user , password , resource . GetPort ( port + "/tcp" ) , dbName )
var db * sql . DB
r , err = verifyBlock ( 2 )
assert . False ( t , r )
require . NoError ( t , err )
if err := pool . Retry ( func ( ) error {
sink , err := NewEventSink ( conn , chainID )
if err != nil {
return err
}
db = sink . DB ( ) // set global for test use
return db . Ping ( )
} ) ; err != nil {
log . Fatalf ( "Connecting to database: %v" , err )
}
r , err = indexer . HasBlock ( 1 )
assert . False ( t , r )
assert . Equal ( t , errors . New ( "hasBlock is not supported via the postgres event sink" ) , err )
if err := resetDatabase ( db ) ; err != nil {
log . Fatalf ( "Flushing database: %v" , er r )
}
r , err = indexer . HasBlock ( 2 )
assert . False ( t , r )
assert . Equal ( t , errors . New ( "hasBlock is not supported via the postgres event sink" ) , err )
sm , err := readSchema ( )
if err != nil {
log . Fatalf ( "Reading schema: %v" , err )
} else if err := schema . NewMigrator ( ) . Apply ( db , sm ) ; err != nil {
log . Fatalf ( "Applying schema: %v" , err )
}
r2 , err := indexer . SearchBlockEvents ( context . TODO ( ) , nil )
assert . Nil ( t , r2 )
assert . Equal ( t , errors . New ( "block search is not supported via the postgres event sink" ) , err )
// Set up the hook for tests to get the shared database handle.
testDB = func ( ) * sql . DB { return db }
require . NoError ( t , verifyTimeStamp ( TableEventBlock ) )
// Run the selected test cases.
code := m . Run ( )
// try to insert the duplicate block events.
err = indexer . IndexBlockEvents ( getTestBlockHeader ( ) )
require . NoError ( t , err )
// Clean up and shut down the database container.
if * doPauseAtExit {
log . Print ( "Testing complete, pausing for inspection. Send SIGINT to resume teardown" )
waitForInterrupt ( )
log . Print ( "(resuming)" )
}
log . Print ( "Shutting down database" )
if err := pool . Purge ( resource ) ; err != nil {
log . Printf ( "WARNING: Purging pool failed: %v" , err )
}
if err := db . Close ( ) ; err != nil {
log . Printf ( "WARNING: Closing database failed: %v" , err )
}
require . NoError ( t , teardown ( t , pool ) )
os . Exit ( code )
}
func TestTxFuncs ( t * testing . T ) {
pool , err := setupDB ( t )
assert . Nil ( t , err )
indexer := & EventSink { store : db , chainID : chainID }
func TestType ( t * testing . T ) {
psqlSink := & EventSink { store : testDB ( ) , chainID : chainID }
assert . Equal ( t , indexer . PSQL , psqlSink . Type ( ) )
}
txResult := txResultWithEvents ( [ ] abci . Event {
{ Type : "account" , Attributes : [ ] abci . EventAttribute { { Key : "number" , Value : "1" , Index : true } } } ,
{ Type : "account" , Attributes : [ ] abci . EventAttribute { { Key : "owner" , Value : "Ivan" , Index : true } } } ,
{ Type : "" , Attributes : [ ] abci . EventAttribute { { Key : "not_allowed" , Value : "Vlad" , Index : true } } } ,
} )
err = indexer . IndexTxEvents ( [ ] * abci . TxResult { txResult } )
require . NoError ( t , err )
func TestIndexing ( t * testing . T ) {
t . Run ( "IndexBlockEvents" , func ( t * testing . T ) {
indexer := & EventSink { store : testDB ( ) , chainID : chainID }
require . NoError ( t , indexer . IndexBlockEvents ( newTestBlockHeader ( ) ) )
tx , err := verifyTx ( types . Tx ( txResult . Tx ) . Hash ( ) )
require . NoError ( t , err )
assert . Equal ( t , txResult , tx )
verifyBlock ( t , 1 )
verifyBlock ( t , 2 )
require . NoError ( t , verifyTimeStamp ( TableEventTx ) )
require . NoError ( t , verifyTimeStamp ( TableResultTx ) )
verifyNotImplemented ( t , "hasBlock" , func ( ) ( bool , error ) { return indexer . HasBlock ( 1 ) } )
verifyNotImplemented ( t , "hasBlock" , func ( ) ( bool , error ) { return indexer . HasBlock ( 2 ) } )
tx , err = indexer . GetTxByHash ( types . Tx ( txResult . Tx ) . Hash ( ) )
assert . Nil ( t , tx )
assert . Equal ( t , errors . New ( "getTxByHash is not supported via the postgres event sink" ) , err )
verifyNotImplemented ( t , "block search" , func ( ) ( bool , error ) {
v , err := indexer . SearchBlockEvents ( context . Background ( ) , nil )
return v != nil , err
} )
r2 , err := indexer . SearchTxEvents ( context . TODO ( ) , nil )
assert . Nil ( t , r2 )
assert . Equal ( t , errors . New ( "tx search is not supported via the postgres event sink" ) , err )
require . NoError ( t , verifyTimeStamp ( tableBlocks ) )
// try to insert the duplicate tx events .
err = indexer . IndexTxEvents ( [ ] * abci . TxResult { txResult } )
require . NoError ( t , err )
// Attempting to reindex the same events should gracefully succeed.
require . NoError ( t , indexer . IndexBlockEvents ( newTestBlockHeader ( ) ) )
} )
assert . Nil ( t , teardown ( t , pool ) )
t . Run ( "IndexTxEvents" , func ( t * testing . T ) {
indexer := & EventSink { store : testDB ( ) , chainID : chainID }
txResult := txResultWithEvents ( [ ] abci . Event {
makeIndexedEvent ( "account.number" , "1" ) ,
makeIndexedEvent ( "account.owner" , "Ivan" ) ,
makeIndexedEvent ( "account.owner" , "Yulieta" ) ,
{ Type : "" , Attributes : [ ] abci . EventAttribute { { Key : "not_allowed" , Value : "Vlad" , Index : true } } } ,
} )
require . NoError ( t , indexer . IndexTxEvents ( [ ] * abci . TxResult { txResult } ) )
txr , err := loadTxResult ( types . Tx ( txResult . Tx ) . Hash ( ) )
require . NoError ( t , err )
assert . Equal ( t , txResult , txr )
require . NoError ( t , verifyTimeStamp ( tableTxResults ) )
require . NoError ( t , verifyTimeStamp ( viewTxEvents ) )
verifyNotImplemented ( t , "getTxByHash" , func ( ) ( bool , error ) {
txr , err := indexer . GetTxByHash ( types . Tx ( txResult . Tx ) . Hash ( ) )
return txr != nil , err
} )
verifyNotImplemented ( t , "tx search" , func ( ) ( bool , error ) {
txr , err := indexer . SearchTxEvents ( context . Background ( ) , nil )
return txr != nil , err
} )
// try to insert the duplicate tx events.
err = indexer . IndexTxEvents ( [ ] * abci . TxResult { txResult } )
require . NoError ( t , err )
} )
}
func TestStop ( t * testing . T ) {
pool , err := setupDB ( t )
require . NoError ( t , err )
indexer := & EventSink { store : db }
indexer := & EventSink { store : testDB ( ) }
require . NoError ( t , indexer . Stop ( ) )
defer db . Close ( )
require . NoError ( t , pool . Purge ( resource ) )
}
func getTestBlockHeader ( ) types . EventDataNewBlockHeader {
// newTestBlockHeader constructs a fresh copy of a block header containing
// known test values to exercise the indexer.
func newTestBlockHeader ( ) types . EventDataNewBlockHeader {
return types . EventDataNewBlockHeader {
Header : types . Header { Height : 1 } ,
ResultBeginBlock : abci . ResponseBeginBlock {
Events : [ ] abci . Event {
{
Type : "begin_event" ,
Attributes : [ ] abci . EventAttribute {
{
Key : "proposer" ,
Value : "FCAA001" ,
Index : true ,
} ,
} ,
} ,
makeIndexedEvent ( "begin_event.proposer" , "FCAA001" ) ,
makeIndexedEvent ( "thingy.whatzit" , "O.O" ) ,
} ,
} ,
ResultEndBlock : abci . ResponseEndBlock {
Events : [ ] abci . Event {
{
Type : "end_event" ,
Attributes : [ ] abci . EventAttribute {
{
Key : "foo" ,
Value : "100" ,
Index : true ,
} ,
} ,
} ,
makeIndexedEvent ( "end_event.foo" , "100" ) ,
makeIndexedEvent ( "thingy.whatzit" , "-.O" ) ,
} ,
} ,
}
}
// readSchema loads the indexing database schema file
func readSchema ( ) ( [ ] * schema . Migration , error ) {
filename := "schema.sql"
const filename = "schema.sql"
contents , err := ioutil . ReadFile ( filename )
if err != nil {
return nil , fmt . Errorf ( "failed to read sql file from '%s': %w" , filename , err )
}
mg := & schema . Migration { }
mg . ID = time . Now ( ) . Local ( ) . String ( ) + " db schema"
mg . Script = string ( contents )
return append ( [ ] * schema . Migration { } , mg ) , nil
return [ ] * schema . Migration { {
ID : time . Now ( ) . Local ( ) . String ( ) + " db schema" ,
Script : string ( contents ) ,
} } , nil
}
func resetDB ( t * testing . T ) {
q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results"
_ , err := db . Exec ( q )
require . NoError ( t , err )
q = "DROP TYPE IF EXISTS block_event_type"
_ , err = db . Exec ( q )
require . NoError ( t , err )
// resetDB drops all the data from the test database.
func resetDatabase ( db * sql . DB ) error {
_ , err := db . Exec ( ` DROP TABLE IF EXISTS blocks,tx_results,events,attributes CASCADE; ` )
if err != nil {
return fmt . Errorf ( "dropping tables: %v" , err )
}
_ , err = db . Exec ( ` DROP VIEW IF EXISTS event_attributes,block_events,tx_events CASCADE; ` )
if err != nil {
return fmt . Errorf ( "dropping views: %v" , err )
}
return nil
}
// txResultWithEvents constructs a fresh transaction result with fixed values
// for testing, that includes the specified events.
func txResultWithEvents ( events [ ] abci . Event ) * abci . TxResult {
tx := types . Tx ( "HELLO WORLD" )
return & abci . TxResult {
Height : 1 ,
Index : 0 ,
Tx : tx ,
Tx : types . T x ( "HELLO WORLD" ) ,
Result : abci . ResponseDeliverTx {
Data : [ ] byte { 0 } ,
Code : abci . CodeTypeOK ,
@ -201,166 +267,78 @@ func txResultWithEvents(events []abci.Event) *abci.TxResult {
}
}
func verifyTx ( hash [ ] byte ) ( * abci . TxResult , error ) {
join := fmt . Sprintf ( "%s ON %s.id = tx_result_id" , TableEventTx , TableResultTx )
sqlStmt := sq .
Select ( "tx_result" , fmt . Sprintf ( "%s.id" , TableResultTx ) , "tx_result_id" , "hash" , "chain_id" ) .
Distinct ( ) . From ( TableResultTx ) .
InnerJoin ( join ) .
Where ( fmt . Sprintf ( "hash = $1 AND chain_id = '%s'" , chainID ) , fmt . Sprintf ( "%X" , hash ) )
rows , err := sqlStmt . RunWith ( db ) . Query ( )
if err != nil {
return nil , err
func loadTxResult ( hash [ ] byte ) ( * abci . TxResult , error ) {
hashString := fmt . Sprintf ( "%X" , hash )
var resultData [ ] byte
if err := testDB ( ) . QueryRow ( `
SELECT tx_result FROM ` +tableTxResults+ ` WHERE tx_hash = $ 1 ;
` , hashString ) . Scan ( & resultData ) ; err != nil {
return nil , fmt . Errorf ( "lookup transaction for hash %q failed: %v" , hashString , err )
}
defer rows . Close ( )
if rows . Next ( ) {
var txResult [ ] byte
var txResultID , txid int
var h , cid string
err = rows . Scan ( & txResult , & txResultID , & txid , & h , & cid )
if err != nil {
return nil , nil
}
msg := new ( abci . TxResult )
err = proto . Unmarshal ( txResult , msg )
if err != nil {
return nil , err
}
return msg , err
txr := new ( abci . TxResult )
if err := proto . Unmarshal ( resultData , txr ) ; err != nil {
return nil , fmt . Errorf ( "unmarshaling txr: %v" , err )
}
// No result
return nil , nil
return txr , nil
}
func verifyTimeStamp ( tb string ) error {
// We assume the tx indexing time would not exceed 2 second from now
sqlStmt := sq .
Select ( fmt . Sprintf ( "%s.created_at" , tb ) ) .
Distinct ( ) . From ( tb ) .
Where ( fmt . Sprintf ( "%s.created_at >= $1" , tb ) , time . Now ( ) . Add ( - 2 * time . Second ) )
rows , err := sqlStmt . RunWith ( db ) . Query ( )
if err != nil {
return err
}
defer rows . Close ( )
if rows . Next ( ) {
var ts string
return rows . Scan ( & ts )
}
return errors . New ( "no result" )
func verifyTimeStamp ( tableName string ) error {
return testDB ( ) . QueryRow ( fmt . Sprintf ( `
SELECT DISTINCT % [ 1 ] s . created_at
FROM % [ 1 ] s
WHERE % [ 1 ] s . created_at >= $ 1 ;
` , tableName ) , time . Now ( ) . Add ( - 2 * time . Second ) ) . Err ( )
}
func verifyBlock ( h int64 ) ( bool , error ) {
sqlStmt := sq .
Select ( "height" ) .
Distinct ( ) .
From ( TableEventBlock ) .
Where ( fmt . Sprintf ( "height = %d" , h ) )
rows , err := sqlStmt . RunWith ( db ) . Query ( )
if err != nil {
return false , err
}
defer rows . Close ( )
if ! rows . Next ( ) {
return false , nil
func verifyBlock ( t * testing . T , height int64 ) {
// Check that the blocks table contains an entry for this height.
if err := testDB ( ) . QueryRow ( `
SELECT height FROM ` +tableBlocks+ ` WHERE height = $ 1 ;
` , height ) . Err ( ) ; err == sql . ErrNoRows {
t . Errorf ( "No block found for height=%d" , height )
} else if err != nil {
t . Fatalf ( "Database query failed: %v" , err )
}
sqlStmt = sq .
Select ( "type, height" , "chain_id" ) .
Distinct ( ) .
From ( TableEventBlock ) .
Where ( fmt . Sprintf ( "height = %d AND type = '%s' AND chain_id = '%s'" , h , types . EventTypeBeginBlock , chainID ) )
rows , err = sqlStmt . RunWith ( db ) . Query ( )
if err != nil {
return false , err
// Verify the presence of begin_block and end_block events.
if err := testDB ( ) . QueryRow ( `
SELECT type , height , chain_id FROM ` +viewBlockEvents+ `
WHERE height = $ 1 AND type = $ 2 AND chain_id = $ 3 ;
` , height , types . EventTypeBeginBlock , chainID ) . Err ( ) ; err == sql . ErrNoRows {
t . Errorf ( "No %q event found for height=%d" , types . EventTypeBeginBlock , height )
} else if err != nil {
t . Fatalf ( "Database query failed: %v" , err )
}
defer rows . Close ( )
if ! rows . Next ( ) {
return false , nil
if err := testDB ( ) . QueryRow ( `
SELECT type , height , chain_id FROM ` +viewBlockEvents+ `
WHERE height = $ 1 AND type = $ 2 AND chain_id = $ 3 ;
` , height , types . EventTypeEndBlock , chainID ) . Err ( ) ; err == sql . ErrNoRows {
t . Errorf ( "No %q event found for height=%d" , types . EventTypeEndBlock , height )
} else if err != nil {
t . Fatalf ( "Database query failed: %v" , err )
}
sqlStmt = sq .
Select ( "type, height" ) .
Distinct ( ) .
From ( TableEventBlock ) .
Where ( fmt . Sprintf ( "height = %d AND type = '%s'" , h , types . EventTypeEndBlock ) )
rows , err = sqlStmt . RunWith ( db ) . Query ( )
if err != nil {
return false , err
}
defer rows . Close ( )
return rows . Next ( ) , nil
}
func setupDB ( t * testing . T ) ( * dockertest . Pool , error ) {
// verifyNotImplemented calls f and verifies that it returns both a
// false-valued flag and a non-nil error whose string matching the expected
// "not supported" message with label prefixed.
func verifyNotImplemented ( t * testing . T , label string , f func ( ) ( bool , error ) ) {
t . Helper ( )
pool , err := dockertest . NewPool ( os . Getenv ( "DOCKER_URL" ) )
require . NoError ( t , err )
t . Logf ( "Verifying that %q reports it is not implemented" , label )
resource , err = pool . RunWithOptions ( & dockertest . RunOptions {
Repository : DriverName ,
Tag : "13" ,
Env : [ ] string {
"POSTGRES_USER=" + user ,
"POSTGRES_PASSWORD=" + password ,
"POSTGRES_DB=" + dbName ,
"listen_addresses = '*'" ,
} ,
ExposedPorts : [ ] string { port } ,
} , func ( config * docker . HostConfig ) {
// set AutoRemove to true so that stopped container goes away by itself
config . AutoRemove = true
config . RestartPolicy = docker . RestartPolicy {
Name : "no" ,
}
} )
require . NoError ( t , err )
// Set the container to expire in a minute to avoid orphaned containers
// hanging around
_ = resource . Expire ( 60 )
conn := fmt . Sprintf ( dsn , user , password , resource . GetPort ( port + "/tcp" ) , dbName )
require . NoError ( t , pool . Retry ( func ( ) error {
sink , err := NewEventSink ( conn , chainID )
if err != nil {
return err
}
db = sink . DB ( ) // set global for test use
return db . Ping ( )
} ) )
resetDB ( t )
sm , err := readSchema ( )
assert . Nil ( t , err )
assert . Nil ( t , schema . NewMigrator ( ) . Apply ( db , sm ) )
return pool , nil
want := label + " is not supported via the postgres event sink"
ok , err := f ( )
assert . False ( t , ok )
require . NotNil ( t , err )
assert . Equal ( t , want , err . Error ( ) )
}
func teardown ( t * testing . T , pool * dockertest . Pool ) error {
t . Helper ( )
// When you're done, kill and remove the container
assert . Nil ( t , pool . Purge ( resource ) )
return db . Close ( )
// waitForInterrupt blocks until a SIGINT is received by the process.
func waitForInterrupt ( ) {
ch := make ( chan os . Signal , 1 )
signal . Notify ( ch , os . Interrupt )
<- ch
}