@ -337,10 +337,10 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
// rpc.websocket
const (
writeChanCapacity = 1000
wsWriteWait = 3 0 * time . Second // each write times out after this.
defaultWSPongWait = 30 * time . Second
defaultWSPingPeriod = ( defaultWSPong Wait * 9 ) / 10
defaultWSWriteChanCapacity = 1000
defaultWSWriteWait = 1 0 * time . Second
defaultWSReadWait = 30 * time . Second
defaultWSPingPeriod = ( defaultWSRead Wait * 9 ) / 10
)
// a single websocket connection
@ -349,19 +349,23 @@ const (
type wsConnection struct {
cmn . BaseService
remoteAddr string
baseConn * websocket . Conn
writeChan chan types . RPCResponse
readTimeout * time . Timer
pingTicker * time . Ticker
remoteAddr string
baseConn * websocket . Conn
writeChan chan types . RPCResponse
funcMap map [ string ] * RPCFunc
evsw events . EventSwitch
// write channel capacity
writeChanCapacity int
// each write times out after this.
writeWait time . Duration
// Connection times out if we haven't received *anything* in this long, not even pings.
pongWait time . Duration
read Wait time . Duration
// Send pings to server with this period. Must be less than pongWait.
// Send pings to server with this period. Must be less than readWait, but greater than zero .
pingPeriod time . Duration
}
@ -370,13 +374,14 @@ type wsConnection struct {
// ping period and pong wait time.
func NewWSConnection ( baseConn * websocket . Conn , funcMap map [ string ] * RPCFunc , evsw events . EventSwitch , options ... func ( * wsConnection ) ) * wsConnection {
wsc := & wsConnection {
remoteAddr : baseConn . RemoteAddr ( ) . String ( ) ,
baseConn : baseConn ,
writeChan : make ( chan types . RPCResponse , writeChanCapacity ) , // error when full.
funcMap : funcMap ,
evsw : evsw ,
pongWait : defaultWSPongWait ,
pingPeriod : defaultWSPingPeriod ,
remoteAddr : baseConn . RemoteAddr ( ) . String ( ) ,
baseConn : baseConn ,
funcMap : funcMap ,
evsw : evsw ,
writeWait : defaultWSWriteWait ,
writeChanCapacity : defaultWSWriteChanCapacity ,
readWait : defaultWSReadWait ,
pingPeriod : defaultWSPingPeriod ,
}
for _ , option := range options {
option ( wsc )
@ -385,69 +390,48 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw
return wsc
}
// PingPong allows changing ping period and pong wait time. If ping period
// greater or equal to pong wait time, panic will be thrown.
func PingPong ( pingPeriod , pongWait time . Duration ) func ( * wsConnection ) {
func WriteWait ( writeWait time . Duration ) func ( * wsConnection ) {
return func ( wsc * wsConnection ) {
wsc . writeWait = writeWait
}
}
func WriteChanCapacity ( cap int ) func ( * wsConnection ) {
return func ( wsc * wsConnection ) {
wsc . writeChanCapacity = cap
}
}
func ReadWait ( readWait time . Duration ) func ( * wsConnection ) {
return func ( wsc * wsConnection ) {
wsc . readWait = readWait
}
}
func PingPeriod ( pingPeriod time . Duration ) func ( * wsConnection ) {
return func ( wsc * wsConnection ) {
if pingPeriod >= pongWait {
panic ( fmt . Sprintf ( "ping period (%v) must be less than pong wait time (%v)" , pingPeriod , pongWait ) )
}
wsc . pingPeriod = pingPeriod
wsc . pongWait = pongWait
}
}
// wsc.Start() blocks until the connection closes.
func ( wsc * wsConnection ) OnStart ( ) error {
wsc . BaseService . OnStart ( )
// these must be set before the readRoutine is created, as it may
// call wsc.Stop(), which accesses these timers
wsc . readTimeout = time . NewTimer ( wsc . pongWait )
wsc . pingTicker = time . NewTicker ( wsc . pingPeriod )
wsc . writeChan = make ( chan types . RPCResponse , wsc . writeChanCapacity )
// Read subscriptions/unsubscriptions to events
go wsc . readRoutine ( )
// Custom Ping handler to touch readTimeout
wsc . baseConn . SetPingHandler ( func ( m string ) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
go wsc . baseConn . WriteControl ( websocket . PongMessage , [ ] byte ( m ) , time . Now ( ) . Add ( wsWriteWait ) )
wsc . readTimeout . Reset ( wsc . pongWait )
return nil
} )
wsc . baseConn . SetPongHandler ( func ( m string ) error {
// NOTE: https://github.com/gorilla/websocket/issues/97
wsc . readTimeout . Reset ( wsc . pongWait )
return nil
} )
go wsc . readTimeoutRoutine ( )
// Write responses, BLOCKING.
wsc . writeRoutine ( )
return nil
}
func ( wsc * wsConnection ) OnStop ( ) {
wsc . BaseService . OnStop ( )
if wsc . evsw != nil {
wsc . evsw . RemoveListener ( wsc . remoteAddr )
}
wsc . readTimeout . Stop ( )
wsc . pingTicker . Stop ( )
// The write loop closes the websocket connection
// when it exits its loop, and the read loop
// closes the writeChan
}
func ( wsc * wsConnection ) readTimeoutRoutine ( ) {
select {
case <- wsc . readTimeout . C :
wsc . Logger . Info ( "Stopping connection due to read timeout" )
wsc . Stop ( )
case <- wsc . Quit :
return
}
// The write loop closes the websocket connection when it exits its loop, and
// the read loop closes the writeChan.
}
// Implements WSRPCConnection
@ -487,30 +471,30 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
// Read from the socket and subscribe to or unsubscribe from events
func ( wsc * wsConnection ) readRoutine ( ) {
defer func ( ) {
wsc . baseConn . Close ( )
} ( )
// Do not close writeChan, to allow WriteRPCResponse() to fail.
// defer close(wsc.writeChan)
for {
select {
case <- wsc . Quit :
return
default :
// reset deadline for every type of message (control or data)
wsc . baseConn . SetReadDeadline ( time . Now ( ) . Add ( wsc . readWait ) )
var in [ ] byte
// Do not set a deadline here like below:
// wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.pongWait))
// The client may not send anything for a while.
// We use `readTimeout` to handle read timeouts.
_ , in , err := wsc . baseConn . ReadMessage ( )
if err != nil {
if websocket . IsCloseError ( err , websocket . CloseNormalClosure ) {
wsc . Logger . Info ( "Client closed the connection" , "remote" , wsc . remoteAddr )
wsc . Logger . Info ( "Client closed the connection" )
} else {
wsc . Logger . Info ( "Failed to read from connection" , "remote ", wsc . remoteAddr , "err" , err . Error ( ) )
wsc . Logger . Error ( "Failed to read request ", "err" , err )
}
wsc . Stop ( )
return
}
wsc . readTimeout . Reset ( wsc . pongWait )
var request types . RPCRequest
err = json . Unmarshal ( in , & request )
@ -558,15 +542,33 @@ func (wsc *wsConnection) readRoutine() {
// receives on a write channel and writes out on the socket
func ( wsc * wsConnection ) writeRoutine ( ) {
defer wsc . baseConn . Close ( )
pingTicker := time . NewTicker ( wsc . pingPeriod )
defer func ( ) {
pingTicker . Stop ( )
wsc . baseConn . Close ( )
} ( )
// https://github.com/gorilla/websocket/issues/97
pongs := make ( chan string , 1 )
wsc . baseConn . SetPingHandler ( func ( m string ) error {
select {
case pongs <- m :
default :
}
return nil
} )
for {
select {
case <- wsc . Quit :
return
case <- wsc . pingTicker . C :
case m := <- pongs :
err := wsc . writeMessageWithDeadline ( websocket . PongMessage , [ ] byte ( m ) )
if err != nil {
wsc . Logger . Info ( "Failed to write pong (client may disconnect)" , "err" , err )
}
case <- pingTicker . C :
err := wsc . writeMessageWithDeadline ( websocket . PingMessage , [ ] byte { } )
if err != nil {
wsc . Logger . Error ( "Failed to write ping message on websocket" , "err" , err )
wsc . Logger . Error ( "Failed to write ping" , "err" , err )
wsc . Stop ( )
return
}
@ -576,11 +578,13 @@ func (wsc *wsConnection) writeRoutine() {
wsc . Logger . Error ( "Failed to marshal RPCResponse to JSON" , "err" , err )
} else {
if err = wsc . writeMessageWithDeadline ( websocket . TextMessage , jsonBytes ) ; err != nil {
wsc . Logger . Error ( "Failed to write response on websocket " , "err" , err )
wsc . Logger . Error ( "Failed to write response" , "err" , err )
wsc . Stop ( )
return
}
}
case <- wsc . Quit :
return
}
}
}
@ -588,7 +592,7 @@ func (wsc *wsConnection) writeRoutine() {
// All writes to the websocket must (re)set the write deadline.
// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553)
func ( wsc * wsConnection ) writeMessageWithDeadline ( msgType int , msg [ ] byte ) error {
wsc . baseConn . SetWriteDeadline ( time . Now ( ) . Add ( wsW riteWait ) )
wsc . baseConn . SetWriteDeadline ( time . Now ( ) . Add ( wsc . w riteWait ) )
return wsc . baseConn . WriteMessage ( msgType , msg )
}
@ -610,10 +614,8 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, w
funcMap : funcMap ,
evsw : evsw ,
Upgrader : websocket . Upgrader {
ReadBufferSize : 1024 ,
WriteBufferSize : 1024 ,
CheckOrigin : func ( r * http . Request ) bool {
// TODO
// TODO ???
return true
} ,
} ,
@ -637,7 +639,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
// register connection
con := NewWSConnection ( wsConn , wm . funcMap , wm . evsw , wm . wsConnOptions ... )
con . SetLogger ( wm . logger )
con . SetLogger ( wm . logger . With ( "remote" , wsConn . RemoteAddr ( ) ) )
wm . logger . Info ( "New websocket connection" , "remote" , con . remoteAddr )
con . Start ( ) // Blocking
}