Browse Source

small formatting changes

pull/3370/head
Juan Leni 6 years ago
parent
commit
fb7ad0720b
No known key found for this signature in database GPG Key ID: 23F1452155140419
2 changed files with 158 additions and 158 deletions
  1. +66
    -66
      privval/signer_dialer_endpoint.go
  2. +92
    -92
      privval/signer_listener_endpoint.go

+ 66
- 66
privval/signer_dialer_endpoint.go View File

@ -76,38 +76,38 @@ func NewSignerDialerEndpoint(
}
// OnStart implements cmn.Service.
func (ss *SignerDialerEndpoint) OnStart() error {
ss.Logger.Debug("SignerDialerEndpoint: OnStart")
func (sd *SignerDialerEndpoint) OnStart() error {
sd.Logger.Debug("SignerDialerEndpoint: OnStart")
ss.stopCh = make(chan struct{})
ss.stoppedCh = make(chan struct{})
sd.stopCh = make(chan struct{})
sd.stoppedCh = make(chan struct{})
go ss.serviceLoop()
go sd.serviceLoop()
return nil
}
// OnStop implements cmn.Service.
func (ss *SignerDialerEndpoint) OnStop() {
ss.Logger.Debug("SignerDialerEndpoint: OnStop calling Close")
func (sd *SignerDialerEndpoint) OnStop() {
sd.Logger.Debug("SignerDialerEndpoint: OnStop calling Close")
// Stop service loop
close(ss.stopCh)
<-ss.stoppedCh
close(sd.stopCh)
<-sd.stoppedCh
_ = ss.Close()
_ = sd.Close()
}
// Close closes the underlying net.Conn.
func (ss *SignerDialerEndpoint) Close() error {
ss.mtx.Lock()
defer ss.mtx.Unlock()
ss.Logger.Debug("SignerDialerEndpoint: Close")
if ss.conn != nil {
if err := ss.conn.Close(); err != nil {
ss.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
ss.conn = nil
func (sd *SignerDialerEndpoint) Close() error {
sd.mtx.Lock()
defer sd.mtx.Unlock()
sd.Logger.Debug("SignerDialerEndpoint: Close")
if sd.conn != nil {
if err := sd.conn.Close(); err != nil {
sd.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
sd.conn = nil
}
}
@ -115,32 +115,36 @@ func (ss *SignerDialerEndpoint) Close() error {
}
// IsConnected indicates if there is an active connection
func (ss *SignerDialerEndpoint) IsConnected() bool {
ss.mtx.Lock()
defer ss.mtx.Unlock()
return ss.isConnected()
func (sd *SignerDialerEndpoint) IsConnected() bool {
sd.mtx.Lock()
defer sd.mtx.Unlock()
return sd.isConnected()
}
func (ss *SignerDialerEndpoint) readMessage() (msg RemoteSignerMsg, err error) {
// TODO(jleni): Avoid duplication. Unify endpoints
if !ss.isConnected() {
// IsConnected indicates if there is an active connection
func (sd *SignerDialerEndpoint) isConnected() bool {
return sd.IsRunning() && sd.conn != nil
}
func (sd *SignerDialerEndpoint) readMessage() (msg RemoteSignerMsg, err error) {
if !sd.isConnected() {
return nil, fmt.Errorf("not connected")
}
// Reset read deadline
deadline := time.Now().Add(ss.timeoutReadWrite)
ss.Logger.Debug(
deadline := time.Now().Add(sd.timeoutReadWrite)
sd.Logger.Debug(
"SignerDialerEndpoint: readMessage",
"timeout", ss.timeoutReadWrite,
"timeout", sd.timeoutReadWrite,
"deadline", deadline)
err = ss.conn.SetReadDeadline(deadline)
err = sd.conn.SetReadDeadline(deadline)
if err != nil {
return
}
const maxRemoteSignerMsgSize = 1024 * 10
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(ss.conn, &msg, maxRemoteSignerMsgSize)
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(sd.conn, &msg, maxRemoteSignerMsgSize)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrDialerReadTimeout, err.Error())
}
@ -148,22 +152,23 @@ func (ss *SignerDialerEndpoint) readMessage() (msg RemoteSignerMsg, err error) {
return
}
func (ss *SignerDialerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) {
// TODO(jleni): Avoid duplication. Unify endpoints
if ss.conn == nil {
return fmt.Errorf("not connected")
func (sd *SignerDialerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) {
if !sd.isConnected() {
return cmn.ErrorWrap(ErrListenerNoConnection, "endpoint is not connected")
}
// Reset read deadline
deadline := time.Now().Add(ss.timeoutReadWrite)
ss.Logger.Debug("SignerDialerEndpoint: readMessage", "deadline", deadline)
err = ss.conn.SetWriteDeadline(deadline)
deadline := time.Now().Add(sd.timeoutReadWrite)
sd.Logger.Debug("SignerDialerEndpoint: readMessage",
"timeout", sd.timeoutReadWrite,
"deadline", deadline)
err = sd.conn.SetWriteDeadline(deadline)
if err != nil {
return
}
_, err = cdc.MarshalBinaryLengthPrefixedWriter(ss.conn, msg)
_, err = cdc.MarshalBinaryLengthPrefixedWriter(sd.conn, msg)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrDialerWriteTimeout, err.Error())
}
@ -171,42 +176,37 @@ func (ss *SignerDialerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) {
return
}
func (ss *SignerDialerEndpoint) handleRequest() {
if !ss.IsRunning() {
func (sd *SignerDialerEndpoint) handleRequest() {
if !sd.IsRunning() {
return // Ignore error from listener closing.
}
ss.Logger.Info("SignerDialerEndpoint: connected", "timeout", ss.timeoutReadWrite)
sd.Logger.Info("SignerDialerEndpoint: connected", "timeout", sd.timeoutReadWrite)
req, err := ss.readMessage()
req, err := sd.readMessage()
if err != nil {
if err != io.EOF {
ss.Logger.Error("SignerDialerEndpoint handleMessage", "err", err)
sd.Logger.Error("SignerDialerEndpoint handleMessage", "err", err)
}
return
}
res, err := HandleValidatorRequest(req, ss.chainID, ss.privVal)
res, err := HandleValidatorRequest(req, sd.chainID, sd.privVal)
if err != nil {
// only log the error; we'll reply with an error in res
ss.Logger.Error("handleMessage handleMessage", "err", err)
sd.Logger.Error("handleMessage handleMessage", "err", err)
}
err = ss.writeMessage(res)
err = sd.writeMessage(res)
if err != nil {
ss.Logger.Error("handleMessage writeMessage", "err", err)
sd.Logger.Error("handleMessage writeMessage", "err", err)
return
}
}
// IsConnected indicates if there is an active connection
func (ve *SignerDialerEndpoint) isConnected() bool {
return ve.IsRunning() && ve.conn != nil
}
func (ss *SignerDialerEndpoint) serviceLoop() {
defer close(ss.stoppedCh)
func (sd *SignerDialerEndpoint) serviceLoop() {
defer close(sd.stoppedCh)
retries := 0
var err error
@ -214,30 +214,30 @@ func (ss *SignerDialerEndpoint) serviceLoop() {
for {
select {
default:
ss.Logger.Debug("Try connect", "retries", retries, "max", ss.maxConnRetries)
sd.Logger.Debug("Try connect", "retries", retries, "max", sd.maxConnRetries)
if retries > ss.maxConnRetries {
ss.Logger.Error("Maximum retries reached", "retries", retries)
if retries > sd.maxConnRetries {
sd.Logger.Error("Maximum retries reached", "retries", retries)
return
}
if ss.conn == nil {
ss.conn, err = ss.dialer()
if sd.conn == nil {
sd.conn, err = sd.dialer()
if err != nil {
ss.Logger.Info("Try connect", "err", err)
ss.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error)
sd.Logger.Info("Try connect", "err", err)
sd.conn = nil // Explicitly set to nil because dialer returns an interface (https://golang.org/doc/faq#nil_error)
retries++
// Wait between retries
time.Sleep(ss.retryWait)
time.Sleep(sd.retryWait)
continue
}
}
retries = 0
ss.handleRequest()
sd.handleRequest()
case <-ss.stopCh:
case <-sd.stopCh:
return
}
}


+ 92
- 92
privval/signer_listener_endpoint.go View File

@ -43,88 +43,88 @@ func NewSignerListenerEndpoint(logger log.Logger, listener net.Listener) *Signer
}
// OnStart implements cmn.Service.
func (ve *SignerListenerEndpoint) OnStart() error {
ve.Logger.Debug("SignerListenerEndpoint: OnStart")
func (sl *SignerListenerEndpoint) OnStart() error {
sl.Logger.Debug("SignerListenerEndpoint: OnStart")
ve.stopCh = make(chan struct{})
ve.stoppedCh = make(chan struct{})
sl.stopCh = make(chan struct{})
sl.stoppedCh = make(chan struct{})
ve.connectCh = make(chan struct{})
ve.connectedCh = make(chan net.Conn)
sl.connectCh = make(chan struct{})
sl.connectedCh = make(chan net.Conn)
go ve.serviceLoop()
ve.connectCh <- struct{}{}
go sl.serviceLoop()
sl.connectCh <- struct{}{}
return nil
}
// OnStop implements cmn.Service.
func (ve *SignerListenerEndpoint) OnStop() {
ve.Logger.Debug("SignerListenerEndpoint: OnStop calling Close")
_ = ve.Close()
func (sl *SignerListenerEndpoint) OnStop() {
sl.Logger.Debug("SignerListenerEndpoint: OnStop calling Close")
_ = sl.Close()
ve.Logger.Debug("SignerListenerEndpoint: OnStop stop listening")
sl.Logger.Debug("SignerListenerEndpoint: OnStop stop listening")
// Stop listening
if ve.listener != nil {
if err := ve.listener.Close(); err != nil {
ve.Logger.Error("Closing Listener", "err", err)
if sl.listener != nil {
if err := sl.listener.Close(); err != nil {
sl.Logger.Error("Closing Listener", "err", err)
}
}
// Stop service loop
ve.stopCh <- struct{}{}
<-ve.stoppedCh
sl.stopCh <- struct{}{}
<-sl.stoppedCh
}
// Close closes the underlying net.Conn.
func (ve *SignerListenerEndpoint) Close() error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
ve.Logger.Debug("SignerListenerEndpoint: Close")
func (sl *SignerListenerEndpoint) Close() error {
sl.mtx.Lock()
defer sl.mtx.Unlock()
sl.Logger.Debug("SignerListenerEndpoint: Close")
ve.dropConnection()
sl.dropConnection()
return nil
}
// IsConnected indicates if there is an active connection
func (ve *SignerListenerEndpoint) IsConnected() bool {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.isConnected()
func (sl *SignerListenerEndpoint) IsConnected() bool {
sl.mtx.Lock()
defer sl.mtx.Unlock()
return sl.isConnected()
}
// WaitForConnection waits maxWait for a connection or returns a timeout error
func (ve *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.ensureConnection(maxWait)
func (sl *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error {
sl.mtx.Lock()
defer sl.mtx.Unlock()
return sl.ensureConnection(maxWait)
}
// SendRequest sends a request and waits for a response
func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSignerMsg, error) {
ve.mtx.Lock()
defer ve.mtx.Unlock()
func (sl *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSignerMsg, error) {
sl.mtx.Lock()
defer sl.mtx.Unlock()
// TODO: Add retries.. that include dropping the connection and
ve.Logger.Debug("SignerListenerEndpoint: Send request", "connected", ve.isConnected())
err := ve.ensureConnection(ve.timeoutReadWrite)
sl.Logger.Debug("SignerListenerEndpoint: Send request", "connected", sl.isConnected())
err := sl.ensureConnection(sl.timeoutReadWrite)
if err != nil {
return nil, err
}
ve.Logger.Debug("Send request. Write")
sl.Logger.Debug("Send request. Write")
err = ve.writeMessage(request)
err = sl.writeMessage(request)
if err != nil {
return nil, err
}
ve.Logger.Debug("Send request. Read")
sl.Logger.Debug("Send request. Read")
res, err := ve.readMessage()
res, err := sl.readMessage()
if err != nil {
ve.Logger.Debug("Read Error", "err", err)
sl.Logger.Debug("Read Error", "err", err)
return nil, err
}
@ -132,55 +132,55 @@ func (ve *SignerListenerEndpoint) SendRequest(request RemoteSignerMsg) (RemoteSi
}
// IsConnected indicates if there is an active connection
func (ve *SignerListenerEndpoint) isConnected() bool {
return ve.IsRunning() && ve.conn != nil
func (sl *SignerListenerEndpoint) isConnected() bool {
return sl.IsRunning() && sl.conn != nil
}
func (ve *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error) {
if !ve.isConnected() {
func (sl *SignerListenerEndpoint) readMessage() (msg RemoteSignerMsg, err error) {
if !sl.isConnected() {
return nil, cmn.ErrorWrap(ErrListenerNoConnection, "endpoint is not connected")
}
// Reset read deadline
deadline := time.Now().Add(ve.timeoutReadWrite)
ve.Logger.Debug(
deadline := time.Now().Add(sl.timeoutReadWrite)
sl.Logger.Debug(
"SignerListenerEndpoint: readMessage",
"timeout", ve.timeoutReadWrite,
"timeout", sl.timeoutReadWrite,
"deadline", deadline)
err = ve.conn.SetReadDeadline(deadline)
err = sl.conn.SetReadDeadline(deadline)
if err != nil {
return
}
const maxRemoteSignerMsgSize = 1024 * 10
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(ve.conn, &msg, maxRemoteSignerMsgSize)
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(sl.conn, &msg, maxRemoteSignerMsgSize)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrListenerTimeout, err.Error())
ve.dropConnection()
sl.dropConnection()
}
return
}
func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) {
if !ve.isConnected() {
func (sl *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error) {
if !sl.isConnected() {
return cmn.ErrorWrap(ErrListenerNoConnection, "endpoint is not connected")
}
// Reset read deadline
deadline := time.Now().Add(ve.timeoutReadWrite)
ve.Logger.Debug(
deadline := time.Now().Add(sl.timeoutReadWrite)
sl.Logger.Debug(
"SignerListenerEndpoint: writeMessage",
"timeout", ve.timeoutReadWrite,
"timeout", sl.timeoutReadWrite,
"deadline", deadline)
err = ve.conn.SetWriteDeadline(deadline)
err = sl.conn.SetWriteDeadline(deadline)
if err != nil {
return
}
_, err = cdc.MarshalBinaryLengthPrefixedWriter(ve.conn, msg)
_, err = cdc.MarshalBinaryLengthPrefixedWriter(sl.conn, msg)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrListenerTimeout, err.Error())
}
@ -188,42 +188,42 @@ func (ve *SignerListenerEndpoint) writeMessage(msg RemoteSignerMsg) (err error)
return
}
func (ve *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error {
if !ve.isConnected() {
func (sl *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error {
if !sl.isConnected() {
// Is there a connection ready?
select {
case ve.conn = <-ve.connectedCh:
case sl.conn = <-sl.connectedCh:
{
ve.Logger.Debug("SignerListenerEndpoint: received connection")
sl.Logger.Debug("SignerListenerEndpoint: received connection")
return nil
}
default:
{
ve.Logger.Debug("SignerListenerEndpoint: no connection is ready")
sl.Logger.Debug("SignerListenerEndpoint: no connection is ready")
}
}
// should we trigger a reconnect?
select {
case ve.connectCh <- struct{}{}:
case sl.connectCh <- struct{}{}:
{
ve.Logger.Debug("SignerListenerEndpoint: triggered a reconnect")
sl.Logger.Debug("SignerListenerEndpoint: triggered a reconnect")
}
default:
{
ve.Logger.Debug("SignerListenerEndpoint: reconnect in progress")
sl.Logger.Debug("SignerListenerEndpoint: reconnect in progress")
}
}
// block until connected or timeout
select {
case ve.conn = <-ve.connectedCh:
case sl.conn = <-sl.connectedCh:
{
ve.Logger.Debug("SignerListenerEndpoint: connected")
sl.Logger.Debug("SignerListenerEndpoint: connected")
}
case <-time.After(maxWait):
{
ve.Logger.Debug("SignerListenerEndpoint: timeout")
sl.Logger.Debug("SignerListenerEndpoint: timeout")
return ErrListenerTimeout
}
}
@ -232,37 +232,37 @@ func (ve *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error
}
// dropConnection closes the current connection but does not touch the listening socket
func (ve *SignerListenerEndpoint) dropConnection() {
if ve.conn != nil {
if err := ve.conn.Close(); err != nil {
ve.Logger.Error("SignerListenerEndpoint::dropConnection", "err", err)
func (sl *SignerListenerEndpoint) dropConnection() {
if sl.conn != nil {
if err := sl.conn.Close(); err != nil {
sl.Logger.Error("SignerListenerEndpoint::dropConnection", "err", err)
}
ve.conn = nil
sl.conn = nil
}
}
func (ve *SignerListenerEndpoint) serviceLoop() {
defer close(ve.stoppedCh)
ve.Logger.Debug("SignerListenerEndpoint::serviceLoop")
func (sl *SignerListenerEndpoint) serviceLoop() {
defer close(sl.stoppedCh)
sl.Logger.Debug("SignerListenerEndpoint::serviceLoop")
for {
select {
case <-ve.connectCh:
case <-sl.connectCh:
{
for {
ve.Logger.Info("Listening for new connection")
conn, err := ve.acceptNewConnection()
sl.Logger.Info("Listening for new connection")
conn, err := sl.acceptNewConnection()
if err == nil {
ve.Logger.Info("Connected")
sl.Logger.Info("Connected")
select {
case ve.connectedCh <- conn:
case sl.connectedCh <- conn:
{
ve.Logger.Debug("SignerListenerEndpoint: connection relayed")
sl.Logger.Debug("SignerListenerEndpoint: connection relayed")
}
case <-ve.stopCh:
case <-sl.stopCh:
{
ve.Logger.Debug("SignerListenerEndpoint: stopping")
sl.Logger.Debug("SignerListenerEndpoint: stopping")
return
}
}
@ -270,29 +270,29 @@ func (ve *SignerListenerEndpoint) serviceLoop() {
}
}
case <-ve.stopCh:
case <-sl.stopCh:
{
ve.Logger.Debug("SignerListenerEndpoint::serviceLoop Stop")
sl.Logger.Debug("SignerListenerEndpoint::serviceLoop Stop")
return
}
}
}
}
func (ve *SignerListenerEndpoint) acceptNewConnection() (net.Conn, error) {
ve.Logger.Debug("SignerListenerEndpoint: AcceptNewConnection")
func (sl *SignerListenerEndpoint) acceptNewConnection() (net.Conn, error) {
sl.Logger.Debug("SignerListenerEndpoint: AcceptNewConnection")
if !ve.IsRunning() || ve.listener == nil {
if !sl.IsRunning() || sl.listener == nil {
return nil, fmt.Errorf("endpoint is closing")
}
// wait for a new conn
conn, err := ve.listener.Accept()
conn, err := sl.listener.Accept()
if err != nil {
ve.Logger.Debug("listener accept failed", "err", err)
sl.Logger.Debug("listener accept failed", "err", err)
return nil, err
}
ve.Logger.Info("SignerListenerEndpoint: New connection")
sl.Logger.Info("SignerListenerEndpoint: New connection")
return conn, nil
}

Loading…
Cancel
Save