diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 265b55532..0f3aa75a5 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -3,6 +3,7 @@ package abcicli import ( "fmt" "net" + "sync" "time" "golang.org/x/net/context" @@ -65,6 +66,9 @@ func (cli *grpcClient) OnStart() error { cli.mtx.Lock() defer cli.mtx.Unlock() + reqres.SetDone() + reqres.Done() + // Notify client listener if set if cli.resCb != nil { cli.resCb(reqres.Request, reqres.Response) @@ -298,15 +302,43 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}}) } +// finishAsyncCall creates a ReqRes for an async call, and immediately populates it +// with the response. We don't complete it until it's been ordered via the channel. func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes { reqres := NewReqRes(req) - reqres.Response = res // Set response - reqres.Done() // Release waiters - reqres.SetDone() // so reqRes.SetCallback will run the callback + reqres.Response = res cli.chReqRes <- reqres // use channel for async responses, since they must be ordered return reqres } +// finishSyncCall waits for an async call to complete. It is necessary to call all +// sync calls asynchronously as well, to maintain call and response ordering via +// the channel, and this method will wait until the async call completes. +func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response { + // It's possible that the callback is called twice, since the callback can + // be called immediately on SetCallback() in addition to after it has been + // set. This is because completing the ReqRes happens in a separate critical + // section from the one where the callback is called: there is a race where + // SetCallback() is called between completing the ReqRes and dispatching the + // callback. + // + // We also buffer the channel with 1 response, since SetCallback() will be + // called synchronously if the reqres is already completed, in which case + // it will block on sending to the channel since it hasn't gotten around to + // receiving from it yet. + // + // ReqRes should really handle callback dispatch internally, to guarantee + // that it's only called once and avoid the above race conditions. + var once sync.Once + ch := make(chan *types.Response, 1) + reqres.SetCallback(func(res *types.Response) { + once.Do(func() { + ch <- res + }) + }) + return <-ch +} + //---------------------------------------- func (cli *grpcClient) FlushSync() error { @@ -316,12 +348,12 @@ func (cli *grpcClient) FlushSync() error { func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) { reqres := cli.EchoAsync(msg) // StopForError should already have been called if error is set - return reqres.Response.GetEcho(), cli.Error() + return cli.finishSyncCall(reqres).GetEcho(), cli.Error() } func (cli *grpcClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { reqres := cli.InfoAsync(req) - return reqres.Response.GetInfo(), cli.Error() + return cli.finishSyncCall(reqres).GetInfo(), cli.Error() } func (cli *grpcClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) { @@ -331,57 +363,57 @@ func (cli *grpcClient) SetOptionSync(req types.RequestSetOption) (*types.Respons func (cli *grpcClient) DeliverTxSync(params types.RequestDeliverTx) (*types.ResponseDeliverTx, error) { reqres := cli.DeliverTxAsync(params) - return reqres.Response.GetDeliverTx(), cli.Error() + return cli.finishSyncCall(reqres).GetDeliverTx(), cli.Error() } func (cli *grpcClient) CheckTxSync(params types.RequestCheckTx) (*types.ResponseCheckTx, error) { reqres := cli.CheckTxAsync(params) - return reqres.Response.GetCheckTx(), cli.Error() + return cli.finishSyncCall(reqres).GetCheckTx(), cli.Error() } func (cli *grpcClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { reqres := cli.QueryAsync(req) - return reqres.Response.GetQuery(), cli.Error() + return cli.finishSyncCall(reqres).GetQuery(), cli.Error() } func (cli *grpcClient) CommitSync() (*types.ResponseCommit, error) { reqres := cli.CommitAsync() - return reqres.Response.GetCommit(), cli.Error() + return cli.finishSyncCall(reqres).GetCommit(), cli.Error() } func (cli *grpcClient) InitChainSync(params types.RequestInitChain) (*types.ResponseInitChain, error) { reqres := cli.InitChainAsync(params) - return reqres.Response.GetInitChain(), cli.Error() + return cli.finishSyncCall(reqres).GetInitChain(), cli.Error() } func (cli *grpcClient) BeginBlockSync(params types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { reqres := cli.BeginBlockAsync(params) - return reqres.Response.GetBeginBlock(), cli.Error() + return cli.finishSyncCall(reqres).GetBeginBlock(), cli.Error() } func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.ResponseEndBlock, error) { reqres := cli.EndBlockAsync(params) - return reqres.Response.GetEndBlock(), cli.Error() + return cli.finishSyncCall(reqres).GetEndBlock(), cli.Error() } func (cli *grpcClient) ListSnapshotsSync(params types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { reqres := cli.ListSnapshotsAsync(params) - return reqres.Response.GetListSnapshots(), cli.Error() + return cli.finishSyncCall(reqres).GetListSnapshots(), cli.Error() } func (cli *grpcClient) OfferSnapshotSync(params types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { reqres := cli.OfferSnapshotAsync(params) - return reqres.Response.GetOfferSnapshot(), cli.Error() + return cli.finishSyncCall(reqres).GetOfferSnapshot(), cli.Error() } func (cli *grpcClient) LoadSnapshotChunkSync( params types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { reqres := cli.LoadSnapshotChunkAsync(params) - return reqres.Response.GetLoadSnapshotChunk(), cli.Error() + return cli.finishSyncCall(reqres).GetLoadSnapshotChunk(), cli.Error() } func (cli *grpcClient) ApplySnapshotChunkSync( params types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { reqres := cli.ApplySnapshotChunkAsync(params) - return reqres.Response.GetApplySnapshotChunk(), cli.Error() + return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error() }