From 44a3fbf109b69780b278d36ec62b4552d8b4d36a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 15 Nov 2019 14:16:04 +0400 Subject: [PATCH] rpc/lib/client & server: try to conform to JSON-RPC 2.0 spec (#4141) https://www.jsonrpc.org/specification What is done in this PR: JSONRPCClient: validate that Response.ID matches Request.ID I wanted to do the same for the WSClient, but since we're sending events as responses, not notifications, checking IDs would require storing them in memory indefinitely (and we won't be able to remove them upon client unsubscribing because ID is different then). Request.ID is now optional. Notification is a Request without an ID. Previously "" or 0 were considered as notifications Remove #event suffix from ID from an event response (partially fixes #2949) ID must be either string, int or null AND must be equal to request's ID. Now, because we've implemented events as responses, WS clients are tripping when they see Response.ID("0#event") != Request.ID("0"). Implementing events as requests would require a lot of time (~ 2 days to completely rewrite WS client and server) generate unique ID for each request switch to integer IDs instead of "json-client-XYZ" id=0 method=/subscribe id=0 result=... id=1 method=/abci_query id=1 result=... > send events (resulting from /subscribe) as requests+notifications (not responses) this will require a lot of work. probably not worth it * rpc: generate an unique ID for each request in conformance with JSON-RPC spec * WSClient: check for unsolicited responses * fix golangci warnings * save commit * fix errors * remove ID from responses from subscribe Refs #2949 * clients are safe for concurrent access * tm-bench: switch to int ID * fixes after my own review * comment out sentIDs in WSClient see commit body for the reason * remove body.Close it will be closed automatically * stop ws connection outside of write/read routines also, use t.Rate in tm-bench indexer when calculating ID fix gocritic issues * update swagger.yaml * Apply suggestions from code review * fix stylecheck and golint linter warnings * update changelog * update changelog2 --- CHANGELOG_PENDING.md | 13 + .../subscribing-to-events-via-websocket.md | 4 +- lite/proxy/wrapper.go | 3 +- rpc/client/httpclient.go | 6 + rpc/client/rpc_test.go | 5 +- rpc/core/events.go | 6 +- rpc/lib/client/decode.go | 129 +++ rpc/lib/client/encode.go | 46 + .../{http_client.go => http_json_client.go} | 409 +++----- ...lient_test.go => http_json_client_test.go} | 0 rpc/lib/client/http_uri_client.go | 71 ++ rpc/lib/client/ws_client.go | 68 +- rpc/lib/client/ws_client_test.go | 10 +- rpc/lib/doc.go | 3 +- rpc/lib/server/handlers.go | 926 ------------------ rpc/lib/server/http_json_handler.go | 248 +++++ ...lers_test.go => http_json_handler_test.go} | 86 +- rpc/lib/server/http_server.go | 2 +- rpc/lib/server/http_uri_handler.go | 195 ++++ rpc/lib/server/rpc_func.go | 103 ++ rpc/lib/server/ws_handler.go | 448 +++++++++ rpc/lib/server/ws_handler_test.go | 59 ++ rpc/lib/types/types.go | 72 +- rpc/lib/types/types_test.go | 7 +- rpc/swagger/swagger.yaml | 70 +- tools/tm-bench/transacter.go | 2 +- 26 files changed, 1618 insertions(+), 1373 deletions(-) create mode 100644 rpc/lib/client/decode.go create mode 100644 rpc/lib/client/encode.go rename rpc/lib/client/{http_client.go => http_json_client.go} (58%) rename rpc/lib/client/{http_client_test.go => http_json_client_test.go} (100%) create mode 100644 rpc/lib/client/http_uri_client.go delete mode 100644 rpc/lib/server/handlers.go create mode 100644 rpc/lib/server/http_json_handler.go rename rpc/lib/server/{handlers_test.go => http_json_handler_test.go} (76%) create mode 100644 rpc/lib/server/http_uri_handler.go create mode 100644 rpc/lib/server/rpc_func.go create mode 100644 rpc/lib/server/ws_handler.go create mode 100644 rpc/lib/server/ws_handler_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 498104a8c..a7efc7e51 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -28,6 +28,17 @@ program](https://hackerone.com/tendermint). } } ``` + - [rpc] [\#4141](https://github.com/tendermint/tendermint/pull/4141) Remove `#event` suffix from the ID in event responses. + `{"jsonrpc": "2.0", "id": 0, "result": ...}` + - [rpc] [\#4141](https://github.com/tendermint/tendermint/pull/4141) Switch to integer IDs instead of `json-client-XYZ` + ``` + id=0 method=/subscribe + id=0 result=... + id=1 method=/abci_query + id=1 result=... + ``` + * ID is unique for each request; + * Request.ID is now optional. Notification is a Request without an ID. Previously ID="" or ID=0 were considered as notifications. - Apps @@ -63,3 +74,5 @@ program](https://hackerone.com/tendermint). - [state] [\#4104](https://github.com/tendermint/tendermint/pull/4104) txindex/kv: Fsync data to disk immediately after receiving it (@guagualvcha) - [state] [\#4095](https://github.com/tendermint/tendermint/pull/4095) txindex/kv: Return an error if there's one when the user searches for a tx (hash=X) (@hsyis) - [rpc/lib] [\#4051](https://github.com/tendermint/tendermint/pull/4131) Fix RPC client, which was previously resolving https protocol to http (@yenkhoon) +- [rpc] [\#4141](https://github.com/tendermint/tendermint/pull/4141) JSONRPCClient: validate that Response.ID matches Request.ID +- [rpc] [\#4141](https://github.com/tendermint/tendermint/pull/4141) WSClient: check for unsolicited responses diff --git a/docs/app-dev/subscribing-to-events-via-websocket.md b/docs/app-dev/subscribing-to-events-via-websocket.md index c07a343cd..afedc1d59 100644 --- a/docs/app-dev/subscribing-to-events-via-websocket.md +++ b/docs/app-dev/subscribing-to-events-via-websocket.md @@ -17,7 +17,7 @@ method via Websocket. { "jsonrpc": "2.0", "method": "subscribe", - "id": "0", + "id": 0, "params": { "query": "tm.event='NewBlock'" } @@ -44,7 +44,7 @@ Response: ``` { "jsonrpc": "2.0", - "id": "0#event", + "id": 0, "result": { "query": "tm.event='ValidatorSetUpdates'", "data": { diff --git a/lite/proxy/wrapper.go b/lite/proxy/wrapper.go index 2f608cfa9..d3a69cc3f 100644 --- a/lite/proxy/wrapper.go +++ b/lite/proxy/wrapper.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "fmt" cmn "github.com/tendermint/tendermint/libs/common" @@ -165,7 +164,7 @@ func (w Wrapper) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Resul ctx.WSConn.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( ctx.WSConn.Codec(), - rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + ctx.JSONReq.ID, resultEvent, )) case <-w.Client.Quit(): diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index a982f582c..27712ce31 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -12,6 +12,7 @@ import ( amino "github.com/tendermint/go-amino" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/lib/client" @@ -112,6 +113,10 @@ func NewHTTPWithClient(remote, wsEndpoint string, client *http.Client) *HTTP { var _ Client = (*HTTP)(nil) +func (c *HTTP) SetLogger(l log.Logger) { + c.WSEvents.SetLogger(l) +} + // NewBatch creates a new batch client for this HTTP client. func (c *HTTP) NewBatch() *BatchHTTP { rpcBatch := c.rpc.NewRequestBatch() @@ -395,6 +400,7 @@ func (w *WSEvents) OnStart() error { w.redoSubscriptionsAfter(0 * time.Second) })) w.ws.SetCodec(w.cdc) + w.ws.SetLogger(w.Logger) err := w.ws.Start() if err != nil { diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index f1718fec5..34d41d0e1 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/tmhash" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/rpc/client" @@ -28,7 +29,9 @@ import ( func getHTTPClient() *client.HTTP { rpcAddr := rpctest.GetConfig().RPC.ListenAddress - return client.NewHTTP(rpcAddr, "/websocket") + c := client.NewHTTP(rpcAddr, "/websocket") + c.SetLogger(log.TestingLogger()) + return c } func getLocalClient() *client.Local { diff --git a/rpc/core/events.go b/rpc/core/events.go index 61f21c000..49bf7be5d 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -46,7 +46,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er ctx.WSConn.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( ctx.WSConn.Codec(), - rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + ctx.JSONReq.ID, resultEvent, )) case <-sub.Cancelled(): @@ -58,8 +58,8 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er reason = sub.Err().Error() } ctx.WSConn.TryWriteRPCResponse( - rpctypes.RPCServerError(rpctypes.JSONRPCStringID( - fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + rpctypes.RPCServerError( + ctx.JSONReq.ID, fmt.Errorf("subscription was cancelled (reason: %s)", reason), )) } diff --git a/rpc/lib/client/decode.go b/rpc/lib/client/decode.go new file mode 100644 index 000000000..dd4a2e4c6 --- /dev/null +++ b/rpc/lib/client/decode.go @@ -0,0 +1,129 @@ +package rpcclient + +import ( + "encoding/json" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +func unmarshalResponseBytes( + cdc *amino.Codec, + responseBytes []byte, + expectedID types.JSONRPCIntID, + result interface{}, +) (interface{}, error) { + + // Read response. If rpc/core/types is imported, the result will unmarshal + // into the correct type. + response := &types.RPCResponse{} + if err := json.Unmarshal(responseBytes, response); err != nil { + return nil, errors.Wrap(err, "error unmarshalling") + } + + if response.Error != nil { + return nil, response.Error + } + + if err := validateAndVerifyID(response, expectedID); err != nil { + return nil, errors.Wrap(err, "wrong ID") + } + + // Unmarshal the RawMessage into the result. + if err := cdc.UnmarshalJSON(response.Result, result); err != nil { + return nil, errors.Wrap(err, "error unmarshalling result") + } + + return result, nil +} + +func unmarshalResponseBytesArray( + cdc *amino.Codec, + responseBytes []byte, + expectedIDs []types.JSONRPCIntID, + results []interface{}, +) ([]interface{}, error) { + + var ( + responses []types.RPCResponse + ) + + if err := json.Unmarshal(responseBytes, &responses); err != nil { + return nil, errors.Wrap(err, "error unmarshalling") + } + + // No response error checking here as there may be a mixture of successful + // and unsuccessful responses. + + if len(results) != len(responses) { + return nil, errors.Errorf( + "expected %d result objects into which to inject responses, but got %d", + len(responses), + len(results), + ) + } + + // Intersect IDs from responses with expectedIDs. + ids := make([]types.JSONRPCIntID, len(responses)) + var ok bool + for i, resp := range responses { + ids[i], ok = resp.ID.(types.JSONRPCIntID) + if !ok { + return nil, errors.Errorf("expected JSONRPCIntID, got %T", resp.ID) + } + } + if err := validateResponseIDs(ids, expectedIDs); err != nil { + return nil, errors.Wrap(err, "wrong IDs") + } + + for i := 0; i < len(responses); i++ { + if err := cdc.UnmarshalJSON(responses[i].Result, results[i]); err != nil { + return nil, errors.Wrapf(err, "error unmarshalling #%d result", i) + } + } + + return results, nil +} + +func validateResponseIDs(ids, expectedIDs []types.JSONRPCIntID) error { + m := make(map[types.JSONRPCIntID]bool, len(expectedIDs)) + for _, expectedID := range expectedIDs { + m[expectedID] = true + } + + for i, id := range ids { + if m[id] { + delete(m, id) + } else { + return errors.Errorf("unsolicited ID #%d: %v", i, id) + } + } + + return nil +} + +// From the JSON-RPC 2.0 spec: +// id: It MUST be the same as the value of the id member in the Request Object. +func validateAndVerifyID(res *types.RPCResponse, expectedID types.JSONRPCIntID) error { + if err := validateResponseID(res.ID); err != nil { + return err + } + if expectedID != res.ID.(types.JSONRPCIntID) { // validateResponseID ensured res.ID has the right type + return errors.Errorf("response ID (%d) does not match request ID (%d)", res.ID, expectedID) + } + return nil +} + +func validateResponseID(id interface{}) error { + if id == nil { + return errors.New("no ID") + } + _, ok := id.(types.JSONRPCIntID) + if !ok { + return errors.Errorf("expected JSONRPCIntID, but got: %T", id) + } + return nil +} diff --git a/rpc/lib/client/encode.go b/rpc/lib/client/encode.go new file mode 100644 index 000000000..227367f59 --- /dev/null +++ b/rpc/lib/client/encode.go @@ -0,0 +1,46 @@ +package rpcclient + +import ( + "fmt" + "net/url" + "reflect" + + amino "github.com/tendermint/go-amino" +) + +func argsToURLValues(cdc *amino.Codec, args map[string]interface{}) (url.Values, error) { + values := make(url.Values) + if len(args) == 0 { + return values, nil + } + + err := argsToJSON(cdc, args) + if err != nil { + return nil, err + } + + for key, val := range args { + values.Set(key, val.(string)) + } + + return values, nil +} + +func argsToJSON(cdc *amino.Codec, args map[string]interface{}) error { + for k, v := range args { + rt := reflect.TypeOf(v) + isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 + if isByteSlice { + bytes := reflect.ValueOf(v).Bytes() + args[k] = fmt.Sprintf("0x%X", bytes) + continue + } + + data, err := cdc.MarshalJSON(v) + if err != nil { + return err + } + args[k] = string(data) + } + return nil +} diff --git a/rpc/lib/client/http_client.go b/rpc/lib/client/http_json_client.go similarity index 58% rename from rpc/lib/client/http_client.go rename to rpc/lib/client/http_json_client.go index 0673177c3..d828a3aa6 100644 --- a/rpc/lib/client/http_client.go +++ b/rpc/lib/client/http_json_client.go @@ -7,16 +7,12 @@ import ( "io/ioutil" "net" "net/http" - "net/url" - "reflect" "strings" "sync" "github.com/pkg/errors" - amino "github.com/tendermint/go-amino" - cmn "github.com/tendermint/tendermint/libs/common" types "github.com/tendermint/tendermint/rpc/lib/types" ) @@ -28,131 +24,43 @@ const ( protoTCP = "tcp" ) -// HTTPClient is a common interface for JSONRPCClient and URIClient. +//------------------------------------------------------------- + +// HTTPClient is a common interface for JSON-RPC HTTP clients. type HTTPClient interface { + // Call calls the given method with the params and returns a result. Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) + // Codec returns an amino codec used. Codec() *amino.Codec + // SetCodec sets an amino codec. SetCodec(*amino.Codec) } -// protocol - client's protocol (for example, "http", "https", "wss", "ws", "tcp") -// trimmedS - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") with "/" replaced with "." -func toClientAddrAndParse(remoteAddr string) (network string, trimmedS string, err error) { - protocol, address, err := parseRemoteAddr(remoteAddr) - if err != nil { - return "", "", err - } - - // protocol to use for http operations, to support both http and https - var clientProtocol string - // default to http for unknown protocols (ex. tcp) - switch protocol { - case protoHTTP, protoHTTPS, protoWS, protoWSS: - clientProtocol = protocol - default: - clientProtocol = protoHTTP - } - - // replace / with . for http requests (kvstore domain) - trimmedAddress := strings.Replace(address, "/", ".", -1) - return clientProtocol, trimmedAddress, nil -} - -func toClientAddress(remoteAddr string) (string, error) { - clientProtocol, trimmedAddress, err := toClientAddrAndParse(remoteAddr) - if err != nil { - return "", err - } - return clientProtocol + "://" + trimmedAddress, nil -} - -// network - name of the network (for example, "tcp", "unix") -// s - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") -// TODO: Deprecate support for IP:PORT or /path/to/socket -func parseRemoteAddr(remoteAddr string) (network string, s string, err error) { - parts := strings.SplitN(remoteAddr, "://", 2) - var protocol, address string - switch { - case len(parts) == 1: - // default to tcp if nothing specified - protocol, address = protoTCP, remoteAddr - case len(parts) == 2: - protocol, address = parts[0], parts[1] - default: - return "", "", fmt.Errorf("invalid addr: %s", remoteAddr) - } - - return protocol, address, nil -} - -func makeErrorDialer(err error) func(string, string) (net.Conn, error) { - return func(_ string, _ string) (net.Conn, error) { - return nil, err - } -} - -func makeHTTPDialer(remoteAddr string) func(string, string) (net.Conn, error) { - protocol, address, err := parseRemoteAddr(remoteAddr) - if err != nil { - return makeErrorDialer(err) - } - - // accept http(s) as an alias for tcp - switch protocol { - case protoHTTP, protoHTTPS: - protocol = protoTCP - } - - return func(proto, addr string) (net.Conn, error) { - return net.Dial(protocol, address) - } -} - -// DefaultHTTPClient is used to create an http client with some default parameters. -// We overwrite the http.Client.Dial so we can do http over tcp or unix. -// remoteAddr should be fully featured (eg. with tcp:// or unix://) -func DefaultHTTPClient(remoteAddr string) *http.Client { - return &http.Client{ - Transport: &http.Transport{ - // Set to true to prevent GZIP-bomb DoS attacks - DisableCompression: true, - Dial: makeHTTPDialer(remoteAddr), - }, - } -} - -//------------------------------------------------------------------------------------ - -// jsonRPCBufferedRequest encapsulates a single buffered request, as well as its -// anticipated response structure. -type jsonRPCBufferedRequest struct { - request types.RPCRequest - result interface{} // The result will be deserialized into this object. +// JSONRPCCaller implementers can facilitate calling the JSON-RPC endpoint. +type JSONRPCCaller interface { + Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) } -// JSONRPCRequestBatch allows us to buffer multiple request/response structures -// into a single batch request. Note that this batch acts like a FIFO queue, and -// is thread-safe. -type JSONRPCRequestBatch struct { - client *JSONRPCClient - - mtx sync.Mutex - requests []*jsonRPCBufferedRequest -} +//------------------------------------------------------------- -// JSONRPCClient takes params as a slice +// JSONRPCClient is a JSON-RPC client, which sends POST HTTP requests to the +// remote server. +// +// Request values are amino encoded. Response is expected to be amino encoded. +// New amino codec is used if no other codec was set using SetCodec. +// +// JSONRPCClient is safe for concurrent use by multiple goroutines. type JSONRPCClient struct { address string client *http.Client - id types.JSONRPCStringID cdc *amino.Codec -} -// JSONRPCCaller implementers can facilitate calling the JSON RPC endpoint. -type JSONRPCCaller interface { - Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) + mtx sync.Mutex + nextReqID int } +var _ HTTPClient = (*JSONRPCClient)(nil) + // Both JSONRPCClient and JSONRPCRequestBatch can facilitate calls to the JSON // RPC endpoint. var _ JSONRPCCaller = (*JSONRPCClient)(nil) @@ -178,36 +86,43 @@ func NewJSONRPCClientWithHTTPClient(remote string, client *http.Client) *JSONRPC return &JSONRPCClient{ address: clientAddress, client: client, - id: types.JSONRPCStringID("jsonrpc-client-" + cmn.RandStr(8)), cdc: amino.NewCodec(), } } -// Call will send the request for the given method through to the RPC endpoint -// immediately, without buffering of requests. +// Call issues a POST HTTP request. Requests are JSON encoded. Content-Type: +// text/json. func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { - request, err := types.MapToRequest(c.cdc, c.id, method, params) + id := c.nextRequestID() + + request, err := types.MapToRequest(c.cdc, id, method, params) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to encode params") } + requestBytes, err := json.Marshal(request) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to marshal request") } + requestBuf := bytes.NewBuffer(requestBytes) httpResponse, err := c.client.Post(c.address, "text/json", requestBuf) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Post failed") } defer httpResponse.Body.Close() // nolint: errcheck responseBytes, err := ioutil.ReadAll(httpResponse.Body) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to read response body") } - return unmarshalResponseBytes(c.cdc, responseBytes, c.id, result) + + return unmarshalResponseBytes(c.cdc, responseBytes, id, result) } +func (c *JSONRPCClient) Codec() *amino.Codec { return c.cdc } +func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) { c.cdc = cdc } + // NewRequestBatch starts a batch of requests for this client. func (c *JSONRPCClient) NewRequestBatch() *JSONRPCRequestBatch { return &JSONRPCRequestBatch{ @@ -223,33 +138,59 @@ func (c *JSONRPCClient) sendBatch(requests []*jsonRPCBufferedRequest) ([]interfa reqs = append(reqs, req.request) results = append(results, req.result) } + // serialize the array of requests into a single JSON object requestBytes, err := json.Marshal(reqs) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to marshal requests") } + httpResponse, err := c.client.Post(c.address, "text/json", bytes.NewBuffer(requestBytes)) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Post failed") } defer httpResponse.Body.Close() // nolint: errcheck responseBytes, err := ioutil.ReadAll(httpResponse.Body) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to read response body") } - return unmarshalResponseBytesArray(c.cdc, responseBytes, c.id, results) + + // collect ids to check responses IDs in unmarshalResponseBytesArray + ids := make([]types.JSONRPCIntID, len(requests)) + for i, req := range requests { + ids[i] = req.request.ID.(types.JSONRPCIntID) + } + + return unmarshalResponseBytesArray(c.cdc, responseBytes, ids, results) } -func (c *JSONRPCClient) Codec() *amino.Codec { - return c.cdc +func (c *JSONRPCClient) nextRequestID() types.JSONRPCIntID { + c.mtx.Lock() + id := c.nextReqID + c.nextReqID++ + c.mtx.Unlock() + return types.JSONRPCIntID(id) } -func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) { - c.cdc = cdc +//------------------------------------------------------------------------------------ + +// jsonRPCBufferedRequest encapsulates a single buffered request, as well as its +// anticipated response structure. +type jsonRPCBufferedRequest struct { + request types.RPCRequest + result interface{} // The result will be deserialized into this object. } -//------------------------------------------------------------- +// JSONRPCRequestBatch allows us to buffer multiple request/response structures +// into a single batch request. Note that this batch acts like a FIFO queue, and +// is thread-safe. +type JSONRPCRequestBatch struct { + client *JSONRPCClient + + mtx sync.Mutex + requests []*jsonRPCBufferedRequest +} // Count returns the number of enqueued requests waiting to be sent. func (b *JSONRPCRequestBatch) Count() int { @@ -296,7 +237,8 @@ func (b *JSONRPCRequestBatch) Call( params map[string]interface{}, result interface{}, ) (interface{}, error) { - request, err := types.MapToRequest(b.client.cdc, b.client.id, method, params) + id := b.client.nextRequestID() + request, err := types.MapToRequest(b.client.cdc, id, method, params) if err != nil { return nil, err } @@ -306,173 +248,88 @@ func (b *JSONRPCRequestBatch) Call( //------------------------------------------------------------- -// URI takes params as a map -type URIClient struct { - address string - client *http.Client - cdc *amino.Codec -} - -// The function panics if the provided remote is invalid. -func NewURIClient(remote string) *URIClient { - clientAddress, err := toClientAddress(remote) +// protocol - client's protocol (for example, "http", "https", "wss", "ws", "tcp") +// trimmedS - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") with "/" replaced with "." +func toClientAddrAndParse(remoteAddr string) (network string, trimmedS string, err error) { + protocol, address, err := parseRemoteAddr(remoteAddr) if err != nil { - panic(fmt.Sprintf("invalid remote %s: %s", remote, err)) - } - return &URIClient{ - address: clientAddress, - client: DefaultHTTPClient(remote), - cdc: amino.NewCodec(), + return "", "", err } -} -func (c *URIClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { - values, err := argsToURLValues(c.cdc, params) - if err != nil { - return nil, err - } - // log.Info(Fmt("URI request to %v (%v): %v", c.address, method, values)) - resp, err := c.client.PostForm(c.address+"/"+method, values) - if err != nil { - return nil, err + // protocol to use for http operations, to support both http and https + var clientProtocol string + // default to http for unknown protocols (ex. tcp) + switch protocol { + case protoHTTP, protoHTTPS, protoWS, protoWSS: + clientProtocol = protocol + default: + clientProtocol = protoHTTP } - defer resp.Body.Close() // nolint: errcheck - responseBytes, err := ioutil.ReadAll(resp.Body) + // replace / with . for http requests (kvstore domain) + trimmedAddress := strings.Replace(address, "/", ".", -1) + return clientProtocol, trimmedAddress, nil +} + +func toClientAddress(remoteAddr string) (string, error) { + clientProtocol, trimmedAddress, err := toClientAddrAndParse(remoteAddr) if err != nil { - return nil, err + return "", err } - return unmarshalResponseBytes(c.cdc, responseBytes, "", result) + return clientProtocol + "://" + trimmedAddress, nil } -func (c *URIClient) Codec() *amino.Codec { - return c.cdc -} +// network - name of the network (for example, "tcp", "unix") +// s - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") +// TODO: Deprecate support for IP:PORT or /path/to/socket +func parseRemoteAddr(remoteAddr string) (network string, s string, err error) { + parts := strings.SplitN(remoteAddr, "://", 2) + var protocol, address string + switch len(parts) { + case 1: + // default to tcp if nothing specified + protocol, address = protoTCP, remoteAddr + case 2: + protocol, address = parts[0], parts[1] + default: + return "", "", fmt.Errorf("invalid addr: %s", remoteAddr) + } -func (c *URIClient) SetCodec(cdc *amino.Codec) { - c.cdc = cdc + return protocol, address, nil } -//------------------------------------------------ - -func unmarshalResponseBytes( - cdc *amino.Codec, - responseBytes []byte, - expectedID types.JSONRPCStringID, - result interface{}, -) (interface{}, error) { - // Read response. If rpc/core/types is imported, the result will unmarshal - // into the correct type. - // log.Notice("response", "response", string(responseBytes)) - var err error - response := &types.RPCResponse{} - err = json.Unmarshal(responseBytes, response) - if err != nil { - return nil, errors.Wrap(err, "error unmarshalling rpc response") - } - if response.Error != nil { - return nil, errors.Wrap(response.Error, "response error") - } - // From the JSON-RPC 2.0 spec: - // id: It MUST be the same as the value of the id member in the Request Object. - if err := validateResponseID(response, expectedID); err != nil { +func makeErrorDialer(err error) func(string, string) (net.Conn, error) { + return func(_ string, _ string) (net.Conn, error) { return nil, err } - // Unmarshal the RawMessage into the result. - err = cdc.UnmarshalJSON(response.Result, result) - if err != nil { - return nil, errors.Wrap(err, "error unmarshalling rpc response result") - } - return result, nil } -func unmarshalResponseBytesArray( - cdc *amino.Codec, - responseBytes []byte, - expectedID types.JSONRPCStringID, - results []interface{}, -) ([]interface{}, error) { - var ( - err error - responses []types.RPCResponse - ) - err = json.Unmarshal(responseBytes, &responses) +func makeHTTPDialer(remoteAddr string) func(string, string) (net.Conn, error) { + protocol, address, err := parseRemoteAddr(remoteAddr) if err != nil { - return nil, errors.Wrap(err, "error unmarshalling rpc response") - } - // No response error checking here as there may be a mixture of successful - // and unsuccessful responses. - - if len(results) != len(responses) { - return nil, errors.Errorf( - "expected %d result objects into which to inject responses, but got %d", - len(responses), - len(results), - ) - } - - for i, response := range responses { - response := response - // From the JSON-RPC 2.0 spec: - // id: It MUST be the same as the value of the id member in the Request Object. - if err := validateResponseID(&response, expectedID); err != nil { - return nil, errors.Wrapf(err, "failed to validate response ID in response %d", i) - } - if err := cdc.UnmarshalJSON(responses[i].Result, results[i]); err != nil { - return nil, errors.Wrap(err, "error unmarshalling rpc response result") - } + return makeErrorDialer(err) } - return results, nil -} -func validateResponseID(res *types.RPCResponse, expectedID types.JSONRPCStringID) error { - // we only validate a response ID if the expected ID is non-empty - if len(expectedID) == 0 { - return nil - } - if res.ID == nil { - return errors.Errorf("missing ID in response") - } - id, ok := res.ID.(types.JSONRPCStringID) - if !ok { - return errors.Errorf("expected ID string in response but got: %v", id) - } - if expectedID != id { - return errors.Errorf("response ID (%s) does not match request ID (%s)", id, expectedID) + // accept http(s) as an alias for tcp + switch protocol { + case protoHTTP, protoHTTPS: + protocol = protoTCP } - return nil -} -func argsToURLValues(cdc *amino.Codec, args map[string]interface{}) (url.Values, error) { - values := make(url.Values) - if len(args) == 0 { - return values, nil - } - err := argsToJSON(cdc, args) - if err != nil { - return nil, err - } - for key, val := range args { - values.Set(key, val.(string)) + return func(proto, addr string) (net.Conn, error) { + return net.Dial(protocol, address) } - return values, nil } -func argsToJSON(cdc *amino.Codec, args map[string]interface{}) error { - for k, v := range args { - rt := reflect.TypeOf(v) - isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 - if isByteSlice { - bytes := reflect.ValueOf(v).Bytes() - args[k] = fmt.Sprintf("0x%X", bytes) - continue - } - - data, err := cdc.MarshalJSON(v) - if err != nil { - return err - } - args[k] = string(data) +// DefaultHTTPClient is used to create an http client with some default parameters. +// We overwrite the http.Client.Dial so we can do http over tcp or unix. +// remoteAddr should be fully featured (eg. with tcp:// or unix://) +func DefaultHTTPClient(remoteAddr string) *http.Client { + return &http.Client{ + Transport: &http.Transport{ + // Set to true to prevent GZIP-bomb DoS attacks + DisableCompression: true, + Dial: makeHTTPDialer(remoteAddr), + }, } - return nil } diff --git a/rpc/lib/client/http_client_test.go b/rpc/lib/client/http_json_client_test.go similarity index 100% rename from rpc/lib/client/http_client_test.go rename to rpc/lib/client/http_json_client_test.go diff --git a/rpc/lib/client/http_uri_client.go b/rpc/lib/client/http_uri_client.go new file mode 100644 index 000000000..8efbd6608 --- /dev/null +++ b/rpc/lib/client/http_uri_client.go @@ -0,0 +1,71 @@ +package rpcclient + +import ( + "fmt" + "io/ioutil" + "net/http" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +const ( + // URIClientRequestID in a request ID used by URIClient + URIClientRequestID = types.JSONRPCIntID(-1) +) + +// URIClient is a JSON-RPC client, which sends POST form HTTP requests to the +// remote server. +// +// Request values are amino encoded. Response is expected to be amino encoded. +// New amino codec is used if no other codec was set using SetCodec. +// +// URIClient is safe for concurrent use by multiple goroutines. +type URIClient struct { + address string + client *http.Client + cdc *amino.Codec +} + +var _ HTTPClient = (*URIClient)(nil) + +// NewURIClient returns a new client. +// The function panics if the provided remote is invalid. +func NewURIClient(remote string) *URIClient { + clientAddress, err := toClientAddress(remote) + if err != nil { + panic(fmt.Sprintf("invalid remote %s: %s", remote, err)) + } + return &URIClient{ + address: clientAddress, + client: DefaultHTTPClient(remote), + cdc: amino.NewCodec(), + } +} + +// Call issues a POST form HTTP request. +func (c *URIClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + values, err := argsToURLValues(c.cdc, params) + if err != nil { + return nil, errors.Wrap(err, "failed to encode params") + } + + resp, err := c.client.PostForm(c.address+"/"+method, values) + if err != nil { + return nil, errors.Wrap(err, "PostForm failed") + } + defer resp.Body.Close() // nolint: errcheck + + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrap(err, "failed to read response body") + } + + return unmarshalResponseBytes(c.cdc, responseBytes, URIClientRequestID, result) +} + +func (c *URIClient) Codec() *amino.Codec { return c.cdc } +func (c *URIClient) SetCodec(cdc *amino.Codec) { c.cdc = cdc } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index b21eefa7c..976d24c70 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -25,8 +25,10 @@ const ( defaultPingPeriod = 0 ) -// WSClient is a WebSocket client. The methods of WSClient are safe for use by -// multiple goroutines. +// WSClient is a JSON-RPC client, which uses WebSocket for communication with +// the remote server. +// +// WSClient is safe for concurrent use by multiple goroutines. type WSClient struct { // nolint: maligned conn *websocket.Conn cdc *amino.Codec @@ -35,7 +37,8 @@ type WSClient struct { // nolint: maligned Endpoint string // /websocket/url/endpoint Dialer func(string, string) (net.Conn, error) - // Single user facing channel to read RPCResponses from, closed only when the client is being stopped. + // Single user facing channel to read RPCResponses from, closed only when the + // client is being stopped. ResponsesCh chan types.RPCResponse // Callback, which will be called each time after successful reconnect. @@ -58,6 +61,8 @@ type WSClient struct { // nolint: maligned mtx sync.RWMutex sentLastPingAt time.Time reconnecting bool + nextReqID int + // sentIDs map[types.JSONRPCIntID]bool // IDs of the requests currently in flight // Time allowed to write a message to the server. 0 means block until operation succeeds. writeWait time.Duration @@ -101,6 +106,8 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli writeWait: defaultWriteWait, pingPeriod: defaultPingPeriod, protocol: protocol, + + // sentIDs: make(map[types.JSONRPCIntID]bool), } c.BaseService = *cmn.NewBaseService(nil, "WSClient", c) for _, option := range options { @@ -151,7 +158,7 @@ func OnReconnect(cb func()) func(*WSClient) { // String returns WS client full address. func (c *WSClient) String() string { - return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint) + return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint) } // OnStart implements cmn.Service by dialing a server and creating read and @@ -210,42 +217,48 @@ func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { select { case c.send <- request: c.Logger.Info("sent a request", "req", request) + // c.mtx.Lock() + // c.sentIDs[request.ID.(types.JSONRPCIntID)] = true + // c.mtx.Unlock() return nil case <-ctx.Done(): return ctx.Err() } } -// Call the given method. See Send description. +// Call enqueues a call request onto the Send queue. Requests are JSON encoded. func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error { - request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params) + request, err := types.MapToRequest(c.cdc, c.nextRequestID(), method, params) if err != nil { return err } return c.Send(ctx, request) } -// CallWithArrayParams the given method with params in a form of array. See -// Send description. +// CallWithArrayParams enqueues a call request onto the Send queue. Params are +// in a form of array (e.g. []interface{}{"abcd"}). Requests are JSON encoded. func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error { - request, err := types.ArrayToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params) + request, err := types.ArrayToRequest(c.cdc, c.nextRequestID(), method, params) if err != nil { return err } return c.Send(ctx, request) } -func (c *WSClient) Codec() *amino.Codec { - return c.cdc -} - -func (c *WSClient) SetCodec(cdc *amino.Codec) { - c.cdc = cdc -} +func (c *WSClient) Codec() *amino.Codec { return c.cdc } +func (c *WSClient) SetCodec(cdc *amino.Codec) { c.cdc = cdc } /////////////////////////////////////////////////////////////////////////////// // Private methods +func (c *WSClient) nextRequestID() types.JSONRPCIntID { + c.mtx.Lock() + id := c.nextReqID + c.nextReqID++ + c.mtx.Unlock() + return types.JSONRPCIntID(id) +} + func (c *WSClient) dial() error { dialer := &websocket.Dialer{ NetDial: c.Dialer, @@ -473,10 +486,31 @@ func (c *WSClient) readRoutine() { c.Logger.Error("failed to parse response", "err", err, "data", string(data)) continue } - c.Logger.Info("got response", "resp", response.Result) + + if err = validateResponseID(response.ID); err != nil { + c.Logger.Error("error in response ID", "id", response.ID, "err", err) + continue + } + + // TODO: events resulting from /subscribe do not work with -> + // because they are implemented as responses with the subscribe request's + // ID. According to the spec, they should be notifications (requests + // without IDs). + // https://github.com/tendermint/tendermint/issues/2949 + // c.mtx.Lock() + // if _, ok := c.sentIDs[response.ID.(types.JSONRPCIntID)]; !ok { + // c.Logger.Error("unsolicited response ID", "id", response.ID, "expected", c.sentIDs) + // c.mtx.Unlock() + // continue + // } + // delete(c.sentIDs, response.ID.(types.JSONRPCIntID)) + // c.mtx.Unlock() // Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking // c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop // both readRoutine and writeRoutine + + c.Logger.Info("got response", "id", response.ID, "result", fmt.Sprintf("%X", response.Result)) + select { case <-c.Quit(): case c.ResponsesCh <- response: diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 1babdae92..67dfd418d 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -35,11 +35,17 @@ func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } defer conn.Close() // nolint: errcheck for { - messageType, _, err := conn.ReadMessage() + messageType, in, err := conn.ReadMessage() if err != nil { return } + var req types.RPCRequest + err = json.Unmarshal(in, &req) + if err != nil { + panic(err) + } + h.mtx.RLock() if h.closeConnAfterRead { if err := conn.Close(); err != nil { @@ -49,7 +55,7 @@ func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.mtx.RUnlock() res := json.RawMessage(`{}`) - emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: res}) + emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: res, ID: req.ID}) if err := conn.WriteMessage(messageType, emptyRespBytes); err != nil { return } diff --git a/rpc/lib/doc.go b/rpc/lib/doc.go index aa9638bfd..d22c2fc87 100644 --- a/rpc/lib/doc.go +++ b/rpc/lib/doc.go @@ -1,4 +1,5 @@ -// HTTP RPC server supporting calls via uri params, jsonrpc, and jsonrpc over websockets +// HTTP RPC server supporting calls via uri params, jsonrpc over HTTP, and jsonrpc over +// websockets // // Client Requests // diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go deleted file mode 100644 index a3eb14bd4..000000000 --- a/rpc/lib/server/handlers.go +++ /dev/null @@ -1,926 +0,0 @@ -package rpcserver - -import ( - "bytes" - "context" - "encoding/hex" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "reflect" - "runtime/debug" - "sort" - "strings" - "time" - - "github.com/gorilla/websocket" - "github.com/pkg/errors" - - amino "github.com/tendermint/go-amino" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - types "github.com/tendermint/tendermint/rpc/lib/types" -) - -// RegisterRPCFuncs adds a route for each function in the funcMap, -// as well as general jsonrpc and websocket handlers for all functions. -// "result" is the interface on which the result objects are registered, -// and is popualted with every RPCResponse -func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, cdc *amino.Codec, logger log.Logger) { - // HTTP endpoints - for funcName, rpcFunc := range funcMap { - mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc, cdc, logger)) - } - - // JSONRPC endpoints - mux.HandleFunc("/", handleInvalidJSONRPCPaths(makeJSONRPCHandler(funcMap, cdc, logger))) -} - -//------------------------------------- -// function introspection - -// RPCFunc contains the introspected type information for a function -type RPCFunc struct { - f reflect.Value // underlying rpc function - args []reflect.Type // type of each function arg - returns []reflect.Type // type of each return arg - argNames []string // name of each argument - ws bool // websocket only -} - -// NewRPCFunc wraps a function for introspection. -// f is the function, args are comma separated argument names -func NewRPCFunc(f interface{}, args string) *RPCFunc { - return newRPCFunc(f, args, false) -} - -// NewWSRPCFunc wraps a function for introspection and use in the websockets. -func NewWSRPCFunc(f interface{}, args string) *RPCFunc { - return newRPCFunc(f, args, true) -} - -func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc { - var argNames []string - if args != "" { - argNames = strings.Split(args, ",") - } - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: argNames, - ws: ws, - } -} - -// return a function's argument types -func funcArgTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumIn() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.In(i) - } - return typez -} - -// return a function's return types -func funcReturnTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumOut() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.Out(i) - } - return typez -} - -// function introspection -//----------------------------------------------------------------------------- -// rpc.json - -// jsonrpc calls grab the given method's function info and runs reflect.Call -func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger log.Logger) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - b, err := ioutil.ReadAll(r.Body) - if err != nil { - WriteRPCResponseHTTP( - w, - types.RPCInvalidRequestError( - types.JSONRPCStringID(""), - errors.Wrap(err, "error reading request body"), - ), - ) - return - } - // if its an empty request (like from a browser), - // just display a list of functions - if len(b) == 0 { - writeListOfEndpoints(w, r, funcMap) - return - } - - // first try to unmarshal the incoming request as an array of RPC requests - var ( - requests []types.RPCRequest - responses []types.RPCResponse - ) - if err := json.Unmarshal(b, &requests); err != nil { - // next, try to unmarshal as a single request - var request types.RPCRequest - if err := json.Unmarshal(b, &request); err != nil { - WriteRPCResponseHTTP( - w, - types.RPCParseError( - types.JSONRPCStringID(""), - errors.Wrap(err, "error unmarshalling request"), - ), - ) - return - } - requests = []types.RPCRequest{request} - } - - for _, request := range requests { - request := request - // A Notification is a Request object without an "id" member. - // The Server MUST NOT reply to a Notification, including those that are within a batch request. - if request.ID == types.JSONRPCStringID("") { - logger.Debug( - "HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", - ) - continue - } - if len(r.URL.Path) > 1 { - responses = append( - responses, - types.RPCInvalidRequestError(request.ID, errors.Errorf("path %s is invalid", r.URL.Path)), - ) - continue - } - rpcFunc, ok := funcMap[request.Method] - if !ok || rpcFunc.ws { - responses = append(responses, types.RPCMethodNotFoundError(request.ID)) - continue - } - ctx := &types.Context{JSONReq: &request, HTTPReq: r} - args := []reflect.Value{reflect.ValueOf(ctx)} - if len(request.Params) > 0 { - fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params) - if err != nil { - responses = append( - responses, - types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "error converting json params to arguments")), - ) - continue - } - args = append(args, fnArgs...) - } - returns := rpcFunc.f.Call(args) - logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - responses = append(responses, types.RPCInternalError(request.ID, err)) - continue - } - responses = append(responses, types.NewRPCSuccessResponse(cdc, request.ID, result)) - } - if len(responses) > 0 { - WriteRPCResponseArrayHTTP(w, responses) - } - } -} - -func handleInvalidJSONRPCPaths(next http.HandlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - // Since the pattern "/" matches all paths not matched by other registered patterns, - // we check whether the path is indeed "/", otherwise return a 404 error - if r.URL.Path != "/" { - http.NotFound(w, r) - return - } - - next(w, r) - } -} - -func mapParamsToArgs( - rpcFunc *RPCFunc, - cdc *amino.Codec, - params map[string]json.RawMessage, - argsOffset int, -) ([]reflect.Value, error) { - values := make([]reflect.Value, len(rpcFunc.argNames)) - for i, argName := range rpcFunc.argNames { - argType := rpcFunc.args[i+argsOffset] - - if p, ok := params[argName]; ok && p != nil && len(p) > 0 { - val := reflect.New(argType) - err := cdc.UnmarshalJSON(p, val.Interface()) - if err != nil { - return nil, err - } - values[i] = val.Elem() - } else { // use default for that type - values[i] = reflect.Zero(argType) - } - } - - return values, nil -} - -func arrayParamsToArgs( - rpcFunc *RPCFunc, - cdc *amino.Codec, - params []json.RawMessage, - argsOffset int, -) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.Errorf("expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames), rpcFunc.argNames, len(params), params) - } - - values := make([]reflect.Value, len(params)) - for i, p := range params { - argType := rpcFunc.args[i+argsOffset] - val := reflect.New(argType) - err := cdc.UnmarshalJSON(p, val.Interface()) - if err != nil { - return nil, err - } - values[i] = val.Elem() - } - return values, nil -} - -// raw is unparsed json (from json.RawMessage) encoding either a map or an -// array. -// -// Example: -// rpcFunc.args = [rpctypes.Context string] -// rpcFunc.argNames = ["arg"] -func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect.Value, error) { - const argsOffset = 1 - - // TODO: Make more efficient, perhaps by checking the first character for '{' or '['? - // First, try to get the map. - var m map[string]json.RawMessage - err := json.Unmarshal(raw, &m) - if err == nil { - return mapParamsToArgs(rpcFunc, cdc, m, argsOffset) - } - - // Otherwise, try an array. - var a []json.RawMessage - err = json.Unmarshal(raw, &a) - if err == nil { - return arrayParamsToArgs(rpcFunc, cdc, a, argsOffset) - } - - // Otherwise, bad format, we cannot parse - return nil, errors.Errorf("unknown type for JSON params: %v. Expected map or array", err) -} - -// rpc.json -//----------------------------------------------------------------------------- -// rpc.http - -// convert from a function name to the http handler -func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func(http.ResponseWriter, *http.Request) { - // Exception for websocket endpoints - if rpcFunc.ws { - return func(w http.ResponseWriter, r *http.Request) { - WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(types.JSONRPCStringID(""))) - } - } - - // All other endpoints - return func(w http.ResponseWriter, r *http.Request) { - logger.Debug("HTTP HANDLER", "req", r) - - ctx := &types.Context{HTTPReq: r} - args := []reflect.Value{reflect.ValueOf(ctx)} - - fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r) - if err != nil { - WriteRPCResponseHTTP( - w, - types.RPCInvalidParamsError( - types.JSONRPCStringID(""), - errors.Wrap(err, "error converting http params to arguments"), - ), - ) - return - } - args = append(args, fnArgs...) - - returns := rpcFunc.f.Call(args) - - logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, types.RPCInternalError(types.JSONRPCStringID(""), err)) - return - } - WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, types.JSONRPCStringID(""), result)) - } -} - -// Covert an http query to a list of properly typed values. -// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). -func httpParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, r *http.Request) ([]reflect.Value, error) { - // skip types.Context - const argsOffset = 1 - - values := make([]reflect.Value, len(rpcFunc.argNames)) - - for i, name := range rpcFunc.argNames { - argType := rpcFunc.args[i+argsOffset] - - values[i] = reflect.Zero(argType) // set default for that type - - arg := GetParam(r, name) - // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg) - - if "" == arg { - continue - } - - v, err, ok := nonJSONStringToArg(cdc, argType, arg) - if err != nil { - return nil, err - } - if ok { - values[i] = v - continue - } - - values[i], err = jsonStringToArg(cdc, argType, arg) - if err != nil { - return nil, err - } - } - - return values, nil -} - -func jsonStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, error) { - rv := reflect.New(rt) - err := cdc.UnmarshalJSON([]byte(arg), rv.Interface()) - if err != nil { - return rv, err - } - rv = rv.Elem() - return rv, nil -} - -func nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, error, bool) { - if rt.Kind() == reflect.Ptr { - rv_, err, ok := nonJSONStringToArg(cdc, rt.Elem(), arg) - switch { - case err != nil: - return reflect.Value{}, err, false - case ok: - rv := reflect.New(rt.Elem()) - rv.Elem().Set(rv_) - return rv, nil, true - default: - return reflect.Value{}, nil, false - } - } else { - return _nonJSONStringToArg(cdc, rt, arg) - } -} - -// NOTE: rt.Kind() isn't a pointer. -func _nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, error, bool) { - isIntString := RE_INT.Match([]byte(arg)) - isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) - isHexString := strings.HasPrefix(strings.ToLower(arg), "0x") - - var expectingString, expectingByteSlice, expectingInt bool - switch rt.Kind() { - case reflect.Int, - reflect.Uint, - reflect.Int8, - reflect.Uint8, - reflect.Int16, - reflect.Uint16, - reflect.Int32, - reflect.Uint32, - reflect.Int64, - reflect.Uint64: - expectingInt = true - case reflect.String: - expectingString = true - case reflect.Slice: - expectingByteSlice = rt.Elem().Kind() == reflect.Uint8 - } - - if isIntString && expectingInt { - qarg := `"` + arg + `"` - // jsonStringToArg - rv, err := jsonStringToArg(cdc, rt, qarg) - if err != nil { - return rv, err, false - } - - return rv, nil, true - } - - if isHexString { - if !expectingString && !expectingByteSlice { - err := errors.Errorf("got a hex string arg, but expected '%s'", - rt.Kind().String()) - return reflect.ValueOf(nil), err, false - } - - var value []byte - value, err := hex.DecodeString(arg[2:]) - if err != nil { - return reflect.ValueOf(nil), err, false - } - if rt.Kind() == reflect.String { - return reflect.ValueOf(string(value)), nil, true - } - return reflect.ValueOf(value), nil, true - } - - if isQuotedString && expectingByteSlice { - v := reflect.New(reflect.TypeOf("")) - err := cdc.UnmarshalJSON([]byte(arg), v.Interface()) - if err != nil { - return reflect.ValueOf(nil), err, false - } - v = v.Elem() - return reflect.ValueOf([]byte(v.String())), nil, true - } - - return reflect.ValueOf(nil), nil, false -} - -// rpc.http -//----------------------------------------------------------------------------- -// rpc.websocket - -const ( - defaultWSWriteChanCapacity = 1000 - defaultWSWriteWait = 10 * time.Second - defaultWSReadWait = 30 * time.Second - defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 -) - -// A single websocket connection contains listener id, underlying ws -// connection, and the event switch for subscribing to events. -// -// In case of an error, the connection is stopped. -type wsConnection struct { - cmn.BaseService - - remoteAddr string - baseConn *websocket.Conn - writeChan chan types.RPCResponse - - funcMap map[string]*RPCFunc - cdc *amino.Codec - - // write channel capacity - writeChanCapacity int - - // each write times out after this. - writeWait time.Duration - - // Connection times out if we haven't received *anything* in this long, not even pings. - readWait time.Duration - - // Send pings to server with this period. Must be less than readWait, but greater than zero. - pingPeriod time.Duration - - // Maximum message size. - readLimit int64 - - // callback which is called upon disconnect - onDisconnect func(remoteAddr string) - - ctx context.Context - cancel context.CancelFunc -} - -// NewWSConnection wraps websocket.Conn. -// -// See the commentary on the func(*wsConnection) functions for a detailed -// description of how to configure ping period and pong wait time. NOTE: if the -// write buffer is full, pongs may be dropped, which may cause clients to -// disconnect. see https://github.com/gorilla/websocket/issues/97 -func NewWSConnection( - baseConn *websocket.Conn, - funcMap map[string]*RPCFunc, - cdc *amino.Codec, - options ...func(*wsConnection), -) *wsConnection { - wsc := &wsConnection{ - remoteAddr: baseConn.RemoteAddr().String(), - baseConn: baseConn, - funcMap: funcMap, - cdc: cdc, - writeWait: defaultWSWriteWait, - writeChanCapacity: defaultWSWriteChanCapacity, - readWait: defaultWSReadWait, - pingPeriod: defaultWSPingPeriod, - } - for _, option := range options { - option(wsc) - } - wsc.baseConn.SetReadLimit(wsc.readLimit) - wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc) - return wsc -} - -// OnDisconnect sets a callback which is used upon disconnect - not -// Goroutine-safe. Nop by default. -func OnDisconnect(onDisconnect func(remoteAddr string)) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.onDisconnect = onDisconnect - } -} - -// WriteWait sets the amount of time to wait before a websocket write times out. -// It should only be used in the constructor - not Goroutine-safe. -func WriteWait(writeWait time.Duration) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.writeWait = writeWait - } -} - -// WriteChanCapacity sets the capacity of the websocket write channel. -// It should only be used in the constructor - not Goroutine-safe. -func WriteChanCapacity(cap int) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.writeChanCapacity = cap - } -} - -// ReadWait sets the amount of time to wait before a websocket read times out. -// It should only be used in the constructor - not Goroutine-safe. -func ReadWait(readWait time.Duration) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.readWait = readWait - } -} - -// PingPeriod sets the duration for sending websocket pings. -// It should only be used in the constructor - not Goroutine-safe. -func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.pingPeriod = pingPeriod - } -} - -// ReadLimit sets the maximum size for reading message. -// It should only be used in the constructor - not Goroutine-safe. -func ReadLimit(readLimit int64) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.readLimit = readLimit - } -} - -// OnStart implements cmn.Service by starting the read and write routines. It -// blocks until the connection closes. -func (wsc *wsConnection) OnStart() error { - wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) - - // Read subscriptions/unsubscriptions to events - go wsc.readRoutine() - // Write responses, BLOCKING. - wsc.writeRoutine() - - return nil -} - -// OnStop implements cmn.Service by unsubscribing remoteAddr from all subscriptions. -func (wsc *wsConnection) OnStop() { - // Both read and write loops close the websocket connection when they exit their loops. - // The writeChan is never closed, to allow WriteRPCResponse() to fail. - - if wsc.onDisconnect != nil { - wsc.onDisconnect(wsc.remoteAddr) - } - - if wsc.ctx != nil { - wsc.cancel() - } -} - -// GetRemoteAddr returns the remote address of the underlying connection. -// It implements WSRPCConnection -func (wsc *wsConnection) GetRemoteAddr() string { - return wsc.remoteAddr -} - -// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. -// It implements WSRPCConnection. It is Goroutine-safe. -func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { - select { - case <-wsc.Quit(): - return - case wsc.writeChan <- resp: - } -} - -// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block. -// It implements WSRPCConnection. It is Goroutine-safe -func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { - select { - case <-wsc.Quit(): - return false - case wsc.writeChan <- resp: - return true - default: - return false - } -} - -// Codec returns an amino codec used to decode parameters and encode results. -// It implements WSRPCConnection. -func (wsc *wsConnection) Codec() *amino.Codec { - return wsc.cdc -} - -// Context returns the connection's context. -// The context is canceled when the client's connection closes. -func (wsc *wsConnection) Context() context.Context { - if wsc.ctx != nil { - return wsc.ctx - } - wsc.ctx, wsc.cancel = context.WithCancel(context.Background()) - return wsc.ctx -} - -// Read from the socket and subscribe to or unsubscribe from events -func (wsc *wsConnection) readRoutine() { - defer func() { - if r := recover(); r != nil { - err, ok := r.(error) - if !ok { - err = fmt.Errorf("WSJSONRPC: %v", r) - } - wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack())) - wsc.WriteRPCResponse(types.RPCInternalError(types.JSONRPCStringID("unknown"), err)) - go wsc.readRoutine() - } else { - wsc.baseConn.Close() // nolint: errcheck - } - }() - - wsc.baseConn.SetPongHandler(func(m string) error { - return wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)) - }) - - for { - select { - case <-wsc.Quit(): - return - default: - // reset deadline for every type of message (control or data) - if err := wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)); err != nil { - wsc.Logger.Error("failed to set read deadline", "err", err) - } - var in []byte - _, in, err := wsc.baseConn.ReadMessage() - if err != nil { - if websocket.IsCloseError(err, websocket.CloseNormalClosure) { - wsc.Logger.Info("Client closed the connection") - } else { - wsc.Logger.Error("Failed to read request", "err", err) - } - wsc.Stop() - return - } - - var request types.RPCRequest - err = json.Unmarshal(in, &request) - if err != nil { - wsc.WriteRPCResponse(types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "error unmarshaling request"))) - continue - } - - // A Notification is a Request object without an "id" member. - // The Server MUST NOT reply to a Notification, including those that are within a batch request. - if request.ID == types.JSONRPCStringID("") { - wsc.Logger.Debug( - "WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", - ) - continue - } - - // Now, fetch the RPCFunc and execute it. - rpcFunc := wsc.funcMap[request.Method] - if rpcFunc == nil { - wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID)) - continue - } - - ctx := &types.Context{JSONReq: &request, WSConn: wsc} - args := []reflect.Value{reflect.ValueOf(ctx)} - if len(request.Params) > 0 { - fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params) - if err != nil { - wsc.WriteRPCResponse( - types.RPCInternalError(request.ID, errors.Wrap(err, "error converting json params to arguments")), - ) - continue - } - args = append(args, fnArgs...) - } - - returns := rpcFunc.f.Call(args) - - // TODO: Need to encode args/returns to string if we want to log them - wsc.Logger.Info("WSJSONRPC", "method", request.Method) - - result, err := unreflectResult(returns) - if err != nil { - wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) - continue - } - - wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result)) - } - } -} - -// receives on a write channel and writes out on the socket -func (wsc *wsConnection) writeRoutine() { - pingTicker := time.NewTicker(wsc.pingPeriod) - defer func() { - pingTicker.Stop() - if err := wsc.baseConn.Close(); err != nil { - wsc.Logger.Error("Error closing connection", "err", err) - } - }() - - // https://github.com/gorilla/websocket/issues/97 - pongs := make(chan string, 1) - wsc.baseConn.SetPingHandler(func(m string) error { - select { - case pongs <- m: - default: - } - return nil - }) - - for { - select { - case m := <-pongs: - err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m)) - if err != nil { - wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err) - } - case <-pingTicker.C: - err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) - if err != nil { - wsc.Logger.Error("Failed to write ping", "err", err) - wsc.Stop() - return - } - case msg := <-wsc.writeChan: - jsonBytes, err := json.MarshalIndent(msg, "", " ") - if err != nil { - wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) - } else if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { - wsc.Logger.Error("Failed to write response", "err", err) - wsc.Stop() - return - } - case <-wsc.Quit(): - return - } - } -} - -// All writes to the websocket must (re)set the write deadline. -// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553) -func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { - if err := wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)); err != nil { - return err - } - return wsc.baseConn.WriteMessage(msgType, msg) -} - -//---------------------------------------- - -// WebsocketManager provides a WS handler for incoming connections and passes a -// map of functions along with any additional params to new connections. -// NOTE: The websocket path is defined externally, e.g. in node/node.go -type WebsocketManager struct { - websocket.Upgrader - - funcMap map[string]*RPCFunc - cdc *amino.Codec - logger log.Logger - wsConnOptions []func(*wsConnection) -} - -// NewWebsocketManager returns a new WebsocketManager that passes a map of -// functions, connection options and logger to new WS connections. -func NewWebsocketManager( - funcMap map[string]*RPCFunc, - cdc *amino.Codec, - wsConnOptions ...func(*wsConnection), -) *WebsocketManager { - return &WebsocketManager{ - funcMap: funcMap, - cdc: cdc, - Upgrader: websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - // TODO ??? - return true - }, - }, - logger: log.NewNopLogger(), - wsConnOptions: wsConnOptions, - } -} - -// SetLogger sets the logger. -func (wm *WebsocketManager) SetLogger(l log.Logger) { - wm.logger = l -} - -// WebsocketHandler upgrades the request/response (via http.Hijack) and starts -// the wsConnection. -func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { - wsConn, err := wm.Upgrade(w, r, nil) - if err != nil { - // TODO - return http error - wm.logger.Error("Failed to upgrade to websocket connection", "err", err) - return - } - - // register connection - con := NewWSConnection(wsConn, wm.funcMap, wm.cdc, wm.wsConnOptions...) - con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) - wm.logger.Info("New websocket connection", "remote", con.remoteAddr) - err = con.Start() // Blocking - if err != nil { - wm.logger.Error("Error starting connection", "err", err) - } -} - -// rpc.websocket -//----------------------------------------------------------------------------- - -// NOTE: assume returns is result struct and error. If error is not nil, return it -func unreflectResult(returns []reflect.Value) (interface{}, error) { - errV := returns[1] - if errV.Interface() != nil { - return nil, errors.Errorf("%v", errV.Interface()) - } - rv := returns[0] - // the result is a registered interface, - // we need a pointer to it so we can marshal with type byte - rvp := reflect.New(rv.Type()) - rvp.Elem().Set(rv) - return rvp.Interface(), nil -} - -// writes a list of available rpc endpoints as an html page -func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { - noArgNames := []string{} - argNames := []string{} - for name, funcData := range funcMap { - if len(funcData.args) == 0 { - noArgNames = append(noArgNames, name) - } else { - argNames = append(argNames, name) - } - } - sort.Strings(noArgNames) - sort.Strings(argNames) - buf := new(bytes.Buffer) - buf.WriteString("") - buf.WriteString("
Available endpoints:
") - - for _, name := range noArgNames { - link := fmt.Sprintf("//%s/%s", r.Host, name) - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - - buf.WriteString("
Endpoints that require arguments:
") - for _, name := range argNames { - link := fmt.Sprintf("//%s/%s?", r.Host, name) - funcData := funcMap[name] - for i, argName := range funcData.argNames { - link += argName + "=_" - if i < len(funcData.argNames)-1 { - link += "&" - } - } - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - buf.WriteString("") - w.Header().Set("Content-Type", "text/html") - w.WriteHeader(200) - w.Write(buf.Bytes()) // nolint: errcheck -} diff --git a/rpc/lib/server/http_json_handler.go b/rpc/lib/server/http_json_handler.go new file mode 100644 index 000000000..65c0a680f --- /dev/null +++ b/rpc/lib/server/http_json_handler.go @@ -0,0 +1,248 @@ +package rpcserver + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "reflect" + "sort" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + "github.com/tendermint/tendermint/libs/log" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +/////////////////////////////////////////////////////////////////////////////// +// HTTP + JSON handler +/////////////////////////////////////////////////////////////////////////////// + +// jsonrpc calls grab the given method's function info and runs reflect.Call +func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger log.Logger) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + WriteRPCResponseHTTP( + w, + types.RPCInvalidRequestError( + nil, + errors.Wrap(err, "error reading request body"), + ), + ) + return + } + + // if its an empty request (like from a browser), just display a list of + // functions + if len(b) == 0 { + writeListOfEndpoints(w, r, funcMap) + return + } + + // first try to unmarshal the incoming request as an array of RPC requests + var ( + requests []types.RPCRequest + responses []types.RPCResponse + ) + if err := json.Unmarshal(b, &requests); err != nil { + // next, try to unmarshal as a single request + var request types.RPCRequest + if err := json.Unmarshal(b, &request); err != nil { + WriteRPCResponseHTTP( + w, + types.RPCParseError( + errors.Wrap(err, "error unmarshalling request"), + ), + ) + return + } + requests = []types.RPCRequest{request} + } + + for _, request := range requests { + request := request + + // A Notification is a Request object without an "id" member. + // The Server MUST NOT reply to a Notification, including those that are within a batch request. + if request.ID == nil { + logger.Debug( + "HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", + "req", request, + ) + continue + } + if len(r.URL.Path) > 1 { + responses = append( + responses, + types.RPCInvalidRequestError(request.ID, errors.Errorf("path %s is invalid", r.URL.Path)), + ) + continue + } + rpcFunc, ok := funcMap[request.Method] + if !ok || rpcFunc.ws { + responses = append(responses, types.RPCMethodNotFoundError(request.ID)) + continue + } + ctx := &types.Context{JSONReq: &request, HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} + if len(request.Params) > 0 { + fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params) + if err != nil { + responses = append( + responses, + types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "error converting json params to arguments")), + ) + continue + } + args = append(args, fnArgs...) + } + returns := rpcFunc.f.Call(args) + logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + responses = append(responses, types.RPCInternalError(request.ID, err)) + continue + } + responses = append(responses, types.NewRPCSuccessResponse(cdc, request.ID, result)) + } + if len(responses) > 0 { + WriteRPCResponseArrayHTTP(w, responses) + } + } +} + +func handleInvalidJSONRPCPaths(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Since the pattern "/" matches all paths not matched by other registered patterns, + // we check whether the path is indeed "/", otherwise return a 404 error + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + + next(w, r) + } +} + +func mapParamsToArgs( + rpcFunc *RPCFunc, + cdc *amino.Codec, + params map[string]json.RawMessage, + argsOffset int, +) ([]reflect.Value, error) { + + values := make([]reflect.Value, len(rpcFunc.argNames)) + for i, argName := range rpcFunc.argNames { + argType := rpcFunc.args[i+argsOffset] + + if p, ok := params[argName]; ok && p != nil && len(p) > 0 { + val := reflect.New(argType) + err := cdc.UnmarshalJSON(p, val.Interface()) + if err != nil { + return nil, err + } + values[i] = val.Elem() + } else { // use default for that type + values[i] = reflect.Zero(argType) + } + } + + return values, nil +} + +func arrayParamsToArgs( + rpcFunc *RPCFunc, + cdc *amino.Codec, + params []json.RawMessage, + argsOffset int, +) ([]reflect.Value, error) { + + if len(rpcFunc.argNames) != len(params) { + return nil, errors.Errorf("expected %v parameters (%v), got %v (%v)", + len(rpcFunc.argNames), rpcFunc.argNames, len(params), params) + } + + values := make([]reflect.Value, len(params)) + for i, p := range params { + argType := rpcFunc.args[i+argsOffset] + val := reflect.New(argType) + err := cdc.UnmarshalJSON(p, val.Interface()) + if err != nil { + return nil, err + } + values[i] = val.Elem() + } + return values, nil +} + +// raw is unparsed json (from json.RawMessage) encoding either a map or an +// array. +// +// Example: +// rpcFunc.args = [rpctypes.Context string] +// rpcFunc.argNames = ["arg"] +func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect.Value, error) { + const argsOffset = 1 + + // TODO: Make more efficient, perhaps by checking the first character for '{' or '['? + // First, try to get the map. + var m map[string]json.RawMessage + err := json.Unmarshal(raw, &m) + if err == nil { + return mapParamsToArgs(rpcFunc, cdc, m, argsOffset) + } + + // Otherwise, try an array. + var a []json.RawMessage + err = json.Unmarshal(raw, &a) + if err == nil { + return arrayParamsToArgs(rpcFunc, cdc, a, argsOffset) + } + + // Otherwise, bad format, we cannot parse + return nil, errors.Errorf("unknown type for JSON params: %v. Expected map or array", err) +} + +// writes a list of available rpc endpoints as an html page +func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { + noArgNames := []string{} + argNames := []string{} + for name, funcData := range funcMap { + if len(funcData.args) == 0 { + noArgNames = append(noArgNames, name) + } else { + argNames = append(argNames, name) + } + } + sort.Strings(noArgNames) + sort.Strings(argNames) + buf := new(bytes.Buffer) + buf.WriteString("") + buf.WriteString("
Available endpoints:
") + + for _, name := range noArgNames { + link := fmt.Sprintf("//%s/%s", r.Host, name) + buf.WriteString(fmt.Sprintf("%s
", link, link)) + } + + buf.WriteString("
Endpoints that require arguments:
") + for _, name := range argNames { + link := fmt.Sprintf("//%s/%s?", r.Host, name) + funcData := funcMap[name] + for i, argName := range funcData.argNames { + link += argName + "=_" + if i < len(funcData.argNames)-1 { + link += "&" + } + } + buf.WriteString(fmt.Sprintf("%s
", link, link)) + } + buf.WriteString("") + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(200) + w.Write(buf.Bytes()) // nolint: errcheck +} diff --git a/rpc/lib/server/handlers_test.go b/rpc/lib/server/http_json_handler_test.go similarity index 76% rename from rpc/lib/server/handlers_test.go rename to rpc/lib/server/http_json_handler_test.go index a882d5939..e4ae2f8bf 100644 --- a/rpc/lib/server/handlers_test.go +++ b/rpc/lib/server/http_json_handler_test.go @@ -1,4 +1,4 @@ -package rpcserver_test +package rpcserver import ( "bytes" @@ -9,32 +9,23 @@ import ( "strings" "testing" - "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" amino "github.com/tendermint/go-amino" "github.com/tendermint/tendermint/libs/log" - rs "github.com/tendermint/tendermint/rpc/lib/server" types "github.com/tendermint/tendermint/rpc/lib/types" ) -////////////////////////////////////////////////////////////////////////////// -// HTTP REST API -// TODO - -////////////////////////////////////////////////////////////////////////////// -// JSON-RPC over HTTP - func testMux() *http.ServeMux { - funcMap := map[string]*rs.RPCFunc{ - "c": rs.NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), + funcMap := map[string]*RPCFunc{ + "c": NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), } cdc := amino.NewCodec() mux := http.NewServeMux() buf := new(bytes.Buffer) logger := log.NewTMLogger(buf) - rs.RegisterRPCFuncs(mux, funcMap, cdc, logger) + RegisterRPCFuncs(mux, funcMap, cdc, logger) return mux } @@ -49,17 +40,20 @@ func TestRPCParams(t *testing.T) { tests := []struct { payload string wantErr string - expectedId interface{} + expectedID interface{} }{ // bad {`{"jsonrpc": "2.0", "id": "0"}`, "Method not found", types.JSONRPCStringID("0")}, {`{"jsonrpc": "2.0", "method": "y", "id": "0"}`, "Method not found", types.JSONRPCStringID("0")}, // id not captured in JSON parsing failures - {`{"method": "c", "id": "0", "params": a}`, "invalid character", types.JSONRPCStringID("")}, + {`{"method": "c", "id": "0", "params": a}`, "invalid character", nil}, {`{"method": "c", "id": "0", "params": ["a"]}`, "got 1", types.JSONRPCStringID("0")}, {`{"method": "c", "id": "0", "params": ["a", "b"]}`, "invalid character", types.JSONRPCStringID("0")}, {`{"method": "c", "id": "0", "params": [1, 1]}`, "of type string", types.JSONRPCStringID("0")}, + // no ID - notification + // {`{"jsonrpc": "2.0", "method": "c", "params": ["a", "10"]}`, false, nil}, + // good {`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`, "", types.JSONRPCStringID("0")}, {`{"method": "c", "id": "0", "params": {}}`, "", types.JSONRPCStringID("0")}, @@ -83,7 +77,7 @@ func TestRPCParams(t *testing.T) { recv := new(types.RPCResponse) assert.Nil(t, json.Unmarshal(blob, recv), "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob) assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i) - assert.Equal(t, tt.expectedId, recv.ID, "#%d: expected ID not matched in RPCResponse", i) + assert.Equal(t, tt.expectedID, recv.ID, "#%d: expected ID not matched in RPCResponse", i) if tt.wantErr == "" { assert.Nil(t, recv.Error, "#%d: not expecting an error", i) } else { @@ -99,7 +93,7 @@ func TestJSONRPCID(t *testing.T) { tests := []struct { payload string wantErr bool - expectedId interface{} + expectedID interface{} }{ // good id {`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": ["a", "10"]}`, false, types.JSONRPCStringID("0")}, @@ -108,7 +102,6 @@ func TestJSONRPCID(t *testing.T) { {`{"jsonrpc": "2.0", "method": "c", "id": 1, "params": ["a", "10"]}`, false, types.JSONRPCIntID(1)}, {`{"jsonrpc": "2.0", "method": "c", "id": 1.3, "params": ["a", "10"]}`, false, types.JSONRPCIntID(1)}, {`{"jsonrpc": "2.0", "method": "c", "id": -1, "params": ["a", "10"]}`, false, types.JSONRPCIntID(-1)}, - {`{"jsonrpc": "2.0", "method": "c", "id": null, "params": ["a", "10"]}`, false, nil}, // bad id {`{"jsonrpc": "2.0", "method": "c", "id": {}, "params": ["a", "10"]}`, true, nil}, @@ -134,7 +127,7 @@ func TestJSONRPCID(t *testing.T) { assert.Nil(t, err, "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob) if !tt.wantErr { assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i) - assert.Equal(t, tt.expectedId, recv.ID, "#%d: expected ID not matched in RPCResponse", i) + assert.Equal(t, tt.expectedID, recv.ID, "#%d: expected ID not matched in RPCResponse", i) assert.Nil(t, recv.Error, "#%d: not expecting an error", i) } else { assert.True(t, recv.Error.Code < 0, "#%d: not expecting a positive JSONRPC code", i) @@ -144,7 +137,7 @@ func TestJSONRPCID(t *testing.T) { func TestRPCNotification(t *testing.T) { mux := testMux() - body := strings.NewReader(`{"jsonrpc": "2.0", "id": ""}`) + body := strings.NewReader(`{"jsonrpc": "2.0"}`) req, _ := http.NewRequest("POST", "http://localhost/", body) rec := httptest.NewRecorder() mux.ServeHTTP(rec, req) @@ -166,16 +159,16 @@ func TestRPCNotificationInBatch(t *testing.T) { }{ { `[ - {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0"}, {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]} ]`, 1, }, { `[ - {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0"}, {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]}, - {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0"}, {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]} ]`, 2, @@ -236,50 +229,3 @@ func TestUnknownRPCPath(t *testing.T) { require.Equal(t, http.StatusNotFound, res.StatusCode, "should always return 404") res.Body.Close() } - -////////////////////////////////////////////////////////////////////////////// -// JSON-RPC over WEBSOCKETS - -func TestWebsocketManagerHandler(t *testing.T) { - s := newWSServer() - defer s.Close() - - // check upgrader works - d := websocket.Dialer{} - c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil) - require.NoError(t, err) - - if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want { - t.Errorf("dialResp.StatusCode = %q, want %q", got, want) - } - - // check basic functionality works - req, err := types.MapToRequest( - amino.NewCodec(), - types.JSONRPCStringID("TestWebsocketManager"), - "c", - map[string]interface{}{"s": "a", "i": 10}, - ) - require.NoError(t, err) - err = c.WriteJSON(req) - require.NoError(t, err) - - var resp types.RPCResponse - err = c.ReadJSON(&resp) - require.NoError(t, err) - require.Nil(t, resp.Error) - dialResp.Body.Close() -} - -func newWSServer() *httptest.Server { - funcMap := map[string]*rs.RPCFunc{ - "c": rs.NewWSRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), - } - wm := rs.NewWebsocketManager(funcMap, amino.NewCodec()) - wm.SetLogger(log.TestingLogger()) - - mux := http.NewServeMux() - mux.HandleFunc("/websocket", wm.WebsocketHandler) - - return httptest.NewServer(mux) -} diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index b48f09e56..501396867 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -163,7 +163,7 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler WriteRPCResponseHTTPError( rww, http.StatusInternalServerError, - types.RPCInternalError(types.JSONRPCStringID(""), e.(error)), + types.RPCInternalError(types.JSONRPCIntID(-1), e.(error)), ) } } diff --git a/rpc/lib/server/http_uri_handler.go b/rpc/lib/server/http_uri_handler.go new file mode 100644 index 000000000..6c68ba3ac --- /dev/null +++ b/rpc/lib/server/http_uri_handler.go @@ -0,0 +1,195 @@ +package rpcserver + +import ( + "encoding/hex" + "net/http" + "reflect" + "strings" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + "github.com/tendermint/tendermint/libs/log" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +/////////////////////////////////////////////////////////////////////////////// +// HTTP + URI handler +/////////////////////////////////////////////////////////////////////////////// + +// convert from a function name to the http handler +func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func(http.ResponseWriter, *http.Request) { + // Always return -1 as there's no ID here. + dummyID := types.JSONRPCIntID(-1) // URIClientRequestID + + // Exception for websocket endpoints + if rpcFunc.ws { + return func(w http.ResponseWriter, r *http.Request) { + WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(dummyID)) + } + } + + // All other endpoints + return func(w http.ResponseWriter, r *http.Request) { + logger.Debug("HTTP HANDLER", "req", r) + + ctx := &types.Context{HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} + + fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r) + if err != nil { + WriteRPCResponseHTTP( + w, + types.RPCInvalidParamsError( + dummyID, + errors.Wrap(err, "error converting http params to arguments"), + ), + ) + return + } + args = append(args, fnArgs...) + + returns := rpcFunc.f.Call(args) + + logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + WriteRPCResponseHTTP(w, types.RPCInternalError(dummyID, err)) + return + } + WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, dummyID, result)) + } +} + +// Covert an http query to a list of properly typed values. +// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). +func httpParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, r *http.Request) ([]reflect.Value, error) { + // skip types.Context + const argsOffset = 1 + + values := make([]reflect.Value, len(rpcFunc.argNames)) + + for i, name := range rpcFunc.argNames { + argType := rpcFunc.args[i+argsOffset] + + values[i] = reflect.Zero(argType) // set default for that type + + arg := GetParam(r, name) + // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg) + + if arg == "" { + continue + } + + v, ok, err := nonJSONStringToArg(cdc, argType, arg) + if err != nil { + return nil, err + } + if ok { + values[i] = v + continue + } + + values[i], err = jsonStringToArg(cdc, argType, arg) + if err != nil { + return nil, err + } + } + + return values, nil +} + +func jsonStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, error) { + rv := reflect.New(rt) + err := cdc.UnmarshalJSON([]byte(arg), rv.Interface()) + if err != nil { + return rv, err + } + rv = rv.Elem() + return rv, nil +} + +func nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, bool, error) { + if rt.Kind() == reflect.Ptr { + rv1, ok, err := nonJSONStringToArg(cdc, rt.Elem(), arg) + switch { + case err != nil: + return reflect.Value{}, false, err + case ok: + rv := reflect.New(rt.Elem()) + rv.Elem().Set(rv1) + return rv, true, nil + default: + return reflect.Value{}, false, nil + } + } else { + return _nonJSONStringToArg(cdc, rt, arg) + } +} + +// NOTE: rt.Kind() isn't a pointer. +func _nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, bool, error) { + isIntString := RE_INT.Match([]byte(arg)) + isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) + isHexString := strings.HasPrefix(strings.ToLower(arg), "0x") + + var expectingString, expectingByteSlice, expectingInt bool + switch rt.Kind() { + case reflect.Int, + reflect.Uint, + reflect.Int8, + reflect.Uint8, + reflect.Int16, + reflect.Uint16, + reflect.Int32, + reflect.Uint32, + reflect.Int64, + reflect.Uint64: + expectingInt = true + case reflect.String: + expectingString = true + case reflect.Slice: + expectingByteSlice = rt.Elem().Kind() == reflect.Uint8 + } + + if isIntString && expectingInt { + qarg := `"` + arg + `"` + rv, err := jsonStringToArg(cdc, rt, qarg) + if err != nil { + return rv, false, err + } + + return rv, true, nil + } + + if isHexString { + if !expectingString && !expectingByteSlice { + err := errors.Errorf("got a hex string arg, but expected '%s'", + rt.Kind().String()) + return reflect.ValueOf(nil), false, err + } + + var value []byte + value, err := hex.DecodeString(arg[2:]) + if err != nil { + return reflect.ValueOf(nil), false, err + } + if rt.Kind() == reflect.String { + return reflect.ValueOf(string(value)), true, nil + } + return reflect.ValueOf(value), true, nil + } + + if isQuotedString && expectingByteSlice { + v := reflect.New(reflect.TypeOf("")) + err := cdc.UnmarshalJSON([]byte(arg), v.Interface()) + if err != nil { + return reflect.ValueOf(nil), false, err + } + v = v.Elem() + return reflect.ValueOf([]byte(v.String())), true, nil + } + + return reflect.ValueOf(nil), false, nil +} diff --git a/rpc/lib/server/rpc_func.go b/rpc/lib/server/rpc_func.go new file mode 100644 index 000000000..906533328 --- /dev/null +++ b/rpc/lib/server/rpc_func.go @@ -0,0 +1,103 @@ +package rpcserver + +import ( + "net/http" + "reflect" + "strings" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + "github.com/tendermint/tendermint/libs/log" +) + +// RegisterRPCFuncs adds a route for each function in the funcMap, as well as +// general jsonrpc and websocket handlers for all functions. "result" is the +// interface on which the result objects are registered, and is popualted with +// every RPCResponse +func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, cdc *amino.Codec, logger log.Logger) { + // HTTP endpoints + for funcName, rpcFunc := range funcMap { + mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc, cdc, logger)) + } + + // JSONRPC endpoints + mux.HandleFunc("/", handleInvalidJSONRPCPaths(makeJSONRPCHandler(funcMap, cdc, logger))) +} + +/////////////////////////////////////////////////////////////////////////////// +// Function introspection +/////////////////////////////////////////////////////////////////////////////// + +// RPCFunc contains the introspected type information for a function +type RPCFunc struct { + f reflect.Value // underlying rpc function + args []reflect.Type // type of each function arg + returns []reflect.Type // type of each return arg + argNames []string // name of each argument + ws bool // websocket only +} + +// NewRPCFunc wraps a function for introspection. +// f is the function, args are comma separated argument names +func NewRPCFunc(f interface{}, args string) *RPCFunc { + return newRPCFunc(f, args, false) +} + +// NewWSRPCFunc wraps a function for introspection and use in the websockets. +func NewWSRPCFunc(f interface{}, args string) *RPCFunc { + return newRPCFunc(f, args, true) +} + +func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc { + var argNames []string + if args != "" { + argNames = strings.Split(args, ",") + } + return &RPCFunc{ + f: reflect.ValueOf(f), + args: funcArgTypes(f), + returns: funcReturnTypes(f), + argNames: argNames, + ws: ws, + } +} + +// return a function's argument types +func funcArgTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumIn() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.In(i) + } + return typez +} + +// return a function's return types +func funcReturnTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumOut() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.Out(i) + } + return typez +} + +//------------------------------------------------------------- + +// NOTE: assume returns is result struct and error. If error is not nil, return it +func unreflectResult(returns []reflect.Value) (interface{}, error) { + errV := returns[1] + if errV.Interface() != nil { + return nil, errors.Errorf("%v", errV.Interface()) + } + rv := returns[0] + // the result is a registered interface, + // we need a pointer to it so we can marshal with type byte + rvp := reflect.New(rv.Type()) + rvp.Elem().Set(rv) + return rvp.Interface(), nil +} diff --git a/rpc/lib/server/ws_handler.go b/rpc/lib/server/ws_handler.go new file mode 100644 index 000000000..80bda39f3 --- /dev/null +++ b/rpc/lib/server/ws_handler.go @@ -0,0 +1,448 @@ +package rpcserver + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "reflect" + "runtime/debug" + "time" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +/////////////////////////////////////////////////////////////////////////////// +// WebSocket handler +/////////////////////////////////////////////////////////////////////////////// + +const ( + defaultWSWriteChanCapacity = 1000 + defaultWSWriteWait = 10 * time.Second + defaultWSReadWait = 30 * time.Second + defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 +) + +// WebsocketManager provides a WS handler for incoming connections and passes a +// map of functions along with any additional params to new connections. +// NOTE: The websocket path is defined externally, e.g. in node/node.go +type WebsocketManager struct { + websocket.Upgrader + + funcMap map[string]*RPCFunc + cdc *amino.Codec + logger log.Logger + wsConnOptions []func(*wsConnection) +} + +// NewWebsocketManager returns a new WebsocketManager that passes a map of +// functions, connection options and logger to new WS connections. +func NewWebsocketManager( + funcMap map[string]*RPCFunc, + cdc *amino.Codec, + wsConnOptions ...func(*wsConnection), +) *WebsocketManager { + return &WebsocketManager{ + funcMap: funcMap, + cdc: cdc, + Upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + // TODO ??? + // + // The default behaviour would be relevant to browser-based clients, + // afaik. I suppose having a pass-through is a workaround for allowing + // for more complex security schemes, shifting the burden of + // AuthN/AuthZ outside the Tendermint RPC. + // I can't think of other uses right now that would warrant a TODO + // though. The real backstory of this TODO shall remain shrouded in + // mystery + return true + }, + }, + logger: log.NewNopLogger(), + wsConnOptions: wsConnOptions, + } +} + +// SetLogger sets the logger. +func (wm *WebsocketManager) SetLogger(l log.Logger) { + wm.logger = l +} + +// WebsocketHandler upgrades the request/response (via http.Hijack) and starts +// the wsConnection. +func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { + wsConn, err := wm.Upgrade(w, r, nil) + if err != nil { + // TODO - return http error + wm.logger.Error("Failed to upgrade connection", "err", err) + return + } + defer func() { + if err := wsConn.Close(); err != nil { + wm.logger.Error("Failed to close connection", "err", err) + } + }() + + // register connection + con := NewWSConnection(wsConn, wm.funcMap, wm.cdc, wm.wsConnOptions...) + con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) + wm.logger.Info("New websocket connection", "remote", con.remoteAddr) + err = con.Start() // BLOCKING + if err != nil { + wm.logger.Error("Failed to start connection", "err", err) + return + } + con.Stop() +} + +/////////////////////////////////////////////////////////////////////////////// +// WebSocket connection +/////////////////////////////////////////////////////////////////////////////// + +// A single websocket connection contains listener id, underlying ws +// connection, and the event switch for subscribing to events. +// +// In case of an error, the connection is stopped. +type wsConnection struct { + cmn.BaseService + + remoteAddr string + baseConn *websocket.Conn + // writeChan is never closed, to allow WriteRPCResponse() to fail. + writeChan chan types.RPCResponse + + // chan, which is closed when/if readRoutine errors + // used to abort writeRoutine + readRoutineQuit chan struct{} + + funcMap map[string]*RPCFunc + cdc *amino.Codec + + // write channel capacity + writeChanCapacity int + + // each write times out after this. + writeWait time.Duration + + // Connection times out if we haven't received *anything* in this long, not even pings. + readWait time.Duration + + // Send pings to server with this period. Must be less than readWait, but greater than zero. + pingPeriod time.Duration + + // Maximum message size. + readLimit int64 + + // callback which is called upon disconnect + onDisconnect func(remoteAddr string) + + ctx context.Context + cancel context.CancelFunc +} + +// NewWSConnection wraps websocket.Conn. +// +// See the commentary on the func(*wsConnection) functions for a detailed +// description of how to configure ping period and pong wait time. NOTE: if the +// write buffer is full, pongs may be dropped, which may cause clients to +// disconnect. see https://github.com/gorilla/websocket/issues/97 +func NewWSConnection( + baseConn *websocket.Conn, + funcMap map[string]*RPCFunc, + cdc *amino.Codec, + options ...func(*wsConnection), +) *wsConnection { + wsc := &wsConnection{ + remoteAddr: baseConn.RemoteAddr().String(), + baseConn: baseConn, + funcMap: funcMap, + cdc: cdc, + writeWait: defaultWSWriteWait, + writeChanCapacity: defaultWSWriteChanCapacity, + readWait: defaultWSReadWait, + pingPeriod: defaultWSPingPeriod, + readRoutineQuit: make(chan struct{}), + } + for _, option := range options { + option(wsc) + } + wsc.baseConn.SetReadLimit(wsc.readLimit) + wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc) + return wsc +} + +// OnDisconnect sets a callback which is used upon disconnect - not +// Goroutine-safe. Nop by default. +func OnDisconnect(onDisconnect func(remoteAddr string)) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.onDisconnect = onDisconnect + } +} + +// WriteWait sets the amount of time to wait before a websocket write times out. +// It should only be used in the constructor - not Goroutine-safe. +func WriteWait(writeWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeWait = writeWait + } +} + +// WriteChanCapacity sets the capacity of the websocket write channel. +// It should only be used in the constructor - not Goroutine-safe. +func WriteChanCapacity(cap int) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeChanCapacity = cap + } +} + +// ReadWait sets the amount of time to wait before a websocket read times out. +// It should only be used in the constructor - not Goroutine-safe. +func ReadWait(readWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.readWait = readWait + } +} + +// PingPeriod sets the duration for sending websocket pings. +// It should only be used in the constructor - not Goroutine-safe. +func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.pingPeriod = pingPeriod + } +} + +// ReadLimit sets the maximum size for reading message. +// It should only be used in the constructor - not Goroutine-safe. +func ReadLimit(readLimit int64) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.readLimit = readLimit + } +} + +// OnStart implements cmn.Service by starting the read and write routines. It +// blocks until there's some error. +func (wsc *wsConnection) OnStart() error { + wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) + + // Read subscriptions/unsubscriptions to events + go wsc.readRoutine() + // Write responses, BLOCKING. + wsc.writeRoutine() + + return nil +} + +// OnStop implements cmn.Service by unsubscribing remoteAddr from all +// subscriptions. +func (wsc *wsConnection) OnStop() { + if wsc.onDisconnect != nil { + wsc.onDisconnect(wsc.remoteAddr) + } + + if wsc.ctx != nil { + wsc.cancel() + } +} + +// GetRemoteAddr returns the remote address of the underlying connection. +// It implements WSRPCConnection +func (wsc *wsConnection) GetRemoteAddr() string { + return wsc.remoteAddr +} + +// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. +// It implements WSRPCConnection. It is Goroutine-safe. +func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { + select { + case <-wsc.Quit(): + return + case wsc.writeChan <- resp: + } +} + +// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block. +// It implements WSRPCConnection. It is Goroutine-safe +func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { + select { + case <-wsc.Quit(): + return false + case wsc.writeChan <- resp: + return true + default: + return false + } +} + +// Codec returns an amino codec used to decode parameters and encode results. +// It implements WSRPCConnection. +func (wsc *wsConnection) Codec() *amino.Codec { + return wsc.cdc +} + +// Context returns the connection's context. +// The context is canceled when the client's connection closes. +func (wsc *wsConnection) Context() context.Context { + if wsc.ctx != nil { + return wsc.ctx + } + wsc.ctx, wsc.cancel = context.WithCancel(context.Background()) + return wsc.ctx +} + +// Read from the socket and subscribe to or unsubscribe from events +func (wsc *wsConnection) readRoutine() { + var request types.RPCRequest + + defer func() { + if r := recover(); r != nil { + err, ok := r.(error) + if !ok { + err = fmt.Errorf("WSJSONRPC: %v", r) + } + wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack())) + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) + go wsc.readRoutine() + } + }() + + wsc.baseConn.SetPongHandler(func(m string) error { + return wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)) + }) + + for { + select { + case <-wsc.Quit(): + return + default: + // reset deadline for every type of message (control or data) + if err := wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)); err != nil { + wsc.Logger.Error("failed to set read deadline", "err", err) + } + var in []byte + _, in, err := wsc.baseConn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + wsc.Logger.Info("Client closed the connection") + } else { + wsc.Logger.Error("Failed to read request", "err", err) + } + wsc.Stop() + close(wsc.readRoutineQuit) + return + } + + err = json.Unmarshal(in, &request) + if err != nil { + wsc.WriteRPCResponse(types.RPCParseError(errors.Wrap(err, "error unmarshaling request"))) + continue + } + + // A Notification is a Request object without an "id" member. + // The Server MUST NOT reply to a Notification, including those that are within a batch request. + if request.ID == nil { + wsc.Logger.Debug( + "WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", + "req", request, + ) + continue + } + + // Now, fetch the RPCFunc and execute it. + rpcFunc := wsc.funcMap[request.Method] + if rpcFunc == nil { + wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID)) + continue + } + + ctx := &types.Context{JSONReq: &request, WSConn: wsc} + args := []reflect.Value{reflect.ValueOf(ctx)} + if len(request.Params) > 0 { + fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params) + if err != nil { + wsc.WriteRPCResponse( + types.RPCInternalError(request.ID, errors.Wrap(err, "error converting json params to arguments")), + ) + continue + } + args = append(args, fnArgs...) + } + + returns := rpcFunc.f.Call(args) + + // TODO: Need to encode args/returns to string if we want to log them + wsc.Logger.Info("WSJSONRPC", "method", request.Method) + + result, err := unreflectResult(returns) + if err != nil { + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) + continue + } + + wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result)) + } + } +} + +// receives on a write channel and writes out on the socket +func (wsc *wsConnection) writeRoutine() { + pingTicker := time.NewTicker(wsc.pingPeriod) + defer func() { + pingTicker.Stop() + }() + + // https://github.com/gorilla/websocket/issues/97 + pongs := make(chan string, 1) + wsc.baseConn.SetPingHandler(func(m string) error { + select { + case pongs <- m: + default: + } + return nil + }) + + for { + select { + case <-wsc.Quit(): + return + case <-wsc.readRoutineQuit: // error in readRoutine + return + case m := <-pongs: + err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m)) + if err != nil { + wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err) + } + case <-pingTicker.C: + err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) + if err != nil { + wsc.Logger.Error("Failed to write ping", "err", err) + return + } + case msg := <-wsc.writeChan: + jsonBytes, err := json.MarshalIndent(msg, "", " ") + if err != nil { + wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) + } else if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { + wsc.Logger.Error("Failed to write response", "msg", msg, "err", err) + return + } + } + } +} + +// All writes to the websocket must (re)set the write deadline. +// If some writes don't set it while others do, they may timeout incorrectly +// (https://github.com/tendermint/tendermint/issues/553) +func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { + if err := wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)); err != nil { + return err + } + return wsc.baseConn.WriteMessage(msgType, msg) +} diff --git a/rpc/lib/server/ws_handler_test.go b/rpc/lib/server/ws_handler_test.go new file mode 100644 index 000000000..f58e17ee7 --- /dev/null +++ b/rpc/lib/server/ws_handler_test.go @@ -0,0 +1,59 @@ +package rpcserver + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + + amino "github.com/tendermint/go-amino" + + "github.com/tendermint/tendermint/libs/log" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +func TestWebsocketManagerHandler(t *testing.T) { + s := newWSServer() + defer s.Close() + + // check upgrader works + d := websocket.Dialer{} + c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil) + require.NoError(t, err) + + if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want { + t.Errorf("dialResp.StatusCode = %q, want %q", got, want) + } + + // check basic functionality works + req, err := types.MapToRequest( + amino.NewCodec(), + types.JSONRPCStringID("TestWebsocketManager"), + "c", + map[string]interface{}{"s": "a", "i": 10}, + ) + require.NoError(t, err) + err = c.WriteJSON(req) + require.NoError(t, err) + + var resp types.RPCResponse + err = c.ReadJSON(&resp) + require.NoError(t, err) + require.Nil(t, resp.Error) + dialResp.Body.Close() +} + +func newWSServer() *httptest.Server { + funcMap := map[string]*RPCFunc{ + "c": NewWSRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), + } + wm := NewWebsocketManager(funcMap, amino.NewCodec()) + wm.SetLogger(log.TestingLogger()) + + mux := http.NewServeMux() + mux.HandleFunc("/websocket", wm.WebsocketHandler) + + return httptest.NewServer(mux) +} diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 14317d437..0ce9890e5 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -22,12 +22,14 @@ type jsonrpcid interface { // JSONRPCStringID a wrapper for JSON-RPC string IDs type JSONRPCStringID string -func (JSONRPCStringID) isJSONRPCID() {} +func (JSONRPCStringID) isJSONRPCID() {} +func (id JSONRPCStringID) String() string { return string(id) } // JSONRPCIntID a wrapper for JSON-RPC integer IDs type JSONRPCIntID int -func (JSONRPCIntID) isJSONRPCID() {} +func (JSONRPCIntID) isJSONRPCID() {} +func (id JSONRPCIntID) String() string { return fmt.Sprintf("%d", id) } func idFromInterface(idInterface interface{}) (jsonrpcid, error) { switch id := idInterface.(type) { @@ -50,16 +52,16 @@ func idFromInterface(idInterface interface{}) (jsonrpcid, error) { type RPCRequest struct { JSONRPC string `json:"jsonrpc"` - ID jsonrpcid `json:"id"` + ID jsonrpcid `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{} } // UnmarshalJSON custom JSON unmarshalling due to jsonrpcid being string or int -func (request *RPCRequest) UnmarshalJSON(data []byte) error { +func (req *RPCRequest) UnmarshalJSON(data []byte) error { unsafeReq := &struct { JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` + ID interface{} `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{} }{} @@ -67,9 +69,9 @@ func (request *RPCRequest) UnmarshalJSON(data []byte) error { if err != nil { return err } - request.JSONRPC = unsafeReq.JSONRPC - request.Method = unsafeReq.Method - request.Params = unsafeReq.Params + req.JSONRPC = unsafeReq.JSONRPC + req.Method = unsafeReq.Method + req.Params = unsafeReq.Params if unsafeReq.ID == nil { return nil } @@ -77,7 +79,7 @@ func (request *RPCRequest) UnmarshalJSON(data []byte) error { if err != nil { return err } - request.ID = id + req.ID = id return nil } @@ -91,41 +93,43 @@ func NewRPCRequest(id jsonrpcid, method string, params json.RawMessage) RPCReque } func (req RPCRequest) String() string { - return fmt.Sprintf("[%s %s]", req.ID, req.Method) + return fmt.Sprintf("RPCRequest{%s %s/%X}", req.ID, req.Method, req.Params) } func MapToRequest(cdc *amino.Codec, id jsonrpcid, method string, params map[string]interface{}) (RPCRequest, error) { - var params_ = make(map[string]json.RawMessage, len(params)) + var paramsMap = make(map[string]json.RawMessage, len(params)) for name, value := range params { valueJSON, err := cdc.MarshalJSON(value) if err != nil { return RPCRequest{}, err } - params_[name] = valueJSON + paramsMap[name] = valueJSON } - payload, err := json.Marshal(params_) // NOTE: Amino doesn't handle maps yet. + + payload, err := json.Marshal(paramsMap) // NOTE: Amino doesn't handle maps yet. if err != nil { return RPCRequest{}, err } - request := NewRPCRequest(id, method, payload) - return request, nil + + return NewRPCRequest(id, method, payload), nil } func ArrayToRequest(cdc *amino.Codec, id jsonrpcid, method string, params []interface{}) (RPCRequest, error) { - var params_ = make([]json.RawMessage, len(params)) + var paramsMap = make([]json.RawMessage, len(params)) for i, value := range params { valueJSON, err := cdc.MarshalJSON(value) if err != nil { return RPCRequest{}, err } - params_[i] = valueJSON + paramsMap[i] = valueJSON } - payload, err := json.Marshal(params_) // NOTE: Amino doesn't handle maps yet. + + payload, err := json.Marshal(paramsMap) // NOTE: Amino doesn't handle maps yet. if err != nil { return RPCRequest{}, err } - request := NewRPCRequest(id, method, payload) - return request, nil + + return NewRPCRequest(id, method, payload), nil } //---------------------------------------- @@ -147,16 +151,16 @@ func (err RPCError) Error() string { type RPCResponse struct { JSONRPC string `json:"jsonrpc"` - ID jsonrpcid `json:"id"` + ID jsonrpcid `json:"id,omitempty"` Result json.RawMessage `json:"result,omitempty"` Error *RPCError `json:"error,omitempty"` } // UnmarshalJSON custom JSON unmarshalling due to jsonrpcid being string or int -func (response *RPCResponse) UnmarshalJSON(data []byte) error { +func (resp *RPCResponse) UnmarshalJSON(data []byte) error { unsafeResp := &struct { JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` + ID interface{} `json:"id,omitempty"` Result json.RawMessage `json:"result,omitempty"` Error *RPCError `json:"error,omitempty"` }{} @@ -164,9 +168,9 @@ func (response *RPCResponse) UnmarshalJSON(data []byte) error { if err != nil { return err } - response.JSONRPC = unsafeResp.JSONRPC - response.Error = unsafeResp.Error - response.Result = unsafeResp.Result + resp.JSONRPC = unsafeResp.JSONRPC + resp.Error = unsafeResp.Error + resp.Result = unsafeResp.Result if unsafeResp.ID == nil { return nil } @@ -174,7 +178,7 @@ func (response *RPCResponse) UnmarshalJSON(data []byte) error { if err != nil { return err } - response.ID = id + resp.ID = id return nil } @@ -203,15 +207,21 @@ func NewRPCErrorResponse(id jsonrpcid, code int, msg string, data string) RPCRes func (resp RPCResponse) String() string { if resp.Error == nil { - return fmt.Sprintf("[%s %v]", resp.ID, resp.Result) + return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Result) } - return fmt.Sprintf("[%s %s]", resp.ID, resp.Error) + return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Error) } -func RPCParseError(id jsonrpcid, err error) RPCResponse { - return NewRPCErrorResponse(id, -32700, "Parse error. Invalid JSON", err.Error()) +// From the JSON-RPC 2.0 spec: +// If there was an error in detecting the id in the Request object (e.g. Parse +// error/Invalid Request), it MUST be Null. +func RPCParseError(err error) RPCResponse { + return NewRPCErrorResponse(nil, -32700, "Parse error. Invalid JSON", err.Error()) } +// From the JSON-RPC 2.0 spec: +// If there was an error in detecting the id in the Request object (e.g. Parse +// error/Invalid Request), it MUST be Null. func RPCInvalidRequestError(id jsonrpcid, err error) RPCResponse { return NewRPCErrorResponse(id, -32600, "Invalid Request", err.Error()) } diff --git a/rpc/lib/types/types_test.go b/rpc/lib/types/types_test.go index d6cf1b397..e2a98a18f 100644 --- a/rpc/lib/types/types_test.go +++ b/rpc/lib/types/types_test.go @@ -41,12 +41,9 @@ func TestResponses(t *testing.T) { s := fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"result":{"Value":"hello"}}`, tt.expected) assert.Equal(s, string(b)) - d := RPCParseError(jsonid, errors.New("Hello world")) + d := RPCParseError(errors.New("Hello world")) e, _ := json.Marshal(d) - f := fmt.Sprintf( - `{"jsonrpc":"2.0","id":%v,"error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}`, - tt.expected, - ) + f := `{"jsonrpc":"2.0","error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}` assert.Equal(f, string(e)) g := RPCMethodNotFoundError(jsonid) diff --git a/rpc/swagger/swagger.yaml b/rpc/swagger/swagger.yaml index dc41ce47d..066d62102 100644 --- a/rpc/swagger/swagger.yaml +++ b/rpc/swagger/swagger.yaml @@ -918,11 +918,11 @@ definitions: type: object properties: id: - type: string - x-example: "" + type: number + example: 0 jsonrpc: type: string - x-example: "2.0" + example: "2.0" EmptyResponse: description: Empty Response allOf: @@ -1414,8 +1414,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: type: "object" required: @@ -1590,8 +1590,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "signed_header" @@ -1779,8 +1779,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "block_height" @@ -1827,8 +1827,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: type: "object" required: @@ -1935,8 +1935,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "round_state" @@ -2280,8 +2280,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "round_state" @@ -2351,8 +2351,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: type: "object" required: @@ -2416,8 +2416,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "n_txs" @@ -2453,8 +2453,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "n_txs" @@ -2491,8 +2491,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "txs" @@ -2594,8 +2594,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "hash" @@ -2655,8 +2655,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "response" @@ -2727,8 +2727,8 @@ definitions: type: "object" type: "object" id: - type: "string" - example: "" + type: "number" + example: 0 jsonrpc: type: "string" example: "2.0" @@ -2745,8 +2745,8 @@ definitions: type: "string" example: "" id: - type: "string" - example: "" + type: "number" + example: 0 jsonrpc: type: "string" example: "2.0" @@ -2808,8 +2808,8 @@ definitions: type: "object" type: "object" id: - type: "string" - example: "" + type: "number" + example: 0 jsonrpc: type: "string" example: "2.0" @@ -2825,8 +2825,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "code" diff --git a/tools/tm-bench/transacter.go b/tools/tm-bench/transacter.go index 57324de35..f453ebcf7 100644 --- a/tools/tm-bench/transacter.go +++ b/tools/tm-bench/transacter.go @@ -194,7 +194,7 @@ func (t *transacter) sendLoop(connIndex int) { c.SetWriteDeadline(now.Add(sendTimeout)) err = c.WriteJSON(rpctypes.RPCRequest{ JSONRPC: "2.0", - ID: rpctypes.JSONRPCStringID("tm-bench"), + ID: rpctypes.JSONRPCIntID((connIndex * t.Rate) + i), Method: t.BroadcastTxMethod, Params: rawParamsJSON, })