You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

169 lines
5.0 KiB

6 years ago
6 years ago
pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 
github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
5 years ago
  1. package indexer
  2. import (
  3. "context"
  4. "time"
  5. "github.com/tendermint/tendermint/internal/eventbus"
  6. "github.com/tendermint/tendermint/libs/log"
  7. "github.com/tendermint/tendermint/libs/pubsub"
  8. "github.com/tendermint/tendermint/libs/service"
  9. "github.com/tendermint/tendermint/types"
  10. )
  11. // Service connects event bus, transaction and block indexers together in
  12. // order to index transactions and blocks coming from the event bus.
  13. type Service struct {
  14. service.BaseService
  15. eventSinks []EventSink
  16. eventBus *eventbus.EventBus
  17. metrics *Metrics
  18. currentBlock struct {
  19. header types.EventDataNewBlockHeader
  20. height int64
  21. batch *Batch
  22. }
  23. }
  24. // NewService constructs a new indexer service from the given arguments.
  25. func NewService(args ServiceArgs) *Service {
  26. is := &Service{
  27. eventSinks: args.Sinks,
  28. eventBus: args.EventBus,
  29. metrics: args.Metrics,
  30. }
  31. if is.metrics == nil {
  32. is.metrics = NopMetrics()
  33. }
  34. is.BaseService = *service.NewBaseService(args.Logger, "IndexerService", is)
  35. return is
  36. }
  37. // publish publishes a pubsub message to the service. The service blocks until
  38. // the message has been fully processed.
  39. func (is *Service) publish(msg pubsub.Message) error {
  40. // Indexing has three states. Initially, no block is in progress (WAIT) and
  41. // we expect a block header. Upon seeing a header, we are waiting for zero
  42. // or more transactions (GATHER). Once all the expected transactions have
  43. // been delivered (in some order), we are ready to index. After indexing a
  44. // block, we revert to the WAIT state for the next block.
  45. if is.currentBlock.batch == nil {
  46. // WAIT: Start a new block.
  47. hdr := msg.Data().(types.EventDataNewBlockHeader)
  48. is.currentBlock.header = hdr
  49. is.currentBlock.height = hdr.Header.Height
  50. is.currentBlock.batch = NewBatch(hdr.NumTxs)
  51. if hdr.NumTxs != 0 {
  52. return nil
  53. }
  54. // If the block does not expect any transactions, fall through and index
  55. // it immediately. This shouldn't happen, but this check ensures we do
  56. // not get stuck if it does.
  57. }
  58. curr := is.currentBlock.batch
  59. if curr.Pending != 0 {
  60. // GATHER: Accumulate a transaction into the current block's batch.
  61. txResult := msg.Data().(types.EventDataTx).TxResult
  62. if err := curr.Add(&txResult); err != nil {
  63. is.Logger.Error("failed to add tx to batch",
  64. "height", is.currentBlock.height, "index", txResult.Index, "err", err)
  65. }
  66. // This may have been the last transaction in the batch, so fall through
  67. // to check whether it is time to index.
  68. }
  69. if curr.Pending == 0 {
  70. // INDEX: We have all the transactions we expect for the current block.
  71. for _, sink := range is.eventSinks {
  72. start := time.Now()
  73. if err := sink.IndexBlockEvents(is.currentBlock.header); err != nil {
  74. is.Logger.Error("failed to index block header",
  75. "height", is.currentBlock.height, "err", err)
  76. } else {
  77. is.metrics.BlockEventsSeconds.Observe(time.Since(start).Seconds())
  78. is.metrics.BlocksIndexed.Add(1)
  79. is.Logger.Debug("indexed block",
  80. "height", is.currentBlock.height, "sink", sink.Type())
  81. }
  82. if curr.Size() != 0 {
  83. start := time.Now()
  84. err := sink.IndexTxEvents(curr.Ops)
  85. if err != nil {
  86. is.Logger.Error("failed to index block txs",
  87. "height", is.currentBlock.height, "err", err)
  88. } else {
  89. is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds())
  90. is.metrics.TransactionsIndexed.Add(float64(curr.Size()))
  91. is.Logger.Debug("indexed txs",
  92. "height", is.currentBlock.height, "sink", sink.Type())
  93. }
  94. }
  95. }
  96. is.currentBlock.batch = nil // return to the WAIT state for the next block
  97. }
  98. return nil
  99. }
  100. // OnStart implements part of service.Service. It registers an observer for the
  101. // indexer if the underlying event sinks support indexing.
  102. //
  103. // TODO(creachadair): Can we get rid of the "enabled" check?
  104. func (is *Service) OnStart(ctx context.Context) error {
  105. // If the event sinks support indexing, register an observer to capture
  106. // block header data for the indexer.
  107. if IndexingEnabled(is.eventSinks) {
  108. err := is.eventBus.Observe(context.TODO(), is.publish,
  109. types.EventQueryNewBlockHeader, types.EventQueryTx)
  110. if err != nil {
  111. return err
  112. }
  113. }
  114. return nil
  115. }
  116. // OnStop implements service.Service by closing the event sinks.
  117. func (is *Service) OnStop() {
  118. for _, sink := range is.eventSinks {
  119. if err := sink.Stop(); err != nil {
  120. is.Logger.Error("failed to close eventsink", "eventsink", sink.Type(), "err", err)
  121. }
  122. }
  123. }
  124. // ServiceArgs are arguments for constructing a new indexer service.
  125. type ServiceArgs struct {
  126. Sinks []EventSink
  127. EventBus *eventbus.EventBus
  128. Metrics *Metrics
  129. Logger log.Logger
  130. }
  131. // KVSinkEnabled returns the given eventSinks is containing KVEventSink.
  132. func KVSinkEnabled(sinks []EventSink) bool {
  133. for _, sink := range sinks {
  134. if sink.Type() == KV {
  135. return true
  136. }
  137. }
  138. return false
  139. }
  140. // IndexingEnabled returns the given eventSinks is supporting the indexing services.
  141. func IndexingEnabled(sinks []EventSink) bool {
  142. for _, sink := range sinks {
  143. if sink.Type() == KV || sink.Type() == PSQL {
  144. return true
  145. }
  146. }
  147. return false
  148. }