Browse Source

move from tendermint/tendermint

pull/1842/head
Ethan Buchman 9 years ago
parent
commit
089435d31b
4 changed files with 269 additions and 1 deletions
  1. +1
    -1
      README.md
  2. +41
    -0
      event_cache.go
  3. +220
    -0
      events.go
  4. +7
    -0
      log.go

+ 1
- 1
README.md View File

@ -1,2 +1,2 @@
# go-events
PubSub in Go
PubSub in Go with event caching.

+ 41
- 0
event_cache.go View File

@ -0,0 +1,41 @@
package events
const (
eventsBufferSize = 1000
)
// An EventCache buffers events for a Fireable
// All events are cached. Filtering happens on Flush
type EventCache struct {
evsw Fireable
events []eventInfo
}
// Create a new EventCache with an EventSwitch as backend
func NewEventCache(evsw Fireable) *EventCache {
return &EventCache{
evsw: evsw,
events: make([]eventInfo, eventsBufferSize),
}
}
// a cached event
type eventInfo struct {
event string
data EventData
}
// Cache an event to be fired upon finality.
func (evc *EventCache) FireEvent(event string, data EventData) {
// append to list
evc.events = append(evc.events, eventInfo{event, data})
}
// Fire events by running evsw.FireEvent on all cached events. Blocks.
// Clears cached events
func (evc *EventCache) Flush() {
for _, ei := range evc.events {
evc.evsw.FireEvent(ei.event, ei.data)
}
evc.events = make([]eventInfo, eventsBufferSize)
}

+ 220
- 0
events.go View File

@ -0,0 +1,220 @@
package events
import (
"sync"
. "github.com/tendermint/go-common"
)
// Generic event data can be typed and registered with tendermint/go-wire
// via concrete implementation of this interface
type EventData interface {
AssertIsEventData()
}
// reactors and other modules should export
// this interface to become eventable
type Eventable interface {
SetEventSwitch(evsw *EventSwitch)
}
// an event switch or cache implements fireable
type Fireable interface {
FireEvent(event string, data EventData)
}
type EventSwitch struct {
BaseService
mtx sync.RWMutex
eventCells map[string]*eventCell
listeners map[string]*eventListener
}
func NewEventSwitch() *EventSwitch {
evsw := &EventSwitch{}
evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw)
return evsw
}
func (evsw *EventSwitch) OnStart() error {
evsw.BaseService.OnStart()
evsw.eventCells = make(map[string]*eventCell)
evsw.listeners = make(map[string]*eventListener)
return nil
}
func (evsw *EventSwitch) OnStop() {
evsw.BaseService.OnStop()
evsw.eventCells = nil
evsw.listeners = nil
}
func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) {
// Get/Create eventCell and listener
evsw.mtx.Lock()
eventCell := evsw.eventCells[event]
if eventCell == nil {
eventCell = newEventCell()
evsw.eventCells[event] = eventCell
}
listener := evsw.listeners[listenerID]
if listener == nil {
listener = newEventListener(listenerID)
evsw.listeners[listenerID] = listener
}
evsw.mtx.Unlock()
// Add event and listener
eventCell.AddListener(listenerID, cb)
listener.AddEvent(event)
}
func (evsw *EventSwitch) RemoveListener(listenerID string) {
// Get and remove listener
evsw.mtx.RLock()
listener := evsw.listeners[listenerID]
delete(evsw.listeners, listenerID)
evsw.mtx.RUnlock()
if listener == nil {
return
}
// Remove callback for each event.
listener.SetRemoved()
for _, event := range listener.GetEvents() {
evsw.RemoveListenerForEvent(event, listenerID)
}
}
func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) {
// Get eventCell
evsw.mtx.Lock()
eventCell := evsw.eventCells[event]
evsw.mtx.Unlock()
if eventCell == nil {
return
}
// Remove listenerID from eventCell
numListeners := eventCell.RemoveListener(listenerID)
// Maybe garbage collect eventCell.
if numListeners == 0 {
// Lock again and double check.
evsw.mtx.Lock() // OUTER LOCK
eventCell.mtx.Lock() // INNER LOCK
if len(eventCell.listeners) == 0 {
delete(evsw.eventCells, event)
}
eventCell.mtx.Unlock() // INNER LOCK
evsw.mtx.Unlock() // OUTER LOCK
}
}
func (evsw *EventSwitch) FireEvent(event string, data EventData) {
// Get the eventCell
evsw.mtx.RLock()
eventCell := evsw.eventCells[event]
evsw.mtx.RUnlock()
if eventCell == nil {
return
}
// Fire event for all listeners in eventCell
eventCell.FireEvent(data)
}
func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} {
// listen for new round
ch := make(chan interface{}, chanCap)
evsw.AddListenerForEvent(receiver, eventID, func(data EventData) {
// NOTE: in production, evsw callbacks should be nonblocking.
ch <- data
})
return ch
}
//-----------------------------------------------------------------------------
// eventCell handles keeping track of listener callbacks for a given event.
type eventCell struct {
mtx sync.RWMutex
listeners map[string]eventCallback
}
func newEventCell() *eventCell {
return &eventCell{
listeners: make(map[string]eventCallback),
}
}
func (cell *eventCell) AddListener(listenerID string, cb eventCallback) {
cell.mtx.Lock()
cell.listeners[listenerID] = cb
cell.mtx.Unlock()
}
func (cell *eventCell) RemoveListener(listenerID string) int {
cell.mtx.Lock()
delete(cell.listeners, listenerID)
numListeners := len(cell.listeners)
cell.mtx.Unlock()
return numListeners
}
func (cell *eventCell) FireEvent(data EventData) {
cell.mtx.RLock()
for _, listener := range cell.listeners {
listener(data)
}
cell.mtx.RUnlock()
}
//-----------------------------------------------------------------------------
type eventCallback func(data EventData)
type eventListener struct {
id string
mtx sync.RWMutex
removed bool
events []string
}
func newEventListener(id string) *eventListener {
return &eventListener{
id: id,
removed: false,
events: nil,
}
}
func (evl *eventListener) AddEvent(event string) {
evl.mtx.Lock()
defer evl.mtx.Unlock()
if evl.removed {
return
}
evl.events = append(evl.events, event)
}
func (evl *eventListener) GetEvents() []string {
evl.mtx.RLock()
defer evl.mtx.RUnlock()
events := make([]string, len(evl.events))
copy(events, evl.events)
return events
}
func (evl *eventListener) SetRemoved() {
evl.mtx.Lock()
defer evl.mtx.Unlock()
evl.removed = true
}

+ 7
- 0
log.go View File

@ -0,0 +1,7 @@
package events
import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "events")

Loading…
Cancel
Save