You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

306 lines
6.8 KiB

  1. package proxy
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. . "github.com/tendermint/go-common"
  11. "github.com/tendermint/go-wire"
  12. tmsp "github.com/tendermint/tmsp/types"
  13. )
  14. const maxResponseSize = 1048576 // 1MB
  15. // This is goroutine-safe, but users should beware that
  16. // the application in general is not meant to be interfaced
  17. // with concurrent callers.
  18. type remoteAppContext struct {
  19. QuitService
  20. sync.Mutex
  21. reqQueue chan *reqRes
  22. mtx sync.Mutex
  23. conn net.Conn
  24. bufWriter *bufio.Writer
  25. err error
  26. reqSent *list.List
  27. resCb func(tmsp.Request, tmsp.Response)
  28. }
  29. func NewRemoteAppContext(conn net.Conn, bufferSize int) *remoteAppContext {
  30. app := &remoteAppContext{
  31. reqQueue: make(chan *reqRes, bufferSize),
  32. conn: conn,
  33. bufWriter: bufio.NewWriter(conn),
  34. reqSent: list.New(),
  35. resCb: nil,
  36. }
  37. app.QuitService = *NewQuitService(nil, "remoteAppContext", app)
  38. return app
  39. }
  40. func (app *remoteAppContext) OnStart() error {
  41. app.QuitService.OnStart()
  42. go app.sendRequestsRoutine()
  43. go app.recvResponseRoutine()
  44. return nil
  45. }
  46. func (app *remoteAppContext) OnStop() {
  47. app.QuitService.OnStop()
  48. app.conn.Close()
  49. }
  50. func (app *remoteAppContext) SetResponseCallback(resCb Callback) {
  51. app.mtx.Lock()
  52. defer app.mtx.Unlock()
  53. app.resCb = resCb
  54. }
  55. func (app *remoteAppContext) StopForError(err error) {
  56. app.mtx.Lock()
  57. fmt.Println("Stopping remoteAppContext for error:", err)
  58. if app.err == nil {
  59. app.err = err
  60. }
  61. app.mtx.Unlock()
  62. app.Stop()
  63. }
  64. func (app *remoteAppContext) Error() error {
  65. app.mtx.Lock()
  66. defer app.mtx.Unlock()
  67. return app.err
  68. }
  69. //----------------------------------------
  70. func (app *remoteAppContext) sendRequestsRoutine() {
  71. for {
  72. var n int
  73. var err error
  74. select {
  75. case <-app.QuitService.Quit:
  76. return
  77. case reqres := <-app.reqQueue:
  78. wire.WriteBinary(reqres.Request, app.bufWriter, &n, &err)
  79. if err != nil {
  80. app.StopForError(err)
  81. return
  82. }
  83. if _, ok := reqres.Request.(tmsp.RequestFlush); ok {
  84. err = app.bufWriter.Flush()
  85. if err != nil {
  86. app.StopForError(err)
  87. return
  88. }
  89. }
  90. app.didSendReq(reqres)
  91. }
  92. }
  93. }
  94. func (app *remoteAppContext) recvResponseRoutine() {
  95. r := bufio.NewReader(app.conn) // Buffer reads
  96. for {
  97. var res tmsp.Response
  98. var n int
  99. var err error
  100. wire.ReadBinaryPtr(&res, r, maxResponseSize, &n, &err)
  101. if err != nil {
  102. app.StopForError(err)
  103. return
  104. }
  105. switch res := res.(type) {
  106. case tmsp.ResponseException:
  107. app.StopForError(errors.New(res.Error))
  108. default:
  109. err := app.didRecvResponse(res)
  110. if err != nil {
  111. app.StopForError(err)
  112. }
  113. }
  114. }
  115. }
  116. func (app *remoteAppContext) didSendReq(reqres *reqRes) {
  117. app.mtx.Lock()
  118. defer app.mtx.Unlock()
  119. app.reqSent.PushBack(reqres)
  120. }
  121. func (app *remoteAppContext) didRecvResponse(res tmsp.Response) error {
  122. app.mtx.Lock()
  123. defer app.mtx.Unlock()
  124. // Special logic for events which have no corresponding requests.
  125. if _, ok := res.(tmsp.ResponseEvent); ok && app.resCb != nil {
  126. app.resCb(nil, res)
  127. return nil
  128. }
  129. // Get the first reqRes
  130. next := app.reqSent.Front()
  131. if next == nil {
  132. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res))
  133. }
  134. reqres := next.Value.(*reqRes)
  135. if !resMatchesReq(reqres.Request, res) {
  136. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  137. reflect.TypeOf(res), reflect.TypeOf(reqres.Request))
  138. }
  139. reqres.Response = res // Set response
  140. reqres.Done() // Release waiters
  141. app.reqSent.Remove(next) // Pop first item from linked list
  142. // Callback if there is a listener
  143. if app.resCb != nil {
  144. app.resCb(reqres.Request, res)
  145. }
  146. return nil
  147. }
  148. //----------------------------------------
  149. func (app *remoteAppContext) EchoAsync(msg string) {
  150. app.queueRequest(tmsp.RequestEcho{msg})
  151. }
  152. func (app *remoteAppContext) FlushAsync() {
  153. app.queueRequest(tmsp.RequestFlush{})
  154. }
  155. func (app *remoteAppContext) SetOptionAsync(key string, value string) {
  156. app.queueRequest(tmsp.RequestSetOption{key, value})
  157. }
  158. func (app *remoteAppContext) AppendTxAsync(tx []byte) {
  159. app.queueRequest(tmsp.RequestAppendTx{tx})
  160. }
  161. func (app *remoteAppContext) GetHashAsync() {
  162. app.queueRequest(tmsp.RequestGetHash{})
  163. }
  164. func (app *remoteAppContext) CommitAsync() {
  165. app.queueRequest(tmsp.RequestCommit{})
  166. }
  167. func (app *remoteAppContext) RollbackAsync() {
  168. app.queueRequest(tmsp.RequestRollback{})
  169. }
  170. func (app *remoteAppContext) AddListenerAsync(key string) {
  171. app.queueRequest(tmsp.RequestAddListener{key})
  172. }
  173. func (app *remoteAppContext) RemListenerAsync(key string) {
  174. app.queueRequest(tmsp.RequestRemListener{key})
  175. }
  176. //----------------------------------------
  177. func (app *remoteAppContext) InfoSync() (info []string, err error) {
  178. reqres := app.queueRequest(tmsp.RequestInfo{})
  179. app.FlushSync()
  180. if app.err != nil {
  181. return nil, app.err
  182. }
  183. return reqres.Response.(tmsp.ResponseInfo).Data, nil
  184. }
  185. func (app *remoteAppContext) FlushSync() error {
  186. app.queueRequest(tmsp.RequestFlush{}).Wait()
  187. return app.err
  188. }
  189. func (app *remoteAppContext) GetHashSync() (hash []byte, err error) {
  190. reqres := app.queueRequest(tmsp.RequestGetHash{})
  191. app.FlushSync()
  192. if app.err != nil {
  193. return nil, app.err
  194. }
  195. return reqres.Response.(tmsp.ResponseGetHash).Hash, nil
  196. }
  197. // Commits or error
  198. func (app *remoteAppContext) CommitSync() (err error) {
  199. app.queueRequest(tmsp.RequestCommit{})
  200. app.FlushSync()
  201. return app.err
  202. }
  203. // Rollback or error
  204. // Clears internal buffers
  205. func (app *remoteAppContext) RollbackSync() (err error) {
  206. app.queueRequest(tmsp.RequestRollback{})
  207. app.FlushSync()
  208. return app.err
  209. }
  210. //----------------------------------------
  211. func (app *remoteAppContext) queueRequest(req tmsp.Request) *reqRes {
  212. reqres := NewreqRes(req)
  213. // TODO: set app.err if reqQueue times out
  214. app.reqQueue <- reqres
  215. return reqres
  216. }
  217. //----------------------------------------
  218. func resMatchesReq(req tmsp.Request, res tmsp.Response) (ok bool) {
  219. switch req.(type) {
  220. case tmsp.RequestEcho:
  221. _, ok = res.(tmsp.ResponseEcho)
  222. case tmsp.RequestFlush:
  223. _, ok = res.(tmsp.ResponseFlush)
  224. case tmsp.RequestInfo:
  225. _, ok = res.(tmsp.ResponseInfo)
  226. case tmsp.RequestSetOption:
  227. _, ok = res.(tmsp.ResponseSetOption)
  228. case tmsp.RequestAppendTx:
  229. _, ok = res.(tmsp.ResponseAppendTx)
  230. case tmsp.RequestGetHash:
  231. _, ok = res.(tmsp.ResponseGetHash)
  232. case tmsp.RequestCommit:
  233. _, ok = res.(tmsp.ResponseCommit)
  234. case tmsp.RequestRollback:
  235. _, ok = res.(tmsp.ResponseRollback)
  236. case tmsp.RequestAddListener:
  237. _, ok = res.(tmsp.ResponseAddListener)
  238. case tmsp.RequestRemListener:
  239. _, ok = res.(tmsp.ResponseRemListener)
  240. default:
  241. return false
  242. }
  243. return
  244. }
  245. type reqRes struct {
  246. tmsp.Request
  247. *sync.WaitGroup
  248. tmsp.Response // Not set atomically, so be sure to use WaitGroup.
  249. }
  250. func NewreqRes(req tmsp.Request) *reqRes {
  251. return &reqRes{
  252. Request: req,
  253. WaitGroup: waitGroup1(),
  254. Response: nil,
  255. }
  256. }
  257. func waitGroup1() (wg *sync.WaitGroup) {
  258. wg = &sync.WaitGroup{}
  259. wg.Add(1)
  260. return
  261. }