You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

376 lines
8.5 KiB

  1. package psql
  2. import (
  3. "context"
  4. "database/sql"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "os"
  9. "testing"
  10. "time"
  11. sq "github.com/Masterminds/squirrel"
  12. schema "github.com/adlio/schema"
  13. proto "github.com/gogo/protobuf/proto"
  14. _ "github.com/lib/pq"
  15. dockertest "github.com/ory/dockertest"
  16. "github.com/ory/dockertest/docker"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. abci "github.com/tendermint/tendermint/abci/types"
  20. "github.com/tendermint/tendermint/state/indexer"
  21. "github.com/tendermint/tendermint/types"
  22. )
  23. var db *sql.DB
  24. var resource *dockertest.Resource
  25. var chainID = "test-chainID"
  26. var (
  27. user = "postgres"
  28. password = "secret"
  29. port = "5432"
  30. dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable"
  31. dbName = "postgres"
  32. )
  33. func TestType(t *testing.T) {
  34. pool, err := setupDB(t)
  35. require.NoError(t, err)
  36. psqlSink := &EventSink{store: db, chainID: chainID}
  37. assert.Equal(t, indexer.PSQL, psqlSink.Type())
  38. require.NoError(t, teardown(t, pool))
  39. }
  40. func TestBlockFuncs(t *testing.T) {
  41. pool, err := setupDB(t)
  42. require.NoError(t, err)
  43. indexer := &EventSink{store: db, chainID: chainID}
  44. require.NoError(t, indexer.IndexBlockEvents(getTestBlockHeader()))
  45. r, err := verifyBlock(1)
  46. assert.True(t, r)
  47. require.NoError(t, err)
  48. r, err = verifyBlock(2)
  49. assert.False(t, r)
  50. require.NoError(t, err)
  51. r, err = indexer.HasBlock(1)
  52. assert.False(t, r)
  53. assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err)
  54. r, err = indexer.HasBlock(2)
  55. assert.False(t, r)
  56. assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err)
  57. r2, err := indexer.SearchBlockEvents(context.TODO(), nil)
  58. assert.Nil(t, r2)
  59. assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err)
  60. require.NoError(t, verifyTimeStamp(TableEventBlock))
  61. // try to insert the duplicate block events.
  62. err = indexer.IndexBlockEvents(getTestBlockHeader())
  63. require.NoError(t, err)
  64. require.NoError(t, teardown(t, pool))
  65. }
  66. func TestTxFuncs(t *testing.T) {
  67. pool, err := setupDB(t)
  68. assert.Nil(t, err)
  69. indexer := &EventSink{store: db, chainID: chainID}
  70. txResult := txResultWithEvents([]abci.Event{
  71. {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}},
  72. {Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}},
  73. {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}},
  74. })
  75. err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
  76. require.NoError(t, err)
  77. tx, err := verifyTx(types.Tx(txResult.Tx).Hash())
  78. require.NoError(t, err)
  79. assert.Equal(t, txResult, tx)
  80. require.NoError(t, verifyTimeStamp(TableEventTx))
  81. require.NoError(t, verifyTimeStamp(TableResultTx))
  82. tx, err = indexer.GetTxByHash(types.Tx(txResult.Tx).Hash())
  83. assert.Nil(t, tx)
  84. assert.Equal(t, errors.New("getTxByHash is not supported via the postgres event sink"), err)
  85. r2, err := indexer.SearchTxEvents(context.TODO(), nil)
  86. assert.Nil(t, r2)
  87. assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err)
  88. // try to insert the duplicate tx events.
  89. err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
  90. require.NoError(t, err)
  91. assert.Nil(t, teardown(t, pool))
  92. }
  93. func TestStop(t *testing.T) {
  94. pool, err := setupDB(t)
  95. require.NoError(t, err)
  96. indexer := &EventSink{store: db}
  97. require.NoError(t, indexer.Stop())
  98. defer db.Close()
  99. require.NoError(t, pool.Purge(resource))
  100. }
  101. func getTestBlockHeader() types.EventDataNewBlockHeader {
  102. return types.EventDataNewBlockHeader{
  103. Header: types.Header{Height: 1},
  104. ResultBeginBlock: abci.ResponseBeginBlock{
  105. Events: []abci.Event{
  106. {
  107. Type: "begin_event",
  108. Attributes: []abci.EventAttribute{
  109. {
  110. Key: "proposer",
  111. Value: "FCAA001",
  112. Index: true,
  113. },
  114. },
  115. },
  116. },
  117. },
  118. ResultEndBlock: abci.ResponseEndBlock{
  119. Events: []abci.Event{
  120. {
  121. Type: "end_event",
  122. Attributes: []abci.EventAttribute{
  123. {
  124. Key: "foo",
  125. Value: "100",
  126. Index: true,
  127. },
  128. },
  129. },
  130. },
  131. },
  132. }
  133. }
  134. func readSchema() ([]*schema.Migration, error) {
  135. filename := "schema.sql"
  136. contents, err := ioutil.ReadFile(filename)
  137. if err != nil {
  138. return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err)
  139. }
  140. mg := &schema.Migration{}
  141. mg.ID = time.Now().Local().String() + " db schema"
  142. mg.Script = string(contents)
  143. return append([]*schema.Migration{}, mg), nil
  144. }
  145. func resetDB(t *testing.T) {
  146. q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results"
  147. _, err := db.Exec(q)
  148. require.NoError(t, err)
  149. q = "DROP TYPE IF EXISTS block_event_type"
  150. _, err = db.Exec(q)
  151. require.NoError(t, err)
  152. }
  153. func txResultWithEvents(events []abci.Event) *abci.TxResult {
  154. tx := types.Tx("HELLO WORLD")
  155. return &abci.TxResult{
  156. Height: 1,
  157. Index: 0,
  158. Tx: tx,
  159. Result: abci.ResponseDeliverTx{
  160. Data: []byte{0},
  161. Code: abci.CodeTypeOK,
  162. Log: "",
  163. Events: events,
  164. },
  165. }
  166. }
  167. func verifyTx(hash []byte) (*abci.TxResult, error) {
  168. join := fmt.Sprintf("%s ON %s.id = tx_result_id", TableEventTx, TableResultTx)
  169. sqlStmt := sq.
  170. Select("tx_result", fmt.Sprintf("%s.id", TableResultTx), "tx_result_id", "hash", "chain_id").
  171. Distinct().From(TableResultTx).
  172. InnerJoin(join).
  173. Where(fmt.Sprintf("hash = $1 AND chain_id = '%s'", chainID), fmt.Sprintf("%X", hash))
  174. rows, err := sqlStmt.RunWith(db).Query()
  175. if err != nil {
  176. return nil, err
  177. }
  178. defer rows.Close()
  179. if rows.Next() {
  180. var txResult []byte
  181. var txResultID, txid int
  182. var h, cid string
  183. err = rows.Scan(&txResult, &txResultID, &txid, &h, &cid)
  184. if err != nil {
  185. return nil, nil
  186. }
  187. msg := new(abci.TxResult)
  188. err = proto.Unmarshal(txResult, msg)
  189. if err != nil {
  190. return nil, err
  191. }
  192. return msg, err
  193. }
  194. // No result
  195. return nil, nil
  196. }
  197. func verifyTimeStamp(tb string) error {
  198. // We assume the tx indexing time would not exceed 2 second from now
  199. sqlStmt := sq.
  200. Select(fmt.Sprintf("%s.created_at", tb)).
  201. Distinct().From(tb).
  202. Where(fmt.Sprintf("%s.created_at >= $1", tb), time.Now().Add(-2*time.Second))
  203. rows, err := sqlStmt.RunWith(db).Query()
  204. if err != nil {
  205. return err
  206. }
  207. defer rows.Close()
  208. if rows.Next() {
  209. var ts string
  210. err = rows.Scan(&ts)
  211. if err != nil {
  212. return err
  213. }
  214. return nil
  215. }
  216. return errors.New("no result")
  217. }
  218. func verifyBlock(h int64) (bool, error) {
  219. sqlStmt := sq.
  220. Select("height").
  221. Distinct().
  222. From(TableEventBlock).
  223. Where(fmt.Sprintf("height = %d", h))
  224. rows, err := sqlStmt.RunWith(db).Query()
  225. if err != nil {
  226. return false, err
  227. }
  228. defer rows.Close()
  229. if !rows.Next() {
  230. return false, nil
  231. }
  232. sqlStmt = sq.
  233. Select("type, height", "chain_id").
  234. Distinct().
  235. From(TableEventBlock).
  236. Where(fmt.Sprintf("height = %d AND type = '%s' AND chain_id = '%s'", h, types.EventTypeBeginBlock, chainID))
  237. rows, err = sqlStmt.RunWith(db).Query()
  238. if err != nil {
  239. return false, err
  240. }
  241. defer rows.Close()
  242. if !rows.Next() {
  243. return false, nil
  244. }
  245. sqlStmt = sq.
  246. Select("type, height").
  247. Distinct().
  248. From(TableEventBlock).
  249. Where(fmt.Sprintf("height = %d AND type = '%s'", h, types.EventTypeEndBlock))
  250. rows, err = sqlStmt.RunWith(db).Query()
  251. if err != nil {
  252. return false, err
  253. }
  254. defer rows.Close()
  255. return rows.Next(), nil
  256. }
  257. func setupDB(t *testing.T) (*dockertest.Pool, error) {
  258. t.Helper()
  259. pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL"))
  260. require.NoError(t, err)
  261. resource, err = pool.RunWithOptions(&dockertest.RunOptions{
  262. Repository: DriverName,
  263. Tag: "13",
  264. Env: []string{
  265. "POSTGRES_USER=" + user,
  266. "POSTGRES_PASSWORD=" + password,
  267. "POSTGRES_DB=" + dbName,
  268. "listen_addresses = '*'",
  269. },
  270. ExposedPorts: []string{port},
  271. }, func(config *docker.HostConfig) {
  272. // set AutoRemove to true so that stopped container goes away by itself
  273. config.AutoRemove = true
  274. config.RestartPolicy = docker.RestartPolicy{
  275. Name: "no",
  276. }
  277. })
  278. require.NoError(t, err)
  279. // Set the container to expire in a minute to avoid orphaned containers
  280. // hanging around
  281. _ = resource.Expire(60)
  282. conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
  283. if err = pool.Retry(func() error {
  284. var err error
  285. _, db, err = NewEventSink(conn, chainID)
  286. if err != nil {
  287. return err
  288. }
  289. return db.Ping()
  290. }); err != nil {
  291. require.NoError(t, err)
  292. }
  293. resetDB(t)
  294. sm, err := readSchema()
  295. assert.Nil(t, err)
  296. assert.Nil(t, schema.NewMigrator().Apply(db, sm))
  297. return pool, nil
  298. }
  299. func teardown(t *testing.T, pool *dockertest.Pool) error {
  300. t.Helper()
  301. // When you're done, kill and remove the container
  302. assert.Nil(t, pool.Purge(resource))
  303. return db.Close()
  304. }