diff --git a/config/config.go b/config/config.go index 25d6c44a5..ea3fa13e4 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,7 @@ type Config struct { P2P *P2PConfig `mapstructure:"p2p"` Mempool *MempoolConfig `mapstructure:"mempool"` Consensus *ConsensusConfig `mapstructure:"consensus"` + TxIndex *TxIndexConfig `mapstructure:"tx_index"` } // DefaultConfig returns a default configuration for a Tendermint node @@ -26,6 +27,7 @@ func DefaultConfig() *Config { P2P: DefaultP2PConfig(), Mempool: DefaultMempoolConfig(), Consensus: DefaultConsensusConfig(), + TxIndex: DefaultTxIndexConfig(), } } @@ -37,6 +39,7 @@ func TestConfig() *Config { P2P: TestP2PConfig(), Mempool: DefaultMempoolConfig(), Consensus: TestConsensusConfig(), + TxIndex: DefaultTxIndexConfig(), } } @@ -93,9 +96,6 @@ type BaseConfig struct { // so the app can decide if we should keep the connection or not FilterPeers bool `mapstructure:"filter_peers"` // false - // What indexer to use for transactions - TxIndex string `mapstructure:"tx_index"` - // Database backend: leveldb | memdb DBBackend string `mapstructure:"db_backend"` @@ -115,7 +115,6 @@ func DefaultBaseConfig() BaseConfig { ProfListenAddress: "", FastSync: true, FilterPeers: false, - TxIndex: "kv", DBBackend: "leveldb", DBPath: "data", } @@ -412,6 +411,41 @@ func (c *ConsensusConfig) SetWalFile(walFile string) { c.walFile = walFile } +//----------------------------------------------------------------------------- +// TxIndexConfig + +// TxIndexConfig defines the confuguration for the transaction +// indexer, including tags to index. +type TxIndexConfig struct { + // What indexer to use for transactions + // + // Options: + // 1) "null" (default) + // 2) "kv" - the simplest possible indexer, backed by key-value storage (defaults to levelDB; see DBBackend). + Indexer string `mapstructure:"indexer"` + + // Comma-separated list of tags to index (by default the only tag is tx hash) + // + // It's recommended to index only a subset of tags due to possible memory + // bloat. This is, of course, depends on the indexer's DB and the volume of + // transactions. + IndexTags string `mapstructure:"index_tags"` + + // When set to true, tells indexer to index all tags. Note this may be not + // desirable (see the comment above). IndexTags has a precedence over + // IndexAllTags (i.e. when given both, IndexTags will be indexed). + IndexAllTags bool `mapstructure:"index_all_tags"` +} + +// DefaultTxIndexConfig returns a default configuration for the transaction indexer. +func DefaultTxIndexConfig() *TxIndexConfig { + return &TxIndexConfig{ + Indexer: "kv", + IndexTags: "", + IndexAllTags: false, + } +} + //----------------------------------------------------------------------------- // Utils diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index e4d09c95d..089d7b3f9 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -2,6 +2,7 @@ package consensus import ( "encoding/binary" + "fmt" "testing" "time" @@ -188,33 +189,41 @@ func (app *CounterApplication) Info(req abci.RequestInfo) abci.ResponseInfo { return abci.ResponseInfo{Data: cmn.Fmt("txs:%v", app.txCount)} } -func (app *CounterApplication) DeliverTx(tx []byte) abci.Result { - return runTx(tx, &app.txCount) +func (app *CounterApplication) DeliverTx(tx []byte) abci.ResponseDeliverTx { + txValue := txAsUint64(tx) + if txValue != uint64(app.txCount) { + return abci.ResponseDeliverTx{ + Code: abci.CodeType_BadNonce, + Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.txCount, txValue)} + } + app.txCount += 1 + return abci.ResponseDeliverTx{Code: abci.CodeType_OK} } -func (app *CounterApplication) CheckTx(tx []byte) abci.Result { - return runTx(tx, &app.mempoolTxCount) +func (app *CounterApplication) CheckTx(tx []byte) abci.ResponseCheckTx { + txValue := txAsUint64(tx) + if txValue != uint64(app.mempoolTxCount) { + return abci.ResponseCheckTx{ + Code: abci.CodeType_BadNonce, + Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue)} + } + app.mempoolTxCount += 1 + return abci.ResponseCheckTx{Code: abci.CodeType_OK} } -func runTx(tx []byte, countPtr *int) abci.Result { - count := *countPtr +func txAsUint64(tx []byte) uint64 { tx8 := make([]byte, 8) copy(tx8[len(tx8)-len(tx):], tx) - txValue := binary.BigEndian.Uint64(tx8) - if txValue != uint64(count) { - return abci.ErrBadNonce.AppendLog(cmn.Fmt("Invalid nonce. Expected %v, got %v", count, txValue)) - } - *countPtr += 1 - return abci.OK + return binary.BigEndian.Uint64(tx8) } -func (app *CounterApplication) Commit() abci.Result { +func (app *CounterApplication) Commit() abci.ResponseCommit { app.mempoolTxCount = app.txCount if app.txCount == 0 { - return abci.OK + return abci.ResponseCommit{Code: abci.CodeType_OK} } else { hash := make([]byte, 8) binary.BigEndian.PutUint64(hash, uint64(app.txCount)) - return abci.NewResultOK(hash, "") + return abci.ResponseCommit{Code: abci.CodeType_OK, Data: hash} } } diff --git a/consensus/replay.go b/consensus/replay.go index fb1c49a10..da68df51f 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -236,7 +236,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p // If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain if appBlockHeight == 0 { validators := types.TM2PB.Validators(h.state.Validators) - if err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { + if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { return nil, err } } @@ -385,21 +385,17 @@ type mockProxyApp struct { abciResponses *sm.ABCIResponses } -func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result { +func (mock *mockProxyApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { r := mock.abciResponses.DeliverTx[mock.txCount] mock.txCount += 1 - return abci.Result{ - r.Code, - r.Data, - r.Log, - } + return *r } -func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock { +func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { mock.txCount = 0 - return mock.abciResponses.EndBlock + return *mock.abciResponses.EndBlock } -func (mock *mockProxyApp) Commit() abci.Result { - return abci.NewResultOK(mock.appHash, "") +func (mock *mockProxyApp) Commit() abci.ResponseCommit { + return abci.ResponseCommit{Code: abci.CodeType_OK, Data: mock.appHash} } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 381c9021c..25fdf4dba 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -411,7 +411,7 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, } validators := types.TM2PB.Validators(state.Validators) - if err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { + if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { panic(err) } @@ -447,7 +447,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B defer proxyApp.Stop() validators := types.TM2PB.Validators(state.Validators) - if err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { + if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { panic(err) } diff --git a/glide.lock b/glide.lock index e12ddb4e2..d69aacb06 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 223d8e42a118e7861cb673ea58a035e99d3a98c94e4b71fb52998d320f9c3b49 -updated: 2017-11-25T22:00:24.612202481-08:00 +hash: 8c38726da2666831affa40474117d3cef5dad083176e81fb013d7e8493b83e6f +updated: 2017-12-01T02:14:22.08770964Z imports: - name: github.com/btcsuite/btcd version: 8cea3866d0f7fb12d567a20744942c0d078c7d15 @@ -98,7 +98,7 @@ imports: - leveldb/table - leveldb/util - name: github.com/tendermint/abci - version: 76ef8a0697c6179220a74c479b36c27a5b53008a + version: 22b491bb1952125dd2fb0730d6ca8e59e310547c subpackages: - client - example/counter @@ -113,7 +113,7 @@ imports: - name: github.com/tendermint/go-crypto version: dd20358a264c772b4a83e477b0cfce4c88a7001d - name: github.com/tendermint/go-wire - version: 7d50b38b3815efe313728de77e2995c8813ce13f + version: 5ab49b4c6ad674da6b81442911cf713ef0afb544 subpackages: - data - data/base58 @@ -123,7 +123,7 @@ imports: subpackages: - iavl - name: github.com/tendermint/tmlibs - version: 1e12754b3a3b5f1c23bf44c2d882faae688fb2e8 + version: 21fb7819891997c96838308b4eba5a50b07ff03f subpackages: - autofile - cli @@ -160,6 +160,8 @@ imports: - trace - name: golang.org/x/sys version: b98136db334ff9cb24f28a68e3be3cb6608f7630 + subpackages: + - unix - name: golang.org/x/text version: 88f656faf3f37f690df1a32515b479415e1a6769 subpackages: diff --git a/glide.yaml b/glide.yaml index 0f07dc2da..18f0dae8d 100644 --- a/glide.yaml +++ b/glide.yaml @@ -18,7 +18,7 @@ import: - package: github.com/spf13/viper version: v1.0.0 - package: github.com/tendermint/abci - version: ~0.7.0 + version: 22b491bb1952125dd2fb0730d6ca8e59e310547c subpackages: - client - example/dummy @@ -34,7 +34,7 @@ import: subpackages: - iavl - package: github.com/tendermint/tmlibs - version: 1e12754b3a3b5f1c23bf44c2d882faae688fb2e8 + version: develop subpackages: - autofile - cli diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 2bbf99442..e26ef966d 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -13,6 +13,7 @@ import ( "github.com/tendermint/abci/example/counter" "github.com/tendermint/abci/example/dummy" + abci "github.com/tendermint/abci/types" "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" @@ -115,7 +116,7 @@ func TestTxsAvailable(t *testing.T) { func TestSerialReap(t *testing.T) { app := counter.NewCounterApplication(true) - app.SetOption("serial", "on") + app.SetOption(abci.RequestSetOption{"serial", "on"}) cc := proxy.NewLocalClientCreator(app) mempool := newMempoolWithApp(cc) @@ -172,13 +173,19 @@ func TestSerialReap(t *testing.T) { for i := start; i < end; i++ { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - res := appConnCon.DeliverTxSync(txBytes) - if !res.IsOK() { + res, err := appConnCon.DeliverTxSync(txBytes) + if err != nil { + t.Errorf("Client error committing tx: %v", err) + } + if res.IsErr() { t.Errorf("Error committing tx. Code:%v result:%X log:%v", res.Code, res.Data, res.Log) } } - res := appConnCon.CommitSync() + res, err := appConnCon.CommitSync() + if err != nil { + t.Errorf("Client error committing: %v", err) + } if len(res.Data) != 8 { t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log) } diff --git a/node/node.go b/node/node.go index 5b8ab994a..7841a103d 100644 --- a/node/node.go +++ b/node/node.go @@ -111,6 +111,7 @@ type Node struct { proxyApp proxy.AppConns // connection to the application rpcListeners []net.Listener // rpc servers txIndexer txindex.TxIndexer + indexerService *txindex.IndexerService } // NewNode returns a new, ready to go, Tendermint Node. @@ -173,20 +174,6 @@ func NewNode(config *cfg.Config, state = sm.LoadState(stateDB) state.SetLogger(stateLogger) - // Transaction indexing - var txIndexer txindex.TxIndexer - switch config.TxIndex { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, err - } - txIndexer = kv.NewTxIndex(store) - default: - txIndexer = &null.TxIndex{} - } - state.TxIndexer = txIndexer - // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() @@ -293,6 +280,27 @@ func NewNode(config *cfg.Config, bcReactor.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus) + // Transaction indexing + var txIndexer txindex.TxIndexer + switch config.TxIndex.Indexer { + case "kv": + store, err := dbProvider(&DBContext{"tx_index", config}) + if err != nil { + return nil, err + } + if config.TxIndex.IndexTags != "" { + txIndexer = kv.NewTxIndex(store, kv.IndexTags(strings.Split(config.TxIndex.IndexTags, ","))) + } else if config.TxIndex.IndexAllTags { + txIndexer = kv.NewTxIndex(store, kv.IndexAllTags()) + } else { + txIndexer = kv.NewTxIndex(store) + } + default: + txIndexer = &null.TxIndex{} + } + + indexerService := txindex.NewIndexerService(txIndexer, eventBus) + // run the profile server profileHost := config.ProfListenAddress if profileHost != "" { @@ -318,6 +326,7 @@ func NewNode(config *cfg.Config, consensusReactor: consensusReactor, proxyApp: proxyApp, txIndexer: txIndexer, + indexerService: indexerService, eventBus: eventBus, } node.BaseService = *cmn.NewBaseService(logger, "Node", node) @@ -363,6 +372,12 @@ func (n *Node) OnStart() error { } } + // start tx indexer + err = n.indexerService.Start() + if err != nil { + return err + } + return nil } @@ -382,6 +397,8 @@ func (n *Node) OnStop() { } n.eventBus.Stop() + + n.indexerService.Stop() } // RunForever waits for an interrupt signal and stops the node. diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 9121e8db8..2319fed82 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -12,12 +12,12 @@ type AppConnConsensus interface { SetResponseCallback(abcicli.Callback) Error() error - InitChainSync(types.RequestInitChain) (err error) + InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error) - BeginBlockSync(types.RequestBeginBlock) (err error) + BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error) DeliverTxAsync(tx []byte) *abcicli.ReqRes - EndBlockSync(height uint64) (types.ResponseEndBlock, error) - CommitSync() (res types.Result) + EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error) + CommitSync() (*types.ResponseCommit, error) } type AppConnMempool interface { @@ -33,9 +33,9 @@ type AppConnMempool interface { type AppConnQuery interface { Error() error - EchoSync(string) (res types.Result) - InfoSync(types.RequestInfo) (types.ResponseInfo, error) - QuerySync(types.RequestQuery) (types.ResponseQuery, error) + EchoSync(string) (*types.ResponseEcho, error) + InfoSync(types.RequestInfo) (*types.ResponseInfo, error) + QuerySync(types.RequestQuery) (*types.ResponseQuery, error) // SetOptionSync(key string, value string) (res types.Result) } @@ -61,11 +61,11 @@ func (app *appConnConsensus) Error() error { return app.appConn.Error() } -func (app *appConnConsensus) InitChainSync(req types.RequestInitChain) (err error) { +func (app *appConnConsensus) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) { return app.appConn.InitChainSync(req) } -func (app *appConnConsensus) BeginBlockSync(req types.RequestBeginBlock) (err error) { +func (app *appConnConsensus) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { return app.appConn.BeginBlockSync(req) } @@ -73,11 +73,11 @@ func (app *appConnConsensus) DeliverTxAsync(tx []byte) *abcicli.ReqRes { return app.appConn.DeliverTxAsync(tx) } -func (app *appConnConsensus) EndBlockSync(height uint64) (types.ResponseEndBlock, error) { - return app.appConn.EndBlockSync(height) +func (app *appConnConsensus) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) { + return app.appConn.EndBlockSync(req) } -func (app *appConnConsensus) CommitSync() (res types.Result) { +func (app *appConnConsensus) CommitSync() (*types.ResponseCommit, error) { return app.appConn.CommitSync() } @@ -131,14 +131,14 @@ func (app *appConnQuery) Error() error { return app.appConn.Error() } -func (app *appConnQuery) EchoSync(msg string) (res types.Result) { +func (app *appConnQuery) EchoSync(msg string) (*types.ResponseEcho, error) { return app.appConn.EchoSync(msg) } -func (app *appConnQuery) InfoSync(req types.RequestInfo) (types.ResponseInfo, error) { +func (app *appConnQuery) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { return app.appConn.InfoSync(req) } -func (app *appConnQuery) QuerySync(reqQuery types.RequestQuery) (types.ResponseQuery, error) { +func (app *appConnQuery) QuerySync(reqQuery types.RequestQuery) (*types.ResponseQuery, error) { return app.appConn.QuerySync(reqQuery) } diff --git a/proxy/app_conn_test.go b/proxy/app_conn_test.go index 3c00f1ae4..0fbad6022 100644 --- a/proxy/app_conn_test.go +++ b/proxy/app_conn_test.go @@ -17,7 +17,7 @@ import ( type AppConnTest interface { EchoAsync(string) *abcicli.ReqRes FlushSync() error - InfoSync(types.RequestInfo) (types.ResponseInfo, error) + InfoSync(types.RequestInfo) (*types.ResponseInfo, error) } type appConnTest struct { @@ -36,7 +36,7 @@ func (app *appConnTest) FlushSync() error { return app.appConn.FlushSync() } -func (app *appConnTest) InfoSync(req types.RequestInfo) (types.ResponseInfo, error) { +func (app *appConnTest) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { return app.appConn.InfoSync(req) } diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index 9619e5c05..963282294 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -100,7 +100,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { require.True(ok, "%d: %#v", i, evt) // make sure this is the proper tx require.EqualValues(tx, txe.Tx) - require.True(txe.Code.IsOK()) + require.True(txe.Result.Code.IsOK()) } } @@ -132,6 +132,6 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { require.True(ok, "%d: %#v", i, evt) // make sure this is the proper tx require.EqualValues(tx, txe.Tx) - require.True(txe.Code.IsOK()) + require.True(txe.Result.Code.IsOK()) } } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 47c99fd37..5ceace970 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -163,17 +163,30 @@ func (c *HTTP) Commit(height *int) (*ctypes.ResultCommit, error) { func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { result := new(ctypes.ResultTx) - query := map[string]interface{}{ + params := map[string]interface{}{ "hash": hash, "prove": prove, } - _, err := c.rpc.Call("tx", query, result) + _, err := c.rpc.Call("tx", params, result) if err != nil { return nil, errors.Wrap(err, "Tx") } return result, nil } +func (c *HTTP) TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) { + results := new([]*ctypes.ResultTx) + params := map[string]interface{}{ + "query": query, + "prove": prove, + } + _, err := c.rpc.Call("tx_search", params, results) + if err != nil { + return nil, errors.Wrap(err, "TxSearch") + } + return *results, nil +} + func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) { result := new(ctypes.ResultValidators) _, err := c.rpc.Call("validators", map[string]interface{}{"height": height}, result) diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 443ea89d2..c0d7e0524 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -50,6 +50,7 @@ type SignClient interface { Commit(height *int) (*ctypes.ResultCommit, error) Validators(height *int) (*ctypes.ResultValidators, error) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) + TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) } // HistoryClient shows us data from genesis to now in large chunks. diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 55a0e0fb7..d54440079 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -124,6 +124,10 @@ func (Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { return core.Tx(hash, prove) } +func (Local) TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) { + return core.TxSearch(query, prove) +} + func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { q, err := tmquery.New(query) if err != nil { diff --git a/rpc/client/mock/abci.go b/rpc/client/mock/abci.go index e935a2824..2ffa9269d 100644 --- a/rpc/client/mock/abci.go +++ b/rpc/client/mock/abci.go @@ -38,7 +38,7 @@ func (a ABCIApp) ABCIQueryWithOptions(path string, data data.Bytes, opts client. func (a ABCIApp) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { res := ctypes.ResultBroadcastTxCommit{} res.CheckTx = a.App.CheckTx(tx) - if !res.CheckTx.IsOK() { + if res.CheckTx.IsErr() { return &res, nil } res.DeliverTx = a.App.DeliverTx(tx) @@ -48,7 +48,7 @@ func (a ABCIApp) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { c := a.App.CheckTx(tx) // and this gets written in a background thread... - if c.IsOK() { + if !c.IsErr() { go func() { a.App.DeliverTx(tx) }() // nolint: errcheck } return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil @@ -57,7 +57,7 @@ func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error func (a ABCIApp) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { c := a.App.CheckTx(tx) // and this gets written in a background thread... - if c.IsOK() { + if !c.IsErr() { go func() { a.App.DeliverTx(tx) }() // nolint: errcheck } return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil diff --git a/rpc/client/mock/abci_test.go b/rpc/client/mock/abci_test.go index 36a457918..216bd7c2b 100644 --- a/rpc/client/mock/abci_test.go +++ b/rpc/client/mock/abci_test.go @@ -37,8 +37,8 @@ func TestABCIMock(t *testing.T) { BroadcastCommit: mock.Call{ Args: goodTx, Response: &ctypes.ResultBroadcastTxCommit{ - CheckTx: abci.Result{Data: data.Bytes("stand")}, - DeliverTx: abci.Result{Data: data.Bytes("deliver")}, + CheckTx: abci.ResponseCheckTx{Data: data.Bytes("stand")}, + DeliverTx: abci.ResponseDeliverTx{Data: data.Bytes("deliver")}, }, Error: errors.New("bad tx"), }, diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index c68276354..2f449cf99 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -1,6 +1,7 @@ package client_test import ( + "fmt" "strings" "testing" @@ -104,7 +105,7 @@ func TestABCIQuery(t *testing.T) { k, v, tx := MakeTxKV() bres, err := c.BroadcastTxCommit(tx) require.Nil(t, err, "%d: %+v", i, err) - apph := bres.Height + 1 // this is where the tx will be applied to the state + apph := int(bres.Height) + 1 // this is where the tx will be applied to the state // wait before querying client.WaitForHeight(c, apph, nil) @@ -136,7 +137,7 @@ func TestAppCalls(t *testing.T) { bres, err := c.BroadcastTxCommit(tx) require.Nil(err, "%d: %+v", i, err) require.True(bres.DeliverTx.Code.IsOK()) - txh := bres.Height + txh := int(bres.Height) apph := txh + 1 // this is where the tx will be applied to the state // wait before querying @@ -153,7 +154,7 @@ func TestAppCalls(t *testing.T) { // ptx, err := c.Tx(bres.Hash, true) ptx, err := c.Tx(bres.Hash, true) require.Nil(err, "%d: %+v", i, err) - assert.Equal(txh, ptx.Height) + assert.EqualValues(txh, ptx.Height) assert.EqualValues(tx, ptx.Tx) // and we can even check the block is added @@ -280,9 +281,9 @@ func TestTx(t *testing.T) { require.NotNil(err) } else { require.Nil(err, "%+v", err) - assert.Equal(txHeight, ptx.Height) + assert.EqualValues(txHeight, ptx.Height) assert.EqualValues(tx, ptx.Tx) - assert.Equal(0, ptx.Index) + assert.Zero(ptx.Index) assert.True(ptx.TxResult.Code.IsOK()) // time to verify the proof @@ -294,3 +295,50 @@ func TestTx(t *testing.T) { } } } + +func TestTxSearch(t *testing.T) { + // first we broadcast a tx + c := getHTTPClient() + _, _, tx := MakeTxKV() + bres, err := c.BroadcastTxCommit(tx) + require.Nil(t, err, "%+v", err) + + txHeight := bres.Height + txHash := bres.Hash + + anotherTxHash := types.Tx("a different tx").Hash() + + for i, c := range GetClients() { + t.Logf("client %d", i) + + // now we query for the tx. + // since there's only one tx, we know index=0. + results, err := c.TxSearch(fmt.Sprintf("tx.hash='%v'", txHash), true) + require.Nil(t, err, "%+v", err) + require.Len(t, results, 1) + + ptx := results[0] + assert.EqualValues(t, txHeight, ptx.Height) + assert.EqualValues(t, tx, ptx.Tx) + assert.Zero(t, ptx.Index) + assert.True(t, ptx.TxResult.Code.IsOK()) + + // time to verify the proof + proof := ptx.Proof + if assert.EqualValues(t, tx, proof.Data) { + assert.True(t, proof.Proof.Verify(proof.Index, proof.Total, txHash, proof.RootHash)) + } + + // we query for non existing tx + results, err = c.TxSearch(fmt.Sprintf("tx.hash='%X'", anotherTxHash), false) + require.Nil(t, err, "%+v", err) + require.Len(t, results, 0) + + // we query using a tag (see dummy application) + results, err = c.TxSearch("app.creator='jae'", false) + require.Nil(t, err, "%+v", err) + if len(results) == 0 { + t.Fatal("expected a lot of transactions") + } + } +} diff --git a/rpc/core/abci.go b/rpc/core/abci.go index 564c0bc63..a64c3d297 100644 --- a/rpc/core/abci.go +++ b/rpc/core/abci.go @@ -93,5 +93,5 @@ func ABCIInfo() (*ctypes.ResultABCIInfo, error) { if err != nil { return nil, err } - return &ctypes.ResultABCIInfo{resInfo}, nil + return &ctypes.ResultABCIInfo{*resInfo}, nil } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 382b2f556..c2e5d2f9b 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -154,7 +154,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() deliverTxResCh := make(chan interface{}) - q := types.EventQueryTx(tx) + q := types.EventQueryTxFor(tx) err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) if err != nil { err = errors.Wrap(err, "failed to subscribe to tx") @@ -177,8 +177,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { if checkTxR.Code != abci.CodeType_OK { // CheckTx failed! return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR.Result(), - DeliverTx: abci.Result{}, + CheckTx: *checkTxR, + DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, nil } @@ -191,28 +191,22 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { case deliverTxResMsg := <-deliverTxResCh: deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx) // The tx was included in a block. - deliverTxR := &abci.ResponseDeliverTx{ - Code: deliverTxRes.Code, - Data: deliverTxRes.Data, - Log: deliverTxRes.Log, - } + deliverTxR := deliverTxRes.Result logger.Info("DeliverTx passed ", "tx", data.Bytes(tx), "response", deliverTxR) return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR.Result(), - DeliverTx: deliverTxR.Result(), + CheckTx: *checkTxR, + DeliverTx: deliverTxR, Hash: tx.Hash(), Height: deliverTxRes.Height, }, nil case <-timer.C: logger.Error("failed to include tx") return &ctypes.ResultBroadcastTxCommit{ - CheckTx: checkTxR.Result(), - DeliverTx: abci.Result{}, + CheckTx: *checkTxR, + DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, fmt.Errorf("Timed out waiting for transaction to be included in a block") } - - panic("Should never happen!") } // Get unconfirmed transactions including their number. diff --git a/rpc/core/routes.go b/rpc/core/routes.go index a4328f1d2..111c010a2 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -19,6 +19,7 @@ var Routes = map[string]*rpc.RPCFunc{ "block": rpc.NewRPCFunc(Block, "height"), "commit": rpc.NewRPCFunc(Commit, "height"), "tx": rpc.NewRPCFunc(Tx, "hash,prove"), + "tx_search": rpc.NewRPCFunc(TxSearch, "query,prove"), "validators": rpc.NewRPCFunc(Validators, "height"), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, ""), diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 03a911e2c..b69735911 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -6,6 +6,7 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" + tmquery "github.com/tendermint/tmlibs/pubsub/query" ) // Tx allows you to query the transaction results. `nil` could mean the @@ -82,20 +83,123 @@ func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { return nil, fmt.Errorf("Tx (%X) not found", hash) } - height := int(r.Height) // XXX - index := int(r.Index) + height := r.Height + index := r.Index var proof types.TxProof if prove { - block := blockStore.LoadBlock(height) - proof = block.Data.Txs.Proof(index) + // TODO: handle overflow + block := blockStore.LoadBlock(int(height)) + proof = block.Data.Txs.Proof(int(index)) } return &ctypes.ResultTx{ Height: height, Index: index, - TxResult: r.Result.Result(), + TxResult: r.Result, Tx: r.Tx, Proof: proof, }, nil } + +// TxSearch allows you to query for multiple transactions results. +// +// ```shell +// curl "localhost:46657/tx_search?query=\"account.owner='Ivan'\"&prove=true" +// ``` +// +// ```go +// client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket") +// q, err := tmquery.New("account.owner='Ivan'") +// tx, err := client.TxSearch(q, true) +// ``` +// +// > The above command returns JSON structured like this: +// +// ```json +// { +// "result": [ +// { +// "proof": { +// "Proof": { +// "aunts": [ +// "J3LHbizt806uKnABNLwG4l7gXCA=", +// "iblMO/M1TnNtlAefJyNCeVhjAb0=", +// "iVk3ryurVaEEhdeS0ohAJZ3wtB8=", +// "5hqMkTeGqpct51ohX0lZLIdsn7Q=", +// "afhsNxFnLlZgFDoyPpdQSe0bR8g=" +// ] +// }, +// "Data": "mvZHHa7HhZ4aRT0xMDA=", +// "RootHash": "F6541223AA46E428CB1070E9840D2C3DF3B6D776", +// "Total": 32, +// "Index": 31 +// }, +// "tx": "mvZHHa7HhZ4aRT0xMDA=", +// "tx_result": {}, +// "index": 31, +// "height": 12 +// } +// ], +// "id": "", +// "jsonrpc": "2.0" +// } +// ``` +// +// Returns transactions matching the given query. +// +// ### Query Parameters +// +// | Parameter | Type | Default | Required | Description | +// |-----------+--------+---------+----------+-----------------------------------------------------------| +// | query | string | "" | true | Query | +// | prove | bool | false | false | Include proofs of the transactions inclusion in the block | +// +// ### Returns +// +// - `proof`: the `types.TxProof` object +// - `tx`: `[]byte` - the transaction +// - `tx_result`: the `abci.Result` object +// - `index`: `int` - index of the transaction +// - `height`: `int` - height of the block where this transaction was in +func TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) { + // if index is disabled, return error + if _, ok := txIndexer.(*null.TxIndex); ok { + return nil, fmt.Errorf("Transaction indexing is disabled.") + } + + q, err := tmquery.New(query) + if err != nil { + return nil, err + } + + results, err := txIndexer.Search(q) + if err != nil { + return nil, err + } + + // TODO: we may want to consider putting a maximum on this length and somehow + // informing the user that things were truncated. + apiResults := make([]*ctypes.ResultTx, len(results)) + var proof types.TxProof + for i, r := range results { + height := r.Height + index := r.Index + + if prove { + // TODO: handle overflow + block := blockStore.LoadBlock(int(height)) + proof = block.Data.Txs.Proof(int(index)) + } + + apiResults[i] = &ctypes.ResultTx{ + Height: height, + Index: index, + TxResult: r.Result, + Tx: r.Tx, + Proof: proof, + } + } + + return apiResults, nil +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 8aa904fe5..a1b7e36fb 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -104,18 +104,18 @@ type ResultBroadcastTx struct { } type ResultBroadcastTxCommit struct { - CheckTx abci.Result `json:"check_tx"` - DeliverTx abci.Result `json:"deliver_tx"` - Hash data.Bytes `json:"hash"` - Height int `json:"height"` + CheckTx abci.ResponseCheckTx `json:"check_tx"` + DeliverTx abci.ResponseDeliverTx `json:"deliver_tx"` + Hash data.Bytes `json:"hash"` + Height uint64 `json:"height"` } type ResultTx struct { - Height int `json:"height"` - Index int `json:"index"` - TxResult abci.Result `json:"tx_result"` - Tx types.Tx `json:"tx"` - Proof types.TxProof `json:"proof,omitempty"` + Height uint64 `json:"height"` + Index uint32 `json:"index"` + TxResult abci.ResponseDeliverTx `json:"tx_result"` + Tx types.Tx `json:"tx"` + Proof types.TxProof `json:"proof,omitempty"` } type ResultUnconfirmedTxs struct { diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index f6526011f..73da30ad6 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -80,6 +80,7 @@ func GetConfig() *cfg.Config { globalConfig.P2P.ListenAddress = tm globalConfig.RPC.ListenAddress = rpc globalConfig.RPC.GRPCListenAddress = grpc + globalConfig.TxIndex.IndexTags = "app.creator" // see dummy application } return globalConfig } diff --git a/state/execution.go b/state/execution.go index 6c74f7a9e..3622a663c 100644 --- a/state/execution.go +++ b/state/execution.go @@ -8,7 +8,6 @@ import ( abci "github.com/tendermint/abci/types" crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -54,36 +53,31 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p // TODO: make use of this info // Blocks may include invalid txs. // reqDeliverTx := req.(abci.RequestDeliverTx) - txError := "" txResult := r.DeliverTx if txResult.Code == abci.CodeType_OK { validTxs++ } else { logger.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log) invalidTxs++ - txError = txResult.Code.String() } - abciResponses.DeliverTx[txIndex] = txResult - txIndex++ - // NOTE: if we count we can access the tx from the block instead of // pulling it from the req - event := types.EventDataTx{ - Height: block.Height, + txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{ + Height: uint64(block.Height), + Index: uint32(txIndex), Tx: types.Tx(req.GetDeliverTx().Tx), - Data: txResult.Data, - Code: txResult.Code, - Log: txResult.Log, - Error: txError, - } - txEventPublisher.PublishEventTx(event) + Result: *txResult, + }}) + + abciResponses.DeliverTx[txIndex] = txResult + txIndex++ } } proxyAppConn.SetResponseCallback(proxyCb) // Begin block - err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ + _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ block.Hash(), types.TM2PB.Header(block.Header), }) @@ -101,7 +95,7 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p } // End block - abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(uint64(block.Height)) + abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{uint64(block.Height)}) if err != nil { logger.Error("Error in proxyAppConn.EndBlock", "err", err) return nil, err @@ -210,7 +204,6 @@ func (s *State) validateBlock(block *types.Block) error { //----------------------------------------------------------------------------- // ApplyBlock validates & executes the block, updates state w/ ABCI responses, // then commits and updates the mempool atomically, then saves state. -// Transaction results are optionally indexed. // ApplyBlock validates the block against the state, executes it against the app, // commits it, and saves the block and state. It's the only function that needs to be called @@ -225,9 +218,6 @@ func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn fail.Fail() // XXX - // index txs. This could run in the background - s.indexTxs(abciResponses) - // save the results before we commit s.SaveABCIResponses(abciResponses) @@ -258,7 +248,11 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl defer mempool.Unlock() // Commit block, get hash back - res := proxyAppConn.CommitSync() + res, err := proxyAppConn.CommitSync() + if err != nil { + s.logger.Error("Client error during proxyAppConn.CommitSync", "err", err) + return err + } if res.IsErr() { s.logger.Error("Error in proxyAppConn.CommitSync", "err", res) return res @@ -276,26 +270,6 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl return mempool.Update(block.Height, block.Txs) } -func (s *State) indexTxs(abciResponses *ABCIResponses) { - // save the tx results using the TxIndexer - // NOTE: these may be overwriting, but the values should be the same. - batch := txindex.NewBatch(len(abciResponses.DeliverTx)) - for i, d := range abciResponses.DeliverTx { - tx := abciResponses.txs[i] - if err := batch.Add(types.TxResult{ - Height: uint64(abciResponses.Height), - Index: uint32(i), - Tx: tx, - Result: *d, - }); err != nil { - s.logger.Error("Error with batch.Add", "err", err) - } - } - if err := s.TxIndexer.AddBatch(batch); err != nil { - s.logger.Error("Error adding batch", "err", err) - } -} - // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // It returns the application root hash (result of abci.Commit). func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) { @@ -305,7 +279,11 @@ func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block return nil, err } // Commit block, get hash back - res := appConnConsensus.CommitSync() + res, err := appConnConsensus.CommitSync() + if err != nil { + logger.Error("Client error during proxyAppConn.CommitSync", "err", res) + return nil, err + } if res.IsErr() { logger.Error("Error in proxyAppConn.CommitSync", "err", res) return nil, res diff --git a/state/execution_test.go b/state/execution_test.go index 5b9bf168a..e54d983d1 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -3,13 +3,11 @@ package state import ( "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/abci/example/dummy" crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" @@ -31,8 +29,6 @@ func TestApplyBlock(t *testing.T) { state := state() state.SetLogger(log.TestingLogger()) - indexer := &dummyIndexer{0} - state.TxIndexer = indexer // make block block := makeBlock(1, state) @@ -40,7 +36,6 @@ func TestApplyBlock(t *testing.T) { err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{}) require.Nil(t, err) - assert.Equal(t, nTxsPerBlock, indexer.Indexed) // test indexing works // TODO check state and mempool } @@ -75,16 +70,3 @@ func makeBlock(num int, state *State) *types.Block { prevBlockID, valHash, state.AppHash, testPartSize) return block } - -// dummyIndexer increments counter every time we index transaction. -type dummyIndexer struct { - Indexed int -} - -func (indexer *dummyIndexer) Get(hash []byte) (*types.TxResult, error) { - return nil, nil -} -func (indexer *dummyIndexer) AddBatch(batch *txindex.Batch) error { - indexer.Indexed += batch.Size() - return nil -} diff --git a/state/state.go b/state/state.go index 4241f9de6..e1f168350 100644 --- a/state/state.go +++ b/state/state.go @@ -15,8 +15,6 @@ import ( wire "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/state/txindex" - "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" ) @@ -61,9 +59,6 @@ type State struct { // AppHash is updated after Commit AppHash []byte - // TxIndexer indexes transactions - TxIndexer txindex.TxIndexer `json:"-"` - logger log.Logger } @@ -95,7 +90,7 @@ func loadState(db dbm.DB, key []byte) *State { return nil } - s := &State{db: db, TxIndexer: &null.TxIndex{}} + s := &State{db: db} r, n, err := bytes.NewReader(buf), new(int), new(error) wire.ReadBinaryPtr(&s, r, 0, n, err) if *err != nil { @@ -114,8 +109,6 @@ func (s *State) SetLogger(l log.Logger) { } // Copy makes a copy of the State for mutating. -// NOTE: Does not create a copy of TxIndexer. It creates a new pointer that points to the same -// underlying TxIndexer. func (s *State) Copy() *State { return &State{ db: s.db, @@ -125,7 +118,6 @@ func (s *State) Copy() *State { Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), AppHash: s.AppHash, - TxIndexer: s.TxIndexer, LastHeightValidatorsChanged: s.LastHeightValidatorsChanged, logger: s.logger, ChainID: s.ChainID, @@ -287,7 +279,7 @@ type ABCIResponses struct { Height int DeliverTx []*abci.ResponseDeliverTx - EndBlock abci.ResponseEndBlock + EndBlock *abci.ResponseEndBlock txs types.Txs // reference for indexing results by hash } @@ -368,7 +360,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { } } - // we do not need indexer during replay and in tests return &State{ db: db, @@ -381,7 +372,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { Validators: types.NewValidatorSet(validators), LastValidators: types.NewValidatorSet(nil), AppHash: genDoc.AppHash, - TxIndexer: &null.TxIndex{}, LastHeightValidatorsChanged: 1, }, nil } diff --git a/state/state_test.go b/state/state_test.go index 7bb43afa2..7fff07744 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -78,9 +78,9 @@ func TestABCIResponsesSaveLoad(t *testing.T) { // build mock responses block := makeBlock(2, state) abciResponses := NewABCIResponses(block) - abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")} - abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"} - abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{ + abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo"), Tags: []*abci.KVPair{}} + abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok", Tags: []*abci.KVPair{}} + abciResponses.EndBlock = &abci.ResponseEndBlock{Diffs: []*abci.Validator{ { PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(), Power: 10, @@ -198,12 +198,13 @@ func makeHeaderPartsResponses(state *State, height int, block := makeBlock(height, state) _, val := state.Validators.GetByIndex(0) abciResponses := &ABCIResponses{ - Height: height, + Height: height, + EndBlock: &abci.ResponseEndBlock{Diffs: []*abci.Validator{}}, } // if the pubkey is new, remove the old and add the new if !bytes.Equal(pubkey.Bytes(), val.PubKey.Bytes()) { - abciResponses.EndBlock = abci.ResponseEndBlock{ + abciResponses.EndBlock = &abci.ResponseEndBlock{ Diffs: []*abci.Validator{ {val.PubKey.Bytes(), 0}, {pubkey.Bytes(), 10}, diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index 039460a16..bd51fbb29 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -4,20 +4,24 @@ import ( "errors" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmlibs/pubsub/query" ) // TxIndexer interface defines methods to index and search transactions. type TxIndexer interface { - // AddBatch analyzes, indexes or stores a batch of transactions. - // NOTE: We do not specify Index method for analyzing a single transaction - // here because it bears heavy performance losses. Almost all advanced indexers - // support batching. + // AddBatch analyzes, indexes and stores a batch of transactions. AddBatch(b *Batch) error + // Index analyzes, indexes and stores a single transaction. + Index(result *types.TxResult) error + // Get returns the transaction specified by hash or nil if the transaction is not indexed // or stored. Get(hash []byte) (*types.TxResult, error) + + // Search allows you to query for transactions. + Search(q *query.Query) ([]*types.TxResult, error) } //---------------------------------------------------- @@ -26,18 +30,18 @@ type TxIndexer interface { // Batch groups together multiple Index operations to be performed at the same time. // NOTE: Batch is NOT thread-safe and must not be modified after starting its execution. type Batch struct { - Ops []types.TxResult + Ops []*types.TxResult } // NewBatch creates a new Batch. func NewBatch(n int) *Batch { return &Batch{ - Ops: make([]types.TxResult, n), + Ops: make([]*types.TxResult, n), } } // Add or update an entry for the given result.Index. -func (b *Batch) Add(result types.TxResult) error { +func (b *Batch) Add(result *types.TxResult) error { b.Ops[result.Index] = result return nil } diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go new file mode 100644 index 000000000..3e5fab127 --- /dev/null +++ b/state/txindex/indexer_service.go @@ -0,0 +1,49 @@ +package txindex + +import ( + "context" + + "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" +) + +const ( + subscriber = "IndexerService" +) + +type IndexerService struct { + cmn.BaseService + + idr TxIndexer + eventBus *types.EventBus +} + +func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService { + is := &IndexerService{idr: idr, eventBus: eventBus} + is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is) + return is +} + +// OnStart implements cmn.Service by subscribing for all transactions +// and indexing them by tags. +func (is *IndexerService) OnStart() error { + ch := make(chan interface{}) + if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, ch); err != nil { + return err + } + go func() { + for event := range ch { + // TODO: may be not perfomant to write one event at a time + txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult + is.idr.Index(&txResult) + } + }() + return nil +} + +// OnStop implements cmn.Service by unsubscribing from all transactions. +func (is *IndexerService) OnStop() { + if is.eventBus.IsRunning() { + _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber) + } +} diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index db075e547..5ca4d0628 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -2,25 +2,57 @@ package kv import ( "bytes" + "encoding/hex" "fmt" + "strconv" + "strings" + "time" - "github.com/tendermint/go-wire" - - db "github.com/tendermint/tmlibs/db" + "github.com/pkg/errors" + abci "github.com/tendermint/abci/types" + wire "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" + db "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/pubsub/query" ) -// TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB). -// It can only index transaction by its identifier. +const ( + tagKeySeparator = "/" +) + +var _ txindex.TxIndexer = (*TxIndex)(nil) + +// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB). type TxIndex struct { - store db.DB + store db.DB + tagsToIndex []string + indexAllTags bool +} + +// NewTxIndex creates new KV indexer. +func NewTxIndex(store db.DB, options ...func(*TxIndex)) *TxIndex { + txi := &TxIndex{store: store, tagsToIndex: make([]string, 0), indexAllTags: false} + for _, o := range options { + o(txi) + } + return txi +} + +// IndexTags is an option for setting which tags to index. +func IndexTags(tags []string) func(*TxIndex) { + return func(txi *TxIndex) { + txi.tagsToIndex = tags + } } -// NewTxIndex returns new instance of TxIndex. -func NewTxIndex(store db.DB) *TxIndex { - return &TxIndex{store: store} +// IndexAllTags is an option for indexing all tags. +func IndexAllTags() func(*TxIndex) { + return func(txi *TxIndex) { + txi.indexAllTags = true + } } // Get gets transaction from the TxIndex storage and returns it or nil if the @@ -46,13 +78,328 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { return txResult, nil } -// AddBatch writes a batch of transactions into the TxIndex storage. +// AddBatch indexes a batch of transactions using the given list of tags. func (txi *TxIndex) AddBatch(b *txindex.Batch) error { storeBatch := txi.store.NewBatch() + for _, result := range b.Ops { - rawBytes := wire.BinaryBytes(&result) - storeBatch.Set(result.Tx.Hash(), rawBytes) + hash := result.Tx.Hash() + + // index tx by tags + for _, tag := range result.Result.Tags { + if txi.indexAllTags || cmn.StringInSlice(tag.Key, txi.tagsToIndex) { + storeBatch.Set(keyForTag(tag, result), hash) + } + } + + // index tx by hash + rawBytes := wire.BinaryBytes(result) + storeBatch.Set(hash, rawBytes) } + storeBatch.Write() return nil } + +// Index indexes a single transaction using the given list of tags. +func (txi *TxIndex) Index(result *types.TxResult) error { + b := txi.store.NewBatch() + + hash := result.Tx.Hash() + + // index tx by tags + for _, tag := range result.Result.Tags { + if txi.indexAllTags || cmn.StringInSlice(tag.Key, txi.tagsToIndex) { + b.Set(keyForTag(tag, result), hash) + } + } + + // index tx by hash + rawBytes := wire.BinaryBytes(result) + b.Set(hash, rawBytes) + + b.Write() + return nil +} + +// Search performs a search using the given query. It breaks the query into +// conditions (like "tx.height > 5"). For each condition, it queries the DB +// index. One special use cases here: (1) if "tx.hash" is found, it returns tx +// result for it (2) for range queries it is better for the client to provide +// both lower and upper bounds, so we are not performing a full scan. Results +// from querying indexes are then intersected and returned to the caller. +func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { + var hashes [][]byte + var hashesInitialized bool + + // get a list of conditions (like "tx.height > 5") + conditions := q.Conditions() + + // if there is a hash condition, return the result immediately + hash, err, ok := lookForHash(conditions) + if err != nil { + return nil, errors.Wrap(err, "error during searching for a hash in the query") + } else if ok { + res, err := txi.Get(hash) + if res == nil { + return []*types.TxResult{}, nil + } else { + return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result") + } + } + + // conditions to skip because they're handled before "everything else" + skipIndexes := make([]int, 0) + + // if there is a height condition ("tx.height=3"), extract it for faster lookups + height, heightIndex := lookForHeight(conditions) + if heightIndex >= 0 { + skipIndexes = append(skipIndexes, heightIndex) + } + + // extract ranges + // if both upper and lower bounds exist, it's better to get them in order not + // no iterate over kvs that are not within range. + ranges, rangeIndexes := lookForRanges(conditions) + if len(ranges) > 0 { + skipIndexes = append(skipIndexes, rangeIndexes...) + + for _, r := range ranges { + if !hashesInitialized { + hashes = txi.matchRange(r, startKeyForRange(r, height)) + hashesInitialized = true + } else { + hashes = intersect(hashes, txi.matchRange(r, startKeyForRange(r, height))) + } + } + } + + // for all other conditions + for i, c := range conditions { + if cmn.IntInSlice(i, skipIndexes) { + continue + } + + if !hashesInitialized { + hashes = txi.match(c, startKey(c, height)) + hashesInitialized = true + } else { + hashes = intersect(hashes, txi.match(c, startKey(c, height))) + } + } + + results := make([]*types.TxResult, len(hashes)) + i := 0 + for _, h := range hashes { + results[i], err = txi.Get(h) + if err != nil { + return nil, errors.Wrapf(err, "failed to get Tx{%X}", h) + } + i++ + } + + return results, nil +} + +func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) { + for _, c := range conditions { + if c.Tag == types.TxHashKey { + decoded, err := hex.DecodeString(c.Operand.(string)) + return decoded, err, true + } + } + return +} + +func lookForHeight(conditions []query.Condition) (height uint64, index int) { + for i, c := range conditions { + if c.Tag == types.TxHeightKey { + return uint64(c.Operand.(int64)), i + } + } + return 0, -1 +} + +// special map to hold range conditions +// Example: account.number => queryRange{lowerBound: 1, upperBound: 5} +type queryRanges map[string]queryRange + +type queryRange struct { + key string + lowerBound interface{} // int || time.Time + includeLowerBound bool + upperBound interface{} // int || time.Time + includeUpperBound bool +} + +func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) { + ranges = make(queryRanges) + for i, c := range conditions { + if isRangeOperation(c.Op) { + r, ok := ranges[c.Tag] + if !ok { + r = queryRange{key: c.Tag} + } + switch c.Op { + case query.OpGreater: + r.lowerBound = c.Operand + case query.OpGreaterEqual: + r.includeLowerBound = true + r.lowerBound = c.Operand + case query.OpLess: + r.upperBound = c.Operand + case query.OpLessEqual: + r.includeUpperBound = true + r.upperBound = c.Operand + } + ranges[c.Tag] = r + indexes = append(indexes, i) + } + } + return ranges, indexes +} + +func isRangeOperation(op query.Operator) bool { + switch op { + case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual: + return true + default: + return false + } +} + +func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) { + if c.Op == query.OpEqual { + it := txi.store.IteratorPrefix(startKey) + defer it.Release() + for it.Next() { + hashes = append(hashes, it.Value()) + } + } else if c.Op == query.OpContains { + // XXX: doing full scan because startKey does not apply here + // For example, if startKey = "account.owner=an" and search query = "accoutn.owner CONSISTS an" + // we can't iterate with prefix "account.owner=an" because we might miss keys like "account.owner=Ulan" + it := txi.store.Iterator() + defer it.Release() + for it.Next() { + if !isTagKey(it.Key()) { + continue + } + if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { + hashes = append(hashes, it.Value()) + } + } + } else { + panic("other operators should be handled already") + } + return +} + +func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) { + it := txi.store.IteratorPrefix(startKey) + defer it.Release() +LOOP: + for it.Next() { + if !isTagKey(it.Key()) { + continue + } + if r.upperBound != nil { + // no other way to stop iterator other than checking for upperBound + switch (r.upperBound).(type) { + case int64: + v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) + if err == nil && v == r.upperBound { + if r.includeUpperBound { + hashes = append(hashes, it.Value()) + } + break LOOP + } + // XXX: passing time in a ABCI Tags is not yet implemented + // case time.Time: + // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) + // if v == r.upperBound { + // break + // } + } + } + hashes = append(hashes, it.Value()) + } + return +} + +/////////////////////////////////////////////////////////////////////////////// +// Keys + +func startKey(c query.Condition, height uint64) []byte { + var key string + if height > 0 { + key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height) + } else { + key = fmt.Sprintf("%s/%v", c.Tag, c.Operand) + } + return []byte(key) +} + +func startKeyForRange(r queryRange, height uint64) []byte { + if r.lowerBound == nil { + return []byte(fmt.Sprintf("%s", r.key)) + } + + var lowerBound interface{} + if r.includeLowerBound { + lowerBound = r.lowerBound + } else { + switch t := r.lowerBound.(type) { + case int64: + lowerBound = t + 1 + case time.Time: + lowerBound = t.Unix() + 1 + default: + panic("not implemented") + } + } + var key string + if height > 0 { + key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height) + } else { + key = fmt.Sprintf("%s/%v", r.key, lowerBound) + } + return []byte(key) +} + +func isTagKey(key []byte) bool { + return strings.Count(string(key), tagKeySeparator) == 3 +} + +func extractValueFromKey(key []byte) string { + parts := strings.SplitN(string(key), tagKeySeparator, 3) + return parts[1] +} + +func keyForTag(tag *abci.KVPair, result *types.TxResult) []byte { + switch tag.ValueType { + case abci.KVPair_STRING: + return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueString, result.Height, result.Index)) + case abci.KVPair_INT: + return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueInt, result.Height, result.Index)) + // case abci.KVPair_TIME: + // return []byte(fmt.Sprintf("%s/%d/%d/%d", tag.Key, tag.ValueTime.Unix(), result.Height, result.Index)) + default: + panic(fmt.Sprintf("Undefined value type: %v", tag.ValueType)) + } +} + +/////////////////////////////////////////////////////////////////////////////// +// Utils + +func intersect(as, bs [][]byte) [][]byte { + i := make([][]byte, 0, cmn.MinInt(len(as), len(bs))) + for _, a := range as { + for _, b := range bs { + if bytes.Equal(a, b) { + i = append(i, a) + } + } + } + return i +} diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 673674b30..efe17a182 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -1,6 +1,7 @@ package kv import ( + "fmt" "io/ioutil" "os" "testing" @@ -11,30 +12,145 @@ import ( "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" db "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/pubsub/query" ) func TestTxIndex(t *testing.T) { - indexer := &TxIndex{store: db.NewMemDB()} + indexer := NewTxIndex(db.NewMemDB()) tx := types.Tx("HELLO WORLD") - txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} + txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} hash := tx.Hash() batch := txindex.NewBatch(1) - if err := batch.Add(*txResult); err != nil { + if err := batch.Add(txResult); err != nil { t.Error(err) } err := indexer.AddBatch(batch) - require.Nil(t, err) + require.NoError(t, err) loadedTxResult, err := indexer.Get(hash) - require.Nil(t, err) + require.NoError(t, err) assert.Equal(t, txResult, loadedTxResult) + + tx2 := types.Tx("BYE BYE WORLD") + txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} + hash2 := tx2.Hash() + + err = indexer.Index(txResult2) + require.NoError(t, err) + + loadedTxResult2, err := indexer.Get(hash2) + require.NoError(t, err) + assert.Equal(t, txResult2, loadedTxResult2) +} + +func TestTxSearch(t *testing.T) { + allowedTags := []string{"account.number", "account.owner", "account.date"} + indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags)) + + txResult := txResultWithTags([]*abci.KVPair{ + {Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 1}, + {Key: "account.owner", ValueType: abci.KVPair_STRING, ValueString: "Ivan"}, + {Key: "not_allowed", ValueType: abci.KVPair_STRING, ValueString: "Vlad"}, + }) + hash := txResult.Tx.Hash() + + err := indexer.Index(txResult) + require.NoError(t, err) + + testCases := []struct { + q string + resultsLength int + }{ + // search by hash + {fmt.Sprintf("tx.hash = '%X'", hash), 1}, + // search by exact match (one tag) + {"account.number = 1", 1}, + // search by exact match (two tags) + {"account.number = 1 AND account.owner = 'Ivan'", 1}, + // search by exact match (two tags) + {"account.number = 1 AND account.owner = 'Vlad'", 0}, + // search by range + {"account.number >= 1 AND account.number <= 5", 1}, + // search by range (lower bound) + {"account.number >= 1", 1}, + // search by range (upper bound) + {"account.number <= 5", 1}, + // search using not allowed tag + {"not_allowed = 'boom'", 0}, + // search for not existing tx result + {"account.number >= 2 AND account.number <= 5", 0}, + // search using not existing tag + {"account.date >= TIME 2013-05-03T14:45:00Z", 0}, + // search using CONTAINS + {"account.owner CONTAINS 'an'", 1}, + // search using CONTAINS + {"account.owner CONTAINS 'Vlad'", 0}, + } + + for _, tc := range testCases { + t.Run(tc.q, func(t *testing.T) { + results, err := indexer.Search(query.MustParse(tc.q)) + assert.NoError(t, err) + + assert.Len(t, results, tc.resultsLength) + if tc.resultsLength > 0 { + assert.Equal(t, []*types.TxResult{txResult}, results) + } + }) + } +} + +func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { + allowedTags := []string{"account.number"} + indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags)) + + txResult := txResultWithTags([]*abci.KVPair{ + {Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 1}, + {Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 2}, + }) + + err := indexer.Index(txResult) + require.NoError(t, err) + + results, err := indexer.Search(query.MustParse("account.number >= 1")) + assert.NoError(t, err) + + assert.Len(t, results, 1) + assert.Equal(t, []*types.TxResult{txResult}, results) +} + +func TestIndexAllTags(t *testing.T) { + indexer := NewTxIndex(db.NewMemDB(), IndexAllTags()) + + txResult := txResultWithTags([]*abci.KVPair{ + abci.KVPairString("account.owner", "Ivan"), + abci.KVPairInt("account.number", 1), + }) + + err := indexer.Index(txResult) + require.NoError(t, err) + + results, err := indexer.Search(query.MustParse("account.number >= 1")) + assert.NoError(t, err) + assert.Len(t, results, 1) + assert.Equal(t, []*types.TxResult{txResult}, results) + + results, err = indexer.Search(query.MustParse("account.owner = 'Ivan'")) + assert.NoError(t, err) + assert.Len(t, results, 1) + assert.Equal(t, []*types.TxResult{txResult}, results) +} + +func txResultWithTags(tags []*abci.KVPair) *types.TxResult { + tx := types.Tx("HELLO WORLD") + return &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: tags}} } func benchmarkTxIndex(txsCount int, b *testing.B) { tx := types.Tx("HELLO WORLD") - txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} + txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} dir, err := ioutil.TempDir("", "tx_index_db") if err != nil { @@ -43,11 +159,11 @@ func benchmarkTxIndex(txsCount int, b *testing.B) { defer os.RemoveAll(dir) // nolint: errcheck store := db.NewDB("tx_index", "leveldb", dir) - indexer := &TxIndex{store: store} + indexer := NewTxIndex(store) batch := txindex.NewBatch(txsCount) for i := 0; i < txsCount; i++ { - if err := batch.Add(*txResult); err != nil { + if err := batch.Add(txResult); err != nil { b.Fatal(err) } txResult.Index += 1 diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index 4939d6d82..0764faa9e 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -5,8 +5,11 @@ import ( "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmlibs/pubsub/query" ) +var _ txindex.TxIndexer = (*TxIndex)(nil) + // TxIndex acts as a /dev/null. type TxIndex struct{} @@ -19,3 +22,12 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { return nil } + +// Index is a noop and always returns nil. +func (txi *TxIndex) Index(result *types.TxResult) error { + return nil +} + +func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { + return []*types.TxResult{}, nil +} diff --git a/types/event_bus.go b/types/event_bus.go index 85ef14485..6cee1d82b 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + abci "github.com/tendermint/abci/types" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" tmpubsub "github.com/tendermint/tmlibs/pubsub" @@ -67,63 +68,101 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error { //--- block, tx, and vote events -func (b *EventBus) PublishEventNewBlock(block EventDataNewBlock) error { - return b.Publish(EventNewBlock, TMEventData{block}) +func (b *EventBus) PublishEventNewBlock(event EventDataNewBlock) error { + return b.Publish(EventNewBlock, TMEventData{event}) } -func (b *EventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { - return b.Publish(EventNewBlockHeader, TMEventData{header}) +func (b *EventBus) PublishEventNewBlockHeader(event EventDataNewBlockHeader) error { + return b.Publish(EventNewBlockHeader, TMEventData{event}) } -func (b *EventBus) PublishEventVote(vote EventDataVote) error { - return b.Publish(EventVote, TMEventData{vote}) +func (b *EventBus) PublishEventVote(event EventDataVote) error { + return b.Publish(EventVote, TMEventData{event}) } -func (b *EventBus) PublishEventTx(tx EventDataTx) error { +// PublishEventTx publishes tx event with tags from Result. Note it will add +// predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names +// will be overwritten. +func (b *EventBus) PublishEventTx(event EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() - b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())}) + + tags := make(map[string]interface{}) + + // validate and fill tags from tx result + for _, tag := range event.Result.Tags { + // basic validation + if tag.Key == "" { + b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", event.Tx) + continue + } + + switch tag.ValueType { + case abci.KVPair_STRING: + tags[tag.Key] = tag.ValueString + case abci.KVPair_INT: + tags[tag.Key] = tag.ValueInt + } + } + + // add predefined tags + logIfTagExists(EventTypeKey, tags, b.Logger) + tags[EventTypeKey] = EventTx + + logIfTagExists(TxHashKey, tags, b.Logger) + tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash()) + + logIfTagExists(TxHeightKey, tags, b.Logger) + tags[TxHeightKey] = event.Height + + b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags) return nil } -func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error { - return b.Publish(EventProposalHeartbeat, TMEventData{ph}) +func (b *EventBus) PublishEventProposalHeartbeat(event EventDataProposalHeartbeat) error { + return b.Publish(EventProposalHeartbeat, TMEventData{event}) } //--- EventDataRoundState events -func (b *EventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { - return b.Publish(EventNewRoundStep, TMEventData{rs}) +func (b *EventBus) PublishEventNewRoundStep(event EventDataRoundState) error { + return b.Publish(EventNewRoundStep, TMEventData{event}) +} + +func (b *EventBus) PublishEventTimeoutPropose(event EventDataRoundState) error { + return b.Publish(EventTimeoutPropose, TMEventData{event}) } -func (b *EventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { - return b.Publish(EventTimeoutPropose, TMEventData{rs}) +func (b *EventBus) PublishEventTimeoutWait(event EventDataRoundState) error { + return b.Publish(EventTimeoutWait, TMEventData{event}) } -func (b *EventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { - return b.Publish(EventTimeoutWait, TMEventData{rs}) +func (b *EventBus) PublishEventNewRound(event EventDataRoundState) error { + return b.Publish(EventNewRound, TMEventData{event}) } -func (b *EventBus) PublishEventNewRound(rs EventDataRoundState) error { - return b.Publish(EventNewRound, TMEventData{rs}) +func (b *EventBus) PublishEventCompleteProposal(event EventDataRoundState) error { + return b.Publish(EventCompleteProposal, TMEventData{event}) } -func (b *EventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { - return b.Publish(EventCompleteProposal, TMEventData{rs}) +func (b *EventBus) PublishEventPolka(event EventDataRoundState) error { + return b.Publish(EventPolka, TMEventData{event}) } -func (b *EventBus) PublishEventPolka(rs EventDataRoundState) error { - return b.Publish(EventPolka, TMEventData{rs}) +func (b *EventBus) PublishEventUnlock(event EventDataRoundState) error { + return b.Publish(EventUnlock, TMEventData{event}) } -func (b *EventBus) PublishEventUnlock(rs EventDataRoundState) error { - return b.Publish(EventUnlock, TMEventData{rs}) +func (b *EventBus) PublishEventRelock(event EventDataRoundState) error { + return b.Publish(EventRelock, TMEventData{event}) } -func (b *EventBus) PublishEventRelock(rs EventDataRoundState) error { - return b.Publish(EventRelock, TMEventData{rs}) +func (b *EventBus) PublishEventLock(event EventDataRoundState) error { + return b.Publish(EventLock, TMEventData{event}) } -func (b *EventBus) PublishEventLock(rs EventDataRoundState) error { - return b.Publish(EventLock, TMEventData{rs}) +func logIfTagExists(tag string, tags map[string]interface{}, logger log.Logger) { + if value, ok := tags[tag]; ok { + logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value) + } } diff --git a/types/events.go b/types/events.go index 64b83ec95..9bf7a5a43 100644 --- a/types/events.go +++ b/types/events.go @@ -3,7 +3,6 @@ package types import ( "fmt" - abci "github.com/tendermint/abci/types" "github.com/tendermint/go-wire/data" tmpubsub "github.com/tendermint/tmlibs/pubsub" tmquery "github.com/tendermint/tmlibs/pubsub/query" @@ -110,12 +109,7 @@ type EventDataNewBlockHeader struct { // All txs fire EventDataTx type EventDataTx struct { - Height int `json:"height"` - Tx Tx `json:"tx"` - Data data.Bytes `json:"data"` - Log string `json:"log"` - Code abci.CodeType `json:"code"` - Error string `json:"error"` // this is redundant information for now + TxResult } type EventDataProposalHeartbeat struct { @@ -142,10 +136,13 @@ type EventDataVote struct { const ( // EventTypeKey is a reserved key, used to specify event type in tags. - EventTypeKey = "tm.events.type" + EventTypeKey = "tm.event" // TxHashKey is a reserved key, used to specify transaction's hash. // see EventBus#PublishEventTx TxHashKey = "tx.hash" + // TxHeightKey is a reserved key, used to specify transaction block's height. + // see EventBus#PublishEventTx + TxHeightKey = "tx.height" ) var ( @@ -167,9 +164,10 @@ var ( EventQueryTimeoutWait = queryForEvent(EventTimeoutWait) EventQueryVote = queryForEvent(EventVote) EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat) + EventQueryTx = queryForEvent(EventTx) ) -func EventQueryTx(tx Tx) tmpubsub.Query { +func EventQueryTxFor(tx Tx) tmpubsub.Query { return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) }