|
@ -5,9 +5,12 @@ import ( |
|
|
"fmt" |
|
|
"fmt" |
|
|
"io" |
|
|
"io" |
|
|
"net" |
|
|
"net" |
|
|
|
|
|
"os" |
|
|
|
|
|
"runtime" |
|
|
"sync" |
|
|
"sync" |
|
|
|
|
|
|
|
|
"github.com/tendermint/tendermint/abci/types" |
|
|
"github.com/tendermint/tendermint/abci/types" |
|
|
|
|
|
tmlog "github.com/tendermint/tendermint/libs/log" |
|
|
tmnet "github.com/tendermint/tendermint/libs/net" |
|
|
tmnet "github.com/tendermint/tendermint/libs/net" |
|
|
"github.com/tendermint/tendermint/libs/service" |
|
|
"github.com/tendermint/tendermint/libs/service" |
|
|
) |
|
|
) |
|
@ -16,6 +19,7 @@ import ( |
|
|
|
|
|
|
|
|
type SocketServer struct { |
|
|
type SocketServer struct { |
|
|
service.BaseService |
|
|
service.BaseService |
|
|
|
|
|
isLoggerSet bool |
|
|
|
|
|
|
|
|
proto string |
|
|
proto string |
|
|
addr string |
|
|
addr string |
|
@ -42,21 +46,24 @@ func NewSocketServer(protoAddr string, app types.Application) service.Service { |
|
|
return s |
|
|
return s |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s *SocketServer) SetLogger(l tmlog.Logger) { |
|
|
|
|
|
s.BaseService.SetLogger(l) |
|
|
|
|
|
s.isLoggerSet = true |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func (s *SocketServer) OnStart() error { |
|
|
func (s *SocketServer) OnStart() error { |
|
|
if err := s.BaseService.OnStart(); err != nil { |
|
|
|
|
|
return err |
|
|
|
|
|
} |
|
|
|
|
|
ln, err := net.Listen(s.proto, s.addr) |
|
|
ln, err := net.Listen(s.proto, s.addr) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
s.listener = ln |
|
|
s.listener = ln |
|
|
go s.acceptConnectionsRoutine() |
|
|
go s.acceptConnectionsRoutine() |
|
|
|
|
|
|
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (s *SocketServer) OnStop() { |
|
|
func (s *SocketServer) OnStop() { |
|
|
s.BaseService.OnStop() |
|
|
|
|
|
if err := s.listener.Close(); err != nil { |
|
|
if err := s.listener.Close(); err != nil { |
|
|
s.Logger.Error("Error closing listener", "err", err) |
|
|
s.Logger.Error("Error closing listener", "err", err) |
|
|
} |
|
|
} |
|
@ -105,7 +112,7 @@ func (s *SocketServer) acceptConnectionsRoutine() { |
|
|
if !s.IsRunning() { |
|
|
if !s.IsRunning() { |
|
|
return // Ignore error from listener closing.
|
|
|
return // Ignore error from listener closing.
|
|
|
} |
|
|
} |
|
|
s.Logger.Error("Failed to accept connection: " + err.Error()) |
|
|
|
|
|
|
|
|
s.Logger.Error("Failed to accept connection", "err", err) |
|
|
continue |
|
|
continue |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -132,15 +139,15 @@ func (s *SocketServer) waitForClose(closeConn chan error, connID int) { |
|
|
case err == io.EOF: |
|
|
case err == io.EOF: |
|
|
s.Logger.Error("Connection was closed by client") |
|
|
s.Logger.Error("Connection was closed by client") |
|
|
case err != nil: |
|
|
case err != nil: |
|
|
s.Logger.Error("Connection error", "error", err) |
|
|
|
|
|
|
|
|
s.Logger.Error("Connection error", "err", err) |
|
|
default: |
|
|
default: |
|
|
// never happens
|
|
|
// never happens
|
|
|
s.Logger.Error("Connection was closed.") |
|
|
|
|
|
|
|
|
s.Logger.Error("Connection was closed") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// Close the connection
|
|
|
// Close the connection
|
|
|
if err := s.rmConn(connID); err != nil { |
|
|
if err := s.rmConn(connID); err != nil { |
|
|
s.Logger.Error("Error in closing connection", "error", err) |
|
|
|
|
|
|
|
|
s.Logger.Error("Error closing connection", "err", err) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -153,7 +160,14 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, resp |
|
|
// make sure to recover from any app-related panics to allow proper socket cleanup
|
|
|
// make sure to recover from any app-related panics to allow proper socket cleanup
|
|
|
r := recover() |
|
|
r := recover() |
|
|
if r != nil { |
|
|
if r != nil { |
|
|
closeConn <- fmt.Errorf("recovered from panic: %v", r) |
|
|
|
|
|
|
|
|
const size = 64 << 10 |
|
|
|
|
|
buf := make([]byte, size) |
|
|
|
|
|
buf = buf[:runtime.Stack(buf, false)] |
|
|
|
|
|
err := fmt.Errorf("recovered from panic: %v\n%s", r, buf) |
|
|
|
|
|
if !s.isLoggerSet { |
|
|
|
|
|
fmt.Fprintln(os.Stderr, err) |
|
|
|
|
|
} |
|
|
|
|
|
closeConn <- err |
|
|
s.appMtx.Unlock() |
|
|
s.appMtx.Unlock() |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
@ -166,7 +180,7 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, resp |
|
|
if err == io.EOF { |
|
|
if err == io.EOF { |
|
|
closeConn <- err |
|
|
closeConn <- err |
|
|
} else { |
|
|
} else { |
|
|
closeConn <- fmt.Errorf("error reading message: %v", err) |
|
|
|
|
|
|
|
|
closeConn <- fmt.Errorf("error reading message: %w", err) |
|
|
} |
|
|
} |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
@ -223,13 +237,13 @@ func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, res |
|
|
var res = <-responses |
|
|
var res = <-responses |
|
|
err := types.WriteMessage(res, bufWriter) |
|
|
err := types.WriteMessage(res, bufWriter) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
closeConn <- fmt.Errorf("error writing message: %v", err.Error()) |
|
|
|
|
|
|
|
|
closeConn <- fmt.Errorf("error writing message: %w", err) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
if _, ok := res.Value.(*types.Response_Flush); ok { |
|
|
if _, ok := res.Value.(*types.Response_Flush); ok { |
|
|
err = bufWriter.Flush() |
|
|
err = bufWriter.Flush() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
closeConn <- fmt.Errorf("error flushing write buffer: %v", err.Error()) |
|
|
|
|
|
|
|
|
closeConn <- fmt.Errorf("error flushing write buffer: %w", err) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|