You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

255 lines
5.9 KiB

// Package events - Pub-Sub in go with event caching
package events
import (
"context"
"fmt"
"sync"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
// ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
type ErrListenerWasRemoved struct {
listenerID string
}
// Error implements the error interface.
func (e ErrListenerWasRemoved) Error() string {
return fmt.Sprintf("listener #%s was removed", e.listenerID)
}
// EventData is a generic event data can be typed and registered with
// tendermint/go-amino via concrete implementation of this interface.
type EventData interface{}
// Eventable is the interface reactors and other modules must export to become
// eventable.
type Eventable interface {
SetEventSwitch(evsw EventSwitch)
}
// Fireable is the interface that wraps the FireEvent method.
//
// FireEvent fires an event with the given name and data.
type Fireable interface {
FireEvent(ctx context.Context, eventValue string, data EventData)
}
// EventSwitch is the interface for synchronous pubsub, where listeners
// subscribe to certain events and, when an event is fired (see Fireable),
// notified via a callback function.
//
// Listeners are added by calling AddListenerForEvent function.
// They can be removed by calling either RemoveListenerForEvent or
// RemoveListener (for all events).
type EventSwitch interface {
service.Service
Fireable
Stop()
AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
RemoveListenerForEvent(event string, listenerID string)
RemoveListener(listenerID string)
}
type eventSwitch struct {
service.BaseService
mtx sync.RWMutex
eventCells map[string]*eventCell
listeners map[string]*eventListener
}
func NewEventSwitch(logger log.Logger) EventSwitch {
evsw := &eventSwitch{
eventCells: make(map[string]*eventCell),
listeners: make(map[string]*eventListener),
}
evsw.BaseService = *service.NewBaseService(logger, "EventSwitch", evsw)
return evsw
}
func (evsw *eventSwitch) OnStart(ctx context.Context) error {
return nil
}
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
// Get/Create eventCell and listener.
evsw.mtx.Lock()
eventCell := evsw.eventCells[eventValue]
if eventCell == nil {
eventCell = newEventCell()
evsw.eventCells[eventValue] = eventCell
}
listener := evsw.listeners[listenerID]
if listener == nil {
listener = newEventListener(listenerID)
evsw.listeners[listenerID] = listener
}
evsw.mtx.Unlock()
if err := listener.AddEvent(eventValue); err != nil {
return err
}
eventCell.AddListener(listenerID, cb)
return nil
}
func (evsw *eventSwitch) RemoveListener(listenerID string) {
// Get and remove listener.
evsw.mtx.RLock()
listener := evsw.listeners[listenerID]
evsw.mtx.RUnlock()
if listener == nil {
return
}
evsw.mtx.Lock()
delete(evsw.listeners, listenerID)
evsw.mtx.Unlock()
// Remove callback for each event.
listener.SetRemoved()
for _, event := range listener.GetEvents() {
evsw.RemoveListenerForEvent(event, listenerID)
}
}
func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
// Get eventCell
evsw.mtx.Lock()
eventCell := evsw.eventCells[event]
evsw.mtx.Unlock()
if eventCell == nil {
return
}
// Remove listenerID from eventCell
numListeners := eventCell.RemoveListener(listenerID)
// Maybe garbage collect eventCell.
if numListeners == 0 {
// Lock again and double check.
evsw.mtx.Lock() // OUTER LOCK
eventCell.mtx.Lock() // INNER LOCK
if len(eventCell.listeners) == 0 {
delete(evsw.eventCells, event)
}
eventCell.mtx.Unlock() // INNER LOCK
evsw.mtx.Unlock() // OUTER LOCK
}
}
func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) {
// Get the eventCell
evsw.mtx.RLock()
eventCell := evsw.eventCells[event]
evsw.mtx.RUnlock()
if eventCell == nil {
return
}
// Fire event for all listeners in eventCell
eventCell.FireEvent(ctx, data)
}
//-----------------------------------------------------------------------------
// eventCell handles keeping track of listener callbacks for a given event.
type eventCell struct {
mtx sync.RWMutex
listeners map[string]EventCallback
}
func newEventCell() *eventCell {
return &eventCell{
listeners: make(map[string]EventCallback),
}
}
func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
cell.mtx.Lock()
cell.listeners[listenerID] = cb
cell.mtx.Unlock()
}
func (cell *eventCell) RemoveListener(listenerID string) int {
cell.mtx.Lock()
delete(cell.listeners, listenerID)
numListeners := len(cell.listeners)
cell.mtx.Unlock()
return numListeners
}
func (cell *eventCell) FireEvent(ctx context.Context, data EventData) {
cell.mtx.RLock()
eventCallbacks := make([]EventCallback, 0, len(cell.listeners))
for _, cb := range cell.listeners {
eventCallbacks = append(eventCallbacks, cb)
}
cell.mtx.RUnlock()
for _, cb := range eventCallbacks {
if err := cb(ctx, data); err != nil {
// should we log or abort here?
continue
}
}
}
//-----------------------------------------------------------------------------
type EventCallback func(ctx context.Context, data EventData) error
type eventListener struct {
id string
mtx sync.RWMutex
removed bool
events []string
}
func newEventListener(id string) *eventListener {
return &eventListener{
id: id,
removed: false,
events: nil,
}
}
func (evl *eventListener) AddEvent(event string) error {
evl.mtx.Lock()
if evl.removed {
evl.mtx.Unlock()
return ErrListenerWasRemoved{listenerID: evl.id}
}
evl.events = append(evl.events, event)
evl.mtx.Unlock()
return nil
}
func (evl *eventListener) GetEvents() []string {
evl.mtx.RLock()
events := make([]string, len(evl.events))
copy(events, evl.events)
evl.mtx.RUnlock()
return events
}
func (evl *eventListener) SetRemoved() {
evl.mtx.Lock()
evl.removed = true
evl.mtx.Unlock()
}