Browse Source

Merge branch 'master' into wb/rebuild-synchrony-params

pull/8142/head
William Banfield 3 years ago
committed by GitHub
parent
commit
2eac1394a3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 4 additions and 237 deletions
  1. +0
    -6
      internal/consensus/reactor.go
  2. +2
    -53
      libs/events/events.go
  3. +2
    -178
      libs/events/events_test.go

+ 0
- 6
internal/consensus/reactor.go View File

@ -219,8 +219,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
// blocking until they all exit, as well as unsubscribing from events and stopping // blocking until they all exit, as well as unsubscribing from events and stopping
// state. // state.
func (r *Reactor) OnStop() { func (r *Reactor) OnStop() {
r.unsubscribeFromBroadcastEvents()
r.state.Stop() r.state.Stop()
if !r.WaitSync() { if !r.WaitSync() {
@ -394,10 +392,6 @@ func (r *Reactor) subscribeToBroadcastEvents() {
} }
} }
func (r *Reactor) unsubscribeFromBroadcastEvents() {
r.state.evsw.RemoveListener(listenerIDConsensus)
}
func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep { func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
return &tmcons.NewRoundStep{ return &tmcons.NewRoundStep{
Height: rs.Height, Height: rs.Height,


+ 2
- 53
libs/events/events.go View File

@ -50,8 +50,6 @@ type EventSwitch interface {
Stop() Stop()
AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
RemoveListenerForEvent(event string, listenerID string)
RemoveListener(listenerID string)
} }
type eventSwitch struct { type eventSwitch struct {
@ -71,11 +69,8 @@ func NewEventSwitch(logger log.Logger) EventSwitch {
return evsw return evsw
} }
func (evsw *eventSwitch) OnStart(ctx context.Context) error {
return nil
}
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil }
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error { func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
// Get/Create eventCell and listener. // Get/Create eventCell and listener.
@ -103,52 +98,6 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E
return nil return nil
} }
func (evsw *eventSwitch) RemoveListener(listenerID string) {
// Get and remove listener.
evsw.mtx.RLock()
listener := evsw.listeners[listenerID]
evsw.mtx.RUnlock()
if listener == nil {
return
}
evsw.mtx.Lock()
delete(evsw.listeners, listenerID)
evsw.mtx.Unlock()
// Remove callback for each event.
listener.SetRemoved()
for _, event := range listener.GetEvents() {
evsw.RemoveListenerForEvent(event, listenerID)
}
}
func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
// Get eventCell
evsw.mtx.Lock()
eventCell := evsw.eventCells[event]
evsw.mtx.Unlock()
if eventCell == nil {
return
}
// Remove listenerID from eventCell
numListeners := eventCell.RemoveListener(listenerID)
// Maybe garbage collect eventCell.
if numListeners == 0 {
// Lock again and double check.
evsw.mtx.Lock() // OUTER LOCK
eventCell.mtx.Lock() // INNER LOCK
if len(eventCell.listeners) == 0 {
delete(evsw.eventCells, event)
}
eventCell.mtx.Unlock() // INNER LOCK
evsw.mtx.Unlock() // OUTER LOCK
}
}
func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) { func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) {
// Get the eventCell // Get the eventCell
evsw.mtx.RLock() evsw.mtx.RLock()


+ 2
- 178
libs/events/events_test.go View File

@ -7,7 +7,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -28,8 +27,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
messages := make(chan EventData) messages := make(chan EventData)
require.NoError(t, evsw.AddListenerForEvent("listener", "event", require.NoError(t, evsw.AddListenerForEvent("listener", "event",
func(ctx context.Context, data EventData) error { func(ctx context.Context, data EventData) error {
// test there's no deadlock if we remove the listener inside a callback
evsw.RemoveListener("listener")
select { select {
case messages <- data: case messages <- data:
return nil return nil
@ -234,171 +231,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
} }
} }
func TestAddAndRemoveListenerConcurrency(t *testing.T) {
var (
stopInputEvent = false
roundCount = 2000
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
done1 := make(chan struct{})
done2 := make(chan struct{})
// Must be executed concurrently to uncover the data race.
// 1. RemoveListener
go func() {
defer close(done1)
for i := 0; i < roundCount; i++ {
evsw.RemoveListener("listener")
}
}()
// 2. AddListenerForEvent
go func() {
defer close(done2)
for i := 0; i < roundCount; i++ {
index := i
// we explicitly ignore errors here, since the listener will sometimes be removed
// (that's what we're testing)
_ = evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index),
func(ctx context.Context, data EventData) error {
t.Errorf("should not run callback for %d.\n", index)
stopInputEvent = true
return nil
})
}
}()
<-done1
<-done2
evsw.RemoveListener("listener") // remove the last listener
for i := 0; i < roundCount && !stopInputEvent; i++ {
evsw.FireEvent(ctx, fmt.Sprintf("event%d", i), uint64(1001))
}
}
// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to
// two events, fires a thousand integers for the first event, then unsubscribes
// the listener and fires a thousand integers for the second event.
func TestAddAndRemoveListener(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64)
doneSending1 := make(chan uint64)
doneSending2 := make(chan uint64)
numbers1 := make(chan uint64, 4)
numbers2 := make(chan uint64, 4)
// subscribe two listener to three events
require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
func(ctx context.Context, data EventData) error {
select {
case numbers1 <- data.(uint64):
return nil
case <-ctx.Done():
return ctx.Err()
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
func(ctx context.Context, data EventData) error {
select {
case numbers2 <- data.(uint64):
return nil
case <-ctx.Done():
return ctx.Err()
}
}))
// collect received events for event1
go sumReceivedNumbers(numbers1, doneSum1)
// collect received events for event2
go sumReceivedNumbers(numbers2, doneSum2)
// go fire events
go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
checkSumEvent1 := <-doneSending1
// after sending all event1, unsubscribe for all events
evsw.RemoveListener("listener")
go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
checkSumEvent2 := <-doneSending2
close(numbers1)
close(numbers2)
eventSum1 := <-doneSum1
eventSum2 := <-doneSum2
if checkSumEvent1 != eventSum1 ||
// correct value asserted by preceding tests, suffices to be non-zero
checkSumEvent2 == uint64(0) ||
eventSum2 != uint64(0) {
t.Errorf("not all messages sent were received or unsubscription did not register.\n")
}
}
// TestRemoveListener does basic tests on adding and removing
func TestRemoveListener(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
count := 10
sum1, sum2 := 0, 0
// add some listeners and make sure they work
require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
func(ctx context.Context, data EventData) error {
sum1++
return nil
}))
require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
func(ctx context.Context, data EventData) error {
sum2++
return nil
}))
for i := 0; i < count; i++ {
evsw.FireEvent(ctx, "event1", true)
evsw.FireEvent(ctx, "event2", true)
}
assert.Equal(t, count, sum1)
assert.Equal(t, count, sum2)
// remove one by event and make sure it is gone
evsw.RemoveListenerForEvent("event2", "listener")
for i := 0; i < count; i++ {
evsw.FireEvent(ctx, "event1", true)
evsw.FireEvent(ctx, "event2", true)
}
assert.Equal(t, count*2, sum1)
assert.Equal(t, count, sum2)
// remove the listener entirely and make sure both gone
evsw.RemoveListener("listener")
for i := 0; i < count; i++ {
evsw.FireEvent(ctx, "event1", true)
evsw.FireEvent(ctx, "event2", true)
}
assert.Equal(t, count*2, sum1)
assert.Equal(t, count, sum2)
}
// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two
// TestManagerLiistenersAsync sets up an EventSwitch, subscribes two
// listeners to three events, and fires a thousand integers for each event. // listeners to three events, and fires a thousand integers for each event.
// These two listeners serve as the baseline validation while other listeners // These two listeners serve as the baseline validation while other listeners
// are randomly subscribed and unsubscribed. // are randomly subscribed and unsubscribed.
@ -408,7 +241,7 @@ func TestRemoveListener(t *testing.T) {
// at that point subscribed to. // at that point subscribed to.
// NOTE: it is important to run this test with race conditions tracking on, // NOTE: it is important to run this test with race conditions tracking on,
// `go test -race`, to examine for possible race conditions. // `go test -race`, to examine for possible race conditions.
func TestRemoveListenersAsync(t *testing.T) {
func TestManageListenersAsync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
logger := log.NewTestingLogger(t) logger := log.NewTestingLogger(t)
@ -494,18 +327,9 @@ func TestRemoveListenersAsync(t *testing.T) {
func(context.Context, EventData) error { return nil }) func(context.Context, EventData) error { return nil })
} }
} }
removeListenersStress := func() {
r2 := rand.New(rand.NewSource(time.Now().Unix()))
r2.Seed(time.Now().UnixNano())
for k := uint16(0); k < 80; k++ {
listenerNumber := r2.Intn(100) + 3
go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber))
}
}
addListenersStress() addListenersStress()
// go fire events // go fire events
go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1)) go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
removeListenersStress()
go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001)) go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001)) go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001))
checkSumEvent1 := <-doneSending1 checkSumEvent1 := <-doneSending1


Loading…
Cancel
Save