You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

121 lines
3.0 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package abcicli
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/tendermint/abci/types"
  6. cmn "github.com/tendermint/tmlibs/common"
  7. )
  8. type Client interface {
  9. cmn.Service
  10. SetResponseCallback(Callback)
  11. Error() error
  12. FlushAsync() *ReqRes
  13. EchoAsync(msg string) *ReqRes
  14. InfoAsync() *ReqRes
  15. SetOptionAsync(key string, value string) *ReqRes
  16. DeliverTxAsync(tx []byte) *ReqRes
  17. CheckTxAsync(tx []byte) *ReqRes
  18. QueryAsync(reqQuery types.RequestQuery) *ReqRes
  19. CommitAsync() *ReqRes
  20. FlushSync() error
  21. EchoSync(msg string) (res types.Result)
  22. InfoSync() (resInfo types.ResponseInfo, err error)
  23. SetOptionSync(key string, value string) (res types.Result)
  24. DeliverTxSync(tx []byte) (res types.Result)
  25. CheckTxSync(tx []byte) (res types.Result)
  26. QuerySync(reqQuery types.RequestQuery) (resQuery types.ResponseQuery, err error)
  27. CommitSync() (res types.Result)
  28. InitChainAsync(validators []*types.Validator) *ReqRes
  29. BeginBlockAsync(hash []byte, header *types.Header) *ReqRes
  30. EndBlockAsync(height uint64) *ReqRes
  31. InitChainSync(validators []*types.Validator) (err error)
  32. BeginBlockSync(hash []byte, header *types.Header) (err error)
  33. EndBlockSync(height uint64) (resEndBlock types.ResponseEndBlock, err error)
  34. }
  35. //----------------------------------------
  36. // NewClient returns a new ABCI client of the specified transport type.
  37. // It returns an error if the transport is not "socket" or "grpc"
  38. func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
  39. switch transport {
  40. case "socket":
  41. client = NewSocketClient(addr, mustConnect)
  42. case "grpc":
  43. client = NewGRPCClient(addr, mustConnect)
  44. default:
  45. err = fmt.Errorf("Unknown abci transport %s", transport)
  46. }
  47. return
  48. }
  49. //----------------------------------------
  50. type Callback func(*types.Request, *types.Response)
  51. //----------------------------------------
  52. type ReqRes struct {
  53. *types.Request
  54. *sync.WaitGroup
  55. *types.Response // Not set atomically, so be sure to use WaitGroup.
  56. mtx sync.Mutex
  57. done bool // Gets set to true once *after* WaitGroup.Done().
  58. cb func(*types.Response) // A single callback that may be set.
  59. }
  60. func NewReqRes(req *types.Request) *ReqRes {
  61. return &ReqRes{
  62. Request: req,
  63. WaitGroup: waitGroup1(),
  64. Response: nil,
  65. done: false,
  66. cb: nil,
  67. }
  68. }
  69. // Sets the callback for this ReqRes atomically.
  70. // If reqRes is already done, calls cb immediately.
  71. // NOTE: reqRes.cb should not change if reqRes.done.
  72. // NOTE: only one callback is supported.
  73. func (reqRes *ReqRes) SetCallback(cb func(res *types.Response)) {
  74. reqRes.mtx.Lock()
  75. if reqRes.done {
  76. reqRes.mtx.Unlock()
  77. cb(reqRes.Response)
  78. return
  79. }
  80. defer reqRes.mtx.Unlock()
  81. reqRes.cb = cb
  82. }
  83. func (reqRes *ReqRes) GetCallback() func(*types.Response) {
  84. reqRes.mtx.Lock()
  85. defer reqRes.mtx.Unlock()
  86. return reqRes.cb
  87. }
  88. // NOTE: it should be safe to read reqRes.cb without locks after this.
  89. func (reqRes *ReqRes) SetDone() {
  90. reqRes.mtx.Lock()
  91. reqRes.done = true
  92. reqRes.mtx.Unlock()
  93. }
  94. func waitGroup1() (wg *sync.WaitGroup) {
  95. wg = &sync.WaitGroup{}
  96. wg.Add(1)
  97. return
  98. }