diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 726c554d4..821845801 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -279,7 +279,7 @@ func (cli *socketClient) ApplySnapshotChunkAsync( //---------------------------------------- func (cli *socketClient) FlushSync(ctx context.Context) error { - reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true) + reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush()) if err != nil { return queueErr(err) } @@ -448,29 +448,22 @@ func (cli *socketClient) ApplySnapshotChunkSync( //---------------------------------------- -// queueRequest enqueues req onto the queue. If the queue is full, it ether -// returns an error (sync=false) or blocks (sync=true). -// -// When sync=true, ctx can be used to break early. When sync=false, ctx will be -// used later to determine if request should be dropped (if ctx.Err is -// non-nil). +// queueRequest enqueues req onto the queue. The request can break early if the +// the context is canceled. If the queue is full, this method blocks to allow +// the request to be placed onto the queue. This has the effect of creating an +// unbounded queue of goroutines waiting to write to this queue which is a bit +// antithetical to the purposes of a queue, however, undoing this behavior has +// dangerous upstream implications as a result of the usage of this behavior upstream. +// Remove at your peril. // // The caller is responsible for checking cli.Error. -func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, sync bool) (*ReqRes, error) { +func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) { reqres := NewReqRes(req) - if sync { - select { - case cli.reqQueue <- &reqResWithContext{R: reqres, C: context.Background()}: - case <-ctx.Done(): - return nil, ctx.Err() - } - } else { - select { - case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}: - default: - return nil, errors.New("buffer is full") - } + select { + case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}: + case <-ctx.Done(): + return nil, ctx.Err() } return reqres, nil @@ -481,7 +474,7 @@ func (cli *socketClient) queueRequestAsync( req *types.Request, ) (*ReqRes, error) { - reqres, err := cli.queueRequest(ctx, req, false) + reqres, err := cli.queueRequest(ctx, req) if err != nil { return nil, queueErr(err) } @@ -494,7 +487,7 @@ func (cli *socketClient) queueRequestAndFlushSync( req *types.Request, ) (*ReqRes, error) { - reqres, err := cli.queueRequest(ctx, req, true) + reqres, err := cli.queueRequest(ctx, req) if err != nil { return nil, queueErr(err) }