package abcicli

import (
	"fmt"
	"sync"

	"github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/libs/service"
	tmsync "github.com/tendermint/tendermint/libs/sync"
)

const (
	dialRetryIntervalSeconds = 3
	echoRetryIntervalSeconds = 1
)

//go:generate mockery --case underscore --name Client

// Client is the interface implemented by ABCI clients.
//
// Every `Async` method returns a `ReqRes` object. Every `Sync` method returns
// the matching protobuf ResponseXxx struct together with an error.
//
// The returned errors are client errors, eg. ABCI socket connectivity issues.
// Application-related errors are reported inside the response through ABCI
// error codes and logs.
type Client interface {
	service.Service

	SetResponseCallback(Callback)
	Error() error

	FlushAsync() *ReqRes
	EchoAsync(msg string) *ReqRes
	InfoAsync(types.RequestInfo) *ReqRes
	DeliverTxAsync(types.RequestDeliverTx) *ReqRes
	CheckTxAsync(types.RequestCheckTx) *ReqRes
	QueryAsync(types.RequestQuery) *ReqRes
	CommitAsync() *ReqRes
	InitChainAsync(types.RequestInitChain) *ReqRes
	BeginBlockAsync(types.RequestBeginBlock) *ReqRes
	EndBlockAsync(types.RequestEndBlock) *ReqRes
	ListSnapshotsAsync(types.RequestListSnapshots) *ReqRes
	OfferSnapshotAsync(types.RequestOfferSnapshot) *ReqRes
	LoadSnapshotChunkAsync(types.RequestLoadSnapshotChunk) *ReqRes
	ApplySnapshotChunkAsync(types.RequestApplySnapshotChunk) *ReqRes

	FlushSync() error
	EchoSync(msg string) (*types.ResponseEcho, error)
	InfoSync(types.RequestInfo) (*types.ResponseInfo, error)
	DeliverTxSync(types.RequestDeliverTx) (*types.ResponseDeliverTx, error)
	CheckTxSync(types.RequestCheckTx) (*types.ResponseCheckTx, error)
	QuerySync(types.RequestQuery) (*types.ResponseQuery, error)
	CommitSync() (*types.ResponseCommit, error)
	InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error)
	BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
	EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error)
	ListSnapshotsSync(types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
	OfferSnapshotSync(types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
	LoadSnapshotChunkSync(types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
	ApplySnapshotChunkSync(types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
}

//----------------------------------------

// NewClient builds an ABCI client for the requested transport.
// Supported transports are "socket" and "grpc"; any other value yields a nil
// client and a non-nil error.
func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
	switch transport {
	case "socket":
		return NewSocketClient(addr, mustConnect), nil
	case "grpc":
		return NewGRPCClient(addr, mustConnect), nil
	}
	return nil, fmt.Errorf("unknown abci transport %s", transport)
}

//----------------------------------------

// Callback is a function that receives a request together with its response.
// A Callback is handed to a client through Client.SetResponseCallback.
type Callback func(*types.Request, *types.Response)

//----------------------------------------

// ReqRes bundles a request with its response, a WaitGroup, and an optional
// per-request callback.
type ReqRes struct {
	*types.Request
	*sync.WaitGroup
	*types.Response // Not set atomically, so be sure to use WaitGroup.

	mtx  tmsync.Mutex
	done bool                  // Gets set to true once *after* WaitGroup.Done().
	cb   func(*types.Response) // A single callback that may be set.
}

// NewReqRes returns a ReqRes for req whose WaitGroup comes from waitGroup1.
// The response, the callback and the done flag start at their zero values.
func NewReqRes(req *types.Request) *ReqRes {
	return &ReqRes{
		Request:   req,
		WaitGroup: waitGroup1(),
	}
}

// SetCallback registers cb as the callback of this ReqRes while holding the
// mutex. If the ReqRes is already done, cb is not stored; it is called
// immediately with the current Response once the mutex has been released.
// NOTE: reqRes.cb should not change if reqRes.done.
// NOTE: only one callback is supported.
func (reqRes *ReqRes) SetCallback(cb func(res *types.Response)) {
	reqRes.mtx.Lock()
	alreadyDone := reqRes.done
	if !alreadyDone {
		reqRes.cb = cb
	}
	reqRes.mtx.Unlock()

	// Invoke outside the critical section so cb never runs under the mutex.
	if alreadyDone {
		cb(reqRes.Response)
	}
}

// GetCallback returns the callback currently stored on this ReqRes (nil when
// none has been stored), reading it under the mutex.
func (reqRes *ReqRes) GetCallback() func(*types.Response) {
	reqRes.mtx.Lock()
	stored := reqRes.cb
	reqRes.mtx.Unlock()
	return stored
}

// NOTE: it should be safe to read reqRes.cb without locks after this.
func (reqRes *ReqRes) SetDone() { reqRes.mtx.Lock() reqRes.done = true reqRes.mtx.Unlock() } func waitGroup1() (wg *sync.WaitGroup) { wg = &sync.WaitGroup{} wg.Add(1) return }