diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index efb3f2d04..822e8c627 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -219,8 +219,6 @@ func (r *Reactor) OnStart(ctx context.Context) error { // blocking until they all exit, as well as unsubscribing from events and stopping // state. func (r *Reactor) OnStop() { - r.unsubscribeFromBroadcastEvents() - r.state.Stop() if !r.WaitSync() { @@ -394,10 +392,6 @@ func (r *Reactor) subscribeToBroadcastEvents() { } } -func (r *Reactor) unsubscribeFromBroadcastEvents() { - r.state.evsw.RemoveListener(listenerIDConsensus) -} - func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { return &tmcons.NewRoundStep{ Height: rs.Height, diff --git a/internal/libs/clist/bench_test.go b/internal/libs/clist/bench_test.go index 95973cc76..ee5d836a7 100644 --- a/internal/libs/clist/bench_test.go +++ b/internal/libs/clist/bench_test.go @@ -12,7 +12,7 @@ func BenchmarkDetaching(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { start.removed = true - start.DetachNext() + start.detachNext() start.DetachPrev() tmp := nxt nxt = nxt.Next() diff --git a/internal/libs/clist/clist.go b/internal/libs/clist/clist.go index 99e5f05bf..3969c94cc 100644 --- a/internal/libs/clist/clist.go +++ b/internal/libs/clist/clist.go @@ -44,7 +44,6 @@ waiting on NextWait() (since it's just a read operation). type CElement struct { mtx sync.RWMutex prev *CElement - prevWaitCh chan struct{} next *CElement nextWaitCh chan struct{} removed bool @@ -72,33 +71,6 @@ func (e *CElement) NextWait() *CElement { } } -// Blocking implementation of Prev(). -// May return nil iff CElement was head and got removed. -func (e *CElement) PrevWait() *CElement { - for { - e.mtx.RLock() - prev := e.prev - removed := e.removed - signal := e.prevWaitCh - e.mtx.RUnlock() - - if prev != nil || removed { - return prev - } - - <-signal - } -} - -// PrevWaitChan can be used to wait until Prev becomes not nil. Once it does, -// channel will be closed. -func (e *CElement) PrevWaitChan() <-chan struct{} { - e.mtx.RLock() - defer e.mtx.RUnlock() - - return e.prevWaitCh -} - // NextWaitChan can be used to wait until Next becomes not nil. Once it does, // channel will be closed. func (e *CElement) NextWaitChan() <-chan struct{} { @@ -131,7 +103,7 @@ func (e *CElement) Removed() bool { return isRemoved } -func (e *CElement) DetachNext() { +func (e *CElement) detachNext() { e.mtx.Lock() if !e.removed { e.mtx.Unlock() @@ -153,7 +125,7 @@ func (e *CElement) DetachPrev() { // NOTE: This function needs to be safe for // concurrent goroutines waiting on nextWg. -func (e *CElement) SetNext(newNext *CElement) { +func (e *CElement) setNext(newNext *CElement) { e.mtx.Lock() oldNext := e.next @@ -174,30 +146,20 @@ func (e *CElement) SetNext(newNext *CElement) { // NOTE: This function needs to be safe for // concurrent goroutines waiting on prevWg -func (e *CElement) SetPrev(newPrev *CElement) { +func (e *CElement) setPrev(newPrev *CElement) { e.mtx.Lock() defer e.mtx.Unlock() - oldPrev := e.prev e.prev = newPrev - if oldPrev != nil && newPrev == nil { - e.prevWaitCh = make(chan struct{}) - } - if oldPrev == nil && newPrev != nil { - close(e.prevWaitCh) - } } -func (e *CElement) SetRemoved() { +func (e *CElement) setRemoved() { e.mtx.Lock() defer e.mtx.Unlock() e.removed = true - // This wakes up anyone waiting in either direction. - if e.prev == nil { - close(e.prevWaitCh) - } + // This wakes up anyone waiting. if e.next == nil { close(e.nextWaitCh) } @@ -211,7 +173,6 @@ func (e *CElement) SetRemoved() { // Panics if length grows beyond the max. type CList struct { mtx sync.RWMutex - wg *sync.WaitGroup waitCh chan struct{} head *CElement // first element tail *CElement // last element @@ -250,7 +211,7 @@ func (l *CList) Front() *CElement { return head } -func (l *CList) FrontWait() *CElement { +func (l *CList) frontWait() *CElement { // Loop until the head is non-nil else wait and try again for { l.mtx.RLock() @@ -273,22 +234,6 @@ func (l *CList) Back() *CElement { return back } -func (l *CList) BackWait() *CElement { - for { - l.mtx.RLock() - tail := l.tail - wg := l.wg - l.mtx.RUnlock() - - if tail != nil { - return tail - } - wg.Wait() - // l.tail doesn't necessarily exist here. - // That's why we need to continue a for-loop. - } -} - // WaitChan can be used to wait until Front or Back becomes not nil. Once it // does, channel will be closed. func (l *CList) WaitChan() <-chan struct{} { @@ -305,7 +250,6 @@ func (l *CList) PushBack(v interface{}) *CElement { // Construct a new element e := &CElement{ prev: nil, - prevWaitCh: make(chan struct{}), next: nil, nextWaitCh: make(chan struct{}), removed: false, @@ -326,8 +270,8 @@ func (l *CList) PushBack(v interface{}) *CElement { l.head = e l.tail = e } else { - e.SetPrev(l.tail) // We must init e first. - l.tail.SetNext(e) // This will make e accessible. + e.setPrev(l.tail) // We must init e first. + l.tail.setNext(e) // This will make e accessible. l.tail = e // Update the list. } l.mtx.Unlock() @@ -365,16 +309,16 @@ func (l *CList) Remove(e *CElement) interface{} { if prev == nil { l.head = next } else { - prev.SetNext(next) + prev.setNext(next) } if next == nil { l.tail = prev } else { - next.SetPrev(prev) + next.setPrev(prev) } // Set .Done() on e, otherwise waiters will wait forever. - e.SetRemoved() + e.setRemoved() return e.Value } diff --git a/internal/libs/clist/clist_test.go b/internal/libs/clist/clist_test.go index a0482fc40..a0449a985 100644 --- a/internal/libs/clist/clist_test.go +++ b/internal/libs/clist/clist_test.go @@ -218,7 +218,7 @@ func TestScanRightDeleteRandom(t *testing.T) { default: } if el == nil { - el = l.FrontWait() + el = l.frontWait() restartCounter++ } el = el.Next() @@ -314,30 +314,6 @@ FOR_LOOP: t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen) } - // 4) test iterating backwards (PrevWaitChan and Prev) - prev := next - seen = 0 -FOR_LOOP2: - for { - select { - case <-prev.PrevWaitChan(): - prev = prev.Prev() - seen++ - if prev == nil { - t.Fatal("expected PrevWaitChan to block forever on nil when reached first elem") - } - if pushed == seen { - break FOR_LOOP2 - } - - case <-time.After(250 * time.Millisecond): - break FOR_LOOP2 - } - } - - if pushed != seen { - t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen) - } } func TestRemoved(t *testing.T) { diff --git a/libs/events/events.go b/libs/events/events.go index 636aa102d..5ab5961f6 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -50,8 +50,6 @@ type EventSwitch interface { Stop() AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error - RemoveListenerForEvent(event string, listenerID string) - RemoveListener(listenerID string) } type eventSwitch struct { @@ -71,11 +69,8 @@ func NewEventSwitch(logger log.Logger) EventSwitch { return evsw } -func (evsw *eventSwitch) OnStart(ctx context.Context) error { - return nil -} - -func (evsw *eventSwitch) OnStop() {} +func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil } +func (evsw *eventSwitch) OnStop() {} func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error { // Get/Create eventCell and listener. @@ -103,52 +98,6 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E return nil } -func (evsw *eventSwitch) RemoveListener(listenerID string) { - // Get and remove listener. - evsw.mtx.RLock() - listener := evsw.listeners[listenerID] - evsw.mtx.RUnlock() - if listener == nil { - return - } - - evsw.mtx.Lock() - delete(evsw.listeners, listenerID) - evsw.mtx.Unlock() - - // Remove callback for each event. - listener.SetRemoved() - for _, event := range listener.GetEvents() { - evsw.RemoveListenerForEvent(event, listenerID) - } -} - -func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) { - // Get eventCell - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - evsw.mtx.Unlock() - - if eventCell == nil { - return - } - - // Remove listenerID from eventCell - numListeners := eventCell.RemoveListener(listenerID) - - // Maybe garbage collect eventCell. - if numListeners == 0 { - // Lock again and double check. - evsw.mtx.Lock() // OUTER LOCK - eventCell.mtx.Lock() // INNER LOCK - if len(eventCell.listeners) == 0 { - delete(evsw.eventCells, event) - } - eventCell.mtx.Unlock() // INNER LOCK - evsw.mtx.Unlock() // OUTER LOCK - } -} - func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) { // Get the eventCell evsw.mtx.RLock() diff --git a/libs/events/events_test.go b/libs/events/events_test.go index 6d0c8b4e7..15f208e06 100644 --- a/libs/events/events_test.go +++ b/libs/events/events_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/log" @@ -28,8 +27,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) { messages := make(chan EventData) require.NoError(t, evsw.AddListenerForEvent("listener", "event", func(ctx context.Context, data EventData) error { - // test there's no deadlock if we remove the listener inside a callback - evsw.RemoveListener("listener") select { case messages <- data: return nil @@ -234,171 +231,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) { } } -func TestAddAndRemoveListenerConcurrency(t *testing.T) { - var ( - stopInputEvent = false - roundCount = 2000 - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) - - done1 := make(chan struct{}) - done2 := make(chan struct{}) - - // Must be executed concurrently to uncover the data race. - // 1. RemoveListener - go func() { - defer close(done1) - for i := 0; i < roundCount; i++ { - evsw.RemoveListener("listener") - } - }() - - // 2. AddListenerForEvent - go func() { - defer close(done2) - for i := 0; i < roundCount; i++ { - index := i - // we explicitly ignore errors here, since the listener will sometimes be removed - // (that's what we're testing) - _ = evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index), - func(ctx context.Context, data EventData) error { - t.Errorf("should not run callback for %d.\n", index) - stopInputEvent = true - return nil - }) - } - }() - - <-done1 - <-done2 - - evsw.RemoveListener("listener") // remove the last listener - - for i := 0; i < roundCount && !stopInputEvent; i++ { - evsw.FireEvent(ctx, fmt.Sprintf("event%d", i), uint64(1001)) - } -} - -// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to -// two events, fires a thousand integers for the first event, then unsubscribes -// the listener and fires a thousand integers for the second event. -func TestAddAndRemoveListener(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := log.NewTestingLogger(t) - evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) - - doneSum1 := make(chan uint64) - doneSum2 := make(chan uint64) - doneSending1 := make(chan uint64) - doneSending2 := make(chan uint64) - numbers1 := make(chan uint64, 4) - numbers2 := make(chan uint64, 4) - // subscribe two listener to three events - require.NoError(t, evsw.AddListenerForEvent("listener", "event1", - func(ctx context.Context, data EventData) error { - select { - case numbers1 <- data.(uint64): - return nil - case <-ctx.Done(): - return ctx.Err() - } - })) - require.NoError(t, evsw.AddListenerForEvent("listener", "event2", - func(ctx context.Context, data EventData) error { - select { - case numbers2 <- data.(uint64): - return nil - case <-ctx.Done(): - return ctx.Err() - } - })) - // collect received events for event1 - go sumReceivedNumbers(numbers1, doneSum1) - // collect received events for event2 - go sumReceivedNumbers(numbers2, doneSum2) - // go fire events - go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1)) - checkSumEvent1 := <-doneSending1 - // after sending all event1, unsubscribe for all events - evsw.RemoveListener("listener") - go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001)) - checkSumEvent2 := <-doneSending2 - close(numbers1) - close(numbers2) - eventSum1 := <-doneSum1 - eventSum2 := <-doneSum2 - if checkSumEvent1 != eventSum1 || - // correct value asserted by preceding tests, suffices to be non-zero - checkSumEvent2 == uint64(0) || - eventSum2 != uint64(0) { - t.Errorf("not all messages sent were received or unsubscription did not register.\n") - } -} - -// TestRemoveListener does basic tests on adding and removing -func TestRemoveListener(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) - require.NoError(t, evsw.Start(ctx)) - t.Cleanup(evsw.Wait) - - count := 10 - sum1, sum2 := 0, 0 - // add some listeners and make sure they work - require.NoError(t, evsw.AddListenerForEvent("listener", "event1", - func(ctx context.Context, data EventData) error { - sum1++ - return nil - })) - require.NoError(t, evsw.AddListenerForEvent("listener", "event2", - func(ctx context.Context, data EventData) error { - sum2++ - return nil - })) - - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count, sum1) - assert.Equal(t, count, sum2) - - // remove one by event and make sure it is gone - evsw.RemoveListenerForEvent("event2", "listener") - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count*2, sum1) - assert.Equal(t, count, sum2) - - // remove the listener entirely and make sure both gone - evsw.RemoveListener("listener") - for i := 0; i < count; i++ { - evsw.FireEvent(ctx, "event1", true) - evsw.FireEvent(ctx, "event2", true) - } - assert.Equal(t, count*2, sum1) - assert.Equal(t, count, sum2) -} - -// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two +// TestManagerLiistenersAsync sets up an EventSwitch, subscribes two // listeners to three events, and fires a thousand integers for each event. // These two listeners serve as the baseline validation while other listeners // are randomly subscribed and unsubscribed. @@ -408,7 +241,7 @@ func TestRemoveListener(t *testing.T) { // at that point subscribed to. // NOTE: it is important to run this test with race conditions tracking on, // `go test -race`, to examine for possible race conditions. -func TestRemoveListenersAsync(t *testing.T) { +func TestManageListenersAsync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() logger := log.NewTestingLogger(t) @@ -494,18 +327,9 @@ func TestRemoveListenersAsync(t *testing.T) { func(context.Context, EventData) error { return nil }) } } - removeListenersStress := func() { - r2 := rand.New(rand.NewSource(time.Now().Unix())) - r2.Seed(time.Now().UnixNano()) - for k := uint16(0); k < 80; k++ { - listenerNumber := r2.Intn(100) + 3 - go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber)) - } - } addListenersStress() // go fire events go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1)) - removeListenersStress() go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001)) go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001)) checkSumEvent1 := <-doneSending1