- package events
-
- import (
- "context"
- "fmt"
- "math/rand"
- "testing"
- "time"
-
- "github.com/stretchr/testify/require"
-
- "github.com/tendermint/tendermint/libs/log"
- )
-
- // TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single
- // listener to an event, and sends a string "data".
- func TestAddListenerForEventFireOnce(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.NewTestingLogger(t)
-
- evsw := NewEventSwitch(logger)
-
- messages := make(chan EventData)
- require.NoError(t, evsw.AddListenerForEvent("listener", "event",
- func(ctx context.Context, data EventData) error {
- select {
- case messages <- data:
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- go evsw.FireEvent(ctx, "event", "data")
- received := <-messages
- if received != "data" {
- t.Errorf("message received does not match: %v", received)
- }
- }
-
- // TestAddListenerForEventFireMany sets up an EventSwitch, subscribes a single
- // listener to an event, and sends a thousand integers.
- func TestAddListenerForEventFireMany(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.NewTestingLogger(t)
-
- evsw := NewEventSwitch(logger)
-
- doneSum := make(chan uint64)
- doneSending := make(chan uint64)
- numbers := make(chan uint64, 4)
- // subscribe one listener for one event
- require.NoError(t, evsw.AddListenerForEvent("listener", "event",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- // collect received events
- go sumReceivedNumbers(numbers, doneSum)
- // go fire events
- go fireEvents(ctx, evsw, "event", doneSending, uint64(1))
- checkSum := <-doneSending
- close(numbers)
- eventSum := <-doneSum
- if checkSum != eventSum {
- t.Errorf("not all messages sent were received.\n")
- }
- }
-
- // TestAddListenerForDifferentEvents sets up an EventSwitch, subscribes a single
- // listener to three different events and sends a thousand integers for each
- // of the three events.
- func TestAddListenerForDifferentEvents(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.NewTestingLogger(t)
-
- evsw := NewEventSwitch(logger)
-
- doneSum := make(chan uint64)
- doneSending1 := make(chan uint64)
- doneSending2 := make(chan uint64)
- doneSending3 := make(chan uint64)
- numbers := make(chan uint64, 4)
- // subscribe one listener to three events
- require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener", "event3",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- // collect received events
- go sumReceivedNumbers(numbers, doneSum)
- // go fire events
- go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
- go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1))
- go fireEvents(ctx, evsw, "event3", doneSending3, uint64(1))
- var checkSum uint64
- checkSum += <-doneSending1
- checkSum += <-doneSending2
- checkSum += <-doneSending3
- close(numbers)
- eventSum := <-doneSum
- if checkSum != eventSum {
- t.Errorf("not all messages sent were received.\n")
- }
- }
-
- // TestAddDifferentListenerForDifferentEvents sets up an EventSwitch,
- // subscribes a first listener to three events, and subscribes a second
- // listener to two of those three events, and then sends a thousand integers
- // for each of the three events.
- func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- logger := log.NewTestingLogger(t)
- evsw := NewEventSwitch(logger)
-
- doneSum1 := make(chan uint64)
- doneSum2 := make(chan uint64)
- doneSending1 := make(chan uint64)
- doneSending2 := make(chan uint64)
- doneSending3 := make(chan uint64)
- numbers1 := make(chan uint64, 4)
- numbers2 := make(chan uint64, 4)
- // subscribe two listener to three events
- require.NoError(t, evsw.AddListenerForEvent("listener1", "event1",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers1 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener1", "event2",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers1 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener1", "event3",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers1 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener2", "event2",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers2 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener2", "event3",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers2 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- // collect received events for listener1
- go sumReceivedNumbers(numbers1, doneSum1)
- // collect received events for listener2
- go sumReceivedNumbers(numbers2, doneSum2)
- // go fire events
- go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
- go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
- go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001))
- checkSumEvent1 := <-doneSending1
- checkSumEvent2 := <-doneSending2
- checkSumEvent3 := <-doneSending3
- checkSum1 := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
- checkSum2 := checkSumEvent2 + checkSumEvent3
- close(numbers1)
- close(numbers2)
- eventSum1 := <-doneSum1
- eventSum2 := <-doneSum2
- if checkSum1 != eventSum1 ||
- checkSum2 != eventSum2 {
- t.Errorf("not all messages sent were received for different listeners to different events.\n")
- }
- }
-
- // TestManagerLiistenersAsync sets up an EventSwitch, subscribes two
- // listeners to three events, and fires a thousand integers for each event.
- // These two listeners serve as the baseline validation while other listeners
- // are randomly subscribed and unsubscribed.
- // More precisely it randomly subscribes new listeners (different from the first
- // two listeners) to one of these three events. At the same time it starts
- // randomly unsubscribing these additional listeners from all events they are
- // at that point subscribed to.
- // NOTE: it is important to run this test with race conditions tracking on,
- // `go test -race`, to examine for possible race conditions.
- func TestManageListenersAsync(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- logger := log.NewTestingLogger(t)
-
- evsw := NewEventSwitch(logger)
-
- doneSum1 := make(chan uint64)
- doneSum2 := make(chan uint64)
- doneSending1 := make(chan uint64)
- doneSending2 := make(chan uint64)
- doneSending3 := make(chan uint64)
- numbers1 := make(chan uint64, 4)
- numbers2 := make(chan uint64, 4)
- // subscribe two listener to three events
- require.NoError(t, evsw.AddListenerForEvent("listener1", "event1",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers1 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener1", "event2",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers1 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener1", "event3",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers1 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener2", "event1",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers2 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener2", "event2",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers2 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- require.NoError(t, evsw.AddListenerForEvent("listener2", "event3",
- func(ctx context.Context, data EventData) error {
- select {
- case numbers2 <- data.(uint64):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
- }))
- // collect received events for event1
- go sumReceivedNumbers(numbers1, doneSum1)
- // collect received events for event2
- go sumReceivedNumbers(numbers2, doneSum2)
- addListenersStress := func() {
- r1 := rand.New(rand.NewSource(time.Now().Unix()))
- r1.Seed(time.Now().UnixNano())
- for k := uint16(0); k < 400; k++ {
- listenerNumber := r1.Intn(100) + 3
- eventNumber := r1.Intn(3) + 1
- go evsw.AddListenerForEvent(fmt.Sprintf("listener%v", listenerNumber), //nolint:errcheck // ignore for tests
- fmt.Sprintf("event%v", eventNumber),
- func(context.Context, EventData) error { return nil })
- }
- }
- addListenersStress()
- // go fire events
- go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
- go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
- go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001))
- checkSumEvent1 := <-doneSending1
- checkSumEvent2 := <-doneSending2
- checkSumEvent3 := <-doneSending3
- checkSum := checkSumEvent1 + checkSumEvent2 + checkSumEvent3
- close(numbers1)
- close(numbers2)
- eventSum1 := <-doneSum1
- eventSum2 := <-doneSum2
- if checkSum != eventSum1 ||
- checkSum != eventSum2 {
- t.Errorf("not all messages sent were received.\n")
- }
- }
-
- //------------------------------------------------------------------------------
- // Helper functions
-
- // sumReceivedNumbers takes two channels and adds all numbers received
- // until the receiving channel `numbers` is closed; it then sends the sum
- // on `doneSum` and closes that channel. Expected to be run in a go-routine.
- func sumReceivedNumbers(numbers, doneSum chan uint64) {
- var sum uint64
- for {
- j, more := <-numbers
- sum += j
- if !more {
- doneSum <- sum
- close(doneSum)
- return
- }
- }
- }
-
- // fireEvents takes an EventSwitch and fires a thousand integers under
- // a given `event` with the integers mootonically increasing from `offset`
- // to `offset` + 999. It additionally returns the addition of all integers
- // sent on `doneChan` for assertion that all events have been sent, and enabling
- // the test to assert all events have also been received.
- func fireEvents(ctx context.Context, evsw Fireable, event string, doneChan chan uint64, offset uint64) {
- defer close(doneChan)
-
- var sentSum uint64
- for i := offset; i <= offset+uint64(999); i++ {
- if ctx.Err() != nil {
- break
- }
-
- evsw.FireEvent(ctx, event, i)
- sentSum += i
- }
-
- select {
- case <-ctx.Done():
- case doneChan <- sentSum:
- }
- }
|