diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 383c69a2f..07e73418e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -26,4 +26,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES +- [abci/grpc] \#5520 Return async responses in order, to avoid mempool panics. (@erikgrinaker) + - [types] \#5523 Change json naming of `PartSetHeader` within `BlockID` from `parts` to `part_set_header` (@marbar3778) diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index feeef7c8b..d97a88a4b 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -22,8 +22,9 @@ type grpcClient struct { service.BaseService mustConnect bool - client types.ABCIApplicationClient - conn *grpc.ClientConn + client types.ABCIApplicationClient + conn *grpc.ClientConn + chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool mtx tmsync.Mutex addr string @@ -35,6 +36,13 @@ func NewGRPCClient(addr string, mustConnect bool) Client { cli := &grpcClient{ addr: addr, mustConnect: mustConnect, + // Buffering the channel is needed to make calls appear asynchronous, + // which is required when the caller makes multiple async calls before + // processing callbacks (e.g. due to holding locks). 64 means that a + // caller can make up to 64 async calls before a callback must be + // processed (otherwise it deadlocks). It also means that we can make 64 + // gRPC calls while processing a slow callback at the channel head. + chReqRes: make(chan *ReqRes, 64), } cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli) return cli @@ -48,6 +56,34 @@ func (cli *grpcClient) OnStart() error { if err := cli.BaseService.OnStart(); err != nil { return err } + + // This processes asynchronous request/response messages and dispatches + // them to callbacks. + go func() { + // Use a separate function to use defer for mutex unlocks (this handles panics) + callCb := func(reqres *ReqRes) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + + // Notify client listener if set + if cli.resCb != nil { + cli.resCb(reqres.Request, reqres.Response) + } + + // Notify reqRes listener if set + if cb := reqres.GetCallback(); cb != nil { + cb(reqres.Response) + } + } + for reqres := range cli.chReqRes { + if reqres != nil { + callCb(reqres) + } else { + cli.Logger.Error("Received nil reqres") + } + } + }() + RETRY_LOOP: for { conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc)) @@ -85,6 +121,7 @@ func (cli *grpcClient) OnStop() { if cli.conn != nil { cli.conn.Close() } + close(cli.chReqRes) } func (cli *grpcClient) StopForError(err error) { @@ -254,26 +291,10 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes { reqres := NewReqRes(req) - reqres.Response = res // Set response - reqres.Done() // Release waiters - reqres.SetDone() // so reqRes.SetCallback will run the callback - - // goroutine for callbacks - go func() { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - // Notify client listener if set - if cli.resCb != nil { - cli.resCb(reqres.Request, res) - } - - // Notify reqRes listener if set - if cb := reqres.GetCallback(); cb != nil { - cb(res) - } - }() - + reqres.Response = res // Set response + reqres.Done() // Release waiters + reqres.SetDone() // so reqRes.SetCallback will run the callback + cli.chReqRes <- reqres // use channel for async responses, since they must be ordered return reqres }