Browse Source

privval: increase read/write timeout to 5s and calculate ping interva… (#5666)

…l based on it (#5638)

Partially closes #5550
pull/5668/head
Anton Kaliaev 4 years ago
committed by GitHub
parent
commit
9567477d55
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 22 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +11
    -9
      privval/signer_dialer_endpoint.go
  3. +1
    -1
      privval/signer_endpoint.go
  4. +30
    -9
      privval/signer_listener_endpoint.go
  5. +5
    -1
      privval/signer_listener_endpoint_test.go
  6. +1
    -2
      privval/socket_listeners.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -35,3 +35,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [evidence] \#5574 Fix bug where node sends committed evidence to peer (@cmwaters) - [evidence] \#5574 Fix bug where node sends committed evidence to peer (@cmwaters)
- [privval] \#5583 Make `Vote`, `Proposal` & `PubKey` non-nullable in Responses (@marbar3778) - [privval] \#5583 Make `Vote`, `Proposal` & `PubKey` non-nullable in Responses (@marbar3778)
- [evidence] \#5610 Make it possible for abci evidence to be formed from tm evidence (@cmwaters) - [evidence] \#5610 Make it possible for abci evidence to be formed from tm evidence (@cmwaters)
- [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash)

+ 11
- 9
privval/signer_dialer_endpoint.go View File

@ -15,24 +15,26 @@ const (
// SignerServiceEndpointOption sets an optional parameter on the SignerDialerEndpoint. // SignerServiceEndpointOption sets an optional parameter on the SignerDialerEndpoint.
type SignerServiceEndpointOption func(*SignerDialerEndpoint) type SignerServiceEndpointOption func(*SignerDialerEndpoint)
// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for connections
// from external signing processes.
// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for
// connections from client processes.
func SignerDialerEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption { func SignerDialerEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout } return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout }
} }
// SignerDialerEndpointConnRetries sets the amount of attempted retries to acceptNewConnection.
// SignerDialerEndpointConnRetries sets the amount of attempted retries to
// acceptNewConnection.
func SignerDialerEndpointConnRetries(retries int) SignerServiceEndpointOption { func SignerDialerEndpointConnRetries(retries int) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.maxConnRetries = retries } return func(ss *SignerDialerEndpoint) { ss.maxConnRetries = retries }
} }
// SignerDialerEndpointRetryWaitInterval sets the retry wait interval to a custom value
// SignerDialerEndpointRetryWaitInterval sets the retry wait interval to a
// custom value.
func SignerDialerEndpointRetryWaitInterval(interval time.Duration) SignerServiceEndpointOption { func SignerDialerEndpointRetryWaitInterval(interval time.Duration) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.retryWait = interval } return func(ss *SignerDialerEndpoint) { ss.retryWait = interval }
} }
// SignerDialerEndpoint dials using its dialer and responds to any
// signature requests using its privVal.
// SignerDialerEndpoint dials using its dialer and responds to any signature
// requests using its privVal.
type SignerDialerEndpoint struct { type SignerDialerEndpoint struct {
signerEndpoint signerEndpoint
@ -57,13 +59,13 @@ func NewSignerDialerEndpoint(
maxConnRetries: defaultMaxDialRetries, maxConnRetries: defaultMaxDialRetries,
} }
sd.BaseService = *service.NewBaseService(logger, "SignerDialerEndpoint", sd)
sd.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
for _, optionFunc := range options { for _, optionFunc := range options {
optionFunc(sd) optionFunc(sd)
} }
sd.BaseService = *service.NewBaseService(logger, "SignerDialerEndpoint", sd)
sd.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
return sd return sd
} }


+ 1
- 1
privval/signer_endpoint.go View File

