Browse Source

Flush msgs to remoteProxyApp automatically

pull/176/head
Jae Kwon 9 years ago
parent
commit
4c12de6203
1 changed files with 24 additions and 6 deletions
  1. +24
    -6
      proxy/remote_app_conn.go

+ 24
- 6
proxy/remote_app_conn.go View File

@ -15,6 +15,7 @@ import (
)
const maxResponseSize = 1048576 // 1MB
const flushThrottleMS = 20 // Don't wait longer than...
// This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced
@ -23,7 +24,8 @@ type remoteAppConn struct {
QuitService
sync.Mutex // [EB]: is this even used?
reqQueue chan *reqRes
reqQueue chan *reqRes
flushTimer *ThrottleTimer
mtx sync.Mutex
conn net.Conn
@ -35,7 +37,9 @@ type remoteAppConn struct {
func NewRemoteAppConn(conn net.Conn, bufferSize int) *remoteAppConn {
app := &remoteAppConn{
reqQueue: make(chan *reqRes, bufferSize),
reqQueue: make(chan *reqRes, bufferSize),
flushTimer: NewThrottleTimer("remoteAppConn", flushThrottleMS),
conn: conn,
bufWriter: bufio.NewWriter(conn),
reqSent: list.New(),
@ -57,6 +61,7 @@ func (app *remoteAppConn) OnStop() {
app.conn.Close()
}
// NOTE: callback may get internally generated flush responses.
func (app *remoteAppConn) SetResponseCallback(resCb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
@ -86,12 +91,16 @@ func (app *remoteAppConn) sendRequestsRoutine() {
var n int
var err error
select {
case <-app.flushTimer.Ch:
select {
case app.reqQueue <- newReqRes(tmsp.RequestFlush{}):
default:
// Probably will fill the buffer, or retry later.
}
case <-app.QuitService.Quit:
return
case reqres := <-app.reqQueue:
app.willSendReq(reqres)
wire.WriteBinaryLengthPrefixed(struct{ tmsp.Request }{reqres.Request}, app.bufWriter, &n, &err) // Length prefix
if err != nil {
app.StopForError(err)
@ -234,9 +243,18 @@ func (app *remoteAppConn) GetHashSync() (hash []byte, err error) {
//----------------------------------------
func (app *remoteAppConn) queueRequest(req tmsp.Request) *reqRes {
reqres := NewreqRes(req)
reqres := newReqRes(req)
// TODO: set app.err if reqQueue times out
app.reqQueue <- reqres
// Maybe auto-flush, or unset auto-flush
switch req.(type) {
case tmsp.RequestFlush:
app.flushTimer.Unset()
default:
app.flushTimer.Set()
}
return reqres
}
@ -274,7 +292,7 @@ type reqRes struct {
tmsp.Response // Not set atomically, so be sure to use WaitGroup.
}
func NewreqRes(req tmsp.Request) *reqRes {
func newReqRes(req tmsp.Request) *reqRes {
return &reqRes{
Request: req,
WaitGroup: waitGroup1(),


Loading…
Cancel
Save