diff --git a/.gitignore b/.gitignore index 10ee3099c..9e2e5a9ea 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ terraform.tfstate.backup terraform.tfstate.d .vscode + +profile\.out diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 833fb6cc0..0c7da31ec 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -19,6 +19,7 @@ program](https://hackerone.com/tendermint). ### IMPROVEMENTS: +- [privval] \#3370 Refactors and simplifies validator/kms connection handling. Please refer to thttps://github.com/tendermint/tendermint/pull/3370#issue-257360971 - [consensus] \#3839 Reduce "Error attempting to add vote" message severity (Error -> Info) ### BUG FIXES: diff --git a/cmd/priv_val_server/main.go b/cmd/priv_val_server/main.go index c86bced81..22af6418f 100644 --- a/cmd/priv_val_server/main.go +++ b/cmd/priv_val_server/main.go @@ -48,15 +48,17 @@ func main() { os.Exit(1) } - rs := privval.NewSignerServiceEndpoint(logger, *chainID, pv, dialer) - err := rs.Start() + sd := privval.NewSignerDialerEndpoint(logger, dialer) + ss := privval.NewSignerServer(sd, *chainID, pv) + + err := ss.Start() if err != nil { panic(err) } // Stop upon receiving SIGTERM or CTRL-C. cmn.TrapSignal(logger, func() { - err := rs.Stop() + err := ss.Stop() if err != nil { panic(err) } diff --git a/node/node.go b/node/node.go index 18cb0ba3b..5c98ea5bf 100644 --- a/node/node.go +++ b/node/node.go @@ -23,7 +23,7 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" cs "github.com/tendermint/tendermint/consensus" - "github.com/tendermint/tendermint/crypto/ed25519" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/evidence" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" @@ -278,9 +278,7 @@ func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore, return nil } -func logNodeStartupInfo(state sm.State, privValidator types.PrivValidator, logger, - consensusLogger log.Logger) { - +func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) { // Log the version info. logger.Info("Version info", "software", version.TMCoreSemVer, @@ -296,7 +294,6 @@ func logNodeStartupInfo(state sm.State, privValidator types.PrivValidator, logge ) } - pubKey := privValidator.GetPubKey() addr := pubKey.Address() // Log whether this node is a validator or an observer if state.Validators.HasAddress(addr) { @@ -601,7 +598,13 @@ func NewNode(config *cfg.Config, } } - logNodeStartupInfo(state, privValidator, logger, consensusLogger) + pubKey := privValidator.GetPubKey() + if pubKey == nil { + // TODO: GetPubKey should return errors - https://github.com/tendermint/tendermint/issues/3602 + return nil, errors.New("could not retrieve public key from private validator") + } + + logNodeStartupInfo(state, pubKey, logger, consensusLogger) // Decide whether to fast-sync or not // We don't fast-sync when the only validator is us. @@ -1158,29 +1161,13 @@ func createAndStartPrivValidatorSocketClient( listenAddr string, logger log.Logger, ) (types.PrivValidator, error) { - var listener net.Listener - - protocol, address := cmn.ProtocolAndAddress(listenAddr) - ln, err := net.Listen(protocol, address) + pve, err := privval.NewSignerListener(listenAddr, logger) if err != nil { - return nil, err - } - switch protocol { - case "unix": - listener = privval.NewUnixListener(ln) - case "tcp": - // TODO: persist this key so external signer - // can actually authenticate us - listener = privval.NewTCPListener(ln, ed25519.GenPrivKey()) - default: - return nil, fmt.Errorf( - "wrong listen address: expected either 'tcp' or 'unix' protocols, got %s", - protocol, - ) + return nil, errors.Wrap(err, "failed to start private validator") } - pvsc := privval.NewSignerValidatorEndpoint(logger.With("module", "privval"), listener) - if err := pvsc.Start(); err != nil { + pvsc, err := privval.NewSignerClient(pve) + if err != nil { return nil, errors.Wrap(err, "failed to start private validator") } diff --git a/node/node_test.go b/node/node_test.go index f031c13a9..6cdaceffb 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -136,25 +136,29 @@ func TestNodeSetPrivValTCP(t *testing.T) { config.BaseConfig.PrivValidatorListenAddr = addr dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey()) - pvsc := privval.NewSignerServiceEndpoint( + dialerEndpoint := privval.NewSignerDialerEndpoint( log.TestingLogger(), + dialer, + ) + privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint) + + signerServer := privval.NewSignerServer( + dialerEndpoint, config.ChainID(), types.NewMockPV(), - dialer, ) - privval.SignerServiceEndpointTimeoutReadWrite(100 * time.Millisecond)(pvsc) go func() { - err := pvsc.Start() + err := signerServer.Start() if err != nil { panic(err) } }() - defer pvsc.Stop() + defer signerServer.Stop() n, err := DefaultNewNode(config, log.TestingLogger()) require.NoError(t, err) - assert.IsType(t, &privval.SignerValidatorEndpoint{}, n.PrivValidator()) + assert.IsType(t, &privval.SignerClient{}, n.PrivValidator()) } // address without a protocol must result in error @@ -178,13 +182,17 @@ func TestNodeSetPrivValIPC(t *testing.T) { config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile dialer := privval.DialUnixFn(tmpfile) - pvsc := privval.NewSignerServiceEndpoint( + dialerEndpoint := privval.NewSignerDialerEndpoint( log.TestingLogger(), + dialer, + ) + privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint) + + pvsc := privval.NewSignerServer( + dialerEndpoint, config.ChainID(), types.NewMockPV(), - dialer, ) - privval.SignerServiceEndpointTimeoutReadWrite(100 * time.Millisecond)(pvsc) go func() { err := pvsc.Start() @@ -194,8 +202,7 @@ func TestNodeSetPrivValIPC(t *testing.T) { n, err := DefaultNewNode(config, log.TestingLogger()) require.NoError(t, err) - assert.IsType(t, &privval.SignerValidatorEndpoint{}, n.PrivValidator()) - + assert.IsType(t, &privval.SignerClient{}, n.PrivValidator()) } // testFreeAddr claims a free port so we don't block on listener being ready. diff --git a/privval/doc.go b/privval/doc.go index 80869a6a7..ad60673b6 100644 --- a/privval/doc.go +++ b/privval/doc.go @@ -6,16 +6,16 @@ FilePV FilePV is the simplest implementation and developer default. It uses one file for the private key and another to store state. -SignerValidatorEndpoint +SignerListenerEndpoint -SignerValidatorEndpoint establishes a connection to an external process, like a Key Management Server (KMS), using a socket. -SignerValidatorEndpoint listens for the external KMS process to dial in. -SignerValidatorEndpoint takes a listener, which determines the type of connection +SignerListenerEndpoint establishes a connection to an external process, like a Key Management Server (KMS), using a socket. +SignerListenerEndpoint listens for the external KMS process to dial in. +SignerListenerEndpoint takes a listener, which determines the type of connection (ie. encrypted over tcp, or unencrypted over unix). -SignerServiceEndpoint +SignerDialerEndpoint -SignerServiceEndpoint is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal. +SignerDialerEndpoint is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal. */ package privval diff --git a/privval/errors.go b/privval/errors.go index 75fb25fc6..9f151f11d 100644 --- a/privval/errors.go +++ b/privval/errors.go @@ -4,10 +4,21 @@ import ( "fmt" ) +type EndpointTimeoutError struct{} + +// Implement the net.Error interface. +func (e EndpointTimeoutError) Error() string { return "endpoint connection timed out" } +func (e EndpointTimeoutError) Timeout() bool { return true } +func (e EndpointTimeoutError) Temporary() bool { return true } + // Socket errors. var ( ErrUnexpectedResponse = fmt.Errorf("received unexpected response") - ErrConnTimeout = fmt.Errorf("remote signer timed out") + ErrNoConnection = fmt.Errorf("endpoint is not connected") + ErrConnectionTimeout = EndpointTimeoutError{} + + ErrReadTimeout = fmt.Errorf("endpoint read timed out") + ErrWriteTimeout = fmt.Errorf("endpoint write timed out") ) // RemoteSignerError allows (remote) validators to include meaningful error descriptions in their reply. @@ -18,5 +29,5 @@ type RemoteSignerError struct { } func (e *RemoteSignerError) Error() string { - return fmt.Sprintf("signerServiceEndpoint returned error #%d: %s", e.Code, e.Description) + return fmt.Sprintf("signerEndpoint returned error #%d: %s", e.Code, e.Description) } diff --git a/privval/file_deprecated_test.go b/privval/file_deprecated_test.go index 46391a3fe..e678bfc09 100644 --- a/privval/file_deprecated_test.go +++ b/privval/file_deprecated_test.go @@ -67,11 +67,11 @@ func assertEqualPV(t *testing.T, oldPV *privval.OldFilePV, newPV *privval.FilePV } func initTmpOldFile(t *testing.T) string { - tmpfile, err := ioutil.TempFile("", "priv_validator_*.json") + tmpFile, err := ioutil.TempFile("", "priv_validator_*.json") require.NoError(t, err) - t.Logf("created test file %s", tmpfile.Name()) - _, err = tmpfile.WriteString(oldPrivvalContent) + t.Logf("created test file %s", tmpFile.Name()) + _, err = tmpFile.WriteString(oldPrivvalContent) require.NoError(t, err) - return tmpfile.Name() + return tmpFile.Name() } diff --git a/privval/file_test.go b/privval/file_test.go index 98de69480..38f6e6fe3 100644 --- a/privval/file_test.go +++ b/privval/file_test.go @@ -58,7 +58,7 @@ func TestResetValidator(t *testing.T) { // priv val after signing is not same as empty assert.NotEqual(t, privVal.LastSignState, emptyState) - // priv val after reset is same as empty + // priv val after AcceptNewConnection is same as empty privVal.Reset() assert.Equal(t, privVal.LastSignState, emptyState) } @@ -164,6 +164,7 @@ func TestSignVote(t *testing.T) { block1 := types.BlockID{Hash: []byte{1, 2, 3}, PartsHeader: types.PartSetHeader{}} block2 := types.BlockID{Hash: []byte{3, 2, 1}, PartsHeader: types.PartSetHeader{}} + height, round := int64(10), 1 voteType := byte(types.PrevoteType) diff --git a/privval/messages.go b/privval/messages.go index 6774a2795..7704049fe 100644 --- a/privval/messages.go +++ b/privval/messages.go @@ -1,61 +1,64 @@ package privval import ( - amino "github.com/tendermint/go-amino" + "github.com/tendermint/go-amino" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/types" ) -// RemoteSignerMsg is sent between SignerServiceEndpoint and the SignerServiceEndpoint client. -type RemoteSignerMsg interface{} +// SignerMessage is sent between Signer Clients and Servers. +type SignerMessage interface{} func RegisterRemoteSignerMsg(cdc *amino.Codec) { - cdc.RegisterInterface((*RemoteSignerMsg)(nil), nil) + cdc.RegisterInterface((*SignerMessage)(nil), nil) cdc.RegisterConcrete(&PubKeyRequest{}, "tendermint/remotesigner/PubKeyRequest", nil) cdc.RegisterConcrete(&PubKeyResponse{}, "tendermint/remotesigner/PubKeyResponse", nil) cdc.RegisterConcrete(&SignVoteRequest{}, "tendermint/remotesigner/SignVoteRequest", nil) cdc.RegisterConcrete(&SignedVoteResponse{}, "tendermint/remotesigner/SignedVoteResponse", nil) cdc.RegisterConcrete(&SignProposalRequest{}, "tendermint/remotesigner/SignProposalRequest", nil) cdc.RegisterConcrete(&SignedProposalResponse{}, "tendermint/remotesigner/SignedProposalResponse", nil) + cdc.RegisterConcrete(&PingRequest{}, "tendermint/remotesigner/PingRequest", nil) cdc.RegisterConcrete(&PingResponse{}, "tendermint/remotesigner/PingResponse", nil) } +// TODO: Add ChainIDRequest + // PubKeyRequest requests the consensus public key from the remote signer. type PubKeyRequest struct{} -// PubKeyResponse is a PrivValidatorSocket message containing the public key. +// PubKeyResponse is a response message containing the public key. type PubKeyResponse struct { PubKey crypto.PubKey Error *RemoteSignerError } -// SignVoteRequest is a PrivValidatorSocket message containing a vote. +// SignVoteRequest is a request to sign a vote type SignVoteRequest struct { Vote *types.Vote } -// SignedVoteResponse is a PrivValidatorSocket message containing a signed vote along with a potenial error message. +// SignedVoteResponse is a response containing a signed vote or an error type SignedVoteResponse struct { Vote *types.Vote Error *RemoteSignerError } -// SignProposalRequest is a PrivValidatorSocket message containing a Proposal. +// SignProposalRequest is a request to sign a proposal type SignProposalRequest struct { Proposal *types.Proposal } -// SignedProposalResponse is a PrivValidatorSocket message containing a proposal response +// SignedProposalResponse is response containing a signed proposal or an error type SignedProposalResponse struct { Proposal *types.Proposal Error *RemoteSignerError } -// PingRequest is a PrivValidatorSocket message to keep the connection alive. +// PingRequest is a request to confirm that the connection is alive. type PingRequest struct { } -// PingRequest is a PrivValidatorSocket response to keep the connection alive. +// PingResponse is a response to confirm that the connection is alive. type PingResponse struct { } diff --git a/privval/signer_client.go b/privval/signer_client.go new file mode 100644 index 000000000..0885ee4aa --- /dev/null +++ b/privval/signer_client.go @@ -0,0 +1,131 @@ +package privval + +import ( + "time" + + "github.com/pkg/errors" + + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/types" +) + +// SignerClient implements PrivValidator. +// Handles remote validator connections that provide signing services +type SignerClient struct { + endpoint *SignerListenerEndpoint +} + +var _ types.PrivValidator = (*SignerClient)(nil) + +// NewSignerClient returns an instance of SignerClient. +// it will start the endpoint (if not already started) +func NewSignerClient(endpoint *SignerListenerEndpoint) (*SignerClient, error) { + if !endpoint.IsRunning() { + if err := endpoint.Start(); err != nil { + return nil, errors.Wrap(err, "failed to start listener endpoint") + } + } + + return &SignerClient{endpoint: endpoint}, nil +} + +// Close closes the underlying connection +func (sc *SignerClient) Close() error { + return sc.endpoint.Close() +} + +// IsConnected indicates with the signer is connected to a remote signing service +func (sc *SignerClient) IsConnected() bool { + return sc.endpoint.IsConnected() +} + +// WaitForConnection waits maxWait for a connection or returns a timeout error +func (sc *SignerClient) WaitForConnection(maxWait time.Duration) error { + return sc.endpoint.WaitForConnection(maxWait) +} + +//-------------------------------------------------------- +// Implement PrivValidator + +// Ping sends a ping request to the remote signer +func (sc *SignerClient) Ping() error { + response, err := sc.endpoint.SendRequest(&PingRequest{}) + + if err != nil { + sc.endpoint.Logger.Error("SignerClient::Ping", "err", err) + return nil + } + + _, ok := response.(*PingResponse) + if !ok { + sc.endpoint.Logger.Error("SignerClient::Ping", "err", "response != PingResponse") + return err + } + + return nil +} + +// GetPubKey retrieves a public key from a remote signer +func (sc *SignerClient) GetPubKey() crypto.PubKey { + response, err := sc.endpoint.SendRequest(&PubKeyRequest{}) + if err != nil { + sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", err) + return nil + } + + pubKeyResp, ok := response.(*PubKeyResponse) + if !ok { + sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", "response != PubKeyResponse") + return nil + } + + if pubKeyResp.Error != nil { + sc.endpoint.Logger.Error("failed to get private validator's public key", "err", pubKeyResp.Error) + return nil + } + + return pubKeyResp.PubKey +} + +// SignVote requests a remote signer to sign a vote +func (sc *SignerClient) SignVote(chainID string, vote *types.Vote) error { + response, err := sc.endpoint.SendRequest(&SignVoteRequest{Vote: vote}) + if err != nil { + sc.endpoint.Logger.Error("SignerClient::SignVote", "err", err) + return err + } + + resp, ok := response.(*SignedVoteResponse) + if !ok { + sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", "response != SignedVoteResponse") + return ErrUnexpectedResponse + } + + if resp.Error != nil { + return resp.Error + } + *vote = *resp.Vote + + return nil +} + +// SignProposal requests a remote signer to sign a proposal +func (sc *SignerClient) SignProposal(chainID string, proposal *types.Proposal) error { + response, err := sc.endpoint.SendRequest(&SignProposalRequest{Proposal: proposal}) + if err != nil { + sc.endpoint.Logger.Error("SignerClient::SignProposal", "err", err) + return err + } + + resp, ok := response.(*SignedProposalResponse) + if !ok { + sc.endpoint.Logger.Error("SignerClient::SignProposal", "err", "response != SignedProposalResponse") + return ErrUnexpectedResponse + } + if resp.Error != nil { + return resp.Error + } + *proposal = *resp.Proposal + + return nil +} diff --git a/privval/signer_client_test.go b/privval/signer_client_test.go new file mode 100644 index 000000000..3d7cfb3e0 --- /dev/null +++ b/privval/signer_client_test.go @@ -0,0 +1,257 @@ +package privval + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/types" +) + +type signerTestCase struct { + chainID string + mockPV types.PrivValidator + signerClient *SignerClient + signerServer *SignerServer +} + +func getSignerTestCases(t *testing.T) []signerTestCase { + testCases := make([]signerTestCase, 0) + + // Get test cases for each possible dialer (DialTCP / DialUnix / etc) + for _, dtc := range getDialerTestCases(t) { + chainID := common.RandStr(12) + mockPV := types.NewMockPV() + + // get a pair of signer listener, signer dialer endpoints + sl, sd := getMockEndpoints(t, dtc.addr, dtc.dialer) + sc, err := NewSignerClient(sl) + require.NoError(t, err) + ss := NewSignerServer(sd, chainID, mockPV) + + err = ss.Start() + require.NoError(t, err) + + tc := signerTestCase{ + chainID: chainID, + mockPV: mockPV, + signerClient: sc, + signerServer: ss, + } + + testCases = append(testCases, tc) + } + + return testCases +} + +func TestSignerClose(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + err := tc.signerClient.Close() + assert.NoError(t, err) + + err = tc.signerServer.Stop() + assert.NoError(t, err) + } +} + +func TestSignerPing(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + err := tc.signerClient.Ping() + assert.NoError(t, err) + } +} + +func TestSignerGetPubKey(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + pubKey := tc.signerClient.GetPubKey() + expectedPubKey := tc.mockPV.GetPubKey() + + assert.Equal(t, expectedPubKey, pubKey) + + addr := tc.signerClient.GetPubKey().Address() + expectedAddr := tc.mockPV.GetPubKey().Address() + + assert.Equal(t, expectedAddr, addr) + } +} + +func TestSignerProposal(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + ts := time.Now() + want := &types.Proposal{Timestamp: ts} + have := &types.Proposal{Timestamp: ts} + + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + require.NoError(t, tc.mockPV.SignProposal(tc.chainID, want)) + require.NoError(t, tc.signerClient.SignProposal(tc.chainID, have)) + + assert.Equal(t, want.Signature, have.Signature) + } +} + +func TestSignerVote(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + ts := time.Now() + want := &types.Vote{Timestamp: ts, Type: types.PrecommitType} + have := &types.Vote{Timestamp: ts, Type: types.PrecommitType} + + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + require.NoError(t, tc.mockPV.SignVote(tc.chainID, want)) + require.NoError(t, tc.signerClient.SignVote(tc.chainID, have)) + + assert.Equal(t, want.Signature, have.Signature) + } +} + +func TestSignerVoteResetDeadline(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + ts := time.Now() + want := &types.Vote{Timestamp: ts, Type: types.PrecommitType} + have := &types.Vote{Timestamp: ts, Type: types.PrecommitType} + + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + time.Sleep(testTimeoutReadWrite2o3) + + require.NoError(t, tc.mockPV.SignVote(tc.chainID, want)) + require.NoError(t, tc.signerClient.SignVote(tc.chainID, have)) + assert.Equal(t, want.Signature, have.Signature) + + // TODO(jleni): Clarify what is actually being tested + + // This would exceed the deadline if it was not extended by the previous message + time.Sleep(testTimeoutReadWrite2o3) + + require.NoError(t, tc.mockPV.SignVote(tc.chainID, want)) + require.NoError(t, tc.signerClient.SignVote(tc.chainID, have)) + assert.Equal(t, want.Signature, have.Signature) + } +} + +func TestSignerVoteKeepAlive(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + ts := time.Now() + want := &types.Vote{Timestamp: ts, Type: types.PrecommitType} + have := &types.Vote{Timestamp: ts, Type: types.PrecommitType} + + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + // Check that even if the client does not request a + // signature for a long time. The service is still available + + // in this particular case, we use the dialer logger to ensure that + // test messages are properly interleaved in the test logs + tc.signerServer.Logger.Debug("TEST: Forced Wait -------------------------------------------------") + time.Sleep(testTimeoutReadWrite * 3) + tc.signerServer.Logger.Debug("TEST: Forced Wait DONE---------------------------------------------") + + require.NoError(t, tc.mockPV.SignVote(tc.chainID, want)) + require.NoError(t, tc.signerClient.SignVote(tc.chainID, have)) + + assert.Equal(t, want.Signature, have.Signature) + } +} + +func TestSignerSignProposalErrors(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + // Replace service with a mock that always fails + tc.signerServer.privVal = types.NewErroringMockPV() + tc.mockPV = types.NewErroringMockPV() + + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + ts := time.Now() + proposal := &types.Proposal{Timestamp: ts} + err := tc.signerClient.SignProposal(tc.chainID, proposal) + require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error()) + + err = tc.mockPV.SignProposal(tc.chainID, proposal) + require.Error(t, err) + + err = tc.signerClient.SignProposal(tc.chainID, proposal) + require.Error(t, err) + } +} + +func TestSignerSignVoteErrors(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + ts := time.Now() + vote := &types.Vote{Timestamp: ts, Type: types.PrecommitType} + + // Replace signer service privval with one that always fails + tc.signerServer.privVal = types.NewErroringMockPV() + tc.mockPV = types.NewErroringMockPV() + + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + err := tc.signerClient.SignVote(tc.chainID, vote) + require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error()) + + err = tc.mockPV.SignVote(tc.chainID, vote) + require.Error(t, err) + + err = tc.signerClient.SignVote(tc.chainID, vote) + require.Error(t, err) + } +} + +func brokenHandler(privVal types.PrivValidator, request SignerMessage, chainID string) (SignerMessage, error) { + var res SignerMessage + var err error + + switch r := request.(type) { + + // This is broken and will answer most requests with a pubkey response + case *PubKeyRequest: + res = &PubKeyResponse{nil, nil} + case *SignVoteRequest: + res = &PubKeyResponse{nil, nil} + case *SignProposalRequest: + res = &PubKeyResponse{nil, nil} + + case *PingRequest: + err, res = nil, &PingResponse{} + + default: + err = fmt.Errorf("unknown msg: %v", r) + } + + return res, err +} + +func TestSignerUnexpectedResponse(t *testing.T) { + for _, tc := range getSignerTestCases(t) { + tc.signerServer.privVal = types.NewMockPV() + tc.mockPV = types.NewMockPV() + + tc.signerServer.SetRequestHandler(brokenHandler) + + defer tc.signerServer.Stop() + defer tc.signerClient.Close() + + ts := time.Now() + want := &types.Vote{Timestamp: ts, Type: types.PrecommitType} + + e := tc.signerClient.SignVote(tc.chainID, want) + assert.EqualError(t, e, "received unexpected response") + } +} diff --git a/privval/signer_dialer_endpoint.go b/privval/signer_dialer_endpoint.go new file mode 100644 index 000000000..95094c6d0 --- /dev/null +++ b/privval/signer_dialer_endpoint.go @@ -0,0 +1,84 @@ +package privval + +import ( + "time" + + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" +) + +const ( + defaultMaxDialRetries = 10 + defaultRetryWaitMilliseconds = 100 +) + +// SignerServiceEndpointOption sets an optional parameter on the SignerDialerEndpoint. +type SignerServiceEndpointOption func(*SignerDialerEndpoint) + +// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for connections +// from external signing processes. +func SignerDialerEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption { + return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout } +} + +// SignerDialerEndpointConnRetries sets the amount of attempted retries to acceptNewConnection. +func SignerDialerEndpointConnRetries(retries int) SignerServiceEndpointOption { + return func(ss *SignerDialerEndpoint) { ss.maxConnRetries = retries } +} + +// SignerDialerEndpoint dials using its dialer and responds to any +// signature requests using its privVal. +type SignerDialerEndpoint struct { + signerEndpoint + + dialer SocketDialer + + retryWait time.Duration + maxConnRetries int +} + +// NewSignerDialerEndpoint returns a SignerDialerEndpoint that will dial using the given +// dialer and respond to any signature requests over the connection +// using the given privVal. +func NewSignerDialerEndpoint( + logger log.Logger, + dialer SocketDialer, +) *SignerDialerEndpoint { + + sd := &SignerDialerEndpoint{ + dialer: dialer, + retryWait: defaultRetryWaitMilliseconds * time.Millisecond, + maxConnRetries: defaultMaxDialRetries, + } + + sd.BaseService = *cmn.NewBaseService(logger, "SignerDialerEndpoint", sd) + sd.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second + + return sd +} + +func (sd *SignerDialerEndpoint) ensureConnection() error { + if sd.IsConnected() { + return nil + } + + retries := 0 + for retries < sd.maxConnRetries { + conn, err := sd.dialer() + + if err != nil { + retries++ + sd.Logger.Debug("SignerDialer: Reconnection failed", "retries", retries, "max", sd.maxConnRetries, "err", err) + // Wait between retries + time.Sleep(sd.retryWait) + } else { + sd.SetConnection(conn) + sd.Logger.Debug("SignerDialer: Connection Ready") + return nil + } + } + + sd.Logger.Debug("SignerDialer: Max retries exceeded", "retries", retries, "max", sd.maxConnRetries) + + return ErrNoConnection +} diff --git a/privval/signer_endpoint.go b/privval/signer_endpoint.go new file mode 100644 index 000000000..425f73fea --- /dev/null +++ b/privval/signer_endpoint.go @@ -0,0 +1,156 @@ +package privval + +import ( + "fmt" + "net" + "sync" + "time" + + "github.com/pkg/errors" + + cmn "github.com/tendermint/tendermint/libs/common" +) + +const ( + defaultTimeoutReadWriteSeconds = 3 +) + +type signerEndpoint struct { + cmn.BaseService + + connMtx sync.Mutex + conn net.Conn + + timeoutReadWrite time.Duration +} + +// Close closes the underlying net.Conn. +func (se *signerEndpoint) Close() error { + se.DropConnection() + return nil +} + +// IsConnected indicates if there is an active connection +func (se *signerEndpoint) IsConnected() bool { + se.connMtx.Lock() + defer se.connMtx.Unlock() + return se.isConnected() +} + +// TryGetConnection retrieves a connection if it is already available +func (se *signerEndpoint) GetAvailableConnection(connectionAvailableCh chan net.Conn) bool { + se.connMtx.Lock() + defer se.connMtx.Unlock() + + // Is there a connection ready? + select { + case se.conn = <-connectionAvailableCh: + return true + default: + } + return false +} + +// TryGetConnection retrieves a connection if it is already available +func (se *signerEndpoint) WaitConnection(connectionAvailableCh chan net.Conn, maxWait time.Duration) error { + se.connMtx.Lock() + defer se.connMtx.Unlock() + + select { + case se.conn = <-connectionAvailableCh: + case <-time.After(maxWait): + return ErrConnectionTimeout + } + + return nil +} + +// SetConnection replaces the current connection object +func (se *signerEndpoint) SetConnection(newConnection net.Conn) { + se.connMtx.Lock() + defer se.connMtx.Unlock() + se.conn = newConnection +} + +// IsConnected indicates if there is an active connection +func (se *signerEndpoint) DropConnection() { + se.connMtx.Lock() + defer se.connMtx.Unlock() + se.dropConnection() +} + +// ReadMessage reads a message from the endpoint +func (se *signerEndpoint) ReadMessage() (msg SignerMessage, err error) { + se.connMtx.Lock() + defer se.connMtx.Unlock() + + if !se.isConnected() { + return nil, fmt.Errorf("endpoint is not connected") + } + + // Reset read deadline + deadline := time.Now().Add(se.timeoutReadWrite) + + err = se.conn.SetReadDeadline(deadline) + if err != nil { + return + } + + const maxRemoteSignerMsgSize = 1024 * 10 + _, err = cdc.UnmarshalBinaryLengthPrefixedReader(se.conn, &msg, maxRemoteSignerMsgSize) + if _, ok := err.(timeoutError); ok { + if err != nil { + err = errors.Wrap(ErrReadTimeout, err.Error()) + } else { + err = errors.Wrap(ErrReadTimeout, "Empty error") + } + se.Logger.Debug("Dropping [read]", "obj", se) + se.dropConnection() + } + + return +} + +// WriteMessage writes a message from the endpoint +func (se *signerEndpoint) WriteMessage(msg SignerMessage) (err error) { + se.connMtx.Lock() + defer se.connMtx.Unlock() + + if !se.isConnected() { + return errors.Wrap(ErrNoConnection, "endpoint is not connected") + } + + // Reset read deadline + deadline := time.Now().Add(se.timeoutReadWrite) + se.Logger.Debug("Write::Error Resetting deadline", "obj", se) + + err = se.conn.SetWriteDeadline(deadline) + if err != nil { + return + } + + _, err = cdc.MarshalBinaryLengthPrefixedWriter(se.conn, msg) + if _, ok := err.(timeoutError); ok { + if err != nil { + err = errors.Wrap(ErrWriteTimeout, err.Error()) + } else { + err = errors.Wrap(ErrWriteTimeout, "Empty error") + } + se.dropConnection() + } + + return +} + +func (se *signerEndpoint) isConnected() bool { + return se.conn != nil +} + +func (se *signerEndpoint) dropConnection() { + if se.conn != nil { + if err := se.conn.Close(); err != nil { + se.Logger.Error("signerEndpoint::dropConnection", "err", err) + } + se.conn = nil + } +} diff --git a/privval/signer_listener_endpoint.go b/privval/signer_listener_endpoint.go new file mode 100644 index 000000000..e25f18756 --- /dev/null +++ b/privval/signer_listener_endpoint.go @@ -0,0 +1,198 @@ +package privval + +import ( + "fmt" + "net" + "sync" + "time" + + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" +) + +// SignerValidatorEndpointOption sets an optional parameter on the SocketVal. +type SignerValidatorEndpointOption func(*SignerListenerEndpoint) + +// SignerListenerEndpoint listens for an external process to dial in +// and keeps the connection alive by dropping and reconnecting +type SignerListenerEndpoint struct { + signerEndpoint + + listener net.Listener + connectRequestCh chan struct{} + connectionAvailableCh chan net.Conn + + timeoutAccept time.Duration + pingTimer *time.Ticker + + instanceMtx sync.Mutex // Ensures instance public methods access, i.e. SendRequest +} + +// NewSignerListenerEndpoint returns an instance of SignerListenerEndpoint. +func NewSignerListenerEndpoint( + logger log.Logger, + listener net.Listener, +) *SignerListenerEndpoint { + sc := &SignerListenerEndpoint{ + listener: listener, + timeoutAccept: defaultTimeoutAcceptSeconds * time.Second, + } + + sc.BaseService = *cmn.NewBaseService(logger, "SignerListenerEndpoint", sc) + sc.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second + return sc +} + +// OnStart implements cmn.Service. +func (sl *SignerListenerEndpoint) OnStart() error { + sl.connectRequestCh = make(chan struct{}) + sl.connectionAvailableCh = make(chan net.Conn) + + sl.pingTimer = time.NewTicker(defaultPingPeriodMilliseconds * time.Millisecond) + + go sl.serviceLoop() + go sl.pingLoop() + + sl.connectRequestCh <- struct{}{} + + return nil +} + +// OnStop implements cmn.Service +func (sl *SignerListenerEndpoint) OnStop() { + sl.instanceMtx.Lock() + defer sl.instanceMtx.Unlock() + _ = sl.Close() + + // Stop listening + if sl.listener != nil { + if err := sl.listener.Close(); err != nil { + sl.Logger.Error("Closing Listener", "err", err) + sl.listener = nil + } + } + + sl.pingTimer.Stop() +} + +// WaitForConnection waits maxWait for a connection or returns a timeout error +func (sl *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error { + sl.instanceMtx.Lock() + defer sl.instanceMtx.Unlock() + return sl.ensureConnection(maxWait) +} + +// SendRequest ensures there is a connection, sends a request and waits for a response +func (sl *SignerListenerEndpoint) SendRequest(request SignerMessage) (SignerMessage, error) { + sl.instanceMtx.Lock() + defer sl.instanceMtx.Unlock() + + err := sl.ensureConnection(sl.timeoutAccept) + if err != nil { + return nil, err + } + + err = sl.WriteMessage(request) + if err != nil { + return nil, err + } + + res, err := sl.ReadMessage() + if err != nil { + return nil, err + } + + return res, nil +} + +func (sl *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error { + if sl.IsConnected() { + return nil + } + + // Is there a connection ready? then use it + if sl.GetAvailableConnection(sl.connectionAvailableCh) { + return nil + } + + // block until connected or timeout + sl.triggerConnect() + err := sl.WaitConnection(sl.connectionAvailableCh, maxWait) + if err != nil { + return err + } + + return nil +} + +func (sl *SignerListenerEndpoint) acceptNewConnection() (net.Conn, error) { + if !sl.IsRunning() || sl.listener == nil { + return nil, fmt.Errorf("endpoint is closing") + } + + // wait for a new conn + sl.Logger.Info("SignerListener: Listening for new connection") + conn, err := sl.listener.Accept() + if err != nil { + return nil, err + } + + return conn, nil +} + +func (sl *SignerListenerEndpoint) triggerConnect() { + select { + case sl.connectRequestCh <- struct{}{}: + default: + } +} + +func (sl *SignerListenerEndpoint) triggerReconnect() { + sl.DropConnection() + sl.triggerConnect() +} + +func (sl *SignerListenerEndpoint) serviceLoop() { + for { + select { + case <-sl.connectRequestCh: + { + conn, err := sl.acceptNewConnection() + if err == nil { + sl.Logger.Info("SignerListener: Connected") + + // We have a good connection, wait for someone that needs one otherwise cancellation + select { + case sl.connectionAvailableCh <- conn: + case <-sl.Quit(): + return + } + } + + select { + case sl.connectRequestCh <- struct{}{}: + default: + } + } + case <-sl.Quit(): + return + } + } +} + +func (sl *SignerListenerEndpoint) pingLoop() { + for { + select { + case <-sl.pingTimer.C: + { + _, err := sl.SendRequest(&PingRequest{}) + if err != nil { + sl.Logger.Error("SignerListener: Ping timeout") + sl.triggerReconnect() + } + } + case <-sl.Quit(): + return + } + } +} diff --git a/privval/signer_listener_endpoint_test.go b/privval/signer_listener_endpoint_test.go new file mode 100644 index 000000000..7058ff8b8 --- /dev/null +++ b/privval/signer_listener_endpoint_test.go @@ -0,0 +1,198 @@ +package privval + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/crypto/ed25519" + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/types" +) + +var ( + testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second + + testTimeoutReadWrite = 100 * time.Millisecond + testTimeoutReadWrite2o3 = 60 * time.Millisecond // 2/3 of the other one +) + +type dialerTestCase struct { + addr string + dialer SocketDialer +} + +// TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We +// don't need this for Unix sockets because the OS instantly knows the state of +// both ends of the socket connection. This basically causes the +// SignerDialerEndpoint.dialer() call inside SignerDialerEndpoint.acceptNewConnection() to return +// successfully immediately, putting an instant stop to any retry attempts. +func TestSignerRemoteRetryTCPOnly(t *testing.T) { + var ( + attemptCh = make(chan int) + retries = 10 + ) + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + // Continuously Accept connection and close {attempts} times + go func(ln net.Listener, attemptCh chan<- int) { + attempts := 0 + for { + conn, err := ln.Accept() + require.NoError(t, err) + + err = conn.Close() + require.NoError(t, err) + + attempts++ + + if attempts == retries { + attemptCh <- attempts + break + } + } + }(ln, attemptCh) + + dialerEndpoint := NewSignerDialerEndpoint( + log.TestingLogger(), + DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, ed25519.GenPrivKey()), + ) + SignerDialerEndpointTimeoutReadWrite(time.Millisecond)(dialerEndpoint) + SignerDialerEndpointConnRetries(retries)(dialerEndpoint) + + chainId := cmn.RandStr(12) + mockPV := types.NewMockPV() + signerServer := NewSignerServer(dialerEndpoint, chainId, mockPV) + + err = signerServer.Start() + require.NoError(t, err) + defer signerServer.Stop() + + select { + case attempts := <-attemptCh: + assert.Equal(t, retries, attempts) + case <-time.After(1500 * time.Millisecond): + t.Error("expected remote to observe connection attempts") + } +} + +func TestRetryConnToRemoteSigner(t *testing.T) { + for _, tc := range getDialerTestCases(t) { + var ( + logger = log.TestingLogger() + chainID = cmn.RandStr(12) + mockPV = types.NewMockPV() + endpointIsOpenCh = make(chan struct{}) + thisConnTimeout = testTimeoutReadWrite + listenerEndpoint = newSignerListenerEndpoint(logger, tc.addr, thisConnTimeout) + ) + + dialerEndpoint := NewSignerDialerEndpoint( + logger, + tc.dialer, + ) + SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint) + SignerDialerEndpointConnRetries(10)(dialerEndpoint) + + signerServer := NewSignerServer(dialerEndpoint, chainID, mockPV) + + startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh) + defer listenerEndpoint.Stop() + + require.NoError(t, signerServer.Start()) + assert.True(t, signerServer.IsRunning()) + <-endpointIsOpenCh + signerServer.Stop() + + dialerEndpoint2 := NewSignerDialerEndpoint( + logger, + tc.dialer, + ) + signerServer2 := NewSignerServer(dialerEndpoint2, chainID, mockPV) + + // let some pings pass + require.NoError(t, signerServer2.Start()) + assert.True(t, signerServer2.IsRunning()) + defer signerServer2.Stop() + + // give the client some time to re-establish the conn to the remote signer + // should see sth like this in the logs: + // + // E[10016-01-10|17:12:46.128] Ping err="remote signer timed out" + // I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal + time.Sleep(testTimeoutReadWrite * 2) + } +} + +/////////////////////////////////// + +func newSignerListenerEndpoint(logger log.Logger, addr string, timeoutReadWrite time.Duration) *SignerListenerEndpoint { + proto, address := cmn.ProtocolAndAddress(addr) + + ln, err := net.Listen(proto, address) + logger.Info("SignerListener: Listening", "proto", proto, "address", address) + if err != nil { + panic(err) + } + + var listener net.Listener + + if proto == "unix" { + unixLn := NewUnixListener(ln) + UnixListenerTimeoutAccept(testTimeoutAccept)(unixLn) + UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn) + listener = unixLn + } else { + tcpLn := NewTCPListener(ln, ed25519.GenPrivKey()) + TCPListenerTimeoutAccept(testTimeoutAccept)(tcpLn) + TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn) + listener = tcpLn + } + + return NewSignerListenerEndpoint(logger, listener) +} + +func startListenerEndpointAsync(t *testing.T, sle *SignerListenerEndpoint, endpointIsOpenCh chan struct{}) { + go func(sle *SignerListenerEndpoint) { + require.NoError(t, sle.Start()) + assert.True(t, sle.IsRunning()) + close(endpointIsOpenCh) + }(sle) +} + +func getMockEndpoints( + t *testing.T, + addr string, + socketDialer SocketDialer, +) (*SignerListenerEndpoint, *SignerDialerEndpoint) { + + var ( + logger = log.TestingLogger() + endpointIsOpenCh = make(chan struct{}) + + dialerEndpoint = NewSignerDialerEndpoint( + logger, + socketDialer, + ) + + listenerEndpoint = newSignerListenerEndpoint(logger, addr, testTimeoutReadWrite) + ) + + SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint) + SignerDialerEndpointConnRetries(1e6)(dialerEndpoint) + + startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh) + + require.NoError(t, dialerEndpoint.Start()) + assert.True(t, dialerEndpoint.IsRunning()) + + <-endpointIsOpenCh + + return listenerEndpoint, dialerEndpoint +} diff --git a/privval/signer_remote.go b/privval/signer_remote.go deleted file mode 100644 index 53b0cb773..000000000 --- a/privval/signer_remote.go +++ /dev/null @@ -1,192 +0,0 @@ -package privval - -import ( - "fmt" - "io" - "net" - - "github.com/pkg/errors" - - "github.com/tendermint/tendermint/crypto" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/types" -) - -// SignerRemote implements PrivValidator. -// It uses a net.Conn to request signatures from an external process. -type SignerRemote struct { - conn net.Conn - - // memoized - consensusPubKey crypto.PubKey -} - -// Check that SignerRemote implements PrivValidator. -var _ types.PrivValidator = (*SignerRemote)(nil) - -// NewSignerRemote returns an instance of SignerRemote. -func NewSignerRemote(conn net.Conn) (*SignerRemote, error) { - - // retrieve and memoize the consensus public key once. - pubKey, err := getPubKey(conn) - if err != nil { - return nil, cmn.ErrorWrap(err, "error while retrieving public key for remote signer") - } - return &SignerRemote{ - conn: conn, - consensusPubKey: pubKey, - }, nil -} - -// Close calls Close on the underlying net.Conn. -func (sc *SignerRemote) Close() error { - return sc.conn.Close() -} - -// GetPubKey implements PrivValidator. -func (sc *SignerRemote) GetPubKey() crypto.PubKey { - return sc.consensusPubKey -} - -// not thread-safe (only called on startup). -func getPubKey(conn net.Conn) (crypto.PubKey, error) { - err := writeMsg(conn, &PubKeyRequest{}) - if err != nil { - return nil, err - } - - res, err := readMsg(conn) - if err != nil { - return nil, err - } - - pubKeyResp, ok := res.(*PubKeyResponse) - if !ok { - return nil, errors.Wrap(ErrUnexpectedResponse, "response is not PubKeyResponse") - } - - if pubKeyResp.Error != nil { - return nil, errors.Wrap(pubKeyResp.Error, "failed to get private validator's public key") - } - - return pubKeyResp.PubKey, nil -} - -// SignVote implements PrivValidator. -func (sc *SignerRemote) SignVote(chainID string, vote *types.Vote) error { - err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote}) - if err != nil { - return err - } - - res, err := readMsg(sc.conn) - if err != nil { - return err - } - - resp, ok := res.(*SignedVoteResponse) - if !ok { - return ErrUnexpectedResponse - } - if resp.Error != nil { - return resp.Error - } - *vote = *resp.Vote - - return nil -} - -// SignProposal implements PrivValidator. -func (sc *SignerRemote) SignProposal(chainID string, proposal *types.Proposal) error { - err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal}) - if err != nil { - return err - } - - res, err := readMsg(sc.conn) - if err != nil { - return err - } - resp, ok := res.(*SignedProposalResponse) - if !ok { - return ErrUnexpectedResponse - } - if resp.Error != nil { - return resp.Error - } - *proposal = *resp.Proposal - - return nil -} - -// Ping is used to check connection health. -func (sc *SignerRemote) Ping() error { - err := writeMsg(sc.conn, &PingRequest{}) - if err != nil { - return err - } - - res, err := readMsg(sc.conn) - if err != nil { - return err - } - _, ok := res.(*PingResponse) - if !ok { - return ErrUnexpectedResponse - } - - return nil -} - -func readMsg(r io.Reader) (msg RemoteSignerMsg, err error) { - const maxRemoteSignerMsgSize = 1024 * 10 - _, err = cdc.UnmarshalBinaryLengthPrefixedReader(r, &msg, maxRemoteSignerMsgSize) - if _, ok := err.(timeoutError); ok { - err = cmn.ErrorWrap(ErrConnTimeout, err.Error()) - } - return -} - -func writeMsg(w io.Writer, msg interface{}) (err error) { - _, err = cdc.MarshalBinaryLengthPrefixedWriter(w, msg) - if _, ok := err.(timeoutError); ok { - err = cmn.ErrorWrap(ErrConnTimeout, err.Error()) - } - return -} - -func handleRequest(req RemoteSignerMsg, chainID string, privVal types.PrivValidator) (RemoteSignerMsg, error) { - var res RemoteSignerMsg - var err error - - switch r := req.(type) { - case *PubKeyRequest: - var p crypto.PubKey - p = privVal.GetPubKey() - res = &PubKeyResponse{p, nil} - - case *SignVoteRequest: - err = privVal.SignVote(chainID, r.Vote) - if err != nil { - res = &SignedVoteResponse{nil, &RemoteSignerError{0, err.Error()}} - } else { - res = &SignedVoteResponse{r.Vote, nil} - } - - case *SignProposalRequest: - err = privVal.SignProposal(chainID, r.Proposal) - if err != nil { - res = &SignedProposalResponse{nil, &RemoteSignerError{0, err.Error()}} - } else { - res = &SignedProposalResponse{r.Proposal, nil} - } - - case *PingRequest: - res = &PingResponse{} - - default: - err = fmt.Errorf("unknown msg: %v", r) - } - - return res, err -} diff --git a/privval/signer_remote_test.go b/privval/signer_remote_test.go deleted file mode 100644 index 28230b803..000000000 --- a/privval/signer_remote_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package privval - -import ( - "net" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/crypto/ed25519" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/types" -) - -// TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We -// don't need this for Unix sockets because the OS instantly knows the state of -// both ends of the socket connection. This basically causes the -// SignerServiceEndpoint.dialer() call inside SignerServiceEndpoint.connect() to return -// successfully immediately, putting an instant stop to any retry attempts. -func TestSignerRemoteRetryTCPOnly(t *testing.T) { - var ( - attemptCh = make(chan int) - retries = 2 - ) - - ln, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - - go func(ln net.Listener, attemptCh chan<- int) { - attempts := 0 - - for { - conn, err := ln.Accept() - require.NoError(t, err) - - err = conn.Close() - require.NoError(t, err) - - attempts++ - - if attempts == retries { - attemptCh <- attempts - break - } - } - }(ln, attemptCh) - - serviceEndpoint := NewSignerServiceEndpoint( - log.TestingLogger(), - cmn.RandStr(12), - types.NewMockPV(), - DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, ed25519.GenPrivKey()), - ) - defer serviceEndpoint.Stop() - - SignerServiceEndpointTimeoutReadWrite(time.Millisecond)(serviceEndpoint) - SignerServiceEndpointConnRetries(retries)(serviceEndpoint) - - assert.Equal(t, serviceEndpoint.Start(), ErrDialRetryMax) - - select { - case attempts := <-attemptCh: - assert.Equal(t, retries, attempts) - case <-time.After(100 * time.Millisecond): - t.Error("expected remote to observe connection attempts") - } -} diff --git a/privval/signer_requestHandler.go b/privval/signer_requestHandler.go new file mode 100644 index 000000000..dcab7752e --- /dev/null +++ b/privval/signer_requestHandler.go @@ -0,0 +1,44 @@ +package privval + +import ( + "fmt" + + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/types" +) + +func DefaultValidationRequestHandler(privVal types.PrivValidator, req SignerMessage, chainID string) (SignerMessage, error) { + var res SignerMessage + var err error + + switch r := req.(type) { + case *PubKeyRequest: + var p crypto.PubKey + p = privVal.GetPubKey() + res = &PubKeyResponse{p, nil} + + case *SignVoteRequest: + err = privVal.SignVote(chainID, r.Vote) + if err != nil { + res = &SignedVoteResponse{nil, &RemoteSignerError{0, err.Error()}} + } else { + res = &SignedVoteResponse{r.Vote, nil} + } + + case *SignProposalRequest: + err = privVal.SignProposal(chainID, r.Proposal) + if err != nil { + res = &SignedProposalResponse{nil, &RemoteSignerError{0, err.Error()}} + } else { + res = &SignedProposalResponse{r.Proposal, nil} + } + + case *PingRequest: + err, res = nil, &PingResponse{} + + default: + err = fmt.Errorf("unknown msg: %v", r) + } + + return res, err +} diff --git a/privval/signer_server.go b/privval/signer_server.go new file mode 100644 index 000000000..62dcc461c --- /dev/null +++ b/privval/signer_server.go @@ -0,0 +1,107 @@ +package privval + +import ( + "io" + "sync" + + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/types" +) + +// ValidationRequestHandlerFunc handles different remoteSigner requests +type ValidationRequestHandlerFunc func( + privVal types.PrivValidator, + requestMessage SignerMessage, + chainID string) (SignerMessage, error) + +type SignerServer struct { + cmn.BaseService + + endpoint *SignerDialerEndpoint + chainID string + privVal types.PrivValidator + + handlerMtx sync.Mutex + validationRequestHandler ValidationRequestHandlerFunc +} + +func NewSignerServer(endpoint *SignerDialerEndpoint, chainID string, privVal types.PrivValidator) *SignerServer { + ss := &SignerServer{ + endpoint: endpoint, + chainID: chainID, + privVal: privVal, + validationRequestHandler: DefaultValidationRequestHandler, + } + + ss.BaseService = *cmn.NewBaseService(endpoint.Logger, "SignerServer", ss) + + return ss +} + +// OnStart implements cmn.Service. +func (ss *SignerServer) OnStart() error { + go ss.serviceLoop() + return nil +} + +// OnStop implements cmn.Service. +func (ss *SignerServer) OnStop() { + ss.endpoint.Logger.Debug("SignerServer: OnStop calling Close") + _ = ss.endpoint.Close() +} + +// SetRequestHandler override the default function that is used to service requests +func (ss *SignerServer) SetRequestHandler(validationRequestHandler ValidationRequestHandlerFunc) { + ss.handlerMtx.Lock() + defer ss.handlerMtx.Unlock() + ss.validationRequestHandler = validationRequestHandler +} + +func (ss *SignerServer) servicePendingRequest() { + if !ss.IsRunning() { + return // Ignore error from closing. + } + + req, err := ss.endpoint.ReadMessage() + if err != nil { + if err != io.EOF { + ss.Logger.Error("SignerServer: HandleMessage", "err", err) + } + return + } + + var res SignerMessage + { + // limit the scope of the lock + ss.handlerMtx.Lock() + defer ss.handlerMtx.Unlock() + res, err = ss.validationRequestHandler(ss.privVal, req, ss.chainID) + if err != nil { + // only log the error; we'll reply with an error in res + ss.Logger.Error("SignerServer: handleMessage", "err", err) + } + } + + if res != nil { + err = ss.endpoint.WriteMessage(res) + if err != nil { + ss.Logger.Error("SignerServer: writeMessage", "err", err) + } + } +} + +func (ss *SignerServer) serviceLoop() { + for { + select { + default: + err := ss.endpoint.ensureConnection() + if err != nil { + return + } + ss.servicePendingRequest() + + case <-ss.Quit(): + return + } + } +} diff --git a/privval/signer_service_endpoint.go b/privval/signer_service_endpoint.go deleted file mode 100644 index 1b37d5fc6..000000000 --- a/privval/signer_service_endpoint.go +++ /dev/null @@ -1,139 +0,0 @@ -package privval - -import ( - "io" - "net" - "time" - - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/types" -) - -// SignerServiceEndpointOption sets an optional parameter on the SignerServiceEndpoint. -type SignerServiceEndpointOption func(*SignerServiceEndpoint) - -// SignerServiceEndpointTimeoutReadWrite sets the read and write timeout for connections -// from external signing processes. -func SignerServiceEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption { - return func(ss *SignerServiceEndpoint) { ss.timeoutReadWrite = timeout } -} - -// SignerServiceEndpointConnRetries sets the amount of attempted retries to connect. -func SignerServiceEndpointConnRetries(retries int) SignerServiceEndpointOption { - return func(ss *SignerServiceEndpoint) { ss.connRetries = retries } -} - -// SignerServiceEndpoint dials using its dialer and responds to any -// signature requests using its privVal. -type SignerServiceEndpoint struct { - cmn.BaseService - - chainID string - timeoutReadWrite time.Duration - connRetries int - privVal types.PrivValidator - - dialer SocketDialer - conn net.Conn -} - -// NewSignerServiceEndpoint returns a SignerServiceEndpoint that will dial using the given -// dialer and respond to any signature requests over the connection -// using the given privVal. -func NewSignerServiceEndpoint( - logger log.Logger, - chainID string, - privVal types.PrivValidator, - dialer SocketDialer, -) *SignerServiceEndpoint { - se := &SignerServiceEndpoint{ - chainID: chainID, - timeoutReadWrite: time.Second * defaultTimeoutReadWriteSeconds, - connRetries: defaultMaxDialRetries, - privVal: privVal, - dialer: dialer, - } - - se.BaseService = *cmn.NewBaseService(logger, "SignerServiceEndpoint", se) - return se -} - -// OnStart implements cmn.Service. -func (se *SignerServiceEndpoint) OnStart() error { - conn, err := se.connect() - if err != nil { - se.Logger.Error("OnStart", "err", err) - return err - } - - se.conn = conn - go se.handleConnection(conn) - - return nil -} - -// OnStop implements cmn.Service. -func (se *SignerServiceEndpoint) OnStop() { - if se.conn == nil { - return - } - - if err := se.conn.Close(); err != nil { - se.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed")) - } -} - -func (se *SignerServiceEndpoint) connect() (net.Conn, error) { - for retries := 0; retries < se.connRetries; retries++ { - // Don't sleep if it is the first retry. - if retries > 0 { - time.Sleep(se.timeoutReadWrite) - } - - conn, err := se.dialer() - if err == nil { - return conn, nil - } - - se.Logger.Error("dialing", "err", err) - } - - return nil, ErrDialRetryMax -} - -func (se *SignerServiceEndpoint) handleConnection(conn net.Conn) { - for { - if !se.IsRunning() { - return // Ignore error from listener closing. - } - - // Reset the connection deadline - deadline := time.Now().Add(se.timeoutReadWrite) - err := conn.SetDeadline(deadline) - if err != nil { - return - } - - req, err := readMsg(conn) - if err != nil { - if err != io.EOF { - se.Logger.Error("handleConnection readMsg", "err", err) - } - return - } - - res, err := handleRequest(req, se.chainID, se.privVal) - - if err != nil { - // only log the error; we'll reply with an error in res - se.Logger.Error("handleConnection handleRequest", "err", err) - } - - err = writeMsg(conn, res) - if err != nil { - se.Logger.Error("handleConnection writeMsg", "err", err) - return - } - } -} diff --git a/privval/signer_validator_endpoint.go b/privval/signer_validator_endpoint.go deleted file mode 100644 index 6dc7f99d5..000000000 --- a/privval/signer_validator_endpoint.go +++ /dev/null @@ -1,230 +0,0 @@ -package privval - -import ( - "fmt" - "net" - "sync" - "time" - - "github.com/tendermint/tendermint/crypto" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/types" -) - -const ( - defaultHeartbeatSeconds = 2 - defaultMaxDialRetries = 10 -) - -var ( - heartbeatPeriod = time.Second * defaultHeartbeatSeconds -) - -// SignerValidatorEndpointOption sets an optional parameter on the SocketVal. -type SignerValidatorEndpointOption func(*SignerValidatorEndpoint) - -// SignerValidatorEndpointSetHeartbeat sets the period on which to check the liveness of the -// connected Signer connections. -func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEndpointOption { - return func(sc *SignerValidatorEndpoint) { sc.heartbeatPeriod = period } -} - -// SocketVal implements PrivValidator. -// It listens for an external process to dial in and uses -// the socket to request signatures. -type SignerValidatorEndpoint struct { - cmn.BaseService - - listener net.Listener - - // ping - cancelPingCh chan struct{} - pingTicker *time.Ticker - heartbeatPeriod time.Duration - - // signer is mutable since it can be reset if the connection fails. - // failures are detected by a background ping routine. - // All messages are request/response, so we hold the mutex - // so only one request/response pair can happen at a time. - // Methods on the underlying net.Conn itself are already goroutine safe. - mtx sync.Mutex - - // TODO: Signer should encapsulate and hide the endpoint completely. Invert the relation - signer *SignerRemote -} - -// Check that SignerValidatorEndpoint implements PrivValidator. -var _ types.PrivValidator = (*SignerValidatorEndpoint)(nil) - -// NewSignerValidatorEndpoint returns an instance of SignerValidatorEndpoint. -func NewSignerValidatorEndpoint(logger log.Logger, listener net.Listener) *SignerValidatorEndpoint { - sc := &SignerValidatorEndpoint{ - listener: listener, - heartbeatPeriod: heartbeatPeriod, - } - - sc.BaseService = *cmn.NewBaseService(logger, "SignerValidatorEndpoint", sc) - - return sc -} - -//-------------------------------------------------------- -// Implement PrivValidator - -// GetPubKey implements PrivValidator. -func (ve *SignerValidatorEndpoint) GetPubKey() crypto.PubKey { - ve.mtx.Lock() - defer ve.mtx.Unlock() - return ve.signer.GetPubKey() -} - -// SignVote implements PrivValidator. -func (ve *SignerValidatorEndpoint) SignVote(chainID string, vote *types.Vote) error { - ve.mtx.Lock() - defer ve.mtx.Unlock() - return ve.signer.SignVote(chainID, vote) -} - -// SignProposal implements PrivValidator. -func (ve *SignerValidatorEndpoint) SignProposal(chainID string, proposal *types.Proposal) error { - ve.mtx.Lock() - defer ve.mtx.Unlock() - return ve.signer.SignProposal(chainID, proposal) -} - -//-------------------------------------------------------- -// More thread safe methods proxied to the signer - -// Ping is used to check connection health. -func (ve *SignerValidatorEndpoint) Ping() error { - ve.mtx.Lock() - defer ve.mtx.Unlock() - return ve.signer.Ping() -} - -// Close closes the underlying net.Conn. -func (ve *SignerValidatorEndpoint) Close() { - ve.mtx.Lock() - defer ve.mtx.Unlock() - if ve.signer != nil { - if err := ve.signer.Close(); err != nil { - ve.Logger.Error("OnStop", "err", err) - } - } - - if ve.listener != nil { - if err := ve.listener.Close(); err != nil { - ve.Logger.Error("OnStop", "err", err) - } - } -} - -//-------------------------------------------------------- -// Service start and stop - -// OnStart implements cmn.Service. -func (ve *SignerValidatorEndpoint) OnStart() error { - if closed, err := ve.reset(); err != nil { - ve.Logger.Error("OnStart", "err", err) - return err - } else if closed { - return fmt.Errorf("listener is closed") - } - - // Start a routine to keep the connection alive - ve.cancelPingCh = make(chan struct{}, 1) - ve.pingTicker = time.NewTicker(ve.heartbeatPeriod) - go func() { - for { - select { - case <-ve.pingTicker.C: - err := ve.Ping() - if err != nil { - ve.Logger.Error("Ping", "err", err) - if err == ErrUnexpectedResponse { - return - } - - closed, err := ve.reset() - if err != nil { - ve.Logger.Error("Reconnecting to remote signer failed", "err", err) - continue - } - if closed { - ve.Logger.Info("listener is closing") - return - } - - ve.Logger.Info("Re-created connection to remote signer", "impl", ve) - } - case <-ve.cancelPingCh: - ve.pingTicker.Stop() - return - } - } - }() - - return nil -} - -// OnStop implements cmn.Service. -func (ve *SignerValidatorEndpoint) OnStop() { - if ve.cancelPingCh != nil { - close(ve.cancelPingCh) - } - ve.Close() -} - -//-------------------------------------------------------- -// Connection and signer management - -// waits to accept and sets a new connection. -// connection is closed in OnStop. -// returns true if the listener is closed -// (ie. it returns a nil conn). -func (ve *SignerValidatorEndpoint) reset() (closed bool, err error) { - ve.mtx.Lock() - defer ve.mtx.Unlock() - - // first check if the conn already exists and close it. - if ve.signer != nil { - if tmpErr := ve.signer.Close(); tmpErr != nil { - ve.Logger.Error("error closing socket val connection during reset", "err", tmpErr) - } - } - - // wait for a new conn - conn, err := ve.acceptConnection() - if err != nil { - return false, err - } - - // listener is closed - if conn == nil { - return true, nil - } - - ve.signer, err = NewSignerRemote(conn) - if err != nil { - // failed to fetch the pubkey. close out the connection. - if tmpErr := conn.Close(); tmpErr != nil { - ve.Logger.Error("error closing connection", "err", tmpErr) - } - return false, err - } - return false, nil -} - -// Attempt to accept a connection. -// Times out after the listener's timeoutAccept -func (ve *SignerValidatorEndpoint) acceptConnection() (net.Conn, error) { - conn, err := ve.listener.Accept() - if err != nil { - if !ve.IsRunning() { - return nil, nil // Ignore error from listener closing. - } - return nil, err - } - return conn, nil -} diff --git a/privval/signer_validator_endpoint_test.go b/privval/signer_validator_endpoint_test.go deleted file mode 100644 index 611e743c9..000000000 --- a/privval/signer_validator_endpoint_test.go +++ /dev/null @@ -1,506 +0,0 @@ -package privval - -import ( - "fmt" - "net" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/tendermint/tendermint/crypto/ed25519" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - - "github.com/tendermint/tendermint/types" -) - -var ( - testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second - - testTimeoutReadWrite = 100 * time.Millisecond - testTimeoutReadWrite2o3 = 66 * time.Millisecond // 2/3 of the other one - - testTimeoutHeartbeat = 10 * time.Millisecond - testTimeoutHeartbeat3o2 = 6 * time.Millisecond // 3/2 of the other one -) - -type socketTestCase struct { - addr string - dialer SocketDialer -} - -func socketTestCases(t *testing.T) []socketTestCase { - tcpAddr := fmt.Sprintf("tcp://%s", testFreeTCPAddr(t)) - unixFilePath, err := testUnixAddr() - require.NoError(t, err) - unixAddr := fmt.Sprintf("unix://%s", unixFilePath) - return []socketTestCase{ - { - addr: tcpAddr, - dialer: DialTCPFn(tcpAddr, testTimeoutReadWrite, ed25519.GenPrivKey()), - }, - { - addr: unixAddr, - dialer: DialUnixFn(unixFilePath), - }, - } -} - -func TestSocketPVAddress(t *testing.T) { - for _, tc := range socketTestCases(t) { - // Execute the test within a closure to ensure the deferred statements - // are called between each for loop iteration, for isolated test cases. - func() { - var ( - chainID = cmn.RandStr(12) - validatorEndpoint, serviceEndpoint = testSetupSocketPair(t, chainID, types.NewMockPV(), tc.addr, tc.dialer) - ) - defer validatorEndpoint.Stop() - defer serviceEndpoint.Stop() - - serviceAddr := serviceEndpoint.privVal.GetPubKey().Address() - validatorAddr := validatorEndpoint.GetPubKey().Address() - - assert.Equal(t, serviceAddr, validatorAddr) - }() - } -} - -func TestSocketPVPubKey(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - chainID = cmn.RandStr(12) - validatorEndpoint, serviceEndpoint = testSetupSocketPair( - t, - chainID, - types.NewMockPV(), - tc.addr, - tc.dialer) - ) - defer validatorEndpoint.Stop() - defer serviceEndpoint.Stop() - - clientKey := validatorEndpoint.GetPubKey() - privvalPubKey := serviceEndpoint.privVal.GetPubKey() - - assert.Equal(t, privvalPubKey, clientKey) - }() - } -} - -func TestSocketPVProposal(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - chainID = cmn.RandStr(12) - validatorEndpoint, serviceEndpoint = testSetupSocketPair( - t, - chainID, - types.NewMockPV(), - tc.addr, - tc.dialer) - - ts = time.Now() - privProposal = &types.Proposal{Timestamp: ts} - clientProposal = &types.Proposal{Timestamp: ts} - ) - defer validatorEndpoint.Stop() - defer serviceEndpoint.Stop() - - require.NoError(t, serviceEndpoint.privVal.SignProposal(chainID, privProposal)) - require.NoError(t, validatorEndpoint.SignProposal(chainID, clientProposal)) - - assert.Equal(t, privProposal.Signature, clientProposal.Signature) - }() - } -} - -func TestSocketPVVote(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - chainID = cmn.RandStr(12) - validatorEndpoint, serviceEndpoint = testSetupSocketPair( - t, - chainID, - types.NewMockPV(), - tc.addr, - tc.dialer) - - ts = time.Now() - vType = types.PrecommitType - want = &types.Vote{Timestamp: ts, Type: vType} - have = &types.Vote{Timestamp: ts, Type: vType} - ) - defer validatorEndpoint.Stop() - defer serviceEndpoint.Stop() - - require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want)) - require.NoError(t, validatorEndpoint.SignVote(chainID, have)) - assert.Equal(t, want.Signature, have.Signature) - }() - } -} - -func TestSocketPVVoteResetDeadline(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - chainID = cmn.RandStr(12) - validatorEndpoint, serviceEndpoint = testSetupSocketPair( - t, - chainID, - types.NewMockPV(), - tc.addr, - tc.dialer) - - ts = time.Now() - vType = types.PrecommitType - want = &types.Vote{Timestamp: ts, Type: vType} - have = &types.Vote{Timestamp: ts, Type: vType} - ) - defer validatorEndpoint.Stop() - defer serviceEndpoint.Stop() - - time.Sleep(testTimeoutReadWrite2o3) - - require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want)) - require.NoError(t, validatorEndpoint.SignVote(chainID, have)) - assert.Equal(t, want.Signature, have.Signature) - - // This would exceed the deadline if it was not extended by the previous message - time.Sleep(testTimeoutReadWrite2o3) - - require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want)) - require.NoError(t, validatorEndpoint.SignVote(chainID, have)) - assert.Equal(t, want.Signature, have.Signature) - }() - } -} - -func TestSocketPVVoteKeepalive(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - chainID = cmn.RandStr(12) - validatorEndpoint, serviceEndpoint = testSetupSocketPair( - t, - chainID, - types.NewMockPV(), - tc.addr, - tc.dialer) - - ts = time.Now() - vType = types.PrecommitType - want = &types.Vote{Timestamp: ts, Type: vType} - have = &types.Vote{Timestamp: ts, Type: vType} - ) - defer validatorEndpoint.Stop() - defer serviceEndpoint.Stop() - - time.Sleep(testTimeoutReadWrite * 2) - - require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want)) - require.NoError(t, validatorEndpoint.SignVote(chainID, have)) - assert.Equal(t, want.Signature, have.Signature) - }() - } -} - -func TestSocketPVDeadline(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - listenc = make(chan struct{}) - thisConnTimeout = 100 * time.Millisecond - validatorEndpoint = newSignerValidatorEndpoint(log.TestingLogger(), tc.addr, thisConnTimeout) - ) - - go func(sc *SignerValidatorEndpoint) { - defer close(listenc) - - // Note: the TCP connection times out at the accept() phase, - // whereas the Unix domain sockets connection times out while - // attempting to fetch the remote signer's public key. - assert.True(t, IsConnTimeout(sc.Start())) - - assert.False(t, sc.IsRunning()) - }(validatorEndpoint) - - for { - _, err := cmn.Connect(tc.addr) - if err == nil { - break - } - } - - <-listenc - }() - } -} - -func TestRemoteSignVoteErrors(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - chainID = cmn.RandStr(12) - validatorEndpoint, serviceEndpoint = testSetupSocketPair( - t, - chainID, - types.NewErroringMockPV(), - tc.addr, - tc.dialer) - - ts = time.Now() - vType = types.PrecommitType - vote = &types.Vote{Timestamp: ts, Type: vType} - ) - defer validatorEndpoint.Stop() - defer serviceEndpoint.Stop() - - err := validatorEndpoint.SignVote("", vote) - require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error()) - - err = serviceEndpoint.privVal.SignVote(chainID, vote) - require.Error(t, err) - err = validatorEndpoint.SignVote(chainID, vote) - require.Error(t, err) - }() - } -} - -func TestRemoteSignProposalErrors(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - chainID = cmn.RandStr(12) - validatorEndpoint, serviceEndpoint = testSetupSocketPair( - t, - chainID, - types.NewErroringMockPV(), - tc.addr, - tc.dialer) - - ts = time.Now() - proposal = &types.Proposal{Timestamp: ts} - ) - defer validatorEndpoint.Stop() - defer serviceEndpoint.Stop() - - err := validatorEndpoint.SignProposal("", proposal) - require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error()) - - err = serviceEndpoint.privVal.SignProposal(chainID, proposal) - require.Error(t, err) - - err = validatorEndpoint.SignProposal(chainID, proposal) - require.Error(t, err) - }() - } -} - -func TestErrUnexpectedResponse(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - logger = log.TestingLogger() - chainID = cmn.RandStr(12) - readyCh = make(chan struct{}) - errCh = make(chan error, 1) - - serviceEndpoint = NewSignerServiceEndpoint( - logger, - chainID, - types.NewMockPV(), - tc.dialer, - ) - - validatorEndpoint = newSignerValidatorEndpoint( - logger, - tc.addr, - testTimeoutReadWrite) - ) - - testStartEndpoint(t, readyCh, validatorEndpoint) - defer validatorEndpoint.Stop() - SignerServiceEndpointTimeoutReadWrite(time.Millisecond)(serviceEndpoint) - SignerServiceEndpointConnRetries(100)(serviceEndpoint) - // we do not want to Start() the remote signer here and instead use the connection to - // reply with intentionally wrong replies below: - rsConn, err := serviceEndpoint.connect() - require.NoError(t, err) - require.NotNil(t, rsConn) - defer rsConn.Close() - - // send over public key to get the remote signer running: - go testReadWriteResponse(t, &PubKeyResponse{}, rsConn) - <-readyCh - - // Proposal: - go func(errc chan error) { - errc <- validatorEndpoint.SignProposal(chainID, &types.Proposal{}) - }(errCh) - - // read request and write wrong response: - go testReadWriteResponse(t, &SignedVoteResponse{}, rsConn) - err = <-errCh - require.Error(t, err) - require.Equal(t, err, ErrUnexpectedResponse) - - // Vote: - go func(errc chan error) { - errc <- validatorEndpoint.SignVote(chainID, &types.Vote{}) - }(errCh) - // read request and write wrong response: - go testReadWriteResponse(t, &SignedProposalResponse{}, rsConn) - err = <-errCh - require.Error(t, err) - require.Equal(t, err, ErrUnexpectedResponse) - }() - } -} - -func TestRetryConnToRemoteSigner(t *testing.T) { - for _, tc := range socketTestCases(t) { - func() { - var ( - logger = log.TestingLogger() - chainID = cmn.RandStr(12) - readyCh = make(chan struct{}) - - serviceEndpoint = NewSignerServiceEndpoint( - logger, - chainID, - types.NewMockPV(), - tc.dialer, - ) - thisConnTimeout = testTimeoutReadWrite - validatorEndpoint = newSignerValidatorEndpoint(logger, tc.addr, thisConnTimeout) - ) - // Ping every: - SignerValidatorEndpointSetHeartbeat(testTimeoutHeartbeat)(validatorEndpoint) - - SignerServiceEndpointTimeoutReadWrite(testTimeoutReadWrite)(serviceEndpoint) - SignerServiceEndpointConnRetries(10)(serviceEndpoint) - - testStartEndpoint(t, readyCh, validatorEndpoint) - defer validatorEndpoint.Stop() - require.NoError(t, serviceEndpoint.Start()) - assert.True(t, serviceEndpoint.IsRunning()) - - <-readyCh - time.Sleep(testTimeoutHeartbeat * 2) - - serviceEndpoint.Stop() - rs2 := NewSignerServiceEndpoint( - logger, - chainID, - types.NewMockPV(), - tc.dialer, - ) - // let some pings pass - time.Sleep(testTimeoutHeartbeat3o2) - require.NoError(t, rs2.Start()) - assert.True(t, rs2.IsRunning()) - defer rs2.Stop() - - // give the client some time to re-establish the conn to the remote signer - // should see sth like this in the logs: - // - // E[10016-01-10|17:12:46.128] Ping err="remote signer timed out" - // I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal - time.Sleep(testTimeoutReadWrite * 2) - }() - } -} - -func newSignerValidatorEndpoint(logger log.Logger, addr string, timeoutReadWrite time.Duration) *SignerValidatorEndpoint { - proto, address := cmn.ProtocolAndAddress(addr) - - ln, err := net.Listen(proto, address) - logger.Info("Listening at", "proto", proto, "address", address) - if err != nil { - panic(err) - } - - var listener net.Listener - - if proto == "unix" { - unixLn := NewUnixListener(ln) - UnixListenerTimeoutAccept(testTimeoutAccept)(unixLn) - UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn) - listener = unixLn - } else { - tcpLn := NewTCPListener(ln, ed25519.GenPrivKey()) - TCPListenerTimeoutAccept(testTimeoutAccept)(tcpLn) - TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn) - listener = tcpLn - } - - return NewSignerValidatorEndpoint(logger, listener) -} - -func testSetupSocketPair( - t *testing.T, - chainID string, - privValidator types.PrivValidator, - addr string, - socketDialer SocketDialer, -) (*SignerValidatorEndpoint, *SignerServiceEndpoint) { - var ( - logger = log.TestingLogger() - privVal = privValidator - readyc = make(chan struct{}) - serviceEndpoint = NewSignerServiceEndpoint( - logger, - chainID, - privVal, - socketDialer, - ) - - thisConnTimeout = testTimeoutReadWrite - validatorEndpoint = newSignerValidatorEndpoint(logger, addr, thisConnTimeout) - ) - - SignerValidatorEndpointSetHeartbeat(testTimeoutHeartbeat)(validatorEndpoint) - SignerServiceEndpointTimeoutReadWrite(testTimeoutReadWrite)(serviceEndpoint) - SignerServiceEndpointConnRetries(1e6)(serviceEndpoint) - - testStartEndpoint(t, readyc, validatorEndpoint) - - require.NoError(t, serviceEndpoint.Start()) - assert.True(t, serviceEndpoint.IsRunning()) - - <-readyc - - return validatorEndpoint, serviceEndpoint -} - -func testReadWriteResponse(t *testing.T, resp RemoteSignerMsg, rsConn net.Conn) { - _, err := readMsg(rsConn) - require.NoError(t, err) - - err = writeMsg(rsConn, resp) - require.NoError(t, err) -} - -func testStartEndpoint(t *testing.T, readyCh chan struct{}, sc *SignerValidatorEndpoint) { - go func(sc *SignerValidatorEndpoint) { - require.NoError(t, sc.Start()) - assert.True(t, sc.IsRunning()) - - readyCh <- struct{}{} - }(sc) -} - -// testFreeTCPAddr claims a free port so we don't block on listener being ready. -func testFreeTCPAddr(t *testing.T) string { - ln, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - defer ln.Close() - - return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port) -} diff --git a/privval/socket_dialers_test.go b/privval/socket_dialers_test.go index 9d5d5cc2b..c77261bc5 100644 --- a/privval/socket_dialers_test.go +++ b/privval/socket_dialers_test.go @@ -1,26 +1,49 @@ package privval import ( + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/crypto/ed25519" cmn "github.com/tendermint/tendermint/libs/common" ) +func getDialerTestCases(t *testing.T) []dialerTestCase { + tcpAddr := GetFreeLocalhostAddrPort() + unixFilePath, err := testUnixAddr() + require.NoError(t, err) + unixAddr := fmt.Sprintf("unix://%s", unixFilePath) + + return []dialerTestCase{ + { + addr: tcpAddr, + dialer: DialTCPFn(tcpAddr, testTimeoutReadWrite, ed25519.GenPrivKey()), + }, + { + addr: unixAddr, + dialer: DialUnixFn(unixFilePath), + }, + } +} + func TestIsConnTimeoutForFundamentalTimeouts(t *testing.T) { // Generate a networking timeout - dialer := DialTCPFn(testFreeTCPAddr(t), time.Millisecond, ed25519.GenPrivKey()) + tcpAddr := GetFreeLocalhostAddrPort() + dialer := DialTCPFn(tcpAddr, time.Millisecond, ed25519.GenPrivKey()) _, err := dialer() assert.Error(t, err) assert.True(t, IsConnTimeout(err)) } func TestIsConnTimeoutForWrappedConnTimeouts(t *testing.T) { - dialer := DialTCPFn(testFreeTCPAddr(t), time.Millisecond, ed25519.GenPrivKey()) + tcpAddr := GetFreeLocalhostAddrPort() + dialer := DialTCPFn(tcpAddr, time.Millisecond, ed25519.GenPrivKey()) _, err := dialer() assert.Error(t, err) - err = cmn.ErrorWrap(ErrConnTimeout, err.Error()) + err = cmn.ErrorWrap(ErrConnectionTimeout, err.Error()) assert.True(t, IsConnTimeout(err)) } diff --git a/privval/socket_listeners.go b/privval/socket_listeners.go index 7c8835791..f4d875e71 100644 --- a/privval/socket_listeners.go +++ b/privval/socket_listeners.go @@ -9,8 +9,8 @@ import ( ) const ( - defaultTimeoutAcceptSeconds = 3 - defaultTimeoutReadWriteSeconds = 3 + defaultTimeoutAcceptSeconds = 3 + defaultPingPeriodMilliseconds = 100 ) // timeoutError can be used to check if an error returned from the netp package diff --git a/privval/utils.go b/privval/utils.go index d8837bdf0..a707e2ee4 100644 --- a/privval/utils.go +++ b/privval/utils.go @@ -1,7 +1,12 @@ package privval import ( + "fmt" + "net" + + "github.com/tendermint/tendermint/crypto/ed25519" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" ) // IsConnTimeout returns a boolean indicating whether the error is known to @@ -9,7 +14,7 @@ import ( // network timeouts, as well as ErrConnTimeout errors. func IsConnTimeout(err error) bool { if cmnErr, ok := err.(cmn.Error); ok { - if cmnErr.Data() == ErrConnTimeout { + if cmnErr.Data() == ErrConnectionTimeout { return true } } @@ -18,3 +23,39 @@ func IsConnTimeout(err error) bool { } return false } + +// NewSignerListener creates a new SignerListenerEndpoint using the corresponding listen address +func NewSignerListener(listenAddr string, logger log.Logger) (*SignerListenerEndpoint, error) { + var listener net.Listener + + protocol, address := cmn.ProtocolAndAddress(listenAddr) + ln, err := net.Listen(protocol, address) + if err != nil { + return nil, err + } + switch protocol { + case "unix": + listener = NewUnixListener(ln) + case "tcp": + // TODO: persist this key so external signer can actually authenticate us + listener = NewTCPListener(ln, ed25519.GenPrivKey()) + default: + return nil, fmt.Errorf( + "wrong listen address: expected either 'tcp' or 'unix' protocols, got %s", + protocol, + ) + } + + pve := NewSignerListenerEndpoint(logger.With("module", "privval"), listener) + + return pve, nil +} + +// GetFreeLocalhostAddrPort returns a free localhost:port address +func GetFreeLocalhostAddrPort() string { + port, err := cmn.GetFreePort() + if err != nil { + panic(err) + } + return fmt.Sprintf("127.0.0.1:%d", port) +} diff --git a/privval/utils_test.go b/privval/utils_test.go index 23f6f6a3b..b07186f6c 100644 --- a/privval/utils_test.go +++ b/privval/utils_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + cmn "github.com/tendermint/tendermint/libs/common" ) diff --git a/tools/tm-signer-harness/internal/test_harness.go b/tools/tm-signer-harness/internal/test_harness.go index 7fefdfb42..216cf6851 100644 --- a/tools/tm-signer-harness/internal/test_harness.go +++ b/tools/tm-signer-harness/internal/test_harness.go @@ -49,7 +49,7 @@ var _ error = (*TestHarnessError)(nil) // with this version of Tendermint. type TestHarness struct { addr string - spv *privval.SignerValidatorEndpoint + signerClient *privval.SignerClient fpv *privval.FilePV chainID string acceptRetries int @@ -101,14 +101,19 @@ func NewTestHarness(logger log.Logger, cfg TestHarnessConfig) (*TestHarness, err } logger.Info("Loaded genesis file", "chainID", st.ChainID) - spv, err := newTestHarnessSocketVal(logger, cfg) + spv, err := newTestHarnessListener(logger, cfg) + if err != nil { + return nil, newTestHarnessError(ErrFailedToCreateListener, err, "") + } + + signerClient, err := privval.NewSignerClient(spv) if err != nil { return nil, newTestHarnessError(ErrFailedToCreateListener, err, "") } return &TestHarness{ addr: cfg.BindAddr, - spv: spv, + signerClient: signerClient, fpv: fpv, chainID: st.ChainID, acceptRetries: cfg.AcceptRetries, @@ -135,9 +140,11 @@ func (th *TestHarness) Run() { th.logger.Info("Starting test harness") accepted := false var startErr error + for acceptRetries := th.acceptRetries; acceptRetries > 0; acceptRetries-- { th.logger.Info("Attempting to accept incoming connection", "acceptRetries", acceptRetries) - if err := th.spv.Start(); err != nil { + + if err := th.signerClient.WaitForConnection(10 * time.Millisecond); err != nil { // if it wasn't a timeout error if _, ok := err.(timeoutError); !ok { th.logger.Error("Failed to start listener", "err", err) @@ -149,6 +156,7 @@ func (th *TestHarness) Run() { } startErr = err } else { + th.logger.Info("Accepted external connection") accepted = true break } @@ -182,8 +190,8 @@ func (th *TestHarness) Run() { func (th *TestHarness) TestPublicKey() error { th.logger.Info("TEST: Public key of remote signer") th.logger.Info("Local", "pubKey", th.fpv.GetPubKey()) - th.logger.Info("Remote", "pubKey", th.spv.GetPubKey()) - if th.fpv.GetPubKey() != th.spv.GetPubKey() { + th.logger.Info("Remote", "pubKey", th.signerClient.GetPubKey()) + if th.fpv.GetPubKey() != th.signerClient.GetPubKey() { th.logger.Error("FAILED: Local and remote public keys do not match") return newTestHarnessError(ErrTestPublicKeyFailed, nil, "") } @@ -211,7 +219,7 @@ func (th *TestHarness) TestSignProposal() error { Timestamp: time.Now(), } propBytes := prop.SignBytes(th.chainID) - if err := th.spv.SignProposal(th.chainID, prop); err != nil { + if err := th.signerClient.SignProposal(th.chainID, prop); err != nil { th.logger.Error("FAILED: Signing of proposal", "err", err) return newTestHarnessError(ErrTestSignProposalFailed, err, "") } @@ -222,7 +230,7 @@ func (th *TestHarness) TestSignProposal() error { return newTestHarnessError(ErrTestSignProposalFailed, err, "") } // now validate the signature on the proposal - if th.spv.GetPubKey().VerifyBytes(propBytes, prop.Signature) { + if th.signerClient.GetPubKey().VerifyBytes(propBytes, prop.Signature) { th.logger.Info("Successfully validated proposal signature") } else { th.logger.Error("FAILED: Proposal signature validation failed") @@ -255,7 +263,7 @@ func (th *TestHarness) TestSignVote() error { } voteBytes := vote.SignBytes(th.chainID) // sign the vote - if err := th.spv.SignVote(th.chainID, vote); err != nil { + if err := th.signerClient.SignVote(th.chainID, vote); err != nil { th.logger.Error("FAILED: Signing of vote", "err", err) return newTestHarnessError(ErrTestSignVoteFailed, err, fmt.Sprintf("voteType=%d", voteType)) } @@ -266,7 +274,7 @@ func (th *TestHarness) TestSignVote() error { return newTestHarnessError(ErrTestSignVoteFailed, err, fmt.Sprintf("voteType=%d", voteType)) } // now validate the signature on the proposal - if th.spv.GetPubKey().VerifyBytes(voteBytes, vote.Signature) { + if th.signerClient.GetPubKey().VerifyBytes(voteBytes, vote.Signature) { th.logger.Info("Successfully validated vote signature", "type", voteType) } else { th.logger.Error("FAILED: Vote signature validation failed", "type", voteType) @@ -301,10 +309,9 @@ func (th *TestHarness) Shutdown(err error) { }() } - if th.spv.IsRunning() { - if err := th.spv.Stop(); err != nil { - th.logger.Error("Failed to cleanly stop listener: %s", err.Error()) - } + err = th.signerClient.Close() + if err != nil { + th.logger.Error("Failed to cleanly stop listener: %s", err.Error()) } if th.exitWhenComplete { @@ -312,9 +319,8 @@ func (th *TestHarness) Shutdown(err error) { } } -// newTestHarnessSocketVal creates our client instance which we will use for -// testing. -func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerValidatorEndpoint, error) { +// newTestHarnessListener creates our client instance which we will use for testing. +func newTestHarnessListener(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerListenerEndpoint, error) { proto, addr := cmn.ProtocolAndAddress(cfg.BindAddr) if proto == "unix" { // make sure the socket doesn't exist - if so, try to delete it @@ -329,7 +335,7 @@ func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval if err != nil { return nil, err } - logger.Info("Listening at", "proto", proto, "addr", addr) + logger.Info("Listening", "proto", proto, "addr", addr) var svln net.Listener switch proto { case "unix": @@ -347,7 +353,7 @@ func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval logger.Error("Unsupported protocol (must be unix:// or tcp://)", "proto", proto) return nil, newTestHarnessError(ErrInvalidParameters, nil, fmt.Sprintf("Unsupported protocol: %s", proto)) } - return privval.NewSignerValidatorEndpoint(logger, svln), nil + return privval.NewSignerListenerEndpoint(logger, svln), nil } func newTestHarnessError(code int, err error, info string) *TestHarnessError { diff --git a/tools/tm-signer-harness/internal/test_harness_test.go b/tools/tm-signer-harness/internal/test_harness_test.go index c249bd2b6..47e510666 100644 --- a/tools/tm-signer-harness/internal/test_harness_test.go +++ b/tools/tm-signer-harness/internal/test_harness_test.go @@ -3,19 +3,18 @@ package internal import ( "fmt" "io/ioutil" - "net" "os" "testing" "time" - "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/privval" - "github.com/tendermint/tendermint/types" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/privval" + "github.com/tendermint/tendermint/types" ) const ( @@ -85,8 +84,8 @@ func TestRemoteSignerTestHarnessMaxAcceptRetriesReached(t *testing.T) { func TestRemoteSignerTestHarnessSuccessfulRun(t *testing.T) { harnessTest( t, - func(th *TestHarness) *privval.SignerServiceEndpoint { - return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, false, false) + func(th *TestHarness) *privval.SignerServer { + return newMockSignerServer(t, th, th.fpv.Key.PrivKey, false, false) }, NoError, ) @@ -95,8 +94,8 @@ func TestRemoteSignerTestHarnessSuccessfulRun(t *testing.T) { func TestRemoteSignerPublicKeyCheckFailed(t *testing.T) { harnessTest( t, - func(th *TestHarness) *privval.SignerServiceEndpoint { - return newMockRemoteSigner(t, th, ed25519.GenPrivKey(), false, false) + func(th *TestHarness) *privval.SignerServer { + return newMockSignerServer(t, th, ed25519.GenPrivKey(), false, false) }, ErrTestPublicKeyFailed, ) @@ -105,8 +104,8 @@ func TestRemoteSignerPublicKeyCheckFailed(t *testing.T) { func TestRemoteSignerProposalSigningFailed(t *testing.T) { harnessTest( t, - func(th *TestHarness) *privval.SignerServiceEndpoint { - return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, true, false) + func(th *TestHarness) *privval.SignerServer { + return newMockSignerServer(t, th, th.fpv.Key.PrivKey, true, false) }, ErrTestSignProposalFailed, ) @@ -115,28 +114,30 @@ func TestRemoteSignerProposalSigningFailed(t *testing.T) { func TestRemoteSignerVoteSigningFailed(t *testing.T) { harnessTest( t, - func(th *TestHarness) *privval.SignerServiceEndpoint { - return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, false, true) + func(th *TestHarness) *privval.SignerServer { + return newMockSignerServer(t, th, th.fpv.Key.PrivKey, false, true) }, ErrTestSignVoteFailed, ) } -func newMockRemoteSigner(t *testing.T, th *TestHarness, privKey crypto.PrivKey, breakProposalSigning bool, breakVoteSigning bool) *privval.SignerServiceEndpoint { - return privval.NewSignerServiceEndpoint( +func newMockSignerServer(t *testing.T, th *TestHarness, privKey crypto.PrivKey, breakProposalSigning bool, breakVoteSigning bool) *privval.SignerServer { + mockPV := types.NewMockPVWithParams(privKey, breakProposalSigning, breakVoteSigning) + + dialerEndpoint := privval.NewSignerDialerEndpoint( th.logger, - th.chainID, - types.NewMockPVWithParams(privKey, breakProposalSigning, breakVoteSigning), privval.DialTCPFn( th.addr, time.Duration(defaultConnDeadline)*time.Millisecond, ed25519.GenPrivKey(), ), ) + + return privval.NewSignerServer(dialerEndpoint, th.chainID, mockPV) } // For running relatively standard tests. -func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServiceEndpoint, expectedExitCode int) { +func harnessTest(t *testing.T, signerServerMaker func(th *TestHarness) *privval.SignerServer, expectedExitCode int) { cfg := makeConfig(t, 100, 3) defer cleanup(cfg) @@ -148,10 +149,10 @@ func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServ th.Run() }() - rs := rsMaker(th) - require.NoError(t, rs.Start()) - assert.True(t, rs.IsRunning()) - defer rs.Stop() + ss := signerServerMaker(th) + require.NoError(t, ss.Start()) + assert.True(t, ss.IsRunning()) + defer ss.Stop() <-donec assert.Equal(t, expectedExitCode, th.exitCode) @@ -159,7 +160,7 @@ func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServ func makeConfig(t *testing.T, acceptDeadline, acceptRetries int) TestHarnessConfig { return TestHarnessConfig{ - BindAddr: testFreeTCPAddr(t), + BindAddr: privval.GetFreeLocalhostAddrPort(), KeyFile: makeTempFile("tm-testharness-keyfile", keyFileContents), StateFile: makeTempFile("tm-testharness-statefile", stateFileContents), GenesisFile: makeTempFile("tm-testharness-genesisfile", genesisFileContents), @@ -191,12 +192,3 @@ func makeTempFile(name, content string) string { } return tempFile.Name() } - -// testFreeTCPAddr claims a free port so we don't block on listener being ready. -func testFreeTCPAddr(t *testing.T) string { - ln, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - defer ln.Close() - - return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port) -} diff --git a/types/priv_validator.go b/types/priv_validator.go index 8acab243a..45d0a67b5 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -12,6 +12,7 @@ import ( // PrivValidator defines the functionality of a local Tendermint validator // that signs votes and proposals, and never double signs. type PrivValidator interface { + // TODO: Extend the interface to return errors too. Issue: https://github.com/tendermint/tendermint/issues/3602 GetPubKey() crypto.PubKey SignVote(chainID string, vote *Vote) error