Browse Source

Merge remote-tracking branch 'rpc/develop' into repo-merge

pull/456/head
Ethan Buchman 8 years ago
parent
commit
35f1db09a9
18 changed files with 2165 additions and 0 deletions
  1. +12
    -0
      Dockerfile
  2. +201
    -0
      LICENSE
  3. +18
    -0
      Makefile
  4. +128
    -0
      README.md
  5. +21
    -0
      circle.yml
  6. +198
    -0
      client/http_client.go
  7. +7
    -0
      client/log.go
  8. +172
    -0
      client/ws_client.go
  9. +298
    -0
      rpc_test.go
  10. +649
    -0
      server/handlers.go
  11. +90
    -0
      server/http_params.go
  12. +125
    -0
      server/http_server.go
  13. +7
    -0
      server/log.go
  14. +9
    -0
      test/data.json
  15. +95
    -0
      test/integration_test.sh
  16. +35
    -0
      test/main.go
  17. +93
    -0
      types/types.go
  18. +7
    -0
      version.go

+ 12
- 0
Dockerfile View File

@ -0,0 +1,12 @@
FROM golang:latest
RUN mkdir -p /go/src/github.com/tendermint/go-rpc
WORKDIR /go/src/github.com/tendermint/go-rpc
COPY Makefile /go/src/github.com/tendermint/go-rpc/
# COPY glide.yaml /go/src/github.com/tendermint/go-rpc/
# COPY glide.lock /go/src/github.com/tendermint/go-rpc/
COPY . /go/src/github.com/tendermint/go-rpc
RUN make get_deps

+ 201
- 0
LICENSE View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

+ 18
- 0
Makefile View File

@ -0,0 +1,18 @@
PACKAGES=$(shell go list ./... | grep -v "test")
all: get_deps test
test:
@echo "--> Running go test --race"
@go test --race $(PACKAGES)
@echo "--> Running integration tests"
@bash ./test/integration_test.sh
get_deps:
@echo "--> Running go get"
@go get -v -d $(PACKAGES)
@go list -f '{{join .TestImports "\n"}}' ./... | \
grep -v /vendor/ | sort | uniq | \
xargs go get -v -d
.PHONY: all test get_deps

+ 128
- 0
README.md View File

