You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

247 lines
5.7 KiB

  1. // Package events - Pub-Sub in go with event caching
  2. package events
  3. import (
  4. "fmt"
  5. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  6. "github.com/tendermint/tendermint/libs/service"
  7. )
  8. // ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
  9. type ErrListenerWasRemoved struct {
  10. listenerID string
  11. }
  12. // Error implements the error interface.
  13. func (e ErrListenerWasRemoved) Error() string {
  14. return fmt.Sprintf("listener #%s was removed", e.listenerID)
  15. }
  16. // EventData is a generic event data can be typed and registered with
  17. // tendermint/go-amino via concrete implementation of this interface.
  18. type EventData interface{}
  19. // Eventable is the interface reactors and other modules must export to become
  20. // eventable.
  21. type Eventable interface {
  22. SetEventSwitch(evsw EventSwitch)
  23. }
  24. // Fireable is the interface that wraps the FireEvent method.
  25. //
  26. // FireEvent fires an event with the given name and data.
  27. type Fireable interface {
  28. FireEvent(event string, data EventData)
  29. }
  30. // EventSwitch is the interface for synchronous pubsub, where listeners
  31. // subscribe to certain events and, when an event is fired (see Fireable),
  32. // notified via a callback function.
  33. //
  34. // Listeners are added by calling AddListenerForEvent function.
  35. // They can be removed by calling either RemoveListenerForEvent or
  36. // RemoveListener (for all events).
  37. type EventSwitch interface {
  38. service.Service
  39. Fireable
  40. AddListenerForEvent(listenerID, event string, cb EventCallback) error
  41. RemoveListenerForEvent(event string, listenerID string)
  42. RemoveListener(listenerID string)
  43. }
  44. type eventSwitch struct {
  45. service.BaseService
  46. mtx tmsync.RWMutex
  47. eventCells map[string]*eventCell
  48. listeners map[string]*eventListener
  49. }
  50. func NewEventSwitch() EventSwitch {
  51. evsw := &eventSwitch{
  52. eventCells: make(map[string]*eventCell),
  53. listeners: make(map[string]*eventListener),
  54. }
  55. evsw.BaseService = *service.NewBaseService(nil, "EventSwitch", evsw)
  56. return evsw
  57. }
  58. func (evsw *eventSwitch) OnStart() error {
  59. return nil
  60. }
  61. func (evsw *eventSwitch) OnStop() {}
  62. func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) error {
  63. // Get/Create eventCell and listener.
  64. evsw.mtx.Lock()
  65. eventCell := evsw.eventCells[event]
  66. if eventCell == nil {
  67. eventCell = newEventCell()
  68. evsw.eventCells[event] = eventCell
  69. }
  70. listener := evsw.listeners[listenerID]
  71. if listener == nil {
  72. listener = newEventListener(listenerID)
  73. evsw.listeners[listenerID] = listener
  74. }
  75. evsw.mtx.Unlock()
  76. // Add event and listener.
  77. if err := listener.AddEvent(event); err != nil {
  78. return err
  79. }
  80. eventCell.AddListener(listenerID, cb)
  81. return nil
  82. }
  83. func (evsw *eventSwitch) RemoveListener(listenerID string) {
  84. // Get and remove listener.
  85. evsw.mtx.RLock()
  86. listener := evsw.listeners[listenerID]
  87. evsw.mtx.RUnlock()
  88. if listener == nil {
  89. return
  90. }
  91. evsw.mtx.Lock()
  92. delete(evsw.listeners, listenerID)
  93. evsw.mtx.Unlock()
  94. // Remove callback for each event.
  95. listener.SetRemoved()
  96. for _, event := range listener.GetEvents() {
  97. evsw.RemoveListenerForEvent(event, listenerID)
  98. }
  99. }
  100. func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
  101. // Get eventCell
  102. evsw.mtx.Lock()
  103. eventCell := evsw.eventCells[event]
  104. evsw.mtx.Unlock()
  105. if eventCell == nil {
  106. return
  107. }
  108. // Remove listenerID from eventCell
  109. numListeners := eventCell.RemoveListener(listenerID)
  110. // Maybe garbage collect eventCell.
  111. if numListeners == 0 {
  112. // Lock again and double check.
  113. evsw.mtx.Lock() // OUTER LOCK
  114. eventCell.mtx.Lock() // INNER LOCK
  115. if len(eventCell.listeners) == 0 {
  116. delete(evsw.eventCells, event)
  117. }
  118. eventCell.mtx.Unlock() // INNER LOCK
  119. evsw.mtx.Unlock() // OUTER LOCK
  120. }
  121. }
  122. func (evsw *eventSwitch) FireEvent(event string, data EventData) {
  123. // Get the eventCell
  124. evsw.mtx.RLock()
  125. eventCell := evsw.eventCells[event]
  126. evsw.mtx.RUnlock()
  127. if eventCell == nil {
  128. return
  129. }
  130. // Fire event for all listeners in eventCell
  131. eventCell.FireEvent(data)
  132. }
  133. //-----------------------------------------------------------------------------
  134. // eventCell handles keeping track of listener callbacks for a given event.
  135. type eventCell struct {
  136. mtx tmsync.RWMutex
  137. listeners map[string]EventCallback
  138. }
  139. func newEventCell() *eventCell {
  140. return &eventCell{
  141. listeners: make(map[string]EventCallback),
  142. }
  143. }
  144. func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
  145. cell.mtx.Lock()
  146. cell.listeners[listenerID] = cb
  147. cell.mtx.Unlock()
  148. }
  149. func (cell *eventCell) RemoveListener(listenerID string) int {
  150. cell.mtx.Lock()
  151. delete(cell.listeners, listenerID)
  152. numListeners := len(cell.listeners)
  153. cell.mtx.Unlock()
  154. return numListeners
  155. }
  156. func (cell *eventCell) FireEvent(data EventData) {
  157. cell.mtx.RLock()
  158. eventCallbacks := make([]EventCallback, 0, len(cell.listeners))
  159. for _, cb := range cell.listeners {
  160. eventCallbacks = append(eventCallbacks, cb)
  161. }
  162. cell.mtx.RUnlock()
  163. for _, cb := range eventCallbacks {
  164. cb(data)
  165. }
  166. }
  167. //-----------------------------------------------------------------------------
  168. type EventCallback func(data EventData)
  169. type eventListener struct {
  170. id string
  171. mtx tmsync.RWMutex
  172. removed bool
  173. events []string
  174. }
  175. func newEventListener(id string) *eventListener {
  176. return &eventListener{
  177. id: id,
  178. removed: false,
  179. events: nil,
  180. }
  181. }
  182. func (evl *eventListener) AddEvent(event string) error {
  183. evl.mtx.Lock()
  184. if evl.removed {
  185. evl.mtx.Unlock()
  186. return ErrListenerWasRemoved{listenerID: evl.id}
  187. }
  188. evl.events = append(evl.events, event)
  189. evl.mtx.Unlock()
  190. return nil
  191. }
  192. func (evl *eventListener) GetEvents() []string {
  193. evl.mtx.RLock()
  194. events := make([]string, len(evl.events))
  195. copy(events, evl.events)
  196. evl.mtx.RUnlock()
  197. return events
  198. }
  199. func (evl *eventListener) SetRemoved() {
  200. evl.mtx.Lock()
  201. evl.removed = true
  202. evl.mtx.Unlock()
  203. }