package psql import ( "context" "database/sql" "errors" "fmt" "io/ioutil" "os" "testing" "time" sq "github.com/Masterminds/squirrel" schema "github.com/adlio/schema" proto "github.com/gogo/protobuf/proto" _ "github.com/lib/pq" dockertest "github.com/ory/dockertest" "github.com/ory/dockertest/docker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" ) var db *sql.DB var resource *dockertest.Resource var chainID = "test-chainID" var ( user = "postgres" password = "secret" port = "5432" dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable" dbName = "postgres" ) func TestType(t *testing.T) { pool, err := setupDB(t) require.NoError(t, err) psqlSink := &EventSink{store: db, chainID: chainID} assert.Equal(t, indexer.PSQL, psqlSink.Type()) require.NoError(t, teardown(t, pool)) } func TestBlockFuncs(t *testing.T) { pool, err := setupDB(t) require.NoError(t, err) indexer := &EventSink{store: db, chainID: chainID} require.NoError(t, indexer.IndexBlockEvents(getTestBlockHeader())) r, err := verifyBlock(1) assert.True(t, r) require.NoError(t, err) r, err = verifyBlock(2) assert.False(t, r) require.NoError(t, err) r, err = indexer.HasBlock(1) assert.False(t, r) assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err) r, err = indexer.HasBlock(2) assert.False(t, r) assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err) r2, err := indexer.SearchBlockEvents(context.TODO(), nil) assert.Nil(t, r2) assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err) require.NoError(t, verifyTimeStamp(TableEventBlock)) // try to insert the duplicate block events. err = indexer.IndexBlockEvents(getTestBlockHeader()) require.NoError(t, err) require.NoError(t, teardown(t, pool)) } func TestTxFuncs(t *testing.T) { pool, err := setupDB(t) assert.Nil(t, err) indexer := &EventSink{store: db, chainID: chainID} txResult := txResultWithEvents([]abci.Event{ {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}}, {Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}}, {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}}, }) err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) require.NoError(t, err) tx, err := verifyTx(types.Tx(txResult.Tx).Hash()) require.NoError(t, err) assert.Equal(t, txResult, tx) require.NoError(t, verifyTimeStamp(TableEventTx)) require.NoError(t, verifyTimeStamp(TableResultTx)) tx, err = indexer.GetTxByHash(types.Tx(txResult.Tx).Hash()) assert.Nil(t, tx) assert.Equal(t, errors.New("getTxByHash is not supported via the postgres event sink"), err) r2, err := indexer.SearchTxEvents(context.TODO(), nil) assert.Nil(t, r2) assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err) // try to insert the duplicate tx events. err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) require.NoError(t, err) assert.Nil(t, teardown(t, pool)) } func TestStop(t *testing.T) { pool, err := setupDB(t) require.NoError(t, err) indexer := &EventSink{store: db} require.NoError(t, indexer.Stop()) defer db.Close() require.NoError(t, pool.Purge(resource)) } func getTestBlockHeader() types.EventDataNewBlockHeader { return types.EventDataNewBlockHeader{ Header: types.Header{Height: 1}, ResultBeginBlock: abci.ResponseBeginBlock{ Events: []abci.Event{ { Type: "begin_event", Attributes: []abci.EventAttribute{ { Key: "proposer", Value: "FCAA001", Index: true, }, }, }, }, }, ResultEndBlock: abci.ResponseEndBlock{ Events: []abci.Event{ { Type: "end_event", Attributes: []abci.EventAttribute{ { Key: "foo", Value: "100", Index: true, }, }, }, }, }, } } func readSchema() ([]*schema.Migration, error) { filename := "schema.sql" contents, err := ioutil.ReadFile(filename) if err != nil { return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err) } mg := &schema.Migration{} mg.ID = time.Now().Local().String() + " db schema" mg.Script = string(contents) return append([]*schema.Migration{}, mg), nil } func resetDB(t *testing.T) { q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results" _, err := db.Exec(q) require.NoError(t, err) q = "DROP TYPE IF EXISTS block_event_type" _, err = db.Exec(q) require.NoError(t, err) } func txResultWithEvents(events []abci.Event) *abci.TxResult { tx := types.Tx("HELLO WORLD") return &abci.TxResult{ Height: 1, Index: 0, Tx: tx, Result: abci.ResponseDeliverTx{ Data: []byte{0}, Code: abci.CodeTypeOK, Log: "", Events: events, }, } } func verifyTx(hash []byte) (*abci.TxResult, error) { join := fmt.Sprintf("%s ON %s.id = tx_result_id", TableEventTx, TableResultTx) sqlStmt := sq. Select("tx_result", fmt.Sprintf("%s.id", TableResultTx), "tx_result_id", "hash", "chain_id"). Distinct().From(TableResultTx). InnerJoin(join). Where(fmt.Sprintf("hash = $1 AND chain_id = '%s'", chainID), fmt.Sprintf("%X", hash)) rows, err := sqlStmt.RunWith(db).Query() if err != nil { return nil, err } defer rows.Close() if rows.Next() { var txResult []byte var txResultID, txid int var h, cid string err = rows.Scan(&txResult, &txResultID, &txid, &h, &cid) if err != nil { return nil, nil } msg := new(abci.TxResult) err = proto.Unmarshal(txResult, msg) if err != nil { return nil, err } return msg, err } // No result return nil, nil } func verifyTimeStamp(tb string) error { // We assume the tx indexing time would not exceed 2 second from now sqlStmt := sq. Select(fmt.Sprintf("%s.created_at", tb)). Distinct().From(tb). Where(fmt.Sprintf("%s.created_at >= $1", tb), time.Now().Add(-2*time.Second)) rows, err := sqlStmt.RunWith(db).Query() if err != nil { return err } defer rows.Close() if rows.Next() { var ts string err = rows.Scan(&ts) if err != nil { return err } return nil } return errors.New("no result") } func verifyBlock(h int64) (bool, error) { sqlStmt := sq. Select("height"). Distinct(). From(TableEventBlock). Where(fmt.Sprintf("height = %d", h)) rows, err := sqlStmt.RunWith(db).Query() if err != nil { return false, err } defer rows.Close() if !rows.Next() { return false, nil } sqlStmt = sq. Select("type, height", "chain_id"). Distinct(). From(TableEventBlock). Where(fmt.Sprintf("height = %d AND type = '%s' AND chain_id = '%s'", h, types.EventTypeBeginBlock, chainID)) rows, err = sqlStmt.RunWith(db).Query() if err != nil { return false, err } if !rows.Next() { return false, nil } sqlStmt = sq. Select("type, height"). Distinct(). From(TableEventBlock). Where(fmt.Sprintf("height = %d AND type = '%s'", h, types.EventTypeEndBlock)) rows, err = sqlStmt.RunWith(db).Query() if err != nil { return false, err } return rows.Next(), nil } func setupDB(t *testing.T) (*dockertest.Pool, error) { t.Helper() pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL")) require.NoError(t, err) resource, err = pool.RunWithOptions(&dockertest.RunOptions{ Repository: DriverName, Tag: "13", Env: []string{ "POSTGRES_USER=" + user, "POSTGRES_PASSWORD=" + password, "POSTGRES_DB=" + dbName, "listen_addresses = '*'", }, ExposedPorts: []string{port}, }, func(config *docker.HostConfig) { // set AutoRemove to true so that stopped container goes away by itself config.AutoRemove = true config.RestartPolicy = docker.RestartPolicy{ Name: "no", } }) require.NoError(t, err) // Set the container to expire in a minute to avoid orphaned containers // hanging around _ = resource.Expire(60) conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName) if err = pool.Retry(func() error { var err error _, db, err = NewEventSink(conn, chainID) if err != nil { return err } return db.Ping() }); err != nil { require.NoError(t, err) } resetDB(t) sm, err := readSchema() assert.Nil(t, err) assert.Nil(t, schema.NewMigrator().Apply(db, sm)) return pool, nil } func teardown(t *testing.T, pool *dockertest.Pool) error { t.Helper() // When you're done, kill and remove the container assert.Nil(t, pool.Purge(resource)) return db.Close() }