From 3bd8782ab211f2fec1f7302b9d931f8e3d56e7be Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 14 Nov 2017 17:03:23 +0000 Subject: [PATCH] server: minor refactor --- server/socket_server.go | 59 +++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/server/socket_server.go b/server/socket_server.go index 55227e7dc..2396c556d 100644 --- a/server/socket_server.go +++ b/server/socket_server.go @@ -59,11 +59,11 @@ func (s *SocketServer) OnStop() { s.listener.Close() s.connsMtx.Lock() + defer s.connsMtx.Unlock() for id, conn := range s.conns { delete(s.conns, id) conn.Close() } - s.connsMtx.Unlock() } func (s *SocketServer) addConn(conn net.Conn) int { @@ -78,20 +78,21 @@ func (s *SocketServer) addConn(conn net.Conn) int { } // deletes conn even if close errs -func (s *SocketServer) rmConn(connID int, conn net.Conn) error { +func (s *SocketServer) rmConn(connID int) error { s.connsMtx.Lock() defer s.connsMtx.Unlock() + conn, ok := s.conns[connID] + if !ok { + return fmt.Errorf("Connection %d does not exist", connID) + } + delete(s.conns, connID) return conn.Close() } func (s *SocketServer) acceptConnectionsRoutine() { - // semaphore := make(chan struct{}, maxNumberConnections) - for { - // semaphore <- struct{}{} - // Accept a connection s.Logger.Info("Waiting for new connection...") conn, err := s.listener.Accept() @@ -100,10 +101,11 @@ func (s *SocketServer) acceptConnectionsRoutine() { return // Ignore error from listener closing. } s.Logger.Error("Failed to accept connection: " + err.Error()) - } else { - s.Logger.Info("Accepted a new connection") + continue } + s.Logger.Info("Accepted a new connection") + connID := s.addConn(conn) closeConn := make(chan error, 2) // Push to signal connection closed @@ -112,28 +114,27 @@ func (s *SocketServer) acceptConnectionsRoutine() { // Read requests from conn and deal with them go s.handleRequests(closeConn, conn, responses) // Pull responses from 'responses' and write them to conn. - go s.handleResponses(closeConn, responses, conn) - - go func() { - // Wait until signal to close connection - errClose := <-closeConn - if errClose == io.EOF { - s.Logger.Error("Connection was closed by client") - } else if errClose != nil { - s.Logger.Error("Connection error", "error", errClose) - } else { - // never happens - s.Logger.Error("Connection was closed.") - } + go s.handleResponses(closeConn, conn, responses) - // Close the connection - err := s.rmConn(connID, conn) - if err != nil { - s.Logger.Error("Error in closing connection", "error", err) - } + // Wait until signal to close connection + go s.waitForClose(closeConn, connID) + } +} + +func (s *SocketServer) waitForClose(closeConn chan error, connID int) { + err := <-closeConn + if err == io.EOF { + s.Logger.Error("Connection was closed by client") + } else if err != nil { + s.Logger.Error("Connection error", "error", err) + } else { + // never happens + s.Logger.Error("Connection was closed.") + } - // <-semaphore - }() + // Close the connection + if err := s.rmConn(connID); err != nil { + s.Logger.Error("Error in closing connection", "error", err) } } @@ -200,7 +201,7 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types } // Pull responses from 'responses' and write them to conn. -func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) { +func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) { var count int var bufWriter = bufio.NewWriter(conn) for {