Browse Source

improved connection loop

pull/3370/head
Juan Leni 6 years ago
parent
commit
37cc8db430
No known key found for this signature in database GPG Key ID: 23F1452155140419
4 changed files with 93 additions and 38 deletions
  1. +8
    -1
      privval/errors.go
  2. +83
    -35
      privval/signer_listener_endpoint.go
  3. +1
    -1
      tools/tm-signer-harness/internal/test_harness.go
  4. +1
    -1
      tools/tm-signer-harness/internal/test_harness_test.go

+ 8
- 1
privval/errors.go View File

@ -4,10 +4,17 @@ import (
"fmt"
)
type ListenerTimeoutError struct{}
// Implement the net.Error interface.
func (e ListenerTimeoutError) Error() string { return "listening endpoint timed out" }
func (e ListenerTimeoutError) Timeout() bool { return true }
func (e ListenerTimeoutError) Temporary() bool { return true }
// Socket errors.
var (
ErrUnexpectedResponse = fmt.Errorf("received unexpected response")
ErrListenerTimeout = fmt.Errorf("signer listening endpoint timed out")
ErrListenerTimeout = ListenerTimeoutError{}
ErrListenerNoConnection = fmt.Errorf("signer listening endpoint is not connected")
ErrDialerTimeout = fmt.Errorf("signer dialer endpoint timed out")
)


+ 83
- 35
privval/signer_listener_endpoint.go View File

