You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

220 lines
4.7 KiB

  1. /*
  2. Pub-Sub in go with event caching
  3. */
  4. package events
  5. import (
  6. "sync"
  7. cmn "github.com/tendermint/tendermint/libs/common"
  8. )
  9. // Generic event data can be typed and registered with tendermint/go-amino
  10. // via concrete implementation of this interface
  11. type EventData interface {
  12. //AssertIsEventData()
  13. }
  14. // reactors and other modules should export
  15. // this interface to become eventable
  16. type Eventable interface {
  17. SetEventSwitch(evsw EventSwitch)
  18. }
  19. // an event switch or cache implements fireable
  20. type Fireable interface {
  21. FireEvent(event string, data EventData)
  22. }
  23. type EventSwitch interface {
  24. cmn.Service
  25. Fireable
  26. AddListenerForEvent(listenerID, event string, cb EventCallback)
  27. RemoveListenerForEvent(event string, listenerID string)
  28. RemoveListener(listenerID string)
  29. }
  30. type eventSwitch struct {
  31. cmn.BaseService
  32. mtx sync.RWMutex
  33. eventCells map[string]*eventCell
  34. listeners map[string]*eventListener
  35. }
  36. func NewEventSwitch() EventSwitch {
  37. evsw := &eventSwitch{
  38. eventCells: make(map[string]*eventCell),
  39. listeners: make(map[string]*eventListener),
  40. }
  41. evsw.BaseService = *cmn.NewBaseService(nil, "EventSwitch", evsw)
  42. return evsw
  43. }
  44. func (evsw *eventSwitch) OnStart() error {
  45. return nil
  46. }
  47. func (evsw *eventSwitch) OnStop() {}
  48. func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) {
  49. // Get/Create eventCell and listener
  50. evsw.mtx.Lock()
  51. eventCell := evsw.eventCells[event]
  52. if eventCell == nil {
  53. eventCell = newEventCell()
  54. evsw.eventCells[event] = eventCell
  55. }
  56. listener := evsw.listeners[listenerID]
  57. if listener == nil {
  58. listener = newEventListener(listenerID)
  59. evsw.listeners[listenerID] = listener
  60. }
  61. evsw.mtx.Unlock()
  62. // Add event and listener
  63. eventCell.AddListener(listenerID, cb)
  64. listener.AddEvent(event)
  65. }
  66. func (evsw *eventSwitch) RemoveListener(listenerID string) {
  67. // Get and remove listener
  68. evsw.mtx.RLock()
  69. listener := evsw.listeners[listenerID]
  70. evsw.mtx.RUnlock()
  71. if listener == nil {
  72. return
  73. }
  74. evsw.mtx.Lock()
  75. delete(evsw.listeners, listenerID)
  76. evsw.mtx.Unlock()
  77. // Remove callback for each event.
  78. listener.SetRemoved()
  79. for _, event := range listener.GetEvents() {
  80. evsw.RemoveListenerForEvent(event, listenerID)
  81. }
  82. }
  83. func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
  84. // Get eventCell
  85. evsw.mtx.Lock()
  86. eventCell := evsw.eventCells[event]
  87. evsw.mtx.Unlock()
  88. if eventCell == nil {
  89. return
  90. }
  91. // Remove listenerID from eventCell
  92. numListeners := eventCell.RemoveListener(listenerID)
  93. // Maybe garbage collect eventCell.
  94. if numListeners == 0 {
  95. // Lock again and double check.
  96. evsw.mtx.Lock() // OUTER LOCK
  97. eventCell.mtx.Lock() // INNER LOCK
  98. if len(eventCell.listeners) == 0 {
  99. delete(evsw.eventCells, event)
  100. }
  101. eventCell.mtx.Unlock() // INNER LOCK
  102. evsw.mtx.Unlock() // OUTER LOCK
  103. }
  104. }
  105. func (evsw *eventSwitch) FireEvent(event string, data EventData) {
  106. // Get the eventCell
  107. evsw.mtx.RLock()
  108. eventCell := evsw.eventCells[event]
  109. evsw.mtx.RUnlock()
  110. if eventCell == nil {
  111. return
  112. }
  113. // Fire event for all listeners in eventCell
  114. eventCell.FireEvent(data)
  115. }
  116. //-----------------------------------------------------------------------------
  117. // eventCell handles keeping track of listener callbacks for a given event.
  118. type eventCell struct {
  119. mtx sync.RWMutex
  120. listeners map[string]EventCallback
  121. }
  122. func newEventCell() *eventCell {
  123. return &eventCell{
  124. listeners: make(map[string]EventCallback),
  125. }
  126. }
  127. func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
  128. cell.mtx.Lock()
  129. cell.listeners[listenerID] = cb
  130. cell.mtx.Unlock()
  131. }
  132. func (cell *eventCell) RemoveListener(listenerID string) int {
  133. cell.mtx.Lock()
  134. delete(cell.listeners, listenerID)
  135. numListeners := len(cell.listeners)
  136. cell.mtx.Unlock()
  137. return numListeners
  138. }
  139. func (cell *eventCell) FireEvent(data EventData) {
  140. cell.mtx.RLock()
  141. for _, listener := range cell.listeners {
  142. listener(data)
  143. }
  144. cell.mtx.RUnlock()
  145. }
  146. //-----------------------------------------------------------------------------
  147. type EventCallback func(data EventData)
  148. type eventListener struct {
  149. id string
  150. mtx sync.RWMutex
  151. removed bool
  152. events []string
  153. }
  154. func newEventListener(id string) *eventListener {
  155. return &eventListener{
  156. id: id,
  157. removed: false,
  158. events: nil,
  159. }
  160. }
  161. func (evl *eventListener) AddEvent(event string) {
  162. evl.mtx.Lock()
  163. defer evl.mtx.Unlock()
  164. if evl.removed {
  165. return
  166. }
  167. evl.events = append(evl.events, event)
  168. }
  169. func (evl *eventListener) GetEvents() []string {
  170. evl.mtx.RLock()
  171. defer evl.mtx.RUnlock()
  172. events := make([]string, len(evl.events))
  173. copy(events, evl.events)
  174. return events
  175. }
  176. func (evl *eventListener) SetRemoved() {
  177. evl.mtx.Lock()
  178. defer evl.mtx.Unlock()
  179. evl.removed = true
  180. }