- package types
-
- import (
- "context"
- "fmt"
-
- cmn "github.com/tendermint/tendermint/libs/common"
- "github.com/tendermint/tendermint/libs/log"
- tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
- )
-
- const defaultCapacity = 0
-
- type EventBusSubscriber interface {
- Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error)
- Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
- UnsubscribeAll(ctx context.Context, subscriber string) error
- }
-
- type Subscription interface {
- Out() <-chan tmpubsub.Message
- Cancelled() <-chan struct{}
- Err() error
- }
-
- // EventBus is a common bus for all events going through the system. All calls
- // are proxied to underlying pubsub server. All events must be published using
- // EventBus to ensure correct data types.
- type EventBus struct {
- cmn.BaseService
- pubsub *tmpubsub.Server
- }
-
- // NewEventBus returns a new event bus.
- func NewEventBus() *EventBus {
- return NewEventBusWithBufferCapacity(defaultCapacity)
- }
-
- // NewEventBusWithBufferCapacity returns a new event bus with the given buffer capacity.
- func NewEventBusWithBufferCapacity(cap int) *EventBus {
- // capacity could be exposed later if needed
- pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(cap))
- b := &EventBus{pubsub: pubsub}
- b.BaseService = *cmn.NewBaseService(nil, "EventBus", b)
- return b
- }
-
- func (b *EventBus) SetLogger(l log.Logger) {
- b.BaseService.SetLogger(l)
- b.pubsub.SetLogger(l.With("module", "pubsub"))
- }
-
- func (b *EventBus) OnStart() error {
- return b.pubsub.Start()
- }
-
- func (b *EventBus) OnStop() {
- b.pubsub.Stop()
- }
-
- func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) {
- return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
- }
-
- // This method can be used for a local consensus explorer and synchronous
- // testing. Do not use for for public facing / untrusted subscriptions!
- func (b *EventBus) SubscribeUnbuffered(ctx context.Context, subscriber string, query tmpubsub.Query) (Subscription, error) {
- return b.pubsub.SubscribeUnbuffered(ctx, subscriber, query)
- }
-
- func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
- return b.pubsub.Unsubscribe(ctx, subscriber, query)
- }
-
- func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
- return b.pubsub.UnsubscribeAll(ctx, subscriber)
- }
-
- func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
- // no explicit deadline for publishing events
- ctx := context.Background()
- b.pubsub.PublishWithTags(ctx, eventData, map[string]string{EventTypeKey: eventType})
- return nil
- }
-
- func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string {
- result := make(map[string]string)
- for _, tag := range tags {
- // basic validation
- if len(tag.Key) == 0 {
- logger.Debug("Got tag with an empty key (skipping)", "tag", tag)
- continue
- }
- result[string(tag.Key)] = string(tag.Value)
- }
- return result
- }
-
- func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
- // no explicit deadline for publishing events
- ctx := context.Background()
-
- resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
- tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort()))
-
- // add predefined tags
- logIfTagExists(EventTypeKey, tags, b.Logger)
- tags[EventTypeKey] = EventNewBlock
-
- b.pubsub.PublishWithTags(ctx, data, tags)
- return nil
- }
-
- func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
- // no explicit deadline for publishing events
- ctx := context.Background()
-
- resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
- // TODO: Create StringShort method for Header and use it in logger.
- tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header))
-
- // add predefined tags
- logIfTagExists(EventTypeKey, tags, b.Logger)
- tags[EventTypeKey] = EventNewBlockHeader
-
- b.pubsub.PublishWithTags(ctx, data, tags)
- return nil
- }
-
- func (b *EventBus) PublishEventVote(data EventDataVote) error {
- return b.Publish(EventVote, data)
- }
-
- func (b *EventBus) PublishEventValidBlock(data EventDataRoundState) error {
- return b.Publish(EventValidBlock, data)
- }
-
- // PublishEventTx publishes tx event with tags from Result. Note it will add
- // predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names
- // will be overwritten.
- func (b *EventBus) PublishEventTx(data EventDataTx) error {
- // no explicit deadline for publishing events
- ctx := context.Background()
-
- tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx))
-
- // add predefined tags
- logIfTagExists(EventTypeKey, tags, b.Logger)
- tags[EventTypeKey] = EventTx
-
- logIfTagExists(TxHashKey, tags, b.Logger)
- tags[TxHashKey] = fmt.Sprintf("%X", data.Tx.Hash())
-
- logIfTagExists(TxHeightKey, tags, b.Logger)
- tags[TxHeightKey] = fmt.Sprintf("%d", data.Height)
-
- b.pubsub.PublishWithTags(ctx, data, tags)
- return nil
- }
-
- func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
- return b.Publish(EventNewRoundStep, data)
- }
-
- func (b *EventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
- return b.Publish(EventTimeoutPropose, data)
- }
-
- func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
- return b.Publish(EventTimeoutWait, data)
- }
-
- func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error {
- return b.Publish(EventNewRound, data)
- }
-
- func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error {
- return b.Publish(EventCompleteProposal, data)
- }
-
- func (b *EventBus) PublishEventPolka(data EventDataRoundState) error {
- return b.Publish(EventPolka, data)
- }
-
- func (b *EventBus) PublishEventUnlock(data EventDataRoundState) error {
- return b.Publish(EventUnlock, data)
- }
-
- func (b *EventBus) PublishEventRelock(data EventDataRoundState) error {
- return b.Publish(EventRelock, data)
- }
-
- func (b *EventBus) PublishEventLock(data EventDataRoundState) error {
- return b.Publish(EventLock, data)
- }
-
- func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
- return b.Publish(EventValidatorSetUpdates, data)
- }
-
- func logIfTagExists(tag string, tags map[string]string, logger log.Logger) {
- if value, ok := tags[tag]; ok {
- logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value)
- }
- }
-
- //-----------------------------------------------------------------------------
- type NopEventBus struct{}
-
- func (NopEventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
- return nil
- }
-
- func (NopEventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
- return nil
- }
-
- func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
- return nil
- }
-
- func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock) error {
- return nil
- }
-
- func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
- return nil
- }
-
- func (NopEventBus) PublishEventVote(data EventDataVote) error {
- return nil
- }
-
- func (NopEventBus) PublishEventTx(data EventDataTx) error {
- return nil
- }
-
- func (NopEventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventNewRound(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventPolka(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventUnlock(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventRelock(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventLock(data EventDataRoundState) error {
- return nil
- }
-
- func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
- return nil
- }
|