Browse Source

libs: Refactor & document events code (#2576)

* [libs/events] add more godoc comments
* [libs/events] refactor code
- improve var naming
- improve code structure
- do not use defers for unlocking mutexes (defer takes time)
pull/2579/head
Anton Kaliaev 6 years ago
committed by Alexander Simmerl
parent
commit
989a2f32b1
2 changed files with 63 additions and 62 deletions
  1. +37
    -29
      libs/events/events.go
  2. +26
    -33
      libs/events/events_test.go

+ 37
- 29
libs/events/events.go View File

@ -1,6 +1,4 @@
/*
Pub-Sub in go with event caching
*/
// Package events - Pub-Sub in go with event caching
package events package events
import ( import (
@ -10,30 +8,40 @@ import (
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
) )
// ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
type ErrListenerWasRemoved struct { type ErrListenerWasRemoved struct {
listener string
listenerID string
} }
// Error implements the error interface.
func (e ErrListenerWasRemoved) Error() string { func (e ErrListenerWasRemoved) Error() string {
return fmt.Sprintf("listener %s was removed", e.listener)
return fmt.Sprintf("listener #%s was removed", e.listenerID)
} }
// Generic event data can be typed and registered with tendermint/go-amino
// via concrete implementation of this interface
type EventData interface {
}
// EventData is a generic event data can be typed and registered with
// tendermint/go-amino via concrete implementation of this interface.
type EventData interface{}
// reactors and other modules should export
// this interface to become eventable
// Eventable is the interface reactors and other modules must export to become
// eventable.
type Eventable interface { type Eventable interface {
SetEventSwitch(evsw EventSwitch) SetEventSwitch(evsw EventSwitch)
} }
// an event switch or cache implements fireable
// Fireable is the interface that wraps the FireEvent method.
//
// FireEvent fires an event with the given name and data.
type Fireable interface { type Fireable interface {
FireEvent(event string, data EventData) FireEvent(event string, data EventData)
} }
// EventSwitch is the interface for synchronous pubsub, where listeners
// subscribe to certain events and, when an event is fired (see Fireable),
// notified via a callback function.
//
// Listeners are added by calling AddListenerForEvent function.
// They can be removed by calling either RemoveListenerForEvent or
// RemoveListener (for all events).
type EventSwitch interface { type EventSwitch interface {
cmn.Service cmn.Service
Fireable Fireable
@ -67,7 +75,7 @@ func (evsw *eventSwitch) OnStart() error {
func (evsw *eventSwitch) OnStop() {} func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) error { func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) error {
// Get/Create eventCell and listener
// Get/Create eventCell and listener.
evsw.mtx.Lock() evsw.mtx.Lock()
eventCell := evsw.eventCells[event] eventCell := evsw.eventCells[event]
if eventCell == nil { if eventCell == nil {
@ -81,17 +89,17 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventC
} }
evsw.mtx.Unlock() evsw.mtx.Unlock()
// Add event and listener
err := listener.AddEvent(event)
if err == nil {
eventCell.AddListener(listenerID, cb)
// Add event and listener.
if err := listener.AddEvent(event); err != nil {
return err
} }
eventCell.AddListener(listenerID, cb)
return err
return nil
} }
func (evsw *eventSwitch) RemoveListener(listenerID string) { func (evsw *eventSwitch) RemoveListener(listenerID string) {
// Get and remove listener
// Get and remove listener.
evsw.mtx.RLock() evsw.mtx.RLock()
listener := evsw.listeners[listenerID] listener := evsw.listeners[listenerID]
evsw.mtx.RUnlock() evsw.mtx.RUnlock()
@ -180,14 +188,14 @@ func (cell *eventCell) RemoveListener(listenerID string) int {
func (cell *eventCell) FireEvent(data EventData) { func (cell *eventCell) FireEvent(data EventData) {
cell.mtx.RLock() cell.mtx.RLock()
var listenerCopy []EventCallback
for _, listener := range cell.listeners {
listenerCopy = append(listenerCopy, listener)
var eventCallbacks []EventCallback
for _, cb := range cell.listeners {
eventCallbacks = append(eventCallbacks, cb)
} }
cell.mtx.RUnlock() cell.mtx.RUnlock()
for _, listener := range listenerCopy {
listener(data)
for _, cb := range eventCallbacks {
cb(data)
} }
} }
@ -213,27 +221,27 @@ func newEventListener(id string) *eventListener {
func (evl *eventListener) AddEvent(event string) error { func (evl *eventListener) AddEvent(event string) error {
evl.mtx.Lock() evl.mtx.Lock()
defer evl.mtx.Unlock()
if evl.removed { if evl.removed {
return ErrListenerWasRemoved{listener: evl.id}
evl.mtx.Unlock()
return ErrListenerWasRemoved{listenerID: evl.id}
} }
evl.events = append(evl.events, event) evl.events = append(evl.events, event)
evl.mtx.Unlock()
return nil return nil
} }
func (evl *eventListener) GetEvents() []string { func (evl *eventListener) GetEvents() []string {
evl.mtx.RLock() evl.mtx.RLock()
defer evl.mtx.RUnlock()
events := make([]string, len(evl.events)) events := make([]string, len(evl.events))
copy(events, evl.events) copy(events, evl.events)
evl.mtx.RUnlock()
return events return events
} }
func (evl *eventListener) SetRemoved() { func (evl *eventListener) SetRemoved() {
evl.mtx.Lock() evl.mtx.Lock()
defer evl.mtx.Unlock()
evl.removed = true evl.removed = true
evl.mtx.Unlock()
} }

+ 26
- 33
libs/events/events_test.go View File

@ -6,6 +6,8 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
) )
@ -14,10 +16,9 @@ import (
func TestAddListenerForEventFireOnce(t *testing.T) { func TestAddListenerForEventFireOnce(t *testing.T) {
evsw := NewEventSwitch() evsw := NewEventSwitch()
err := evsw.Start() err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
require.NoError(t, err)
defer evsw.Stop() defer evsw.Stop()
messages := make(chan EventData) messages := make(chan EventData)
evsw.AddListenerForEvent("listener", "event", evsw.AddListenerForEvent("listener", "event",
func(data EventData) { func(data EventData) {
@ -35,10 +36,9 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
func TestAddListenerForEventFireMany(t *testing.T) { func TestAddListenerForEventFireMany(t *testing.T) {
evsw := NewEventSwitch() evsw := NewEventSwitch()
err := evsw.Start() err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
require.NoError(t, err)
defer evsw.Stop() defer evsw.Stop()
doneSum := make(chan uint64) doneSum := make(chan uint64)
doneSending := make(chan uint64) doneSending := make(chan uint64)
numbers := make(chan uint64, 4) numbers := make(chan uint64, 4)
@ -65,10 +65,9 @@ func TestAddListenerForEventFireMany(t *testing.T) {
func TestAddListenerForDifferentEvents(t *testing.T) { func TestAddListenerForDifferentEvents(t *testing.T) {
evsw := NewEventSwitch() evsw := NewEventSwitch()
err := evsw.Start() err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
require.NoError(t, err)
defer evsw.Stop() defer evsw.Stop()
doneSum := make(chan uint64) doneSum := make(chan uint64)
doneSending1 := make(chan uint64) doneSending1 := make(chan uint64)
doneSending2 := make(chan uint64) doneSending2 := make(chan uint64)
@ -111,10 +110,9 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
func TestAddDifferentListenerForDifferentEvents(t *testing.T) { func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
evsw := NewEventSwitch() evsw := NewEventSwitch()
err := evsw.Start() err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
require.NoError(t, err)
defer evsw.Stop() defer evsw.Stop()
doneSum1 := make(chan uint64) doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64) doneSum2 := make(chan uint64)
doneSending1 := make(chan uint64) doneSending1 := make(chan uint64)
@ -174,40 +172,38 @@ func TestAddAndRemoveListenerConcurrency(t *testing.T) {
evsw := NewEventSwitch() evsw := NewEventSwitch()
err := evsw.Start() err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
require.NoError(t, err)
defer evsw.Stop() defer evsw.Stop()
done1 := make(chan struct{}) done1 := make(chan struct{})
done2 := make(chan struct{}) done2 := make(chan struct{})
// Must be executed concurrently to uncover the data race.
// 1. RemoveListener
go func() { go func() {
for i := 0; i < roundCount; i++ { for i := 0; i < roundCount; i++ {
evsw.RemoveListener("listener") evsw.RemoveListener("listener")
} }
done1 <- struct{}{}
close(done1)
}() }()
// 2. AddListenerForEvent
go func() { go func() {
for i := 0; i < roundCount; i++ { for i := 0; i < roundCount; i++ {
index := i //it necessary for closure
index := i
evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index), evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index),
func(data EventData) { func(data EventData) {
t.Errorf("should not run callback for %d.\n", index) t.Errorf("should not run callback for %d.\n", index)
stopInputEvent = true stopInputEvent = true
}) })
} }
done2 <- struct{}{}
close(done2)
}() }()
<-done1 <-done1
<-done2 <-done2
close(done1)
close(done2)
evsw.RemoveListener("listener") // make sure remove last
evsw.RemoveListener("listener") // remove the last listener
for i := 0; i < roundCount && !stopInputEvent; i++ { for i := 0; i < roundCount && !stopInputEvent; i++ {
evsw.FireEvent(fmt.Sprintf("event%d", i), uint64(1001)) evsw.FireEvent(fmt.Sprintf("event%d", i), uint64(1001))
@ -220,10 +216,9 @@ func TestAddAndRemoveListenerConcurrency(t *testing.T) {
func TestAddAndRemoveListener(t *testing.T) { func TestAddAndRemoveListener(t *testing.T) {
evsw := NewEventSwitch() evsw := NewEventSwitch()
err := evsw.Start() err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
require.NoError(t, err)
defer evsw.Stop() defer evsw.Stop()
doneSum1 := make(chan uint64) doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64) doneSum2 := make(chan uint64)
doneSending1 := make(chan uint64) doneSending1 := make(chan uint64)
@ -266,10 +261,9 @@ func TestAddAndRemoveListener(t *testing.T) {
func TestRemoveListener(t *testing.T) { func TestRemoveListener(t *testing.T) {
evsw := NewEventSwitch() evsw := NewEventSwitch()
err := evsw.Start() err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
require.NoError(t, err)
defer evsw.Stop() defer evsw.Stop()
count := 10 count := 10
sum1, sum2 := 0, 0 sum1, sum2 := 0, 0
// add some listeners and make sure they work // add some listeners and make sure they work
@ -320,10 +314,9 @@ func TestRemoveListener(t *testing.T) {
func TestRemoveListenersAsync(t *testing.T) { func TestRemoveListenersAsync(t *testing.T) {
evsw := NewEventSwitch() evsw := NewEventSwitch()
err := evsw.Start() err := evsw.Start()
if err != nil {
t.Errorf("Failed to start EventSwitch, error: %v", err)
}
require.NoError(t, err)
defer evsw.Stop() defer evsw.Stop()
doneSum1 := make(chan uint64) doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64) doneSum2 := make(chan uint64)
doneSending1 := make(chan uint64) doneSending1 := make(chan uint64)
@ -406,7 +399,7 @@ func TestRemoveListenersAsync(t *testing.T) {
// until the receiving channel `numbers` is closed; it then sends the sum // until the receiving channel `numbers` is closed; it then sends the sum
// on `doneSum` and closes that channel. Expected to be run in a go-routine. // on `doneSum` and closes that channel. Expected to be run in a go-routine.
func sumReceivedNumbers(numbers, doneSum chan uint64) { func sumReceivedNumbers(numbers, doneSum chan uint64) {
var sum uint64 = 0
var sum uint64
for { for {
j, more := <-numbers j, more := <-numbers
sum += j sum += j
@ -425,7 +418,7 @@ func sumReceivedNumbers(numbers, doneSum chan uint64) {
// the test to assert all events have also been received. // the test to assert all events have also been received.
func fireEvents(evsw EventSwitch, event string, doneChan chan uint64, func fireEvents(evsw EventSwitch, event string, doneChan chan uint64,
offset uint64) { offset uint64) {
var sentSum uint64 = 0
var sentSum uint64
for i := offset; i <= offset+uint64(999); i++ { for i := offset; i <= offset+uint64(999); i++ {
sentSum += i sentSum += i
evsw.FireEvent(event, i) evsw.FireEvent(event, i)


Loading…
Cancel
Save