diff --git a/node/node.go b/node/node.go index 82b42d990..08b9d249c 100644 --- a/node/node.go +++ b/node/node.go @@ -209,8 +209,10 @@ func (n *Node) StartRPC() (net.Listener, error) { core.SetGenDoc(n.genDoc) listenAddr := config.GetString("rpc_laddr") + mux := http.NewServeMux() - rpcserver.RegisterEventsHandler(mux, n.evsw) + wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw) + mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, core.Routes) return rpcserver.StartHTTPServer(listenAddr, mux) } diff --git a/rpc/client/client.go b/rpc/client/client.go index aa756601c..95abec98c 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -18,7 +18,7 @@ func Call(remote string, method string, params []interface{}, dest interface{}) JSONRPC: "2.0", Method: method, Params: params, - Id: 0, + Id: "", } requestBytes := binary.JSONBytes(request) requestBuf := bytes.NewBuffer(requestBytes) diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 223dbb13f..afc9586ae 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -25,4 +25,5 @@ var Routes = map[string]*rpc.RPCFunc{ "list_names": rpc.NewRPCFunc(ListNames, []string{}), "unsafe/gen_priv_account": rpc.NewRPCFunc(GenPrivAccount, []string{}), "unsafe/sign_tx": rpc.NewRPCFunc(SignTx, []string{"tx", "privAccounts"}), + // subscribe/unsubscribe are reserved for websocket events. } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 6c3e6826c..be428f7ea 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -83,3 +83,11 @@ type ResponseListNames struct { BlockHeight int `json:"block_height"` Names []*types.NameRegEntry `json:"names"` } + +//---------------------------------------- +// event responses + +type ResponseEvent struct { + Event string `json:"event"` + Data interface{} `json:"data"` +} diff --git a/rpc/core_client/client.go b/rpc/core_client/client.go index 2af0ef0b5..0d71b1c46 100644 --- a/rpc/core_client/client.go +++ b/rpc/core_client/client.go @@ -173,7 +173,7 @@ fmt JSONRPC: "2.0", Method: reverseFuncMap["{{name}}"], Params: []interface{}{ {{args.ident}} }, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil{ diff --git a/rpc/core_client/client_methods.go b/rpc/core_client/client_methods.go index 9daad8a99..9345b82e9 100644 --- a/rpc/core_client/client_methods.go +++ b/rpc/core_client/client_methods.go @@ -611,7 +611,7 @@ func (c *ClientJSON) BlockchainInfo(minHeight int, maxHeight int) (*ctypes.Respo JSONRPC: "2.0", Method: reverseFuncMap["BlockchainInfo"], Params: []interface{}{minHeight, maxHeight}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -638,7 +638,7 @@ func (c *ClientJSON) BroadcastTx(tx types.Tx) (*ctypes.Receipt, error) { JSONRPC: "2.0", Method: reverseFuncMap["BroadcastTx"], Params: []interface{}{tx}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -665,7 +665,7 @@ func (c *ClientJSON) Call(fromAddress []byte, toAddress []byte, data []byte) (*c JSONRPC: "2.0", Method: reverseFuncMap["Call"], Params: []interface{}{fromAddress, toAddress, data}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -692,7 +692,7 @@ func (c *ClientJSON) CallCode(fromAddress []byte, code []byte, data []byte) (*ct JSONRPC: "2.0", Method: reverseFuncMap["CallCode"], Params: []interface{}{fromAddress, code, data}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -719,7 +719,7 @@ func (c *ClientJSON) DumpConsensusState() (*ctypes.ResponseDumpConsensusState, e JSONRPC: "2.0", Method: reverseFuncMap["DumpConsensusState"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -746,7 +746,7 @@ func (c *ClientJSON) DumpStorage(address []byte) (*ctypes.ResponseDumpStorage, e JSONRPC: "2.0", Method: reverseFuncMap["DumpStorage"], Params: []interface{}{address}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -773,7 +773,7 @@ func (c *ClientJSON) GenPrivAccount() (*acm.PrivAccount, error) { JSONRPC: "2.0", Method: reverseFuncMap["GenPrivAccount"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -800,7 +800,7 @@ func (c *ClientJSON) Genesis() (*sm.GenesisDoc, error) { JSONRPC: "2.0", Method: reverseFuncMap["Genesis"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -827,7 +827,7 @@ func (c *ClientJSON) GetAccount(address []byte) (*acm.Account, error) { JSONRPC: "2.0", Method: reverseFuncMap["GetAccount"], Params: []interface{}{address}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -854,7 +854,7 @@ func (c *ClientJSON) GetBlock(height int) (*ctypes.ResponseGetBlock, error) { JSONRPC: "2.0", Method: reverseFuncMap["GetBlock"], Params: []interface{}{height}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -881,7 +881,7 @@ func (c *ClientJSON) GetName(name string) (*types.NameRegEntry, error) { JSONRPC: "2.0", Method: reverseFuncMap["GetName"], Params: []interface{}{name}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -908,7 +908,7 @@ func (c *ClientJSON) GetStorage(address []byte, key []byte) (*ctypes.ResponseGet JSONRPC: "2.0", Method: reverseFuncMap["GetStorage"], Params: []interface{}{address, key}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -935,7 +935,7 @@ func (c *ClientJSON) ListAccounts() (*ctypes.ResponseListAccounts, error) { JSONRPC: "2.0", Method: reverseFuncMap["ListAccounts"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -962,7 +962,7 @@ func (c *ClientJSON) ListNames() (*ctypes.ResponseListNames, error) { JSONRPC: "2.0", Method: reverseFuncMap["ListNames"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -989,7 +989,7 @@ func (c *ClientJSON) ListUnconfirmedTxs() ([]types.Tx, error) { JSONRPC: "2.0", Method: reverseFuncMap["ListUnconfirmedTxs"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -1016,7 +1016,7 @@ func (c *ClientJSON) ListValidators() (*ctypes.ResponseListValidators, error) { JSONRPC: "2.0", Method: reverseFuncMap["ListValidators"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -1043,7 +1043,7 @@ func (c *ClientJSON) NetInfo() (*ctypes.ResponseNetInfo, error) { JSONRPC: "2.0", Method: reverseFuncMap["NetInfo"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -1070,7 +1070,7 @@ func (c *ClientJSON) SignTx(tx types.Tx, privAccounts []*acm.PrivAccount) (types JSONRPC: "2.0", Method: reverseFuncMap["SignTx"], Params: []interface{}{tx, privAccounts}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { @@ -1097,7 +1097,7 @@ func (c *ClientJSON) Status() (*ctypes.ResponseStatus, error) { JSONRPC: "2.0", Method: reverseFuncMap["Status"], Params: []interface{}{}, - Id: 0, + Id: "", } body, err := c.RequestResponse(request) if err != nil { diff --git a/rpc/core_client/ws_client.go b/rpc/core_client/ws_client.go index 057656ac7..c64636476 100644 --- a/rpc/core_client/ws_client.go +++ b/rpc/core_client/ws_client.go @@ -32,17 +32,21 @@ func (wsc *WSClient) Dial() (*http.Response, error) { // subscribe to an event func (wsc *WSClient) Subscribe(eventid string) error { - return wsc.conn.WriteJSON(rpctypes.WSRequest{ - Type: "subscribe", - Event: eventid, + return wsc.conn.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + Id: "", + Method: "subscribe", + Params: []interface{}{eventid}, }) } // unsubscribe from an event func (wsc *WSClient) Unsubscribe(eventid string) error { - return wsc.conn.WriteJSON(rpctypes.WSRequest{ - Type: "unsubscribe", - Event: eventid, + return wsc.conn.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + Id: "", + Method: "unsubscribe", + Params: []interface{}{eventid}, }) } diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index 3c2ab0028..0470135be 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -28,12 +28,6 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { mux.HandleFunc("/", makeJSONRPCHandler(funcMap)) } -func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) { - // websocket endpoint - wm := NewWebsocketManager(evsw) - mux.HandleFunc("/events", wm.websocketHandler) // websocket.Handler(w.eventsHandler)) -} - //------------------------------------- // function introspection @@ -84,12 +78,7 @@ func funcReturnTypes(f interface{}) []reflect.Type { // jsonrpc calls grab the given method's function info and runs reflect.Call func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - if len(r.URL.Path) > 1 { - WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) - return - } b, _ := ioutil.ReadAll(r.Body) - // if its an empty request (like from a browser), // just display a list of functions if len(b) == 0 { @@ -100,27 +89,31 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { var request RPCRequest err := json.Unmarshal(b, &request) if err != nil { - WriteRPCResponse(w, NewRPCResponse(nil, err.Error())) + WriteRPCResponse(w, NewRPCResponse("", nil, err.Error())) + return + } + if len(r.URL.Path) > 1 { + WriteRPCResponse(w, NewRPCResponse(request.Id, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) return } rpcFunc := funcMap[request.Method] if rpcFunc == nil { - WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method)) + WriteRPCResponse(w, NewRPCResponse(request.Id, nil, "RPC method unknown: "+request.Method)) return } args, err := jsonParamsToArgs(rpcFunc, request.Params) if err != nil { - WriteRPCResponse(w, NewRPCResponse(nil, err.Error())) + WriteRPCResponse(w, NewRPCResponse(request.Id, nil, err.Error())) return } returns := rpcFunc.f.Call(args) log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) - response, err := unreflectResponse(returns) + result, err := unreflectResult(returns) if err != nil { - WriteRPCResponse(w, NewRPCResponse(nil, err.Error())) + WriteRPCResponse(w, NewRPCResponse(request.Id, nil, err.Error())) return } - WriteRPCResponse(w, NewRPCResponse(response, "")) + WriteRPCResponse(w, NewRPCResponse(request.Id, result, "")) } } @@ -162,17 +155,17 @@ func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) return func(w http.ResponseWriter, r *http.Request) { args, err := httpParamsToArgs(rpcFunc, r) if err != nil { - WriteRPCResponse(w, NewRPCResponse(nil, err.Error())) + WriteRPCResponse(w, NewRPCResponse("", nil, err.Error())) return } returns := rpcFunc.f.Call(args) log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) - response, err := unreflectResponse(returns) + result, err := unreflectResult(returns) if err != nil { - WriteRPCResponse(w, NewRPCResponse(nil, err.Error())) + WriteRPCResponse(w, NewRPCResponse("", nil, err.Error())) return } - WriteRPCResponse(w, NewRPCResponse(response, "")) + WriteRPCResponse(w, NewRPCResponse("", result, "")) } } @@ -224,18 +217,21 @@ type WSConnection struct { id string baseConn *websocket.Conn - writeChan chan WSResponse + writeChan chan RPCResponse readTimeout *time.Timer - evsw *events.EventSwitch + funcMap map[string]*RPCFunc + evsw *events.EventSwitch } // new websocket connection wrapper -func NewWSConnection(baseConn *websocket.Conn) *WSConnection { +func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WSConnection { wsc := &WSConnection{ id: baseConn.RemoteAddr().String(), baseConn: baseConn, - writeChan: make(chan WSResponse, writeChanCapacity), // error when full. + writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. + funcMap: funcMap, + evsw: evsw, } wsc.QuitService = *NewQuitService(log, "WSConnection", wsc) return wsc @@ -255,6 +251,10 @@ func (wsc *WSConnection) OnStart() { wsc.readTimeout.Reset(time.Second * WSReadTimeoutSeconds) return nil }) + wsc.baseConn.SetPongHandler(func(m string) error { + wsc.readTimeout.Reset(time.Second * WSReadTimeoutSeconds) + return nil + }) go wsc.readTimeoutRoutine() // Write responses, BLOCKING. @@ -270,8 +270,6 @@ func (wsc *WSConnection) OnStop() { // closes the writeChan } -func (wsc *WSConnection) SetEventSwitch(evsw *events.EventSwitch) { wsc.evsw = evsw } - func (wsc *WSConnection) readTimeoutRoutine() { select { case <-wsc.readTimeout.C: @@ -283,7 +281,7 @@ func (wsc *WSConnection) readTimeoutRoutine() { } // Attempt to write response to writeChan and record failures -func (wsc *WSConnection) writeResponse(resp WSResponse) { +func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { select { case wsc.writeChan <- resp: default: @@ -294,7 +292,8 @@ func (wsc *WSConnection) writeResponse(resp WSResponse) { // Read from the socket and subscribe to or unsubscribe from events func (wsc *WSConnection) readRoutine() { - defer close(wsc.writeChan) + // Do not close writeChan, to allow writeRPCResponse() to fail. + // defer close(wsc.writeChan) for { select { @@ -314,31 +313,68 @@ func (wsc *WSConnection) readRoutine() { wsc.Stop() return } - var req WSRequest - err = json.Unmarshal(in, &req) + var request RPCRequest + err = json.Unmarshal(in, &request) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - wsc.writeResponse(WSResponse{Error: errStr}) + wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, errStr)) continue } - switch req.Type { + switch request.Method { case "subscribe": - log.Notice("New event subscription", "id", wsc.id, "event", req.Event) - wsc.evsw.AddListenerForEvent(wsc.id, req.Event, func(msg interface{}) { - resp := WSResponse{ - Event: req.Event, - Data: msg, - } - wsc.writeResponse(resp) - }) - case "unsubscribe": - if req.Event != "" { - wsc.evsw.RemoveListenerForEvent(req.Event, wsc.id) + if len(request.Params) != 1 { + wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "subscribe takes 1 event parameter string")) + continue + } + if event, ok := request.Params[0].(string); !ok { + wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "subscribe takes 1 event parameter string")) + continue } else { + log.Notice("Subscribe to event", "id", wsc.id, "event", event) + wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg interface{}) { + wsc.writeRPCResponse(NewRPCResponse(request.Id, RPCEventResult{event, msg}, "")) + }) + continue + } + case "unsubscribe": + if len(request.Params) == 0 { + log.Notice("Unsubscribe from all events", "id", wsc.id) wsc.evsw.RemoveListener(wsc.id) + continue + } else if len(request.Params) == 1 { + if event, ok := request.Params[0].(string); !ok { + wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "unsubscribe takes 0 or 1 event parameter strings")) + continue + } else { + log.Notice("Unsubscribe from event", "id", wsc.id, "event", event) + wsc.evsw.RemoveListenerForEvent(event, wsc.id) + continue + } + } else { + wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "unsubscribe takes 0 or 1 event parameter strings")) + continue } default: - wsc.writeResponse(WSResponse{Error: "Unknown request type: " + req.Type}) + rpcFunc := wsc.funcMap[request.Method] + if rpcFunc == nil { + wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, "RPC method unknown: "+request.Method)) + continue + } + args, err := jsonParamsToArgs(rpcFunc, request.Params) + if err != nil { + wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, err.Error())) + continue + } + returns := rpcFunc.f.Call(args) + log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + wsc.writeRPCResponse(NewRPCResponse(request.Id, nil, err.Error())) + continue + } else { + wsc.writeRPCResponse(NewRPCResponse(request.Id, result, "")) + continue + } } } } @@ -356,7 +392,7 @@ func (wsc *WSConnection) writeRoutine() { buf := new(bytes.Buffer) binary.WriteJSON(msg, buf, n, err) if *err != nil { - log.Error("Failed to marshal WSResponse to JSON", "error", err) + log.Error("Failed to marshal RPCResponse to JSON", "error", err) } else { wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * WSWriteTimeoutSeconds)) if err := wsc.baseConn.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { @@ -375,12 +411,14 @@ func (wsc *WSConnection) writeRoutine() { // holds the event switch type WebsocketManager struct { websocket.Upgrader - evsw *events.EventSwitch + funcMap map[string]*RPCFunc + evsw *events.EventSwitch } -func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager { +func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager { return &WebsocketManager{ - evsw: evsw, + funcMap: funcMap, + evsw: evsw, Upgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, @@ -393,7 +431,7 @@ func NewWebsocketManager(evsw *events.EventSwitch) *WebsocketManager { } // Upgrade the request/response (via http.Hijack) and starts the WSConnection. -func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Request) { +func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { wsConn, err := wm.Upgrade(w, r, nil) if err != nil { // TODO - return http error @@ -402,17 +440,16 @@ func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Requ } // register connection - con := NewWSConnection(wsConn) + con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) log.Notice("New websocket connection", "origin", con.id) - con.SetEventSwitch(wm.evsw) con.Start() // Blocking } // rpc.websocket //----------------------------------------------------------------------------- -// returns is Response struct and error. If error is not nil, return it -func unreflectResponse(returns []reflect.Value) (interface{}, error) { +// returns is result struct and error. If error is not nil, return it +func unreflectResult(returns []reflect.Value) (interface{}, error) { errV := returns[1] if errV.Interface() != nil { return nil, fmt.Errorf("%v", errV.Interface()) diff --git a/rpc/server/http_server.go b/rpc/server/http_server.go index 899365db7..63f5d41f4 100644 --- a/rpc/server/http_server.go +++ b/rpc/server/http_server.go @@ -75,7 +75,7 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler { // For the rest, log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) rww.WriteHeader(http.StatusInternalServerError) - WriteRPCResponse(rww, NewRPCResponse(nil, Fmt("Internal Server Error: %v", e))) + WriteRPCResponse(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e))) } } diff --git a/rpc/test/client_ws_test.go b/rpc/test/client_ws_test.go index aa8a9cb5e..59da51c46 100644 --- a/rpc/test/client_ws_test.go +++ b/rpc/test/client_ws_test.go @@ -20,7 +20,7 @@ func TestWSConnect(t *testing.T) { } // receive a new block message -func _TestWSNewBlock(t *testing.T) { +func TestWSNewBlock(t *testing.T) { con := newWSCon(t) eid := types.EventStringNewBlock() subscribe(t, con, eid) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 3a7ffb551..8a553ead8 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -19,7 +19,7 @@ import ( var ( rpcAddr = "127.0.0.1:36657" // Not 46657 requestAddr = "http://" + rpcAddr + "/" - websocketAddr = "ws://" + rpcAddr + "/events" + websocketAddr = "ws://" + rpcAddr + "/websocket" node *nm.Node diff --git a/rpc/test/ws_helpers.go b/rpc/test/ws_helpers.go index 6e2038b19..830ef14dd 100644 --- a/rpc/test/ws_helpers.go +++ b/rpc/test/ws_helpers.go @@ -33,9 +33,11 @@ func newWSCon(t *testing.T) *websocket.Conn { // subscribe to an event func subscribe(t *testing.T, con *websocket.Conn, eventid string) { - err := con.WriteJSON(rpctypes.WSRequest{ - Type: "subscribe", - Event: eventid, + err := con.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + Id: "", + Method: "subscribe", + Params: []interface{}{eventid}, }) if err != nil { t.Fatal(err) @@ -44,9 +46,11 @@ func subscribe(t *testing.T, con *websocket.Conn, eventid string) { // unsubscribe from an event func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { - err := con.WriteJSON(rpctypes.WSRequest{ - Type: "unsubscribe", - Event: eventid, + err := con.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + Id: "", + Method: "unsubscribe", + Params: []interface{}{eventid}, }) if err != nil { t.Fatal(err) @@ -87,13 +91,13 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou // if the event id isnt what we're waiting on // ignore it var response struct { - Event string `json:"event"` + Result rpctypes.RPCEventResult `json:"result"` } if err := json.Unmarshal(p, &response); err != nil { errCh <- err break } - if response.Event == eventid { + if response.Result.Event == eventid { goodCh <- p break } @@ -136,9 +140,13 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { // unmarshall and assert somethings var response struct { - Event string `json:"event"` - Data *types.Block `json:"data"` - Error string `json:"error"` + JSONRPC string `json:"jsonrpc"` + Id string `json:"id"` + Result struct { + Event string `json:"event"` + Data *types.Block `json:"data"` + } `json:"result"` + Error string `json:"error"` } var err error binary.ReadJSON(&response, b, &err) @@ -148,7 +156,7 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { if response.Error != "" { return nil, fmt.Errorf(response.Error) } - block := response.Data + block := response.Result.Data return block, nil } @@ -177,9 +185,13 @@ func unmarshalValidateSend(amt int64, toAddr []byte) func(string, []byte) error return func(eid string, b []byte) error { // unmarshal and assert correctness var response struct { - Event string `json:"event"` - Data types.SendTx `json:"data"` - Error string `json:"error"` + JSONRPC string `json:"jsonrpc"` + Id string `json:"id"` + Result struct { + Event string `json:"event"` + Data *types.SendTx `json:"data"` + } `json:"result"` + Error string `json:"error"` } var err error binary.ReadJSON(&response, b, &err) @@ -189,10 +201,10 @@ func unmarshalValidateSend(amt int64, toAddr []byte) func(string, []byte) error if response.Error != "" { return fmt.Errorf(response.Error) } - if eid != response.Event { - return fmt.Errorf("Eventid is not correct. Got %s, expected %s", response.Event, eid) + if eid != response.Result.Event { + return fmt.Errorf("Eventid is not correct. Got %s, expected %s", response.Result.Event, eid) } - tx := response.Data + tx := response.Result.Data if bytes.Compare(tx.Inputs[0].Address, user[0].Address) != 0 { return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Inputs[0].Address, user[0].Address) } @@ -210,12 +222,16 @@ func unmarshalValidateCall(amt int64, returnCode []byte) func(string, []byte) er return func(eid string, b []byte) error { // unmarshall and assert somethings var response struct { - Event string `json:"event"` - Data struct { - Tx types.CallTx `json:"tx"` - Return []byte `json:"return"` - Exception string `json:"exception"` - } `json:"data"` + JSONRPC string `json:"jsonrpc"` + Id string `json:"id"` + Result struct { + Event string `json:"event"` + Data struct { + Tx types.CallTx `json:"tx"` + Return []byte `json:"return"` + Exception string `json:"exception"` + } `json:"data"` + } `json:"result"` Error string `json:"error"` } var err error @@ -226,17 +242,17 @@ func unmarshalValidateCall(amt int64, returnCode []byte) func(string, []byte) er if response.Error != "" { return fmt.Errorf(response.Error) } - if response.Data.Exception != "" { - return fmt.Errorf(response.Data.Exception) + if response.Result.Data.Exception != "" { + return fmt.Errorf(response.Result.Data.Exception) } - tx := response.Data.Tx + tx := response.Result.Data.Tx if bytes.Compare(tx.Input.Address, user[0].Address) != 0 { return fmt.Errorf("Senders do not match up! Got %x, expected %x", tx.Input.Address, user[0].Address) } if tx.Input.Amount != amt { return fmt.Errorf("Amt does not match up! Got %d, expected %d", tx.Input.Amount, amt) } - ret := response.Data.Return + ret := response.Result.Data.Return if bytes.Compare(ret, returnCode) != 0 { return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnCode) } @@ -248,9 +264,13 @@ func unmarshalValidateCallCall(origin, returnCode []byte, txid *[]byte) func(str return func(eid string, b []byte) error { // unmarshall and assert somethings var response struct { - Event string `json:"event"` - Data types.EventMsgCall `json:"data"` - Error string `json:"error"` + JSONRPC string `json:"jsonrpc"` + Id string `json:"id"` + Result struct { + Event string `json:"event"` + Data types.EventMsgCall `json:"data"` + } `json:"result"` + Error string `json:"error"` } var err error binary.ReadJSON(&response, b, &err) @@ -260,18 +280,18 @@ func unmarshalValidateCallCall(origin, returnCode []byte, txid *[]byte) func(str if response.Error != "" { return fmt.Errorf(response.Error) } - if response.Data.Exception != "" { - return fmt.Errorf(response.Data.Exception) + if response.Result.Data.Exception != "" { + return fmt.Errorf(response.Result.Data.Exception) } - if bytes.Compare(response.Data.Origin, origin) != 0 { - return fmt.Errorf("Origin does not match up! Got %x, expected %x", response.Data.Origin, origin) + if bytes.Compare(response.Result.Data.Origin, origin) != 0 { + return fmt.Errorf("Origin does not match up! Got %x, expected %x", response.Result.Data.Origin, origin) } - ret := response.Data.Return + ret := response.Result.Data.Return if bytes.Compare(ret, returnCode) != 0 { return fmt.Errorf("Call did not return correctly. Got %x, expected %x", ret, returnCode) } - if bytes.Compare(response.Data.TxID, *txid) != 0 { - return fmt.Errorf("TxIDs do not match up! Got %x, expected %x", response.Data.TxID, *txid) + if bytes.Compare(response.Result.Data.TxID, *txid) != 0 { + return fmt.Errorf("TxIDs do not match up! Got %x, expected %x", response.Result.Data.TxID, *txid) } return nil } diff --git a/rpc/types/types.go b/rpc/types/types.go index f3c14bbd8..47cea1c57 100644 --- a/rpc/types/types.go +++ b/rpc/types/types.go @@ -2,39 +2,32 @@ package rpctypes type RPCRequest struct { JSONRPC string `json:"jsonrpc"` + Id string `json:"id"` Method string `json:"method"` Params []interface{} `json:"params"` - Id int `json:"id"` } type RPCResponse struct { + JSONRPC string `json:"jsonrpc"` + Id string `json:"id"` Result interface{} `json:"result"` Error string `json:"error"` - Id string `json:"id"` - JSONRPC string `json:"jsonrpc"` } -func NewRPCResponse(res interface{}, err string) RPCResponse { +func NewRPCResponse(id string, res interface{}, err string) RPCResponse { if res == nil { res = struct{}{} } return RPCResponse{ + JSONRPC: "2.0", + Id: id, Result: res, Error: err, - Id: "", - JSONRPC: "2.0", } } -// for requests coming in -type WSRequest struct { - Type string `json:"type"` // subscribe or unsubscribe - Event string `json:"event"` -} - -// for responses going out -type WSResponse struct { +// Goes in the Result field of an RPCResponse. +type RPCEventResult struct { Event string `json:"event"` Data interface{} `json:"data"` - Error string `json:"error"` }