diff --git a/privval/signer_dialer_endpoint.go b/privval/signer_dialer_endpoint.go index c5900e009..e6aca4568 100644 --- a/privval/signer_dialer_endpoint.go +++ b/privval/signer_dialer_endpoint.go @@ -85,7 +85,6 @@ func (ss *SignerDialerEndpoint) OnStart() error { go ss.serviceLoop() - ss.Logger.Debug("OnStart - done") return nil } @@ -93,24 +92,18 @@ func (ss *SignerDialerEndpoint) OnStart() error { func (ss *SignerDialerEndpoint) OnStop() { ss.Logger.Debug("SignerDialerEndpoint: OnStop calling Close") _ = ss.Close() -} -// IsConnected indicates if there is an active connection -func (ss *SignerDialerEndpoint) IsConnected() bool { - ss.mtx.Lock() - defer ss.mtx.Unlock() - return ss.isConnected() + // Stop service loop + close(ss.stopCh) + <-ss.stoppedCh } // Close closes the underlying net.Conn. func (ss *SignerDialerEndpoint) Close() error { ss.mtx.Lock() defer ss.mtx.Unlock() - - close(ss.stopCh) - <-ss.stoppedCh - ss.Logger.Debug("SignerDialerEndpoint: Close") + if ss.conn != nil { if err := ss.conn.Close(); err != nil { ss.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed")) @@ -121,54 +114,25 @@ func (ss *SignerDialerEndpoint) Close() error { return nil } -func (ss *SignerDialerEndpoint) serviceLoop() { - defer close(ss.stoppedCh) - - retries := 0 - var err error - - for { - select { - default: - { - ss.Logger.Debug("Try connect", "retries", retries, "max", ss.maxConnRetries) - - if retries > ss.maxConnRetries { - ss.Logger.Error("Maximum retries reached", "retries", retries) - return - } - - if ss.conn == nil { - ss.conn, err = ss.dialer() - - if err != nil { - ss.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error) - retries++ - continue - } - } - - retries = 0 - ss.handleRequest() - } - - case <-ss.stopCh: - { - return - } - } - } +// IsConnected indicates if there is an active connection +func (ss *SignerDialerEndpoint) IsConnected() bool { + ss.mtx.Lock() + defer ss.mtx.Unlock() + return ss.isConnected() } func (ss *SignerDialerEndpoint) readMessage() (msg RemoteSignerMsg, err error) { // TODO(jleni): Avoid duplication. Unify endpoints - if ss.conn == nil { + if !ss.isConnected() { return nil, fmt.Errorf("not connected") } // Reset read deadline deadline := time.Now().Add(ss.timeoutReadWrite) - ss.Logger.Debug("SignerDialerEndpoint: readMessage", "deadline", deadline) + ss.Logger.Debug( + "SignerDialerEndpoint: readMessage", + "timeout", ss.timeoutReadWrite, + "deadline", deadline) err = ss.conn.SetReadDeadline(deadline) if err != nil { @@ -240,3 +204,42 @@ func (ss *SignerDialerEndpoint) handleRequest() { func (ve *SignerDialerEndpoint) isConnected() bool { return ve.IsRunning() && ve.conn != nil } + +func (ss *SignerDialerEndpoint) serviceLoop() { + defer close(ss.stoppedCh) + + retries := 0 + var err error + + for { + select { + default: + { + ss.Logger.Debug("Try connect", "retries", retries, "max", ss.maxConnRetries) + + if retries > ss.maxConnRetries { + ss.Logger.Error("Maximum retries reached", "retries", retries) + return + } + + if ss.conn == nil { + ss.conn, err = ss.dialer() + + if err != nil { + ss.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error) + retries++ + continue + } + } + + retries = 0 + ss.handleRequest() + } + + case <-ss.stopCh: + { + return + } + } + } +} diff --git a/privval/signer_listener_endpoint.go b/privval/signer_listener_endpoint.go index 09f552bb4..9815ac548 100644 --- a/privval/signer_listener_endpoint.go +++ b/privval/signer_listener_endpoint.go @@ -18,11 +18,13 @@ type SignerValidatorEndpointOption func(*SignerListenerEndpoint) type SignerListenerEndpoint struct { cmn.BaseService - mtx sync.Mutex + extMtx sync.Mutex listener net.Listener conn net.Conn timeoutReadWrite time.Duration + + stopCh, stoppedCh chan struct{} } // NewSignerListenerEndpoint returns an instance of SignerListenerEndpoint. @@ -42,13 +44,11 @@ func NewSignerListenerEndpoint(logger log.Logger, listener net.Listener) *Signer func (ve *SignerListenerEndpoint) OnStart() error { ve.Logger.Debug("SignerListenerEndpoint: OnStart") - err := ve.acceptNewConnection() - if err != nil { - ve.Logger.Error("OnStart", "err", err) - return err - } + ve.stopCh = make(chan struct{}) + ve.stoppedCh = make(chan struct{}) + + go ve.serviceLoop() - ve.Logger.Debug("SignerListenerEndpoint OnStart", "connected", ve.isConnected()) return nil } @@ -56,50 +56,53 @@ func (ve *SignerListenerEndpoint) OnStart() error { func (ve *SignerListenerEndpoint) OnStop() { ve.Logger.Debug("SignerListenerEndpoint: OnStop calling Close") _ = ve.Close() -} -// IsConnected indicates if there is an active connection -func (ve *SignerListenerEndpoint) IsConnected() bool { - ve.mtx.Lock() - defer ve.mtx.Unlock() - return ve.isConnected() -} - -// WaitForConnection waits maxWait for a connection or returns a timeout error -func (ve *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error { - ve.mtx.Lock() - defer ve.mtx.Unlock() + // Stop listening + if ve.listener != nil { + if err := ve.listener.Close(); err != nil { + ve.Logger.Error("Closing Listener", "err", err) + } + } - // TODO(jleni): Pass maxwait through - return ve.ensureConnection() + // Stop service loop + close(ve.stopCh) + <-ve.stoppedCh } // Close closes the underlying net.Conn. func (ve *SignerListenerEndpoint) Close() error { - ve.mtx.Lock() - defer ve.mtx.Unlock() - + ve.extMtx.Lock() + defer ve.extMtx.Unlock() ve.Logger.Debug("SignerListenerEndpoint: Close") ve.dropConnection() - if ve.listener != nil { - if err := ve.listener.Close(); err != nil { - ve.Logger.Error("Closing Listener", "err", err) - return err - } - } - return nil } +// IsConnected indicates if there is an active connection +func (ve *SignerListenerEndpoint) IsConnected() bool { + ve.extMtx.Lock() + defer ve.extMtx.Unlock() + return ve.isConnected() +} + +// WaitForConnection waits maxWait for a connection or returns a timeout error +func (ve *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error { + ve.extMtx.Lock() + defer ve.extMtx.Unlock() + return ve.ensureConnection(maxWait) +} + // SendRequest sends a request and waits for a response func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSignerMsg, error) { - ve.mtx.Lock() - defer ve.mtx.Unlock() + ve.extMtx.Lock() + defer ve.extMtx.Unlock() + + // TODO: Add retries.. that include dropping the connection and ve.Logger.Debug("SignerListenerEndpoint: Send request", "connected", ve.isConnected()) - err := ve.ensureConnection() + err := ve.ensureConnection(ve.timeoutReadWrite) if err != nil { return nil, err } @@ -127,12 +130,9 @@ func (ve *SignerListenerEndpoint) isConnected() bool { return ve.IsRunning() && ve.conn != nil } -func (ve *SignerListenerEndpoint) ensureConnection() error { +func (ve *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error { + // TODO: Check that is connected if !ve.isConnected() { - err := ve.acceptNewConnection() - if err != nil { - return cmn.ErrorWrap(ErrListenerNoConnection, "could not reconnect") - } } return nil } @@ -140,10 +140,9 @@ func (ve *SignerListenerEndpoint) ensureConnection() error { // dropConnection closes the current connection but does not touch the listening socket func (ve *SignerListenerEndpoint) dropConnection() { ve.Logger.Debug("SignerListenerEndpoint: dropConnection") - if ve.conn != nil { if err := ve.conn.Close(); err != nil { - ve.Logger.Error("Closing connection", "err", err) + ve.Logger.Error("SignerListenerEndpoint::dropConnection", "err", err) } ve.conn = nil } @@ -201,32 +200,48 @@ func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) return } +func (ve *SignerListenerEndpoint) serviceLoop() { + defer close(ve.stoppedCh) + + for { + select { + default: + { + ve.Logger.Debug("Listening for new connection") + _ = ve.acceptNewConnection() + } + + case <-ve.stopCh: + { + return + } + } + } +} + func (ve *SignerListenerEndpoint) acceptNewConnection() error { + // TODO: add proper locking ve.Logger.Debug("SignerListenerEndpoint: AcceptNewConnection") if !ve.IsRunning() || ve.listener == nil { return fmt.Errorf("endpoint is closing") } + // TODO: add proper locking // if the conn already exists and close it. - if ve.conn != nil { - if tmpErr := ve.conn.Close(); tmpErr != nil { - ve.Logger.Error("AcceptNewConnection: error closing old connection", "err", tmpErr) - } - } - - // Forget old connection - ve.conn = nil + ve.dropConnection() // wait for a new conn conn, err := ve.listener.Accept() if err != nil { ve.Logger.Debug("listener accept failed", "err", err) + ve.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error) return err } ve.conn = conn ve.Logger.Info("SignerListenerEndpoint: New connection") + // TODO: add proper locking return nil } diff --git a/tools/tm-signer-harness/internal/test_harness.go b/tools/tm-signer-harness/internal/test_harness.go index a6677e82e..1adc093c1 100644 --- a/tools/tm-signer-harness/internal/test_harness.go +++ b/tools/tm-signer-harness/internal/test_harness.go @@ -101,7 +101,7 @@ func NewTestHarness(logger log.Logger, cfg TestHarnessConfig) (*TestHarness, err } logger.Info("Loaded genesis file", "chainID", st.ChainID) - spv, err := newTestHarnessSignerRemote(logger, cfg) + spv, err := newTestHarnessListener(logger, cfg) if err != nil { return nil, newTestHarnessError(ErrFailedToCreateListener, err, "") } @@ -318,8 +318,8 @@ func (th *TestHarness) Shutdown(err error) { } } -// newTestHarnessSignerRemote creates our client instance which we will use for testing. -func newTestHarnessSignerRemote(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerListenerEndpoint, error) { +// newTestHarnessListener creates our client instance which we will use for testing. +func newTestHarnessListener(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerListenerEndpoint, error) { proto, addr := cmn.ProtocolAndAddress(cfg.BindAddr) if proto == "unix" { // make sure the socket doesn't exist - if so, try to delete it diff --git a/tools/tm-signer-harness/internal/test_harness_test.go b/tools/tm-signer-harness/internal/test_harness_test.go index 296b6d9c4..e13a6c4bd 100644 --- a/tools/tm-signer-harness/internal/test_harness_test.go +++ b/tools/tm-signer-harness/internal/test_harness_test.go @@ -73,7 +73,7 @@ const ( ) func TestRemoteSignerTestHarnessMaxAcceptRetriesReached(t *testing.T) { - cfg := makeConfig(t, 1, 2) + cfg := makeConfig(t, 100, 2) defer cleanup(cfg) th, err := NewTestHarness(log.TestingLogger(), cfg)