@ -0,0 +1,128 @@
# go-rpc
[![CircleCI](https://circleci.com/gh/tendermint/go-rpc.svg?style=svg)](https://circleci.com/gh/tendermint/go-rpc)
HTTP RPC server supporting calls via uri params, jsonrpc, and jsonrpc over websockets
# Client Requests
Suppose we want to expose the rpc function `HelloWorld(name string, num int)`.
## GET (URI)
As a GET request, it would have URI encoded parameters, and look like:
```
curl 'http://localhost:8008/hello_world?name="my_world"&num=5'
```
Note the `'` around the url, which is just so bash doesn't ignore the quotes in `"my_world"`.
This should also work:
```
curl http://localhost:8008/hello_world?name=\"my_world\"&num=5
```
A GET request to `/` returns a list of available endpoints.
For those which take arguments, the arguments will be listed in order, with `_` where the actual value should be.
## POST (JSONRPC)
As a POST request, we use JSONRPC. For instance, the same request would have this as the body:
```
{
"jsonrpc": "2.0",
"id": "anything",
"method": "hello_world",
"params": {
"name": "my_world",
"num": 5
}
}
```
With the above saved in file `data.json`, we can make the request with
```
curl --data @data.json http://localhost:8008
```
## WebSocket (JSONRPC)
All requests are exposed over websocket in the same form as the POST JSONRPC.
Websocket connections are available at their own endpoint, typically `/websocket`,
though this is configurable when starting the server.
# Server Definition
Define some types and routes:
```
// Define a type for results and register concrete versions with go-wire
type Result interface{}
type ResultStatus struct {
Value string
}
var _ = wire.RegisterInterface(
struct{ Result }{},
wire.ConcreteType{&ResultStatus{}, 0x1},
)
// Define some routes
var Routes = map[string]*rpcserver.RPCFunc{
"status": rpcserver.NewRPCFunc(StatusResult, "arg"),
}
// an rpc function
func StatusResult(v string) (Result, error) {
return &ResultStatus{v}, nil
}
```
Now start the server:
```
mux := http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, Routes)
wm := rpcserver.NewWebsocketManager(Routes, nil)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
go func() {
_, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux)
if err != nil {
panic(err)
}
}()
```
Note that unix sockets are supported as well (eg. `/path/to/socket` instead of `0.0.0.0:8008`)
Now see all available endpoints by sending a GET request to `0.0.0.0:8008`.
Each route is available as a GET request, as a JSONRPCv2 POST request, and via JSONRPCv2 over websockets.
# Examples
* [Tendermint](https://github.com/tendermint/tendermint/blob/master/rpc/core/routes.go)
* [tm-monitor](https://github.com/tendermint/tools/blob/master/tm-monitor/rpc.go)
## CHANGELOG
### 0.7.0
BREAKING CHANGES:
- removed `Client` empty interface
- `ClientJSONRPC#Call` `params` argument became a map
- rename `ClientURI` -> `URIClient`, `ClientJSONRPC` -> `JSONRPCClient`
IMPROVEMENTS:
- added `HTTPClient` interface, which can be used for both `ClientURI`
and `ClientJSONRPC`
- all params are now optional (Golang's default will be used if some param is missing)
- added `Call` method to `WSClient` (see method's doc for details)

+ 21
- 0
circle.yml View File

@ -0,0 +1,21 @@
machine:
environment:
GOPATH: /home/ubuntu/.go_workspace
REPO: $GOPATH/src/github.com/$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
hosts:
circlehost: 127.0.0.1
localhost: 127.0.0.1
checkout:
post:
- rm -rf $REPO
- mkdir -p $HOME/.go_workspace/src/github.com/$CIRCLE_PROJECT_USERNAME
- mv $HOME/$CIRCLE_PROJECT_REPONAME $REPO
dependencies:
override:
- "cd $REPO && make get_deps"
test:
override:
- "cd $REPO && make test"

+ 198
- 0
client/http_client.go View File

@ -0,0 +1,198 @@
package rpcclient
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"reflect"
"strings"
"github.com/pkg/errors"
types "github.com/tendermint/go-rpc/types"
wire "github.com/tendermint/go-wire"
)
// HTTPClient is a common interface for JSONRPCClient and URIClient.
type HTTPClient interface {
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
}
// TODO: Deprecate support for IP:PORT or /path/to/socket
func makeHTTPDialer(remoteAddr string) (string, func(string, string) (net.Conn, error)) {
parts := strings.SplitN(remoteAddr, "://", 2)
var protocol, address string
if len(parts) != 2 {
log.Warn("WARNING (go-rpc): Please use fully formed listening addresses, including the tcp:// or unix:// prefix")
protocol = types.SocketType(remoteAddr)
address = remoteAddr
} else {
protocol, address = parts[0], parts[1]
}
trimmedAddress := strings.Replace(address, "/", ".", -1) // replace / with . for http requests (dummy domain)
return trimmedAddress, func(proto, addr string) (net.Conn, error) {
return net.Dial(protocol, address)
}
}
// We overwrite the http.Client.Dial so we can do http over tcp or unix.
// remoteAddr should be fully featured (eg. with tcp:// or unix://)
func makeHTTPClient(remoteAddr string) (string, *http.Client) {
address, dialer := makeHTTPDialer(remoteAddr)
return "http://" + address, &http.Client{
Transport: &http.Transport{
Dial: dialer,
},
}
}
//------------------------------------------------------------------------------------
// JSON rpc takes params as a slice
type JSONRPCClient struct {
address string
client *http.Client
}
func NewJSONRPCClient(remote string) *JSONRPCClient {
address, client := makeHTTPClient(remote)
return &JSONRPCClient{
address: address,
client: client,
}
}
func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
// we need this step because we attempt to decode values using `go-wire`
// (handlers.go:176) on the server side
encodedParams := make(map[string]interface{})
for k, v := range params {
bytes := json.RawMessage(wire.JSONBytes(v))
encodedParams[k] = &bytes
}
request := types.RPCRequest{
JSONRPC: "2.0",
Method: method,
Params: encodedParams,
ID: "",
}
requestBytes, err := json.Marshal(request)
if err != nil {
return nil, err
}
// log.Info(string(requestBytes))
requestBuf := bytes.NewBuffer(requestBytes)
// log.Info(Fmt("RPC request to %v (%v): %v", c.remote, method, string(requestBytes)))
httpResponse, err := c.client.Post(c.address, "text/json", requestBuf)
if err != nil {
return nil, err
}
defer httpResponse.Body.Close()
responseBytes, err := ioutil.ReadAll(httpResponse.Body)
if err != nil {
return nil, err
}
// log.Info(Fmt("RPC response: %v", string(responseBytes)))
return unmarshalResponseBytes(responseBytes, result)
}
//-------------------------------------------------------------
// URI takes params as a map
type URIClient struct {
address string
client *http.Client
}
func NewURIClient(remote string) *URIClient {
address, client := makeHTTPClient(remote)
return &URIClient{
address: address,
client: client,
}
}
func (c *URIClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
values, err := argsToURLValues(params)
if err != nil {
return nil, err
}
// log.Info(Fmt("URI request to %v (%v): %v", c.address, method, values))
resp, err := c.client.PostForm(c.address+"/"+method, values)
if err != nil {
return nil, err
}
defer resp.Body.Close()
responseBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return unmarshalResponseBytes(responseBytes, result)
}
//------------------------------------------------
func unmarshalResponseBytes(responseBytes []byte, result interface{}) (interface{}, error) {
// read response
// if rpc/core/types is imported, the result will unmarshal
// into the correct type
// log.Notice("response", "response", string(responseBytes))
var err error
response := &types.RPCResponse{}
err = json.Unmarshal(responseBytes, response)
if err != nil {
return nil, errors.Errorf("Error unmarshalling rpc response: %v", err)
}
errorStr := response.Error
if errorStr != "" {
return nil, errors.Errorf("Response error: %v", errorStr)
}
// unmarshal the RawMessage into the result
result = wire.ReadJSONPtr(result, *response.Result, &err)
if err != nil {
return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err)
}
return result, nil
}
func argsToURLValues(args map[string]interface{}) (url.Values, error) {
values := make(url.Values)
if len(args) == 0 {
return values, nil
}
err := argsToJson(args)
if err != nil {
return nil, err
}
for key, val := range args {
values.Set(key, val.(string))
}
return values, nil
}
func argsToJson(args map[string]interface{}) error {
var n int
var err error
for k, v := range args {
// Convert byte slices to "0x"-prefixed hex
byteSlice, isByteSlice := reflect.ValueOf(v).Interface().([]byte)
if isByteSlice {
args[k] = fmt.Sprintf("0x%X", byteSlice)
continue
}
// Pass everything else to go-wire
buf := new(bytes.Buffer)
wire.WriteJSON(v, buf, &n, &err)
if err != nil {
return err
}
args[k] = buf.String()
}
return nil
}

+ 7
- 0
client/log.go View File

@ -0,0 +1,7 @@
package rpcclient
import (
"github.com/tendermint/log15"
)
var log = log15.New("module", "rpcclient")

+ 172
- 0
client/ws_client.go View File

@ -0,0 +1,172 @@
package rpcclient
import (
"encoding/json"
"net"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
cmn "github.com/tendermint/tmlibs/common"
types "github.com/tendermint/go-rpc/types"
wire "github.com/tendermint/go-wire"
)
const (
wsResultsChannelCapacity = 10
wsErrorsChannelCapacity = 1
wsWriteTimeoutSeconds = 10
)
type WSClient struct {
cmn.BaseService
Address string // IP:PORT or /path/to/socket
Endpoint string // /websocket/url/endpoint
Dialer func(string, string) (net.Conn, error)
*websocket.Conn
ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
ErrorsCh chan error // closes upon WSClient.Stop()
}
// create a new connection
func NewWSClient(remoteAddr, endpoint string) *WSClient {
addr, dialer := makeHTTPDialer(remoteAddr)
wsClient := &WSClient{
Address: addr,
Dialer: dialer,
Endpoint: endpoint,
Conn: nil,
}
wsClient.BaseService = *cmn.NewBaseService(log, "WSClient", wsClient)
return wsClient
}
func (wsc *WSClient) String() string {
return wsc.Address + ", " + wsc.Endpoint
}
// OnStart implements cmn.BaseService interface
func (wsc *WSClient) OnStart() error {
wsc.BaseService.OnStart()
err := wsc.dial()
if err != nil {
return err
}
wsc.ResultsCh = make(chan json.RawMessage, wsResultsChannelCapacity)
wsc.ErrorsCh = make(chan error, wsErrorsChannelCapacity)
go wsc.receiveEventsRoutine()
return nil
}
// OnReset implements cmn.BaseService interface
func (wsc *WSClient) OnReset() error {
return nil
}
func (wsc *WSClient) dial() error {
// Dial
dialer := &websocket.Dialer{
NetDial: wsc.Dialer,
Proxy: http.ProxyFromEnvironment,
}
rHeader := http.Header{}
con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader)
if err != nil {
return err
}
// Set the ping/pong handlers
con.SetPingHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
return nil
})
con.SetPongHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
return nil
})
wsc.Conn = con
return nil
}
// OnStop implements cmn.BaseService interface
func (wsc *WSClient) OnStop() {
wsc.BaseService.OnStop()
wsc.Conn.Close()
// ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
}
func (wsc *WSClient) receiveEventsRoutine() {
for {
_, data, err := wsc.ReadMessage()
if err != nil {
log.Info("WSClient failed to read message", "error", err, "data", string(data))
wsc.Stop()
break
} else {
var response types.RPCResponse
err := json.Unmarshal(data, &response)
if err != nil {
log.Info("WSClient failed to parse message", "error", err, "data", string(data))
wsc.ErrorsCh <- err
continue
}
if response.Error != "" {
wsc.ErrorsCh <- errors.Errorf(response.Error)
continue
}
wsc.ResultsCh <- *response.Result
}
}
// this must be modified in the same go-routine that reads from the
// connection to avoid race conditions
wsc.Conn = nil
// Cleanup
close(wsc.ResultsCh)
close(wsc.ErrorsCh)
}
// Subscribe to an event. Note the server must have a "subscribe" route
// defined.
func (wsc *WSClient) Subscribe(eventid string) error {
err := wsc.WriteJSON(types.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "subscribe",
Params: map[string]interface{}{"event": eventid},
})
return err
}
// Unsubscribe from an event. Note the server must have a "unsubscribe" route
// defined.
func (wsc *WSClient) Unsubscribe(eventid string) error {
err := wsc.WriteJSON(types.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "unsubscribe",
Params: map[string]interface{}{"event": eventid},
})
return err
}
// Call asynchronously calls a given method by sending an RPCRequest to the
// server. Results will be available on ResultsCh, errors, if any, on ErrorsCh.
func (wsc *WSClient) Call(method string, params map[string]interface{}) error {
// we need this step because we attempt to decode values using `go-wire`
// (handlers.go:470) on the server side
encodedParams := make(map[string]interface{})
for k, v := range params {
bytes := json.RawMessage(wire.JSONBytes(v))
encodedParams[k] = &bytes
}
err := wsc.WriteJSON(types.RPCRequest{
JSONRPC: "2.0",
Method: method,
Params: encodedParams,
ID: "",
})
return err
}

