Browse Source

clean up tests

pull/3370/head
Juan Leni 6 years ago
parent
commit
ad7eb8f45d
No known key found for this signature in database GPG Key ID: 23F1452155140419
5 changed files with 87 additions and 44 deletions
  1. +4
    -3
      privval/errors.go
  2. +1
    -1
      privval/file_test.go
  3. +1
    -1
      privval/signer_dialer_endpoint.go
  4. +80
    -38
      privval/signer_listener_endpoint.go
  5. +1
    -1
      privval/signer_listener_endpoint_test.go

+ 4
- 3
privval/errors.go View File

@ -6,9 +6,10 @@ import (
// Socket errors.
var (
ErrUnexpectedResponse = fmt.Errorf("received unexpected response")
ErrListenerTimeout = fmt.Errorf("signer listening endpoint timed out")
ErrDialerTimeout = fmt.Errorf("signer dialer endpoint timed out")
ErrUnexpectedResponse = fmt.Errorf("received unexpected response")
ErrListenerTimeout = fmt.Errorf("signer listening endpoint timed out")
ErrListenerNoConnection = fmt.Errorf("signer listening endpoint is not connected")
ErrDialerTimeout = fmt.Errorf("signer dialer endpoint timed out")
)
// RemoteSignerError allows (remote) validators to include meaningful error descriptions in their reply.


+ 1
- 1
privval/file_test.go View File

@ -58,7 +58,7 @@ func TestResetValidator(t *testing.T) {
// priv val after signing is not same as empty
assert.NotEqual(t, privVal.LastSignState, emptyState)
// priv val after tryConnect is same as empty
// priv val after tryAcceptConnection is same as empty
privVal.Reset()
assert.Equal(t, privVal.LastSignState, emptyState)
}


+ 1
- 1
privval/signer_dialer_endpoint.go View File

@ -20,7 +20,7 @@ func SignerServiceEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceE
return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout }
}
// SignerServiceEndpointConnRetries sets the amount of attempted retries to tryConnect.
// SignerServiceEndpointConnRetries sets the amount of attempted retries to tryAcceptConnection.
func SignerServiceEndpointConnRetries(retries int) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.connRetries = retries }
}


+ 80
- 38
privval/signer_listener_endpoint.go View File

