package privval import ( "errors" "fmt" "net" "sync" "time" "github.com/tendermint/tendermint/crypto" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" ) const ( defaultConnHeartBeatSeconds = 2 defaultDialRetries = 10 ) // Socket errors. var ( ErrUnexpectedResponse = errors.New("received unexpected response") ) var ( connHeartbeat = time.Second * defaultConnHeartBeatSeconds ) // SocketValOption sets an optional parameter on the SocketVal. type SocketValOption func(*SocketVal) // SocketValHeartbeat sets the period on which to check the liveness of the // connected Signer connections. func SocketValHeartbeat(period time.Duration) SocketValOption { return func(sc *SocketVal) { sc.connHeartbeat = period } } // SocketVal implements PrivValidator. // It listens for an external process to dial in and uses // the socket to request signatures. type SocketVal struct { cmn.BaseService listener net.Listener // ping cancelPing chan struct{} pingTicker *time.Ticker connHeartbeat time.Duration // signer is mutable since it can be // reset if the connection fails. // failures are detected by a background // ping routine. // Methods on the underlying net.Conn itself // are already gorountine safe. mtx sync.RWMutex signer *RemoteSignerClient } // Check that SocketVal implements PrivValidator. var _ types.PrivValidator = (*SocketVal)(nil) // NewSocketVal returns an instance of SocketVal. func NewSocketVal( logger log.Logger, listener net.Listener, ) *SocketVal { sc := &SocketVal{ listener: listener, connHeartbeat: connHeartbeat, } sc.BaseService = *cmn.NewBaseService(logger, "SocketVal", sc) return sc } //-------------------------------------------------------- // Implement PrivValidator // GetPubKey implements PrivValidator. func (sc *SocketVal) GetPubKey() crypto.PubKey { sc.mtx.RLock() defer sc.mtx.RUnlock() return sc.signer.GetPubKey() } // SignVote implements PrivValidator. func (sc *SocketVal) SignVote(chainID string, vote *types.Vote) error { sc.mtx.RLock() defer sc.mtx.RUnlock() return sc.signer.SignVote(chainID, vote) } // SignProposal implements PrivValidator. func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) error { sc.mtx.RLock() defer sc.mtx.RUnlock() return sc.signer.SignProposal(chainID, proposal) } //-------------------------------------------------------- // More thread safe methods proxied to the signer // Ping is used to check connection health. func (sc *SocketVal) Ping() error { sc.mtx.RLock() defer sc.mtx.RUnlock() return sc.signer.Ping() } // Close closes the underlying net.Conn. func (sc *SocketVal) Close() { sc.mtx.RLock() defer sc.mtx.RUnlock() if sc.signer != nil { if err := sc.signer.Close(); err != nil { sc.Logger.Error("OnStop", "err", err) } } if sc.listener != nil { if err := sc.listener.Close(); err != nil { sc.Logger.Error("OnStop", "err", err) } } } //-------------------------------------------------------- // Service start and stop // OnStart implements cmn.Service. func (sc *SocketVal) OnStart() error { if closed, err := sc.reset(); err != nil { sc.Logger.Error("OnStart", "err", err) return err } else if closed { return fmt.Errorf("listener is closed") } // Start a routine to keep the connection alive sc.cancelPing = make(chan struct{}, 1) sc.pingTicker = time.NewTicker(sc.connHeartbeat) go func() { for { select { case <-sc.pingTicker.C: err := sc.Ping() if err != nil { sc.Logger.Error("Ping", "err", err) if err == ErrUnexpectedResponse { return } closed, err := sc.reset() if err != nil { sc.Logger.Error("Reconnecting to remote signer failed", "err", err) continue } if closed { sc.Logger.Info("listener is closing") return } sc.Logger.Info("Re-created connection to remote signer", "impl", sc) } case <-sc.cancelPing: sc.pingTicker.Stop() return } } }() return nil } // OnStop implements cmn.Service. func (sc *SocketVal) OnStop() { if sc.cancelPing != nil { close(sc.cancelPing) } sc.Close() } //-------------------------------------------------------- // Connection and signer management // waits to accept and sets a new connection. // connection is closed in OnStop. // returns true if the listener is closed // (ie. it returns a nil conn). func (sc *SocketVal) reset() (bool, error) { sc.mtx.Lock() defer sc.mtx.Unlock() // first check if the conn already exists and close it. if sc.signer != nil { if err := sc.signer.Close(); err != nil { sc.Logger.Error("error closing connection", "err", err) } } // wait for a new conn conn, err := sc.waitConnection() if err != nil { return false, err } // listener is closed if conn == nil { return true, nil } sc.signer, err = NewRemoteSignerClient(conn) if err != nil { // failed to fetch the pubkey. close out the connection. if err := conn.Close(); err != nil { sc.Logger.Error("error closing connection", "err", err) } return false, err } return false, nil } func (sc *SocketVal) acceptConnection() (net.Conn, error) { conn, err := sc.listener.Accept() if err != nil { if !sc.IsRunning() { return nil, nil // Ignore error from listener closing. } return nil, err } return conn, nil } // waitConnection uses the configured wait timeout to error if no external // process connects in the time period. func (sc *SocketVal) waitConnection() (net.Conn, error) { var ( connc = make(chan net.Conn, 1) errc = make(chan error, 1) ) go func(connc chan<- net.Conn, errc chan<- error) { conn, err := sc.acceptConnection() if err != nil { errc <- err return } connc <- conn }(connc, errc) select { case conn := <-connc: return conn, nil case err := <-errc: return nil, err } }