Browse Source

Merge pull request #1237 from tendermint/feature/priv_val_socket_client

privVal: Integrate socket client
pull/1203/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
edb871f514
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 217 additions and 175 deletions
  1. +0
    -1
      cmd/tendermint/commands/run_node.go
  2. +15
    -20
      config/config.go
  3. +7
    -5
      node/node.go
  4. +90
    -60
      types/priv_validator/socket.go
  5. +105
    -89
      types/priv_validator/socket_test.go

+ 0
- 1
cmd/tendermint/commands/run_node.go View File

@ -16,7 +16,6 @@ func AddNodeFlags(cmd *cobra.Command) {
// priv val flags // priv val flags
cmd.Flags().String("priv_validator_addr", config.PrivValidatorAddr, "Socket address for private validator") cmd.Flags().String("priv_validator_addr", config.PrivValidatorAddr, "Socket address for private validator")
cmd.Flags().Int("priv_validator_max_conn", config.PrivValidatorMaxConn, "Limit of concurrent connections to the PrivValidator")
// node flags // node flags
cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing") cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing")


+ 15
- 20
config/config.go View File

@ -21,10 +21,9 @@ var (
defaultConfigFileName = "config.toml" defaultConfigFileName = "config.toml"
defaultGenesisJSONName = "genesis.json" defaultGenesisJSONName = "genesis.json"
defaultPrivValName = "priv_validator.json"
defaultPrivValMaxConn = 3
defaultNodeKeyName = "node_key.json"
defaultAddrBookName = "addrbook.json"
defaultPrivValName = "priv_validator.json"
defaultNodeKeyName = "node_key.json"
defaultAddrBookName = "addrbook.json"
defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName) defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName)
defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName) defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName)
@ -108,9 +107,6 @@ type BaseConfig struct {
// TCP or UNIX socket address of the PrivValidator server // TCP or UNIX socket address of the PrivValidator server
PrivValidatorAddr string `mapstructure:"priv_validator_addr"` PrivValidatorAddr string `mapstructure:"priv_validator_addr"`
// Limit of concurrent connections to the PrivValidator.
PrivValidatorMaxConn int `mapstructure:"priv_validator_max_conn"`
// TCP or UNIX socket address of the ABCI application, // TCP or UNIX socket address of the ABCI application,
// or the name of an ABCI application compiled in with the Tendermint binary // or the name of an ABCI application compiled in with the Tendermint binary
ProxyApp string `mapstructure:"proxy_app"` ProxyApp string `mapstructure:"proxy_app"`
@ -147,19 +143,18 @@ func (c BaseConfig) ChainID() string {
// DefaultBaseConfig returns a default base configuration for a Tendermint node // DefaultBaseConfig returns a default base configuration for a Tendermint node
func DefaultBaseConfig() BaseConfig { func DefaultBaseConfig() BaseConfig {
return BaseConfig{ return BaseConfig{
Genesis: defaultGenesisJSONPath,
PrivValidator: defaultPrivValPath,
PrivValidatorMaxConn: defaultPrivValMaxConn,
NodeKey: defaultNodeKeyPath,
Moniker: defaultMoniker,
ProxyApp: "tcp://127.0.0.1:46658",
ABCI: "socket",
LogLevel: DefaultPackageLogLevels(),
ProfListenAddress: "",
FastSync: true,
FilterPeers: false,
DBBackend: "leveldb",
DBPath: "data",
Genesis: defaultGenesisJSONPath,
PrivValidator: defaultPrivValPath,
NodeKey: defaultNodeKeyPath,
Moniker: defaultMoniker,
ProxyApp: "tcp://127.0.0.1:46658",
ABCI: "socket",
LogLevel: DefaultPackageLogLevels(),
ProfListenAddress: "",
FastSync: true,
FilterPeers: false,
DBBackend: "leveldb",
DBPath: "data",
} }
} }


+ 7
- 5
node/node.go View File

@ -173,20 +173,22 @@ func NewNode(config *cfg.Config,
// reload the state (it may have been updated by the handshake) // reload the state (it may have been updated by the handshake)
state = sm.LoadState(stateDB) state = sm.LoadState(stateDB)
// Connect to external signing process, if an address is provided.
if config.PrivValidatorAddr != "" { if config.PrivValidatorAddr != "" {
var ( var (
privKey = crypto.GenPrivKeyEd25519() privKey = crypto.GenPrivKeyEd25519()
pvss = priv_val.NewPrivValidatorSocketServer(
pvsc = priv_val.NewSocketClient(
logger.With("module", "priv_val"), logger.With("module", "priv_val"),
config.ChainID(),
config.PrivValidatorAddr, config.PrivValidatorAddr,
config.PrivValidatorMaxConn,
priv_val.LoadPrivValidatorJSON(config.PrivValidatorFile()),
&privKey, &privKey,
) )
) )
pvss.Start()
if err := pvsc.Start(); err != nil {
return nil, fmt.Errorf("Error starting private validator client: %v", err)
}
privValidator = pvsc
} }
// Decide whether to fast-sync or not // Decide whether to fast-sync or not


+ 90
- 60
types/priv_validator/socket.go View File

@ -18,9 +18,9 @@ import (
) )
const ( const (
connDeadlineSeconds = 3
dialRetryIntervalSeconds = 1
dialRetryMax = 10
defaultConnDeadlineSeconds = 3
defaultDialRetryIntervalSeconds = 1
defaultDialRetryMax = 10
) )
// Socket errors. // Socket errors.
@ -29,70 +29,89 @@ var (
) )
var ( var (
connDeadline = time.Second * connDeadlineSeconds
connDeadline = time.Second * defaultConnDeadlineSeconds
) )
//-----------------------------------------------------------------
// SocketClientOption sets an optional parameter on the SocketClient.
type SocketClientOption func(*socketClient)
var _ types.PrivValidator2 = (*PrivValidatorSocketClient)(nil)
// SocketClientTimeout sets the timeout for connecting to the external socket
// address.
func SocketClientTimeout(timeout time.Duration) SocketClientOption {
return func(sc *socketClient) { sc.connectTimeout = timeout }
}
// PrivValidatorSocketClient implements PrivValidator.
// It uses a socket to request signatures.
type PrivValidatorSocketClient struct {
// socketClient implements PrivValidator, it uses a socket to request signatures
// from an external process.
type socketClient struct {
cmn.BaseService cmn.BaseService
conn net.Conn conn net.Conn
privKey *crypto.PrivKeyEd25519 privKey *crypto.PrivKeyEd25519
ID types.ValidatorID
SocketAddress string
addr string
connectTimeout time.Duration
} }
// NewPrivValidatorSocketClient returns an instance of
// PrivValidatorSocketClient.
func NewPrivValidatorSocketClient(
// Check that socketClient implements PrivValidator2.
var _ types.PrivValidator2 = (*socketClient)(nil)
// NewsocketClient returns an instance of socketClient.
func NewSocketClient(
logger log.Logger, logger log.Logger,
socketAddr string, socketAddr string,
privKey *crypto.PrivKeyEd25519, privKey *crypto.PrivKeyEd25519,
) *PrivValidatorSocketClient {
pvsc := &PrivValidatorSocketClient{
SocketAddress: socketAddr,
privKey: privKey,
) *socketClient {
sc := &socketClient{
addr: socketAddr,
connectTimeout: time.Second * defaultConnDeadlineSeconds,
privKey: privKey,
} }
pvsc.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketClient", pvsc)
sc.BaseService = *cmn.NewBaseService(logger, "privValidatorsocketClient", sc)
return pvsc
return sc
} }
// OnStart implements cmn.Service. // OnStart implements cmn.Service.
func (pvsc *PrivValidatorSocketClient) OnStart() error {
if err := pvsc.BaseService.OnStart(); err != nil {
func (sc *socketClient) OnStart() error {
if err := sc.BaseService.OnStart(); err != nil {
return err return err
} }
conn, err := pvsc.connect()
conn, err := sc.connect()
if err != nil { if err != nil {
return err return err
} }
pvsc.conn = conn
sc.conn = conn
return nil return nil
} }
// OnStop implements cmn.Service. // OnStop implements cmn.Service.
func (pvsc *PrivValidatorSocketClient) OnStop() {
pvsc.BaseService.OnStop()
func (sc *socketClient) OnStop() {
sc.BaseService.OnStop()
if pvsc.conn != nil {
pvsc.conn.Close()
if sc.conn != nil {
sc.conn.Close()
} }
} }
// GetAddress implements PrivValidator.
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
func (sc *socketClient) GetAddress() types.Address {
addr, err := sc.Address()
if err != nil {
panic(err)
}
return addr
}
// Address is an alias for PubKey().Address(). // Address is an alias for PubKey().Address().
func (pvsc *PrivValidatorSocketClient) Address() (cmn.HexBytes, error) {
p, err := pvsc.PubKey()
func (sc *socketClient) Address() (cmn.HexBytes, error) {
p, err := sc.PubKey()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -100,14 +119,25 @@ func (pvsc *PrivValidatorSocketClient) Address() (cmn.HexBytes, error) {
return p.Address(), nil return p.Address(), nil
} }
// PubKey implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) PubKey() (crypto.PubKey, error) {
err := writeMsg(pvsc.conn, &PubKeyMsg{})
// GetPubKey implements PrivValidator.
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
func (sc *socketClient) GetPubKey() crypto.PubKey {
pubKey, err := sc.PubKey()
if err != nil {
panic(err)
}
return pubKey
}
// PubKey implements PrivValidator2.
func (sc *socketClient) PubKey() (crypto.PubKey, error) {
err := writeMsg(sc.conn, &PubKeyMsg{})
if err != nil { if err != nil {
return crypto.PubKey{}, err return crypto.PubKey{}, err
} }
res, err := readMsg(pvsc.conn)
res, err := readMsg(sc.conn)
if err != nil { if err != nil {
return crypto.PubKey{}, err return crypto.PubKey{}, err
} }
@ -115,14 +145,14 @@ func (pvsc *PrivValidatorSocketClient) PubKey() (crypto.PubKey, error) {
return res.(*PubKeyMsg).PubKey, nil return res.(*PubKeyMsg).PubKey, nil
} }
// SignVote implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote) error {
err := writeMsg(pvsc.conn, &SignVoteMsg{Vote: vote})
// SignVote implements PrivValidator2.
func (sc *socketClient) SignVote(chainID string, vote *types.Vote) error {
err := writeMsg(sc.conn, &SignVoteMsg{Vote: vote})
if err != nil { if err != nil {
return err return err
} }
res, err := readMsg(pvsc.conn)
res, err := readMsg(sc.conn)
if err != nil { if err != nil {
return err return err
} }
@ -132,14 +162,14 @@ func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote
return nil return nil
} }
// SignProposal implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *types.Proposal) error {
err := writeMsg(pvsc.conn, &SignProposalMsg{Proposal: proposal})
// SignProposal implements PrivValidator2.
func (sc *socketClient) SignProposal(chainID string, proposal *types.Proposal) error {
err := writeMsg(sc.conn, &SignProposalMsg{Proposal: proposal})
if err != nil { if err != nil {
return err return err
} }
res, err := readMsg(pvsc.conn)
res, err := readMsg(sc.conn)
if err != nil { if err != nil {
return err return err
} }
@ -149,14 +179,14 @@ func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *ty
return nil return nil
} }
// SignHeartbeat implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
err := writeMsg(pvsc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
// SignHeartbeat implements PrivValidator2.
func (sc *socketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
err := writeMsg(sc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
if err != nil { if err != nil {
return err return err
} }
res, err := readMsg(pvsc.conn)
res, err := readMsg(sc.conn)
if err != nil { if err != nil {
return err return err
} }
@ -166,22 +196,22 @@ func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *
return nil return nil
} }
func (pvsc *PrivValidatorSocketClient) connect() (net.Conn, error) {
retries := dialRetryMax
func (sc *socketClient) connect() (net.Conn, error) {
retries := defaultDialRetryMax
RETRY_LOOP: RETRY_LOOP:
for retries > 0 { for retries > 0 {
if retries != dialRetryMax {
time.Sleep(time.Second * dialRetryIntervalSeconds)
if retries != defaultDialRetryMax {
time.Sleep(sc.connectTimeout)
} }
retries-- retries--
conn, err := cmn.Connect(pvsc.SocketAddress)
conn, err := cmn.Connect(sc.addr)
if err != nil { if err != nil {
pvsc.Logger.Error(
"pvsc connect",
"addr", pvsc.SocketAddress,
sc.Logger.Error(
"sc connect",
"addr", sc.addr,
"err", errors.Wrap(err, "connection failed"), "err", errors.Wrap(err, "connection failed"),
) )
@ -189,18 +219,18 @@ RETRY_LOOP:
} }
if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil { if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
pvsc.Logger.Error(
"pvsc connect",
sc.Logger.Error(
"sc connect",
"err", errors.Wrap(err, "setting connection timeout failed"), "err", errors.Wrap(err, "setting connection timeout failed"),
) )
continue continue
} }
if pvsc.privKey != nil {
conn, err = p2pconn.MakeSecretConnection(conn, pvsc.privKey.Wrap())
if sc.privKey != nil {
conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey.Wrap())
if err != nil { if err != nil {
pvsc.Logger.Error(
"pvsc connect",
sc.Logger.Error(
"sc connect",
"err", errors.Wrap(err, "encrypting connection failed"), "err", errors.Wrap(err, "encrypting connection failed"),
) )


+ 105
- 89
types/priv_validator/socket_test.go View File

@ -13,80 +13,128 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
func TestPrivValidatorSocketServer(t *testing.T) {
func TestSocketClientAddress(t *testing.T) {
var ( var (
assert, require = assert.New(t), require.New(t) assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret" chainID = "test-chain-secret"
logger = log.TestingLogger()
signer = types.GenSigner()
clientPrivKey = crypto.GenPrivKeyEd25519()
serverPrivKey = crypto.GenPrivKeyEd25519()
privVal = NewTestPrivValidator(signer)
pvss = NewPrivValidatorSocketServer(
logger,
chainID,
"127.0.0.1:0",
1,
privVal,
&serverPrivKey,
)
sc, pvss = testSetupSocketPair(t, chainID)
) )
err := pvss.Start()
require.Nil(err)
defer sc.Stop()
defer pvss.Stop() defer pvss.Stop()
assert.True(pvss.IsRunning())
serverAddr, err := pvss.privVal.Address()
require.NoError(err)
pvsc := NewPrivValidatorSocketClient(
logger,
pvss.listener.Addr().String(),
&clientPrivKey,
)
clientAddr, err := sc.Address()
require.NoError(err)
err = pvsc.Start()
require.Nil(err)
defer pvsc.Stop()
assert.Equal(serverAddr, clientAddr)
assert.True(pvsc.IsRunning())
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
assert.Equal(serverAddr, sc.GetAddress())
cAddr, err := pvsc.Address()
require.Nil(err)
}
sAddr, err := pvss.privVal.Address()
require.Nil(err)
func TestSocketClientPubKey(t *testing.T) {
var (
assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret"
sc, pvss = testSetupSocketPair(t, chainID)
)
defer sc.Stop()
defer pvss.Stop()
assert.Equal(cAddr, sAddr)
clientKey, err := sc.PubKey()
require.NoError(err)
cKey, err := pvsc.PubKey()
require.Nil(err)
privKey, err := pvss.privVal.PubKey()
require.NoError(err)
sKey, err := pvss.privVal.PubKey()
require.Nil(err)
assert.Equal(privKey, clientKey)
assert.Equal(cKey, sKey)
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
assert.Equal(privKey, sc.GetPubKey())
}
err = pvsc.SignProposal(chainID, &types.Proposal{
Timestamp: time.Now(),
})
require.Nil(err)
func TestSocketClientProposal(t *testing.T) {
var (
assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret"
sc, pvss = testSetupSocketPair(t, chainID)
err = pvsc.SignVote(chainID, &types.Vote{
Timestamp: time.Now(),
Type: types.VoteTypePrecommit,
})
require.Nil(err)
ts = time.Now()
privProposal = &types.Proposal{Timestamp: ts}
clientProposal = &types.Proposal{Timestamp: ts}
)
defer sc.Stop()
defer pvss.Stop()
err = pvsc.SignHeartbeat(chainID, &types.Heartbeat{})
require.Nil(err)
require.NoError(pvss.privVal.SignProposal(chainID, privProposal))
require.NoError(sc.SignProposal(chainID, clientProposal))
assert.Equal(privProposal.Signature, clientProposal.Signature)
} }
func TestPrivValidatorSocketServerWithoutSecret(t *testing.T) {
func TestSocketClientVote(t *testing.T) {
var ( var (
assert, require = assert.New(t), require.New(t) assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret" chainID = "test-chain-secret"
sc, pvss = testSetupSocketPair(t, chainID)
ts = time.Now()
vType = types.VoteTypePrecommit
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer sc.Stop()
defer pvss.Stop()
require.NoError(pvss.privVal.SignVote(chainID, want))
require.NoError(sc.SignVote(chainID, have))
assert.Equal(want.Signature, have.Signature)
}
func TestSocketClientHeartbeat(t *testing.T) {
var (
assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret"
sc, pvss = testSetupSocketPair(t, chainID)
want = &types.Heartbeat{}
have = &types.Heartbeat{}
)
defer sc.Stop()
defer pvss.Stop()
require.NoError(pvss.privVal.SignHeartbeat(chainID, want))
require.NoError(sc.SignHeartbeat(chainID, have))
assert.Equal(want.Signature, have.Signature)
}
func TestSocketClientConnectRetryMax(t *testing.T) {
var (
assert, _ = assert.New(t), require.New(t)
logger = log.TestingLogger()
clientPrivKey = crypto.GenPrivKeyEd25519()
sc = NewSocketClient(
logger,
"127.0.0.1:0",
&clientPrivKey,
)
)
defer sc.Stop()
SocketClientTimeout(time.Millisecond)(sc)
assert.EqualError(sc.Start(), ErrDialRetryMax.Error())
}
func testSetupSocketPair(t *testing.T, chainID string) (*socketClient, *PrivValidatorSocketServer) {
var (
assert, require = assert.New(t), require.New(t)
logger = log.TestingLogger() logger = log.TestingLogger()
signer = types.GenSigner() signer = types.GenSigner()
clientPrivKey = crypto.GenPrivKeyEd25519()
serverPrivKey = crypto.GenPrivKeyEd25519()
privVal = NewTestPrivValidator(signer) privVal = NewTestPrivValidator(signer)
pvss = NewPrivValidatorSocketServer( pvss = NewPrivValidatorSocketServer(
logger, logger,
@ -94,55 +142,23 @@ func TestPrivValidatorSocketServerWithoutSecret(t *testing.T) {
"127.0.0.1:0", "127.0.0.1:0",
1, 1,
privVal, privVal,
nil,
&serverPrivKey,
) )
) )
err := pvss.Start() err := pvss.Start()
require.Nil(err)
defer pvss.Stop()
require.NoError(err)
assert.True(pvss.IsRunning()) assert.True(pvss.IsRunning())
pvsc := NewPrivValidatorSocketClient(
sc := NewSocketClient(
logger, logger,
pvss.listener.Addr().String(), pvss.listener.Addr().String(),
nil,
&clientPrivKey,
) )
err = pvsc.Start()
require.Nil(err)
defer pvsc.Stop()
assert.True(pvsc.IsRunning())
cAddr, err := pvsc.Address()
require.Nil(err)
sAddr, err := pvss.privVal.Address()
require.Nil(err)
assert.Equal(cAddr, sAddr)
cKey, err := pvsc.PubKey()
require.Nil(err)
sKey, err := pvss.privVal.PubKey()
require.Nil(err)
assert.Equal(cKey, sKey)
err = pvsc.SignProposal(chainID, &types.Proposal{
Timestamp: time.Now(),
})
require.Nil(err)
err = pvsc.SignVote(chainID, &types.Vote{
Timestamp: time.Now(),
Type: types.VoteTypePrecommit,
})
require.Nil(err)
err = sc.Start()
require.NoError(err)
assert.True(sc.IsRunning())
err = pvsc.SignHeartbeat(chainID, &types.Heartbeat{})
require.Nil(err)
return sc, pvss
} }

Loading…
Cancel
Save