Browse Source

Refactor RPC to be more general

pull/176/head
Jae Kwon 9 years ago
parent
commit
74cdadec9f
8 changed files with 223 additions and 142 deletions
  1. +2
    -2
      benchmarks/simu/counter.go
  2. +5
    -15
      rpc/client/ws_client.go
  3. +27
    -0
      rpc/core/events.go
  4. +2
    -0
      rpc/core/routes.go
  5. +25
    -23
      rpc/core/types/responses.go
  6. +116
    -94
      rpc/server/handlers.go
  7. +3
    -3
      rpc/server/http_server.go
  8. +43
    -5
      rpc/types/types.go

+ 2
- 2
benchmarks/simu/counter.go View File

@ -10,7 +10,7 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/rpc/client"
// ctypes "github.com/tendermint/tendermint/rpc/core/types"
_ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types
"github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/rpc/types"
) )
@ -49,7 +49,7 @@ func main() {
if i%1000 == 0 { if i%1000 == 0 {
fmt.Println(i) fmt.Println(i)
} }
time.Sleep(time.Microsecond * 250)
time.Sleep(time.Microsecond * 1000)
} }
ws.Stop() ws.Stop()


+ 5
- 15
rpc/client/ws_client.go View File

@ -2,18 +2,15 @@ package rpcclient
import ( import (
"net/http" "net/http"
"strings"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/rpc/types"
) )
const ( const (
wsEventsChannelCapacity = 10
wsResultsChannelCapacity = 10 wsResultsChannelCapacity = 10
wsWriteTimeoutSeconds = 10 wsWriteTimeoutSeconds = 10
) )
@ -22,8 +19,7 @@ type WSClient struct {
QuitService QuitService
Address string Address string
*websocket.Conn *websocket.Conn
EventsCh chan ctypes.ResultEvent // closes upon WSClient.Stop()
ResultsCh chan ctypes.Result // closes upon WSClient.Stop()
ResultsCh chan rpctypes.Result // closes upon WSClient.Stop()
} }
// create a new connection // create a new connection
@ -31,8 +27,7 @@ func NewWSClient(addr string) *WSClient {
wsClient := &WSClient{ wsClient := &WSClient{
Address: addr, Address: addr,
Conn: nil, Conn: nil,
EventsCh: make(chan ctypes.ResultEvent, wsEventsChannelCapacity),
ResultsCh: make(chan ctypes.Result, wsResultsChannelCapacity),
ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity),
} }
wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
return wsClient return wsClient
@ -72,7 +67,7 @@ func (wsc *WSClient) dial() error {
func (wsc *WSClient) OnStop() { func (wsc *WSClient) OnStop() {
wsc.QuitService.OnStop() wsc.QuitService.OnStop()
// EventsCh and ResultsCh are closed in receiveEventsRoutine.
// ResultsCh is closed in receiveEventsRoutine.
} }
func (wsc *WSClient) receiveEventsRoutine() { func (wsc *WSClient) receiveEventsRoutine() {
@ -83,23 +78,18 @@ func (wsc *WSClient) receiveEventsRoutine() {
wsc.Stop() wsc.Stop()
break break
} else { } else {
var response ctypes.Response
var response rpctypes.RPCResponse
wire.ReadJSON(&response, data, &err) wire.ReadJSON(&response, data, &err)
if err != nil { if err != nil {
log.Info("WSClient failed to parse message", "error", err) log.Info("WSClient failed to parse message", "error", err)
wsc.Stop() wsc.Stop()
break break
} }
if strings.HasSuffix(response.ID, "#event") {
wsc.EventsCh <- *response.Result.(*ctypes.ResultEvent)
} else {
wsc.ResultsCh <- response.Result
}
wsc.ResultsCh <- response.Result
} }
} }
// Cleanup // Cleanup
close(wsc.EventsCh)
close(wsc.ResultsCh) close(wsc.ResultsCh)
} }


+ 27
- 0
rpc/core/events.go View File

@ -0,0 +1,27 @@
package core
import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types"
)
func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) {
log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event"
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, ""))
})
return &ctypes.ResultSubscribe{}, nil
}
func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) {
log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event"
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, ""))
})
return &ctypes.ResultUnsubscribe{}, nil
}

+ 2
- 0
rpc/core/routes.go View File

