package types import ( "context" "fmt" "strings" "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/service" ) const defaultCapacity = 0 type EventBusSubscriber interface { Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) Unsubscribe(ctx context.Context, args tmpubsub.UnsubscribeArgs) error UnsubscribeAll(ctx context.Context, subscriber string) error NumClients() int NumClientSubscriptions(clientID string) int } type Subscription interface { ID() string Out() <-chan tmpubsub.Message Canceled() <-chan struct{} Err() error } // EventBus is a common bus for all events going through the system. All calls // are proxied to underlying pubsub server. All events must be published using // EventBus to ensure correct data types. type EventBus struct { service.BaseService pubsub *tmpubsub.Server } // NewEventBus returns a new event bus. func NewEventBus() *EventBus { return NewEventBusWithBufferCapacity(defaultCapacity) } // NewEventBusWithBufferCapacity returns a new event bus with the given buffer capacity. func NewEventBusWithBufferCapacity(cap int) *EventBus { // capacity could be exposed later if needed pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(cap)) b := &EventBus{pubsub: pubsub} b.BaseService = *service.NewBaseService(nil, "EventBus", b) return b } func (b *EventBus) SetLogger(l log.Logger) { b.BaseService.SetLogger(l) b.pubsub.SetLogger(l.With("module", "pubsub")) } func (b *EventBus) OnStart() error { return b.pubsub.Start() } func (b *EventBus) OnStop() { if err := b.pubsub.Stop(); err != nil { b.pubsub.Logger.Error("error trying to stop eventBus", "error", err) } } func (b *EventBus) NumClients() int { return b.pubsub.NumClients() } func (b *EventBus) NumClientSubscriptions(clientID string) int { return b.pubsub.NumClientSubscriptions(clientID) } func (b *EventBus) Subscribe( ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int, ) (Subscription, error) { return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...) } // This method can be used for a local consensus explorer and synchronous // testing. Do not use for for public facing / untrusted subscriptions! func (b *EventBus) SubscribeUnbuffered( ctx context.Context, subscriber string, query tmpubsub.Query, ) (Subscription, error) { return b.pubsub.SubscribeUnbuffered(ctx, subscriber, query) } func (b *EventBus) Unsubscribe(ctx context.Context, args tmpubsub.UnsubscribeArgs) error { return b.pubsub.Unsubscribe(ctx, args) } func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error { return b.pubsub.UnsubscribeAll(ctx, subscriber) } func (b *EventBus) Publish(eventValue string, eventData TMEventData) error { // no explicit deadline for publishing events ctx := context.Background() tokens := strings.Split(EventTypeKey, ".") event := types.Event{ Type: tokens[0], Attributes: []types.EventAttribute{ { Key: tokens[1], Value: eventValue, }, }, } return b.pubsub.PublishWithEvents(ctx, eventData, []types.Event{event}) } func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error { // no explicit deadline for publishing events ctx := context.Background() events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) // add Tendermint-reserved new block event events = append(events, EventNewBlock) return b.pubsub.PublishWithEvents(ctx, data, events) } func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error { // no explicit deadline for publishing events ctx := context.Background() events := append(data.ResultBeginBlock.Events, data.ResultEndBlock.Events...) // add Tendermint-reserved new block header event events = append(events, EventNewBlockHeader) return b.pubsub.PublishWithEvents(ctx, data, events) } func (b *EventBus) PublishEventNewEvidence(evidence EventDataNewEvidence) error { return b.Publish(EventNewEvidenceValue, evidence) } func (b *EventBus) PublishEventVote(data EventDataVote) error { return b.Publish(EventVoteValue, data) } func (b *EventBus) PublishEventValidBlock(data EventDataRoundState) error { return b.Publish(EventValidBlockValue, data) } // PublishEventTx publishes tx event with events from Result. Note it will add // predefined keys (EventTypeKey, TxHashKey). Existing events with the same keys // will be overwritten. func (b *EventBus) PublishEventTx(data EventDataTx) error { // no explicit deadline for publishing events ctx := context.Background() events := data.Result.Events // add Tendermint-reserved events events = append(events, EventTx) tokens := strings.Split(TxHashKey, ".") events = append(events, types.Event{ Type: tokens[0], Attributes: []types.EventAttribute{ { Key: tokens[1], Value: fmt.Sprintf("%X", Tx(data.Tx).Hash()), }, }, }) tokens = strings.Split(TxHeightKey, ".") events = append(events, types.Event{ Type: tokens[0], Attributes: []types.EventAttribute{ { Key: tokens[1], Value: fmt.Sprintf("%d", data.Height), }, }, }) return b.pubsub.PublishWithEvents(ctx, data, events) } func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error { return b.Publish(EventNewRoundStepValue, data) } func (b *EventBus) PublishEventTimeoutPropose(data EventDataRoundState) error { return b.Publish(EventTimeoutProposeValue, data) } func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error { return b.Publish(EventTimeoutWaitValue, data) } func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error { return b.Publish(EventNewRoundValue, data) } func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error { return b.Publish(EventCompleteProposalValue, data) } func (b *EventBus) PublishEventPolka(data EventDataRoundState) error { return b.Publish(EventPolkaValue, data) } func (b *EventBus) PublishEventUnlock(data EventDataRoundState) error { return b.Publish(EventUnlockValue, data) } func (b *EventBus) PublishEventRelock(data EventDataRoundState) error { return b.Publish(EventRelockValue, data) } func (b *EventBus) PublishEventLock(data EventDataRoundState) error { return b.Publish(EventLockValue, data) } func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error { return b.Publish(EventValidatorSetUpdatesValue, data) } //----------------------------------------------------------------------------- type NopEventBus struct{} func (NopEventBus) Subscribe( ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}, ) error { return nil } func (NopEventBus) Unsubscribe(ctx context.Context, args tmpubsub.UnsubscribeArgs) error { return nil } func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error { return nil } func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock) error { return nil } func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error { return nil } func (NopEventBus) PublishEventNewEvidence(evidence EventDataNewEvidence) error { return nil } func (NopEventBus) PublishEventVote(data EventDataVote) error { return nil } func (NopEventBus) PublishEventTx(data EventDataTx) error { return nil } func (NopEventBus) PublishEventNewRoundStep(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventTimeoutPropose(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventTimeoutWait(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventNewRound(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventCompleteProposal(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventPolka(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventUnlock(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventRelock(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventLock(data EventDataRoundState) error { return nil } func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error { return nil }