You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

342 lines
9.7 KiB

  1. package psql
  2. import (
  3. "context"
  4. "database/sql"
  5. "flag"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "os/signal"
  11. "testing"
  12. "time"
  13. "github.com/adlio/schema"
  14. "github.com/gogo/protobuf/proto"
  15. "github.com/ory/dockertest"
  16. "github.com/ory/dockertest/docker"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. abci "github.com/tendermint/tendermint/abci/types"
  20. "github.com/tendermint/tendermint/types"
  21. // Register the Postgres database driver.
  22. _ "github.com/lib/pq"
  23. )
  24. var (
  25. doPauseAtExit = flag.Bool("pause-at-exit", false,
  26. "If true, pause the test until interrupted at shutdown, to allow debugging")
  27. // A hook that test cases can call to obtain the shared database instance
  28. // used for testing the sink. This is initialized in TestMain (see below).
  29. testDB func() *sql.DB
  30. )
  31. const (
  32. user = "postgres"
  33. password = "secret"
  34. port = "5432"
  35. dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable"
  36. dbName = "postgres"
  37. chainID = "test-chainID"
  38. viewBlockEvents = "block_events"
  39. viewTxEvents = "tx_events"
  40. )
  41. func TestMain(m *testing.M) {
  42. flag.Parse()
  43. // Set up docker and start a container running PostgreSQL.
  44. pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL"))
  45. if err != nil {
  46. log.Fatalf("Creating docker pool: %v", err)
  47. }
  48. resource, err := pool.RunWithOptions(&dockertest.RunOptions{
  49. Repository: "postgres",
  50. Tag: "13",
  51. Env: []string{
  52. "POSTGRES_USER=" + user,
  53. "POSTGRES_PASSWORD=" + password,
  54. "POSTGRES_DB=" + dbName,
  55. "listen_addresses = '*'",
  56. },
  57. ExposedPorts: []string{port},
  58. }, func(config *docker.HostConfig) {
  59. // set AutoRemove to true so that stopped container goes away by itself
  60. config.AutoRemove = true
  61. config.RestartPolicy = docker.RestartPolicy{
  62. Name: "no",
  63. }
  64. })
  65. if err != nil {
  66. log.Fatalf("Starting docker pool: %v", err)
  67. }
  68. if *doPauseAtExit {
  69. log.Print("Pause at exit is enabled, containers will not expire")
  70. } else {
  71. const expireSeconds = 60
  72. _ = resource.Expire(expireSeconds)
  73. log.Printf("Container expiration set to %d seconds", expireSeconds)
  74. }
  75. // Connect to the database, clear any leftover data, and install the
  76. // indexing schema.
  77. conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
  78. var db *sql.DB
  79. if err := pool.Retry(func() error {
  80. sink, err := NewEventSink(conn, chainID)
  81. if err != nil {
  82. return err
  83. }
  84. db = sink.DB() // set global for test use
  85. return db.Ping()
  86. }); err != nil {
  87. log.Fatalf("Connecting to database: %v", err)
  88. }
  89. if err := resetDatabase(db); err != nil {
  90. log.Fatalf("Flushing database: %v", err)
  91. }
  92. sm, err := readSchema()
  93. if err != nil {
  94. log.Fatalf("Reading schema: %v", err)
  95. } else if err := schema.NewMigrator().Apply(db, sm); err != nil {
  96. log.Fatalf("Applying schema: %v", err)
  97. }
  98. // Set up the hook for tests to get the shared database handle.
  99. testDB = func() *sql.DB { return db }
  100. // Run the selected test cases.
  101. code := m.Run()
  102. // Clean up and shut down the database container.
  103. if *doPauseAtExit {
  104. log.Print("Testing complete, pausing for inspection. Send SIGINT to resume teardown")
  105. waitForInterrupt()
  106. log.Print("(resuming)")
  107. }
  108. log.Print("Shutting down database")
  109. if err := pool.Purge(resource); err != nil {
  110. log.Printf("WARNING: Purging pool failed: %v", err)
  111. }
  112. if err := db.Close(); err != nil {
  113. log.Printf("WARNING: Closing database failed: %v", err)
  114. }
  115. os.Exit(code)
  116. }
  117. func TestIndexing(t *testing.T) {
  118. t.Run("IndexBlockEvents", func(t *testing.T) {
  119. indexer := &EventSink{store: testDB(), chainID: chainID}
  120. require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader()))
  121. verifyBlock(t, 1)
  122. verifyBlock(t, 2)
  123. verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(1) })
  124. verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(2) })
  125. verifyNotImplemented(t, "block search", func() (bool, error) {
  126. v, err := indexer.SearchBlockEvents(context.Background(), nil)
  127. return v != nil, err
  128. })
  129. require.NoError(t, verifyTimeStamp(tableBlocks))
  130. // Attempting to reindex the same events should gracefully succeed.
  131. require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader()))
  132. })
  133. t.Run("IndexTxEvents", func(t *testing.T) {
  134. indexer := &EventSink{store: testDB(), chainID: chainID}
  135. txResult := txResultWithEvents([]abci.Event{
  136. makeIndexedEvent("account.number", "1"),
  137. makeIndexedEvent("account.owner", "Ivan"),
  138. makeIndexedEvent("account.owner", "Yulieta"),
  139. {Type: "", Attributes: []abci.EventAttribute{
  140. {
  141. Key: []byte("not_allowed"),
  142. Value: []byte("Vlad"),
  143. Index: true,
  144. },
  145. }},
  146. })
  147. require.NoError(t, indexer.IndexTxEvents([]*abci.TxResult{txResult}))
  148. txr, err := loadTxResult(types.Tx(txResult.Tx).Hash())
  149. require.NoError(t, err)
  150. assert.Equal(t, txResult, txr)
  151. require.NoError(t, verifyTimeStamp(tableTxResults))
  152. require.NoError(t, verifyTimeStamp(viewTxEvents))
  153. verifyNotImplemented(t, "getTxByHash", func() (bool, error) {
  154. txr, err := indexer.GetTxByHash(types.Tx(txResult.Tx).Hash())
  155. return txr != nil, err
  156. })
  157. verifyNotImplemented(t, "tx search", func() (bool, error) {
  158. txr, err := indexer.SearchTxEvents(context.Background(), nil)
  159. return txr != nil, err
  160. })
  161. // try to insert the duplicate tx events.
  162. err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
  163. require.NoError(t, err)
  164. })
  165. }
  166. func TestStop(t *testing.T) {
  167. indexer := &EventSink{store: testDB()}
  168. require.NoError(t, indexer.Stop())
  169. }
  170. // newTestBlockHeader constructs a fresh copy of a block header containing
  171. // known test values to exercise the indexer.
  172. func newTestBlockHeader() types.EventDataNewBlockHeader {
  173. return types.EventDataNewBlockHeader{
  174. Header: types.Header{Height: 1},
  175. ResultBeginBlock: abci.ResponseBeginBlock{
  176. Events: []abci.Event{
  177. makeIndexedEvent("begin_event.proposer", "FCAA001"),
  178. makeIndexedEvent("thingy.whatzit", "O.O"),
  179. },
  180. },
  181. ResultEndBlock: abci.ResponseEndBlock{
  182. Events: []abci.Event{
  183. makeIndexedEvent("end_event.foo", "100"),
  184. makeIndexedEvent("thingy.whatzit", "-.O"),
  185. },
  186. },
  187. }
  188. }
  189. // readSchema loads the indexing database schema file
  190. func readSchema() ([]*schema.Migration, error) {
  191. const filename = "schema.sql"
  192. contents, err := ioutil.ReadFile(filename)
  193. if err != nil {
  194. return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err)
  195. }
  196. return []*schema.Migration{{
  197. ID: time.Now().Local().String() + " db schema",
  198. Script: string(contents),
  199. }}, nil
  200. }
  201. // resetDB drops all the data from the test database.
  202. func resetDatabase(db *sql.DB) error {
  203. _, err := db.Exec(`DROP TABLE IF EXISTS blocks,tx_results,events,attributes CASCADE;`)
  204. if err != nil {
  205. return fmt.Errorf("dropping tables: %v", err)
  206. }
  207. _, err = db.Exec(`DROP VIEW IF EXISTS event_attributes,block_events,tx_events CASCADE;`)
  208. if err != nil {
  209. return fmt.Errorf("dropping views: %v", err)
  210. }
  211. return nil
  212. }
  213. // txResultWithEvents constructs a fresh transaction result with fixed values
  214. // for testing, that includes the specified events.
  215. func txResultWithEvents(events []abci.Event) *abci.TxResult {
  216. return &abci.TxResult{
  217. Height: 1,
  218. Index: 0,
  219. Tx: types.Tx("HELLO WORLD"),
  220. Result: abci.ResponseDeliverTx{
  221. Data: []byte{0},
  222. Code: abci.CodeTypeOK,
  223. Log: "",
  224. Events: events,
  225. },
  226. }
  227. }
  228. func loadTxResult(hash []byte) (*abci.TxResult, error) {
  229. hashString := fmt.Sprintf("%X", hash)
  230. var resultData []byte
  231. if err := testDB().QueryRow(`
  232. SELECT tx_result FROM `+tableTxResults+` WHERE tx_hash = $1;
  233. `, hashString).Scan(&resultData); err != nil {
  234. return nil, fmt.Errorf("lookup transaction for hash %q failed: %v", hashString, err)
  235. }
  236. txr := new(abci.TxResult)
  237. if err := proto.Unmarshal(resultData, txr); err != nil {
  238. return nil, fmt.Errorf("unmarshaling txr: %v", err)
  239. }
  240. return txr, nil
  241. }
  242. func verifyTimeStamp(tableName string) error {
  243. return testDB().QueryRow(fmt.Sprintf(`
  244. SELECT DISTINCT %[1]s.created_at
  245. FROM %[1]s
  246. WHERE %[1]s.created_at >= $1;
  247. `, tableName), time.Now().Add(-2*time.Second)).Err()
  248. }
  249. func verifyBlock(t *testing.T, height int64) {
  250. // Check that the blocks table contains an entry for this height.
  251. if err := testDB().QueryRow(`
  252. SELECT height FROM `+tableBlocks+` WHERE height = $1;
  253. `, height).Err(); err == sql.ErrNoRows {
  254. t.Errorf("No block found for height=%d", height)
  255. } else if err != nil {
  256. t.Fatalf("Database query failed: %v", err)
  257. }
  258. // Verify the presence of begin_block and end_block events.
  259. if err := testDB().QueryRow(`
  260. SELECT type, height, chain_id FROM `+viewBlockEvents+`
  261. WHERE height = $1 AND type = $2 AND chain_id = $3;
  262. `, height, eventTypeBeginBlock, chainID).Err(); err == sql.ErrNoRows {
  263. t.Errorf("No %q event found for height=%d", eventTypeBeginBlock, height)
  264. } else if err != nil {
  265. t.Fatalf("Database query failed: %v", err)
  266. }
  267. if err := testDB().QueryRow(`
  268. SELECT type, height, chain_id FROM `+viewBlockEvents+`
  269. WHERE height = $1 AND type = $2 AND chain_id = $3;
  270. `, height, eventTypeEndBlock, chainID).Err(); err == sql.ErrNoRows {
  271. t.Errorf("No %q event found for height=%d", eventTypeEndBlock, height)
  272. } else if err != nil {
  273. t.Fatalf("Database query failed: %v", err)
  274. }
  275. }
  276. // verifyNotImplemented calls f and verifies that it returns both a
  277. // false-valued flag and a non-nil error whose string matching the expected
  278. // "not supported" message with label prefixed.
  279. func verifyNotImplemented(t *testing.T, label string, f func() (bool, error)) {
  280. t.Helper()
  281. t.Logf("Verifying that %q reports it is not implemented", label)
  282. want := label + " is not supported via the postgres event sink"
  283. ok, err := f()
  284. assert.False(t, ok)
  285. require.NotNil(t, err)
  286. assert.Equal(t, want, err.Error())
  287. }
  288. // waitForInterrupt blocks until a SIGINT is received by the process.
  289. func waitForInterrupt() {
  290. ch := make(chan os.Signal, 1)
  291. signal.Notify(ch, os.Interrupt)
  292. <-ch
  293. }