@ -6,6 +6,8 @@ import (
// TODO: eliminate redundancy between here and reading code from core/ // TODO: eliminate redundancy between here and reading code from core/
var Routes = map[string]*rpc.RPCFunc{ var Routes = map[string]*rpc.RPCFunc{
"subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}),
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}),
"status": rpc.NewRPCFunc(Status, []string{}), "status": rpc.NewRPCFunc(Status, []string{}),
"net_info": rpc.NewRPCFunc(NetInfo, []string{}), "net_info": rpc.NewRPCFunc(NetInfo, []string{}),
"blockchain": rpc.NewRPCFunc(BlockchainInfo, []string{"minHeight", "maxHeight"}), "blockchain": rpc.NewRPCFunc(BlockchainInfo, []string{"minHeight", "maxHeight"}),


+ 25
- 23
rpc/core/types/responses.go View File

@ -4,6 +4,7 @@ import (
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
"github.com/tendermint/go-p2p" "github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -12,6 +13,10 @@ type ResultBlockchainInfo struct {
BlockMetas []*types.BlockMeta `json:"block_metas"` BlockMetas []*types.BlockMeta `json:"block_metas"`
} }
type ResultGenesis struct {
Genesis *types.GenesisDoc `json:"genesis"`
}
type ResultGetBlock struct { type ResultGetBlock struct {
BlockMeta *types.BlockMeta `json:"block_meta"` BlockMeta *types.BlockMeta `json:"block_meta"`
Block *types.Block `json:"block"` Block *types.Block `json:"block"`
@ -55,8 +60,10 @@ type ResultListUnconfirmedTxs struct {
Txs []types.Tx `json:"txs"` Txs []types.Tx `json:"txs"`
} }
type ResultGenesis struct {
Genesis *types.GenesisDoc `json:"genesis"`
type ResultSubscribe struct {
}
type ResultUnsubscribe struct {
} }
type ResultEvent struct { type ResultEvent struct {
@ -67,31 +74,25 @@ type ResultEvent struct {
//---------------------------------------- //----------------------------------------
// response & result types // response & result types
type Response struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Result Result `json:"result"`
Error string `json:"error"`
}
const ( const (
ResultTypeBlockchainInfo = byte(0x05)
ResultTypeGetBlock = byte(0x06)
ResultTypeStatus = byte(0x07)
ResultTypeNetInfo = byte(0x08)
ResultTypeListValidators = byte(0x09)
ResultTypeDumpConsensusState = byte(0x0A)
ResultTypeBroadcastTx = byte(0x0E)
ResultTypeListUnconfirmedTxs = byte(0x0F)
ResultTypeGenesis = byte(0x11)
ResultTypeEvent = byte(0x13) // so websockets can respond to rpc functions
ResultTypeGenesis = byte(0x01)
ResultTypeBlockchainInfo = byte(0x02)
ResultTypeGetBlock = byte(0x03)
ResultTypeStatus = byte(0x04)
ResultTypeNetInfo = byte(0x05)
ResultTypeListValidators = byte(0x06)
ResultTypeDumpConsensusState = byte(0x07)
ResultTypeBroadcastTx = byte(0x08)
ResultTypeListUnconfirmedTxs = byte(0x09)
ResultTypeSubscribe = byte(0x0A)
ResultTypeUnsubscribe = byte(0x0B)
ResultTypeEvent = byte(0x0C)
) )
type Result interface{}
// for wire.readReflect // for wire.readReflect
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
struct{ Result }{},
struct{ rpctypes.Result }{},
wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis},
wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo},
wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock}, wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock},
wire.ConcreteType{&ResultStatus{}, ResultTypeStatus}, wire.ConcreteType{&ResultStatus{}, ResultTypeStatus},
@ -100,6 +101,7 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState},
wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx},
wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs}, wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs},
wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis},
wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe},
wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe},
wire.ConcreteType{&ResultEvent{}, ResultTypeEvent}, wire.ConcreteType{&ResultEvent{}, ResultTypeEvent},
) )

+ 116
- 94
rpc/server/handlers.go View File

