diff --git a/node/node.go b/node/node.go index c0e4197b9..5efe39b9d 100644 --- a/node/node.go +++ b/node/node.go @@ -173,20 +173,6 @@ func NewNode(config *cfg.Config, state = sm.LoadState(stateDB) state.SetLogger(stateLogger) - // Transaction indexing - var txIndexer txindex.TxIndexer - switch config.TxIndex.Indexer { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, err - } - txIndexer = kv.NewTxIndex(store) - default: - txIndexer = &null.TxIndex{} - } - state.TxIndexer = txIndexer - // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() @@ -293,6 +279,30 @@ func NewNode(config *cfg.Config, bcReactor.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus) + // Transaction indexing + var txIndexer txindex.TxIndexer + switch config.TxIndex.Indexer { + case "kv": + store, err := dbProvider(&DBContext{"tx_index", config}) + if err != nil { + return nil, err + } + txIndexer = kv.NewTxIndex(store) + default: + txIndexer = &null.TxIndex{} + } + + // subscribe for all transactions and index them by tags + ch := make(chan interface{}) + eventBus.Subscribe(context.Background(), "tx_index", types.EventQueryTx, ch) + go func() { + for event := range ch { + // XXX: may be not perfomant to write one event at a time + txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult + txIndexer.Index(&txResult) + } + }() + // run the profile server profileHost := config.ProfListenAddress if profileHost != "" { diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index 9619e5c05..963282294 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -100,7 +100,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { require.True(ok, "%d: %#v", i, evt) // make sure this is the proper tx require.EqualValues(tx, txe.Tx) - require.True(txe.Code.IsOK()) + require.True(txe.Result.Code.IsOK()) } } @@ -132,6 +132,6 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { require.True(ok, "%d: %#v", i, evt) // make sure this is the proper tx require.EqualValues(tx, txe.Tx) - require.True(txe.Code.IsOK()) + require.True(txe.Result.Code.IsOK()) } } diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index c68276354..b6b3d9e2d 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -104,7 +104,7 @@ func TestABCIQuery(t *testing.T) { k, v, tx := MakeTxKV() bres, err := c.BroadcastTxCommit(tx) require.Nil(t, err, "%d: %+v", i, err) - apph := bres.Height + 1 // this is where the tx will be applied to the state + apph := int(bres.Height) + 1 // this is where the tx will be applied to the state // wait before querying client.WaitForHeight(c, apph, nil) @@ -136,7 +136,7 @@ func TestAppCalls(t *testing.T) { bres, err := c.BroadcastTxCommit(tx) require.Nil(err, "%d: %+v", i, err) require.True(bres.DeliverTx.Code.IsOK()) - txh := bres.Height + txh := int(bres.Height) apph := txh + 1 // this is where the tx will be applied to the state // wait before querying @@ -153,7 +153,7 @@ func TestAppCalls(t *testing.T) { // ptx, err := c.Tx(bres.Hash, true) ptx, err := c.Tx(bres.Hash, true) require.Nil(err, "%d: %+v", i, err) - assert.Equal(txh, ptx.Height) + assert.EqualValues(txh, ptx.Height) assert.EqualValues(tx, ptx.Tx) // and we can even check the block is added @@ -280,9 +280,9 @@ func TestTx(t *testing.T) { require.NotNil(err) } else { require.Nil(err, "%+v", err) - assert.Equal(txHeight, ptx.Height) + assert.EqualValues(txHeight, ptx.Height) assert.EqualValues(tx, ptx.Tx) - assert.Equal(0, ptx.Index) + assert.Zero(ptx.Index) assert.True(ptx.TxResult.Code.IsOK()) // time to verify the proof diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 382b2f556..88c5bd2b4 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -154,7 +154,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() deliverTxResCh := make(chan interface{}) - q := types.EventQueryTx(tx) + q := types.EventQueryTxFor(tx) err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) if err != nil { err = errors.Wrap(err, "failed to subscribe to tx") @@ -192,9 +192,9 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx) // The tx was included in a block. deliverTxR := &abci.ResponseDeliverTx{ - Code: deliverTxRes.Code, - Data: deliverTxRes.Data, - Log: deliverTxRes.Log, + Code: deliverTxRes.Result.Code, + Data: deliverTxRes.Result.Data, + Log: deliverTxRes.Result.Log, } logger.Info("DeliverTx passed ", "tx", data.Bytes(tx), "response", deliverTxR) return &ctypes.ResultBroadcastTxCommit{ diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 03a911e2c..dc842e622 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -82,13 +82,13 @@ func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { return nil, fmt.Errorf("Tx (%X) not found", hash) } - height := int(r.Height) // XXX - index := int(r.Index) + height := r.Height + index := r.Index var proof types.TxProof if prove { - block := blockStore.LoadBlock(height) - proof = block.Data.Txs.Proof(index) + block := blockStore.LoadBlock(int(height)) + proof = block.Data.Txs.Proof(int(index)) } return &ctypes.ResultTx{ diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 8aa904fe5..e4c5d8fc7 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -107,12 +107,12 @@ type ResultBroadcastTxCommit struct { CheckTx abci.Result `json:"check_tx"` DeliverTx abci.Result `json:"deliver_tx"` Hash data.Bytes `json:"hash"` - Height int `json:"height"` + Height uint64 `json:"height"` } type ResultTx struct { - Height int `json:"height"` - Index int `json:"index"` + Height uint64 `json:"height"` + Index uint32 `json:"index"` TxResult abci.Result `json:"tx_result"` Tx types.Tx `json:"tx"` Proof types.TxProof `json:"proof,omitempty"` diff --git a/state/execution.go b/state/execution.go index 0033e7f37..be09b2b2e 100644 --- a/state/execution.go +++ b/state/execution.go @@ -8,7 +8,6 @@ import ( abci "github.com/tendermint/abci/types" crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -54,47 +53,25 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p // TODO: make use of this info // Blocks may include invalid txs. // reqDeliverTx := req.(abci.RequestDeliverTx) - txError := "" txResult := r.DeliverTx if txResult.Code == abci.CodeType_OK { validTxs++ } else { logger.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log) invalidTxs++ - txError = txResult.Code.String() } - abciResponses.DeliverTx[txIndex] = txResult - txIndex++ - // NOTE: if we count we can access the tx from the block instead of // pulling it from the req - tx := types.Tx(req.GetDeliverTx().Tx) - - tags := make(map[string]interface{}) - for _, t := range txResult.Tags { - // basic validation - if t.Key == "" { - logger.Info("Got tag with an empty key (skipping)", "tag", t, "tx", tx) - continue - } - - if t.ValueString != "" { - tags[t.Key] = t.ValueString - } else { - tags[t.Key] = t.ValueInt - } - } + txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{ + Height: uint64(block.Height), + Index: uint32(txIndex), + Tx: types.Tx(req.GetDeliverTx().Tx), + Result: *txResult, + }}) - txEventPublisher.PublishEventTx(types.EventDataTx{ - Height: block.Height, - Tx: tx, - Data: txResult.Data, - Code: txResult.Code, - Log: txResult.Log, - Tags: tags, - Error: txError, - }) + abciResponses.DeliverTx[txIndex] = txResult + txIndex++ } } proxyAppConn.SetResponseCallback(proxyCb) @@ -227,7 +204,6 @@ func (s *State) validateBlock(block *types.Block) error { //----------------------------------------------------------------------------- // ApplyBlock validates & executes the block, updates state w/ ABCI responses, // then commits and updates the mempool atomically, then saves state. -// Transaction results are optionally indexed. // ApplyBlock validates the block against the state, executes it against the app, // commits it, and saves the block and state. It's the only function that needs to be called @@ -242,9 +218,6 @@ func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn fail.Fail() // XXX - // index txs. This could run in the background - s.indexTxs(abciResponses) - // save the results before we commit s.SaveABCIResponses(abciResponses) @@ -293,26 +266,6 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl return mempool.Update(block.Height, block.Txs) } -func (s *State) indexTxs(abciResponses *ABCIResponses) { - // save the tx results using the TxIndexer - // NOTE: these may be overwriting, but the values should be the same. - batch := txindex.NewBatch(len(abciResponses.DeliverTx)) - for i, d := range abciResponses.DeliverTx { - tx := abciResponses.txs[i] - if err := batch.Add(types.TxResult{ - Height: uint64(abciResponses.Height), - Index: uint32(i), - Tx: tx, - Result: *d, - }); err != nil { - s.logger.Error("Error with batch.Add", "err", err) - } - } - if err := s.TxIndexer.AddBatch(batch); err != nil { - s.logger.Error("Error adding batch", "err", err) - } -} - // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // It returns the application root hash (result of abci.Commit). func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) { diff --git a/state/execution_test.go b/state/execution_test.go index 5b9bf168a..e54d983d1 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -3,13 +3,11 @@ package state import ( "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/abci/example/dummy" crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" @@ -31,8 +29,6 @@ func TestApplyBlock(t *testing.T) { state := state() state.SetLogger(log.TestingLogger()) - indexer := &dummyIndexer{0} - state.TxIndexer = indexer // make block block := makeBlock(1, state) @@ -40,7 +36,6 @@ func TestApplyBlock(t *testing.T) { err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{}) require.Nil(t, err) - assert.Equal(t, nTxsPerBlock, indexer.Indexed) // test indexing works // TODO check state and mempool } @@ -75,16 +70,3 @@ func makeBlock(num int, state *State) *types.Block { prevBlockID, valHash, state.AppHash, testPartSize) return block } - -// dummyIndexer increments counter every time we index transaction. -type dummyIndexer struct { - Indexed int -} - -func (indexer *dummyIndexer) Get(hash []byte) (*types.TxResult, error) { - return nil, nil -} -func (indexer *dummyIndexer) AddBatch(batch *txindex.Batch) error { - indexer.Indexed += batch.Size() - return nil -} diff --git a/state/state.go b/state/state.go index 4241f9de6..1c2b3efeb 100644 --- a/state/state.go +++ b/state/state.go @@ -15,8 +15,6 @@ import ( wire "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/state/txindex" - "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" ) @@ -61,9 +59,6 @@ type State struct { // AppHash is updated after Commit AppHash []byte - // TxIndexer indexes transactions - TxIndexer txindex.TxIndexer `json:"-"` - logger log.Logger } @@ -95,7 +90,7 @@ func loadState(db dbm.DB, key []byte) *State { return nil } - s := &State{db: db, TxIndexer: &null.TxIndex{}} + s := &State{db: db} r, n, err := bytes.NewReader(buf), new(int), new(error) wire.ReadBinaryPtr(&s, r, 0, n, err) if *err != nil { @@ -114,8 +109,6 @@ func (s *State) SetLogger(l log.Logger) { } // Copy makes a copy of the State for mutating. -// NOTE: Does not create a copy of TxIndexer. It creates a new pointer that points to the same -// underlying TxIndexer. func (s *State) Copy() *State { return &State{ db: s.db, @@ -125,7 +118,6 @@ func (s *State) Copy() *State { Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), AppHash: s.AppHash, - TxIndexer: s.TxIndexer, LastHeightValidatorsChanged: s.LastHeightValidatorsChanged, logger: s.logger, ChainID: s.ChainID, @@ -368,7 +360,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { } } - // we do not need indexer during replay and in tests return &State{ db: db, @@ -381,7 +372,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { Validators: types.NewValidatorSet(validators), LastValidators: types.NewValidatorSet(nil), AppHash: genDoc.AppHash, - TxIndexer: &null.TxIndex{}, LastHeightValidatorsChanged: 1, }, nil } diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index 039460a16..2c37283c8 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -9,12 +9,12 @@ import ( // TxIndexer interface defines methods to index and search transactions. type TxIndexer interface { - // AddBatch analyzes, indexes or stores a batch of transactions. - // NOTE: We do not specify Index method for analyzing a single transaction - // here because it bears heavy performance losses. Almost all advanced indexers - // support batching. + // AddBatch analyzes, indexes and stores a batch of transactions. AddBatch(b *Batch) error + // Index analyzes, indexes and stores a single transaction. + Index(result *types.TxResult) error + // Get returns the transaction specified by hash or nil if the transaction is not indexed // or stored. Get(hash []byte) (*types.TxResult, error) diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index db075e547..a3826c8b4 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -4,7 +4,7 @@ import ( "bytes" "fmt" - "github.com/tendermint/go-wire" + wire "github.com/tendermint/go-wire" db "github.com/tendermint/tmlibs/db" @@ -56,3 +56,10 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { storeBatch.Write() return nil } + +// Index writes a single transaction into the TxIndex storage. +func (txi *TxIndex) Index(result *types.TxResult) error { + rawBytes := wire.BinaryBytes(result) + txi.store.Set(result.Tx.Hash(), rawBytes) + return nil +} diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index c0f1403ec..f814fabe3 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -30,6 +30,17 @@ func TestTxIndex(t *testing.T) { loadedTxResult, err := indexer.Get(hash) require.Nil(t, err) assert.Equal(t, txResult, loadedTxResult) + + tx2 := types.Tx("BYE BYE WORLD") + txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} + hash2 := tx2.Hash() + + err = indexer.Index(txResult2) + require.Nil(t, err) + + loadedTxResult2, err := indexer.Get(hash2) + require.Nil(t, err) + assert.Equal(t, txResult2, loadedTxResult2) } func benchmarkTxIndex(txsCount int, b *testing.B) { diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index 4939d6d82..27e81d734 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -19,3 +19,8 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { return nil } + +// Index is a noop and always returns nil. +func (txi *TxIndex) Index(result *types.TxResult) error { + return nil +} diff --git a/types/event_bus.go b/types/event_bus.go index 6091538e4..a4daaa3c0 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -67,67 +67,95 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error { //--- block, tx, and vote events -func (b *EventBus) PublishEventNewBlock(block EventDataNewBlock) error { - return b.Publish(EventNewBlock, TMEventData{block}) +func (b *EventBus) PublishEventNewBlock(event EventDataNewBlock) error { + return b.Publish(EventNewBlock, TMEventData{event}) } -func (b *EventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { - return b.Publish(EventNewBlockHeader, TMEventData{header}) +func (b *EventBus) PublishEventNewBlockHeader(event EventDataNewBlockHeader) error { + return b.Publish(EventNewBlockHeader, TMEventData{event}) } -func (b *EventBus) PublishEventVote(vote EventDataVote) error { - return b.Publish(EventVote, TMEventData{vote}) +func (b *EventBus) PublishEventVote(event EventDataVote) error { + return b.Publish(EventVote, TMEventData{event}) } -func (b *EventBus) PublishEventTx(tx EventDataTx) error { +// PublishEventTx publishes tx event with tags from Result. Note it will add +// predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names +// will be overwritten. +func (b *EventBus) PublishEventTx(event EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() - tags := tx.Tags - // add predefined tags (they should overwrite any existing tags) + + tags := make(map[string]interface{}) + + // validate and fill tags from tx result + for _, tag := range event.Result.Tags { + // basic validation + if tag.Key == "" { + b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", event.Tx) + continue + } + + if tag.ValueString != "" { + tags[tag.Key] = tag.ValueString + } else { + tags[tag.Key] = tag.ValueInt + } + } + + // add predefined tags + if tag, ok := tags[EventTypeKey]; ok { + b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag) + } tags[EventTypeKey] = EventTx - tags[TxHashKey] = fmt.Sprintf("%X", tx.Tx.Hash()) - b.pubsub.PublishWithTags(ctx, TMEventData{tx}, tags) + + if tag, ok := tags[TxHashKey]; ok { + b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag) + } + tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash()) + + b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags) return nil } -func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error { - return b.Publish(EventProposalHeartbeat, TMEventData{ph}) +func (b *EventBus) PublishEventProposalHeartbeat(event EventDataProposalHeartbeat) error { + return b.Publish(EventProposalHeartbeat, TMEventData{event}) } //--- EventDataRoundState events -func (b *EventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { - return b.Publish(EventNewRoundStep, TMEventData{rs}) +func (b *EventBus) PublishEventNewRoundStep(event EventDataRoundState) error { + return b.Publish(EventNewRoundStep, TMEventData{event}) } -func (b *EventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { - return b.Publish(EventTimeoutPropose, TMEventData{rs}) +func (b *EventBus) PublishEventTimeoutPropose(event EventDataRoundState) error { + return b.Publish(EventTimeoutPropose, TMEventData{event}) } -func (b *EventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { - return b.Publish(EventTimeoutWait, TMEventData{rs}) +func (b *EventBus) PublishEventTimeoutWait(event EventDataRoundState) error { + return b.Publish(EventTimeoutWait, TMEventData{event}) } -func (b *EventBus) PublishEventNewRound(rs EventDataRoundState) error { - return b.Publish(EventNewRound, TMEventData{rs}) +func (b *EventBus) PublishEventNewRound(event EventDataRoundState) error { + return b.Publish(EventNewRound, TMEventData{event}) } -func (b *EventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { - return b.Publish(EventCompleteProposal, TMEventData{rs}) +func (b *EventBus) PublishEventCompleteProposal(event EventDataRoundState) error { + return b.Publish(EventCompleteProposal, TMEventData{event}) } -func (b *EventBus) PublishEventPolka(rs EventDataRoundState) error { - return b.Publish(EventPolka, TMEventData{rs}) +func (b *EventBus) PublishEventPolka(event EventDataRoundState) error { + return b.Publish(EventPolka, TMEventData{event}) } -func (b *EventBus) PublishEventUnlock(rs EventDataRoundState) error { - return b.Publish(EventUnlock, TMEventData{rs}) +func (b *EventBus) PublishEventUnlock(event EventDataRoundState) error { + return b.Publish(EventUnlock, TMEventData{event}) } -func (b *EventBus) PublishEventRelock(rs EventDataRoundState) error { - return b.Publish(EventRelock, TMEventData{rs}) +func (b *EventBus) PublishEventRelock(event EventDataRoundState) error { + return b.Publish(EventRelock, TMEventData{event}) } -func (b *EventBus) PublishEventLock(rs EventDataRoundState) error { - return b.Publish(EventLock, TMEventData{rs}) +func (b *EventBus) PublishEventLock(event EventDataRoundState) error { + return b.Publish(EventLock, TMEventData{event}) } diff --git a/types/events.go b/types/events.go index f20297d6c..03e5e7951 100644 --- a/types/events.go +++ b/types/events.go @@ -3,7 +3,6 @@ package types import ( "fmt" - abci "github.com/tendermint/abci/types" "github.com/tendermint/go-wire/data" tmpubsub "github.com/tendermint/tmlibs/pubsub" tmquery "github.com/tendermint/tmlibs/pubsub/query" @@ -110,13 +109,7 @@ type EventDataNewBlockHeader struct { // All txs fire EventDataTx type EventDataTx struct { - Height int `json:"height"` - Tx Tx `json:"tx"` - Data data.Bytes `json:"data"` - Log string `json:"log"` - Code abci.CodeType `json:"code"` - Tags map[string]interface{} `json:"tags"` - Error string `json:"error"` // this is redundant information for now + TxResult } type EventDataProposalHeartbeat struct { @@ -168,9 +161,10 @@ var ( EventQueryTimeoutWait = queryForEvent(EventTimeoutWait) EventQueryVote = queryForEvent(EventVote) EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat) + EventQueryTx = queryForEvent(EventTx) ) -func EventQueryTx(tx Tx) tmpubsub.Query { +func EventQueryTxFor(tx Tx) tmpubsub.Query { return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) }