From 2b3bedead87fc60d83a7d5e2240df539594e666d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 2 Apr 2015 14:30:30 -0700 Subject: [PATCH] rpc: websockets --- rpc/handlers.go | 191 +++++++++++++++++++++++++++++++++++++++++---- rpc/http_server.go | 6 +- 2 files changed, 181 insertions(+), 16 deletions(-) diff --git a/rpc/handlers.go b/rpc/handlers.go index 732164013..9fc1f7874 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -5,21 +5,21 @@ TODO: support Call && GetStorage. */ import ( + "bytes" "encoding/json" "fmt" "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/rpc/core" + "golang.org/x/net/websocket" "io/ioutil" "net/http" "reflect" "runtime" "strings" + "time" ) -// maps camel-case function names to lower case rpc version -// populated by calls to funcWrap -var reverseFuncMap = make(map[string]string) - // cache all type information about each function up front // (func, responseStruct, argNames) var funcMap = map[string]*FuncWrapper{ @@ -38,24 +38,36 @@ var funcMap = map[string]*FuncWrapper{ "unsafe/sign_tx": funcWrap(core.SignTx, []string{"tx", "privAccounts"}), } -func initHandlers() { - // HTTP endpoints - for funcName, funcInfo := range funcMap { - http.HandleFunc("/"+funcName, toHTTPHandler(funcInfo)) - } - - // JSONRPC endpoints - http.HandleFunc("/", JSONRPCHandler) +// maps camel-case function names to lower case rpc version +// populated by calls to funcWrap +var reverseFuncMap = fillReverseFuncMap() - // fill the map from camelcase to lowercase +// fill the map from camelcase to lowercase +func fillReverseFuncMap() map[string]string { + fMap := make(map[string]string) for name, f := range funcMap { camelName := runtime.FuncForPC(f.f.Pointer()).Name() spl := strings.Split(camelName, ".") if len(spl) > 1 { camelName = spl[len(spl)-1] } - reverseFuncMap[camelName] = name + fMap[camelName] = name + } + return fMap +} + +func initHandlers(ew *events.EventSwitch) { + // HTTP endpoints + for funcName, funcInfo := range funcMap { + http.HandleFunc("/"+funcName, toHTTPHandler(funcInfo)) } + + // JSONRPC endpoints + http.HandleFunc("/", JSONRPCHandler) + + w := NewWebsocketManager(ew) + // websocket endpoint + http.Handle("/events", websocket.Handler(w.eventsHandler)) } //------------------------------------- @@ -212,6 +224,157 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { // rpc.http //----------------------------------------------------------------------------- +// rpc.websocket + +// main manager for all websocket connections +// holds the event switch +type WebsocketManager struct { + ew *events.EventSwitch + cons map[string]*Connection +} + +func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager { + return &WebsocketManager{ + ew: ew, + cons: make(map[string]*Connection), + } +} + +func (w *WebsocketManager) eventsHandler(con *websocket.Conn) { + // register connection + c := NewConnection(con) + w.cons[con.RemoteAddr().String()] = c + + // read subscriptions/unsubscriptions to events + go w.read(c) + // write responses + go w.write(c) +} + +const ( + WsConnectionReaperSeconds = 5 + MaxFailedSendsSeconds = 10 + WriteChanBuffer = 10 +) + +// read from the socket and subscribe to or unsubscribe from events +func (w *WebsocketManager) read(con *Connection) { + reaper := time.Tick(time.Second * WsConnectionReaperSeconds) + for { + select { + case <-reaper: + if con.failedSends > MaxFailedSendsSeconds { + // sending has failed too many times. + // kill the connection + con.quitChan <- struct{}{} + } + default: + var in []byte + if err := websocket.Message.Receive(con.wsCon, &in); err != nil { + // an error reading the connection, + // so kill the connection + con.quitChan <- struct{}{} + } + var req WsRequest + err := json.Unmarshal(in, &req) + if err != nil { + errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) + con.writeChan <- WsResponse{Error: errStr} + } + switch req.Type { + case "subscribe": + w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { + resp := WsResponse{ + Event: req.Event, + Data: msg, + } + select { + case con.writeChan <- resp: + // yay + con.failedSends = 0 + default: + // channel is full + // if this happens too many times, + // close connection + con.failedSends += 1 + } + }) + case "unsubscribe": + if req.Event != "" { + w.ew.RemoveListenerForEvent(req.Event, con.id) + } else { + w.ew.RemoveListener(con.id) + } + default: + con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type} + } + + } + } +} + +// receives on a write channel and writes out to the socket +func (w *WebsocketManager) write(con *Connection) { + n, err := new(int64), new(error) + for { + select { + case msg := <-con.writeChan: + buf := new(bytes.Buffer) + binary.WriteJSON(msg, buf, n, err) + if *err != nil { + log.Error("Failed to write JSON WsResponse", "error", err) + } else { + websocket.Message.Send(con.wsCon, buf.Bytes()) + } + case <-con.quitChan: + close(con.quitChan) + con.Close() + return + } + } +} + +// a single websocket connection +// contains the listeners id +type Connection struct { + id string + wsCon *websocket.Conn + writeChan chan WsResponse + quitChan chan struct{} + failedSends uint +} + +// for requests coming in +type WsRequest struct { + Type string // subscribe or unsubscribe + Event string +} + +// for responses going out +type WsResponse struct { + Event string + Data interface{} + Error string +} + +// new websocket connection wrapper +func NewConnection(con *websocket.Conn) *Connection { + return &Connection{ + id: con.RemoteAddr().String(), + wsCon: con, + writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full + } +} + +// close the channel +// should only be called by firing on c.quitChan +func (c *Connection) Close() { + close(c.writeChan) + c.wsCon.Close() +} + +// rpc.websocket +//----------------------------------------------------------------------------- // returns is Response struct and error. If error is not nil, return it func unreflectResponse(returns []reflect.Value) (interface{}, error) { diff --git a/rpc/http_server.go b/rpc/http_server.go index 47770f3d8..7de0daf81 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -12,15 +12,17 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/events" ) -func StartHTTPServer() { - initHandlers() +func StartHTTPServer(ew *events.EventSwitch) { + initHandlers(ew) log.Info(Fmt("Starting RPC HTTP server on %s", config.App().GetString("RPC.HTTP.ListenAddr"))) go func() { log.Crit("RPC HTTPServer stopped", "result", http.ListenAndServe(config.App().GetString("RPC.HTTP.ListenAddr"), RecoverAndLogHandler(http.DefaultServeMux))) }() + } func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) {