You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

129 lines
3.5 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package abcicli
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/tendermint/abci/types"
  6. cmn "github.com/tendermint/tmlibs/common"
  7. )
  // Retry intervals, expressed in whole seconds.
  // NOTE(review): neither constant is referenced in this part of the file;
  // judging by the names they pace dial and echo retries in the client
  // implementations — confirm against the callers.
  const (
  	echoRetryIntervalSeconds = 1
  	dialRetryIntervalSeconds = 3
  )
  12. // Client defines an interface for an ABCI client.
  13. // All `Async` methods return a `ReqRes` object.
  14. // All `Sync` methods return the appropriate protobuf ResponseXxx struct and an error.
  15. // Note these are client errors, eg. ABCI socket connectivity issues.
  16. // Application-related errors are reflected in response via ABCI error codes and logs.
  17. type Client interface {
  18. cmn.Service
  19. SetResponseCallback(Callback)
  20. Error() error
  21. FlushAsync() *ReqRes
  22. EchoAsync(msg string) *ReqRes
  23. InfoAsync(types.RequestInfo) *ReqRes
  24. SetOptionAsync(types.RequestSetOption) *ReqRes
  25. DeliverTxAsync(tx []byte) *ReqRes
  26. CheckTxAsync(tx []byte) *ReqRes
  27. QueryAsync(types.RequestQuery) *ReqRes
  28. CommitAsync() *ReqRes
  29. InitChainAsync(types.RequestInitChain) *ReqRes
  30. BeginBlockAsync(types.RequestBeginBlock) *ReqRes
  31. EndBlockAsync(types.RequestEndBlock) *ReqRes
  32. FlushSync() error
  33. EchoSync(msg string) (*types.ResponseEcho, error)
  34. InfoSync(types.RequestInfo) (*types.ResponseInfo, error)
  35. SetOptionSync(types.RequestSetOption) (*types.ResponseSetOption, error)
  36. DeliverTxSync(tx []byte) (*types.ResponseDeliverTx, error)
  37. CheckTxSync(tx []byte) (*types.ResponseCheckTx, error)
  38. QuerySync(types.RequestQuery) (*types.ResponseQuery, error)
  39. CommitSync() (*types.ResponseCommit, error)
  40. InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error)
  41. BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
  42. EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error)
  43. }
  44. //----------------------------------------
  45. // NewClient returns a new ABCI client of the specified transport type.
  46. // It returns an error if the transport is not "socket" or "grpc"
  47. func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
  48. switch transport {
  49. case "socket":
  50. client = NewSocketClient(addr, mustConnect)
  51. case "grpc":
  52. client = NewGRPCClient(addr, mustConnect)
  53. default:
  54. err = fmt.Errorf("Unknown abci transport %s", transport)
  55. }
  56. return
  57. }
  58. //----------------------------------------
  59. type Callback func(*types.Request, *types.Response)
  60. //----------------------------------------
  61. type ReqRes struct {
  62. *types.Request
  63. *sync.WaitGroup
  64. *types.Response // Not set atomically, so be sure to use WaitGroup.
  65. mtx sync.Mutex
  66. done bool // Gets set to true once *after* WaitGroup.Done().
  67. cb func(*types.Response) // A single callback that may be set.
  68. }
  69. func NewReqRes(req *types.Request) *ReqRes {
  70. return &ReqRes{
  71. Request: req,
  72. WaitGroup: waitGroup1(),
  73. Response: nil,
  74. done: false,
  75. cb: nil,
  76. }
  77. }
  78. // Sets the callback for this ReqRes atomically.
  79. // If reqRes is already done, calls cb immediately.
  80. // NOTE: reqRes.cb should not change if reqRes.done.
  81. // NOTE: only one callback is supported.
  82. func (reqRes *ReqRes) SetCallback(cb func(res *types.Response)) {
  83. reqRes.mtx.Lock()
  84. if reqRes.done {
  85. reqRes.mtx.Unlock()
  86. cb(reqRes.Response)
  87. return
  88. }
  89. defer reqRes.mtx.Unlock()
  90. reqRes.cb = cb
  91. }
  92. func (reqRes *ReqRes) GetCallback() func(*types.Response) {
  93. reqRes.mtx.Lock()
  94. defer reqRes.mtx.Unlock()
  95. return reqRes.cb
  96. }
  97. // NOTE: it should be safe to read reqRes.cb without locks after this.
  98. func (reqRes *ReqRes) SetDone() {
  99. reqRes.mtx.Lock()
  100. reqRes.done = true
  101. reqRes.mtx.Unlock()
  102. }
  // waitGroup1 returns a newly allocated WaitGroup whose counter has already
  // been incremented to 1, so a single Done call releases any waiter.
  func waitGroup1() (wg *sync.WaitGroup) {
  	wg = new(sync.WaitGroup)
  	wg.Add(1)
  	return wg
  }