@ -30,7 +30,7 @@ func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEn
// TODO: Add a type for SignerEndpoints
// getConnection
// tryConnect
// tryAcceptConnection
// read
// write
// close
@ -46,6 +46,8 @@ type SignerListenerEndpoint struct {
listener net.Listener
conn net.Conn
timeoutReadWrite time.Duration
// ping
cancelPingCh chan struct{}
pingTicker *time.Ticker
@ -56,6 +58,7 @@ type SignerListenerEndpoint struct {
func NewSignerListenerEndpoint(logger log.Logger, listener net.Listener) *SignerListenerEndpoint {
sc := &SignerListenerEndpoint{
listener: listener,
timeoutReadWrite : defaultTimeoutReadWriteSeconds * time.Second,
heartbeatPeriod: heartbeatPeriod,
}
@ -68,47 +71,55 @@ func NewSignerListenerEndpoint(logger log.Logger, listener net.Listener) *Signer
func (ve *SignerListenerEndpoint) OnStart() error {
ve.Logger.Debug("SignerListenerEndpoint: OnStart")
err := ve.tryConnect()
err := ve.tryAcceptConnection()
if err != nil {
ve.Logger.Error("OnStart", "err", err)
return err
}
ve.Logger.Info("OnStart", "connected", ve.isConnected())
// Start a routine to keep the connection alive
go ve.pingLoop()
return nil
}
func (ve *SignerListenerEndpoint)pingLoop() {
ve.cancelPingCh = make(chan struct{}, 1)
ve.pingTicker = time.NewTicker(ve.heartbeatPeriod)
// TODO: Move subroutine to another place?
go func() {
for {
select {
case <-ve.pingTicker.C:
ve.Logger.Debug("SignerListenerEndpoint: ping ticker ch")
err := ve.ping()
fmt.Printf("ONSTART out -> %p\n", ve)
for {
fmt.Printf("ONSTART in -> %p\n", ve)
select {
case <-ve.pingTicker.C:
fmt.Printf("ONSTART before -> %p\n", ve)
err := ve.ping()
if err != nil {
ve.Logger.Error("Ping", "err", err)
if err == ErrUnexpectedResponse {
return
}
err := ve.tryAcceptConnection()
if err != nil {
ve.Logger.Error("Ping", "err", err)
if err == ErrUnexpectedResponse {
return
}
err := ve.tryConnect()
if err != nil {
ve.Logger.Error("Reconnecting to remote signer failed", "err", err)
continue
}
ve.Logger.Info("Re-created connection to remote signer", "impl", ve)
ve.Logger.Error("Connection from remote signer not available", "err", err)
continue
}
case <-ve.cancelPingCh:
ve.Logger.Debug("SignerListenerEndpoint: cancel ping ch")
ve.pingTicker.Stop()
return
ve.Logger.Info("Connection from remote signer available", "impl", ve)
}
}
}()
case <-ve.cancelPingCh:
ve.Logger.Debug("SignerListenerEndpoint: cancel ping ch")
return nil
ve.pingTicker.Stop()
return
}
}
}
// OnStop implements cmn.Service.
@ -150,6 +161,7 @@ func (ve *SignerListenerEndpoint) Close() error {
ve.Logger.Error("Closing connection", "err", err)
return err
}
ve.Logger.Debug("SignerListenerEndpoint: set ve.conn Nil")
ve.conn = nil
}
@ -168,15 +180,21 @@ func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSi
ve.mtx.Lock()
defer ve.mtx.Unlock()
ve.Logger.Debug("SignerListenerEndpoint: Send request", "connected", ve.isConnected())
if !ve.isConnected() {
return nil, cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected")
return nil, cmn.ErrorWrap(ErrListenerNoConnection, "endpoint is not connected")
}
ve.Logger.Debug("Send request. Write")
err := ve.writeMessage(request)
if err != nil {
return nil, err
}
ve.Logger.Debug("Send request. Read")
res, err := ve.readMessage()
if err != nil {
return nil, err
@ -187,12 +205,21 @@ func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSi
// IsConnected indicates if there is an active connection
func (ve *SignerListenerEndpoint) isConnected() bool {
return ve.IsRunning() && ve.conn != nil
// return ve.IsRunning() && ve.conn != nil
return ve.conn != nil
}
func (ve *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error) {
if !ve.isConnected() {
return nil, cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected")
return nil, cmn.ErrorWrap(ErrListenerNoConnection, "endpoint is not connected")
}
// Reset read deadline
deadline := time.Now().Add(ve.timeoutReadWrite)
ve.Logger.Debug("SignerListenerEndpoint: readMessage", "deadline", deadline)
err = ve.conn.SetReadDeadline(deadline)
if err != nil {
return
}
const maxRemoteSignerMsgSize = 1024 * 10
@ -206,7 +233,17 @@ func (ve *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error)
func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) {
if !ve.isConnected() {
return cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected")
return cmn.ErrorWrap(ErrListenerNoConnection, "endpoint is not connected")
}
fmt.Printf("writemessage -> %p\n", ve)
// Reset read deadline
deadline := time.Now().Add(ve.timeoutReadWrite)
ve.Logger.Debug("SignerListenerEndpoint: writeMessage", "deadline", deadline)
err = ve.conn.SetWriteDeadline(deadline)
if err != nil {
return
}
_, err = cdc.MarshalBinaryLengthPrefixedWriter(ve.conn, msg)
@ -217,12 +254,12 @@ func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error)
return
}
// tryConnect waits to accept a new connection.
func (ve *SignerListenerEndpoint) tryConnect() error {
// tryAcceptConnection waits to accept a new connection.
func (ve *SignerListenerEndpoint) tryAcceptConnection() error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
ve.Logger.Debug("SignerListenerEndpoint: tryConnect")
ve.Logger.Debug("SignerListenerEndpoint: tryAcceptConnection")
if !ve.IsRunning() || ve.listener == nil {
return fmt.Errorf("endpoint is closing")
@ -231,22 +268,25 @@ func (ve *SignerListenerEndpoint) tryConnect() error {
// if the conn already exists and close it.
if ve.conn != nil {
if tmpErr := ve.conn.Close(); tmpErr != nil {
ve.Logger.Error("error closing socket val connection during tryConnect", "err", tmpErr)
ve.Logger.Error("tryAcceptConnection: error closing old connection", "err", tmpErr)
}
}
// Forget old connection
ve.conn = nil
ve.Logger.Debug("SignerListenerEndpoint: set ve.conn Nil")
// wait for a new conn
conn, err := ve.listener.Accept()
if err != nil {
ve.Logger.Debug("listener accept failed", "err", err)
return err
}
ve.Logger.Debug("SignerListenerEndpoint: New connection")
ve.conn = conn
ve.Logger.Debug("SignerListenerEndpoint: New connection", "connected", ve.isConnected())
fmt.Printf("tryAcceptConnection: %p\n", ve)
// TODO: maybe we need to inform the owner that a connection has been received
return nil
@ -254,6 +294,8 @@ func (ve *SignerListenerEndpoint) tryConnect() error {
// Ping is used to check connection health.
func (ve *SignerListenerEndpoint) ping() error {
ve.Logger.Debug("SignerListenerEndpoint: PING", "connected", ve.isConnected())
response, err := ve.SendRequest(&PingRequest{})
if err != nil {


+ 1
- 1
privval/signer_listener_endpoint_test.go View File

@ -31,7 +31,7 @@ type dialerTestCase struct {
// TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We
// don't need this for Unix sockets because the OS instantly knows the state of
// both ends of the socket connection. This basically causes the
// SignerDialerEndpoint.dialer() call inside SignerDialerEndpoint.tryConnect() to return
// SignerDialerEndpoint.dialer() call inside SignerDialerEndpoint.tryAcceptConnection() to return
// successfully immediately, putting an instant stop to any retry attempts.
func TestSignerRemoteRetryTCPOnly(t *testing.T) {
var (


Loading…
Cancel
Save