You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

230 lines
5.7 KiB

8 years ago
8 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strings"
  8. "sync"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/tmsp/types"
  11. )
  12. // var maxNumberConnections = 2
  13. type SocketServer struct {
  14. BaseService
  15. proto string
  16. addr string
  17. listener net.Listener
  18. connsMtx sync.Mutex
  19. conns map[int]net.Conn
  20. nextConnID int
  21. appMtx sync.Mutex
  22. app types.Application
  23. }
  24. func NewSocketServer(protoAddr string, app types.Application) (Service, error) {
  25. parts := strings.SplitN(protoAddr, "://", 2)
  26. proto, addr := parts[0], parts[1]
  27. s := &SocketServer{
  28. proto: proto,
  29. addr: addr,
  30. listener: nil,
  31. app: app,
  32. conns: make(map[int]net.Conn),
  33. }
  34. s.BaseService = *NewBaseService(nil, "TMSPServer", s)
  35. _, err := s.Start() // Just start it
  36. return s, err
  37. }
  38. func (s *SocketServer) OnStart() error {
  39. s.BaseService.OnStart()
  40. ln, err := net.Listen(s.proto, s.addr)
  41. if err != nil {
  42. return err
  43. }
  44. s.listener = ln
  45. go s.acceptConnectionsRoutine()
  46. return nil
  47. }
  48. func (s *SocketServer) OnStop() {
  49. s.BaseService.OnStop()
  50. s.listener.Close()
  51. s.connsMtx.Lock()
  52. for id, conn := range s.conns {
  53. delete(s.conns, id)
  54. conn.Close()
  55. }
  56. s.connsMtx.Unlock()
  57. }
  58. func (s *SocketServer) addConn(conn net.Conn) int {
  59. s.connsMtx.Lock()
  60. defer s.connsMtx.Unlock()
  61. connID := s.nextConnID
  62. s.nextConnID += 1
  63. s.conns[connID] = conn
  64. return connID
  65. }
  66. // deletes conn even if close errs
  67. func (s *SocketServer) rmConn(connID int, conn net.Conn) error {
  68. s.connsMtx.Lock()
  69. defer s.connsMtx.Unlock()
  70. delete(s.conns, connID)
  71. return conn.Close()
  72. }
  73. func (s *SocketServer) acceptConnectionsRoutine() {
  74. // semaphore := make(chan struct{}, maxNumberConnections)
  75. for {
  76. // semaphore <- struct{}{}
  77. // Accept a connection
  78. log.Notice("Waiting for new connection...")
  79. conn, err := s.listener.Accept()
  80. if err != nil {
  81. if !s.IsRunning() {
  82. return // Ignore error from listener closing.
  83. }
  84. Exit("Failed to accept connection: " + err.Error())
  85. } else {
  86. log.Notice("Accepted a new connection")
  87. }
  88. connID := s.addConn(conn)
  89. closeConn := make(chan error, 2) // Push to signal connection closed
  90. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  91. // Read requests from conn and deal with them
  92. go s.handleRequests(closeConn, conn, responses)
  93. // Pull responses from 'responses' and write them to conn.
  94. go s.handleResponses(closeConn, responses, conn)
  95. go func() {
  96. // Wait until signal to close connection
  97. errClose := <-closeConn
  98. if err == io.EOF {
  99. log.Warn("Connection was closed by client")
  100. } else if errClose != nil {
  101. log.Warn("Connection error", "error", errClose)
  102. } else {
  103. // never happens
  104. log.Warn("Connection was closed.")
  105. }
  106. // Close the connection
  107. err := s.rmConn(connID, conn)
  108. if err != nil {
  109. log.Warn("Error in closing connection", "error", err)
  110. }
  111. // <-semaphore
  112. }()
  113. }
  114. }
  115. // Read requests from conn and deal with them
  116. func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
  117. var count int
  118. var bufReader = bufio.NewReader(conn)
  119. for {
  120. var req = &types.Request{}
  121. err := types.ReadMessage(bufReader, req)
  122. if err != nil {
  123. if err == io.EOF {
  124. closeConn <- err
  125. } else {
  126. closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
  127. }
  128. return
  129. }
  130. s.appMtx.Lock()
  131. count++
  132. s.handleRequest(req, responses)
  133. s.appMtx.Unlock()
  134. }
  135. }
  136. func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
  137. switch r := req.Value.(type) {
  138. case *types.Request_Echo:
  139. responses <- types.ToResponseEcho(r.Echo.Message)
  140. case *types.Request_Flush:
  141. responses <- types.ToResponseFlush()
  142. case *types.Request_Info:
  143. responses <- types.ToResponseInfo(s.app.Info())
  144. case *types.Request_SetOption:
  145. so := r.SetOption
  146. logStr := s.app.SetOption(so.Key, so.Value)
  147. responses <- types.ToResponseSetOption(logStr)
  148. case *types.Request_AppendTx:
  149. res := s.app.AppendTx(r.AppendTx.Tx)
  150. responses <- types.ToResponseAppendTx(res.Code, res.Data, res.Log)
  151. case *types.Request_CheckTx:
  152. res := s.app.CheckTx(r.CheckTx.Tx)
  153. responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
  154. case *types.Request_Commit:
  155. res := s.app.Commit()
  156. responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
  157. case *types.Request_Query:
  158. res := s.app.Query(r.Query.Query)
  159. responses <- types.ToResponseQuery(res.Code, res.Data, res.Log)
  160. case *types.Request_InitChain:
  161. if app, ok := s.app.(types.BlockchainAware); ok {
  162. app.InitChain(r.InitChain.Validators)
  163. }
  164. responses <- types.ToResponseInitChain()
  165. case *types.Request_BeginBlock:
  166. if app, ok := s.app.(types.BlockchainAware); ok {
  167. app.BeginBlock(r.BeginBlock.Hash, r.BeginBlock.Header)
  168. }
  169. responses <- types.ToResponseBeginBlock()
  170. case *types.Request_EndBlock:
  171. if app, ok := s.app.(types.BlockchainAware); ok {
  172. validators := app.EndBlock(r.EndBlock.Height)
  173. responses <- types.ToResponseEndBlock(validators)
  174. } else {
  175. responses <- types.ToResponseEndBlock(nil)
  176. }
  177. default:
  178. responses <- types.ToResponseException("Unknown request")
  179. }
  180. }
  181. // Pull responses from 'responses' and write them to conn.
  182. func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
  183. var count int
  184. var bufWriter = bufio.NewWriter(conn)
  185. for {
  186. var res = <-responses
  187. err := types.WriteMessage(res, bufWriter)
  188. if err != nil {
  189. closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
  190. return
  191. }
  192. if _, ok := res.Value.(*types.Response_Flush); ok {
  193. err = bufWriter.Flush()
  194. if err != nil {
  195. closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
  196. return
  197. }
  198. }
  199. count++
  200. }
  201. }