You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

605 lines
14 KiB

  1. package privval
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "time"
  8. "github.com/tendermint/go-amino"
  9. "github.com/tendermint/tendermint/crypto"
  10. "github.com/tendermint/tendermint/crypto/ed25519"
  11. cmn "github.com/tendermint/tendermint/libs/common"
  12. "github.com/tendermint/tendermint/libs/log"
  13. p2pconn "github.com/tendermint/tendermint/p2p/conn"
  14. "github.com/tendermint/tendermint/types"
  15. )
// Default timing and retry parameters, expressed in seconds (or attempts).
// The constructors in this file multiply the second-valued ones by
// time.Second to obtain the initial field values.
const (
	defaultAcceptDeadlineSeconds = 30 // tendermint waits this long for remote val to connect
	defaultConnDeadlineSeconds = 3 // must be set before each read
	defaultConnHeartBeatSeconds = 30 // tcp keep-alive period
	defaultConnWaitSeconds = 60 // XXX: is this redundant with the accept deadline?
	defaultDialRetries = 10 // try to connect to tendermint this many times
)
// Socket errors.
var (
	// ErrDialRetryMax is returned by RemoteSigner.connect once every dial
	// attempt has failed.
	ErrDialRetryMax = errors.New("dialed maximum retries")
	// ErrConnWaitTimeout is returned by SocketPV.waitConnection when no
	// signer connects within the configured wait timeout.
	ErrConnWaitTimeout = errors.New("waited for remote signer for too long")
	// ErrConnTimeout wraps timeout errors observed by readMsg and writeMsg.
	ErrConnTimeout = errors.New("remote signer timed out")
	// ErrUnexpectedResponse is returned when the reply to a request is not
	// of the expected message type.
	ErrUnexpectedResponse = errors.New("received unexpected response")
)
// SocketPVOption sets an optional parameter on the SocketPV.
// Each option is a function that mutates the SocketPV it is applied to.
type SocketPVOption func(*SocketPV)
  32. // SocketPVAcceptDeadline sets the deadline for the SocketPV listener.
  33. // A zero time value disables the deadline.
  34. func SocketPVAcceptDeadline(deadline time.Duration) SocketPVOption {
  35. return func(sc *SocketPV) { sc.acceptDeadline = deadline }
  36. }
  37. // SocketPVConnDeadline sets the read and write deadline for connections
  38. // from external signing processes.
  39. func SocketPVConnDeadline(deadline time.Duration) SocketPVOption {
  40. return func(sc *SocketPV) { sc.connDeadline = deadline }
  41. }
  42. // SocketPVHeartbeat sets the period on which to check the liveness of the
  43. // connected Signer connections.
  44. func SocketPVHeartbeat(period time.Duration) SocketPVOption {
  45. return func(sc *SocketPV) { sc.connHeartbeat = period }
  46. }
  47. // SocketPVConnWait sets the timeout duration before connection of external
  48. // signing processes are considered to be unsuccessful.
  49. func SocketPVConnWait(timeout time.Duration) SocketPVOption {
  50. return func(sc *SocketPV) { sc.connWaitTimeout = timeout }
  51. }
