You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

487 lines
14 KiB

lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
  1. package events
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. "github.com/stretchr/testify/require"
  8. "github.com/tendermint/tendermint/libs/rand"
  9. )
  10. // TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single
  11. // listener to an event, and sends a string "data".
  12. func TestAddListenerForEventFireOnce(t *testing.T) {
  13. evsw := NewEventSwitch()
  14. err := evsw.Start()
  15. require.NoError(t, err)
  16. t.Cleanup(func() {
  17. if err := evsw.Stop(); err != nil {
  18. t.Error(err)
  19. }
  20. })
  21. messages := make(chan EventData)
  22. err = evsw.AddListenerForEvent("listener", "event",
  23. func(data EventData) {
  24. // test there's no deadlock if we remove the listener inside a callback
  25. evsw.RemoveListener("listener")
  26. messages <- data
  27. })
  28. require.NoError(t, err)
  29. go evsw.FireEvent("event", "data")
  30. received := <-messages
  31. if received != "data" {
  32. t.Errorf("message received does not match: %v", received)
  33. }
  34. }
  35. // TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single
  36. // listener to an event, and sends a thousand integers.
  37. func TestAddListenerForEventFireMany(t *testing.T) {
  38. evsw := NewEventSwitch()
  39. err := evsw.Start()
  40. require.NoError(t, err)
  41. t.Cleanup(func() {
  42. if err := evsw.Stop(); err != nil {
  43. t.Error(err)
  44. }
  45. })
  46. doneSum := make(chan uint64)
  47. doneSending := make(chan uint64)
  48. numbers := make(chan uint64, 4)
  49. // subscribe one listener for one event
  50. err = evsw.AddListenerForEvent("listener", "event",
  51. func(data EventData) {
  52. numbers <- data.(uint64)
  53. })
  54. require.NoError(t, err)
  55. // collect received events
  56. go sumReceivedNumbers(numbers, doneSum)
  57. // go fire events
  58. go fireEvents(evsw, "event", doneSending, uint64(1))
  59. checkSum := <-doneSending
  60. close(numbers)
  61. eventSum := <-doneSum
  62. if checkSum != eventSum {
  63. t.Errorf("not all messages sent were received.\n")
  64. }
  65. }
  66. // TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single
  67. // listener to three different events and sends a thousand integers for each
  68. // of the three events.
  69. func TestAddListenerForDifferentEvents(t *testing.T) {
  70. evsw := NewEventSwitch()
  71. err := evsw.Start()
  72. require.NoError(t, err)
  73. t.Cleanup(func() {
  74. if err := evsw.Stop(); err != nil {
  75. t.Error(err)
  76. }
  77. })
  78. doneSum := make(chan uint64)
  79. doneSending1 := make(chan uint64)
  80. doneSending2 := make(chan uint64)
  81. doneSending3 := make(chan uint64)
  82. numbers := make(chan uint64, 4)
  83. // subscribe one listener to three events
  84. err = evsw.AddListenerForEvent("listener", "event1",
  85. func(data EventData) {
  86. numbers <- data.(uint64)
  87. })
  88. require.NoError(t, err)
  89. err = evsw.AddListenerForEvent("listener", "event2",
  90. func(data EventData) {
  91. numbers <- data.(uint64)
  92. })
  93. require.NoError(t, err)
  94. err = evsw.AddListenerForEvent("listener", "event3",
  95. func(data EventData) {
  96. numbers <- data.(uint64)
  97. })
  98. require.NoError(t, err)
  99. // collect received events
  100. go sumReceivedNumbers(numbers, doneSum)
  101. // go fire events
  102. go fireEvents(evsw, "event1", doneSending1, uint64(1))
  103. go fireEvents(evsw, "event2", doneSending2, uint64(1))
  104. go fireEvents(evsw, "event3", doneSending3, uint64(1))
  105. var checkSum uint64
  106. checkSum += <-doneSending1
  107. checkSum += <-doneSending2
  108. checkSum += <-doneSending3
  109. close(numbers)
  110. eventSum := <-doneSum
  111. if checkSum != eventSum {
  112. t.Errorf("not all messages sent were received.\n")
  113. }
  114. }
  115. // TestAddDifferentListenerForDifferentEvents sets up an EventSwitch,
  116. // subscribes a first listener to three events, and subscribes a second
  117. // listener to two of those three events, and then sends a thousand integers
  118. // for each of the three events.
  119. func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
  120. evsw := NewEventSwitch()
  121. err := evsw.Start()
  122. require.NoError(t, err)
  123. t.Cleanup(func() {
  124. if err := evsw.Stop(); err != nil {
  125. t.Error(err)
  126. }
  127. })
  128. doneSum1 := make(chan uint64)
  129. doneSum2 := make(chan uint64)
  130. doneSending1 := make(chan uint64)
  131. doneSending2 := make(chan uint64)
  132. doneSending3 := make(chan uint64)
  133. numbers1 := make(chan uint64, 4)
  134. numbers2 := make(chan uint64, 4)
  135. // subscribe two listener to three events
  136. err = evsw.AddListenerForEvent("listener1", "event1",
  137. func(data EventData) {
  138. numbers1 <- data.(uint64)
  139. })
  140. require.NoError(t, err)
  141. err = evsw.AddListenerForEvent("listener1", "event2",
  142. func(data EventData) {
  143. numbers1 <- data.(uint64)
  144. })
  145. require.NoError(t, err)
  146. err = evsw.AddListenerForEvent("listener1", "event3",
  147. func(data EventData) {
  148. numbers1 <- data.(uint64)
  149. })
  150. require.NoError(t, err)
  151. err = evsw.AddListenerForEvent("listener2", "event2",
  152. func(data EventData) {
  153. numbers2 <- data.(uint64)
  154. })
  155. require.NoError(t, err)
  156. err = evsw.AddListenerForEvent("listener2", "event3",
  157. func(data EventData) {
  158. numbers2 <- data.(uint64)
  159. })
  160. require.NoError(t, err)
  161. // collect received events for listener1
  162. go sumReceivedNumbers(numbers1, doneSum1)
  163. // collect received events for listener2
  164. go sumReceivedNumbers(numbers2, doneSum2)
  165. // go fire events
  166. go fireEvents(evsw, "event1", doneSending1, uint64(1))
  167. go fireEvents(evsw, "event2", doneSending2, uint64(1001))
  168. go fireEvents(evsw, "event3", doneSending3, uint64(2001))
  169. checkSumEvent1 := <-doneSending1
  170. checkSumEvent2 := <-doneSending2
  171. checkSumEvent3 := <-doneSending3
  172. checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
  173. checkSum2 := checkSumEvent2 + checkSumEvent3
  174. close(numbers1)
  175. close(numbers2)
  176. eventSum1 := <-doneSum1
  177. eventSum2 := <-doneSum2
  178. if checkSum1 != eventSum1 ||
  179. checkSum2 != eventSum2 {
  180. t.Errorf("not all messages sent were received for different listeners to different events.\n")
  181. }
  182. }
  183. func TestAddAndRemoveListenerConcurrency(t *testing.T) {
  184. var (
  185. stopInputEvent = false
  186. roundCount = 2000
  187. )
  188. evsw := NewEventSwitch()
  189. err := evsw.Start()
  190. require.NoError(t, err)
  191. t.Cleanup(func() {
  192. if err := evsw.Stop(); err != nil {
  193. t.Error(err)
  194. }
  195. })
  196. done1 := make(chan struct{})
  197. done2 := make(chan struct{})
  198. // Must be executed concurrently to uncover the data race.
  199. // 1. RemoveListener
  200. go func() {
  201. defer close(done1)
  202. for i := 0; i < roundCount; i++ {
  203. evsw.RemoveListener("listener")
  204. }
  205. }()
  206. // 2. AddListenerForEvent
  207. go func() {
  208. defer close(done2)
  209. for i := 0; i < roundCount; i++ {
  210. index := i
  211. // we explicitly ignore errors here, since the listener will sometimes be removed
  212. // (that's what we're testing)
  213. _ = evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index),
  214. func(data EventData) {
  215. t.Errorf("should not run callback for %d.\n", index)
  216. stopInputEvent = true
  217. })
  218. }
  219. }()
  220. <-done1
  221. <-done2
  222. evsw.RemoveListener("listener") // remove the last listener
  223. for i := 0; i < roundCount && !stopInputEvent; i++ {
  224. evsw.FireEvent(fmt.Sprintf("event%d", i), uint64(1001))
  225. }
  226. }
  227. // TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to
  228. // two events, fires a thousand integers for the first event, then unsubscribes
  229. // the listener and fires a thousand integers for the second event.
  230. func TestAddAndRemoveListener(t *testing.T) {
  231. evsw := NewEventSwitch()
  232. err := evsw.Start()
  233. require.NoError(t, err)
  234. t.Cleanup(func() {
  235. if err := evsw.Stop(); err != nil {
  236. t.Error(err)
  237. }
  238. })
  239. doneSum1 := make(chan uint64)
  240. doneSum2 := make(chan uint64)
  241. doneSending1 := make(chan uint64)
  242. doneSending2 := make(chan uint64)
  243. numbers1 := make(chan uint64, 4)
  244. numbers2 := make(chan uint64, 4)
  245. // subscribe two listener to three events
  246. err = evsw.AddListenerForEvent("listener", "event1",
  247. func(data EventData) {
  248. numbers1 <- data.(uint64)
  249. })
  250. require.NoError(t, err)
  251. err = evsw.AddListenerForEvent("listener", "event2",
  252. func(data EventData) {
  253. numbers2 <- data.(uint64)
  254. })
  255. require.NoError(t, err)
  256. // collect received events for event1
  257. go sumReceivedNumbers(numbers1, doneSum1)
  258. // collect received events for event2
  259. go sumReceivedNumbers(numbers2, doneSum2)
  260. // go fire events
  261. go fireEvents(evsw, "event1", doneSending1, uint64(1))
  262. checkSumEvent1 := <-doneSending1
  263. // after sending all event1, unsubscribe for all events
  264. evsw.RemoveListener("listener")
  265. go fireEvents(evsw, "event2", doneSending2, uint64(1001))
  266. checkSumEvent2 := <-doneSending2
  267. close(numbers1)
  268. close(numbers2)
  269. eventSum1 := <-doneSum1
  270. eventSum2 := <-doneSum2
  271. if checkSumEvent1 != eventSum1 ||
  272. // correct value asserted by preceding tests, suffices to be non-zero
  273. checkSumEvent2 == uint64(0) ||
  274. eventSum2 != uint64(0) {
  275. t.Errorf("not all messages sent were received or unsubscription did not register.\n")
  276. }
  277. }
  278. // TestRemoveListener does basic tests on adding and removing
  279. func TestRemoveListener(t *testing.T) {
  280. evsw := NewEventSwitch()
  281. err := evsw.Start()
  282. require.NoError(t, err)
  283. t.Cleanup(func() {
  284. if err := evsw.Stop(); err != nil {
  285. t.Error(err)
  286. }
  287. })
  288. count := 10
  289. sum1, sum2 := 0, 0
  290. // add some listeners and make sure they work
  291. err = evsw.AddListenerForEvent("listener", "event1",
  292. func(data EventData) {
  293. sum1++
  294. })
  295. require.NoError(t, err)
  296. err = evsw.AddListenerForEvent("listener", "event2",
  297. func(data EventData) {
  298. sum2++
  299. })
  300. require.NoError(t, err)
  301. for i := 0; i < count; i++ {
  302. evsw.FireEvent("event1", true)
  303. evsw.FireEvent("event2", true)
  304. }
  305. assert.Equal(t, count, sum1)
  306. assert.Equal(t, count, sum2)
  307. // remove one by event and make sure it is gone
  308. evsw.RemoveListenerForEvent("event2", "listener")
  309. for i := 0; i < count; i++ {
  310. evsw.FireEvent("event1", true)
  311. evsw.FireEvent("event2", true)
  312. }
  313. assert.Equal(t, count*2, sum1)
  314. assert.Equal(t, count, sum2)
  315. // remove the listener entirely and make sure both gone
  316. evsw.RemoveListener("listener")
  317. for i := 0; i < count; i++ {
  318. evsw.FireEvent("event1", true)
  319. evsw.FireEvent("event2", true)
  320. }
  321. assert.Equal(t, count*2, sum1)
  322. assert.Equal(t, count, sum2)
  323. }
  324. // TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two
  325. // listeners to three events, and fires a thousand integers for each event.
  326. // These two listeners serve as the baseline validation while other listeners
  327. // are randomly subscribed and unsubscribed.
  328. // More precisely it randomly subscribes new listeners (different from the first
  329. // two listeners) to one of these three events. At the same time it starts
  330. // randomly unsubscribing these additional listeners from all events they are
  331. // at that point subscribed to.
  332. // NOTE: it is important to run this test with race conditions tracking on,
  333. // `go test -race`, to examine for possible race conditions.
  334. func TestRemoveListenersAsync(t *testing.T) {
  335. evsw := NewEventSwitch()
  336. err := evsw.Start()
  337. require.NoError(t, err)
  338. t.Cleanup(func() {
  339. if err := evsw.Stop(); err != nil {
  340. t.Error(err)
  341. }
  342. })
  343. doneSum1 := make(chan uint64)
  344. doneSum2 := make(chan uint64)
  345. doneSending1 := make(chan uint64)
  346. doneSending2 := make(chan uint64)
  347. doneSending3 := make(chan uint64)
  348. numbers1 := make(chan uint64, 4)
  349. numbers2 := make(chan uint64, 4)
  350. // subscribe two listener to three events
  351. err = evsw.AddListenerForEvent("listener1", "event1",
  352. func(data EventData) {
  353. numbers1 <- data.(uint64)
  354. })
  355. require.NoError(t, err)
  356. err = evsw.AddListenerForEvent("listener1", "event2",
  357. func(data EventData) {
  358. numbers1 <- data.(uint64)
  359. })
  360. require.NoError(t, err)
  361. err = evsw.AddListenerForEvent("listener1", "event3",
  362. func(data EventData) {
  363. numbers1 <- data.(uint64)
  364. })
  365. require.NoError(t, err)
  366. err = evsw.AddListenerForEvent("listener2", "event1",
  367. func(data EventData) {
  368. numbers2 <- data.(uint64)
  369. })
  370. require.NoError(t, err)
  371. err = evsw.AddListenerForEvent("listener2", "event2",
  372. func(data EventData) {
  373. numbers2 <- data.(uint64)
  374. })
  375. require.NoError(t, err)
  376. err = evsw.AddListenerForEvent("listener2", "event3",
  377. func(data EventData) {
  378. numbers2 <- data.(uint64)
  379. })
  380. require.NoError(t, err)
  381. // collect received events for event1
  382. go sumReceivedNumbers(numbers1, doneSum1)
  383. // collect received events for event2
  384. go sumReceivedNumbers(numbers2, doneSum2)
  385. addListenersStress := func() {
  386. r1 := rand.NewRand()
  387. r1.Seed(time.Now().UnixNano())
  388. for k := uint16(0); k < 400; k++ {
  389. listenerNumber := r1.Intn(100) + 3
  390. eventNumber := r1.Intn(3) + 1
  391. go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber), //nolint:errcheck // ignore for tests
  392. fmt.Sprintf("event%v", eventNumber),
  393. func(_ EventData) {})
  394. }
  395. }
  396. removeListenersStress := func() {
  397. r2 := rand.NewRand()
  398. r2.Seed(time.Now().UnixNano())
  399. for k := uint16(0); k < 80; k++ {
  400. listenerNumber := r2.Intn(100) + 3
  401. go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber))
  402. }
  403. }
  404. addListenersStress()
  405. // go fire events
  406. go fireEvents(evsw, "event1", doneSending1, uint64(1))
  407. removeListenersStress()
  408. go fireEvents(evsw, "event2", doneSending2, uint64(1001))
  409. go fireEvents(evsw, "event3", doneSending3, uint64(2001))
  410. checkSumEvent1 := <-doneSending1
  411. checkSumEvent2 := <-doneSending2
  412. checkSumEvent3 := <-doneSending3
  413. checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
  414. close(numbers1)
  415. close(numbers2)
  416. eventSum1 := <-doneSum1
  417. eventSum2 := <-doneSum2
  418. if checkSum != eventSum1 ||
  419. checkSum != eventSum2 {
  420. t.Errorf("not all messages sent were received.\n")
  421. }
  422. }
  423. //------------------------------------------------------------------------------
  424. // Helper functions
  425. // sumReceivedNumbers takes two channels and adds all numbers received
  426. // until the receiving channel `numbers` is closed; it then sends the sum
  427. // on `doneSum` and closes that channel. Expected to be run in a go-routine.
  428. func sumReceivedNumbers(numbers, doneSum chan uint64) {
  429. var sum uint64
  430. for {
  431. j, more := <-numbers
  432. sum += j
  433. if !more {
  434. doneSum <- sum
  435. close(doneSum)
  436. return
  437. }
  438. }
  439. }
  440. // fireEvents takes an EventSwitch and fires a thousand integers under
  441. // a given `event` with the integers mootonically increasing from `offset`
  442. // to `offset` + 999. It additionally returns the addition of all integers
  443. // sent on `doneChan` for assertion that all events have been sent, and enabling
  444. // the test to assert all events have also been received.
  445. func fireEvents(evsw Fireable, event string, doneChan chan uint64,
  446. offset uint64) {
  447. var sentSum uint64
  448. for i := offset; i <= offset+uint64(999); i++ {
  449. sentSum += i
  450. evsw.FireEvent(event, i)
  451. }
  452. doneChan <- sentSum
  453. close(doneChan)
  454. }