You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

255 lines
5.9 KiB

  1. // Package events - Pub-Sub in go with event caching
  2. package events
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "github.com/tendermint/tendermint/libs/log"
  8. "github.com/tendermint/tendermint/libs/service"
  9. )
  10. // ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
  11. type ErrListenerWasRemoved struct {
  12. listenerID string
  13. }
  14. // Error implements the error interface.
  15. func (e ErrListenerWasRemoved) Error() string {
  16. return fmt.Sprintf("listener #%s was removed", e.listenerID)
  17. }
  18. // EventData is a generic event data can be typed and registered with
  19. // tendermint/go-amino via concrete implementation of this interface.
  20. type EventData interface{}
  21. // Eventable is the interface reactors and other modules must export to become
  22. // eventable.
  23. type Eventable interface {
  24. SetEventSwitch(evsw EventSwitch)
  25. }
  26. // Fireable is the interface that wraps the FireEvent method.
  27. //
  28. // FireEvent fires an event with the given name and data.
  29. type Fireable interface {
  30. FireEvent(ctx context.Context, eventValue string, data EventData)
  31. }
  32. // EventSwitch is the interface for synchronous pubsub, where listeners
  33. // subscribe to certain events and, when an event is fired (see Fireable),
  34. // notified via a callback function.
  35. //
  36. // Listeners are added by calling AddListenerForEvent function.
  37. // They can be removed by calling either RemoveListenerForEvent or
  38. // RemoveListener (for all events).
  39. type EventSwitch interface {
  40. service.Service
  41. Fireable
  42. Stop() error
  43. AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
  44. RemoveListenerForEvent(event string, listenerID string)
  45. RemoveListener(listenerID string)
  46. }
  47. type eventSwitch struct {
  48. service.BaseService
  49. mtx sync.RWMutex
  50. eventCells map[string]*eventCell
  51. listeners map[string]*eventListener
  52. }
  53. func NewEventSwitch(logger log.Logger) EventSwitch {
  54. evsw := &eventSwitch{
  55. eventCells: make(map[string]*eventCell),
  56. listeners: make(map[string]*eventListener),
  57. }
  58. evsw.BaseService = *service.NewBaseService(logger, "EventSwitch", evsw)
  59. return evsw
  60. }
  61. func (evsw *eventSwitch) OnStart(ctx context.Context) error {
  62. return nil
  63. }
  64. func (evsw *eventSwitch) OnStop() {}
  65. func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
  66. // Get/Create eventCell and listener.
  67. evsw.mtx.Lock()
  68. eventCell := evsw.eventCells[eventValue]
  69. if eventCell == nil {
  70. eventCell = newEventCell()
  71. evsw.eventCells[eventValue] = eventCell
  72. }
  73. listener := evsw.listeners[listenerID]
  74. if listener == nil {
  75. listener = newEventListener(listenerID)
  76. evsw.listeners[listenerID] = listener
  77. }
  78. evsw.mtx.Unlock()
  79. if err := listener.AddEvent(eventValue); err != nil {
  80. return err
  81. }
  82. eventCell.AddListener(listenerID, cb)
  83. return nil
  84. }
  85. func (evsw *eventSwitch) RemoveListener(listenerID string) {
  86. // Get and remove listener.
  87. evsw.mtx.RLock()
  88. listener := evsw.listeners[listenerID]
  89. evsw.mtx.RUnlock()
  90. if listener == nil {
  91. return
  92. }
  93. evsw.mtx.Lock()
  94. delete(evsw.listeners, listenerID)
  95. evsw.mtx.Unlock()
  96. // Remove callback for each event.
  97. listener.SetRemoved()
  98. for _, event := range listener.GetEvents() {
  99. evsw.RemoveListenerForEvent(event, listenerID)
  100. }
  101. }
  102. func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
  103. // Get eventCell
  104. evsw.mtx.Lock()
  105. eventCell := evsw.eventCells[event]
  106. evsw.mtx.Unlock()
  107. if eventCell == nil {
  108. return
  109. }
  110. // Remove listenerID from eventCell
  111. numListeners := eventCell.RemoveListener(listenerID)
  112. // Maybe garbage collect eventCell.
  113. if numListeners == 0 {
  114. // Lock again and double check.
  115. evsw.mtx.Lock() // OUTER LOCK
  116. eventCell.mtx.Lock() // INNER LOCK
  117. if len(eventCell.listeners) == 0 {
  118. delete(evsw.eventCells, event)
  119. }
  120. eventCell.mtx.Unlock() // INNER LOCK
  121. evsw.mtx.Unlock() // OUTER LOCK
  122. }
  123. }
  124. func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) {
  125. // Get the eventCell
  126. evsw.mtx.RLock()
  127. eventCell := evsw.eventCells[event]
  128. evsw.mtx.RUnlock()
  129. if eventCell == nil {
  130. return
  131. }
  132. // Fire event for all listeners in eventCell
  133. eventCell.FireEvent(ctx, data)
  134. }
  135. //-----------------------------------------------------------------------------
  136. // eventCell handles keeping track of listener callbacks for a given event.
  137. type eventCell struct {
  138. mtx sync.RWMutex
  139. listeners map[string]EventCallback
  140. }
  141. func newEventCell() *eventCell {
  142. return &eventCell{
  143. listeners: make(map[string]EventCallback),
  144. }
  145. }
  146. func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
  147. cell.mtx.Lock()
  148. cell.listeners[listenerID] = cb
  149. cell.mtx.Unlock()
  150. }
  151. func (cell *eventCell) RemoveListener(listenerID string) int {
  152. cell.mtx.Lock()
  153. delete(cell.listeners, listenerID)
  154. numListeners := len(cell.listeners)
  155. cell.mtx.Unlock()
  156. return numListeners
  157. }
  158. func (cell *eventCell) FireEvent(ctx context.Context, data EventData) {
  159. cell.mtx.RLock()
  160. eventCallbacks := make([]EventCallback, 0, len(cell.listeners))
  161. for _, cb := range cell.listeners {
  162. eventCallbacks = append(eventCallbacks, cb)
  163. }
  164. cell.mtx.RUnlock()
  165. for _, cb := range eventCallbacks {
  166. if err := cb(ctx, data); err != nil {
  167. // should we log or abort here?
  168. continue
  169. }
  170. }
  171. }
  172. //-----------------------------------------------------------------------------
  173. type EventCallback func(ctx context.Context, data EventData) error
  174. type eventListener struct {
  175. id string
  176. mtx sync.RWMutex
  177. removed bool
  178. events []string
  179. }
  180. func newEventListener(id string) *eventListener {
  181. return &eventListener{
  182. id: id,
  183. removed: false,
  184. events: nil,
  185. }
  186. }
  187. func (evl *eventListener) AddEvent(event string) error {
  188. evl.mtx.Lock()
  189. if evl.removed {
  190. evl.mtx.Unlock()
  191. return ErrListenerWasRemoved{listenerID: evl.id}
  192. }
  193. evl.events = append(evl.events, event)
  194. evl.mtx.Unlock()
  195. return nil
  196. }
  197. func (evl *eventListener) GetEvents() []string {
  198. evl.mtx.RLock()
  199. events := make([]string, len(evl.events))
  200. copy(events, evl.events)
  201. evl.mtx.RUnlock()
  202. return events
  203. }
  204. func (evl *eventListener) SetRemoved() {
  205. evl.mtx.Lock()
  206. evl.removed = true
  207. evl.mtx.Unlock()
  208. }