From 711a7181625377085055079aba3940c26587c029 Mon Sep 17 00:00:00 2001 From: JayT106 Date: Thu, 27 May 2021 10:44:46 -0400 Subject: [PATCH] config/indexer: custom event indexing (#6411) --- CHANGELOG_PENDING.md | 1 + config/config.go | 14 +- config/toml.go | 10 +- .../adr-065-custom-event-indexing.md | 122 +++--- go.mod | 4 + go.sum | 39 ++ node/node.go | 20 +- node/node_test.go | 95 +++++ node/setup.go | 92 +++-- rpc/core/blocks.go | 19 +- rpc/core/env.go | 3 +- rpc/core/tx.go | 160 ++++---- state/indexer/eventsink.go | 54 +++ state/indexer/indexer.go | 9 +- state/indexer/indexer_service.go | 70 +++- state/indexer/indexer_service_test.go | 142 ++++++- state/indexer/sink/kv/kv.go | 61 +++ state/indexer/sink/kv/kv_test.go | 351 +++++++++++++++++ state/indexer/sink/null/null.go | 51 +++ state/indexer/sink/null/null_test.go | 39 ++ state/indexer/sink/psql/psql.go | 197 ++++++++++ state/indexer/sink/psql/psql_test.go | 365 ++++++++++++++++++ state/indexer/sink/psql/schema.sql | 31 ++ state/indexer/tx/kv/kv.go | 57 +-- state/indexer/tx/kv/kv_bench_test.go | 2 +- state/indexer/tx/kv/kv_test.go | 24 +- state/indexer/tx/null/null.go | 2 +- types/events.go | 3 + 28 files changed, 1760 insertions(+), 277 deletions(-) create mode 100644 state/indexer/eventsink.go create mode 100644 state/indexer/sink/kv/kv.go create mode 100644 state/indexer/sink/kv/kv_test.go create mode 100644 state/indexer/sink/null/null.go create mode 100644 state/indexer/sink/null/null_test.go create mode 100644 state/indexer/sink/psql/psql.go create mode 100644 state/indexer/sink/psql/psql_test.go create mode 100644 state/indexer/sink/psql/schema.sql diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index dbe0fbdec..7d699e63a 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -64,6 +64,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi accomodate for the new p2p stack. Removes the notion of seeds and crawling. All peer exchange reactors behave the same. (@cmwaters) - [crypto] \#6376 Enable sr25519 as a validator key +- [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106) ### IMPROVEMENTS diff --git a/config/config.go b/config/config.go index 681f0c9d9..94a99f38f 100644 --- a/config/config.go +++ b/config/config.go @@ -1058,19 +1058,25 @@ func (cfg *ConsensusConfig) ValidateBasic() error { // TxIndexConfig defines the configuration for the transaction indexer, // including composite keys to index. type TxIndexConfig struct { - // What indexer to use for transactions + // The backend database list to back the indexer. + // If list contains `null`, meaning no indexer service will be used. // // Options: - // 1) "null" + // 1) "null" - no indexer services. // 2) "kv" (default) - the simplest possible indexer, // backed by key-value storage (defaults to levelDB; see DBBackend). - Indexer string `mapstructure:"indexer"` + // 3) "psql" - the indexer services backed by PostgreSQL. + Indexer []string `mapstructure:"indexer"` + + // The PostgreSQL connection configuration, the connection format: + // postgresql://:@:/? + PsqlConn string `mapstructure:"psql-conn"` } // DefaultTxIndexConfig returns a default configuration for the transaction indexer. func DefaultTxIndexConfig() *TxIndexConfig { return &TxIndexConfig{ - Indexer: "kv", + Indexer: []string{"kv"}, } } diff --git a/config/toml.go b/config/toml.go index 5e9765971..f33f48444 100644 --- a/config/toml.go +++ b/config/toml.go @@ -469,7 +469,8 @@ peer-query-maj23-sleep-duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}" ####################################################### [tx-index] -# What indexer to use for transactions +# The backend database list to back the indexer. +# If list contains null, meaning no indexer service will be used. # # The application will set which txs to index. In some cases a node operator will be able # to decide which txs to index based on configuration set in the application. @@ -478,7 +479,12 @@ peer-query-maj23-sleep-duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}" # 1) "null" # 2) "kv" (default) - the simplest possible indexer, backed by key-value storage (defaults to levelDB; see DBBackend). # - When "kv" is chosen "tx.height" and "tx.hash" will always be indexed. -indexer = "{{ .TxIndex.Indexer }}" +# 3) "psql" - the indexer services backed by PostgreSQL. +indexer = [{{ range $i, $e := .TxIndex.Indexer }}{{if $i}}, {{end}}{{ printf "%q" $e}}{{end}}] + +# The PostgreSQL connection configuration, the connection format: +# postgresql://:@:/? +psql-conn = "{{ .TxIndex.PsqlConn }}" ####################################################### ### Instrumentation Configuration Options ### diff --git a/docs/architecture/adr-065-custom-event-indexing.md b/docs/architecture/adr-065-custom-event-indexing.md index d46b8f4f4..e6a3fdead 100644 --- a/docs/architecture/adr-065-custom-event-indexing.md +++ b/docs/architecture/adr-065-custom-event-indexing.md @@ -23,6 +23,7 @@ - April 1, 2021: Initial Draft (@alexanderbez) - April 28, 2021: Specify search capabilities are only supported through the KV indexer (@marbar3778) +- May 19, 2021: Update the SQL schema and the eventsink interface (@jayt106) ## Status @@ -96,13 +97,16 @@ The interface is defined as follows: ```go type EventSink interface { IndexBlockEvents(types.EventDataNewBlockHeader) error - IndexTxEvents(*abci.TxResult) error + IndexTxEvents([]*abci.TxResult) error SearchBlockEvents(context.Context, *query.Query) ([]int64, error) SearchTxEvents(context.Context, *query.Query) ([]*abci.TxResult, error) GetTxByHash([]byte) (*abci.TxResult, error) HasBlock(int64) (bool, error) + + Type() EventSinkType + Stop() error } ``` @@ -136,33 +140,41 @@ This type of `EventSink` indexes block and transaction events into a [PostgreSQL database. We define and automatically migrate the following schema when the `IndexerService` starts. -The postgres eventsink will not support `tx_search` and `block_search`. +The postgres eventsink will not support `tx_search`, `block_search`, `GetTxByHash` and `HasBlock`. ```sql -- Table Definition ---------------------------------------------- -CREATE TYPE IF NOT EXISTS block_event_type AS ENUM ('begin_block', 'end_block'); +CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', ''); -CREATE TABLE IF NOT EXISTS block_events ( +CREATE TABLE block_events ( id SERIAL PRIMARY KEY, key VARCHAR NOT NULL, value VARCHAR NOT NULL, height INTEGER NOT NULL, - type block_event_type + type block_event_type, + created_at TIMESTAMPTZ NOT NULL, + chain_id VARCHAR NOT NULL ); -CREATE TABLE IF NOT EXISTS tx_results { - id SERIAL PRIMARY KEY, - tx_result BYTEA NOT NULL -} +CREATE TABLE tx_results ( + id SERIAL PRIMARY KEY, + tx_result BYTEA NOT NULL, + created_at TIMESTAMPTZ NOT NULL +); -CREATE TABLE IF NOT EXISTS tx_events ( +CREATE TABLE tx_events ( id SERIAL PRIMARY KEY, key VARCHAR NOT NULL, value VARCHAR NOT NULL, height INTEGER NOT NULL, hash VARCHAR NOT NULL, - FOREIGN KEY (tx_result_id) REFERENCES tx_results(id) ON DELETE CASCADE + tx_result_id SERIAL, + created_at TIMESTAMPTZ NOT NULL, + chain_id VARCHAR NOT NULL, + FOREIGN KEY (tx_result_id) + REFERENCES tx_results(id) + ON DELETE CASCADE ); -- Indices ------------------------------------------------------- @@ -177,7 +189,7 @@ The `PSQLEventSink` will implement the `EventSink` interface as follows ```go -func NewPSQLEventSink(connStr string) (*PSQLEventSink, error) { +func NewPSQLEventSink(connStr string, chainID string) (*PSQLEventSink, error) { db, err := sql.Open("postgres", connStr) if err != nil { return nil, err @@ -187,10 +199,11 @@ func NewPSQLEventSink(connStr string) (*PSQLEventSink, error) { } func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { - sqlStmt := sq.Insert("block_events").Columns("key", "value", "height", "type") + sqlStmt := sq.Insert("block_events").Columns("key", "value", "height", "type", "created_at", "chain_id") // index the reserved block height index - sqlStmt = sqlStmt.Values(types.BlockHeightKey, h.Header.Height, h.Header.Height, "") + ts := time.Now() + sqlStmt = sqlStmt.Values(types.BlockHeightKey, h.Header.Height, h.Header.Height, "", ts, es.chainID) for _, event := range h.ResultBeginBlock.Events { // only index events with a non-empty type @@ -210,7 +223,7 @@ func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error } if attr.GetIndex() { - sqlStmt = sqlStmt.Values(compositeKey, string(attr.Value), h.Header.Height, BlockEventTypeBeginBlock) + sqlStmt = sqlStmt.Values(compositeKey, string(attr.Value), h.Header.Height, BlockEventTypeBeginBlock, ts, es.chainID) } } } @@ -219,51 +232,58 @@ func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error // execute sqlStmt db query... } -func (es *PSQLEventSink) IndexTxEvents(txr *abci.TxResult) error { - sqlStmtEvents := sq.Insert("tx_events").Columns("key", "value", "height", "hash", "tx_result_id") - sqlStmtTxResult := sq.Insert("tx_results").Columns("tx_result") - +func (es *PSQLEventSink) IndexTxEvents(txr []*abci.TxResult) error { + sqlStmtEvents := sq.Insert("tx_events").Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id") + sqlStmtTxResult := sq.Insert("tx_results").Columns("tx_result", "created_at") - // store the tx result - txBz, err := proto.Marshal(txr) - if err != nil { - return err - } + ts := time.Now() + for _, tx := range txr { + // store the tx result + txBz, err := proto.Marshal(tx) + if err != nil { + return err + } - sqlStmtTxResult = sqlStmtTxResult.Values(txBz) + sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) - // execute sqlStmtTxResult db query... + // execute sqlStmtTxResult db query... + var txID uint32 + err = sqlStmtTxResult.QueryRow().Scan(&txID) + if err != nil { + return err + } - // index the reserved height and hash indices - hash := types.Tx(txr.Tx).Hash() - sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, txr.Height, hash, txrID) - sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, txr.Height, txr.Height, hash, txrID) + // index the reserved height and hash indices + hash := types.Tx(tx.Tx).Hash() + sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txID, ts, es.chainID) + sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, tx.Height, tx.Height, hash, txID, ts, es.chainID) - for _, event := range result.Result.Events { - // only index events with a non-empty type - if len(event.Type) == 0 { - continue - } - - for _, attr := range event.Attributes { - if len(attr.Key) == 0 { + for _, event := range result.Result.Events { + // only index events with a non-empty type + if len(event.Type) == 0 { continue } - // index if `index: true` is set - compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) + for _, attr := range event.Attributes { + if len(attr.Key) == 0 { + continue + } + + // index if `index: true` is set + compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key)) - // ensure event does not conflict with a reserved prefix key - if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { - return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) - } + // ensure event does not conflict with a reserved prefix key + if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { + return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) + } - if attr.GetIndex() { - sqlStmtEvents = sqlStmtEvents.Values(compositeKey, string(attr.Value), txr.Height, hash, txrID) + if attr.GetIndex() { + sqlStmtEvents = sqlStmtEvents.Values(compositeKey, string(attr.Value), tx.Height, hash, txID, ts, es.chainID) + } } } } - + // execute sqlStmtEvents db query... } @@ -274,6 +294,14 @@ func (es *PSQLEventSink) SearchBlockEvents(ctx context.Context, q *query.Query) func (es *PSQLEventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { return nil, errors.New("tx search is not supported via the postgres event sink") } + +func (es *PSQLEventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) { + return nil, errors.New("getTxByHash is not supported via the postgres event sink") +} + +func (es *PSQLEventSink) HasBlock(h int64) (bool, error) { + return false, errors.New("hasBlock is not supported via the postgres event sink") +} ``` ### Configuration diff --git a/go.mod b/go.mod index 551febe4a..d418207f2 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.15 require ( github.com/BurntSushi/toml v0.3.1 github.com/ChainSafe/go-schnorrkel v0.0.0-20210222182958-bd440c890782 + github.com/Masterminds/squirrel v1.5.0 github.com/Workiva/go-datastructures v1.0.53 + github.com/adlio/schema v1.1.13 github.com/btcsuite/btcd v0.21.0-beta github.com/btcsuite/btcutil v1.0.2 github.com/confio/ics23/go v0.6.6 @@ -21,8 +23,10 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/gtank/merlin v0.1.1 github.com/hdevalence/ed25519consensus v0.0.0-20210204194344-59a8610d2b87 + github.com/lib/pq v1.10.1 github.com/libp2p/go-buffer-pool v0.0.2 github.com/minio/highwayhash v1.0.2 + github.com/ory/dockertest v3.3.5+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.10.0 github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 diff --git a/go.sum b/go.sum index 552ca8b8c..e7a6e5fe0 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= filippo.io/edwards25519 v1.0.0-beta.2 h1:/BZRNzm8N4K4eWfK28dL4yescorxtO7YG1yun8fy+pI= filippo.io/edwards25519 v1.0.0-beta.2/go.mod h1:X+pm78QAUPtFLi1z9PYIlS/bdDnvbCOGKtZ+ACWEf7o= +github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= +github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -22,6 +24,12 @@ github.com/ChainSafe/go-schnorrkel v0.0.0-20210222182958-bd440c890782/go.mod h1: github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= +github.com/Masterminds/squirrel v1.5.0 h1:JukIZisrUXadA9pl3rMkjhiamxiB0cXiu+HGp/Y8cY8= +github.com/Masterminds/squirrel v1.5.0/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= +github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= +github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= @@ -31,6 +39,8 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/ github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig= github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= +github.com/adlio/schema v1.1.13 h1:LeNMVg5Z1FX+Qgz8tJUijBLRdcpbFUElz+d1489On98= +github.com/adlio/schema v1.1.13/go.mod h1:L5Z7tw+7lRK1Fnpi/LT/ooCP1elkXn0krMWBQHUhEDE= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -70,6 +80,7 @@ github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= @@ -86,6 +97,8 @@ github.com/confio/ics23/go v0.0.0-20200817220745-f173e6211efb/go.mod h1:E45Nqnlp github.com/confio/ics23/go v0.6.3/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg= github.com/confio/ics23/go v0.6.6 h1:pkOy18YxxJ/r0XFDCnrl4Bjv6h4LkBSpLS6F38mrKL8= github.com/confio/ics23/go v0.6.6/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg= +github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw= +github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -121,6 +134,10 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -227,6 +244,7 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.2.1/go.mod h1:EaizFBKfUKtMIF5iaDEhniwNedqGo9FuLFzppDr3uwI= @@ -300,12 +318,20 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.1 h1:6VXZrLU0jHBYyAqrSPa+MgPfnSvTPuMgK+k0o5kVFWo= +github.com/lib/pq v1.10.1/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= @@ -367,6 +393,12 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= +github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ= +github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= +github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI= +github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= +github.com/opencontainers/runc v0.1.1 h1:GlxAyO6x8rfZYN9Tt0Kti5a/cP41iuiO2yYT0IJGY8Y= +github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -375,6 +407,8 @@ github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxS github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= +github.com/ory/dockertest v3.3.5+incompatible h1:iLLK6SQwIhcbrG783Dghaaa3WPzGc+4Emza6EbVUUGA= +github.com/ory/dockertest v3.3.5+incompatible/go.mod h1:1vX4m9wsvi00u5bseYwXaSnhNrne+V0E6LAcBILJdPs= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= @@ -447,7 +481,9 @@ github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa/go.mod h1:F7 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -598,6 +634,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= @@ -641,6 +678,7 @@ golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -795,6 +833,7 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/node/node.go b/node/node.go index f2cc1e91b..ae034d431 100644 --- a/node/node.go +++ b/node/node.go @@ -15,6 +15,7 @@ import ( "github.com/rs/cors" dbm "github.com/tendermint/tm-db" + _ "github.com/lib/pq" // provide the psql db driver abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" @@ -83,8 +84,7 @@ type Node struct { evidencePool *evidence.Pool // tracking evidence proxyApp proxy.AppConns // connection to the application rpcListeners []net.Listener // rpc servers - txIndexer indexer.TxIndexer - blockIndexer indexer.BlockIndexer + eventSinks []indexer.EventSink indexerService *indexer.Service prometheusSrv *http.Server } @@ -166,7 +166,7 @@ func NewNode(config *cfg.Config, return nil, err } - indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger) + indexerService, eventSinks, err := createAndStartIndexerService(config, dbProvider, eventBus, logger, genDoc.ChainID) if err != nil { return nil, err } @@ -232,7 +232,7 @@ func NewNode(config *cfg.Config, // TODO: Fetch and provide real options and do proper p2p bootstrapping. // TODO: Use a persistent peer database. - nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) + nodeInfo, err := makeNodeInfo(config, nodeKey, eventSinks, genDoc, state) if err != nil { return nil, err } @@ -437,10 +437,9 @@ func NewNode(config *cfg.Config, evidenceReactor: evReactor, evidencePool: evPool, proxyApp: proxyApp, - txIndexer: txIndexer, indexerService: indexerService, - blockIndexer: blockIndexer, eventBus: eventBus, + eventSinks: eventSinks, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -810,8 +809,7 @@ func (n *Node) ConfigureRPC() (*rpccore.Environment, error) { P2PTransport: n, GenDoc: n.genesisDoc, - TxIndexer: n.txIndexer, - BlockIndexer: n.blockIndexer, + EventSinks: n.eventSinks, ConsensusReactor: n.consensusReactor, EventBus: n.eventBus, Mempool: n.mempool, @@ -1040,9 +1038,9 @@ func (n *Node) Config() *cfg.Config { return n.config } -// TxIndexer returns the Node's TxIndexer. -func (n *Node) TxIndexer() indexer.TxIndexer { - return n.txIndexer +// EventSinks returns the Node's event indexing sinks. +func (n *Node) EventSinks() []indexer.EventSink { + return n.eventSinks } //------------------------------------------------------------------------------ diff --git a/node/node_test.go b/node/node_test.go index 43bba0099..0c6d01af2 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -2,6 +2,7 @@ package node import ( "context" + "errors" "fmt" "math" "net" @@ -29,6 +30,7 @@ import ( "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" @@ -527,6 +529,99 @@ func TestNodeNewSeedNode(t *testing.T) { assert.True(t, n.pexReactor.IsRunning()) } +func TestNodeSetEventSink(t *testing.T) { + config := cfg.ResetTestRoot("node_app_version_test") + defer os.RemoveAll(config.RootDir) + + // create & start node + n, err := DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.KV, n.eventSinks[0].Type()) + + config.TxIndex.Indexer = []string{"null"} + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + + config.TxIndex.Indexer = []string{"null", "kv"} + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + + config.TxIndex.Indexer = []string{"kvv"} + n, err = DefaultNewNode(config, log.TestingLogger()) + assert.Nil(t, n) + assert.Equal(t, errors.New("unsupported event sink type"), err) + + config.TxIndex.Indexer = []string{} + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + + config.TxIndex.Indexer = []string{"psql"} + n, err = DefaultNewNode(config, log.TestingLogger()) + assert.Nil(t, n) + assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err) + + var psqlConn = "test" + + config.TxIndex.Indexer = []string{"psql"} + config.TxIndex.PsqlConn = psqlConn + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) + n.OnStop() + + config.TxIndex.Indexer = []string{"psql", "kv"} + config.TxIndex.PsqlConn = psqlConn + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + assert.Equal(t, 2, len(n.eventSinks)) + // we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks. + if n.eventSinks[0].Type() == indexer.KV { + assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type()) + } else { + assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) + assert.Equal(t, indexer.KV, n.eventSinks[1].Type()) + } + n.OnStop() + + config.TxIndex.Indexer = []string{"kv", "psql"} + config.TxIndex.PsqlConn = psqlConn + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + assert.Equal(t, 2, len(n.eventSinks)) + if n.eventSinks[0].Type() == indexer.KV { + assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type()) + } else { + assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) + assert.Equal(t, indexer.KV, n.eventSinks[1].Type()) + } + n.OnStop() + + var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml") + config.TxIndex.Indexer = []string{"psql", "kv", "Kv"} + config.TxIndex.PsqlConn = psqlConn + _, err = DefaultNewNode(config, log.TestingLogger()) + require.Error(t, err) + assert.Equal(t, e, err) + + config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"} + config.TxIndex.PsqlConn = psqlConn + _, err = DefaultNewNode(config, log.TestingLogger()) + require.Error(t, err) + assert.Equal(t, e, err) +} + func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) { privVals := make([]types.PrivValidator, nVals) vals := make([]types.GenesisValidator, nVals) diff --git a/node/setup.go b/node/setup.go index 855273cb8..45ca5e01e 100644 --- a/node/setup.go +++ b/node/setup.go @@ -3,10 +3,12 @@ package node import ( "bytes" "context" + "errors" "fmt" "math" "net" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port + "strings" "time" dbm "github.com/tendermint/tm-db" @@ -20,7 +22,7 @@ import ( "github.com/tendermint/tendermint/evidence" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" - "github.com/tendermint/tendermint/libs/strings" + tmStrings "github.com/tendermint/tendermint/libs/strings" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/pex" @@ -28,10 +30,9 @@ import ( "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" - blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" - blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" - "github.com/tendermint/tendermint/state/indexer/tx/kv" - "github.com/tendermint/tendermint/state/indexer/tx/null" + kv "github.com/tendermint/tendermint/state/indexer/sink/kv" + null "github.com/tendermint/tendermint/state/indexer/sink/null" + psql "github.com/tendermint/tendermint/state/indexer/sink/psql" "github.com/tendermint/tendermint/statesync" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" @@ -73,35 +74,61 @@ func createAndStartIndexerService( dbProvider DBProvider, eventBus *types.EventBus, logger log.Logger, -) (*indexer.Service, indexer.TxIndexer, indexer.BlockIndexer, error) { + chainID string, +) (*indexer.Service, []indexer.EventSink, error) { - var ( - txIndexer indexer.TxIndexer - blockIndexer indexer.BlockIndexer - ) + eventSinks := []indexer.EventSink{} - switch config.TxIndex.Indexer { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, nil, nil, err + // Check duplicated sinks. + sinks := map[string]bool{} + for _, s := range config.TxIndex.Indexer { + sl := strings.ToLower(s) + if sinks[sl] { + return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") + } + sinks[sl] = true + } + +loop: + for k := range sinks { + switch k { + case string(indexer.NULL): + // when we see null in the config, the eventsinks will be reset with the nullEventSink. + eventSinks = []indexer.EventSink{null.NewEventSink()} + break loop + case string(indexer.KV): + store, err := dbProvider(&DBContext{"tx_index", config}) + if err != nil { + return nil, nil, err + } + eventSinks = append(eventSinks, kv.NewEventSink(store)) + case string(indexer.PSQL): + conn := config.TxIndex.PsqlConn + if conn == "" { + return nil, nil, errors.New("the psql connection settings cannot be empty") + } + es, _, err := psql.NewEventSink(conn, chainID) + if err != nil { + return nil, nil, err + } + eventSinks = append(eventSinks, es) + default: + return nil, nil, errors.New("unsupported event sink type") } + } - txIndexer = kv.NewTxIndex(store) - blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) - default: - txIndexer = &null.TxIndex{} - blockIndexer = &blockidxnull.BlockerIndexer{} + if len(eventSinks) == 0 { + eventSinks = []indexer.EventSink{null.NewEventSink()} } - indexerService := indexer.NewIndexerService(txIndexer, blockIndexer, eventBus) + indexerService := indexer.NewIndexerService(eventSinks, eventBus) indexerService.SetLogger(logger.With("module", "txindex")) if err := indexerService.Start(); err != nil { - return nil, nil, nil, err + return nil, nil, err } - return indexerService, txIndexer, blockIndexer, nil + return indexerService, eventSinks, nil } func doHandshake( @@ -381,7 +408,7 @@ func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{}, p2p.MConnTransportOptions{ MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers + - len(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), + len(tmStrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), ), }, ) @@ -418,7 +445,7 @@ func createPeerManager( } privatePeerIDs := make(map[p2p.NodeID]struct{}) - for _, id := range strings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") { + for _, id := range tmStrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") { privatePeerIDs[p2p.NodeID(id)] = struct{}{} } @@ -434,7 +461,7 @@ func createPeerManager( } peers := []p2p.NodeAddress{} - for _, p := range strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") { + for _, p := range tmStrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") { address, err := p2p.ParseNodeAddress(p) if err != nil { return nil, fmt.Errorf("invalid peer address %q: %w", p, err) @@ -444,7 +471,7 @@ func createPeerManager( options.PersistentPeers = append(options.PersistentPeers, address.NodeID) } - for _, p := range strings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") { + for _, p := range tmStrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") { address, err := p2p.ParseNodeAddress(p) if err != nil { return nil, fmt.Errorf("invalid peer address %q: %w", p, err) @@ -611,7 +638,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, sw *p2p.Switch, logger log.Logger) *pex.Reactor { reactorConfig := &pex.ReactorConfig{ - Seeds: strings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "), + Seeds: tmStrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "), SeedMode: config.Mode == cfg.ModeSeed, // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 // blocks assuming 10s blocks ~ 28 hours. @@ -647,13 +674,14 @@ func createPEXReactorV2( func makeNodeInfo( config *cfg.Config, nodeKey p2p.NodeKey, - txIndexer indexer.TxIndexer, + eventSinks []indexer.EventSink, genDoc *types.GenesisDoc, state sm.State, ) (p2p.NodeInfo, error) { - txIndexerStatus := "on" - if _, ok := txIndexer.(*null.TxIndex); ok { - txIndexerStatus = "off" + txIndexerStatus := "off" + + if indexer.IndexingEnabled(eventSinks) { + txIndexerStatus = "on" } var bcChannel byte diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 731026d4f..5b3d75769 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -1,7 +1,6 @@ package core import ( - "errors" "fmt" "sort" @@ -9,7 +8,7 @@ import ( tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" - blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" + "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" ) @@ -181,9 +180,8 @@ func (env *Environment) BlockSearch( orderBy string, ) (*ctypes.ResultBlockSearch, error) { - // skip if block indexing is disabled - if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok { - return nil, errors.New("block indexing is disabled") + if !indexer.KVSinkEnabled(env.EventSinks) { + return nil, fmt.Errorf("block searching is disabled due to no kvEventSink") } q, err := tmquery.New(query) @@ -191,7 +189,14 @@ func (env *Environment) BlockSearch( return nil, err } - results, err := env.BlockIndexer.Search(ctx.Context(), q) + var kvsink indexer.EventSink + for _, sink := range env.EventSinks { + if sink.Type() == indexer.KV { + kvsink = sink + } + } + + results, err := kvsink.SearchBlockEvents(ctx.Context(), q) if err != nil { return nil, err } @@ -205,7 +210,7 @@ func (env *Environment) BlockSearch( sort.Slice(results, func(i, j int) bool { return results[i] < results[j] }) default: - return nil, fmt.Errorf("%w: expected order_by to be either `asc` or `desc` or empty", ctypes.ErrInvalidRequest) + return nil, fmt.Errorf("expected order_by to be either `asc` or `desc` or empty: %w", ctypes.ErrInvalidRequest) } // paginate results diff --git a/rpc/core/env.go b/rpc/core/env.go index 2dccb2b41..c436d0175 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -77,8 +77,7 @@ type Environment struct { // objects PubKey crypto.PubKey GenDoc *types.GenesisDoc // cache the genesis structure - TxIndexer indexer.TxIndexer - BlockIndexer indexer.BlockIndexer + EventSinks []indexer.EventSink ConsensusReactor *consensus.Reactor EventBus *types.EventBus // thread safe Mempool mempl.Mempool diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 6cebd1a9a..1b3da3075 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -9,7 +9,7 @@ import ( tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" - "github.com/tendermint/tendermint/state/indexer/tx/null" + "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/types" ) @@ -19,36 +19,39 @@ import ( // More: https://docs.tendermint.com/master/rpc/#/Info/tx func (env *Environment) Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { // if index is disabled, return error - if _, ok := env.TxIndexer.(*null.TxIndex); ok { - return nil, fmt.Errorf("transaction indexing is disabled") - } - r, err := env.TxIndexer.Get(hash) - if err != nil { - return nil, err + if !indexer.KVSinkEnabled(env.EventSinks) { + return nil, errors.New("transaction querying is disabled due to no kvEventSink") } - if r == nil { - return nil, fmt.Errorf("tx (%X) not found", hash) - } + for _, sink := range env.EventSinks { + if sink.Type() == indexer.KV { + r, err := sink.GetTxByHash(hash) + if r == nil { + return nil, fmt.Errorf("tx (%X) not found, err: %w", hash, err) + } - height := r.Height - index := r.Index + height := r.Height + index := r.Index - var proof types.TxProof - if prove { - block := env.BlockStore.LoadBlock(height) - proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines + var proof types.TxProof + if prove { + block := env.BlockStore.LoadBlock(height) + proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines + } + + return &ctypes.ResultTx{ + Hash: hash, + Height: height, + Index: index, + TxResult: r.Result, + Tx: r.Tx, + Proof: proof, + }, nil + } } - return &ctypes.ResultTx{ - Hash: hash, - Height: height, - Index: index, - TxResult: r.Result, - Tx: r.Tx, - Proof: proof, - }, nil + return nil, fmt.Errorf("transaction querying is disabled on this node due to the KV event sink being disabled") } // TxSearch allows you to query for multiple transactions results. It returns a @@ -62,9 +65,8 @@ func (env *Environment) TxSearch( orderBy string, ) (*ctypes.ResultTxSearch, error) { - // if index is disabled, return error - if _, ok := env.TxIndexer.(*null.TxIndex); ok { - return nil, errors.New("transaction indexing is disabled") + if !indexer.KVSinkEnabled(env.EventSinks) { + return nil, fmt.Errorf("transaction searching is disabled due to no kvEventSink") } q, err := tmquery.New(query) @@ -72,62 +74,68 @@ func (env *Environment) TxSearch( return nil, err } - results, err := env.TxIndexer.Search(ctx.Context(), q) - if err != nil { - return nil, err - } - - // sort results (must be done before pagination) - switch orderBy { - case "desc", "": - sort.Slice(results, func(i, j int) bool { - if results[i].Height == results[j].Height { - return results[i].Index > results[j].Index + for _, sink := range env.EventSinks { + if sink.Type() == indexer.KV { + results, err := sink.SearchTxEvents(ctx.Context(), q) + if err != nil { + return nil, err } - return results[i].Height > results[j].Height - }) - case "asc": - sort.Slice(results, func(i, j int) bool { - if results[i].Height == results[j].Height { - return results[i].Index < results[j].Index + + // sort results (must be done before pagination) + switch orderBy { + case "desc", "": + sort.Slice(results, func(i, j int) bool { + if results[i].Height == results[j].Height { + return results[i].Index > results[j].Index + } + return results[i].Height > results[j].Height + }) + case "asc": + sort.Slice(results, func(i, j int) bool { + if results[i].Height == results[j].Height { + return results[i].Index < results[j].Index + } + return results[i].Height < results[j].Height + }) + default: + return nil, fmt.Errorf("expected order_by to be either `asc` or `desc` or empty: %w", ctypes.ErrInvalidRequest) } - return results[i].Height < results[j].Height - }) - default: - return nil, fmt.Errorf("%w: expected order_by to be either `asc` or `desc` or empty", ctypes.ErrInvalidRequest) - } - // paginate results - totalCount := len(results) - perPage := env.validatePerPage(perPagePtr) + // paginate results + totalCount := len(results) + perPage := env.validatePerPage(perPagePtr) - page, err := validatePage(pagePtr, perPage, totalCount) - if err != nil { - return nil, err - } - - skipCount := validateSkipCount(page, perPage) - pageSize := tmmath.MinInt(perPage, totalCount-skipCount) + page, err := validatePage(pagePtr, perPage, totalCount) + if err != nil { + return nil, err + } - apiResults := make([]*ctypes.ResultTx, 0, pageSize) - for i := skipCount; i < skipCount+pageSize; i++ { - r := results[i] + skipCount := validateSkipCount(page, perPage) + pageSize := tmmath.MinInt(perPage, totalCount-skipCount) + + apiResults := make([]*ctypes.ResultTx, 0, pageSize) + for i := skipCount; i < skipCount+pageSize; i++ { + r := results[i] + + var proof types.TxProof + if prove { + block := env.BlockStore.LoadBlock(r.Height) + proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines + } + + apiResults = append(apiResults, &ctypes.ResultTx{ + Hash: types.Tx(r.Tx).Hash(), + Height: r.Height, + Index: r.Index, + TxResult: r.Result, + Tx: r.Tx, + Proof: proof, + }) + } - var proof types.TxProof - if prove { - block := env.BlockStore.LoadBlock(r.Height) - proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines + return &ctypes.ResultTxSearch{Txs: apiResults, TotalCount: totalCount}, nil } - - apiResults = append(apiResults, &ctypes.ResultTx{ - Hash: types.Tx(r.Tx).Hash(), - Height: r.Height, - Index: r.Index, - TxResult: r.Result, - Tx: r.Tx, - Proof: proof, - }) } - return &ctypes.ResultTxSearch{Txs: apiResults, TotalCount: totalCount}, nil + return nil, fmt.Errorf("transaction searching is disabled on this node due to the KV event sink being disabled") } diff --git a/state/indexer/eventsink.go b/state/indexer/eventsink.go new file mode 100644 index 000000000..8c2529103 --- /dev/null +++ b/state/indexer/eventsink.go @@ -0,0 +1,54 @@ +package indexer + +import ( + "context" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/types" +) + +type EventSinkType string + +const ( + NULL EventSinkType = "null" + KV EventSinkType = "kv" + PSQL EventSinkType = "psql" +) + +// EventSink interface is defined the APIs for the IndexerService to interact with the data store, +// including the block/transaction indexing and the search functions. +// +// The IndexerService will accept a list of one or more EventSink types. During the OnStart method +// it will call the appropriate APIs on each EventSink to index both block and transaction events. +type EventSink interface { + + // IndexBlockEvents indexes the blockheader. + IndexBlockEvents(types.EventDataNewBlockHeader) error + + // IndexTxEvents indexes the given result of transactions. To call it with multi transactions, + // must guarantee the index of given transactions are in order. + IndexTxEvents([]*abci.TxResult) error + + // SearchBlockEvents provides the block search by given query conditions. This function only + // supported by the kvEventSink. + SearchBlockEvents(context.Context, *query.Query) ([]int64, error) + + // SearchTxEvents provides the transaction search by given query conditions. This function only + // supported by the kvEventSink. + SearchTxEvents(context.Context, *query.Query) ([]*abci.TxResult, error) + + // GetTxByHash provides the transaction search by given transaction hash. This function only + // supported by the kvEventSink. + GetTxByHash([]byte) (*abci.TxResult, error) + + // HasBlock provides the transaction search by given transaction hash. This function only + // supported by the kvEventSink. + HasBlock(int64) (bool, error) + + // Type checks the eventsink structure type. + Type() EventSinkType + + // Stop will close the data store connection, if the eventsink supports it. + Stop() error +} diff --git a/state/indexer/indexer.go b/state/indexer/indexer.go index ac8957e55..24dc62d70 100644 --- a/state/indexer/indexer.go +++ b/state/indexer/indexer.go @@ -11,11 +11,10 @@ import ( // TxIndexer interface defines methods to index and search transactions. type TxIndexer interface { - // AddBatch analyzes, indexes and stores a batch of transactions. - AddBatch(b *Batch) error - - // Index analyzes, indexes and stores a single transaction. - Index(result *abci.TxResult) error + // Index analyzes, indexes and stores transactions. For indexing multiple + // Transacions must guarantee the Index of the TxResult is in order. + // See Batch struct. + Index(results []*abci.TxResult) error // Get returns the transaction specified by hash or nil if the transaction is not indexed // or stored. diff --git a/state/indexer/indexer_service.go b/state/indexer/indexer_service.go index f101366d9..a429b66a0 100644 --- a/state/indexer/indexer_service.go +++ b/state/indexer/indexer_service.go @@ -18,19 +18,14 @@ const ( type Service struct { service.BaseService - txIdxr TxIndexer - blockIdxr BlockIndexer - eventBus *types.EventBus + eventSinks []EventSink + eventBus *types.EventBus } // NewIndexerService returns a new service instance. -func NewIndexerService( - txIdxr TxIndexer, - blockIdxr BlockIndexer, - eventBus *types.EventBus, -) *Service { +func NewIndexerService(es []EventSink, eventBus *types.EventBus) *Service { - is := &Service{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus} + is := &Service{eventSinks: es, eventBus: eventBus} is.BaseService = *service.NewBaseService(nil, "IndexerService", is) return is } @@ -57,6 +52,7 @@ func (is *Service) OnStart() error { go func() { for { msg := <-blockHeadersSub.Out() + eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) height := eventDataHeader.Header.Height batch := NewBatch(eventDataHeader.NumTxs) @@ -75,25 +71,63 @@ func (is *Service) OnStart() error { } } - if err := is.blockIdxr.Index(eventDataHeader); err != nil { - is.Logger.Error("failed to index block", "height", height, "err", err) - } else { - is.Logger.Info("indexed block", "height", height) + if !IndexingEnabled(is.eventSinks) { + continue } - if err = is.txIdxr.AddBatch(batch); err != nil { - is.Logger.Error("failed to index block txs", "height", height, "err", err) - } else { - is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs) + for _, sink := range is.eventSinks { + if err := sink.IndexBlockEvents(eventDataHeader); err != nil { + is.Logger.Error("failed to index block", "height", height, "err", err) + } else { + is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) + } + + if len(batch.Ops) > 0 { + err := sink.IndexTxEvents(batch.Ops) + if err != nil { + is.Logger.Error("failed to index block txs", "height", height, "err", err) + } else { + is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) + } + } } } }() return nil } -// OnStop implements service.Service by unsubscribing from all transactions. +// OnStop implements service.Service by unsubscribing from all transactions and +// close the eventsink. func (is *Service) OnStop() { if is.eventBus.IsRunning() { _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber) } + + for _, sink := range is.eventSinks { + if err := sink.Stop(); err != nil { + is.Logger.Error("failed to close eventsink", "eventsink", sink.Type(), "err", err) + } + } +} + +// KVSinkEnabled returns the given eventSinks is containing KVEventSink. +func KVSinkEnabled(sinks []EventSink) bool { + for _, sink := range sinks { + if sink.Type() == KV { + return true + } + } + + return false +} + +// IndexingEnabled returns the given eventSinks is supporting the indexing services. +func IndexingEnabled(sinks []EventSink) bool { + for _, sink := range sinks { + if sink.Type() == KV || sink.Type() == PSQL { + return true + } + } + + return false } diff --git a/state/indexer/indexer_service_test.go b/state/indexer/indexer_service_test.go index 8a4b26b4f..68a00afb5 100644 --- a/state/indexer/indexer_service_test.go +++ b/state/indexer/indexer_service_test.go @@ -1,24 +1,45 @@ package indexer_test import ( + "database/sql" + "fmt" + "io/ioutil" + "os" "testing" "time" + "github.com/adlio/schema" + _ "github.com/lib/pq" + dockertest "github.com/ory/dockertest" + "github.com/ory/dockertest/docker" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - db "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/log" + tmlog "github.com/tendermint/tendermint/libs/log" indexer "github.com/tendermint/tendermint/state/indexer" - blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" - "github.com/tendermint/tendermint/state/indexer/tx/kv" + kv "github.com/tendermint/tendermint/state/indexer/sink/kv" + psql "github.com/tendermint/tendermint/state/indexer/sink/psql" "github.com/tendermint/tendermint/types" + db "github.com/tendermint/tm-db" +) + +var psqldb *sql.DB +var resource *dockertest.Resource +var pSink indexer.EventSink + +var ( + user = "postgres" + password = "secret" + port = "5432" + dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable" + dbName = "postgres" ) func TestIndexerServiceIndexesBlocks(t *testing.T) { // event bus eventBus := types.NewEventBus() - eventBus.SetLogger(log.TestingLogger()) + eventBus.SetLogger(tmlog.TestingLogger()) err := eventBus.Start() require.NoError(t, err) t.Cleanup(func() { @@ -27,13 +48,20 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { } }) - // tx indexer + assert.False(t, indexer.KVSinkEnabled([]indexer.EventSink{})) + assert.False(t, indexer.IndexingEnabled([]indexer.EventSink{})) + + // event sink setup + pool, err := setupDB(t) + assert.Nil(t, err) + store := db.NewMemDB() - txIndexer := kv.NewTxIndex(store) - blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events"))) + eventSinks := []indexer.EventSink{kv.NewEventSink(store), pSink} + assert.True(t, indexer.KVSinkEnabled(eventSinks)) + assert.True(t, indexer.IndexingEnabled(eventSinks)) - service := indexer.NewIndexerService(txIndexer, blockIndexer, eventBus) - service.SetLogger(log.TestingLogger()) + service := indexer.NewIndexerService(eventSinks, eventBus) + service.SetLogger(tmlog.TestingLogger()) err = service.Start() require.NoError(t, err) t.Cleanup(func() { @@ -67,15 +95,103 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { time.Sleep(100 * time.Millisecond) - res, err := txIndexer.Get(types.Tx("foo").Hash()) + res, err := eventSinks[0].GetTxByHash(types.Tx("foo").Hash()) require.NoError(t, err) require.Equal(t, txResult1, res) - ok, err := blockIndexer.Has(1) + ok, err := eventSinks[0].HasBlock(1) require.NoError(t, err) require.True(t, ok) - res, err = txIndexer.Get(types.Tx("bar").Hash()) + res, err = eventSinks[0].GetTxByHash(types.Tx("bar").Hash()) require.NoError(t, err) require.Equal(t, txResult2, res) + + assert.Nil(t, teardown(t, pool)) +} + +func readSchema() ([]*schema.Migration, error) { + filename := "./sink/psql/schema.sql" + contents, err := ioutil.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err) + } + + mg := &schema.Migration{} + mg.ID = time.Now().Local().String() + " db schema" + mg.Script = string(contents) + return append([]*schema.Migration{}, mg), nil +} + +func resetDB(t *testing.T) { + q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results" + _, err := psqldb.Exec(q) + assert.Nil(t, err) + + q = "DROP TYPE IF EXISTS block_event_type" + _, err = psqldb.Exec(q) + assert.Nil(t, err) +} + +func setupDB(t *testing.T) (*dockertest.Pool, error) { + t.Helper() + pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL")) + assert.Nil(t, err) + + resource, err = pool.RunWithOptions(&dockertest.RunOptions{ + Repository: psql.DriverName, + Tag: "13", + Env: []string{ + "POSTGRES_USER=" + user, + "POSTGRES_PASSWORD=" + password, + "POSTGRES_DB=" + dbName, + "listen_addresses = '*'", + }, + ExposedPorts: []string{port}, + }, func(config *docker.HostConfig) { + // set AutoRemove to true so that stopped container goes away by itself + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{ + Name: "no", + } + }) + + assert.Nil(t, err) + + // Set the container to expire in a minute to avoid orphaned containers + // hanging around + _ = resource.Expire(60) + + conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName) + + if err = pool.Retry(func() error { + var err error + + pSink, psqldb, err = psql.NewEventSink(conn, "test-chainID") + + if err != nil { + return err + } + + return psqldb.Ping() + }); err != nil { + assert.Error(t, err) + } + + resetDB(t) + + sm, err := readSchema() + assert.Nil(t, err) + + err = schema.NewMigrator().Apply(psqldb, sm) + assert.Nil(t, err) + + return pool, nil +} + +func teardown(t *testing.T, pool *dockertest.Pool) error { + t.Helper() + // When you're done, kill and remove the container + assert.Nil(t, pool.Purge(resource)) + return psqldb.Close() } diff --git a/state/indexer/sink/kv/kv.go b/state/indexer/sink/kv/kv.go new file mode 100644 index 000000000..7d51640d8 --- /dev/null +++ b/state/indexer/sink/kv/kv.go @@ -0,0 +1,61 @@ +package kv + +import ( + "context" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/state/indexer" + kvb "github.com/tendermint/tendermint/state/indexer/block/kv" + kvt "github.com/tendermint/tendermint/state/indexer/tx/kv" + "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tm-db" +) + +var _ indexer.EventSink = (*EventSink)(nil) + +// The EventSink is an aggregator for redirecting the call path of the tx/block kvIndexer. +// For the implementation details please see the kv.go in the indexer/block and indexer/tx folder. +type EventSink struct { + txi *kvt.TxIndex + bi *kvb.BlockerIndexer +} + +func NewEventSink(store dbm.DB) indexer.EventSink { + return &EventSink{ + txi: kvt.NewTxIndex(store), + bi: kvb.New(store), + } +} + +func (kves *EventSink) Type() indexer.EventSinkType { + return indexer.KV +} + +func (kves *EventSink) IndexBlockEvents(bh types.EventDataNewBlockHeader) error { + return kves.bi.Index(bh) +} + +func (kves *EventSink) IndexTxEvents(results []*abci.TxResult) error { + return kves.txi.Index(results) +} + +func (kves *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) { + return kves.bi.Search(ctx, q) +} + +func (kves *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { + return kves.txi.Search(ctx, q) +} + +func (kves *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) { + return kves.txi.Get(hash) +} + +func (kves *EventSink) HasBlock(h int64) (bool, error) { + return kves.bi.Has(h) +} + +func (kves *EventSink) Stop() error { + return nil +} diff --git a/state/indexer/sink/kv/kv_test.go b/state/indexer/sink/kv/kv_test.go new file mode 100644 index 000000000..a5d2dd81e --- /dev/null +++ b/state/indexer/sink/kv/kv_test.go @@ -0,0 +1,351 @@ +package kv + +import ( + "context" + "fmt" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/state/indexer" + kvtx "github.com/tendermint/tendermint/state/indexer/tx/kv" + "github.com/tendermint/tendermint/types" + db "github.com/tendermint/tm-db" +) + +func TestType(t *testing.T) { + kvSink := NewEventSink(db.NewMemDB()) + assert.Equal(t, indexer.KV, kvSink.Type()) +} + +func TestStop(t *testing.T) { + kvSink := NewEventSink(db.NewMemDB()) + assert.Nil(t, kvSink.Stop()) +} + +func TestBlockFuncs(t *testing.T) { + store := db.NewPrefixDB(db.NewMemDB(), []byte("block_events")) + indexer := NewEventSink(store) + + require.NoError(t, indexer.IndexBlockEvents(types.EventDataNewBlockHeader{ + Header: types.Header{Height: 1}, + ResultBeginBlock: abci.ResponseBeginBlock{ + Events: []abci.Event{ + { + Type: "begin_event", + Attributes: []abci.EventAttribute{ + { + Key: "proposer", + Value: "FCAA001", + Index: true, + }, + }, + }, + }, + }, + ResultEndBlock: abci.ResponseEndBlock{ + Events: []abci.Event{ + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: "foo", + Value: "100", + Index: true, + }, + }, + }, + }, + }, + })) + + b, e := indexer.HasBlock(1) + assert.Nil(t, e) + assert.True(t, b) + + for i := 2; i < 12; i++ { + var index bool + if i%2 == 0 { + index = true + } + + require.NoError(t, indexer.IndexBlockEvents(types.EventDataNewBlockHeader{ + Header: types.Header{Height: int64(i)}, + ResultBeginBlock: abci.ResponseBeginBlock{ + Events: []abci.Event{ + { + Type: "begin_event", + Attributes: []abci.EventAttribute{ + { + Key: "proposer", + Value: "FCAA001", + Index: true, + }, + }, + }, + }, + }, + ResultEndBlock: abci.ResponseEndBlock{ + Events: []abci.Event{ + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: "foo", + Value: fmt.Sprintf("%d", i), + Index: index, + }, + }, + }, + }, + }, + })) + } + + testCases := map[string]struct { + q *query.Query + results []int64 + }{ + "block.height = 100": { + q: query.MustParse("block.height = 100"), + results: []int64{}, + }, + "block.height = 5": { + q: query.MustParse("block.height = 5"), + results: []int64{5}, + }, + "begin_event.key1 = 'value1'": { + q: query.MustParse("begin_event.key1 = 'value1'"), + results: []int64{}, + }, + "begin_event.proposer = 'FCAA001'": { + q: query.MustParse("begin_event.proposer = 'FCAA001'"), + results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }, + "end_event.foo <= 5": { + q: query.MustParse("end_event.foo <= 5"), + results: []int64{2, 4}, + }, + "end_event.foo >= 100": { + q: query.MustParse("end_event.foo >= 100"), + results: []int64{1}, + }, + "block.height > 2 AND end_event.foo <= 8": { + q: query.MustParse("block.height > 2 AND end_event.foo <= 8"), + results: []int64{4, 6, 8}, + }, + "begin_event.proposer CONTAINS 'FFFFFFF'": { + q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"), + results: []int64{}, + }, + "begin_event.proposer CONTAINS 'FCAA001'": { + q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"), + results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }, + } + + for name, tc := range testCases { + tc := tc + t.Run(name, func(t *testing.T) { + results, err := indexer.SearchBlockEvents(context.Background(), tc.q) + require.NoError(t, err) + require.Equal(t, tc.results, results) + }) + } +} + +func TestTxSearchWithCancelation(t *testing.T) { + indexer := NewEventSink(db.NewMemDB()) + + txResult := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}}, + {Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}}, + {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}}, + }) + err := indexer.IndexTxEvents([]*abci.TxResult{txResult}) + require.NoError(t, err) + + r, e := indexer.GetTxByHash(types.Tx("HELLO WORLD").Hash()) + assert.Nil(t, e) + assert.Equal(t, r, txResult) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number = 1")) + assert.NoError(t, err) + assert.Empty(t, results) +} + +func TestTxSearchDeprecatedIndexing(t *testing.T) { + esdb := db.NewMemDB() + indexer := NewEventSink(esdb) + + // index tx using events indexing (composite key) + txResult1 := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}}, + }) + hash1 := types.Tx(txResult1.Tx).Hash() + + err := indexer.IndexTxEvents([]*abci.TxResult{txResult1}) + require.NoError(t, err) + + // index tx also using deprecated indexing (event as key) + txResult2 := txResultWithEvents(nil) + txResult2.Tx = types.Tx("HELLO WORLD 2") + + hash2 := types.Tx(txResult2.Tx).Hash() + b := esdb.NewBatch() + + rawBytes, err := proto.Marshal(txResult2) + require.NoError(t, err) + + depKey := []byte(fmt.Sprintf("%s/%s/%d/%d", + "sender", + "addr1", + txResult2.Height, + txResult2.Index, + )) + + err = b.Set(depKey, hash2) + require.NoError(t, err) + err = b.Set(kvtx.KeyFromHeight(txResult2), hash2) + require.NoError(t, err) + err = b.Set(hash2, rawBytes) + require.NoError(t, err) + err = b.Write() + require.NoError(t, err) + + testCases := []struct { + q string + results []*abci.TxResult + }{ + // search by hash + {fmt.Sprintf("tx.hash = '%X'", hash1), []*abci.TxResult{txResult1}}, + // search by hash + {fmt.Sprintf("tx.hash = '%X'", hash2), []*abci.TxResult{txResult2}}, + // search by exact match (one key) + {"account.number = 1", []*abci.TxResult{txResult1}}, + {"account.number >= 1 AND account.number <= 5", []*abci.TxResult{txResult1}}, + // search by range (lower bound) + {"account.number >= 1", []*abci.TxResult{txResult1}}, + // search by range (upper bound) + {"account.number <= 5", []*abci.TxResult{txResult1}}, + // search using not allowed key + {"not_allowed = 'boom'", []*abci.TxResult{}}, + // search for not existing tx result + {"account.number >= 2 AND account.number <= 5", []*abci.TxResult{}}, + // search using not existing key + {"account.date >= TIME 2013-05-03T14:45:00Z", []*abci.TxResult{}}, + // search by deprecated key + {"sender = 'addr1'", []*abci.TxResult{txResult2}}, + } + + ctx := context.Background() + + for _, tc := range testCases { + tc := tc + t.Run(tc.q, func(t *testing.T) { + results, err := indexer.SearchTxEvents(ctx, query.MustParse(tc.q)) + require.NoError(t, err) + for _, txr := range results { + for _, tr := range tc.results { + assert.True(t, proto.Equal(tr, txr)) + } + } + }) + } +} + +func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { + indexer := NewEventSink(db.NewMemDB()) + + txResult := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}}, + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "2", Index: true}}}, + }) + + err := indexer.IndexTxEvents([]*abci.TxResult{txResult}) + require.NoError(t, err) + + ctx := context.Background() + + results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number >= 1")) + assert.NoError(t, err) + + assert.Len(t, results, 1) + for _, txr := range results { + assert.True(t, proto.Equal(txResult, txr)) + } +} + +func TestTxSearchMultipleTxs(t *testing.T) { + indexer := NewEventSink(db.NewMemDB()) + + // indexed first, but bigger height (to test the order of transactions) + txResult := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}}, + }) + + txResult.Tx = types.Tx("Bob's account") + txResult.Height = 2 + txResult.Index = 1 + err := indexer.IndexTxEvents([]*abci.TxResult{txResult}) + require.NoError(t, err) + + // indexed second, but smaller height (to test the order of transactions) + txResult2 := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "2", Index: true}}}, + }) + txResult2.Tx = types.Tx("Alice's account") + txResult2.Height = 1 + txResult2.Index = 2 + + err = indexer.IndexTxEvents([]*abci.TxResult{txResult2}) + require.NoError(t, err) + + // indexed third (to test the order of transactions) + txResult3 := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "3", Index: true}}}, + }) + txResult3.Tx = types.Tx("Jack's account") + txResult3.Height = 1 + txResult3.Index = 1 + err = indexer.IndexTxEvents([]*abci.TxResult{txResult3}) + require.NoError(t, err) + + // indexed fourth (to test we don't include txs with similar events) + // https://github.com/tendermint/tendermint/issues/2908 + txResult4 := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number.id", Value: "1", Index: true}}}, + }) + txResult4.Tx = types.Tx("Mike's account") + txResult4.Height = 2 + txResult4.Index = 2 + err = indexer.IndexTxEvents([]*abci.TxResult{txResult4}) + require.NoError(t, err) + + ctx := context.Background() + + results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number >= 1")) + assert.NoError(t, err) + + require.Len(t, results, 3) +} + +func txResultWithEvents(events []abci.Event) *abci.TxResult { + tx := types.Tx("HELLO WORLD") + return &abci.TxResult{ + Height: 1, + Index: 0, + Tx: tx, + Result: abci.ResponseDeliverTx{ + Data: []byte{0}, + Code: abci.CodeTypeOK, + Log: "", + Events: events, + }, + } +} diff --git a/state/indexer/sink/null/null.go b/state/indexer/sink/null/null.go new file mode 100644 index 000000000..b5ad93ab4 --- /dev/null +++ b/state/indexer/sink/null/null.go @@ -0,0 +1,51 @@ +package null + +import ( + "context" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" +) + +var _ indexer.EventSink = (*EventSink)(nil) + +// EventSink implements a no-op indexer. +type EventSink struct{} + +func NewEventSink() indexer.EventSink { + return &EventSink{} +} + +func (nes *EventSink) Type() indexer.EventSinkType { + return indexer.NULL +} + +func (nes *EventSink) IndexBlockEvents(bh types.EventDataNewBlockHeader) error { + return nil +} + +func (nes *EventSink) IndexTxEvents(results []*abci.TxResult) error { + return nil +} + +func (nes *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) { + return nil, nil +} + +func (nes *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { + return nil, nil +} + +func (nes *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) { + return nil, nil +} + +func (nes *EventSink) HasBlock(h int64) (bool, error) { + return false, nil +} + +func (nes *EventSink) Stop() error { + return nil +} diff --git a/state/indexer/sink/null/null_test.go b/state/indexer/sink/null/null_test.go new file mode 100644 index 000000000..eef63fd6e --- /dev/null +++ b/state/indexer/sink/null/null_test.go @@ -0,0 +1,39 @@ +package null + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" +) + +func TestNullEventSink(t *testing.T) { + nullIndexer := NewEventSink() + + assert.Nil(t, nullIndexer.IndexTxEvents(nil)) + assert.Nil(t, nullIndexer.IndexBlockEvents(types.EventDataNewBlockHeader{})) + val1, err1 := nullIndexer.SearchBlockEvents(context.TODO(), nil) + assert.Nil(t, val1) + assert.Nil(t, err1) + val2, err2 := nullIndexer.SearchTxEvents(context.TODO(), nil) + assert.Nil(t, val2) + assert.Nil(t, err2) + val3, err3 := nullIndexer.GetTxByHash(nil) + assert.Nil(t, val3) + assert.Nil(t, err3) + val4, err4 := nullIndexer.HasBlock(0) + assert.False(t, val4) + assert.Nil(t, err4) +} + +func TestType(t *testing.T) { + nullIndexer := NewEventSink() + assert.Equal(t, indexer.NULL, nullIndexer.Type()) +} + +func TestStop(t *testing.T) { + nullIndexer := NewEventSink() + assert.Nil(t, nullIndexer.Stop()) +} diff --git a/state/indexer/sink/psql/psql.go b/state/indexer/sink/psql/psql.go new file mode 100644 index 000000000..0e66d6117 --- /dev/null +++ b/state/indexer/sink/psql/psql.go @@ -0,0 +1,197 @@ +package psql + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + sq "github.com/Masterminds/squirrel" + proto "github.com/gogo/protobuf/proto" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" +) + +var _ indexer.EventSink = (*EventSink)(nil) + +const ( + TableEventBlock = "block_events" + TableEventTx = "tx_events" + TableResultTx = "tx_results" + DriverName = "postgres" +) + +// EventSink is an indexer backend providing the tx/block index services. +type EventSink struct { + store *sql.DB + chainID string +} + +func NewEventSink(connStr string, chainID string) (indexer.EventSink, *sql.DB, error) { + db, err := sql.Open(DriverName, connStr) + if err != nil { + return nil, nil, err + } + + return &EventSink{ + store: db, + chainID: chainID, + }, db, nil +} + +func (es *EventSink) Type() indexer.EventSinkType { + return indexer.PSQL +} + +func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error { + sqlStmt := sq. + Insert(TableEventBlock). + Columns("key", "value", "height", "type", "created_at", "chain_id"). + PlaceholderFormat(sq.Dollar) + + ts := time.Now() + // index the reserved block height index + sqlStmt = sqlStmt. + Values(types.BlockHeightKey, fmt.Sprint(h.Header.Height), h.Header.Height, "", ts, es.chainID) + + // index begin_block events + sqlStmt, err := indexBlockEvents( + sqlStmt, h.ResultBeginBlock.Events, types.EventTypeBeginBlock, h.Header.Height, ts, es.chainID) + if err != nil { + return err + } + + // index end_block events + sqlStmt, err = indexBlockEvents( + sqlStmt, h.ResultEndBlock.Events, types.EventTypeEndBlock, h.Header.Height, ts, es.chainID) + if err != nil { + return err + } + + _, err = sqlStmt.RunWith(es.store).Exec() + return err +} + +func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error { + // index the tx result + var txid uint32 + sqlStmtTxResult := sq. + Insert(TableResultTx). + Columns("tx_result", "created_at"). + PlaceholderFormat(sq.Dollar). + RunWith(es.store). + Suffix("RETURNING \"id\"") + + sqlStmtEvents := sq. + Insert(TableEventTx). + Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id"). + PlaceholderFormat(sq.Dollar) + + ts := time.Now() + for _, tx := range txr { + txBz, err := proto.Marshal(tx) + if err != nil { + return err + } + + sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts) + + // execute sqlStmtTxResult db query and retrieve the txid + err = sqlStmtTxResult.QueryRow().Scan(&txid) + if err != nil { + return err + } + + // index the reserved height and hash indices + hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash()) + + sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID) + sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID) + + for _, event := range tx.Result.Events { + // only index events with a non-empty type + if len(event.Type) == 0 { + continue + } + + for _, attr := range event.Attributes { + if len(attr.Key) == 0 { + continue + } + + // index if `index: true` is set + compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key) + + // ensure event does not conflict with a reserved prefix key + if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey { + return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) + } + + if attr.GetIndex() { + sqlStmtEvents = sqlStmtEvents.Values(compositeTag, attr.Value, tx.Height, hash, txid, ts, es.chainID) + } + } + } + } + + // execute sqlStmtEvents db query... + _, err := sqlStmtEvents.RunWith(es.store).Exec() + return err +} + +func (es *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) { + return nil, errors.New("block search is not supported via the postgres event sink") +} + +func (es *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) { + return nil, errors.New("tx search is not supported via the postgres event sink") +} + +func (es *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) { + return nil, errors.New("getTxByHash is not supported via the postgres event sink") +} + +func (es *EventSink) HasBlock(h int64) (bool, error) { + return false, errors.New("hasBlock is not supported via the postgres event sink") +} + +func indexBlockEvents( + sqlStmt sq.InsertBuilder, + events []abci.Event, + ty string, + height int64, + ts time.Time, + chainID string, +) (sq.InsertBuilder, error) { + for _, event := range events { + // only index events with a non-empty type + if len(event.Type) == 0 { + continue + } + + for _, attr := range event.Attributes { + if len(attr.Key) == 0 { + continue + } + + // index iff the event specified index:true and it's not a reserved event + compositeKey := fmt.Sprintf("%s.%s", event.Type, attr.Key) + if compositeKey == types.BlockHeightKey { + return sqlStmt, fmt.Errorf( + "event type and attribute key \"%s\" is reserved; please use a different key", compositeKey) + } + + if attr.GetIndex() { + sqlStmt = sqlStmt.Values(compositeKey, attr.Value, height, ty, ts, chainID) + } + } + } + return sqlStmt, nil +} + +func (es *EventSink) Stop() error { + return es.store.Close() +} diff --git a/state/indexer/sink/psql/psql_test.go b/state/indexer/sink/psql/psql_test.go new file mode 100644 index 000000000..591d23edd --- /dev/null +++ b/state/indexer/sink/psql/psql_test.go @@ -0,0 +1,365 @@ +package psql + +import ( + "context" + "database/sql" + "errors" + "fmt" + "io/ioutil" + "os" + "testing" + "time" + + sq "github.com/Masterminds/squirrel" + schema "github.com/adlio/schema" + proto "github.com/gogo/protobuf/proto" + _ "github.com/lib/pq" + dockertest "github.com/ory/dockertest" + "github.com/ory/dockertest/docker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" +) + +var db *sql.DB +var resource *dockertest.Resource +var chainID = "test-chainID" + +var ( + user = "postgres" + password = "secret" + port = "5432" + dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable" + dbName = "postgres" +) + +func TestType(t *testing.T) { + pool, err := setupDB(t) + require.NoError(t, err) + + psqlSink := &EventSink{store: db, chainID: chainID} + assert.Equal(t, indexer.PSQL, psqlSink.Type()) + require.NoError(t, teardown(t, pool)) +} + +func TestBlockFuncs(t *testing.T) { + pool, err := setupDB(t) + require.NoError(t, err) + + indexer := &EventSink{store: db, chainID: chainID} + require.NoError(t, indexer.IndexBlockEvents(getTestBlockHeader())) + + r, err := verifyBlock(1) + assert.True(t, r) + require.NoError(t, err) + + r, err = verifyBlock(2) + assert.False(t, r) + require.NoError(t, err) + + r, err = indexer.HasBlock(1) + assert.False(t, r) + assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err) + + r, err = indexer.HasBlock(2) + assert.False(t, r) + assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err) + + r2, err := indexer.SearchBlockEvents(context.TODO(), nil) + assert.Nil(t, r2) + assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err) + + require.NoError(t, verifyTimeStamp(TableEventBlock)) + require.NoError(t, teardown(t, pool)) +} + +func TestTxFuncs(t *testing.T) { + pool, err := setupDB(t) + assert.Nil(t, err) + + indexer := &EventSink{store: db, chainID: chainID} + + txResult := txResultWithEvents([]abci.Event{ + {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}}, + {Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}}, + {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}}, + }) + err = indexer.IndexTxEvents([]*abci.TxResult{txResult}) + require.NoError(t, err) + + tx, err := verifyTx(types.Tx(txResult.Tx).Hash()) + require.NoError(t, err) + assert.Equal(t, txResult, tx) + + require.NoError(t, verifyTimeStamp(TableEventTx)) + require.NoError(t, verifyTimeStamp(TableResultTx)) + + tx, err = indexer.GetTxByHash(types.Tx(txResult.Tx).Hash()) + assert.Nil(t, tx) + assert.Equal(t, errors.New("getTxByHash is not supported via the postgres event sink"), err) + + r2, err := indexer.SearchTxEvents(context.TODO(), nil) + assert.Nil(t, r2) + assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err) + + assert.Nil(t, teardown(t, pool)) +} + +func TestStop(t *testing.T) { + pool, err := setupDB(t) + require.NoError(t, err) + + indexer := &EventSink{store: db} + require.NoError(t, indexer.Stop()) + + defer db.Close() + require.NoError(t, pool.Purge(resource)) +} + +func getTestBlockHeader() types.EventDataNewBlockHeader { + return types.EventDataNewBlockHeader{ + Header: types.Header{Height: 1}, + ResultBeginBlock: abci.ResponseBeginBlock{ + Events: []abci.Event{ + { + Type: "begin_event", + Attributes: []abci.EventAttribute{ + { + Key: "proposer", + Value: "FCAA001", + Index: true, + }, + }, + }, + }, + }, + ResultEndBlock: abci.ResponseEndBlock{ + Events: []abci.Event{ + { + Type: "end_event", + Attributes: []abci.EventAttribute{ + { + Key: "foo", + Value: "100", + Index: true, + }, + }, + }, + }, + }, + } +} + +func readSchema() ([]*schema.Migration, error) { + + filename := "schema.sql" + contents, err := ioutil.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err) + } + + mg := &schema.Migration{} + mg.ID = time.Now().Local().String() + " db schema" + mg.Script = string(contents) + return append([]*schema.Migration{}, mg), nil +} + +func resetDB(t *testing.T) { + q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results" + _, err := db.Exec(q) + + require.NoError(t, err) + + q = "DROP TYPE IF EXISTS block_event_type" + _, err = db.Exec(q) + require.NoError(t, err) +} + +func txResultWithEvents(events []abci.Event) *abci.TxResult { + tx := types.Tx("HELLO WORLD") + return &abci.TxResult{ + Height: 1, + Index: 0, + Tx: tx, + Result: abci.ResponseDeliverTx{ + Data: []byte{0}, + Code: abci.CodeTypeOK, + Log: "", + Events: events, + }, + } +} + +func verifyTx(hash []byte) (*abci.TxResult, error) { + join := fmt.Sprintf("%s ON %s.id = tx_result_id", TableEventTx, TableResultTx) + sqlStmt := sq. + Select("tx_result", fmt.Sprintf("%s.id", TableResultTx), "tx_result_id", "hash", "chain_id"). + Distinct().From(TableResultTx). + InnerJoin(join). + Where(fmt.Sprintf("hash = $1 AND chain_id = '%s'", chainID), fmt.Sprintf("%X", hash)) + + rows, err := sqlStmt.RunWith(db).Query() + if err != nil { + return nil, err + } + + defer rows.Close() + + if rows.Next() { + var txResult []byte + var txResultID, txid int + var h, cid string + err = rows.Scan(&txResult, &txResultID, &txid, &h, &cid) + if err != nil { + return nil, nil + } + + msg := new(abci.TxResult) + err = proto.Unmarshal(txResult, msg) + if err != nil { + return nil, err + } + + return msg, err + } + + // No result + return nil, nil +} + +func verifyTimeStamp(tb string) error { + + // We assume the tx indexing time would not exceed 2 second from now + sqlStmt := sq. + Select(fmt.Sprintf("%s.created_at", tb)). + Distinct().From(tb). + Where(fmt.Sprintf("%s.created_at >= $1", tb), time.Now().Add(-2*time.Second)) + + rows, err := sqlStmt.RunWith(db).Query() + if err != nil { + return err + } + + defer rows.Close() + + if rows.Next() { + var ts string + err = rows.Scan(&ts) + if err != nil { + return err + } + + return nil + } + + return errors.New("no result") +} + +func verifyBlock(h int64) (bool, error) { + sqlStmt := sq. + Select("height"). + Distinct(). + From(TableEventBlock). + Where(fmt.Sprintf("height = %d", h)) + rows, err := sqlStmt.RunWith(db).Query() + if err != nil { + return false, err + } + + defer rows.Close() + + if !rows.Next() { + return false, nil + } + + sqlStmt = sq. + Select("type, height", "chain_id"). + Distinct(). + From(TableEventBlock). + Where(fmt.Sprintf("height = %d AND type = '%s' AND chain_id = '%s'", h, types.EventTypeBeginBlock, chainID)) + + rows, err = sqlStmt.RunWith(db).Query() + if err != nil { + return false, err + } + + if !rows.Next() { + return false, nil + } + + sqlStmt = sq. + Select("type, height"). + Distinct(). + From(TableEventBlock). + Where(fmt.Sprintf("height = %d AND type = '%s'", h, types.EventTypeEndBlock)) + rows, err = sqlStmt.RunWith(db).Query() + + if err != nil { + return false, err + } + + return rows.Next(), nil +} + +func setupDB(t *testing.T) (*dockertest.Pool, error) { + t.Helper() + pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL")) + + require.NoError(t, err) + + resource, err = pool.RunWithOptions(&dockertest.RunOptions{ + Repository: DriverName, + Tag: "13", + Env: []string{ + "POSTGRES_USER=" + user, + "POSTGRES_PASSWORD=" + password, + "POSTGRES_DB=" + dbName, + "listen_addresses = '*'", + }, + ExposedPorts: []string{port}, + }, func(config *docker.HostConfig) { + // set AutoRemove to true so that stopped container goes away by itself + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{ + Name: "no", + } + }) + + require.NoError(t, err) + + // Set the container to expire in a minute to avoid orphaned containers + // hanging around + _ = resource.Expire(60) + + conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName) + + if err = pool.Retry(func() error { + var err error + + _, db, err = NewEventSink(conn, chainID) + + if err != nil { + return err + } + + return db.Ping() + }); err != nil { + require.NoError(t, err) + } + + resetDB(t) + + sm, err := readSchema() + assert.Nil(t, err) + assert.Nil(t, schema.NewMigrator().Apply(db, sm)) + return pool, nil +} + +func teardown(t *testing.T, pool *dockertest.Pool) error { + t.Helper() + // When you're done, kill and remove the container + assert.Nil(t, pool.Purge(resource)) + return db.Close() +} diff --git a/state/indexer/sink/psql/schema.sql b/state/indexer/sink/psql/schema.sql new file mode 100644 index 000000000..36e7327cc --- /dev/null +++ b/state/indexer/sink/psql/schema.sql @@ -0,0 +1,31 @@ +CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', ''); +CREATE TABLE block_events ( + id SERIAL PRIMARY KEY, + key VARCHAR NOT NULL, + value VARCHAR NOT NULL, + height INTEGER NOT NULL, + type block_event_type, + created_at TIMESTAMPTZ NOT NULL, + chain_id VARCHAR NOT NULL +); +CREATE TABLE tx_results ( + id SERIAL PRIMARY KEY, + tx_result BYTEA NOT NULL, + created_at TIMESTAMPTZ NOT NULL +); +CREATE TABLE tx_events ( + id SERIAL PRIMARY KEY, + key VARCHAR NOT NULL, + value VARCHAR NOT NULL, + height INTEGER NOT NULL, + hash VARCHAR NOT NULL, + tx_result_id SERIAL, + created_at TIMESTAMPTZ NOT NULL, + chain_id VARCHAR NOT NULL, + FOREIGN KEY (tx_result_id) + REFERENCES tx_results(id) + ON DELETE CASCADE +); +CREATE INDEX idx_block_events_key_value ON block_events(key, value); +CREATE INDEX idx_tx_events_key_value ON tx_events(key, value); +CREATE INDEX idx_tx_events_hash ON tx_events(hash); diff --git a/state/indexer/tx/kv/kv.go b/state/indexer/tx/kv/kv.go index e07393656..5d310eea7 100644 --- a/state/indexer/tx/kv/kv.go +++ b/state/indexer/tx/kv/kv.go @@ -58,25 +58,25 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) { return txResult, nil } -// AddBatch indexes a batch of transactions using the given list of events. Each -// key that indexed from the tx's events is a composite of the event type and -// the respective attribute's key delimited by a "." (eg. "account.number"). +// Index indexes transactions using the given list of events. Each key +// that indexed from the tx's events is a composite of the event type and the +// respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. -func (txi *TxIndex) AddBatch(b *indexer.Batch) error { - storeBatch := txi.store.NewBatch() - defer storeBatch.Close() +func (txi *TxIndex) Index(results []*abci.TxResult) error { + b := txi.store.NewBatch() + defer b.Close() - for _, result := range b.Ops { + for _, result := range results { hash := types.Tx(result.Tx).Hash() // index tx by events - err := txi.indexEvents(result, hash, storeBatch) + err := txi.indexEvents(result, hash, b) if err != nil { return err } // index by height (always) - err = storeBatch.Set(keyFromHeight(result), hash) + err = b.Set(KeyFromHeight(result), hash) if err != nil { return err } @@ -86,47 +86,12 @@ func (txi *TxIndex) AddBatch(b *indexer.Batch) error { return err } // index by hash (always) - err = storeBatch.Set(primaryKey(hash), rawBytes) + err = b.Set(primaryKey(hash), rawBytes) if err != nil { return err } } - return storeBatch.WriteSync() -} - -// Index indexes a single transaction using the given list of events. Each key -// that indexed from the tx's events is a composite of the event type and the -// respective attribute's key delimited by a "." (eg. "account.number"). -// Any event with an empty type is not indexed. -func (txi *TxIndex) Index(result *abci.TxResult) error { - b := txi.store.NewBatch() - defer b.Close() - - hash := types.Tx(result.Tx).Hash() - - // index tx by events - err := txi.indexEvents(result, hash, b) - if err != nil { - return err - } - - // index by height (always) - err = b.Set(keyFromHeight(result), hash) - if err != nil { - return err - } - - rawBytes, err := proto.Marshal(result) - if err != nil { - return err - } - // index by hash (always) - err = b.Set(primaryKey(hash), rawBytes) - if err != nil { - return err - } - return b.WriteSync() } @@ -584,7 +549,7 @@ func keyFromEvent(compositeKey string, value string, result *abci.TxResult) []by return secondaryKey(compositeKey, value, result.Height, result.Index) } -func keyFromHeight(result *abci.TxResult) []byte { +func KeyFromHeight(result *abci.TxResult) []byte { return secondaryKey(types.TxHeightKey, fmt.Sprintf("%d", result.Height), result.Height, result.Index) } diff --git a/state/indexer/tx/kv/kv_bench_test.go b/state/indexer/tx/kv/kv_bench_test.go index 9ad4922a0..3f4e63ee1 100644 --- a/state/indexer/tx/kv/kv_bench_test.go +++ b/state/indexer/tx/kv/kv_bench_test.go @@ -55,7 +55,7 @@ func BenchmarkTxSearch(b *testing.B) { }, } - if err := indexer.Index(txResult); err != nil { + if err := indexer.Index([]*abci.TxResult{txResult}); err != nil { b.Errorf("failed to index tx: %s", err) } } diff --git a/state/indexer/tx/kv/kv_test.go b/state/indexer/tx/kv/kv_test.go index 16ca079d3..dd63dd9a4 100644 --- a/state/indexer/tx/kv/kv_test.go +++ b/state/indexer/tx/kv/kv_test.go @@ -39,7 +39,7 @@ func TestTxIndex(t *testing.T) { if err := batch.Add(txResult); err != nil { t.Error(err) } - err := txIndexer.AddBatch(batch) + err := txIndexer.Index(batch.Ops) require.NoError(t, err) loadedTxResult, err := txIndexer.Get(hash) @@ -58,7 +58,7 @@ func TestTxIndex(t *testing.T) { } hash2 := tx2.Hash() - err = txIndexer.Index(txResult2) + err = txIndexer.Index([]*abci.TxResult{txResult2}) require.NoError(t, err) loadedTxResult2, err := txIndexer.Get(hash2) @@ -76,7 +76,7 @@ func TestTxSearch(t *testing.T) { }) hash := types.Tx(txResult.Tx).Hash() - err := indexer.Index(txResult) + err := indexer.Index([]*abci.TxResult{txResult}) require.NoError(t, err) testCases := []struct { @@ -154,7 +154,7 @@ func TestTxSearchWithCancelation(t *testing.T) { {Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}}, {Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}}, }) - err := indexer.Index(txResult) + err := indexer.Index([]*abci.TxResult{txResult}) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -173,7 +173,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { }) hash1 := types.Tx(txResult1.Tx).Hash() - err := indexer.Index(txResult1) + err := indexer.Index([]*abci.TxResult{txResult1}) require.NoError(t, err) // index tx also using deprecated indexing (event as key) @@ -195,7 +195,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { err = b.Set(depKey, hash2) require.NoError(t, err) - err = b.Set(keyFromHeight(txResult2), hash2) + err = b.Set(KeyFromHeight(txResult2), hash2) require.NoError(t, err) err = b.Set(hash2, rawBytes) require.NoError(t, err) @@ -251,7 +251,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { {Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "2", Index: true}}}, }) - err := indexer.Index(txResult) + err := indexer.Index([]*abci.TxResult{txResult}) require.NoError(t, err) ctx := context.Background() @@ -276,7 +276,7 @@ func TestTxSearchMultipleTxs(t *testing.T) { txResult.Tx = types.Tx("Bob's account") txResult.Height = 2 txResult.Index = 1 - err := indexer.Index(txResult) + err := indexer.Index([]*abci.TxResult{txResult}) require.NoError(t, err) // indexed second, but smaller height (to test the order of transactions) @@ -287,7 +287,7 @@ func TestTxSearchMultipleTxs(t *testing.T) { txResult2.Height = 1 txResult2.Index = 2 - err = indexer.Index(txResult2) + err = indexer.Index([]*abci.TxResult{txResult2}) require.NoError(t, err) // indexed third (to test the order of transactions) @@ -297,7 +297,7 @@ func TestTxSearchMultipleTxs(t *testing.T) { txResult3.Tx = types.Tx("Jack's account") txResult3.Height = 1 txResult3.Index = 1 - err = indexer.Index(txResult3) + err = indexer.Index([]*abci.TxResult{txResult3}) require.NoError(t, err) // indexed fourth (to test we don't include txs with similar events) @@ -308,7 +308,7 @@ func TestTxSearchMultipleTxs(t *testing.T) { txResult4.Tx = types.Tx("Mike's account") txResult4.Height = 2 txResult4.Index = 2 - err = indexer.Index(txResult4) + err = indexer.Index([]*abci.TxResult{txResult4}) require.NoError(t, err) ctx := context.Background() @@ -367,7 +367,7 @@ func benchmarkTxIndex(txsCount int64, b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - err = txIndexer.AddBatch(batch) + err = txIndexer.Index(batch.Ops) } if err != nil { b.Fatal(err) diff --git a/state/indexer/tx/null/null.go b/state/indexer/tx/null/null.go index 051a4a714..d92ed489e 100644 --- a/state/indexer/tx/null/null.go +++ b/state/indexer/tx/null/null.go @@ -25,7 +25,7 @@ func (txi *TxIndex) AddBatch(batch *indexer.Batch) error { } // Index is a noop and always returns nil. -func (txi *TxIndex) Index(result *abci.TxResult) error { +func (txi *TxIndex) Index(results []*abci.TxResult) error { return nil } diff --git a/types/events.go b/types/events.go index b926ca828..684160d6f 100644 --- a/types/events.go +++ b/types/events.go @@ -141,6 +141,9 @@ const ( // BlockHeightKey is a reserved key used for indexing BeginBlock and Endblock // events. BlockHeightKey = "block.height" + + EventTypeBeginBlock = "begin_block" + EventTypeEndBlock = "end_block" ) var (