You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

564 lines
12 KiB

package types
import (
"fmt"
"io"
"net"
"time"
"github.com/pkg/errors"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
p2pconn "github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/types"
)
const (
defaultAcceptDeadlineSeconds = 3
defaultConnDeadlineSeconds = 3
defaultConnHeartBeatSeconds = 30
defaultConnWaitSeconds = 60
defaultDialRetries = 10
)
// Socket errors.
var (
ErrDialRetryMax = errors.New("dialed maximum retries")
ErrConnWaitTimeout = errors.New("waited for remote signer for too long")
ErrConnTimeout = errors.New("remote signer timed out")
)
var (
acceptDeadline = time.Second + defaultAcceptDeadlineSeconds
connDeadline = time.Second * defaultConnDeadlineSeconds
connHeartbeat = time.Second * defaultConnHeartBeatSeconds
)
// SocketClientOption sets an optional parameter on the SocketClient.
type SocketClientOption func(*SocketClient)
// SocketClientAcceptDeadline sets the deadline for the SocketClient listener.
// A zero time value disables the deadline.
func SocketClientAcceptDeadline(deadline time.Duration) SocketClientOption {
return func(sc *SocketClient) { sc.acceptDeadline = deadline }
}
// SocketClientConnDeadline sets the read and write deadline for connections
// from external signing processes.
func SocketClientConnDeadline(deadline time.Duration) SocketClientOption {
return func(sc *SocketClient) { sc.connDeadline = deadline }
}
// SocketClientHeartbeat sets the period on which to check the liveness of the
// connected Signer connections.
func SocketClientHeartbeat(period time.Duration) SocketClientOption {
return func(sc *SocketClient) { sc.connHeartbeat = period }
}
// SocketClientConnWait sets the timeout duration before connection of external
// signing processes are considered to be unsuccessful.
func SocketClientConnWait(timeout time.Duration) SocketClientOption {
return func(sc *SocketClient) { sc.connWaitTimeout = timeout }
}
// SocketClient implements PrivValidator, it uses a socket to request signatures
// from an external process.
type SocketClient struct {
cmn.BaseService
addr string
acceptDeadline time.Duration
connDeadline time.Duration
connHeartbeat time.Duration
connWaitTimeout time.Duration
privKey crypto.PrivKeyEd25519
conn net.Conn
listener net.Listener
}
// Check that SocketClient implements PrivValidator2.
var _ types.PrivValidator2 = (*SocketClient)(nil)
// NewSocketClient returns an instance of SocketClient.
func NewSocketClient(
logger log.Logger,
socketAddr string,
privKey crypto.PrivKeyEd25519,
) *SocketClient {
sc := &SocketClient{
addr: socketAddr,
acceptDeadline: acceptDeadline,
connDeadline: connDeadline,
connHeartbeat: connHeartbeat,
connWaitTimeout: time.Second * defaultConnWaitSeconds,
privKey: privKey,
}
sc.BaseService = *cmn.NewBaseService(logger, "SocketClient", sc)
return sc
}
// GetAddress implements PrivValidator.
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
func (sc *SocketClient) GetAddress() types.Address {
addr, err := sc.Address()
if err != nil {
panic(err)
}
return addr
}
// Address is an alias for PubKey().Address().
func (sc *SocketClient) Address() (cmn.HexBytes, error) {
p, err := sc.PubKey()
if err != nil {
return nil, err
}
return p.Address(), nil
}
// GetPubKey implements PrivValidator.
// TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
func (sc *SocketClient) GetPubKey() crypto.PubKey {
pubKey, err := sc.PubKey()
if err != nil {
panic(err)
}
return pubKey
}
// PubKey implements PrivValidator2.
func (sc *SocketClient) PubKey() (crypto.PubKey, error) {
err := writeMsg(sc.conn, &PubKeyMsg{})
if err != nil {
return crypto.PubKey{}, err
}
res, err := readMsg(sc.conn)
if err != nil {
return crypto.PubKey{}, err
}
return res.(*PubKeyMsg).PubKey, nil
}
// SignVote implements PrivValidator2.
func (sc *SocketClient) SignVote(chainID string, vote *types.Vote) error {
err := writeMsg(sc.conn, &SignVoteMsg{Vote: vote})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
*vote = *res.(*SignVoteMsg).Vote
return nil
}
// SignProposal implements PrivValidator2.
func (sc *SocketClient) SignProposal(
chainID string,
proposal *types.Proposal,
) error {
err := writeMsg(sc.conn, &SignProposalMsg{Proposal: proposal})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
*proposal = *res.(*SignProposalMsg).Proposal
return nil
}
// SignHeartbeat implements PrivValidator2.
func (sc *SocketClient) SignHeartbeat(
chainID string,
heartbeat *types.Heartbeat,
) error {
err := writeMsg(sc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
*heartbeat = *res.(*SignHeartbeatMsg).Heartbeat
return nil
}
// OnStart implements cmn.Service.
func (sc *SocketClient) OnStart() error {
if err := sc.listen(); err != nil {
sc.Logger.Error(
"OnStart",
"err", errors.Wrap(err, "failed to listen"),
)
return err
}
conn, err := sc.waitConnection()
if err != nil {
sc.Logger.Error(
"OnStart",
"err", errors.Wrap(err, "failed to accept connection"),
)
return err
}
sc.conn = conn
return nil
}
// OnStop implements cmn.Service.
func (sc *SocketClient) OnStop() {
if sc.conn != nil {
if err := sc.conn.Close(); err != nil {
sc.Logger.Error(
"OnStop",
"err", errors.Wrap(err, "failed to close connection"),
)
}
}
if sc.listener != nil {
if err := sc.listener.Close(); err != nil {
sc.Logger.Error(
"OnStop",
"err", errors.Wrap(err, "failed to close listener"),
)
}
}
}
func (sc *SocketClient) acceptConnection() (net.Conn, error) {
conn, err := sc.listener.Accept()
if err != nil {
if !sc.IsRunning() {
return nil, nil // Ignore error from listener closing.
}
return nil, err
}
conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey.Wrap())
if err != nil {
return nil, err
}
return conn, nil
}
func (sc *SocketClient) listen() error {
ln, err := net.Listen(cmn.ProtocolAndAddress(sc.addr))
if err != nil {
return err
}
sc.listener = newTCPTimeoutListener(
ln,
sc.acceptDeadline,
sc.connDeadline,
sc.connHeartbeat,
)
return nil
}
// waitConnection uses the configured wait timeout to error if no external
// process connects in the time period.
func (sc *SocketClient) waitConnection() (net.Conn, error) {
var (
connc = make(chan net.Conn, 1)
errc = make(chan error, 1)
)
go func(connc chan<- net.Conn, errc chan<- error) {
conn, err := sc.acceptConnection()
if err != nil {
errc <- err
return
}
connc <- conn
}(connc, errc)
select {
case conn := <-connc:
return conn, nil
case err := <-errc:
if _, ok := err.(timeoutError); ok {
return nil, errors.Wrap(ErrConnWaitTimeout, err.Error())
}
return nil, err
case <-time.After(sc.connWaitTimeout):
return nil, ErrConnWaitTimeout
}
}
//---------------------------------------------------------
// RemoteSignerOption sets an optional parameter on the RemoteSigner.
type RemoteSignerOption func(*RemoteSigner)
// RemoteSignerConnDeadline sets the read and write deadline for connections
// from external signing processes.
func RemoteSignerConnDeadline(deadline time.Duration) RemoteSignerOption {
return func(ss *RemoteSigner) { ss.connDeadline = deadline }
}
// RemoteSignerConnRetries sets the amount of attempted retries to connect.
func RemoteSignerConnRetries(retries int) RemoteSignerOption {
return func(ss *RemoteSigner) { ss.connRetries = retries }
}
// RemoteSigner implements PrivValidator by dialing to a socket.
type RemoteSigner struct {
cmn.BaseService
addr string
chainID string
connDeadline time.Duration
connRetries int
privKey crypto.PrivKeyEd25519
privVal PrivValidator
conn net.Conn
}
// NewRemoteSigner returns an instance of RemoteSigner.
func NewRemoteSigner(
logger log.Logger,
chainID, socketAddr string,
privVal PrivValidator,
privKey crypto.PrivKeyEd25519,
) *RemoteSigner {
rs := &RemoteSigner{
addr: socketAddr,
chainID: chainID,
connDeadline: time.Second * defaultConnDeadlineSeconds,
connRetries: defaultDialRetries,
privKey: privKey,
privVal: privVal,
}
rs.BaseService = *cmn.NewBaseService(logger, "RemoteSigner", rs)
return rs
}
// OnStart implements cmn.Service.
func (rs *RemoteSigner) OnStart() error {
conn, err := rs.connect()
if err != nil {
rs.Logger.Error("OnStart", "err", errors.Wrap(err, "connect"))
return err
}
go rs.handleConnection(conn)
return nil
}
// OnStop implements cmn.Service.
func (rs *RemoteSigner) OnStop() {
if rs.conn == nil {
return
}
if err := rs.conn.Close(); err != nil {
rs.Logger.Error("OnStop", "err", errors.Wrap(err, "closing listener failed"))
}
}
func (rs *RemoteSigner) connect() (net.Conn, error) {
for retries := rs.connRetries; retries > 0; retries-- {
// Don't sleep if it is the first retry.
if retries != rs.connRetries {
time.Sleep(rs.connDeadline)
}
conn, err := cmn.Connect(rs.addr)
if err != nil {
rs.Logger.Error(
"connect",
"addr", rs.addr,
"err", errors.Wrap(err, "connection failed"),
)
continue
}
if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
rs.Logger.Error(
"connect",
"err", errors.Wrap(err, "setting connection timeout failed"),
)
continue
}
conn, err = p2pconn.MakeSecretConnection(conn, rs.privKey.Wrap())
if err != nil {
rs.Logger.Error(
"connect",
"err", errors.Wrap(err, "encrypting connection failed"),
)
continue
}
return conn, nil
}
return nil, ErrDialRetryMax
}
func (rs *RemoteSigner) handleConnection(conn net.Conn) {
for {
if !rs.IsRunning() {
return // Ignore error from listener closing.
}
req, err := readMsg(conn)
if err != nil {
if err != io.EOF {
rs.Logger.Error("handleConnection", "err", err)
}
return
}
var res PrivValMsg
switch r := req.(type) {
case *PubKeyMsg:
var p crypto.PubKey
p, err = rs.privVal.PubKey()
res = &PubKeyMsg{p}
case *SignVoteMsg:
err = rs.privVal.SignVote(rs.chainID, r.Vote)
res = &SignVoteMsg{r.Vote}
case *SignProposalMsg:
err = rs.privVal.SignProposal(rs.chainID, r.Proposal)
res = &SignProposalMsg{r.Proposal}
case *SignHeartbeatMsg:
err = rs.privVal.SignHeartbeat(rs.chainID, r.Heartbeat)
res = &SignHeartbeatMsg{r.Heartbeat}
default:
err = fmt.Errorf("unknown msg: %v", r)
}
if err != nil {
rs.Logger.Error("handleConnection", "err", err)
return
}
err = writeMsg(conn, res)
if err != nil {
rs.Logger.Error("handleConnection", "err", err)
return
}
}
}
//---------------------------------------------------------
const (
msgTypePubKey = byte(0x01)
msgTypeSignVote = byte(0x10)
msgTypeSignProposal = byte(0x11)
msgTypeSignHeartbeat = byte(0x12)
)
// PrivValMsg is sent between RemoteSigner and SocketClient.
type PrivValMsg interface{}
var _ = wire.RegisterInterface(
struct{ PrivValMsg }{},
wire.ConcreteType{&PubKeyMsg{}, msgTypePubKey},
wire.ConcreteType{&SignVoteMsg{}, msgTypeSignVote},
wire.ConcreteType{&SignProposalMsg{}, msgTypeSignProposal},
wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat},
)
// PubKeyMsg is a PrivValidatorSocket message containing the public key.
type PubKeyMsg struct {
PubKey crypto.PubKey
}
// SignVoteMsg is a PrivValidatorSocket message containing a vote.
type SignVoteMsg struct {
Vote *types.Vote
}
// SignProposalMsg is a PrivValidatorSocket message containing a Proposal.
type SignProposalMsg struct {
Proposal *types.Proposal
}
// SignHeartbeatMsg is a PrivValidatorSocket message containing a Heartbeat.
type SignHeartbeatMsg struct {
Heartbeat *types.Heartbeat
}
func readMsg(r io.Reader) (PrivValMsg, error) {
var (
n int
err error
)
read := wire.ReadBinary(struct{ PrivValMsg }{}, r, 0, &n, &err)
if err != nil {
if _, ok := err.(timeoutError); ok {
return nil, errors.Wrap(ErrConnTimeout, err.Error())
}
return nil, err
}
w, ok := read.(struct{ PrivValMsg })
if !ok {
return nil, errors.New("unknown type")
}
return w.PrivValMsg, nil
}
func writeMsg(w io.Writer, msg interface{}) error {
var (
err error
n int
)
// TODO(xla): This extra wrap should be gone with the sdk-2 update.
wire.WriteBinary(struct{ PrivValMsg }{msg}, w, &n, &err)
if _, ok := err.(timeoutError); ok {
return errors.Wrap(ErrConnTimeout, err.Error())
}
return err
}