You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

73 lines
1.9 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package txindex
  2. import (
  3. "context"
  4. cmn "github.com/tendermint/tendermint/libs/common"
  5. "github.com/tendermint/tendermint/types"
  6. )
  // subscriber is the client ID under which the indexer service registers and
  // removes its event bus subscriptions.
  const subscriber = "IndexerService"
  10. // IndexerService connects event bus and transaction indexer together in order
  11. // to index transactions coming from event bus.
  12. type IndexerService struct {
  13. cmn.BaseService
  14. idr TxIndexer
  15. eventBus *types.EventBus
  16. }
  17. // NewIndexerService returns a new service instance.
  18. func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService {
  19. is := &IndexerService{idr: idr, eventBus: eventBus}
  20. is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is)
  21. return is
  22. }
  23. // OnStart implements cmn.Service by subscribing for all transactions
  24. // and indexing them by tags.
  25. func (is *IndexerService) OnStart() error {
  26. blockHeadersCh := make(chan interface{})
  27. if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil {
  28. return err
  29. }
  30. txsCh := make(chan interface{})
  31. if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil {
  32. return err
  33. }
  34. go func() {
  35. for {
  36. e, ok := <-blockHeadersCh
  37. if !ok {
  38. return
  39. }
  40. header := e.(types.EventDataNewBlockHeader).Header
  41. batch := NewBatch(header.NumTxs)
  42. for i := int64(0); i < header.NumTxs; i++ {
  43. e, ok := <-txsCh
  44. if !ok {
  45. is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i)
  46. return
  47. }
  48. txResult := e.(types.EventDataTx).TxResult
  49. batch.Add(&txResult)
  50. }
  51. is.idr.AddBatch(batch)
  52. is.Logger.Info("Indexed block", "height", header.Height)
  53. }
  54. }()
  55. return nil
  56. }
  57. // OnStop implements cmn.Service by unsubscribing from all transactions.
  58. func (is *IndexerService) OnStop() {
  59. if is.eventBus.IsRunning() {
  60. _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
  61. }
  62. }