diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..c23c911ce --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:latest + +RUN mkdir -p /go/src/github.com/tendermint/go-rpc +WORKDIR /go/src/github.com/tendermint/go-rpc + +COPY Makefile /go/src/github.com/tendermint/go-rpc/ +# COPY glide.yaml /go/src/github.com/tendermint/go-rpc/ +# COPY glide.lock /go/src/github.com/tendermint/go-rpc/ + +COPY . /go/src/github.com/tendermint/go-rpc + +RUN make get_deps diff --git a/LICENSE b/LICENSE new file mode 100644 index 000000000..8dada3eda --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..0937558a8 --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +PACKAGES=$(shell go list ./... | grep -v "test") + +all: get_deps test + +test: + @echo "--> Running go test --race" + @go test --race $(PACKAGES) + @echo "--> Running integration tests" + @bash ./test/integration_test.sh + +get_deps: + @echo "--> Running go get" + @go get -v -d $(PACKAGES) + @go list -f '{{join .TestImports "\n"}}' ./... | \ + grep -v /vendor/ | sort | uniq | \ + xargs go get -v -d + +.PHONY: all test get_deps diff --git a/README.md b/README.md new file mode 100644 index 000000000..79dd9692e --- /dev/null +++ b/README.md @@ -0,0 +1,128 @@ +# go-rpc + +[![CircleCI](https://circleci.com/gh/tendermint/go-rpc.svg?style=svg)](https://circleci.com/gh/tendermint/go-rpc) + +HTTP RPC server supporting calls via uri params, jsonrpc, and jsonrpc over websockets + +# Client Requests + +Suppose we want to expose the rpc function `HelloWorld(name string, num int)`. + +## GET (URI) + +As a GET request, it would have URI encoded parameters, and look like: + +``` +curl 'http://localhost:8008/hello_world?name="my_world"&num=5' +``` + +Note the `'` around the url, which is just so bash doesn't ignore the quotes in `"my_world"`. +This should also work: + +``` +curl http://localhost:8008/hello_world?name=\"my_world\"&num=5 +``` + +A GET request to `/` returns a list of available endpoints. +For those which take arguments, the arguments will be listed in order, with `_` where the actual value should be. + +## POST (JSONRPC) + +As a POST request, we use JSONRPC. For instance, the same request would have this as the body: + +``` +{ + "jsonrpc": "2.0", + "id": "anything", + "method": "hello_world", + "params": { + "name": "my_world", + "num": 5 + } +} +``` + +With the above saved in file `data.json`, we can make the request with + +``` +curl --data @data.json http://localhost:8008 +``` + +## WebSocket (JSONRPC) + +All requests are exposed over websocket in the same form as the POST JSONRPC. +Websocket connections are available at their own endpoint, typically `/websocket`, +though this is configurable when starting the server. + +# Server Definition + +Define some types and routes: + +``` +// Define a type for results and register concrete versions with go-wire +type Result interface{} + +type ResultStatus struct { + Value string +} + +var _ = wire.RegisterInterface( + struct{ Result }{}, + wire.ConcreteType{&ResultStatus{}, 0x1}, +) + +// Define some routes +var Routes = map[string]*rpcserver.RPCFunc{ + "status": rpcserver.NewRPCFunc(StatusResult, "arg"), +} + +// an rpc function +func StatusResult(v string) (Result, error) { + return &ResultStatus{v}, nil +} + +``` + +Now start the server: + +``` +mux := http.NewServeMux() +rpcserver.RegisterRPCFuncs(mux, Routes) +wm := rpcserver.NewWebsocketManager(Routes, nil) +mux.HandleFunc("/websocket", wm.WebsocketHandler) +go func() { + _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux) + if err != nil { + panic(err) + } +}() + +``` + +Note that unix sockets are supported as well (eg. `/path/to/socket` instead of `0.0.0.0:8008`) + +Now see all available endpoints by sending a GET request to `0.0.0.0:8008`. +Each route is available as a GET request, as a JSONRPCv2 POST request, and via JSONRPCv2 over websockets. + + +# Examples + +* [Tendermint](https://github.com/tendermint/tendermint/blob/master/rpc/core/routes.go) +* [tm-monitor](https://github.com/tendermint/tools/blob/master/tm-monitor/rpc.go) + +## CHANGELOG + +### 0.7.0 + +BREAKING CHANGES: + +- removed `Client` empty interface +- `ClientJSONRPC#Call` `params` argument became a map +- rename `ClientURI` -> `URIClient`, `ClientJSONRPC` -> `JSONRPCClient` + +IMPROVEMENTS: + +- added `HTTPClient` interface, which can be used for both `ClientURI` +and `ClientJSONRPC` +- all params are now optional (Golang's default will be used if some param is missing) +- added `Call` method to `WSClient` (see method's doc for details) diff --git a/circle.yml b/circle.yml new file mode 100644 index 000000000..0308a4e79 --- /dev/null +++ b/circle.yml @@ -0,0 +1,21 @@ +machine: + environment: + GOPATH: /home/ubuntu/.go_workspace + REPO: $GOPATH/src/github.com/$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME + hosts: + circlehost: 127.0.0.1 + localhost: 127.0.0.1 + +checkout: + post: + - rm -rf $REPO + - mkdir -p $HOME/.go_workspace/src/github.com/$CIRCLE_PROJECT_USERNAME + - mv $HOME/$CIRCLE_PROJECT_REPONAME $REPO + +dependencies: + override: + - "cd $REPO && make get_deps" + +test: + override: + - "cd $REPO && make test" diff --git a/client/http_client.go b/client/http_client.go new file mode 100644 index 000000000..f4a2a6d7e --- /dev/null +++ b/client/http_client.go @@ -0,0 +1,198 @@ +package rpcclient + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "reflect" + "strings" + + "github.com/pkg/errors" + types "github.com/tendermint/go-rpc/types" + wire "github.com/tendermint/go-wire" +) + +// HTTPClient is a common interface for JSONRPCClient and URIClient. +type HTTPClient interface { + Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) +} + +// TODO: Deprecate support for IP:PORT or /path/to/socket +func makeHTTPDialer(remoteAddr string) (string, func(string, string) (net.Conn, error)) { + + parts := strings.SplitN(remoteAddr, "://", 2) + var protocol, address string + if len(parts) != 2 { + log.Warn("WARNING (go-rpc): Please use fully formed listening addresses, including the tcp:// or unix:// prefix") + protocol = types.SocketType(remoteAddr) + address = remoteAddr + } else { + protocol, address = parts[0], parts[1] + } + + trimmedAddress := strings.Replace(address, "/", ".", -1) // replace / with . for http requests (dummy domain) + return trimmedAddress, func(proto, addr string) (net.Conn, error) { + return net.Dial(protocol, address) + } +} + +// We overwrite the http.Client.Dial so we can do http over tcp or unix. +// remoteAddr should be fully featured (eg. with tcp:// or unix://) +func makeHTTPClient(remoteAddr string) (string, *http.Client) { + address, dialer := makeHTTPDialer(remoteAddr) + return "http://" + address, &http.Client{ + Transport: &http.Transport{ + Dial: dialer, + }, + } +} + +//------------------------------------------------------------------------------------ + +// JSON rpc takes params as a slice +type JSONRPCClient struct { + address string + client *http.Client +} + +func NewJSONRPCClient(remote string) *JSONRPCClient { + address, client := makeHTTPClient(remote) + return &JSONRPCClient{ + address: address, + client: client, + } +} + +func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + // we need this step because we attempt to decode values using `go-wire` + // (handlers.go:176) on the server side + encodedParams := make(map[string]interface{}) + for k, v := range params { + bytes := json.RawMessage(wire.JSONBytes(v)) + encodedParams[k] = &bytes + } + request := types.RPCRequest{ + JSONRPC: "2.0", + Method: method, + Params: encodedParams, + ID: "", + } + requestBytes, err := json.Marshal(request) + if err != nil { + return nil, err + } + // log.Info(string(requestBytes)) + requestBuf := bytes.NewBuffer(requestBytes) + // log.Info(Fmt("RPC request to %v (%v): %v", c.remote, method, string(requestBytes))) + httpResponse, err := c.client.Post(c.address, "text/json", requestBuf) + if err != nil { + return nil, err + } + defer httpResponse.Body.Close() + responseBytes, err := ioutil.ReadAll(httpResponse.Body) + if err != nil { + return nil, err + } + // log.Info(Fmt("RPC response: %v", string(responseBytes))) + return unmarshalResponseBytes(responseBytes, result) +} + +//------------------------------------------------------------- + +// URI takes params as a map +type URIClient struct { + address string + client *http.Client +} + +func NewURIClient(remote string) *URIClient { + address, client := makeHTTPClient(remote) + return &URIClient{ + address: address, + client: client, + } +} + +func (c *URIClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + values, err := argsToURLValues(params) + if err != nil { + return nil, err + } + // log.Info(Fmt("URI request to %v (%v): %v", c.address, method, values)) + resp, err := c.client.PostForm(c.address+"/"+method, values) + if err != nil { + return nil, err + } + defer resp.Body.Close() + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return unmarshalResponseBytes(responseBytes, result) +} + +//------------------------------------------------ + +func unmarshalResponseBytes(responseBytes []byte, result interface{}) (interface{}, error) { + // read response + // if rpc/core/types is imported, the result will unmarshal + // into the correct type + // log.Notice("response", "response", string(responseBytes)) + var err error + response := &types.RPCResponse{} + err = json.Unmarshal(responseBytes, response) + if err != nil { + return nil, errors.Errorf("Error unmarshalling rpc response: %v", err) + } + errorStr := response.Error + if errorStr != "" { + return nil, errors.Errorf("Response error: %v", errorStr) + } + // unmarshal the RawMessage into the result + result = wire.ReadJSONPtr(result, *response.Result, &err) + if err != nil { + return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err) + } + return result, nil +} + +func argsToURLValues(args map[string]interface{}) (url.Values, error) { + values := make(url.Values) + if len(args) == 0 { + return values, nil + } + err := argsToJson(args) + if err != nil { + return nil, err + } + for key, val := range args { + values.Set(key, val.(string)) + } + return values, nil +} + +func argsToJson(args map[string]interface{}) error { + var n int + var err error + for k, v := range args { + // Convert byte slices to "0x"-prefixed hex + byteSlice, isByteSlice := reflect.ValueOf(v).Interface().([]byte) + if isByteSlice { + args[k] = fmt.Sprintf("0x%X", byteSlice) + continue + } + + // Pass everything else to go-wire + buf := new(bytes.Buffer) + wire.WriteJSON(v, buf, &n, &err) + if err != nil { + return err + } + args[k] = buf.String() + } + return nil +} diff --git a/client/log.go b/client/log.go new file mode 100644 index 000000000..8b33e2f10 --- /dev/null +++ b/client/log.go @@ -0,0 +1,7 @@ +package rpcclient + +import ( + "github.com/tendermint/log15" +) + +var log = log15.New("module", "rpcclient") diff --git a/client/ws_client.go b/client/ws_client.go new file mode 100644 index 000000000..b83d62974 --- /dev/null +++ b/client/ws_client.go @@ -0,0 +1,172 @@ +package rpcclient + +import ( + "encoding/json" + "net" + "net/http" + "time" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + cmn "github.com/tendermint/tmlibs/common" + types "github.com/tendermint/go-rpc/types" + wire "github.com/tendermint/go-wire" +) + +const ( + wsResultsChannelCapacity = 10 + wsErrorsChannelCapacity = 1 + wsWriteTimeoutSeconds = 10 +) + +type WSClient struct { + cmn.BaseService + Address string // IP:PORT or /path/to/socket + Endpoint string // /websocket/url/endpoint + Dialer func(string, string) (net.Conn, error) + *websocket.Conn + ResultsCh chan json.RawMessage // closes upon WSClient.Stop() + ErrorsCh chan error // closes upon WSClient.Stop() +} + +// create a new connection +func NewWSClient(remoteAddr, endpoint string) *WSClient { + addr, dialer := makeHTTPDialer(remoteAddr) + wsClient := &WSClient{ + Address: addr, + Dialer: dialer, + Endpoint: endpoint, + Conn: nil, + } + wsClient.BaseService = *cmn.NewBaseService(log, "WSClient", wsClient) + return wsClient +} + +func (wsc *WSClient) String() string { + return wsc.Address + ", " + wsc.Endpoint +} + +// OnStart implements cmn.BaseService interface +func (wsc *WSClient) OnStart() error { + wsc.BaseService.OnStart() + err := wsc.dial() + if err != nil { + return err + } + wsc.ResultsCh = make(chan json.RawMessage, wsResultsChannelCapacity) + wsc.ErrorsCh = make(chan error, wsErrorsChannelCapacity) + go wsc.receiveEventsRoutine() + return nil +} + +// OnReset implements cmn.BaseService interface +func (wsc *WSClient) OnReset() error { + return nil +} + +func (wsc *WSClient) dial() error { + + // Dial + dialer := &websocket.Dialer{ + NetDial: wsc.Dialer, + Proxy: http.ProxyFromEnvironment, + } + rHeader := http.Header{} + con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader) + if err != nil { + return err + } + // Set the ping/pong handlers + con.SetPingHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) + return nil + }) + con.SetPongHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + return nil + }) + wsc.Conn = con + return nil +} + +// OnStop implements cmn.BaseService interface +func (wsc *WSClient) OnStop() { + wsc.BaseService.OnStop() + wsc.Conn.Close() + // ResultsCh/ErrorsCh is closed in receiveEventsRoutine. +} + +func (wsc *WSClient) receiveEventsRoutine() { + for { + _, data, err := wsc.ReadMessage() + if err != nil { + log.Info("WSClient failed to read message", "error", err, "data", string(data)) + wsc.Stop() + break + } else { + var response types.RPCResponse + err := json.Unmarshal(data, &response) + if err != nil { + log.Info("WSClient failed to parse message", "error", err, "data", string(data)) + wsc.ErrorsCh <- err + continue + } + if response.Error != "" { + wsc.ErrorsCh <- errors.Errorf(response.Error) + continue + } + wsc.ResultsCh <- *response.Result + } + } + // this must be modified in the same go-routine that reads from the + // connection to avoid race conditions + wsc.Conn = nil + + // Cleanup + close(wsc.ResultsCh) + close(wsc.ErrorsCh) +} + +// Subscribe to an event. Note the server must have a "subscribe" route +// defined. +func (wsc *WSClient) Subscribe(eventid string) error { + err := wsc.WriteJSON(types.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "subscribe", + Params: map[string]interface{}{"event": eventid}, + }) + return err +} + +// Unsubscribe from an event. Note the server must have a "unsubscribe" route +// defined. +func (wsc *WSClient) Unsubscribe(eventid string) error { + err := wsc.WriteJSON(types.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "unsubscribe", + Params: map[string]interface{}{"event": eventid}, + }) + return err +} + +// Call asynchronously calls a given method by sending an RPCRequest to the +// server. Results will be available on ResultsCh, errors, if any, on ErrorsCh. +func (wsc *WSClient) Call(method string, params map[string]interface{}) error { + // we need this step because we attempt to decode values using `go-wire` + // (handlers.go:470) on the server side + encodedParams := make(map[string]interface{}) + for k, v := range params { + bytes := json.RawMessage(wire.JSONBytes(v)) + encodedParams[k] = &bytes + } + err := wsc.WriteJSON(types.RPCRequest{ + JSONRPC: "2.0", + Method: method, + Params: encodedParams, + ID: "", + }) + return err +} diff --git a/rpc_test.go b/rpc_test.go new file mode 100644 index 000000000..ed28cbc8d --- /dev/null +++ b/rpc_test.go @@ -0,0 +1,298 @@ +package rpc + +import ( + "bytes" + crand "crypto/rand" + "fmt" + "math/rand" + "net/http" + "os/exec" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + client "github.com/tendermint/go-rpc/client" + server "github.com/tendermint/go-rpc/server" + types "github.com/tendermint/go-rpc/types" + wire "github.com/tendermint/go-wire" +) + +// Client and Server should work over tcp or unix sockets +const ( + tcpAddr = "tcp://0.0.0.0:46657" + + unixSocket = "/tmp/go-rpc.sock" + unixAddr = "unix:///tmp/go-rpc.sock" + + websocketEndpoint = "/websocket/endpoint" +) + +// Define a type for results and register concrete versions +type Result interface{} + +type ResultEcho struct { + Value string +} + +type ResultEchoBytes struct { + Value []byte +} + +var _ = wire.RegisterInterface( + struct{ Result }{}, + wire.ConcreteType{&ResultEcho{}, 0x1}, + wire.ConcreteType{&ResultEchoBytes{}, 0x2}, +) + +// Define some routes +var Routes = map[string]*server.RPCFunc{ + "echo": server.NewRPCFunc(EchoResult, "arg"), + "echo_ws": server.NewWSRPCFunc(EchoWSResult, "arg"), + "echo_bytes": server.NewRPCFunc(EchoBytesResult, "arg"), +} + +func EchoResult(v string) (Result, error) { + return &ResultEcho{v}, nil +} + +func EchoWSResult(wsCtx types.WSRPCContext, v string) (Result, error) { + return &ResultEcho{v}, nil +} + +func EchoBytesResult(v []byte) (Result, error) { + return &ResultEchoBytes{v}, nil +} + +// launch unix and tcp servers +func init() { + cmd := exec.Command("rm", "-f", unixSocket) + err := cmd.Start() + if err != nil { + panic(err) + } + if err = cmd.Wait(); err != nil { + panic(err) + } + + mux := http.NewServeMux() + server.RegisterRPCFuncs(mux, Routes) + wm := server.NewWebsocketManager(Routes, nil) + mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) + go func() { + _, err := server.StartHTTPServer(tcpAddr, mux) + if err != nil { + panic(err) + } + }() + + mux2 := http.NewServeMux() + server.RegisterRPCFuncs(mux2, Routes) + wm = server.NewWebsocketManager(Routes, nil) + mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) + go func() { + _, err := server.StartHTTPServer(unixAddr, mux2) + if err != nil { + panic(err) + } + }() + + // wait for servers to start + time.Sleep(time.Second * 2) +} + +func echoViaHTTP(cl client.HTTPClient, val string) (string, error) { + params := map[string]interface{}{ + "arg": val, + } + var result Result + if _, err := cl.Call("echo", params, &result); err != nil { + return "", err + } + return result.(*ResultEcho).Value, nil +} + +func echoBytesViaHTTP(cl client.HTTPClient, bytes []byte) ([]byte, error) { + params := map[string]interface{}{ + "arg": bytes, + } + var result Result + if _, err := cl.Call("echo_bytes", params, &result); err != nil { + return []byte{}, err + } + return result.(*ResultEchoBytes).Value, nil +} + +func testWithHTTPClient(t *testing.T, cl client.HTTPClient) { + val := "acbd" + got, err := echoViaHTTP(cl, val) + require.Nil(t, err) + assert.Equal(t, got, val) + + val2 := randBytes(t) + got2, err := echoBytesViaHTTP(cl, val2) + require.Nil(t, err) + assert.Equal(t, got2, val2) +} + +func echoViaWS(cl *client.WSClient, val string) (string, error) { + params := map[string]interface{}{ + "arg": val, + } + err := cl.Call("echo", params) + if err != nil { + return "", err + } + + select { + case msg := <-cl.ResultsCh: + result := new(Result) + wire.ReadJSONPtr(result, msg, &err) + if err != nil { + return "", nil + } + return (*result).(*ResultEcho).Value, nil + case err := <-cl.ErrorsCh: + return "", err + } +} + +func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) { + params := map[string]interface{}{ + "arg": bytes, + } + err := cl.Call("echo_bytes", params) + if err != nil { + return []byte{}, err + } + + select { + case msg := <-cl.ResultsCh: + result := new(Result) + wire.ReadJSONPtr(result, msg, &err) + if err != nil { + return []byte{}, nil + } + return (*result).(*ResultEchoBytes).Value, nil + case err := <-cl.ErrorsCh: + return []byte{}, err + } +} + +func testWithWSClient(t *testing.T, cl *client.WSClient) { + val := "acbd" + got, err := echoViaWS(cl, val) + require.Nil(t, err) + assert.Equal(t, got, val) + + val2 := randBytes(t) + got2, err := echoBytesViaWS(cl, val2) + require.Nil(t, err) + assert.Equal(t, got2, val2) +} + +//------------- + +func TestServersAndClientsBasic(t *testing.T) { + serverAddrs := [...]string{tcpAddr, unixAddr} + for _, addr := range serverAddrs { + cl1 := client.NewURIClient(addr) + fmt.Printf("=== testing server on %s using %v client", addr, cl1) + testWithHTTPClient(t, cl1) + + cl2 := client.NewJSONRPCClient(tcpAddr) + fmt.Printf("=== testing server on %s using %v client", addr, cl2) + testWithHTTPClient(t, cl2) + + cl3 := client.NewWSClient(tcpAddr, websocketEndpoint) + _, err := cl3.Start() + require.Nil(t, err) + fmt.Printf("=== testing server on %s using %v client", addr, cl3) + testWithWSClient(t, cl3) + cl3.Stop() + } +} + +func TestHexStringArg(t *testing.T) { + cl := client.NewURIClient(tcpAddr) + // should NOT be handled as hex + val := "0xabc" + got, err := echoViaHTTP(cl, val) + require.Nil(t, err) + assert.Equal(t, got, val) +} + +func TestQuotedStringArg(t *testing.T) { + cl := client.NewURIClient(tcpAddr) + // should NOT be unquoted + val := "\"abc\"" + got, err := echoViaHTTP(cl, val) + require.Nil(t, err) + assert.Equal(t, got, val) +} + +func TestWSNewWSRPCFunc(t *testing.T) { + cl := client.NewWSClient(tcpAddr, websocketEndpoint) + _, err := cl.Start() + require.Nil(t, err) + defer cl.Stop() + + val := "acbd" + params := map[string]interface{}{ + "arg": val, + } + err = cl.WriteJSON(types.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "echo_ws", + Params: params, + }) + require.Nil(t, err) + + select { + case msg := <-cl.ResultsCh: + result := new(Result) + wire.ReadJSONPtr(result, msg, &err) + require.Nil(t, err) + got := (*result).(*ResultEcho).Value + assert.Equal(t, got, val) + case err := <-cl.ErrorsCh: + t.Fatal(err) + } +} + +func TestWSHandlesArrayParams(t *testing.T) { + cl := client.NewWSClient(tcpAddr, websocketEndpoint) + _, err := cl.Start() + require.Nil(t, err) + defer cl.Stop() + + val := "acbd" + params := []interface{}{val} + err = cl.WriteJSON(types.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "echo_ws", + Params: params, + }) + require.Nil(t, err) + + select { + case msg := <-cl.ResultsCh: + result := new(Result) + wire.ReadJSONPtr(result, msg, &err) + require.Nil(t, err) + got := (*result).(*ResultEcho).Value + assert.Equal(t, got, val) + case err := <-cl.ErrorsCh: + t.Fatalf("%+v", err) + } +} + +func randBytes(t *testing.T) []byte { + n := rand.Intn(10) + 2 + buf := make([]byte, n) + _, err := crand.Read(buf) + require.Nil(t, err) + return bytes.Replace(buf, []byte("="), []byte{100}, -1) +} diff --git a/server/handlers.go b/server/handlers.go new file mode 100644 index 000000000..456b4aaf5 --- /dev/null +++ b/server/handlers.go @@ -0,0 +1,649 @@ +package rpcserver + +import ( + "bytes" + "encoding/hex" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "reflect" + "sort" + "strings" + "time" + + "github.com/gorilla/websocket" + "github.com/pkg/errors" + types "github.com/tendermint/go-rpc/types" + wire "github.com/tendermint/go-wire" + cmn "github.com/tendermint/tmlibs/common" + events "github.com/tendermint/tmlibs/events" +) + +// Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions. +// "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse +func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { + // HTTP endpoints + for funcName, rpcFunc := range funcMap { + mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc)) + } + + // JSONRPC endpoints + mux.HandleFunc("/", makeJSONRPCHandler(funcMap)) +} + +//------------------------------------- +// function introspection + +// holds all type information for each function +type RPCFunc struct { + f reflect.Value // underlying rpc function + args []reflect.Type // type of each function arg + returns []reflect.Type // type of each return arg + argNames []string // name of each argument + ws bool // websocket only +} + +// wraps a function for quicker introspection +// f is the function, args are comma separated argument names +func NewRPCFunc(f interface{}, args string) *RPCFunc { + return newRPCFunc(f, args, false) +} + +func NewWSRPCFunc(f interface{}, args string) *RPCFunc { + return newRPCFunc(f, args, true) +} + +func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc { + var argNames []string + if args != "" { + argNames = strings.Split(args, ",") + } + return &RPCFunc{ + f: reflect.ValueOf(f), + args: funcArgTypes(f), + returns: funcReturnTypes(f), + argNames: argNames, + ws: ws, + } +} + +// return a function's argument types +func funcArgTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumIn() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.In(i) + } + return typez +} + +// return a function's return types +func funcReturnTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumOut() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.Out(i) + } + return typez +} + +// function introspection +//----------------------------------------------------------------------------- +// rpc.json + +// jsonrpc calls grab the given method's function info and runs reflect.Call +func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b, _ := ioutil.ReadAll(r.Body) + // if its an empty request (like from a browser), + // just display a list of functions + if len(b) == 0 { + writeListOfEndpoints(w, r, funcMap) + return + } + + var request types.RPCRequest + err := json.Unmarshal(b, &request) + if err != nil { + WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, fmt.Sprintf("Error unmarshalling request: %v", err.Error()))) + return + } + if len(r.URL.Path) > 1 { + WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) + return + } + rpcFunc := funcMap[request.Method] + if rpcFunc == nil { + WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) + return + } + if rpcFunc.ws { + WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method)) + return + } + args, err := jsonParamsToArgsRPC(rpcFunc, request.Params) + if err != nil { + WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, fmt.Sprintf("Error converting json params to arguments: %v", err.Error()))) + return + } + returns := rpcFunc.f.Call(args) + log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, result, err.Error())) + return + } + WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, result, "")) + } +} + +// Convert a []interface{} OR a map[string]interface{} to properly typed values +// +// argsOffset should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames). +// Example: +// rpcFunc.args = [rpctypes.WSRPCContext string] +// rpcFunc.argNames = ["arg"] +func jsonParamsToArgs(rpcFunc *RPCFunc, paramsI interface{}, argsOffset int) ([]reflect.Value, error) { + values := make([]reflect.Value, len(rpcFunc.argNames)) + + switch params := paramsI.(type) { + + case map[string]interface{}: + for i, argName := range rpcFunc.argNames { + argType := rpcFunc.args[i+argsOffset] + + // decode param if provided + if param, ok := params[argName]; ok && "" != param { + v, err := _jsonObjectToArg(argType, param) + if err != nil { + return nil, err + } + values[i] = v + } else { // use default for that type + values[i] = reflect.Zero(argType) + } + } + case []interface{}: + if len(rpcFunc.argNames) != len(params) { + return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", + len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)) + } + values := make([]reflect.Value, len(params)) + for i, p := range params { + ty := rpcFunc.args[i+argsOffset] + v, err := _jsonObjectToArg(ty, p) + if err != nil { + return nil, err + } + values[i] = v + } + return values, nil + default: + return nil, fmt.Errorf("Unknown type for JSON params %v. Expected map[string]interface{} or []interface{}", reflect.TypeOf(paramsI)) + } + return values, nil +} + +// Convert a []interface{} OR a map[string]interface{} to properly typed values +func jsonParamsToArgsRPC(rpcFunc *RPCFunc, paramsI interface{}) ([]reflect.Value, error) { + return jsonParamsToArgs(rpcFunc, paramsI, 0) +} + +// Same as above, but with the first param the websocket connection +func jsonParamsToArgsWS(rpcFunc *RPCFunc, paramsI interface{}, wsCtx types.WSRPCContext) ([]reflect.Value, error) { + values, err := jsonParamsToArgs(rpcFunc, paramsI, 1) + if err != nil { + return nil, err + } + return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil +} + +func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { + var err error + v := reflect.New(ty) + wire.ReadJSONObjectPtr(v.Interface(), object, &err) + if err != nil { + return v, err + } + v = v.Elem() + return v, nil +} + +// rpc.json +//----------------------------------------------------------------------------- +// rpc.http + +// convert from a function name to the http handler +func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { + // Exception for websocket endpoints + if rpcFunc.ws { + return func(w http.ResponseWriter, r *http.Request) { + WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, "This RPC method is only for websockets")) + } + } + // All other endpoints + return func(w http.ResponseWriter, r *http.Request) { + log.Debug("HTTP HANDLER", "req", r) + args, err := httpParamsToArgs(rpcFunc, r) + if err != nil { + WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, fmt.Sprintf("Error converting http params to args: %v", err.Error()))) + return + } + returns := rpcFunc.f.Call(args) + log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, err.Error())) + return + } + WriteRPCResponseHTTP(w, types.NewRPCResponse("", result, "")) + } +} + +// Covert an http query to a list of properly typed values. +// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). +func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) { + values := make([]reflect.Value, len(rpcFunc.args)) + + for i, name := range rpcFunc.argNames { + argType := rpcFunc.args[i] + + values[i] = reflect.Zero(argType) // set default for that type + + arg := GetParam(r, name) + // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg) + + if "" == arg { + continue + } + + v, err, ok := nonJsonToArg(argType, arg) + if err != nil { + return nil, err + } + if ok { + values[i] = v + continue + } + + // Pass values to go-wire + values[i], err = _jsonStringToArg(argType, arg) + if err != nil { + return nil, err + } + } + + return values, nil +} + +func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { + var err error + v := reflect.New(ty) + wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) + if err != nil { + return v, err + } + v = v.Elem() + return v, nil +} + +func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) { + isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) + isHexString := strings.HasPrefix(strings.ToLower(arg), "0x") + expectingString := ty.Kind() == reflect.String + expectingByteSlice := ty.Kind() == reflect.Slice && ty.Elem().Kind() == reflect.Uint8 + + if isHexString { + if !expectingString && !expectingByteSlice { + err := errors.Errorf("Got a hex string arg, but expected '%s'", + ty.Kind().String()) + return reflect.ValueOf(nil), err, false + } + + var value []byte + value, err := hex.DecodeString(arg[2:]) + if err != nil { + return reflect.ValueOf(nil), err, false + } + if ty.Kind() == reflect.String { + return reflect.ValueOf(string(value)), nil, true + } + return reflect.ValueOf([]byte(value)), nil, true + } + + if isQuotedString && expectingByteSlice { + var err error + v := reflect.New(reflect.TypeOf("")) + wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) + if err != nil { + return reflect.ValueOf(nil), err, false + } + v = v.Elem() + return reflect.ValueOf([]byte(v.String())), nil, true + } + + return reflect.ValueOf(nil), nil, false +} + +// rpc.http +//----------------------------------------------------------------------------- +// rpc.websocket + +const ( + writeChanCapacity = 1000 + wsWriteTimeoutSeconds = 30 // each write times out after this + wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. + wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. +) + +// a single websocket connection +// contains listener id, underlying ws connection, +// and the event switch for subscribing to events +type wsConnection struct { + cmn.BaseService + + remoteAddr string + baseConn *websocket.Conn + writeChan chan types.RPCResponse + readTimeout *time.Timer + pingTicker *time.Ticker + + funcMap map[string]*RPCFunc + evsw events.EventSwitch +} + +// new websocket connection wrapper +func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch) *wsConnection { + wsc := &wsConnection{ + remoteAddr: baseConn.RemoteAddr().String(), + baseConn: baseConn, + writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full. + funcMap: funcMap, + evsw: evsw, + } + wsc.BaseService = *cmn.NewBaseService(log, "wsConnection", wsc) + return wsc +} + +// wsc.Start() blocks until the connection closes. +func (wsc *wsConnection) OnStart() error { + wsc.BaseService.OnStart() + + // these must be set before the readRoutine is created, as it may + // call wsc.Stop(), which accesses these timers + wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) + wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) + + // Read subscriptions/unsubscriptions to events + go wsc.readRoutine() + + // Custom Ping handler to touch readTimeout + wsc.baseConn.SetPingHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) + wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) + return nil + }) + wsc.baseConn.SetPongHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) + return nil + }) + go wsc.readTimeoutRoutine() + + // Write responses, BLOCKING. + wsc.writeRoutine() + return nil +} + +func (wsc *wsConnection) OnStop() { + wsc.BaseService.OnStop() + if wsc.evsw != nil { + wsc.evsw.RemoveListener(wsc.remoteAddr) + } + wsc.readTimeout.Stop() + wsc.pingTicker.Stop() + // The write loop closes the websocket connection + // when it exits its loop, and the read loop + // closes the writeChan +} + +func (wsc *wsConnection) readTimeoutRoutine() { + select { + case <-wsc.readTimeout.C: + log.Notice("Stopping connection due to read timeout") + wsc.Stop() + case <-wsc.Quit: + return + } +} + +// Implements WSRPCConnection +func (wsc *wsConnection) GetRemoteAddr() string { + return wsc.remoteAddr +} + +// Implements WSRPCConnection +func (wsc *wsConnection) GetEventSwitch() events.EventSwitch { + return wsc.evsw +} + +// Implements WSRPCConnection +// Blocking write to writeChan until service stops. +// Goroutine-safe +func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { + select { + case <-wsc.Quit: + return + case wsc.writeChan <- resp: + } +} + +// Implements WSRPCConnection +// Nonblocking write. +// Goroutine-safe +func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { + select { + case <-wsc.Quit: + return false + case wsc.writeChan <- resp: + return true + default: + return false + } +} + +// Read from the socket and subscribe to or unsubscribe from events +func (wsc *wsConnection) readRoutine() { + // Do not close writeChan, to allow WriteRPCResponse() to fail. + // defer close(wsc.writeChan) + + for { + select { + case <-wsc.Quit: + return + default: + var in []byte + // Do not set a deadline here like below: + // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) + // The client may not send anything for a while. + // We use `readTimeout` to handle read timeouts. + _, in, err := wsc.baseConn.ReadMessage() + if err != nil { + log.Notice("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error()) + // an error reading the connection, + // kill the connection + wsc.Stop() + return + } + var request types.RPCRequest + err = json.Unmarshal(in, &request) + if err != nil { + errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) + wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, errStr)) + continue + } + + // Now, fetch the RPCFunc and execute it. + + rpcFunc := wsc.funcMap[request.Method] + if rpcFunc == nil { + wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) + continue + } + var args []reflect.Value + if rpcFunc.ws { + wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc} + args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) + } else { + args, err = jsonParamsToArgsRPC(rpcFunc, request.Params) + } + if err != nil { + wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, err.Error())) + continue + } + returns := rpcFunc.f.Call(args) + log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, err.Error())) + continue + } else { + wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, result, "")) + continue + } + + } + } +} + +// receives on a write channel and writes out on the socket +func (wsc *wsConnection) writeRoutine() { + defer wsc.baseConn.Close() + for { + select { + case <-wsc.Quit: + return + case <-wsc.pingTicker.C: + err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + log.Error("Failed to write ping message on websocket", "error", err) + wsc.Stop() + return + } + case msg := <-wsc.writeChan: + jsonBytes, err := json.Marshal(msg) + if err != nil { + log.Error("Failed to marshal RPCResponse to JSON", "error", err) + } else { + wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) + if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil { + log.Warn("Failed to write response on websocket", "error", err) + wsc.Stop() + return + } + } + } + } +} + +//---------------------------------------- + +// Main manager for all websocket connections +// Holds the event switch +// NOTE: The websocket path is defined externally, e.g. in node/node.go +type WebsocketManager struct { + websocket.Upgrader + funcMap map[string]*RPCFunc + evsw events.EventSwitch +} + +func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) *WebsocketManager { + return &WebsocketManager{ + funcMap: funcMap, + evsw: evsw, + Upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + // TODO + return true + }, + }, + } +} + +// Upgrade the request/response (via http.Hijack) and starts the wsConnection. +func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { + wsConn, err := wm.Upgrade(w, r, nil) + if err != nil { + // TODO - return http error + log.Error("Failed to upgrade to websocket connection", "error", err) + return + } + + // register connection + con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) + log.Notice("New websocket connection", "remote", con.remoteAddr) + con.Start() // Blocking +} + +// rpc.websocket +//----------------------------------------------------------------------------- + +// NOTE: assume returns is result struct and error. If error is not nil, return it +func unreflectResult(returns []reflect.Value) (interface{}, error) { + errV := returns[1] + if errV.Interface() != nil { + return nil, errors.Errorf("%v", errV.Interface()) + } + rv := returns[0] + // the result is a registered interface, + // we need a pointer to it so we can marshal with type byte + rvp := reflect.New(rv.Type()) + rvp.Elem().Set(rv) + return rvp.Interface(), nil +} + +// writes a list of available rpc endpoints as an html page +func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { + noArgNames := []string{} + argNames := []string{} + for name, funcData := range funcMap { + if len(funcData.args) == 0 { + noArgNames = append(noArgNames, name) + } else { + argNames = append(argNames, name) + } + } + sort.Strings(noArgNames) + sort.Strings(argNames) + buf := new(bytes.Buffer) + buf.WriteString("") + buf.WriteString("
Available endpoints:
") + + for _, name := range noArgNames { + link := fmt.Sprintf("http://%s/%s", r.Host, name) + buf.WriteString(fmt.Sprintf("%s
", link, link)) + } + + buf.WriteString("
Endpoints that require arguments:
") + for _, name := range argNames { + link := fmt.Sprintf("http://%s/%s?", r.Host, name) + funcData := funcMap[name] + for i, argName := range funcData.argNames { + link += argName + "=_" + if i < len(funcData.argNames)-1 { + link += "&" + } + } + buf.WriteString(fmt.Sprintf("%s
", link, link)) + } + buf.WriteString("") + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(200) + w.Write(buf.Bytes()) +} diff --git a/server/http_params.go b/server/http_params.go new file mode 100644 index 000000000..565060678 --- /dev/null +++ b/server/http_params.go @@ -0,0 +1,90 @@ +package rpcserver + +import ( + "encoding/hex" + "net/http" + "regexp" + "strconv" + + "github.com/pkg/errors" +) + +var ( + // Parts of regular expressions + atom = "[A-Z0-9!#$%&'*+\\-/=?^_`{|}~]+" + dotAtom = atom + `(?:\.` + atom + `)*` + domain = `[A-Z0-9.-]+\.[A-Z]{2,4}` + + RE_HEX = regexp.MustCompile(`^(?i)[a-f0-9]+$`) + RE_EMAIL = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`) + RE_ADDRESS = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`) + RE_HOST = regexp.MustCompile(`^(?i)(` + domain + `)$`) + + //RE_ID12 = regexp.MustCompile(`^[a-zA-Z0-9]{12}$`) +) + +func GetParam(r *http.Request, param string) string { + s := r.URL.Query().Get(param) + if s == "" { + s = r.FormValue(param) + } + return s +} + +func GetParamByteSlice(r *http.Request, param string) ([]byte, error) { + s := GetParam(r, param) + return hex.DecodeString(s) +} + +func GetParamInt64(r *http.Request, param string) (int64, error) { + s := GetParam(r, param) + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return i, nil +} + +func GetParamInt32(r *http.Request, param string) (int32, error) { + s := GetParam(r, param) + i, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return int32(i), nil +} + +func GetParamUint64(r *http.Request, param string) (uint64, error) { + s := GetParam(r, param) + i, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return i, nil +} + +func GetParamUint(r *http.Request, param string) (uint, error) { + s := GetParam(r, param) + i, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return uint(i), nil +} + +func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) { + s := GetParam(r, param) + if !re.MatchString(s) { + return "", errors.Errorf(param, "Did not match regular expression %v", re.String()) + } + return s, nil +} + +func GetParamFloat64(r *http.Request, param string) (float64, error) { + s := GetParam(r, param) + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return f, nil +} diff --git a/server/http_server.go b/server/http_server.go new file mode 100644 index 000000000..5375c574f --- /dev/null +++ b/server/http_server.go @@ -0,0 +1,125 @@ +// Commons for HTTP handling +package rpcserver + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "net/http" + "runtime/debug" + "strings" + "time" + + "github.com/pkg/errors" + types "github.com/tendermint/go-rpc/types" +) + +func StartHTTPServer(listenAddr string, handler http.Handler) (listener net.Listener, err error) { + // listenAddr should be fully formed including tcp:// or unix:// prefix + var proto, addr string + parts := strings.SplitN(listenAddr, "://", 2) + if len(parts) != 2 { + log.Warn("WARNING (go-rpc): Please use fully formed listening addresses, including the tcp:// or unix:// prefix") + // we used to allow addrs without tcp/unix prefix by checking for a colon + // TODO: Deprecate + proto = types.SocketType(listenAddr) + addr = listenAddr + // return nil, errors.Errorf("Invalid listener address %s", lisenAddr) + } else { + proto, addr = parts[0], parts[1] + } + + log.Notice(fmt.Sprintf("Starting RPC HTTP server on %s socket %v", proto, addr)) + listener, err = net.Listen(proto, addr) + if err != nil { + return nil, errors.Errorf("Failed to listen to %v: %v", listenAddr, err) + } + + go func() { + res := http.Serve( + listener, + RecoverAndLogHandler(handler), + ) + log.Crit("RPC HTTP server stopped", "result", res) + }() + return listener, nil +} + +func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse) { + // jsonBytes := wire.JSONBytesPretty(res) + jsonBytes, err := json.Marshal(res) + if err != nil { + panic(err) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + w.Write(jsonBytes) +} + +//----------------------------------------------------------------------------- + +// Wraps an HTTP handler, adding error logging. +// If the inner function panics, the outer function recovers, logs, sends an +// HTTP 500 error response. +func RecoverAndLogHandler(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Wrap the ResponseWriter to remember the status + rww := &ResponseWriterWrapper{-1, w} + begin := time.Now() + + // Common headers + origin := r.Header.Get("Origin") + rww.Header().Set("Access-Control-Allow-Origin", origin) + rww.Header().Set("Access-Control-Allow-Credentials", "true") + rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time") + rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix())) + + defer func() { + // Send a 500 error if a panic happens during a handler. + // Without this, Chrome & Firefox were retrying aborted ajax requests, + // at least to my localhost. + if e := recover(); e != nil { + + // If RPCResponse + if res, ok := e.(types.RPCResponse); ok { + WriteRPCResponseHTTP(rww, res) + } else { + // For the rest, + log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) + rww.WriteHeader(http.StatusInternalServerError) + WriteRPCResponseHTTP(rww, types.NewRPCResponse("", nil, fmt.Sprintf("Internal Server Error: %v", e))) + } + } + + // Finally, log. + durationMS := time.Since(begin).Nanoseconds() / 1000000 + if rww.Status == -1 { + rww.Status = 200 + } + log.Info("Served RPC HTTP response", + "method", r.Method, "url", r.URL, + "status", rww.Status, "duration", durationMS, + "remoteAddr", r.RemoteAddr, + ) + }() + + handler.ServeHTTP(rww, r) + }) +} + +// Remember the status for logging +type ResponseWriterWrapper struct { + Status int + http.ResponseWriter +} + +func (w *ResponseWriterWrapper) WriteHeader(status int) { + w.Status = status + w.ResponseWriter.WriteHeader(status) +} + +// implements http.Hijacker +func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return w.ResponseWriter.(http.Hijacker).Hijack() +} diff --git a/server/log.go b/server/log.go new file mode 100644 index 000000000..704e22e30 --- /dev/null +++ b/server/log.go @@ -0,0 +1,7 @@ +package rpcserver + +import ( + "github.com/tendermint/log15" +) + +var log = log15.New("module", "rpcserver") diff --git a/test/data.json b/test/data.json new file mode 100644 index 000000000..83283ec33 --- /dev/null +++ b/test/data.json @@ -0,0 +1,9 @@ +{ + "jsonrpc": "2.0", + "id": "", + "method": "hello_world", + "params": { + "name": "my_world", + "num": 5 + } +} diff --git a/test/integration_test.sh b/test/integration_test.sh new file mode 100755 index 000000000..7c23be7d3 --- /dev/null +++ b/test/integration_test.sh @@ -0,0 +1,95 @@ +#!/usr/bin/env bash +set -e + +# Get the directory of where this script is. +SOURCE="${BASH_SOURCE[0]}" +while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done +DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" + +# Change into that dir because we expect that. +pushd "$DIR" + +echo "==> Building the server" +go build -o rpcserver main.go + +echo "==> (Re)starting the server" +PID=$(pgrep rpcserver || echo "") +if [[ $PID != "" ]]; then + kill -9 "$PID" +fi +./rpcserver & +PID=$! +sleep 2 + +echo "==> simple request" +R1=$(curl -s 'http://localhost:8008/hello_world?name="my_world"&num=5') +R2=$(curl -s --data @data.json http://localhost:8008) +if [[ "$R1" != "$R2" ]]; then + echo "responses are not identical:" + echo "R1: $R1" + echo "R2: $R2" + echo "FAIL" + exit 1 +else + echo "OK" +fi + +echo "==> request with 0x-prefixed hex string arg" +R1=$(curl -s 'http://localhost:8008/hello_world?name=0x41424344&num=123') +R2='{"jsonrpc":"2.0","id":"","result":{"Result":"hi ABCD 123"},"error":""}' +if [[ "$R1" != "$R2" ]]; then + echo "responses are not identical:" + echo "R1: $R1" + echo "R2: $R2" + echo "FAIL" + exit 1 +else + echo "OK" +fi + +echo "==> request with missing params" +R1=$(curl -s 'http://localhost:8008/hello_world') +R2='{"jsonrpc":"2.0","id":"","result":{"Result":"hi 0"},"error":""}' +if [[ "$R1" != "$R2" ]]; then + echo "responses are not identical:" + echo "R1: $R1" + echo "R2: $R2" + echo "FAIL" + exit 1 +else + echo "OK" +fi + +echo "==> request with unquoted string arg" +R1=$(curl -s 'http://localhost:8008/hello_world?name=abcd&num=123') +R2="{\"jsonrpc\":\"2.0\",\"id\":\"\",\"result\":null,\"error\":\"Error converting http params to args: invalid character 'a' looking for beginning of value\"}" +if [[ "$R1" != "$R2" ]]; then + echo "responses are not identical:" + echo "R1: $R1" + echo "R2: $R2" + echo "FAIL" + exit 1 +else + echo "OK" +fi + +echo "==> request with string type when expecting number arg" +R1=$(curl -s 'http://localhost:8008/hello_world?name="abcd"&num=0xabcd') +R2="{\"jsonrpc\":\"2.0\",\"id\":\"\",\"result\":null,\"error\":\"Error converting http params to args: Got a hex string arg, but expected 'int'\"}" +if [[ "$R1" != "$R2" ]]; then + echo "responses are not identical:" + echo "R1: $R1" + echo "R2: $R2" + echo "FAIL" + exit 1 +else + echo "OK" +fi + +echo "==> Stopping the server" +kill -9 $PID + +rm -f rpcserver + +popd +exit 0 diff --git a/test/main.go b/test/main.go new file mode 100644 index 000000000..59c6e9b8f --- /dev/null +++ b/test/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "fmt" + "net/http" + + cmn "github.com/tendermint/tmlibs/common" + rpcserver "github.com/tendermint/go-rpc/server" +) + +var routes = map[string]*rpcserver.RPCFunc{ + "hello_world": rpcserver.NewRPCFunc(HelloWorld, "name,num"), +} + +func HelloWorld(name string, num int) (Result, error) { + return Result{fmt.Sprintf("hi %s %d", name, num)}, nil +} + +type Result struct { + Result string +} + +func main() { + mux := http.NewServeMux() + rpcserver.RegisterRPCFuncs(mux, routes) + _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux) + if err != nil { + cmn.Exit(err.Error()) + } + + // Wait forever + cmn.TrapSignal(func() { + }) + +} diff --git a/types/types.go b/types/types.go new file mode 100644 index 000000000..9c5f2625b --- /dev/null +++ b/types/types.go @@ -0,0 +1,93 @@ +package rpctypes + +import ( + "encoding/json" + "strings" + + wire "github.com/tendermint/go-wire" + events "github.com/tendermint/tmlibs/events" +) + +type RPCRequest struct { + JSONRPC string `json:"jsonrpc"` + ID string `json:"id"` + Method string `json:"method"` + Params interface{} `json:"params"` // must be map[string]interface{} or []interface{} +} + +func NewRPCRequest(id string, method string, params map[string]interface{}) RPCRequest { + return RPCRequest{ + JSONRPC: "2.0", + ID: id, + Method: method, + Params: params, + } +} + +//---------------------------------------- + +/* +Result is a generic interface. +Applications should register type-bytes like so: + +var _ = wire.RegisterInterface( + struct{ Result }{}, + wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, + wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, + ... +) +*/ +type Result interface { +} + +//---------------------------------------- + +type RPCResponse struct { + JSONRPC string `json:"jsonrpc"` + ID string `json:"id"` + Result *json.RawMessage `json:"result"` + Error string `json:"error"` +} + +func NewRPCResponse(id string, res interface{}, err string) RPCResponse { + var raw *json.RawMessage + if res != nil { + rawMsg := json.RawMessage(wire.JSONBytes(res)) + raw = &rawMsg + } + return RPCResponse{ + JSONRPC: "2.0", + ID: id, + Result: raw, + Error: err, + } +} + +//---------------------------------------- + +// *wsConnection implements this interface. +type WSRPCConnection interface { + GetRemoteAddr() string + GetEventSwitch() events.EventSwitch + WriteRPCResponse(resp RPCResponse) + TryWriteRPCResponse(resp RPCResponse) bool +} + +// websocket-only RPCFuncs take this as the first parameter. +type WSRPCContext struct { + Request RPCRequest + WSRPCConnection +} + +//---------------------------------------- +// sockets +// +// Determine if its a unix or tcp socket. +// If tcp, must specify the port; `0.0.0.0` will return incorrectly as "unix" since there's no port +func SocketType(listenAddr string) string { + socketType := "unix" + if len(strings.Split(listenAddr, ":")) >= 2 { + socketType = "tcp" + } + return socketType +} diff --git a/version.go b/version.go new file mode 100644 index 000000000..8828f260b --- /dev/null +++ b/version.go @@ -0,0 +1,7 @@ +package rpc + +const Maj = "0" +const Min = "7" +const Fix = "0" + +const Version = Maj + "." + Min + "." + Fix