Browse Source

Merge pull request #1204 from tendermint/feature/priv_val_sockets

Feature/priv val sockets
pull/1237/head
Ethan Buchman 6 years ago
committed by GitHub
parent
commit
ffd2483e67
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 655 additions and 171 deletions
  1. +47
    -0
      cmd/priv_val_server/main.go
  2. +4
    -0
      cmd/tendermint/commands/run_node.go
  3. +24
    -15
      config/config.go
  4. +2
    -1
      docs/architecture/adr-008-priv-validator.md
  5. +19
    -1
      node/node.go
  6. +3
    -3
      types/canonical_json.go
  7. +52
    -2
      types/priv_validator.go
  8. +19
    -4
      types/priv_validator/json.go
  9. +43
    -20
      types/priv_validator/priv_validator_test.go
  10. +2
    -2
      types/priv_validator/sign_info.go
  11. +275
    -111
      types/priv_validator/socket.go
  12. +148
    -0
      types/priv_validator/socket_test.go
  13. +12
    -7
      types/priv_validator/unencrypted.go
  14. +3
    -3
      types/priv_validator/upgrade.go
  15. +1
    -1
      types/proposal_test.go
  16. +1
    -1
      types/vote_test.go

+ 47
- 0
cmd/priv_val_server/main.go View File

@ -0,0 +1,47 @@
package main
import (
"flag"
"os"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
priv_val "github.com/tendermint/tendermint/types/priv_validator"
)
func main() {
var (
chainID = flag.String("chain-id", "mychain", "chain id")
listenAddr = flag.String("laddr", ":46659", "Validator listen address (0.0.0.0:0 means any interface, any port")
maxConn = flag.Int("clients", 3, "maximum of concurrent connections")
privValPath = flag.String("priv", "", "priv val file path")
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "priv_val")
)
flag.Parse()
logger.Info(
"Starting private validator",
"chainID", *chainID,
"listenAddr", *listenAddr,
"maxConn", *maxConn,
"privPath", *privValPath,
)
privVal := priv_val.LoadPrivValidatorJSON(*privValPath)
pvss := priv_val.NewPrivValidatorSocketServer(
logger,
*chainID,
*listenAddr,
*maxConn,
privVal,
nil,
)
pvss.Start()
cmn.TrapSignal(func() {
pvss.Stop()
})
}

+ 4
- 0
cmd/tendermint/commands/run_node.go View File

