From c7348985aff2a36237c094cffb408af876d4f57a Mon Sep 17 00:00:00 2001 From: Juan Leni Date: Tue, 5 Mar 2019 14:59:34 +0100 Subject: [PATCH] Fix/improve unit tests --- privval/file_test.go | 2 +- privval/signer_client.go | 40 +++--- privval/signer_client_test.go | 25 ++-- privval/signer_dialer_endpoint.go | 47 ++++--- privval/signer_listener_endpoint.go | 125 +++++++++++------- privval/signer_listener_endpoint_test.go | 2 +- .../internal/test_harness.go | 32 +++-- 7 files changed, 163 insertions(+), 110 deletions(-) diff --git a/privval/file_test.go b/privval/file_test.go index 3b9c3b614..a82911443 100644 --- a/privval/file_test.go +++ b/privval/file_test.go @@ -58,7 +58,7 @@ func TestResetValidator(t *testing.T) { // priv val after signing is not same as empty assert.NotEqual(t, privVal.LastSignState, emptyState) - // priv val after connect is same as empty + // priv val after tryConnect is same as empty privVal.Reset() assert.Equal(t, privVal.LastSignState, emptyState) } diff --git a/privval/signer_client.go b/privval/signer_client.go index 36d440dfa..db32daf34 100644 --- a/privval/signer_client.go +++ b/privval/signer_client.go @@ -2,6 +2,7 @@ package privval import ( "fmt" + "time" "github.com/pkg/errors" "github.com/tendermint/tendermint/crypto" @@ -41,28 +42,41 @@ func NewSignerClient(endpoint *SignerListenerEndpoint) (*SignerClient, error) { } // Close calls Close on the underlying net.Conn. -func (sr *SignerClient) Close() error { - return sr.endpoint.Close() +func (sc *SignerClient) Close() error { + return sc.endpoint.Close() +} + +// Close calls Close on the underlying net.Conn. +func (sc *SignerClient) IsConnected() bool { + return sc.endpoint.IsConnected() +} + +// Close calls Close on the underlying net.Conn. 
+func (sc *SignerClient) WaitForConnection(maxWait time.Duration) error { + if sc.endpoint == nil { + return fmt.Errorf("endpoint has not been defined") + } + return sc.endpoint.WaitForConnection(maxWait) } //-------------------------------------------------------- // Implement PrivValidator // GetPubKey implements PrivValidator. -func (sr *SignerClient) GetPubKey() crypto.PubKey { - response, err := sr.endpoint.SendRequest(&PubKeyRequest{}) +func (sc *SignerClient) GetPubKey() crypto.PubKey { + response, err := sc.endpoint.SendRequest(&PubKeyRequest{}) if err != nil { return nil } pubKeyResp, ok := response.(*PubKeyResponse) if !ok { - sr.endpoint.Logger.Error("response is not PubKeyResponse") + sc.endpoint.Logger.Error("response is not PubKeyResponse") return nil } if pubKeyResp.Error != nil { - sr.endpoint.Logger.Error("failed to get private validator's public key", "err", pubKeyResp.Error) + sc.endpoint.Logger.Error("failed to get private validator's public key", "err", pubKeyResp.Error) return nil } @@ -70,8 +84,8 @@ func (sr *SignerClient) GetPubKey() crypto.PubKey { } // SignVote implements PrivValidator. -func (sr *SignerClient) SignVote(chainID string, vote *types.Vote) error { - response, err := sr.endpoint.SendRequest(&SignVoteRequest{Vote: vote}) +func (sc *SignerClient) SignVote(chainID string, vote *types.Vote) error { + response, err := sc.endpoint.SendRequest(&SignVoteRequest{Vote: vote}) if err != nil { return err } @@ -90,8 +104,8 @@ func (sr *SignerClient) SignVote(chainID string, vote *types.Vote) error { } // SignProposal implements PrivValidator. 
-func (sr *SignerClient) SignProposal(chainID string, proposal *types.Proposal) error { - response, err := sr.endpoint.SendRequest(&SignProposalRequest{Proposal: proposal}) +func (sc *SignerClient) SignProposal(chainID string, proposal *types.Proposal) error { + response, err := sc.endpoint.SendRequest(&SignProposalRequest{Proposal: proposal}) if err != nil { return err } @@ -108,11 +122,7 @@ func (sr *SignerClient) SignProposal(chainID string, proposal *types.Proposal) e return nil } -func handleRequest( - req RemoteSignerMsg, - chainID string, - privVal types.PrivValidator) (RemoteSignerMsg, error) { - +func handleRequest(req RemoteSignerMsg, chainID string, privVal types.PrivValidator) (RemoteSignerMsg, error) { var res RemoteSignerMsg var err error diff --git a/privval/signer_client_test.go b/privval/signer_client_test.go index 3e165f941..78591985e 100644 --- a/privval/signer_client_test.go +++ b/privval/signer_client_test.go @@ -48,9 +48,6 @@ func TestSignerClose(t *testing.T) { err := tc.signer.Close() assert.NoError(t, err) - err = tc.signer.endpoint.Stop() - assert.NoError(t, err) - err = tc.signerService.Stop() assert.NoError(t, err) }() @@ -60,18 +57,18 @@ func TestSignerClose(t *testing.T) { func TestSignerGetPubKey(t *testing.T) { for _, tc := range getSignerTestCases(t) { func() { - defer tc.signer.Close() defer tc.signerService.OnStop() + defer tc.signer.Close() pubKey := tc.signer.GetPubKey() expectedPubKey := tc.mockPV.GetPubKey() assert.Equal(t, expectedPubKey, pubKey) - //addr := tc.signer.GetPubKey().Address() - //expectedAddr := tc.mockPV.GetPubKey().Address() - // - //assert.Equal(t, expectedAddr, addr) + addr := tc.signer.GetPubKey().Address() + expectedAddr := tc.mockPV.GetPubKey().Address() + + assert.Equal(t, expectedAddr, addr) }() } } @@ -83,8 +80,8 @@ func TestSignerProposal(t *testing.T) { want := &types.Proposal{Timestamp: ts} have := &types.Proposal{Timestamp: ts} - defer tc.signer.Close() defer tc.signerService.OnStop() + defer 
tc.signer.Close() require.NoError(t, tc.mockPV.SignProposal(tc.chainID, want)) require.NoError(t, tc.signer.SignProposal(tc.chainID, have)) @@ -101,8 +98,8 @@ func TestSignerVote(t *testing.T) { want := &types.Vote{Timestamp: ts, Type: types.PrecommitType} have := &types.Vote{Timestamp: ts, Type: types.PrecommitType} - defer tc.signer.Close() defer tc.signerService.OnStop() + defer tc.signer.Close() require.NoError(t, tc.mockPV.SignVote(tc.chainID, want)) require.NoError(t, tc.signer.SignVote(tc.chainID, have)) @@ -119,8 +116,8 @@ func TestSignerVoteResetDeadline(t *testing.T) { want := &types.Vote{Timestamp: ts, Type: types.PrecommitType} have := &types.Vote{Timestamp: ts, Type: types.PrecommitType} - defer tc.signer.Close() defer tc.signerService.OnStop() + defer tc.signer.Close() time.Sleep(testTimeoutReadWrite2o3) @@ -148,8 +145,8 @@ func TestSignerVoteKeepAlive(t *testing.T) { want := &types.Vote{Timestamp: ts, Type: types.PrecommitType} have := &types.Vote{Timestamp: ts, Type: types.PrecommitType} - defer tc.signer.Close() defer tc.signerService.OnStop() + defer tc.signer.Close() time.Sleep(testTimeoutReadWrite * 2) @@ -170,8 +167,8 @@ func TestSignerSignProposalErrors(t *testing.T) { tc.signerService.privVal = types.NewErroringMockPV() tc.mockPV = types.NewErroringMockPV() - defer tc.signer.Close() defer tc.signerService.OnStop() + defer tc.signer.Close() //ts := time.Now() //proposal := &types.Proposal{Timestamp: ts} @@ -197,8 +194,8 @@ func TestSignerSignVoteErrors(t *testing.T) { tc.signerService.privVal = types.NewErroringMockPV() tc.mockPV = types.NewErroringMockPV() - defer tc.signer.Close() defer tc.signerService.OnStop() + defer tc.signer.Close() err := tc.signer.SignVote(tc.chainID, vote) require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error()) diff --git a/privval/signer_dialer_endpoint.go b/privval/signer_dialer_endpoint.go index 7a1f38797..9d1d80c83 100644 --- a/privval/signer_dialer_endpoint.go +++ 
b/privval/signer_dialer_endpoint.go @@ -1,6 +1,7 @@ package privval import ( + "fmt" "io" "net" "time" @@ -19,7 +20,7 @@ func SignerServiceEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceE return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout } } -// SignerServiceEndpointConnRetries sets the amount of attempted retries to connect. +// SignerServiceEndpointConnRetries sets the amount of attempted retries to tryConnect. func SignerServiceEndpointConnRetries(retries int) SignerServiceEndpointOption { return func(ss *SignerDialerEndpoint) { ss.connRetries = retries } } @@ -83,6 +84,7 @@ func (ss *SignerDialerEndpoint) OnStop() { if err := ss.conn.Close(); err != nil { ss.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed")) + ss.conn = nil } } @@ -105,8 +107,19 @@ func (ss *SignerDialerEndpoint) connect() (net.Conn, error) { } func (ss *SignerDialerEndpoint) readMessage() (msg RemoteSignerMsg, err error) { - // TODO: Avoid duplication - // TODO: Check connection status + // TODO: Avoid duplication. Unify endpoints + + if ss.conn == nil { + return nil, fmt.Errorf("not connected") + } + + // Reset read deadline + deadline := time.Now().Add(ss.timeoutReadWrite) + ss.Logger.Debug("SignerDialerEndpoint: readMessage", "deadline", deadline) + err = ss.conn.SetReadDeadline(deadline) + if err != nil { + return + } const maxRemoteSignerMsgSize = 1024 * 10 _, err = cdc.UnmarshalBinaryLengthPrefixedReader(ss.conn, &msg, maxRemoteSignerMsgSize) @@ -118,17 +131,25 @@ func (ss *SignerDialerEndpoint) readMessage() (msg RemoteSignerMsg, err error) { } func (ss *SignerDialerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) { - // TODO: Avoid duplication - // TODO: Check connection status + // TODO: Avoid duplication. 
Unify endpoints + + if ss.conn == nil { + return fmt.Errorf("not connected") + } + + // Reset write deadline + deadline := time.Now().Add(ss.timeoutReadWrite) + ss.Logger.Debug("SignerDialerEndpoint: readMessage", "deadline", deadline) + err = ss.conn.SetWriteDeadline(deadline) + if err != nil { + return + } _, err = cdc.MarshalBinaryLengthPrefixedWriter(ss.conn, msg) if _, ok := err.(timeoutError); ok { err = cmn.ErrorWrap(ErrDialerTimeout, err.Error()) } - // TODO: Probably can assert that is a response type and check for error here - // Check the impact of KMS/Rust - return } @@ -138,18 +159,12 @@ func (ss *SignerDialerEndpoint) handleConnection(conn net.Conn) { return // Ignore error from listener closing. } - // Reset the connection deadline - deadline := time.Now().Add(ss.timeoutReadWrite) - err := conn.SetDeadline(deadline) - if err != nil { - return - } + ss.Logger.Debug("SignerDialerEndpoint: connected", "timeout", ss.timeoutReadWrite) - // TODO: As soon as it connects, most likely it will timeout req, err := ss.readMessage() if err != nil { if err != io.EOF { - ss.Logger.Error("handleConnection readMessage", "err", err) + ss.Logger.Error("SignerDialerEndpoint handleConnection", "err", err) } return } diff --git a/privval/signer_listener_endpoint.go b/privval/signer_listener_endpoint.go index 78a0457ea..3c50ce16a 100644 --- a/privval/signer_listener_endpoint.go +++ b/privval/signer_listener_endpoint.go @@ -30,7 +30,7 @@ func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEn // TODO: Add a type for SignerEndpoints // getConnection -// connect +// tryConnect // read // write // close @@ -66,18 +66,14 @@ func NewSignerListenerEndpoint(logger log.Logger, listener net.Listener) *Signer // OnStart implements cmn.Service. func (ve *SignerListenerEndpoint) OnStart() error { - closed, err := ve.connect() - // TODO: Improve. 
Connection state should be kept in a variable + ve.Logger.Debug("SignerListenerEndpoint: OnStart") + err := ve.tryConnect() if err != nil { ve.Logger.Error("OnStart", "err", err) return err } - if closed { - return fmt.Errorf("listener is closed") - } - // Start a routine to keep the connection alive ve.cancelPingCh = make(chan struct{}, 1) ve.pingTicker = time.NewTicker(ve.heartbeatPeriod) @@ -87,6 +83,7 @@ func (ve *SignerListenerEndpoint) OnStart() error { for { select { case <-ve.pingTicker.C: + ve.Logger.Debug("SignerListenerEndpoint: ping ticker ch") err := ve.ping() if err != nil { ve.Logger.Error("Ping", "err", err) @@ -94,19 +91,17 @@ func (ve *SignerListenerEndpoint) OnStart() error { return } - closed, err := ve.connect() + err := ve.tryConnect() if err != nil { ve.Logger.Error("Reconnecting to remote signer failed", "err", err) continue } - if closed { - ve.Logger.Info("listener is closing") - return - } ve.Logger.Info("Re-created connection to remote signer", "impl", ve) } case <-ve.cancelPingCh: + ve.Logger.Debug("SignerListenerEndpoint: cancel ping ch") + ve.pingTicker.Stop() return } @@ -118,18 +113,38 @@ func (ve *SignerListenerEndpoint) OnStart() error { // OnStop implements cmn.Service. func (ve *SignerListenerEndpoint) OnStop() { + ve.Logger.Debug("SignerListenerEndpoint: OnStop") + if ve.cancelPingCh != nil { + ve.Logger.Debug("SignerListenerEndpoint: Close cancel ping channel") close(ve.cancelPingCh) ve.cancelPingCh = nil } + + ve.Logger.Debug("SignerListenerEndpoint: OnStop calling Close") _ = ve.Close() } +// IsConnected indicates if there is an active connection +func (ve *SignerListenerEndpoint) IsConnected() bool { + ve.mtx.Lock() + defer ve.mtx.Unlock() + return ve.isConnected() +} + +// WaitForConnection waits maxWait for a connection or returns a timeout error +func (ve *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error { + // TODO: complete this + return nil +} + // Close closes the underlying net.Conn. 
func (ve *SignerListenerEndpoint) Close() error { ve.mtx.Lock() defer ve.mtx.Unlock() + ve.Logger.Debug("SignerListenerEndpoint: Close") + if ve.conn != nil { if err := ve.conn.Close(); err != nil { ve.Logger.Error("Closing connection", "err", err) @@ -153,6 +168,10 @@ func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSi ve.mtx.Lock() defer ve.mtx.Unlock() + if !ve.isConnected() { + return nil, cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected") + } + err := ve.writeMessage(request) if err != nil { return nil, err @@ -166,24 +185,15 @@ func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSi return res, nil } -// Ping is used to check connection health. -func (ve *SignerListenerEndpoint) ping() error { - response, err := ve.SendRequest(&PingRequest{}) - - if err != nil { - return err - } - - _, ok := response.(*PingResponse) - if !ok { - return ErrUnexpectedResponse - } - - return nil +// isConnected indicates if there is an active connection +func (ve *SignerListenerEndpoint) isConnected() bool { + return ve.IsRunning() && ve.conn != nil } func (ve *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error) { - // TODO: Check connection status + if !ve.isConnected() { + return nil, cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected") + } const maxRemoteSignerMsgSize = 1024 * 10 _, err = cdc.UnmarshalBinaryLengthPrefixedReader(ve.conn, &msg, maxRemoteSignerMsgSize) @@ -195,9 +205,8 @@ func (ve *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error) } func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) { - // TODO: Check connection status - if ve.conn == nil { - return fmt.Errorf("endpoint is not connected") + if !ve.isConnected() { + return cmn.ErrorWrap(ErrListenerTimeout, "endpoint is not connected") } _, err = cdc.MarshalBinaryLengthPrefixedWriter(ve.conn, msg) @@ -208,39 +217,55 @@ func (ve *SignerListenerEndpoint) writeMessage(msg 
RemoteSignerMsg) (err error) return } -// waits to accept and sets a new connection. -// connection is closed in OnStop. -// returns true if the listener is closed (ie. it returns a nil conn). -// TODO: Improve this -func (ve *SignerListenerEndpoint) connect() (closed bool, err error) { +// tryConnect waits to accept a new connection. +func (ve *SignerListenerEndpoint) tryConnect() error { ve.mtx.Lock() defer ve.mtx.Unlock() - // first check if the conn already exists and close it. + ve.Logger.Debug("SignerListenerEndpoint: tryConnect") + + if !ve.IsRunning() || ve.listener == nil { + return fmt.Errorf("endpoint is closing") + } + + // if the conn already exists, close it. if ve.conn != nil { if tmpErr := ve.conn.Close(); tmpErr != nil { - ve.Logger.Error("error closing socket val connection during connect", "err", tmpErr) + ve.Logger.Error("error closing socket val connection during tryConnect", "err", tmpErr) } } + // Forget old connection + ve.conn = nil + // wait for a new conn - ve.conn, err = ve.listener.Accept() + conn, err := ve.listener.Accept() if err != nil { - return false, err + return err } - // listener is closed - if ve.conn == nil { - return true, nil - } + ve.Logger.Debug("SignerListenerEndpoint: New connection") + + ve.conn = conn + // TODO: maybe we need to inform the owner that a connection has been received + + return nil +} + +// Ping is used to check connection health. +func (ve *SignerListenerEndpoint) ping() error { + response, err := ve.SendRequest(&PingRequest{}) if err != nil { - // TODO: This does not belong here... but maybe we need to inform the owner that a connection has been received - // failed to fetch the pubkey. close out the connection. 
- if tmpErr := ve.conn.Close(); tmpErr != nil { - ve.Logger.Error("error closing connection", "err", tmpErr) - } - return false, err + return err + } + + _, ok := response.(*PingResponse) + if !ok { + return ErrUnexpectedResponse } - return false, nil + + ve.Logger.Debug("SignerListenerEndpoint: pong") + + return nil } diff --git a/privval/signer_listener_endpoint_test.go b/privval/signer_listener_endpoint_test.go index 57c29bd1e..8a550c099 100644 --- a/privval/signer_listener_endpoint_test.go +++ b/privval/signer_listener_endpoint_test.go @@ -31,7 +31,7 @@ type dialerTestCase struct { // TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We // don't need this for Unix sockets because the OS instantly knows the state of // both ends of the socket connection. This basically causes the -// SignerDialerEndpoint.dialer() call inside SignerDialerEndpoint.connect() to return +// SignerDialerEndpoint.dialer() call inside SignerDialerEndpoint.tryConnect() to return // successfully immediately, putting an instant stop to any retry attempts. func TestSignerRemoteRetryTCPOnly(t *testing.T) { var ( diff --git a/tools/tm-signer-harness/internal/test_harness.go b/tools/tm-signer-harness/internal/test_harness.go index 115333120..5c577fa17 100644 --- a/tools/tm-signer-harness/internal/test_harness.go +++ b/tools/tm-signer-harness/internal/test_harness.go @@ -49,7 +49,7 @@ var _ error = (*TestHarnessError)(nil) // with this version of Tendermint. 
type TestHarness struct { addr string - spv *privval.SignerClient + signerClient *privval.SignerClient fpv *privval.FilePV chainID string acceptRetries int @@ -106,9 +106,14 @@ func NewTestHarness(logger log.Logger, cfg TestHarnessConfig) (*TestHarness, err return nil, newTestHarnessError(ErrFailedToCreateListener, err, "") } + signerClient, err:= privval.NewSignerClient(spv) + if err!=nil { + return nil, newTestHarnessError(ErrFailedToCreateListener, err, "") + } + return &TestHarness{ addr: cfg.BindAddr, - spv: spv, + signerClient: signerClient, fpv: fpv, chainID: st.ChainID, acceptRetries: cfg.AcceptRetries, @@ -135,9 +140,11 @@ func (th *TestHarness) Run() { th.logger.Info("Starting test harness") accepted := false var startErr error + for acceptRetries := th.acceptRetries; acceptRetries > 0; acceptRetries-- { th.logger.Info("Attempting to accept incoming connection", "acceptRetries", acceptRetries) - if err := th.spv.Start(); err != nil { + + if err := th.signerClient.WaitForConnection(5 * time.Second); err != nil { // if it wasn't a timeout error if _, ok := err.(timeoutError); !ok { th.logger.Error("Failed to start listener", "err", err) @@ -182,8 +189,8 @@ func (th *TestHarness) Run() { func (th *TestHarness) TestPublicKey() error { th.logger.Info("TEST: Public key of remote signer") th.logger.Info("Local", "pubKey", th.fpv.GetPubKey()) - th.logger.Info("Remote", "pubKey", th.spv.GetPubKey()) - if th.fpv.GetPubKey() != th.spv.GetPubKey() { + th.logger.Info("Remote", "pubKey", th.signerClient.GetPubKey()) + if th.fpv.GetPubKey() != th.signerClient.GetPubKey() { th.logger.Error("FAILED: Local and remote public keys do not match") return newTestHarnessError(ErrTestPublicKeyFailed, nil, "") } @@ -211,7 +218,7 @@ func (th *TestHarness) TestSignProposal() error { Timestamp: time.Now(), } propBytes := prop.SignBytes(th.chainID) - if err := th.spv.SignProposal(th.chainID, prop); err != nil { + if err := th.signerClient.SignProposal(th.chainID, prop); err != nil { 
th.logger.Error("FAILED: Signing of proposal", "err", err) return newTestHarnessError(ErrTestSignProposalFailed, err, "") } @@ -222,7 +229,7 @@ func (th *TestHarness) TestSignProposal() error { return newTestHarnessError(ErrTestSignProposalFailed, err, "") } // now validate the signature on the proposal - if th.spv.GetPubKey().VerifyBytes(propBytes, prop.Signature) { + if th.signerClient.GetPubKey().VerifyBytes(propBytes, prop.Signature) { th.logger.Info("Successfully validated proposal signature") } else { th.logger.Error("FAILED: Proposal signature validation failed") @@ -255,7 +262,7 @@ func (th *TestHarness) TestSignVote() error { } voteBytes := vote.SignBytes(th.chainID) // sign the vote - if err := th.spv.SignVote(th.chainID, vote); err != nil { + if err := th.signerClient.SignVote(th.chainID, vote); err != nil { th.logger.Error("FAILED: Signing of vote", "err", err) return newTestHarnessError(ErrTestSignVoteFailed, err, fmt.Sprintf("voteType=%d", voteType)) } @@ -266,7 +273,7 @@ func (th *TestHarness) TestSignVote() error { return newTestHarnessError(ErrTestSignVoteFailed, err, fmt.Sprintf("voteType=%d", voteType)) } // now validate the signature on the proposal - if th.spv.GetPubKey().VerifyBytes(voteBytes, vote.Signature) { + if th.signerClient.GetPubKey().VerifyBytes(voteBytes, vote.Signature) { th.logger.Info("Successfully validated vote signature", "type", voteType) } else { th.logger.Error("FAILED: Vote signature validation failed", "type", voteType) @@ -301,10 +308,9 @@ func (th *TestHarness) Shutdown(err error) { }() } - if th.spv.IsRunning() { - if err := th.spv.Stop(); err != nil { - th.logger.Error("Failed to cleanly stop listener: %s", err.Error()) - } + err = th.signerClient.Close() + if err != nil { + th.logger.Error("Failed to cleanly stop listener: %s", err.Error()) } if th.exitWhenComplete {