|
@ -21,7 +21,7 @@ const maxResponseSize = 1048576 // 1MB |
|
|
// with concurrent callers.
|
|
|
// with concurrent callers.
|
|
|
type remoteAppContext struct { |
|
|
type remoteAppContext struct { |
|
|
QuitService |
|
|
QuitService |
|
|
sync.Mutex |
|
|
|
|
|
|
|
|
sync.Mutex // [EB]: is this even used?
|
|
|
|
|
|
|
|
|
reqQueue chan *reqRes |
|
|
reqQueue chan *reqRes |
|
|
|
|
|
|
|
@ -65,7 +65,7 @@ func (app *remoteAppContext) SetResponseCallback(resCb Callback) { |
|
|
|
|
|
|
|
|
func (app *remoteAppContext) StopForError(err error) { |
|
|
func (app *remoteAppContext) StopForError(err error) { |
|
|
app.mtx.Lock() |
|
|
app.mtx.Lock() |
|
|
fmt.Println("Stopping remoteAppContext for error:", err) |
|
|
|
|
|
|
|
|
log.Error("Stopping remoteAppContext for error.", "error", err) |
|
|
if app.err == nil { |
|
|
if app.err == nil { |
|
|
app.err = err |
|
|
app.err = err |
|
|
} |
|
|
} |
|
@ -89,11 +89,15 @@ func (app *remoteAppContext) sendRequestsRoutine() { |
|
|
case <-app.QuitService.Quit: |
|
|
case <-app.QuitService.Quit: |
|
|
return |
|
|
return |
|
|
case reqres := <-app.reqQueue: |
|
|
case reqres := <-app.reqQueue: |
|
|
|
|
|
|
|
|
|
|
|
app.willSendReq(reqres) |
|
|
|
|
|
|
|
|
wire.WriteBinary(reqres.Request, app.bufWriter, &n, &err) |
|
|
wire.WriteBinary(reqres.Request, app.bufWriter, &n, &err) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
app.StopForError(err) |
|
|
app.StopForError(err) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) |
|
|
if _, ok := reqres.Request.(tmsp.RequestFlush); ok { |
|
|
if _, ok := reqres.Request.(tmsp.RequestFlush); ok { |
|
|
err = app.bufWriter.Flush() |
|
|
err = app.bufWriter.Flush() |
|
|
if err != nil { |
|
|
if err != nil { |
|
@ -101,7 +105,6 @@ func (app *remoteAppContext) sendRequestsRoutine() { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
app.didSendReq(reqres) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -121,6 +124,7 @@ func (app *remoteAppContext) recvResponseRoutine() { |
|
|
case tmsp.ResponseException: |
|
|
case tmsp.ResponseException: |
|
|
app.StopForError(errors.New(res.Error)) |
|
|
app.StopForError(errors.New(res.Error)) |
|
|
default: |
|
|
default: |
|
|
|
|
|
log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) |
|
|
err := app.didRecvResponse(res) |
|
|
err := app.didRecvResponse(res) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
app.StopForError(err) |
|
|
app.StopForError(err) |
|
@ -129,7 +133,7 @@ func (app *remoteAppContext) recvResponseRoutine() { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (app *remoteAppContext) didSendReq(reqres *reqRes) { |
|
|
|
|
|
|
|
|
func (app *remoteAppContext) willSendReq(reqres *reqRes) { |
|
|
app.mtx.Lock() |
|
|
app.mtx.Lock() |
|
|
defer app.mtx.Unlock() |
|
|
defer app.mtx.Unlock() |
|
|
app.reqSent.PushBack(reqres) |
|
|
app.reqSent.PushBack(reqres) |
|
|