Browse Source

config/indexer: custom event indexing (#6411)

pull/6500/head
JayT106 3 years ago
committed by GitHub
parent
commit
711a718162
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1760 additions and 277 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +10
    -4
      config/config.go
  3. +8
    -2
      config/toml.go
  4. +75
    -47
      docs/architecture/adr-065-custom-event-indexing.md
  5. +4
    -0
      go.mod
  6. +39
    -0
      go.sum
  7. +9
    -11
      node/node.go
  8. +95
    -0
      node/node_test.go
  9. +60
    -32
      node/setup.go
  10. +12
    -7
      rpc/core/blocks.go
  11. +1
    -2
      rpc/core/env.go
  12. +84
    -76
      rpc/core/tx.go
  13. +54
    -0
      state/indexer/eventsink.go
  14. +4
    -5
      state/indexer/indexer.go
  15. +52
    -18
      state/indexer/indexer_service.go
  16. +129
    -13
      state/indexer/indexer_service_test.go
  17. +61
    -0
      state/indexer/sink/kv/kv.go
  18. +351
    -0
      state/indexer/sink/kv/kv_test.go
  19. +51
    -0
      state/indexer/sink/null/null.go
  20. +39
    -0
      state/indexer/sink/null/null_test.go
  21. +197
    -0
      state/indexer/sink/psql/psql.go
  22. +365
    -0
      state/indexer/sink/psql/psql_test.go
  23. +31
    -0
      state/indexer/sink/psql/schema.sql
  24. +11
    -46
      state/indexer/tx/kv/kv.go
  25. +1
    -1
      state/indexer/tx/kv/kv_bench_test.go
  26. +12
    -12
      state/indexer/tx/kv/kv_test.go
  27. +1
    -1
      state/indexer/tx/null/null.go
  28. +3
    -0
      types/events.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -64,6 +64,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
accomodate for the new p2p stack. Removes the notion of seeds and crawling. All peer
exchange reactors behave the same. (@cmwaters)
- [crypto] \#6376 Enable sr25519 as a validator key
- [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106)
### IMPROVEMENTS


+ 10
- 4
config/config.go View File

@ -1058,19 +1058,25 @@ func (cfg *ConsensusConfig) ValidateBasic() error {
// TxIndexConfig defines the configuration for the transaction indexer,
// including composite keys to index.
type TxIndexConfig struct {
// What indexer to use for transactions
// The backend database list to back the indexer.
// If list contains `null`, meaning no indexer service will be used.
//
// Options:
// 1) "null"
// 1) "null" - no indexer services.
// 2) "kv" (default) - the simplest possible indexer,
// backed by key-value storage (defaults to levelDB; see DBBackend).
Indexer string `mapstructure:"indexer"`
// 3) "psql" - the indexer services backed by PostgreSQL.
Indexer []string `mapstructure:"indexer"`
// The PostgreSQL connection configuration, the connection format:
// postgresql://<user>:<password>@<host>:<port>/<db>?<opts>
PsqlConn string `mapstructure:"psql-conn"`
}
// DefaultTxIndexConfig returns a default configuration for the transaction indexer.
func DefaultTxIndexConfig() *TxIndexConfig {
return &TxIndexConfig{
Indexer: "kv",
Indexer: []string{"kv"},
}
}


+ 8
- 2
config/toml.go View File

@ -469,7 +469,8 @@ peer-query-maj23-sleep-duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}"
#######################################################
[tx-index]
# What indexer to use for transactions
# The backend database list to back the indexer.
# If list contains null, meaning no indexer service will be used.
#
# The application will set which txs to index. In some cases a node operator will be able
# to decide which txs to index based on configuration set in the application.
@ -478,7 +479,12 @@ peer-query-maj23-sleep-duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}"
# 1) "null"
# 2) "kv" (default) - the simplest possible indexer, backed by key-value storage (defaults to levelDB; see DBBackend).
# - When "kv" is chosen "tx.height" and "tx.hash" will always be indexed.
indexer = "{{ .TxIndex.Indexer }}"
# 3) "psql" - the indexer services backed by PostgreSQL.
indexer = [{{ range $i, $e := .TxIndex.Indexer }}{{if $i}}, {{end}}{{ printf "%q" $e}}{{end}}]
# The PostgreSQL connection configuration, the connection format:
# postgresql://<user>:<password>@<host>:<port>/<db>?<opts>
psql-conn = "{{ .TxIndex.PsqlConn }}"
#######################################################
### Instrumentation Configuration Options ###


+ 75
- 47
docs/architecture/adr-065-custom-event-indexing.md View File

