Browse Source

abci: undo socket buffer limit (#7877)

This change changes the ABCI socket client to allow goroutines to block writing to the internal queue. This has the effect ensuring that callers of the ABCI methods do not error on a full internal queue at the expense of allowing the number of goroutines waiting on this internal queue to grow in an unbounded fashion. This tradeoff seems preferable since it allows callers of the ABCI methods to be certain that a request that was made will reach the application if it is available.

Closes: #7827 

This change was initially implemented here: e13b4386ff and never landed on v0.34, only v0.35+
pull/7880/head
William Banfield 2 years ago
committed by GitHub
parent
commit
7f8f1cde8c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 21 deletions
  1. +14
    -21
      abci/client/socket_client.go

+ 14
- 21
abci/client/socket_client.go View File

@ -207,7 +207,7 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
//----------------------------------------
func (cli *socketClient) Flush(ctx context.Context) error {
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true)
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush())
if err != nil {
return queueErr(err)
}
@ -389,29 +389,22 @@ func (cli *socketClient) FinalizeBlock(
//----------------------------------------
// queueRequest enqueues req onto the queue. If the queue is full, it ether
// returns an error (sync=false) or blocks (sync=true).
//
// When sync=true, ctx can be used to break early. When sync=false, ctx will be
// used later to determine if request should be dropped (if ctx.Err is
// non-nil).
// queueRequest enqueues req onto the queue. The request can break early if the
// the context is canceled. If the queue is full, this method blocks to allow
// the request to be placed onto the queue. This has the effect of creating an
// unbounded queue of goroutines waiting to write to this queue which is a bit
// antithetical to the purposes of a queue, however, undoing this behavior has
// dangerous upstream implications as a result of the usage of this behavior upstream.
// Remove at your peril.
//
// The caller is responsible for checking cli.Error.
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, sync bool) (*ReqRes, error) {
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) {
reqres := NewReqRes(req)
if sync {
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
}
} else {
select {
case cli.reqQueue <- reqres:
default:
return nil, errors.New("buffer is full")
}
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
}
return reqres, nil
@ -422,7 +415,7 @@ func (cli *socketClient) queueRequestAndFlush(
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req, true)
reqres, err := cli.queueRequest(ctx, req)
if err != nil {
return nil, queueErr(err)
}


Loading…
Cancel
Save