You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

224 lines
5.5 KiB

8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strings"
  8. "sync"
  9. "github.com/tendermint/abci/types"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. )
  12. // var maxNumberConnections = 2
  13. type SocketServer struct {
  14. cmn.BaseService
  15. proto string
  16. addr string
  17. listener net.Listener
  18. connsMtx sync.Mutex
  19. conns map[int]net.Conn
  20. nextConnID int
  21. appMtx sync.Mutex
  22. app types.Application
  23. }
  24. func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
  25. parts := strings.SplitN(protoAddr, "://", 2)
  26. proto, addr := parts[0], parts[1]
  27. s := &SocketServer{
  28. proto: proto,
  29. addr: addr,
  30. listener: nil,
  31. app: app,
  32. conns: make(map[int]net.Conn),
  33. }
  34. s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
  35. return s
  36. }
  37. func (s *SocketServer) OnStart() error {
  38. if err := s.BaseService.OnStart(); err != nil {
  39. return err
  40. }
  41. ln, err := net.Listen(s.proto, s.addr)
  42. if err != nil {
  43. return err
  44. }
  45. s.listener = ln
  46. go s.acceptConnectionsRoutine()
  47. return nil
  48. }
  49. func (s *SocketServer) OnStop() {
  50. s.BaseService.OnStop()
  51. s.listener.Close()
  52. s.connsMtx.Lock()
  53. for id, conn := range s.conns {
  54. delete(s.conns, id)
  55. conn.Close()
  56. }
  57. s.connsMtx.Unlock()
  58. }
  59. func (s *SocketServer) addConn(conn net.Conn) int {
  60. s.connsMtx.Lock()
  61. defer s.connsMtx.Unlock()
  62. connID := s.nextConnID
  63. s.nextConnID++
  64. s.conns[connID] = conn
  65. return connID
  66. }
  67. // deletes conn even if close errs
  68. func (s *SocketServer) rmConn(connID int, conn net.Conn) error {
  69. s.connsMtx.Lock()
  70. defer s.connsMtx.Unlock()
  71. delete(s.conns, connID)
  72. return conn.Close()
  73. }
  74. func (s *SocketServer) acceptConnectionsRoutine() {
  75. // semaphore := make(chan struct{}, maxNumberConnections)
  76. for {
  77. // semaphore <- struct{}{}
  78. // Accept a connection
  79. s.Logger.Info("Waiting for new connection...")
  80. conn, err := s.listener.Accept()
  81. if err != nil {
  82. if !s.IsRunning() {
  83. return // Ignore error from listener closing.
  84. }
  85. s.Logger.Error("Failed to accept connection: " + err.Error())
  86. } else {
  87. s.Logger.Info("Accepted a new connection")
  88. }
  89. connID := s.addConn(conn)
  90. closeConn := make(chan error, 2) // Push to signal connection closed
  91. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  92. // Read requests from conn and deal with them
  93. go s.handleRequests(closeConn, conn, responses)
  94. // Pull responses from 'responses' and write them to conn.
  95. go s.handleResponses(closeConn, responses, conn)
  96. go func() {
  97. // Wait until signal to close connection
  98. errClose := <-closeConn
  99. if err == io.EOF {
  100. s.Logger.Error("Connection was closed by client")
  101. } else if errClose != nil {
  102. s.Logger.Error("Connection error", "error", errClose)
  103. } else {
  104. // never happens
  105. s.Logger.Error("Connection was closed.")
  106. }
  107. // Close the connection
  108. err := s.rmConn(connID, conn)
  109. if err != nil {
  110. s.Logger.Error("Error in closing connection", "error", err)
  111. }
  112. // <-semaphore
  113. }()
  114. }
  115. }
  116. // Read requests from conn and deal with them
  117. func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
  118. var count int
  119. var bufReader = bufio.NewReader(conn)
  120. for {
  121. var req = &types.Request{}
  122. err := types.ReadMessage(bufReader, req)
  123. if err != nil {
  124. if err == io.EOF {
  125. closeConn <- err
  126. } else {
  127. closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
  128. }
  129. return
  130. }
  131. s.appMtx.Lock()
  132. count++
  133. s.handleRequest(req, responses)
  134. s.appMtx.Unlock()
  135. }
  136. }
  137. func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
  138. switch r := req.Value.(type) {
  139. case *types.Request_Echo:
  140. responses <- types.ToResponseEcho(r.Echo.Message)
  141. case *types.Request_Flush:
  142. responses <- types.ToResponseFlush()
  143. case *types.Request_Info:
  144. resInfo := s.app.Info()
  145. responses <- types.ToResponseInfo(resInfo)
  146. case *types.Request_SetOption:
  147. so := r.SetOption
  148. logStr := s.app.SetOption(so.Key, so.Value)
  149. responses <- types.ToResponseSetOption(logStr)
  150. case *types.Request_DeliverTx:
  151. res := s.app.DeliverTx(r.DeliverTx.Tx)
  152. responses <- types.ToResponseDeliverTx(res.Code, res.Data, res.Log)
  153. case *types.Request_CheckTx:
  154. res := s.app.CheckTx(r.CheckTx.Tx)
  155. responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
  156. case *types.Request_Commit:
  157. res := s.app.Commit()
  158. responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
  159. case *types.Request_Query:
  160. resQuery := s.app.Query(*r.Query)
  161. responses <- types.ToResponseQuery(resQuery)
  162. case *types.Request_InitChain:
  163. s.app.InitChain(*r.InitChain)
  164. responses <- types.ToResponseInitChain()
  165. case *types.Request_BeginBlock:
  166. s.app.BeginBlock(*r.BeginBlock)
  167. responses <- types.ToResponseBeginBlock()
  168. case *types.Request_EndBlock:
  169. resEndBlock := s.app.EndBlock(r.EndBlock.Height)
  170. responses <- types.ToResponseEndBlock(resEndBlock)
  171. default:
  172. responses <- types.ToResponseException("Unknown request")
  173. }
  174. }
  175. // Pull responses from 'responses' and write them to conn.
  176. func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
  177. var count int
  178. var bufWriter = bufio.NewWriter(conn)
  179. for {
  180. var res = <-responses
  181. err := types.WriteMessage(res, bufWriter)
  182. if err != nil {
  183. closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
  184. return
  185. }
  186. if _, ok := res.Value.(*types.Response_Flush); ok {
  187. err = bufWriter.Flush()
  188. if err != nil {
  189. closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
  190. return
  191. }
  192. }
  193. count++
  194. }
  195. }