diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index a8e873af3..30a97cade 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -24,11 +24,6 @@ const ( reqQueueSize = 256 ) -type reqResWithContext struct { - R *ReqRes - C context.Context // if context.Err is not nil, reqRes will be thrown away (ignored) -} - // This is goroutine-safe, but users should beware that the application in // general is not meant to be interfaced with concurrent callers. type socketClient struct { @@ -39,7 +34,7 @@ type socketClient struct { mustConnect bool conn net.Conn - reqQueue chan *reqResWithContext + reqQueue chan *ReqRes mtx sync.Mutex err error @@ -55,7 +50,7 @@ var _ Client = (*socketClient)(nil) func NewSocketClient(logger log.Logger, addr string, mustConnect bool) Client { cli := &socketClient{ logger: logger, - reqQueue: make(chan *reqResWithContext, reqQueueSize), + reqQueue: make(chan *ReqRes, reqQueueSize), mustConnect: mustConnect, addr: addr, reqSent: list.New(), @@ -99,7 +94,10 @@ func (cli *socketClient) OnStop() { cli.conn.Close() } - cli.drainQueue() + // this timeout is arbitrary. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + cli.drainQueue(ctx) } // Error returns an error if the client was stopped abruptly. @@ -132,13 +130,9 @@ func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer return } - if reqres.C.Err() != nil { - cli.logger.Debug("Request's context is done", "req", reqres.R, "err", reqres.C.Err()) - continue - } - cli.willSendReq(reqres.R) + cli.willSendReq(reqres) - if err := types.WriteMessage(reqres.R.Request, bw); err != nil { + if err := types.WriteMessage(reqres.Request, bw); err != nil { cli.stopForError(fmt.Errorf("write to buffer: %w", err)) return } @@ -435,13 +429,13 @@ func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, s if sync { select { - case cli.reqQueue <- &reqResWithContext{R: reqres, C: context.Background()}: + case cli.reqQueue <- reqres: case <-ctx.Done(): return nil, ctx.Err() } } else { select { - case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}: + case cli.reqQueue <- reqres: default: return nil, errors.New("buffer is full") } @@ -486,7 +480,7 @@ func queueErr(e error) error { // drainQueue marks as complete and discards all remaining pending requests // from the queue. -func (cli *socketClient) drainQueue() { +func (cli *socketClient) drainQueue(ctx context.Context) { cli.mtx.Lock() defer cli.mtx.Unlock() @@ -503,8 +497,10 @@ func (cli *socketClient) drainQueue() { // See https://github.com/tendermint/tendermint/issues/6996. for { select { + case <-ctx.Done(): + return case reqres := <-cli.reqQueue: - reqres.R.Done() + reqres.Done() default: return }