You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

242 lines
7.1 KiB

  1. package eventbus
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. abci "github.com/tendermint/tendermint/abci/types"
  7. "github.com/tendermint/tendermint/libs/log"
  8. tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
  9. "github.com/tendermint/tendermint/libs/service"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. // Subscription is a proxy interface for a pubsub Subscription.
  13. type Subscription interface {
  14. ID() string
  15. Next(context.Context) (tmpubsub.Message, error)
  16. }
  17. // EventBus is a common bus for all events going through the system.
  18. // It is a type-aware wrapper around an underlying pubsub server.
  19. // All events should be published via the bus.
  20. type EventBus struct {
  21. service.BaseService
  22. pubsub *tmpubsub.Server
  23. }
  24. // NewDefault returns a new event bus with default options.
  25. func NewDefault() *EventBus {
  26. pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(0))
  27. b := &EventBus{pubsub: pubsub}
  28. b.BaseService = *service.NewBaseService(nil, "EventBus", b)
  29. return b
  30. }
  31. func (b *EventBus) SetLogger(l log.Logger) {
  32. b.BaseService.SetLogger(l)
  33. b.pubsub.SetLogger(l.With("module", "pubsub"))
  34. }
  35. func (b *EventBus) OnStart() error {
  36. return b.pubsub.Start()
  37. }
  38. func (b *EventBus) OnStop() {
  39. if err := b.pubsub.Stop(); err != nil {
  40. b.pubsub.Logger.Error("error trying to stop eventBus", "error", err)
  41. }
  42. }
  43. func (b *EventBus) NumClients() int {
  44. return b.pubsub.NumClients()
  45. }
  46. func (b *EventBus) NumClientSubscriptions(clientID string) int {
  47. return b.pubsub.NumClientSubscriptions(clientID)
  48. }
  49. // Deprecated: Use SubscribeWithArgs instead.
  50. func (b *EventBus) Subscribe(ctx context.Context,
  51. clientID string, query tmpubsub.Query, capacities ...int) (Subscription, error) {
  52. return b.pubsub.Subscribe(ctx, clientID, query, capacities...)
  53. }
  54. func (b *EventBus) SubscribeWithArgs(ctx context.Context, args tmpubsub.SubscribeArgs) (Subscription, error) {
  55. return b.pubsub.SubscribeWithArgs(ctx, args)
  56. }
  57. func (b *EventBus) Unsubscribe(ctx context.Context, args tmpubsub.UnsubscribeArgs) error {
  58. return b.pubsub.Unsubscribe(ctx, args)
  59. }
  60. func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
  61. return b.pubsub.UnsubscribeAll(ctx, subscriber)
  62. }
  63. func (b *EventBus) Observe(ctx context.Context, observe func(tmpubsub.Message) error, queries ...tmpubsub.Query) error {
  64. return b.pubsub.Observe(ctx, observe, queries...)
  65. }
  66. func (b *EventBus) Publish(eventValue string, eventData types.TMEventData) error {
  67. // no explicit deadline for publishing events
  68. ctx := context.Background()
  69. tokens := strings.Split(types.EventTypeKey, ".")
  70. event := abci.Event{
  71. Type: tokens[0],
  72. Attributes: []abci.EventAttribute{
  73. {
  74. Key: tokens[1],
  75. Value: eventValue,
  76. },
  77. },
  78. }
  79. return b.pubsub.PublishWithEvents(ctx, eventData, []abci.Event{event})
  80. }
  81. func (b *EventBus) PublishEventNewBlock(data types.EventDataNewBlock) error {
  82. // no explicit deadline for publishing events
  83. ctx := context.Background()
  84. events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...)
  85. // add Tendermint-reserved new block event
  86. events = append(events, types.EventNewBlock)
  87. return b.pubsub.PublishWithEvents(ctx, data, events)
  88. }
  89. func (b *EventBus) PublishEventNewBlockHeader(data types.EventDataNewBlockHeader) error {
  90. // no explicit deadline for publishing events
  91. ctx := context.Background()
  92. events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...)
  93. // add Tendermint-reserved new block header event
  94. events = append(events, types.EventNewBlockHeader)
  95. return b.pubsub.PublishWithEvents(ctx, data, events)
  96. }
  97. func (b *EventBus) PublishEventNewEvidence(evidence types.EventDataNewEvidence) error {
  98. return b.Publish(types.EventNewEvidenceValue, evidence)
  99. }
  100. func (b *EventBus) PublishEventVote(data types.EventDataVote) error {
  101. return b.Publish(types.EventVoteValue, data)
  102. }
  103. func (b *EventBus) PublishEventValidBlock(data types.EventDataRoundState) error {
  104. return b.Publish(types.EventValidBlockValue, data)
  105. }
  106. func (b *EventBus) PublishEventBlockSyncStatus(data types.EventDataBlockSyncStatus) error {
  107. return b.Publish(types.EventBlockSyncStatusValue, data)
  108. }
  109. func (b *EventBus) PublishEventStateSyncStatus(data types.EventDataStateSyncStatus) error {
  110. return b.Publish(types.EventStateSyncStatusValue, data)
  111. }
  112. // PublishEventTx publishes tx event with events from Result. Note it will add
  113. // predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys
  114. // will be overwritten.
  115. func (b *EventBus) PublishEventTx(data types.EventDataTx) error {
  116. // no explicit deadline for publishing events
  117. ctx := context.Background()
  118. events := data.Result.Events
  119. // add Tendermint-reserved events
  120. events = append(events, types.EventTx)
  121. tokens := strings.Split(types.TxHashKey, ".")
  122. events = append(events, abci.Event{
  123. Type: tokens[0],
  124. Attributes: []abci.EventAttribute{
  125. {
  126. Key: tokens[1],
  127. Value: fmt.Sprintf("%X", types.Tx(data.Tx).Hash()),
  128. },
  129. },
  130. })
  131. tokens = strings.Split(types.TxHeightKey, ".")
  132. events = append(events, abci.Event{
  133. Type: tokens[0],
  134. Attributes: []abci.EventAttribute{
  135. {
  136. Key: tokens[1],
  137. Value: fmt.Sprintf("%d", data.Height),
  138. },
  139. },
  140. })
  141. return b.pubsub.PublishWithEvents(ctx, data, events)
  142. }
  143. func (b *EventBus) PublishEventNewRoundStep(data types.EventDataRoundState) error {
  144. return b.Publish(types.EventNewRoundStepValue, data)
  145. }
  146. func (b *EventBus) PublishEventTimeoutPropose(data types.EventDataRoundState) error {
  147. return b.Publish(types.EventTimeoutProposeValue, data)
  148. }
  149. func (b *EventBus) PublishEventTimeoutWait(data types.EventDataRoundState) error {
  150. return b.Publish(types.EventTimeoutWaitValue, data)
  151. }
  152. func (b *EventBus) PublishEventNewRound(data types.EventDataNewRound) error {
  153. return b.Publish(types.EventNewRoundValue, data)
  154. }
  155. func (b *EventBus) PublishEventCompleteProposal(data types.EventDataCompleteProposal) error {
  156. return b.Publish(types.EventCompleteProposalValue, data)
  157. }
  158. func (b *EventBus) PublishEventPolka(data types.EventDataRoundState) error {
  159. return b.Publish(types.EventPolkaValue, data)
  160. }
  161. func (b *EventBus) PublishEventUnlock(data types.EventDataRoundState) error {
  162. return b.Publish(types.EventUnlockValue, data)
  163. }
  164. func (b *EventBus) PublishEventRelock(data types.EventDataRoundState) error {
  165. return b.Publish(types.EventRelockValue, data)
  166. }
  167. func (b *EventBus) PublishEventLock(data types.EventDataRoundState) error {
  168. return b.Publish(types.EventLockValue, data)
  169. }
  170. func (b *EventBus) PublishEventValidatorSetUpdates(data types.EventDataValidatorSetUpdates) error {
  171. return b.Publish(types.EventValidatorSetUpdatesValue, data)
  172. }
  173. //-----------------------------------------------------------------------------
  174. // NopEventBus implements a types.BlockEventPublisher that discards all events.
  175. type NopEventBus struct{}
  176. func (NopEventBus) PublishEventNewBlock(types.EventDataNewBlock) error {
  177. return nil
  178. }
  179. func (NopEventBus) PublishEventNewBlockHeader(types.EventDataNewBlockHeader) error {
  180. return nil
  181. }
  182. func (NopEventBus) PublishEventNewEvidence(types.EventDataNewEvidence) error {
  183. return nil
  184. }
  185. func (NopEventBus) PublishEventTx(types.EventDataTx) error {
  186. return nil
  187. }
  188. func (NopEventBus) PublishEventValidatorSetUpdates(types.EventDataValidatorSetUpdates) error {
  189. return nil
  190. }