Browse Source

rpc: first successful websocket event subscription

pull/52/head
Ethan Buchman 10 years ago
parent
commit
8e24b12888
6 changed files with 33 additions and 29 deletions
  1. +1
    -0
      consensus/reactor.go
  2. +10
    -0
      consensus/state.go
  3. +0
    -4
      rpc/core/routes.go
  4. +17
    -17
      rpc/handlers.go
  5. +1
    -0
      rpc/http_server.go
  6. +4
    -8
      rpc/test/client_rpc_test.go

+ 1
- 0
consensus/reactor.go View File

@ -236,6 +236,7 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) {
// implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
conR.evsw = evsw
conR.conS.SetEventSwitch(evsw)
}
//--------------------------------------


+ 10
- 0
consensus/state.go View File

@ -66,6 +66,7 @@ import (
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/config"
. "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/events"
mempl "github.com/tendermint/tendermint/mempool"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
@ -246,6 +247,8 @@ type ConsensusState struct {
stagedBlock *types.Block // Cache last staged block.
stagedState *sm.State // Cache result of staged block.
lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on.
evsw *events.EventSwitch
}
func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
@ -437,6 +440,8 @@ ACTION_LOOP:
if cs.TryFinalizeCommit(rs.Height) {
// Now at new height
// cs.Step is at RoundStepNewHeight or RoundStepNewRound.
// newblock event!
cs.evsw.FireEvent("newblock", cs.state.LastBlockHash)
scheduleNextAction()
continue ACTION_LOOP
} else {
@ -1107,6 +1112,11 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty
}
}
// implements events.Eventable
func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) {
cs.evsw = evsw
}
//-----------------------------------------------------------------------------
// total duration of given round


+ 0
- 4
rpc/core/routes.go View File

@ -4,10 +4,6 @@ import (
"github.com/tendermint/tendermint/rpc"
)
/*
TODO: support Call && GetStorage.
*/
var Routes = map[string]*rpc.RPCFunc{
"status": rpc.NewRPCFunc(Status, []string{}),
"net_info": rpc.NewRPCFunc(NetInfo, []string{}),


+ 17
- 17
rpc/handlers.go View File

@ -26,7 +26,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
// websocket endpoint
w := NewWebsocketManager(evsw)
http.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler))
mux.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler))
}
//-------------------------------------
@ -191,14 +191,20 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
//-----------------------------------------------------------------------------
// rpc.websocket
const (
WSConnectionReaperSeconds = 5
MaxFailedSendsSeconds = 10
WriteChanBufferSize = 10
)
// for requests coming in
type WsRequest struct {
type WSRequest struct {
Type string // subscribe or unsubscribe
Event string
}
// for responses going out
type WsResponse struct {
type WSResponse struct {
Event string
Data interface{}
Error string
@ -209,7 +215,7 @@ type WsResponse struct {
type Connection struct {
id string
wsCon *websocket.Conn
writeChan chan WsResponse
writeChan chan WSResponse
quitChan chan struct{}
failedSends uint
}
@ -219,7 +225,7 @@ func NewConnection(con *websocket.Conn) *Connection {
return &Connection{
id: con.RemoteAddr().String(),
wsCon: con,
writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
}
}
@ -276,15 +282,9 @@ func (w *WebsocketManager) handleWebsocket(con *websocket.Conn) {
w.write(c)
}
const (
WsConnectionReaperSeconds = 5
MaxFailedSendsSeconds = 10
WriteChanBuffer = 10
)
// read from the socket and subscribe to or unsubscribe from events
func (w *WebsocketManager) read(con *Connection) {
reaper := time.Tick(time.Second * WsConnectionReaperSeconds)
reaper := time.Tick(time.Second * WSConnectionReaperSeconds)
for {
select {
case <-reaper:
@ -302,17 +302,17 @@ func (w *WebsocketManager) read(con *Connection) {
// so kill the connection
con.quitChan <- struct{}{}
}
var req WsRequest
var req WSRequest
err = json.Unmarshal(in, &req)
if err != nil {
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
con.writeChan <- WsResponse{Error: errStr}
con.writeChan <- WSResponse{Error: errStr}
}
switch req.Type {
case "subscribe":
log.Info("New event subscription", "con id", con.id, "event", req.Event)
w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
resp := WsResponse{
resp := WSResponse{
Event: req.Event,
Data: msg,
}
@ -334,7 +334,7 @@ func (w *WebsocketManager) read(con *Connection) {
w.ew.RemoveListener(con.id)
}
default:
con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type}
con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type}
}
}
@ -350,7 +350,7 @@ func (w *WebsocketManager) write(con *Connection) {
buf := new(bytes.Buffer)
binary.WriteJSON(msg, buf, n, err)
if *err != nil {
log.Error("Failed to write JSON WsResponse", "error", err)
log.Error("Failed to write JSON WSResponse", "error", err)
} else {
//websocket.Message.Send(con.wsCon, buf.Bytes())
if err := con.wsCon.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {


+ 1
- 0
rpc/http_server.go View File

@ -13,6 +13,7 @@ import (
"github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/alert"
)
func StartHTTPServer(listenAddr string, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) {


+ 4
- 8
rpc/test/client_rpc_test.go View File

@ -85,11 +85,10 @@ func TestWSConnect(t *testing.T) {
dialer := websocket.DefaultDialer
rHeader := http.Header{}
_, r, err := dialer.Dial(websocketAddr, rHeader)
fmt.Println("respoinse:", r)
if err != nil {
t.Fatal(err)
}
fmt.Println("respoinse:", r)
}
func TestWSSubscribe(t *testing.T) {
@ -99,16 +98,13 @@ func TestWSSubscribe(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = con.WriteJSON(rpc.WsRequest{
err = con.WriteJSON(rpc.WSRequest{
Type: "subscribe",
Event: "newblock",
})
if err != nil {
t.Fatal(err)
}
/*
typ, p, err := con.ReadMessage()
fmt.Println("RESPONSE:", typ, string(p), err)
*/
typ, p, err := con.ReadMessage()
fmt.Println("RESPONSE:", typ, string(p), err)
}

Loading…
Cancel
Save