You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

335 lines
11 KiB

  1. package events
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "testing"
  6. "time"
  7. )
  8. // TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single
  9. // listener to an event, and sends a string "data".
  10. func TestAddListenerForEventFireOnce(t *testing.T) {
  11. evsw := NewEventSwitch()
  12. started, err := evsw.Start()
  13. if started == false || err != nil {
  14. t.Errorf("Failed to start EventSwitch, error: %v", err)
  15. }
  16. messages := make(chan EventData)
  17. evsw.AddListenerForEvent("listener", "event",
  18. func(data EventData) {
  19. messages <- data
  20. })
  21. go evsw.FireEvent("event", "data")
  22. received := <-messages
  23. if received != "data" {
  24. t.Errorf("Message received does not match: %v", received)
  25. }
  26. }
  27. // TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single
  28. // listener to an event, and sends a thousand integers.
  29. func TestAddListenerForEventFireMany(t *testing.T) {
  30. evsw := NewEventSwitch()
  31. started, err := evsw.Start()
  32. if started == false || err != nil {
  33. t.Errorf("Failed to start EventSwitch, error: %v", err)
  34. }
  35. doneSum := make(chan uint64)
  36. doneSending := make(chan uint64)
  37. numbers := make(chan uint64, 4)
  38. // subscribe one listener for one event
  39. evsw.AddListenerForEvent("listener", "event",
  40. func(data EventData) {
  41. numbers <- data.(uint64)
  42. })
  43. // collect received events
  44. go sumReceivedNumbers(numbers, doneSum)
  45. // go fire events
  46. go fireEvents(evsw, "event", doneSending, uint64(1))
  47. checkSum := <-doneSending
  48. close(numbers)
  49. eventSum := <-doneSum
  50. if checkSum != eventSum {
  51. t.Errorf("Not all messages sent were received.\n")
  52. }
  53. }
  54. // TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single
  55. // listener to three different events and sends a thousand integers for each
  56. // of the three events.
  57. func TestAddListenerForDifferentEvents(t *testing.T) {
  58. evsw := NewEventSwitch()
  59. started, err := evsw.Start()
  60. if started == false || err != nil {
  61. t.Errorf("Failed to start EventSwitch, error: %v", err)
  62. }
  63. doneSum := make(chan uint64)
  64. doneSending1 := make(chan uint64)
  65. doneSending2 := make(chan uint64)
  66. doneSending3 := make(chan uint64)
  67. numbers := make(chan uint64, 4)
  68. // subscribe one listener to three events
  69. evsw.AddListenerForEvent("listener", "event1",
  70. func(data EventData) {
  71. numbers <- data.(uint64)
  72. })
  73. evsw.AddListenerForEvent("listener", "event2",
  74. func(data EventData) {
  75. numbers <- data.(uint64)
  76. })
  77. evsw.AddListenerForEvent("listener", "event3",
  78. func(data EventData) {
  79. numbers <- data.(uint64)
  80. })
  81. // collect received events
  82. go sumReceivedNumbers(numbers, doneSum)
  83. // go fire events
  84. go fireEvents(evsw, "event1", doneSending1, uint64(1))
  85. go fireEvents(evsw, "event2", doneSending2, uint64(1))
  86. go fireEvents(evsw, "event3", doneSending3, uint64(1))
  87. var checkSum uint64 = 0
  88. checkSum += <-doneSending1
  89. checkSum += <-doneSending2
  90. checkSum += <-doneSending3
  91. close(numbers)
  92. eventSum := <-doneSum
  93. if checkSum != eventSum {
  94. t.Errorf("Not all messages sent were received.\n")
  95. }
  96. }
  97. // TestAddDifferentListenerForDifferentEvents sets up an EventSwitch,
  98. // subscribes a first listener to three events, and subscribes a second
  99. // listener to two of those three events, and then sends a thousand integers
  100. // for each of the three events.
  101. func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
  102. evsw := NewEventSwitch()
  103. started, err := evsw.Start()
  104. if started == false || err != nil {
  105. t.Errorf("Failed to start EventSwitch, error: %v", err)
  106. }
  107. doneSum1 := make(chan uint64)
  108. doneSum2 := make(chan uint64)
  109. doneSending1 := make(chan uint64)
  110. doneSending2 := make(chan uint64)
  111. doneSending3 := make(chan uint64)
  112. numbers1 := make(chan uint64, 4)
  113. numbers2 := make(chan uint64, 4)
  114. // subscribe two listener to three events
  115. evsw.AddListenerForEvent("listener1", "event1",
  116. func(data EventData) {
  117. numbers1 <- data.(uint64)
  118. })
  119. evsw.AddListenerForEvent("listener1", "event2",
  120. func(data EventData) {
  121. numbers1 <- data.(uint64)
  122. })
  123. evsw.AddListenerForEvent("listener1", "event3",
  124. func(data EventData) {
  125. numbers1 <- data.(uint64)
  126. })
  127. evsw.AddListenerForEvent("listener2", "event2",
  128. func(data EventData) {
  129. numbers2 <- data.(uint64)
  130. })
  131. evsw.AddListenerForEvent("listener2", "event3",
  132. func(data EventData) {
  133. numbers2 <- data.(uint64)
  134. })
  135. // collect received events for listener1
  136. go sumReceivedNumbers(numbers1, doneSum1)
  137. // collect received events for listener2
  138. go sumReceivedNumbers(numbers2, doneSum2)
  139. // go fire events
  140. go fireEvents(evsw, "event1", doneSending1, uint64(1))
  141. go fireEvents(evsw, "event2", doneSending2, uint64(1001))
  142. go fireEvents(evsw, "event3", doneSending3, uint64(2001))
  143. checkSumEvent1 := <-doneSending1
  144. checkSumEvent2 := <-doneSending2
  145. checkSumEvent3 := <-doneSending3
  146. checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
  147. checkSum2 := checkSumEvent2 + checkSumEvent3
  148. close(numbers1)
  149. close(numbers2)
  150. eventSum1 := <-doneSum1
  151. eventSum2 := <-doneSum2
  152. if checkSum1 != eventSum1 ||
  153. checkSum2 != eventSum2 {
  154. t.Errorf("Not all messages sent were received for different listeners to different events.\n")
  155. }
  156. }
  157. // TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to
  158. // two events, fires a thousand integers for the first event, then unsubscribes
  159. // the listener and fires a thousand integers for the second event.
  160. func TestAddAndRemoveListener(t *testing.T) {
  161. evsw := NewEventSwitch()
  162. started, err := evsw.Start()
  163. if started == false || err != nil {
  164. t.Errorf("Failed to start EventSwitch, error: %v", err)
  165. }
  166. doneSum1 := make(chan uint64)
  167. doneSum2 := make(chan uint64)
  168. doneSending1 := make(chan uint64)
  169. doneSending2 := make(chan uint64)
  170. numbers1 := make(chan uint64, 4)
  171. numbers2 := make(chan uint64, 4)
  172. // subscribe two listener to three events
  173. evsw.AddListenerForEvent("listener", "event1",
  174. func(data EventData) {
  175. numbers1 <- data.(uint64)
  176. })
  177. evsw.AddListenerForEvent("listener", "event2",
  178. func(data EventData) {
  179. numbers2 <- data.(uint64)
  180. })
  181. // collect received events for event1
  182. go sumReceivedNumbers(numbers1, doneSum1)
  183. // collect received events for event2
  184. go sumReceivedNumbers(numbers2, doneSum2)
  185. // go fire events
  186. go fireEvents(evsw, "event1", doneSending1, uint64(1))
  187. checkSumEvent1 := <-doneSending1
  188. // after sending all event1, unsubscribe for all events
  189. evsw.RemoveListener("listener")
  190. go fireEvents(evsw, "event2", doneSending2, uint64(1001))
  191. checkSumEvent2 := <-doneSending2
  192. close(numbers1)
  193. close(numbers2)
  194. eventSum1 := <-doneSum1
  195. eventSum2 := <-doneSum2
  196. if checkSumEvent1 != eventSum1 ||
  197. // correct value asserted by preceding tests, suffices to be non-zero
  198. checkSumEvent2 == uint64(0) ||
  199. eventSum2 != uint64(0) {
  200. t.Errorf("Not all messages sent were received or unsubscription did not register.\n")
  201. }
  202. }
  203. // TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two
  204. // listeners to three events, and fires a thousand integers for each event.
  205. // These two listeners serve as the baseline validation while other listeners
  206. // are randomly subscribed and unsubscribed.
  207. // More precisely it randomly subscribes new listeners (different from the first
  208. // two listeners) to one of these three events. At the same time it starts
  209. // randomly unsubscribing these additional listeners from all events they are
  210. // at that point subscribed to.
  211. // NOTE: it is important to run this test with race conditions tracking on,
  212. // `go test -race`, to examine for possible race conditions.
  213. func TestRemoveListenersAsync(t *testing.T) {
  214. evsw := NewEventSwitch()
  215. started, err := evsw.Start()
  216. if started == false || err != nil {
  217. t.Errorf("Failed to start EventSwitch, error: %v", err)
  218. }
  219. doneSum1 := make(chan uint64)
  220. doneSum2 := make(chan uint64)
  221. doneSending1 := make(chan uint64)
  222. doneSending2 := make(chan uint64)
  223. doneSending3 := make(chan uint64)
  224. numbers1 := make(chan uint64, 4)
  225. numbers2 := make(chan uint64, 4)
  226. // subscribe two listener to three events
  227. evsw.AddListenerForEvent("listener1", "event1",
  228. func(data EventData) {
  229. numbers1 <- data.(uint64)
  230. })
  231. evsw.AddListenerForEvent("listener1", "event2",
  232. func(data EventData) {
  233. numbers1 <- data.(uint64)
  234. })
  235. evsw.AddListenerForEvent("listener1", "event3",
  236. func(data EventData) {
  237. numbers1 <- data.(uint64)
  238. })
  239. evsw.AddListenerForEvent("listener2", "event1",
  240. func(data EventData) {
  241. numbers2 <- data.(uint64)
  242. })
  243. evsw.AddListenerForEvent("listener2", "event2",
  244. func(data EventData) {
  245. numbers2 <- data.(uint64)
  246. })
  247. evsw.AddListenerForEvent("listener2", "event3",
  248. func(data EventData) {
  249. numbers2 <- data.(uint64)
  250. })
  251. // collect received events for event1
  252. go sumReceivedNumbers(numbers1, doneSum1)
  253. // collect received events for event2
  254. go sumReceivedNumbers(numbers2, doneSum2)
  255. addListenersStress := func() {
  256. s1 := rand.NewSource(time.Now().UnixNano())
  257. r1 := rand.New(s1)
  258. for k := uint16(0); k < 400; k++ {
  259. listenerNumber := r1.Intn(100) + 3
  260. eventNumber := r1.Intn(3) + 1
  261. go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber),
  262. fmt.Sprintf("event%v", eventNumber),
  263. func(_ EventData) {})
  264. }
  265. }
  266. removeListenersStress := func() {
  267. s2 := rand.NewSource(time.Now().UnixNano())
  268. r2 := rand.New(s2)
  269. for k := uint16(0); k < 80; k++ {
  270. listenerNumber := r2.Intn(100) + 3
  271. go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber))
  272. }
  273. }
  274. addListenersStress()
  275. // go fire events
  276. go fireEvents(evsw, "event1", doneSending1, uint64(1))
  277. removeListenersStress()
  278. go fireEvents(evsw, "event2", doneSending2, uint64(1001))
  279. go fireEvents(evsw, "event3", doneSending3, uint64(2001))
  280. checkSumEvent1 := <-doneSending1
  281. checkSumEvent2 := <-doneSending2
  282. checkSumEvent3 := <-doneSending3
  283. checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
  284. close(numbers1)
  285. close(numbers2)
  286. eventSum1 := <-doneSum1
  287. eventSum2 := <-doneSum2
  288. if checkSum != eventSum1 ||
  289. checkSum != eventSum2 {
  290. t.Errorf("Not all messages sent were received.\n")
  291. }
  292. }
  293. //------------------------------------------------------------------------------
  294. // Helper functions
  295. // sumReceivedNumbers takes two channels and adds all numbers received
  296. // until the receiving channel `numbers` is closed; it then sends the sum
  297. // on `doneSum` and closes that channel. Expected to be run in a go-routine.
  298. func sumReceivedNumbers(numbers, doneSum chan uint64) {
  299. var sum uint64 = 0
  300. for {
  301. j, more := <-numbers
  302. sum += j
  303. if !more {
  304. doneSum <- sum
  305. close(doneSum)
  306. return
  307. }
  308. }
  309. }
  310. // fireEvents takes an EventSwitch and fires a thousand integers under
  311. // a given `event` with the integers mootonically increasing from `offset`
  312. // to `offset` + 999. It additionally returns the addition of all integers
  313. // sent on `doneChan` for assertion that all events have been sent, and enabling
  314. // the test to assert all events have also been received.
  315. func fireEvents(evsw *EventSwitch, event string, doneChan chan uint64,
  316. offset uint64) {
  317. var sentSum uint64 = 0
  318. for i := offset; i <= offset+uint64(999); i++ {
  319. sentSum += i
  320. evsw.FireEvent(event, i)
  321. }
  322. doneChan <- sentSum
  323. close(doneChan)
  324. return
  325. }