From 2c1aa7af2b1dc612cc820f1b1c17d9fb263baf89 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 21 Feb 2016 23:44:33 -0800 Subject: [PATCH] s/StartListener/NewServer/g --- cmd/counter/main.go | 2 +- cmd/dummy/main.go | 2 +- example/dummy/dummy_test.go | 2 +- server/server.go | 142 ++++++++++++++++++++++-------------- 4 files changed, 89 insertions(+), 59 deletions(-) diff --git a/cmd/counter/main.go b/cmd/counter/main.go index 37e735684..fc003fb88 100644 --- a/cmd/counter/main.go +++ b/cmd/counter/main.go @@ -16,7 +16,7 @@ func main() { app := counter.NewCounterApplication(*serialPtr) // Start the listener - _, err := server.StartListener(*addrPtr, app) + _, err := server.NewServer(*addrPtr, app) if err != nil { Exit(err.Error()) } diff --git a/cmd/dummy/main.go b/cmd/dummy/main.go index b4e542c65..25486e44f 100644 --- a/cmd/dummy/main.go +++ b/cmd/dummy/main.go @@ -14,7 +14,7 @@ func main() { flag.Parse() // Start the listener - _, err := server.StartListener(*addrPtr, dummy.NewDummyApplication()) + _, err := server.NewServer(*addrPtr, dummy.NewDummyApplication()) if err != nil { Exit(err.Error()) } diff --git a/example/dummy/dummy_test.go b/example/dummy/dummy_test.go index 266cf6359..073815bef 100644 --- a/example/dummy/dummy_test.go +++ b/example/dummy/dummy_test.go @@ -14,7 +14,7 @@ func TestStream(t *testing.T) { numAppendTxs := 200000 // Start the listener - _, err := server.StartListener("tcp://127.0.0.1:46658", NewDummyApplication()) + _, err := server.NewServer("tcp://127.0.0.1:46658", NewDummyApplication()) if err != nil { Exit(err.Error()) } diff --git a/server/server.go b/server/server.go index 933aa4c6e..e3ac8a377 100644 --- a/server/server.go +++ b/server/server.go @@ -14,65 +14,95 @@ import ( // var maxNumberConnections = 2 -func StartListener(protoAddr string, app types.Application) (net.Listener, error) { - var mtx sync.Mutex // global mutex +type Server struct { + QuitService + + proto string + addr string + listener net.Listener + + appMtx sync.Mutex + app types.Application +} + +func NewServer(protoAddr string, app types.Application) (*Server, error) { parts := strings.SplitN(protoAddr, "://", 2) proto, addr := parts[0], parts[1] - ln, err := net.Listen(proto, addr) + s := &Server{ + proto: proto, + addr: addr, + listener: nil, + app: app, + } + s.QuitService = *NewQuitService(nil, "TMSPServer", s) + s.Start() // Just start it + return s, nil +} + +func (s *Server) OnStart() error { + s.QuitService.OnStart() + ln, err := net.Listen(s.proto, s.addr) if err != nil { - return nil, err + return err } + s.listener = ln + go s.acceptConnectionsRoutine() + return nil +} - // A goroutine to accept a connection. - go func() { - // semaphore := make(chan struct{}, maxNumberConnections) +func (s *Server) OnStop() { + s.QuitService.OnStop() + s.listener.Close() +} - for { - // semaphore <- struct{}{} +func (s *Server) acceptConnectionsRoutine() { + // semaphore := make(chan struct{}, maxNumberConnections) - // Accept a connection - fmt.Println("Waiting for new connection...") - conn, err := ln.Accept() - if err != nil { - Exit("Failed to accept connection") - } else { - fmt.Println("Accepted a new connection") - } + for { + // semaphore <- struct{}{} - closeConn := make(chan error, 2) // Push to signal connection closed - responses := make(chan *types.Response, 1000) // A channel to buffer responses - - // Read requests from conn and deal with them - go handleRequests(&mtx, app, closeConn, conn, responses) - // Pull responses from 'responses' and write them to conn. - go handleResponses(closeConn, responses, conn) - - go func() { - // Wait until signal to close connection - errClose := <-closeConn - if errClose != nil { - fmt.Printf("Connection error: %v\n", errClose) - } else { - fmt.Println("Connection was closed.") - } - - // Close the connection - err := conn.Close() - if err != nil { - fmt.Printf("Error in closing connection: %v\n", err) - } - - // <-semaphore - }() + // Accept a connection + fmt.Println("Waiting for new connection...") + conn, err := s.listener.Accept() + if err != nil { + if !s.IsRunning() { + return // Ignore error from listener closing. + } + Exit("Failed to accept connection: " + err.Error()) + } else { + fmt.Println("Accepted a new connection") } - }() + closeConn := make(chan error, 2) // Push to signal connection closed + responses := make(chan *types.Response, 1000) // A channel to buffer responses - return ln, nil + // Read requests from conn and deal with them + go s.handleRequests(closeConn, conn, responses) + // Pull responses from 'responses' and write them to conn. + go s.handleResponses(closeConn, responses, conn) + + go func() { + // Wait until signal to close connection + errClose := <-closeConn + if errClose != nil { + fmt.Printf("Connection error: %v\n", errClose) + } else { + fmt.Println("Connection was closed.") + } + + // Close the connection + err := conn.Close() + if err != nil { + fmt.Printf("Error in closing connection: %v\n", err) + } + + // <-semaphore + }() + } } // Read requests from conn and deal with them -func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error, conn net.Conn, responses chan<- *types.Response) { +func (s *Server) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) { var count int var bufReader = bufio.NewReader(conn) for { @@ -87,36 +117,36 @@ func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error } return } - mtx.Lock() + s.appMtx.Lock() count++ - handleRequest(app, req, responses) - mtx.Unlock() + s.handleRequest(req, responses) + s.appMtx.Unlock() } } -func handleRequest(app types.Application, req *types.Request, responses chan<- *types.Response) { +func (s *Server) handleRequest(req *types.Request, responses chan<- *types.Response) { switch req.Type { case types.MessageType_Echo: responses <- types.ResponseEcho(string(req.Data)) case types.MessageType_Flush: responses <- types.ResponseFlush() case types.MessageType_Info: - data := app.Info() + data := s.app.Info() responses <- types.ResponseInfo(data) case types.MessageType_SetOption: - logStr := app.SetOption(req.Key, req.Value) + logStr := s.app.SetOption(req.Key, req.Value) responses <- types.ResponseSetOption(logStr) case types.MessageType_AppendTx: - code, result, logStr := app.AppendTx(req.Data) + code, result, logStr := s.app.AppendTx(req.Data) responses <- types.ResponseAppendTx(code, result, logStr) case types.MessageType_CheckTx: - code, result, logStr := app.CheckTx(req.Data) + code, result, logStr := s.app.CheckTx(req.Data) responses <- types.ResponseCheckTx(code, result, logStr) case types.MessageType_Commit: - hash, logStr := app.Commit() + hash, logStr := s.app.Commit() responses <- types.ResponseCommit(hash, logStr) case types.MessageType_Query: - code, result, logStr := app.Query(req.Data) + code, result, logStr := s.app.Query(req.Data) responses <- types.ResponseQuery(code, result, logStr) default: responses <- types.ResponseException("Unknown request") @@ -124,7 +154,7 @@ func handleRequest(app types.Application, req *types.Request, responses chan<- * } // Pull responses from 'responses' and write them to conn. -func handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) { +func (s *Server) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) { var count int var bufWriter = bufio.NewWriter(conn) for {