diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 0156b7f4b..22416fc3f 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -25,3 +25,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [statesync] \#5516 Check that all heights necessary to rebuild state for a snapshot exist before adding the snapshot to the pool. (@erikgrinaker) ### BUG FIXES + +- [abci/grpc] \#5520 Return async responses in order, to avoid mempool panics. (@erikgrinaker) diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index da84bb77d..265b55532 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -22,8 +22,9 @@ type grpcClient struct { service.BaseService mustConnect bool - client types.ABCIApplicationClient - conn *grpc.ClientConn + client types.ABCIApplicationClient + conn *grpc.ClientConn + chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool mtx tmsync.Mutex addr string @@ -35,6 +36,13 @@ func NewGRPCClient(addr string, mustConnect bool) Client { cli := &grpcClient{ addr: addr, mustConnect: mustConnect, + // Buffering the channel is needed to make calls appear asynchronous, + // which is required when the caller makes multiple async calls before + // processing callbacks (e.g. due to holding locks). 64 means that a + // caller can make up to 64 async calls before a callback must be + // processed (otherwise it deadlocks). It also means that we can make 64 + // gRPC calls while processing a slow callback at the channel head. + chReqRes: make(chan *ReqRes, 64), } cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli) return cli @@ -48,6 +56,34 @@ func (cli *grpcClient) OnStart() error { if err := cli.BaseService.OnStart(); err != nil { return err } + + // This processes asynchronous request/response messages and dispatches + // them to callbacks. + go func() { + // Use a separate function to use defer for mutex unlocks (this handles panics) + callCb := func(reqres *ReqRes) { + cli.mtx.Lock() + defer cli.mtx.Unlock() + + // Notify client listener if set + if cli.resCb != nil { + cli.resCb(reqres.Request, reqres.Response) + } + + // Notify reqRes listener if set + if cb := reqres.GetCallback(); cb != nil { + cb(reqres.Response) + } + } + for reqres := range cli.chReqRes { + if reqres != nil { + callCb(reqres) + } else { + cli.Logger.Error("Received nil reqres") + } + } + }() + RETRY_LOOP: for { conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc)) @@ -85,6 +121,7 @@ func (cli *grpcClient) OnStop() { if cli.conn != nil { cli.conn.Close() } + close(cli.chReqRes) } func (cli *grpcClient) StopForError(err error) { @@ -263,26 +300,10 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes { reqres := NewReqRes(req) - reqres.Response = res // Set response - reqres.Done() // Release waiters - reqres.SetDone() // so reqRes.SetCallback will run the callback - - // goroutine for callbacks - go func() { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - // Notify client listener if set - if cli.resCb != nil { - cli.resCb(reqres.Request, res) - } - - // Notify reqRes listener if set - if cb := reqres.GetCallback(); cb != nil { - cb(res) - } - }() - + reqres.Response = res // Set response + reqres.Done() // Release waiters + reqres.SetDone() // so reqRes.SetCallback will run the callback + cli.chReqRes <- reqres // use channel for async responses, since they must be ordered return reqres }