Browse Source

Clean up event switch add helper function

pull/418/head
Ethan Frey 8 years ago
parent
commit
6282fad518
4 changed files with 88 additions and 84 deletions
  1. +46
    -27
      rpc/client/event_test.go
  2. +35
    -0
      rpc/client/helpers.go
  3. +7
    -1
      rpc/client/httpclient.go
  4. +0
    -56
      rpc/client/rpc_test.go

+ 46
- 27
rpc/client/event_test.go View File

@ -6,14 +6,14 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
events "github.com/tendermint/go-events"
merktest "github.com/tendermint/merkleeyes/testutil"
"github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
func TestEvents(t *testing.T) {
func TestHeaderEvents(t *testing.T) {
require := require.New(t) require := require.New(t)
for i, c := range GetClients() { for i, c := range GetClients() {
// for i, c := range []client.Client{getLocalClient()} {
// test if this client implements event switch as well. // test if this client implements event switch as well.
evsw, ok := c.(types.EventSwitch) evsw, ok := c.(types.EventSwitch)
if !assert.True(t, ok, "%d: %v", i, c) { if !assert.True(t, ok, "%d: %v", i, c) {
@ -29,31 +29,50 @@ func TestEvents(t *testing.T) {
defer evsw.Stop() defer evsw.Stop()
} }
// let's wait for the next header...
listener := "fooz"
event, timeout := make(chan events.EventData, 10), make(chan bool, 1)
// start timeout count-down
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()
// register for the next header event
evtTyp := types.EventStringNewBlockHeader() evtTyp := types.EventStringNewBlockHeader()
evsw.AddListenerForEvent(listener, evtTyp, func(data events.EventData) {
event <- data
})
// make sure to unregister after the test is over
// TODO: don't require both!
defer evsw.RemoveListenerForEvent(listener, evtTyp)
defer evsw.RemoveListener(listener)
select {
case <-timeout:
require.True(false, "%d: a timeout waiting for event", i)
case evt := <-event:
_, ok := evt.(types.EventDataNewBlockHeader)
require.True(ok, "%d: %#v", i, evt)
evt, err := client.WaitForOneEvent(evsw, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err)
_, ok = evt.(types.EventDataNewBlockHeader)
require.True(ok, "%d: %#v", i, evt)
// TODO: more checks...
}
}
func TestTxEvents(t *testing.T) {
require := require.New(t)
for i, c := range GetClients() {
// test if this client implements event switch as well.
evsw, ok := c.(types.EventSwitch)
if !assert.True(t, ok, "%d: %v", i, c) {
continue
}
// start for this test it if it wasn't already running
if !evsw.IsRunning() {
// if so, then we start it, listen, and stop it.
st, err := evsw.Start()
require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer evsw.Stop()
} }
// make the tx
_, _, tx := merktest.MakeTxKV()
evtTyp := types.EventStringTx(types.Tx(tx))
// send async
txres, err := c.BroadcastTxAsync(tx)
require.Nil(err, "%+v", err)
require.True(txres.Code.IsOK())
// and wait for confirmation
evt, err := client.WaitForOneEvent(evsw, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx
require.EqualValues(tx, txe.Tx)
require.True(txe.Code.IsOK())
} }
} }

+ 35
- 0
rpc/client/helpers.go View File

@ -4,6 +4,8 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
cmn "github.com/tendermint/go-common"
events "github.com/tendermint/go-events"
) )
// Waiter is informed of current height, decided whether to quit early // Waiter is informed of current height, decided whether to quit early
@ -47,3 +49,36 @@ func WaitForHeight(c StatusClient, h int, waiter Waiter) error {
} }
return nil return nil
} }
// WaitForOneEvent subscribes to a websocket event for the given
// event time and returns upon receiving it one time, or
// when the timeout duration has expired.
//
// This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(evsw events.EventSwitch,
evtTyp string, timeout time.Duration) (events.EventData, error) {
listener := cmn.RandStr(12)
evts, quit := make(chan events.EventData, 10), make(chan bool, 1)
// start timeout count-down
go func() {
time.Sleep(1 * time.Second)
quit <- true
}()
// register for the next event of this type
evsw.AddListenerForEvent(listener, evtTyp, func(data events.EventData) {
evts <- data
})
// make sure to unregister after the test is over
// TODO: don't require both!
defer evsw.RemoveListenerForEvent(listener, evtTyp)
defer evsw.RemoveListener(listener)
select {
case <-quit:
return nil, errors.New("timed out waiting for event")
case evt := <-evts:
return evt, nil
}
}