+ 298
- 0
rpc_test.go View File

@ -0,0 +1,298 @@
package rpc
import (
"bytes"
crand "crypto/rand"
"fmt"
"math/rand"
"net/http"
"os/exec"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
client "github.com/tendermint/go-rpc/client"
server "github.com/tendermint/go-rpc/server"
types "github.com/tendermint/go-rpc/types"
wire "github.com/tendermint/go-wire"
)
// Client and Server should work over tcp or unix sockets
const (
tcpAddr = "tcp://0.0.0.0:46657"
unixSocket = "/tmp/go-rpc.sock"
unixAddr = "unix:///tmp/go-rpc.sock"
websocketEndpoint = "/websocket/endpoint"
)
// Define a type for results and register concrete versions
type Result interface{}
type ResultEcho struct {
Value string
}
type ResultEchoBytes struct {
Value []byte
}
var _ = wire.RegisterInterface(
struct{ Result }{},
wire.ConcreteType{&ResultEcho{}, 0x1},
wire.ConcreteType{&ResultEchoBytes{}, 0x2},
)
// Define some routes
var Routes = map[string]*server.RPCFunc{
"echo": server.NewRPCFunc(EchoResult, "arg"),
"echo_ws": server.NewWSRPCFunc(EchoWSResult, "arg"),
"echo_bytes": server.NewRPCFunc(EchoBytesResult, "arg"),
}
func EchoResult(v string) (Result, error) {
return &ResultEcho{v}, nil
}
func EchoWSResult(wsCtx types.WSRPCContext, v string) (Result, error) {
return &ResultEcho{v}, nil
}
func EchoBytesResult(v []byte) (Result, error) {
return &ResultEchoBytes{v}, nil
}
// launch unix and tcp servers
func init() {
cmd := exec.Command("rm", "-f", unixSocket)
err := cmd.Start()
if err != nil {
panic(err)
}
if err = cmd.Wait(); err != nil {
panic(err)
}
mux := http.NewServeMux()
server.RegisterRPCFuncs(mux, Routes)
wm := server.NewWebsocketManager(Routes, nil)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() {
_, err := server.StartHTTPServer(tcpAddr, mux)
if err != nil {
panic(err)
}
}()
mux2 := http.NewServeMux()
server.RegisterRPCFuncs(mux2, Routes)
wm = server.NewWebsocketManager(Routes, nil)
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() {
_, err := server.StartHTTPServer(unixAddr, mux2)
if err != nil {
panic(err)
}
}()
// wait for servers to start
time.Sleep(time.Second * 2)
}
func echoViaHTTP(cl client.HTTPClient, val string) (string, error) {
params := map[string]interface{}{
"arg": val,
}
var result Result
if _, err := cl.Call("echo", params, &result); err != nil {
return "", err
}
return result.(*ResultEcho).Value, nil
}
func echoBytesViaHTTP(cl client.HTTPClient, bytes []byte) ([]byte, error) {
params := map[string]interface{}{
"arg": bytes,
}
var result Result
if _, err := cl.Call("echo_bytes", params, &result); err != nil {
return []byte{}, err
}
return result.(*ResultEchoBytes).Value, nil
}
func testWithHTTPClient(t *testing.T, cl client.HTTPClient) {
val := "acbd"
got, err := echoViaHTTP(cl, val)
require.Nil(t, err)
assert.Equal(t, got, val)
val2 := randBytes(t)
got2, err := echoBytesViaHTTP(cl, val2)
require.Nil(t, err)
assert.Equal(t, got2, val2)
}
func echoViaWS(cl *client.WSClient, val string) (string, error) {
params := map[string]interface{}{
"arg": val,
}
err := cl.Call("echo", params)
if err != nil {
return "", err
}
select {
case msg := <-cl.ResultsCh:
result := new(Result)
wire.ReadJSONPtr(result, msg, &err)
if err != nil {
return "", nil
}
return (*result).(*ResultEcho).Value, nil
case err := <-cl.ErrorsCh:
return "", err
}
}
func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) {
params := map[string]interface{}{
"arg": bytes,
}
err := cl.Call("echo_bytes", params)
if err != nil {
return []byte{}, err
}
select {
case msg := <-cl.ResultsCh:
result := new(Result)
wire.ReadJSONPtr(result, msg, &err)
if err != nil {
return []byte{}, nil
}
return (*result).(*ResultEchoBytes).Value, nil
case err := <-cl.ErrorsCh:
return []byte{}, err
}
}
func testWithWSClient(t *testing.T, cl *client.WSClient) {
val := "acbd"
got, err := echoViaWS(cl, val)
require.Nil(t, err)
assert.Equal(t, got, val)
val2 := randBytes(t)
got2, err := echoBytesViaWS(cl, val2)
require.Nil(t, err)
assert.Equal(t, got2, val2)
}
//-------------
func TestServersAndClientsBasic(t *testing.T) {
serverAddrs := [...]string{tcpAddr, unixAddr}
for _, addr := range serverAddrs {
cl1 := client.NewURIClient(addr)
fmt.Printf("=== testing server on %s using %v client", addr, cl1)
testWithHTTPClient(t, cl1)
cl2 := client.NewJSONRPCClient(tcpAddr)
fmt.Printf("=== testing server on %s using %v client", addr, cl2)
testWithHTTPClient(t, cl2)
cl3 := client.NewWSClient(tcpAddr, websocketEndpoint)
_, err := cl3.Start()
require.Nil(t, err)
fmt.Printf("=== testing server on %s using %v client", addr, cl3)
testWithWSClient(t, cl3)
cl3.Stop()
}
}
func TestHexStringArg(t *testing.T) {
cl := client.NewURIClient(tcpAddr)
// should NOT be handled as hex
val := "0xabc"
got, err := echoViaHTTP(cl, val)
require.Nil(t, err)
assert.Equal(t, got, val)
}
func TestQuotedStringArg(t *testing.T) {
cl := client.NewURIClient(tcpAddr)
// should NOT be unquoted
val := "\"abc\""
got, err := echoViaHTTP(cl, val)
require.Nil(t, err)
assert.Equal(t, got, val)
}
func TestWSNewWSRPCFunc(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint)
_, err := cl.Start()
require.Nil(t, err)
defer cl.Stop()
val := "acbd"
params := map[string]interface{}{
"arg": val,
}
err = cl.WriteJSON(types.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "echo_ws",
Params: params,
})
require.Nil(t, err)
select {
case msg := <-cl.ResultsCh:
result := new(Result)
wire.ReadJSONPtr(result, msg, &err)
require.Nil(t, err)
got := (*result).(*ResultEcho).Value
assert.Equal(t, got, val)
case err := <-cl.ErrorsCh:
t.Fatal(err)
}
}
func TestWSHandlesArrayParams(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint)
_, err := cl.Start()
require.Nil(t, err)
defer cl.Stop()
val := "acbd"
params := []interface{}{val}
err = cl.WriteJSON(types.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "echo_ws",
Params: params,
})
require.Nil(t, err)
select {
case msg := <-cl.ResultsCh:
result := new(Result)
wire.ReadJSONPtr(result, msg, &err)
require.Nil(t, err)
got := (*result).(*ResultEcho).Value
assert.Equal(t, got, val)
case err := <-cl.ErrorsCh:
t.Fatalf("%+v", err)
}
}
func randBytes(t *testing.T) []byte {
n := rand.Intn(10) + 2
buf := make([]byte, n)
_, err := crand.Read(buf)
require.Nil(t, err)
return bytes.Replace(buf, []byte("="), []byte{100}, -1)
}

