Browse Source

wip

pull/3370/head
Juan Leni 6 years ago
parent
commit
409175c3f2
No known key found for this signature in database GPG Key ID: 23F1452155140419
4 changed files with 121 additions and 103 deletions
  1. +53
    -50
      privval/signer_dialer_endpoint.go
  2. +64
    -49
      privval/signer_listener_endpoint.go
  3. +3
    -3
      tools/tm-signer-harness/internal/test_harness.go
  4. +1
    -1
      tools/tm-signer-harness/internal/test_harness_test.go

+ 53
- 50
privval/signer_dialer_endpoint.go View File

@ -85,7 +85,6 @@ func (ss *SignerDialerEndpoint) OnStart() error {
go ss.serviceLoop()
ss.Logger.Debug("OnStart - done")
return nil
}
@ -93,24 +92,18 @@ func (ss *SignerDialerEndpoint) OnStart() error {
func (ss *SignerDialerEndpoint) OnStop() {
ss.Logger.Debug("SignerDialerEndpoint: OnStop calling Close")
_ = ss.Close()
}
// IsConnected indicates if there is an active connection
func (ss *SignerDialerEndpoint) IsConnected() bool {
ss.mtx.Lock()
defer ss.mtx.Unlock()
return ss.isConnected()
// Stop service loop
close(ss.stopCh)
<-ss.stoppedCh
}
// Close closes the underlying net.Conn.
func (ss *SignerDialerEndpoint) Close() error {
ss.mtx.Lock()
defer ss.mtx.Unlock()
close(ss.stopCh)
<-ss.stoppedCh
ss.Logger.Debug("SignerDialerEndpoint: Close")
if ss.conn != nil {
if err := ss.conn.Close(); err != nil {
ss.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
@ -121,54 +114,25 @@ func (ss *SignerDialerEndpoint) Close() error {
return nil
}
func (ss *SignerDialerEndpoint) serviceLoop() {
defer close(ss.stoppedCh)
retries := 0
var err error
for {
select {
default:
{
ss.Logger.Debug("Try connect", "retries", retries, "max", ss.maxConnRetries)
if retries > ss.maxConnRetries {
ss.Logger.Error("Maximum retries reached", "retries", retries)
return
}
if ss.conn == nil {
ss.conn, err = ss.dialer()
if err != nil {
ss.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error)
retries++
continue
}
}
retries = 0
ss.handleRequest()
}
case <-ss.stopCh:
{
return
}
}
}
// IsConnected indicates if there is an active connection
func (ss *SignerDialerEndpoint) IsConnected() bool {
ss.mtx.Lock()
defer ss.mtx.Unlock()
return ss.isConnected()
}
func (ss *SignerDialerEndpoint) readMessage() (msg RemoteSignerMsg, err error) {
// TODO(jleni): Avoid duplication. Unify endpoints
if ss.conn == nil {
if !ss.isConnected() {
return nil, fmt.Errorf("not connected")
}
// Reset read deadline
deadline := time.Now().Add(ss.timeoutReadWrite)
ss.Logger.Debug("SignerDialerEndpoint: readMessage", "deadline", deadline)
ss.Logger.Debug(
"SignerDialerEndpoint: readMessage",
"timeout", ss.timeoutReadWrite,
"deadline", deadline)
err = ss.conn.SetReadDeadline(deadline)
if err != nil {
@ -240,3 +204,42 @@ func (ss *SignerDialerEndpoint) handleRequest() {
func (ve *SignerDialerEndpoint) isConnected() bool {
return ve.IsRunning() && ve.conn != nil
}
func (ss *SignerDialerEndpoint) serviceLoop() {
defer close(ss.stoppedCh)
retries := 0
var err error
for {
select {
default:
{
ss.Logger.Debug("Try connect", "retries", retries, "max", ss.maxConnRetries)
if retries > ss.maxConnRetries {
ss.Logger.Error("Maximum retries reached", "retries", retries)
return
}
if ss.conn == nil {
ss.conn, err = ss.dialer()
if err != nil {
ss.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error)
retries++
continue
}
}
retries = 0
ss.handleRequest()
}
case <-ss.stopCh:
{
return
}
}
}
}

+ 64
- 49
privval/signer_listener_endpoint.go View File

@ -18,11 +18,13 @@ type SignerValidatorEndpointOption func(*SignerListenerEndpoint)
type SignerListenerEndpoint struct {
cmn.BaseService
mtx sync.Mutex
extMtx sync.Mutex
listener net.Listener
conn net.Conn
timeoutReadWrite time.Duration
stopCh, stoppedCh chan struct{}
}
// NewSignerListenerEndpoint returns an instance of SignerListenerEndpoint.
@ -42,13 +44,11 @@ func NewSignerListenerEndpoint(logger log.Logger, listener net.Listener) *Signer
func (ve *SignerListenerEndpoint) OnStart() error {
ve.Logger.Debug("SignerListenerEndpoint: OnStart")
err := ve.acceptNewConnection()
if err != nil {
ve.Logger.Error("OnStart", "err", err)
return err
}
ve.stopCh = make(chan struct{})
ve.stoppedCh = make(chan struct{})
go ve.serviceLoop()
ve.Logger.Debug("SignerListenerEndpoint OnStart", "connected", ve.isConnected())
return nil
}
@ -56,50 +56,53 @@ func (ve *SignerListenerEndpoint) OnStart() error {
func (ve *SignerListenerEndpoint) OnStop() {
ve.Logger.Debug("SignerListenerEndpoint: OnStop calling Close")
_ = ve.Close()
}
// IsConnected indicates if there is an active connection
func (ve *SignerListenerEndpoint) IsConnected() bool {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.isConnected()
}
// WaitForConnection waits maxWait for a connection or returns a timeout error
func (ve *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
// Stop listening
if ve.listener != nil {
if err := ve.listener.Close(); err != nil {
ve.Logger.Error("Closing Listener", "err", err)
}
}
// TODO(jleni): Pass maxwait through
return ve.ensureConnection()
// Stop service loop
close(ve.stopCh)
<-ve.stoppedCh
}
// Close closes the underlying net.Conn.
func (ve *SignerListenerEndpoint) Close() error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
ve.extMtx.Lock()
defer ve.extMtx.Unlock()
ve.Logger.Debug("SignerListenerEndpoint: Close")
ve.dropConnection()
if ve.listener != nil {
if err := ve.listener.Close(); err != nil {
ve.Logger.Error("Closing Listener", "err", err)
return err
}
}
return nil
}
// IsConnected indicates if there is an active connection
func (ve *SignerListenerEndpoint) IsConnected() bool {
ve.extMtx.Lock()
defer ve.extMtx.Unlock()
return ve.isConnected()
}
// WaitForConnection waits maxWait for a connection or returns a timeout error
func (ve *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error {
ve.extMtx.Lock()
defer ve.extMtx.Unlock()
return ve.ensureConnection(maxWait)
}
// SendRequest sends a request and waits for a response
func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSignerMsg, error) {
ve.mtx.Lock()
defer ve.mtx.Unlock()
ve.extMtx.Lock()
defer ve.extMtx.Unlock()
// TODO: Add retries.. that include dropping the connection and
ve.Logger.Debug("SignerListenerEndpoint: Send request", "connected", ve.isConnected())
err := ve.ensureConnection()
err := ve.ensureConnection(ve.timeoutReadWrite)
if err != nil {
return nil, err
}
@ -127,12 +130,9 @@ func (ve *SignerListenerEndpoint) isConnected() bool {
return ve.IsRunning() && ve.conn != nil
}
func (ve *SignerListenerEndpoint) ensureConnection() error {
func (ve *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error {
// TODO: Check that is connected
if !ve.isConnected() {
err := ve.acceptNewConnection()
if err != nil {
return cmn.ErrorWrap(ErrListenerNoConnection, "could not reconnect")
}
}
return nil
}
@ -140,10 +140,9 @@ func (ve *SignerListenerEndpoint) ensureConnection() error {
// dropConnection closes the current connection but does not touch the listening socket
func (ve *SignerListenerEndpoint) dropConnection() {
ve.Logger.Debug("SignerListenerEndpoint: dropConnection")
if ve.conn != nil {
if err := ve.conn.Close(); err != nil {
ve.Logger.Error("Closing connection", "err", err)
ve.Logger.Error("SignerListenerEndpoint::dropConnection", "err", err)
}
ve.conn = nil
}
@ -201,32 +200,48 @@ func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error)
return
}
func (ve *SignerListenerEndpoint) serviceLoop() {
defer close(ve.stoppedCh)
for {
select {
default:
{
ve.Logger.Debug("Listening for new connection")
_ = ve.acceptNewConnection()
}
case <-ve.stopCh:
{
return
}
}
}
}
func (ve *SignerListenerEndpoint) acceptNewConnection() error {
// TODO: add proper locking
ve.Logger.Debug("SignerListenerEndpoint: AcceptNewConnection")
if !ve.IsRunning() || ve.listener == nil {
return fmt.Errorf("endpoint is closing")
}
// TODO: add proper locking
// if the conn already exists and close it.
if ve.conn != nil {
if tmpErr := ve.conn.Close(); tmpErr != nil {
ve.Logger.Error("AcceptNewConnection: error closing old connection", "err", tmpErr)
}
}
// Forget old connection
ve.conn = nil
ve.dropConnection()
// wait for a new conn
conn, err := ve.listener.Accept()
if err != nil {
ve.Logger.Debug("listener accept failed", "err", err)
ve.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error)
return err
}
ve.conn = conn
ve.Logger.Info("SignerListenerEndpoint: New connection")
// TODO: add proper locking
return nil
}

+ 3
- 3
tools/tm-signer-harness/internal/test_harness.go View File

@ -101,7 +101,7 @@ func NewTestHarness(logger log.Logger, cfg TestHarnessConfig) (*TestHarness, err
}
logger.Info("Loaded genesis file", "chainID", st.ChainID)
spv, err := newTestHarnessSignerRemote(logger, cfg)
spv, err := newTestHarnessListener(logger, cfg)
if err != nil {
return nil, newTestHarnessError(ErrFailedToCreateListener, err, "")
}
@ -318,8 +318,8 @@ func (th *TestHarness) Shutdown(err error) {
}
}
// newTestHarnessSignerRemote creates our client instance which we will use for testing.
func newTestHarnessSignerRemote(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerListenerEndpoint, error) {
// newTestHarnessListener creates our client instance which we will use for testing.
func newTestHarnessListener(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerListenerEndpoint, error) {
proto, addr := cmn.ProtocolAndAddress(cfg.BindAddr)
if proto == "unix" {
// make sure the socket doesn't exist - if so, try to delete it


+ 1
- 1
tools/tm-signer-harness/internal/test_harness_test.go View File

@ -73,7 +73,7 @@ const (
)
func TestRemoteSignerTestHarnessMaxAcceptRetriesReached(t *testing.T) {
cfg := makeConfig(t, 1, 2)
cfg := makeConfig(t, 100, 2)
defer cleanup(cfg)
th, err := NewTestHarness(log.TestingLogger(), cfg)


Loading…
Cancel
Save