diff --git a/proxy/remote_app_conn.go b/proxy/remote_app_conn.go index 5c52f1e8b..a1dff9a84 100644 --- a/proxy/remote_app_conn.go +++ b/proxy/remote_app_conn.go @@ -15,6 +15,7 @@ import ( ) const maxResponseSize = 1048576 // 1MB +const flushThrottleMS = 20 // Don't wait longer than... // This is goroutine-safe, but users should beware that // the application in general is not meant to be interfaced @@ -23,7 +24,8 @@ type remoteAppConn struct { QuitService sync.Mutex // [EB]: is this even used? - reqQueue chan *reqRes + reqQueue chan *reqRes + flushTimer *ThrottleTimer mtx sync.Mutex conn net.Conn @@ -35,7 +37,9 @@ type remoteAppConn struct { func NewRemoteAppConn(conn net.Conn, bufferSize int) *remoteAppConn { app := &remoteAppConn{ - reqQueue: make(chan *reqRes, bufferSize), + reqQueue: make(chan *reqRes, bufferSize), + flushTimer: NewThrottleTimer("remoteAppConn", flushThrottleMS), + conn: conn, bufWriter: bufio.NewWriter(conn), reqSent: list.New(), @@ -57,6 +61,7 @@ func (app *remoteAppConn) OnStop() { app.conn.Close() } +// NOTE: callback may get internally generated flush responses. func (app *remoteAppConn) SetResponseCallback(resCb Callback) { app.mtx.Lock() defer app.mtx.Unlock() @@ -86,12 +91,16 @@ func (app *remoteAppConn) sendRequestsRoutine() { var n int var err error select { + case <-app.flushTimer.Ch: + select { + case app.reqQueue <- newReqRes(tmsp.RequestFlush{}): + default: + // Probably will fill the buffer, or retry later. + } case <-app.QuitService.Quit: return case reqres := <-app.reqQueue: - app.willSendReq(reqres) - wire.WriteBinaryLengthPrefixed(struct{ tmsp.Request }{reqres.Request}, app.bufWriter, &n, &err) // Length prefix if err != nil { app.StopForError(err) @@ -234,9 +243,18 @@ func (app *remoteAppConn) GetHashSync() (hash []byte, err error) { //---------------------------------------- func (app *remoteAppConn) queueRequest(req tmsp.Request) *reqRes { - reqres := NewreqRes(req) + reqres := newReqRes(req) // TODO: set app.err if reqQueue times out app.reqQueue <- reqres + + // Maybe auto-flush, or unset auto-flush + switch req.(type) { + case tmsp.RequestFlush: + app.flushTimer.Unset() + default: + app.flushTimer.Set() + } + return reqres } @@ -274,7 +292,7 @@ type reqRes struct { tmsp.Response // Not set atomically, so be sure to use WaitGroup. } -func NewreqRes(req tmsp.Request) *reqRes { +func newReqRes(req tmsp.Request) *reqRes { return &reqRes{ Request: req, WaitGroup: waitGroup1(),