+ 649
- 0
server/handlers.go View File

@ -0,0 +1,649 @@
package rpcserver
import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"reflect"
"sort"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
types "github.com/tendermint/go-rpc/types"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
events "github.com/tendermint/tmlibs/events"
)
// Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
// "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
// HTTP endpoints
for funcName, rpcFunc := range funcMap {
mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc))
}
// JSONRPC endpoints
mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
}
//-------------------------------------
// function introspection
// holds all type information for each function
type RPCFunc struct {
f reflect.Value // underlying rpc function
args []reflect.Type // type of each function arg
returns []reflect.Type // type of each return arg
argNames []string // name of each argument
ws bool // websocket only
}
// wraps a function for quicker introspection
// f is the function, args are comma separated argument names
func NewRPCFunc(f interface{}, args string) *RPCFunc {
return newRPCFunc(f, args, false)
}
func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
return newRPCFunc(f, args, true)
}
func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc {
var argNames []string
if args != "" {
argNames = strings.Split(args, ",")
}
return &RPCFunc{
f: reflect.ValueOf(f),
args: funcArgTypes(f),
returns: funcReturnTypes(f),
argNames: argNames,
ws: ws,
}
}
// return a function's argument types
func funcArgTypes(f interface{}) []reflect.Type {
t := reflect.TypeOf(f)
n := t.NumIn()
typez := make([]reflect.Type, n)
for i := 0; i < n; i++ {
typez[i] = t.In(i)
}
return typez
}
// return a function's return types
func funcReturnTypes(f interface{}) []reflect.Type {
t := reflect.TypeOf(f)
n := t.NumOut()
typez := make([]reflect.Type, n)
for i := 0; i < n; i++ {
typez[i] = t.Out(i)
}
return typez
}
// function introspection
//-----------------------------------------------------------------------------
// rpc.json
// jsonrpc calls grab the given method's function info and runs reflect.Call
func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b, _ := ioutil.ReadAll(r.Body)
// if its an empty request (like from a browser),
// just display a list of functions
if len(b) == 0 {
writeListOfEndpoints(w, r, funcMap)
return
}
var request types.RPCRequest
err := json.Unmarshal(b, &request)
if err != nil {
WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, fmt.Sprintf("Error unmarshalling request: %v", err.Error())))
return
}
if len(r.URL.Path) > 1 {
WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
return
}
rpcFunc := funcMap[request.Method]
if rpcFunc == nil {
WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
return
}
if rpcFunc.ws {
WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
return
}
args, err := jsonParamsToArgsRPC(rpcFunc, request.Params)
if err != nil {
WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, nil, fmt.Sprintf("Error converting json params to arguments: %v", err.Error())))
return
}
returns := rpcFunc.f.Call(args)
log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, result, err.Error()))
return
}
WriteRPCResponseHTTP(w, types.NewRPCResponse(request.ID, result, ""))
}
}
// Convert a []interface{} OR a map[string]interface{} to properly typed values
//
// argsOffset should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames).
// Example:
// rpcFunc.args = [rpctypes.WSRPCContext string]
// rpcFunc.argNames = ["arg"]
func jsonParamsToArgs(rpcFunc *RPCFunc, paramsI interface{}, argsOffset int) ([]reflect.Value, error) {
values := make([]reflect.Value, len(rpcFunc.argNames))
switch params := paramsI.(type) {
case map[string]interface{}:
for i, argName := range rpcFunc.argNames {
argType := rpcFunc.args[i+argsOffset]
// decode param if provided
if param, ok := params[argName]; ok && "" != param {
v, err := _jsonObjectToArg(argType, param)
if err != nil {
return nil, err
}
values[i] = v
} else { // use default for that type
values[i] = reflect.Zero(argType)
}
}
case []interface{}:
if len(rpcFunc.argNames) != len(params) {
return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
len(rpcFunc.argNames), rpcFunc.argNames, len(params), params))
}
values := make([]reflect.Value, len(params))
for i, p := range params {
ty := rpcFunc.args[i+argsOffset]
v, err := _jsonObjectToArg(ty, p)
if err != nil {
return nil, err
}
values[i] = v
}
return values, nil
default:
return nil, fmt.Errorf("Unknown type for JSON params %v. Expected map[string]interface{} or []interface{}", reflect.TypeOf(paramsI))
}
return values, nil
}
// Convert a []interface{} OR a map[string]interface{} to properly typed values
func jsonParamsToArgsRPC(rpcFunc *RPCFunc, paramsI interface{}) ([]reflect.Value, error) {
return jsonParamsToArgs(rpcFunc, paramsI, 0)
}
// Same as above, but with the first param the websocket connection
func jsonParamsToArgsWS(rpcFunc *RPCFunc, paramsI interface{}, wsCtx types.WSRPCContext) ([]reflect.Value, error) {
values, err := jsonParamsToArgs(rpcFunc, paramsI, 1)
if err != nil {
return nil, err
}
return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil
}
func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
var err error
v := reflect.New(ty)
wire.ReadJSONObjectPtr(v.Interface(), object, &err)
if err != nil {
return v, err
}
v = v.Elem()
return v, nil
}
// rpc.json
//-----------------------------------------------------------------------------
// rpc.http
// convert from a function name to the http handler
func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
// Exception for websocket endpoints
if rpcFunc.ws {
return func(w http.ResponseWriter, r *http.Request) {
WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, "This RPC method is only for websockets"))
}
}
// All other endpoints
return func(w http.ResponseWriter, r *http.Request) {
log.Debug("HTTP HANDLER", "req", r)
args, err := httpParamsToArgs(rpcFunc, r)
if err != nil {
WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, fmt.Sprintf("Error converting http params to args: %v", err.Error())))
return
}
returns := rpcFunc.f.Call(args)
log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
WriteRPCResponseHTTP(w, types.NewRPCResponse("", nil, err.Error()))
return
}
WriteRPCResponseHTTP(w, types.NewRPCResponse("", result, ""))
}
}
// Covert an http query to a list of properly typed values.
// To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) {
values := make([]reflect.Value, len(rpcFunc.args))
for i, name := range rpcFunc.argNames {
argType := rpcFunc.args[i]
values[i] = reflect.Zero(argType) // set default for that type
arg := GetParam(r, name)
// log.Notice("param to arg", "argType", argType, "name", name, "arg", arg)
if "" == arg {
continue
}
v, err, ok := nonJsonToArg(argType, arg)
if err != nil {
return nil, err
}
if ok {
values[i] = v
continue
}
// Pass values to go-wire
values[i], err = _jsonStringToArg(argType, arg)
if err != nil {
return nil, err
}
}
return values, nil
}
func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
var err error
v := reflect.New(ty)
wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
if err != nil {
return v, err
}
v = v.Elem()
return v, nil
}
func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`)
isHexString := strings.HasPrefix(strings.ToLower(arg), "0x")
expectingString := ty.Kind() == reflect.String
expectingByteSlice := ty.Kind() == reflect.Slice && ty.Elem().Kind() == reflect.Uint8
if isHexString {
if !expectingString && !expectingByteSlice {
err := errors.Errorf("Got a hex string arg, but expected '%s'",
ty.Kind().String())
return reflect.ValueOf(nil), err, false
}
var value []byte
value, err := hex.DecodeString(arg[2:])
if err != nil {
return reflect.ValueOf(nil), err, false
}
if ty.Kind() == reflect.String {
return reflect.ValueOf(string(value)), nil, true
}
return reflect.ValueOf([]byte(value)), nil, true
}
if isQuotedString && expectingByteSlice {
var err error
v := reflect.New(reflect.TypeOf(""))
wire.ReadJSONPtr(v.Interface(), []byte(arg), &err)
if err != nil {
return reflect.ValueOf(nil), err, false
}
v = v.Elem()
return reflect.ValueOf([]byte(v.String())), nil, true
}
return reflect.ValueOf(nil), nil, false
}
// rpc.http
//-----------------------------------------------------------------------------
// rpc.websocket
const (
writeChanCapacity = 1000
wsWriteTimeoutSeconds = 30 // each write times out after this
wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings.
wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds.
)
// a single websocket connection
// contains listener id, underlying ws connection,
// and the event switch for subscribing to events
type wsConnection struct {
cmn.BaseService
remoteAddr string
baseConn *websocket.Conn
writeChan chan types.RPCResponse
readTimeout *time.Timer
pingTicker *time.Ticker
funcMap map[string]*RPCFunc
evsw events.EventSwitch
}
// new websocket connection wrapper
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch) *wsConnection {
wsc := &wsConnection{
remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn,
writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full.
funcMap: funcMap,
evsw: evsw,
}
wsc.BaseService = *cmn.NewBaseService(log, "wsConnection", wsc)
return wsc
}
// wsc.Start() blocks until the connection closes.
func (wsc *wsConnection) OnStart() error {
wsc.BaseService.OnStart()
// these must be set before the readRoutine is created, as it may
// call wsc.Stop(), which accesses these timers
wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
// Read subscriptions/unsubscriptions to events
go wsc.readRoutine()
// Custom Ping handler to touch readTimeout
wsc.baseConn.SetPingHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
return nil
})
wsc.baseConn.SetPongHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
return nil
})
go wsc.readTimeoutRoutine()
// Write responses, BLOCKING.
wsc.writeRoutine()
return nil
}
func (wsc *wsConnection) OnStop() {
wsc.BaseService.OnStop()
if wsc.evsw != nil {
wsc.evsw.RemoveListener(wsc.remoteAddr)
}
wsc.readTimeout.Stop()
wsc.pingTicker.Stop()
// The write loop closes the websocket connection
// when it exits its loop, and the read loop
// closes the writeChan
}
func (wsc *wsConnection) readTimeoutRoutine() {
select {
case <-wsc.readTimeout.C:
log.Notice("Stopping connection due to read timeout")
wsc.Stop()
case <-wsc.Quit:
return
}
}
// Implements WSRPCConnection
func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr
}
// Implements WSRPCConnection
func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
return wsc.evsw
}
// Implements WSRPCConnection
// Blocking write to writeChan until service stops.
// Goroutine-safe
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
select {
case <-wsc.Quit:
return
case wsc.writeChan <- resp:
}
}
// Implements WSRPCConnection
// Nonblocking write.
// Goroutine-safe
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
select {
case <-wsc.Quit:
return false
case wsc.writeChan <- resp:
return true
default:
return false
}
}
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() {
// Do not close writeChan, to allow WriteRPCResponse() to fail.
// defer close(wsc.writeChan)
for {
select {
case <-wsc.Quit:
return
default:
var in []byte
// Do not set a deadline here like below:
// wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
// The client may not send anything for a while.
// We use `readTimeout` to handle read timeouts.
_, in, err := wsc.baseConn.ReadMessage()
if err != nil {
log.Notice("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error())
// an error reading the connection,
// kill the connection
wsc.Stop()
return
}
var request types.RPCRequest
err = json.Unmarshal(in, &request)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, errStr))
continue
}
// Now, fetch the RPCFunc and execute it.
rpcFunc := wsc.funcMap[request.Method]
if rpcFunc == nil {
wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
continue
}
var args []reflect.Value
if rpcFunc.ws {
wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc}
args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
} else {
args, err = jsonParamsToArgsRPC(rpcFunc, request.Params)
}
if err != nil {
wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, err.Error()))
continue
}
returns := rpcFunc.f.Call(args)
log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, nil, err.Error()))
continue
} else {
wsc.WriteRPCResponse(types.NewRPCResponse(request.ID, result, ""))
continue
}
}
}
}
// receives on a write channel and writes out on the socket
func (wsc *wsConnection) writeRoutine() {
defer wsc.baseConn.Close()
for {
select {
case <-wsc.Quit:
return
case <-wsc.pingTicker.C:
err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
log.Error("Failed to write ping message on websocket", "error", err)
wsc.Stop()
return
}
case msg := <-wsc.writeChan:
jsonBytes, err := json.Marshal(msg)
if err != nil {
log.Error("Failed to marshal RPCResponse to JSON", "error", err)
} else {
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
if err = wsc.baseConn.WriteMessage(websocket.TextMessage, jsonBytes); err != nil {
log.Warn("Failed to write response on websocket", "error", err)
wsc.Stop()
return
}
}
}
}
}
//----------------------------------------
// Main manager for all websocket connections
// Holds the event switch
// NOTE: The websocket path is defined externally, e.g. in node/node.go
type WebsocketManager struct {
websocket.Upgrader
funcMap map[string]*RPCFunc
evsw events.EventSwitch
}
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) *WebsocketManager {
return &WebsocketManager{
funcMap: funcMap,
evsw: evsw,
Upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// TODO
return true
},
},
}
}
// Upgrade the request/response (via http.Hijack) and starts the wsConnection.
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
wsConn, err := wm.Upgrade(w, r, nil)
if err != nil {
// TODO - return http error
log.Error("Failed to upgrade to websocket connection", "error", err)
return
}
// register connection
con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
log.Notice("New websocket connection", "remote", con.remoteAddr)
con.Start() // Blocking
}
// rpc.websocket
//-----------------------------------------------------------------------------
// NOTE: assume returns is result struct and error. If error is not nil, return it
func unreflectResult(returns []reflect.Value) (interface{}, error) {
errV := returns[1]
if errV.Interface() != nil {
return nil, errors.Errorf("%v", errV.Interface())
}
rv := returns[0]
// the result is a registered interface,
// we need a pointer to it so we can marshal with type byte
rvp := reflect.New(rv.Type())
rvp.Elem().Set(rv)
return rvp.Interface(), nil
}
// writes a list of available rpc endpoints as an html page
func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) {
noArgNames := []string{}
argNames := []string{}
for name, funcData := range funcMap {
if len(funcData.args) == 0 {
noArgNames = append(noArgNames, name)
} else {
argNames = append(argNames, name)
}
}
sort.Strings(noArgNames)
sort.Strings(argNames)
buf := new(bytes.Buffer)
buf.WriteString("<html><body>")
buf.WriteString("<br>Available endpoints:<br>")
for _, name := range noArgNames {
link := fmt.Sprintf("http://%s/%s", r.Host, name)
buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
}
buf.WriteString("<br>Endpoints that require arguments:<br>")
for _, name := range argNames {
link := fmt.Sprintf("http://%s/%s?", r.Host, name)
funcData := funcMap[name]
for i, argName := range funcData.argNames {
link += argName + "=_"
if i < len(funcData.argNames)-1 {
link += "&"
}
}
buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link))
}
buf.WriteString("</body></html>")
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(200)
w.Write(buf.Bytes())
}

+ 90
- 0
server/http_params.go View File

@ -0,0 +1,90 @@
package rpcserver
import (
"encoding/hex"
"net/http"
"regexp"
"strconv"
"github.com/pkg/errors"
)
var (
// Parts of regular expressions
atom = "[A-Z0-9!#$%&'*+\\-/=?^_`{|}~]+"
dotAtom = atom + `(?:\.` + atom + `)*`
domain = `[A-Z0-9.-]+\.[A-Z]{2,4}`
RE_HEX = regexp.MustCompile(`^(?i)[a-f0-9]+$`)
RE_EMAIL = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`)
RE_ADDRESS = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`)
RE_HOST = regexp.MustCompile(`^(?i)(` + domain + `)$`)
//RE_ID12 = regexp.MustCompile(`^[a-zA-Z0-9]{12}$`)
)
func GetParam(r *http.Request, param string) string {
s := r.URL.Query().Get(param)
if s == "" {
s = r.FormValue(param)
}
return s
}
func GetParamByteSlice(r *http.Request, param string) ([]byte, error) {
s := GetParam(r, param)
return hex.DecodeString(s)
}
func GetParamInt64(r *http.Request, param string) (int64, error) {
s := GetParam(r, param)
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, errors.Errorf(param, err.Error())
}
return i, nil
}
func GetParamInt32(r *http.Request, param string) (int32, error) {
s := GetParam(r, param)
i, err := strconv.ParseInt(s, 10, 32)
if err != nil {
return 0, errors.Errorf(param, err.Error())
}
return int32(i), nil
}
func GetParamUint64(r *http.Request, param string) (uint64, error) {
s := GetParam(r, param)
i, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0, errors.Errorf(param, err.Error())
}
return i, nil
}
func GetParamUint(r *http.Request, param string) (uint, error) {
s := GetParam(r, param)
i, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return 0, errors.Errorf(param, err.Error())
}
return uint(i), nil
}
func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) {
s := GetParam(r, param)
if !re.MatchString(s) {
return "", errors.Errorf(param, "Did not match regular expression %v", re.String())
}
return s, nil
}
func GetParamFloat64(r *http.Request, param string) (float64, error) {
s := GetParam(r, param)
f, err := strconv.ParseFloat(s, 64)
if err != nil {
return 0, errors.Errorf(param, err.Error())
}
return f, nil
}

