You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

551 lines
16 KiB

lint: Enable Golint (#4212) * Fix many golint errors * Fix golint errors in the 'lite' package * Don't export Pool.store * Fix typo * Revert unwanted changes * Fix errors in counter package * Fix linter errors in kvstore package * Fix linter error in example package * Fix error in tests package * Fix linter errors in v2 package * Fix linter errors in consensus package * Fix linter errors in evidence package * Fix linter error in fail package * Fix linter errors in query package * Fix linter errors in core package * Fix linter errors in node package * Fix linter errors in mempool package * Fix linter error in conn package * Fix linter errors in pex package * Rename PEXReactor export to Reactor * Fix linter errors in trust package * Fix linter errors in upnp package * Fix linter errors in p2p package * Fix linter errors in proxy package * Fix linter errors in mock_test package * Fix linter error in client_test package * Fix linter errors in coretypes package * Fix linter errors in coregrpc package * Fix linter errors in rpcserver package * Fix linter errors in rpctypes package * Fix linter errors in rpctest package * Fix linter error in json2wal script * Fix linter error in wal2json script * Fix linter errors in kv package * Fix linter error in state package * Fix linter error in grpc_client * Fix linter errors in types package * Fix linter error in version package * Fix remaining errors * Address review comments * Fix broken tests * Reconcile package coregrpc * Fix golangci bot error * Fix new golint errors * Fix broken reference * Enable golint linter * minor changes to bring golint into line * fix failing test * fix pex reactor naming * address PR comments
5 years ago
  1. package events
  2. import (
  3. "context"
  4. "fmt"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/stretchr/testify/require"
  9. "github.com/tendermint/tendermint/libs/log"
  10. "github.com/tendermint/tendermint/libs/rand"
  11. )
  12. // TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single
  13. // listener to an event, and sends a string "data".
  14. func TestAddListenerForEventFireOnce(t *testing.T) {
  15. ctx, cancel := context.WithCancel(context.Background())
  16. defer cancel()
  17. evsw := NewEventSwitch(log.TestingLogger())
  18. require.NoError(t, evsw.Start(ctx))
  19. t.Cleanup(evsw.Wait)
  20. messages := make(chan EventData)
  21. require.NoError(t, evsw.AddListenerForEvent("listener", "event",
  22. func(ctx context.Context, data EventData) error {
  23. // test there's no deadlock if we remove the listener inside a callback
  24. evsw.RemoveListener("listener")
  25. select {
  26. case messages <- data:
  27. return nil
  28. case <-ctx.Done():
  29. return ctx.Err()
  30. }
  31. }))
  32. go evsw.FireEvent(ctx, "event", "data")
  33. received := <-messages
  34. if received != "data" {
  35. t.Errorf("message received does not match: %v", received)
  36. }
  37. }
  38. // TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single
  39. // listener to an event, and sends a thousand integers.
  40. func TestAddListenerForEventFireMany(t *testing.T) {
  41. ctx, cancel := context.WithCancel(context.Background())
  42. defer cancel()
  43. evsw := NewEventSwitch(log.TestingLogger())
  44. require.NoError(t, evsw.Start(ctx))
  45. t.Cleanup(evsw.Wait)
  46. doneSum := make(chan uint64)
  47. doneSending := make(chan uint64)
  48. numbers := make(chan uint64, 4)
  49. // subscribe one listener for one event
  50. require.NoError(t, evsw.AddListenerForEvent("listener", "event",
  51. func(ctx context.Context, data EventData) error {
  52. select {
  53. case numbers <- data.(uint64):
  54. return nil
  55. case <-ctx.Done():
  56. return ctx.Err()
  57. }
  58. }))
  59. // collect received events
  60. go sumReceivedNumbers(numbers, doneSum)
  61. // go fire events
  62. go fireEvents(ctx, evsw, "event", doneSending, uint64(1))
  63. checkSum := <-doneSending
  64. close(numbers)
  65. eventSum := <-doneSum
  66. if checkSum != eventSum {
  67. t.Errorf("not all messages sent were received.\n")
  68. }
  69. }
  70. // TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single
  71. // listener to three different events and sends a thousand integers for each
  72. // of the three events.
  73. func TestAddListenerForDifferentEvents(t *testing.T) {
  74. ctx, cancel := context.WithCancel(context.Background())
  75. defer cancel()
  76. evsw := NewEventSwitch(log.TestingLogger())
  77. require.NoError(t, evsw.Start(ctx))
  78. t.Cleanup(evsw.Wait)
  79. doneSum := make(chan uint64)
  80. doneSending1 := make(chan uint64)
  81. doneSending2 := make(chan uint64)
  82. doneSending3 := make(chan uint64)
  83. numbers := make(chan uint64, 4)
  84. // subscribe one listener to three events
  85. require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
  86. func(ctx context.Context, data EventData) error {
  87. select {
  88. case numbers <- data.(uint64):
  89. return nil
  90. case <-ctx.Done():
  91. return ctx.Err()
  92. }
  93. }))
  94. require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
  95. func(ctx context.Context, data EventData) error {
  96. select {
  97. case numbers <- data.(uint64):
  98. return nil
  99. case <-ctx.Done():
  100. return ctx.Err()
  101. }
  102. }))
  103. require.NoError(t, evsw.AddListenerForEvent("listener", "event3",
  104. func(ctx context.Context, data EventData) error {
  105. select {
  106. case numbers <- data.(uint64):
  107. return nil
  108. case <-ctx.Done():
  109. return ctx.Err()
  110. }
  111. }))
  112. // collect received events
  113. go sumReceivedNumbers(numbers, doneSum)
  114. // go fire events
  115. go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
  116. go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1))
  117. go fireEvents(ctx, evsw, "event3", doneSending3, uint64(1))
  118. var checkSum uint64
  119. checkSum += <-doneSending1
  120. checkSum += <-doneSending2
  121. checkSum += <-doneSending3
  122. close(numbers)
  123. eventSum := <-doneSum
  124. if checkSum != eventSum {
  125. t.Errorf("not all messages sent were received.\n")
  126. }
  127. }
  128. // TestAddDifferentListenerForDifferentEvents sets up an EventSwitch,
  129. // subscribes a first listener to three events, and subscribes a second
  130. // listener to two of those three events, and then sends a thousand integers
  131. // for each of the three events.
  132. func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
  133. ctx, cancel := context.WithCancel(context.Background())
  134. defer cancel()
  135. evsw := NewEventSwitch(log.TestingLogger())
  136. require.NoError(t, evsw.Start(ctx))
  137. t.Cleanup(evsw.Wait)
  138. doneSum1 := make(chan uint64)
  139. doneSum2 := make(chan uint64)
  140. doneSending1 := make(chan uint64)
  141. doneSending2 := make(chan uint64)
  142. doneSending3 := make(chan uint64)
  143. numbers1 := make(chan uint64, 4)
  144. numbers2 := make(chan uint64, 4)
  145. // subscribe two listener to three events
  146. require.NoError(t, evsw.AddListenerForEvent("listener1", "event1",
  147. func(ctx context.Context, data EventData) error {
  148. select {
  149. case numbers1 <- data.(uint64):
  150. return nil
  151. case <-ctx.Done():
  152. return ctx.Err()
  153. }
  154. }))
  155. require.NoError(t, evsw.AddListenerForEvent("listener1", "event2",
  156. func(ctx context.Context, data EventData) error {
  157. select {
  158. case numbers1 <- data.(uint64):
  159. return nil
  160. case <-ctx.Done():
  161. return ctx.Err()
  162. }
  163. }))
  164. require.NoError(t, evsw.AddListenerForEvent("listener1", "event3",
  165. func(ctx context.Context, data EventData) error {
  166. select {
  167. case numbers1 <- data.(uint64):
  168. return nil
  169. case <-ctx.Done():
  170. return ctx.Err()
  171. }
  172. }))
  173. require.NoError(t, evsw.AddListenerForEvent("listener2", "event2",
  174. func(ctx context.Context, data EventData) error {
  175. select {
  176. case numbers2 <- data.(uint64):
  177. return nil
  178. case <-ctx.Done():
  179. return ctx.Err()
  180. }
  181. }))
  182. require.NoError(t, evsw.AddListenerForEvent("listener2", "event3",
  183. func(ctx context.Context, data EventData) error {
  184. select {
  185. case numbers2 <- data.(uint64):
  186. return nil
  187. case <-ctx.Done():
  188. return ctx.Err()
  189. }
  190. }))
  191. // collect received events for listener1
  192. go sumReceivedNumbers(numbers1, doneSum1)
  193. // collect received events for listener2
  194. go sumReceivedNumbers(numbers2, doneSum2)
  195. // go fire events
  196. go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
  197. go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
  198. go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001))
  199. checkSumEvent1 := <-doneSending1
  200. checkSumEvent2 := <-doneSending2
  201. checkSumEvent3 := <-doneSending3
  202. checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
  203. checkSum2 := checkSumEvent2 + checkSumEvent3
  204. close(numbers1)
  205. close(numbers2)
  206. eventSum1 := <-doneSum1
  207. eventSum2 := <-doneSum2
  208. if checkSum1 != eventSum1 ||
  209. checkSum2 != eventSum2 {
  210. t.Errorf("not all messages sent were received for different listeners to different events.\n")
  211. }
  212. }
  213. func TestAddAndRemoveListenerConcurrency(t *testing.T) {
  214. var (
  215. stopInputEvent = false
  216. roundCount = 2000
  217. )
  218. ctx, cancel := context.WithCancel(context.Background())
  219. defer cancel()
  220. evsw := NewEventSwitch(log.TestingLogger())
  221. require.NoError(t, evsw.Start(ctx))
  222. t.Cleanup(evsw.Wait)
  223. done1 := make(chan struct{})
  224. done2 := make(chan struct{})
  225. // Must be executed concurrently to uncover the data race.
  226. // 1. RemoveListener
  227. go func() {
  228. defer close(done1)
  229. for i := 0; i < roundCount; i++ {
  230. evsw.RemoveListener("listener")
  231. }
  232. }()
  233. // 2. AddListenerForEvent
  234. go func() {
  235. defer close(done2)
  236. for i := 0; i < roundCount; i++ {
  237. index := i
  238. // we explicitly ignore errors here, since the listener will sometimes be removed
  239. // (that's what we're testing)
  240. _ = evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index),
  241. func(ctx context.Context, data EventData) error {
  242. t.Errorf("should not run callback for %d.\n", index)
  243. stopInputEvent = true
  244. return nil
  245. })
  246. }
  247. }()
  248. <-done1
  249. <-done2
  250. evsw.RemoveListener("listener") // remove the last listener
  251. for i := 0; i < roundCount && !stopInputEvent; i++ {
  252. evsw.FireEvent(ctx, fmt.Sprintf("event%d", i), uint64(1001))
  253. }
  254. }
  255. // TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to
  256. // two events, fires a thousand integers for the first event, then unsubscribes
  257. // the listener and fires a thousand integers for the second event.
  258. func TestAddAndRemoveListener(t *testing.T) {
  259. ctx, cancel := context.WithCancel(context.Background())
  260. defer cancel()
  261. evsw := NewEventSwitch(log.TestingLogger())
  262. require.NoError(t, evsw.Start(ctx))
  263. t.Cleanup(evsw.Wait)
  264. doneSum1 := make(chan uint64)
  265. doneSum2 := make(chan uint64)
  266. doneSending1 := make(chan uint64)
  267. doneSending2 := make(chan uint64)
  268. numbers1 := make(chan uint64, 4)
  269. numbers2 := make(chan uint64, 4)
  270. // subscribe two listener to three events
  271. require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
  272. func(ctx context.Context, data EventData) error {
  273. select {
  274. case numbers1 <- data.(uint64):
  275. return nil
  276. case <-ctx.Done():
  277. return ctx.Err()
  278. }
  279. }))
  280. require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
  281. func(ctx context.Context, data EventData) error {
  282. select {
  283. case numbers2 <- data.(uint64):
  284. return nil
  285. case <-ctx.Done():
  286. return ctx.Err()
  287. }
  288. }))
  289. // collect received events for event1
  290. go sumReceivedNumbers(numbers1, doneSum1)
  291. // collect received events for event2
  292. go sumReceivedNumbers(numbers2, doneSum2)
  293. // go fire events
  294. go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
  295. checkSumEvent1 := <-doneSending1
  296. // after sending all event1, unsubscribe for all events
  297. evsw.RemoveListener("listener")
  298. go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
  299. checkSumEvent2 := <-doneSending2
  300. close(numbers1)
  301. close(numbers2)
  302. eventSum1 := <-doneSum1
  303. eventSum2 := <-doneSum2
  304. if checkSumEvent1 != eventSum1 ||
  305. // correct value asserted by preceding tests, suffices to be non-zero
  306. checkSumEvent2 == uint64(0) ||
  307. eventSum2 != uint64(0) {
  308. t.Errorf("not all messages sent were received or unsubscription did not register.\n")
  309. }
  310. }
  311. // TestRemoveListener does basic tests on adding and removing
  312. func TestRemoveListener(t *testing.T) {
  313. ctx, cancel := context.WithCancel(context.Background())
  314. defer cancel()
  315. evsw := NewEventSwitch(log.TestingLogger())
  316. require.NoError(t, evsw.Start(ctx))
  317. t.Cleanup(evsw.Wait)
  318. count := 10
  319. sum1, sum2 := 0, 0
  320. // add some listeners and make sure they work
  321. require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
  322. func(ctx context.Context, data EventData) error {
  323. sum1++
  324. return nil
  325. }))
  326. require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
  327. func(ctx context.Context, data EventData) error {
  328. sum2++
  329. return nil
  330. }))
  331. for i := 0; i < count; i++ {
  332. evsw.FireEvent(ctx, "event1", true)
  333. evsw.FireEvent(ctx, "event2", true)
  334. }
  335. assert.Equal(t, count, sum1)
  336. assert.Equal(t, count, sum2)
  337. // remove one by event and make sure it is gone
  338. evsw.RemoveListenerForEvent("event2", "listener")
  339. for i := 0; i < count; i++ {
  340. evsw.FireEvent(ctx, "event1", true)
  341. evsw.FireEvent(ctx, "event2", true)
  342. }
  343. assert.Equal(t, count*2, sum1)
  344. assert.Equal(t, count, sum2)
  345. // remove the listener entirely and make sure both gone
  346. evsw.RemoveListener("listener")
  347. for i := 0; i < count; i++ {
  348. evsw.FireEvent(ctx, "event1", true)
  349. evsw.FireEvent(ctx, "event2", true)
  350. }
  351. assert.Equal(t, count*2, sum1)
  352. assert.Equal(t, count, sum2)
  353. }
  354. // TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two
  355. // listeners to three events, and fires a thousand integers for each event.
  356. // These two listeners serve as the baseline validation while other listeners
  357. // are randomly subscribed and unsubscribed.
  358. // More precisely it randomly subscribes new listeners (different from the first
  359. // two listeners) to one of these three events. At the same time it starts
  360. // randomly unsubscribing these additional listeners from all events they are
  361. // at that point subscribed to.
  362. // NOTE: it is important to run this test with race conditions tracking on,
  363. // `go test -race`, to examine for possible race conditions.
  364. func TestRemoveListenersAsync(t *testing.T) {
  365. ctx, cancel := context.WithCancel(context.Background())
  366. defer cancel()
  367. evsw := NewEventSwitch(log.TestingLogger())
  368. require.NoError(t, evsw.Start(ctx))
  369. t.Cleanup(evsw.Wait)
  370. doneSum1 := make(chan uint64)
  371. doneSum2 := make(chan uint64)
  372. doneSending1 := make(chan uint64)
  373. doneSending2 := make(chan uint64)
  374. doneSending3 := make(chan uint64)
  375. numbers1 := make(chan uint64, 4)
  376. numbers2 := make(chan uint64, 4)
  377. // subscribe two listener to three events
  378. require.NoError(t, evsw.AddListenerForEvent("listener1", "event1",
  379. func(ctx context.Context, data EventData) error {
  380. select {
  381. case numbers1 <- data.(uint64):
  382. return nil
  383. case <-ctx.Done():
  384. return ctx.Err()
  385. }
  386. }))
  387. require.NoError(t, evsw.AddListenerForEvent("listener1", "event2",
  388. func(ctx context.Context, data EventData) error {
  389. select {
  390. case numbers1 <- data.(uint64):
  391. return nil
  392. case <-ctx.Done():
  393. return ctx.Err()
  394. }
  395. }))
  396. require.NoError(t, evsw.AddListenerForEvent("listener1", "event3",
  397. func(ctx context.Context, data EventData) error {
  398. select {
  399. case numbers1 <- data.(uint64):
  400. return nil
  401. case <-ctx.Done():
  402. return ctx.Err()
  403. }
  404. }))
  405. require.NoError(t, evsw.AddListenerForEvent("listener2", "event1",
  406. func(ctx context.Context, data EventData) error {
  407. select {
  408. case numbers2 <- data.(uint64):
  409. return nil
  410. case <-ctx.Done():
  411. return ctx.Err()
  412. }
  413. }))
  414. require.NoError(t, evsw.AddListenerForEvent("listener2", "event2",
  415. func(ctx context.Context, data EventData) error {
  416. select {
  417. case numbers2 <- data.(uint64):
  418. return nil
  419. case <-ctx.Done():
  420. return ctx.Err()
  421. }
  422. }))
  423. require.NoError(t, evsw.AddListenerForEvent("listener2", "event3",
  424. func(ctx context.Context, data EventData) error {
  425. select {
  426. case numbers2 <- data.(uint64):
  427. return nil
  428. case <-ctx.Done():
  429. return ctx.Err()
  430. }
  431. }))
  432. // collect received events for event1
  433. go sumReceivedNumbers(numbers1, doneSum1)
  434. // collect received events for event2
  435. go sumReceivedNumbers(numbers2, doneSum2)
  436. addListenersStress := func() {
  437. r1 := rand.NewRand()
  438. r1.Seed(time.Now().UnixNano())
  439. for k := uint16(0); k < 400; k++ {
  440. listenerNumber := r1.Intn(100) + 3
  441. eventNumber := r1.Intn(3) + 1
  442. go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber), //nolint:errcheck // ignore for tests
  443. fmt.Sprintf("event%v", eventNumber),
  444. func(context.Context, EventData) error { return nil })
  445. }
  446. }
  447. removeListenersStress := func() {
  448. r2 := rand.NewRand()
  449. r2.Seed(time.Now().UnixNano())
  450. for k := uint16(0); k < 80; k++ {
  451. listenerNumber := r2.Intn(100) + 3
  452. go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber))
  453. }
  454. }
  455. addListenersStress()
  456. // go fire events
  457. go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
  458. removeListenersStress()
  459. go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
  460. go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001))
  461. checkSumEvent1 := <-doneSending1
  462. checkSumEvent2 := <-doneSending2
  463. checkSumEvent3 := <-doneSending3
  464. checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
  465. close(numbers1)
  466. close(numbers2)
  467. eventSum1 := <-doneSum1
  468. eventSum2 := <-doneSum2
  469. if checkSum != eventSum1 ||
  470. checkSum != eventSum2 {
  471. t.Errorf("not all messages sent were received.\n")
  472. }
  473. }
  474. //------------------------------------------------------------------------------
  475. // Helper functions
  // sumReceivedNumbers adds up every value received on `numbers` until that
  // channel is closed, then sends the total on `doneSum` and closes `doneSum`.
  // A channel closed without any values yields a total of zero. The send on
  // `doneSum` blocks until it can proceed, so this is expected to run in its
  // own goroutine.
  func sumReceivedNumbers(numbers, doneSum chan uint64) {
  	// Closing after the send tells the receiver that no further totals follow.
  	defer close(doneSum)

  	var sum uint64
  	// Ranging over the channel ends exactly when `numbers` is closed and
  	// drained, so the zero value produced by the close is never added.
  	for n := range numbers {
  		sum += n
  	}
  	doneSum <- sum
  }
  491. // fireEvents takes an EventSwitch and fires a thousand integers under
  492. // a given `event` with the integers mootonically increasing from `offset`
  493. // to `offset` + 999. It additionally returns the addition of all integers
  494. // sent on `doneChan` for assertion that all events have been sent, and enabling
  495. // the test to assert all events have also been received.
  496. func fireEvents(ctx context.Context, evsw Fireable, event string, doneChan chan uint64, offset uint64) {
  497. defer close(doneChan)
  498. var sentSum uint64
  499. for i := offset; i <= offset+uint64(999); i++ {
  500. if ctx.Err() != nil {
  501. break
  502. }
  503. evsw.FireEvent(ctx, event, i)
  504. sentSum += i
  505. }
  506. select {
  507. case <-ctx.Done():
  508. case doneChan <- sentSum:
  509. }
  510. }