You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

239 lines
5.0 KiB

  1. /*
  2. Pub-Sub in go with event caching
  3. */
  4. package events
  5. import (
  6. "fmt"
  7. "sync"
  8. cmn "github.com/tendermint/tendermint/libs/common"
  9. )
  10. type ErrListenerWasRemoved struct {
  11. listener string
  12. }
  13. func (e ErrListenerWasRemoved) Error() string {
  14. return fmt.Sprintf("listener %s was removed", e.listener)
  15. }
  16. // Generic event data can be typed and registered with tendermint/go-amino
  17. // via concrete implementation of this interface
  18. type EventData interface {
  19. }
  20. // reactors and other modules should export
  21. // this interface to become eventable
  22. type Eventable interface {
  23. SetEventSwitch(evsw EventSwitch)
  24. }
  25. // an event switch or cache implements fireable
  26. type Fireable interface {
  27. FireEvent(event string, data EventData)
  28. }
  29. type EventSwitch interface {
  30. cmn.Service
  31. Fireable
  32. AddListenerForEvent(listenerID, event string, cb EventCallback) error
  33. RemoveListenerForEvent(event string, listenerID string)
  34. RemoveListener(listenerID string)
  35. }
  36. type eventSwitch struct {
  37. cmn.BaseService
  38. mtx sync.RWMutex
  39. eventCells map[string]*eventCell
  40. listeners map[string]*eventListener
  41. }
  42. func NewEventSwitch() EventSwitch {
  43. evsw := &eventSwitch{
  44. eventCells: make(map[string]*eventCell),
  45. listeners: make(map[string]*eventListener),
  46. }
  47. evsw.BaseService = *cmn.NewBaseService(nil, "EventSwitch", evsw)
  48. return evsw
  49. }
  50. func (evsw *eventSwitch) OnStart() error {
  51. return nil
  52. }
  53. func (evsw *eventSwitch) OnStop() {}
  54. func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) error {
  55. // Get/Create eventCell and listener
  56. evsw.mtx.Lock()
  57. eventCell := evsw.eventCells[event]
  58. if eventCell == nil {
  59. eventCell = newEventCell()
  60. evsw.eventCells[event] = eventCell
  61. }
  62. listener := evsw.listeners[listenerID]
  63. if listener == nil {
  64. listener = newEventListener(listenerID)
  65. evsw.listeners[listenerID] = listener
  66. }
  67. evsw.mtx.Unlock()
  68. // Add event and listener
  69. err := listener.AddEvent(event)
  70. if err == nil {
  71. eventCell.AddListener(listenerID, cb)
  72. }
  73. return err
  74. }
  75. func (evsw *eventSwitch) RemoveListener(listenerID string) {
  76. // Get and remove listener
  77. evsw.mtx.RLock()
  78. listener := evsw.listeners[listenerID]
  79. evsw.mtx.RUnlock()
  80. if listener == nil {
  81. return
  82. }
  83. evsw.mtx.Lock()
  84. delete(evsw.listeners, listenerID)
  85. evsw.mtx.Unlock()
  86. // Remove callback for each event.
  87. listener.SetRemoved()
  88. for _, event := range listener.GetEvents() {
  89. evsw.RemoveListenerForEvent(event, listenerID)
  90. }
  91. }
  92. func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
  93. // Get eventCell
  94. evsw.mtx.Lock()
  95. eventCell := evsw.eventCells[event]
  96. evsw.mtx.Unlock()
  97. if eventCell == nil {
  98. return
  99. }
  100. // Remove listenerID from eventCell
  101. numListeners := eventCell.RemoveListener(listenerID)
  102. // Maybe garbage collect eventCell.
  103. if numListeners == 0 {
  104. // Lock again and double check.
  105. evsw.mtx.Lock() // OUTER LOCK
  106. eventCell.mtx.Lock() // INNER LOCK
  107. if len(eventCell.listeners) == 0 {
  108. delete(evsw.eventCells, event)
  109. }
  110. eventCell.mtx.Unlock() // INNER LOCK
  111. evsw.mtx.Unlock() // OUTER LOCK
  112. }
  113. }
  114. func (evsw *eventSwitch) FireEvent(event string, data EventData) {
  115. // Get the eventCell
  116. evsw.mtx.RLock()
  117. eventCell := evsw.eventCells[event]
  118. evsw.mtx.RUnlock()
  119. if eventCell == nil {
  120. return
  121. }
  122. // Fire event for all listeners in eventCell
  123. eventCell.FireEvent(data)
  124. }
  125. //-----------------------------------------------------------------------------
  126. // eventCell handles keeping track of listener callbacks for a given event.
  127. type eventCell struct {
  128. mtx sync.RWMutex
  129. listeners map[string]EventCallback
  130. }
  131. func newEventCell() *eventCell {
  132. return &eventCell{
  133. listeners: make(map[string]EventCallback),
  134. }
  135. }
  136. func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
  137. cell.mtx.Lock()
  138. cell.listeners[listenerID] = cb
  139. cell.mtx.Unlock()
  140. }
  141. func (cell *eventCell) RemoveListener(listenerID string) int {
  142. cell.mtx.Lock()
  143. delete(cell.listeners, listenerID)
  144. numListeners := len(cell.listeners)
  145. cell.mtx.Unlock()
  146. return numListeners
  147. }
  148. func (cell *eventCell) FireEvent(data EventData) {
  149. cell.mtx.RLock()
  150. var listenerCopy []EventCallback
  151. for _, listener := range cell.listeners {
  152. listenerCopy = append(listenerCopy, listener)
  153. }
  154. cell.mtx.RUnlock()
  155. for _, listener := range listenerCopy {
  156. listener(data)
  157. }
  158. }
  159. //-----------------------------------------------------------------------------
  160. type EventCallback func(data EventData)
  161. type eventListener struct {
  162. id string
  163. mtx sync.RWMutex
  164. removed bool
  165. events []string
  166. }
  167. func newEventListener(id string) *eventListener {
  168. return &eventListener{
  169. id: id,
  170. removed: false,
  171. events: nil,
  172. }
  173. }
  174. func (evl *eventListener) AddEvent(event string) error {
  175. evl.mtx.Lock()
  176. defer evl.mtx.Unlock()
  177. if evl.removed {
  178. return ErrListenerWasRemoved{listener: evl.id}
  179. }
  180. evl.events = append(evl.events, event)
  181. return nil
  182. }
  183. func (evl *eventListener) GetEvents() []string {
  184. evl.mtx.RLock()
  185. defer evl.mtx.RUnlock()
  186. events := make([]string, len(evl.events))
  187. copy(events, evl.events)
  188. return events
  189. }
  190. func (evl *eventListener) SetRemoved() {
  191. evl.mtx.Lock()
  192. defer evl.mtx.Unlock()
  193. evl.removed = true
  194. }