diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index d6477062c..8a4598fe5 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -13,7 +13,6 @@ import ( "github.com/tendermint/tendermint/abci/types" tmsync "github.com/tendermint/tendermint/internal/libs/sync" - "github.com/tendermint/tendermint/internal/libs/timer" tmnet "github.com/tendermint/tendermint/libs/net" "github.com/tendermint/tendermint/libs/service" ) @@ -22,8 +21,6 @@ const ( // reqQueueSize is the max number of queued async requests. // (memory: 256MB max assuming 1MB transactions) reqQueueSize = 256 - // Don't wait longer than... - flushThrottleMS = 20 ) type reqResWithContext struct { @@ -40,8 +37,7 @@ type socketClient struct { mustConnect bool conn net.Conn - reqQueue chan *reqResWithContext - flushTimer *timer.ThrottleTimer + reqQueue chan *reqResWithContext mtx tmsync.RWMutex err error @@ -57,7 +53,6 @@ var _ Client = (*socketClient)(nil) func NewSocketClient(addr string, mustConnect bool) Client { cli := &socketClient{ reqQueue: make(chan *reqResWithContext, reqQueueSize), - flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS), mustConnect: mustConnect, addr: addr, @@ -102,8 +97,7 @@ func (cli *socketClient) OnStop() { cli.conn.Close() } - cli.flushQueue() - cli.flushTimer.Stop() + cli.drainQueue() } // Error returns an error if the client was stopped abruptly. @@ -126,38 +120,25 @@ func (cli *socketClient) SetResponseCallback(resCb Callback) { //---------------------------------------- func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { - w := bufio.NewWriter(conn) + bw := bufio.NewWriter(conn) for { select { case reqres := <-cli.reqQueue: - // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) - if reqres.C.Err() != nil { cli.Logger.Debug("Request's context is done", "req", reqres.R, "err", reqres.C.Err()) continue } - cli.willSendReq(reqres.R) - err := types.WriteMessage(reqres.R.Request, w) - if err != nil { + + if err := types.WriteMessage(reqres.R.Request, bw); err != nil { cli.stopForError(fmt.Errorf("write to buffer: %w", err)) return } - - // If it's a flush request, flush the current buffer. - if _, ok := reqres.R.Request.Value.(*types.Request_Flush); ok { - err = w.Flush() - if err != nil { - cli.stopForError(fmt.Errorf("flush buffer: %w", err)) - return - } - } - case <-cli.flushTimer.Ch: // flush queue - select { - case cli.reqQueue <- &reqResWithContext{R: NewReqRes(types.ToRequestFlush()), C: context.Background()}: - default: - // Probably will fill the buffer, or retry later. + if err := bw.Flush(); err != nil { + cli.stopForError(fmt.Errorf("flush buffer: %w", err)) + return } + case <-cli.Quit(): return } @@ -492,14 +473,6 @@ func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, s } } - // Maybe auto-flush, or unset auto-flush - switch req.Value.(type) { - case *types.Request_Flush: - cli.flushTimer.Unset() - default: - cli.flushTimer.Set() - } - return reqres, nil } @@ -537,7 +510,9 @@ func queueErr(e error) error { return fmt.Errorf("can't queue req: %w", e) } -func (cli *socketClient) flushQueue() { +// drainQueue marks as complete and discards all remaining pending requests +// from the queue. +func (cli *socketClient) drainQueue() { cli.mtx.Lock() defer cli.mtx.Unlock() @@ -547,14 +522,17 @@ func (cli *socketClient) flushQueue() { reqres.Done() } - // mark all queued messages as resolved -LOOP: + // Mark all queued messages as resolved. + // + // TODO(creachadair): We can't simply range the channel, because it is never + // closed, and the writer continues to add work. + // See https://github.com/tendermint/tendermint/issues/6996. for { select { case reqres := <-cli.reqQueue: reqres.R.Done() default: - break LOOP + return } } } diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go index 543b444b1..85539645b 100644 --- a/abci/server/socket_server.go +++ b/abci/server/socket_server.go @@ -240,22 +240,15 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types // Pull responses from 'responses' and write them to conn. func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, responses <-chan *types.Response) { - var count int - var bufWriter = bufio.NewWriter(conn) - for { - var res = <-responses - err := types.WriteMessage(res, bufWriter) - if err != nil { + bw := bufio.NewWriter(conn) + for res := range responses { + if err := types.WriteMessage(res, bw); err != nil { closeConn <- fmt.Errorf("error writing message: %w", err) return } - if _, ok := res.Value.(*types.Response_Flush); ok { - err = bufWriter.Flush() - if err != nil { - closeConn <- fmt.Errorf("error flushing write buffer: %w", err) - return - } + if err := bw.Flush(); err != nil { + closeConn <- fmt.Errorf("error flushing write buffer: %w", err) + return } - count++ } }