You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

171 lines
5.0 KiB

7 years ago
7 years ago
pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
6 years ago
  1. package indexer
  2. import (
  3. "context"
  4. "time"
  5. "github.com/tendermint/tendermint/internal/eventbus"
  6. "github.com/tendermint/tendermint/internal/pubsub"
  7. "github.com/tendermint/tendermint/libs/log"
  8. "github.com/tendermint/tendermint/libs/service"
  9. "github.com/tendermint/tendermint/types"
  10. )
  11. // Service connects event bus, transaction and block indexers together in
  12. // order to index transactions and blocks coming from the event bus.
  13. type Service struct {
  14. service.BaseService
  15. logger log.Logger
  16. eventSinks []EventSink
  17. eventBus *eventbus.EventBus
  18. metrics *Metrics
  19. currentBlock struct {
  20. header types.EventDataNewBlockHeader
  21. height int64
  22. batch *Batch
  23. }
  24. }
  25. // NewService constructs a new indexer service from the given arguments.
  26. func NewService(args ServiceArgs) *Service {
  27. is := &Service{
  28. logger: args.Logger,
  29. eventSinks: args.Sinks,
  30. eventBus: args.EventBus,
  31. metrics: args.Metrics,
  32. }
  33. if is.metrics == nil {
  34. is.metrics = NopMetrics()
  35. }
  36. is.BaseService = *service.NewBaseService(args.Logger, "IndexerService", is)
  37. return is
  38. }
  39. // publish publishes a pubsub message to the service. The service blocks until
  40. // the message has been fully processed.
  41. func (is *Service) publish(msg pubsub.Message) error {
  42. // Indexing has three states. Initially, no block is in progress (WAIT) and
  43. // we expect a block header. Upon seeing a header, we are waiting for zero
  44. // or more transactions (GATHER). Once all the expected transactions have
  45. // been delivered (in some order), we are ready to index. After indexing a
  46. // block, we revert to the WAIT state for the next block.
  47. if is.currentBlock.batch == nil {
  48. // WAIT: Start a new block.
  49. hdr := msg.Data().(types.EventDataNewBlockHeader)
  50. is.currentBlock.header = hdr
  51. is.currentBlock.height = hdr.Header.Height
  52. is.currentBlock.batch = NewBatch(hdr.NumTxs)
  53. if hdr.NumTxs != 0 {
  54. return nil
  55. }
  56. // If the block does not expect any transactions, fall through and index
  57. // it immediately. This shouldn't happen, but this check ensures we do
  58. // not get stuck if it does.
  59. }
  60. curr := is.currentBlock.batch
  61. if curr.Pending != 0 {
  62. // GATHER: Accumulate a transaction into the current block's batch.
  63. txResult := msg.Data().(types.EventDataTx).TxResult
  64. if err := curr.Add(&txResult); err != nil {
  65. is.logger.Error("failed to add tx to batch",
  66. "height", is.currentBlock.height, "index", txResult.Index, "err", err)
  67. }
  68. // This may have been the last transaction in the batch, so fall through
  69. // to check whether it is time to index.
  70. }
  71. if curr.Pending == 0 {
  72. // INDEX: We have all the transactions we expect for the current block.
  73. for _, sink := range is.eventSinks {
  74. start := time.Now()
  75. if err := sink.IndexBlockEvents(is.currentBlock.header); err != nil {
  76. is.logger.Error("failed to index block header",
  77. "height", is.currentBlock.height, "err", err)
  78. } else {
  79. is.metrics.BlockEventsSeconds.Observe(time.Since(start).Seconds())
  80. is.metrics.BlocksIndexed.Add(1)
  81. is.logger.Debug("indexed block",
  82. "height", is.currentBlock.height, "sink", sink.Type())
  83. }
  84. if curr.Size() != 0 {
  85. start := time.Now()
  86. err := sink.IndexTxEvents(curr.Ops)
  87. if err != nil {
  88. is.logger.Error("failed to index block txs",
  89. "height", is.currentBlock.height, "err", err)
  90. } else {
  91. is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds())
  92. is.metrics.TransactionsIndexed.Add(float64(curr.Size()))
  93. is.logger.Debug("indexed txs",
  94. "height", is.currentBlock.height, "sink", sink.Type())
  95. }
  96. }
  97. }
  98. is.currentBlock.batch = nil // return to the WAIT state for the next block
  99. }
  100. return nil
  101. }
  102. // OnStart implements part of service.Service. It registers an observer for the
  103. // indexer if the underlying event sinks support indexing.
  104. //
  105. // TODO(creachadair): Can we get rid of the "enabled" check?
  106. func (is *Service) OnStart(ctx context.Context) error {
  107. // If the event sinks support indexing, register an observer to capture
  108. // block header data for the indexer.
  109. if IndexingEnabled(is.eventSinks) {
  110. err := is.eventBus.Observe(ctx, is.publish,
  111. types.EventQueryNewBlockHeader, types.EventQueryTx)
  112. if err != nil {
  113. return err
  114. }
  115. }
  116. return nil
  117. }
  118. // OnStop implements service.Service by closing the event sinks.
  119. func (is *Service) OnStop() {
  120. for _, sink := range is.eventSinks {
  121. if err := sink.Stop(); err != nil {
  122. is.logger.Error("failed to close eventsink", "eventsink", sink.Type(), "err", err)
  123. }
  124. }
  125. }
  126. // ServiceArgs are arguments for constructing a new indexer service.
  127. type ServiceArgs struct {
  128. Sinks []EventSink
  129. EventBus *eventbus.EventBus
  130. Metrics *Metrics
  131. Logger log.Logger
  132. }
  133. // KVSinkEnabled returns the given eventSinks is containing KVEventSink.
  134. func KVSinkEnabled(sinks []EventSink) bool {
  135. for _, sink := range sinks {
  136. if sink.Type() == KV {
  137. return true
  138. }
  139. }
  140. return false
  141. }
  142. // IndexingEnabled returns the given eventSinks is supporting the indexing services.
  143. func IndexingEnabled(sinks []EventSink) bool {
  144. for _, sink := range sinks {
  145. if sink.Type() == KV || sink.Type() == PSQL {
  146. return true
  147. }
  148. }
  149. return false
  150. }