package privval
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/tendermint/tendermint/crypto"
|
|
cmn "github.com/tendermint/tendermint/libs/common"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
const (
	// defaultHeartbeatSeconds is the number of seconds between liveness
	// pings when no other heartbeat period is configured.
	defaultHeartbeatSeconds = 2

	// defaultMaxDialRetries is presumably the retry limit used by the
	// dialing side of the connection; it is not referenced in this part of
	// the file.
	defaultMaxDialRetries = 10
)
|
|
|
|
var (
|
|
heartbeatPeriod = time.Second * defaultHeartbeatSeconds
|
|
)
|
|
|
|
// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
|
|
type SignerValidatorEndpointOption func(*SignerValidatorEndpoint)
|
|
|
|
// SignerValidatorEndpointSetHeartbeat sets the period on which to check the liveness of the
|
|
// connected Signer connections.
|
|
func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEndpointOption {
|
|
return func(sc *SignerValidatorEndpoint) { sc.heartbeatPeriod = period }
|
|
}
|
|
|
|
// SocketVal implements PrivValidator.
|
|
// It listens for an external process to dial in and uses
|
|
// the socket to request signatures.
|
|
type SignerValidatorEndpoint struct {
|
|
cmn.BaseService
|
|
|
|
listener net.Listener
|
|
|
|
// ping
|
|
cancelPingCh chan struct{}
|
|
pingTicker *time.Ticker
|
|
heartbeatPeriod time.Duration
|
|
|
|
// signer is mutable since it can be reset if the connection fails.
|
|
// failures are detected by a background ping routine.
|
|
// All messages are request/response, so we hold the mutex
|
|
// so only one request/response pair can happen at a time.
|
|
// Methods on the underlying net.Conn itself are already goroutine safe.
|
|
mtx sync.Mutex
|
|
|
|
// TODO: Signer should encapsulate and hide the endpoint completely. Invert the relation
|
|
signer *SignerRemote
|
|
}
|
|
|
|
// Check that SignerValidatorEndpoint implements PrivValidator.
|
|
var _ types.PrivValidator = (*SignerValidatorEndpoint)(nil)
|
|
|
|
// NewSignerValidatorEndpoint returns an instance of SignerValidatorEndpoint.
|
|
func NewSignerValidatorEndpoint(logger log.Logger, listener net.Listener) *SignerValidatorEndpoint {
|
|
sc := &SignerValidatorEndpoint{
|
|
listener: listener,
|
|
heartbeatPeriod: heartbeatPeriod,
|
|
}
|
|
|
|
sc.BaseService = *cmn.NewBaseService(logger, "SignerValidatorEndpoint", sc)
|
|
|
|
return sc
|
|
}
|
|
|
|
//--------------------------------------------------------
|
|
// Implement PrivValidator
|
|
|
|
// GetPubKey implements PrivValidator.
|
|
func (ve *SignerValidatorEndpoint) GetPubKey() crypto.PubKey {
|
|
ve.mtx.Lock()
|
|
defer ve.mtx.Unlock()
|
|
return ve.signer.GetPubKey()
|
|
}
|
|
|
|
// SignVote implements PrivValidator.
|
|
func (ve *SignerValidatorEndpoint) SignVote(chainID string, vote *types.Vote) error {
|
|
ve.mtx.Lock()
|
|
defer ve.mtx.Unlock()
|
|
return ve.signer.SignVote(chainID, vote)
|
|
}
|
|
|
|
// SignProposal implements PrivValidator.
|
|
func (ve *SignerValidatorEndpoint) SignProposal(chainID string, proposal *types.Proposal) error {
|
|
ve.mtx.Lock()
|
|
defer ve.mtx.Unlock()
|
|
return ve.signer.SignProposal(chainID, proposal)
|
|
}
|
|
|
|
//--------------------------------------------------------
|
|
// More thread safe methods proxied to the signer
|
|
|
|
// Ping is used to check connection health.
|
|
func (ve *SignerValidatorEndpoint) Ping() error {
|
|
ve.mtx.Lock()
|
|
defer ve.mtx.Unlock()
|
|
return ve.signer.Ping()
|
|
}
|
|
|
|
// Close closes the underlying net.Conn.
|
|
func (ve *SignerValidatorEndpoint) Close() {
|
|
ve.mtx.Lock()
|
|
defer ve.mtx.Unlock()
|
|
if ve.signer != nil {
|
|
if err := ve.signer.Close(); err != nil {
|
|
ve.Logger.Error("OnStop", "err", err)
|
|
}
|
|
}
|
|
|
|
if ve.listener != nil {
|
|
if err := ve.listener.Close(); err != nil {
|
|
ve.Logger.Error("OnStop", "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
//--------------------------------------------------------
|
|
// Service start and stop
|
|
|
|
// OnStart implements cmn.Service.
|
|
func (ve *SignerValidatorEndpoint) OnStart() error {
|
|
if closed, err := ve.reset(); err != nil {
|
|
ve.Logger.Error("OnStart", "err", err)
|
|
return err
|
|
} else if closed {
|
|
return fmt.Errorf("listener is closed")
|
|
}
|
|
|
|
// Start a routine to keep the connection alive
|
|
ve.cancelPingCh = make(chan struct{}, 1)
|
|
ve.pingTicker = time.NewTicker(ve.heartbeatPeriod)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ve.pingTicker.C:
|
|
err := ve.Ping()
|
|
if err != nil {
|
|
ve.Logger.Error("Ping", "err", err)
|
|
if err == ErrUnexpectedResponse {
|
|
return
|
|
}
|
|
|
|
closed, err := ve.reset()
|
|
if err != nil {
|
|
ve.Logger.Error("Reconnecting to remote signer failed", "err", err)
|
|
continue
|
|
}
|
|
if closed {
|
|
ve.Logger.Info("listener is closing")
|
|
return
|
|
}
|
|
|
|
ve.Logger.Info("Re-created connection to remote signer", "impl", ve)
|
|
}
|
|
case <-ve.cancelPingCh:
|
|
ve.pingTicker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// OnStop implements cmn.Service.
|
|
func (ve *SignerValidatorEndpoint) OnStop() {
|
|
if ve.cancelPingCh != nil {
|
|
close(ve.cancelPingCh)
|
|
}
|
|
ve.Close()
|
|
}
|
|
|
|
//--------------------------------------------------------
|
|
// Connection and signer management
|
|
|
|
// waits to accept and sets a new connection.
|
|
// connection is closed in OnStop.
|
|
// returns true if the listener is closed
|
|
// (ie. it returns a nil conn).
|
|
func (ve *SignerValidatorEndpoint) reset() (closed bool, err error) {
|
|
ve.mtx.Lock()
|
|
defer ve.mtx.Unlock()
|
|
|
|
// first check if the conn already exists and close it.
|
|
if ve.signer != nil {
|
|
if tmpErr := ve.signer.Close(); tmpErr != nil {
|
|
ve.Logger.Error("error closing socket val connection during reset", "err", tmpErr)
|
|
}
|
|
}
|
|
|
|
// wait for a new conn
|
|
conn, err := ve.acceptConnection()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// listener is closed
|
|
if conn == nil {
|
|
return true, nil
|
|
}
|
|
|
|
ve.signer, err = NewSignerRemote(conn)
|
|
if err != nil {
|
|
// failed to fetch the pubkey. close out the connection.
|
|
if tmpErr := conn.Close(); tmpErr != nil {
|
|
ve.Logger.Error("error closing connection", "err", tmpErr)
|
|
}
|
|
return false, err
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// Attempt to accept a connection.
|
|
// Times out after the listener's timeoutAccept
|
|
func (ve *SignerValidatorEndpoint) acceptConnection() (net.Conn, error) {
|
|
conn, err := ve.listener.Accept()
|
|
if err != nil {
|
|
if !ve.IsRunning() {
|
|
return nil, nil // Ignore error from listener closing.
|
|
}
|
|
return nil, err
|
|
}
|
|
return conn, nil
|
|
}
|