@ -279,7 +279,7 @@ func (cli *socketClient) ApplySnapshotChunkAsync(
//----------------------------------------
func ( cli * socketClient ) FlushSync ( ctx context . Context ) error {
reqRes , err := cli . queueRequest ( ctx , types . ToRequestFlush ( ) , true )
reqRes , err := cli . queueRequest ( ctx , types . ToRequestFlush ( ) )
if err != nil {
return queueErr ( err )
}
@ -448,29 +448,22 @@ func (cli *socketClient) ApplySnapshotChunkSync(
//----------------------------------------
// queueRequest enqueues req onto the queue. If the queue is full, it ether
// returns an error (sync=false) or blocks (sync=true).
//
// When sync=true, ctx can be used to break early. When sync=false, ctx will be
// used later to determine if request should be dropped (if ctx.Err is
// non-nil).
// queueRequest enqueues req onto the queue. The request can break early if the
// the context is canceled. If the queue is full, this method blocks to allow
// the request to be placed onto the queue. This has the effect of creating an
// unbounded queue of goroutines waiting to write to this queue which is a bit
// antithetical to the purposes of a queue, however, undoing this behavior has
// dangerous upstream implications as a result of the usage of this behavior upstream.
// Remove at your peril.
//
// The caller is responsible for checking cli.Error.
func ( cli * socketClient ) queueRequest ( ctx context . Context , req * types . Request , sync bool ) ( * ReqRes , error ) {
func ( cli * socketClient ) queueRequest ( ctx context . Context , req * types . Request ) ( * ReqRes , error ) {
reqres := NewReqRes ( req )
if sync {
select {
case cli . reqQueue <- & reqResWithContext { R : reqres , C : context . Background ( ) } :
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
} else {
select {
case cli . reqQueue <- & reqResWithContext { R : reqres , C : ctx } :
default :
return nil , errors . New ( "buffer is full" )
}
select {
case cli . reqQueue <- & reqResWithContext { R : reqres , C : ctx } :
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
}
return reqres , nil
@ -481,7 +474,7 @@ func (cli *socketClient) queueRequestAsync(
req * types . Request ,
) ( * ReqRes , error ) {
reqres , err := cli . queueRequest ( ctx , req , false )
reqres , err := cli . queueRequest ( ctx , req )
if err != nil {
return nil , queueErr ( err )
}
@ -494,7 +487,7 @@ func (cli *socketClient) queueRequestAndFlushSync(
req * types . Request ,
) ( * ReqRes , error ) {
reqres , err := cli . queueRequest ( ctx , req , true )
reqres , err := cli . queueRequest ( ctx , req )
if err != nil {
return nil , queueErr ( err )
}