diff --git a/node/node.go b/node/node.go index 57fbfbf28..ea668f8f6 100644 --- a/node/node.go +++ b/node/node.go @@ -111,6 +111,7 @@ type Node struct { proxyApp proxy.AppConns // connection to the application rpcListeners []net.Listener // rpc servers txIndexer txindex.TxIndexer + indexerService *txindex.IndexerService } // NewNode returns a new, ready to go, Tendermint Node. @@ -292,16 +293,7 @@ func NewNode(config *cfg.Config, txIndexer = &null.TxIndex{} } - // subscribe for all transactions and index them by tags - ch := make(chan interface{}) - eventBus.Subscribe(context.Background(), "tx_index", types.EventQueryTx, ch) - go func() { - for event := range ch { - // XXX: may be not perfomant to write one event at a time - txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult - txIndexer.Index(&txResult) - } - }() + indexerService := txindex.NewIndexerService(txIndexer, eventBus) // run the profile server profileHost := config.ProfListenAddress @@ -328,6 +320,7 @@ func NewNode(config *cfg.Config, consensusReactor: consensusReactor, proxyApp: proxyApp, txIndexer: txIndexer, + indexerService: indexerService, eventBus: eventBus, } node.BaseService = *cmn.NewBaseService(logger, "Node", node) @@ -373,6 +366,12 @@ func (n *Node) OnStart() error { } } + // start tx indexer + _, err = n.indexerService.Start() + if err != nil { + return err + } + return nil } @@ -392,6 +391,8 @@ func (n *Node) OnStop() { } n.eventBus.Stop() + + n.indexerService.Stop() } // RunForever waits for an interrupt signal and stops the node. diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go new file mode 100644 index 000000000..80f12fd35 --- /dev/null +++ b/state/txindex/indexer_service.go @@ -0,0 +1,48 @@ +package txindex + +import ( + "context" + + "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" +) + +const ( + subscriber = "IndexerService" +) + +type IndexerService struct { + cmn.BaseService + + idr TxIndexer + eventBus *types.EventBus +} + +func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService { + is := &IndexerService{idr: idr, eventBus: eventBus} + is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is) + return is +} + +// OnStart implements cmn.Service by subscribing for all transactions +// and indexing them by tags. +func (is *IndexerService) OnStart() error { + ch := make(chan interface{}) + if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, ch); err != nil { + return err + } + go func() { + for event := range ch { + // TODO: may be not perfomant to write one event at a time + txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult + is.idr.Index(&txResult) + } + }() + return nil +} + +func (is *IndexerService) OnStop() { + if is.eventBus.IsRunning() { + _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber) + } +}