From 3ca5292dc95d7cae164a61bce65b92532bd7715e Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 5 Apr 2018 21:19:14 -0700 Subject: [PATCH] Fix rpc tests --- Gopkg.lock | 6 +- Gopkg.toml | 2 +- node/node.go | 7 +- rpc/client/event_test.go | 201 ++++++++++++++++++---------------- rpc/client/httpclient.go | 18 ++- rpc/core/routes.go | 15 +-- rpc/core/types/wire.go | 13 +++ rpc/lib/client/http_client.go | 9 ++ rpc/lib/client/ws_client.go | 4 + rpc/test/helpers.go | 5 + types/events.go | 17 ++- 11 files changed, 168 insertions(+), 129 deletions(-) create mode 100644 rpc/core/types/wire.go diff --git a/Gopkg.lock b/Gopkg.lock index c0704d722..2d6fe8b93 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -254,8 +254,8 @@ [[projects]] name = "github.com/tendermint/go-amino" packages = ["."] - revision = "26718ab6738f938d4b33d593543cee7681f2a6a6" - version = "0.9.5" + revision = "42246108ff925a457fb709475070a03dfd3e2b5c" + version = "0.9.6" [[projects]] name = "github.com/tendermint/go-crypto" @@ -383,6 +383,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "0dacd2eb1550ca01e0c64f77b721eda1a381dde1d246a56bfe5a2746b78b7bad" + inputs-digest = "d14dbd59436d0ea3b322c42ce33c213b26cd2451ba023a466764b8002e0e649d" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 73f73bebc..d689da986 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -79,7 +79,7 @@ [[constraint]] name = "github.com/tendermint/go-amino" - version = "0.9.5" + version = "0.9.6" [[constraint]] name = "github.com/tendermint/tmlibs" diff --git a/node/node.go b/node/node.go index 0b5f73725..221eb1eb0 100644 --- a/node/node.go +++ b/node/node.go @@ -26,6 +26,7 @@ import ( "github.com/tendermint/tendermint/p2p/trust" "github.com/tendermint/tendermint/proxy" rpccore "github.com/tendermint/tendermint/rpc/core" + ctypes "github.com/tendermint/tendermint/rpc/core/types" grpccore "github.com/tendermint/tendermint/rpc/grpc" rpc "github.com/tendermint/tendermint/rpc/lib" rpcserver "github.com/tendermint/tendermint/rpc/lib/server" @@ -489,6 +490,8 @@ func (n *Node) ConfigureRPC() { func (n *Node) startRPC() ([]net.Listener, error) { n.ConfigureRPC() listenAddrs := strings.Split(n.config.RPC.ListenAddress, ",") + coreCodec := amino.NewCodec() + ctypes.RegisterAmino(coreCodec) if n.config.RPC.Unsafe { rpccore.AddUnsafeRoutes() @@ -499,10 +502,10 @@ func (n *Node) startRPC() ([]net.Listener, error) { for i, listenAddr := range listenAddrs { mux := http.NewServeMux() rpcLogger := n.Logger.With("module", "rpc-server") - wm := rpcserver.NewWebsocketManager(rpccore.Routes, rpccore.RoutesCodec, rpcserver.EventSubscriber(n.eventBus)) + wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus)) wm.SetLogger(rpcLogger.With("protocol", "websocket")) mux.HandleFunc("/websocket", wm.WebsocketHandler) - rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpccore.RoutesCodec, rpcLogger) + rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger) if err != nil { return nil, err diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index e90ace43f..2254c1d1e 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -1,16 +1,16 @@ package client_test import ( + "reflect" "testing" "time" "github.com/stretchr/testify/require" abci "github.com/tendermint/abci/types" - cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" ) var waitForEventTimeout = 5 * time.Second @@ -23,116 +23,127 @@ func MakeTxKV() ([]byte, []byte, []byte) { } func TestHeaderEvents(t *testing.T) { - require := require.New(t) for i, c := range GetClients() { - // start for this test it if it wasn't already running - if !c.IsRunning() { - // if so, then we start it, listen, and stop it. - err := c.Start() - require.Nil(err, "%d: %+v", i, err) - defer c.Stop() - } - - evtTyp := types.EventNewBlockHeader - evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) - require.Nil(err, "%d: %+v", i, err) - _, ok := evt.(types.EventDataNewBlockHeader) - require.True(ok, "%d: %#v", i, evt) - // TODO: more checks... + i, c := i, c // capture params + t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { + // start for this test it if it wasn't already running + if !c.IsRunning() { + // if so, then we start it, listen, and stop it. + err := c.Start() + require.Nil(t, err, "%d: %+v", i, err) + defer c.Stop() + } + + evtTyp := types.EventNewBlockHeader + evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) + require.Nil(t, err, "%d: %+v", i, err) + _, ok := evt.(types.EventDataNewBlockHeader) + require.True(t, ok, "%d: %#v", i, evt) + // TODO: more checks... + }) } } func TestBlockEvents(t *testing.T) { - require := require.New(t) for i, c := range GetClients() { - // start for this test it if it wasn't already running - if !c.IsRunning() { - // if so, then we start it, listen, and stop it. - err := c.Start() - require.Nil(err, "%d: %+v", i, err) - defer c.Stop() - } - - // listen for a new block; ensure height increases by 1 - var firstBlockHeight int64 - for j := 0; j < 3; j++ { - evtTyp := types.EventNewBlock - evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) - require.Nil(err, "%d: %+v", j, err) - blockEvent, ok := evt.(types.EventDataNewBlock) - require.True(ok, "%d: %#v", j, evt) - - block := blockEvent.Block - if j == 0 { - firstBlockHeight = block.Header.Height - continue + i, c := i, c // capture params + t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { + + // start for this test it if it wasn't already running + if !c.IsRunning() { + // if so, then we start it, listen, and stop it. + err := c.Start() + require.Nil(t, err, "%d: %+v", i, err) + defer c.Stop() } - require.Equal(block.Header.Height, firstBlockHeight+int64(j)) - } + // listen for a new block; ensure height increases by 1 + var firstBlockHeight int64 + for j := 0; j < 3; j++ { + evtTyp := types.EventNewBlock + evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) + require.Nil(t, err, "%d: %+v", j, err) + blockEvent, ok := evt.(types.EventDataNewBlock) + require.True(t, ok, "%d: %#v", j, evt) + + block := blockEvent.Block + if j == 0 { + firstBlockHeight = block.Header.Height + continue + } + + require.Equal(t, block.Header.Height, firstBlockHeight+int64(j)) + } + }) } } func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { - require := require.New(t) for i, c := range GetClients() { - // start for this test it if it wasn't already running - if !c.IsRunning() { - // if so, then we start it, listen, and stop it. - err := c.Start() - require.Nil(err, "%d: %+v", i, err) - defer c.Stop() - } - - // make the tx - _, _, tx := MakeTxKV() - evtTyp := types.EventTx - - // send async - txres, err := c.BroadcastTxAsync(tx) - require.Nil(err, "%+v", err) - require.Equal(txres.Code, abci.CodeTypeOK) // FIXME - - // and wait for confirmation - evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) - require.Nil(err, "%d: %+v", i, err) - // and make sure it has the proper info - txe, ok := evt.(types.EventDataTx) - require.True(ok, "%d: %#v", i, evt) - // make sure this is the proper tx - require.EqualValues(tx, txe.Tx) - require.True(txe.Result.IsOK()) + i, c := i, c // capture params + t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { + + // start for this test it if it wasn't already running + if !c.IsRunning() { + // if so, then we start it, listen, and stop it. + err := c.Start() + require.Nil(t, err, "%d: %+v", i, err) + defer c.Stop() + } + + // make the tx + _, _, tx := MakeTxKV() + evtTyp := types.EventTx + + // send async + txres, err := c.BroadcastTxAsync(tx) + require.Nil(t, err, "%+v", err) + require.Equal(t, txres.Code, abci.CodeTypeOK) // FIXME + + // and wait for confirmation + evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) + require.Nil(t, err, "%d: %+v", i, err) + // and make sure it has the proper info + txe, ok := evt.(types.EventDataTx) + require.True(t, ok, "%d: %#v", i, evt) + // make sure this is the proper tx + require.EqualValues(t, tx, txe.Tx) + require.True(t, txe.Result.IsOK()) + }) } } func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { - require := require.New(t) for i, c := range GetClients() { - // start for this test it if it wasn't already running - if !c.IsRunning() { - // if so, then we start it, listen, and stop it. - err := c.Start() - require.Nil(err, "%d: %+v", i, err) - defer c.Stop() - } - - // make the tx - _, _, tx := MakeTxKV() - evtTyp := types.EventTx - - // send sync - txres, err := c.BroadcastTxSync(tx) - require.Nil(err, "%+v", err) - require.Equal(txres.Code, abci.CodeTypeOK) // FIXME - - // and wait for confirmation - evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) - require.Nil(err, "%d: %+v", i, err) - // and make sure it has the proper info - txe, ok := evt.(types.EventDataTx) - require.True(ok, "%d: %#v", i, evt) - // make sure this is the proper tx - require.EqualValues(tx, txe.Tx) - require.True(txe.Result.IsOK()) + i, c := i, c // capture params + t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { + + // start for this test it if it wasn't already running + if !c.IsRunning() { + // if so, then we start it, listen, and stop it. + err := c.Start() + require.Nil(t, err, "%d: %+v", i, err) + defer c.Stop() + } + + // make the tx + _, _, tx := MakeTxKV() + evtTyp := types.EventTx + + // send sync + txres, err := c.BroadcastTxSync(tx) + require.Nil(t, err, "%+v", err) + require.Equal(t, txres.Code, abci.CodeTypeOK) // FIXME + + // and wait for confirmation + evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) + require.Nil(t, err, "%d: %+v", i, err) + // and make sure it has the proper info + txe, ok := evt.(types.EventDataTx) + require.True(t, ok, "%d: %#v", i, evt) + // make sure this is the proper tx + require.EqualValues(t, tx, txe.Tx) + require.True(t, txe.Result.IsOK()) + }) } } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index bc6cf759e..496ce9c66 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -2,11 +2,11 @@ package client import ( "context" - "encoding/json" "sync" "github.com/pkg/errors" + amino "github.com/tendermint/go-amino" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/types" @@ -32,10 +32,14 @@ type HTTP struct { // New takes a remote endpoint in the form tcp://: // and the websocket path (which always seems to be "/websocket") func NewHTTP(remote, wsEndpoint string) *HTTP { + rc := rpcclient.NewJSONRPCClient(remote) + cdc := rc.Codec() + ctypes.RegisterAmino(cdc) + return &HTTP{ - rpc: rpcclient.NewJSONRPCClient(remote), + rpc: rc, remote: remote, - WSEvents: newWSEvents(remote, wsEndpoint), + WSEvents: newWSEvents(cdc, remote, wsEndpoint), } } @@ -208,6 +212,7 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) { type WSEvents struct { cmn.BaseService + cdc *amino.Codec remote string endpoint string ws *rpcclient.WSClient @@ -216,8 +221,9 @@ type WSEvents struct { subscriptions map[string]chan<- interface{} } -func newWSEvents(remote, endpoint string) *WSEvents { +func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { wsEvents := &WSEvents{ + cdc: cdc, endpoint: endpoint, remote: remote, subscriptions: make(map[string]chan<- interface{}), @@ -231,6 +237,8 @@ func (w *WSEvents) OnStart() error { w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { w.redoSubscriptions() })) + w.ws.SetCodec(w.cdc) + err := w.ws.Start() if err != nil { return err @@ -326,7 +334,7 @@ func (w *WSEvents) eventListener() { continue } result := new(ctypes.ResultEvent) - err := json.Unmarshal(resp.Result, result) + err := w.cdc.UnmarshalJSON(resp.Result, result) if err != nil { w.Logger.Error("failed to unmarshal response", "err", err) continue diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 9368dc3df..337dd64aa 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -1,13 +1,11 @@ package core import ( - "github.com/tendermint/go-amino" - "github.com/tendermint/go-crypto" rpc "github.com/tendermint/tendermint/rpc/lib/server" - "github.com/tendermint/tendermint/types" ) // TODO: better system than "unsafe" prefix +// NOTE: Amino is registered in rpc/core/types/wire.go. var Routes = map[string]*rpc.RPCFunc{ // subscribe/unsubscribe are reserved for websocket events. "subscribe": rpc.NewWSRPCFunc(Subscribe, "query"), @@ -50,14 +48,3 @@ func AddUnsafeRoutes() { Routes["unsafe_stop_cpu_profiler"] = rpc.NewRPCFunc(UnsafeStopCPUProfiler, "") Routes["unsafe_write_heap_profile"] = rpc.NewRPCFunc(UnsafeWriteHeapProfile, "filename") } - -var RoutesCodec *amino.Codec - -func init() { - cdc := amino.NewCodec() - RoutesCodec = cdc - - types.RegisterEventDatas(cdc) - types.RegisterEvidences(cdc) - crypto.RegisterAmino(cdc) -} diff --git a/rpc/core/types/wire.go b/rpc/core/types/wire.go new file mode 100644 index 000000000..6648364b1 --- /dev/null +++ b/rpc/core/types/wire.go @@ -0,0 +1,13 @@ +package core_types + +import ( + "github.com/tendermint/go-amino" + "github.com/tendermint/go-crypto" + "github.com/tendermint/tendermint/types" +) + +func RegisterAmino(cdc *amino.Codec) { + types.RegisterEventDatas(cdc) + types.RegisterEvidences(cdc) + crypto.RegisterAmino(cdc) +} diff --git a/rpc/lib/client/http_client.go b/rpc/lib/client/http_client.go index a41f3125c..e26d8f274 100644 --- a/rpc/lib/client/http_client.go +++ b/rpc/lib/client/http_client.go @@ -21,6 +21,7 @@ import ( type HTTPClient interface { Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) Codec() *amino.Codec + SetCodec(*amino.Codec) } // TODO: Deprecate support for IP:PORT or /path/to/socket @@ -111,6 +112,10 @@ func (c *JSONRPCClient) Codec() *amino.Codec { return c.cdc } +func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) { + c.cdc = cdc +} + //------------------------------------------------------------- // URI takes params as a map @@ -152,6 +157,10 @@ func (c *URIClient) Codec() *amino.Codec { return c.cdc } +func (c *URIClient) SetCodec(cdc *amino.Codec) { + c.cdc = cdc +} + //------------------------------------------------ func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, result interface{}) (interface{}, error) { diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 43aa02f40..de5df6a2c 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -230,6 +230,10 @@ func (c *WSClient) Codec() *amino.Codec { return c.cdc } +func (c *WSClient) SetCodec(cdc *amino.Codec) { + c.cdc = cdc +} + /////////////////////////////////////////////////////////////////////////////// // Private methods diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index c9dc6d219..38c2dfaf0 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/tendermint/tmlibs/log" @@ -26,11 +27,15 @@ var globalConfig *cfg.Config func waitForRPC() { laddr := GetConfig().RPC.ListenAddress client := rpcclient.NewJSONRPCClient(laddr) + ctypes.RegisterAmino(client.Codec()) result := new(ctypes.ResultStatus) for { _, err := client.Call("status", map[string]interface{}{}, result) if err == nil { return + } else { + fmt.Println("error", err) + time.Sleep(time.Millisecond) } } } diff --git a/types/events.go b/types/events.go index 1323b65bb..cdffc0ee5 100644 --- a/types/events.go +++ b/types/events.go @@ -35,20 +35,19 @@ const ( // ENCODING / DECODING /////////////////////////////////////////////////////////////////////////////// -var ( - EventDataNameNewBlock = "new_block" - EventDataNameNewBlockHeader = "new_block_header" - EventDataNameTx = "tx" - EventDataNameRoundState = "round_state" - EventDataNameVote = "vote" - EventDataNameProposalHeartbeat = "proposal_heartbeat" -) - // implements events.EventData type TMEventData interface { + AssertIsTMEventData() // empty interface } +func (_ EventDataNewBlock) AssertIsTMEventData() {} +func (_ EventDataNewBlockHeader) AssertIsTMEventData() {} +func (_ EventDataTx) AssertIsTMEventData() {} +func (_ EventDataRoundState) AssertIsTMEventData() {} +func (_ EventDataVote) AssertIsTMEventData() {} +func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {} + func RegisterEventDatas(cdc *amino.Codec) { cdc.RegisterInterface((*TMEventData)(nil), nil) cdc.RegisterConcrete(EventDataNewBlock{}, "tendermint/EventDataNameNewBlock", nil)