|
|
@ -30,7 +30,7 @@ func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEn |
|
|
|
|
|
|
|
// TODO: Add a type for SignerEndpoints
|
|
|
|
// getConnection
|
|
|
|
// connect
|
|
|
|
// tryConnect
|
|
|
|
// read
|
|
|
|
// write
|
|
|
|
// close
|
|
|
@ -66,18 +66,14 @@ func NewSignerListenerEndpoint(logger log.Logger, listener net.Listener) *Signer |
|
|
|
|
|
|
|
// OnStart implements cmn.Service.
|
|
|
|
func (ve *SignerListenerEndpoint) OnStart() error { |
|
|
|
closed, err := ve.connect() |
|
|
|
// TODO: Improve. Connection state should be kept in a variable
|
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: OnStart") |
|
|
|
|
|
|
|
err := ve.tryConnect() |
|
|
|
if err != nil { |
|
|
|
ve.Logger.Error("OnStart", "err", err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
if closed { |
|
|
|
return fmt.Errorf("listener is closed") |
|
|
|
} |
|
|
|
|
|
|
|
// Start a routine to keep the connection alive
|
|
|
|
ve.cancelPingCh = make(chan struct{}, 1) |
|
|
|
ve.pingTicker = time.NewTicker(ve.heartbeatPeriod) |
|
|
@ -87,6 +83,7 @@ func (ve *SignerListenerEndpoint) OnStart() error { |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-ve.pingTicker.C: |
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: ping ticker ch") |
|
|
|
err := ve.ping() |
|
|
|
if err != nil { |
|
|
|
ve.Logger.Error("Ping", "err", err) |
|
|
@ -94,19 +91,17 @@ func (ve *SignerListenerEndpoint) OnStart() error { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
closed, err := ve.connect() |
|
|
|
err := ve.tryConnect() |
|
|
|
if err != nil { |
|
|
|
ve.Logger.Error("Reconnecting to remote signer failed", "err", err) |
|
|
|
continue |
|
|
|
} |
|
|
|
if closed { |
|
|
|
ve.Logger.Info("listener is closing") |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
ve.Logger.Info("Re-created connection to remote signer", "impl", ve) |
|
|
|
} |
|
|
|
case <-ve.cancelPingCh: |
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: cancel ping ch") |
|
|
|
|
|
|
|
ve.pingTicker.Stop() |
|
|
|
return |
|
|
|
} |
|
|
@ -118,18 +113,38 @@ func (ve *SignerListenerEndpoint) OnStart() error { |
|
|
|
|
|
|
|
// OnStop implements cmn.Service.
|
|
|
|
func (ve *SignerListenerEndpoint) OnStop() { |
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: OnStop") |
|
|
|
|
|
|
|
if ve.cancelPingCh != nil { |
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: Close cancel ping channel") |
|
|
|
close(ve.cancelPingCh) |
|
|
|
ve.cancelPingCh = nil |
|
|
|
} |
|
|
|
|
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: OnStop calling Close") |
|
|
|
_ = ve.Close() |
|
|
|
} |
|
|
|
|
|
|
|
// IsConnected indicates if there is an active connection
|
|
|
|
func (ve *SignerListenerEndpoint) IsConnected() bool { |
|
|
|
ve.mtx.Lock() |
|
|
|
defer ve.mtx.Unlock() |
|
|
|
return ve.isConnected() |
|
|
|
} |
|
|
|
|
|
|
|
// WaitForConnection waits maxWait for a connection or returns a timeout error
|
|
|
|
func (ve *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error { |
|
|
|
// TODO: complete this
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Close closes the underlying net.Conn.
|
|
|
|
func (ve *SignerListenerEndpoint) Close() error { |
|
|
|
ve.mtx.Lock() |
|
|
|
defer ve.mtx.Unlock() |
|
|
|
|
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: Close") |
|
|
|
|
|
|
|
if ve.conn != nil { |
|
|
|
if err := ve.conn.Close(); err != nil { |
|
|
|
ve.Logger.Error("Closing connection", "err", err) |
|
|
@ -153,6 +168,10 @@ func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSi |
|
|
|
ve.mtx.Lock() |
|
|
|
defer ve.mtx.Unlock() |
|
|
|
|
|
|
|
if !ve.isConnected() { |
|
|
|
return nil, cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected") |
|
|
|
} |
|
|
|
|
|
|
|
err := ve.writeMessage(request) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
@ -166,24 +185,15 @@ func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSi |
|
|
|
return res, nil |
|
|
|
} |
|
|
|
|
|
|
|
// Ping is used to check connection health.
|
|
|
|
func (ve *SignerListenerEndpoint) ping() error { |
|
|
|
response, err := ve.SendRequest(&PingRequest{}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
_, ok := response.(*PingResponse) |
|
|
|
if !ok { |
|
|
|
return ErrUnexpectedResponse |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
// IsConnected indicates if there is an active connection
|
|
|
|
func (ve *SignerListenerEndpoint) isConnected() bool { |
|
|
|
return ve.IsRunning() && ve.conn != nil |
|
|
|
} |
|
|
|
|
|
|
|
func (ve *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error) { |
|
|
|
// TODO: Check connection status
|
|
|
|
if !ve.isConnected() { |
|
|
|
return nil, cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected") |
|
|
|
} |
|
|
|
|
|
|
|
const maxRemoteSignerMsgSize = 1024 * 10 |
|
|
|
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(ve.conn, &msg, maxRemoteSignerMsgSize) |
|
|
@ -195,9 +205,8 @@ func (ve *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error) |
|
|
|
} |
|
|
|
|
|
|
|
func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) { |
|
|
|
// TODO: Check connection status
|
|
|
|
if ve.conn == nil { |
|
|
|
return fmt.Errorf("endpoint is not connected") |
|
|
|
if !ve.isConnected() { |
|
|
|
return cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected") |
|
|
|
} |
|
|
|
|
|
|
|
_, err = cdc.MarshalBinaryLengthPrefixedWriter(ve.conn, msg) |
|
|
@ -208,39 +217,55 @@ func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
// waits to accept and sets a new connection.
|
|
|
|
// connection is closed in OnStop.
|
|
|
|
// returns true if the listener is closed (ie. it returns a nil conn).
|
|
|
|
// TODO: Improve this
|
|
|
|
func (ve *SignerListenerEndpoint) connect() (closed bool, err error) { |
|
|
|
// tryConnect waits to accept a new connection.
|
|
|
|
func (ve *SignerListenerEndpoint) tryConnect() error { |
|
|
|
ve.mtx.Lock() |
|
|
|
defer ve.mtx.Unlock() |
|
|
|
|
|
|
|
// first check if the conn already exists and close it.
|
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: tryConnect") |
|
|
|
|
|
|
|
if !ve.IsRunning() || ve.listener == nil { |
|
|
|
return fmt.Errorf("endpoint is closing") |
|
|
|
} |
|
|
|
|
|
|
|
// if the conn already exists and close it.
|
|
|
|
if ve.conn != nil { |
|
|
|
if tmpErr := ve.conn.Close(); tmpErr != nil { |
|
|
|
ve.Logger.Error("error closing socket val connection during connect", "err", tmpErr) |
|
|
|
ve.Logger.Error("error closing socket val connection during tryConnect", "err", tmpErr) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Forget old connection
|
|
|
|
ve.conn = nil |
|
|
|
|
|
|
|
// wait for a new conn
|
|
|
|
ve.conn, err = ve.listener.Accept() |
|
|
|
conn, err := ve.listener.Accept() |
|
|
|
if err != nil { |
|
|
|
return false, err |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
// listener is closed
|
|
|
|
if ve.conn == nil { |
|
|
|
return true, nil |
|
|
|
} |
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: New connection") |
|
|
|
|
|
|
|
ve.conn = conn |
|
|
|
// TODO: maybe we need to inform the owner that a connection has been received
|
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// Ping is used to check connection health.
|
|
|
|
func (ve *SignerListenerEndpoint) ping() error { |
|
|
|
response, err := ve.SendRequest(&PingRequest{}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
// TODO: This does not belong here... but maybe we need to inform the owner that a connection has been received
|
|
|
|
// failed to fetch the pubkey. close out the connection.
|
|
|
|
if tmpErr := ve.conn.Close(); tmpErr != nil { |
|
|
|
ve.Logger.Error("error closing connection", "err", tmpErr) |
|
|
|
} |
|
|
|
return false, err |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
_, ok := response.(*PingResponse) |
|
|
|
if !ok { |
|
|
|
return ErrUnexpectedResponse |
|
|
|
} |
|
|
|
return false, nil |
|
|
|
|
|
|
|
ve.Logger.Debug("SignerListenerEndpoint: pong") |
|
|
|
|
|
|
|
return nil |
|
|
|
} |