diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index d10f8c728..64f557586 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -82,21 +82,21 @@ var wsTyp = "JSONRPC" // make a simple connection to the server func TestWSConnect(t *testing.T) { - con := newWSCon(t) - con.Close() + wsc := newWSClient(t) + wsc.Stop() } // receive a new block message func TestWSNewBlock(t *testing.T) { - con := newWSCon(t) + wsc := newWSClient(t) eid := types.EventStringNewBlock() - subscribe(t, con, eid) + subscribe(t, wsc, eid) defer func() { - unsubscribe(t, con, eid) - con.Close() + unsubscribe(t, wsc, eid) + wsc.Stop() }() - waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { - fmt.Println("Check:", string(b)) + waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { + fmt.Println("Check:", b) return nil }) } @@ -106,15 +106,31 @@ func TestWSBlockchainGrowth(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } - con := newWSCon(t) + wsc := newWSClient(t) eid := types.EventStringNewBlock() - subscribe(t, con, eid) + subscribe(t, wsc, eid) defer func() { - unsubscribe(t, con, eid) - con.Close() + unsubscribe(t, wsc, eid) + wsc.Stop() }() + // listen for NewBlock, ensure height increases by 1 - unmarshalValidateBlockchain(t, con, eid) + + var initBlockN int + for i := 0; i < 3; i++ { + waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error { + block := eventData.(types.EventDataNewBlock).Block + if i == 0 { + initBlockN = block.Header.Height + } else { + if block.Header.Height != initBlockN+i { + return fmt.Errorf("Expected block %d, got block %d", initBlockN+i, block.Header.Height) + } + } + + return nil + }) + } } /* TODO: this with dummy app.. diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index dfccdb1ee..21de52fa9 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -1,19 +1,14 @@ package rpctest import ( - "encoding/json" - "fmt" - "net/http" "testing" "time" - "github.com/gorilla/websocket" . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" client "github.com/tendermint/go-rpc/client" - "github.com/tendermint/go-rpc/types" _ "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -77,84 +72,58 @@ func newNode(ready chan struct{}) { // Utilities for testing the websocket service // create a new connection -func newWSCon(t *testing.T) *websocket.Conn { - dialer := websocket.DefaultDialer - rHeader := http.Header{} - con, r, err := dialer.Dial(websocketAddr, rHeader) - fmt.Println("response", r) - if err != nil { +func newWSClient(t *testing.T) *client.WSClient { + wsc := client.NewWSClient(websocketAddr) + if _, err := wsc.Start(); err != nil { t.Fatal(err) } - return con + return wsc } // subscribe to an event -func subscribe(t *testing.T, con *websocket.Conn, eventid string) { - err := con.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "subscribe", - Params: []interface{}{eventid}, - }) - if err != nil { +func subscribe(t *testing.T, wsc *client.WSClient, eventid string) { + if err := wsc.Subscribe(eventid); err != nil { t.Fatal(err) } } // unsubscribe from an event -func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { - err := con.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "unsubscribe", - Params: []interface{}{eventid}, - }) - if err != nil { +func unsubscribe(t *testing.T, wsc *client.WSClient, eventid string) { + if err := wsc.Unsubscribe(eventid); err != nil { t.Fatal(err) } } // wait for an event; do things that might trigger events, and check them when they are received // the check function takes an event id and the byte slice read off the ws -func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) { +func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeout bool, f func(), check func(string, interface{}) error) { // go routine to wait for webscoket msg - goodCh := make(chan []byte) + goodCh := make(chan interface{}) errCh := make(chan error) - quitCh := make(chan struct{}) - defer close(quitCh) // Read message go func() { + var err error + LOOP: for { - _, p, err := con.ReadMessage() - if err != nil { - errCh <- err - break - } else { - // if the event id isnt what we're waiting on - // ignore it - var response rpctypes.RPCResponse - if err := json.Unmarshal(p, &response); err != nil { - errCh <- err - break - } - if response.Error != "" { - errCh <- fmt.Errorf(response.Error) - break - } - + select { + case r := <-wsc.ResultsCh: result := new(ctypes.TMResult) - fmt.Println("RESULT:", string(*response.Result)) - wire.ReadJSONPtr(result, *response.Result, &err) + wire.ReadJSONPtr(result, r, &err) if err != nil { errCh <- err - break + break LOOP } event, ok := (*result).(*ctypes.ResultEvent) if ok && event.Name == eventid { - goodCh <- p - break + goodCh <- event.Data + break LOOP } + case err := <-wsc.ErrorsCh: + errCh <- err + break LOOP + case <-wsc.Quit: + break LOOP } } }() @@ -167,68 +136,27 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou select { case <-timeout.C: if dieOnTimeout { - con.Close() + wsc.Stop() t.Fatalf("%s event was not received in time", eventid) } // else that's great, we didn't hear the event // and we shouldn't have - case p := <-goodCh: + case eventData := <-goodCh: if dieOnTimeout { // message was received and expected // run the check - err := check(eventid, p) - if err != nil { - t.Fatal(err) - panic(err) // Show the stack trace. + if err := check(eventid, eventData); err != nil { + t.Fatal(err) // Show the stack trace. } } else { - con.Close() + wsc.Stop() t.Fatalf("%s event was not expected", eventid) } case err := <-errCh: t.Fatal(err) panic(err) // Show the stack trace. - } -} - -//-------------------------------------------------------------------------------- -func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { - // unmarshall and assert somethings - var response rpctypes.RPCResponse - var err error - if err := json.Unmarshal(b, &response); err != nil { - return nil, err } - if response.Error != "" { - return nil, fmt.Errorf(response.Error) - } - var result ctypes.TMResult - wire.ReadJSONPtr(&result, *response.Result, &err) - if err != nil { - return nil, err - } - block := result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block - return block, nil } -func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { - var initBlockN int - for i := 0; i < 3; i++ { - waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { - block, err := unmarshalResponseNewBlock(b) - if err != nil { - return err - } - if i == 0 { - initBlockN = block.Header.Height - } else { - if block.Header.Height != initBlockN+i { - return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height) - } - } - - return nil - }) - } -} +//--------------------------------------------------------------------------------