diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index cf1a10623..89c8c642b 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -445,8 +445,6 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) { select { case <-ctx.Done(): return - case <-r.pool.exitedCh: - return case <-switchToConsensusTicker.C: var ( height, numPending, lenRequesters = r.pool.GetStatus() diff --git a/libs/events/events_test.go b/libs/events/events_test.go index 6d0c8b4e7..bbab8f223 100644 --- a/libs/events/events_test.go +++ b/libs/events/events_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/log" @@ -28,8 +27,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) { messages := make(chan EventData) require.NoError(t, evsw.AddListenerForEvent("listener", "event", func(ctx context.Context, data EventData) error { - // test there's no deadlock if we remove the listener inside a callback - evsw.RemoveListener("listener") select { case messages <- data: return nil @@ -234,63 +231,11 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) { } } -func TestAddAndRemoveListenerConcurrency(t *testing.T) { - var ( - stopInputEvent = false - roundCount = 2000 - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) - - done1 := make(chan struct{}) - done2 := make(chan struct{}) - - // Must be executed concurrently to uncover the data race. - // 1. RemoveListener - go func() { - defer close(done1) - for i := 0; i < roundCount; i++ { - evsw.RemoveListener("listener") - } - }() - - // 2. AddListenerForEvent - go func() { - defer close(done2) - for i := 0; i < roundCount; i++ { - index := i - // we explicitly ignore errors here, since the listener will sometimes be removed - // (that's what we're testing) - _ = evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index), - func(ctx context.Context, data EventData) error { - t.Errorf("should not run callback for %d.\n", index) - stopInputEvent = true - return nil - }) - } - }() - - <-done1 - <-done2 - - evsw.RemoveListener("listener") // remove the last listener - - for i := 0; i < roundCount && !stopInputEvent; i++ { - evsw.FireEvent(ctx, fmt.Sprintf("event%d", i), uint64(1001)) - } -} - // TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to // two events, fires a thousand integers for the first event, then unsubscribes // the listener and fires a thousand integers for the second event. -func TestAddAndRemoveListener(t *testing.T) { + +func TestAddListener(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -331,8 +276,6 @@ func TestAddAndRemoveListener(t *testing.T) { // go fire events go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1)) checkSumEvent1 := <-doneSending1 - // after sending all event1, unsubscribe for all events - evsw.RemoveListener("listener") go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001)) checkSumEvent2 := <-doneSending2 close(numbers1) @@ -347,58 +290,7 @@ func TestAddAndRemoveListener(t *testing.T) { } } -// TestRemoveListener does basic tests on adding and removing -func TestRemoveListener(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) - - count := 10 - sum1, sum2 := 0, 0 - // add some listeners and make sure they work - require.NoError(t, evsw.AddListenerForEvent("listener", "event1", - func(ctx context.Context, data EventData) error { - sum1++ - return nil - })) - require.NoError(t, evsw.AddListenerForEvent("listener", "event2", - func(ctx context.Context, data EventData) error { - sum2++ - return nil - })) - - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count, sum1) - assert.Equal(t, count, sum2) - - // remove one by event and make sure it is gone - evsw.RemoveListenerForEvent("event2", "listener") - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count*2, sum1) - assert.Equal(t, count, sum2) - - // remove the listener entirely and make sure both gone - evsw.RemoveListener("listener") - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count*2, sum1) - assert.Equal(t, count, sum2) -} - -// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two +// TestManagerLiistenersAsync sets up an EventSwitch, subscribes two // listeners to three events, and fires a thousand integers for each event. // These two listeners serve as the baseline validation while other listeners // are randomly subscribed and unsubscribed. @@ -408,7 +300,7 @@ func TestRemoveListener(t *testing.T) { // at that point subscribed to. // NOTE: it is important to run this test with race conditions tracking on, // `go test -race`, to examine for possible race conditions. -func TestRemoveListenersAsync(t *testing.T) { +func TestManageListenersAsync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() logger := log.NewTestingLogger(t) @@ -494,18 +386,9 @@ func TestRemoveListenersAsync(t *testing.T) { func(context.Context, EventData) error { return nil }) } } - removeListenersStress := func() { - r2 := rand.New(rand.NewSource(time.Now().Unix())) - r2.Seed(time.Now().UnixNano()) - for k := uint16(0); k < 80; k++ { - listenerNumber := r2.Intn(100) + 3 - go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber)) - } - } addListenersStress() // go fire events go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1)) - removeListenersStress() go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001)) go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001)) checkSumEvent1 := <-doneSending1