Browse Source

WIP: fix rpc/core

pull/1347/head
Jae Kwon 7 years ago
parent
commit
c541d58d2f
5 changed files with 25 additions and 4 deletions
  1. +2
    -2
      node/node.go
  2. +1
    -1
      rpc/core/events.go
  3. +14
    -0
      rpc/core/routes.go
  4. +7
    -1
      rpc/lib/server/handlers.go
  5. +1
    -0
      rpc/lib/types/types.go

+ 2
- 2
node/node.go View File

@ -499,10 +499,10 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs {
mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, rpcserver.EventSubscriber(n.eventBus))
wm := rpcserver.NewWebsocketManager(rpccore.Routes, rpccore.RoutesCodec, rpcserver.EventSubscriber(n.eventBus))
wm.SetLogger(rpcLogger.With("protocol", "websocket"))
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpccore.RoutesCodec, rpcLogger)
listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
if err != nil {
return nil, err


+ 1
- 1
rpc/core/events.go View File

@ -104,7 +104,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
go func() {
for event := range ch {
tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)}
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Request.ID+"#event", tmResult))
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), wsCtx.Request.ID+"#event", tmResult))
}
}()


+ 14
- 0
rpc/core/routes.go View File

@ -1,7 +1,10 @@
package core
import (
"github.com/tendermint/go-amino"
"github.com/tendermint/go-crypto"
rpc "github.com/tendermint/tendermint/rpc/lib/server"
"github.com/tendermint/tendermint/types"
)
// TODO: better system than "unsafe" prefix
@ -47,3 +50,14 @@ func AddUnsafeRoutes() {
Routes["unsafe_stop_cpu_profiler"] = rpc.NewRPCFunc(UnsafeStopCPUProfiler, "")
Routes["unsafe_write_heap_profile"] = rpc.NewRPCFunc(UnsafeWriteHeapProfile, "filename")
}
var RoutesCodec *amino.Codec
func init() {
cdc := amino.NewCodec()
RoutesCodec = cdc
types.RegisterEventDatas(cdc)
types.RegisterEvidences(cdc)
crypto.RegisterAmino(cdc)
}

+ 7
- 1
rpc/lib/server/handlers.go View File

@ -357,7 +357,7 @@ const (
defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
)
// a single websocket connection contains listener id, underlying ws
// A single websocket connection contains listener id, underlying ws
// connection, and the event switch for subscribing to events.
//
// In case of an error, the connection is stopped.
@ -508,6 +508,12 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
}
}
// Codec returns an amino codec used to decode parameters and encode results.
// It implements WSRPCConnection.
func (wsc *wsConnection) Codec() *amino.Codec {
return wsc.cdc
}
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() {
defer func() {


+ 1
- 0
rpc/lib/types/types.go View File

@ -155,6 +155,7 @@ type WSRPCConnection interface {
WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool
GetEventSubscriber() EventSubscriber
Codec() *amino.Codec
}
// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber


Loading…
Cancel
Save