Browse Source

Integrate private validator socket client

Following ADDR 008 the node will connect to an external
process to handle signing requests. Operation of the external process is
left to the user.

* introduce alias for PrivValidator interface on socket client
* integrate socket client in node
* structure tests
* remove unnecessary flag
pull/1237/head
Alexander Simmerl 7 years ago
parent
commit
74d3f7e1fd
No known key found for this signature in database GPG Key ID: 7A3963B54B0ECA50
5 changed files with 217 additions and 175 deletions
  1. +0
    -1
      cmd/tendermint/commands/run_node.go
  2. +15
    -20
      config/config.go
  3. +7
    -5
      node/node.go
  4. +90
    -60
      types/priv_validator/socket.go
  5. +105
    -89
      types/priv_validator/socket_test.go

+ 0
- 1
cmd/tendermint/commands/run_node.go View File

@ -16,7 +16,6 @@ func AddNodeFlags(cmd *cobra.Command) {
// priv val flags // priv val flags
cmd.Flags().String("priv_validator_addr", config.PrivValidatorAddr, "Socket address for private validator") cmd.Flags().String("priv_validator_addr", config.PrivValidatorAddr, "Socket address for private validator")
cmd.Flags().Int("priv_validator_max_conn", config.PrivValidatorMaxConn, "Limit of concurrent connections to the PrivValidator")
// node flags // node flags
cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing") cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing")


+ 15
- 20
config/config.go View File

