You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

215 lines
4.7 KiB

  1. package events
  2. import (
  3. "sync"
  4. . "github.com/tendermint/go-common"
  5. "github.com/tendermint/tendermint/types"
  6. )
  7. // reactors and other modules should export
  8. // this interface to become eventable
  9. type Eventable interface {
  10. SetEventSwitch(evsw *EventSwitch)
  11. }
  12. // an event switch or cache implements fireable
  13. type Fireable interface {
  14. FireEvent(event string, data types.EventData)
  15. }
  16. type EventSwitch struct {
  17. BaseService
  18. mtx sync.RWMutex
  19. eventCells map[string]*eventCell
  20. listeners map[string]*eventListener
  21. }
  22. func NewEventSwitch() *EventSwitch {
  23. evsw := &EventSwitch{}
  24. evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw)
  25. return evsw
  26. }
  27. func (evsw *EventSwitch) OnStart() error {
  28. evsw.BaseService.OnStart()
  29. evsw.eventCells = make(map[string]*eventCell)
  30. evsw.listeners = make(map[string]*eventListener)
  31. return nil
  32. }
  33. func (evsw *EventSwitch) OnStop() {
  34. evsw.BaseService.OnStop()
  35. evsw.eventCells = nil
  36. evsw.listeners = nil
  37. }
  38. func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) {
  39. // Get/Create eventCell and listener
  40. evsw.mtx.Lock()
  41. eventCell := evsw.eventCells[event]
  42. if eventCell == nil {
  43. eventCell = newEventCell()
  44. evsw.eventCells[event] = eventCell
  45. }
  46. listener := evsw.listeners[listenerID]
  47. if listener == nil {
  48. listener = newEventListener(listenerID)
  49. evsw.listeners[listenerID] = listener
  50. }
  51. evsw.mtx.Unlock()
  52. // Add event and listener
  53. eventCell.AddListener(listenerID, cb)
  54. listener.AddEvent(event)
  55. }
  56. func (evsw *EventSwitch) RemoveListener(listenerID string) {
  57. // Get and remove listener
  58. evsw.mtx.RLock()
  59. listener := evsw.listeners[listenerID]
  60. delete(evsw.listeners, listenerID)
  61. evsw.mtx.RUnlock()
  62. if listener == nil {
  63. return
  64. }
  65. // Remove callback for each event.
  66. listener.SetRemoved()
  67. for _, event := range listener.GetEvents() {
  68. evsw.RemoveListenerForEvent(event, listenerID)
  69. }
  70. }
  71. func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) {
  72. // Get eventCell
  73. evsw.mtx.Lock()
  74. eventCell := evsw.eventCells[event]
  75. evsw.mtx.Unlock()
  76. if eventCell == nil {
  77. return
  78. }
  79. // Remove listenerID from eventCell
  80. numListeners := eventCell.RemoveListener(listenerID)
  81. // Maybe garbage collect eventCell.
  82. if numListeners == 0 {
  83. // Lock again and double check.
  84. evsw.mtx.Lock() // OUTER LOCK
  85. eventCell.mtx.Lock() // INNER LOCK
  86. if len(eventCell.listeners) == 0 {
  87. delete(evsw.eventCells, event)
  88. }
  89. eventCell.mtx.Unlock() // INNER LOCK
  90. evsw.mtx.Unlock() // OUTER LOCK
  91. }
  92. }
  93. func (evsw *EventSwitch) FireEvent(event string, data types.EventData) {
  94. // Get the eventCell
  95. evsw.mtx.RLock()
  96. eventCell := evsw.eventCells[event]
  97. evsw.mtx.RUnlock()
  98. if eventCell == nil {
  99. return
  100. }
  101. // Fire event for all listeners in eventCell
  102. eventCell.FireEvent(data)
  103. }
  104. func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} {
  105. // listen for new round
  106. ch := make(chan interface{}, chanCap)
  107. evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) {
  108. // NOTE: in production, evsw callbacks should be nonblocking.
  109. ch <- data
  110. })
  111. return ch
  112. }
  113. //-----------------------------------------------------------------------------
  114. // eventCell handles keeping track of listener callbacks for a given event.
  115. type eventCell struct {
  116. mtx sync.RWMutex
  117. listeners map[string]eventCallback
  118. }
  119. func newEventCell() *eventCell {
  120. return &eventCell{
  121. listeners: make(map[string]eventCallback),
  122. }
  123. }
  124. func (cell *eventCell) AddListener(listenerID string, cb eventCallback) {
  125. cell.mtx.Lock()
  126. cell.listeners[listenerID] = cb
  127. cell.mtx.Unlock()
  128. }
  129. func (cell *eventCell) RemoveListener(listenerID string) int {
  130. cell.mtx.Lock()
  131. delete(cell.listeners, listenerID)
  132. numListeners := len(cell.listeners)
  133. cell.mtx.Unlock()
  134. return numListeners
  135. }
  136. func (cell *eventCell) FireEvent(data types.EventData) {
  137. cell.mtx.RLock()
  138. for _, listener := range cell.listeners {
  139. listener(data)
  140. }
  141. cell.mtx.RUnlock()
  142. }
  143. //-----------------------------------------------------------------------------
  144. type eventCallback func(data types.EventData)
  145. type eventListener struct {
  146. id string
  147. mtx sync.RWMutex
  148. removed bool
  149. events []string
  150. }
  151. func newEventListener(id string) *eventListener {
  152. return &eventListener{
  153. id: id,
  154. removed: false,
  155. events: nil,
  156. }
  157. }
  158. func (evl *eventListener) AddEvent(event string) {
  159. evl.mtx.Lock()
  160. defer evl.mtx.Unlock()
  161. if evl.removed {
  162. return
  163. }
  164. evl.events = append(evl.events, event)
  165. }
  166. func (evl *eventListener) GetEvents() []string {
  167. evl.mtx.RLock()
  168. defer evl.mtx.RUnlock()
  169. events := make([]string, len(evl.events))
  170. copy(events, evl.events)
  171. return events
  172. }
  173. func (evl *eventListener) SetRemoved() {
  174. evl.mtx.Lock()
  175. defer evl.mtx.Unlock()
  176. evl.removed = true
  177. }