|
@ -24,11 +24,6 @@ const ( |
|
|
reqQueueSize = 256 |
|
|
reqQueueSize = 256 |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type reqResWithContext struct { |
|
|
|
|
|
R *ReqRes |
|
|
|
|
|
C context.Context // if context.Err is not nil, reqRes will be thrown away (ignored)
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// This is goroutine-safe, but users should beware that the application in
|
|
|
// This is goroutine-safe, but users should beware that the application in
|
|
|
// general is not meant to be interfaced with concurrent callers.
|
|
|
// general is not meant to be interfaced with concurrent callers.
|
|
|
type socketClient struct { |
|
|
type socketClient struct { |
|
@ -39,7 +34,7 @@ type socketClient struct { |
|
|
mustConnect bool |
|
|
mustConnect bool |
|
|
conn net.Conn |
|
|
conn net.Conn |
|
|
|
|
|
|
|
|
reqQueue chan *reqResWithContext |
|
|
|
|
|
|
|
|
reqQueue chan *ReqRes |
|
|
|
|
|
|
|
|
mtx sync.Mutex |
|
|
mtx sync.Mutex |
|
|
err error |
|
|
err error |
|
@ -55,7 +50,7 @@ var _ Client = (*socketClient)(nil) |
|
|
func NewSocketClient(logger log.Logger, addr string, mustConnect bool) Client { |
|
|
func NewSocketClient(logger log.Logger, addr string, mustConnect bool) Client { |
|
|
cli := &socketClient{ |
|
|
cli := &socketClient{ |
|
|
logger: logger, |
|
|
logger: logger, |
|
|
reqQueue: make(chan *reqResWithContext, reqQueueSize), |
|
|
|
|
|
|
|
|
reqQueue: make(chan *ReqRes, reqQueueSize), |
|
|
mustConnect: mustConnect, |
|
|
mustConnect: mustConnect, |
|
|
addr: addr, |
|
|
addr: addr, |
|
|
reqSent: list.New(), |
|
|
reqSent: list.New(), |
|
@ -99,7 +94,10 @@ func (cli *socketClient) OnStop() { |
|
|
cli.conn.Close() |
|
|
cli.conn.Close() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
cli.drainQueue() |
|
|
|
|
|
|
|
|
// this timeout is arbitrary.
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
|
|
|
|
|
defer cancel() |
|
|
|
|
|
cli.drainQueue(ctx) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Error returns an error if the client was stopped abruptly.
|
|
|
// Error returns an error if the client was stopped abruptly.
|
|
@ -132,13 +130,9 @@ func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if reqres.C.Err() != nil { |
|
|
|
|
|
cli.logger.Debug("Request's context is done", "req", reqres.R, "err", reqres.C.Err()) |
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
|
|
|
cli.willSendReq(reqres.R) |
|
|
|
|
|
|
|
|
cli.willSendReq(reqres) |
|
|
|
|
|
|
|
|
if err := types.WriteMessage(reqres.R.Request, bw); err != nil { |
|
|
|
|
|
|
|
|
if err := types.WriteMessage(reqres.Request, bw); err != nil { |
|
|
cli.stopForError(fmt.Errorf("write to buffer: %w", err)) |
|
|
cli.stopForError(fmt.Errorf("write to buffer: %w", err)) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
@ -435,13 +429,13 @@ func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, s |
|
|
|
|
|
|
|
|
if sync { |
|
|
if sync { |
|
|
select { |
|
|
select { |
|
|
case cli.reqQueue <- &reqResWithContext{R: reqres, C: context.Background()}: |
|
|
|
|
|
|
|
|
case cli.reqQueue <- reqres: |
|
|
case <-ctx.Done(): |
|
|
case <-ctx.Done(): |
|
|
return nil, ctx.Err() |
|
|
return nil, ctx.Err() |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
select { |
|
|
select { |
|
|
case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}: |
|
|
|
|
|
|
|
|
case cli.reqQueue <- reqres: |
|
|
default: |
|
|
default: |
|
|
return nil, errors.New("buffer is full") |
|
|
return nil, errors.New("buffer is full") |
|
|
} |
|
|
} |
|
@ -486,7 +480,7 @@ func queueErr(e error) error { |
|
|
|
|
|
|
|
|
// drainQueue marks as complete and discards all remaining pending requests
|
|
|
// drainQueue marks as complete and discards all remaining pending requests
|
|
|
// from the queue.
|
|
|
// from the queue.
|
|
|
func (cli *socketClient) drainQueue() { |
|
|
|
|
|
|
|
|
func (cli *socketClient) drainQueue(ctx context.Context) { |
|
|
cli.mtx.Lock() |
|
|
cli.mtx.Lock() |
|
|
defer cli.mtx.Unlock() |
|
|
defer cli.mtx.Unlock() |
|
|
|
|
|
|
|
@ -503,8 +497,10 @@ func (cli *socketClient) drainQueue() { |
|
|
// See https://github.com/tendermint/tendermint/issues/6996.
|
|
|
// See https://github.com/tendermint/tendermint/issues/6996.
|
|
|
for { |
|
|
for { |
|
|
select { |
|
|
select { |
|
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
|
return |
|
|
case reqres := <-cli.reqQueue: |
|
|
case reqres := <-cli.reqQueue: |
|
|
reqres.R.Done() |
|
|
|
|
|
|
|
|
reqres.Done() |
|
|
default: |
|
|
default: |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|