diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 498104a8c..a7efc7e51 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -28,6 +28,17 @@ program](https://hackerone.com/tendermint). } } ``` + - [rpc] [\#4141](https://github.com/tendermint/tendermint/pull/4141) Remove `#event` suffix from the ID in event responses. + `{"jsonrpc": "2.0", "id": 0, "result": ...}` + - [rpc] [\#4141](https://github.com/tendermint/tendermint/pull/4141) Switch to integer IDs instead of `json-client-XYZ` + ``` + id=0 method=/subscribe + id=0 result=... + id=1 method=/abci_query + id=1 result=... + ``` + * ID is unique for each request; + * Request.ID is now optional. Notification is a Request without an ID. Previously ID="" or ID=0 were considered as notifications. - Apps @@ -63,3 +74,5 @@ program](https://hackerone.com/tendermint). - [state] [\#4104](https://github.com/tendermint/tendermint/pull/4104) txindex/kv: Fsync data to disk immediately after receiving it (@guagualvcha) - [state] [\#4095](https://github.com/tendermint/tendermint/pull/4095) txindex/kv: Return an error if there's one when the user searches for a tx (hash=X) (@hsyis) - [rpc/lib] [\#4051](https://github.com/tendermint/tendermint/pull/4131) Fix RPC client, which was previously resolving https protocol to http (@yenkhoon) +- [rpc] [\#4141](https://github.com/tendermint/tendermint/pull/4141) JSONRPCClient: validate that Response.ID matches Request.ID +- [rpc] [\#4141](https://github.com/tendermint/tendermint/pull/4141) WSClient: check for unsolicited responses diff --git a/docs/app-dev/subscribing-to-events-via-websocket.md b/docs/app-dev/subscribing-to-events-via-websocket.md index c07a343cd..afedc1d59 100644 --- a/docs/app-dev/subscribing-to-events-via-websocket.md +++ b/docs/app-dev/subscribing-to-events-via-websocket.md @@ -17,7 +17,7 @@ method via Websocket. { "jsonrpc": "2.0", "method": "subscribe", - "id": "0", + "id": 0, "params": { "query": "tm.event='NewBlock'" } @@ -44,7 +44,7 @@ Response: ``` { "jsonrpc": "2.0", - "id": "0#event", + "id": 0, "result": { "query": "tm.event='ValidatorSetUpdates'", "data": { diff --git a/lite/proxy/wrapper.go b/lite/proxy/wrapper.go index 2f608cfa9..d3a69cc3f 100644 --- a/lite/proxy/wrapper.go +++ b/lite/proxy/wrapper.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "fmt" cmn "github.com/tendermint/tendermint/libs/common" @@ -165,7 +164,7 @@ func (w Wrapper) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Resul ctx.WSConn.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( ctx.WSConn.Codec(), - rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + ctx.JSONReq.ID, resultEvent, )) case <-w.Client.Quit(): diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index a982f582c..27712ce31 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -12,6 +12,7 @@ import ( amino "github.com/tendermint/go-amino" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/lib/client" @@ -112,6 +113,10 @@ func NewHTTPWithClient(remote, wsEndpoint string, client *http.Client) *HTTP { var _ Client = (*HTTP)(nil) +func (c *HTTP) SetLogger(l log.Logger) { + c.WSEvents.SetLogger(l) +} + // NewBatch creates a new batch client for this HTTP client. func (c *HTTP) NewBatch() *BatchHTTP { rpcBatch := c.rpc.NewRequestBatch() @@ -395,6 +400,7 @@ func (w *WSEvents) OnStart() error { w.redoSubscriptionsAfter(0 * time.Second) })) w.ws.SetCodec(w.cdc) + w.ws.SetLogger(w.Logger) err := w.ws.Start() if err != nil { diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index f1718fec5..34d41d0e1 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/tmhash" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/rpc/client" @@ -28,7 +29,9 @@ import ( func getHTTPClient() *client.HTTP { rpcAddr := rpctest.GetConfig().RPC.ListenAddress - return client.NewHTTP(rpcAddr, "/websocket") + c := client.NewHTTP(rpcAddr, "/websocket") + c.SetLogger(log.TestingLogger()) + return c } func getLocalClient() *client.Local { diff --git a/rpc/core/events.go b/rpc/core/events.go index 61f21c000..49bf7be5d 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -46,7 +46,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er ctx.WSConn.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( ctx.WSConn.Codec(), - rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + ctx.JSONReq.ID, resultEvent, )) case <-sub.Cancelled(): @@ -58,8 +58,8 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er reason = sub.Err().Error() } ctx.WSConn.TryWriteRPCResponse( - rpctypes.RPCServerError(rpctypes.JSONRPCStringID( - fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + rpctypes.RPCServerError( + ctx.JSONReq.ID, fmt.Errorf("subscription was cancelled (reason: %s)", reason), )) } diff --git a/rpc/lib/client/decode.go b/rpc/lib/client/decode.go new file mode 100644 index 000000000..dd4a2e4c6 --- /dev/null +++ b/rpc/lib/client/decode.go @@ -0,0 +1,129 @@ +package rpcclient + +import ( + "encoding/json" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +func unmarshalResponseBytes( + cdc *amino.Codec, + responseBytes []byte, + expectedID types.JSONRPCIntID, + result interface{}, +) (interface{}, error) { + + // Read response. If rpc/core/types is imported, the result will unmarshal + // into the correct type. + response := &types.RPCResponse{} + if err := json.Unmarshal(responseBytes, response); err != nil { + return nil, errors.Wrap(err, "error unmarshalling") + } + + if response.Error != nil { + return nil, response.Error + } + + if err := validateAndVerifyID(response, expectedID); err != nil { + return nil, errors.Wrap(err, "wrong ID") + } + + // Unmarshal the RawMessage into the result. + if err := cdc.UnmarshalJSON(response.Result, result); err != nil { + return nil, errors.Wrap(err, "error unmarshalling result") + } + + return result, nil +} + +func unmarshalResponseBytesArray( + cdc *amino.Codec, + responseBytes []byte, + expectedIDs []types.JSONRPCIntID, + results []interface{}, +) ([]interface{}, error) { + + var ( + responses []types.RPCResponse + ) + + if err := json.Unmarshal(responseBytes, &responses); err != nil { + return nil, errors.Wrap(err, "error unmarshalling") + } + + // No response error checking here as there may be a mixture of successful + // and unsuccessful responses. + + if len(results) != len(responses) { + return nil, errors.Errorf( + "expected %d result objects into which to inject responses, but got %d", + len(responses), + len(results), + ) + } + + // Intersect IDs from responses with expectedIDs. + ids := make([]types.JSONRPCIntID, len(responses)) + var ok bool + for i, resp := range responses { + ids[i], ok = resp.ID.(types.JSONRPCIntID) + if !ok { + return nil, errors.Errorf("expected JSONRPCIntID, got %T", resp.ID) + } + } + if err := validateResponseIDs(ids, expectedIDs); err != nil { + return nil, errors.Wrap(err, "wrong IDs") + } + + for i := 0; i < len(responses); i++ { + if err := cdc.UnmarshalJSON(responses[i].Result, results[i]); err != nil { + return nil, errors.Wrapf(err, "error unmarshalling #%d result", i) + } + } + + return results, nil +} + +func validateResponseIDs(ids, expectedIDs []types.JSONRPCIntID) error { + m := make(map[types.JSONRPCIntID]bool, len(expectedIDs)) + for _, expectedID := range expectedIDs { + m[expectedID] = true + } + + for i, id := range ids { + if m[id] { + delete(m, id) + } else { + return errors.Errorf("unsolicited ID #%d: %v", i, id) + } + } + + return nil +} + +// From the JSON-RPC 2.0 spec: +// id: It MUST be the same as the value of the id member in the Request Object. +func validateAndVerifyID(res *types.RPCResponse, expectedID types.JSONRPCIntID) error { + if err := validateResponseID(res.ID); err != nil { + return err + } + if expectedID != res.ID.(types.JSONRPCIntID) { // validateResponseID ensured res.ID has the right type + return errors.Errorf("response ID (%d) does not match request ID (%d)", res.ID, expectedID) + } + return nil +} + +func validateResponseID(id interface{}) error { + if id == nil { + return errors.New("no ID") + } + _, ok := id.(types.JSONRPCIntID) + if !ok { + return errors.Errorf("expected JSONRPCIntID, but got: %T", id) + } + return nil +} diff --git a/rpc/lib/client/encode.go b/rpc/lib/client/encode.go new file mode 100644 index 000000000..227367f59 --- /dev/null +++ b/rpc/lib/client/encode.go @@ -0,0 +1,46 @@ +package rpcclient + +import ( + "fmt" + "net/url" + "reflect" + + amino "github.com/tendermint/go-amino" +) + +func argsToURLValues(cdc *amino.Codec, args map[string]interface{}) (url.Values, error) { + values := make(url.Values) + if len(args) == 0 { + return values, nil + } + + err := argsToJSON(cdc, args) + if err != nil { + return nil, err + } + + for key, val := range args { + values.Set(key, val.(string)) + } + + return values, nil +} + +func argsToJSON(cdc *amino.Codec, args map[string]interface{}) error { + for k, v := range args { + rt := reflect.TypeOf(v) + isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 + if isByteSlice { + bytes := reflect.ValueOf(v).Bytes() + args[k] = fmt.Sprintf("0x%X", bytes) + continue + } + + data, err := cdc.MarshalJSON(v) + if err != nil { + return err + } + args[k] = string(data) + } + return nil +} diff --git a/rpc/lib/client/http_client.go b/rpc/lib/client/http_json_client.go similarity index 58% rename from rpc/lib/client/http_client.go rename to rpc/lib/client/http_json_client.go index 0673177c3..d828a3aa6 100644 --- a/rpc/lib/client/http_client.go +++ b/rpc/lib/client/http_json_client.go @@ -7,16 +7,12 @@ import ( "io/ioutil" "net" "net/http" - "net/url" - "reflect" "strings" "sync" "github.com/pkg/errors" - amino "github.com/tendermint/go-amino" - cmn "github.com/tendermint/tendermint/libs/common" types "github.com/tendermint/tendermint/rpc/lib/types" ) @@ -28,131 +24,43 @@ const ( protoTCP = "tcp" ) -// HTTPClient is a common interface for JSONRPCClient and URIClient. +//------------------------------------------------------------- + +// HTTPClient is a common interface for JSON-RPC HTTP clients. type HTTPClient interface { + // Call calls the given method with the params and returns a result. Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) + // Codec returns an amino codec used. Codec() *amino.Codec + // SetCodec sets an amino codec. SetCodec(*amino.Codec) } -// protocol - client's protocol (for example, "http", "https", "wss", "ws", "tcp") -// trimmedS - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") with "/" replaced with "." -func toClientAddrAndParse(remoteAddr string) (network string, trimmedS string, err error) { - protocol, address, err := parseRemoteAddr(remoteAddr) - if err != nil { - return "", "", err - } - - // protocol to use for http operations, to support both http and https - var clientProtocol string - // default to http for unknown protocols (ex. tcp) - switch protocol { - case protoHTTP, protoHTTPS, protoWS, protoWSS: - clientProtocol = protocol - default: - clientProtocol = protoHTTP - } - - // replace / with . for http requests (kvstore domain) - trimmedAddress := strings.Replace(address, "/", ".", -1) - return clientProtocol, trimmedAddress, nil -} - -func toClientAddress(remoteAddr string) (string, error) { - clientProtocol, trimmedAddress, err := toClientAddrAndParse(remoteAddr) - if err != nil { - return "", err - } - return clientProtocol + "://" + trimmedAddress, nil -} - -// network - name of the network (for example, "tcp", "unix") -// s - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") -// TODO: Deprecate support for IP:PORT or /path/to/socket -func parseRemoteAddr(remoteAddr string) (network string, s string, err error) { - parts := strings.SplitN(remoteAddr, "://", 2) - var protocol, address string - switch { - case len(parts) == 1: - // default to tcp if nothing specified - protocol, address = protoTCP, remoteAddr - case len(parts) == 2: - protocol, address = parts[0], parts[1] - default: - return "", "", fmt.Errorf("invalid addr: %s", remoteAddr) - } - - return protocol, address, nil -} - -func makeErrorDialer(err error) func(string, string) (net.Conn, error) { - return func(_ string, _ string) (net.Conn, error) { - return nil, err - } -} - -func makeHTTPDialer(remoteAddr string) func(string, string) (net.Conn, error) { - protocol, address, err := parseRemoteAddr(remoteAddr) - if err != nil { - return makeErrorDialer(err) - } - - // accept http(s) as an alias for tcp - switch protocol { - case protoHTTP, protoHTTPS: - protocol = protoTCP - } - - return func(proto, addr string) (net.Conn, error) { - return net.Dial(protocol, address) - } -} - -// DefaultHTTPClient is used to create an http client with some default parameters. -// We overwrite the http.Client.Dial so we can do http over tcp or unix. -// remoteAddr should be fully featured (eg. with tcp:// or unix://) -func DefaultHTTPClient(remoteAddr string) *http.Client { - return &http.Client{ - Transport: &http.Transport{ - // Set to true to prevent GZIP-bomb DoS attacks - DisableCompression: true, - Dial: makeHTTPDialer(remoteAddr), - }, - } -} - -//------------------------------------------------------------------------------------ - -// jsonRPCBufferedRequest encapsulates a single buffered request, as well as its -// anticipated response structure. -type jsonRPCBufferedRequest struct { - request types.RPCRequest - result interface{} // The result will be deserialized into this object. +// JSONRPCCaller implementers can facilitate calling the JSON-RPC endpoint. +type JSONRPCCaller interface { + Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) } -// JSONRPCRequestBatch allows us to buffer multiple request/response structures -// into a single batch request. Note that this batch acts like a FIFO queue, and -// is thread-safe. -type JSONRPCRequestBatch struct { - client *JSONRPCClient - - mtx sync.Mutex - requests []*jsonRPCBufferedRequest -} +//------------------------------------------------------------- -// JSONRPCClient takes params as a slice +// JSONRPCClient is a JSON-RPC client, which sends POST HTTP requests to the +// remote server. +// +// Request values are amino encoded. Response is expected to be amino encoded. +// New amino codec is used if no other codec was set using SetCodec. +// +// JSONRPCClient is safe for concurrent use by multiple goroutines. type JSONRPCClient struct { address string client *http.Client - id types.JSONRPCStringID cdc *amino.Codec -} -// JSONRPCCaller implementers can facilitate calling the JSON RPC endpoint. -type JSONRPCCaller interface { - Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) + mtx sync.Mutex + nextReqID int } +var _ HTTPClient = (*JSONRPCClient)(nil) + // Both JSONRPCClient and JSONRPCRequestBatch can facilitate calls to the JSON // RPC endpoint. var _ JSONRPCCaller = (*JSONRPCClient)(nil) @@ -178,36 +86,43 @@ func NewJSONRPCClientWithHTTPClient(remote string, client *http.Client) *JSONRPC return &JSONRPCClient{ address: clientAddress, client: client, - id: types.JSONRPCStringID("jsonrpc-client-" + cmn.RandStr(8)), cdc: amino.NewCodec(), } } -// Call will send the request for the given method through to the RPC endpoint -// immediately, without buffering of requests. +// Call issues a POST HTTP request. Requests are JSON encoded. Content-Type: +// text/json. func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { - request, err := types.MapToRequest(c.cdc, c.id, method, params) + id := c.nextRequestID() + + request, err := types.MapToRequest(c.cdc, id, method, params) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to encode params") } + requestBytes, err := json.Marshal(request) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to marshal request") } + requestBuf := bytes.NewBuffer(requestBytes) httpResponse, err := c.client.Post(c.address, "text/json", requestBuf) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Post failed") } defer httpResponse.Body.Close() // nolint: errcheck responseBytes, err := ioutil.ReadAll(httpResponse.Body) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to read response body") } - return unmarshalResponseBytes(c.cdc, responseBytes, c.id, result) + + return unmarshalResponseBytes(c.cdc, responseBytes, id, result) } +func (c *JSONRPCClient) Codec() *amino.Codec { return c.cdc } +func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) { c.cdc = cdc } + // NewRequestBatch starts a batch of requests for this client. func (c *JSONRPCClient) NewRequestBatch() *JSONRPCRequestBatch { return &JSONRPCRequestBatch{ @@ -223,33 +138,59 @@ func (c *JSONRPCClient) sendBatch(requests []*jsonRPCBufferedRequest) ([]interfa reqs = append(reqs, req.request) results = append(results, req.result) } + // serialize the array of requests into a single JSON object requestBytes, err := json.Marshal(reqs) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to marshal requests") } + httpResponse, err := c.client.Post(c.address, "text/json", bytes.NewBuffer(requestBytes)) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Post failed") } defer httpResponse.Body.Close() // nolint: errcheck responseBytes, err := ioutil.ReadAll(httpResponse.Body) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to read response body") } - return unmarshalResponseBytesArray(c.cdc, responseBytes, c.id, results) + + // collect ids to check responses IDs in unmarshalResponseBytesArray + ids := make([]types.JSONRPCIntID, len(requests)) + for i, req := range requests { + ids[i] = req.request.ID.(types.JSONRPCIntID) + } + + return unmarshalResponseBytesArray(c.cdc, responseBytes, ids, results) } -func (c *JSONRPCClient) Codec() *amino.Codec { - return c.cdc +func (c *JSONRPCClient) nextRequestID() types.JSONRPCIntID { + c.mtx.Lock() + id := c.nextReqID + c.nextReqID++ + c.mtx.Unlock() + return types.JSONRPCIntID(id) } -func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) { - c.cdc = cdc +//------------------------------------------------------------------------------------ + +// jsonRPCBufferedRequest encapsulates a single buffered request, as well as its +// anticipated response structure. +type jsonRPCBufferedRequest struct { + request types.RPCRequest + result interface{} // The result will be deserialized into this object. } -//------------------------------------------------------------- +// JSONRPCRequestBatch allows us to buffer multiple request/response structures +// into a single batch request. Note that this batch acts like a FIFO queue, and +// is thread-safe. +type JSONRPCRequestBatch struct { + client *JSONRPCClient + + mtx sync.Mutex + requests []*jsonRPCBufferedRequest +} // Count returns the number of enqueued requests waiting to be sent. func (b *JSONRPCRequestBatch) Count() int { @@ -296,7 +237,8 @@ func (b *JSONRPCRequestBatch) Call( params map[string]interface{}, result interface{}, ) (interface{}, error) { - request, err := types.MapToRequest(b.client.cdc, b.client.id, method, params) + id := b.client.nextRequestID() + request, err := types.MapToRequest(b.client.cdc, id, method, params) if err != nil { return nil, err } @@ -306,173 +248,88 @@ func (b *JSONRPCRequestBatch) Call( //------------------------------------------------------------- -// URI takes params as a map -type URIClient struct { - address string - client *http.Client - cdc *amino.Codec -} - -// The function panics if the provided remote is invalid. -func NewURIClient(remote string) *URIClient { - clientAddress, err := toClientAddress(remote) +// protocol - client's protocol (for example, "http", "https", "wss", "ws", "tcp") +// trimmedS - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") with "/" replaced with "." +func toClientAddrAndParse(remoteAddr string) (network string, trimmedS string, err error) { + protocol, address, err := parseRemoteAddr(remoteAddr) if err != nil { - panic(fmt.Sprintf("invalid remote %s: %s", remote, err)) - } - return &URIClient{ - address: clientAddress, - client: DefaultHTTPClient(remote), - cdc: amino.NewCodec(), + return "", "", err } -} -func (c *URIClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { - values, err := argsToURLValues(c.cdc, params) - if err != nil { - return nil, err - } - // log.Info(Fmt("URI request to %v (%v): %v", c.address, method, values)) - resp, err := c.client.PostForm(c.address+"/"+method, values) - if err != nil { - return nil, err + // protocol to use for http operations, to support both http and https + var clientProtocol string + // default to http for unknown protocols (ex. tcp) + switch protocol { + case protoHTTP, protoHTTPS, protoWS, protoWSS: + clientProtocol = protocol + default: + clientProtocol = protoHTTP } - defer resp.Body.Close() // nolint: errcheck - responseBytes, err := ioutil.ReadAll(resp.Body) + // replace / with . for http requests (kvstore domain) + trimmedAddress := strings.Replace(address, "/", ".", -1) + return clientProtocol, trimmedAddress, nil +} + +func toClientAddress(remoteAddr string) (string, error) { + clientProtocol, trimmedAddress, err := toClientAddrAndParse(remoteAddr) if err != nil { - return nil, err + return "", err } - return unmarshalResponseBytes(c.cdc, responseBytes, "", result) + return clientProtocol + "://" + trimmedAddress, nil } -func (c *URIClient) Codec() *amino.Codec { - return c.cdc -} +// network - name of the network (for example, "tcp", "unix") +// s - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") +// TODO: Deprecate support for IP:PORT or /path/to/socket +func parseRemoteAddr(remoteAddr string) (network string, s string, err error) { + parts := strings.SplitN(remoteAddr, "://", 2) + var protocol, address string + switch len(parts) { + case 1: + // default to tcp if nothing specified + protocol, address = protoTCP, remoteAddr + case 2: + protocol, address = parts[0], parts[1] + default: + return "", "", fmt.Errorf("invalid addr: %s", remoteAddr) + } -func (c *URIClient) SetCodec(cdc *amino.Codec) { - c.cdc = cdc + return protocol, address, nil } -//------------------------------------------------ - -func unmarshalResponseBytes( - cdc *amino.Codec, - responseBytes []byte, - expectedID types.JSONRPCStringID, - result interface{}, -) (interface{}, error) { - // Read response. If rpc/core/types is imported, the result will unmarshal - // into the correct type. - // log.Notice("response", "response", string(responseBytes)) - var err error - response := &types.RPCResponse{} - err = json.Unmarshal(responseBytes, response) - if err != nil { - return nil, errors.Wrap(err, "error unmarshalling rpc response") - } - if response.Error != nil { - return nil, errors.Wrap(response.Error, "response error") - } - // From the JSON-RPC 2.0 spec: - // id: It MUST be the same as the value of the id member in the Request Object. - if err := validateResponseID(response, expectedID); err != nil { +func makeErrorDialer(err error) func(string, string) (net.Conn, error) { + return func(_ string, _ string) (net.Conn, error) { return nil, err } - // Unmarshal the RawMessage into the result. - err = cdc.UnmarshalJSON(response.Result, result) - if err != nil { - return nil, errors.Wrap(err, "error unmarshalling rpc response result") - } - return result, nil } -func unmarshalResponseBytesArray( - cdc *amino.Codec, - responseBytes []byte, - expectedID types.JSONRPCStringID, - results []interface{}, -) ([]interface{}, error) { - var ( - err error - responses []types.RPCResponse - ) - err = json.Unmarshal(responseBytes, &responses) +func makeHTTPDialer(remoteAddr string) func(string, string) (net.Conn, error) { + protocol, address, err := parseRemoteAddr(remoteAddr) if err != nil { - return nil, errors.Wrap(err, "error unmarshalling rpc response") - } - // No response error checking here as there may be a mixture of successful - // and unsuccessful responses. - - if len(results) != len(responses) { - return nil, errors.Errorf( - "expected %d result objects into which to inject responses, but got %d", - len(responses), - len(results), - ) - } - - for i, response := range responses { - response := response - // From the JSON-RPC 2.0 spec: - // id: It MUST be the same as the value of the id member in the Request Object. - if err := validateResponseID(&response, expectedID); err != nil { - return nil, errors.Wrapf(err, "failed to validate response ID in response %d", i) - } - if err := cdc.UnmarshalJSON(responses[i].Result, results[i]); err != nil { - return nil, errors.Wrap(err, "error unmarshalling rpc response result") - } + return makeErrorDialer(err) } - return results, nil -} -func validateResponseID(res *types.RPCResponse, expectedID types.JSONRPCStringID) error { - // we only validate a response ID if the expected ID is non-empty - if len(expectedID) == 0 { - return nil - } - if res.ID == nil { - return errors.Errorf("missing ID in response") - } - id, ok := res.ID.(types.JSONRPCStringID) - if !ok { - return errors.Errorf("expected ID string in response but got: %v", id) - } - if expectedID != id { - return errors.Errorf("response ID (%s) does not match request ID (%s)", id, expectedID) + // accept http(s) as an alias for tcp + switch protocol { + case protoHTTP, protoHTTPS: + protocol = protoTCP } - return nil -} -func argsToURLValues(cdc *amino.Codec, args map[string]interface{}) (url.Values, error) { - values := make(url.Values) - if len(args) == 0 { - return values, nil - } - err := argsToJSON(cdc, args) - if err != nil { - return nil, err - } - for key, val := range args { - values.Set(key, val.(string)) + return func(proto, addr string) (net.Conn, error) { + return net.Dial(protocol, address) } - return values, nil } -func argsToJSON(cdc *amino.Codec, args map[string]interface{}) error { - for k, v := range args { - rt := reflect.TypeOf(v) - isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 - if isByteSlice { - bytes := reflect.ValueOf(v).Bytes() - args[k] = fmt.Sprintf("0x%X", bytes) - continue - } - - data, err := cdc.MarshalJSON(v) - if err != nil { - return err - } - args[k] = string(data) +// DefaultHTTPClient is used to create an http client with some default parameters. +// We overwrite the http.Client.Dial so we can do http over tcp or unix. +// remoteAddr should be fully featured (eg. with tcp:// or unix://) +func DefaultHTTPClient(remoteAddr string) *http.Client { + return &http.Client{ + Transport: &http.Transport{ + // Set to true to prevent GZIP-bomb DoS attacks + DisableCompression: true, + Dial: makeHTTPDialer(remoteAddr), + }, } - return nil } diff --git a/rpc/lib/client/http_client_test.go b/rpc/lib/client/http_json_client_test.go similarity index 100% rename from rpc/lib/client/http_client_test.go rename to rpc/lib/client/http_json_client_test.go diff --git a/rpc/lib/client/http_uri_client.go b/rpc/lib/client/http_uri_client.go new file mode 100644 index 000000000..8efbd6608 --- /dev/null +++ b/rpc/lib/client/http_uri_client.go @@ -0,0 +1,71 @@ +package rpcclient + +import ( + "fmt" + "io/ioutil" + "net/http" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +const ( + // URIClientRequestID in a request ID used by URIClient + URIClientRequestID = types.JSONRPCIntID(-1) +) + +// URIClient is a JSON-RPC client, which sends POST form HTTP requests to the +// remote server. +// +// Request values are amino encoded. Response is expected to be amino encoded. +// New amino codec is used if no other codec was set using SetCodec. +// +// URIClient is safe for concurrent use by multiple goroutines. +type URIClient struct { + address string + client *http.Client + cdc *amino.Codec +} + +var _ HTTPClient = (*URIClient)(nil) + +// NewURIClient returns a new client. +// The function panics if the provided remote is invalid. +func NewURIClient(remote string) *URIClient { + clientAddress, err := toClientAddress(remote) + if err != nil { + panic(fmt.Sprintf("invalid remote %s: %s", remote, err)) + } + return &URIClient{ + address: clientAddress, + client: DefaultHTTPClient(remote), + cdc: amino.NewCodec(), + } +} + +// Call issues a POST form HTTP request. +func (c *URIClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + values, err := argsToURLValues(c.cdc, params) + if err != nil { + return nil, errors.Wrap(err, "failed to encode params") + } + + resp, err := c.client.PostForm(c.address+"/"+method, values) + if err != nil { + return nil, errors.Wrap(err, "PostForm failed") + } + defer resp.Body.Close() // nolint: errcheck + + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrap(err, "failed to read response body") + } + + return unmarshalResponseBytes(c.cdc, responseBytes, URIClientRequestID, result) +} + +func (c *URIClient) Codec() *amino.Codec { return c.cdc } +func (c *URIClient) SetCodec(cdc *amino.Codec) { c.cdc = cdc } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index b21eefa7c..976d24c70 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -25,8 +25,10 @@ const ( defaultPingPeriod = 0 ) -// WSClient is a WebSocket client. The methods of WSClient are safe for use by -// multiple goroutines. +// WSClient is a JSON-RPC client, which uses WebSocket for communication with +// the remote server. +// +// WSClient is safe for concurrent use by multiple goroutines. type WSClient struct { // nolint: maligned conn *websocket.Conn cdc *amino.Codec @@ -35,7 +37,8 @@ type WSClient struct { // nolint: maligned Endpoint string // /websocket/url/endpoint Dialer func(string, string) (net.Conn, error) - // Single user facing channel to read RPCResponses from, closed only when the client is being stopped. + // Single user facing channel to read RPCResponses from, closed only when the + // client is being stopped. ResponsesCh chan types.RPCResponse // Callback, which will be called each time after successful reconnect. @@ -58,6 +61,8 @@ type WSClient struct { // nolint: maligned mtx sync.RWMutex sentLastPingAt time.Time reconnecting bool + nextReqID int + // sentIDs map[types.JSONRPCIntID]bool // IDs of the requests currently in flight // Time allowed to write a message to the server. 0 means block until operation succeeds. writeWait time.Duration @@ -101,6 +106,8 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli writeWait: defaultWriteWait, pingPeriod: defaultPingPeriod, protocol: protocol, + + // sentIDs: make(map[types.JSONRPCIntID]bool), } c.BaseService = *cmn.NewBaseService(nil, "WSClient", c) for _, option := range options { @@ -151,7 +158,7 @@ func OnReconnect(cb func()) func(*WSClient) { // String returns WS client full address. func (c *WSClient) String() string { - return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint) + return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint) } // OnStart implements cmn.Service by dialing a server and creating read and @@ -210,42 +217,48 @@ func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { select { case c.send <- request: c.Logger.Info("sent a request", "req", request) + // c.mtx.Lock() + // c.sentIDs[request.ID.(types.JSONRPCIntID)] = true + // c.mtx.Unlock() return nil case <-ctx.Done(): return ctx.Err() } } -// Call the given method. See Send description. +// Call enqueues a call request onto the Send queue. Requests are JSON encoded. func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error { - request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params) + request, err := types.MapToRequest(c.cdc, c.nextRequestID(), method, params) if err != nil { return err } return c.Send(ctx, request) } -// CallWithArrayParams the given method with params in a form of array. See -// Send description. +// CallWithArrayParams enqueues a call request onto the Send queue. Params are +// in a form of array (e.g. []interface{}{"abcd"}). Requests are JSON encoded. func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error { - request, err := types.ArrayToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params) + request, err := types.ArrayToRequest(c.cdc, c.nextRequestID(), method, params) if err != nil { return err } return c.Send(ctx, request) } -func (c *WSClient) Codec() *amino.Codec { - return c.cdc -} - -func (c *WSClient) SetCodec(cdc *amino.Codec) { - c.cdc = cdc -} +func (c *WSClient) Codec() *amino.Codec { return c.cdc } +func (c *WSClient) SetCodec(cdc *amino.Codec) { c.cdc = cdc } /////////////////////////////////////////////////////////////////////////////// // Private methods +func (c *WSClient) nextRequestID() types.JSONRPCIntID { + c.mtx.Lock() + id := c.nextReqID + c.nextReqID++ + c.mtx.Unlock() + return types.JSONRPCIntID(id) +} + func (c *WSClient) dial() error { dialer := &websocket.Dialer{ NetDial: c.Dialer, @@ -473,10 +486,31 @@ func (c *WSClient) readRoutine() { c.Logger.Error("failed to parse response", "err", err, "data", string(data)) continue } - c.Logger.Info("got response", "resp", response.Result) + + if err = validateResponseID(response.ID); err != nil { + c.Logger.Error("error in response ID", "id", response.ID, "err", err) + continue + } + + // TODO: events resulting from /subscribe do not work with -> + // because they are implemented as responses with the subscribe request's + // ID. According to the spec, they should be notifications (requests + // without IDs). + // https://github.com/tendermint/tendermint/issues/2949 + // c.mtx.Lock() + // if _, ok := c.sentIDs[response.ID.(types.JSONRPCIntID)]; !ok { + // c.Logger.Error("unsolicited response ID", "id", response.ID, "expected", c.sentIDs) + // c.mtx.Unlock() + // continue + // } + // delete(c.sentIDs, response.ID.(types.JSONRPCIntID)) + // c.mtx.Unlock() // Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking // c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop // both readRoutine and writeRoutine + + c.Logger.Info("got response", "id", response.ID, "result", fmt.Sprintf("%X", response.Result)) + select { case <-c.Quit(): case c.ResponsesCh <- response: diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 1babdae92..67dfd418d 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -35,11 +35,17 @@ func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } defer conn.Close() // nolint: errcheck for { - messageType, _, err := conn.ReadMessage() + messageType, in, err := conn.ReadMessage() if err != nil { return } + var req types.RPCRequest + err = json.Unmarshal(in, &req) + if err != nil { + panic(err) + } + h.mtx.RLock() if h.closeConnAfterRead { if err := conn.Close(); err != nil { @@ -49,7 +55,7 @@ func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.mtx.RUnlock() res := json.RawMessage(`{}`) - emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: res}) + emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: res, ID: req.ID}) if err := conn.WriteMessage(messageType, emptyRespBytes); err != nil { return } diff --git a/rpc/lib/doc.go b/rpc/lib/doc.go index aa9638bfd..d22c2fc87 100644 --- a/rpc/lib/doc.go +++ b/rpc/lib/doc.go @@ -1,4 +1,5 @@ -// HTTP RPC server supporting calls via uri params, jsonrpc, and jsonrpc over websockets +// HTTP RPC server supporting calls via uri params, jsonrpc over HTTP, and jsonrpc over +// websockets // // Client Requests // diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go deleted file mode 100644 index a3eb14bd4..000000000 --- a/rpc/lib/server/handlers.go +++ /dev/null @@ -1,926 +0,0 @@ -package rpcserver - -import ( - "bytes" - "context" - "encoding/hex" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "reflect" - "runtime/debug" - "sort" - "strings" - "time" - - "github.com/gorilla/websocket" - "github.com/pkg/errors" - - amino "github.com/tendermint/go-amino" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - types "github.com/tendermint/tendermint/rpc/lib/types" -) - -// RegisterRPCFuncs adds a route for each function in the funcMap, -// as well as general jsonrpc and websocket handlers for all functions. -// "result" is the interface on which the result objects are registered, -// and is popualted with every RPCResponse -func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, cdc *amino.Codec, logger log.Logger) { - // HTTP endpoints - for funcName, rpcFunc := range funcMap { - mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc, cdc, logger)) - } - - // JSONRPC endpoints - mux.HandleFunc("/", handleInvalidJSONRPCPaths(makeJSONRPCHandler(funcMap, cdc, logger))) -} - -//------------------------------------- -// function introspection - -// RPCFunc contains the introspected type information for a function -type RPCFunc struct { - f reflect.Value // underlying rpc function - args []reflect.Type // type of each function arg - returns []reflect.Type // type of each return arg - argNames []string // name of each argument - ws bool // websocket only -} - -// NewRPCFunc wraps a function for introspection. -// f is the function, args are comma separated argument names -func NewRPCFunc(f interface{}, args string) *RPCFunc { - return newRPCFunc(f, args, false) -} - -// NewWSRPCFunc wraps a function for introspection and use in the websockets. -func NewWSRPCFunc(f interface{}, args string) *RPCFunc { - return newRPCFunc(f, args, true) -} - -func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc { - var argNames []string - if args != "" { - argNames = strings.Split(args, ",") - } - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: argNames, - ws: ws, - } -} - -// return a function's argument types -func funcArgTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumIn() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.In(i) - } - return typez -} - -// return a function's return types -func funcReturnTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumOut() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.Out(i) - } - return typez -} - -// function introspection -//----------------------------------------------------------------------------- -// rpc.json - -// jsonrpc calls grab the given method's function info and runs reflect.Call -func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger log.Logger) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - b, err := ioutil.ReadAll(r.Body) - if err != nil { - WriteRPCResponseHTTP( - w, - types.RPCInvalidRequestError( - types.JSONRPCStringID(""), - errors.Wrap(err, "error reading request body"), - ), - ) - return - } - // if its an empty request (like from a browser), - // just display a list of functions - if len(b) == 0 { - writeListOfEndpoints(w, r, funcMap) - return - } - - // first try to unmarshal the incoming request as an array of RPC requests - var ( - requests []types.RPCRequest - responses []types.RPCResponse - ) - if err := json.Unmarshal(b, &requests); err != nil { - // next, try to unmarshal as a single request - var request types.RPCRequest - if err := json.Unmarshal(b, &request); err != nil { - WriteRPCResponseHTTP( - w, - types.RPCParseError( - types.JSONRPCStringID(""), - errors.Wrap(err, "error unmarshalling request"), - ), - ) - return - } - requests = []types.RPCRequest{request} - } - - for _, request := range requests { - request := request - // A Notification is a Request object without an "id" member. - // The Server MUST NOT reply to a Notification, including those that are within a batch request. - if request.ID == types.JSONRPCStringID("") { - logger.Debug( - "HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", - ) - continue - } - if len(r.URL.Path) > 1 { - responses = append( - responses, - types.RPCInvalidRequestError(request.ID, errors.Errorf("path %s is invalid", r.URL.Path)), - ) - continue - } - rpcFunc, ok := funcMap[request.Method] - if !ok || rpcFunc.ws { - responses = append(responses, types.RPCMethodNotFoundError(request.ID)) - continue - } - ctx := &types.Context{JSONReq: &request, HTTPReq: r} - args := []reflect.Value{reflect.ValueOf(ctx)} - if len(request.Params) > 0 { - fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params) - if err != nil { - responses = append( - responses, - types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "error converting json params to arguments")), - ) - continue - } - args = append(args, fnArgs...) - } - returns := rpcFunc.f.Call(args) - logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - responses = append(responses, types.RPCInternalError(request.ID, err)) - continue - } - responses = append(responses, types.NewRPCSuccessResponse(cdc, request.ID, result)) - } - if len(responses) > 0 { - WriteRPCResponseArrayHTTP(w, responses) - } - } -} - -func handleInvalidJSONRPCPaths(next http.HandlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - // Since the pattern "/" matches all paths not matched by other registered patterns, - // we check whether the path is indeed "/", otherwise return a 404 error - if r.URL.Path != "/" { - http.NotFound(w, r) - return - } - - next(w, r) - } -} - -func mapParamsToArgs( - rpcFunc *RPCFunc, - cdc *amino.Codec, - params map[string]json.RawMessage, - argsOffset int, -) ([]reflect.Value, error) { - values := make([]reflect.Value, len(rpcFunc.argNames)) - for i, argName := range rpcFunc.argNames { - argType := rpcFunc.args[i+argsOffset] - - if p, ok := params[argName]; ok && p != nil && len(p) > 0 { - val := reflect.New(argType) - err := cdc.UnmarshalJSON(p, val.Interface()) - if err != nil { - return nil, err - } - values[i] = val.Elem() - } else { // use default for that type - values[i] = reflect.Zero(argType) - } - } - - return values, nil -} - -func arrayParamsToArgs( - rpcFunc *RPCFunc, - cdc *amino.Codec, - params []json.RawMessage, - argsOffset int, -) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.Errorf("expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames), rpcFunc.argNames, len(params), params) - } - - values := make([]reflect.Value, len(params)) - for i, p := range params { - argType := rpcFunc.args[i+argsOffset] - val := reflect.New(argType) - err := cdc.UnmarshalJSON(p, val.Interface()) - if err != nil { - return nil, err - } - values[i] = val.Elem() - } - return values, nil -} - -// raw is unparsed json (from json.RawMessage) encoding either a map or an -// array. -// -// Example: -// rpcFunc.args = [rpctypes.Context string] -// rpcFunc.argNames = ["arg"] -func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect.Value, error) { - const argsOffset = 1 - - // TODO: Make more efficient, perhaps by checking the first character for '{' or '['? - // First, try to get the map. - var m map[string]json.RawMessage - err := json.Unmarshal(raw, &m) - if err == nil { - return mapParamsToArgs(rpcFunc, cdc, m, argsOffset) - } - - // Otherwise, try an array. - var a []json.RawMessage - err = json.Unmarshal(raw, &a) - if err == nil { - return arrayParamsToArgs(rpcFunc, cdc, a, argsOffset) - } - - // Otherwise, bad format, we cannot parse - return nil, errors.Errorf("unknown type for JSON params: %v. Expected map or array", err) -} - -// rpc.json -//----------------------------------------------------------------------------- -// rpc.http - -// convert from a function name to the http handler -func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func(http.ResponseWriter, *http.Request) { - // Exception for websocket endpoints - if rpcFunc.ws { - return func(w http.ResponseWriter, r *http.Request) { - WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(types.JSONRPCStringID(""))) - } - } - - // All other endpoints - return func(w http.ResponseWriter, r *http.Request) { - logger.Debug("HTTP HANDLER", "req", r) - - ctx := &types.Context{HTTPReq: r} - args := []reflect.Value{reflect.ValueOf(ctx)} - - fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r) - if err != nil { - WriteRPCResponseHTTP( - w, - types.RPCInvalidParamsError( - types.JSONRPCStringID(""), - errors.Wrap(err, "error converting http params to arguments"), - ), - ) - return - } - args = append(args, fnArgs...) - - returns := rpcFunc.f.Call(args) - - logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, types.RPCInternalError(types.JSONRPCStringID(""), err)) - return - } - WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, types.JSONRPCStringID(""), result)) - } -} - -// Covert an http query to a list of properly typed values. -// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). -func httpParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, r *http.Request) ([]reflect.Value, error) { - // skip types.Context - const argsOffset = 1 - - values := make([]reflect.Value, len(rpcFunc.argNames)) - - for i, name := range rpcFunc.argNames { - argType := rpcFunc.args[i+argsOffset] - - values[i] = reflect.Zero(argType) // set default for that type - - arg := GetParam(r, name) - // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg) - - if "" == arg { - continue - } - - v, err, ok := nonJSONStringToArg(cdc, argType, arg) - if err != nil { - return nil, err - } - if ok { - values[i] = v - continue - } - - values[i], err = jsonStringToArg(cdc, argType, arg) - if err != nil { - return nil, err - } - } - - return values, nil -} - -func jsonStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, error) { - rv := reflect.New(rt) - err := cdc.UnmarshalJSON([]byte(arg), rv.Interface()) - if err != nil { - return rv, err - } - rv = rv.Elem() - return rv, nil -} - -func nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, error, bool) { - if rt.Kind() == reflect.Ptr { - rv_, err, ok := nonJSONStringToArg(cdc, rt.Elem(), arg) - switch { - case err != nil: - return reflect.Value{}, err, false - case ok: - rv := reflect.New(rt.Elem()) - rv.Elem().Set(rv_) - return rv, nil, true - default: - return reflect.Value{}, nil, false - } - } else { - return _nonJSONStringToArg(cdc, rt, arg) - } -} - -// NOTE: rt.Kind() isn't a pointer. -func _nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, error, bool) { - isIntString := RE_INT.Match([]byte(arg)) - isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) - isHexString := strings.HasPrefix(strings.ToLower(arg), "0x") - - var expectingString, expectingByteSlice, expectingInt bool - switch rt.Kind() { - case reflect.Int, - reflect.Uint, - reflect.Int8, - reflect.Uint8, - reflect.Int16, - reflect.Uint16, - reflect.Int32, - reflect.Uint32, - reflect.Int64, - reflect.Uint64: - expectingInt = true - case reflect.String: - expectingString = true - case reflect.Slice: - expectingByteSlice = rt.Elem().Kind() == reflect.Uint8 - } - - if isIntString && expectingInt { - qarg := `"` + arg + `"` - // jsonStringToArg - rv, err := jsonStringToArg(cdc, rt, qarg) - if err != nil { - return rv, err, false - } - - return rv, nil, true - } - - if isHexString { - if !expectingString && !expectingByteSlice { - err := errors.Errorf("got a hex string arg, but expected '%s'", - rt.Kind().String()) - return reflect.ValueOf(nil), err, false - } - - var value []byte - value, err := hex.DecodeString(arg[2:]) - if err != nil { - return reflect.ValueOf(nil), err, false - } - if rt.Kind() == reflect.String { - return reflect.ValueOf(string(value)), nil, true - } - return reflect.ValueOf(value), nil, true - } - - if isQuotedString && expectingByteSlice { - v := reflect.New(reflect.TypeOf("")) - err := cdc.UnmarshalJSON([]byte(arg), v.Interface()) - if err != nil { - return reflect.ValueOf(nil), err, false - } - v = v.Elem() - return reflect.ValueOf([]byte(v.String())), nil, true - } - - return reflect.ValueOf(nil), nil, false -} - -// rpc.http -//----------------------------------------------------------------------------- -// rpc.websocket - -const ( - defaultWSWriteChanCapacity = 1000 - defaultWSWriteWait = 10 * time.Second - defaultWSReadWait = 30 * time.Second - defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 -) - -// A single websocket connection contains listener id, underlying ws -// connection, and the event switch for subscribing to events. -// -// In case of an error, the connection is stopped. -type wsConnection struct { - cmn.BaseService - - remoteAddr string - baseConn *websocket.Conn - writeChan chan types.RPCResponse - - funcMap map[string]*RPCFunc - cdc *amino.Codec - - // write channel capacity - writeChanCapacity int - - // each write times out after this. - writeWait time.Duration - - // Connection times out if we haven't received *anything* in this long, not even pings. - readWait time.Duration - - // Send pings to server with this period. Must be less than readWait, but greater than zero. - pingPeriod time.Duration - - // Maximum message size. - readLimit int64 - - // callback which is called upon disconnect - onDisconnect func(remoteAddr string) - - ctx context.Context - cancel context.CancelFunc -} - -// NewWSConnection wraps websocket.Conn. -// -// See the commentary on the func(*wsConnection) functions for a detailed -// description of how to configure ping period and pong wait time. NOTE: if the -// write buffer is full, pongs may be dropped, which may cause clients to -// disconnect. see https://github.com/gorilla/websocket/issues/97 -func NewWSConnection( - baseConn *websocket.Conn, - funcMap map[string]*RPCFunc, - cdc *amino.Codec, - options ...func(*wsConnection), -) *wsConnection { - wsc := &wsConnection{ - remoteAddr: baseConn.RemoteAddr().String(), - baseConn: baseConn, - funcMap: funcMap, - cdc: cdc, - writeWait: defaultWSWriteWait, - writeChanCapacity: defaultWSWriteChanCapacity, - readWait: defaultWSReadWait, - pingPeriod: defaultWSPingPeriod, - } - for _, option := range options { - option(wsc) - } - wsc.baseConn.SetReadLimit(wsc.readLimit) - wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc) - return wsc -} - -// OnDisconnect sets a callback which is used upon disconnect - not -// Goroutine-safe. Nop by default. -func OnDisconnect(onDisconnect func(remoteAddr string)) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.onDisconnect = onDisconnect - } -} - -// WriteWait sets the amount of time to wait before a websocket write times out. -// It should only be used in the constructor - not Goroutine-safe. -func WriteWait(writeWait time.Duration) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.writeWait = writeWait - } -} - -// WriteChanCapacity sets the capacity of the websocket write channel. -// It should only be used in the constructor - not Goroutine-safe. -func WriteChanCapacity(cap int) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.writeChanCapacity = cap - } -} - -// ReadWait sets the amount of time to wait before a websocket read times out. -// It should only be used in the constructor - not Goroutine-safe. -func ReadWait(readWait time.Duration) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.readWait = readWait - } -} - -// PingPeriod sets the duration for sending websocket pings. -// It should only be used in the constructor - not Goroutine-safe. -func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.pingPeriod = pingPeriod - } -} - -// ReadLimit sets the maximum size for reading message. -// It should only be used in the constructor - not Goroutine-safe. -func ReadLimit(readLimit int64) func(*wsConnection) { - return func(wsc *wsConnection) { - wsc.readLimit = readLimit - } -} - -// OnStart implements cmn.Service by starting the read and write routines. It -// blocks until the connection closes. -func (wsc *wsConnection) OnStart() error { - wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) - - // Read subscriptions/unsubscriptions to events - go wsc.readRoutine() - // Write responses, BLOCKING. - wsc.writeRoutine() - - return nil -} - -// OnStop implements cmn.Service by unsubscribing remoteAddr from all subscriptions. -func (wsc *wsConnection) OnStop() { - // Both read and write loops close the websocket connection when they exit their loops. - // The writeChan is never closed, to allow WriteRPCResponse() to fail. - - if wsc.onDisconnect != nil { - wsc.onDisconnect(wsc.remoteAddr) - } - - if wsc.ctx != nil { - wsc.cancel() - } -} - -// GetRemoteAddr returns the remote address of the underlying connection. -// It implements WSRPCConnection -func (wsc *wsConnection) GetRemoteAddr() string { - return wsc.remoteAddr -} - -// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. -// It implements WSRPCConnection. It is Goroutine-safe. -func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { - select { - case <-wsc.Quit(): - return - case wsc.writeChan <- resp: - } -} - -// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block. -// It implements WSRPCConnection. It is Goroutine-safe -func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { - select { - case <-wsc.Quit(): - return false - case wsc.writeChan <- resp: - return true - default: - return false - } -} - -// Codec returns an amino codec used to decode parameters and encode results. -// It implements WSRPCConnection. -func (wsc *wsConnection) Codec() *amino.Codec { - return wsc.cdc -} - -// Context returns the connection's context. -// The context is canceled when the client's connection closes. -func (wsc *wsConnection) Context() context.Context { - if wsc.ctx != nil { - return wsc.ctx - } - wsc.ctx, wsc.cancel = context.WithCancel(context.Background()) - return wsc.ctx -} - -// Read from the socket and subscribe to or unsubscribe from events -func (wsc *wsConnection) readRoutine() { - defer func() { - if r := recover(); r != nil { - err, ok := r.(error) - if !ok { - err = fmt.Errorf("WSJSONRPC: %v", r) - } - wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack())) - wsc.WriteRPCResponse(types.RPCInternalError(types.JSONRPCStringID("unknown"), err)) - go wsc.readRoutine() - } else { - wsc.baseConn.Close() // nolint: errcheck - } - }() - - wsc.baseConn.SetPongHandler(func(m string) error { - return wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)) - }) - - for { - select { - case <-wsc.Quit(): - return - default: - // reset deadline for every type of message (control or data) - if err := wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)); err != nil { - wsc.Logger.Error("failed to set read deadline", "err", err) - } - var in []byte - _, in, err := wsc.baseConn.ReadMessage() - if err != nil { - if websocket.IsCloseError(err, websocket.CloseNormalClosure) { - wsc.Logger.Info("Client closed the connection") - } else { - wsc.Logger.Error("Failed to read request", "err", err) - } - wsc.Stop() - return - } - - var request types.RPCRequest - err = json.Unmarshal(in, &request) - if err != nil { - wsc.WriteRPCResponse(types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "error unmarshaling request"))) - continue - } - - // A Notification is a Request object without an "id" member. - // The Server MUST NOT reply to a Notification, including those that are within a batch request. - if request.ID == types.JSONRPCStringID("") { - wsc.Logger.Debug( - "WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", - ) - continue - } - - // Now, fetch the RPCFunc and execute it. - rpcFunc := wsc.funcMap[request.Method] - if rpcFunc == nil { - wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID)) - continue - } - - ctx := &types.Context{JSONReq: &request, WSConn: wsc} - args := []reflect.Value{reflect.ValueOf(ctx)} - if len(request.Params) > 0 { - fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params) - if err != nil { - wsc.WriteRPCResponse( - types.RPCInternalError(request.ID, errors.Wrap(err, "error converting json params to arguments")), - ) - continue - } - args = append(args, fnArgs...) - } - - returns := rpcFunc.f.Call(args) - - // TODO: Need to encode args/returns to string if we want to log them - wsc.Logger.Info("WSJSONRPC", "method", request.Method) - - result, err := unreflectResult(returns) - if err != nil { - wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) - continue - } - - wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result)) - } - } -} - -// receives on a write channel and writes out on the socket -func (wsc *wsConnection) writeRoutine() { - pingTicker := time.NewTicker(wsc.pingPeriod) - defer func() { - pingTicker.Stop() - if err := wsc.baseConn.Close(); err != nil { - wsc.Logger.Error("Error closing connection", "err", err) - } - }() - - // https://github.com/gorilla/websocket/issues/97 - pongs := make(chan string, 1) - wsc.baseConn.SetPingHandler(func(m string) error { - select { - case pongs <- m: - default: - } - return nil - }) - - for { - select { - case m := <-pongs: - err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m)) - if err != nil { - wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err) - } - case <-pingTicker.C: - err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) - if err != nil { - wsc.Logger.Error("Failed to write ping", "err", err) - wsc.Stop() - return - } - case msg := <-wsc.writeChan: - jsonBytes, err := json.MarshalIndent(msg, "", " ") - if err != nil { - wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) - } else if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { - wsc.Logger.Error("Failed to write response", "err", err) - wsc.Stop() - return - } - case <-wsc.Quit(): - return - } - } -} - -// All writes to the websocket must (re)set the write deadline. -// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553) -func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { - if err := wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)); err != nil { - return err - } - return wsc.baseConn.WriteMessage(msgType, msg) -} - -//---------------------------------------- - -// WebsocketManager provides a WS handler for incoming connections and passes a -// map of functions along with any additional params to new connections. -// NOTE: The websocket path is defined externally, e.g. in node/node.go -type WebsocketManager struct { - websocket.Upgrader - - funcMap map[string]*RPCFunc - cdc *amino.Codec - logger log.Logger - wsConnOptions []func(*wsConnection) -} - -// NewWebsocketManager returns a new WebsocketManager that passes a map of -// functions, connection options and logger to new WS connections. -func NewWebsocketManager( - funcMap map[string]*RPCFunc, - cdc *amino.Codec, - wsConnOptions ...func(*wsConnection), -) *WebsocketManager { - return &WebsocketManager{ - funcMap: funcMap, - cdc: cdc, - Upgrader: websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - // TODO ??? - return true - }, - }, - logger: log.NewNopLogger(), - wsConnOptions: wsConnOptions, - } -} - -// SetLogger sets the logger. -func (wm *WebsocketManager) SetLogger(l log.Logger) { - wm.logger = l -} - -// WebsocketHandler upgrades the request/response (via http.Hijack) and starts -// the wsConnection. -func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { - wsConn, err := wm.Upgrade(w, r, nil) - if err != nil { - // TODO - return http error - wm.logger.Error("Failed to upgrade to websocket connection", "err", err) - return - } - - // register connection - con := NewWSConnection(wsConn, wm.funcMap, wm.cdc, wm.wsConnOptions...) - con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) - wm.logger.Info("New websocket connection", "remote", con.remoteAddr) - err = con.Start() // Blocking - if err != nil { - wm.logger.Error("Error starting connection", "err", err) - } -} - -// rpc.websocket -//----------------------------------------------------------------------------- - -// NOTE: assume returns is result struct and error. If error is not nil, return it -func unreflectResult(returns []reflect.Value) (interface{}, error) { - errV := returns[1] - if errV.Interface() != nil { - return nil, errors.Errorf("%v", errV.Interface()) - } - rv := returns[0] - // the result is a registered interface, - // we need a pointer to it so we can marshal with type byte - rvp := reflect.New(rv.Type()) - rvp.Elem().Set(rv) - return rvp.Interface(), nil -} - -// writes a list of available rpc endpoints as an html page -func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { - noArgNames := []string{} - argNames := []string{} - for name, funcData := range funcMap { - if len(funcData.args) == 0 { - noArgNames = append(noArgNames, name) - } else { - argNames = append(argNames, name) - } - } - sort.Strings(noArgNames) - sort.Strings(argNames) - buf := new(bytes.Buffer) - buf.WriteString("") - buf.WriteString("
Available endpoints:
") - - for _, name := range noArgNames { - link := fmt.Sprintf("//%s/%s", r.Host, name) - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - - buf.WriteString("
Endpoints that require arguments:
") - for _, name := range argNames { - link := fmt.Sprintf("//%s/%s?", r.Host, name) - funcData := funcMap[name] - for i, argName := range funcData.argNames { - link += argName + "=_" - if i < len(funcData.argNames)-1 { - link += "&" - } - } - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - buf.WriteString("") - w.Header().Set("Content-Type", "text/html") - w.WriteHeader(200) - w.Write(buf.Bytes()) // nolint: errcheck -} diff --git a/rpc/lib/server/http_json_handler.go b/rpc/lib/server/http_json_handler.go new file mode 100644 index 000000000..65c0a680f --- /dev/null +++ b/rpc/lib/server/http_json_handler.go @@ -0,0 +1,248 @@ +package rpcserver + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "reflect" + "sort" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + "github.com/tendermint/tendermint/libs/log" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +/////////////////////////////////////////////////////////////////////////////// +// HTTP + JSON handler +/////////////////////////////////////////////////////////////////////////////// + +// jsonrpc calls grab the given method's function info and runs reflect.Call +func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger log.Logger) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + WriteRPCResponseHTTP( + w, + types.RPCInvalidRequestError( + nil, + errors.Wrap(err, "error reading request body"), + ), + ) + return + } + + // if its an empty request (like from a browser), just display a list of + // functions + if len(b) == 0 { + writeListOfEndpoints(w, r, funcMap) + return + } + + // first try to unmarshal the incoming request as an array of RPC requests + var ( + requests []types.RPCRequest + responses []types.RPCResponse + ) + if err := json.Unmarshal(b, &requests); err != nil { + // next, try to unmarshal as a single request + var request types.RPCRequest + if err := json.Unmarshal(b, &request); err != nil { + WriteRPCResponseHTTP( + w, + types.RPCParseError( + errors.Wrap(err, "error unmarshalling request"), + ), + ) + return + } + requests = []types.RPCRequest{request} + } + + for _, request := range requests { + request := request + + // A Notification is a Request object without an "id" member. + // The Server MUST NOT reply to a Notification, including those that are within a batch request. + if request.ID == nil { + logger.Debug( + "HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", + "req", request, + ) + continue + } + if len(r.URL.Path) > 1 { + responses = append( + responses, + types.RPCInvalidRequestError(request.ID, errors.Errorf("path %s is invalid", r.URL.Path)), + ) + continue + } + rpcFunc, ok := funcMap[request.Method] + if !ok || rpcFunc.ws { + responses = append(responses, types.RPCMethodNotFoundError(request.ID)) + continue + } + ctx := &types.Context{JSONReq: &request, HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} + if len(request.Params) > 0 { + fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params) + if err != nil { + responses = append( + responses, + types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "error converting json params to arguments")), + ) + continue + } + args = append(args, fnArgs...) + } + returns := rpcFunc.f.Call(args) + logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + responses = append(responses, types.RPCInternalError(request.ID, err)) + continue + } + responses = append(responses, types.NewRPCSuccessResponse(cdc, request.ID, result)) + } + if len(responses) > 0 { + WriteRPCResponseArrayHTTP(w, responses) + } + } +} + +func handleInvalidJSONRPCPaths(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Since the pattern "/" matches all paths not matched by other registered patterns, + // we check whether the path is indeed "/", otherwise return a 404 error + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + + next(w, r) + } +} + +func mapParamsToArgs( + rpcFunc *RPCFunc, + cdc *amino.Codec, + params map[string]json.RawMessage, + argsOffset int, +) ([]reflect.Value, error) { + + values := make([]reflect.Value, len(rpcFunc.argNames)) + for i, argName := range rpcFunc.argNames { + argType := rpcFunc.args[i+argsOffset] + + if p, ok := params[argName]; ok && p != nil && len(p) > 0 { + val := reflect.New(argType) + err := cdc.UnmarshalJSON(p, val.Interface()) + if err != nil { + return nil, err + } + values[i] = val.Elem() + } else { // use default for that type + values[i] = reflect.Zero(argType) + } + } + + return values, nil +} + +func arrayParamsToArgs( + rpcFunc *RPCFunc, + cdc *amino.Codec, + params []json.RawMessage, + argsOffset int, +) ([]reflect.Value, error) { + + if len(rpcFunc.argNames) != len(params) { + return nil, errors.Errorf("expected %v parameters (%v), got %v (%v)", + len(rpcFunc.argNames), rpcFunc.argNames, len(params), params) + } + + values := make([]reflect.Value, len(params)) + for i, p := range params { + argType := rpcFunc.args[i+argsOffset] + val := reflect.New(argType) + err := cdc.UnmarshalJSON(p, val.Interface()) + if err != nil { + return nil, err + } + values[i] = val.Elem() + } + return values, nil +} + +// raw is unparsed json (from json.RawMessage) encoding either a map or an +// array. +// +// Example: +// rpcFunc.args = [rpctypes.Context string] +// rpcFunc.argNames = ["arg"] +func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect.Value, error) { + const argsOffset = 1 + + // TODO: Make more efficient, perhaps by checking the first character for '{' or '['? + // First, try to get the map. + var m map[string]json.RawMessage + err := json.Unmarshal(raw, &m) + if err == nil { + return mapParamsToArgs(rpcFunc, cdc, m, argsOffset) + } + + // Otherwise, try an array. + var a []json.RawMessage + err = json.Unmarshal(raw, &a) + if err == nil { + return arrayParamsToArgs(rpcFunc, cdc, a, argsOffset) + } + + // Otherwise, bad format, we cannot parse + return nil, errors.Errorf("unknown type for JSON params: %v. Expected map or array", err) +} + +// writes a list of available rpc endpoints as an html page +func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { + noArgNames := []string{} + argNames := []string{} + for name, funcData := range funcMap { + if len(funcData.args) == 0 { + noArgNames = append(noArgNames, name) + } else { + argNames = append(argNames, name) + } + } + sort.Strings(noArgNames) + sort.Strings(argNames) + buf := new(bytes.Buffer) + buf.WriteString("") + buf.WriteString("
Available endpoints:
") + + for _, name := range noArgNames { + link := fmt.Sprintf("//%s/%s", r.Host, name) + buf.WriteString(fmt.Sprintf("%s
", link, link)) + } + + buf.WriteString("
Endpoints that require arguments:
") + for _, name := range argNames { + link := fmt.Sprintf("//%s/%s?", r.Host, name) + funcData := funcMap[name] + for i, argName := range funcData.argNames { + link += argName + "=_" + if i < len(funcData.argNames)-1 { + link += "&" + } + } + buf.WriteString(fmt.Sprintf("%s
", link, link)) + } + buf.WriteString("") + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(200) + w.Write(buf.Bytes()) // nolint: errcheck +} diff --git a/rpc/lib/server/handlers_test.go b/rpc/lib/server/http_json_handler_test.go similarity index 76% rename from rpc/lib/server/handlers_test.go rename to rpc/lib/server/http_json_handler_test.go index a882d5939..e4ae2f8bf 100644 --- a/rpc/lib/server/handlers_test.go +++ b/rpc/lib/server/http_json_handler_test.go @@ -1,4 +1,4 @@ -package rpcserver_test +package rpcserver import ( "bytes" @@ -9,32 +9,23 @@ import ( "strings" "testing" - "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" amino "github.com/tendermint/go-amino" "github.com/tendermint/tendermint/libs/log" - rs "github.com/tendermint/tendermint/rpc/lib/server" types "github.com/tendermint/tendermint/rpc/lib/types" ) -////////////////////////////////////////////////////////////////////////////// -// HTTP REST API -// TODO - -////////////////////////////////////////////////////////////////////////////// -// JSON-RPC over HTTP - func testMux() *http.ServeMux { - funcMap := map[string]*rs.RPCFunc{ - "c": rs.NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), + funcMap := map[string]*RPCFunc{ + "c": NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), } cdc := amino.NewCodec() mux := http.NewServeMux() buf := new(bytes.Buffer) logger := log.NewTMLogger(buf) - rs.RegisterRPCFuncs(mux, funcMap, cdc, logger) + RegisterRPCFuncs(mux, funcMap, cdc, logger) return mux } @@ -49,17 +40,20 @@ func TestRPCParams(t *testing.T) { tests := []struct { payload string wantErr string - expectedId interface{} + expectedID interface{} }{ // bad {`{"jsonrpc": "2.0", "id": "0"}`, "Method not found", types.JSONRPCStringID("0")}, {`{"jsonrpc": "2.0", "method": "y", "id": "0"}`, "Method not found", types.JSONRPCStringID("0")}, // id not captured in JSON parsing failures - {`{"method": "c", "id": "0", "params": a}`, "invalid character", types.JSONRPCStringID("")}, + {`{"method": "c", "id": "0", "params": a}`, "invalid character", nil}, {`{"method": "c", "id": "0", "params": ["a"]}`, "got 1", types.JSONRPCStringID("0")}, {`{"method": "c", "id": "0", "params": ["a", "b"]}`, "invalid character", types.JSONRPCStringID("0")}, {`{"method": "c", "id": "0", "params": [1, 1]}`, "of type string", types.JSONRPCStringID("0")}, + // no ID - notification + // {`{"jsonrpc": "2.0", "method": "c", "params": ["a", "10"]}`, false, nil}, + // good {`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`, "", types.JSONRPCStringID("0")}, {`{"method": "c", "id": "0", "params": {}}`, "", types.JSONRPCStringID("0")}, @@ -83,7 +77,7 @@ func TestRPCParams(t *testing.T) { recv := new(types.RPCResponse) assert.Nil(t, json.Unmarshal(blob, recv), "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob) assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i) - assert.Equal(t, tt.expectedId, recv.ID, "#%d: expected ID not matched in RPCResponse", i) + assert.Equal(t, tt.expectedID, recv.ID, "#%d: expected ID not matched in RPCResponse", i) if tt.wantErr == "" { assert.Nil(t, recv.Error, "#%d: not expecting an error", i) } else { @@ -99,7 +93,7 @@ func TestJSONRPCID(t *testing.T) { tests := []struct { payload string wantErr bool - expectedId interface{} + expectedID interface{} }{ // good id {`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": ["a", "10"]}`, false, types.JSONRPCStringID("0")}, @@ -108,7 +102,6 @@ func TestJSONRPCID(t *testing.T) { {`{"jsonrpc": "2.0", "method": "c", "id": 1, "params": ["a", "10"]}`, false, types.JSONRPCIntID(1)}, {`{"jsonrpc": "2.0", "method": "c", "id": 1.3, "params": ["a", "10"]}`, false, types.JSONRPCIntID(1)}, {`{"jsonrpc": "2.0", "method": "c", "id": -1, "params": ["a", "10"]}`, false, types.JSONRPCIntID(-1)}, - {`{"jsonrpc": "2.0", "method": "c", "id": null, "params": ["a", "10"]}`, false, nil}, // bad id {`{"jsonrpc": "2.0", "method": "c", "id": {}, "params": ["a", "10"]}`, true, nil}, @@ -134,7 +127,7 @@ func TestJSONRPCID(t *testing.T) { assert.Nil(t, err, "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob) if !tt.wantErr { assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i) - assert.Equal(t, tt.expectedId, recv.ID, "#%d: expected ID not matched in RPCResponse", i) + assert.Equal(t, tt.expectedID, recv.ID, "#%d: expected ID not matched in RPCResponse", i) assert.Nil(t, recv.Error, "#%d: not expecting an error", i) } else { assert.True(t, recv.Error.Code < 0, "#%d: not expecting a positive JSONRPC code", i) @@ -144,7 +137,7 @@ func TestJSONRPCID(t *testing.T) { func TestRPCNotification(t *testing.T) { mux := testMux() - body := strings.NewReader(`{"jsonrpc": "2.0", "id": ""}`) + body := strings.NewReader(`{"jsonrpc": "2.0"}`) req, _ := http.NewRequest("POST", "http://localhost/", body) rec := httptest.NewRecorder() mux.ServeHTTP(rec, req) @@ -166,16 +159,16 @@ func TestRPCNotificationInBatch(t *testing.T) { }{ { `[ - {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0"}, {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]} ]`, 1, }, { `[ - {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0"}, {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]}, - {"jsonrpc": "2.0","id": ""}, + {"jsonrpc": "2.0"}, {"jsonrpc": "2.0","method":"c","id":"abc","params":["a","10"]} ]`, 2, @@ -236,50 +229,3 @@ func TestUnknownRPCPath(t *testing.T) { require.Equal(t, http.StatusNotFound, res.StatusCode, "should always return 404") res.Body.Close() } - -////////////////////////////////////////////////////////////////////////////// -// JSON-RPC over WEBSOCKETS - -func TestWebsocketManagerHandler(t *testing.T) { - s := newWSServer() - defer s.Close() - - // check upgrader works - d := websocket.Dialer{} - c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil) - require.NoError(t, err) - - if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want { - t.Errorf("dialResp.StatusCode = %q, want %q", got, want) - } - - // check basic functionality works - req, err := types.MapToRequest( - amino.NewCodec(), - types.JSONRPCStringID("TestWebsocketManager"), - "c", - map[string]interface{}{"s": "a", "i": 10}, - ) - require.NoError(t, err) - err = c.WriteJSON(req) - require.NoError(t, err) - - var resp types.RPCResponse - err = c.ReadJSON(&resp) - require.NoError(t, err) - require.Nil(t, resp.Error) - dialResp.Body.Close() -} - -func newWSServer() *httptest.Server { - funcMap := map[string]*rs.RPCFunc{ - "c": rs.NewWSRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), - } - wm := rs.NewWebsocketManager(funcMap, amino.NewCodec()) - wm.SetLogger(log.TestingLogger()) - - mux := http.NewServeMux() - mux.HandleFunc("/websocket", wm.WebsocketHandler) - - return httptest.NewServer(mux) -} diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index b48f09e56..501396867 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -163,7 +163,7 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler WriteRPCResponseHTTPError( rww, http.StatusInternalServerError, - types.RPCInternalError(types.JSONRPCStringID(""), e.(error)), + types.RPCInternalError(types.JSONRPCIntID(-1), e.(error)), ) } } diff --git a/rpc/lib/server/http_uri_handler.go b/rpc/lib/server/http_uri_handler.go new file mode 100644 index 000000000..6c68ba3ac --- /dev/null +++ b/rpc/lib/server/http_uri_handler.go @@ -0,0 +1,195 @@ +package rpcserver + +import ( + "encoding/hex" + "net/http" + "reflect" + "strings" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + "github.com/tendermint/tendermint/libs/log" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +/////////////////////////////////////////////////////////////////////////////// +// HTTP + URI handler +/////////////////////////////////////////////////////////////////////////////// + +// convert from a function name to the http handler +func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func(http.ResponseWriter, *http.Request) { + // Always return -1 as there's no ID here. + dummyID := types.JSONRPCIntID(-1) // URIClientRequestID + + // Exception for websocket endpoints + if rpcFunc.ws { + return func(w http.ResponseWriter, r *http.Request) { + WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(dummyID)) + } + } + + // All other endpoints + return func(w http.ResponseWriter, r *http.Request) { + logger.Debug("HTTP HANDLER", "req", r) + + ctx := &types.Context{HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} + + fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r) + if err != nil { + WriteRPCResponseHTTP( + w, + types.RPCInvalidParamsError( + dummyID, + errors.Wrap(err, "error converting http params to arguments"), + ), + ) + return + } + args = append(args, fnArgs...) + + returns := rpcFunc.f.Call(args) + + logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + WriteRPCResponseHTTP(w, types.RPCInternalError(dummyID, err)) + return + } + WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, dummyID, result)) + } +} + +// Covert an http query to a list of properly typed values. +// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). +func httpParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, r *http.Request) ([]reflect.Value, error) { + // skip types.Context + const argsOffset = 1 + + values := make([]reflect.Value, len(rpcFunc.argNames)) + + for i, name := range rpcFunc.argNames { + argType := rpcFunc.args[i+argsOffset] + + values[i] = reflect.Zero(argType) // set default for that type + + arg := GetParam(r, name) + // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg) + + if arg == "" { + continue + } + + v, ok, err := nonJSONStringToArg(cdc, argType, arg) + if err != nil { + return nil, err + } + if ok { + values[i] = v + continue + } + + values[i], err = jsonStringToArg(cdc, argType, arg) + if err != nil { + return nil, err + } + } + + return values, nil +} + +func jsonStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, error) { + rv := reflect.New(rt) + err := cdc.UnmarshalJSON([]byte(arg), rv.Interface()) + if err != nil { + return rv, err + } + rv = rv.Elem() + return rv, nil +} + +func nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, bool, error) { + if rt.Kind() == reflect.Ptr { + rv1, ok, err := nonJSONStringToArg(cdc, rt.Elem(), arg) + switch { + case err != nil: + return reflect.Value{}, false, err + case ok: + rv := reflect.New(rt.Elem()) + rv.Elem().Set(rv1) + return rv, true, nil + default: + return reflect.Value{}, false, nil + } + } else { + return _nonJSONStringToArg(cdc, rt, arg) + } +} + +// NOTE: rt.Kind() isn't a pointer. +func _nonJSONStringToArg(cdc *amino.Codec, rt reflect.Type, arg string) (reflect.Value, bool, error) { + isIntString := RE_INT.Match([]byte(arg)) + isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) + isHexString := strings.HasPrefix(strings.ToLower(arg), "0x") + + var expectingString, expectingByteSlice, expectingInt bool + switch rt.Kind() { + case reflect.Int, + reflect.Uint, + reflect.Int8, + reflect.Uint8, + reflect.Int16, + reflect.Uint16, + reflect.Int32, + reflect.Uint32, + reflect.Int64, + reflect.Uint64: + expectingInt = true + case reflect.String: + expectingString = true + case reflect.Slice: + expectingByteSlice = rt.Elem().Kind() == reflect.Uint8 + } + + if isIntString && expectingInt { + qarg := `"` + arg + `"` + rv, err := jsonStringToArg(cdc, rt, qarg) + if err != nil { + return rv, false, err + } + + return rv, true, nil + } + + if isHexString { + if !expectingString && !expectingByteSlice { + err := errors.Errorf("got a hex string arg, but expected '%s'", + rt.Kind().String()) + return reflect.ValueOf(nil), false, err + } + + var value []byte + value, err := hex.DecodeString(arg[2:]) + if err != nil { + return reflect.ValueOf(nil), false, err + } + if rt.Kind() == reflect.String { + return reflect.ValueOf(string(value)), true, nil + } + return reflect.ValueOf(value), true, nil + } + + if isQuotedString && expectingByteSlice { + v := reflect.New(reflect.TypeOf("")) + err := cdc.UnmarshalJSON([]byte(arg), v.Interface()) + if err != nil { + return reflect.ValueOf(nil), false, err + } + v = v.Elem() + return reflect.ValueOf([]byte(v.String())), true, nil + } + + return reflect.ValueOf(nil), false, nil +} diff --git a/rpc/lib/server/rpc_func.go b/rpc/lib/server/rpc_func.go new file mode 100644 index 000000000..906533328 --- /dev/null +++ b/rpc/lib/server/rpc_func.go @@ -0,0 +1,103 @@ +package rpcserver + +import ( + "net/http" + "reflect" + "strings" + + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + "github.com/tendermint/tendermint/libs/log" +) + +// RegisterRPCFuncs adds a route for each function in the funcMap, as well as +// general jsonrpc and websocket handlers for all functions. "result" is the +// interface on which the result objects are registered, and is popualted with +// every RPCResponse +func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, cdc *amino.Codec, logger log.Logger) { + // HTTP endpoints + for funcName, rpcFunc := range funcMap { + mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc, cdc, logger)) + } + + // JSONRPC endpoints + mux.HandleFunc("/", handleInvalidJSONRPCPaths(makeJSONRPCHandler(funcMap, cdc, logger))) +} + +/////////////////////////////////////////////////////////////////////////////// +// Function introspection +/////////////////////////////////////////////////////////////////////////////// + +// RPCFunc contains the introspected type information for a function +type RPCFunc struct { + f reflect.Value // underlying rpc function + args []reflect.Type // type of each function arg + returns []reflect.Type // type of each return arg + argNames []string // name of each argument + ws bool // websocket only +} + +// NewRPCFunc wraps a function for introspection. +// f is the function, args are comma separated argument names +func NewRPCFunc(f interface{}, args string) *RPCFunc { + return newRPCFunc(f, args, false) +} + +// NewWSRPCFunc wraps a function for introspection and use in the websockets. +func NewWSRPCFunc(f interface{}, args string) *RPCFunc { + return newRPCFunc(f, args, true) +} + +func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc { + var argNames []string + if args != "" { + argNames = strings.Split(args, ",") + } + return &RPCFunc{ + f: reflect.ValueOf(f), + args: funcArgTypes(f), + returns: funcReturnTypes(f), + argNames: argNames, + ws: ws, + } +} + +// return a function's argument types +func funcArgTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumIn() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.In(i) + } + return typez +} + +// return a function's return types +func funcReturnTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumOut() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.Out(i) + } + return typez +} + +//------------------------------------------------------------- + +// NOTE: assume returns is result struct and error. If error is not nil, return it +func unreflectResult(returns []reflect.Value) (interface{}, error) { + errV := returns[1] + if errV.Interface() != nil { + return nil, errors.Errorf("%v", errV.Interface()) + } + rv := returns[0] + // the result is a registered interface, + // we need a pointer to it so we can marshal with type byte + rvp := reflect.New(rv.Type()) + rvp.Elem().Set(rv) + return rvp.Interface(), nil +} diff --git a/rpc/lib/server/ws_handler.go b/rpc/lib/server/ws_handler.go new file mode 100644 index 000000000..80bda39f3 --- /dev/null +++ b/rpc/lib/server/ws_handler.go @@ -0,0 +1,448 @@ +package rpcserver + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "reflect" + "runtime/debug" + "time" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + + amino "github.com/tendermint/go-amino" + + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +/////////////////////////////////////////////////////////////////////////////// +// WebSocket handler +/////////////////////////////////////////////////////////////////////////////// + +const ( + defaultWSWriteChanCapacity = 1000 + defaultWSWriteWait = 10 * time.Second + defaultWSReadWait = 30 * time.Second + defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 +) + +// WebsocketManager provides a WS handler for incoming connections and passes a +// map of functions along with any additional params to new connections. +// NOTE: The websocket path is defined externally, e.g. in node/node.go +type WebsocketManager struct { + websocket.Upgrader + + funcMap map[string]*RPCFunc + cdc *amino.Codec + logger log.Logger + wsConnOptions []func(*wsConnection) +} + +// NewWebsocketManager returns a new WebsocketManager that passes a map of +// functions, connection options and logger to new WS connections. +func NewWebsocketManager( + funcMap map[string]*RPCFunc, + cdc *amino.Codec, + wsConnOptions ...func(*wsConnection), +) *WebsocketManager { + return &WebsocketManager{ + funcMap: funcMap, + cdc: cdc, + Upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + // TODO ??? + // + // The default behaviour would be relevant to browser-based clients, + // afaik. I suppose having a pass-through is a workaround for allowing + // for more complex security schemes, shifting the burden of + // AuthN/AuthZ outside the Tendermint RPC. + // I can't think of other uses right now that would warrant a TODO + // though. The real backstory of this TODO shall remain shrouded in + // mystery + return true + }, + }, + logger: log.NewNopLogger(), + wsConnOptions: wsConnOptions, + } +} + +// SetLogger sets the logger. +func (wm *WebsocketManager) SetLogger(l log.Logger) { + wm.logger = l +} + +// WebsocketHandler upgrades the request/response (via http.Hijack) and starts +// the wsConnection. +func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { + wsConn, err := wm.Upgrade(w, r, nil) + if err != nil { + // TODO - return http error + wm.logger.Error("Failed to upgrade connection", "err", err) + return + } + defer func() { + if err := wsConn.Close(); err != nil { + wm.logger.Error("Failed to close connection", "err", err) + } + }() + + // register connection + con := NewWSConnection(wsConn, wm.funcMap, wm.cdc, wm.wsConnOptions...) + con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) + wm.logger.Info("New websocket connection", "remote", con.remoteAddr) + err = con.Start() // BLOCKING + if err != nil { + wm.logger.Error("Failed to start connection", "err", err) + return + } + con.Stop() +} + +/////////////////////////////////////////////////////////////////////////////// +// WebSocket connection +/////////////////////////////////////////////////////////////////////////////// + +// A single websocket connection contains listener id, underlying ws +// connection, and the event switch for subscribing to events. +// +// In case of an error, the connection is stopped. +type wsConnection struct { + cmn.BaseService + + remoteAddr string + baseConn *websocket.Conn + // writeChan is never closed, to allow WriteRPCResponse() to fail. + writeChan chan types.RPCResponse + + // chan, which is closed when/if readRoutine errors + // used to abort writeRoutine + readRoutineQuit chan struct{} + + funcMap map[string]*RPCFunc + cdc *amino.Codec + + // write channel capacity + writeChanCapacity int + + // each write times out after this. + writeWait time.Duration + + // Connection times out if we haven't received *anything* in this long, not even pings. + readWait time.Duration + + // Send pings to server with this period. Must be less than readWait, but greater than zero. + pingPeriod time.Duration + + // Maximum message size. + readLimit int64 + + // callback which is called upon disconnect + onDisconnect func(remoteAddr string) + + ctx context.Context + cancel context.CancelFunc +} + +// NewWSConnection wraps websocket.Conn. +// +// See the commentary on the func(*wsConnection) functions for a detailed +// description of how to configure ping period and pong wait time. NOTE: if the +// write buffer is full, pongs may be dropped, which may cause clients to +// disconnect. see https://github.com/gorilla/websocket/issues/97 +func NewWSConnection( + baseConn *websocket.Conn, + funcMap map[string]*RPCFunc, + cdc *amino.Codec, + options ...func(*wsConnection), +) *wsConnection { + wsc := &wsConnection{ + remoteAddr: baseConn.RemoteAddr().String(), + baseConn: baseConn, + funcMap: funcMap, + cdc: cdc, + writeWait: defaultWSWriteWait, + writeChanCapacity: defaultWSWriteChanCapacity, + readWait: defaultWSReadWait, + pingPeriod: defaultWSPingPeriod, + readRoutineQuit: make(chan struct{}), + } + for _, option := range options { + option(wsc) + } + wsc.baseConn.SetReadLimit(wsc.readLimit) + wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc) + return wsc +} + +// OnDisconnect sets a callback which is used upon disconnect - not +// Goroutine-safe. Nop by default. +func OnDisconnect(onDisconnect func(remoteAddr string)) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.onDisconnect = onDisconnect + } +} + +// WriteWait sets the amount of time to wait before a websocket write times out. +// It should only be used in the constructor - not Goroutine-safe. +func WriteWait(writeWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeWait = writeWait + } +} + +// WriteChanCapacity sets the capacity of the websocket write channel. +// It should only be used in the constructor - not Goroutine-safe. +func WriteChanCapacity(cap int) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeChanCapacity = cap + } +} + +// ReadWait sets the amount of time to wait before a websocket read times out. +// It should only be used in the constructor - not Goroutine-safe. +func ReadWait(readWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.readWait = readWait + } +} + +// PingPeriod sets the duration for sending websocket pings. +// It should only be used in the constructor - not Goroutine-safe. +func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.pingPeriod = pingPeriod + } +} + +// ReadLimit sets the maximum size for reading message. +// It should only be used in the constructor - not Goroutine-safe. +func ReadLimit(readLimit int64) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.readLimit = readLimit + } +} + +// OnStart implements cmn.Service by starting the read and write routines. It +// blocks until there's some error. +func (wsc *wsConnection) OnStart() error { + wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) + + // Read subscriptions/unsubscriptions to events + go wsc.readRoutine() + // Write responses, BLOCKING. + wsc.writeRoutine() + + return nil +} + +// OnStop implements cmn.Service by unsubscribing remoteAddr from all +// subscriptions. +func (wsc *wsConnection) OnStop() { + if wsc.onDisconnect != nil { + wsc.onDisconnect(wsc.remoteAddr) + } + + if wsc.ctx != nil { + wsc.cancel() + } +} + +// GetRemoteAddr returns the remote address of the underlying connection. +// It implements WSRPCConnection +func (wsc *wsConnection) GetRemoteAddr() string { + return wsc.remoteAddr +} + +// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. +// It implements WSRPCConnection. It is Goroutine-safe. +func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { + select { + case <-wsc.Quit(): + return + case wsc.writeChan <- resp: + } +} + +// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block. +// It implements WSRPCConnection. It is Goroutine-safe +func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { + select { + case <-wsc.Quit(): + return false + case wsc.writeChan <- resp: + return true + default: + return false + } +} + +// Codec returns an amino codec used to decode parameters and encode results. +// It implements WSRPCConnection. +func (wsc *wsConnection) Codec() *amino.Codec { + return wsc.cdc +} + +// Context returns the connection's context. +// The context is canceled when the client's connection closes. +func (wsc *wsConnection) Context() context.Context { + if wsc.ctx != nil { + return wsc.ctx + } + wsc.ctx, wsc.cancel = context.WithCancel(context.Background()) + return wsc.ctx +} + +// Read from the socket and subscribe to or unsubscribe from events +func (wsc *wsConnection) readRoutine() { + var request types.RPCRequest + + defer func() { + if r := recover(); r != nil { + err, ok := r.(error) + if !ok { + err = fmt.Errorf("WSJSONRPC: %v", r) + } + wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack())) + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) + go wsc.readRoutine() + } + }() + + wsc.baseConn.SetPongHandler(func(m string) error { + return wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)) + }) + + for { + select { + case <-wsc.Quit(): + return + default: + // reset deadline for every type of message (control or data) + if err := wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)); err != nil { + wsc.Logger.Error("failed to set read deadline", "err", err) + } + var in []byte + _, in, err := wsc.baseConn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + wsc.Logger.Info("Client closed the connection") + } else { + wsc.Logger.Error("Failed to read request", "err", err) + } + wsc.Stop() + close(wsc.readRoutineQuit) + return + } + + err = json.Unmarshal(in, &request) + if err != nil { + wsc.WriteRPCResponse(types.RPCParseError(errors.Wrap(err, "error unmarshaling request"))) + continue + } + + // A Notification is a Request object without an "id" member. + // The Server MUST NOT reply to a Notification, including those that are within a batch request. + if request.ID == nil { + wsc.Logger.Debug( + "WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", + "req", request, + ) + continue + } + + // Now, fetch the RPCFunc and execute it. + rpcFunc := wsc.funcMap[request.Method] + if rpcFunc == nil { + wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID)) + continue + } + + ctx := &types.Context{JSONReq: &request, WSConn: wsc} + args := []reflect.Value{reflect.ValueOf(ctx)} + if len(request.Params) > 0 { + fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params) + if err != nil { + wsc.WriteRPCResponse( + types.RPCInternalError(request.ID, errors.Wrap(err, "error converting json params to arguments")), + ) + continue + } + args = append(args, fnArgs...) + } + + returns := rpcFunc.f.Call(args) + + // TODO: Need to encode args/returns to string if we want to log them + wsc.Logger.Info("WSJSONRPC", "method", request.Method) + + result, err := unreflectResult(returns) + if err != nil { + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) + continue + } + + wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result)) + } + } +} + +// receives on a write channel and writes out on the socket +func (wsc *wsConnection) writeRoutine() { + pingTicker := time.NewTicker(wsc.pingPeriod) + defer func() { + pingTicker.Stop() + }() + + // https://github.com/gorilla/websocket/issues/97 + pongs := make(chan string, 1) + wsc.baseConn.SetPingHandler(func(m string) error { + select { + case pongs <- m: + default: + } + return nil + }) + + for { + select { + case <-wsc.Quit(): + return + case <-wsc.readRoutineQuit: // error in readRoutine + return + case m := <-pongs: + err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m)) + if err != nil { + wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err) + } + case <-pingTicker.C: + err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) + if err != nil { + wsc.Logger.Error("Failed to write ping", "err", err) + return + } + case msg := <-wsc.writeChan: + jsonBytes, err := json.MarshalIndent(msg, "", " ") + if err != nil { + wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) + } else if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { + wsc.Logger.Error("Failed to write response", "msg", msg, "err", err) + return + } + } + } +} + +// All writes to the websocket must (re)set the write deadline. +// If some writes don't set it while others do, they may timeout incorrectly +// (https://github.com/tendermint/tendermint/issues/553) +func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { + if err := wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)); err != nil { + return err + } + return wsc.baseConn.WriteMessage(msgType, msg) +} diff --git a/rpc/lib/server/ws_handler_test.go b/rpc/lib/server/ws_handler_test.go new file mode 100644 index 000000000..f58e17ee7 --- /dev/null +++ b/rpc/lib/server/ws_handler_test.go @@ -0,0 +1,59 @@ +package rpcserver + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + + amino "github.com/tendermint/go-amino" + + "github.com/tendermint/tendermint/libs/log" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +func TestWebsocketManagerHandler(t *testing.T) { + s := newWSServer() + defer s.Close() + + // check upgrader works + d := websocket.Dialer{} + c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil) + require.NoError(t, err) + + if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want { + t.Errorf("dialResp.StatusCode = %q, want %q", got, want) + } + + // check basic functionality works + req, err := types.MapToRequest( + amino.NewCodec(), + types.JSONRPCStringID("TestWebsocketManager"), + "c", + map[string]interface{}{"s": "a", "i": 10}, + ) + require.NoError(t, err) + err = c.WriteJSON(req) + require.NoError(t, err) + + var resp types.RPCResponse + err = c.ReadJSON(&resp) + require.NoError(t, err) + require.Nil(t, resp.Error) + dialResp.Body.Close() +} + +func newWSServer() *httptest.Server { + funcMap := map[string]*RPCFunc{ + "c": NewWSRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), + } + wm := NewWebsocketManager(funcMap, amino.NewCodec()) + wm.SetLogger(log.TestingLogger()) + + mux := http.NewServeMux() + mux.HandleFunc("/websocket", wm.WebsocketHandler) + + return httptest.NewServer(mux) +} diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 14317d437..0ce9890e5 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -22,12 +22,14 @@ type jsonrpcid interface { // JSONRPCStringID a wrapper for JSON-RPC string IDs type JSONRPCStringID string -func (JSONRPCStringID) isJSONRPCID() {} +func (JSONRPCStringID) isJSONRPCID() {} +func (id JSONRPCStringID) String() string { return string(id) } // JSONRPCIntID a wrapper for JSON-RPC integer IDs type JSONRPCIntID int -func (JSONRPCIntID) isJSONRPCID() {} +func (JSONRPCIntID) isJSONRPCID() {} +func (id JSONRPCIntID) String() string { return fmt.Sprintf("%d", id) } func idFromInterface(idInterface interface{}) (jsonrpcid, error) { switch id := idInterface.(type) { @@ -50,16 +52,16 @@ func idFromInterface(idInterface interface{}) (jsonrpcid, error) { type RPCRequest struct { JSONRPC string `json:"jsonrpc"` - ID jsonrpcid `json:"id"` + ID jsonrpcid `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{} } // UnmarshalJSON custom JSON unmarshalling due to jsonrpcid being string or int -func (request *RPCRequest) UnmarshalJSON(data []byte) error { +func (req *RPCRequest) UnmarshalJSON(data []byte) error { unsafeReq := &struct { JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` + ID interface{} `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{} }{} @@ -67,9 +69,9 @@ func (request *RPCRequest) UnmarshalJSON(data []byte) error { if err != nil { return err } - request.JSONRPC = unsafeReq.JSONRPC - request.Method = unsafeReq.Method - request.Params = unsafeReq.Params + req.JSONRPC = unsafeReq.JSONRPC + req.Method = unsafeReq.Method + req.Params = unsafeReq.Params if unsafeReq.ID == nil { return nil } @@ -77,7 +79,7 @@ func (request *RPCRequest) UnmarshalJSON(data []byte) error { if err != nil { return err } - request.ID = id + req.ID = id return nil } @@ -91,41 +93,43 @@ func NewRPCRequest(id jsonrpcid, method string, params json.RawMessage) RPCReque } func (req RPCRequest) String() string { - return fmt.Sprintf("[%s %s]", req.ID, req.Method) + return fmt.Sprintf("RPCRequest{%s %s/%X}", req.ID, req.Method, req.Params) } func MapToRequest(cdc *amino.Codec, id jsonrpcid, method string, params map[string]interface{}) (RPCRequest, error) { - var params_ = make(map[string]json.RawMessage, len(params)) + var paramsMap = make(map[string]json.RawMessage, len(params)) for name, value := range params { valueJSON, err := cdc.MarshalJSON(value) if err != nil { return RPCRequest{}, err } - params_[name] = valueJSON + paramsMap[name] = valueJSON } - payload, err := json.Marshal(params_) // NOTE: Amino doesn't handle maps yet. + + payload, err := json.Marshal(paramsMap) // NOTE: Amino doesn't handle maps yet. if err != nil { return RPCRequest{}, err } - request := NewRPCRequest(id, method, payload) - return request, nil + + return NewRPCRequest(id, method, payload), nil } func ArrayToRequest(cdc *amino.Codec, id jsonrpcid, method string, params []interface{}) (RPCRequest, error) { - var params_ = make([]json.RawMessage, len(params)) + var paramsMap = make([]json.RawMessage, len(params)) for i, value := range params { valueJSON, err := cdc.MarshalJSON(value) if err != nil { return RPCRequest{}, err } - params_[i] = valueJSON + paramsMap[i] = valueJSON } - payload, err := json.Marshal(params_) // NOTE: Amino doesn't handle maps yet. + + payload, err := json.Marshal(paramsMap) // NOTE: Amino doesn't handle maps yet. if err != nil { return RPCRequest{}, err } - request := NewRPCRequest(id, method, payload) - return request, nil + + return NewRPCRequest(id, method, payload), nil } //---------------------------------------- @@ -147,16 +151,16 @@ func (err RPCError) Error() string { type RPCResponse struct { JSONRPC string `json:"jsonrpc"` - ID jsonrpcid `json:"id"` + ID jsonrpcid `json:"id,omitempty"` Result json.RawMessage `json:"result,omitempty"` Error *RPCError `json:"error,omitempty"` } // UnmarshalJSON custom JSON unmarshalling due to jsonrpcid being string or int -func (response *RPCResponse) UnmarshalJSON(data []byte) error { +func (resp *RPCResponse) UnmarshalJSON(data []byte) error { unsafeResp := &struct { JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` + ID interface{} `json:"id,omitempty"` Result json.RawMessage `json:"result,omitempty"` Error *RPCError `json:"error,omitempty"` }{} @@ -164,9 +168,9 @@ func (response *RPCResponse) UnmarshalJSON(data []byte) error { if err != nil { return err } - response.JSONRPC = unsafeResp.JSONRPC - response.Error = unsafeResp.Error - response.Result = unsafeResp.Result + resp.JSONRPC = unsafeResp.JSONRPC + resp.Error = unsafeResp.Error + resp.Result = unsafeResp.Result if unsafeResp.ID == nil { return nil } @@ -174,7 +178,7 @@ func (response *RPCResponse) UnmarshalJSON(data []byte) error { if err != nil { return err } - response.ID = id + resp.ID = id return nil } @@ -203,15 +207,21 @@ func NewRPCErrorResponse(id jsonrpcid, code int, msg string, data string) RPCRes func (resp RPCResponse) String() string { if resp.Error == nil { - return fmt.Sprintf("[%s %v]", resp.ID, resp.Result) + return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Result) } - return fmt.Sprintf("[%s %s]", resp.ID, resp.Error) + return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Error) } -func RPCParseError(id jsonrpcid, err error) RPCResponse { - return NewRPCErrorResponse(id, -32700, "Parse error. Invalid JSON", err.Error()) +// From the JSON-RPC 2.0 spec: +// If there was an error in detecting the id in the Request object (e.g. Parse +// error/Invalid Request), it MUST be Null. +func RPCParseError(err error) RPCResponse { + return NewRPCErrorResponse(nil, -32700, "Parse error. Invalid JSON", err.Error()) } +// From the JSON-RPC 2.0 spec: +// If there was an error in detecting the id in the Request object (e.g. Parse +// error/Invalid Request), it MUST be Null. func RPCInvalidRequestError(id jsonrpcid, err error) RPCResponse { return NewRPCErrorResponse(id, -32600, "Invalid Request", err.Error()) } diff --git a/rpc/lib/types/types_test.go b/rpc/lib/types/types_test.go index d6cf1b397..e2a98a18f 100644 --- a/rpc/lib/types/types_test.go +++ b/rpc/lib/types/types_test.go @@ -41,12 +41,9 @@ func TestResponses(t *testing.T) { s := fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"result":{"Value":"hello"}}`, tt.expected) assert.Equal(s, string(b)) - d := RPCParseError(jsonid, errors.New("Hello world")) + d := RPCParseError(errors.New("Hello world")) e, _ := json.Marshal(d) - f := fmt.Sprintf( - `{"jsonrpc":"2.0","id":%v,"error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}`, - tt.expected, - ) + f := `{"jsonrpc":"2.0","error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}` assert.Equal(f, string(e)) g := RPCMethodNotFoundError(jsonid) diff --git a/rpc/swagger/swagger.yaml b/rpc/swagger/swagger.yaml index dc41ce47d..066d62102 100644 --- a/rpc/swagger/swagger.yaml +++ b/rpc/swagger/swagger.yaml @@ -918,11 +918,11 @@ definitions: type: object properties: id: - type: string - x-example: "" + type: number + example: 0 jsonrpc: type: string - x-example: "2.0" + example: "2.0" EmptyResponse: description: Empty Response allOf: @@ -1414,8 +1414,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: type: "object" required: @@ -1590,8 +1590,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "signed_header" @@ -1779,8 +1779,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "block_height" @@ -1827,8 +1827,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: type: "object" required: @@ -1935,8 +1935,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "round_state" @@ -2280,8 +2280,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "round_state" @@ -2351,8 +2351,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: type: "object" required: @@ -2416,8 +2416,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "n_txs" @@ -2453,8 +2453,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "n_txs" @@ -2491,8 +2491,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "txs" @@ -2594,8 +2594,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "hash" @@ -2655,8 +2655,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "response" @@ -2727,8 +2727,8 @@ definitions: type: "object" type: "object" id: - type: "string" - example: "" + type: "number" + example: 0 jsonrpc: type: "string" example: "2.0" @@ -2745,8 +2745,8 @@ definitions: type: "string" example: "" id: - type: "string" - example: "" + type: "number" + example: 0 jsonrpc: type: "string" example: "2.0" @@ -2808,8 +2808,8 @@ definitions: type: "object" type: "object" id: - type: "string" - example: "" + type: "number" + example: 0 jsonrpc: type: "string" example: "2.0" @@ -2825,8 +2825,8 @@ definitions: type: "string" example: "2.0" id: - type: "string" - example: "" + type: "number" + example: 0 result: required: - "code" diff --git a/tools/tm-bench/transacter.go b/tools/tm-bench/transacter.go index 57324de35..f453ebcf7 100644 --- a/tools/tm-bench/transacter.go +++ b/tools/tm-bench/transacter.go @@ -194,7 +194,7 @@ func (t *transacter) sendLoop(connIndex int) { c.SetWriteDeadline(now.Add(sendTimeout)) err = c.WriteJSON(rpctypes.RPCRequest{ JSONRPC: "2.0", - ID: rpctypes.JSONRPCStringID("tm-bench"), + ID: rpctypes.JSONRPCIntID((connIndex * t.Rate) + i), Method: t.BroadcastTxMethod, Params: rawParamsJSON, })