You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

90 lines
2.4 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package txindex
  2. import (
  3. "context"
  4. cmn "github.com/tendermint/tendermint/libs/common"
  5. "github.com/tendermint/tendermint/types"
  6. )
  7. const (
  8. subscriber = "IndexerService"
  9. )
  10. // IndexerService connects event bus and transaction indexer together in order
  11. // to index transactions coming from event bus.
  12. type IndexerService struct {
  13. cmn.BaseService
  14. idr TxIndexer
  15. eventBus *types.EventBus
  16. }
  17. // NewIndexerService returns a new service instance.
  18. func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService {
  19. is := &IndexerService{idr: idr, eventBus: eventBus}
  20. is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is)
  21. return is
  22. }
  23. // OnStart implements cmn.Service by subscribing for all transactions
  24. // and indexing them by tags.
  25. func (is *IndexerService) OnStart() error {
  26. blockHeadersSub, err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader)
  27. if err != nil {
  28. return err
  29. }
  30. txsSub, err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx)
  31. if err != nil {
  32. return err
  33. }
  34. go func() {
  35. for {
  36. select {
  37. case msg := <-blockHeadersSub.Out():
  38. header := msg.Data().(types.EventDataNewBlockHeader).Header
  39. batch := NewBatch(header.NumTxs)
  40. for i := int64(0); i < header.NumTxs; i++ {
  41. select {
  42. case msg2 := <-txsSub.Out():
  43. txResult := msg2.Data().(types.EventDataTx).TxResult
  44. if err = batch.Add(&txResult); err != nil {
  45. is.Logger.Error("Can't add tx to batch",
  46. "height", header.Height,
  47. "index", txResult.Index,
  48. "err", err)
  49. }
  50. case <-txsSub.Cancelled():
  51. is.Logger.Error("Failed to index block. txsSub was cancelled. Did the Tendermint stop?",
  52. "height", header.Height,
  53. "numTxs", header.NumTxs,
  54. "numProcessed", i,
  55. "err", txsSub.Err(),
  56. )
  57. return
  58. }
  59. }
  60. if err = is.idr.AddBatch(batch); err != nil {
  61. is.Logger.Error("Failed to index block", "height", header.Height, "err", err)
  62. } else {
  63. is.Logger.Info("Indexed block", "height", header.Height)
  64. }
  65. case <-blockHeadersSub.Cancelled():
  66. is.Logger.Error("blockHeadersSub was cancelled. Did the Tendermint stop?",
  67. "err", blockHeadersSub.Err())
  68. return
  69. }
  70. }
  71. }()
  72. return nil
  73. }
  74. // OnStop implements cmn.Service by unsubscribing from all transactions.
  75. func (is *IndexerService) OnStop() {
  76. if is.eventBus.IsRunning() {
  77. _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
  78. }
  79. }