From 7a0b05f22d51a51fdffedc37ea53d5efe4f975db Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Thu, 17 Mar 2022 09:02:02 -0400 Subject: [PATCH] libs/events: remove unneccessary unsubscription code (#8135) The events switch code is largely vestigal and is responsible for wiring between the consensus state machine and the consensus reactor. While there might have been a need, historicallly to managed these subscriptions at runtime, it's nolonger used: subscriptions are registered during startup, and then the switch shuts down at at the end. Eventually the EventSwitch should be replaced by a much smaller implementation of an eventloop in the consensus state machine, but cutting down on the scope of the event switch will help clarify the requirements from the consensus side. --- internal/consensus/reactor.go | 6 -- libs/events/events.go | 55 +---------- libs/events/events_test.go | 180 +--------------------------------- 3 files changed, 4 insertions(+), 237 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index efb3f2d04..822e8c627 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -219,8 +219,6 @@ func (r *Reactor) OnStart(ctx context.Context) error { // blocking until they all exit, as well as unsubscribing from events and stopping // state. func (r *Reactor) OnStop() { - r.unsubscribeFromBroadcastEvents() - r.state.Stop() if !r.WaitSync() { @@ -394,10 +392,6 @@ func (r *Reactor) subscribeToBroadcastEvents() { } } -func (r *Reactor) unsubscribeFromBroadcastEvents() { - r.state.evsw.RemoveListener(listenerIDConsensus) -} - func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { return &tmcons.NewRoundStep{ Height: rs.Height, diff --git a/libs/events/events.go b/libs/events/events.go index 636aa102d..5ab5961f6 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -50,8 +50,6 @@ type EventSwitch interface { Stop() AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error - RemoveListenerForEvent(event string, listenerID string) - RemoveListener(listenerID string) } type eventSwitch struct { @@ -71,11 +69,8 @@ func NewEventSwitch(logger log.Logger) EventSwitch { return evsw } -func (evsw *eventSwitch) OnStart(ctx context.Context) error { - return nil -} - -func (evsw *eventSwitch) OnStop() {} +func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil } +func (evsw *eventSwitch) OnStop() {} func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error { // Get/Create eventCell and listener. @@ -103,52 +98,6 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E return nil } -func (evsw *eventSwitch) RemoveListener(listenerID string) { - // Get and remove listener. - evsw.mtx.RLock() - listener := evsw.listeners[listenerID] - evsw.mtx.RUnlock() - if listener == nil { - return - } - - evsw.mtx.Lock() - delete(evsw.listeners, listenerID) - evsw.mtx.Unlock() - - // Remove callback for each event. - listener.SetRemoved() - for _, event := range listener.GetEvents() { - evsw.RemoveListenerForEvent(event, listenerID) - } -} - -func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) { - // Get eventCell - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - evsw.mtx.Unlock() - - if eventCell == nil { - return - } - - // Remove listenerID from eventCell - numListeners := eventCell.RemoveListener(listenerID) - - // Maybe garbage collect eventCell. - if numListeners == 0 { - // Lock again and double check. - evsw.mtx.Lock() // OUTER LOCK - eventCell.mtx.Lock() // INNER LOCK - if len(eventCell.listeners) == 0 { - delete(evsw.eventCells, event) - } - eventCell.mtx.Unlock() // INNER LOCK - evsw.mtx.Unlock() // OUTER LOCK - } -} - func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) { // Get the eventCell evsw.mtx.RLock() diff --git a/libs/events/events_test.go b/libs/events/events_test.go index 6d0c8b4e7..15f208e06 100644 --- a/libs/events/events_test.go +++ b/libs/events/events_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/log" @@ -28,8 +27,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) { messages := make(chan EventData) require.NoError(t, evsw.AddListenerForEvent("listener", "event", func(ctx context.Context, data EventData) error { - // test there's no deadlock if we remove the listener inside a callback - evsw.RemoveListener("listener") select { case messages <- data: return nil @@ -234,171 +231,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) { } } -func TestAddAndRemoveListenerConcurrency(t *testing.T) { - var ( - stopInputEvent = false - roundCount = 2000 - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) - - done1 := make(chan struct{}) - done2 := make(chan struct{}) - - // Must be executed concurrently to uncover the data race. - // 1. RemoveListener - go func() { - defer close(done1) - for i := 0; i < roundCount; i++ { - evsw.RemoveListener("listener") - } - }() - - // 2. AddListenerForEvent - go func() { - defer close(done2) - for i := 0; i < roundCount; i++ { - index := i - // we explicitly ignore errors here, since the listener will sometimes be removed - // (that's what we're testing) - _ = evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index), - func(ctx context.Context, data EventData) error { - t.Errorf("should not run callback for %d.\n", index) - stopInputEvent = true - return nil - }) - } - }() - - <-done1 - <-done2 - - evsw.RemoveListener("listener") // remove the last listener - - for i := 0; i < roundCount && !stopInputEvent; i++ { - evsw.FireEvent(ctx, fmt.Sprintf("event%d", i), uint64(1001)) - } -} - -// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to -// two events, fires a thousand integers for the first event, then unsubscribes -// the listener and fires a thousand integers for the second event. -func TestAddAndRemoveListener(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := log.NewTestingLogger(t) - evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) - - doneSum1 := make(chan uint64) - doneSum2 := make(chan uint64) - doneSending1 := make(chan uint64) - doneSending2 := make(chan uint64) - numbers1 := make(chan uint64, 4) - numbers2 := make(chan uint64, 4) - // subscribe two listener to three events - require.NoError(t, evsw.AddListenerForEvent("listener", "event1", - func(ctx context.Context, data EventData) error { - select { - case numbers1 <- data.(uint64): - return nil - case <-ctx.Done(): - return ctx.Err() - } - })) - require.NoError(t, evsw.AddListenerForEvent("listener", "event2", - func(ctx context.Context, data EventData) error { - select { - case numbers2 <- data.(uint64): - return nil - case <-ctx.Done(): - return ctx.Err() - } - })) - // collect received events for event1 - go sumReceivedNumbers(numbers1, doneSum1) - // collect received events for event2 - go sumReceivedNumbers(numbers2, doneSum2) - // go fire events - go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1)) - checkSumEvent1 := <-doneSending1 - // after sending all event1, unsubscribe for all events - evsw.RemoveListener("listener") - go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001)) - checkSumEvent2 := <-doneSending2 - close(numbers1) - close(numbers2) - eventSum1 := <-doneSum1 - eventSum2 := <-doneSum2 - if checkSumEvent1 != eventSum1 || - // correct value asserted by preceding tests, suffices to be non-zero - checkSumEvent2 == uint64(0) || - eventSum2 != uint64(0) { - t.Errorf("not all messages sent were received or unsubscription did not register.\n") - } -} - -// TestRemoveListener does basic tests on adding and removing -func TestRemoveListener(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) - - count := 10 - sum1, sum2 := 0, 0 - // add some listeners and make sure they work - require.NoError(t, evsw.AddListenerForEvent("listener", "event1", - func(ctx context.Context, data EventData) error { - sum1++ - return nil - })) - require.NoError(t, evsw.AddListenerForEvent("listener", "event2", - func(ctx context.Context, data EventData) error { - sum2++ - return nil - })) - - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count, sum1) - assert.Equal(t, count, sum2) - - // remove one by event and make sure it is gone - evsw.RemoveListenerForEvent("event2", "listener") - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count*2, sum1) - assert.Equal(t, count, sum2) - - // remove the listener entirely and make sure both gone - evsw.RemoveListener("listener") - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count*2, sum1) - assert.Equal(t, count, sum2) -} - -// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two +// TestManagerLiistenersAsync sets up an EventSwitch, subscribes two // listeners to three events, and fires a thousand integers for each event. // These two listeners serve as the baseline validation while other listeners // are randomly subscribed and unsubscribed. @@ -408,7 +241,7 @@ func TestRemoveListener(t *testing.T) { // at that point subscribed to. // NOTE: it is important to run this test with race conditions tracking on, // `go test -race`, to examine for possible race conditions. -func TestRemoveListenersAsync(t *testing.T) { +func TestManageListenersAsync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() logger := log.NewTestingLogger(t) @@ -494,18 +327,9 @@ func TestRemoveListenersAsync(t *testing.T) { func(context.Context, EventData) error { return nil }) } } - removeListenersStress := func() { - r2 := rand.New(rand.NewSource(time.Now().Unix())) - r2.Seed(time.Now().UnixNano()) - for k := uint16(0); k < 80; k++ { - listenerNumber := r2.Intn(100) + 3 - go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber)) - } - } addListenersStress() // go fire events go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1)) - removeListenersStress() go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001)) go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001)) checkSumEvent1 := <-doneSending1