You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

230 lines
5.9 KiB

package privval
import (
"fmt"
"net"
"sync"
"time"
"github.com/tendermint/tendermint/crypto"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
const (
defaultHeartbeatSeconds = 2
defaultMaxDialRetries = 10
)
var (
heartbeatPeriod = time.Second * defaultHeartbeatSeconds
)
// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
type SignerValidatorEndpointOption func(*SignerValidatorEndpoint)
// SignerValidatorEndpointSetHeartbeat sets the period on which to check the liveness of the
// connected Signer connections.
func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEndpointOption {
return func(sc *SignerValidatorEndpoint) { sc.heartbeatPeriod = period }
}
// SocketVal implements PrivValidator.
// It listens for an external process to dial in and uses
// the socket to request signatures.
type SignerValidatorEndpoint struct {
cmn.BaseService
listener net.Listener
// ping
cancelPingCh chan struct{}
pingTicker *time.Ticker
heartbeatPeriod time.Duration
// signer is mutable since it can be reset if the connection fails.
// failures are detected by a background ping routine.
// All messages are request/response, so we hold the mutex
// so only one request/response pair can happen at a time.
// Methods on the underlying net.Conn itself are already goroutine safe.
mtx sync.Mutex
// TODO: Signer should encapsulate and hide the endpoint completely. Invert the relation
signer *SignerRemote
}
// Check that SignerValidatorEndpoint implements PrivValidator.
var _ types.PrivValidator = (*SignerValidatorEndpoint)(nil)
// NewSignerValidatorEndpoint returns an instance of SignerValidatorEndpoint.
func NewSignerValidatorEndpoint(logger log.Logger, listener net.Listener) *SignerValidatorEndpoint {
sc := &SignerValidatorEndpoint{
listener: listener,
heartbeatPeriod: heartbeatPeriod,
}
sc.BaseService = *cmn.NewBaseService(logger, "SignerValidatorEndpoint", sc)
return sc
}
//--------------------------------------------------------
// Implement PrivValidator
// GetPubKey implements PrivValidator.
func (ve *SignerValidatorEndpoint) GetPubKey() crypto.PubKey {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.signer.GetPubKey()
}
// SignVote implements PrivValidator.
func (ve *SignerValidatorEndpoint) SignVote(chainID string, vote *types.Vote) error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.signer.SignVote(chainID, vote)
}
// SignProposal implements PrivValidator.
func (ve *SignerValidatorEndpoint) SignProposal(chainID string, proposal *types.Proposal) error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.signer.SignProposal(chainID, proposal)
}
//--------------------------------------------------------
// More thread safe methods proxied to the signer
// Ping is used to check connection health.
func (ve *SignerValidatorEndpoint) Ping() error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.signer.Ping()
}
// Close closes the underlying net.Conn.
func (ve *SignerValidatorEndpoint) Close() {
ve.mtx.Lock()
defer ve.mtx.Unlock()
if ve.signer != nil {
if err := ve.signer.Close(); err != nil {
ve.Logger.Error("OnStop", "err", err)
}
}
if ve.listener != nil {
if err := ve.listener.Close(); err != nil {
ve.Logger.Error("OnStop", "err", err)
}
}
}
//--------------------------------------------------------
// Service start and stop
// OnStart implements cmn.Service.
func (ve *SignerValidatorEndpoint) OnStart() error {
if closed, err := ve.reset(); err != nil {
ve.Logger.Error("OnStart", "err", err)
return err
} else if closed {
return fmt.Errorf("listener is closed")
}
// Start a routine to keep the connection alive
ve.cancelPingCh = make(chan struct{}, 1)
ve.pingTicker = time.NewTicker(ve.heartbeatPeriod)
go func() {
for {
select {
case <-ve.pingTicker.C:
err := ve.Ping()
if err != nil {
ve.Logger.Error("Ping", "err", err)
if err == ErrUnexpectedResponse {
return
}
closed, err := ve.reset()
if err != nil {
ve.Logger.Error("Reconnecting to remote signer failed", "err", err)
continue
}
if closed {
ve.Logger.Info("listener is closing")
return
}
ve.Logger.Info("Re-created connection to remote signer", "impl", ve)
}
case <-ve.cancelPingCh:
ve.pingTicker.Stop()
return
}
}
}()
return nil
}
// OnStop implements cmn.Service.
func (ve *SignerValidatorEndpoint) OnStop() {
if ve.cancelPingCh != nil {
close(ve.cancelPingCh)
}
ve.Close()
}
//--------------------------------------------------------
// Connection and signer management
// waits to accept and sets a new connection.
// connection is closed in OnStop.
// returns true if the listener is closed
// (ie. it returns a nil conn).
func (ve *SignerValidatorEndpoint) reset() (closed bool, err error) {
ve.mtx.Lock()
defer ve.mtx.Unlock()
// first check if the conn already exists and close it.
if ve.signer != nil {
if tmpErr := ve.signer.Close(); tmpErr != nil {
ve.Logger.Error("error closing socket val connection during reset", "err", tmpErr)
}
}
// wait for a new conn
conn, err := ve.acceptConnection()
if err != nil {
return false, err
}
// listener is closed
if conn == nil {
return true, nil
}
ve.signer, err = NewSignerRemote(conn)
if err != nil {
// failed to fetch the pubkey. close out the connection.
if tmpErr := conn.Close(); tmpErr != nil {
ve.Logger.Error("error closing connection", "err", tmpErr)
}
return false, err
}
return false, nil
}
// Attempt to accept a connection.
// Times out after the listener's timeoutAccept
func (ve *SignerValidatorEndpoint) acceptConnection() (net.Conn, error) {
conn, err := ve.listener.Accept()
if err != nil {
if !ve.IsRunning() {
return nil, nil // Ignore error from listener closing.
}
return nil, err
}
return conn, nil
}