You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

310 lines
7.1 KiB

  1. package proxy
  2. import (
  3. "bufio"
  4. "container/list"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "reflect"
  9. "sync"
  10. . "github.com/tendermint/go-common"
  11. "github.com/tendermint/go-wire"
  12. tmsp "github.com/tendermint/tmsp/types"
  13. )
  14. const maxResponseSize = 1048576 // 1MB
  15. // This is goroutine-safe, but users should beware that
  16. // the application in general is not meant to be interfaced
  17. // with concurrent callers.
  18. type remoteAppContext struct {
  19. QuitService
  20. sync.Mutex // [EB]: is this even used?
  21. reqQueue chan *reqRes
  22. mtx sync.Mutex
  23. conn net.Conn
  24. bufWriter *bufio.Writer
  25. err error
  26. reqSent *list.List
  27. resCb func(tmsp.Request, tmsp.Response)
  28. }
  29. func NewRemoteAppContext(conn net.Conn, bufferSize int) *remoteAppContext {
  30. app := &remoteAppContext{
  31. reqQueue: make(chan *reqRes, bufferSize),
  32. conn: conn,
  33. bufWriter: bufio.NewWriter(conn),
  34. reqSent: list.New(),
  35. resCb: nil,
  36. }
  37. app.QuitService = *NewQuitService(nil, "remoteAppContext", app)
  38. return app
  39. }
  40. func (app *remoteAppContext) OnStart() error {
  41. app.QuitService.OnStart()
  42. go app.sendRequestsRoutine()
  43. go app.recvResponseRoutine()
  44. return nil
  45. }
  46. func (app *remoteAppContext) OnStop() {
  47. app.QuitService.OnStop()
  48. app.conn.Close()
  49. }
  50. func (app *remoteAppContext) SetResponseCallback(resCb Callback) {
  51. app.mtx.Lock()
  52. defer app.mtx.Unlock()
  53. app.resCb = resCb
  54. }
  55. func (app *remoteAppContext) StopForError(err error) {
  56. app.mtx.Lock()
  57. log.Error("Stopping remoteAppContext for error.", "error", err)
  58. if app.err == nil {
  59. app.err = err
  60. }
  61. app.mtx.Unlock()
  62. app.Stop()
  63. }
  64. func (app *remoteAppContext) Error() error {
  65. app.mtx.Lock()
  66. defer app.mtx.Unlock()
  67. return app.err
  68. }
  69. //----------------------------------------
  70. func (app *remoteAppContext) sendRequestsRoutine() {
  71. for {
  72. var n int
  73. var err error
  74. select {
  75. case <-app.QuitService.Quit:
  76. return
  77. case reqres := <-app.reqQueue:
  78. app.willSendReq(reqres)
  79. wire.WriteBinaryLengthPrefixed(struct{ tmsp.Request }{reqres.Request}, app.bufWriter, &n, &err) // Length prefix
  80. if err != nil {
  81. app.StopForError(err)
  82. return
  83. }
  84. log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
  85. if _, ok := reqres.Request.(tmsp.RequestFlush); ok {
  86. err = app.bufWriter.Flush()
  87. if err != nil {
  88. app.StopForError(err)
  89. return
  90. }
  91. }
  92. }
  93. }
  94. }
  95. func (app *remoteAppContext) recvResponseRoutine() {
  96. r := bufio.NewReader(app.conn) // Buffer reads
  97. for {
  98. var res tmsp.Response
  99. var n int
  100. var err error
  101. wire.ReadBinaryPtrLengthPrefixed(&res, r, maxResponseSize, &n, &err)
  102. if err != nil {
  103. app.StopForError(err)
  104. return
  105. }
  106. switch res := res.(type) {
  107. case tmsp.ResponseException:
  108. app.StopForError(errors.New(res.Error))
  109. default:
  110. log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
  111. err := app.didRecvResponse(res)
  112. if err != nil {
  113. app.StopForError(err)
  114. }
  115. }
  116. }
  117. }
  118. func (app *remoteAppContext) willSendReq(reqres *reqRes) {
  119. app.mtx.Lock()
  120. defer app.mtx.Unlock()
  121. app.reqSent.PushBack(reqres)
  122. }
  123. func (app *remoteAppContext) didRecvResponse(res tmsp.Response) error {
  124. app.mtx.Lock()
  125. defer app.mtx.Unlock()
  126. // Special logic for events which have no corresponding requests.
  127. if _, ok := res.(tmsp.ResponseEvent); ok && app.resCb != nil {
  128. app.resCb(nil, res)
  129. return nil
  130. }
  131. // Get the first reqRes
  132. next := app.reqSent.Front()
  133. if next == nil {
  134. return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res))
  135. }
  136. reqres := next.Value.(*reqRes)
  137. if !resMatchesReq(reqres.Request, res) {
  138. return fmt.Errorf("Unexpected result type %v when response to %v expected",
  139. reflect.TypeOf(res), reflect.TypeOf(reqres.Request))
  140. }
  141. reqres.Response = res // Set response
  142. reqres.Done() // Release waiters
  143. app.reqSent.Remove(next) // Pop first item from linked list
  144. // Callback if there is a listener
  145. if app.resCb != nil {
  146. app.resCb(reqres.Request, res)
  147. }
  148. return nil
  149. }
  150. //----------------------------------------
  151. func (app *remoteAppContext) EchoAsync(msg string) {
  152. app.queueRequest(tmsp.RequestEcho{msg})
  153. }
  154. func (app *remoteAppContext) FlushAsync() {
  155. app.queueRequest(tmsp.RequestFlush{})
  156. }
  157. func (app *remoteAppContext) SetOptionAsync(key string, value string) {
  158. app.queueRequest(tmsp.RequestSetOption{key, value})
  159. }
  160. func (app *remoteAppContext) AppendTxAsync(tx []byte) {
  161. app.queueRequest(tmsp.RequestAppendTx{tx})
  162. }
  163. func (app *remoteAppContext) GetHashAsync() {
  164. app.queueRequest(tmsp.RequestGetHash{})
  165. }
  166. func (app *remoteAppContext) CommitAsync() {
  167. app.queueRequest(tmsp.RequestCommit{})
  168. }
  169. func (app *remoteAppContext) RollbackAsync() {
  170. app.queueRequest(tmsp.RequestRollback{})
  171. }
  172. func (app *remoteAppContext) AddListenerAsync(key string) {
  173. app.queueRequest(tmsp.RequestAddListener{key})
  174. }
  175. func (app *remoteAppContext) RemListenerAsync(key string) {
  176. app.queueRequest(tmsp.RequestRemListener{key})
  177. }
  178. //----------------------------------------
  179. func (app *remoteAppContext) InfoSync() (info []string, err error) {
  180. reqres := app.queueRequest(tmsp.RequestInfo{})
  181. app.FlushSync()
  182. if app.err != nil {
  183. return nil, app.err
  184. }
  185. return reqres.Response.(tmsp.ResponseInfo).Data, nil
  186. }
  187. func (app *remoteAppContext) FlushSync() error {
  188. app.queueRequest(tmsp.RequestFlush{}).Wait()
  189. return app.err
  190. }
  191. func (app *remoteAppContext) GetHashSync() (hash []byte, err error) {
  192. reqres := app.queueRequest(tmsp.RequestGetHash{})
  193. app.FlushSync()
  194. if app.err != nil {
  195. return nil, app.err
  196. }
  197. return reqres.Response.(tmsp.ResponseGetHash).Hash, nil
  198. }
  199. // Commits or error
  200. func (app *remoteAppContext) CommitSync() (err error) {
  201. app.queueRequest(tmsp.RequestCommit{})
  202. app.FlushSync()
  203. return app.err
  204. }
  205. // Rollback or error
  206. // Clears internal buffers
  207. func (app *remoteAppContext) RollbackSync() (err error) {
  208. app.queueRequest(tmsp.RequestRollback{})
  209. app.FlushSync()
  210. return app.err
  211. }
  212. //----------------------------------------
  213. func (app *remoteAppContext) queueRequest(req tmsp.Request) *reqRes {
  214. reqres := NewreqRes(req)
  215. // TODO: set app.err if reqQueue times out
  216. app.reqQueue <- reqres
  217. return reqres
  218. }
  219. //----------------------------------------
  220. func resMatchesReq(req tmsp.Request, res tmsp.Response) (ok bool) {
  221. switch req.(type) {
  222. case tmsp.RequestEcho:
  223. _, ok = res.(tmsp.ResponseEcho)
  224. case tmsp.RequestFlush:
  225. _, ok = res.(tmsp.ResponseFlush)
  226. case tmsp.RequestInfo:
  227. _, ok = res.(tmsp.ResponseInfo)
  228. case tmsp.RequestSetOption:
  229. _, ok = res.(tmsp.ResponseSetOption)
  230. case tmsp.RequestAppendTx:
  231. _, ok = res.(tmsp.ResponseAppendTx)
  232. case tmsp.RequestGetHash:
  233. _, ok = res.(tmsp.ResponseGetHash)
  234. case tmsp.RequestCommit:
  235. _, ok = res.(tmsp.ResponseCommit)
  236. case tmsp.RequestRollback:
  237. _, ok = res.(tmsp.ResponseRollback)
  238. case tmsp.RequestAddListener:
  239. _, ok = res.(tmsp.ResponseAddListener)
  240. case tmsp.RequestRemListener:
  241. _, ok = res.(tmsp.ResponseRemListener)
  242. default:
  243. return false
  244. }
  245. return
  246. }
  247. type reqRes struct {
  248. tmsp.Request
  249. *sync.WaitGroup
  250. tmsp.Response // Not set atomically, so be sure to use WaitGroup.
  251. }
  252. func NewreqRes(req tmsp.Request) *reqRes {
  253. return &reqRes{
  254. Request: req,
  255. WaitGroup: waitGroup1(),
  256. Response: nil,
  257. }
  258. }
  259. func waitGroup1() (wg *sync.WaitGroup) {
  260. wg = &sync.WaitGroup{}
  261. wg.Add(1)
  262. return
  263. }