@ -15,9 +15,7 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/events"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
. "github.com/tendermint/tendermint/rpc/types" . "github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types"
) )
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
@ -39,6 +37,7 @@ type RPCFunc struct {
args []reflect.Type // type of each function arg args []reflect.Type // type of each function arg
returns []reflect.Type // type of each return arg returns []reflect.Type // type of each return arg
argNames []string // name of each argument argNames []string // name of each argument
ws bool // websocket only
} }
// wraps a function for quicker introspection // wraps a function for quicker introspection
@ -48,6 +47,17 @@ func NewRPCFunc(f interface{}, args []string) *RPCFunc {
args: funcArgTypes(f), args: funcArgTypes(f),
returns: funcReturnTypes(f), returns: funcReturnTypes(f),
argNames: args, argNames: args,
ws: false,
}
}
func NewWSRPCFunc(f interface{}, args []string) *RPCFunc {
return &RPCFunc{
f: reflect.ValueOf(f),
args: funcArgTypes(f),
returns: funcReturnTypes(f),
argNames: args,
ws: true,
} }
} }
@ -91,35 +101,39 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
var request RPCRequest var request RPCRequest
err := json.Unmarshal(b, &request) err := json.Unmarshal(b, &request)
if err != nil { if err != nil {
WriteRPCResponse(w, NewRPCResponse("", nil, err.Error()))
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
return return
} }
if len(r.URL.Path) > 1 { if len(r.URL.Path) > 1 {
WriteRPCResponse(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
return return
} }
rpcFunc := funcMap[request.Method] rpcFunc := funcMap[request.Method]
if rpcFunc == nil { if rpcFunc == nil {
WriteRPCResponse(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
return
}
if rpcFunc.ws {
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method))
return return
} }
args, err := jsonParamsToArgs(rpcFunc, request.Params) args, err := jsonParamsToArgs(rpcFunc, request.Params)
if err != nil { if err != nil {
WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error()))
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
return return
} }
returns := rpcFunc.f.Call(args) returns := rpcFunc.f.Call(args)
log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns) result, err := unreflectResult(returns)
if err != nil { if err != nil {
WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error()))
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error()))
return return
} }
WriteRPCResponse(w, NewRPCResponse(request.ID, result, ""))
WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, ""))
} }
} }
// covert a list of interfaces to properly typed values
// Convert a list of interfaces to properly typed values
func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) { func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) {
if len(rpcFunc.argNames) != len(params) { if len(rpcFunc.argNames) != len(params) {
return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
@ -137,6 +151,25 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value,
return values, nil return values, nil
} }
// Same as above, but with the first param the websocket connection
func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) {
if len(rpcFunc.argNames)-1 != len(params) {
return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params))
}
values := make([]reflect.Value, len(params)+1)
values[0] = reflect.ValueOf(wsCtx)
for i, p := range params {
ty := rpcFunc.args[i+1]
v, err := _jsonObjectToArg(ty, p)
if err != nil {
return nil, err
}
values[i+1] = v
}
return values, nil
}
func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) {
var err error var err error
v := reflect.New(ty) v := reflect.New(ty)
@ -154,20 +187,27 @@ func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error
// convert from a function name to the http handler // convert from a function name to the http handler
func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) {
// Exception for websocket endpoints
if rpcFunc.ws {
return func(w http.ResponseWriter, r *http.Request) {
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets"))
}
}
// All other endpoints
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
args, err := httpParamsToArgs(rpcFunc, r) args, err := httpParamsToArgs(rpcFunc, r)
if err != nil { if err != nil {
WriteRPCResponse(w, NewRPCResponse("", nil, err.Error()))
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
return return
} }
returns := rpcFunc.f.Call(args) returns := rpcFunc.f.Call(args)
log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
result, err := unreflectResult(returns) result, err := unreflectResult(returns)
if err != nil { if err != nil {
WriteRPCResponse(w, NewRPCResponse("", nil, err.Error()))
WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error()))
return return
} }
WriteRPCResponse(w, NewRPCResponse("", result, ""))
WriteRPCResponseHTTP(w, NewRPCResponse("", result, ""))
} }
} }
@ -215,10 +255,10 @@ const (
// a single websocket connection // a single websocket connection
// contains listener id, underlying ws connection, // contains listener id, underlying ws connection,
// and the event switch for subscribing to events // and the event switch for subscribing to events
type WSConnection struct {
type wsConnection struct {
QuitService QuitService
id string
remoteAddr string
baseConn *websocket.Conn baseConn *websocket.Conn
writeChan chan RPCResponse writeChan chan RPCResponse
readTimeout *time.Timer readTimeout *time.Timer
@ -229,20 +269,20 @@ type WSConnection struct {
} }
// new websocket connection wrapper // new websocket connection wrapper
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WSConnection {
wsc := &WSConnection{
id: baseConn.RemoteAddr().String(),
baseConn: baseConn,
writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
funcMap: funcMap,
evsw: evsw,
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection {
wsc := &wsConnection{
remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn,
writeChan: make(chan RPCResponse, writeChanCapacity), // error when full.
funcMap: funcMap,
evsw: evsw,
} }
wsc.QuitService = *NewQuitService(log, "WSConnection", wsc)
wsc.QuitService = *NewQuitService(log, "wsConnection", wsc)
return wsc return wsc
} }
// wsc.Start() blocks until the connection closes. // wsc.Start() blocks until the connection closes.
func (wsc *WSConnection) OnStart() error {
func (wsc *wsConnection) OnStart() error {
wsc.QuitService.OnStart() wsc.QuitService.OnStart()
// Read subscriptions/unsubscriptions to events // Read subscriptions/unsubscriptions to events
@ -269,9 +309,9 @@ func (wsc *WSConnection) OnStart() error {
return nil return nil
} }
func (wsc *WSConnection) OnStop() {
func (wsc *wsConnection) OnStop() {
wsc.QuitService.OnStop() wsc.QuitService.OnStop()
wsc.evsw.RemoveListener(wsc.id)
wsc.evsw.RemoveListener(wsc.remoteAddr)
wsc.readTimeout.Stop() wsc.readTimeout.Stop()
wsc.pingTicker.Stop() wsc.pingTicker.Stop()
// The write loop closes the websocket connection // The write loop closes the websocket connection
@ -279,7 +319,7 @@ func (wsc *WSConnection) OnStop() {
// closes the writeChan // closes the writeChan
} }
func (wsc *WSConnection) readTimeoutRoutine() {
func (wsc *wsConnection) readTimeoutRoutine() {
select { select {
case <-wsc.readTimeout.C: case <-wsc.readTimeout.C:
log.Notice("Stopping connection due to read timeout") log.Notice("Stopping connection due to read timeout")
@ -289,8 +329,19 @@ func (wsc *WSConnection) readTimeoutRoutine() {
} }
} }
// Implements WSRPCConnection
func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr
}
// Implements WSRPCConnection
func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch {
return wsc.evsw
}
// Implements WSRPCConnection
// Blocking write to writeChan until service stops. // Blocking write to writeChan until service stops.
func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) {
func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) {
select { select {
case <-wsc.Quit: case <-wsc.Quit:
return return
@ -298,8 +349,9 @@ func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) {
} }
} }
// Implements WSRPCConnection
// Nonblocking write. // Nonblocking write.
func (wsc *WSConnection) tryWriteRPCResponse(resp RPCResponse) bool {
func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool {
select { select {
case <-wsc.Quit: case <-wsc.Quit:
return false return false
@ -311,8 +363,8 @@ func (wsc *WSConnection) tryWriteRPCResponse(resp RPCResponse) bool {
} }
// Read from the socket and subscribe to or unsubscribe from events // Read from the socket and subscribe to or unsubscribe from events
func (wsc *WSConnection) readRoutine() {
// Do not close writeChan, to allow writeRPCResponse() to fail.
func (wsc *wsConnection) readRoutine() {
// Do not close writeChan, to allow WriteRPCResponse() to fail.
// defer close(wsc.writeChan) // defer close(wsc.writeChan)
for { for {
@ -327,7 +379,7 @@ func (wsc *WSConnection) readRoutine() {
// We use `readTimeout` to handle read timeouts. // We use `readTimeout` to handle read timeouts.
_, in, err := wsc.baseConn.ReadMessage() _, in, err := wsc.baseConn.ReadMessage()
if err != nil { if err != nil {
log.Notice("Failed to read from connection", "id", wsc.id)
log.Notice("Failed to read from connection", "remote", wsc.remoteAddr)
// an error reading the connection, // an error reading the connection,
// kill the connection // kill the connection
wsc.Stop() wsc.Stop()
@ -337,75 +389,45 @@ func (wsc *WSConnection) readRoutine() {
err = json.Unmarshal(in, &request) err = json.Unmarshal(in, &request)
if err != nil { if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, errStr))
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr))
continue continue
} }
switch request.Method {
case "subscribe":
if len(request.Params) != 1 {
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string"))
continue
}
if event, ok := request.Params[0].(string); !ok {
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string"))
continue
} else {
log.Notice("Subscribe to event", "id", wsc.id, "event", event)
wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event"
wsc.tryWriteRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, ""))
})
continue
}
case "unsubscribe":
if len(request.Params) == 0 {
log.Notice("Unsubscribe from all events", "id", wsc.id)
wsc.evsw.RemoveListener(wsc.id)
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, ""))
continue
} else if len(request.Params) == 1 {
if event, ok := request.Params[0].(string); !ok {
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings"))
continue
} else {
log.Notice("Unsubscribe from event", "id", wsc.id, "event", event)
wsc.evsw.RemoveListenerForEvent(event, wsc.id)
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, ""))
continue
}
} else {
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings"))
continue
}
default:
rpcFunc := wsc.funcMap[request.Method]
if rpcFunc == nil {
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
continue
}
args, err := jsonParamsToArgs(rpcFunc, request.Params)
if err != nil {
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
continue
}
returns := rpcFunc.f.Call(args)
log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
continue
} else {
wsc.writeRPCResponse(NewRPCResponse(request.ID, result, ""))
continue
}
// Now, fetch the RPCFunc and execute it.
rpcFunc := wsc.funcMap[request.Method]
if rpcFunc == nil {
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method))
continue
} }
var args []reflect.Value
if rpcFunc.ws {
wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc}
args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx)
} else {
args, err = jsonParamsToArgs(rpcFunc, request.Params)
}
if err != nil {
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
continue
}
returns := rpcFunc.f.Call(args)
log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
if err != nil {
wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error()))
continue
} else {
wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, ""))
continue
}
} }
} }
} }
// receives on a write channel and writes out on the socket // receives on a write channel and writes out on the socket
func (wsc *WSConnection) writeRoutine() {
func (wsc *wsConnection) writeRoutine() {
defer wsc.baseConn.Close() defer wsc.baseConn.Close()
var n, err = int(0), error(nil) var n, err = int(0), error(nil)
for { for {
@ -463,7 +485,7 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch)
} }
} }
// Upgrade the request/response (via http.Hijack) and starts the WSConnection.
// Upgrade the request/response (via http.Hijack) and starts the wsConnection.
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
wsConn, err := wm.Upgrade(w, r, nil) wsConn, err := wm.Upgrade(w, r, nil)
if err != nil { if err != nil {
@ -474,7 +496,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
// register connection // register connection
con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
log.Notice("New websocket connection", "origin", con.id)
log.Notice("New websocket connection", "remote", con.remoteAddr)
con.Start() // Blocking con.Start() // Blocking
} }