// SocketPV implements PrivValidator, it uses a socket to request signatures
// from an external process.
type SocketPV struct {
	cmn.BaseService
	// addr is the address the listener is opened on (see listen).
	addr string
	// Timing parameters; initialised from the package defaults in NewSocketPV
	// and overridable through the SocketPV* options.
	acceptDeadline time.Duration
	connDeadline time.Duration
	connHeartbeat time.Duration
	connWaitTimeout time.Duration
	// privKey is the key handed to MakeSecretConnection for accepted connections.
	privKey ed25519.PrivKeyEd25519
	// conn is the signer connection established in OnStart; nil until then.
	conn net.Conn
	// listener is created by listen and closed in OnStop.
	listener net.Listener
}
// Check that SocketPV implements PrivValidator.
var _ types.PrivValidator = (*SocketPV)(nil)
  67. // NewSocketPV returns an instance of SocketPV.
  68. func NewSocketPV(
  69. logger log.Logger,
  70. socketAddr string,
  71. privKey ed25519.PrivKeyEd25519,
  72. ) *SocketPV {
  73. sc := &SocketPV{
  74. addr: socketAddr,
  75. acceptDeadline: time.Second * defaultAcceptDeadlineSeconds,
  76. connDeadline: time.Second * defaultConnDeadlineSeconds,
  77. connHeartbeat: time.Second * defaultConnHeartBeatSeconds,
  78. connWaitTimeout: time.Second * defaultConnWaitSeconds,
  79. privKey: privKey,
  80. }
  81. sc.BaseService = *cmn.NewBaseService(logger, "SocketPV", sc)
  82. return sc
  83. }
  84. // GetAddress implements PrivValidator.
  85. func (sc *SocketPV) GetAddress() types.Address {
  86. addr, err := sc.getAddress()
  87. if err != nil {
  88. panic(err)
  89. }
  90. return addr
  91. }
  92. // Address is an alias for PubKey().Address().
  93. func (sc *SocketPV) getAddress() (cmn.HexBytes, error) {
  94. p, err := sc.getPubKey()
  95. if err != nil {
  96. return nil, err
  97. }
  98. return p.Address(), nil
  99. }
  100. // GetPubKey implements PrivValidator.
  101. func (sc *SocketPV) GetPubKey() crypto.PubKey {
  102. pubKey, err := sc.getPubKey()
  103. if err != nil {
  104. panic(err)
  105. }
  106. return pubKey
  107. }
  108. func (sc *SocketPV) getPubKey() (crypto.PubKey, error) {
  109. err := writeMsg(sc.conn, &PubKeyMsg{})
  110. if err != nil {
  111. return nil, err
  112. }
  113. res, err := readMsg(sc.conn)
  114. if err != nil {
  115. return nil, err
  116. }
  117. return res.(*PubKeyMsg).PubKey, nil
  118. }
  119. // SignVote implements PrivValidator.
  120. func (sc *SocketPV) SignVote(chainID string, vote *types.Vote) error {
  121. err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote})
  122. if err != nil {
  123. return err
  124. }
  125. res, err := readMsg(sc.conn)
  126. if err != nil {
  127. return err
  128. }
  129. resp, ok := res.(*SignedVoteResponse)
  130. if !ok {
  131. return ErrUnexpectedResponse
  132. }
  133. if resp.Error != nil {
  134. return fmt.Errorf("remote error occurred: code: %v, description: %s",
  135. resp.Error.Code,
  136. resp.Error.Description)
  137. }
  138. *vote = *resp.Vote
  139. return nil
  140. }
  141. // SignProposal implements PrivValidator.
  142. func (sc *SocketPV) SignProposal(
  143. chainID string,
  144. proposal *types.Proposal,
  145. ) error {
  146. err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal})
  147. if err != nil {
  148. return err
  149. }
  150. res, err := readMsg(sc.conn)
  151. if err != nil {
  152. return err
  153. }
  154. resp, ok := res.(*SignedProposalResponse)
  155. if !ok {
  156. return ErrUnexpectedResponse
  157. }
  158. if resp.Error != nil {
  159. return fmt.Errorf("remote error occurred: code: %v, description: %s",
  160. resp.Error.Code,
  161. resp.Error.Description)
  162. }
  163. *proposal = *resp.Proposal
  164. return nil
  165. }
  166. // SignHeartbeat implements PrivValidator.
  167. func (sc *SocketPV) SignHeartbeat(
  168. chainID string,
  169. heartbeat *types.Heartbeat,
  170. ) error {
  171. err := writeMsg(sc.conn, &SignHeartbeatRequest{Heartbeat: heartbeat})
  172. if err != nil {
  173. return err
  174. }
  175. res, err := readMsg(sc.conn)
  176. if err != nil {
  177. return err
  178. }
  179. resp, ok := res.(*SignedHeartbeatResponse)
  180. if !ok {
  181. return ErrUnexpectedResponse
  182. }
  183. if resp.Error != nil {
  184. return fmt.Errorf("remote error occurred: code: %v, description: %s",
  185. resp.Error.Code,
  186. resp.Error.Description)
  187. }
  188. *heartbeat = *resp.Heartbeat
  189. return nil
  190. }
  191. // OnStart implements cmn.Service.
  192. func (sc *SocketPV) OnStart() error {
  193. if err := sc.listen(); err != nil {
  194. err = cmn.ErrorWrap(err, "failed to listen")
  195. sc.Logger.Error(
  196. "OnStart",
  197. "err", err,
  198. )
  199. return err
  200. }
  201. conn, err := sc.waitConnection()
  202. if err != nil {
  203. err = cmn.ErrorWrap(err, "failed to accept connection")
  204. sc.Logger.Error(
  205. "OnStart",
  206. "err", err,
  207. )
  208. return err
  209. }
  210. sc.conn = conn
  211. return nil
  212. }
  213. // OnStop implements cmn.Service.
  214. func (sc *SocketPV) OnStop() {
  215. if sc.conn != nil {
  216. if err := sc.conn.Close(); err != nil {
  217. err = cmn.ErrorWrap(err, "failed to close connection")
  218. sc.Logger.Error(
  219. "OnStop",
  220. "err", err,
  221. )
  222. }
  223. }
  224. if sc.listener != nil {
  225. if err := sc.listener.Close(); err != nil {
  226. err = cmn.ErrorWrap(err, "failed to close listener")
  227. sc.Logger.Error(
  228. "OnStop",
  229. "err", err,
  230. )
  231. }
  232. }
  233. }
  234. func (sc *SocketPV) acceptConnection() (net.Conn, error) {
  235. conn, err := sc.listener.Accept()
  236. if err != nil {
  237. if !sc.IsRunning() {
  238. return nil, nil // Ignore error from listener closing.
  239. }
  240. return nil, err
  241. }
  242. conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey)
  243. if err != nil {
  244. return nil, err
  245. }
  246. return conn, nil
  247. }
  248. func (sc *SocketPV) listen() error {
  249. ln, err := net.Listen(cmn.ProtocolAndAddress(sc.addr))
  250. if err != nil {
  251. return err
  252. }
  253. sc.listener = newTCPTimeoutListener(
  254. ln,
  255. sc.acceptDeadline,
  256. sc.connDeadline,
  257. sc.connHeartbeat,
  258. )
  259. return nil
  260. }
  261. // waitConnection uses the configured wait timeout to error if no external
  262. // process connects in the time period.
  263. func (sc *SocketPV) waitConnection() (net.Conn, error) {
  264. var (
  265. connc = make(chan net.Conn, 1)
  266. errc = make(chan error, 1)
  267. )
  268. go func(connc chan<- net.Conn, errc chan<- error) {
  269. conn, err := sc.acceptConnection()
  270. if err != nil {
  271. errc <- err
  272. return
  273. }
  274. connc <- conn
  275. }(connc, errc)
  276. select {
  277. case conn := <-connc:
  278. return conn, nil
  279. case err := <-errc:
  280. if _, ok := err.(timeoutError); ok {
  281. return nil, cmn.ErrorWrap(ErrConnWaitTimeout, err.Error())
  282. }
  283. return nil, err
  284. case <-time.After(sc.connWaitTimeout):
  285. return nil, ErrConnWaitTimeout
  286. }
  287. }
