Browse Source

rpc: websockets

pull/46/head
Ethan Buchman 10 years ago
parent
commit
2b3bedead8
2 changed files with 181 additions and 16 deletions
  1. +177
    -14
      rpc/handlers.go
  2. +4
    -2
      rpc/http_server.go

+ 177
- 14
rpc/handlers.go View File

@ -5,21 +5,21 @@ TODO: support Call && GetStorage.
*/ */
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/rpc/core"
"golang.org/x/net/websocket"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"reflect" "reflect"
"runtime" "runtime"
"strings" "strings"
"time"
) )
// maps camel-case function names to lower case rpc version
// populated by calls to funcWrap
var reverseFuncMap = make(map[string]string)
// cache all type information about each function up front // cache all type information about each function up front
// (func, responseStruct, argNames) // (func, responseStruct, argNames)
var funcMap = map[string]*FuncWrapper{ var funcMap = map[string]*FuncWrapper{
@ -38,24 +38,36 @@ var funcMap = map[string]*FuncWrapper{
"unsafe/sign_tx": funcWrap(core.SignTx, []string{"tx", "privAccounts"}), "unsafe/sign_tx": funcWrap(core.SignTx, []string{"tx", "privAccounts"}),
} }
func initHandlers() {
// HTTP endpoints
for funcName, funcInfo := range funcMap {
http.HandleFunc("/"+funcName, toHTTPHandler(funcInfo))
}
// JSONRPC endpoints
http.HandleFunc("/", JSONRPCHandler)
// maps camel-case function names to lower case rpc version
// populated by calls to funcWrap
var reverseFuncMap = fillReverseFuncMap()
// fill the map from camelcase to lowercase
// fill the map from camelcase to lowercase
func fillReverseFuncMap() map[string]string {
fMap := make(map[string]string)
for name, f := range funcMap { for name, f := range funcMap {
camelName := runtime.FuncForPC(f.f.Pointer()).Name() camelName := runtime.FuncForPC(f.f.Pointer()).Name()
spl := strings.Split(camelName, ".") spl := strings.Split(camelName, ".")
if len(spl) > 1 { if len(spl) > 1 {
camelName = spl[len(spl)-1] camelName = spl[len(spl)-1]
} }
reverseFuncMap[camelName] = name
fMap[camelName] = name
}
return fMap
}
func initHandlers(ew *events.EventSwitch) {
// HTTP endpoints
for funcName, funcInfo := range funcMap {
http.HandleFunc("/"+funcName, toHTTPHandler(funcInfo))
} }
// JSONRPC endpoints
http.HandleFunc("/", JSONRPCHandler)
w := NewWebsocketManager(ew)
// websocket endpoint
http.Handle("/events", websocket.Handler(w.eventsHandler))
} }
//------------------------------------- //-------------------------------------
@ -212,6 +224,157 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
// rpc.http // rpc.http
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// rpc.websocket
// main manager for all websocket connections
// holds the event switch
type WebsocketManager struct {
ew *events.EventSwitch
cons map[string]*Connection
}
func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager {
return &WebsocketManager{
ew: ew,
cons: make(map[string]*Connection),
}
}
func (w *WebsocketManager) eventsHandler(con *websocket.Conn) {
// register connection
c := NewConnection(con)
w.cons[con.RemoteAddr().String()] = c
// read subscriptions/unsubscriptions to events
go w.read(c)
// write responses
go w.write(c)
}
const (
WsConnectionReaperSeconds = 5
MaxFailedSendsSeconds = 10
WriteChanBuffer = 10
)
// read from the socket and subscribe to or unsubscribe from events
func (w *WebsocketManager) read(con *Connection) {
reaper := time.Tick(time.Second * WsConnectionReaperSeconds)
for {
select {
case <-reaper:
if con.failedSends > MaxFailedSendsSeconds {
// sending has failed too many times.
// kill the connection
con.quitChan <- struct{}{}
}
default:
var in []byte
if err := websocket.Message.Receive(con.wsCon, &in); err != nil {
// an error reading the connection,
// so kill the connection
con.quitChan <- struct{}{}
}
var req WsRequest
err := json.Unmarshal(in, &req)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
con.writeChan <- WsResponse{Error: errStr}
}
switch req.Type {
case "subscribe":
w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
resp := WsResponse{
Event: req.Event,
Data: msg,
}
select {
case con.writeChan <- resp:
// yay
con.failedSends = 0
default:
// channel is full
// if this happens too many times,
// close connection
con.failedSends += 1
}
})
case "unsubscribe":
if req.Event != "" {
w.ew.RemoveListenerForEvent(req.Event, con.id)
} else {
w.ew.RemoveListener(con.id)
}
default:
con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type}
}
}
}
}
// receives on a write channel and writes out to the socket
func (w *WebsocketManager) write(con *Connection) {
n, err := new(int64), new(error)
for {
select {
case msg := <-con.writeChan:
buf := new(bytes.Buffer)
binary.WriteJSON(msg, buf, n, err)
if *err != nil {
log.Error("Failed to write JSON WsResponse", "error", err)
} else {
websocket.Message.Send(con.wsCon, buf.Bytes())
}
case <-con.quitChan:
close(con.quitChan)
con.Close()
return
}
}
}
// a single websocket connection
// contains the listeners id
type Connection struct {
id string
wsCon *websocket.Conn
writeChan chan WsResponse
quitChan chan struct{}
failedSends uint
}
// for requests coming in
type WsRequest struct {
Type string // subscribe or unsubscribe
Event string
}
// for responses going out
type WsResponse struct {
Event string
Data interface{}
Error string
}
// new websocket connection wrapper
func NewConnection(con *websocket.Conn) *Connection {
return &Connection{
id: con.RemoteAddr().String(),
wsCon: con,
writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
}
}
// close the channel
// should only be called by firing on c.quitChan
func (c *Connection) Close() {
close(c.writeChan)
c.wsCon.Close()
}
// rpc.websocket
//-----------------------------------------------------------------------------
// returns is Response struct and error. If error is not nil, return it // returns is Response struct and error. If error is not nil, return it
func unreflectResponse(returns []reflect.Value) (interface{}, error) { func unreflectResponse(returns []reflect.Value) (interface{}, error) {


+ 4
- 2
rpc/http_server.go View File

@ -12,15 +12,17 @@ import (
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/events"
) )
func StartHTTPServer() {
initHandlers()
func StartHTTPServer(ew *events.EventSwitch) {
initHandlers(ew)
log.Info(Fmt("Starting RPC HTTP server on %s", config.App().GetString("RPC.HTTP.ListenAddr"))) log.Info(Fmt("Starting RPC HTTP server on %s", config.App().GetString("RPC.HTTP.ListenAddr")))
go func() { go func() {
log.Crit("RPC HTTPServer stopped", "result", http.ListenAndServe(config.App().GetString("RPC.HTTP.ListenAddr"), RecoverAndLogHandler(http.DefaultServeMux))) log.Crit("RPC HTTPServer stopped", "result", http.ListenAndServe(config.App().GetString("RPC.HTTP.ListenAddr"), RecoverAndLogHandler(http.DefaultServeMux)))
}() }()
} }
func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) { func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) {


Loading…
Cancel
Save