From 4fead237f082020e3d52b3bb0ab26e38d4267d62 Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Fri, 24 Feb 2017 21:26:17 +0100 Subject: [PATCH] Client embeds EventSwitch, client.HTTP properly un/subscribes events over websocket --- rpc/client/event_test.go | 31 +++++----------- rpc/client/helpers.go | 15 +++++--- rpc/client/httpclient.go | 78 ++++++++++++++++++++++++++++++++++----- rpc/client/interface.go | 4 ++ rpc/client/mock/client.go | 2 + 5 files changed, 93 insertions(+), 37 deletions(-) diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index f43a47e9a..cc421ad90 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" merktest "github.com/tendermint/merkleeyes/testutil" "github.com/tendermint/tendermint/rpc/client" @@ -14,25 +13,19 @@ import ( func TestHeaderEvents(t *testing.T) { require := require.New(t) for i, c := range GetClients() { - // test if this client implements event switch as well. - evsw, ok := c.(types.EventSwitch) - if !assert.True(t, ok, "%d: %v", i, c) { - continue - } - // start for this test it if it wasn't already running - if !evsw.IsRunning() { + if !c.IsRunning() { // if so, then we start it, listen, and stop it. - st, err := evsw.Start() + st, err := c.Start() require.Nil(err, "%d: %+v", i, err) require.True(st, "%d", i) - defer evsw.Stop() + defer c.Stop() } evtTyp := types.EventStringNewBlockHeader() - evt, err := client.WaitForOneEvent(evsw, evtTyp, 1*time.Second) + evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) require.Nil(err, "%d: %+v", i, err) - _, ok = evt.(types.EventDataNewBlockHeader) + _, ok := evt.(types.EventDataNewBlockHeader) require.True(ok, "%d: %#v", i, evt) // TODO: more checks... } @@ -41,19 +34,13 @@ func TestHeaderEvents(t *testing.T) { func TestTxEvents(t *testing.T) { require := require.New(t) for i, c := range GetClients() { - // test if this client implements event switch as well. - evsw, ok := c.(types.EventSwitch) - if !assert.True(t, ok, "%d: %v", i, c) { - continue - } - // start for this test it if it wasn't already running - if !evsw.IsRunning() { + if !c.IsRunning() { // if so, then we start it, listen, and stop it. - st, err := evsw.Start() + st, err := c.Start() require.Nil(err, "%d: %+v", i, err) require.True(st, "%d", i) - defer evsw.Stop() + defer c.Stop() } // make the tx @@ -66,7 +53,7 @@ func TestTxEvents(t *testing.T) { require.True(txres.Code.IsOK()) // and wait for confirmation - evt, err := client.WaitForOneEvent(evsw, evtTyp, 1*time.Second) + evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) require.Nil(err, "%d: %+v", i, err) // and make sure it has the proper info txe, ok := evt.(types.EventDataTx) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 5af1fe205..89da434b8 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" cmn "github.com/tendermint/go-common" events "github.com/tendermint/go-events" + "github.com/tendermint/tendermint/types" ) // Waiter is informed of current height, decided whether to quit early @@ -55,8 +56,8 @@ func WaitForHeight(c StatusClient, h int, waiter Waiter) error { // when the timeout duration has expired. // // This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(evsw events.EventSwitch, - evtTyp string, timeout time.Duration) (events.EventData, error) { +func WaitForOneEvent(evsw types.EventSwitch, + evtTyp string, timeout time.Duration) (types.TMEventData, error) { listener := cmn.RandStr(12) evts, quit := make(chan events.EventData, 10), make(chan bool, 1) @@ -71,14 +72,18 @@ func WaitForOneEvent(evsw events.EventSwitch, evts <- data }) // make sure to unregister after the test is over - // TODO: don't require both! - defer evsw.RemoveListenerForEvent(listener, evtTyp) + // TODO: why doesn't the other call work??? + // defer evsw.RemoveListenerForEvent(listener, evtTyp) defer evsw.RemoveListener(listener) select { case <-quit: return nil, errors.New("timed out waiting for event") case evt := <-evts: - return evt, nil + tmevt, ok := evt.(types.TMEventData) + if ok { + return tmevt, nil + } + return nil, errors.Errorf("Got unexpected event type: %#v", evt) } } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 4eb94f5ac..bb4e6d3a8 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -172,8 +172,17 @@ type WSEvents struct { remote string endpoint string ws *rpcclient.WSClient - quit chan bool - done chan bool + + // used for signaling the goroutine that feeds ws -> EventSwitch + quit chan bool + done chan bool + + // used to maintain counts of actively listened events + // so we can properly subscribe/unsubscribe + // FIXME: thread-safety??? + // FIXME: reuse code from go-events??? + evtCount map[string]int // count how many time each event is subscribed + listeners map[string][]string // keep track of which events each listener is listening to } func newWSEvents(remote, endpoint string) *WSEvents { @@ -183,6 +192,8 @@ func newWSEvents(remote, endpoint string) *WSEvents { remote: remote, quit: make(chan bool, 1), done: make(chan bool, 1), + evtCount: map[string]int{}, + listeners: map[string][]string{}, } } @@ -222,16 +233,57 @@ func (w *WSEvents) Stop() bool { /** TODO: more intelligent subscriptions! **/ func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) { - w.subscribe(event) + // no one listening -> subscribe + if w.evtCount[event] == 0 { + w.subscribe(event) + } + // if this listener was already listening to this event, return early + for _, s := range w.listeners[listenerID] { + if event == s { + return + } + } + // otherwise, add this event to this listener + w.evtCount[event] += 1 + w.listeners[listenerID] = append(w.listeners[listenerID], event) w.EventSwitch.AddListenerForEvent(listenerID, event, cb) } func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) { - w.unsubscribe(event) + // if this listener is listening already, splice it out + found := false + l := w.listeners[listenerID] + for i, s := range l { + if event == s { + found = true + w.listeners[listenerID] = append(l[:i], l[i+1:]...) + break + } + } + // if the listener wasn't already listening to the event, exit early + if !found { + return + } + + // now we can update the subscriptions + w.evtCount[event] -= 1 + if w.evtCount[event] == 0 { + w.unsubscribe(event) + } w.EventSwitch.RemoveListenerForEvent(event, listenerID) } func (w *WSEvents) RemoveListener(listenerID string) { + // remove all counts for this listener + for _, s := range w.listeners[listenerID] { + w.evtCount[s] -= 1 + if w.evtCount[s] == 0 { + w.unsubscribe(s) + } + } + w.listeners[listenerID] = nil + + // then let the switch do it's magic w.EventSwitch.RemoveListener(listenerID) } @@ -274,18 +326,24 @@ func (w *WSEvents) parseEvent(data []byte) (err error) { if !ok { // ignore silently (eg. subscribe, unsubscribe and maybe other events) return nil - // or report loudly??? - // return errors.Errorf("unknown message: %#v", *result) } // looks good! let's fire this baby! w.EventSwitch.FireEvent(event.Name, event.Data) return nil } -func (w *WSEvents) subscribe(event string) error { - return errors.Wrap(w.ws.Subscribe(event), "Subscribe") +// no way of exposing these failures, so we panic. +// is this right? or silently ignore??? +func (w *WSEvents) subscribe(event string) { + err := w.ws.Subscribe(event) + if err != nil { + panic(err) + } } -func (w *WSEvents) unsubscribe(event string) error { - return errors.Wrap(w.ws.Unsubscribe(event), "Unsubscribe") +func (w *WSEvents) unsubscribe(event string) { + err := w.ws.Unsubscribe(event) + if err != nil { + panic(err) + } } diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 50f065114..9a5ba668b 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -65,6 +65,10 @@ type Client interface { SignClient HistoryClient StatusClient + + // this Client is reactive, you can subscribe to any TMEventData + // type, given the proper string. see tendermint/types/events.go + types.EventSwitch } // NetworkClient is general info about the network state. May not diff --git a/rpc/client/mock/client.go b/rpc/client/mock/client.go index 0dcba718c..a3cecfca1 100644 --- a/rpc/client/mock/client.go +++ b/rpc/client/mock/client.go @@ -32,6 +32,8 @@ type Client struct { client.SignClient client.HistoryClient client.StatusClient + // create a mock with types.NewEventSwitch() + types.EventSwitch } func (c Client) _assertIsClient() client.Client {