- package types
-
- import (
- "context"
- "fmt"
-
- "github.com/tendermint/tendermint/abci/types"
- cmn "github.com/tendermint/tendermint/libs/common"
- "github.com/tendermint/tendermint/libs/log"
- tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
- )
-
- // defaultCapacity is the buffer capacity that NewEventBus hands to
- // NewEventBusWithBufferCapacity when the caller does not pick one.
- const defaultCapacity = 0
-
- // EventBusSubscriber is the subscription-side view of an event bus: attaching
- // and detaching queries for a named subscriber, plus counters for clients and
- // their subscriptions. *EventBus declares methods with these signatures.
- type EventBusSubscriber interface {
- // Subscribe attaches query for subscriber and returns the resulting
- // Subscription. outCapacity is optional.
- // NOTE(review): presumably outCapacity sizes the subscription's output
- // channel; confirm against tmpubsub.Server.Subscribe.
- Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error)
- // Unsubscribe is the counterpart of Subscribe for one (subscriber, query) pair.
- Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
- // UnsubscribeAll is the counterpart of Subscribe for everything held by subscriber.
- UnsubscribeAll(ctx context.Context, subscriber string) error
-
- // NumClients reports a client count.
- NumClients() int
- // NumClientSubscriptions reports a subscription count for clientID.
- NumClientSubscriptions(clientID string) int
- }
-
- // Subscription is the handle returned by Subscribe and SubscribeUnbuffered.
- type Subscription interface {
- // Out returns the receive-only channel on which pubsub messages arrive.
- Out() <-chan tmpubsub.Message
- // Cancelled returns a signalling channel.
- // NOTE(review): presumably closed once the subscription is cancelled;
- // confirm against tmpubsub.Subscription.
- Cancelled() <-chan struct{}
- // Err returns the subscription's error.
- // NOTE(review): presumably only meaningful after Cancelled fires; confirm.
- Err() error
- }
-
- // EventBus is a common bus for all events going through the system. All calls
- // are proxied to underlying pubsub server. All events must be published using
- // EventBus to ensure correct data types.
- type EventBus struct {
- // BaseService is embedded; NewEventBusWithBufferCapacity initialises it
- // with the bus itself as the service implementation.
- cmn.BaseService
- // pubsub is the server every subscribe/unsubscribe/publish call is
- // forwarded to.
- pubsub *tmpubsub.Server
- }
-
- // NewEventBus builds an event bus whose pubsub server is created with
- // defaultCapacity as its buffer capacity.
- func NewEventBus() *EventBus {
- bus := NewEventBusWithBufferCapacity(defaultCapacity)
- return bus
- }
-
- // NewEventBusWithBufferCapacity builds an event bus whose underlying pubsub
- // server is created with the given buffer capacity.
- func NewEventBusWithBufferCapacity(cap int) *EventBus {
- bus := &EventBus{
- pubsub: tmpubsub.NewServer(tmpubsub.BufferCapacity(cap)),
- }
- // The base service takes the bus itself as its implementation, so it can
- // only be wired up once the bus value exists.
- bus.BaseService = *cmn.NewBaseService(nil, "EventBus", bus)
- return bus
- }
-
- // SetLogger installs l on the bus's base service and gives the pubsub server
- // a derived logger carrying the key/value pair "module"="pubsub".
- func (b *EventBus) SetLogger(l log.Logger) {
- b.BaseService.SetLogger(l)
-
- pubsubLogger := l.With("module", "pubsub")
- b.pubsub.SetLogger(pubsubLogger)
- }
-
- // OnStart starts the underlying pubsub server and returns whatever error that
- // start reports.
- func (b *EventBus) OnStart() error {
- err := b.pubsub.Start()
- return err
- }
-
- // OnStop stops the underlying pubsub server. OnStop itself has no return
- // value, so an error reported by the server is logged rather than silently
- // dropped.
- func (b *EventBus) OnStop() {
- if err := b.pubsub.Stop(); err != nil {
- b.Logger.Error("Error stopping pubsub server", "err", err)
- }
- }
-
- // NumClients returns the client count reported by the pubsub server.
- func (b *EventBus) NumClients() int {
- count := b.pubsub.NumClients()
- return count
- }
-
- // NumClientSubscriptions returns the subscription count the pubsub server
- // reports for clientID.
- func (b *EventBus) NumClientSubscriptions(clientID string) int {
- count := b.pubsub.NumClientSubscriptions(clientID)
- return count
- }
-
- // Subscribe forwards the request to the pubsub server unchanged, including
- // any optional outCapacity values, and returns the server's result.
- func (b *EventBus) Subscribe(
- ctx context.Context,
- subscriber string,
- query tmpubsub.Query,
- outCapacity ...int,
- ) (Subscription, error) {
- sub, err := b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
- return sub, err
- }
-
- // SubscribeUnbuffered forwards the request to the pubsub server's
- // SubscribeUnbuffered. It is meant for a local consensus explorer and for
- // synchronous testing. Do not use it for public facing / untrusted
- // subscriptions!
- func (b *EventBus) SubscribeUnbuffered(
- ctx context.Context,
- subscriber string,
- query tmpubsub.Query,
- ) (Subscription, error) {
- sub, err := b.pubsub.SubscribeUnbuffered(ctx, subscriber, query)
- return sub, err
- }
-
- // Unsubscribe forwards the (subscriber, query) pair to the pubsub server's
- // Unsubscribe and returns its error.
- func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
- err := b.pubsub.Unsubscribe(ctx, subscriber, query)
- return err
- }
-
- // UnsubscribeAll forwards subscriber to the pubsub server's UnsubscribeAll
- // and returns its error.
- func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
- err := b.pubsub.UnsubscribeAll(ctx, subscriber)
- return err
- }
-
- // Publish sends eventData to the pubsub server, tagged with a single
- // EventTypeKey entry whose only value is eventType.
- func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
- tags := map[string][]string{
- EventTypeKey: {eventType},
- }
- // Publishing carries no explicit deadline, hence the background context.
- return b.pubsub.PublishWithEvents(context.Background(), eventData, tags)
- }
-
- // validateAndStringifyEvents flattens events into a map keyed by
- // "{event.Type}.{attribute.Key}", where each key collects the string form of
- // every matching attribute value in encounter order. Events with an empty
- // type and attributes with an empty key are reported at debug level through
- // logger and left out of the result.
- func (b *EventBus) validateAndStringifyEvents(events []types.Event, logger log.Logger) map[string][]string {
- stringified := make(map[string][]string)
-
- for i := range events {
- ev := events[i]
- if len(ev.Type) == 0 {
- logger.Debug("Got an event with an empty type (skipping)", "event", ev)
- continue
- }
-
- for j := range ev.Attributes {
- key := string(ev.Attributes[j].Key)
- if len(key) == 0 {
- logger.Debug("Got an event attribute with an empty key(skipping)", "event", ev)
- continue
- }
-
- tag := fmt.Sprintf("%s.%s", ev.Type, key)
- stringified[tag] = append(stringified[tag], string(ev.Attributes[j].Value))
- }
- }
-
- return stringified
- }
-
- // PublishEventNewBlock publishes data under the EventNewBlock event type,
- // together with the stringified events taken from data.ResultBeginBlock and
- // data.ResultEndBlock.
- func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
- // no explicit deadline for publishing events
- ctx := context.Background()
-
- // Combine into a freshly allocated slice. Appending straight onto
- // data.ResultBeginBlock.Events could write into spare capacity of its
- // backing array, which other holders of that slice may share.
- beginEvents, endEvents := data.ResultBeginBlock.Events, data.ResultEndBlock.Events
- resultEvents := make([]types.Event, 0, len(beginEvents)+len(endEvents))
- resultEvents = append(resultEvents, beginEvents...)
- resultEvents = append(resultEvents, endEvents...)
-
- events := b.validateAndStringifyEvents(resultEvents, b.Logger.With("block", data.Block.StringShort()))
-
- // add predefined new block event
- events[EventTypeKey] = append(events[EventTypeKey], EventNewBlock)
-
- return b.pubsub.PublishWithEvents(ctx, data, events)
- }
-
- // PublishEventNewBlockHeader publishes data under the EventNewBlockHeader
- // event type, together with the stringified events taken from
- // data.ResultBeginBlock and data.ResultEndBlock.
- func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
- // no explicit deadline for publishing events
- ctx := context.Background()
-
- // Combine into a freshly allocated slice. Appending straight onto
- // data.ResultBeginBlock.Events could write into spare capacity of its
- // backing array, which other holders of that slice may share.
- beginEvents, endEvents := data.ResultBeginBlock.Events, data.ResultEndBlock.Events
- resultEvents := make([]types.Event, 0, len(beginEvents)+len(endEvents))
- resultEvents = append(resultEvents, beginEvents...)
- resultEvents = append(resultEvents, endEvents...)
-
- // TODO: Create StringShort method for Header and use it in logger.
- events := b.validateAndStringifyEvents(resultEvents, b.Logger.With("header", data.Header))
-
- // add predefined new block header event
- events[EventTypeKey] = append(events[EventTypeKey], EventNewBlockHeader)
-
- return b.pubsub.PublishWithEvents(ctx, data, events)
- }
-
- // PublishEventVote publishes data under the EventVote event type.
- func (b *EventBus) PublishEventVote(data EventDataVote) error {
- err := b.Publish(EventVote, data)
- return err
- }
-
- // PublishEventValidBlock publishes data under the EventValidBlock event type.
- func (b *EventBus) PublishEventValidBlock(data EventDataRoundState) error {
- err := b.Publish(EventValidBlock, data)
- return err
- }
-
- // PublishEventTx publishes data together with the stringified events from
- // data.Result.Events. Three predefined keys are added on top: EventTypeKey
- // (EventTx), TxHashKey (upper-case hex of the tx hash) and TxHeightKey
- // (decimal height). If the result events already produced one of these keys,
- // the predefined value is appended after the existing values.
- func (b *EventBus) PublishEventTx(data EventDataTx) error {
- txLogger := b.Logger.With("tx", data.Tx)
- events := b.validateAndStringifyEvents(data.Result.Events, txLogger)
-
- hashHex := fmt.Sprintf("%X", data.Tx.Hash())
- height := fmt.Sprintf("%d", data.Height)
-
- // add predefined compositeKeys
- events[EventTypeKey] = append(events[EventTypeKey], EventTx)
- events[TxHashKey] = append(events[TxHashKey], hashHex)
- events[TxHeightKey] = append(events[TxHeightKey], height)
-
- // no explicit deadline for publishing events
- return b.pubsub.PublishWithEvents(context.Background(), data, events)
- }
-
- // PublishEventNewRoundStep publishes data under the EventNewRoundStep event type.
- func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
- err := b.Publish(EventNewRoundStep, data)
- return err
- }
-
- // PublishEventTimeoutPropose publishes data under the EventTimeoutPropose event type.
- func (b *EventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
- err := b.Publish(EventTimeoutPropose, data)
- return err
- }
-
- // PublishEventTimeoutWait publishes data under the EventTimeoutWait event type.
- func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
- err := b.Publish(EventTimeoutWait, data)
- return err
- }
-
- // PublishEventNewRound publishes data under the EventNewRound event type.
- func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error {
- err := b.Publish(EventNewRound, data)
- return err
- }
-
- // PublishEventCompleteProposal publishes data under the EventCompleteProposal
- // event type.
- func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error {
- err := b.Publish(EventCompleteProposal, data)
- return err
- }
-
- // PublishEventPolka publishes data under the EventPolka event type.
- func (b *EventBus) PublishEventPolka(data EventDataRoundState) error {
- err := b.Publish(EventPolka, data)
- return err
- }
-
- // PublishEventUnlock publishes data under the EventUnlock event type.
- func (b *EventBus) PublishEventUnlock(data EventDataRoundState) error {
- err := b.Publish(EventUnlock, data)
- return err
- }
-
- // PublishEventRelock publishes data under the EventRelock event type.
- func (b *EventBus) PublishEventRelock(data EventDataRoundState) error {
- err := b.Publish(EventRelock, data)
- return err
- }
-
- // PublishEventLock publishes data under the EventLock event type.
- func (b *EventBus) PublishEventLock(data EventDataRoundState) error {
- err := b.Publish(EventLock, data)
- return err
- }
-
- // PublishEventValidatorSetUpdates publishes data under the
- // EventValidatorSetUpdates event type.
- func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
- err := b.Publish(EventValidatorSetUpdates, data)
- return err
- }
-
- //-----------------------------------------------------------------------------
-
- // NopEventBus is an event bus stand-in whose methods all do nothing and
- // return nil.
- //
- // NOTE(review): its method set is not in step with EventBus in this file:
- // Subscribe has a different signature, PublishEventNewRound and
- // PublishEventCompleteProposal take EventDataRoundState where EventBus takes
- // EventDataNewRound / EventDataCompleteProposal, and there is no
- // PublishEventValidBlock. Confirm whether callers rely on the current shapes
- // before aligning them.
- type NopEventBus struct{}
-
- // Subscribe is a no-op that always returns nil.
- // NOTE(review): takes an out channel and returns only an error, so it does
- // not match the Subscribe declared by EventBusSubscriber.
- func (NopEventBus) Subscribe(
- ctx context.Context,
- subscriber string,
- query tmpubsub.Query,
- out chan<- interface{},
- ) error {
- return nil
- }
-
- // Unsubscribe is a no-op that always returns nil.
- func (NopEventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
- return nil
- }
-
- // UnsubscribeAll is a no-op that always returns nil.
- func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
- return nil
- }
-
- // PublishEventNewBlock is a no-op that always returns nil.
- func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock) error {
- return nil
- }
-
- // PublishEventNewBlockHeader is a no-op that always returns nil.
- func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
- return nil
- }
-
- // PublishEventVote is a no-op that always returns nil.
- func (NopEventBus) PublishEventVote(data EventDataVote) error {
- return nil
- }
-
- // PublishEventTx is a no-op that always returns nil.
- func (NopEventBus) PublishEventTx(data EventDataTx) error {
- return nil
- }
-
- // PublishEventNewRoundStep is a no-op that always returns nil.
- func (NopEventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventTimeoutPropose is a no-op that always returns nil.
- func (NopEventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventTimeoutWait is a no-op that always returns nil.
- func (NopEventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventNewRound is a no-op that always returns nil.
- // NOTE(review): EventBus.PublishEventNewRound takes EventDataNewRound.
- func (NopEventBus) PublishEventNewRound(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventCompleteProposal is a no-op that always returns nil.
- // NOTE(review): EventBus.PublishEventCompleteProposal takes
- // EventDataCompleteProposal.
- func (NopEventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventPolka is a no-op that always returns nil.
- func (NopEventBus) PublishEventPolka(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventUnlock is a no-op that always returns nil.
- func (NopEventBus) PublishEventUnlock(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventRelock is a no-op that always returns nil.
- func (NopEventBus) PublishEventRelock(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventLock is a no-op that always returns nil.
- func (NopEventBus) PublishEventLock(data EventDataRoundState) error {
- return nil
- }
-
- // PublishEventValidatorSetUpdates is a no-op that always returns nil.
- func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
- return nil
- }
|