+ 125
- 0
server/http_server.go View File

@ -0,0 +1,125 @@
// Commons for HTTP handling
package rpcserver
import (
"bufio"
"encoding/json"
"fmt"
"net"
"net/http"
"runtime/debug"
"strings"
"time"
"github.com/pkg/errors"
types "github.com/tendermint/go-rpc/types"
)
func StartHTTPServer(listenAddr string, handler http.Handler) (listener net.Listener, err error) {
// listenAddr should be fully formed including tcp:// or unix:// prefix
var proto, addr string
parts := strings.SplitN(listenAddr, "://", 2)
if len(parts) != 2 {
log.Warn("WARNING (go-rpc): Please use fully formed listening addresses, including the tcp:// or unix:// prefix")
// we used to allow addrs without tcp/unix prefix by checking for a colon
// TODO: Deprecate
proto = types.SocketType(listenAddr)
addr = listenAddr
// return nil, errors.Errorf("Invalid listener address %s", lisenAddr)
} else {
proto, addr = parts[0], parts[1]
}
log.Notice(fmt.Sprintf("Starting RPC HTTP server on %s socket %v", proto, addr))
listener, err = net.Listen(proto, addr)
if err != nil {
return nil, errors.Errorf("Failed to listen to %v: %v", listenAddr, err)
}
go func() {
res := http.Serve(
listener,
RecoverAndLogHandler(handler),
)
log.Crit("RPC HTTP server stopped", "result", res)
}()
return listener, nil
}
func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse) {
// jsonBytes := wire.JSONBytesPretty(res)
jsonBytes, err := json.Marshal(res)
if err != nil {
panic(err)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
w.Write(jsonBytes)
}
//-----------------------------------------------------------------------------
// Wraps an HTTP handler, adding error logging.
// If the inner function panics, the outer function recovers, logs, sends an
// HTTP 500 error response.
func RecoverAndLogHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Wrap the ResponseWriter to remember the status
rww := &ResponseWriterWrapper{-1, w}
begin := time.Now()
// Common headers
origin := r.Header.Get("Origin")
rww.Header().Set("Access-Control-Allow-Origin", origin)
rww.Header().Set("Access-Control-Allow-Credentials", "true")
rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time")
rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix()))
defer func() {
// Send a 500 error if a panic happens during a handler.
// Without this, Chrome & Firefox were retrying aborted ajax requests,
// at least to my localhost.
if e := recover(); e != nil {
// If RPCResponse
if res, ok := e.(types.RPCResponse); ok {
WriteRPCResponseHTTP(rww, res)
} else {
// For the rest,
log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack()))
rww.WriteHeader(http.StatusInternalServerError)
WriteRPCResponseHTTP(rww, types.NewRPCResponse("", nil, fmt.Sprintf("Internal Server Error: %v", e)))
}
}
// Finally, log.
durationMS := time.Since(begin).Nanoseconds() / 1000000
if rww.Status == -1 {
rww.Status = 200
}
log.Info("Served RPC HTTP response",
"method", r.Method, "url", r.URL,
"status", rww.Status, "duration", durationMS,
"remoteAddr", r.RemoteAddr,
)
}()
handler.ServeHTTP(rww, r)
})
}
// Remember the status for logging
type ResponseWriterWrapper struct {
Status int
http.ResponseWriter
}
func (w *ResponseWriterWrapper) WriteHeader(status int) {
w.Status = status
w.ResponseWriter.WriteHeader(status)
}
// implements http.Hijacker
func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return w.ResponseWriter.(http.Hijacker).Hijack()
}

