You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

236 lines
5.8 KiB

8 years ago
8 years ago
7 years ago
7 years ago
8 years ago
8 years ago
8 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "github.com/tendermint/tendermint/abci/types"
  9. cmn "github.com/tendermint/tendermint/libs/common"
  10. )
  11. // var maxNumberConnections = 2
  12. type SocketServer struct {
  13. cmn.BaseService
  14. proto string
  15. addr string
  16. listener net.Listener
  17. connsMtx sync.Mutex
  18. conns map[int]net.Conn
  19. nextConnID int
  20. appMtx sync.Mutex
  21. app types.Application
  22. }
  23. func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
  24. proto, addr := cmn.ProtocolAndAddress(protoAddr)
  25. s := &SocketServer{
  26. proto: proto,
  27. addr: addr,
  28. listener: nil,
  29. app: app,
  30. conns: make(map[int]net.Conn),
  31. }
  32. s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
  33. return s
  34. }
  35. func (s *SocketServer) OnStart() error {
  36. if err := s.BaseService.OnStart(); err != nil {
  37. return err
  38. }
  39. ln, err := net.Listen(s.proto, s.addr)
  40. if err != nil {
  41. return err
  42. }
  43. s.listener = ln
  44. go s.acceptConnectionsRoutine()
  45. return nil
  46. }
  47. func (s *SocketServer) OnStop() {
  48. s.BaseService.OnStop()
  49. if err := s.listener.Close(); err != nil {
  50. s.Logger.Error("Error closing listener", "err", err)
  51. }
  52. s.connsMtx.Lock()
  53. defer s.connsMtx.Unlock()
  54. for id, conn := range s.conns {
  55. delete(s.conns, id)
  56. if err := conn.Close(); err != nil {
  57. s.Logger.Error("Error closing connection", "id", id, "conn", conn, "err", err)
  58. }
  59. }
  60. }
  61. func (s *SocketServer) addConn(conn net.Conn) int {
  62. s.connsMtx.Lock()
  63. defer s.connsMtx.Unlock()
  64. connID := s.nextConnID
  65. s.nextConnID++
  66. s.conns[connID] = conn
  67. return connID
  68. }
  69. // deletes conn even if close errs
  70. func (s *SocketServer) rmConn(connID int) error {
  71. s.connsMtx.Lock()
  72. defer s.connsMtx.Unlock()
  73. conn, ok := s.conns[connID]
  74. if !ok {
  75. return fmt.Errorf("Connection %d does not exist", connID)
  76. }
  77. delete(s.conns, connID)
  78. return conn.Close()
  79. }
  80. func (s *SocketServer) acceptConnectionsRoutine() {
  81. for {
  82. // Accept a connection
  83. s.Logger.Info("Waiting for new connection...")
  84. conn, err := s.listener.Accept()
  85. if err != nil {
  86. if !s.IsRunning() {
  87. return // Ignore error from listener closing.
  88. }
  89. s.Logger.Error("Failed to accept connection: " + err.Error())
  90. continue
  91. }
  92. s.Logger.Info("Accepted a new connection")
  93. connID := s.addConn(conn)
  94. closeConn := make(chan error, 2) // Push to signal connection closed
  95. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  96. // Read requests from conn and deal with them
  97. go s.handleRequests(closeConn, conn, responses)
  98. // Pull responses from 'responses' and write them to conn.
  99. go s.handleResponses(closeConn, conn, responses)
  100. // Wait until signal to close connection
  101. go s.waitForClose(closeConn, connID)
  102. }
  103. }
  104. func (s *SocketServer) waitForClose(closeConn chan error, connID int) {
  105. err := <-closeConn
  106. if err == io.EOF {
  107. s.Logger.Error("Connection was closed by client")
  108. } else if err != nil {
  109. s.Logger.Error("Connection error", "error", err)
  110. } else {
  111. // never happens
  112. s.Logger.Error("Connection was closed.")
  113. }
  114. // Close the connection
  115. if err := s.rmConn(connID); err != nil {
  116. s.Logger.Error("Error in closing connection", "error", err)
  117. }
  118. }
  119. // Read requests from conn and deal with them
  120. func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
  121. var count int
  122. var bufReader = bufio.NewReader(conn)
  123. defer func() {
  124. // make sure to recover from any app-related panics to allow proper socket cleanup
  125. r := recover()
  126. if r != nil {
  127. closeConn <- fmt.Errorf("recovered from panic: %v", r)
  128. s.appMtx.Unlock()
  129. }
  130. }()
  131. for {
  132. var req = &types.Request{}
  133. err := types.ReadMessage(bufReader, req)
  134. if err != nil {
  135. if err == io.EOF {
  136. closeConn <- err
  137. } else {
  138. closeConn <- fmt.Errorf("error reading message: %v", err)
  139. }
  140. return
  141. }
  142. s.appMtx.Lock()
  143. count++
  144. s.handleRequest(req, responses)
  145. s.appMtx.Unlock()
  146. }
  147. }
  148. func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
  149. switch r := req.Value.(type) {
  150. case *types.Request_Echo:
  151. responses <- types.ToResponseEcho(r.Echo.Message)
  152. case *types.Request_Flush:
  153. responses <- types.ToResponseFlush()
  154. case *types.Request_Info:
  155. res := s.app.Info(*r.Info)
  156. responses <- types.ToResponseInfo(res)
  157. case *types.Request_SetOption:
  158. res := s.app.SetOption(*r.SetOption)
  159. responses <- types.ToResponseSetOption(res)
  160. case *types.Request_DeliverTx:
  161. res := s.app.DeliverTx(*r.DeliverTx)
  162. responses <- types.ToResponseDeliverTx(res)
  163. case *types.Request_CheckTx:
  164. res := s.app.CheckTx(*r.CheckTx)
  165. responses <- types.ToResponseCheckTx(res)
  166. case *types.Request_Commit:
  167. res := s.app.Commit()
  168. responses <- types.ToResponseCommit(res)
  169. case *types.Request_Query:
  170. res := s.app.Query(*r.Query)
  171. responses <- types.ToResponseQuery(res)
  172. case *types.Request_InitChain:
  173. res := s.app.InitChain(*r.InitChain)
  174. responses <- types.ToResponseInitChain(res)
  175. case *types.Request_BeginBlock:
  176. res := s.app.BeginBlock(*r.BeginBlock)
  177. responses <- types.ToResponseBeginBlock(res)
  178. case *types.Request_EndBlock:
  179. res := s.app.EndBlock(*r.EndBlock)
  180. responses <- types.ToResponseEndBlock(res)
  181. default:
  182. responses <- types.ToResponseException("Unknown request")
  183. }
  184. }
  185. // Pull responses from 'responses' and write them to conn.
  186. func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
  187. var count int
  188. var bufWriter = bufio.NewWriter(conn)
  189. for {
  190. var res = <-responses
  191. err := types.WriteMessage(res, bufWriter)
  192. if err != nil {
  193. closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
  194. return
  195. }
  196. if _, ok := res.Value.(*types.Response_Flush); ok {
  197. err = bufWriter.Flush()
  198. if err != nil {
  199. closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
  200. return
  201. }
  202. }
  203. count++
  204. }
  205. }