You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
10 KiB

  1. package psql
  2. import (
  3. "context"
  4. "database/sql"
  5. "flag"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "os/signal"
  11. "testing"
  12. "time"
  13. "github.com/adlio/schema"
  14. "github.com/gogo/protobuf/proto"
  15. "github.com/ory/dockertest"
  16. "github.com/ory/dockertest/docker"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. abci "github.com/tendermint/tendermint/abci/types"
  20. "github.com/tendermint/tendermint/internal/state/indexer"
  21. "github.com/tendermint/tendermint/types"
  22. // Register the Postgres database driver.
  23. _ "github.com/lib/pq"
  24. )
  25. // Verify that the type satisfies the EventSink interface.
  26. var _ indexer.EventSink = (*EventSink)(nil)
  27. var (
  28. doPauseAtExit = flag.Bool("pause-at-exit", false,
  29. "If true, pause the test until interrupted at shutdown, to allow debugging")
  30. // A hook that test cases can call to obtain the shared database instance
  31. // used for testing the sink. This is initialized in TestMain (see below).
  32. testDB func() *sql.DB
  33. )
  34. const (
  35. user = "postgres"
  36. password = "secret"
  37. port = "5432"
  38. dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable"
  39. dbName = "postgres"
  40. chainID = "test-chainID"
  41. viewBlockEvents = "block_events"
  42. viewTxEvents = "tx_events"
  43. )
  44. func TestMain(m *testing.M) {
  45. flag.Parse()
  46. // Set up docker and start a container running PostgreSQL.
  47. pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL"))
  48. if err != nil {
  49. log.Fatalf("Creating docker pool: %v", err)
  50. }
  51. resource, err := pool.RunWithOptions(&dockertest.RunOptions{
  52. Repository: "postgres",
  53. Tag: "13",
  54. Env: []string{
  55. "POSTGRES_USER=" + user,
  56. "POSTGRES_PASSWORD=" + password,
  57. "POSTGRES_DB=" + dbName,
  58. "listen_addresses = '*'",
  59. },
  60. ExposedPorts: []string{port},
  61. }, func(config *docker.HostConfig) {
  62. // set AutoRemove to true so that stopped container goes away by itself
  63. config.AutoRemove = true
  64. config.RestartPolicy = docker.RestartPolicy{
  65. Name: "no",
  66. }
  67. })
  68. if err != nil {
  69. log.Fatalf("Starting docker pool: %v", err)
  70. }
  71. if *doPauseAtExit {
  72. log.Print("Pause at exit is enabled, containers will not expire")
  73. } else {
  74. const expireSeconds = 60
  75. _ = resource.Expire(expireSeconds)
  76. log.Printf("Container expiration set to %d seconds", expireSeconds)
  77. }
  78. // Connect to the database, clear any leftover data, and install the
  79. // indexing schema.
  80. conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
  81. var db *sql.DB
  82. if err := pool.Retry(func() error {
  83. sink, err := NewEventSink(conn, chainID)
  84. if err != nil {
  85. return err
  86. }
  87. db = sink.DB() // set global for test use
  88. return db.Ping()
  89. }); err != nil {
  90. log.Fatalf("Connecting to database: %v", err)
  91. }
  92. if err := resetDatabase(db); err != nil {
  93. log.Fatalf("Flushing database: %v", err)
  94. }
  95. sm, err := readSchema()
  96. if err != nil {
  97. log.Fatalf("Reading schema: %v", err)
  98. } else if err := schema.NewMigrator().Apply(db, sm); err != nil {
  99. log.Fatalf("Applying schema: %v", err)
  100. }
  101. // Set up the hook for tests to get the shared database handle.
  102. testDB = func() *sql.DB { return db }
  103. // Run the selected test cases.
  104. code := m.Run()
  105. // Clean up and shut down the database container.
  106. if *doPauseAtExit {
  107. log.Print("Testing complete, pausing for inspection. Send SIGINT to resume teardown")
  108. waitForInterrupt()
  109. log.Print("(resuming)")
  110. }
  111. log.Print("Shutting down database")
  112. if err := pool.Purge(resource); err != nil {
  113. log.Printf("WARNING: Purging pool failed: %v", err)
  114. }
  115. if err := db.Close(); err != nil {
  116. log.Printf("WARNING: Closing database failed: %v", err)
  117. }
  118. os.Exit(code)
  119. }
  120. func TestType(t *testing.T) {
  121. psqlSink := &EventSink{store: testDB(), chainID: chainID}
  122. assert.Equal(t, indexer.PSQL, psqlSink.Type())
  123. }
  124. func TestIndexing(t *testing.T) {
  125. t.Run("IndexBlockEvents", func(t *testing.T) {
  126. indexer := &EventSink{store: testDB(), chainID: chainID}
  127. require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader()))
  128. verifyBlock(t, 1)
  129. verifyBlock(t, 2)
  130. verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(1) })
  131. verifyNotImplemented(t, "hasBlock", func() (bool, error) { return indexer.HasBlock(2) })
  132. verifyNotImplemented(t, "block search", func() (bool, error) {
  133. v, err := indexer.SearchBlockEvents(context.Background(), nil)
  134. return v != nil, err
  135. })
  136. require.NoError(t, verifyTimeStamp(tableBlocks))
  137. // Attempting to reindex the same events should gracefully succeed.
  138. require.NoError(t, indexer.IndexBlockEvents(newTestBlockHeader()))
  139. })
  140. t.Run("IndexTxEvents", func(t *testing.T) {
  141. indexer := &EventSink{store: testDB(), chainID: chainID}
  142. txResult := txResultWithEvents([]abci.Event{
  143. makeIndexedEvent("account.number", "1"),
  144. makeIndexedEvent("account.owner", "Ivan"),
  145. makeIndexedEvent("account.owner", "Yulieta"),
  146. {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}},
  147. })
  148. require.NoError(t, indexer.IndexTxEvents([]*abci.TxResult{txResult}))
  149. txr, err := loadTxResult(types.Tx(txResult.Tx).Hash())
  150. require.NoError(t, err)
  151. assert.Equal(t, txResult, txr)
  152. require.NoError(t, verifyTimeStamp(tableTxResults))
  153. require.NoError(t, verifyTimeStamp(viewTxEvents))
  154. verifyNotImplemented(t, "getTxByHash", func() (bool, error) {
  155. txr, err := indexer.GetTxByHash(types.Tx(txResult.Tx).Hash())
  156. return txr != nil, err
  157. })
  158. verifyNotImplemented(t, "tx search", func() (bool, error) {
  159. txr, err := indexer.SearchTxEvents(context.Background(), nil)
  160. return txr != nil, err
  161. })
  162. // try to insert the duplicate tx events.
  163. err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
  164. require.NoError(t, err)
  165. })
  166. }
  167. func TestStop(t *testing.T) {
  168. indexer := &EventSink{store: testDB()}
  169. require.NoError(t, indexer.Stop())
  170. }
  171. // newTestBlockHeader constructs a fresh copy of a block header containing
  172. // known test values to exercise the indexer.
  173. func newTestBlockHeader() types.EventDataNewBlockHeader {
  174. return types.EventDataNewBlockHeader{
  175. Header: types.Header{Height: 1},
  176. ResultBeginBlock: abci.ResponseBeginBlock{
  177. Events: []abci.Event{
  178. makeIndexedEvent("begin_event.proposer", "FCAA001"),
  179. makeIndexedEvent("thingy.whatzit", "O.O"),
  180. },
  181. },
  182. ResultEndBlock: abci.ResponseEndBlock{
  183. Events: []abci.Event{
  184. makeIndexedEvent("end_event.foo", "100"),
  185. makeIndexedEvent("thingy.whatzit", "-.O"),
  186. },
  187. },
  188. }
  189. }
  190. // readSchema loads the indexing database schema file
  191. func readSchema() ([]*schema.Migration, error) {
  192. const filename = "schema.sql"
  193. contents, err := ioutil.ReadFile(filename)
  194. if err != nil {
  195. return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err)
  196. }
  197. return []*schema.Migration{{
  198. ID: time.Now().Local().String() + " db schema",
  199. Script: string(contents),
  200. }}, nil
  201. }
  202. // resetDB drops all the data from the test database.
  203. func resetDatabase(db *sql.DB) error {
  204. _, err := db.Exec(`DROP TABLE IF EXISTS blocks,tx_results,events,attributes CASCADE;`)
  205. if err != nil {
  206. return fmt.Errorf("dropping tables: %v", err)
  207. }
  208. _, err = db.Exec(`DROP VIEW IF EXISTS event_attributes,block_events,tx_events CASCADE;`)
  209. if err != nil {
  210. return fmt.Errorf("dropping views: %v", err)
  211. }
  212. return nil
  213. }
  214. // txResultWithEvents constructs a fresh transaction result with fixed values
  215. // for testing, that includes the specified events.
  216. func txResultWithEvents(events []abci.Event) *abci.TxResult {
  217. return &abci.TxResult{
  218. Height: 1,
  219. Index: 0,
  220. Tx: types.Tx("HELLO WORLD"),
  221. Result: abci.ResponseDeliverTx{
  222. Data: []byte{0},
  223. Code: abci.CodeTypeOK,
  224. Log: "",
  225. Events: events,
  226. },
  227. }
  228. }
  229. func loadTxResult(hash []byte) (*abci.TxResult, error) {
  230. hashString := fmt.Sprintf("%X", hash)
  231. var resultData []byte
  232. if err := testDB().QueryRow(`
  233. SELECT tx_result FROM `+tableTxResults+` WHERE tx_hash = $1;
  234. `, hashString).Scan(&resultData); err != nil {
  235. return nil, fmt.Errorf("lookup transaction for hash %q failed: %v", hashString, err)
  236. }
  237. txr := new(abci.TxResult)
  238. if err := proto.Unmarshal(resultData, txr); err != nil {
  239. return nil, fmt.Errorf("unmarshaling txr: %v", err)
  240. }
  241. return txr, nil
  242. }
  243. func verifyTimeStamp(tableName string) error {
  244. return testDB().QueryRow(fmt.Sprintf(`
  245. SELECT DISTINCT %[1]s.created_at
  246. FROM %[1]s
  247. WHERE %[1]s.created_at >= $1;
  248. `, tableName), time.Now().Add(-2*time.Second)).Err()
  249. }
  250. func verifyBlock(t *testing.T, height int64) {
  251. // Check that the blocks table contains an entry for this height.
  252. if err := testDB().QueryRow(`
  253. SELECT height FROM `+tableBlocks+` WHERE height = $1;
  254. `, height).Err(); err == sql.ErrNoRows {
  255. t.Errorf("No block found for height=%d", height)
  256. } else if err != nil {
  257. t.Fatalf("Database query failed: %v", err)
  258. }
  259. // Verify the presence of begin_block and end_block events.
  260. if err := testDB().QueryRow(`
  261. SELECT type, height, chain_id FROM `+viewBlockEvents+`
  262. WHERE height = $1 AND type = $2 AND chain_id = $3;
  263. `, height, types.EventTypeBeginBlock, chainID).Err(); err == sql.ErrNoRows {
  264. t.Errorf("No %q event found for height=%d", types.EventTypeBeginBlock, height)
  265. } else if err != nil {
  266. t.Fatalf("Database query failed: %v", err)
  267. }
  268. if err := testDB().QueryRow(`
  269. SELECT type, height, chain_id FROM `+viewBlockEvents+`
  270. WHERE height = $1 AND type = $2 AND chain_id = $3;
  271. `, height, types.EventTypeEndBlock, chainID).Err(); err == sql.ErrNoRows {
  272. t.Errorf("No %q event found for height=%d", types.EventTypeEndBlock, height)
  273. } else if err != nil {
  274. t.Fatalf("Database query failed: %v", err)
  275. }
  276. }
  277. // verifyNotImplemented calls f and verifies that it returns both a
  278. // false-valued flag and a non-nil error whose string matching the expected
  279. // "not supported" message with label prefixed.
  280. func verifyNotImplemented(t *testing.T, label string, f func() (bool, error)) {
  281. t.Helper()
  282. t.Logf("Verifying that %q reports it is not implemented", label)
  283. want := label + " is not supported via the postgres event sink"
  284. ok, err := f()
  285. assert.False(t, ok)
  286. require.NotNil(t, err)
  287. assert.Equal(t, want, err.Error())
  288. }
  289. // waitForInterrupt blocks until a SIGINT is received by the process.
  290. func waitForInterrupt() {
  291. ch := make(chan os.Signal, 1)
  292. signal.Notify(ch, os.Interrupt)
  293. <-ch
  294. }