You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

223 lines
5.5 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "github.com/tendermint/abci/types"
  9. cmn "github.com/tendermint/tmlibs/common"
  10. )
  11. // var maxNumberConnections = 2
  12. type SocketServer struct {
  13. cmn.BaseService
  14. proto string
  15. addr string
  16. listener net.Listener
  17. connsMtx sync.Mutex
  18. conns map[int]net.Conn
  19. nextConnID int
  20. appMtx sync.Mutex
  21. app types.Application
  22. }
  23. func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
  24. proto, addr := cmn.ProtocolAndAddress(protoAddr)
  25. s := &SocketServer{
  26. proto: proto,
  27. addr: addr,
  28. listener: nil,
  29. app: app,
  30. conns: make(map[int]net.Conn),
  31. }
  32. s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
  33. return s
  34. }
  35. func (s *SocketServer) OnStart() error {
  36. if err := s.BaseService.OnStart(); err != nil {
  37. return err
  38. }
  39. ln, err := net.Listen(s.proto, s.addr)
  40. if err != nil {
  41. return err
  42. }
  43. s.listener = ln
  44. go s.acceptConnectionsRoutine()
  45. return nil
  46. }
  47. func (s *SocketServer) OnStop() {
  48. s.BaseService.OnStop()
  49. s.listener.Close()
  50. s.connsMtx.Lock()
  51. defer s.connsMtx.Unlock()
  52. for id, conn := range s.conns {
  53. delete(s.conns, id)
  54. conn.Close()
  55. }
  56. }
  57. func (s *SocketServer) addConn(conn net.Conn) int {
  58. s.connsMtx.Lock()
  59. defer s.connsMtx.Unlock()
  60. connID := s.nextConnID
  61. s.nextConnID++
  62. s.conns[connID] = conn
  63. return connID
  64. }
  65. // deletes conn even if close errs
  66. func (s *SocketServer) rmConn(connID int) error {
  67. s.connsMtx.Lock()
  68. defer s.connsMtx.Unlock()
  69. conn, ok := s.conns[connID]
  70. if !ok {
  71. return fmt.Errorf("Connection %d does not exist", connID)
  72. }
  73. delete(s.conns, connID)
  74. return conn.Close()
  75. }
  76. func (s *SocketServer) acceptConnectionsRoutine() {
  77. for {
  78. // Accept a connection
  79. s.Logger.Info("Waiting for new connection...")
  80. conn, err := s.listener.Accept()
  81. if err != nil {
  82. if !s.IsRunning() {
  83. return // Ignore error from listener closing.
  84. }
  85. s.Logger.Error("Failed to accept connection: " + err.Error())
  86. continue
  87. }
  88. s.Logger.Info("Accepted a new connection")
  89. connID := s.addConn(conn)
  90. closeConn := make(chan error, 2) // Push to signal connection closed
  91. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  92. // Read requests from conn and deal with them
  93. go s.handleRequests(closeConn, conn, responses)
  94. // Pull responses from 'responses' and write them to conn.
  95. go s.handleResponses(closeConn, conn, responses)
  96. // Wait until signal to close connection
  97. go s.waitForClose(closeConn, connID)
  98. }
  99. }
  100. func (s *SocketServer) waitForClose(closeConn chan error, connID int) {
  101. err := <-closeConn
  102. if err == io.EOF {
  103. s.Logger.Error("Connection was closed by client")
  104. } else if err != nil {
  105. s.Logger.Error("Connection error", "error", err)
  106. } else {
  107. // never happens
  108. s.Logger.Error("Connection was closed.")
  109. }
  110. // Close the connection
  111. if err := s.rmConn(connID); err != nil {
  112. s.Logger.Error("Error in closing connection", "error", err)
  113. }
  114. }
  115. // Read requests from conn and deal with them
  116. func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
  117. var count int
  118. var bufReader = bufio.NewReader(conn)
  119. for {
  120. var req = &types.Request{}
  121. err := types.ReadMessage(bufReader, req)
  122. if err != nil {
  123. if err == io.EOF {
  124. closeConn <- err
  125. } else {
  126. closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
  127. }
  128. return
  129. }
  130. s.appMtx.Lock()
  131. count++
  132. s.handleRequest(req, responses)
  133. s.appMtx.Unlock()
  134. }
  135. }
  136. func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
  137. switch r := req.Value.(type) {
  138. case *types.Request_Echo:
  139. responses <- types.ToResponseEcho(r.Echo.Message)
  140. case *types.Request_Flush:
  141. responses <- types.ToResponseFlush()
  142. case *types.Request_Info:
  143. resInfo := s.app.Info(*r.Info)
  144. responses <- types.ToResponseInfo(resInfo)
  145. case *types.Request_SetOption:
  146. so := r.SetOption
  147. logStr := s.app.SetOption(so.Key, so.Value)
  148. responses <- types.ToResponseSetOption(logStr)
  149. case *types.Request_DeliverTx:
  150. res := s.app.DeliverTx(r.DeliverTx.Tx)
  151. responses <- types.ToResponseDeliverTx(res.Code, res.Data, res.Log, res.Tags)
  152. case *types.Request_CheckTx:
  153. res := s.app.CheckTx(r.CheckTx.Tx)
  154. responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log, res.Tags)
  155. case *types.Request_Commit:
  156. res := s.app.Commit()
  157. responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
  158. case *types.Request_Query:
  159. resQuery := s.app.Query(*r.Query)
  160. responses <- types.ToResponseQuery(resQuery)
  161. case *types.Request_InitChain:
  162. s.app.InitChain(*r.InitChain)
  163. responses <- types.ToResponseInitChain()
  164. case *types.Request_BeginBlock:
  165. s.app.BeginBlock(*r.BeginBlock)
  166. responses <- types.ToResponseBeginBlock()
  167. case *types.Request_EndBlock:
  168. resEndBlock := s.app.EndBlock(r.EndBlock.Height)
  169. responses <- types.ToResponseEndBlock(resEndBlock)
  170. default:
  171. responses <- types.ToResponseException("Unknown request")
  172. }
  173. }
  174. // Pull responses from 'responses' and write them to conn.
  175. func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
  176. var count int
  177. var bufWriter = bufio.NewWriter(conn)
  178. for {
  179. var res = <-responses
  180. err := types.WriteMessage(res, bufWriter)
  181. if err != nil {
  182. closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
  183. return
  184. }
  185. if _, ok := res.Value.(*types.Response_Flush); ok {
  186. err = bufWriter.Flush()
  187. if err != nil {
  188. closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
  189. return
  190. }
  191. }
  192. count++
  193. }
  194. }