|
@ -83,6 +83,19 @@ func (cli *socketClient) OnStop() { |
|
|
if cli.conn != nil { |
|
|
if cli.conn != nil { |
|
|
cli.conn.Close() |
|
|
cli.conn.Close() |
|
|
} |
|
|
} |
|
|
|
|
|
cli.flushQueue() |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (cli *socketClient) flushQueue() { |
|
|
|
|
|
LOOP: |
|
|
|
|
|
for { |
|
|
|
|
|
select { |
|
|
|
|
|
case reqres := <-cli.reqQueue: |
|
|
|
|
|
reqres.Done() |
|
|
|
|
|
default: |
|
|
|
|
|
break LOOP |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Set listener for all responses
|
|
|
// Set listener for all responses
|
|
@ -94,6 +107,10 @@ func (cli *socketClient) SetResponseCallback(resCb Callback) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (cli *socketClient) StopForError(err error) { |
|
|
func (cli *socketClient) StopForError(err error) { |
|
|
|
|
|
if !cli.IsRunning() { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
cli.mtx.Lock() |
|
|
cli.mtx.Lock() |
|
|
log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error())) |
|
|
log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error())) |
|
|
if cli.err == nil { |
|
|
if cli.err == nil { |
|
@ -262,9 +279,12 @@ func (cli *socketClient) EchoSync(msg string) (res types.Result) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (cli *socketClient) FlushSync() error { |
|
|
func (cli *socketClient) FlushSync() error { |
|
|
log.Warn("FlushSync") |
|
|
|
|
|
cli.queueRequest(types.ToRequestFlush()).Wait() |
|
|
|
|
|
log.Warn("Done FlushSync") |
|
|
|
|
|
|
|
|
reqRes := cli.queueRequest(types.ToRequestFlush()) |
|
|
|
|
|
if reqRes == nil { |
|
|
|
|
|
return fmt.Errorf("Remote app is not running") |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
|
|
|
return cli.err |
|
|
return cli.err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -358,6 +378,10 @@ func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Valida |
|
|
//----------------------------------------
|
|
|
//----------------------------------------
|
|
|
|
|
|
|
|
|
func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { |
|
|
func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { |
|
|
|
|
|
if !cli.IsRunning() { |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
reqres := NewReqRes(req) |
|
|
reqres := NewReqRes(req) |
|
|
|
|
|
|
|
|
// TODO: set cli.err if reqQueue times out
|
|
|
// TODO: set cli.err if reqQueue times out
|
|
|