@ -21,10 +21,9 @@ var (
defaultConfigFileName = "config.toml" defaultConfigFileName = "config.toml"
defaultGenesisJSONName = "genesis.json" defaultGenesisJSONName = "genesis.json"
defaultPrivValName = "priv_validator.json"
defaultPrivValMaxConn = 3
defaultNodeKeyName = "node_key.json"
defaultAddrBookName = "addrbook.json"
defaultPrivValName = "priv_validator.json"
defaultNodeKeyName = "node_key.json"
defaultAddrBookName = "addrbook.json"
defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName) defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName)
defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName) defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName)
@ -108,9 +107,6 @@ type BaseConfig struct {
// TCP or UNIX socket address of the PrivValidator server // TCP or UNIX socket address of the PrivValidator server
PrivValidatorAddr string `mapstructure:"priv_validator_addr"` PrivValidatorAddr string `mapstructure:"priv_validator_addr"`
// Limit of concurrent connections to the PrivValidator.
PrivValidatorMaxConn int `mapstructure:"priv_validator_max_conn"`
// TCP or UNIX socket address of the ABCI application, // TCP or UNIX socket address of the ABCI application,
// or the name of an ABCI application compiled in with the Tendermint binary // or the name of an ABCI application compiled in with the Tendermint binary
ProxyApp string `mapstructure:"proxy_app"` ProxyApp string `mapstructure:"proxy_app"`
@ -147,19 +143,18 @@ func (c BaseConfig) ChainID() string {
// DefaultBaseConfig returns a default base configuration for a Tendermint node // DefaultBaseConfig returns a default base configuration for a Tendermint node
func DefaultBaseConfig() BaseConfig { func DefaultBaseConfig() BaseConfig {
return BaseConfig{ return BaseConfig{
Genesis: defaultGenesisJSONPath,
PrivValidator: defaultPrivValPath,
PrivValidatorMaxConn: defaultPrivValMaxConn,
NodeKey: defaultNodeKeyPath,
Moniker: defaultMoniker,
ProxyApp: "tcp://127.0.0.1:46658",
ABCI: "socket",
LogLevel: DefaultPackageLogLevels(),
ProfListenAddress: "",
FastSync: true,
FilterPeers: false,
DBBackend: "leveldb",
DBPath: "data",
Genesis: defaultGenesisJSONPath,
PrivValidator: defaultPrivValPath,
NodeKey: defaultNodeKeyPath,
Moniker: defaultMoniker,
ProxyApp: "tcp://127.0.0.1:46658",
ABCI: "socket",
LogLevel: DefaultPackageLogLevels(),
ProfListenAddress: "",
FastSync: true,
FilterPeers: false,
DBBackend: "leveldb",
DBPath: "data",
} }
} }


+ 7
- 5
node/node.go View File

@ -173,20 +173,22 @@ func NewNode(config *cfg.Config,
// reload the state (it may have been updated by the handshake) // reload the state (it may have been updated by the handshake)
state = sm.LoadState(stateDB) state = sm.LoadState(stateDB)
// Connect to external signing process, if an address is provided.
if config.PrivValidatorAddr != "" { if config.PrivValidatorAddr != "" {
var ( var (
privKey = crypto.GenPrivKeyEd25519() privKey = crypto.GenPrivKeyEd25519()
pvss = priv_val.NewPrivValidatorSocketServer(
pvsc = priv_val.NewSocketClient(
logger.With("module", "priv_val"), logger.With("module", "priv_val"),
config.ChainID(),
config.PrivValidatorAddr, config.PrivValidatorAddr,
config.PrivValidatorMaxConn,
priv_val.LoadPrivValidatorJSON(config.PrivValidatorFile()),
&privKey, &privKey,
) )
) )
pvss.Start()
if err := pvsc.Start(); err != nil {
return nil, fmt.Errorf("Error starting private validator client: %v", err)
}
privValidator = pvsc
} }
// Decide whether to fast-sync or not // Decide whether to fast-sync or not


+ 90
- 60
types/priv_validator/socket.go View File

@ -18,9 +18,9 @@ import (
) )
const ( const (
connDeadlineSeconds = 3
dialRetryIntervalSeconds = 1
dialRetryMax = 10
defaultConnDeadlineSeconds = 3
defaultDialRetryIntervalSeconds = 1
defaultDialRetryMax = 10
) )
// Socket errors. // Socket errors.
@ -29,70 +29,89 @@ var (
) )
var ( var (
connDeadline = time.Second * connDeadlineSeconds
connDeadline = time.Second * defaultConnDeadlineSeconds
) )
//-----------------------------------------------------------------
// SocketClientOption sets an optional parameter on the SocketClient.
type SocketClientOption func(*socketClient)
var _ types.PrivValidator2 = (*PrivValidatorSocketClient)(nil)
// SocketClientTimeout sets the timeout for connecting to the external socket
// address.
func SocketClientTimeout(timeout time.Duration) SocketClientOption {
return func(sc *socketClient) { sc.connectTimeout = timeout }
}
// PrivValidatorSocketClient implements PrivValidator.
// It uses a socket to request signatures.
type PrivValidatorSocketClient struct {
// socketClient implements PrivValidator, it uses a socket to request signatures
// from an external process.
type socketClient struct {
cmn.BaseService cmn.BaseService
conn net.Conn conn net.Conn
privKey *crypto.PrivKeyEd25519 privKey *crypto.PrivKeyEd25519
ID types.ValidatorID
SocketAddress string
addr string
connectTimeout time.Duration
} }
// NewPrivValidatorSocketClient returns an instance of
// PrivValidatorSocketClient.
func NewPrivValidatorSocketClient(
// Check that socketClient implements PrivValidator2.
var _ types.PrivValidator2 = (*socketClient)(nil)
// NewsocketClient returns an instance of socketClient.
func NewSocketClient(
logger log.Logger, logger log.Logger,
socketAddr string, socketAddr string,
privKey *crypto.PrivKeyEd25519, privKey *crypto.PrivKeyEd25519,
) *PrivValidatorSocketClient {
pvsc := &PrivValidatorSocketClient{
SocketAddress: socketAddr,
privKey: privKey,
) *socketClient {
sc := &socketClient{
addr: socketAddr,
connectTimeout: time.Second * defaultConnDeadlineSeconds,
privKey: privKey,
} }
pvsc.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketClient", pvsc)
sc.BaseService = *cmn.NewBaseService(logger, "privValidatorsocketClient", sc)
return pvsc
return sc
} }
// OnStart implements cmn.Service. // OnStart implements cmn.Service.
func (pvsc *PrivValidatorSocketClient) OnStart() error {
if err := pvsc.BaseService.OnStart(); err != nil {
func (sc *socketClient) OnStart() error {
if err := sc.BaseService.OnStart(); err != nil {
return err return err
} }
conn, err := pvsc.connect()
conn, err := sc.connect()
if err != nil { if err != nil {
return err return err
} }
pvsc.conn = conn
sc.conn = conn
return nil return nil
} }
// OnStop implements cmn.Service. // OnStop implements cmn.Service.
func (pvsc *PrivValidatorSocketClient) OnStop() {
pvsc.BaseService.OnStop()
func (sc *socketClient) OnStop() {
sc.BaseService.OnStop()
if pvsc.conn != nil {
pvsc.conn.Close()
if sc.conn != nil {
sc.conn.Close()
} }
} }
// GetAddress implements PrivValidator.
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
func (sc *socketClient) GetAddress() types.Address {
addr, err := sc.Address()
if err != nil {
panic(err)
}
return addr
}
// Address is an alias for PubKey().Address(). // Address is an alias for PubKey().Address().
func (pvsc *PrivValidatorSocketClient) Address() (cmn.HexBytes, error) {
p, err := pvsc.PubKey()
func (sc *socketClient) Address() (cmn.HexBytes, error) {
p, err := sc.PubKey()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -100,14 +119,25 @@ func (pvsc *PrivValidatorSocketClient) Address() (cmn.HexBytes, error) {
return p.Address(), nil return p.Address(), nil
} }
// PubKey implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) PubKey() (crypto.PubKey, error) {
err := writeMsg(pvsc.conn, &PubKeyMsg{})
// GetPubKey implements PrivValidator.
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
func (sc *socketClient) GetPubKey() crypto.PubKey {
pubKey, err := sc.PubKey()
if err != nil {
panic(err)
}
return pubKey
}
// PubKey implements PrivValidator2.
func (sc *socketClient) PubKey() (crypto.PubKey, error) {
err := writeMsg(sc.conn, &PubKeyMsg{})
if err != nil { if err != nil {
return crypto.PubKey{}, err return crypto.PubKey{}, err
} }
res, err := readMsg(pvsc.conn)
res, err := readMsg(sc.conn)
if err != nil { if err != nil {
return crypto.PubKey{}, err return crypto.PubKey{}, err
} }
@ -115,14 +145,14 @@ func (pvsc *PrivValidatorSocketClient) PubKey() (crypto.PubKey, error) {
return res.(*PubKeyMsg).PubKey, nil return res.(*PubKeyMsg).PubKey, nil
} }
// SignVote implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote) error {
err := writeMsg(pvsc.conn, &SignVoteMsg{Vote: vote})
// SignVote implements PrivValidator2.
func (sc *socketClient) SignVote(chainID string, vote *types.Vote) error {
err := writeMsg(sc.conn, &SignVoteMsg{Vote: vote})
if err != nil { if err != nil {
return err return err
} }
res, err := readMsg(pvsc.conn)
res, err := readMsg(sc.conn)
if err != nil { if err != nil {
return err return err
} }
@ -132,14 +162,14 @@ func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote
return nil return nil
} }
// SignProposal implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *types.Proposal) error {
err := writeMsg(pvsc.conn, &SignProposalMsg{Proposal: proposal})
// SignProposal implements PrivValidator2.
func (sc *socketClient) SignProposal(chainID string, proposal *types.Proposal) error {
err := writeMsg(sc.conn, &SignProposalMsg{Proposal: proposal})
if err != nil { if err != nil {
return err return err
} }
res, err := readMsg(pvsc.conn)
res, err := readMsg(sc.conn)
if err != nil { if err != nil {
return err return err
} }
@ -149,14 +179,14 @@ func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *ty
return nil return nil
} }
// SignHeartbeat implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
err := writeMsg(pvsc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
// SignHeartbeat implements PrivValidator2.
func (sc *socketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
err := writeMsg(sc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
if err != nil { if err != nil {
return err return err
} }
res, err := readMsg(pvsc.conn)
res, err := readMsg(sc.conn)
if err != nil { if err != nil {
return err return err
} }
@ -166,22 +196,22 @@ func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *
return nil return nil
} }
func (pvsc *PrivValidatorSocketClient) connect() (net.Conn, error) {
retries := dialRetryMax
func (sc *socketClient) connect() (net.Conn, error) {
retries := defaultDialRetryMax
RETRY_LOOP: RETRY_LOOP:
for retries > 0 { for retries > 0 {
if retries != dialRetryMax {
time.Sleep(time.Second * dialRetryIntervalSeconds)
if retries != defaultDialRetryMax {
time.Sleep(sc.connectTimeout)
} }
retries-- retries--
conn, err := cmn.Connect(pvsc.SocketAddress)
conn, err := cmn.Connect(sc.addr)
if err != nil { if err != nil {
pvsc.Logger.Error(
"pvsc connect",
"addr", pvsc.SocketAddress,
sc.Logger.Error(
"sc connect",
"addr", sc.addr,
"err", errors.Wrap(err, "connection failed"), "err", errors.Wrap(err, "connection failed"),
) )
@ -189,18 +219,18 @@ RETRY_LOOP:
} }
if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil { if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
pvsc.Logger.Error(
"pvsc connect",
sc.Logger.Error(
"sc connect",
"err", errors.Wrap(err, "setting connection timeout failed"), "err", errors.Wrap(err, "setting connection timeout failed"),
) )
continue continue
} }
if pvsc.privKey != nil {
conn, err = p2pconn.MakeSecretConnection(conn, pvsc.privKey.Wrap())
if sc.privKey != nil {
conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey.Wrap())
if err != nil { if err != nil {
pvsc.Logger.Error(
"pvsc connect",
sc.Logger.Error(
"sc connect",
"err", errors.Wrap(err, "encrypting connection failed"), "err", errors.Wrap(err, "encrypting connection failed"),
) )


+ 105
- 89
types/priv_validator/socket_test.go View File

@ -13,80 +13,128 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
func TestPrivValidatorSocketServer(t *testing.T) {
func TestSocketClientAddress(t *testing.T) {
var ( var (
assert, require = assert.New(t), require.New(t) assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret" chainID = "test-chain-secret"
logger = log.TestingLogger()
signer = types.GenSigner()
clientPrivKey = crypto.GenPrivKeyEd25519()
serverPrivKey = crypto.GenPrivKeyEd25519()
privVal = NewTestPrivValidator(signer)
pvss = NewPrivValidatorSocketServer(
logger,
chainID,
"127.0.0.1:0",
1,
privVal,
&serverPrivKey,
)
sc, pvss = testSetupSocketPair(t, chainID)
) )
err := pvss.Start()
require.Nil(err)
defer sc.Stop()
defer pvss.Stop() defer pvss.Stop()
assert.True(pvss.IsRunning())
serverAddr, err := pvss.privVal.Address()
require.NoError(err)
pvsc := NewPrivValidatorSocketClient(
logger,
pvss.listener.Addr().String(),
&clientPrivKey,
)
clientAddr, err := sc.Address()
require.NoError(err)
err = pvsc.Start()
require.Nil(err)
defer pvsc.Stop()
assert.Equal(serverAddr, clientAddr)
assert.True(pvsc.IsRunning())
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
assert.Equal(serverAddr, sc.GetAddress())
cAddr, err := pvsc.Address()
require.Nil(err)
}
sAddr, err := pvss.privVal.Address()
require.Nil(err)
func TestSocketClientPubKey(t *testing.T) {
var (
assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret"
sc, pvss = testSetupSocketPair(t, chainID)
)
defer sc.Stop()
defer pvss.Stop()
assert.Equal(cAddr, sAddr)
clientKey, err := sc.PubKey()
require.NoError(err)
cKey, err := pvsc.PubKey()
require.Nil(err)
privKey, err := pvss.privVal.PubKey()
require.NoError(err)
sKey, err := pvss.privVal.PubKey()
require.Nil(err)
assert.Equal(privKey, clientKey)
assert.Equal(cKey, sKey)
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
assert.Equal(privKey, sc.GetPubKey())
}
err = pvsc.SignProposal(chainID, &types.Proposal{
Timestamp: time.Now(),
})
require.Nil(err)
func TestSocketClientProposal(t *testing.T) {
var (
assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret"
sc, pvss = testSetupSocketPair(t, chainID)
err = pvsc.SignVote(chainID, &types.Vote{
Timestamp: time.Now(),
Type: types.VoteTypePrecommit,
})
require.Nil(err)
ts = time.Now()
privProposal = &types.Proposal{Timestamp: ts}
clientProposal = &types.Proposal{Timestamp: ts}
)
defer sc.Stop()
defer pvss.Stop()
err = pvsc.SignHeartbeat(chainID, &types.Heartbeat{})
require.Nil(err)
require.NoError(pvss.privVal.SignProposal(chainID, privProposal))
require.NoError(sc.SignProposal(chainID, clientProposal))
assert.Equal(privProposal.Signature, clientProposal.Signature)
} }
func TestPrivValidatorSocketServerWithoutSecret(t *testing.T) {
func TestSocketClientVote(t *testing.T) {
var ( var (
assert, require = assert.New(t), require.New(t) assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret" chainID = "test-chain-secret"
sc, pvss = testSetupSocketPair(t, chainID)
ts = time.Now()
vType = types.VoteTypePrecommit
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer sc.Stop()
defer pvss.Stop()
require.NoError(pvss.privVal.SignVote(chainID, want))
require.NoError(sc.SignVote(chainID, have))
assert.Equal(want.Signature, have.Signature)
}
func TestSocketClientHeartbeat(t *testing.T) {
var (
assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret"
sc, pvss = testSetupSocketPair(t, chainID)
want = &types.Heartbeat{}
have = &types.Heartbeat{}
)
defer sc.Stop()
defer pvss.Stop()
require.NoError(pvss.privVal.SignHeartbeat(chainID, want))
require.NoError(sc.SignHeartbeat(chainID, have))
assert.Equal(want.Signature, have.Signature)
}
func TestSocketClientConnectRetryMax(t *testing.T) {
var (
assert, _ = assert.New(t), require.New(t)
logger = log.TestingLogger()
clientPrivKey = crypto.GenPrivKeyEd25519()
sc = NewSocketClient(
logger,
"127.0.0.1:0",
&clientPrivKey,
)
)
defer sc.Stop()
SocketClientTimeout(time.Millisecond)(sc)
assert.EqualError(sc.Start(), ErrDialRetryMax.Error())
}
func testSetupSocketPair(t *testing.T, chainID string) (*socketClient, *PrivValidatorSocketServer) {
var (
assert, require = assert.New(t), require.New(t)
logger = log.TestingLogger() logger = log.TestingLogger()
signer = types.GenSigner() signer = types.GenSigner()
clientPrivKey = crypto.GenPrivKeyEd25519()
serverPrivKey = crypto.GenPrivKeyEd25519()
privVal = NewTestPrivValidator(signer) privVal = NewTestPrivValidator(signer)
pvss = NewPrivValidatorSocketServer( pvss = NewPrivValidatorSocketServer(
logger, logger,
@ -94,55 +142,23 @@ func TestPrivValidatorSocketServerWithoutSecret(t *testing.T) {
"127.0.0.1:0", "127.0.0.1:0",
1, 1,
privVal, privVal,
nil,
&serverPrivKey,
) )
) )
err := pvss.Start() err := pvss.Start()
require.Nil(err)
defer pvss.Stop()
require.NoError(err)
assert.True(pvss.IsRunning()) assert.True(pvss.IsRunning())
pvsc := NewPrivValidatorSocketClient(
sc := NewSocketClient(
logger, logger,
pvss.listener.Addr().String(), pvss.listener.Addr().String(),
nil,
&clientPrivKey,
) )
err = pvsc.Start()
require.Nil(err)
defer pvsc.Stop()
assert.True(pvsc.IsRunning())
cAddr, err := pvsc.Address()
require.Nil(err)
sAddr, err := pvss.privVal.Address()
require.Nil(err)
assert.Equal(cAddr, sAddr)
cKey, err := pvsc.PubKey()
require.Nil(err)
sKey, err := pvss.privVal.PubKey()
require.Nil(err)
assert.Equal(cKey, sKey)
err = pvsc.SignProposal(chainID, &types.Proposal{
Timestamp: time.Now(),
})
require.Nil(err)
err = pvsc.SignVote(chainID, &types.Vote{
Timestamp: time.Now(),
Type: types.VoteTypePrecommit,
})
require.Nil(err)
err = sc.Start()
require.NoError(err)
assert.True(sc.IsRunning())
err = pvsc.SignHeartbeat(chainID, &types.Heartbeat{})
require.Nil(err)
return sc, pvss
} }

Loading…
Cancel
Save