You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

230 lines
5.9 KiB

privval: improve Remote Signer implementation (#3351) This issue is related to #3107 This is a first renaming/refactoring step before reworking and removing heartbeats. As discussed with @Liamsi , we preferred to go for a couple of independent and separate PRs to simplify review work. The changes: Help to clarify the relation between the validator and remote signer endpoints Differentiate between timeouts and deadlines Prepare to encapsulate networking related code behind RemoteSigner in the next PR My intention is to separate and encapsulate the "network related" code from the actual signer. SignerRemote ---(uses/contains)--> SignerValidatorEndpoint <--(connects to)--> SignerServiceEndpoint ---> SignerService (future.. not here yet but would like to decouple too) All reconnection/heartbeat/whatever code goes in the endpoints. Signer[Remote/Service] do not need to know about that. I agree Endpoint may not be the perfect name. I tried to find something "Go-ish" enough. It is a common name in go-kit, kubernetes, etc. Right now: SignerValidatorEndpoint: handles the listener contains SignerRemote Implements the PrivValidator interface connects and sets a connection object in a contained SignerRemote delegates PrivValidator some calls to SignerRemote which in turn uses the conn object that was set externally SignerRemote: Implements the PrivValidator interface read/writes from a connection object directly handles heartbeats SignerServiceEndpoint: Does most things in a single place delegates to a PrivValidator IIRC. * cleanup * Refactoring step 1 * Refactoring step 2 * move messages to another file * mark for future work / next steps * mark deprecated classes in docs * Fix linter problems * additional linter fixes
6 years ago
  1. package privval
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "time"
  7. "github.com/tendermint/tendermint/crypto"
  8. cmn "github.com/tendermint/tendermint/libs/common"
  9. "github.com/tendermint/tendermint/libs/log"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. const (
  13. defaultHeartbeatSeconds = 2
  14. defaultMaxDialRetries = 10
  15. )
  16. var (
  17. heartbeatPeriod = time.Second * defaultHeartbeatSeconds
  18. )
  19. // SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
  20. type SignerValidatorEndpointOption func(*SignerValidatorEndpoint)
  21. // SignerValidatorEndpointSetHeartbeat sets the period on which to check the liveness of the
  22. // connected Signer connections.
  23. func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEndpointOption {
  24. return func(sc *SignerValidatorEndpoint) { sc.heartbeatPeriod = period }
  25. }
  26. // SocketVal implements PrivValidator.
  27. // It listens for an external process to dial in and uses
  28. // the socket to request signatures.
  29. type SignerValidatorEndpoint struct {
  30. cmn.BaseService
  31. listener net.Listener
  32. // ping
  33. cancelPingCh chan struct{}
  34. pingTicker *time.Ticker
  35. heartbeatPeriod time.Duration
  36. // signer is mutable since it can be reset if the connection fails.
  37. // failures are detected by a background ping routine.
  38. // All messages are request/response, so we hold the mutex
  39. // so only one request/response pair can happen at a time.
  40. // Methods on the underlying net.Conn itself are already goroutine safe.
  41. mtx sync.Mutex
  42. // TODO: Signer should encapsulate and hide the endpoint completely. Invert the relation
  43. signer *SignerRemote
  44. }
  45. // Check that SignerValidatorEndpoint implements PrivValidator.
  46. var _ types.PrivValidator = (*SignerValidatorEndpoint)(nil)
  47. // NewSignerValidatorEndpoint returns an instance of SignerValidatorEndpoint.
  48. func NewSignerValidatorEndpoint(logger log.Logger, listener net.Listener) *SignerValidatorEndpoint {
  49. sc := &SignerValidatorEndpoint{
  50. listener: listener,
  51. heartbeatPeriod: heartbeatPeriod,
  52. }
  53. sc.BaseService = *cmn.NewBaseService(logger, "SignerValidatorEndpoint", sc)
  54. return sc
  55. }
  56. //--------------------------------------------------------
  57. // Implement PrivValidator
  58. // GetPubKey implements PrivValidator.
  59. func (ve *SignerValidatorEndpoint) GetPubKey() crypto.PubKey {
  60. ve.mtx.Lock()
  61. defer ve.mtx.Unlock()
  62. return ve.signer.GetPubKey()
  63. }
  64. // SignVote implements PrivValidator.
  65. func (ve *SignerValidatorEndpoint) SignVote(chainID string, vote *types.Vote) error {
  66. ve.mtx.Lock()
  67. defer ve.mtx.Unlock()
  68. return ve.signer.SignVote(chainID, vote)
  69. }
  70. // SignProposal implements PrivValidator.
  71. func (ve *SignerValidatorEndpoint) SignProposal(chainID string, proposal *types.Proposal) error {
  72. ve.mtx.Lock()
  73. defer ve.mtx.Unlock()
  74. return ve.signer.SignProposal(chainID, proposal)
  75. }
  76. //--------------------------------------------------------
  77. // More thread safe methods proxied to the signer
  78. // Ping is used to check connection health.
  79. func (ve *SignerValidatorEndpoint) Ping() error {
  80. ve.mtx.Lock()
  81. defer ve.mtx.Unlock()
  82. return ve.signer.Ping()
  83. }
  84. // Close closes the underlying net.Conn.
  85. func (ve *SignerValidatorEndpoint) Close() {
  86. ve.mtx.Lock()
  87. defer ve.mtx.Unlock()
  88. if ve.signer != nil {
  89. if err := ve.signer.Close(); err != nil {
  90. ve.Logger.Error("OnStop", "err", err)
  91. }
  92. }
  93. if ve.listener != nil {
  94. if err := ve.listener.Close(); err != nil {
  95. ve.Logger.Error("OnStop", "err", err)
  96. }
  97. }
  98. }
  99. //--------------------------------------------------------
  100. // Service start and stop
  101. // OnStart implements cmn.Service.
  102. func (ve *SignerValidatorEndpoint) OnStart() error {
  103. if closed, err := ve.reset(); err != nil {
  104. ve.Logger.Error("OnStart", "err", err)
  105. return err
  106. } else if closed {
  107. return fmt.Errorf("listener is closed")
  108. }
  109. // Start a routine to keep the connection alive
  110. ve.cancelPingCh = make(chan struct{}, 1)
  111. ve.pingTicker = time.NewTicker(ve.heartbeatPeriod)
  112. go func() {
  113. for {
  114. select {
  115. case <-ve.pingTicker.C:
  116. err := ve.Ping()
  117. if err != nil {
  118. ve.Logger.Error("Ping", "err", err)
  119. if err == ErrUnexpectedResponse {
  120. return
  121. }
  122. closed, err := ve.reset()
  123. if err != nil {
  124. ve.Logger.Error("Reconnecting to remote signer failed", "err", err)
  125. continue
  126. }
  127. if closed {
  128. ve.Logger.Info("listener is closing")
  129. return
  130. }
  131. ve.Logger.Info("Re-created connection to remote signer", "impl", ve)
  132. }
  133. case <-ve.cancelPingCh:
  134. ve.pingTicker.Stop()
  135. return
  136. }
  137. }
  138. }()
  139. return nil
  140. }
  141. // OnStop implements cmn.Service.
  142. func (ve *SignerValidatorEndpoint) OnStop() {
  143. if ve.cancelPingCh != nil {
  144. close(ve.cancelPingCh)
  145. }
  146. ve.Close()
  147. }
  148. //--------------------------------------------------------
  149. // Connection and signer management
  150. // waits to accept and sets a new connection.
  151. // connection is closed in OnStop.
  152. // returns true if the listener is closed
  153. // (ie. it returns a nil conn).
  154. func (ve *SignerValidatorEndpoint) reset() (closed bool, err error) {
  155. ve.mtx.Lock()
  156. defer ve.mtx.Unlock()
  157. // first check if the conn already exists and close it.
  158. if ve.signer != nil {
  159. if tmpErr := ve.signer.Close(); tmpErr != nil {
  160. ve.Logger.Error("error closing socket val connection during reset", "err", tmpErr)
  161. }
  162. }
  163. // wait for a new conn
  164. conn, err := ve.acceptConnection()
  165. if err != nil {
  166. return false, err
  167. }
  168. // listener is closed
  169. if conn == nil {
  170. return true, nil
  171. }
  172. ve.signer, err = NewSignerRemote(conn)
  173. if err != nil {
  174. // failed to fetch the pubkey. close out the connection.
  175. if tmpErr := conn.Close(); tmpErr != nil {
  176. ve.Logger.Error("error closing connection", "err", tmpErr)
  177. }
  178. return false, err
  179. }
  180. return false, nil
  181. }
  182. // Attempt to accept a connection.
  183. // Times out after the listener's timeoutAccept
  184. func (ve *SignerValidatorEndpoint) acceptConnection() (net.Conn, error) {
  185. conn, err := ve.listener.Accept()
  186. if err != nil {
  187. if !ve.IsRunning() {
  188. return nil, nil // Ignore error from listener closing.
  189. }
  190. return nil, err
  191. }
  192. return conn, nil
  193. }