You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

264 lines
6.7 KiB

7 years ago
7 years ago
8 years ago
8 years ago
  1. package server
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "os"
  8. "runtime"
  9. "github.com/tendermint/tendermint/abci/types"
  10. tmlog "github.com/tendermint/tendermint/libs/log"
  11. tmnet "github.com/tendermint/tendermint/libs/net"
  12. "github.com/tendermint/tendermint/libs/service"
  13. tmsync "github.com/tendermint/tendermint/libs/sync"
  14. )
  15. // var maxNumberConnections = 2
  16. type SocketServer struct {
  17. service.BaseService
  18. isLoggerSet bool
  19. proto string
  20. addr string
  21. listener net.Listener
  22. connsMtx tmsync.Mutex
  23. conns map[int]net.Conn
  24. nextConnID int
  25. appMtx tmsync.Mutex
  26. app types.Application
  27. }
  28. func NewSocketServer(protoAddr string, app types.Application) service.Service {
  29. proto, addr := tmnet.ProtocolAndAddress(protoAddr)
  30. s := &SocketServer{
  31. proto: proto,
  32. addr: addr,
  33. listener: nil,
  34. app: app,
  35. conns: make(map[int]net.Conn),
  36. }
  37. s.BaseService = *service.NewBaseService(nil, "ABCIServer", s)
  38. return s
  39. }
  40. func (s *SocketServer) SetLogger(l tmlog.Logger) {
  41. s.BaseService.SetLogger(l)
  42. s.isLoggerSet = true
  43. }
  44. func (s *SocketServer) OnStart() error {
  45. ln, err := net.Listen(s.proto, s.addr)
  46. if err != nil {
  47. return err
  48. }
  49. s.listener = ln
  50. go s.acceptConnectionsRoutine()
  51. return nil
  52. }
  53. func (s *SocketServer) OnStop() {
  54. if err := s.listener.Close(); err != nil {
  55. s.Logger.Error("Error closing listener", "err", err)
  56. }
  57. s.connsMtx.Lock()
  58. defer s.connsMtx.Unlock()
  59. for id, conn := range s.conns {
  60. delete(s.conns, id)
  61. if err := conn.Close(); err != nil {
  62. s.Logger.Error("Error closing connection", "id", id, "conn", conn, "err", err)
  63. }
  64. }
  65. }
  66. func (s *SocketServer) addConn(conn net.Conn) int {
  67. s.connsMtx.Lock()
  68. defer s.connsMtx.Unlock()
  69. connID := s.nextConnID
  70. s.nextConnID++
  71. s.conns[connID] = conn
  72. return connID
  73. }
  74. // deletes conn even if close errs
  75. func (s *SocketServer) rmConn(connID int) error {
  76. s.connsMtx.Lock()
  77. defer s.connsMtx.Unlock()
  78. conn, ok := s.conns[connID]
  79. if !ok {
  80. return fmt.Errorf("connection %d does not exist", connID)
  81. }
  82. delete(s.conns, connID)
  83. return conn.Close()
  84. }
  85. func (s *SocketServer) acceptConnectionsRoutine() {
  86. for {
  87. // Accept a connection
  88. s.Logger.Info("Waiting for new connection...")
  89. conn, err := s.listener.Accept()
  90. if err != nil {
  91. if !s.IsRunning() {
  92. return // Ignore error from listener closing.
  93. }
  94. s.Logger.Error("Failed to accept connection", "err", err)
  95. continue
  96. }
  97. s.Logger.Info("Accepted a new connection")
  98. connID := s.addConn(conn)
  99. closeConn := make(chan error, 2) // Push to signal connection closed
  100. responses := make(chan *types.Response, 1000) // A channel to buffer responses
  101. // Read requests from conn and deal with them
  102. go s.handleRequests(closeConn, conn, responses)
  103. // Pull responses from 'responses' and write them to conn.
  104. go s.handleResponses(closeConn, conn, responses)
  105. // Wait until signal to close connection
  106. go s.waitForClose(closeConn, connID)
  107. }
  108. }
  109. func (s *SocketServer) waitForClose(closeConn chan error, connID int) {
  110. err := <-closeConn
  111. switch {
  112. case err == io.EOF:
  113. s.Logger.Error("Connection was closed by client")
  114. case err != nil:
  115. s.Logger.Error("Connection error", "err", err)
  116. default:
  117. // never happens
  118. s.Logger.Error("Connection was closed")
  119. }
  120. // Close the connection
  121. if err := s.rmConn(connID); err != nil {
  122. s.Logger.Error("Error closing connection", "err", err)
  123. }
  124. }
  125. // Read requests from conn and deal with them
  126. func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, responses chan<- *types.Response) {
  127. var count int
  128. var bufReader = bufio.NewReader(conn)
  129. defer func() {
  130. // make sure to recover from any app-related panics to allow proper socket cleanup
  131. r := recover()
  132. if r != nil {
  133. const size = 64 << 10
  134. buf := make([]byte, size)
  135. buf = buf[:runtime.Stack(buf, false)]
  136. err := fmt.Errorf("recovered from panic: %v\n%s", r, buf)
  137. if !s.isLoggerSet {
  138. fmt.Fprintln(os.Stderr, err)
  139. }
  140. closeConn <- err
  141. s.appMtx.Unlock()
  142. }
  143. }()
  144. for {
  145. var req = &types.Request{}
  146. err := types.ReadMessage(bufReader, req)
  147. if err != nil {
  148. if err == io.EOF {
  149. closeConn <- err
  150. } else {
  151. closeConn <- fmt.Errorf("error reading message: %w", err)
  152. }
  153. return
  154. }
  155. s.appMtx.Lock()
  156. count++
  157. s.handleRequest(req, responses)
  158. s.appMtx.Unlock()
  159. }
  160. }
  161. func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
  162. switch r := req.Value.(type) {
  163. case *types.Request_Echo:
  164. responses <- types.ToResponseEcho(r.Echo.Message)
  165. case *types.Request_Flush:
  166. responses <- types.ToResponseFlush()
  167. case *types.Request_Info:
  168. res := s.app.Info(*r.Info)
  169. responses <- types.ToResponseInfo(res)
  170. case *types.Request_SetOption:
  171. res := s.app.SetOption(*r.SetOption)
  172. responses <- types.ToResponseSetOption(res)
  173. case *types.Request_DeliverTx:
  174. res := s.app.DeliverTx(*r.DeliverTx)
  175. responses <- types.ToResponseDeliverTx(res)
  176. case *types.Request_CheckTx:
  177. res := s.app.CheckTx(*r.CheckTx)
  178. responses <- types.ToResponseCheckTx(res)
  179. case *types.Request_Commit:
  180. res := s.app.Commit()
  181. responses <- types.ToResponseCommit(res)
  182. case *types.Request_Query:
  183. res := s.app.Query(*r.Query)
  184. responses <- types.ToResponseQuery(res)
  185. case *types.Request_InitChain:
  186. res := s.app.InitChain(*r.InitChain)
  187. responses <- types.ToResponseInitChain(res)
  188. case *types.Request_BeginBlock:
  189. res := s.app.BeginBlock(*r.BeginBlock)
  190. responses <- types.ToResponseBeginBlock(res)
  191. case *types.Request_EndBlock:
  192. res := s.app.EndBlock(*r.EndBlock)
  193. responses <- types.ToResponseEndBlock(res)
  194. case *types.Request_ListSnapshots:
  195. res := s.app.ListSnapshots(*r.ListSnapshots)
  196. responses <- types.ToResponseListSnapshots(res)
  197. case *types.Request_OfferSnapshot:
  198. res := s.app.OfferSnapshot(*r.OfferSnapshot)
  199. responses <- types.ToResponseOfferSnapshot(res)
  200. case *types.Request_LoadSnapshotChunk:
  201. res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
  202. responses <- types.ToResponseLoadSnapshotChunk(res)
  203. case *types.Request_ApplySnapshotChunk:
  204. res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
  205. responses <- types.ToResponseApplySnapshotChunk(res)
  206. default:
  207. responses <- types.ToResponseException("Unknown request")
  208. }
  209. }
  210. // Pull responses from 'responses' and write them to conn.
  211. func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, responses <-chan *types.Response) {
  212. var count int
  213. var bufWriter = bufio.NewWriter(conn)
  214. for {
  215. var res = <-responses
  216. err := types.WriteMessage(res, bufWriter)
  217. if err != nil {
  218. closeConn <- fmt.Errorf("error writing message: %w", err)
  219. return
  220. }
  221. if _, ok := res.Value.(*types.Response_Flush); ok {
  222. err = bufWriter.Flush()
  223. if err != nil {
  224. closeConn <- fmt.Errorf("error flushing write buffer: %w", err)
  225. return
  226. }
  227. }
  228. count++
  229. }
  230. }