@ -3,6 +3,7 @@ package abcicli
import (
"fmt"
"net"
"sync"
"time"
"golang.org/x/net/context"
@ -65,6 +66,9 @@ func (cli *grpcClient) OnStart() error {
cli . mtx . Lock ( )
defer cli . mtx . Unlock ( )
reqres . SetDone ( )
reqres . Done ( )
// Notify client listener if set
if cli . resCb != nil {
cli . resCb ( reqres . Request , reqres . Response )
@ -298,15 +302,43 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot
return cli . finishAsyncCall ( req , & types . Response { Value : & types . Response_ApplySnapshotChunk { ApplySnapshotChunk : res } } )
}
// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func ( cli * grpcClient ) finishAsyncCall ( req * types . Request , res * types . Response ) * ReqRes {
reqres := NewReqRes ( req )
reqres . Response = res // Set response
reqres . Done ( ) // Release waiters
reqres . SetDone ( ) // so reqRes.SetCallback will run the callback
reqres . Response = res
cli . chReqRes <- reqres // use channel for async responses, since they must be ordered
return reqres
}
// finishSyncCall waits for an async call to complete. It is necessary to call all
// sync calls asynchronously as well, to maintain call and response ordering via
// the channel, and this method will wait until the async call completes.
func ( cli * grpcClient ) finishSyncCall ( reqres * ReqRes ) * types . Response {
// It's possible that the callback is called twice, since the callback can
// be called immediately on SetCallback() in addition to after it has been
// set. This is because completing the ReqRes happens in a separate critical
// section from the one where the callback is called: there is a race where
// SetCallback() is called between completing the ReqRes and dispatching the
// callback.
//
// We also buffer the channel with 1 response, since SetCallback() will be
// called synchronously if the reqres is already completed, in which case
// it will block on sending to the channel since it hasn't gotten around to
// receiving from it yet.
//
// ReqRes should really handle callback dispatch internally, to guarantee
// that it's only called once and avoid the above race conditions.
var once sync . Once
ch := make ( chan * types . Response , 1 )
reqres . SetCallback ( func ( res * types . Response ) {
once . Do ( func ( ) {
ch <- res
} )
} )
return <- ch
}
//----------------------------------------
func ( cli * grpcClient ) FlushSync ( ) error {
@ -316,12 +348,12 @@ func (cli *grpcClient) FlushSync() error {
func ( cli * grpcClient ) EchoSync ( msg string ) ( * types . ResponseEcho , error ) {
reqres := cli . EchoAsync ( msg )
// StopForError should already have been called if error is set
return reqres . Response . GetEcho ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetEcho ( ) , cli . Error ( )
}
func ( cli * grpcClient ) InfoSync ( req types . RequestInfo ) ( * types . ResponseInfo , error ) {
reqres := cli . InfoAsync ( req )
return reqres . Response . GetInfo ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetInfo ( ) , cli . Error ( )
}
func ( cli * grpcClient ) SetOptionSync ( req types . RequestSetOption ) ( * types . ResponseSetOption , error ) {
@ -331,57 +363,57 @@ func (cli *grpcClient) SetOptionSync(req types.RequestSetOption) (*types.Respons
func ( cli * grpcClient ) DeliverTxSync ( params types . RequestDeliverTx ) ( * types . ResponseDeliverTx , error ) {
reqres := cli . DeliverTxAsync ( params )
return reqres . Response . GetDeliverTx ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetDeliverTx ( ) , cli . Error ( )
}
func ( cli * grpcClient ) CheckTxSync ( params types . RequestCheckTx ) ( * types . ResponseCheckTx , error ) {
reqres := cli . CheckTxAsync ( params )
return reqres . Response . GetCheckTx ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetCheckTx ( ) , cli . Error ( )
}
func ( cli * grpcClient ) QuerySync ( req types . RequestQuery ) ( * types . ResponseQuery , error ) {
reqres := cli . QueryAsync ( req )
return reqres . Response . GetQuery ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetQuery ( ) , cli . Error ( )
}
func ( cli * grpcClient ) CommitSync ( ) ( * types . ResponseCommit , error ) {
reqres := cli . CommitAsync ( )
return reqres . Response . GetCommit ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetCommit ( ) , cli . Error ( )
}
func ( cli * grpcClient ) InitChainSync ( params types . RequestInitChain ) ( * types . ResponseInitChain , error ) {
reqres := cli . InitChainAsync ( params )
return reqres . Response . GetInitChain ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetInitChain ( ) , cli . Error ( )
}
func ( cli * grpcClient ) BeginBlockSync ( params types . RequestBeginBlock ) ( * types . ResponseBeginBlock , error ) {
reqres := cli . BeginBlockAsync ( params )
return reqres . Response . GetBeginBlock ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetBeginBlock ( ) , cli . Error ( )
}
func ( cli * grpcClient ) EndBlockSync ( params types . RequestEndBlock ) ( * types . ResponseEndBlock , error ) {
reqres := cli . EndBlockAsync ( params )
return reqres . Response . GetEndBlock ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetEndBlock ( ) , cli . Error ( )
}
func ( cli * grpcClient ) ListSnapshotsSync ( params types . RequestListSnapshots ) ( * types . ResponseListSnapshots , error ) {
reqres := cli . ListSnapshotsAsync ( params )
return reqres . Response . GetListSnapshots ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetListSnapshots ( ) , cli . Error ( )
}
func ( cli * grpcClient ) OfferSnapshotSync ( params types . RequestOfferSnapshot ) ( * types . ResponseOfferSnapshot , error ) {
reqres := cli . OfferSnapshotAsync ( params )
return reqres . Response . GetOfferSnapshot ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetOfferSnapshot ( ) , cli . Error ( )
}
func ( cli * grpcClient ) LoadSnapshotChunkSync (
params types . RequestLoadSnapshotChunk ) ( * types . ResponseLoadSnapshotChunk , error ) {
reqres := cli . LoadSnapshotChunkAsync ( params )
return reqres . Response . GetLoadSnapshotChunk ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetLoadSnapshotChunk ( ) , cli . Error ( )
}
func ( cli * grpcClient ) ApplySnapshotChunkSync (
params types . RequestApplySnapshotChunk ) ( * types . ResponseApplySnapshotChunk , error ) {
reqres := cli . ApplySnapshotChunkAsync ( params )
return reqres . Response . GetApplySnapshotChunk ( ) , cli . Error ( )
return cli . finishSyncCall ( reqres ) . GetApplySnapshotChunk ( ) , cli . Error ( )
}