You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

365 lines
8.3 KiB

  1. package psql
  2. import (
  3. "context"
  4. "database/sql"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "os"
  9. "testing"
  10. "time"
  11. sq "github.com/Masterminds/squirrel"
  12. schema "github.com/adlio/schema"
  13. proto "github.com/gogo/protobuf/proto"
  14. _ "github.com/lib/pq"
  15. dockertest "github.com/ory/dockertest"
  16. "github.com/ory/dockertest/docker"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. abci "github.com/tendermint/tendermint/abci/types"
  20. "github.com/tendermint/tendermint/state/indexer"
  21. "github.com/tendermint/tendermint/types"
  22. )
  23. var db *sql.DB
  24. var resource *dockertest.Resource
  25. var chainID = "test-chainID"
  26. var (
  27. user = "postgres"
  28. password = "secret"
  29. port = "5432"
  30. dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable"
  31. dbName = "postgres"
  32. )
  33. func TestType(t *testing.T) {
  34. pool, err := setupDB(t)
  35. require.NoError(t, err)
  36. psqlSink := &EventSink{store: db, chainID: chainID}
  37. assert.Equal(t, indexer.PSQL, psqlSink.Type())
  38. require.NoError(t, teardown(t, pool))
  39. }
  40. func TestBlockFuncs(t *testing.T) {
  41. pool, err := setupDB(t)
  42. require.NoError(t, err)
  43. indexer := &EventSink{store: db, chainID: chainID}
  44. require.NoError(t, indexer.IndexBlockEvents(getTestBlockHeader()))
  45. r, err := verifyBlock(1)
  46. assert.True(t, r)
  47. require.NoError(t, err)
  48. r, err = verifyBlock(2)
  49. assert.False(t, r)
  50. require.NoError(t, err)
  51. r, err = indexer.HasBlock(1)
  52. assert.False(t, r)
  53. assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err)
  54. r, err = indexer.HasBlock(2)
  55. assert.False(t, r)
  56. assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err)
  57. r2, err := indexer.SearchBlockEvents(context.TODO(), nil)
  58. assert.Nil(t, r2)
  59. assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err)
  60. require.NoError(t, verifyTimeStamp(TableEventBlock))
  61. require.NoError(t, teardown(t, pool))
  62. }
  63. func TestTxFuncs(t *testing.T) {
  64. pool, err := setupDB(t)
  65. assert.Nil(t, err)
  66. indexer := &EventSink{store: db, chainID: chainID}
  67. txResult := txResultWithEvents([]abci.Event{
  68. {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}},
  69. {Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}},
  70. {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}},
  71. })
  72. err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
  73. require.NoError(t, err)
  74. tx, err := verifyTx(types.Tx(txResult.Tx).Hash())
  75. require.NoError(t, err)
  76. assert.Equal(t, txResult, tx)
  77. require.NoError(t, verifyTimeStamp(TableEventTx))
  78. require.NoError(t, verifyTimeStamp(TableResultTx))
  79. tx, err = indexer.GetTxByHash(types.Tx(txResult.Tx).Hash())
  80. assert.Nil(t, tx)
  81. assert.Equal(t, errors.New("getTxByHash is not supported via the postgres event sink"), err)
  82. r2, err := indexer.SearchTxEvents(context.TODO(), nil)
  83. assert.Nil(t, r2)
  84. assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err)
  85. assert.Nil(t, teardown(t, pool))
  86. }
  87. func TestStop(t *testing.T) {
  88. pool, err := setupDB(t)
  89. require.NoError(t, err)
  90. indexer := &EventSink{store: db}
  91. require.NoError(t, indexer.Stop())
  92. defer db.Close()
  93. require.NoError(t, pool.Purge(resource))
  94. }
  95. func getTestBlockHeader() types.EventDataNewBlockHeader {
  96. return types.EventDataNewBlockHeader{
  97. Header: types.Header{Height: 1},
  98. ResultBeginBlock: abci.ResponseBeginBlock{
  99. Events: []abci.Event{
  100. {
  101. Type: "begin_event",
  102. Attributes: []abci.EventAttribute{
  103. {
  104. Key: "proposer",
  105. Value: "FCAA001",
  106. Index: true,
  107. },
  108. },
  109. },
  110. },
  111. },
  112. ResultEndBlock: abci.ResponseEndBlock{
  113. Events: []abci.Event{
  114. {
  115. Type: "end_event",
  116. Attributes: []abci.EventAttribute{
  117. {
  118. Key: "foo",
  119. Value: "100",
  120. Index: true,
  121. },
  122. },
  123. },
  124. },
  125. },
  126. }
  127. }
  128. func readSchema() ([]*schema.Migration, error) {
  129. filename := "schema.sql"
  130. contents, err := ioutil.ReadFile(filename)
  131. if err != nil {
  132. return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err)
  133. }
  134. mg := &schema.Migration{}
  135. mg.ID = time.Now().Local().String() + " db schema"
  136. mg.Script = string(contents)
  137. return append([]*schema.Migration{}, mg), nil
  138. }
  139. func resetDB(t *testing.T) {
  140. q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results"
  141. _, err := db.Exec(q)
  142. require.NoError(t, err)
  143. q = "DROP TYPE IF EXISTS block_event_type"
  144. _, err = db.Exec(q)
  145. require.NoError(t, err)
  146. }
  147. func txResultWithEvents(events []abci.Event) *abci.TxResult {
  148. tx := types.Tx("HELLO WORLD")
  149. return &abci.TxResult{
  150. Height: 1,
  151. Index: 0,
  152. Tx: tx,
  153. Result: abci.ResponseDeliverTx{
  154. Data: []byte{0},
  155. Code: abci.CodeTypeOK,
  156. Log: "",
  157. Events: events,
  158. },
  159. }
  160. }
  161. func verifyTx(hash []byte) (*abci.TxResult, error) {
  162. join := fmt.Sprintf("%s ON %s.id = tx_result_id", TableEventTx, TableResultTx)
  163. sqlStmt := sq.
  164. Select("tx_result", fmt.Sprintf("%s.id", TableResultTx), "tx_result_id", "hash", "chain_id").
  165. Distinct().From(TableResultTx).
  166. InnerJoin(join).
  167. Where(fmt.Sprintf("hash = $1 AND chain_id = '%s'", chainID), fmt.Sprintf("%X", hash))
  168. rows, err := sqlStmt.RunWith(db).Query()
  169. if err != nil {
  170. return nil, err
  171. }
  172. defer rows.Close()
  173. if rows.Next() {
  174. var txResult []byte
  175. var txResultID, txid int
  176. var h, cid string
  177. err = rows.Scan(&txResult, &txResultID, &txid, &h, &cid)
  178. if err != nil {
  179. return nil, nil
  180. }
  181. msg := new(abci.TxResult)
  182. err = proto.Unmarshal(txResult, msg)
  183. if err != nil {
  184. return nil, err
  185. }
  186. return msg, err
  187. }
  188. // No result
  189. return nil, nil
  190. }
  191. func verifyTimeStamp(tb string) error {
  192. // We assume the tx indexing time would not exceed 2 second from now
  193. sqlStmt := sq.
  194. Select(fmt.Sprintf("%s.created_at", tb)).
  195. Distinct().From(tb).
  196. Where(fmt.Sprintf("%s.created_at >= $1", tb), time.Now().Add(-2*time.Second))
  197. rows, err := sqlStmt.RunWith(db).Query()
  198. if err != nil {
  199. return err
  200. }
  201. defer rows.Close()
  202. if rows.Next() {
  203. var ts string
  204. err = rows.Scan(&ts)
  205. if err != nil {
  206. return err
  207. }
  208. return nil
  209. }
  210. return errors.New("no result")
  211. }
  212. func verifyBlock(h int64) (bool, error) {
  213. sqlStmt := sq.
  214. Select("height").
  215. Distinct().
  216. From(TableEventBlock).
  217. Where(fmt.Sprintf("height = %d", h))
  218. rows, err := sqlStmt.RunWith(db).Query()
  219. if err != nil {
  220. return false, err
  221. }
  222. defer rows.Close()
  223. if !rows.Next() {
  224. return false, nil
  225. }
  226. sqlStmt = sq.
  227. Select("type, height", "chain_id").
  228. Distinct().
  229. From(TableEventBlock).
  230. Where(fmt.Sprintf("height = %d AND type = '%s' AND chain_id = '%s'", h, types.EventTypeBeginBlock, chainID))
  231. rows, err = sqlStmt.RunWith(db).Query()
  232. if err != nil {
  233. return false, err
  234. }
  235. if !rows.Next() {
  236. return false, nil
  237. }
  238. sqlStmt = sq.
  239. Select("type, height").
  240. Distinct().
  241. From(TableEventBlock).
  242. Where(fmt.Sprintf("height = %d AND type = '%s'", h, types.EventTypeEndBlock))
  243. rows, err = sqlStmt.RunWith(db).Query()
  244. if err != nil {
  245. return false, err
  246. }
  247. return rows.Next(), nil
  248. }
  249. func setupDB(t *testing.T) (*dockertest.Pool, error) {
  250. t.Helper()
  251. pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL"))
  252. require.NoError(t, err)
  253. resource, err = pool.RunWithOptions(&dockertest.RunOptions{
  254. Repository: DriverName,
  255. Tag: "13",
  256. Env: []string{
  257. "POSTGRES_USER=" + user,
  258. "POSTGRES_PASSWORD=" + password,
  259. "POSTGRES_DB=" + dbName,
  260. "listen_addresses = '*'",
  261. },
  262. ExposedPorts: []string{port},
  263. }, func(config *docker.HostConfig) {
  264. // set AutoRemove to true so that stopped container goes away by itself
  265. config.AutoRemove = true
  266. config.RestartPolicy = docker.RestartPolicy{
  267. Name: "no",
  268. }
  269. })
  270. require.NoError(t, err)
  271. // Set the container to expire in a minute to avoid orphaned containers
  272. // hanging around
  273. _ = resource.Expire(60)
  274. conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
  275. if err = pool.Retry(func() error {
  276. var err error
  277. _, db, err = NewEventSink(conn, chainID)
  278. if err != nil {
  279. return err
  280. }
  281. return db.Ping()
  282. }); err != nil {
  283. require.NoError(t, err)
  284. }
  285. resetDB(t)
  286. sm, err := readSchema()
  287. assert.Nil(t, err)
  288. assert.Nil(t, schema.NewMigrator().Apply(db, sm))
  289. return pool, nil
  290. }
  291. func teardown(t *testing.T, pool *dockertest.Pool) error {
  292. t.Helper()
  293. // When you're done, kill and remove the container
  294. assert.Nil(t, pool.Purge(resource))
  295. return db.Close()
  296. }