You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

226 lines
4.8 KiB

8 years ago
9 years ago
  1. /*
  2. Pub-Sub in Go with event caching
  3. */
  4. package events
  5. import (
  6. "sync"
  7. . "github.com/tendermint/go-common"
  8. )
  // EventData is the payload handed to listeners when an event is fired.
  // The interface declares no methods, so any value satisfies it. Generic
  // event data can be typed and registered with tendermint/go-wire via a
  // concrete implementation of this interface.
  type EventData interface {
  	// AssertIsEventData()
  }
  14. // reactors and other modules should export
  15. // this interface to become eventable
  16. type Eventable interface {
  17. SetEventSwitch(evsw EventSwitch)
  18. }
  19. // an event switch or cache implements fireable
  20. type Fireable interface {
  21. FireEvent(event string, data EventData)
  22. }
  23. type EventSwitch interface {
  24. Service
  25. Fireable
  26. AddListenerForEvent(listenerID, event string, cb EventCallback)
  27. RemoveListenerForEvent(event string, listenerID string)
  28. RemoveListener(listenerID string)
  29. }
  30. type eventSwitch struct {
  31. BaseService
  32. mtx sync.RWMutex
  33. eventCells map[string]*eventCell
  34. listeners map[string]*eventListener
  35. }
  36. func NewEventSwitch() EventSwitch {
  37. evsw := &eventSwitch{}
  38. evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw)
  39. return evsw
  40. }
  41. func (evsw *eventSwitch) OnStart() error {
  42. evsw.BaseService.OnStart()
  43. evsw.eventCells = make(map[string]*eventCell)
  44. evsw.listeners = make(map[string]*eventListener)
  45. return nil
  46. }
  47. func (evsw *eventSwitch) OnStop() {
  48. evsw.mtx.Lock()
  49. defer evsw.mtx.Unlock()
  50. evsw.BaseService.OnStop()
  51. evsw.eventCells = nil
  52. evsw.listeners = nil
  53. }
  54. func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) {
  55. // Get/Create eventCell and listener
  56. evsw.mtx.Lock()
  57. eventCell := evsw.eventCells[event]
  58. if eventCell == nil {
  59. eventCell = newEventCell()
  60. evsw.eventCells[event] = eventCell
  61. }
  62. listener := evsw.listeners[listenerID]
  63. if listener == nil {
  64. listener = newEventListener(listenerID)
  65. evsw.listeners[listenerID] = listener
  66. }
  67. evsw.mtx.Unlock()
  68. // Add event and listener
  69. eventCell.AddListener(listenerID, cb)
  70. listener.AddEvent(event)
  71. }
  72. func (evsw *eventSwitch) RemoveListener(listenerID string) {
  73. // Get and remove listener
  74. evsw.mtx.RLock()
  75. listener := evsw.listeners[listenerID]
  76. evsw.mtx.RUnlock()
  77. if listener == nil {
  78. return
  79. }
  80. evsw.mtx.Lock()
  81. delete(evsw.listeners, listenerID)
  82. evsw.mtx.Unlock()
  83. // Remove callback for each event.
  84. listener.SetRemoved()
  85. for _, event := range listener.GetEvents() {
  86. evsw.RemoveListenerForEvent(event, listenerID)
  87. }
  88. }
  89. func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
  90. // Get eventCell
  91. evsw.mtx.Lock()
  92. eventCell := evsw.eventCells[event]
  93. evsw.mtx.Unlock()
  94. if eventCell == nil {
  95. return
  96. }
  97. // Remove listenerID from eventCell
  98. numListeners := eventCell.RemoveListener(listenerID)
  99. // Maybe garbage collect eventCell.
  100. if numListeners == 0 {
  101. // Lock again and double check.
  102. evsw.mtx.Lock() // OUTER LOCK
  103. eventCell.mtx.Lock() // INNER LOCK
  104. if len(eventCell.listeners) == 0 {
  105. delete(evsw.eventCells, event)
  106. }
  107. eventCell.mtx.Unlock() // INNER LOCK
  108. evsw.mtx.Unlock() // OUTER LOCK
  109. }
  110. }
  111. func (evsw *eventSwitch) FireEvent(event string, data EventData) {
  112. // Get the eventCell
  113. evsw.mtx.RLock()
  114. eventCell := evsw.eventCells[event]
  115. evsw.mtx.RUnlock()
  116. if eventCell == nil {
  117. return
  118. }
  119. // Fire event for all listeners in eventCell
  120. eventCell.FireEvent(data)
  121. }
  122. //-----------------------------------------------------------------------------
  123. // eventCell handles keeping track of listener callbacks for a given event.
  124. type eventCell struct {
  125. mtx sync.RWMutex
  126. listeners map[string]EventCallback
  127. }
  128. func newEventCell() *eventCell {
  129. return &eventCell{
  130. listeners: make(map[string]EventCallback),
  131. }
  132. }
  133. func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
  134. cell.mtx.Lock()
  135. cell.listeners[listenerID] = cb
  136. cell.mtx.Unlock()
  137. }
  138. func (cell *eventCell) RemoveListener(listenerID string) int {
  139. cell.mtx.Lock()
  140. delete(cell.listeners, listenerID)
  141. numListeners := len(cell.listeners)
  142. cell.mtx.Unlock()
  143. return numListeners
  144. }
  145. func (cell *eventCell) FireEvent(data EventData) {
  146. cell.mtx.RLock()
  147. for _, listener := range cell.listeners {
  148. listener(data)
  149. }
  150. cell.mtx.RUnlock()
  151. }
  152. //-----------------------------------------------------------------------------
  153. type EventCallback func(data EventData)
  // eventListener records, for one listener ID, the events it has been added
  // to and whether it has been marked removed.
  type eventListener struct {
  	id string

  	mtx     sync.RWMutex
  	removed bool     // once true, AddEvent ignores further events
  	events  []string // events recorded so far, in insertion order
  }

  // newEventListener returns a listener record for id that is not removed
  // and has no events yet.
  func newEventListener(id string) *eventListener {
  	return &eventListener{id: id}
  }

  // AddEvent appends event to the listener's event list, unless the listener
  // has been marked removed, in which case it does nothing.
  func (evl *eventListener) AddEvent(event string) {
  	evl.mtx.Lock()
  	if !evl.removed {
  		evl.events = append(evl.events, event)
  	}
  	evl.mtx.Unlock()
  }

  // GetEvents returns a copy of the recorded events; the caller may modify
  // the returned slice freely. The result is never nil.
  func (evl *eventListener) GetEvents() []string {
  	evl.mtx.RLock()
  	defer evl.mtx.RUnlock()

  	snapshot := make([]string, 0, len(evl.events))
  	return append(snapshot, evl.events...)
  }

  // SetRemoved marks the listener as removed.
  func (evl *eventListener) SetRemoved() {
  	evl.mtx.Lock()
  	evl.removed = true
  	evl.mtx.Unlock()
  }