+ 7
- 1
rpc/client/httpclient.go View File

@ -173,6 +173,7 @@ type WSEvents struct {
endpoint string endpoint string
ws *rpcclient.WSClient ws *rpcclient.WSClient
quit chan bool quit chan bool
done chan bool
} }
func newWSEvents(remote, endpoint string) *WSEvents { func newWSEvents(remote, endpoint string) *WSEvents {
@ -181,6 +182,7 @@ func newWSEvents(remote, endpoint string) *WSEvents {
endpoint: endpoint, endpoint: endpoint,
remote: remote, remote: remote,
quit: make(chan bool, 1), quit: make(chan bool, 1),
done: make(chan bool, 1),
} }
} }
@ -211,7 +213,9 @@ func (w *WSEvents) Stop() bool {
if stop { if stop {
// send a message to quit to stop the eventListener // send a message to quit to stop the eventListener
w.quit <- true w.quit <- true
<-w.done
w.ws.Stop() w.ws.Stop()
w.ws = nil
} }
return stop return stop
} }
@ -249,7 +253,9 @@ func (w *WSEvents) eventListener() {
// FIXME: better logging/handling of errors?? // FIXME: better logging/handling of errors??
fmt.Printf("ws err: %+v\n", err) fmt.Printf("ws err: %+v\n", err)
case <-w.quit: case <-w.quit:
// only way to finish this method
// send a message so we can wait for the routine to exit
// before cleaning up the w.ws stuff
w.done <- true
return return
} }
} }


+ 0
- 56
rpc/client/rpc_test.go View File

@ -179,59 +179,3 @@ func TestAppCalls(t *testing.T) {
} }
} }
} }
// TestSubscriptions only works for HTTPClient
//
// TODO: generalize this functionality -> Local and Client
// func TestSubscriptions(t *testing.T) {
// require := require.New(t)
// c := getHTTPClient()
// err := c.StartWebsocket()
// require.Nil(err)
// defer c.StopWebsocket()
// // subscribe to a transaction event
// _, _, tx := merktest.MakeTxKV()
// eventType := types.EventStringTx(types.Tx(tx))
// c.Subscribe(eventType)
// // set up a listener
// r, e := c.GetEventChannels()
// go func() {
// // send a tx and wait for it to propogate
// _, err = c.BroadcastTxCommit(tx)
// require.Nil(err, string(tx))
// }()
// checkData := func(data []byte, kind byte) {
// x := []interface{}{}
// err := json.Unmarshal(data, &x)
// require.Nil(err)
// // gotta love wire's json format
// require.EqualValues(kind, x[0])
// }
// res := <-r
// checkData(res, ctypes.ResultTypeSubscribe)
// // read one event, must be success
// select {
// case res := <-r:
// checkData(res, ctypes.ResultTypeEvent)
// // this is good.. let's get the data... ugh...
// // result := new(ctypes.TMResult)
// // wire.ReadJSON(result, res, &err)
// // require.Nil(err, "%+v", err)
// // event, ok := (*result).(*ctypes.ResultEvent)
// // require.True(ok)
// // assert.Equal("foo", event.Name)
// // data, ok := event.Data.(types.EventDataTx)
// // require.True(ok)
// // assert.EqualValues(0, data.Code)
// // assert.EqualValues(tx, data.Tx)
// case err := <-e:
// // this is a failure
// require.Nil(err)
// }
// }

Loading…
Cancel
Save