//---------------------------------------------------------
// RemoteSignerOption sets an optional parameter on the RemoteSigner.
// Each option is a function that mutates the RemoteSigner it is applied to.
type RemoteSignerOption func(*RemoteSigner)
  291. // RemoteSignerConnDeadline sets the read and write deadline for connections
  292. // from external signing processes.
  293. func RemoteSignerConnDeadline(deadline time.Duration) RemoteSignerOption {
  294. return func(ss *RemoteSigner) { ss.connDeadline = deadline }
  295. }
  296. // RemoteSignerConnRetries sets the amount of attempted retries to connect.
  297. func RemoteSignerConnRetries(retries int) RemoteSignerOption {
  298. return func(ss *RemoteSigner) { ss.connRetries = retries }
  299. }
// RemoteSigner dials a socket and answers the signing requests it receives
// on that connection by delegating to the wrapped PrivValidator.
type RemoteSigner struct {
	cmn.BaseService
	// addr is the address dialed in connect.
	addr string
	// chainID is passed to privVal for every signing request handled.
	chainID string
	// connDeadline is used in connect as the pause between dial attempts.
	connDeadline time.Duration
	// connRetries is the maximum number of dial attempts.
	connRetries int
	// privKey is the key handed to MakeSecretConnection for the dialed connection.
	privKey ed25519.PrivKeyEd25519
	// privVal performs the actual signing.
	privVal types.PrivValidator
	// conn is closed in OnStop when non-nil.
	conn net.Conn
}
  311. // NewRemoteSigner returns an instance of RemoteSigner.
  312. func NewRemoteSigner(
  313. logger log.Logger,
  314. chainID, socketAddr string,
  315. privVal types.PrivValidator,
  316. privKey ed25519.PrivKeyEd25519,
  317. ) *RemoteSigner {
  318. rs := &RemoteSigner{
  319. addr: socketAddr,
  320. chainID: chainID,
  321. connDeadline: time.Second * defaultConnDeadlineSeconds,
  322. connRetries: defaultDialRetries,
  323. privKey: privKey,
  324. privVal: privVal,
  325. }
  326. rs.BaseService = *cmn.NewBaseService(logger, "RemoteSigner", rs)
  327. return rs
  328. }
  329. // OnStart implements cmn.Service.
  330. func (rs *RemoteSigner) OnStart() error {
  331. conn, err := rs.connect()
  332. if err != nil {
  333. err = cmn.ErrorWrap(err, "connect")
  334. rs.Logger.Error("OnStart", "err", err)
  335. return err
  336. }
  337. go rs.handleConnection(conn)
  338. return nil
  339. }
  340. // OnStop implements cmn.Service.
  341. func (rs *RemoteSigner) OnStop() {
  342. if rs.conn == nil {
  343. return
  344. }
  345. if err := rs.conn.Close(); err != nil {
  346. rs.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
  347. }
  348. }
  349. func (rs *RemoteSigner) connect() (net.Conn, error) {
  350. for retries := rs.connRetries; retries > 0; retries-- {
  351. // Don't sleep if it is the first retry.
  352. if retries != rs.connRetries {
  353. time.Sleep(rs.connDeadline)
  354. }
  355. conn, err := cmn.Connect(rs.addr)
  356. if err != nil {
  357. err = cmn.ErrorWrap(err, "connection failed")
  358. rs.Logger.Error(
  359. "connect",
  360. "addr", rs.addr,
  361. "err", err,
  362. )
  363. continue
  364. }
  365. if err := conn.SetDeadline(time.Now().Add(time.Second * defaultConnDeadlineSeconds)); err != nil {
  366. err = cmn.ErrorWrap(err, "setting connection timeout failed")
  367. rs.Logger.Error(
  368. "connect",
  369. "err", err,
  370. )
  371. continue
  372. }
  373. conn, err = p2pconn.MakeSecretConnection(conn, rs.privKey)
  374. if err != nil {
  375. err = cmn.ErrorWrap(err, "encrypting connection failed")
  376. rs.Logger.Error(
  377. "connect",
  378. "err", err,
  379. )
  380. continue
  381. }
  382. return conn, nil
  383. }
  384. return nil, ErrDialRetryMax
  385. }
  386. func (rs *RemoteSigner) handleConnection(conn net.Conn) {
  387. for {
  388. if !rs.IsRunning() {
  389. return // Ignore error from listener closing.
  390. }
  391. req, err := readMsg(conn)
  392. if err != nil {
  393. if err != io.EOF {
  394. rs.Logger.Error("handleConnection", "err", err)
  395. }
  396. return
  397. }
  398. var res SocketPVMsg
  399. switch r := req.(type) {
  400. case *PubKeyMsg:
  401. var p crypto.PubKey
  402. p = rs.privVal.GetPubKey()
  403. res = &PubKeyMsg{p}
  404. case *SignVoteRequest:
  405. err = rs.privVal.SignVote(rs.chainID, r.Vote)
  406. if err != nil {
  407. res = &SignedVoteResponse{nil, &RemoteSignerError{0, err.Error()}}
  408. } else {
  409. res = &SignedVoteResponse{r.Vote, nil}
  410. }
  411. case *SignProposalRequest:
  412. err = rs.privVal.SignProposal(rs.chainID, r.Proposal)
  413. if err != nil {
  414. res = &SignedProposalResponse{nil, &RemoteSignerError{0, err.Error()}}
  415. } else {
  416. res = &SignedProposalResponse{r.Proposal, nil}
  417. }
  418. case *SignHeartbeatRequest:
  419. err = rs.privVal.SignHeartbeat(rs.chainID, r.Heartbeat)
  420. if err != nil {
  421. res = &SignedHeartbeatResponse{nil, &RemoteSignerError{0, err.Error()}}
  422. } else {
  423. res = &SignedHeartbeatResponse{r.Heartbeat, nil}
  424. }
  425. default:
  426. err = fmt.Errorf("unknown msg: %v", r)
  427. }
  428. if err != nil {
  429. // only log the error; we'll reply with an error in res
  430. rs.Logger.Error("handleConnection", "err", err)
  431. }
  432. err = writeMsg(conn, res)
  433. if err != nil {
  434. rs.Logger.Error("handleConnection", "err", err)
  435. return
  436. }
  437. }
  438. }
