You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

161 lines
3.7 KiB

package privval
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/tendermint/tendermint/internal/libs/protoio"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
privvalproto "github.com/tendermint/tendermint/proto/tendermint/privval"
)
const (
// defaultTimeoutReadWriteSeconds is a default read/write timeout expressed
// in seconds. NOTE(review): it is not referenced in this section of the
// file; presumably it seeds signerEndpoint.timeoutReadWrite - confirm
// against the endpoint constructors.
defaultTimeoutReadWriteSeconds = 5
)
// signerEndpoint holds a single net.Conn together with the mutex that guards
// it, and provides locked helpers to install, drop, read from and write to
// that connection.
type signerEndpoint struct {
// Embedded base service. NOTE(review): presumably supplies the service
// lifecycle methods; none of them are used in this section.
service.BaseService
// logger receives the debug/error output emitted when a connection is
// dropped.
logger log.Logger
// connMtx guards conn. Exported methods acquire it; the unexported
// helpers (isConnected, dropConnection) do not lock themselves.
connMtx sync.Mutex
// conn is the current connection, or nil when there is none.
conn net.Conn
// timeoutReadWrite is added to time.Now() to compute the deadline set on
// conn before each read and each write.
timeoutReadWrite time.Duration
}
// Close drops the current connection, if there is one, while holding the
// connection mutex. Any error from closing the underlying net.Conn is logged
// by dropConnection rather than returned, so Close always returns nil.
func (se *signerEndpoint) Close() error {
	se.connMtx.Lock()
	defer se.connMtx.Unlock()

	se.dropConnection()
	return nil
}
// IsConnected reports whether the endpoint currently holds a connection.
// The check is performed under the connection mutex.
func (se *signerEndpoint) IsConnected() bool {
	se.connMtx.Lock()
	defer se.connMtx.Unlock()

	return se.conn != nil
}
// GetAvailableConnection performs a non-blocking receive on
// connectionAvailableCh. If a value is immediately available it becomes the
// endpoint's current connection and the method returns true; otherwise the
// current connection is left untouched and it returns false.
func (se *signerEndpoint) GetAvailableConnection(connectionAvailableCh chan net.Conn) bool {
	se.connMtx.Lock()
	defer se.connMtx.Unlock()

	select {
	case ready := <-connectionAvailableCh:
		// Whatever was received replaces the stored connection as-is.
		se.conn = ready
		return true
	default:
		// Nothing pending on the channel right now.
		return false
	}
}
// WaitConnection blocks until a connection arrives on connectionAvailableCh,
// ctx is done, or maxWait elapses, whichever happens first.
//
// On success the received connection becomes the endpoint's current
// connection and nil is returned. If ctx is done first, ctx.Err() is
// returned; if maxWait elapses first, ErrConnectionTimeout is returned. In
// both failure cases the current connection is left untouched.
//
// The connection mutex is held for the whole wait, so every other method that
// takes the mutex blocks until WaitConnection returns.
func (se *signerEndpoint) WaitConnection(ctx context.Context, connectionAvailableCh chan net.Conn, maxWait time.Duration) error {
	se.connMtx.Lock()
	defer se.connMtx.Unlock()

	// Use an explicit timer (rather than time.After) so it can be stopped as
	// soon as another case wins, instead of staying pending for up to maxWait.
	timer := time.NewTimer(maxWait)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case conn := <-connectionAvailableCh:
		se.conn = conn
		return nil
	case <-timer.C:
		return ErrConnectionTimeout
	}
}
// SetConnection installs newConnection as the endpoint's current connection.
// The previously stored connection, if any, is simply overwritten; it is not
// closed here.
func (se *signerEndpoint) SetConnection(newConnection net.Conn) {
	se.connMtx.Lock()
	se.conn = newConnection
	se.connMtx.Unlock()
}
// DropConnection closes the current connection, if there is one, and clears
// it. The work is done by dropConnection while the connection mutex is held;
// a failure to close is logged there, not returned.
func (se *signerEndpoint) DropConnection() {
se.connMtx.Lock()
defer se.connMtx.Unlock()
se.dropConnection()
}
// ReadMessage reads one length-delimited protobuf message from the current
// connection.
//
// It returns an error wrapping ErrNoConnection when there is no connection.
// Before reading, the connection's read deadline is set to now plus
// timeoutReadWrite; a failure to set the deadline is returned as-is.
//
// If the read fails with an error that satisfies timeoutError, the error is
// wrapped with ErrReadTimeout and the connection is dropped. Any other read
// error is returned unchanged and the connection is kept.
func (se *signerEndpoint) ReadMessage() (msg privvalproto.Message, err error) {
	se.connMtx.Lock()
	defer se.connMtx.Unlock()

	if !se.isConnected() {
		return msg, fmt.Errorf("endpoint is not connected: %w", ErrNoConnection)
	}

	// Reset the read deadline for this message.
	deadline := time.Now().Add(se.timeoutReadWrite)
	if err = se.conn.SetReadDeadline(deadline); err != nil {
		return msg, err
	}

	// Upper bound, in bytes, on a single message accepted from the peer.
	const maxRemoteSignerMsgSize = 1024 * 10
	protoReader := protoio.NewDelimitedReader(se.conn, maxRemoteSignerMsgSize)

	_, err = protoReader.ReadMsg(&msg)
	// A successful type assertion implies err is non-nil, so no separate nil
	// check is needed before wrapping it.
	if _, ok := err.(timeoutError); ok {
		err = fmt.Errorf("%v: %w", err, ErrReadTimeout)
		se.logger.Debug("Dropping [read]", "obj", se)
		se.dropConnection()
	}

	return msg, err
}
// WriteMessage writes msg to the current connection as one length-delimited
// protobuf message.
//
// It returns an error wrapping ErrNoConnection when there is no connection.
// Before writing, the connection's write deadline is set to now plus
// timeoutReadWrite; a failure to set the deadline is returned as-is.
//
// If the write fails with an error that satisfies timeoutError, the error is
// wrapped with ErrWriteTimeout and the connection is dropped. Any other write
// error is returned unchanged and the connection is kept.
func (se *signerEndpoint) WriteMessage(msg privvalproto.Message) (err error) {
	se.connMtx.Lock()
	defer se.connMtx.Unlock()

	if !se.isConnected() {
		return fmt.Errorf("endpoint is not connected: %w", ErrNoConnection)
	}

	protoWriter := protoio.NewDelimitedWriter(se.conn)

	// Reset the write deadline for this message.
	deadline := time.Now().Add(se.timeoutReadWrite)
	if err = se.conn.SetWriteDeadline(deadline); err != nil {
		return err
	}

	_, err = protoWriter.WriteMsg(&msg)
	// A successful type assertion implies err is non-nil, so no separate nil
	// check is needed before wrapping it.
	if _, ok := err.(timeoutError); ok {
		err = fmt.Errorf("%v: %w", err, ErrWriteTimeout)
		se.dropConnection()
	}

	return err
}
// isConnected reports whether conn is non-nil. It does not take connMtx
// itself; the callers visible in this file acquire the mutex before calling.
func (se *signerEndpoint) isConnected() bool {
return se.conn != nil
}
// dropConnection closes the current connection and clears it. It is a no-op
// when there is no connection. A failure to close is logged and otherwise
// ignored; the connection is cleared either way.
//
// It does not take connMtx itself; the callers visible in this file acquire
// the mutex before calling.
func (se *signerEndpoint) dropConnection() {
	if se.conn == nil {
		return
	}

	closeErr := se.conn.Close()
	if closeErr != nil {
		se.logger.Error("signerEndpoint::dropConnection", "err", closeErr)
	}
	se.conn = nil
}