+ 7
- 0
server/log.go View File

@ -0,0 +1,7 @@
package rpcserver
import (
"github.com/tendermint/log15"
)
var log = log15.New("module", "rpcserver")

+ 9
- 0
test/data.json View File

@ -0,0 +1,9 @@
{
"jsonrpc": "2.0",
"id": "",
"method": "hello_world",
"params": {
"name": "my_world",
"num": 5
}
}

+ 95
- 0
test/integration_test.sh View File

@ -0,0 +1,95 @@
#!/usr/bin/env bash
set -e
# Get the directory of where this script is.
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done
DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
# Change into that dir because we expect that.
pushd "$DIR"
echo "==> Building the server"
go build -o rpcserver main.go
echo "==> (Re)starting the server"
PID=$(pgrep rpcserver || echo "")
if [[ $PID != "" ]]; then
kill -9 "$PID"
fi
./rpcserver &
PID=$!
sleep 2
echo "==> simple request"
R1=$(curl -s 'http://localhost:8008/hello_world?name="my_world"&num=5')
R2=$(curl -s --data @data.json http://localhost:8008)
if [[ "$R1" != "$R2" ]]; then
echo "responses are not identical:"
echo "R1: $R1"
echo "R2: $R2"
echo "FAIL"
exit 1
else
echo "OK"
fi
echo "==> request with 0x-prefixed hex string arg"
R1=$(curl -s 'http://localhost:8008/hello_world?name=0x41424344&num=123')
R2='{"jsonrpc":"2.0","id":"","result":{"Result":"hi ABCD 123"},"error":""}'
if [[ "$R1" != "$R2" ]]; then
echo "responses are not identical:"
echo "R1: $R1"
echo "R2: $R2"
echo "FAIL"
exit 1
else
echo "OK"
fi
echo "==> request with missing params"
R1=$(curl -s 'http://localhost:8008/hello_world')
R2='{"jsonrpc":"2.0","id":"","result":{"Result":"hi 0"},"error":""}'
if [[ "$R1" != "$R2" ]]; then
echo "responses are not identical:"
echo "R1: $R1"
echo "R2: $R2"
echo "FAIL"
exit 1
else
echo "OK"
fi
echo "==> request with unquoted string arg"
R1=$(curl -s 'http://localhost:8008/hello_world?name=abcd&num=123')
R2="{\"jsonrpc\":\"2.0\",\"id\":\"\",\"result\":null,\"error\":\"Error converting http params to args: invalid character 'a' looking for beginning of value\"}"
if [[ "$R1" != "$R2" ]]; then
echo "responses are not identical:"
echo "R1: $R1"
echo "R2: $R2"
echo "FAIL"
exit 1
else
echo "OK"
fi
echo "==> request with string type when expecting number arg"
R1=$(curl -s 'http://localhost:8008/hello_world?name="abcd"&num=0xabcd')
R2="{\"jsonrpc\":\"2.0\",\"id\":\"\",\"result\":null,\"error\":\"Error converting http params to args: Got a hex string arg, but expected 'int'\"}"
if [[ "$R1" != "$R2" ]]; then
echo "responses are not identical:"
echo "R1: $R1"
echo "R2: $R2"
echo "FAIL"
exit 1
else
echo "OK"
fi
echo "==> Stopping the server"
kill -9 $PID
rm -f rpcserver
popd
exit 0

+ 35
- 0
test/main.go View File

@ -0,0 +1,35 @@
package main
import (
"fmt"
"net/http"
cmn "github.com/tendermint/tmlibs/common"
rpcserver "github.com/tendermint/go-rpc/server"
)
var routes = map[string]*rpcserver.RPCFunc{
"hello_world": rpcserver.NewRPCFunc(HelloWorld, "name,num"),
}
func HelloWorld(name string, num int) (Result, error) {
return Result{fmt.Sprintf("hi %s %d", name, num)}, nil
}
type Result struct {
Result string
}
func main() {
mux := http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, routes)
_, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux)
if err != nil {
cmn.Exit(err.Error())
}
// Wait forever
cmn.TrapSignal(func() {
})
}

