diff --git a/cmd/priv_val_server/main.go b/cmd/priv_val_server/main.go new file mode 100644 index 000000000..57c3355f9 --- /dev/null +++ b/cmd/priv_val_server/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "flag" + "os" + + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" + + priv_val "github.com/tendermint/tendermint/types/priv_validator" +) + +func main() { + var ( + chainID = flag.String("chain-id", "mychain", "chain id") + listenAddr = flag.String("laddr", ":46659", "Validator listen address (0.0.0.0:0 means any interface, any port") + maxConn = flag.Int("clients", 3, "maximum of concurrent connections") + privValPath = flag.String("priv", "", "priv val file path") + + logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "priv_val") + ) + flag.Parse() + + logger.Info( + "Starting private validator", + "chainID", *chainID, + "listenAddr", *listenAddr, + "maxConn", *maxConn, + "privPath", *privValPath, + ) + + privVal := priv_val.LoadPrivValidatorJSON(*privValPath) + + pvss := priv_val.NewPrivValidatorSocketServer( + logger, + *chainID, + *listenAddr, + *maxConn, + privVal, + nil, + ) + pvss.Start() + + cmn.TrapSignal(func() { + pvss.Stop() + }) +} diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index 76d61671d..481aa0729 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -14,6 +14,10 @@ func AddNodeFlags(cmd *cobra.Command) { // bind flags cmd.Flags().String("moniker", config.Moniker, "Node Name") + // priv val flags + cmd.Flags().String("priv_validator_addr", config.PrivValidatorAddr, "Socket address for private validator") + cmd.Flags().Int("priv_validator_max_conn", config.PrivValidatorMaxConn, "Limit of concurrent connections to the PrivValidator") + // node flags cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing") diff --git a/config/config.go b/config/config.go index 621847bd0..bdafa7a62 100644 --- a/config/config.go +++ b/config/config.go @@ -20,9 +20,11 @@ var ( defaultConfigFileName = "config.toml" defaultGenesisJSONName = "genesis.json" - defaultPrivValName = "priv_validator.json" - defaultNodeKeyName = "node_key.json" - defaultAddrBookName = "addrbook.json" + + defaultPrivValName = "priv_validator.json" + defaultPrivValMaxConn = 3 + defaultNodeKeyName = "node_key.json" + defaultAddrBookName = "addrbook.json" defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName) defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName) @@ -103,6 +105,12 @@ type BaseConfig struct { // A custom human readable name for this node Moniker string `mapstructure:"moniker"` + // TCP or UNIX socket address of the PrivValidator server + PrivValidatorAddr string `mapstructure:"priv_validator_addr"` + + // Limit of concurrent connections to the PrivValidator. + PrivValidatorMaxConn int `mapstructure:"priv_validator_max_conn"` + // TCP or UNIX socket address of the ABCI application, // or the name of an ABCI application compiled in with the Tendermint binary ProxyApp string `mapstructure:"proxy_app"` @@ -139,18 +147,19 @@ func (c BaseConfig) ChainID() string { // DefaultBaseConfig returns a default base configuration for a Tendermint node func DefaultBaseConfig() BaseConfig { return BaseConfig{ - Genesis: defaultGenesisJSONPath, - PrivValidator: defaultPrivValPath, - NodeKey: defaultNodeKeyPath, - Moniker: defaultMoniker, - ProxyApp: "tcp://127.0.0.1:46658", - ABCI: "socket", - LogLevel: DefaultPackageLogLevels(), - ProfListenAddress: "", - FastSync: true, - FilterPeers: false, - DBBackend: "leveldb", - DBPath: "data", + Genesis: defaultGenesisJSONPath, + PrivValidator: defaultPrivValPath, + PrivValidatorMaxConn: defaultPrivValMaxConn, + NodeKey: defaultNodeKeyPath, + Moniker: defaultMoniker, + ProxyApp: "tcp://127.0.0.1:46658", + ABCI: "socket", + LogLevel: DefaultPackageLogLevels(), + ProfListenAddress: "", + FastSync: true, + FilterPeers: false, + DBBackend: "leveldb", + DBPath: "data", } } diff --git a/docs/architecture/adr-008-priv-validator.md b/docs/architecture/adr-008-priv-validator.md index dfee112c6..94d164785 100644 --- a/docs/architecture/adr-008-priv-validator.md +++ b/docs/architecture/adr-008-priv-validator.md @@ -8,7 +8,7 @@ For instance, see https://github.com/tendermint/tendermint/issues/673 The goal is to have a clean PrivValidator interface like: -`` +``` type PrivValidator interface { Address() data.Bytes PubKey() crypto.PubKey @@ -60,6 +60,7 @@ type ValidatorID struct { Address data.Bytes `json:"address"` PubKey crypto.PubKey `json:"pub_key"` } +``` ### LastSignedInfo diff --git a/node/node.go b/node/node.go index cd2b4bcb5..4a0afd852 100644 --- a/node/node.go +++ b/node/node.go @@ -34,6 +34,7 @@ import ( "github.com/tendermint/tendermint/state/txindex/kv" "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" + priv_val "github.com/tendermint/tendermint/types/priv_validator" "github.com/tendermint/tendermint/version" _ "net/http/pprof" @@ -82,7 +83,8 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), DefaultGenesisDocProviderFunc(config), DefaultDBProvider, - logger) + logger, + ) } //------------------------------------------------------------------------------ @@ -171,6 +173,22 @@ func NewNode(config *cfg.Config, // reload the state (it may have been updated by the handshake) state = sm.LoadState(stateDB) + if config.PrivValidatorAddr != "" { + var ( + privKey = crypto.GenPrivKeyEd25519() + pvss = priv_val.NewPrivValidatorSocketServer( + logger.With("module", "priv_val"), + config.ChainID(), + config.PrivValidatorAddr, + config.PrivValidatorMaxConn, + priv_val.LoadPrivValidatorJSON(config.PrivValidatorFile()), + &privKey, + ) + ) + + pvss.Start() + } + // Decide whether to fast-sync or not // We don't fast-sync when the only validator is us. fastSync := config.FastSync diff --git a/types/canonical_json.go b/types/canonical_json.go index 45d12b45f..879bb5c52 100644 --- a/types/canonical_json.go +++ b/types/canonical_json.go @@ -9,8 +9,8 @@ import ( // canonical json is go-wire's json for structs with fields in alphabetical order -// timeFormat is used for generating the sigs -const timeFormat = wire.RFC3339Millis +// TimeFormat is used for generating the sigs +const TimeFormat = wire.RFC3339Millis type CanonicalJSONBlockID struct { Hash cmn.HexBytes `json:"hash,omitempty"` @@ -117,5 +117,5 @@ func CanonicalTime(t time.Time) string { // note that sending time over go-wire resets it to // local time, we need to force UTC here, so the // signatures match - return t.UTC().Format(timeFormat) + return t.UTC().Format(TimeFormat) } diff --git a/types/priv_validator.go b/types/priv_validator.go index 062fe09d2..237e3f796 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -34,6 +34,56 @@ func voteToStep(vote *Vote) int8 { } } +//-------------------------------------------------------------- +// PrivValidator is being upgraded! See types/priv_validator + +// ValidatorID contains the identity of the validator. +type ValidatorID struct { + Address cmn.HexBytes `json:"address"` + PubKey crypto.PubKey `json:"pub_key"` +} + +// PrivValidator defines the functionality of a local Tendermint validator +// that signs votes, proposals, and heartbeats, and never double signs. +type PrivValidator2 interface { + Address() (Address, error) // redundant since .PubKey().Address() + PubKey() (crypto.PubKey, error) + + SignVote(chainID string, vote *Vote) error + SignProposal(chainID string, proposal *Proposal) error + SignHeartbeat(chainID string, heartbeat *Heartbeat) error +} + +type TestSigner interface { + Address() cmn.HexBytes + PubKey() crypto.PubKey + Sign([]byte) (crypto.Signature, error) +} + +func GenSigner() TestSigner { + return &DefaultTestSigner{ + crypto.GenPrivKeyEd25519().Wrap(), + } +} + +type DefaultTestSigner struct { + crypto.PrivKey +} + +func (ds *DefaultTestSigner) Address() cmn.HexBytes { + return ds.PubKey().Address() +} + +func (ds *DefaultTestSigner) PubKey() crypto.PubKey { + return ds.PrivKey.PubKey() +} + +func (ds *DefaultTestSigner) Sign(msg []byte) (crypto.Signature, error) { + return ds.PrivKey.Sign(msg), nil +} + +//-------------------------------------------------------------- + // PrivValidator defines the functionality of a local Tendermint validator // that signs votes, proposals, and heartbeats, and never double signs. type PrivValidator interface { @@ -379,7 +429,7 @@ func checkVotesOnlyDifferByTimestamp(lastSignBytes, newSignBytes []byte) (time.T panic(fmt.Sprintf("signBytes cannot be unmarshalled into vote: %v", err)) } - lastTime, err := time.Parse(timeFormat, lastVote.Vote.Timestamp) + lastTime, err := time.Parse(TimeFormat, lastVote.Vote.Timestamp) if err != nil { panic(err) } @@ -405,7 +455,7 @@ func checkProposalsOnlyDifferByTimestamp(lastSignBytes, newSignBytes []byte) (ti panic(fmt.Sprintf("signBytes cannot be unmarshalled into proposal: %v", err)) } - lastTime, err := time.Parse(timeFormat, lastProposal.Proposal.Timestamp) + lastTime, err := time.Parse(TimeFormat, lastProposal.Proposal.Timestamp) if err != nil { panic(err) } diff --git a/types/priv_validator/json.go b/types/priv_validator/json.go index 446587765..6e5261386 100644 --- a/types/priv_validator/json.go +++ b/types/priv_validator/json.go @@ -13,7 +13,7 @@ import ( ) // PrivValidator aliases types.PrivValidator -type PrivValidator = types.PrivValidator +type PrivValidator = types.PrivValidator2 //----------------------------------------------------- @@ -42,7 +42,7 @@ func (pk *PrivKey) UnmarshalJSON(b []byte) error { //----------------------------------------------------- -var _ types.PrivValidator = (*PrivValidatorJSON)(nil) +var _ types.PrivValidator2 = (*PrivValidatorJSON)(nil) // PrivValidatorJSON wraps PrivValidatorUnencrypted // and persists it to disk after every SignVote and SignProposal. @@ -76,7 +76,12 @@ func (pvj *PrivValidatorJSON) SignProposal(chainID string, proposal *types.Propo // String returns a string representation of the PrivValidatorJSON. func (pvj *PrivValidatorJSON) String() string { - return fmt.Sprintf("PrivValidator{%v %v}", pvj.Address(), pvj.PrivValidatorUnencrypted.String()) + addr, err := pvj.Address() + if err != nil { + panic(err) + } + + return fmt.Sprintf("PrivValidator{%v %v}", addr, pvj.PrivValidatorUnencrypted.String()) } func (pvj *PrivValidatorJSON) Save() { @@ -170,7 +175,17 @@ func (pvs PrivValidatorsByAddress) Len() int { } func (pvs PrivValidatorsByAddress) Less(i, j int) bool { - return bytes.Compare(pvs[i].Address(), pvs[j].Address()) == -1 + iaddr, err := pvs[j].Address() + if err != nil { + panic(err) + } + + jaddr, err := pvs[i].Address() + if err != nil { + panic(err) + } + + return bytes.Compare(iaddr, jaddr) == -1 } func (pvs PrivValidatorsByAddress) Swap(i, j int) { diff --git a/types/priv_validator/priv_validator_test.go b/types/priv_validator/priv_validator_test.go index 33f8338d4..664d59cf2 100644 --- a/types/priv_validator/priv_validator_test.go +++ b/types/priv_validator/priv_validator_test.go @@ -11,13 +11,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" - data "github.com/tendermint/go-wire/data" - "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" + + "github.com/tendermint/tendermint/types" ) func TestGenLoadValidator(t *testing.T) { - assert := assert.New(t) + assert, require := assert.New(t), require.New(t) _, tempFilePath := cmn.Tempfile("priv_validator_") privVal := GenPrivValidatorJSON(tempFilePath) @@ -25,24 +25,33 @@ func TestGenLoadValidator(t *testing.T) { height := int64(100) privVal.LastSignedInfo.Height = height privVal.Save() - addr := privVal.Address() + addr, err := privVal.Address() + require.Nil(err) privVal = LoadPrivValidatorJSON(tempFilePath) - assert.Equal(addr, privVal.Address(), "expected privval addr to be the same") + pAddr, err := privVal.Address() + require.Nil(err) + + assert.Equal(addr, pAddr, "expected privval addr to be the same") assert.Equal(height, privVal.LastSignedInfo.Height, "expected privval.LastHeight to have been saved") } func TestLoadOrGenValidator(t *testing.T) { - assert := assert.New(t) + assert, require := assert.New(t), require.New(t) _, tempFilePath := cmn.Tempfile("priv_validator_") if err := os.Remove(tempFilePath); err != nil { t.Error(err) } privVal := LoadOrGenPrivValidatorJSON(tempFilePath) - addr := privVal.Address() + addr, err := privVal.Address() + require.Nil(err) + privVal = LoadOrGenPrivValidatorJSON(tempFilePath) - assert.Equal(addr, privVal.Address(), "expected privval addr to be the same") + pAddr, err := privVal.Address() + require.Nil(err) + + assert.Equal(addr, pAddr, "expected privval addr to be the same") } func TestUnmarshalValidator(t *testing.T) { @@ -87,8 +96,14 @@ func TestUnmarshalValidator(t *testing.T) { require.Nil(err, "%+v", err) // make sure the values match - assert.EqualValues(addrBytes, val.Address()) - assert.EqualValues(pubKey, val.PubKey()) + vAddr, err := val.Address() + require.Nil(err) + + pKey, err := val.PubKey() + require.Nil(err) + + assert.EqualValues(addrBytes, vAddr) + assert.EqualValues(pubKey, pKey) assert.EqualValues(privKey, val.PrivKey) // export it and make sure it is the same @@ -98,7 +113,7 @@ func TestUnmarshalValidator(t *testing.T) { } func TestSignVote(t *testing.T) { - assert := assert.New(t) + assert, require := assert.New(t), require.New(t) _, tempFilePath := cmn.Tempfile("priv_validator_") privVal := GenPrivValidatorJSON(tempFilePath) @@ -109,8 +124,11 @@ func TestSignVote(t *testing.T) { voteType := types.VoteTypePrevote // sign a vote for first time - vote := newVote(privVal.Address(), 0, height, round, voteType, block1) - err := privVal.SignVote("mychainid", vote) + addr, err := privVal.Address() + require.Nil(err) + + vote := newVote(addr, 0, height, round, voteType, block1) + err = privVal.SignVote("mychainid", vote) assert.NoError(err, "expected no error signing vote") // try to sign the same vote again; should be fine @@ -119,10 +137,10 @@ func TestSignVote(t *testing.T) { // now try some bad votes cases := []*types.Vote{ - newVote(privVal.Address(), 0, height, round-1, voteType, block1), // round regression - newVote(privVal.Address(), 0, height-1, round, voteType, block1), // height regression - newVote(privVal.Address(), 0, height-2, round+4, voteType, block1), // height regression and different round - newVote(privVal.Address(), 0, height, round, voteType, block2), // different block + newVote(addr, 0, height, round-1, voteType, block1), // round regression + newVote(addr, 0, height-1, round, voteType, block1), // height regression + newVote(addr, 0, height-2, round+4, voteType, block1), // height regression and different round + newVote(addr, 0, height, round, voteType, block2), // different block } for _, c := range cases { @@ -179,6 +197,8 @@ func TestSignProposal(t *testing.T) { } func TestDifferByTimestamp(t *testing.T) { + require := require.New(t) + _, tempFilePath := cmn.Tempfile("priv_validator_") privVal := GenPrivValidatorJSON(tempFilePath) @@ -208,10 +228,13 @@ func TestDifferByTimestamp(t *testing.T) { // test vote { + addr, err := privVal.Address() + require.Nil(err) + voteType := types.VoteTypePrevote blockID := types.BlockID{[]byte{1, 2, 3}, types.PartSetHeader{}} - vote := newVote(privVal.Address(), 0, height, round, voteType, blockID) - err := privVal.SignVote("mychainid", vote) + vote := newVote(addr, 0, height, round, voteType, blockID) + err = privVal.SignVote("mychainid", vote) assert.NoError(t, err, "expected no error signing vote") signBytes := types.SignBytes(chainID, vote) @@ -230,7 +253,7 @@ func TestDifferByTimestamp(t *testing.T) { } } -func newVote(addr data.Bytes, idx int, height int64, round int, typ byte, blockID types.BlockID) *types.Vote { +func newVote(addr cmn.HexBytes, idx int, height int64, round int, typ byte, blockID types.BlockID) *types.Vote { return &types.Vote{ ValidatorAddress: addr, ValidatorIndex: idx, diff --git a/types/priv_validator/sign_info.go b/types/priv_validator/sign_info.go index 1a3dae442..60113c570 100644 --- a/types/priv_validator/sign_info.go +++ b/types/priv_validator/sign_info.go @@ -8,8 +8,8 @@ import ( "time" crypto "github.com/tendermint/go-crypto" - data "github.com/tendermint/go-wire/data" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" ) // TODO: type ? @@ -40,7 +40,7 @@ type LastSignedInfo struct { Round int `json:"round"` Step int8 `json:"step"` Signature crypto.Signature `json:"signature,omitempty"` // so we dont lose signatures - SignBytes data.Bytes `json:"signbytes,omitempty"` // so we dont lose signatures + SignBytes cmn.HexBytes `json:"signbytes,omitempty"` // so we dont lose signatures } func NewLastSignedInfo() *LastSignedInfo { diff --git a/types/priv_validator/socket.go b/types/priv_validator/socket.go index 9fdc8c205..a6b4b87bf 100644 --- a/types/priv_validator/socket.go +++ b/types/priv_validator/socket.go @@ -1,68 +1,87 @@ package types import ( - "bytes" "fmt" + "io" "net" "time" + "github.com/pkg/errors" crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" - "github.com/tendermint/go-wire/data" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + "golang.org/x/net/netutil" + p2pconn "github.com/tendermint/tendermint/p2p/conn" "github.com/tendermint/tendermint/types" ) +const ( + connDeadlineSeconds = 3 + dialRetryIntervalSeconds = 1 + dialRetryMax = 10 +) + +// Socket errors. +var ( + ErrDialRetryMax = errors.New("Error max client retries") +) + +var ( + connDeadline = time.Second * connDeadlineSeconds +) + //----------------------------------------------------------------- -var _ types.PrivValidator = (*PrivValidatorSocketClient)(nil) +var _ types.PrivValidator2 = (*PrivValidatorSocketClient)(nil) // PrivValidatorSocketClient implements PrivValidator. // It uses a socket to request signatures. type PrivValidatorSocketClient struct { cmn.BaseService - conn net.Conn + conn net.Conn + privKey *crypto.PrivKeyEd25519 ID types.ValidatorID SocketAddress string } -const ( - dialRetryIntervalSeconds = 1 -) - -// NewPrivValidatorSocket returns an instance of PrivValidatorSocket. -func NewPrivValidatorSocketClient(logger log.Logger, socketAddr string) *PrivValidatorSocketClient { +// NewPrivValidatorSocketClient returns an instance of +// PrivValidatorSocketClient. +func NewPrivValidatorSocketClient( + logger log.Logger, + socketAddr string, + privKey *crypto.PrivKeyEd25519, +) *PrivValidatorSocketClient { pvsc := &PrivValidatorSocketClient{ SocketAddress: socketAddr, + privKey: privKey, } + pvsc.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketClient", pvsc) + return pvsc } +// OnStart implements cmn.Service. func (pvsc *PrivValidatorSocketClient) OnStart() error { if err := pvsc.BaseService.OnStart(); err != nil { return err } - var err error - var conn net.Conn -RETRY_LOOP: - for { - conn, err = cmn.Connect(pvsc.SocketAddress) - if err != nil { - pvsc.Logger.Error(fmt.Sprintf("PrivValidatorSocket failed to connect to %v. Retrying...", pvsc.SocketAddress)) - time.Sleep(time.Second * dialRetryIntervalSeconds) - continue RETRY_LOOP - } - pvsc.conn = conn - return nil + conn, err := pvsc.connect() + if err != nil { + return err } + + pvsc.conn = conn + + return nil } +// OnStop implements cmn.Service. func (pvsc *PrivValidatorSocketClient) OnStop() { pvsc.BaseService.OnStop() @@ -71,46 +90,130 @@ func (pvsc *PrivValidatorSocketClient) OnStop() { } } -func (pvsc *PrivValidatorSocketClient) Address() data.Bytes { - pubKey := pvsc.PubKey() - return pubKey.Address() +// Address is an alias for PubKey().Address(). +func (pvsc *PrivValidatorSocketClient) Address() (cmn.HexBytes, error) { + p, err := pvsc.PubKey() + if err != nil { + return nil, err + } + + return p.Address(), nil } -func (pvsc *PrivValidatorSocketClient) PubKey() crypto.PubKey { - res, err := readWrite(pvsc.conn, PubKeyMsg{}) +// PubKey implements PrivValidator. +func (pvsc *PrivValidatorSocketClient) PubKey() (crypto.PubKey, error) { + err := writeMsg(pvsc.conn, &PubKeyMsg{}) + if err != nil { + return crypto.PubKey{}, err + } + + res, err := readMsg(pvsc.conn) if err != nil { - panic(err) + return crypto.PubKey{}, err } - return res.(PubKeyMsg).PubKey + + return res.(*PubKeyMsg).PubKey, nil } +// SignVote implements PrivValidator. func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote) error { - res, err := readWrite(pvsc.conn, SignVoteMsg{Vote: vote}) + err := writeMsg(pvsc.conn, &SignVoteMsg{Vote: vote}) + if err != nil { + return err + } + + res, err := readMsg(pvsc.conn) if err != nil { return err } - *vote = *res.(SignVoteMsg).Vote + + *vote = *res.(*SignVoteMsg).Vote + return nil } +// SignProposal implements PrivValidator. func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *types.Proposal) error { - res, err := readWrite(pvsc.conn, SignProposalMsg{Proposal: proposal}) + err := writeMsg(pvsc.conn, &SignProposalMsg{Proposal: proposal}) + if err != nil { + return err + } + + res, err := readMsg(pvsc.conn) if err != nil { return err } - *proposal = *res.(SignProposalMsg).Proposal + + *proposal = *res.(*SignProposalMsg).Proposal + return nil } +// SignHeartbeat implements PrivValidator. func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error { - res, err := readWrite(pvsc.conn, SignHeartbeatMsg{Heartbeat: heartbeat}) + err := writeMsg(pvsc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat}) + if err != nil { + return err + } + + res, err := readMsg(pvsc.conn) if err != nil { return err } - *heartbeat = *res.(SignHeartbeatMsg).Heartbeat + + *heartbeat = *res.(*SignHeartbeatMsg).Heartbeat + return nil } +func (pvsc *PrivValidatorSocketClient) connect() (net.Conn, error) { + retries := dialRetryMax + +RETRY_LOOP: + for retries > 0 { + if retries != dialRetryMax { + time.Sleep(time.Second * dialRetryIntervalSeconds) + } + + retries-- + + conn, err := cmn.Connect(pvsc.SocketAddress) + if err != nil { + pvsc.Logger.Error( + "pvsc connect", + "addr", pvsc.SocketAddress, + "err", errors.Wrap(err, "connection failed"), + ) + + continue RETRY_LOOP + } + + if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil { + pvsc.Logger.Error( + "pvsc connect", + "err", errors.Wrap(err, "setting connection timeout failed"), + ) + continue + } + + if pvsc.privKey != nil { + conn, err = p2pconn.MakeSecretConnection(conn, pvsc.privKey.Wrap()) + if err != nil { + pvsc.Logger.Error( + "pvsc connect", + "err", errors.Wrap(err, "encrypting connection failed"), + ) + + continue RETRY_LOOP + } + } + + return conn, nil + } + + return nil, ErrDialRetryMax +} + //--------------------------------------------------------- // PrivValidatorSocketServer implements PrivValidator. @@ -118,101 +221,145 @@ func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat * type PrivValidatorSocketServer struct { cmn.BaseService - conn net.Conn - proto, addr string - listener net.Listener + proto, addr string + listener net.Listener + maxConnections int + privKey *crypto.PrivKeyEd25519 privVal PrivValidator chainID string } -func NewPrivValidatorSocketServer(logger log.Logger, socketAddr, chainID string, privVal PrivValidator) *PrivValidatorSocketServer { +// NewPrivValidatorSocketServer returns an instance of +// PrivValidatorSocketServer. +func NewPrivValidatorSocketServer( + logger log.Logger, + chainID, socketAddr string, + maxConnections int, + privVal PrivValidator, + privKey *crypto.PrivKeyEd25519, +) *PrivValidatorSocketServer { proto, addr := cmn.ProtocolAndAddress(socketAddr) pvss := &PrivValidatorSocketServer{ - proto: proto, - addr: addr, - privVal: privVal, - chainID: chainID, + proto: proto, + addr: addr, + maxConnections: maxConnections, + privKey: privKey, + privVal: privVal, + chainID: chainID, } pvss.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketServer", pvss) return pvss } +// OnStart implements cmn.Service. func (pvss *PrivValidatorSocketServer) OnStart() error { - if err := pvss.BaseService.OnStart(); err != nil { - return err - } ln, err := net.Listen(pvss.proto, pvss.addr) if err != nil { return err } - pvss.listener = ln - go pvss.acceptConnectionsRoutine() + + pvss.listener = netutil.LimitListener(ln, pvss.maxConnections) + + go pvss.acceptConnections() + return nil } +// OnStop implements cmn.Service. func (pvss *PrivValidatorSocketServer) OnStop() { - pvss.BaseService.OnStop() - if err := pvss.listener.Close(); err != nil { - pvss.Logger.Error("Error closing listener", "err", err) + if pvss.listener == nil { + return } - if err := pvss.conn.Close(); err != nil { - pvss.Logger.Error("Error closing connection", "conn", pvss.conn, "err", err) + if err := pvss.listener.Close(); err != nil { + pvss.Logger.Error("OnStop", "err", errors.Wrap(err, "closing listener failed")) } } -func (pvss *PrivValidatorSocketServer) acceptConnectionsRoutine() { +func (pvss *PrivValidatorSocketServer) acceptConnections() { for { - // Accept a connection - pvss.Logger.Info("Waiting for new connection...") - var err error - pvss.conn, err = pvss.listener.Accept() + conn, err := pvss.listener.Accept() if err != nil { if !pvss.IsRunning() { return // Ignore error from listener closing. } - pvss.Logger.Error("Failed to accept connection: " + err.Error()) + pvss.Logger.Error( + "accpetConnections", + "err", errors.Wrap(err, "failed to accept connection"), + ) continue } - pvss.Logger.Info("Accepted a new connection") - - // read/write - for { - if !pvss.IsRunning() { - return // Ignore error from listener closing. - } + if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil { + pvss.Logger.Error( + "acceptConnetions", + "err", errors.Wrap(err, "setting connection timeout failed"), + ) + continue + } - var n int - var err error - b := wire.ReadByteSlice(pvss.conn, 0, &n, &err) //XXX: no max - req_, err := decodeMsg(b) + if pvss.privKey != nil { + conn, err = p2pconn.MakeSecretConnection(conn, pvss.privKey.Wrap()) if err != nil { - panic(err) - } - var res PrivValidatorSocketMsg - switch req := req_.(type) { - case PubKeyMsg: - res = PubKeyMsg{pvss.privVal.PubKey()} - case SignVoteMsg: - pvss.privVal.SignVote(pvss.chainID, req.Vote) - res = SignVoteMsg{req.Vote} - case SignProposalMsg: - pvss.privVal.SignProposal(pvss.chainID, req.Proposal) - res = SignProposalMsg{req.Proposal} - case SignHeartbeatMsg: - pvss.privVal.SignHeartbeat(pvss.chainID, req.Heartbeat) - res = SignHeartbeatMsg{req.Heartbeat} - default: - panic(fmt.Sprintf("unknown msg: %v", req_)) + pvss.Logger.Error( + "acceptConnections", + "err", errors.Wrap(err, "secret connection failed"), + ) + continue } + } - b = wire.BinaryBytes(res) - _, err = pvss.conn.Write(b) - if err != nil { - panic(err) + go pvss.handleConnection(conn) + } +} + +func (pvss *PrivValidatorSocketServer) handleConnection(conn net.Conn) { + defer conn.Close() + + for { + if !pvss.IsRunning() { + return // Ignore error from listener closing. + } + + req, err := readMsg(conn) + if err != nil { + if err != io.EOF { + pvss.Logger.Error("handleConnection", "err", err) } + return + } + + var res PrivValidatorSocketMsg + + switch r := req.(type) { + case *PubKeyMsg: + var p crypto.PubKey + + p, err = pvss.privVal.PubKey() + res = &PubKeyMsg{p} + case *SignVoteMsg: + err = pvss.privVal.SignVote(pvss.chainID, r.Vote) + res = &SignVoteMsg{r.Vote} + case *SignProposalMsg: + err = pvss.privVal.SignProposal(pvss.chainID, r.Proposal) + res = &SignProposalMsg{r.Proposal} + case *SignHeartbeatMsg: + err = pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat) + res = &SignHeartbeatMsg{r.Heartbeat} + default: + err = fmt.Errorf("unknown msg: %v", r) + } + + if err != nil { + pvss.Logger.Error("handleConnection", "err", err) + return + } + + err = writeMsg(conn, res) + if err != nil { + pvss.Logger.Error("handleConnection", "err", err) + return } } } @@ -226,6 +373,8 @@ const ( msgTypeSignHeartbeat = byte(0x12) ) +// PrivValidatorSocketMsg is a message sent between PrivValidatorSocket client +// and server. type PrivValidatorSocketMsg interface{} var _ = wire.RegisterInterface( @@ -236,38 +385,53 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat}, ) -func readWrite(conn net.Conn, req PrivValidatorSocketMsg) (res PrivValidatorSocketMsg, err error) { - b := wire.BinaryBytes(req) - _, err = conn.Write(b) - if err != nil { - return nil, err - } - - var n int - b = wire.ReadByteSlice(conn, 0, &n, &err) //XXX: no max - return decodeMsg(b) -} - -func decodeMsg(bz []byte) (msg PrivValidatorSocketMsg, err error) { - n := new(int) - r := bytes.NewReader(bz) - msgI := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, n, &err) - msg = msgI.(struct{ PrivValidatorSocketMsg }).PrivValidatorSocketMsg - return msg, err -} - +// PubKeyMsg is a PrivValidatorSocket message containing the public key. type PubKeyMsg struct { PubKey crypto.PubKey } +// SignVoteMsg is a PrivValidatorSocket message containing a vote. type SignVoteMsg struct { Vote *types.Vote } +// SignProposalMsg is a PrivValidatorSocket message containing a Proposal. type SignProposalMsg struct { Proposal *types.Proposal } +// SignHeartbeatMsg is a PrivValidatorSocket message containing a Heartbeat. type SignHeartbeatMsg struct { Heartbeat *types.Heartbeat } + +func readMsg(r io.Reader) (PrivValidatorSocketMsg, error) { + var ( + n int + err error + ) + + read := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err) + if err != nil { + return nil, err + } + + w, ok := read.(struct{ PrivValidatorSocketMsg }) + if !ok { + return nil, errors.New("unknwon type") + } + + return w.PrivValidatorSocketMsg, nil +} + +func writeMsg(w io.Writer, msg interface{}) error { + var ( + err error + n int + ) + + // TODO(xla): This extra wrap should be gone with the sdk-2 update. + wire.WriteBinary(struct{ PrivValidatorSocketMsg }{msg}, w, &n, &err) + + return err +} diff --git a/types/priv_validator/socket_test.go b/types/priv_validator/socket_test.go new file mode 100644 index 000000000..9a290abb4 --- /dev/null +++ b/types/priv_validator/socket_test.go @@ -0,0 +1,148 @@ +package types + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + crypto "github.com/tendermint/go-crypto" + "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/types" +) + +func TestPrivValidatorSocketServer(t *testing.T) { + var ( + assert, require = assert.New(t), require.New(t) + chainID = "test-chain-secret" + logger = log.TestingLogger() + signer = types.GenSigner() + clientPrivKey = crypto.GenPrivKeyEd25519() + serverPrivKey = crypto.GenPrivKeyEd25519() + privVal = NewTestPrivValidator(signer) + pvss = NewPrivValidatorSocketServer( + logger, + chainID, + "127.0.0.1:0", + 1, + privVal, + &serverPrivKey, + ) + ) + + err := pvss.Start() + require.Nil(err) + defer pvss.Stop() + + assert.True(pvss.IsRunning()) + + pvsc := NewPrivValidatorSocketClient( + logger, + pvss.listener.Addr().String(), + &clientPrivKey, + ) + + err = pvsc.Start() + require.Nil(err) + defer pvsc.Stop() + + assert.True(pvsc.IsRunning()) + + cAddr, err := pvsc.Address() + require.Nil(err) + + sAddr, err := pvss.privVal.Address() + require.Nil(err) + + assert.Equal(cAddr, sAddr) + + cKey, err := pvsc.PubKey() + require.Nil(err) + + sKey, err := pvss.privVal.PubKey() + require.Nil(err) + + assert.Equal(cKey, sKey) + + err = pvsc.SignProposal(chainID, &types.Proposal{ + Timestamp: time.Now(), + }) + require.Nil(err) + + err = pvsc.SignVote(chainID, &types.Vote{ + Timestamp: time.Now(), + Type: types.VoteTypePrecommit, + }) + require.Nil(err) + + err = pvsc.SignHeartbeat(chainID, &types.Heartbeat{}) + require.Nil(err) +} + +func TestPrivValidatorSocketServerWithoutSecret(t *testing.T) { + var ( + assert, require = assert.New(t), require.New(t) + chainID = "test-chain-secret" + logger = log.TestingLogger() + signer = types.GenSigner() + privVal = NewTestPrivValidator(signer) + pvss = NewPrivValidatorSocketServer( + logger, + chainID, + "127.0.0.1:0", + 1, + privVal, + nil, + ) + ) + + err := pvss.Start() + require.Nil(err) + defer pvss.Stop() + + assert.True(pvss.IsRunning()) + + pvsc := NewPrivValidatorSocketClient( + logger, + pvss.listener.Addr().String(), + nil, + ) + + err = pvsc.Start() + require.Nil(err) + defer pvsc.Stop() + + assert.True(pvsc.IsRunning()) + + cAddr, err := pvsc.Address() + require.Nil(err) + + sAddr, err := pvss.privVal.Address() + require.Nil(err) + + assert.Equal(cAddr, sAddr) + + cKey, err := pvsc.PubKey() + require.Nil(err) + + sKey, err := pvss.privVal.PubKey() + require.Nil(err) + + assert.Equal(cKey, sKey) + + err = pvsc.SignProposal(chainID, &types.Proposal{ + Timestamp: time.Now(), + }) + require.Nil(err) + + err = pvsc.SignVote(chainID, &types.Vote{ + Timestamp: time.Now(), + Type: types.VoteTypePrecommit, + }) + require.Nil(err) + + err = pvsc.SignHeartbeat(chainID, &types.Heartbeat{}) + require.Nil(err) +} diff --git a/types/priv_validator/unencrypted.go b/types/priv_validator/unencrypted.go index e9b0bb880..3ef38eeca 100644 --- a/types/priv_validator/unencrypted.go +++ b/types/priv_validator/unencrypted.go @@ -4,13 +4,13 @@ import ( "fmt" crypto "github.com/tendermint/go-crypto" - data "github.com/tendermint/go-wire/data" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" ) //----------------------------------------------------------------- -var _ types.PrivValidator = (*PrivValidatorUnencrypted)(nil) +var _ types.PrivValidator2 = (*PrivValidatorUnencrypted)(nil) // PrivValidatorUnencrypted implements PrivValidator. // It uses an in-memory crypto.PrivKey that is @@ -35,15 +35,20 @@ func NewPrivValidatorUnencrypted(priv crypto.PrivKey) *PrivValidatorUnencrypted // String returns a string representation of the PrivValidatorUnencrypted func (upv *PrivValidatorUnencrypted) String() string { - return fmt.Sprintf("PrivValidator{%v %v}", upv.Address(), upv.LastSignedInfo.String()) + addr, err := upv.Address() + if err != nil { + panic(err) + } + + return fmt.Sprintf("PrivValidator{%v %v}", addr, upv.LastSignedInfo.String()) } -func (upv *PrivValidatorUnencrypted) Address() data.Bytes { - return upv.PrivKey.PubKey().Address() +func (upv *PrivValidatorUnencrypted) Address() (cmn.HexBytes, error) { + return upv.PrivKey.PubKey().Address(), nil } -func (upv *PrivValidatorUnencrypted) PubKey() crypto.PubKey { - return upv.PrivKey.PubKey() +func (upv *PrivValidatorUnencrypted) PubKey() (crypto.PubKey, error) { + return upv.PrivKey.PubKey(), nil } func (upv *PrivValidatorUnencrypted) SignVote(chainID string, vote *types.Vote) error { diff --git a/types/priv_validator/upgrade.go b/types/priv_validator/upgrade.go index 6d5c472ab..063655421 100644 --- a/types/priv_validator/upgrade.go +++ b/types/priv_validator/upgrade.go @@ -5,18 +5,18 @@ import ( "io/ioutil" crypto "github.com/tendermint/go-crypto" - data "github.com/tendermint/go-wire/data" "github.com/tendermint/tendermint/types" + cmn "github.com/tendermint/tmlibs/common" ) type PrivValidatorV1 struct { - Address data.Bytes `json:"address"` + Address cmn.HexBytes `json:"address"` PubKey crypto.PubKey `json:"pub_key"` LastHeight int64 `json:"last_height"` LastRound int `json:"last_round"` LastStep int8 `json:"last_step"` LastSignature crypto.Signature `json:"last_signature,omitempty"` // so we dont lose signatures - LastSignBytes data.Bytes `json:"last_signbytes,omitempty"` // so we dont lose signatures + LastSignBytes cmn.HexBytes `json:"last_signbytes,omitempty"` // so we dont lose signatures PrivKey crypto.PrivKey `json:"priv_key"` } diff --git a/types/proposal_test.go b/types/proposal_test.go index 0d2af71e2..6fbfbba05 100644 --- a/types/proposal_test.go +++ b/types/proposal_test.go @@ -12,7 +12,7 @@ import ( var testProposal *Proposal func init() { - var stamp, err = time.Parse(timeFormat, "2018-02-11T07:09:22.765Z") + var stamp, err = time.Parse(TimeFormat, "2018-02-11T07:09:22.765Z") if err != nil { panic(err) } diff --git a/types/vote_test.go b/types/vote_test.go index 51eca12de..5e2d5c0df 100644 --- a/types/vote_test.go +++ b/types/vote_test.go @@ -18,7 +18,7 @@ func examplePrecommit() *Vote { } func exampleVote(t byte) *Vote { - var stamp, err = time.Parse(timeFormat, "2017-12-25T03:00:01.234Z") + var stamp, err = time.Parse(TimeFormat, "2017-12-25T03:00:01.234Z") if err != nil { panic(err) }