package psql

import (
	"context"
	"database/sql"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"testing"
	"time"

	"github.com/adlio/schema"
	"github.com/gogo/protobuf/proto"
	"github.com/ory/dockertest"
	"github.com/ory/dockertest/docker"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/internal/state/indexer"
	"github.com/tendermint/tendermint/types"

	// Register the Postgres database driver.
	_ "github.com/lib/pq"
)
// Verify that the type satisfies the EventSink interface.
var _ indexer.EventSink = (*EventSink)(nil)

var (
	doPauseAtExit = flag.Bool("pause-at-exit", false,
		"If true, pause the test until interrupted at shutdown, to allow debugging")

	// A hook that test cases can call to obtain the shared database instance
	// used for testing the sink. This is initialized in TestMain (see below).
	testDB func() *sql.DB
)

const (
	user     = "postgres"
	password = "secret"
	port     = "5432"
	dsn      = "postgres://%s:%s@localhost:%s/%s?sslmode=disable"
	dbName   = "postgres"
	chainID  = "test-chainID"

	viewBlockEvents = "block_events"
	viewTxEvents    = "tx_events"
)
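// TestMain starts a PostgreSQL container via dockertest, installs the
// indexing schema, exposes the shared database handle through testDB, and
// then runs the test suite before tearing the container down.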
func TestMain(m *testing.M) {
	flag.Parse()

	// Set up docker and start a container running PostgreSQL.
	pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL"))
	if err != nil {
		log.Fatalf("Creating docker pool: %v", err)
	}

	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
		Repository: "postgres",
		Tag:        "13",
		Env: []string{
			"POSTGRES_USER=" + user,
			"POSTGRES_PASSWORD=" + password,
			"POSTGRES_DB=" + dbName,
			"listen_addresses = '*'",
		},
		ExposedPorts: []string{port},
	}, func(config *docker.HostConfig) {
		// Set AutoRemove to true so that a stopped container goes away by itself.
		config.AutoRemove = true
		config.RestartPolicy = docker.RestartPolicy{
			Name: "no",
		}
	})
	if err != nil {
		log.Fatalf("Starting docker pool: %v", err)
	}

	if *doPauseAtExit {
		log.Print("Pause at exit is enabled, containers will not expire")
	} else {
		const expireSeconds = 60
		_ = resource.Expire(expireSeconds)
		log.Printf("Container expiration set to %d seconds", expireSeconds)
	}

	// Connect to the database, clear any leftover data, and install the
	// indexing schema.
	conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
	var db *sql.DB

	if err := pool.Retry(func() error {
		sink, err := NewEventSink(conn, chainID)
		if err != nil {
			return err
		}
		db = sink.DB() // set global for test use
		return db.Ping()
	}); err != nil {
		log.Fatalf("Connecting to database: %v", err)
	}

	if err := resetDatabase(db); err != nil {
		log.Fatalf("Flushing database: %v", err)
	}

	sm, err := readSchema()
	if err != nil {
		log.Fatalf("Reading schema: %v", err)
	}
	migrator := schema.NewMigrator()
	if err := migrator.Apply(db, sm); err != nil {
		log.Fatalf("Applying schema: %v", err)
	}

	// Set up the hook for tests to get the shared database handle.
	testDB = func() *sql.DB { return db }

	// Run the selected test cases.
	code := m.Run()

	// Clean up and shut down the database container.
	if *doPauseAtExit {
		log.Print("Testing complete, pausing for inspection. Send SIGINT to resume teardown")
		waitForInterrupt()
		log.Print("(resuming)")
	}
	log.Print("Shutting down database")
	if err := pool.Purge(resource); err != nil {
		log.Printf("WARNING: Purging pool failed: %v", err)
	}
	if err := db.Close(); err != nil {
		log.Printf("WARNING: Closing database failed: %v", err)
	}
	os.Exit(code)
}
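// TestType verifies that the sink reports the expected indexer type.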
func TestType(t *testing.T) {
	psqlSink := &EventSink{store: testDB(), chainID: chainID}
	assert.Equal(t, indexer.PSQL, psqlSink.Type())
}
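// TestIndexing exercises block and transaction event indexing against the
// live test database.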
func TestIndexing(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	t.Run("IndexBlockEvents", func(t *testing.T) {
		indexer := &EventSink{store: testDB(), chainID: chainID}
		require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader()))

		verifyBlock(t, 1)
		verifyBlock(t, 2)

		verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(1) })
		verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(2) })
		verifyNotImplemented(t, "block search", func() (bool, error) {
			v, err := indexer.SearchBlockEvents(ctx, nil)
			return v != nil, err
		})

		require.NoError(t, verifyTimeStamp(tableBlocks))

		// Attempting to reindex the same events should gracefully succeed.
		require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader()))
	})

	t.Run("IndexTxEvents", func(t *testing.T) {
		indexer := &EventSink{store: testDB(), chainID: chainID}

		txResult := txResultWithEvents([]abci.Event{
			makeIndexedEvent("account.number", "1"),
			makeIndexedEvent("account.owner", "Ivan"),
			makeIndexedEvent("account.owner", "Yulieta"),
			{Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}},
		})
		require.NoError(t, indexer.IndexTxEvents([]*abci.TxResult{txResult}))

		txr, err := loadTxResult(types.Tx(txResult.Tx).Hash())
		require.NoError(t, err)
		assert.Equal(t, txResult, txr)

		require.NoError(t, verifyTimeStamp(tableTxResults))
		require.NoError(t, verifyTimeStamp(viewTxEvents))

		verifyNotImplemented(t, "getTxByHash", func() (bool, error) {
			txr, err := indexer.GetTxByHash(types.Tx(txResult.Tx).Hash())
			return txr != nil, err
		})
		verifyNotImplemented(t, "tx search", func() (bool, error) {
			txr, err := indexer.SearchTxEvents(ctx, nil)
			return txr != nil, err
		})

		// Attempting to reindex the same transaction events should gracefully succeed.
		err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
		require.NoError(t, err)
	})
}
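// TestStop verifies that the sink shuts down cleanly.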
func TestStop(t *testing.T) {
	indexer := &EventSink{store: testDB()}
	require.NoError(t, indexer.Stop())
}
// newTestBlockHeader constructs a fresh copy of a block header containing
// known test values to exercise the indexer.
func newTestBlockHeader() types.EventDataNewBlockHeader {
	return types.EventDataNewBlockHeader{
		Header: types.Header{Height: 1},
		ResultBeginBlock: abci.ResponseBeginBlock{
			Events: []abci.Event{
				makeIndexedEvent("begin_event.proposer", "FCAA001"),
				makeIndexedEvent("thingy.whatzit", "O.O"),
			},
		},
		ResultEndBlock: abci.ResponseEndBlock{
			Events: []abci.Event{
				makeIndexedEvent("end_event.foo", "100"),
				makeIndexedEvent("thingy.whatzit", "-.O"),
			},
		},
	}
}
// readSchema loads the indexing database schema file.
func readSchema() ([]*schema.Migration, error) {
	const filename = "schema.sql"
	contents, err := os.ReadFile(filename)
	if err != nil {
		return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err)
	}

	return []*schema.Migration{{
		ID:     time.Now().Local().String() + " db schema",
		Script: string(contents),
	}}, nil
}
// resetDatabase drops all the data from the test database.
func resetDatabase(db *sql.DB) error {
	_, err := db.Exec(`DROP TABLE IF EXISTS blocks,tx_results,events,attributes CASCADE;`)
	if err != nil {
		return fmt.Errorf("dropping tables: %w", err)
	}
	_, err = db.Exec(`DROP VIEW IF EXISTS event_attributes,block_events,tx_events CASCADE;`)
	if err != nil {
		return fmt.Errorf("dropping views: %w", err)
	}
	return nil
}
// txResultWithEvents constructs a fresh transaction result with fixed values
// for testing, including the specified events.
func txResultWithEvents(events []abci.Event) *abci.TxResult {
	return &abci.TxResult{
		Height: 1,
		Index:  0,
		Tx:     types.Tx("HELLO WORLD"),
		Result: abci.ResponseDeliverTx{
			Data:   []byte{0},
			Code:   abci.CodeTypeOK,
			Log:    "",
			Events: events,
		},
	}
}
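// loadTxResult fetches the stored transaction result for the given hash
// directly from the database and unmarshals it from its protobuf encoding.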
func loadTxResult(hash []byte) (*abci.TxResult, error) {
	hashString := fmt.Sprintf("%X", hash)
	var resultData []byte
	if err := testDB().QueryRow(`
SELECT tx_result FROM `+tableTxResults+` WHERE tx_hash = $1;
`, hashString).Scan(&resultData); err != nil {
		return nil, fmt.Errorf("lookup transaction for hash %q failed: %v", hashString, err)
	}

	txr := new(abci.TxResult)
	if err := proto.Unmarshal(resultData, txr); err != nil {
		return nil, fmt.Errorf("unmarshaling txr: %w", err)
	}

	return txr, nil
}
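// verifyTimeStamp queries the named table or view for rows whose created_at
// timestamp falls within the last two seconds, reporting any query error.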
func verifyTimeStamp(tableName string) error {
	return testDB().QueryRow(fmt.Sprintf(`
SELECT DISTINCT %[1]s.created_at
  FROM %[1]s
  WHERE %[1]s.created_at >= $1;
`, tableName), time.Now().Add(-2*time.Second)).Err()
}
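// verifyBlock checks that a block at the given height was indexed, together
// with its begin_block and end_block events.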
func verifyBlock(t *testing.T, height int64) {
	// Check that the blocks table contains an entry for this height.
	if err := testDB().QueryRow(`
SELECT height FROM `+tableBlocks+` WHERE height = $1;
`, height).Err(); err == sql.ErrNoRows {
		t.Errorf("No block found for height=%d", height)
	} else if err != nil {
		t.Fatalf("Database query failed: %v", err)
	}

	// Verify the presence of begin_block and end_block events.
	if err := testDB().QueryRow(`
SELECT type, height, chain_id FROM `+viewBlockEvents+`
  WHERE height = $1 AND type = $2 AND chain_id = $3;
`, height, types.EventTypeBeginBlock, chainID).Err(); err == sql.ErrNoRows {
		t.Errorf("No %q event found for height=%d", types.EventTypeBeginBlock, height)
	} else if err != nil {
		t.Fatalf("Database query failed: %v", err)
	}

	if err := testDB().QueryRow(`
SELECT type, height, chain_id FROM `+viewBlockEvents+`
  WHERE height = $1 AND type = $2 AND chain_id = $3;
`, height, types.EventTypeEndBlock, chainID).Err(); err == sql.ErrNoRows {
		t.Errorf("No %q event found for height=%d", types.EventTypeEndBlock, height)
	} else if err != nil {
		t.Fatalf("Database query failed: %v", err)
	}
}
// verifyNotImplemented calls f and verifies that it returns both a
// false-valued flag and a non-nil error whose message matches the expected
// "not supported" string, prefixed with the given label.
func verifyNotImplemented(t *testing.T, label string, f func() (bool, error)) {
	t.Helper()
	t.Logf("Verifying that %q reports it is not implemented", label)

	want := label + " is not supported via the postgres event sink"
	ok, err := f()
	assert.False(t, ok)
	require.Error(t, err)
	assert.Equal(t, want, err.Error())
}
// waitForInterrupt blocks until a SIGINT is received by the process.
func waitForInterrupt() {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	<-ch
}