@ -25,6 +25,8 @@ type SignerListenerEndpoint struct {
timeoutReadWrite time.Duration
stopCh, stoppedCh chan struct{}
connectCh chan struct{}
connectedCh chan net.Conn
}
// NewSignerListenerEndpoint returns an instance of SignerListenerEndpoint.
@ -47,7 +49,11 @@ func (ve *SignerListenerEndpoint) OnStart() error {
ve.stopCh = make(chan struct{})
ve.stoppedCh = make(chan struct{})
ve.connectCh = make(chan struct{})
ve.connectedCh = make(chan net.Conn)
go ve.serviceLoop()
ve.connectCh <- struct{}{}
return nil
}
@ -57,6 +63,7 @@ func (ve *SignerListenerEndpoint) OnStop() {
ve.Logger.Debug("SignerListenerEndpoint: OnStop calling Close")
_ = ve.Close()
ve.Logger.Debug("SignerListenerEndpoint: OnStop stop listening")
// Stop listening
if ve.listener != nil {
if err := ve.listener.Close(); err != nil {
@ -64,8 +71,10 @@ func (ve *SignerListenerEndpoint) OnStop() {
}
}
ve.Logger.Debug("SignerListenerEndpoint: OnStop close stopCh")
// Stop service loop
close(ve.stopCh)
ve.stopCh <- struct{}{}
<-ve.stoppedCh
}
@ -77,6 +86,7 @@ func (ve *SignerListenerEndpoint) Close() error {
ve.dropConnection()
ve.Logger.Debug("SignerListenerEndpoint: Closed")
return nil
}
@ -130,24 +140,6 @@ func (ve *SignerListenerEndpoint) isConnected() bool {
return ve.IsRunning() && ve.conn != nil
}
func (ve *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error {
// TODO: Check that is connected
if !ve.isConnected() {
}
return nil
}
// dropConnection closes the current connection but does not touch the listening socket
func (ve *SignerListenerEndpoint) dropConnection() {
ve.Logger.Debug("SignerListenerEndpoint: dropConnection")
if ve.conn != nil {
if err := ve.conn.Close(); err != nil {
ve.Logger.Error("SignerListenerEndpoint::dropConnection", "err", err)
}
ve.conn = nil
}
}
func (ve *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error) {
if !ve.isConnected() {
return nil, cmn.ErrorWrap(ErrListenerNoConnection, "endpoint is not connected")
@ -200,48 +192,104 @@ func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error)
return
}
func (ve *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error {
if !ve.isConnected() {
// Is there a connection ready?
select {
case ve.conn = <-ve.connectedCh:
{
ve.Logger.Debug("SignerListenerEndpoint: received connection")
return nil
}
default:
{
ve.Logger.Debug("SignerListenerEndpoint: no connection is ready")
}
}
// should we trigger a reconnect?
select {
case ve.connectCh <- struct{}{}:
{
ve.Logger.Debug("SignerListenerEndpoint: triggered a reconnect")
}
default:
{
ve.Logger.Debug("SignerListenerEndpoint: reconnect in progress")
}
}
// block until connected or timeout
select {
case ve.conn = <-ve.connectedCh:
{
ve.Logger.Debug("SignerListenerEndpoint: connected")
}
case <-time.After(maxWait):
{
ve.Logger.Debug("SignerListenerEndpoint: timeout")
return ErrListenerTimeout
}
}
}
return nil
}
// dropConnection closes the current connection but does not touch the listening socket
func (ve *SignerListenerEndpoint) dropConnection() {
ve.Logger.Debug("SignerListenerEndpoint: dropConnection")
if ve.conn != nil {
if err := ve.conn.Close(); err != nil {
ve.Logger.Error("SignerListenerEndpoint::dropConnection", "err", err)
}
ve.conn = nil
}
ve.Logger.Debug("SignerListenerEndpoint: dropConnection DONE")
}
func (ve *SignerListenerEndpoint) serviceLoop() {
defer close(ve.stoppedCh)
defer ve.Logger.Debug("SignerListenerEndpoint::serviceLoop EXIT")
ve.Logger.Debug("SignerListenerEndpoint::serviceLoop")
for {
select {
default:
case <-ve.connectCh:
{
ve.Logger.Debug("Listening for new connection")
_ = ve.acceptNewConnection()
for {
ve.Logger.Info("Listening for new connection")
conn, err := ve.acceptNewConnection()
if err == nil {
ve.Logger.Info("Connected")
ve.connectedCh <- conn
break
}
}
}
case <-ve.stopCh:
{
ve.Logger.Debug("SignerListenerEndpoint::serviceLoop Stop")
return
}
}
}
}
func (ve *SignerListenerEndpoint) acceptNewConnection() error {
// TODO: add proper locking
func (ve *SignerListenerEndpoint) acceptNewConnection() (net.Conn, error) {
ve.Logger.Debug("SignerListenerEndpoint: AcceptNewConnection")
if !ve.IsRunning() || ve.listener == nil {
return fmt.Errorf("endpoint is closing")
return nil, fmt.Errorf("endpoint is closing")
}
// TODO: add proper locking
// if the conn already exists and close it.
ve.dropConnection()
// wait for a new conn
conn, err := ve.listener.Accept()
if err != nil {
ve.Logger.Debug("listener accept failed", "err", err)
ve.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error)
return err
return nil, err
}
ve.conn = conn
ve.Logger.Info("SignerListenerEndpoint: New connection")
// TODO: add proper locking
return nil
return conn, nil
}

+ 1
- 1
tools/tm-signer-harness/internal/test_harness.go View File

@ -144,7 +144,7 @@ func (th *TestHarness) Run() {
for acceptRetries := th.acceptRetries; acceptRetries > 0; acceptRetries-- {
th.logger.Info("Attempting to accept incoming connection", "acceptRetries", acceptRetries)
if err := th.signerClient.WaitForConnection(5 * time.Second); err != nil {
if err := th.signerClient.WaitForConnection(2 * time.Second); err != nil {
// if it wasn't a timeout error
if _, ok := err.(timeoutError); !ok {
th.logger.Error("Failed to start listener", "err", err)


+ 1
- 1
tools/tm-signer-harness/internal/test_harness_test.go View File

@ -73,7 +73,7 @@ const (
)
func TestRemoteSignerTestHarnessMaxAcceptRetriesReached(t *testing.T) {
cfg := makeConfig(t, 100, 2)
cfg := makeConfig(t, 200, 2)
defer cleanup(cfg)
th, err := NewTestHarness(log.TestingLogger(), cfg)


Loading…
Cancel
Save