You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
4.9 KiB

  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strings"
  8. "sync"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/tmsp/types"
  11. )
  12. // var maxNumberConnections = 2
  13. type SocketServer struct {
  14. QuitService
  15. proto string
  16. addr string
  17. listener net.Listener
  18. appMtx sync.Mutex
  19. app types.Application
  20. }
  21. func NewSocketServer(protoAddr string, app types.Application) (Service, error) {
  22. parts := strings.SplitN(protoAddr, "://", 2)
  23. proto, addr := parts[0], parts[1]
  24. s := &SocketServer{
  25. proto: proto,
  26. addr: addr,
  27. listener: nil,
  28. app: app,
  29. }
  30. s.QuitService = *NewQuitService(nil, "TMSPServer", s)
  31. _, err := s.Start() // Just start it
  32. return s, err
  33. }
  34. func (s *SocketServer) OnStart() error {
  35. s.QuitService.OnStart()
  36. ln, err := net.Listen(s.proto, s.addr)
  37. if err != nil {
  38. return err
  39. }
  40. s.listener = ln
  41. go s.acceptConnectionsRoutine()
  42. return nil
  43. }
  44. func (s *SocketServer) OnStop() {
  45. s.QuitService.OnStop()
  46. s.listener.Close()
  47. }
  48. func (s *SocketServer) acceptConnectionsRoutine() {
  49. // semaphore := make(chan struct{}, maxNumberConnections)
  50. for {
  51. // semaphore <- struct{}{}
  52. // Accept a connection
  53. fmt.Println("Waiting for new connection...")
  54. conn, err := s.listener.Accept()
  55. if err != nil {
  56. if !s.IsRunning() {
  57. return // Ignore error from listener closing.
  58. }
  59. Exit("Failed to accept connection: " + err.Error())
  60. } else {
  61. fmt.Println("Accepted a new connection")
  62. }
  63. closeConn := make(chan error, 2) // Push to signal connection closed
  64. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  65. // Read requests from conn and deal with them
  66. go s.handleRequests(closeConn, conn, responses)
  67. // Pull responses from 'responses' and write them to conn.
  68. go s.handleResponses(closeConn, responses, conn)
  69. go func() {
  70. // Wait until signal to close connection
  71. errClose := <-closeConn
  72. if errClose != nil {
  73. fmt.Printf("Connection error: %v\n", errClose)
  74. } else {
  75. fmt.Println("Connection was closed.")
  76. }
  77. // Close the connection
  78. err := conn.Close()
  79. if err != nil {
  80. fmt.Printf("Error in closing connection: %v\n", err)
  81. }
  82. // <-semaphore
  83. }()
  84. }
  85. }
  86. // Read requests from conn and deal with them
  87. func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
  88. var count int
  89. var bufReader = bufio.NewReader(conn)
  90. for {
  91. var req = &types.Request{}
  92. err := types.ReadMessage(bufReader, req)
  93. if err != nil {
  94. if err == io.EOF {
  95. closeConn <- fmt.Errorf("Connection closed by client")
  96. } else {
  97. closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
  98. }
  99. return
  100. }
  101. s.appMtx.Lock()
  102. count++
  103. s.handleRequest(req, responses)
  104. s.appMtx.Unlock()
  105. }
  106. }
  107. func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
  108. switch r := req.Value.(type) {
  109. case *types.Request_Echo:
  110. responses <- types.ToResponseEcho(r.Echo.Message)
  111. case *types.Request_Flush:
  112. responses <- types.ToResponseFlush()
  113. case *types.Request_Info:
  114. data := s.app.Info()
  115. responses <- types.ToResponseInfo(data)
  116. case *types.Request_SetOption:
  117. so := r.SetOption
  118. logStr := s.app.SetOption(so.Key, so.Value)
  119. responses <- types.ToResponseSetOption(logStr)
  120. case *types.Request_AppendTx:
  121. res := s.app.AppendTx(r.AppendTx.Tx)
  122. responses <- types.ToResponseAppendTx(res.Code, res.Data, res.Log)
  123. case *types.Request_CheckTx:
  124. res := s.app.CheckTx(r.CheckTx.Tx)
  125. responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
  126. case *types.Request_Commit:
  127. res := s.app.Commit()
  128. responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
  129. case *types.Request_Query:
  130. res := s.app.Query(r.Query.Query)
  131. responses <- types.ToResponseQuery(res.Code, res.Data, res.Log)
  132. case *types.Request_InitChain:
  133. if app, ok := s.app.(types.BlockchainAware); ok {
  134. app.InitChain(r.InitChain.Validators)
  135. responses <- types.ToResponseInitChain()
  136. } else {
  137. responses <- types.ToResponseInitChain()
  138. }
  139. case *types.Request_EndBlock:
  140. if app, ok := s.app.(types.BlockchainAware); ok {
  141. validators := app.EndBlock(r.EndBlock.Height)
  142. responses <- types.ToResponseEndBlock(validators)
  143. } else {
  144. responses <- types.ToResponseEndBlock(nil)
  145. }
  146. default:
  147. responses <- types.ToResponseException("Unknown request")
  148. }
  149. }
  150. // Pull responses from 'responses' and write them to conn.
  151. func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
  152. var count int
  153. var bufWriter = bufio.NewWriter(conn)
  154. for {
  155. var res = <-responses
  156. err := types.WriteMessage(res, bufWriter)
  157. if err != nil {
  158. closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
  159. return
  160. }
  161. if _, ok := res.Value.(*types.Response_Flush); ok {
  162. err = bufWriter.Flush()
  163. if err != nil {
  164. closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
  165. return
  166. }
  167. }
  168. count++
  169. }
  170. }