+ 3
- 3
rpc/server/http_server.go View File

@ -32,7 +32,7 @@ func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, err
return listener, nil return listener, nil
} }
func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) {
func WriteRPCResponseHTTP(w http.ResponseWriter, res RPCResponse) {
buf, n, err := new(bytes.Buffer), int(0), error(nil) buf, n, err := new(bytes.Buffer), int(0), error(nil)
wire.WriteJSON(res, buf, &n, &err) wire.WriteJSON(res, buf, &n, &err)
if err != nil { if err != nil {
@ -70,12 +70,12 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler {
// If RPCResponse // If RPCResponse
if res, ok := e.(RPCResponse); ok { if res, ok := e.(RPCResponse); ok {
WriteRPCResponse(rww, res)
WriteRPCResponseHTTP(rww, res)
} else { } else {
// For the rest, // For the rest,
log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack()))
rww.WriteHeader(http.StatusInternalServerError) rww.WriteHeader(http.StatusInternalServerError)
WriteRPCResponse(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e)))
WriteRPCResponseHTTP(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e)))
} }
} }


+ 43
- 5
rpc/types/types.go View File

@ -1,5 +1,9 @@
package rpctypes package rpctypes
import (
"github.com/tendermint/tendermint/events"
)
type RPCRequest struct { type RPCRequest struct {
JSONRPC string `json:"jsonrpc"` JSONRPC string `json:"jsonrpc"`
ID string `json:"id"` ID string `json:"id"`
@ -16,14 +20,32 @@ func NewRPCRequest(id string, method string, params []interface{}) RPCRequest {
} }
} }
//----------------------------------------
/*
Result is a generic interface.
Applications should register type-bytes like so:
var _ = wire.RegisterInterface(
struct{ Result }{},
wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis},
wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo},
...
)
*/
type Result interface {
}
//----------------------------------------
type RPCResponse struct { type RPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Result interface{} `json:"result"`
Error string `json:"error"`
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Result Result `json:"result"`
Error string `json:"error"`
} }
func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
func NewRPCResponse(id string, res Result, err string) RPCResponse {
return RPCResponse{ return RPCResponse{
JSONRPC: "2.0", JSONRPC: "2.0",
ID: id, ID: id,
@ -31,3 +53,19 @@ func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
Error: err, Error: err,
} }
} }
//----------------------------------------
// *wsConnection implements this interface.
type WSRPCConnection interface {
GetRemoteAddr() string
GetEventSwitch() *events.EventSwitch
WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool
}
// websocket-only RPCFuncs take this as the first parameter.
type WSRPCContext struct {
Request RPCRequest
WSRPCConnection
}

Loading…
Cancel
Save