@ -23,6 +23,7 @@
- April 1, 2021: Initial Draft (@alexanderbez)
- April 28, 2021: Specify search capabilities are only supported through the KV indexer (@marbar3778)
- May 19, 2021: Update the SQL schema and the eventsink interface (@jayt106)
## Status
@ -96,13 +97,16 @@ The interface is defined as follows:
```go
type EventSink interface {
IndexBlockEvents(types.EventDataNewBlockHeader) error
IndexTxEvents(*abci.TxResult) error
IndexTxEvents([]*abci.TxResult) error
SearchBlockEvents(context.Context, *query.Query) ([]int64, error)
SearchTxEvents(context.Context, *query.Query) ([]*abci.TxResult, error)
GetTxByHash([]byte) (*abci.TxResult, error)
HasBlock(int64) (bool, error)
Type() EventSinkType
Stop() error
}
```
@ -136,33 +140,41 @@ This type of `EventSink` indexes block and transaction events into a [PostgreSQL
database. We define and automatically migrate the following schema when the
`IndexerService` starts.
The postgres eventsink will not support `tx_search` and `block_search`.
The postgres eventsink will not support `tx_search`, `block_search`, `GetTxByHash` and `HasBlock`.
```sql
-- Table Definition ----------------------------------------------
CREATE TYPE IF NOT EXISTS block_event_type AS ENUM ('begin_block', 'end_block');
CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', '');
CREATE TABLE IF NOT EXISTS block_events (
CREATE TABLE block_events (
id SERIAL PRIMARY KEY,
key VARCHAR NOT NULL,
value VARCHAR NOT NULL,
height INTEGER NOT NULL,
type block_event_type
type block_event_type,
created_at TIMESTAMPTZ NOT NULL,
chain_id VARCHAR NOT NULL
);
CREATE TABLE IF NOT EXISTS tx_results {
id SERIAL PRIMARY KEY,
tx_result BYTEA NOT NULL
}
CREATE TABLE tx_results (
id SERIAL PRIMARY KEY,
tx_result BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE IF NOT EXISTS tx_events (
CREATE TABLE tx_events (
id SERIAL PRIMARY KEY,
key VARCHAR NOT NULL,
value VARCHAR NOT NULL,
height INTEGER NOT NULL,
hash VARCHAR NOT NULL,
FOREIGN KEY (tx_result_id) REFERENCES tx_results(id) ON DELETE CASCADE
tx_result_id SERIAL,
created_at TIMESTAMPTZ NOT NULL,
chain_id VARCHAR NOT NULL,
FOREIGN KEY (tx_result_id)
REFERENCES tx_results(id)
ON DELETE CASCADE
);
-- Indices -------------------------------------------------------
@ -177,7 +189,7 @@ The `PSQLEventSink` will implement the `EventSink` interface as follows
```go
func NewPSQLEventSink(connStr string) (*PSQLEventSink, error) {
func NewPSQLEventSink(connStr string, chainID string) (*PSQLEventSink, error) {
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, err
@ -187,10 +199,11 @@ func NewPSQLEventSink(connStr string) (*PSQLEventSink, error) {
}
func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
sqlStmt := sq.Insert("block_events").Columns("key", "value", "height", "type")
sqlStmt := sq.Insert("block_events").Columns("key", "value", "height", "type", "created_at", "chain_id")
// index the reserved block height index
sqlStmt = sqlStmt.Values(types.BlockHeightKey, h.Header.Height, h.Header.Height, "")
ts := time.Now()
sqlStmt = sqlStmt.Values(types.BlockHeightKey, h.Header.Height, h.Header.Height, "", ts, es.chainID)
for _, event := range h.ResultBeginBlock.Events {
// only index events with a non-empty type
@ -210,7 +223,7 @@ func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error
}
if attr.GetIndex() {
sqlStmt = sqlStmt.Values(compositeKey, string(attr.Value), h.Header.Height, BlockEventTypeBeginBlock)
sqlStmt = sqlStmt.Values(compositeKey, string(attr.Value), h.Header.Height, BlockEventTypeBeginBlock, ts, es.chainID)
}
}
}
@ -219,51 +232,58 @@ func (es *PSQLEventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error
// execute sqlStmt db query...
}
func (es *PSQLEventSink) IndexTxEvents(txr *abci.TxResult) error {
sqlStmtEvents := sq.Insert("tx_events").Columns("key", "value", "height", "hash", "tx_result_id")
sqlStmtTxResult := sq.Insert("tx_results").Columns("tx_result")
func (es *PSQLEventSink) IndexTxEvents(txr []*abci.TxResult) error {
sqlStmtEvents := sq.Insert("tx_events").Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id")
sqlStmtTxResult := sq.Insert("tx_results").Columns("tx_result", "created_at")
// store the tx result
txBz, err := proto.Marshal(txr)
if err != nil {
return err
}
ts := time.Now()
for _, tx := range txr {
// store the tx result
txBz, err := proto.Marshal(tx)
if err != nil {
return err
}
sqlStmtTxResult = sqlStmtTxResult.Values(txBz)
sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts)
// execute sqlStmtTxResult db query...
// execute sqlStmtTxResult db query...
var txID uint32
err = sqlStmtTxResult.QueryRow().Scan(&txID)
if err != nil {
return err
}
// index the reserved height and hash indices
hash := types.Tx(txr.Tx).Hash()
sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, txr.Height, hash, txrID)
sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, txr.Height, txr.Height, hash, txrID)
// index the reserved height and hash indices
hash := types.Tx(tx.Tx).Hash()
sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txID, ts, es.chainID)
sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, tx.Height, tx.Height, hash, txID, ts, es.chainID)
for _, event := range result.Result.Events {
// only index events with a non-empty type
if len(event.Type) == 0 {
continue
}
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
for _, event := range result.Result.Events {
// only index events with a non-empty type
if len(event.Type) == 0 {
continue
}
// index if `index: true` is set
compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
continue
}
// index if `index: true` is set
compositeTag := fmt.Sprintf("%s.%s", event.Type, string(attr.Key))
// ensure event does not conflict with a reserved prefix key
if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag)
}
// ensure event does not conflict with a reserved prefix key
if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag)
}
if attr.GetIndex() {
sqlStmtEvents = sqlStmtEvents.Values(compositeKey, string(attr.Value), txr.Height, hash, txrID)
if attr.GetIndex() {
sqlStmtEvents = sqlStmtEvents.Values(compositeKey, string(attr.Value), tx.Height, hash, txID, ts, es.chainID)
}
}
}
}
// execute sqlStmtEvents db query...
}
@ -274,6 +294,14 @@ func (es *PSQLEventSink) SearchBlockEvents(ctx context.Context, q *query.Query)
func (es *PSQLEventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
return nil, errors.New("tx search is not supported via the postgres event sink")
}
func (es *PSQLEventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) {
return nil, errors.New("getTxByHash is not supported via the postgres event sink")
}
func (es *PSQLEventSink) HasBlock(h int64) (bool, error) {
return false, errors.New("hasBlock is not supported via the postgres event sink")
}
```
### Configuration


+ 4
- 0
go.mod View File

@ -5,7 +5,9 @@ go 1.15
require (
github.com/BurntSushi/toml v0.3.1
github.com/ChainSafe/go-schnorrkel v0.0.0-20210222182958-bd440c890782
github.com/Masterminds/squirrel v1.5.0
github.com/Workiva/go-datastructures v1.0.53
github.com/adlio/schema v1.1.13
github.com/btcsuite/btcd v0.21.0-beta
github.com/btcsuite/btcutil v1.0.2
github.com/confio/ics23/go v0.6.6
@ -21,8 +23,10 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/gtank/merlin v0.1.1
github.com/hdevalence/ed25519consensus v0.0.0-20210204194344-59a8610d2b87
github.com/lib/pq v1.10.1
github.com/libp2p/go-buffer-pool v0.0.2
github.com/minio/highwayhash v1.0.2
github.com/ory/dockertest v3.3.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.10.0
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0


+ 39
- 0
go.sum View File

@ -13,6 +13,8 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
filippo.io/edwards25519 v1.0.0-beta.2 h1:/BZRNzm8N4K4eWfK28dL4yescorxtO7YG1yun8fy+pI=
filippo.io/edwards25519 v1.0.0-beta.2/go.mod h1:X+pm78QAUPtFLi1z9PYIlS/bdDnvbCOGKtZ+ACWEf7o=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
@ -22,6 +24,12 @@ github.com/ChainSafe/go-schnorrkel v0.0.0-20210222182958-bd440c890782/go.mod h1:
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Masterminds/squirrel v1.5.0 h1:JukIZisrUXadA9pl3rMkjhiamxiB0cXiu+HGp/Y8cY8=
github.com/Masterminds/squirrel v1.5.0/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
@ -31,6 +39,8 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig=
github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A=
github.com/adlio/schema v1.1.13 h1:LeNMVg5Z1FX+Qgz8tJUijBLRdcpbFUElz+d1489On98=
github.com/adlio/schema v1.1.13/go.mod h1:L5Z7tw+7lRK1Fnpi/LT/ooCP1elkXn0krMWBQHUhEDE=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@ -70,6 +80,7 @@ github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
@ -86,6 +97,8 @@ github.com/confio/ics23/go v0.0.0-20200817220745-f173e6211efb/go.mod h1:E45Nqnlp
github.com/confio/ics23/go v0.6.3/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg=
github.com/confio/ics23/go v0.6.6 h1:pkOy18YxxJ/r0XFDCnrl4Bjv6h4LkBSpLS6F38mrKL8=
github.com/confio/ics23/go v0.6.6/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg=
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6 h1:NmTXa/uVnDyp0TY5MKi197+3HWcnYWfnHGyaFthlnGw=
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@ -121,6 +134,10 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@ -227,6 +244,7 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.2.1/go.mod h1:EaizFBKfUKtMIF5iaDEhniwNedqGo9FuLFzppDr3uwI=
@ -300,12 +318,20 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.1 h1:6VXZrLU0jHBYyAqrSPa+MgPfnSvTPuMgK+k0o5kVFWo=
github.com/lib/pq v1.10.1/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
@ -367,6 +393,12 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI=
github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/runc v0.1.1 h1:GlxAyO6x8rfZYN9Tt0Kti5a/cP41iuiO2yYT0IJGY8Y=
github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
@ -375,6 +407,8 @@ github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxS
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/ory/dockertest v3.3.5+incompatible h1:iLLK6SQwIhcbrG783Dghaaa3WPzGc+4Emza6EbVUUGA=
github.com/ory/dockertest v3.3.5+incompatible/go.mod h1:1vX4m9wsvi00u5bseYwXaSnhNrne+V0E6LAcBILJdPs=
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
@ -447,7 +481,9 @@ github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa/go.mod h1:F7
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
@ -598,6 +634,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
@ -641,6 +678,7 @@ golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -795,6 +833,7 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=


+ 9
- 11
node/node.go View File

@ -15,6 +15,7 @@ import (
"github.com/rs/cors"
dbm "github.com/tendermint/tm-db"
_ "github.com/lib/pq" // provide the psql db driver
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
cs "github.com/tendermint/tendermint/consensus"
@ -83,8 +84,7 @@ type Node struct {
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer indexer.TxIndexer
blockIndexer indexer.BlockIndexer
eventSinks []indexer.EventSink
indexerService *indexer.Service
prometheusSrv *http.Server
}
@ -166,7 +166,7 @@ func NewNode(config *cfg.Config,
return nil, err
}
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger)
indexerService, eventSinks, err := createAndStartIndexerService(config, dbProvider, eventBus, logger, genDoc.ChainID)
if err != nil {
return nil, err
}
@ -232,7 +232,7 @@ func NewNode(config *cfg.Config,
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
nodeInfo, err := makeNodeInfo(config, nodeKey, eventSinks, genDoc, state)
if err != nil {
return nil, err
}
@ -437,10 +437,9 @@ func NewNode(config *cfg.Config,
evidenceReactor: evReactor,
evidencePool: evPool,
proxyApp: proxyApp,
txIndexer: txIndexer,
indexerService: indexerService,
blockIndexer: blockIndexer,
eventBus: eventBus,
eventSinks: eventSinks,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)
@ -810,8 +809,7 @@ func (n *Node) ConfigureRPC() (*rpccore.Environment, error) {
P2PTransport: n,
GenDoc: n.genesisDoc,
TxIndexer: n.txIndexer,
BlockIndexer: n.blockIndexer,
EventSinks: n.eventSinks,
ConsensusReactor: n.consensusReactor,
EventBus: n.eventBus,
Mempool: n.mempool,
@ -1040,9 +1038,9 @@ func (n *Node) Config() *cfg.Config {
return n.config
}
// TxIndexer returns the Node's TxIndexer.
func (n *Node) TxIndexer() indexer.TxIndexer {
return n.txIndexer
// EventSinks returns the Node's event indexing sinks.
func (n *Node) EventSinks() []indexer.EventSink {
return n.eventSinks
}
//------------------------------------------------------------------------------


+ 95
- 0
node/node_test.go View File

@ -2,6 +2,7 @@ package node
import (
"context"
"errors"
"fmt"
"math"
"net"
@ -29,6 +30,7 @@ import (
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
@ -527,6 +529,99 @@ func TestNodeNewSeedNode(t *testing.T) {
assert.True(t, n.pexReactor.IsRunning())
}
func TestNodeSetEventSink(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)
// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.KV, n.eventSinks[0].Type())
config.TxIndex.Indexer = []string{"null"}
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
config.TxIndex.Indexer = []string{"null", "kv"}
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
config.TxIndex.Indexer = []string{"kvv"}
n, err = DefaultNewNode(config, log.TestingLogger())
assert.Nil(t, n)
assert.Equal(t, errors.New("unsupported event sink type"), err)
config.TxIndex.Indexer = []string{}
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
config.TxIndex.Indexer = []string{"psql"}
n, err = DefaultNewNode(config, log.TestingLogger())
assert.Nil(t, n)
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
var psqlConn = "test"
config.TxIndex.Indexer = []string{"psql"}
config.TxIndex.PsqlConn = psqlConn
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
n.OnStop()
config.TxIndex.Indexer = []string{"psql", "kv"}
config.TxIndex.PsqlConn = psqlConn
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 2, len(n.eventSinks))
// we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks.
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
}
n.OnStop()
config.TxIndex.Indexer = []string{"kv", "psql"}
config.TxIndex.PsqlConn = psqlConn
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 2, len(n.eventSinks))
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
}
n.OnStop()
var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
config.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
config.TxIndex.PsqlConn = psqlConn
_, err = DefaultNewNode(config, log.TestingLogger())
require.Error(t, err)
assert.Equal(t, e, err)
config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
config.TxIndex.PsqlConn = psqlConn
_, err = DefaultNewNode(config, log.TestingLogger())
require.Error(t, err)
assert.Equal(t, e, err)
}
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
privVals := make([]types.PrivValidator, nVals)
vals := make([]types.GenesisValidator, nVals)


+ 60
- 32
node/setup.go View File

@ -3,10 +3,12 @@ package node
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"net"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"strings"
"time"
dbm "github.com/tendermint/tm-db"
@ -20,7 +22,7 @@ import (
"github.com/tendermint/tendermint/evidence"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/libs/strings"
tmStrings "github.com/tendermint/tendermint/libs/strings"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/pex"
@ -28,10 +30,9 @@ import (
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
"github.com/tendermint/tendermint/state/indexer/tx/kv"
"github.com/tendermint/tendermint/state/indexer/tx/null"
kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
null "github.com/tendermint/tendermint/state/indexer/sink/null"
psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/statesync"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
@ -73,35 +74,61 @@ func createAndStartIndexerService(
dbProvider DBProvider,
eventBus *types.EventBus,
logger log.Logger,
) (*indexer.Service, indexer.TxIndexer, indexer.BlockIndexer, error) {
chainID string,
) (*indexer.Service, []indexer.EventSink, error) {
var (
txIndexer indexer.TxIndexer
blockIndexer indexer.BlockIndexer
)
eventSinks := []indexer.EventSink{}
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, nil, nil, err
// Check duplicated sinks.
sinks := map[string]bool{}
for _, s := range config.TxIndex.Indexer {
sl := strings.ToLower(s)
if sinks[sl] {
return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}
sinks[sl] = true
}
loop:
for k := range sinks {
switch k {
case string(indexer.NULL):
// when we see null in the config, the eventsinks will be reset with the nullEventSink.
eventSinks = []indexer.EventSink{null.NewEventSink()}
break loop
case string(indexer.KV):
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, kv.NewEventSink(store))
case string(indexer.PSQL):
conn := config.TxIndex.PsqlConn
if conn == "" {
return nil, nil, errors.New("the psql connection settings cannot be empty")
}
es, _, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, es)
default:
return nil, nil, errors.New("unsupported event sink type")
}
}
txIndexer = kv.NewTxIndex(store)
blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))
default:
txIndexer = &null.TxIndex{}
blockIndexer = &blockidxnull.BlockerIndexer{}
if len(eventSinks) == 0 {
eventSinks = []indexer.EventSink{null.NewEventSink()}
}
indexerService := indexer.NewIndexerService(txIndexer, blockIndexer, eventBus)
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
if err := indexerService.Start(); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return indexerService, txIndexer, blockIndexer, nil
return indexerService, eventSinks, nil
}
func doHandshake(
@ -381,7 +408,7 @@ func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport
logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
len(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
len(tmStrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
),
},
)
@ -418,7 +445,7 @@ func createPeerManager(
}
privatePeerIDs := make(map[p2p.NodeID]struct{})
for _, id := range strings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
for _, id := range tmStrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
privatePeerIDs[p2p.NodeID(id)] = struct{}{}
}
@ -434,7 +461,7 @@ func createPeerManager(
}
peers := []p2p.NodeAddress{}
for _, p := range strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
for _, p := range tmStrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
@ -444,7 +471,7 @@ func createPeerManager(
options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
}
for _, p := range strings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
for _, p := range tmStrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
@ -611,7 +638,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
sw *p2p.Switch, logger log.Logger) *pex.Reactor {
reactorConfig := &pex.ReactorConfig{
Seeds: strings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
Seeds: tmStrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.Mode == cfg.ModeSeed,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
@ -647,13 +674,14 @@ func createPEXReactorV2(
func makeNodeInfo(
config *cfg.Config,
nodeKey p2p.NodeKey,
txIndexer indexer.TxIndexer,
eventSinks []indexer.EventSink,
genDoc *types.GenesisDoc,
state sm.State,
) (p2p.NodeInfo, error) {
txIndexerStatus := "on"
if _, ok := txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off"
txIndexerStatus := "off"
if indexer.IndexingEnabled(eventSinks) {
txIndexerStatus = "on"
}
var bcChannel byte


+ 12
- 7
rpc/core/blocks.go View File

@ -1,7 +1,6 @@
package core
import (
"errors"
"fmt"
"sort"
@ -9,7 +8,7 @@ import (
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
@ -181,9 +180,8 @@ func (env *Environment) BlockSearch(
orderBy string,
) (*ctypes.ResultBlockSearch, error) {
// skip if block indexing is disabled
if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok {
return nil, errors.New("block indexing is disabled")
if !indexer.KVSinkEnabled(env.EventSinks) {
return nil, fmt.Errorf("block searching is disabled due to no kvEventSink")
}
q, err := tmquery.New(query)
@ -191,7 +189,14 @@ func (env *Environment) BlockSearch(
return nil, err
}
results, err := env.BlockIndexer.Search(ctx.Context(), q)
var kvsink indexer.EventSink
for _, sink := range env.EventSinks {
if sink.Type() == indexer.KV {
kvsink = sink
}
}
results, err := kvsink.SearchBlockEvents(ctx.Context(), q)
if err != nil {
return nil, err
}
@ -205,7 +210,7 @@ func (env *Environment) BlockSearch(
sort.Slice(results, func(i, j int) bool { return results[i] < results[j] })
default:
return nil, fmt.Errorf("%w: expected order_by to be either `asc` or `desc` or empty", ctypes.ErrInvalidRequest)
return nil, fmt.Errorf("expected order_by to be either `asc` or `desc` or empty: %w", ctypes.ErrInvalidRequest)
}
// paginate results


+ 1
- 2
rpc/core/env.go View File

@ -77,8 +77,7 @@ type Environment struct {
// objects
PubKey crypto.PubKey
GenDoc *types.GenesisDoc // cache the genesis structure
TxIndexer indexer.TxIndexer
BlockIndexer indexer.BlockIndexer
EventSinks []indexer.EventSink
ConsensusReactor *consensus.Reactor
EventBus *types.EventBus // thread safe
Mempool mempl.Mempool


+ 84
- 76
rpc/core/tx.go View File

@ -9,7 +9,7 @@ import (
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
"github.com/tendermint/tendermint/state/indexer/tx/null"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
@ -19,36 +19,39 @@ import (
// More: https://docs.tendermint.com/master/rpc/#/Info/tx
func (env *Environment) Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) {
// if index is disabled, return error
if _, ok := env.TxIndexer.(*null.TxIndex); ok {
return nil, fmt.Errorf("transaction indexing is disabled")
}
r, err := env.TxIndexer.Get(hash)
if err != nil {
return nil, err
if !indexer.KVSinkEnabled(env.EventSinks) {
return nil, errors.New("transaction querying is disabled due to no kvEventSink")
}
if r == nil {
return nil, fmt.Errorf("tx (%X) not found", hash)
}
for _, sink := range env.EventSinks {
if sink.Type() == indexer.KV {
r, err := sink.GetTxByHash(hash)
if r == nil {
return nil, fmt.Errorf("tx (%X) not found, err: %w", hash, err)
}
height := r.Height
index := r.Index
height := r.Height
index := r.Index
var proof types.TxProof
if prove {
block := env.BlockStore.LoadBlock(height)
proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines
var proof types.TxProof
if prove {
block := env.BlockStore.LoadBlock(height)
proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines
}
return &ctypes.ResultTx{
Hash: hash,
Height: height,
Index: index,
TxResult: r.Result,
Tx: r.Tx,
Proof: proof,
}, nil
}
}
return &ctypes.ResultTx{
Hash: hash,
Height: height,
Index: index,
TxResult: r.Result,
Tx: r.Tx,
Proof: proof,
}, nil
return nil, fmt.Errorf("transaction querying is disabled on this node due to the KV event sink being disabled")
}
// TxSearch allows you to query for multiple transactions results. It returns a
@ -62,9 +65,8 @@ func (env *Environment) TxSearch(
orderBy string,
) (*ctypes.ResultTxSearch, error) {
// if index is disabled, return error
if _, ok := env.TxIndexer.(*null.TxIndex); ok {
return nil, errors.New("transaction indexing is disabled")
if !indexer.KVSinkEnabled(env.EventSinks) {
return nil, fmt.Errorf("transaction searching is disabled due to no kvEventSink")
}
q, err := tmquery.New(query)
@ -72,62 +74,68 @@ func (env *Environment) TxSearch(
return nil, err
}
results, err := env.TxIndexer.Search(ctx.Context(), q)
if err != nil {
return nil, err
}
// sort results (must be done before pagination)
switch orderBy {
case "desc", "":
sort.Slice(results, func(i, j int) bool {
if results[i].Height == results[j].Height {
return results[i].Index > results[j].Index
for _, sink := range env.EventSinks {
if sink.Type() == indexer.KV {
results, err := sink.SearchTxEvents(ctx.Context(), q)
if err != nil {
return nil, err
}
return results[i].Height > results[j].Height
})
case "asc":
sort.Slice(results, func(i, j int) bool {
if results[i].Height == results[j].Height {
return results[i].Index < results[j].Index
// sort results (must be done before pagination)
switch orderBy {
case "desc", "":
sort.Slice(results, func(i, j int) bool {
if results[i].Height == results[j].Height {
return results[i].Index > results[j].Index
}
return results[i].Height > results[j].Height
})
case "asc":
sort.Slice(results, func(i, j int) bool {
if results[i].Height == results[j].Height {
return results[i].Index < results[j].Index
}
return results[i].Height < results[j].Height
})
default:
return nil, fmt.Errorf("expected order_by to be either `asc` or `desc` or empty: %w", ctypes.ErrInvalidRequest)
}
return results[i].Height < results[j].Height
})
default:
return nil, fmt.Errorf("%w: expected order_by to be either `asc` or `desc` or empty", ctypes.ErrInvalidRequest)
}
// paginate results
totalCount := len(results)
perPage := env.validatePerPage(perPagePtr)
// paginate results
totalCount := len(results)
perPage := env.validatePerPage(perPagePtr)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {
return nil, err
}
skipCount := validateSkipCount(page, perPage)
pageSize := tmmath.MinInt(perPage, totalCount-skipCount)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {
return nil, err
}
apiResults := make([]*ctypes.ResultTx, 0, pageSize)
for i := skipCount; i < skipCount+pageSize; i++ {
r := results[i]
skipCount := validateSkipCount(page, perPage)
pageSize := tmmath.MinInt(perPage, totalCount-skipCount)
apiResults := make([]*ctypes.ResultTx, 0, pageSize)
for i := skipCount; i < skipCount+pageSize; i++ {
r := results[i]
var proof types.TxProof
if prove {
block := env.BlockStore.LoadBlock(r.Height)
proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines
}
apiResults = append(apiResults, &ctypes.ResultTx{
Hash: types.Tx(r.Tx).Hash(),
Height: r.Height,
Index: r.Index,
TxResult: r.Result,
Tx: r.Tx,
Proof: proof,
})
}
var proof types.TxProof
if prove {
block := env.BlockStore.LoadBlock(r.Height)
proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines
return &ctypes.ResultTxSearch{Txs: apiResults, TotalCount: totalCount}, nil
}
apiResults = append(apiResults, &ctypes.ResultTx{
Hash: types.Tx(r.Tx).Hash(),
Height: r.Height,
Index: r.Index,
TxResult: r.Result,
Tx: r.Tx,
Proof: proof,
})
}
return &ctypes.ResultTxSearch{Txs: apiResults, TotalCount: totalCount}, nil
return nil, fmt.Errorf("transaction searching is disabled on this node due to the KV event sink being disabled")
}

+ 54
- 0
state/indexer/eventsink.go View File

@ -0,0 +1,54 @@
package indexer
import (
"context"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
)
type EventSinkType string
const (
NULL EventSinkType = "null"
KV EventSinkType = "kv"
PSQL EventSinkType = "psql"
)
// EventSink interface is defined the APIs for the IndexerService to interact with the data store,
// including the block/transaction indexing and the search functions.
//
// The IndexerService will accept a list of one or more EventSink types. During the OnStart method
// it will call the appropriate APIs on each EventSink to index both block and transaction events.
type EventSink interface {
// IndexBlockEvents indexes the blockheader.
IndexBlockEvents(types.EventDataNewBlockHeader) error
// IndexTxEvents indexes the given result of transactions. To call it with multi transactions,
// must guarantee the index of given transactions are in order.
IndexTxEvents([]*abci.TxResult) error
// SearchBlockEvents provides the block search by given query conditions. This function only
// supported by the kvEventSink.
SearchBlockEvents(context.Context, *query.Query) ([]int64, error)
// SearchTxEvents provides the transaction search by given query conditions. This function only
// supported by the kvEventSink.
SearchTxEvents(context.Context, *query.Query) ([]*abci.TxResult, error)
// GetTxByHash provides the transaction search by given transaction hash. This function only
// supported by the kvEventSink.
GetTxByHash([]byte) (*abci.TxResult, error)
// HasBlock provides the transaction search by given transaction hash. This function only
// supported by the kvEventSink.
HasBlock(int64) (bool, error)
// Type checks the eventsink structure type.
Type() EventSinkType
// Stop will close the data store connection, if the eventsink supports it.
Stop() error
}

+ 4
- 5
state/indexer/indexer.go View File

@ -11,11 +11,10 @@ import (
// TxIndexer interface defines methods to index and search transactions.
type TxIndexer interface {
// AddBatch analyzes, indexes and stores a batch of transactions.
AddBatch(b *Batch) error
// Index analyzes, indexes and stores a single transaction.
Index(result *abci.TxResult) error
// Index analyzes, indexes and stores transactions. For indexing multiple
// Transacions must guarantee the Index of the TxResult is in order.
// See Batch struct.
Index(results []*abci.TxResult) error
// Get returns the transaction specified by hash or nil if the transaction is not indexed
// or stored.


+ 52
- 18
state/indexer/indexer_service.go View File

@ -18,19 +18,14 @@ const (
type Service struct {
service.BaseService
txIdxr TxIndexer
blockIdxr BlockIndexer
eventBus *types.EventBus
eventSinks []EventSink
eventBus *types.EventBus
}
// NewIndexerService returns a new service instance.
func NewIndexerService(
txIdxr TxIndexer,
blockIdxr BlockIndexer,
eventBus *types.EventBus,
) *Service {
func NewIndexerService(es []EventSink, eventBus *types.EventBus) *Service {
is := &Service{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
is := &Service{eventSinks: es, eventBus: eventBus}
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
return is
}
@ -57,6 +52,7 @@ func (is *Service) OnStart() error {
go func() {
for {
msg := <-blockHeadersSub.Out()
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
height := eventDataHeader.Header.Height
batch := NewBatch(eventDataHeader.NumTxs)
@ -75,25 +71,63 @@ func (is *Service) OnStart() error {
}
}
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
} else {
is.Logger.Info("indexed block", "height", height)
if !IndexingEnabled(is.eventSinks) {
continue
}
if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
} else {
is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs)
for _, sink := range is.eventSinks {
if err := sink.IndexBlockEvents(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
} else {
is.Logger.Debug("indexed block", "height", height, "sink", sink.Type())
}
if len(batch.Ops) > 0 {
err := sink.IndexTxEvents(batch.Ops)
if err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
} else {
is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type())
}
}
}
}
}()
return nil
}
// OnStop implements service.Service by unsubscribing from all transactions.
// OnStop implements service.Service by unsubscribing from all transactions and
// close the eventsink.
func (is *Service) OnStop() {
if is.eventBus.IsRunning() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}
for _, sink := range is.eventSinks {
if err := sink.Stop(); err != nil {
is.Logger.Error("failed to close eventsink", "eventsink", sink.Type(), "err", err)
}
}
}
// KVSinkEnabled returns the given eventSinks is containing KVEventSink.
func KVSinkEnabled(sinks []EventSink) bool {
for _, sink := range sinks {
if sink.Type() == KV {
return true
}
}
return false
}
// IndexingEnabled returns the given eventSinks is supporting the indexing services.
func IndexingEnabled(sinks []EventSink) bool {
for _, sink := range sinks {
if sink.Type() == KV || sink.Type() == PSQL {
return true
}
}
return false
}

+ 129
- 13
state/indexer/indexer_service_test.go View File

@ -1,24 +1,45 @@
package indexer_test
import (
"database/sql"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/adlio/schema"
_ "github.com/lib/pq"
dockertest "github.com/ory/dockertest"
"github.com/ory/dockertest/docker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmlog "github.com/tendermint/tendermint/libs/log"
indexer "github.com/tendermint/tendermint/state/indexer"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
"github.com/tendermint/tendermint/state/indexer/tx/kv"
kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/types"
db "github.com/tendermint/tm-db"
)
var psqldb *sql.DB
var resource *dockertest.Resource
var pSink indexer.EventSink
var (
user = "postgres"
password = "secret"
port = "5432"
dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable"
dbName = "postgres"
)
func TestIndexerServiceIndexesBlocks(t *testing.T) {
// event bus
eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger())
eventBus.SetLogger(tmlog.TestingLogger())
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -27,13 +48,20 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
}
})
// tx indexer
assert.False(t, indexer.KVSinkEnabled([]indexer.EventSink{}))
assert.False(t, indexer.IndexingEnabled([]indexer.EventSink{}))
// event sink setup
pool, err := setupDB(t)
assert.Nil(t, err)
store := db.NewMemDB()
txIndexer := kv.NewTxIndex(store)
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))
eventSinks := []indexer.EventSink{kv.NewEventSink(store), pSink}
assert.True(t, indexer.KVSinkEnabled(eventSinks))
assert.True(t, indexer.IndexingEnabled(eventSinks))
service := indexer.NewIndexerService(txIndexer, blockIndexer, eventBus)
service.SetLogger(log.TestingLogger())
service := indexer.NewIndexerService(eventSinks, eventBus)
service.SetLogger(tmlog.TestingLogger())
err = service.Start()
require.NoError(t, err)
t.Cleanup(func() {
@ -67,15 +95,103 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
time.Sleep(100 * time.Millisecond)
res, err := txIndexer.Get(types.Tx("foo").Hash())
res, err := eventSinks[0].GetTxByHash(types.Tx("foo").Hash())
require.NoError(t, err)
require.Equal(t, txResult1, res)
ok, err := blockIndexer.Has(1)
ok, err := eventSinks[0].HasBlock(1)
require.NoError(t, err)
require.True(t, ok)
res, err = txIndexer.Get(types.Tx("bar").Hash())
res, err = eventSinks[0].GetTxByHash(types.Tx("bar").Hash())
require.NoError(t, err)
require.Equal(t, txResult2, res)
assert.Nil(t, teardown(t, pool))
}
func readSchema() ([]*schema.Migration, error) {
filename := "./sink/psql/schema.sql"
contents, err := ioutil.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err)
}
mg := &schema.Migration{}
mg.ID = time.Now().Local().String() + " db schema"
mg.Script = string(contents)
return append([]*schema.Migration{}, mg), nil
}
func resetDB(t *testing.T) {
q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results"
_, err := psqldb.Exec(q)
assert.Nil(t, err)
q = "DROP TYPE IF EXISTS block_event_type"
_, err = psqldb.Exec(q)
assert.Nil(t, err)
}
func setupDB(t *testing.T) (*dockertest.Pool, error) {
t.Helper()
pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL"))
assert.Nil(t, err)
resource, err = pool.RunWithOptions(&dockertest.RunOptions{
Repository: psql.DriverName,
Tag: "13",
Env: []string{
"POSTGRES_USER=" + user,
"POSTGRES_PASSWORD=" + password,
"POSTGRES_DB=" + dbName,
"listen_addresses = '*'",
},
ExposedPorts: []string{port},
}, func(config *docker.HostConfig) {
// set AutoRemove to true so that stopped container goes away by itself
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
assert.Nil(t, err)
// Set the container to expire in a minute to avoid orphaned containers
// hanging around
_ = resource.Expire(60)
conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
if err = pool.Retry(func() error {
var err error
pSink, psqldb, err = psql.NewEventSink(conn, "test-chainID")
if err != nil {
return err
}
return psqldb.Ping()
}); err != nil {
assert.Error(t, err)
}
resetDB(t)
sm, err := readSchema()
assert.Nil(t, err)
err = schema.NewMigrator().Apply(psqldb, sm)
assert.Nil(t, err)
return pool, nil
}
func teardown(t *testing.T, pool *dockertest.Pool) error {
t.Helper()
// When you're done, kill and remove the container
assert.Nil(t, pool.Purge(resource))
return psqldb.Close()
}

+ 61
- 0
state/indexer/sink/kv/kv.go View File

@ -0,0 +1,61 @@
package kv
import (
"context"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/indexer"
kvb "github.com/tendermint/tendermint/state/indexer/block/kv"
kvt "github.com/tendermint/tendermint/state/indexer/tx/kv"
"github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-db"
)
var _ indexer.EventSink = (*EventSink)(nil)
// The EventSink is an aggregator for redirecting the call path of the tx/block kvIndexer.
// For the implementation details please see the kv.go in the indexer/block and indexer/tx folder.
type EventSink struct {
txi *kvt.TxIndex
bi *kvb.BlockerIndexer
}
func NewEventSink(store dbm.DB) indexer.EventSink {
return &EventSink{
txi: kvt.NewTxIndex(store),
bi: kvb.New(store),
}
}
func (kves *EventSink) Type() indexer.EventSinkType {
return indexer.KV
}
func (kves *EventSink) IndexBlockEvents(bh types.EventDataNewBlockHeader) error {
return kves.bi.Index(bh)
}
func (kves *EventSink) IndexTxEvents(results []*abci.TxResult) error {
return kves.txi.Index(results)
}
func (kves *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) {
return kves.bi.Search(ctx, q)
}
func (kves *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
return kves.txi.Search(ctx, q)
}
func (kves *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) {
return kves.txi.Get(hash)
}
func (kves *EventSink) HasBlock(h int64) (bool, error) {
return kves.bi.Has(h)
}
func (kves *EventSink) Stop() error {
return nil
}

+ 351
- 0
state/indexer/sink/kv/kv_test.go View File

@ -0,0 +1,351 @@
package kv
import (
"context"
"fmt"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/indexer"
kvtx "github.com/tendermint/tendermint/state/indexer/tx/kv"
"github.com/tendermint/tendermint/types"
db "github.com/tendermint/tm-db"
)
func TestType(t *testing.T) {
kvSink := NewEventSink(db.NewMemDB())
assert.Equal(t, indexer.KV, kvSink.Type())
}
func TestStop(t *testing.T) {
kvSink := NewEventSink(db.NewMemDB())
assert.Nil(t, kvSink.Stop())
}
func TestBlockFuncs(t *testing.T) {
store := db.NewPrefixDB(db.NewMemDB(), []byte("block_events"))
indexer := NewEventSink(store)
require.NoError(t, indexer.IndexBlockEvents(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
ResultBeginBlock: abci.ResponseBeginBlock{
Events: []abci.Event{
{
Type: "begin_event",
Attributes: []abci.EventAttribute{
{
Key: "proposer",
Value: "FCAA001",
Index: true,
},
},
},
},
},
ResultEndBlock: abci.ResponseEndBlock{
Events: []abci.Event{
{
Type: "end_event",
Attributes: []abci.EventAttribute{
{
Key: "foo",
Value: "100",
Index: true,
},
},
},
},
},
}))
b, e := indexer.HasBlock(1)
assert.Nil(t, e)
assert.True(t, b)
for i := 2; i < 12; i++ {
var index bool
if i%2 == 0 {
index = true
}
require.NoError(t, indexer.IndexBlockEvents(types.EventDataNewBlockHeader{
Header: types.Header{Height: int64(i)},
ResultBeginBlock: abci.ResponseBeginBlock{
Events: []abci.Event{
{
Type: "begin_event",
Attributes: []abci.EventAttribute{
{
Key: "proposer",
Value: "FCAA001",
Index: true,
},
},
},
},
},
ResultEndBlock: abci.ResponseEndBlock{
Events: []abci.Event{
{
Type: "end_event",
Attributes: []abci.EventAttribute{
{
Key: "foo",
Value: fmt.Sprintf("%d", i),
Index: index,
},
},
},
},
},
}))
}
testCases := map[string]struct {
q *query.Query
results []int64
}{
"block.height = 100": {
q: query.MustParse("block.height = 100"),
results: []int64{},
},
"block.height = 5": {
q: query.MustParse("block.height = 5"),
results: []int64{5},
},
"begin_event.key1 = 'value1'": {
q: query.MustParse("begin_event.key1 = 'value1'"),
results: []int64{},
},
"begin_event.proposer = 'FCAA001'": {
q: query.MustParse("begin_event.proposer = 'FCAA001'"),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
"end_event.foo <= 5": {
q: query.MustParse("end_event.foo <= 5"),
results: []int64{2, 4},
},
"end_event.foo >= 100": {
q: query.MustParse("end_event.foo >= 100"),
results: []int64{1},
},
"block.height > 2 AND end_event.foo <= 8": {
q: query.MustParse("block.height > 2 AND end_event.foo <= 8"),
results: []int64{4, 6, 8},
},
"begin_event.proposer CONTAINS 'FFFFFFF'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FFFFFFF'"),
results: []int64{},
},
"begin_event.proposer CONTAINS 'FCAA001'": {
q: query.MustParse("begin_event.proposer CONTAINS 'FCAA001'"),
results: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
}
for name, tc := range testCases {
tc := tc
t.Run(name, func(t *testing.T) {
results, err := indexer.SearchBlockEvents(context.Background(), tc.q)
require.NoError(t, err)
require.Equal(t, tc.results, results)
})
}
}
func TestTxSearchWithCancelation(t *testing.T) {
indexer := NewEventSink(db.NewMemDB())
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}},
{Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}},
{Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}},
})
err := indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
r, e := indexer.GetTxByHash(types.Tx("HELLO WORLD").Hash())
assert.Nil(t, e)
assert.Equal(t, r, txResult)
ctx, cancel := context.WithCancel(context.Background())
cancel()
results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number = 1"))
assert.NoError(t, err)
assert.Empty(t, results)
}
func TestTxSearchDeprecatedIndexing(t *testing.T) {
esdb := db.NewMemDB()
indexer := NewEventSink(esdb)
// index tx using events indexing (composite key)
txResult1 := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}},
})
hash1 := types.Tx(txResult1.Tx).Hash()
err := indexer.IndexTxEvents([]*abci.TxResult{txResult1})
require.NoError(t, err)
// index tx also using deprecated indexing (event as key)
txResult2 := txResultWithEvents(nil)
txResult2.Tx = types.Tx("HELLO WORLD 2")
hash2 := types.Tx(txResult2.Tx).Hash()
b := esdb.NewBatch()
rawBytes, err := proto.Marshal(txResult2)
require.NoError(t, err)
depKey := []byte(fmt.Sprintf("%s/%s/%d/%d",
"sender",
"addr1",
txResult2.Height,
txResult2.Index,
))
err = b.Set(depKey, hash2)
require.NoError(t, err)
err = b.Set(kvtx.KeyFromHeight(txResult2), hash2)
require.NoError(t, err)
err = b.Set(hash2, rawBytes)
require.NoError(t, err)
err = b.Write()
require.NoError(t, err)
testCases := []struct {
q string
results []*abci.TxResult
}{
// search by hash
{fmt.Sprintf("tx.hash = '%X'", hash1), []*abci.TxResult{txResult1}},
// search by hash
{fmt.Sprintf("tx.hash = '%X'", hash2), []*abci.TxResult{txResult2}},
// search by exact match (one key)
{"account.number = 1", []*abci.TxResult{txResult1}},
{"account.number >= 1 AND account.number <= 5", []*abci.TxResult{txResult1}},
// search by range (lower bound)
{"account.number >= 1", []*abci.TxResult{txResult1}},
// search by range (upper bound)
{"account.number <= 5", []*abci.TxResult{txResult1}},
// search using not allowed key
{"not_allowed = 'boom'", []*abci.TxResult{}},
// search for not existing tx result
{"account.number >= 2 AND account.number <= 5", []*abci.TxResult{}},
// search using not existing key
{"account.date >= TIME 2013-05-03T14:45:00Z", []*abci.TxResult{}},
// search by deprecated key
{"sender = 'addr1'", []*abci.TxResult{txResult2}},
}
ctx := context.Background()
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.SearchTxEvents(ctx, query.MustParse(tc.q))
require.NoError(t, err)
for _, txr := range results {
for _, tr := range tc.results {
assert.True(t, proto.Equal(tr, txr))
}
}
})
}
}
func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
indexer := NewEventSink(db.NewMemDB())
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}},
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "2", Index: true}}},
})
err := indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
ctx := context.Background()
results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number >= 1"))
assert.NoError(t, err)
assert.Len(t, results, 1)
for _, txr := range results {
assert.True(t, proto.Equal(txResult, txr))
}
}
func TestTxSearchMultipleTxs(t *testing.T) {
indexer := NewEventSink(db.NewMemDB())
// indexed first, but bigger height (to test the order of transactions)
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}},
})
txResult.Tx = types.Tx("Bob's account")
txResult.Height = 2
txResult.Index = 1
err := indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
// indexed second, but smaller height (to test the order of transactions)
txResult2 := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "2", Index: true}}},
})
txResult2.Tx = types.Tx("Alice's account")
txResult2.Height = 1
txResult2.Index = 2
err = indexer.IndexTxEvents([]*abci.TxResult{txResult2})
require.NoError(t, err)
// indexed third (to test the order of transactions)
txResult3 := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "3", Index: true}}},
})
txResult3.Tx = types.Tx("Jack's account")
txResult3.Height = 1
txResult3.Index = 1
err = indexer.IndexTxEvents([]*abci.TxResult{txResult3})
require.NoError(t, err)
// indexed fourth (to test we don't include txs with similar events)
// https://github.com/tendermint/tendermint/issues/2908
txResult4 := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number.id", Value: "1", Index: true}}},
})
txResult4.Tx = types.Tx("Mike's account")
txResult4.Height = 2
txResult4.Index = 2
err = indexer.IndexTxEvents([]*abci.TxResult{txResult4})
require.NoError(t, err)
ctx := context.Background()
results, err := indexer.SearchTxEvents(ctx, query.MustParse("account.number >= 1"))
assert.NoError(t, err)
require.Len(t, results, 3)
}
func txResultWithEvents(events []abci.Event) *abci.TxResult {
tx := types.Tx("HELLO WORLD")
return &abci.TxResult{
Height: 1,
Index: 0,
Tx: tx,
Result: abci.ResponseDeliverTx{
Data: []byte{0},
Code: abci.CodeTypeOK,
Log: "",
Events: events,
},
}
}

+ 51
- 0
state/indexer/sink/null/null.go View File

@ -0,0 +1,51 @@
package null
import (
"context"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
var _ indexer.EventSink = (*EventSink)(nil)
// EventSink implements a no-op indexer.
type EventSink struct{}
func NewEventSink() indexer.EventSink {
return &EventSink{}
}
func (nes *EventSink) Type() indexer.EventSinkType {
return indexer.NULL
}
func (nes *EventSink) IndexBlockEvents(bh types.EventDataNewBlockHeader) error {
return nil
}
func (nes *EventSink) IndexTxEvents(results []*abci.TxResult) error {
return nil
}
func (nes *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) {
return nil, nil
}
func (nes *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
return nil, nil
}
func (nes *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) {
return nil, nil
}
func (nes *EventSink) HasBlock(h int64) (bool, error) {
return false, nil
}
func (nes *EventSink) Stop() error {
return nil
}

+ 39
- 0
state/indexer/sink/null/null_test.go View File

@ -0,0 +1,39 @@
package null
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
func TestNullEventSink(t *testing.T) {
nullIndexer := NewEventSink()
assert.Nil(t, nullIndexer.IndexTxEvents(nil))
assert.Nil(t, nullIndexer.IndexBlockEvents(types.EventDataNewBlockHeader{}))
val1, err1 := nullIndexer.SearchBlockEvents(context.TODO(), nil)
assert.Nil(t, val1)
assert.Nil(t, err1)
val2, err2 := nullIndexer.SearchTxEvents(context.TODO(), nil)
assert.Nil(t, val2)
assert.Nil(t, err2)
val3, err3 := nullIndexer.GetTxByHash(nil)
assert.Nil(t, val3)
assert.Nil(t, err3)
val4, err4 := nullIndexer.HasBlock(0)
assert.False(t, val4)
assert.Nil(t, err4)
}
func TestType(t *testing.T) {
nullIndexer := NewEventSink()
assert.Equal(t, indexer.NULL, nullIndexer.Type())
}
func TestStop(t *testing.T) {
nullIndexer := NewEventSink()
assert.Nil(t, nullIndexer.Stop())
}

+ 197
- 0
state/indexer/sink/psql/psql.go View File

@ -0,0 +1,197 @@
package psql
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
sq "github.com/Masterminds/squirrel"
proto "github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
var _ indexer.EventSink = (*EventSink)(nil)
const (
TableEventBlock = "block_events"
TableEventTx = "tx_events"
TableResultTx = "tx_results"
DriverName = "postgres"
)
// EventSink is an indexer backend providing the tx/block index services.
type EventSink struct {
store *sql.DB
chainID string
}
func NewEventSink(connStr string, chainID string) (indexer.EventSink, *sql.DB, error) {
db, err := sql.Open(DriverName, connStr)
if err != nil {
return nil, nil, err
}
return &EventSink{
store: db,
chainID: chainID,
}, db, nil
}
func (es *EventSink) Type() indexer.EventSinkType {
return indexer.PSQL
}
func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockHeader) error {
sqlStmt := sq.
Insert(TableEventBlock).
Columns("key", "value", "height", "type", "created_at", "chain_id").
PlaceholderFormat(sq.Dollar)
ts := time.Now()
// index the reserved block height index
sqlStmt = sqlStmt.
Values(types.BlockHeightKey, fmt.Sprint(h.Header.Height), h.Header.Height, "", ts, es.chainID)
// index begin_block events
sqlStmt, err := indexBlockEvents(
sqlStmt, h.ResultBeginBlock.Events, types.EventTypeBeginBlock, h.Header.Height, ts, es.chainID)
if err != nil {
return err
}
// index end_block events
sqlStmt, err = indexBlockEvents(
sqlStmt, h.ResultEndBlock.Events, types.EventTypeEndBlock, h.Header.Height, ts, es.chainID)
if err != nil {
return err
}
_, err = sqlStmt.RunWith(es.store).Exec()
return err
}
func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
// index the tx result
var txid uint32
sqlStmtTxResult := sq.
Insert(TableResultTx).
Columns("tx_result", "created_at").
PlaceholderFormat(sq.Dollar).
RunWith(es.store).
Suffix("RETURNING \"id\"")
sqlStmtEvents := sq.
Insert(TableEventTx).
Columns("key", "value", "height", "hash", "tx_result_id", "created_at", "chain_id").
PlaceholderFormat(sq.Dollar)
ts := time.Now()
for _, tx := range txr {
txBz, err := proto.Marshal(tx)
if err != nil {
return err
}
sqlStmtTxResult = sqlStmtTxResult.Values(txBz, ts)
// execute sqlStmtTxResult db query and retrieve the txid
err = sqlStmtTxResult.QueryRow().Scan(&txid)
if err != nil {
return err
}
// index the reserved height and hash indices
hash := fmt.Sprintf("%X", types.Tx(tx.Tx).Hash())
sqlStmtEvents = sqlStmtEvents.Values(types.TxHashKey, hash, tx.Height, hash, txid, ts, es.chainID)
sqlStmtEvents = sqlStmtEvents.Values(types.TxHeightKey, fmt.Sprint(tx.Height), tx.Height, hash, txid, ts, es.chainID)
for _, event := range tx.Result.Events {
// only index events with a non-empty type
if len(event.Type) == 0 {
continue
}
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
continue
}
// index if `index: true` is set
compositeTag := fmt.Sprintf("%s.%s", event.Type, attr.Key)
// ensure event does not conflict with a reserved prefix key
if compositeTag == types.TxHashKey || compositeTag == types.TxHeightKey {
return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag)
}
if attr.GetIndex() {
sqlStmtEvents = sqlStmtEvents.Values(compositeTag, attr.Value, tx.Height, hash, txid, ts, es.chainID)
}
}
}
}
// execute sqlStmtEvents db query...
_, err := sqlStmtEvents.RunWith(es.store).Exec()
return err
}
func (es *EventSink) SearchBlockEvents(ctx context.Context, q *query.Query) ([]int64, error) {
return nil, errors.New("block search is not supported via the postgres event sink")
}
func (es *EventSink) SearchTxEvents(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
return nil, errors.New("tx search is not supported via the postgres event sink")
}
func (es *EventSink) GetTxByHash(hash []byte) (*abci.TxResult, error) {
return nil, errors.New("getTxByHash is not supported via the postgres event sink")
}
func (es *EventSink) HasBlock(h int64) (bool, error) {
return false, errors.New("hasBlock is not supported via the postgres event sink")
}
func indexBlockEvents(
sqlStmt sq.InsertBuilder,
events []abci.Event,
ty string,
height int64,
ts time.Time,
chainID string,
) (sq.InsertBuilder, error) {
for _, event := range events {
// only index events with a non-empty type
if len(event.Type) == 0 {
continue
}
for _, attr := range event.Attributes {
if len(attr.Key) == 0 {
continue
}
// index iff the event specified index:true and it's not a reserved event
compositeKey := fmt.Sprintf("%s.%s", event.Type, attr.Key)
if compositeKey == types.BlockHeightKey {
return sqlStmt, fmt.Errorf(
"event type and attribute key \"%s\" is reserved; please use a different key", compositeKey)
}
if attr.GetIndex() {
sqlStmt = sqlStmt.Values(compositeKey, attr.Value, height, ty, ts, chainID)
}
}
}
return sqlStmt, nil
}
func (es *EventSink) Stop() error {
return es.store.Close()
}

+ 365
- 0
state/indexer/sink/psql/psql_test.go View File

@ -0,0 +1,365 @@
package psql
import (
"context"
"database/sql"
"errors"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
sq "github.com/Masterminds/squirrel"
schema "github.com/adlio/schema"
proto "github.com/gogo/protobuf/proto"
_ "github.com/lib/pq"
dockertest "github.com/ory/dockertest"
"github.com/ory/dockertest/docker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
var db *sql.DB
var resource *dockertest.Resource
var chainID = "test-chainID"
var (
user = "postgres"
password = "secret"
port = "5432"
dsn = "postgres://%s:%s@localhost:%s/%s?sslmode=disable"
dbName = "postgres"
)
func TestType(t *testing.T) {
pool, err := setupDB(t)
require.NoError(t, err)
psqlSink := &EventSink{store: db, chainID: chainID}
assert.Equal(t, indexer.PSQL, psqlSink.Type())
require.NoError(t, teardown(t, pool))
}
func TestBlockFuncs(t *testing.T) {
pool, err := setupDB(t)
require.NoError(t, err)
indexer := &EventSink{store: db, chainID: chainID}
require.NoError(t, indexer.IndexBlockEvents(getTestBlockHeader()))
r, err := verifyBlock(1)
assert.True(t, r)
require.NoError(t, err)
r, err = verifyBlock(2)
assert.False(t, r)
require.NoError(t, err)
r, err = indexer.HasBlock(1)
assert.False(t, r)
assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err)
r, err = indexer.HasBlock(2)
assert.False(t, r)
assert.Equal(t, errors.New("hasBlock is not supported via the postgres event sink"), err)
r2, err := indexer.SearchBlockEvents(context.TODO(), nil)
assert.Nil(t, r2)
assert.Equal(t, errors.New("block search is not supported via the postgres event sink"), err)
require.NoError(t, verifyTimeStamp(TableEventBlock))
require.NoError(t, teardown(t, pool))
}
func TestTxFuncs(t *testing.T) {
pool, err := setupDB(t)
assert.Nil(t, err)
indexer := &EventSink{store: db, chainID: chainID}
txResult := txResultWithEvents([]abci.Event{
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "1", Index: true}}},
{Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}},
{Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}},
})
err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
tx, err := verifyTx(types.Tx(txResult.Tx).Hash())
require.NoError(t, err)
assert.Equal(t, txResult, tx)
require.NoError(t, verifyTimeStamp(TableEventTx))
require.NoError(t, verifyTimeStamp(TableResultTx))
tx, err = indexer.GetTxByHash(types.Tx(txResult.Tx).Hash())
assert.Nil(t, tx)
assert.Equal(t, errors.New("getTxByHash is not supported via the postgres event sink"), err)
r2, err := indexer.SearchTxEvents(context.TODO(), nil)
assert.Nil(t, r2)
assert.Equal(t, errors.New("tx search is not supported via the postgres event sink"), err)
assert.Nil(t, teardown(t, pool))
}
func TestStop(t *testing.T) {
pool, err := setupDB(t)
require.NoError(t, err)
indexer := &EventSink{store: db}
require.NoError(t, indexer.Stop())
defer db.Close()
require.NoError(t, pool.Purge(resource))
}
func getTestBlockHeader() types.EventDataNewBlockHeader {
return types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
ResultBeginBlock: abci.ResponseBeginBlock{
Events: []abci.Event{
{
Type: "begin_event",
Attributes: []abci.EventAttribute{
{
Key: "proposer",
Value: "FCAA001",
Index: true,
},
},
},
},
},
ResultEndBlock: abci.ResponseEndBlock{
Events: []abci.Event{
{
Type: "end_event",
Attributes: []abci.EventAttribute{
{
Key: "foo",
Value: "100",
Index: true,
},
},
},
},
},
}
}
func readSchema() ([]*schema.Migration, error) {
filename := "schema.sql"
contents, err := ioutil.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("failed to read sql file from '%s': %w", filename, err)
}
mg := &schema.Migration{}
mg.ID = time.Now().Local().String() + " db schema"
mg.Script = string(contents)
return append([]*schema.Migration{}, mg), nil
}
func resetDB(t *testing.T) {
q := "DROP TABLE IF EXISTS block_events,tx_events,tx_results"
_, err := db.Exec(q)
require.NoError(t, err)
q = "DROP TYPE IF EXISTS block_event_type"
_, err = db.Exec(q)
require.NoError(t, err)
}
func txResultWithEvents(events []abci.Event) *abci.TxResult {
tx := types.Tx("HELLO WORLD")
return &abci.TxResult{
Height: 1,
Index: 0,
Tx: tx,
Result: abci.ResponseDeliverTx{
Data: []byte{0},
Code: abci.CodeTypeOK,
Log: "",
Events: events,
},
}
}
func verifyTx(hash []byte) (*abci.TxResult, error) {
join := fmt.Sprintf("%s ON %s.id = tx_result_id", TableEventTx, TableResultTx)
sqlStmt := sq.
Select("tx_result", fmt.Sprintf("%s.id", TableResultTx), "tx_result_id", "hash", "chain_id").
Distinct().From(TableResultTx).
InnerJoin(join).
Where(fmt.Sprintf("hash = $1 AND chain_id = '%s'", chainID), fmt.Sprintf("%X", hash))
rows, err := sqlStmt.RunWith(db).Query()
if err != nil {
return nil, err
}
defer rows.Close()
if rows.Next() {
var txResult []byte
var txResultID, txid int
var h, cid string
err = rows.Scan(&txResult, &txResultID, &txid, &h, &cid)
if err != nil {
return nil, nil
}
msg := new(abci.TxResult)
err = proto.Unmarshal(txResult, msg)
if err != nil {
return nil, err
}
return msg, err
}
// No result
return nil, nil
}
func verifyTimeStamp(tb string) error {
// We assume the tx indexing time would not exceed 2 second from now
sqlStmt := sq.
Select(fmt.Sprintf("%s.created_at", tb)).
Distinct().From(tb).
Where(fmt.Sprintf("%s.created_at >= $1", tb), time.Now().Add(-2*time.Second))
rows, err := sqlStmt.RunWith(db).Query()
if err != nil {
return err
}
defer rows.Close()
if rows.Next() {
var ts string
err = rows.Scan(&ts)
if err != nil {
return err
}
return nil
}
return errors.New("no result")
}
func verifyBlock(h int64) (bool, error) {
sqlStmt := sq.
Select("height").
Distinct().
From(TableEventBlock).
Where(fmt.Sprintf("height = %d", h))
rows, err := sqlStmt.RunWith(db).Query()
if err != nil {
return false, err
}
defer rows.Close()
if !rows.Next() {
return false, nil
}
sqlStmt = sq.
Select("type, height", "chain_id").
Distinct().
From(TableEventBlock).
Where(fmt.Sprintf("height = %d AND type = '%s' AND chain_id = '%s'", h, types.EventTypeBeginBlock, chainID))
rows, err = sqlStmt.RunWith(db).Query()
if err != nil {
return false, err
}
if !rows.Next() {
return false, nil
}
sqlStmt = sq.
Select("type, height").
Distinct().
From(TableEventBlock).
Where(fmt.Sprintf("height = %d AND type = '%s'", h, types.EventTypeEndBlock))
rows, err = sqlStmt.RunWith(db).Query()
if err != nil {
return false, err
}
return rows.Next(), nil
}
func setupDB(t *testing.T) (*dockertest.Pool, error) {
t.Helper()
pool, err := dockertest.NewPool(os.Getenv("DOCKER_URL"))
require.NoError(t, err)
resource, err = pool.RunWithOptions(&dockertest.RunOptions{
Repository: DriverName,
Tag: "13",
Env: []string{
"POSTGRES_USER=" + user,
"POSTGRES_PASSWORD=" + password,
"POSTGRES_DB=" + dbName,
"listen_addresses = '*'",
},
ExposedPorts: []string{port},
}, func(config *docker.HostConfig) {
// set AutoRemove to true so that stopped container goes away by itself
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
require.NoError(t, err)
// Set the container to expire in a minute to avoid orphaned containers
// hanging around
_ = resource.Expire(60)
conn := fmt.Sprintf(dsn, user, password, resource.GetPort(port+"/tcp"), dbName)
if err = pool.Retry(func() error {
var err error
_, db, err = NewEventSink(conn, chainID)
if err != nil {
return err
}
return db.Ping()
}); err != nil {
require.NoError(t, err)
}
resetDB(t)
sm, err := readSchema()
assert.Nil(t, err)
assert.Nil(t, schema.NewMigrator().Apply(db, sm))
return pool, nil
}
func teardown(t *testing.T, pool *dockertest.Pool) error {
t.Helper()
// When you're done, kill and remove the container
assert.Nil(t, pool.Purge(resource))
return db.Close()
}

+ 31
- 0
state/indexer/sink/psql/schema.sql View File

@ -0,0 +1,31 @@
CREATE TYPE block_event_type AS ENUM ('begin_block', 'end_block', '');
CREATE TABLE block_events (
id SERIAL PRIMARY KEY,
key VARCHAR NOT NULL,
value VARCHAR NOT NULL,
height INTEGER NOT NULL,
type block_event_type,
created_at TIMESTAMPTZ NOT NULL,
chain_id VARCHAR NOT NULL
);
CREATE TABLE tx_results (
id SERIAL PRIMARY KEY,
tx_result BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
CREATE TABLE tx_events (
id SERIAL PRIMARY KEY,
key VARCHAR NOT NULL,
value VARCHAR NOT NULL,
height INTEGER NOT NULL,
hash VARCHAR NOT NULL,
tx_result_id SERIAL,
created_at TIMESTAMPTZ NOT NULL,
chain_id VARCHAR NOT NULL,
FOREIGN KEY (tx_result_id)
REFERENCES tx_results(id)
ON DELETE CASCADE
);
CREATE INDEX idx_block_events_key_value ON block_events(key, value);
CREATE INDEX idx_tx_events_key_value ON tx_events(key, value);
CREATE INDEX idx_tx_events_hash ON tx_events(hash);

+ 11
- 46
state/indexer/tx/kv/kv.go View File

@ -58,25 +58,25 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) {
return txResult, nil
}
// AddBatch indexes a batch of transactions using the given list of events. Each
// key that indexed from the tx's events is a composite of the event type and
// the respective attribute's key delimited by a "." (eg. "account.number").
// Index indexes transactions using the given list of events. Each key
// that indexed from the tx's events is a composite of the event type and the
// respective attribute's key delimited by a "." (eg. "account.number").
// Any event with an empty type is not indexed.
func (txi *TxIndex) AddBatch(b *indexer.Batch) error {
storeBatch := txi.store.NewBatch()
defer storeBatch.Close()
func (txi *TxIndex) Index(results []*abci.TxResult) error {
b := txi.store.NewBatch()
defer b.Close()
for _, result := range b.Ops {
for _, result := range results {
hash := types.Tx(result.Tx).Hash()
// index tx by events
err := txi.indexEvents(result, hash, storeBatch)
err := txi.indexEvents(result, hash, b)
if err != nil {
return err
}
// index by height (always)
err = storeBatch.Set(keyFromHeight(result), hash)
err = b.Set(KeyFromHeight(result), hash)
if err != nil {
return err
}
@ -86,47 +86,12 @@ func (txi *TxIndex) AddBatch(b *indexer.Batch) error {
return err
}
// index by hash (always)
err = storeBatch.Set(primaryKey(hash), rawBytes)
err = b.Set(primaryKey(hash), rawBytes)
if err != nil {
return err
}
}
return storeBatch.WriteSync()
}
// Index indexes a single transaction using the given list of events. Each key
// that indexed from the tx's events is a composite of the event type and the
// respective attribute's key delimited by a "." (eg. "account.number").
// Any event with an empty type is not indexed.
func (txi *TxIndex) Index(result *abci.TxResult) error {
b := txi.store.NewBatch()
defer b.Close()
hash := types.Tx(result.Tx).Hash()
// index tx by events
err := txi.indexEvents(result, hash, b)
if err != nil {
return err
}
// index by height (always)
err = b.Set(keyFromHeight(result), hash)
if err != nil {
return err
}
rawBytes, err := proto.Marshal(result)
if err != nil {
return err
}
// index by hash (always)
err = b.Set(primaryKey(hash), rawBytes)
if err != nil {
return err
}
return b.WriteSync()
}
@ -584,7 +549,7 @@ func keyFromEvent(compositeKey string, value string, result *abci.TxResult) []by
return secondaryKey(compositeKey, value, result.Height, result.Index)
}
func keyFromHeight(result *abci.TxResult) []byte {
func KeyFromHeight(result *abci.TxResult) []byte {
return secondaryKey(types.TxHeightKey, fmt.Sprintf("%d", result.Height), result.Height, result.Index)
}


+ 1
- 1
state/indexer/tx/kv/kv_bench_test.go View File

@ -55,7 +55,7 @@ func BenchmarkTxSearch(b *testing.B) {
},
}
if err := indexer.Index(txResult); err != nil {
if err := indexer.Index([]*abci.TxResult{txResult}); err != nil {
b.Errorf("failed to index tx: %s", err)
}
}


+ 12
- 12
state/indexer/tx/kv/kv_test.go View File

@ -39,7 +39,7 @@ func TestTxIndex(t *testing.T) {
if err := batch.Add(txResult); err != nil {
t.Error(err)
}
err := txIndexer.AddBatch(batch)
err := txIndexer.Index(batch.Ops)
require.NoError(t, err)
loadedTxResult, err := txIndexer.Get(hash)
@ -58,7 +58,7 @@ func TestTxIndex(t *testing.T) {
}
hash2 := tx2.Hash()
err = txIndexer.Index(txResult2)
err = txIndexer.Index([]*abci.TxResult{txResult2})
require.NoError(t, err)
loadedTxResult2, err := txIndexer.Get(hash2)
@ -76,7 +76,7 @@ func TestTxSearch(t *testing.T) {
})
hash := types.Tx(txResult.Tx).Hash()
err := indexer.Index(txResult)
err := indexer.Index([]*abci.TxResult{txResult})
require.NoError(t, err)
testCases := []struct {
@ -154,7 +154,7 @@ func TestTxSearchWithCancelation(t *testing.T) {
{Type: "account", Attributes: []abci.EventAttribute{{Key: "owner", Value: "Ivan", Index: true}}},
{Type: "", Attributes: []abci.EventAttribute{{Key: "not_allowed", Value: "Vlad", Index: true}}},
})
err := indexer.Index(txResult)
err := indexer.Index([]*abci.TxResult{txResult})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
@ -173,7 +173,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {
})
hash1 := types.Tx(txResult1.Tx).Hash()
err := indexer.Index(txResult1)
err := indexer.Index([]*abci.TxResult{txResult1})
require.NoError(t, err)
// index tx also using deprecated indexing (event as key)
@ -195,7 +195,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {
err = b.Set(depKey, hash2)
require.NoError(t, err)
err = b.Set(keyFromHeight(txResult2), hash2)
err = b.Set(KeyFromHeight(txResult2), hash2)
require.NoError(t, err)
err = b.Set(hash2, rawBytes)
require.NoError(t, err)
@ -251,7 +251,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
{Type: "account", Attributes: []abci.EventAttribute{{Key: "number", Value: "2", Index: true}}},
})
err := indexer.Index(txResult)
err := indexer.Index([]*abci.TxResult{txResult})
require.NoError(t, err)
ctx := context.Background()
@ -276,7 +276,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
txResult.Tx = types.Tx("Bob's account")
txResult.Height = 2
txResult.Index = 1
err := indexer.Index(txResult)
err := indexer.Index([]*abci.TxResult{txResult})
require.NoError(t, err)
// indexed second, but smaller height (to test the order of transactions)
@ -287,7 +287,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
txResult2.Height = 1
txResult2.Index = 2
err = indexer.Index(txResult2)
err = indexer.Index([]*abci.TxResult{txResult2})
require.NoError(t, err)
// indexed third (to test the order of transactions)
@ -297,7 +297,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
txResult3.Tx = types.Tx("Jack's account")
txResult3.Height = 1
txResult3.Index = 1
err = indexer.Index(txResult3)
err = indexer.Index([]*abci.TxResult{txResult3})
require.NoError(t, err)
// indexed fourth (to test we don't include txs with similar events)
@ -308,7 +308,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
txResult4.Tx = types.Tx("Mike's account")
txResult4.Height = 2
txResult4.Index = 2
err = indexer.Index(txResult4)
err = indexer.Index([]*abci.TxResult{txResult4})
require.NoError(t, err)
ctx := context.Background()
@ -367,7 +367,7 @@ func benchmarkTxIndex(txsCount int64, b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
err = txIndexer.AddBatch(batch)
err = txIndexer.Index(batch.Ops)
}
if err != nil {
b.Fatal(err)


+ 1
- 1
state/indexer/tx/null/null.go View File

@ -25,7 +25,7 @@ func (txi *TxIndex) AddBatch(batch *indexer.Batch) error {
}
// Index is a noop and always returns nil.
func (txi *TxIndex) Index(result *abci.TxResult) error {
func (txi *TxIndex) Index(results []*abci.TxResult) error {
return nil
}


+ 3
- 0
types/events.go View File

@ -141,6 +141,9 @@ const (
// BlockHeightKey is a reserved key used for indexing BeginBlock and Endblock
// events.
BlockHeightKey = "block.height"
EventTypeBeginBlock = "begin_block"
EventTypeEndBlock = "end_block"
)
var (


Loading…
Cancel
Save