You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

203 lines
4.3 KiB

  1. package events
  2. import (
  3. "sync"
  4. . "github.com/tendermint/tendermint/common"
  5. )
  6. // reactors and other modules should export
  7. // this interface to become eventable
  8. type Eventable interface {
  9. SetFireable(Fireable)
  10. }
  // Fireable is implemented by anything an event can be fired into, such as
  // an event switch or an event cache. FireEvent takes the event's name and an
  // arbitrary payload.
  type Fireable interface {
  	FireEvent(event string, msg interface{})
  }
  15. type EventSwitch struct {
  16. BaseService
  17. mtx sync.RWMutex
  18. eventCells map[string]*eventCell
  19. listeners map[string]*eventListener
  20. }
  21. func NewEventSwitch() *EventSwitch {
  22. evsw := &EventSwitch{}
  23. evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw)
  24. return evsw
  25. }
  26. func (evsw *EventSwitch) OnStart() {
  27. evsw.BaseService.OnStart()
  28. evsw.eventCells = make(map[string]*eventCell)
  29. evsw.listeners = make(map[string]*eventListener)
  30. }
  31. func (evsw *EventSwitch) OnStop() {
  32. evsw.BaseService.OnStop()
  33. evsw.eventCells = nil
  34. evsw.listeners = nil
  35. }
  36. func (evsw *EventSwitch) AddListenerForEvent(listenerId, event string, cb eventCallback) {
  37. // Get/Create eventCell and listener
  38. evsw.mtx.Lock()
  39. eventCell := evsw.eventCells[event]
  40. if eventCell == nil {
  41. eventCell = newEventCell()
  42. evsw.eventCells[event] = eventCell
  43. }
  44. listener := evsw.listeners[listenerId]
  45. if listener == nil {
  46. listener = newEventListener(listenerId)
  47. evsw.listeners[listenerId] = listener
  48. }
  49. evsw.mtx.Unlock()
  50. // Add event and listener
  51. eventCell.AddListener(listenerId, cb)
  52. listener.AddEvent(event)
  53. }
  54. func (evsw *EventSwitch) RemoveListener(listenerId string) {
  55. // Get and remove listener
  56. evsw.mtx.RLock()
  57. listener := evsw.listeners[listenerId]
  58. delete(evsw.listeners, listenerId)
  59. evsw.mtx.RUnlock()
  60. if listener == nil {
  61. return
  62. }
  63. // Remove callback for each event.
  64. listener.SetRemoved()
  65. for _, event := range listener.GetEvents() {
  66. evsw.RemoveListenerForEvent(event, listenerId)
  67. }
  68. }
  69. func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerId string) {
  70. // Get eventCell
  71. evsw.mtx.Lock()
  72. eventCell := evsw.eventCells[event]
  73. evsw.mtx.Unlock()
  74. if eventCell == nil {
  75. return
  76. }
  77. // Remove listenerId from eventCell
  78. numListeners := eventCell.RemoveListener(listenerId)
  79. // Maybe garbage collect eventCell.
  80. if numListeners == 0 {
  81. // Lock again and double check.
  82. evsw.mtx.Lock() // OUTER LOCK
  83. eventCell.mtx.Lock() // INNER LOCK
  84. if len(eventCell.listeners) == 0 {
  85. delete(evsw.eventCells, event)
  86. }
  87. eventCell.mtx.Unlock() // INNER LOCK
  88. evsw.mtx.Unlock() // OUTER LOCK
  89. }
  90. }
  91. func (evsw *EventSwitch) FireEvent(event string, msg interface{}) {
  92. // Get the eventCell
  93. evsw.mtx.RLock()
  94. eventCell := evsw.eventCells[event]
  95. evsw.mtx.RUnlock()
  96. if eventCell == nil {
  97. return
  98. }
  99. // Fire event for all listeners in eventCell
  100. eventCell.FireEvent(msg)
  101. }
  102. //-----------------------------------------------------------------------------
  103. // eventCell handles keeping track of listener callbacks for a given event.
  104. type eventCell struct {
  105. mtx sync.RWMutex
  106. listeners map[string]eventCallback
  107. }
  108. func newEventCell() *eventCell {
  109. return &eventCell{
  110. listeners: make(map[string]eventCallback),
  111. }
  112. }
  113. func (cell *eventCell) AddListener(listenerId string, cb eventCallback) {
  114. cell.mtx.Lock()
  115. cell.listeners[listenerId] = cb
  116. cell.mtx.Unlock()
  117. }
  118. func (cell *eventCell) RemoveListener(listenerId string) int {
  119. cell.mtx.Lock()
  120. delete(cell.listeners, listenerId)
  121. numListeners := len(cell.listeners)
  122. cell.mtx.Unlock()
  123. return numListeners
  124. }
  125. func (cell *eventCell) FireEvent(msg interface{}) {
  126. cell.mtx.RLock()
  127. for _, listener := range cell.listeners {
  128. listener(msg)
  129. }
  130. cell.mtx.RUnlock()
  131. }
  132. //-----------------------------------------------------------------------------
  // eventCallback is the function a listener registers for an event; it is
  // called with the message passed to FireEvent.
  type eventCallback func(msg interface{})
  // eventListener records which events a single listener id has subscribed to,
  // so that all of them can be unsubscribed when the listener is removed.
  type eventListener struct {
  	id string

  	// mtx guards removed and events.
  	mtx sync.RWMutex
  	// removed is set once the listener has been removed; AddEvent ignores
  	// further events after that.
  	removed bool
  	// events lists the subscribed event names in the order they were added.
  	events []string
  }
  140. func newEventListener(id string) *eventListener {
  141. return &eventListener{
  142. id: id,
  143. removed: false,
  144. events: nil,
  145. }
  146. }
  147. func (evl *eventListener) AddEvent(event string) {
  148. evl.mtx.Lock()
  149. defer evl.mtx.Unlock()
  150. if evl.removed {
  151. return
  152. }
  153. evl.events = append(evl.events, event)
  154. }
  155. func (evl *eventListener) GetEvents() []string {
  156. evl.mtx.RLock()
  157. defer evl.mtx.RUnlock()
  158. events := make([]string, len(evl.events))
  159. copy(events, evl.events)
  160. return events
  161. }
  162. func (evl *eventListener) SetRemoved() {
  163. evl.mtx.Lock()
  164. defer evl.mtx.Unlock()
  165. evl.removed = true
  166. }