//---------------------------------------------------------
// SocketPVMsg is sent between RemoteSigner and SocketPV.
// It is an empty interface; the concrete message types are those registered
// in RegisterSocketPVMsg.
type SocketPVMsg interface{}
  442. func RegisterSocketPVMsg(cdc *amino.Codec) {
  443. cdc.RegisterInterface((*SocketPVMsg)(nil), nil)
  444. cdc.RegisterConcrete(&PubKeyMsg{}, "tendermint/socketpv/PubKeyMsg", nil)
  445. cdc.RegisterConcrete(&SignVoteRequest{}, "tendermint/socketpv/SignVoteRequest", nil)
  446. cdc.RegisterConcrete(&SignedVoteResponse{}, "tendermint/socketpv/SignedVoteResponse", nil)
  447. cdc.RegisterConcrete(&SignProposalRequest{}, "tendermint/socketpv/SignProposalRequest", nil)
  448. cdc.RegisterConcrete(&SignedProposalResponse{}, "tendermint/socketpv/SignedProposalResponse", nil)
  449. cdc.RegisterConcrete(&SignHeartbeatRequest{}, "tendermint/socketpv/SignHeartbeatRequest", nil)
  450. cdc.RegisterConcrete(&SignedHeartbeatResponse{}, "tendermint/socketpv/SignedHeartbeatResponse", nil)
  451. }
