You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
4.3 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strings"
  8. "sync"
  9. . "github.com/tendermint/go-common"
  10. "github.com/tendermint/tmsp/types"
  11. )
  12. // var maxNumberConnections = 2
  13. type Server struct {
  14. QuitService
  15. proto string
  16. addr string
  17. listener net.Listener
  18. appMtx sync.Mutex
  19. app types.Application
  20. }
  21. func NewServer(protoAddr string, app types.Application) (*Server, error) {
  22. parts := strings.SplitN(protoAddr, "://", 2)
  23. proto, addr := parts[0], parts[1]
  24. s := &Server{
  25. proto: proto,
  26. addr: addr,
  27. listener: nil,
  28. app: app,
  29. }
  30. s.QuitService = *NewQuitService(nil, "TMSPServer", s)
  31. s.Start() // Just start it
  32. return s, nil
  33. }
  34. func (s *Server) OnStart() error {
  35. s.QuitService.OnStart()
  36. ln, err := net.Listen(s.proto, s.addr)
  37. if err != nil {
  38. return err
  39. }
  40. s.listener = ln
  41. go s.acceptConnectionsRoutine()
  42. return nil
  43. }
  44. func (s *Server) OnStop() {
  45. s.QuitService.OnStop()
  46. s.listener.Close()
  47. }
  48. func (s *Server) acceptConnectionsRoutine() {
  49. // semaphore := make(chan struct{}, maxNumberConnections)
  50. for {
  51. // semaphore <- struct{}{}
  52. // Accept a connection
  53. fmt.Println("Waiting for new connection...")
  54. conn, err := s.listener.Accept()
  55. if err != nil {
  56. if !s.IsRunning() {
  57. return // Ignore error from listener closing.
  58. }
  59. Exit("Failed to accept connection: " + err.Error())
  60. } else {
  61. fmt.Println("Accepted a new connection")
  62. }
  63. closeConn := make(chan error, 2) // Push to signal connection closed
  64. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  65. // Read requests from conn and deal with them
  66. go s.handleRequests(closeConn, conn, responses)
  67. // Pull responses from 'responses' and write them to conn.
  68. go s.handleResponses(closeConn, responses, conn)
  69. go func() {
  70. // Wait until signal to close connection
  71. errClose := <-closeConn
  72. if errClose != nil {
  73. fmt.Printf("Connection error: %v\n", errClose)
  74. } else {
  75. fmt.Println("Connection was closed.")
  76. }
  77. // Close the connection
  78. err := conn.Close()
  79. if err != nil {
  80. fmt.Printf("Error in closing connection: %v\n", err)
  81. }
  82. // <-semaphore
  83. }()
  84. }
  85. }
  86. // Read requests from conn and deal with them
  87. func (s *Server) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
  88. var count int
  89. var bufReader = bufio.NewReader(conn)
  90. for {
  91. var req = &types.Request{}
  92. err := types.ReadMessage(bufReader, req)
  93. if err != nil {
  94. if err == io.EOF {
  95. closeConn <- fmt.Errorf("Connection closed by client")
  96. } else {
  97. closeConn <- fmt.Errorf("Error in handleRequests: %v", err.Error())
  98. }
  99. return
  100. }
  101. s.appMtx.Lock()
  102. count++
  103. s.handleRequest(req, responses)
  104. s.appMtx.Unlock()
  105. }
  106. }
  107. func (s *Server) handleRequest(req *types.Request, responses chan<- *types.Response) {
  108. switch req.Type {
  109. case types.MessageType_Echo:
  110. responses <- types.ResponseEcho(string(req.Data))
  111. case types.MessageType_Flush:
  112. responses <- types.ResponseFlush()
  113. case types.MessageType_Info:
  114. data := s.app.Info()
  115. responses <- types.ResponseInfo(data)
  116. case types.MessageType_SetOption:
  117. logStr := s.app.SetOption(req.Key, req.Value)
  118. responses <- types.ResponseSetOption(logStr)
  119. case types.MessageType_AppendTx:
  120. code, result, logStr := s.app.AppendTx(req.Data)
  121. responses <- types.ResponseAppendTx(code, result, logStr)
  122. case types.MessageType_CheckTx:
  123. code, result, logStr := s.app.CheckTx(req.Data)
  124. responses <- types.ResponseCheckTx(code, result, logStr)
  125. case types.MessageType_Commit:
  126. hash, logStr := s.app.Commit()
  127. responses <- types.ResponseCommit(hash, logStr)
  128. case types.MessageType_Query:
  129. code, result, logStr := s.app.Query(req.Data)
  130. responses <- types.ResponseQuery(code, result, logStr)
  131. default:
  132. responses <- types.ResponseException("Unknown request")
  133. }
  134. }
  135. // Pull responses from 'responses' and write them to conn.
  136. func (s *Server) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
  137. var count int
  138. var bufWriter = bufio.NewWriter(conn)
  139. for {
  140. var res = <-responses
  141. err := types.WriteMessage(res, bufWriter)
  142. if err != nil {
  143. closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
  144. return
  145. }
  146. if res.Type == types.MessageType_Flush {
  147. err = bufWriter.Flush()
  148. if err != nil {
  149. closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
  150. return
  151. }
  152. }
  153. count++
  154. }
  155. }