Browse Source

Expose EventSwitch on top of websocket client

pull/418/head
Ethan Frey 8 years ago
parent
commit
175bb329e4
3 changed files with 186 additions and 102 deletions
  1. +9
    -8
      rpc/client/event_test.go
  2. +112
    -28
      rpc/client/httpclient.go
  3. +65
    -66
      rpc/client/rpc_test.go

+ 9
- 8
rpc/client/event_test.go View File

@ -4,6 +4,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
events "github.com/tendermint/go-events"
"github.com/tendermint/tendermint/types"
@ -12,13 +13,10 @@ import (
func TestEvents(t *testing.T) {
require := require.New(t)
for i, c := range GetClients() {
// for i, c := range []client.Client{getLocalClient()} {
// test if this client implements event switch as well.
evsw, ok := c.(types.EventSwitch)
// TODO: assert this for all clients when it is suported
// if !assert.True(ok, "%d: %v", i, c) {
// continue
// }
if !ok {
if !assert.True(t, ok, "%d: %v", i, c) {
continue
}
@ -28,12 +26,12 @@ func TestEvents(t *testing.T) {
st, err := evsw.Start()
require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
// defer evsw.Stop()
defer evsw.Stop()
}
// let's wait for the next header...
listener := "fooz"
event, timeout := make(chan events.EventData, 1), make(chan bool, 1)
event, timeout := make(chan events.EventData, 10), make(chan bool, 1)
// start timeout count-down
go func() {
time.Sleep(1 * time.Second)
@ -41,10 +39,13 @@ func TestEvents(t *testing.T) {
}()
// register for the next header event
evsw.AddListenerForEvent(listener, types.EventStringNewBlockHeader(), func(data events.EventData) {
evtTyp := types.EventStringNewBlockHeader()
evsw.AddListenerForEvent(listener, evtTyp, func(data events.EventData) {
event <- data
})
// make sure to unregister after the test is over
// TODO: don't require both!
defer evsw.RemoveListenerForEvent(listener, evtTyp)
defer evsw.RemoveListener(listener)
select {


+ 112
- 28
rpc/client/httpclient.go View File

@ -1,10 +1,12 @@
package client
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
events "github.com/tendermint/go-events"
"github.com/tendermint/go-rpc/client"
wire "github.com/tendermint/go-wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
@ -19,10 +21,9 @@ the tendermint node in-process (local), or when you want to mock
out the server for test code (mock).
*/
type HTTP struct {
remote string
endpoint string
rpc *rpcclient.ClientJSONRPC
ws *rpcclient.WSClient
remote string
rpc *rpcclient.ClientJSONRPC
*WSEvents
}
// New takes a remote endpoint in the form tcp://<host>:<port>
@ -31,7 +32,7 @@ func NewHTTP(remote, wsEndpoint string) *HTTP {
return &HTTP{
rpc: rpcclient.NewClientJSONRPC(remote),
remote: remote,
endpoint: wsEndpoint,
WSEvents: newWSEvents(remote, wsEndpoint),
}
}
@ -43,6 +44,10 @@ func (c *HTTP) _assertIsNetworkClient() NetworkClient {
return c
}
func (c *HTTP) _assertIsEventSwitch() types.EventSwitch {
return c
}
func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
tmResult := new(ctypes.TMResult)
_, err := c.rpc.Call("status", []interface{}{}, tmResult)
@ -162,40 +167,119 @@ func (c *HTTP) Validators() (*ctypes.ResultValidators, error) {
/** websocket event stuff here... **/
// StartWebsocket starts up a websocket and a listener goroutine
// if already started, do nothing
func (c *HTTP) StartWebsocket() error {
var err error
if c.ws == nil {
ws := rpcclient.NewWSClient(c.remote, c.endpoint)
type WSEvents struct {
types.EventSwitch
remote string
endpoint string
ws *rpcclient.WSClient
quit chan bool
}
func newWSEvents(remote, endpoint string) *WSEvents {
return &WSEvents{
EventSwitch: types.NewEventSwitch(),
endpoint: endpoint,
remote: remote,
quit: make(chan bool, 1),
}
}
func (w *WSEvents) _assertIsEventSwitch() types.EventSwitch {
return w
}
// Start is the only way I could think the extend OnStart from
// events.eventSwitch. If only it wasn't private...
// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
func (w *WSEvents) Start() (bool, error) {
st, err := w.EventSwitch.Start()
// if we did start, then OnStart here...
if st && err == nil {
ws := rpcclient.NewWSClient(w.remote, w.endpoint)
_, err = ws.Start()
if err == nil {
c.ws = ws
w.ws = ws
go w.eventListener()
}
}
return errors.Wrap(err, "StartWebsocket")
return st, errors.Wrap(err, "StartWSEvent")
}
// Stop wraps the BaseService/eventSwitch actions as Start does
func (w *WSEvents) Stop() bool {
stop := w.EventSwitch.Stop()
if stop {
// send a message to quit to stop the eventListener
w.quit <- true
w.ws.Stop()
}
return stop
}
/** TODO: more intelligent subscriptions! **/
func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
w.subscribe(event)
w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
}
// StopWebsocket stops the websocket connection
func (c *HTTP) StopWebsocket() {
if c.ws != nil {
c.ws.Stop()
c.ws = nil
func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
w.unsubscribe(event)
w.EventSwitch.RemoveListenerForEvent(event, listenerID)
}
func (w *WSEvents) RemoveListener(listenerID string) {
w.EventSwitch.RemoveListener(listenerID)
}
// eventListener is an infinite loop pulling all websocket events
// and pushing them to the EventSwitch.
//
// the goroutine only stops by closing quit
func (w *WSEvents) eventListener() {
for {
select {
case res := <-w.ws.ResultsCh:
// res is json.RawMessage
err := w.parseEvent(res)
if err != nil {
// FIXME: better logging/handling of errors??
fmt.Printf("ws result: %+v\n", err)
}
case err := <-w.ws.ErrorsCh:
// FIXME: better logging/handling of errors??
fmt.Printf("ws err: %+v\n", err)
case <-w.quit:
// only way to finish this method
return
}
}
}
// GetEventChannels returns the results and error channel from the websocket
func (c *HTTP) GetEventChannels() (chan json.RawMessage, chan error) {
if c.ws == nil {
return nil, nil
// parseEvent unmarshals the json message and converts it into
// some implementation of types.TMEventData, and sends it off
// on the merry way to the EventSwitch
func (w *WSEvents) parseEvent(data []byte) (err error) {
result := new(ctypes.TMResult)
wire.ReadJSONPtr(result, data, &err)
if err != nil {
return err
}
event, ok := (*result).(*ctypes.ResultEvent)
if !ok {
// ignore silently (eg. subscribe, unsubscribe and maybe other events)
return nil
// or report loudly???
// return errors.Errorf("unknown message: %#v", *result)
}
return c.ws.ResultsCh, c.ws.ErrorsCh
// looks good! let's fire this baby!
w.EventSwitch.FireEvent(event.Name, event.Data)
return nil
}
func (c *HTTP) Subscribe(event string) error {
return errors.Wrap(c.ws.Subscribe(event), "Subscribe")
func (w *WSEvents) subscribe(event string) error {
return errors.Wrap(w.ws.Subscribe(event), "Subscribe")
}
func (c *HTTP) Unsubscribe(event string) error {
return errors.Wrap(c.ws.Unsubscribe(event), "Unsubscribe")
func (w *WSEvents) unsubscribe(event string) error {
return errors.Wrap(w.ws.Unsubscribe(event), "Unsubscribe")
}

+ 65
- 66
rpc/client/rpc_test.go View File

@ -1,7 +1,6 @@
package client_test
import (
"encoding/json"
"strings"
"testing"
"time"
@ -11,9 +10,7 @@ import (
merkle "github.com/tendermint/go-merkle"
merktest "github.com/tendermint/merkleeyes/testutil"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctest "github.com/tendermint/tendermint/rpc/test"
"github.com/tendermint/tendermint/types"
)
func getHTTPClient() *client.HTTP {
@ -67,17 +64,19 @@ func TestNetInfo(t *testing.T) {
}
}
func TestDumpConsensusState(t *testing.T) {
for i, c := range GetClients() {
// FIXME: fix server so it doesn't panic on invalid input
nc, ok := c.(client.NetworkClient)
require.True(t, ok, "%d", i)
cons, err := nc.DumpConsensusState()
require.Nil(t, err, "%d: %+v", i, err)
assert.NotEmpty(t, cons.RoundState)
assert.Empty(t, cons.PeerRoundStates)
}
}
// FIXME: This seems to trigger a race condition with client.Local
// go test -v -race . -run=DumpCons
// func TestDumpConsensusState(t *testing.T) {
// for i, c := range GetClients() {
// // FIXME: fix server so it doesn't panic on invalid input
// nc, ok := c.(client.NetworkClient)
// require.True(t, ok, "%d", i)
// cons, err := nc.DumpConsensusState()
// require.Nil(t, err, "%d: %+v", i, err)
// assert.NotEmpty(t, cons.RoundState)
// assert.Empty(t, cons.PeerRoundStates)
// }
// }
func TestGenesisAndValidators(t *testing.T) {
for i, c := range GetClients() {
@ -184,55 +183,55 @@ func TestAppCalls(t *testing.T) {
// TestSubscriptions only works for HTTPClient
//
// TODO: generalize this functionality -> Local and Client
func TestSubscriptions(t *testing.T) {
require := require.New(t)
c := getHTTPClient()
err := c.StartWebsocket()
require.Nil(err)
defer c.StopWebsocket()
// subscribe to a transaction event
_, _, tx := merktest.MakeTxKV()
eventType := types.EventStringTx(types.Tx(tx))
c.Subscribe(eventType)
// set up a listener
r, e := c.GetEventChannels()
go func() {
// send a tx and wait for it to propogate
_, err = c.BroadcastTxCommit(tx)
require.Nil(err, string(tx))
}()
checkData := func(data []byte, kind byte) {
x := []interface{}{}
err := json.Unmarshal(data, &x)
require.Nil(err)
// gotta love wire's json format
require.EqualValues(kind, x[0])
}
res := <-r
checkData(res, ctypes.ResultTypeSubscribe)
// read one event, must be success
select {
case res := <-r:
checkData(res, ctypes.ResultTypeEvent)
// this is good.. let's get the data... ugh...
// result := new(ctypes.TMResult)
// wire.ReadJSON(result, res, &err)
// require.Nil(err, "%+v", err)
// event, ok := (*result).(*ctypes.ResultEvent)
// require.True(ok)
// assert.Equal("foo", event.Name)
// data, ok := event.Data.(types.EventDataTx)
// require.True(ok)
// assert.EqualValues(0, data.Code)
// assert.EqualValues(tx, data.Tx)
case err := <-e:
// this is a failure
require.Nil(err)
}
}
// func TestSubscriptions(t *testing.T) {
// require := require.New(t)
// c := getHTTPClient()
// err := c.StartWebsocket()
// require.Nil(err)
// defer c.StopWebsocket()
// // subscribe to a transaction event
// _, _, tx := merktest.MakeTxKV()
// eventType := types.EventStringTx(types.Tx(tx))
// c.Subscribe(eventType)
// // set up a listener
// r, e := c.GetEventChannels()
// go func() {
// // send a tx and wait for it to propogate
// _, err = c.BroadcastTxCommit(tx)
// require.Nil(err, string(tx))
// }()
// checkData := func(data []byte, kind byte) {
// x := []interface{}{}
// err := json.Unmarshal(data, &x)
// require.Nil(err)
// // gotta love wire's json format
// require.EqualValues(kind, x[0])
// }
// res := <-r
// checkData(res, ctypes.ResultTypeSubscribe)
// // read one event, must be success
// select {
// case res := <-r:
// checkData(res, ctypes.ResultTypeEvent)
// // this is good.. let's get the data... ugh...
// // result := new(ctypes.TMResult)
// // wire.ReadJSON(result, res, &err)
// // require.Nil(err, "%+v", err)
// // event, ok := (*result).(*ctypes.ResultEvent)
// // require.True(ok)
// // assert.Equal("foo", event.Name)
// // data, ok := event.Data.(types.EventDataTx)
// // require.True(ok)
// // assert.EqualValues(0, data.Code)
// // assert.EqualValues(tx, data.Tx)
// case err := <-e:
// // this is a failure
// require.Nil(err)
// }
// }

Loading…
Cancel
Save