diff --git a/consensus/reactor.go b/consensus/reactor.go index 0e264714c..16ef2129f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -236,6 +236,7 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) { // implements events.Eventable func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { conR.evsw = evsw + conR.conS.SetEventSwitch(evsw) } //-------------------------------------- diff --git a/consensus/state.go b/consensus/state.go index c036fa3b4..0a5431de8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -66,6 +66,7 @@ import ( . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" . "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -246,6 +247,8 @@ type ConsensusState struct { stagedBlock *types.Block // Cache last staged block. stagedState *sm.State // Cache result of staged block. lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on. + + evsw *events.EventSwitch } func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { @@ -437,6 +440,8 @@ ACTION_LOOP: if cs.TryFinalizeCommit(rs.Height) { // Now at new height // cs.Step is at RoundStepNewHeight or RoundStepNewRound. + // newblock event! + cs.evsw.FireEvent("newblock", cs.state.LastBlockHash) scheduleNextAction() continue ACTION_LOOP } else { @@ -1107,6 +1112,11 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty } } +// implements events.Eventable +func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { + cs.evsw = evsw +} + //----------------------------------------------------------------------------- // total duration of given round diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 62630fb5a..395ad82a4 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -4,10 +4,6 @@ import ( "github.com/tendermint/tendermint/rpc" ) -/* -TODO: support Call && GetStorage. -*/ - var Routes = map[string]*rpc.RPCFunc{ "status": rpc.NewRPCFunc(Status, []string{}), "net_info": rpc.NewRPCFunc(NetInfo, []string{}), diff --git a/rpc/handlers.go b/rpc/handlers.go index 4e55e0267..aa112971f 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -26,7 +26,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) { // websocket endpoint w := NewWebsocketManager(evsw) - http.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler)) + mux.HandleFunc("/events", w.websocketHandler) // websocket.Handler(w.eventsHandler)) } //------------------------------------- @@ -191,14 +191,20 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { //----------------------------------------------------------------------------- // rpc.websocket +const ( + WSConnectionReaperSeconds = 5 + MaxFailedSendsSeconds = 10 + WriteChanBufferSize = 10 +) + // for requests coming in -type WsRequest struct { +type WSRequest struct { Type string // subscribe or unsubscribe Event string } // for responses going out -type WsResponse struct { +type WSResponse struct { Event string Data interface{} Error string @@ -209,7 +215,7 @@ type WsResponse struct { type Connection struct { id string wsCon *websocket.Conn - writeChan chan WsResponse + writeChan chan WSResponse quitChan chan struct{} failedSends uint } @@ -219,7 +225,7 @@ func NewConnection(con *websocket.Conn) *Connection { return &Connection{ id: con.RemoteAddr().String(), wsCon: con, - writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full + writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full } } @@ -276,15 +282,9 @@ func (w *WebsocketManager) handleWebsocket(con *websocket.Conn) { w.write(c) } -const ( - WsConnectionReaperSeconds = 5 - MaxFailedSendsSeconds = 10 - WriteChanBuffer = 10 -) - // read from the socket and subscribe to or unsubscribe from events func (w *WebsocketManager) read(con *Connection) { - reaper := time.Tick(time.Second * WsConnectionReaperSeconds) + reaper := time.Tick(time.Second * WSConnectionReaperSeconds) for { select { case <-reaper: @@ -302,17 +302,17 @@ func (w *WebsocketManager) read(con *Connection) { // so kill the connection con.quitChan <- struct{}{} } - var req WsRequest + var req WSRequest err = json.Unmarshal(in, &req) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - con.writeChan <- WsResponse{Error: errStr} + con.writeChan <- WSResponse{Error: errStr} } switch req.Type { case "subscribe": log.Info("New event subscription", "con id", con.id, "event", req.Event) w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) { - resp := WsResponse{ + resp := WSResponse{ Event: req.Event, Data: msg, } @@ -334,7 +334,7 @@ func (w *WebsocketManager) read(con *Connection) { w.ew.RemoveListener(con.id) } default: - con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type} + con.writeChan <- WSResponse{Error: "Unknown request type: " + req.Type} } } @@ -350,7 +350,7 @@ func (w *WebsocketManager) write(con *Connection) { buf := new(bytes.Buffer) binary.WriteJSON(msg, buf, n, err) if *err != nil { - log.Error("Failed to write JSON WsResponse", "error", err) + log.Error("Failed to write JSON WSResponse", "error", err) } else { //websocket.Message.Send(con.wsCon, buf.Bytes()) if err := con.wsCon.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil { diff --git a/rpc/http_server.go b/rpc/http_server.go index 1603ab79b..14ccb6521 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -13,6 +13,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" + "github.com/tendermint/tendermint/alert" ) func StartHTTPServer(listenAddr string, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) { diff --git a/rpc/test/client_rpc_test.go b/rpc/test/client_rpc_test.go index 9c261d9f9..0d712192b 100644 --- a/rpc/test/client_rpc_test.go +++ b/rpc/test/client_rpc_test.go @@ -85,11 +85,10 @@ func TestWSConnect(t *testing.T) { dialer := websocket.DefaultDialer rHeader := http.Header{} _, r, err := dialer.Dial(websocketAddr, rHeader) + fmt.Println("respoinse:", r) if err != nil { t.Fatal(err) } - fmt.Println("respoinse:", r) - } func TestWSSubscribe(t *testing.T) { @@ -99,16 +98,13 @@ func TestWSSubscribe(t *testing.T) { if err != nil { t.Fatal(err) } - err = con.WriteJSON(rpc.WsRequest{ + err = con.WriteJSON(rpc.WSRequest{ Type: "subscribe", Event: "newblock", }) if err != nil { t.Fatal(err) } - /* - typ, p, err := con.ReadMessage() - fmt.Println("RESPONSE:", typ, string(p), err) - */ - + typ, p, err := con.ReadMessage() + fmt.Println("RESPONSE:", typ, string(p), err) }