@ -4,7 +4,6 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
@ -14,10 +13,11 @@ import (
"time"
"github.com/gorilla/websocket"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-events"
. "github.com/tendermint/go-rpc/types"
"github.com/tendermint/go-wire"
"github.com/pkg/errors"
cmn "github.com/tendermint/go-common"
events "github.com/tendermint/go-events"
types "github.com/tendermint/go-rpc/types"
wire "github.com/tendermint/go-wire"
)
// Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
@ -105,76 +105,100 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
return
}
var request RPCRequest
var request types . RPCRequest
err := json . Unmarshal ( b , & request )
if err != nil {
WriteRPCResponseHTTP ( w , NewRPCResponse ( "" , nil , fmt . Sprintf ( "Error unmarshalling request: %v" , err . Error ( ) ) ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( "" , nil , fmt . Sprintf ( "Error unmarshalling request: %v" , err . Error ( ) ) ) )
return
}
if len ( r . URL . Path ) > 1 {
WriteRPCResponseHTTP ( w , NewRPCResponse ( request . ID , nil , fmt . Sprintf ( "Invalid JSONRPC endpoint %s" , r . URL . Path ) ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( request . ID , nil , fmt . Sprintf ( "Invalid JSONRPC endpoint %s" , r . URL . Path ) ) )
return
}
rpcFunc := funcMap [ request . Method ]
if rpcFunc == nil {
WriteRPCResponseHTTP ( w , NewRPCResponse ( request . ID , nil , "RPC method unknown: " + request . Method ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( request . ID , nil , "RPC method unknown: " + request . Method ) )
return
}
if rpcFunc . ws {
WriteRPCResponseHTTP ( w , NewRPCResponse ( request . ID , nil , "RPC method is only for websockets: " + request . Method ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( request . ID , nil , "RPC method is only for websockets: " + request . Method ) )
return
}
args , err := jsonParamsToArgs ( rpcFunc , request . Params )
args , err := jsonParamsToArgsRPC ( rpcFunc , request . Params )
if err != nil {
WriteRPCResponseHTTP ( w , NewRPCResponse ( request . ID , nil , fmt . Sprintf ( "Error converting json params to arguments: %v" , err . Error ( ) ) ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( request . ID , nil , fmt . Sprintf ( "Error converting json params to arguments: %v" , err . Error ( ) ) ) )
return
}
returns := rpcFunc . f . Call ( args )
log . Info ( "HTTPJSONRPC" , "method" , request . Method , "args" , args , "returns" , returns )
result , err := unreflectResult ( returns )
if err != nil {
WriteRPCResponseHTTP ( w , NewRPCResponse ( request . ID , result , fmt . Sprintf ( "Error unreflecting result: %v" , err . Error ( ) ) ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( request . ID , result , err . Error ( ) ) )
return
}
WriteRPCResponseHTTP ( w , NewRPCResponse ( request . ID , result , "" ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( request . ID , result , "" ) )
}
}
// Convert a list of interfaces to properly typed values
func jsonParamsToArgs ( rpcFunc * RPCFunc , params [ ] interface { } ) ( [ ] reflect . Value , error ) {
if len ( rpcFunc . argNames ) != len ( params ) {
return nil , errors . New ( fmt . Sprintf ( "Expected %v parameters (%v), got %v (%v)" ,
len ( rpcFunc . argNames ) , rpcFunc . argNames , len ( params ) , params ) )
}
values := make ( [ ] reflect . Value , len ( params ) )
for i , p := range params {
ty := rpcFunc . args [ i ]
v , err := _jsonObjectToArg ( ty , p )
if err != nil {
return nil , err
// Convert a []interface{} OR a map[string]interface{} to properly typed values
//
// argsOffset should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames).
// Example:
// rpcFunc.args = [rpctypes.WSRPCContext string]
// rpcFunc.argNames = ["arg"]
func jsonParamsToArgs ( rpcFunc * RPCFunc , paramsI interface { } , argsOffset int ) ( [ ] reflect . Value , error ) {
values := make ( [ ] reflect . Value , len ( rpcFunc . argNames ) )
switch params := paramsI . ( type ) {
case map [ string ] interface { } :
for i , argName := range rpcFunc . argNames {
argType := rpcFunc . args [ i + argsOffset ]
// decode param if provided
if param , ok := params [ argName ] ; ok && "" != param {
v , err := _jsonObjectToArg ( argType , param )
if err != nil {
return nil , err
}
values [ i ] = v
} else { // use default for that type
values [ i ] = reflect . Zero ( argType )
}
}
case [ ] interface { } :
if len ( rpcFunc . argNames ) != len ( params ) {
return nil , errors . New ( fmt . Sprintf ( "Expected %v parameters (%v), got %v (%v)" ,
len ( rpcFunc . argNames ) , rpcFunc . argNames , len ( params ) , params ) )
}
values := make ( [ ] reflect . Value , len ( params ) )
for i , p := range params {
ty := rpcFunc . args [ i + argsOffset ]
v , err := _jsonObjectToArg ( ty , p )
if err != nil {
return nil , err
}
values [ i ] = v
}
values [ i ] = v
return values , nil
default :
return nil , fmt . Errorf ( "Unknown type for JSON params %v. Expected map[string]interface{} or []interface{}" , reflect . TypeOf ( paramsI ) )
}
return values , nil
}
// Convert a []interface{} OR a map[string]interface{} to properly typed values
func jsonParamsToArgsRPC ( rpcFunc * RPCFunc , paramsI interface { } ) ( [ ] reflect . Value , error ) {
return jsonParamsToArgs ( rpcFunc , paramsI , 0 )
}
// Same as above, but with the first param the websocket connection
func jsonParamsToArgsWS ( rpcFunc * RPCFunc , params [ ] interface { } , wsCtx WSRPCContext ) ( [ ] reflect . Value , error ) {
if len ( rpcFunc . argNames ) != len ( params ) {
return nil , errors . New ( fmt . Sprintf ( "Expected %v parameters (%v), got %v (%v)" ,
len ( rpcFunc . argNames ) - 1 , rpcFunc . argNames [ 1 : ] , len ( params ) , params ) )
}
values := make ( [ ] reflect . Value , len ( params ) + 1 )
values [ 0 ] = reflect . ValueOf ( wsCtx )
for i , p := range params {
ty := rpcFunc . args [ i + 1 ]
v , err := _jsonObjectToArg ( ty , p )
if err != nil {
return nil , err
}
values [ i + 1 ] = v
func jsonParamsToArgsWS ( rpcFunc * RPCFunc , paramsI interface { } , wsCtx types . WSRPCContext ) ( [ ] reflect . Value , error ) {
values , err := jsonParamsToArgs ( rpcFunc , paramsI , 1 )
if err != nil {
return nil , err
}
return values , nil
return append ( [ ] reflect . Value { reflect . ValueOf ( wsCtx ) } , values ... ) , nil
}
func _jsonObjectToArg ( ty reflect . Type , object interface { } ) ( reflect . Value , error ) {
@ -197,7 +221,7 @@ func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request)
// Exception for websocket endpoints
if rpcFunc . ws {
return func ( w http . ResponseWriter , r * http . Request ) {
WriteRPCResponseHTTP ( w , NewRPCResponse ( "" , nil , "This RPC method is only for websockets" ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( "" , nil , "This RPC method is only for websockets" ) )
}
}
// All other endpoints
@ -205,33 +229,38 @@ func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request)
log . Debug ( "HTTP HANDLER" , "req" , r )
args , err := httpParamsToArgs ( rpcFunc , r )
if err != nil {
WriteRPCResponseHTTP ( w , NewRPCResponse ( "" , nil , fmt . Sprintf ( "Error converting http params to args: %v" , err . Error ( ) ) ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( "" , nil , fmt . Sprintf ( "Error converting http params to args: %v" , err . Error ( ) ) ) )
return
}
returns := rpcFunc . f . Call ( args )
log . Info ( "HTTPRestRPC" , "method" , r . URL . Path , "args" , args , "returns" , returns )
result , err := unreflectResult ( returns )
if err != nil {
WriteRPCResponseHTTP ( w , NewRPCResponse ( "" , nil , fmt . Sprintf ( "Error unreflecting result: %v" , err . Error ( ) ) ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( "" , nil , err . Error ( ) ) )
return
}
WriteRPCResponseHTTP ( w , NewRPCResponse ( "" , result , "" ) )
WriteRPCResponseHTTP ( w , types . NewRPCResponse ( "" , result , "" ) )
}
}
// Covert an http query to a list of properly typed values.
// To be properly decoded the arg must be a concrete type from tendermint (if its an interface).
func httpParamsToArgs ( rpcFunc * RPCFunc , r * http . Request ) ( [ ] reflect . Value , error ) {
argTypes := rpcFunc . args
argNames := rpcFunc . argNames
values := make ( [ ] reflect . Value , len ( rpcFunc . args ) )
for i , name := range rpcFunc . argNames {
argType := rpcFunc . args [ i ]
values [ i ] = reflect . Zero ( argType ) // set default for that type
values := make ( [ ] reflect . Value , len ( argNames ) )
for i , name := range argNames {
ty := argTypes [ i ]
arg := GetParam ( r , name )
// log.Notice("param to arg", "ty", ty, "name", name, "arg", arg)
// log.Notice("param to arg", "argType", argType, "name", name, "arg", arg)
if "" == arg {
continue
}
v , err , ok := nonJsonToArg ( ty , arg )
v , err , ok := nonJsonToArg ( argType , arg )
if err != nil {
return nil , err
}
@ -241,11 +270,12 @@ func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error
}
// Pass values to go-wire
values [ i ] , err = _jsonStringToArg ( ty , arg )
values [ i ] , err = _jsonStringToArg ( argType , arg )
if err != nil {
return nil , err
}
}
return values , nil
}
@ -268,7 +298,7 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
if isHexString {
if ! expectingString && ! expectingByteSlice {
err := fmt . Errorf ( "Got a hex string arg, but expected '%s'" ,
err := errors . Errorf ( "Got a hex string arg, but expected '%s'" ,
ty . Kind ( ) . String ( ) )
return reflect . ValueOf ( nil ) , err , false
}
@ -313,11 +343,11 @@ const (
// contains listener id, underlying ws connection,
// and the event switch for subscribing to events
type wsConnection struct {
BaseService
cmn . BaseService
remoteAddr string
baseConn * websocket . Conn
writeChan chan RPCResponse
writeChan chan types . RPCResponse
readTimeout * time . Timer
pingTicker * time . Ticker
@ -330,11 +360,11 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw
wsc := & wsConnection {
remoteAddr : baseConn . RemoteAddr ( ) . String ( ) ,
baseConn : baseConn ,
writeChan : make ( chan RPCResponse , writeChanCapacity ) , // error when full.
writeChan : make ( chan types . RPCResponse , writeChanCapacity ) , // error when full.
funcMap : funcMap ,
evsw : evsw ,
}
wsc . BaseService = * NewBaseService ( log , "wsConnection" , wsc )
wsc . BaseService = * cmn . NewBaseService ( log , "wsConnection" , wsc )
return wsc
}
@ -342,12 +372,15 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw
func ( wsc * wsConnection ) OnStart ( ) error {
wsc . BaseService . OnStart ( )
// these must be set before the readRoutine is created, as it may
// call wsc.Stop(), which accesses these timers
wsc . readTimeout = time . NewTimer ( time . Second * wsReadTimeoutSeconds )
wsc . pingTicker = time . NewTicker ( time . Second * wsPingTickerSeconds )
// Read subscriptions/unsubscriptions to events
go wsc . readRoutine ( )
// Custom Ping handler to touch readTimeout
wsc . readTimeout = time . NewTimer ( time . Second * wsReadTimeoutSeconds )
wsc . pingTicker = time . NewTicker ( time . Second * wsPingTickerSeconds )
wsc . baseConn . SetPingHandler ( func ( m string ) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
go wsc . baseConn . WriteControl ( websocket . PongMessage , [ ] byte ( m ) , time . Now ( ) . Add ( time . Second * wsWriteTimeoutSeconds ) )
@ -368,7 +401,9 @@ func (wsc *wsConnection) OnStart() error {
func ( wsc * wsConnection ) OnStop ( ) {
wsc . BaseService . OnStop ( )
wsc . evsw . RemoveListener ( wsc . remoteAddr )
if wsc . evsw != nil {
wsc . evsw . RemoveListener ( wsc . remoteAddr )
}
wsc . readTimeout . Stop ( )
wsc . pingTicker . Stop ( )
// The write loop closes the websocket connection
@ -399,7 +434,7 @@ func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
// Implements WSRPCConnection
// Blocking write to writeChan until service stops.
// Goroutine-safe
func ( wsc * wsConnection ) WriteRPCResponse ( resp RPCResponse ) {
func ( wsc * wsConnection ) WriteRPCResponse ( resp types . RPCResponse ) {
select {
case <- wsc . Quit :
return
@ -410,7 +445,7 @@ func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) {
// Implements WSRPCConnection
// Nonblocking write.
// Goroutine-safe
func ( wsc * wsConnection ) TryWriteRPCResponse ( resp RPCResponse ) bool {
func ( wsc * wsConnection ) TryWriteRPCResponse ( resp types . RPCResponse ) bool {
select {
case <- wsc . Quit :
return false
@ -444,11 +479,11 @@ func (wsc *wsConnection) readRoutine() {
wsc . Stop ( )
return
}
var request RPCRequest
var request types . RPCRequest
err = json . Unmarshal ( in , & request )
if err != nil {
errStr := fmt . Sprintf ( "Error unmarshaling data: %s" , err . Error ( ) )
wsc . WriteRPCResponse ( NewRPCResponse ( request . ID , nil , errStr ) )
wsc . WriteRPCResponse ( types . NewRPCResponse ( request . ID , nil , errStr ) )
continue
}
@ -456,28 +491,28 @@ func (wsc *wsConnection) readRoutine() {
rpcFunc := wsc . funcMap [ request . Method ]
if rpcFunc == nil {
wsc . WriteRPCResponse ( NewRPCResponse ( request . ID , nil , "RPC method unknown: " + request . Method ) )
wsc . WriteRPCResponse ( types . NewRPCResponse ( request . ID , nil , "RPC method unknown: " + request . Method ) )
continue
}
var args [ ] reflect . Value
if rpcFunc . ws {
wsCtx := WSRPCContext { Request : request , WSRPCConnection : wsc }
wsCtx := types . WSRPCContext { Request : request , WSRPCConnection : wsc }
args , err = jsonParamsToArgsWS ( rpcFunc , request . Params , wsCtx )
} else {
args , err = jsonParamsToArgs ( rpcFunc , request . Params )
args , err = jsonParamsToArgsRPC ( rpcFunc , request . Params )
}
if err != nil {
wsc . WriteRPCResponse ( NewRPCResponse ( request . ID , nil , err . Error ( ) ) )
wsc . WriteRPCResponse ( types . NewRPCResponse ( request . ID , nil , err . Error ( ) ) )
continue
}
returns := rpcFunc . f . Call ( args )
log . Info ( "WSJSONRPC" , "method" , request . Method , "args" , args , "returns" , returns )
result , err := unreflectResult ( returns )
if err != nil {
wsc . WriteRPCResponse ( NewRPCResponse ( request . ID , nil , err . Error ( ) ) )
wsc . WriteRPCResponse ( types . NewRPCResponse ( request . ID , nil , err . Error ( ) ) )
continue
} else {
wsc . WriteRPCResponse ( NewRPCResponse ( request . ID , result , "" ) )
wsc . WriteRPCResponse ( types . NewRPCResponse ( request . ID , result , "" ) )
continue
}
@ -563,7 +598,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
func unreflectResult ( returns [ ] reflect . Value ) ( interface { } , error ) {
errV := returns [ 1 ]
if errV . Interface ( ) != nil {
return nil , fmt . Errorf ( "%v" , errV . Interface ( ) )
return nil , errors . Errorf ( "%v" , errV . Interface ( ) )
}
rv := returns [ 0 ]
// the result is a registered interface,