+ 93
- 0
types/types.go View File

@ -0,0 +1,93 @@
package rpctypes
import (
"encoding/json"
"strings"
wire "github.com/tendermint/go-wire"
events "github.com/tendermint/tmlibs/events"
)
type RPCRequest struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Method string `json:"method"`
Params interface{} `json:"params"` // must be map[string]interface{} or []interface{}
}
func NewRPCRequest(id string, method string, params map[string]interface{}) RPCRequest {
return RPCRequest{
JSONRPC: "2.0",
ID: id,
Method: method,
Params: params,
}
}
//----------------------------------------
/*
Result is a generic interface.
Applications should register type-bytes like so:
var _ = wire.RegisterInterface(
struct{ Result }{},
wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis},
wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo},
...
)
*/
type Result interface {
}
//----------------------------------------
type RPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Result *json.RawMessage `json:"result"`
Error string `json:"error"`
}
func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
var raw *json.RawMessage
if res != nil {
rawMsg := json.RawMessage(wire.JSONBytes(res))
raw = &rawMsg
}
return RPCResponse{
JSONRPC: "2.0",
ID: id,
Result: raw,
Error: err,
}
}
//----------------------------------------
// *wsConnection implements this interface.
type WSRPCConnection interface {
GetRemoteAddr() string
GetEventSwitch() events.EventSwitch
WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool
}
// websocket-only RPCFuncs take this as the first parameter.
type WSRPCContext struct {
Request RPCRequest
WSRPCConnection
}
//----------------------------------------
// sockets
//
// Determine if its a unix or tcp socket.
// If tcp, must specify the port; `0.0.0.0` will return incorrectly as "unix" since there's no port
func SocketType(listenAddr string) string {
socketType := "unix"
if len(strings.Split(listenAddr, ":")) >= 2 {
socketType = "tcp"
}
return socketType
}

+ 7
- 0
version.go View File

@ -0,0 +1,7 @@
package rpc
const Maj = "0"
const Min = "7"
const Fix = "0"
const Version = Maj + "." + Min + "." + Fix

Loading…
Cancel
Save