@ -14,6 +14,10 @@ func AddNodeFlags(cmd *cobra.Command) {
// bind flags
cmd.Flags().String("moniker", config.Moniker, "Node Name")
// priv val flags
cmd.Flags().String("priv_validator_addr", config.PrivValidatorAddr, "Socket address for private validator")
cmd.Flags().Int("priv_validator_max_conn", config.PrivValidatorMaxConn, "Limit of concurrent connections to the PrivValidator")
// node flags
cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing")


+ 24
- 15
config/config.go View File

@ -20,9 +20,11 @@ var (
defaultConfigFileName = "config.toml"
defaultGenesisJSONName = "genesis.json"
defaultPrivValName = "priv_validator.json"
defaultNodeKeyName = "node_key.json"
defaultAddrBookName = "addrbook.json"
defaultPrivValName = "priv_validator.json"
defaultPrivValMaxConn = 3
defaultNodeKeyName = "node_key.json"
defaultAddrBookName = "addrbook.json"
defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName)
defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName)
@ -103,6 +105,12 @@ type BaseConfig struct {
// A custom human readable name for this node
Moniker string `mapstructure:"moniker"`
// TCP or UNIX socket address of the PrivValidator server
PrivValidatorAddr string `mapstructure:"priv_validator_addr"`
// Limit of concurrent connections to the PrivValidator.
PrivValidatorMaxConn int `mapstructure:"priv_validator_max_conn"`
// TCP or UNIX socket address of the ABCI application,
// or the name of an ABCI application compiled in with the Tendermint binary
ProxyApp string `mapstructure:"proxy_app"`
@ -139,18 +147,19 @@ func (c BaseConfig) ChainID() string {
// DefaultBaseConfig returns a default base configuration for a Tendermint node
func DefaultBaseConfig() BaseConfig {
return BaseConfig{
Genesis: defaultGenesisJSONPath,
PrivValidator: defaultPrivValPath,
NodeKey: defaultNodeKeyPath,
Moniker: defaultMoniker,
ProxyApp: "tcp://127.0.0.1:46658",
ABCI: "socket",
LogLevel: DefaultPackageLogLevels(),
ProfListenAddress: "",
FastSync: true,
FilterPeers: false,
DBBackend: "leveldb",
DBPath: "data",
Genesis: defaultGenesisJSONPath,
PrivValidator: defaultPrivValPath,
PrivValidatorMaxConn: defaultPrivValMaxConn,
NodeKey: defaultNodeKeyPath,
Moniker: defaultMoniker,
ProxyApp: "tcp://127.0.0.1:46658",
ABCI: "socket",
LogLevel: DefaultPackageLogLevels(),
ProfListenAddress: "",
FastSync: true,
FilterPeers: false,
DBBackend: "leveldb",
DBPath: "data",
}
}


+ 2
- 1
docs/architecture/adr-008-priv-validator.md View File

@ -8,7 +8,7 @@ For instance, see https://github.com/tendermint/tendermint/issues/673
The goal is to have a clean PrivValidator interface like:
``
```
type PrivValidator interface {
Address() data.Bytes
PubKey() crypto.PubKey
@ -60,6 +60,7 @@ type ValidatorID struct {
Address data.Bytes `json:"address"`
PubKey crypto.PubKey `json:"pub_key"`
}
```
### LastSignedInfo


+ 19
- 1
node/node.go View File

@ -34,6 +34,7 @@ import (
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
priv_val "github.com/tendermint/tendermint/types/priv_validator"
"github.com/tendermint/tendermint/version"
_ "net/http/pprof"
@ -82,7 +83,8 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
DefaultGenesisDocProviderFunc(config),
DefaultDBProvider,
logger)
logger,
)
}
//------------------------------------------------------------------------------
@ -171,6 +173,22 @@ func NewNode(config *cfg.Config,
// reload the state (it may have been updated by the handshake)
state = sm.LoadState(stateDB)
if config.PrivValidatorAddr != "" {
var (
privKey = crypto.GenPrivKeyEd25519()
pvss = priv_val.NewPrivValidatorSocketServer(
logger.With("module", "priv_val"),
config.ChainID(),
config.PrivValidatorAddr,
config.PrivValidatorMaxConn,
priv_val.LoadPrivValidatorJSON(config.PrivValidatorFile()),
&privKey,
)
)
pvss.Start()
}
// Decide whether to fast-sync or not
// We don't fast-sync when the only validator is us.
fastSync := config.FastSync


+ 3
- 3
types/canonical_json.go View File

@ -9,8 +9,8 @@ import (
// canonical json is go-wire's json for structs with fields in alphabetical order
// timeFormat is used for generating the sigs
const timeFormat = wire.RFC3339Millis
// TimeFormat is used for generating the sigs
const TimeFormat = wire.RFC3339Millis
type CanonicalJSONBlockID struct {
Hash cmn.HexBytes `json:"hash,omitempty"`
@ -117,5 +117,5 @@ func CanonicalTime(t time.Time) string {
// note that sending time over go-wire resets it to
// local time, we need to force UTC here, so the
// signatures match
return t.UTC().Format(timeFormat)
return t.UTC().Format(TimeFormat)
}

+ 52
- 2
types/priv_validator.go View File

@ -34,6 +34,56 @@ func voteToStep(vote *Vote) int8 {
}
}
//--------------------------------------------------------------
// PrivValidator is being upgraded! See types/priv_validator
// ValidatorID contains the identity of the validator.
type ValidatorID struct {
Address cmn.HexBytes `json:"address"`
PubKey crypto.PubKey `json:"pub_key"`
}
// PrivValidator defines the functionality of a local Tendermint validator
// that signs votes, proposals, and heartbeats, and never double signs.
type PrivValidator2 interface {
Address() (Address, error) // redundant since .PubKey().Address()
PubKey() (crypto.PubKey, error)
SignVote(chainID string, vote *Vote) error
SignProposal(chainID string, proposal *Proposal) error
SignHeartbeat(chainID string, heartbeat *Heartbeat) error
}
type TestSigner interface {
Address() cmn.HexBytes
PubKey() crypto.PubKey
Sign([]byte) (crypto.Signature, error)
}
func GenSigner() TestSigner {
return &DefaultTestSigner{
crypto.GenPrivKeyEd25519().Wrap(),
}
}
type DefaultTestSigner struct {
crypto.PrivKey
}
func (ds *DefaultTestSigner) Address() cmn.HexBytes {
return ds.PubKey().Address()
}
func (ds *DefaultTestSigner) PubKey() crypto.PubKey {
return ds.PrivKey.PubKey()
}
func (ds *DefaultTestSigner) Sign(msg []byte) (crypto.Signature, error) {
return ds.PrivKey.Sign(msg), nil
}
//--------------------------------------------------------------
// PrivValidator defines the functionality of a local Tendermint validator
// that signs votes, proposals, and heartbeats, and never double signs.
type PrivValidator interface {
@ -379,7 +429,7 @@ func checkVotesOnlyDifferByTimestamp(lastSignBytes, newSignBytes []byte) (time.T
panic(fmt.Sprintf("signBytes cannot be unmarshalled into vote: %v", err))
}
lastTime, err := time.Parse(timeFormat, lastVote.Vote.Timestamp)
lastTime, err := time.Parse(TimeFormat, lastVote.Vote.Timestamp)
if err != nil {
panic(err)
}
@ -405,7 +455,7 @@ func checkProposalsOnlyDifferByTimestamp(lastSignBytes, newSignBytes []byte) (ti
panic(fmt.Sprintf("signBytes cannot be unmarshalled into proposal: %v", err))
}
lastTime, err := time.Parse(timeFormat, lastProposal.Proposal.Timestamp)
lastTime, err := time.Parse(TimeFormat, lastProposal.Proposal.Timestamp)
if err != nil {
panic(err)
}


+ 19
- 4
types/priv_validator/json.go View File

@ -13,7 +13,7 @@ import (
)
// PrivValidator aliases types.PrivValidator
type PrivValidator = types.PrivValidator
type PrivValidator = types.PrivValidator2
//-----------------------------------------------------
@ -42,7 +42,7 @@ func (pk *PrivKey) UnmarshalJSON(b []byte) error {
//-----------------------------------------------------
var _ types.PrivValidator = (*PrivValidatorJSON)(nil)
var _ types.PrivValidator2 = (*PrivValidatorJSON)(nil)
// PrivValidatorJSON wraps PrivValidatorUnencrypted
// and persists it to disk after every SignVote and SignProposal.
@ -76,7 +76,12 @@ func (pvj *PrivValidatorJSON) SignProposal(chainID string, proposal *types.Propo
// String returns a string representation of the PrivValidatorJSON.
func (pvj *PrivValidatorJSON) String() string {
return fmt.Sprintf("PrivValidator{%v %v}", pvj.Address(), pvj.PrivValidatorUnencrypted.String())
addr, err := pvj.Address()
if err != nil {
panic(err)
}
return fmt.Sprintf("PrivValidator{%v %v}", addr, pvj.PrivValidatorUnencrypted.String())
}
func (pvj *PrivValidatorJSON) Save() {
@ -170,7 +175,17 @@ func (pvs PrivValidatorsByAddress) Len() int {
}
func (pvs PrivValidatorsByAddress) Less(i, j int) bool {
return bytes.Compare(pvs[i].Address(), pvs[j].Address()) == -1
iaddr, err := pvs[j].Address()
if err != nil {
panic(err)
}
jaddr, err := pvs[i].Address()
if err != nil {
panic(err)
}
return bytes.Compare(iaddr, jaddr) == -1
}
func (pvs PrivValidatorsByAddress) Swap(i, j int) {


+ 43
- 20
types/priv_validator/priv_validator_test.go View File

@ -11,13 +11,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
data "github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/types"
)
func TestGenLoadValidator(t *testing.T) {
assert := assert.New(t)
assert, require := assert.New(t), require.New(t)
_, tempFilePath := cmn.Tempfile("priv_validator_")
privVal := GenPrivValidatorJSON(tempFilePath)
@ -25,24 +25,33 @@ func TestGenLoadValidator(t *testing.T) {
height := int64(100)
privVal.LastSignedInfo.Height = height
privVal.Save()
addr := privVal.Address()
addr, err := privVal.Address()
require.Nil(err)
privVal = LoadPrivValidatorJSON(tempFilePath)
assert.Equal(addr, privVal.Address(), "expected privval addr to be the same")
pAddr, err := privVal.Address()
require.Nil(err)
assert.Equal(addr, pAddr, "expected privval addr to be the same")
assert.Equal(height, privVal.LastSignedInfo.Height, "expected privval.LastHeight to have been saved")
}
func TestLoadOrGenValidator(t *testing.T) {
assert := assert.New(t)
assert, require := assert.New(t), require.New(t)
_, tempFilePath := cmn.Tempfile("priv_validator_")
if err := os.Remove(tempFilePath); err != nil {
t.Error(err)
}
privVal := LoadOrGenPrivValidatorJSON(tempFilePath)
addr := privVal.Address()
addr, err := privVal.Address()
require.Nil(err)
privVal = LoadOrGenPrivValidatorJSON(tempFilePath)
assert.Equal(addr, privVal.Address(), "expected privval addr to be the same")
pAddr, err := privVal.Address()
require.Nil(err)
assert.Equal(addr, pAddr, "expected privval addr to be the same")
}
func TestUnmarshalValidator(t *testing.T) {
@ -87,8 +96,14 @@ func TestUnmarshalValidator(t *testing.T) {
require.Nil(err, "%+v", err)
// make sure the values match
assert.EqualValues(addrBytes, val.Address())
assert.EqualValues(pubKey, val.PubKey())
vAddr, err := val.Address()
require.Nil(err)
pKey, err := val.PubKey()
require.Nil(err)
assert.EqualValues(addrBytes, vAddr)
assert.EqualValues(pubKey, pKey)
assert.EqualValues(privKey, val.PrivKey)
// export it and make sure it is the same
@ -98,7 +113,7 @@ func TestUnmarshalValidator(t *testing.T) {
}
func TestSignVote(t *testing.T) {
assert := assert.New(t)
assert, require := assert.New(t), require.New(t)
_, tempFilePath := cmn.Tempfile("priv_validator_")
privVal := GenPrivValidatorJSON(tempFilePath)
@ -109,8 +124,11 @@ func TestSignVote(t *testing.T) {
voteType := types.VoteTypePrevote
// sign a vote for first time
vote := newVote(privVal.Address(), 0, height, round, voteType, block1)
err := privVal.SignVote("mychainid", vote)
addr, err := privVal.Address()
require.Nil(err)
vote := newVote(addr, 0, height, round, voteType, block1)
err = privVal.SignVote("mychainid", vote)
assert.NoError(err, "expected no error signing vote")
// try to sign the same vote again; should be fine
@ -119,10 +137,10 @@ func TestSignVote(t *testing.T) {
// now try some bad votes
cases := []*types.Vote{
newVote(privVal.Address(), 0, height, round-1, voteType, block1), // round regression
newVote(privVal.Address(), 0, height-1, round, voteType, block1), // height regression
newVote(privVal.Address(), 0, height-2, round+4, voteType, block1), // height regression and different round
newVote(privVal.Address(), 0, height, round, voteType, block2), // different block
newVote(addr, 0, height, round-1, voteType, block1), // round regression
newVote(addr, 0, height-1, round, voteType, block1), // height regression
newVote(addr, 0, height-2, round+4, voteType, block1), // height regression and different round
newVote(addr, 0, height, round, voteType, block2), // different block
}
for _, c := range cases {
@ -179,6 +197,8 @@ func TestSignProposal(t *testing.T) {
}
func TestDifferByTimestamp(t *testing.T) {
require := require.New(t)
_, tempFilePath := cmn.Tempfile("priv_validator_")
privVal := GenPrivValidatorJSON(tempFilePath)
@ -208,10 +228,13 @@ func TestDifferByTimestamp(t *testing.T) {
// test vote
{
addr, err := privVal.Address()
require.Nil(err)
voteType := types.VoteTypePrevote
blockID := types.BlockID{[]byte{1, 2, 3}, types.PartSetHeader{}}
vote := newVote(privVal.Address(), 0, height, round, voteType, blockID)
err := privVal.SignVote("mychainid", vote)
vote := newVote(addr, 0, height, round, voteType, blockID)
err = privVal.SignVote("mychainid", vote)
assert.NoError(t, err, "expected no error signing vote")
signBytes := types.SignBytes(chainID, vote)
@ -230,7 +253,7 @@ func TestDifferByTimestamp(t *testing.T) {
}
}
func newVote(addr data.Bytes, idx int, height int64, round int, typ byte, blockID types.BlockID) *types.Vote {
func newVote(addr cmn.HexBytes, idx int, height int64, round int, typ byte, blockID types.BlockID) *types.Vote {
return &types.Vote{
ValidatorAddress: addr,
ValidatorIndex: idx,


+ 2
- 2
types/priv_validator/sign_info.go View File

@ -8,8 +8,8 @@ import (
"time"
crypto "github.com/tendermint/go-crypto"
data "github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
// TODO: type ?
@ -40,7 +40,7 @@ type LastSignedInfo struct {
Round int `json:"round"`
Step int8 `json:"step"`
Signature crypto.Signature `json:"signature,omitempty"` // so we dont lose signatures
SignBytes data.Bytes `json:"signbytes,omitempty"` // so we dont lose signatures
SignBytes cmn.HexBytes `json:"signbytes,omitempty"` // so we dont lose signatures
}
func NewLastSignedInfo() *LastSignedInfo {


+ 275
- 111
types/priv_validator/socket.go View File

@ -1,68 +1,87 @@
package types
import (
"bytes"
"fmt"
"io"
"net"
"time"
"github.com/pkg/errors"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/go-wire/data"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
"golang.org/x/net/netutil"
p2pconn "github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/types"
)
const (
connDeadlineSeconds = 3
dialRetryIntervalSeconds = 1
dialRetryMax = 10
)
// Socket errors.
var (
ErrDialRetryMax = errors.New("Error max client retries")
)
var (
connDeadline = time.Second * connDeadlineSeconds
)
//-----------------------------------------------------------------
var _ types.PrivValidator = (*PrivValidatorSocketClient)(nil)
var _ types.PrivValidator2 = (*PrivValidatorSocketClient)(nil)
// PrivValidatorSocketClient implements PrivValidator.
// It uses a socket to request signatures.
type PrivValidatorSocketClient struct {
cmn.BaseService
conn net.Conn
conn net.Conn
privKey *crypto.PrivKeyEd25519
ID types.ValidatorID
SocketAddress string
}
const (
dialRetryIntervalSeconds = 1
)
// NewPrivValidatorSocket returns an instance of PrivValidatorSocket.
func NewPrivValidatorSocketClient(logger log.Logger, socketAddr string) *PrivValidatorSocketClient {
// NewPrivValidatorSocketClient returns an instance of
// PrivValidatorSocketClient.
func NewPrivValidatorSocketClient(
logger log.Logger,
socketAddr string,
privKey *crypto.PrivKeyEd25519,
) *PrivValidatorSocketClient {
pvsc := &PrivValidatorSocketClient{
SocketAddress: socketAddr,
privKey: privKey,
}
pvsc.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketClient", pvsc)
return pvsc
}
// OnStart implements cmn.Service.
func (pvsc *PrivValidatorSocketClient) OnStart() error {
if err := pvsc.BaseService.OnStart(); err != nil {
return err
}
var err error
var conn net.Conn
RETRY_LOOP:
for {
conn, err = cmn.Connect(pvsc.SocketAddress)
if err != nil {
pvsc.Logger.Error(fmt.Sprintf("PrivValidatorSocket failed to connect to %v. Retrying...", pvsc.SocketAddress))
time.Sleep(time.Second * dialRetryIntervalSeconds)
continue RETRY_LOOP
}
pvsc.conn = conn
return nil
conn, err := pvsc.connect()
if err != nil {
return err
}
pvsc.conn = conn
return nil
}
// OnStop implements cmn.Service.
func (pvsc *PrivValidatorSocketClient) OnStop() {
pvsc.BaseService.OnStop()
@ -71,46 +90,130 @@ func (pvsc *PrivValidatorSocketClient) OnStop() {
}
}
func (pvsc *PrivValidatorSocketClient) Address() data.Bytes {
pubKey := pvsc.PubKey()
return pubKey.Address()
// Address is an alias for PubKey().Address().
func (pvsc *PrivValidatorSocketClient) Address() (cmn.HexBytes, error) {
p, err := pvsc.PubKey()
if err != nil {
return nil, err
}
return p.Address(), nil
}
func (pvsc *PrivValidatorSocketClient) PubKey() crypto.PubKey {
res, err := readWrite(pvsc.conn, PubKeyMsg{})
// PubKey implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) PubKey() (crypto.PubKey, error) {
err := writeMsg(pvsc.conn, &PubKeyMsg{})
if err != nil {
return crypto.PubKey{}, err
}
res, err := readMsg(pvsc.conn)
if err != nil {
panic(err)
return crypto.PubKey{}, err
}
return res.(PubKeyMsg).PubKey
return res.(*PubKeyMsg).PubKey, nil
}
// SignVote implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote) error {
res, err := readWrite(pvsc.conn, SignVoteMsg{Vote: vote})
err := writeMsg(pvsc.conn, &SignVoteMsg{Vote: vote})
if err != nil {
return err
}
res, err := readMsg(pvsc.conn)
if err != nil {
return err
}
*vote = *res.(SignVoteMsg).Vote
*vote = *res.(*SignVoteMsg).Vote
return nil
}
// SignProposal implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *types.Proposal) error {
res, err := readWrite(pvsc.conn, SignProposalMsg{Proposal: proposal})
err := writeMsg(pvsc.conn, &SignProposalMsg{Proposal: proposal})
if err != nil {
return err
}
res, err := readMsg(pvsc.conn)
if err != nil {
return err
}
*proposal = *res.(SignProposalMsg).Proposal
*proposal = *res.(*SignProposalMsg).Proposal
return nil
}
// SignHeartbeat implements PrivValidator.
func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
res, err := readWrite(pvsc.conn, SignHeartbeatMsg{Heartbeat: heartbeat})
err := writeMsg(pvsc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
if err != nil {
return err
}
res, err := readMsg(pvsc.conn)
if err != nil {
return err
}
*heartbeat = *res.(SignHeartbeatMsg).Heartbeat
*heartbeat = *res.(*SignHeartbeatMsg).Heartbeat
return nil
}
func (pvsc *PrivValidatorSocketClient) connect() (net.Conn, error) {
retries := dialRetryMax
RETRY_LOOP:
for retries > 0 {
if retries != dialRetryMax {
time.Sleep(time.Second * dialRetryIntervalSeconds)
}
retries--
conn, err := cmn.Connect(pvsc.SocketAddress)
if err != nil {
pvsc.Logger.Error(
"pvsc connect",
"addr", pvsc.SocketAddress,
"err", errors.Wrap(err, "connection failed"),
)
continue RETRY_LOOP
}
if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
pvsc.Logger.Error(
"pvsc connect",
"err", errors.Wrap(err, "setting connection timeout failed"),
)
continue
}
if pvsc.privKey != nil {
conn, err = p2pconn.MakeSecretConnection(conn, pvsc.privKey.Wrap())
if err != nil {
pvsc.Logger.Error(
"pvsc connect",
"err", errors.Wrap(err, "encrypting connection failed"),
)
continue RETRY_LOOP
}
}
return conn, nil
}
return nil, ErrDialRetryMax
}
//---------------------------------------------------------
// PrivValidatorSocketServer implements PrivValidator.
@ -118,101 +221,145 @@ func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *
type PrivValidatorSocketServer struct {
cmn.BaseService
conn net.Conn
proto, addr string
listener net.Listener
proto, addr string
listener net.Listener
maxConnections int
privKey *crypto.PrivKeyEd25519
privVal PrivValidator
chainID string
}
func NewPrivValidatorSocketServer(logger log.Logger, socketAddr, chainID string, privVal PrivValidator) *PrivValidatorSocketServer {
// NewPrivValidatorSocketServer returns an instance of
// PrivValidatorSocketServer.
func NewPrivValidatorSocketServer(
logger log.Logger,
chainID, socketAddr string,
maxConnections int,
privVal PrivValidator,
privKey *crypto.PrivKeyEd25519,
) *PrivValidatorSocketServer {
proto, addr := cmn.ProtocolAndAddress(socketAddr)
pvss := &PrivValidatorSocketServer{
proto: proto,
addr: addr,
privVal: privVal,
chainID: chainID,
proto: proto,
addr: addr,
maxConnections: maxConnections,
privKey: privKey,
privVal: privVal,
chainID: chainID,
}
pvss.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketServer", pvss)
return pvss
}
// OnStart implements cmn.Service.
func (pvss *PrivValidatorSocketServer) OnStart() error {
if err := pvss.BaseService.OnStart(); err != nil {
return err
}
ln, err := net.Listen(pvss.proto, pvss.addr)
if err != nil {
return err
}
pvss.listener = ln
go pvss.acceptConnectionsRoutine()
pvss.listener = netutil.LimitListener(ln, pvss.maxConnections)
go pvss.acceptConnections()
return nil
}
// OnStop implements cmn.Service.
func (pvss *PrivValidatorSocketServer) OnStop() {
pvss.BaseService.OnStop()
if err := pvss.listener.Close(); err != nil {
pvss.Logger.Error("Error closing listener", "err", err)
if pvss.listener == nil {
return
}
if err := pvss.conn.Close(); err != nil {
pvss.Logger.Error("Error closing connection", "conn", pvss.conn, "err", err)
if err := pvss.listener.Close(); err != nil {
pvss.Logger.Error("OnStop", "err", errors.Wrap(err, "closing listener failed"))
}
}
func (pvss *PrivValidatorSocketServer) acceptConnectionsRoutine() {
func (pvss *PrivValidatorSocketServer) acceptConnections() {
for {
// Accept a connection
pvss.Logger.Info("Waiting for new connection...")
var err error
pvss.conn, err = pvss.listener.Accept()
conn, err := pvss.listener.Accept()
if err != nil {
if !pvss.IsRunning() {
return // Ignore error from listener closing.
}
pvss.Logger.Error("Failed to accept connection: " + err.Error())
pvss.Logger.Error(
"accpetConnections",
"err", errors.Wrap(err, "failed to accept connection"),
)
continue
}
pvss.Logger.Info("Accepted a new connection")
// read/write
for {
if !pvss.IsRunning() {
return // Ignore error from listener closing.
}
if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
pvss.Logger.Error(
"acceptConnetions",
"err", errors.Wrap(err, "setting connection timeout failed"),
)
continue
}
var n int
var err error
b := wire.ReadByteSlice(pvss.conn, 0, &n, &err) //XXX: no max
req_, err := decodeMsg(b)
if pvss.privKey != nil {
conn, err = p2pconn.MakeSecretConnection(conn, pvss.privKey.Wrap())
if err != nil {
panic(err)
}
var res PrivValidatorSocketMsg
switch req := req_.(type) {
case PubKeyMsg:
res = PubKeyMsg{pvss.privVal.PubKey()}
case SignVoteMsg:
pvss.privVal.SignVote(pvss.chainID, req.Vote)
res = SignVoteMsg{req.Vote}
case SignProposalMsg:
pvss.privVal.SignProposal(pvss.chainID, req.Proposal)
res = SignProposalMsg{req.Proposal}
case SignHeartbeatMsg:
pvss.privVal.SignHeartbeat(pvss.chainID, req.Heartbeat)
res = SignHeartbeatMsg{req.Heartbeat}
default:
panic(fmt.Sprintf("unknown msg: %v", req_))
pvss.Logger.Error(
"acceptConnections",
"err", errors.Wrap(err, "secret connection failed"),
)
continue
}
}
b = wire.BinaryBytes(res)
_, err = pvss.conn.Write(b)
if err != nil {
panic(err)
go pvss.handleConnection(conn)
}
}
func (pvss *PrivValidatorSocketServer) handleConnection(conn net.Conn) {
defer conn.Close()
for {
if !pvss.IsRunning() {
return // Ignore error from listener closing.
}
req, err := readMsg(conn)
if err != nil {
if err != io.EOF {
pvss.Logger.Error("handleConnection", "err", err)
}
return
}
var res PrivValidatorSocketMsg
switch r := req.(type) {
case *PubKeyMsg:
var p crypto.PubKey
p, err = pvss.privVal.PubKey()
res = &PubKeyMsg{p}
case *SignVoteMsg:
err = pvss.privVal.SignVote(pvss.chainID, r.Vote)
res = &SignVoteMsg{r.Vote}
case *SignProposalMsg:
err = pvss.privVal.SignProposal(pvss.chainID, r.Proposal)
res = &SignProposalMsg{r.Proposal}
case *SignHeartbeatMsg:
err = pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat)
res = &SignHeartbeatMsg{r.Heartbeat}
default:
err = fmt.Errorf("unknown msg: %v", r)
}
if err != nil {
pvss.Logger.Error("handleConnection", "err", err)
return
}
err = writeMsg(conn, res)
if err != nil {
pvss.Logger.Error("handleConnection", "err", err)
return
}
}
}
@ -226,6 +373,8 @@ const (
msgTypeSignHeartbeat = byte(0x12)
)
// PrivValidatorSocketMsg is a message sent between PrivValidatorSocket client
// and server.
type PrivValidatorSocketMsg interface{}
var _ = wire.RegisterInterface(
@ -236,38 +385,53 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat},
)
func readWrite(conn net.Conn, req PrivValidatorSocketMsg) (res PrivValidatorSocketMsg, err error) {
b := wire.BinaryBytes(req)
_, err = conn.Write(b)
if err != nil {
return nil, err
}
var n int
b = wire.ReadByteSlice(conn, 0, &n, &err) //XXX: no max
return decodeMsg(b)
}
func decodeMsg(bz []byte) (msg PrivValidatorSocketMsg, err error) {
n := new(int)
r := bytes.NewReader(bz)
msgI := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, n, &err)
msg = msgI.(struct{ PrivValidatorSocketMsg }).PrivValidatorSocketMsg
return msg, err
}
// PubKeyMsg is a PrivValidatorSocket message containing the public key.
type PubKeyMsg struct {
PubKey crypto.PubKey
}
// SignVoteMsg is a PrivValidatorSocket message containing a vote.
type SignVoteMsg struct {
Vote *types.Vote
}
// SignProposalMsg is a PrivValidatorSocket message containing a Proposal.
type SignProposalMsg struct {
Proposal *types.Proposal
}
// SignHeartbeatMsg is a PrivValidatorSocket message containing a Heartbeat.
type SignHeartbeatMsg struct {
Heartbeat *types.Heartbeat
}
func readMsg(r io.Reader) (PrivValidatorSocketMsg, error) {
var (
n int
err error
)
read := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err)
if err != nil {
return nil, err
}
w, ok := read.(struct{ PrivValidatorSocketMsg })
if !ok {
return nil, errors.New("unknwon type")
}
return w.PrivValidatorSocketMsg, nil
}
func writeMsg(w io.Writer, msg interface{}) error {
var (
err error
n int
)
// TODO(xla): This extra wrap should be gone with the sdk-2 update.
wire.WriteBinary(struct{ PrivValidatorSocketMsg }{msg}, w, &n, &err)
return err
}

+ 148
- 0
types/priv_validator/socket_test.go View File

@ -0,0 +1,148 @@
package types
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/types"
)
func TestPrivValidatorSocketServer(t *testing.T) {
var (
assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret"
logger = log.TestingLogger()
signer = types.GenSigner()
clientPrivKey = crypto.GenPrivKeyEd25519()
serverPrivKey = crypto.GenPrivKeyEd25519()
privVal = NewTestPrivValidator(signer)
pvss = NewPrivValidatorSocketServer(
logger,
chainID,
"127.0.0.1:0",
1,
privVal,
&serverPrivKey,
)
)
err := pvss.Start()
require.Nil(err)
defer pvss.Stop()
assert.True(pvss.IsRunning())
pvsc := NewPrivValidatorSocketClient(
logger,
pvss.listener.Addr().String(),
&clientPrivKey,
)
err = pvsc.Start()
require.Nil(err)
defer pvsc.Stop()
assert.True(pvsc.IsRunning())
cAddr, err := pvsc.Address()
require.Nil(err)
sAddr, err := pvss.privVal.Address()
require.Nil(err)
assert.Equal(cAddr, sAddr)
cKey, err := pvsc.PubKey()
require.Nil(err)
sKey, err := pvss.privVal.PubKey()
require.Nil(err)
assert.Equal(cKey, sKey)
err = pvsc.SignProposal(chainID, &types.Proposal{
Timestamp: time.Now(),
})
require.Nil(err)
err = pvsc.SignVote(chainID, &types.Vote{
Timestamp: time.Now(),
Type: types.VoteTypePrecommit,
})
require.Nil(err)
err = pvsc.SignHeartbeat(chainID, &types.Heartbeat{})
require.Nil(err)
}
func TestPrivValidatorSocketServerWithoutSecret(t *testing.T) {
var (
assert, require = assert.New(t), require.New(t)
chainID = "test-chain-secret"
logger = log.TestingLogger()
signer = types.GenSigner()
privVal = NewTestPrivValidator(signer)
pvss = NewPrivValidatorSocketServer(
logger,
chainID,
"127.0.0.1:0",
1,
privVal,
nil,
)
)
err := pvss.Start()
require.Nil(err)
defer pvss.Stop()
assert.True(pvss.IsRunning())
pvsc := NewPrivValidatorSocketClient(
logger,
pvss.listener.Addr().String(),
nil,
)
err = pvsc.Start()
require.Nil(err)
defer pvsc.Stop()
assert.True(pvsc.IsRunning())
cAddr, err := pvsc.Address()
require.Nil(err)
sAddr, err := pvss.privVal.Address()
require.Nil(err)
assert.Equal(cAddr, sAddr)
cKey, err := pvsc.PubKey()
require.Nil(err)
sKey, err := pvss.privVal.PubKey()
require.Nil(err)
assert.Equal(cKey, sKey)
err = pvsc.SignProposal(chainID, &types.Proposal{
Timestamp: time.Now(),
})
require.Nil(err)
err = pvsc.SignVote(chainID, &types.Vote{
Timestamp: time.Now(),
Type: types.VoteTypePrecommit,
})
require.Nil(err)
err = pvsc.SignHeartbeat(chainID, &types.Heartbeat{})
require.Nil(err)
}

+ 12
- 7
types/priv_validator/unencrypted.go View File

@ -4,13 +4,13 @@ import (
"fmt"
crypto "github.com/tendermint/go-crypto"
data "github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
//-----------------------------------------------------------------
var _ types.PrivValidator = (*PrivValidatorUnencrypted)(nil)
var _ types.PrivValidator2 = (*PrivValidatorUnencrypted)(nil)
// PrivValidatorUnencrypted implements PrivValidator.
// It uses an in-memory crypto.PrivKey that is
@ -35,15 +35,20 @@ func NewPrivValidatorUnencrypted(priv crypto.PrivKey) *PrivValidatorUnencrypted
// String returns a string representation of the PrivValidatorUnencrypted
func (upv *PrivValidatorUnencrypted) String() string {
return fmt.Sprintf("PrivValidator{%v %v}", upv.Address(), upv.LastSignedInfo.String())
addr, err := upv.Address()
if err != nil {
panic(err)
}
return fmt.Sprintf("PrivValidator{%v %v}", addr, upv.LastSignedInfo.String())
}
func (upv *PrivValidatorUnencrypted) Address() data.Bytes {
return upv.PrivKey.PubKey().Address()
func (upv *PrivValidatorUnencrypted) Address() (cmn.HexBytes, error) {
return upv.PrivKey.PubKey().Address(), nil
}
func (upv *PrivValidatorUnencrypted) PubKey() crypto.PubKey {
return upv.PrivKey.PubKey()
func (upv *PrivValidatorUnencrypted) PubKey() (crypto.PubKey, error) {
return upv.PrivKey.PubKey(), nil
}
func (upv *PrivValidatorUnencrypted) SignVote(chainID string, vote *types.Vote) error {


+ 3
- 3
types/priv_validator/upgrade.go View File

@ -5,18 +5,18 @@ import (
"io/ioutil"
crypto "github.com/tendermint/go-crypto"
data "github.com/tendermint/go-wire/data"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
type PrivValidatorV1 struct {
Address data.Bytes `json:"address"`
Address cmn.HexBytes `json:"address"`
PubKey crypto.PubKey `json:"pub_key"`
LastHeight int64 `json:"last_height"`
LastRound int `json:"last_round"`
LastStep int8 `json:"last_step"`
LastSignature crypto.Signature `json:"last_signature,omitempty"` // so we dont lose signatures
LastSignBytes data.Bytes `json:"last_signbytes,omitempty"` // so we dont lose signatures
LastSignBytes cmn.HexBytes `json:"last_signbytes,omitempty"` // so we dont lose signatures
PrivKey crypto.PrivKey `json:"priv_key"`
}


+ 1
- 1
types/proposal_test.go View File

@ -12,7 +12,7 @@ import (
var testProposal *Proposal
func init() {
var stamp, err = time.Parse(timeFormat, "2018-02-11T07:09:22.765Z")
var stamp, err = time.Parse(TimeFormat, "2018-02-11T07:09:22.765Z")
if err != nil {
panic(err)
}


+ 1
- 1
types/vote_test.go View File

@ -18,7 +18,7 @@ func examplePrecommit() *Vote {
}
func exampleVote(t byte) *Vote {
var stamp, err = time.Parse(timeFormat, "2017-12-25T03:00:01.234Z")
var stamp, err = time.Parse(TimeFormat, "2017-12-25T03:00:01.234Z")
if err != nil {
panic(err)
}


Loading…
Cancel
Save