@ -12,7 +12,7 @@ import (
) )
const ( const (
defaultTimeoutReadWriteSeconds = 3
defaultTimeoutReadWriteSeconds = 5
) )
type signerEndpoint struct { type signerEndpoint struct {


+ 30
- 9
privval/signer_listener_endpoint.go View File

@ -11,11 +11,22 @@ import (
privvalproto "github.com/tendermint/tendermint/proto/tendermint/privval" privvalproto "github.com/tendermint/tendermint/proto/tendermint/privval"
) )
// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
type SignerValidatorEndpointOption func(*SignerListenerEndpoint)
// SignerListenerEndpointOption sets an optional parameter on the SignerListenerEndpoint.
type SignerListenerEndpointOption func(*SignerListenerEndpoint)
// SignerListenerEndpointTimeoutReadWrite sets the read and write timeout for
// connections from external signing processes.
//
// Default: 5s
func SignerListenerEndpointTimeoutReadWrite(timeout time.Duration) SignerListenerEndpointOption {
return func(sl *SignerListenerEndpoint) { sl.signerEndpoint.timeoutReadWrite = timeout }
}
// SignerListenerEndpoint listens for an external process to dial in
// and keeps the connection alive by dropping and reconnecting
// SignerListenerEndpoint listens for an external process to dial in and keeps
// the connection alive by dropping and reconnecting.
//
// The process will send pings every ~3s (read/write timeout * 2/3) to keep the
// connection alive.
type SignerListenerEndpoint struct { type SignerListenerEndpoint struct {
signerEndpoint signerEndpoint
@ -25,6 +36,7 @@ type SignerListenerEndpoint struct {
timeoutAccept time.Duration timeoutAccept time.Duration
pingTimer *time.Ticker pingTimer *time.Ticker
pingInterval time.Duration
instanceMtx tmsync.Mutex // Ensures instance public methods access, i.e. SendRequest instanceMtx tmsync.Mutex // Ensures instance public methods access, i.e. SendRequest
} }
@ -33,15 +45,21 @@ type SignerListenerEndpoint struct {
func NewSignerListenerEndpoint( func NewSignerListenerEndpoint(
logger log.Logger, logger log.Logger,
listener net.Listener, listener net.Listener,
options ...SignerListenerEndpointOption,
) *SignerListenerEndpoint { ) *SignerListenerEndpoint {
sc := &SignerListenerEndpoint{
sl := &SignerListenerEndpoint{
listener: listener, listener: listener,
timeoutAccept: defaultTimeoutAcceptSeconds * time.Second, timeoutAccept: defaultTimeoutAcceptSeconds * time.Second,
} }
sc.BaseService = *service.NewBaseService(logger, "SignerListenerEndpoint", sc)
sc.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
return sc
sl.BaseService = *service.NewBaseService(logger, "SignerListenerEndpoint", sl)
sl.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
for _, optionFunc := range options {
optionFunc(sl)
}
return sl
} }
// OnStart implements service.Service. // OnStart implements service.Service.
@ -49,7 +67,9 @@ func (sl *SignerListenerEndpoint) OnStart() error {
sl.connectRequestCh = make(chan struct{}) sl.connectRequestCh = make(chan struct{})
sl.connectionAvailableCh = make(chan net.Conn) sl.connectionAvailableCh = make(chan net.Conn)
sl.pingTimer = time.NewTicker(defaultPingPeriodMilliseconds * time.Millisecond)
// NOTE: ping timeout must be less than read/write timeout
sl.pingInterval = time.Duration(sl.signerEndpoint.timeoutReadWrite.Milliseconds()*2/3) * time.Millisecond
sl.pingTimer = time.NewTicker(sl.pingInterval)
go sl.serviceLoop() go sl.serviceLoop()
go sl.pingLoop() go sl.pingLoop()
@ -117,6 +137,7 @@ func (sl *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error
} }
// block until connected or timeout // block until connected or timeout
sl.Logger.Info("SignerListener: Blocking for connection")
sl.triggerConnect() sl.triggerConnect()
err := sl.WaitConnection(sl.connectionAvailableCh, maxWait) err := sl.WaitConnection(sl.connectionAvailableCh, maxWait)
if err != nil { if err != nil {


+ 5
- 1
privval/signer_listener_endpoint_test.go View File

@ -168,7 +168,11 @@ func newSignerListenerEndpoint(logger log.Logger, addr string, timeoutReadWrite
listener = tcpLn listener = tcpLn
} }
return NewSignerListenerEndpoint(logger, listener)
return NewSignerListenerEndpoint(
logger,
listener,
SignerListenerEndpointTimeoutReadWrite(testTimeoutReadWrite),
)
} }
func startListenerEndpointAsync(t *testing.T, sle *SignerListenerEndpoint, endpointIsOpenCh chan struct{}) { func startListenerEndpointAsync(t *testing.T, sle *SignerListenerEndpoint, endpointIsOpenCh chan struct{}) {


+ 1
- 2
privval/socket_listeners.go View File

@ -9,8 +9,7 @@ import (
) )
const ( const (
defaultTimeoutAcceptSeconds = 3
defaultPingPeriodMilliseconds = 100
defaultTimeoutAcceptSeconds = 3
) )
// timeoutError can be used to check if an error returned from the netp package // timeoutError can be used to check if an error returned from the netp package


Loading…
Cancel
Save