You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

254 lines
5.9 KiB

  1. // Package events - Pub-Sub in go with event caching
  2. package events
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "github.com/tendermint/tendermint/libs/service"
  8. )
  9. // ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
  10. type ErrListenerWasRemoved struct {
  11. listenerID string
  12. }
  13. // Error implements the error interface.
  14. func (e ErrListenerWasRemoved) Error() string {
  15. return fmt.Sprintf("listener #%s was removed", e.listenerID)
  16. }
  17. // EventData is a generic event data can be typed and registered with
  18. // tendermint/go-amino via concrete implementation of this interface.
  19. type EventData interface{}
  20. // Eventable is the interface reactors and other modules must export to become
  21. // eventable.
  22. type Eventable interface {
  23. SetEventSwitch(evsw EventSwitch)
  24. }
  25. // Fireable is the interface that wraps the FireEvent method.
  26. //
  27. // FireEvent fires an event with the given name and data.
  28. type Fireable interface {
  29. FireEvent(ctx context.Context, eventValue string, data EventData)
  30. }
  31. // EventSwitch is the interface for synchronous pubsub, where listeners
  32. // subscribe to certain events and, when an event is fired (see Fireable),
  33. // notified via a callback function.
  34. //
  35. // Listeners are added by calling AddListenerForEvent function.
  36. // They can be removed by calling either RemoveListenerForEvent or
  37. // RemoveListener (for all events).
  38. type EventSwitch interface {
  39. service.Service
  40. Fireable
  41. Stop() error
  42. AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
  43. RemoveListenerForEvent(event string, listenerID string)
  44. RemoveListener(listenerID string)
  45. }
  46. type eventSwitch struct {
  47. service.BaseService
  48. mtx sync.RWMutex
  49. eventCells map[string]*eventCell
  50. listeners map[string]*eventListener
  51. }
  52. func NewEventSwitch() EventSwitch {
  53. evsw := &eventSwitch{
  54. eventCells: make(map[string]*eventCell),
  55. listeners: make(map[string]*eventListener),
  56. }
  57. evsw.BaseService = *service.NewBaseService(nil, "EventSwitch", evsw)
  58. return evsw
  59. }
  60. func (evsw *eventSwitch) OnStart(ctx context.Context) error {
  61. return nil
  62. }
  63. func (evsw *eventSwitch) OnStop() {}
  64. func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
  65. // Get/Create eventCell and listener.
  66. evsw.mtx.Lock()
  67. eventCell := evsw.eventCells[eventValue]
  68. if eventCell == nil {
  69. eventCell = newEventCell()
  70. evsw.eventCells[eventValue] = eventCell
  71. }
  72. listener := evsw.listeners[listenerID]
  73. if listener == nil {
  74. listener = newEventListener(listenerID)
  75. evsw.listeners[listenerID] = listener
  76. }
  77. evsw.mtx.Unlock()
  78. if err := listener.AddEvent(eventValue); err != nil {
  79. return err
  80. }
  81. eventCell.AddListener(listenerID, cb)
  82. return nil
  83. }
  84. func (evsw *eventSwitch) RemoveListener(listenerID string) {
  85. // Get and remove listener.
  86. evsw.mtx.RLock()
  87. listener := evsw.listeners[listenerID]
  88. evsw.mtx.RUnlock()
  89. if listener == nil {
  90. return
  91. }
  92. evsw.mtx.Lock()
  93. delete(evsw.listeners, listenerID)
  94. evsw.mtx.Unlock()
  95. // Remove callback for each event.
  96. listener.SetRemoved()
  97. for _, event := range listener.GetEvents() {
  98. evsw.RemoveListenerForEvent(event, listenerID)
  99. }
  100. }
  101. func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
  102. // Get eventCell
  103. evsw.mtx.Lock()
  104. eventCell := evsw.eventCells[event]
  105. evsw.mtx.Unlock()
  106. if eventCell == nil {
  107. return
  108. }
  109. // Remove listenerID from eventCell
  110. numListeners := eventCell.RemoveListener(listenerID)
  111. // Maybe garbage collect eventCell.
  112. if numListeners == 0 {
  113. // Lock again and double check.
  114. evsw.mtx.Lock() // OUTER LOCK
  115. eventCell.mtx.Lock() // INNER LOCK
  116. if len(eventCell.listeners) == 0 {
  117. delete(evsw.eventCells, event)
  118. }
  119. eventCell.mtx.Unlock() // INNER LOCK
  120. evsw.mtx.Unlock() // OUTER LOCK
  121. }
  122. }
  123. func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) {
  124. // Get the eventCell
  125. evsw.mtx.RLock()
  126. eventCell := evsw.eventCells[event]
  127. evsw.mtx.RUnlock()
  128. if eventCell == nil {
  129. return
  130. }
  131. // Fire event for all listeners in eventCell
  132. eventCell.FireEvent(ctx, data)
  133. }
  134. //-----------------------------------------------------------------------------
  135. // eventCell handles keeping track of listener callbacks for a given event.
  136. type eventCell struct {
  137. mtx sync.RWMutex
  138. listeners map[string]EventCallback
  139. }
  140. func newEventCell() *eventCell {
  141. return &eventCell{
  142. listeners: make(map[string]EventCallback),
  143. }
  144. }
  145. func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
  146. cell.mtx.Lock()
  147. cell.listeners[listenerID] = cb
  148. cell.mtx.Unlock()
  149. }
  150. func (cell *eventCell) RemoveListener(listenerID string) int {
  151. cell.mtx.Lock()
  152. delete(cell.listeners, listenerID)
  153. numListeners := len(cell.listeners)
  154. cell.mtx.Unlock()
  155. return numListeners
  156. }
  157. func (cell *eventCell) FireEvent(ctx context.Context, data EventData) {
  158. cell.mtx.RLock()
  159. eventCallbacks := make([]EventCallback, 0, len(cell.listeners))
  160. for _, cb := range cell.listeners {
  161. eventCallbacks = append(eventCallbacks, cb)
  162. }
  163. cell.mtx.RUnlock()
  164. for _, cb := range eventCallbacks {
  165. if err := cb(ctx, data); err != nil {
  166. // should we log or abort here?
  167. continue
  168. }
  169. }
  170. }
  171. //-----------------------------------------------------------------------------
  172. type EventCallback func(ctx context.Context, data EventData) error
  173. type eventListener struct {
  174. id string
  175. mtx sync.RWMutex
  176. removed bool
  177. events []string
  178. }
  179. func newEventListener(id string) *eventListener {
  180. return &eventListener{
  181. id: id,
  182. removed: false,
  183. events: nil,
  184. }
  185. }
  186. func (evl *eventListener) AddEvent(event string) error {
  187. evl.mtx.Lock()
  188. if evl.removed {
  189. evl.mtx.Unlock()
  190. return ErrListenerWasRemoved{listenerID: evl.id}
  191. }
  192. evl.events = append(evl.events, event)
  193. evl.mtx.Unlock()
  194. return nil
  195. }
  196. func (evl *eventListener) GetEvents() []string {
  197. evl.mtx.RLock()
  198. events := make([]string, len(evl.events))
  199. copy(events, evl.events)
  200. evl.mtx.RUnlock()
  201. return events
  202. }
  203. func (evl *eventListener) SetRemoved() {
  204. evl.mtx.Lock()
  205. evl.removed = true
  206. evl.mtx.Unlock()
  207. }