You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

249 lines
5.7 KiB

  1. // Package events - Pub-Sub in go with event caching
  2. package events
  3. import (
  4. "fmt"
  5. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  6. "github.com/tendermint/tendermint/libs/service"
  7. )
  // ErrListenerWasRemoved reports that an event could not be registered
  // because the listener had already been marked as removed. It is the error
  // produced by eventListener.AddEvent.
  type ErrListenerWasRemoved struct {
  	// listenerID identifies the listener that was removed.
  	listenerID string
  }
  12. // Error implements the error interface.
  13. func (e ErrListenerWasRemoved) Error() string {
  14. return fmt.Sprintf("listener #%s was removed", e.listenerID)
  15. }
  // EventData is the payload delivered with a fired event. The type itself
  // places no constraints on the value; concrete implementations can be typed
  // and registered with tendermint/go-amino.
  type EventData interface{}
  19. // Eventable is the interface reactors and other modules must export to become
  20. // eventable.
  21. type Eventable interface {
  22. SetEventSwitch(evsw EventSwitch)
  23. }
  24. // Fireable is the interface that wraps the FireEvent method.
  25. //
  26. // FireEvent fires an event with the given name and data.
  27. type Fireable interface {
  28. FireEvent(eventValue string, data EventData)
  29. }
  30. // EventSwitch is the interface for synchronous pubsub, where listeners
  31. // subscribe to certain events and, when an event is fired (see Fireable),
  32. // notified via a callback function.
  33. //
  34. // Listeners are added by calling AddListenerForEvent function.
  35. // They can be removed by calling either RemoveListenerForEvent or
  36. // RemoveListener (for all events).
  37. type EventSwitch interface {
  38. service.Service
  39. Fireable
  40. AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
  41. RemoveListenerForEvent(event string, listenerID string)
  42. RemoveListener(listenerID string)
  43. }
  44. type eventSwitch struct {
  45. service.BaseService
  46. mtx tmsync.RWMutex
  47. eventCells map[string]*eventCell
  48. listeners map[string]*eventListener
  49. }
  50. func NewEventSwitch() EventSwitch {
  51. evsw := &eventSwitch{
  52. eventCells: make(map[string]*eventCell),
  53. listeners: make(map[string]*eventListener),
  54. }
  55. evsw.BaseService = *service.NewBaseService(nil, "EventSwitch", evsw)
  56. return evsw
  57. }
  58. func (evsw *eventSwitch) OnStart() error {
  59. return nil
  60. }
  61. func (evsw *eventSwitch) OnStop() {}
  62. func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
  63. // Get/Create eventCell and listener.
  64. evsw.mtx.Lock()
  65. eventCell := evsw.eventCells[eventValue]
  66. if eventCell == nil {
  67. eventCell = newEventCell()
  68. evsw.eventCells[eventValue] = eventCell
  69. }
  70. listener := evsw.listeners[listenerID]
  71. if listener == nil {
  72. listener = newEventListener(listenerID)
  73. evsw.listeners[listenerID] = listener
  74. }
  75. evsw.mtx.Unlock()
  76. if err := listener.AddEvent(eventValue); err != nil {
  77. return err
  78. }
  79. eventCell.AddListener(listenerID, cb)
  80. return nil
  81. }
  82. func (evsw *eventSwitch) RemoveListener(listenerID string) {
  83. // Get and remove listener.
  84. evsw.mtx.RLock()
  85. listener := evsw.listeners[listenerID]
  86. evsw.mtx.RUnlock()
  87. if listener == nil {
  88. return
  89. }
  90. evsw.mtx.Lock()
  91. delete(evsw.listeners, listenerID)
  92. evsw.mtx.Unlock()
  93. // Remove callback for each event.
  94. listener.SetRemoved()
  95. for _, event := range listener.GetEvents() {
  96. evsw.RemoveListenerForEvent(event, listenerID)
  97. }
  98. }
  99. func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
  100. // Get eventCell
  101. evsw.mtx.Lock()
  102. eventCell := evsw.eventCells[event]
  103. evsw.mtx.Unlock()
  104. if eventCell == nil {
  105. return
  106. }
  107. // Remove listenerID from eventCell
  108. numListeners := eventCell.RemoveListener(listenerID)
  109. // Maybe garbage collect eventCell.
  110. if numListeners == 0 {
  111. // Lock again and double check.
  112. evsw.mtx.Lock() // OUTER LOCK
  113. eventCell.mtx.Lock() // INNER LOCK
  114. if len(eventCell.listeners) == 0 {
  115. delete(evsw.eventCells, event)
  116. }
  117. eventCell.mtx.Unlock() // INNER LOCK
  118. evsw.mtx.Unlock() // OUTER LOCK
  119. }
  120. }
  121. func (evsw *eventSwitch) FireEvent(event string, data EventData) {
  122. // Get the eventCell
  123. evsw.mtx.RLock()
  124. eventCell := evsw.eventCells[event]
  125. evsw.mtx.RUnlock()
  126. if eventCell == nil {
  127. return
  128. }
  129. // Fire event for all listeners in eventCell
  130. eventCell.FireEvent(data)
  131. }
  132. //-----------------------------------------------------------------------------
  133. // eventCell handles keeping track of listener callbacks for a given event.
  134. type eventCell struct {
  135. mtx tmsync.RWMutex
  136. listeners map[string]EventCallback
  137. }
  138. func newEventCell() *eventCell {
  139. return &eventCell{
  140. listeners: make(map[string]EventCallback),
  141. }
  142. }
  143. func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
  144. cell.mtx.Lock()
  145. cell.listeners[listenerID] = cb
  146. cell.mtx.Unlock()
  147. }
  148. func (cell *eventCell) RemoveListener(listenerID string) int {
  149. cell.mtx.Lock()
  150. delete(cell.listeners, listenerID)
  151. numListeners := len(cell.listeners)
  152. cell.mtx.Unlock()
  153. return numListeners
  154. }
  155. func (cell *eventCell) FireEvent(data EventData) {
  156. cell.mtx.RLock()
  157. eventCallbacks := make([]EventCallback, 0, len(cell.listeners))
  158. for _, cb := range cell.listeners {
  159. eventCallbacks = append(eventCallbacks, cb)
  160. }
  161. cell.mtx.RUnlock()
  162. for _, cb := range eventCallbacks {
  163. cb(data)
  164. }
  165. }
  166. //-----------------------------------------------------------------------------
  167. type EventCallback func(data EventData)
  // eventListener records which events a listener registered for and whether
  // the listener has been removed.
  type eventListener struct {
  	// id is the listener ID this record belongs to.
  	id string

  	// mtx guards removed and events.
  	mtx tmsync.RWMutex
  	// removed is set once the listener has been removed; AddEvent then
  	// rejects further events.
  	removed bool
  	// events lists the event names added through AddEvent, in call order.
  	events []string
  }
  174. func newEventListener(id string) *eventListener {
  175. return &eventListener{
  176. id: id,
  177. removed: false,
  178. events: nil,
  179. }
  180. }
  181. func (evl *eventListener) AddEvent(event string) error {
  182. evl.mtx.Lock()
  183. if evl.removed {
  184. evl.mtx.Unlock()
  185. return ErrListenerWasRemoved{listenerID: evl.id}
  186. }
  187. evl.events = append(evl.events, event)
  188. evl.mtx.Unlock()
  189. return nil
  190. }
  191. func (evl *eventListener) GetEvents() []string {
  192. evl.mtx.RLock()
  193. events := make([]string, len(evl.events))
  194. copy(events, evl.events)
  195. evl.mtx.RUnlock()
  196. return events
  197. }
  198. func (evl *eventListener) SetRemoved() {
  199. evl.mtx.Lock()
  200. evl.removed = true
  201. evl.mtx.Unlock()
  202. }