From 6607232a5daf1b474c4578836750e71943793c48 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 18 Feb 2016 21:07:49 +0000 Subject: [PATCH] add support for unix sockets --- client/http_client.go | 83 +++++++++++++++++++++--------- rpc_test.go | 114 ++++++++++++++++++++++++++++++++++++++++++ server/http_server.go | 9 ++-- types/types.go | 14 ++++++ 4 files changed, 194 insertions(+), 26 deletions(-) create mode 100644 rpc_test.go diff --git a/client/http_client.go b/client/http_client.go index 6bb746d2e..100507196 100644 --- a/client/http_client.go +++ b/client/http_client.go @@ -5,45 +5,63 @@ import ( "encoding/json" "errors" "io/ioutil" + "net" "net/http" "net/url" - "strings" . "github.com/tendermint/go-common" "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" ) -// JSON rpc takes params as a slice -type ClientJSONRPC struct { - remote string +// Set the net.Dial manually so we can do http over tcp or unix. +// Get/Post require a dummyDomain but it's over written by the Transport +var dummyDomain = "http://dummyDomain/" + +func unixDial(remote string) func(string, string) (net.Conn, error) { + return func(proto, addr string) (conn net.Conn, err error) { + return net.Dial("unix", remote) + } } -func NewClientJSONRPC(remote string) *ClientJSONRPC { - return &ClientJSONRPC{remote} +func tcpDial(remote string) func(string, string) (net.Conn, error) { + return func(proto, addr string) (conn net.Conn, err error) { + return net.Dial("tcp", remote) + } } -func (c *ClientJSONRPC) Call(method string, params []interface{}, result interface{}) (interface{}, error) { - return CallHTTP_JSONRPC(c.remote, method, params, result) +func socketTransport(remote string) *http.Transport { + if rpctypes.SocketType(remote) == "unix" { + return &http.Transport{ + Dial: unixDial(remote), + } + } else { + return &http.Transport{ + Dial: tcpDial(remote), + } + } } -// URI takes params as a map -type ClientURI struct { +//------------------------------------------------------------------------------------ + +// JSON rpc takes params as a slice +type ClientJSONRPC struct { remote string + client *http.Client } -func NewClientURI(remote string) *ClientURI { - if !strings.HasSuffix(remote, "/") { - remote = remote + "/" +func NewClientJSONRPC(remote string) *ClientJSONRPC { + return &ClientJSONRPC{ + remote: remote, + client: &http.Client{Transport: socketTransport(remote)}, } - return &ClientURI{remote} } -func (c *ClientURI) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { - return CallHTTP_URI(c.remote, method, params, result) +func (c *ClientJSONRPC) Call(method string, params []interface{}, result interface{}) (interface{}, error) { + return c.call(method, params, result) } -func CallHTTP_JSONRPC(remote string, method string, params []interface{}, result interface{}) (interface{}, error) { +func (c *ClientJSONRPC) call(method string, params []interface{}, result interface{}) (interface{}, error) { // Make request and get responseBytes request := rpctypes.RPCRequest{ JSONRPC: "2.0", @@ -53,8 +71,8 @@ func CallHTTP_JSONRPC(remote string, method string, params []interface{}, result } requestBytes := wire.JSONBytes(request) requestBuf := bytes.NewBuffer(requestBytes) - log.Info(Fmt("RPC request to %v (%v): %v", remote, method, string(requestBytes))) - httpResponse, err := http.Post(remote, "text/json", requestBuf) + log.Info(Fmt("RPC request to %v (%v): %v", c.remote, method, string(requestBytes))) + httpResponse, err := c.client.Post(dummyDomain, "text/json", requestBuf) if err != nil { return nil, err } @@ -63,17 +81,36 @@ func CallHTTP_JSONRPC(remote string, method string, params []interface{}, result if err != nil { return nil, err } - log.Info(Fmt("RPC response: %v", string(responseBytes))) + // log.Info(Fmt("RPC response: %v", string(responseBytes))) return unmarshalResponseBytes(responseBytes, result) } -func CallHTTP_URI(remote string, method string, params map[string]interface{}, result interface{}) (interface{}, error) { +//------------------------------------------------------------- + +// URI takes params as a map +type ClientURI struct { + remote string + client *http.Client +} + +func NewClientURI(remote string) *ClientURI { + return &ClientURI{ + remote: remote, + client: &http.Client{Transport: socketTransport(remote)}, + } +} + +func (c *ClientURI) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + return c.call(method, params, result) +} + +func (c *ClientURI) call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { values, err := argsToURLValues(params) if err != nil { return nil, err } - log.Info(Fmt("URI request to %v (%v): %v", remote, method, values)) - resp, err := http.PostForm(remote+method, values) + log.Info(Fmt("URI request to %v (%v): %v", c.remote, method, values)) + resp, err := c.client.PostForm(dummyDomain+method, values) if err != nil { return nil, err } diff --git a/rpc_test.go b/rpc_test.go new file mode 100644 index 000000000..f81bf1a16 --- /dev/null +++ b/rpc_test.go @@ -0,0 +1,114 @@ +package rpc + +import ( + "net/http" + "testing" + "time" + + "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-rpc/server" + "github.com/tendermint/go-wire" +) + +// Client and Server should work over tcp or unix sockets +var ( + tcpAddr = "0.0.0.0:46657" + unixAddr = "/tmp/go-rpc.sock" // NOTE: must remove file for test to run again +) + +// Define a type for results and register concrete versions +type Result interface{} + +type ResultStatus struct { + Value string +} + +var _ = wire.RegisterInterface( + struct{ Result }{}, + wire.ConcreteType{&ResultStatus{}, 0x1}, +) + +// Define some routes +var Routes = map[string]*rpcserver.RPCFunc{ + "status": rpcserver.NewRPCFunc(StatusResult, "arg"), +} + +// an rpc function +func StatusResult(v string) (Result, error) { + return &ResultStatus{v}, nil +} + +// launch unix and tcp servers +func init() { + mux := http.NewServeMux() + rpcserver.RegisterRPCFuncs(mux, Routes) + go func() { + _, err := rpcserver.StartHTTPServer(tcpAddr, mux) + if err != nil { + panic(err) + } + }() + + mux = http.NewServeMux() + rpcserver.RegisterRPCFuncs(mux, Routes) + go func() { + _, err := rpcserver.StartHTTPServer(unixAddr, mux) + if err != nil { + panic(err) + } + }() + + // wait for servers to start + time.Sleep(time.Second * 2) + +} + +func testURI(t *testing.T, cl *rpcclient.ClientURI) { + val := "acbd" + params := map[string]interface{}{ + "arg": val, + } + var result Result + _, err := cl.Call("status", params, &result) + if err != nil { + t.Fatal(err) + } + got := result.(*ResultStatus).Value + if got != val { + t.Fatalf("Got: %v .... Expected: %v \n", got, val) + } +} + +func testJSONRPC(t *testing.T, cl *rpcclient.ClientJSONRPC) { + val := "acbd" + params := []interface{}{val} + var result Result + _, err := cl.Call("status", params, &result) + if err != nil { + t.Fatal(err) + } + got := result.(*ResultStatus).Value + if got != val { + t.Fatalf("Got: %v .... Expected: %v \n", got, val) + } +} + +func TestURI_TCP(t *testing.T) { + cl := rpcclient.NewClientURI(tcpAddr) + testURI(t, cl) +} + +func TestURI_UNIX(t *testing.T) { + cl := rpcclient.NewClientURI(unixAddr) + testURI(t, cl) +} + +func TestJSONRPC_TCP(t *testing.T) { + cl := rpcclient.NewClientJSONRPC(tcpAddr) + testJSONRPC(t, cl) +} + +func TestJSONRPC_UNIX(t *testing.T) { + cl := rpcclient.NewClientJSONRPC(unixAddr) + testJSONRPC(t, cl) +} diff --git a/server/http_server.go b/server/http_server.go index 1271d073b..beec9bcc2 100644 --- a/server/http_server.go +++ b/server/http_server.go @@ -17,11 +17,14 @@ import ( ) func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) { - log.Notice(Fmt("Starting RPC HTTP server on %v", listenAddr)) - listener, err := net.Listen("tcp", listenAddr) + // listenAddr is `IP:PORT` or /path/to/socket + socketType := SocketType(listenAddr) + log.Notice(Fmt("Starting RPC HTTP server on %s socket %v", socketType, listenAddr)) + listener, err := net.Listen(socketType, listenAddr) if err != nil { - return nil, fmt.Errorf("Failed to listen to %v", listenAddr) + return nil, fmt.Errorf("Failed to listen to %v: %v", listenAddr, err) } + go func() { res := http.Serve( listener, diff --git a/types/types.go b/types/types.go index a35e3dae8..d7461f4e1 100644 --- a/types/types.go +++ b/types/types.go @@ -2,6 +2,7 @@ package rpctypes import ( "encoding/json" + "strings" "github.com/tendermint/go-events" "github.com/tendermint/go-wire" @@ -77,3 +78,16 @@ type WSRPCContext struct { Request RPCRequest WSRPCConnection } + +//---------------------------------------- +// sockets +// +// Determine if its a unix or tcp socket. +// If tcp, must specify the port; `0.0.0.0` will return incorrectly as "unix" since there's no port +func SocketType(listenAddr string) string { + socketType := "unix" + if len(strings.Split(listenAddr, ":")) == 2 { + socketType = "tcp" + } + return socketType +}