Browse Source

Merge branch 'master' into peer-score-adjust

pull/8137/head
M. J. Fromberger 3 years ago
committed by GitHub
parent
commit
d6e58b2409
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 17 additions and 330 deletions
  1. +0
    -6
      internal/consensus/reactor.go
  2. +1
    -1
      internal/libs/clist/bench_test.go
  3. +11
    -67
      internal/libs/clist/clist.go
  4. +1
    -25
      internal/libs/clist/clist_test.go
  5. +2
    -53
      libs/events/events.go
  6. +2
    -178
      libs/events/events_test.go

+ 0
- 6
internal/consensus/reactor.go View File

@ -219,8 +219,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
// blocking until they all exit, as well as unsubscribing from events and stopping
// state.
func (r *Reactor) OnStop() {
r.unsubscribeFromBroadcastEvents()
r.state.Stop()
if !r.WaitSync() {
@ -394,10 +392,6 @@ func (r *Reactor) subscribeToBroadcastEvents() {
}
}
func (r *Reactor) unsubscribeFromBroadcastEvents() {
r.state.evsw.RemoveListener(listenerIDConsensus)
}
func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
return &tmcons.NewRoundStep{
Height: rs.Height,


+ 1
- 1
internal/libs/clist/bench_test.go View File

@ -12,7 +12,7 @@ func BenchmarkDetaching(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
start.removed = true
start.DetachNext()
start.detachNext()
start.DetachPrev()
tmp := nxt
nxt = nxt.Next()


+ 11
- 67
internal/libs/clist/clist.go View File

@ -44,7 +44,6 @@ waiting on NextWait() (since it's just a read operation).
type CElement struct {
mtx sync.RWMutex
prev *CElement
prevWaitCh chan struct{}
next *CElement
nextWaitCh chan struct{}
removed bool
@ -72,33 +71,6 @@ func (e *CElement) NextWait() *CElement {
}
}
// Blocking implementation of Prev().
// May return nil iff CElement was head and got removed.
func (e *CElement) PrevWait() *CElement {
for {
e.mtx.RLock()
prev := e.prev
removed := e.removed
signal := e.prevWaitCh
e.mtx.RUnlock()
if prev != nil || removed {
return prev
}
<-signal
}
}
// PrevWaitChan can be used to wait until Prev becomes not nil. Once it does,
// channel will be closed.
func (e *CElement) PrevWaitChan() <-chan struct{} {
e.mtx.RLock()
defer e.mtx.RUnlock()
return e.prevWaitCh
}
// NextWaitChan can be used to wait until Next becomes not nil. Once it does,
// channel will be closed.
func (e *CElement) NextWaitChan() <-chan struct{} {
@ -131,7 +103,7 @@ func (e *CElement) Removed() bool {
return isRemoved
}
func (e *CElement) DetachNext() {
func (e *CElement) detachNext() {
e.mtx.Lock()
if !e.removed {
e.mtx.Unlock()
@ -153,7 +125,7 @@ func (e *CElement) DetachPrev() {
// NOTE: This function needs to be safe for
// concurrent goroutines waiting on nextWg.
func (e *CElement) SetNext(newNext *CElement) {
func (e *CElement) setNext(newNext *CElement) {
e.mtx.Lock()
oldNext := e.next
@ -174,30 +146,20 @@ func (e *CElement) SetNext(newNext *CElement) {
// NOTE: This function needs to be safe for
// concurrent goroutines waiting on prevWg
func (e *CElement) SetPrev(newPrev *CElement) {
func (e *CElement) setPrev(newPrev *CElement) {
e.mtx.Lock()
defer e.mtx.Unlock()
oldPrev := e.prev
e.prev = newPrev
if oldPrev != nil && newPrev == nil {
e.prevWaitCh = make(chan struct{})
}
if oldPrev == nil && newPrev != nil {
close(e.prevWaitCh)
}
}
func (e *CElement) SetRemoved() {
func (e *CElement) setRemoved() {
e.mtx.Lock()
defer e.mtx.Unlock()
e.removed = true
// This wakes up anyone waiting in either direction.
if e.prev == nil {
close(e.prevWaitCh)
}
// This wakes up anyone waiting.
if e.next == nil {
close(e.nextWaitCh)
}
@ -211,7 +173,6 @@ func (e *CElement) SetRemoved() {
// Panics if length grows beyond the max.
type CList struct {
mtx sync.RWMutex
wg *sync.WaitGroup
waitCh chan struct{}
head *CElement // first element
tail *CElement // last element
@ -250,7 +211,7 @@ func (l *CList) Front() *CElement {
return head
}
func (l *CList) FrontWait() *CElement {
func (l *CList) frontWait() *CElement {
// Loop until the head is non-nil else wait and try again
for {
l.mtx.RLock()
@ -273,22 +234,6 @@ func (l *CList) Back() *CElement {
return back
}
func (l *CList) BackWait() *CElement {
for {
l.mtx.RLock()
tail := l.tail
wg := l.wg
l.mtx.RUnlock()
if tail != nil {
return tail
}
wg.Wait()
// l.tail doesn't necessarily exist here.
// That's why we need to continue a for-loop.
}
}
// WaitChan can be used to wait until Front or Back becomes not nil. Once it
// does, channel will be closed.
func (l *CList) WaitChan() <-chan struct{} {
@ -305,7 +250,6 @@ func (l *CList) PushBack(v interface{}) *CElement {
// Construct a new element
e := &CElement{
prev: nil,
prevWaitCh: make(chan struct{}),
next: nil,
nextWaitCh: make(chan struct{}),
removed: false,
@ -326,8 +270,8 @@ func (l *CList) PushBack(v interface{}) *CElement {
l.head = e
l.tail = e
} else {
e.SetPrev(l.tail) // We must init e first.
l.tail.SetNext(e) // This will make e accessible.
e.setPrev(l.tail) // We must init e first.
l.tail.setNext(e) // This will make e accessible.
l.tail = e // Update the list.
}
l.mtx.Unlock()
@ -365,16 +309,16 @@ func (l *CList) Remove(e *CElement) interface{} {
if prev == nil {
l.head = next
} else {
prev.SetNext(next)
prev.setNext(next)
}
if next == nil {
l.tail = prev
} else {
next.SetPrev(prev)
next.setPrev(prev)
}
// Set .Done() on e, otherwise waiters will wait forever.
e.SetRemoved()
e.setRemoved()
return e.Value
}

+ 1
- 25
internal/libs/clist/clist_test.go View File

@ -218,7 +218,7 @@ func TestScanRightDeleteRandom(t *testing.T) {
default:
}
if el == nil {
el = l.FrontWait()
el = l.frontWait()
restartCounter++
}
el = el.Next()
@ -314,30 +314,6 @@ FOR_LOOP:
t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen)
}
// 4) test iterating backwards (PrevWaitChan and Prev)
prev := next
seen = 0
FOR_LOOP2:
for {
select {
case <-prev.PrevWaitChan():
prev = prev.Prev()
seen++
if prev == nil {
t.Fatal("expected PrevWaitChan to block forever on nil when reached first elem")
}
if pushed == seen {
break FOR_LOOP2
}
case <-time.After(250 * time.Millisecond):
break FOR_LOOP2
}
}
if pushed != seen {
t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen)
}
}
func TestRemoved(t *testing.T) {


+ 2
- 53
libs/events/events.go View File

@ -50,8 +50,6 @@ type EventSwitch interface {
Stop()
AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
RemoveListenerForEvent(event string, listenerID string)
RemoveListener(listenerID string)
}
type eventSwitch struct {
@ -71,11 +69,8 @@ func NewEventSwitch(logger log.Logger) EventSwitch {
return evsw
}
func (evsw *eventSwitch) OnStart(ctx context.Context) error {
return nil
}
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil }
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
// Get/Create eventCell and listener.
@ -103,52 +98,6 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E
return nil
}
func (evsw *eventSwitch) RemoveListener(listenerID string) {
// Get and remove listener.
evsw.mtx.RLock()
listener := evsw.listeners[listenerID]
evsw.mtx.RUnlock()
if listener == nil {
return
}
evsw.mtx.Lock()
delete(evsw.listeners, listenerID)
evsw.mtx.Unlock()
// Remove callback for each event.
listener.SetRemoved()
for _, event := range listener.GetEvents() {
evsw.RemoveListenerForEvent(event, listenerID)
}
}
func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
// Get eventCell
evsw.mtx.Lock()
eventCell := evsw.eventCells[event]
evsw.mtx.Unlock()
if eventCell == nil {
return
}
// Remove listenerID from eventCell
numListeners := eventCell.RemoveListener(listenerID)
// Maybe garbage collect eventCell.
if numListeners == 0 {
// Lock again and double check.
evsw.mtx.Lock() // OUTER LOCK
eventCell.mtx.Lock() // INNER LOCK
if len(eventCell.listeners) == 0 {
delete(evsw.eventCells, event)
}
eventCell.mtx.Unlock() // INNER LOCK
evsw.mtx.Unlock() // OUTER LOCK
}
}
func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) {
// Get the eventCell
evsw.mtx.RLock()


+ 2
- 178
libs/events/events_test.go View File

@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
@ -28,8 +27,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
messages := make(chan EventData)
require.NoError(t, evsw.AddListenerForEvent("listener", "event",
func(ctx context.Context, data EventData) error {
// test there's no deadlock if we remove the listener inside a callback
evsw.RemoveListener("listener")
select {
case messages <- data:
return nil
@ -234,171 +231,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
}
}
func TestAddAndRemoveListenerConcurrency(t *testing.T) {
var (
stopInputEvent = false
roundCount = 2000
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
done1 := make(chan struct{})
done2 := make(chan struct{})
// Must be executed concurrently to uncover the data race.
// 1. RemoveListener
go func() {
defer close(done1)
for i := 0; i < roundCount; i++ {
evsw.RemoveListener("listener")
}
}()
// 2. AddListenerForEvent
go func() {
defer close(done2)
for i := 0; i < roundCount; i++ {
index := i
// we explicitly ignore errors here, since the listener will sometimes be removed
// (that's what we're testing)
_ = evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index),
func(ctx context.Context, data EventData) error {
t.Errorf("should not run callback for %d.\n", index)
stopInputEvent = true
return nil
})
}
}()
<-done1
<-done2
evsw.RemoveListener("listener") // remove the last listener
for i := 0; i < roundCount && !stopInputEvent; i++ {
evsw.FireEvent(ctx, fmt.Sprintf("event%d", i), uint64(1001))
}
}
// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to
// two events, fires a thousand integers for the first event, then unsubscribes
// the listener and fires a thousand integers for the second event.
func TestAddAndRemoveListener(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64)
doneSending1 := make(chan uint64)
doneSending2 := make(chan uint64)
numbers1 := make(chan uint64, 4)
numbers2 := make(chan uint64, 4)
// subscribe two listener to three events
require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
func(ctx context.Context, data EventData) error {
select {
case numbers1 <- data.(uint64):
return nil
case <-ctx.Done():
return ctx.Err()
}
}))
require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
func(ctx context.Context, data EventData) error {
select {
case numbers2 <- data.(uint64):
return nil
case <-ctx.Done():
return ctx.Err()
}
}))
// collect received events for event1
go sumReceivedNumbers(numbers1, doneSum1)
// collect received events for event2
go sumReceivedNumbers(numbers2, doneSum2)
// go fire events
go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
checkSumEvent1 := <-doneSending1
// after sending all event1, unsubscribe for all events
evsw.RemoveListener("listener")
go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
checkSumEvent2 := <-doneSending2
close(numbers1)
close(numbers2)
eventSum1 := <-doneSum1
eventSum2 := <-doneSum2
if checkSumEvent1 != eventSum1 ||
// correct value asserted by preceding tests, suffices to be non-zero
checkSumEvent2 == uint64(0) ||
eventSum2 != uint64(0) {
t.Errorf("not all messages sent were received or unsubscription did not register.\n")
}
}
// TestRemoveListener does basic tests on adding and removing
func TestRemoveListener(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
count := 10
sum1, sum2 := 0, 0
// add some listeners and make sure they work
require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
func(ctx context.Context, data EventData) error {
sum1++
return nil
}))
require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
func(ctx context.Context, data EventData) error {
sum2++
return nil
}))
for i := 0; i < count; i++ {
evsw.FireEvent(ctx, "event1", true)
evsw.FireEvent(ctx, "event2", true)
}
assert.Equal(t, count, sum1)
assert.Equal(t, count, sum2)
// remove one by event and make sure it is gone
evsw.RemoveListenerForEvent("event2", "listener")
for i := 0; i < count; i++ {
evsw.FireEvent(ctx, "event1", true)
evsw.FireEvent(ctx, "event2", true)
}
assert.Equal(t, count*2, sum1)
assert.Equal(t, count, sum2)
// remove the listener entirely and make sure both gone
evsw.RemoveListener("listener")
for i := 0; i < count; i++ {
evsw.FireEvent(ctx, "event1", true)
evsw.FireEvent(ctx, "event2", true)
}
assert.Equal(t, count*2, sum1)
assert.Equal(t, count, sum2)
}
// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two
// TestManagerLiistenersAsync sets up an EventSwitch, subscribes two
// listeners to three events, and fires a thousand integers for each event.
// These two listeners serve as the baseline validation while other listeners
// are randomly subscribed and unsubscribed.
@ -408,7 +241,7 @@ func TestRemoveListener(t *testing.T) {
// at that point subscribed to.
// NOTE: it is important to run this test with race conditions tracking on,
// `go test -race`, to examine for possible race conditions.
func TestRemoveListenersAsync(t *testing.T) {
func TestManageListenersAsync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
@ -494,18 +327,9 @@ func TestRemoveListenersAsync(t *testing.T) {
func(context.Context, EventData) error { return nil })
}
}
removeListenersStress := func() {
r2 := rand.New(rand.NewSource(time.Now().Unix()))
r2.Seed(time.Now().UnixNano())
for k := uint16(0); k < 80; k++ {
listenerNumber := r2.Intn(100) + 3
go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber))
}
}
addListenersStress()
// go fire events
go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
removeListenersStress()
go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001))
checkSumEvent1 := <-doneSending1


Loading…
Cancel
Save