Browse Source

abci/grpc: return async responses in order (#5520) (#5531)

Fixes #5439. This is really a workaround for #5519 (unless we require async implementations to return ordered responses, but that kind of defeats the purpose of having an async API).
pull/5539/head
Erik Grinaker 4 years ago
committed by GitHub
parent
commit
29ca7de63c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 22 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +43
    -22
      abci/client/grpc_client.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -25,3 +25,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [statesync] \#5516 Check that all heights necessary to rebuild state for a snapshot exist before adding the snapshot to the pool. (@erikgrinaker)
### BUG FIXES
- [abci/grpc] \#5520 Return async responses in order, to avoid mempool panics. (@erikgrinaker)

+ 43
- 22
abci/client/grpc_client.go View File

@ -22,8 +22,9 @@ type grpcClient struct {
service.BaseService
mustConnect bool
client types.ABCIApplicationClient
conn *grpc.ClientConn
client types.ABCIApplicationClient
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
mtx tmsync.Mutex
addr string
@ -35,6 +36,13 @@ func NewGRPCClient(addr string, mustConnect bool) Client {
cli := &grpcClient{
addr: addr,
mustConnect: mustConnect,
// Buffering the channel is needed to make calls appear asynchronous,
// which is required when the caller makes multiple async calls before
// processing callbacks (e.g. due to holding locks). 64 means that a
// caller can make up to 64 async calls before a callback must be
// processed (otherwise it deadlocks). It also means that we can make 64
// gRPC calls while processing a slow callback at the channel head.
chReqRes: make(chan *ReqRes, 64),
}
cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli)
return cli
@ -48,6 +56,34 @@ func (cli *grpcClient) OnStart() error {
if err := cli.BaseService.OnStart(); err != nil {
return err
}
// This processes asynchronous request/response messages and dispatches
// them to callbacks.
go func() {
// Use a separate function to use defer for mutex unlocks (this handles panics)
callCb := func(reqres *ReqRes) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, reqres.Response)
}
// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(reqres.Response)
}
}
for reqres := range cli.chReqRes {
if reqres != nil {
callCb(reqres)
} else {
cli.Logger.Error("Received nil reqres")
}
}
}()
RETRY_LOOP:
for {
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
@ -85,6 +121,7 @@ func (cli *grpcClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
close(cli.chReqRes)
}
func (cli *grpcClient) StopForError(err error) {
@ -263,26 +300,10 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res // Set response
reqres.Done() // Release waiters
reqres.SetDone() // so reqRes.SetCallback will run the callback
// goroutine for callbacks
go func() {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
}()
reqres.Response = res // Set response
reqres.Done() // Release waiters
reqres.SetDone() // so reqRes.SetCallback will run the callback
cli.chReqRes <- reqres // use channel for async responses, since they must be ordered
return reqres
}


Loading…
Cancel
Save