Browse Source

add support for unix sockets

pull/456/head
Ethan Buchman 9 years ago
parent
commit
6607232a5d
4 changed files with 194 additions and 26 deletions
  1. +60
    -23
      client/http_client.go
  2. +114
    -0
      rpc_test.go
  3. +6
    -3
      server/http_server.go
  4. +14
    -0
      types/types.go

+ 60
- 23
client/http_client.go View File

@ -5,45 +5,63 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/url" "net/url"
"strings"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-rpc/types" "github.com/tendermint/go-rpc/types"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
) )
// JSON rpc takes params as a slice
type ClientJSONRPC struct {
remote string
// Set the net.Dial manually so we can do http over tcp or unix.
// Get/Post require a dummyDomain but it's over written by the Transport
var dummyDomain = "http://dummyDomain/"
func unixDial(remote string) func(string, string) (net.Conn, error) {
return func(proto, addr string) (conn net.Conn, err error) {
return net.Dial("unix", remote)
}
} }
func NewClientJSONRPC(remote string) *ClientJSONRPC {
return &ClientJSONRPC{remote}
func tcpDial(remote string) func(string, string) (net.Conn, error) {
return func(proto, addr string) (conn net.Conn, err error) {
return net.Dial("tcp", remote)
}
} }
func (c *ClientJSONRPC) Call(method string, params []interface{}, result interface{}) (interface{}, error) {
return CallHTTP_JSONRPC(c.remote, method, params, result)
func socketTransport(remote string) *http.Transport {
if rpctypes.SocketType(remote) == "unix" {
return &http.Transport{
Dial: unixDial(remote),
}
} else {
return &http.Transport{
Dial: tcpDial(remote),
}
}
} }
// URI takes params as a map
type ClientURI struct {
//------------------------------------------------------------------------------------
// JSON rpc takes params as a slice
type ClientJSONRPC struct {
remote string remote string
client *http.Client
} }
func NewClientURI(remote string) *ClientURI {
if !strings.HasSuffix(remote, "/") {
remote = remote + "/"
func NewClientJSONRPC(remote string) *ClientJSONRPC {
return &ClientJSONRPC{
remote: remote,
client: &http.Client{Transport: socketTransport(remote)},
} }
return &ClientURI{remote}
} }
func (c *ClientURI) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
return CallHTTP_URI(c.remote, method, params, result)
func (c *ClientJSONRPC) Call(method string, params []interface{}, result interface{}) (interface{}, error) {
return c.call(method, params, result)
} }
func CallHTTP_JSONRPC(remote string, method string, params []interface{}, result interface{}) (interface{}, error) {
func (c *ClientJSONRPC) call(method string, params []interface{}, result interface{}) (interface{}, error) {
// Make request and get responseBytes // Make request and get responseBytes
request := rpctypes.RPCRequest{ request := rpctypes.RPCRequest{
JSONRPC: "2.0", JSONRPC: "2.0",
@ -53,8 +71,8 @@ func CallHTTP_JSONRPC(remote string, method string, params []interface{}, result
} }
requestBytes := wire.JSONBytes(request) requestBytes := wire.JSONBytes(request)
requestBuf := bytes.NewBuffer(requestBytes) requestBuf := bytes.NewBuffer(requestBytes)
log.Info(Fmt("RPC request to %v (%v): %v", remote, method, string(requestBytes)))
httpResponse, err := http.Post(remote, "text/json", requestBuf)
log.Info(Fmt("RPC request to %v (%v): %v", c.remote, method, string(requestBytes)))
httpResponse, err := c.client.Post(dummyDomain, "text/json", requestBuf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -63,17 +81,36 @@ func CallHTTP_JSONRPC(remote string, method string, params []interface{}, result
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Info(Fmt("RPC response: %v", string(responseBytes)))
// log.Info(Fmt("RPC response: %v", string(responseBytes)))
return unmarshalResponseBytes(responseBytes, result) return unmarshalResponseBytes(responseBytes, result)
} }
func CallHTTP_URI(remote string, method string, params map[string]interface{}, result interface{}) (interface{}, error) {
//-------------------------------------------------------------
// URI takes params as a map
type ClientURI struct {
remote string
client *http.Client
}
func NewClientURI(remote string) *ClientURI {
return &ClientURI{
remote: remote,
client: &http.Client{Transport: socketTransport(remote)},
}
}
func (c *ClientURI) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
return c.call(method, params, result)
}
func (c *ClientURI) call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
values, err := argsToURLValues(params) values, err := argsToURLValues(params)
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Info(Fmt("URI request to %v (%v): %v", remote, method, values))
resp, err := http.PostForm(remote+method, values)
log.Info(Fmt("URI request to %v (%v): %v", c.remote, method, values))
resp, err := c.client.PostForm(dummyDomain+method, values)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 114
- 0
rpc_test.go View File

@ -0,0 +1,114 @@
package rpc
import (
"net/http"
"testing"
"time"
"github.com/tendermint/go-rpc/client"
"github.com/tendermint/go-rpc/server"
"github.com/tendermint/go-wire"
)
// Client and Server should work over tcp or unix sockets
var (
tcpAddr = "0.0.0.0:46657"
unixAddr = "/tmp/go-rpc.sock" // NOTE: must remove file for test to run again
)
// Define a type for results and register concrete versions
type Result interface{}
type ResultStatus struct {
Value string
}
var _ = wire.RegisterInterface(
struct{ Result }{},
wire.ConcreteType{&ResultStatus{}, 0x1},
)
// Define some routes
var Routes = map[string]*rpcserver.RPCFunc{
"status": rpcserver.NewRPCFunc(StatusResult, "arg"),
}
// an rpc function
func StatusResult(v string) (Result, error) {
return &ResultStatus{v}, nil
}
// launch unix and tcp servers
func init() {
mux := http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, Routes)
go func() {
_, err := rpcserver.StartHTTPServer(tcpAddr, mux)
if err != nil {
panic(err)
}
}()
mux = http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, Routes)
go func() {
_, err := rpcserver.StartHTTPServer(unixAddr, mux)
if err != nil {
panic(err)
}
}()
// wait for servers to start
time.Sleep(time.Second * 2)
}
func testURI(t *testing.T, cl *rpcclient.ClientURI) {
val := "acbd"
params := map[string]interface{}{
"arg": val,
}
var result Result
_, err := cl.Call("status", params, &result)
if err != nil {
t.Fatal(err)
}
got := result.(*ResultStatus).Value
if got != val {
t.Fatalf("Got: %v .... Expected: %v \n", got, val)
}
}
func testJSONRPC(t *testing.T, cl *rpcclient.ClientJSONRPC) {
val := "acbd"
params := []interface{}{val}
var result Result
_, err := cl.Call("status", params, &result)
if err != nil {
t.Fatal(err)
}
got := result.(*ResultStatus).Value
if got != val {
t.Fatalf("Got: %v .... Expected: %v \n", got, val)
}
}
func TestURI_TCP(t *testing.T) {
cl := rpcclient.NewClientURI(tcpAddr)
testURI(t, cl)
}
func TestURI_UNIX(t *testing.T) {
cl := rpcclient.NewClientURI(unixAddr)
testURI(t, cl)
}
func TestJSONRPC_TCP(t *testing.T) {
cl := rpcclient.NewClientJSONRPC(tcpAddr)
testJSONRPC(t, cl)
}
func TestJSONRPC_UNIX(t *testing.T) {
cl := rpcclient.NewClientJSONRPC(unixAddr)
testJSONRPC(t, cl)
}

+ 6
- 3
server/http_server.go View File

@ -17,11 +17,14 @@ import (
) )
func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) { func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) {
log.Notice(Fmt("Starting RPC HTTP server on %v", listenAddr))
listener, err := net.Listen("tcp", listenAddr)
// listenAddr is `IP:PORT` or /path/to/socket
socketType := SocketType(listenAddr)
log.Notice(Fmt("Starting RPC HTTP server on %s socket %v", socketType, listenAddr))
listener, err := net.Listen(socketType, listenAddr)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to listen to %v", listenAddr)
return nil, fmt.Errorf("Failed to listen to %v: %v", listenAddr, err)
} }
go func() { go func() {
res := http.Serve( res := http.Serve(
listener, listener,


+ 14
- 0
types/types.go View File

@ -2,6 +2,7 @@ package rpctypes
import ( import (
"encoding/json" "encoding/json"
"strings"
"github.com/tendermint/go-events" "github.com/tendermint/go-events"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
@ -77,3 +78,16 @@ type WSRPCContext struct {
Request RPCRequest Request RPCRequest
WSRPCConnection WSRPCConnection
} }
//----------------------------------------
// sockets
//
// Determine if its a unix or tcp socket.
// If tcp, must specify the port; `0.0.0.0` will return incorrectly as "unix" since there's no port
func SocketType(listenAddr string) string {
socketType := "unix"
if len(strings.Split(listenAddr, ":")) == 2 {
socketType = "tcp"
}
return socketType
}

Loading…
Cancel
Save