// PubKeyMsg is a PrivValidatorSocket message containing the public key.
// It is sent empty as the request and filled in as the response.
type PubKeyMsg struct {
	PubKey crypto.PubKey
}
// SignVoteRequest is a PrivValidatorSocket message containing a vote
// that the remote signer is asked to sign.
type SignVoteRequest struct {
	Vote *types.Vote
}
// SignedVoteResponse is a PrivValidatorSocket message containing a signed vote along with a potential error message.
// The RemoteSigner sets exactly one of the two fields: Vote on success, Error on failure.
type SignedVoteResponse struct {
	Vote *types.Vote
	Error *RemoteSignerError
}
// SignProposalRequest is a PrivValidatorSocket message containing a Proposal
// that the remote signer is asked to sign.
type SignProposalRequest struct {
	Proposal *types.Proposal
}
// SignedProposalResponse is a PrivValidatorSocket message containing a signed
// proposal along with a potential error message.
// The RemoteSigner sets exactly one of the two fields: Proposal on success, Error on failure.
type SignedProposalResponse struct {
	Proposal *types.Proposal
	Error *RemoteSignerError
}
// SignHeartbeatRequest is a PrivValidatorSocket message containing a Heartbeat
// that the remote signer is asked to sign.
type SignHeartbeatRequest struct {
	Heartbeat *types.Heartbeat
}
// SignedHeartbeatResponse is a PrivValidatorSocket message containing a signed
// heartbeat along with a potential error message.
// The RemoteSigner sets exactly one of the two fields: Heartbeat on success, Error on failure.
type SignedHeartbeatResponse struct {
	Heartbeat *types.Heartbeat
	Error *RemoteSignerError
}
// RemoteSignerError allows (remote) validators to include meaningful error descriptions in their reply.
type RemoteSignerError struct {
	// TODO(ismail): create an enum of known errors
	// Code is always 0 in the responses built by RemoteSigner.handleConnection.
	Code int
	// Description carries the text of the signing error.
	Description string
}
  487. func readMsg(r io.Reader) (msg SocketPVMsg, err error) {
  488. const maxSocketPVMsgSize = 1024 * 10
  489. // set deadline before trying to read
  490. conn := r.(net.Conn)
  491. if err := conn.SetDeadline(time.Now().Add(time.Second * defaultConnDeadlineSeconds)); err != nil {
  492. err = cmn.ErrorWrap(err, "setting connection timeout failed in readMsg")
  493. return msg, err
  494. }
  495. _, err = cdc.UnmarshalBinaryReader(r, &msg, maxSocketPVMsgSize)
  496. if _, ok := err.(timeoutError); ok {
  497. err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
  498. }
  499. return
  500. }
  501. func writeMsg(w io.Writer, msg interface{}) (err error) {
  502. _, err = cdc.MarshalBinaryWriter(w, msg)
  503. if _, ok := err.(timeoutError); ok {
  504. err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
  505. }
  506. return
  507. }