You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

467 lines
10 KiB

7 years ago
7 years ago
7 years ago
  1. package types
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "time"
  7. "github.com/pkg/errors"
  8. crypto "github.com/tendermint/go-crypto"
  9. wire "github.com/tendermint/go-wire"
  10. cmn "github.com/tendermint/tmlibs/common"
  11. "github.com/tendermint/tmlibs/log"
  12. "golang.org/x/net/netutil"
  13. p2pconn "github.com/tendermint/tendermint/p2p/conn"
  14. "github.com/tendermint/tendermint/types"
  15. )
  // Defaults used by the socket client.
  const (
  	// defaultConnDeadlineSeconds is the number of seconds behind connDeadline
  	// and behind the default connectTimeout set by NewSocketClient.
  	defaultConnDeadlineSeconds = 3
  	// NOTE(review): defaultDialRetryIntervalSeconds is not referenced anywhere
  	// in the visible code; connect sleeps for connectTimeout between attempts
  	// instead. Confirm whether it is still needed.
  	defaultDialRetryIntervalSeconds = 1
  	// defaultDialRetryMax is the number of dial attempts connect makes before
  	// returning ErrDialRetryMax.
  	defaultDialRetryMax = 10
  )
  // Socket errors.
  var (
  	// ErrDialRetryMax is returned by socketClient.connect once all
  	// defaultDialRetryMax attempts have failed.
  	ErrDialRetryMax = errors.New("Error max client retries")
  )
  var (
  	// connDeadline is the offset from time.Now() passed to SetDeadline on every
  	// freshly dialed (client) or accepted (server) connection.
  	connDeadline = time.Second * defaultConnDeadlineSeconds
  )
  // SocketClientOption sets an optional parameter on the SocketClient.
  // It is a function that mutates the socketClient it is given.
  type SocketClientOption func(*socketClient)
  30. // SocketClientTimeout sets the timeout for connecting to the external socket
  31. // address.
  32. func SocketClientTimeout(timeout time.Duration) SocketClientOption {
  33. return func(sc *socketClient) { sc.connectTimeout = timeout }
  34. }
  // socketClient implements PrivValidator, it uses a socket to request signatures
  // from an external process.
  type socketClient struct {
  	cmn.BaseService

  	// conn is the connection established by OnStart; nil until then.
  	conn net.Conn
  	// privKey, when non-nil, is used by connect to upgrade the connection via
  	// p2pconn.MakeSecretConnection.
  	privKey *crypto.PrivKeyEd25519

  	// addr is the address handed to cmn.Connect.
  	addr string
  	// connectTimeout is the pause connect takes between dial attempts.
  	connectTimeout time.Duration
  }
  // Check at compile time that *socketClient implements PrivValidator2.
  var _ types.PrivValidator2 = (*socketClient)(nil)
  46. // NewsocketClient returns an instance of socketClient.
  47. func NewSocketClient(
  48. logger log.Logger,
  49. socketAddr string,
  50. privKey *crypto.PrivKeyEd25519,
  51. ) *socketClient {
  52. sc := &socketClient{
  53. addr: socketAddr,
  54. connectTimeout: time.Second * defaultConnDeadlineSeconds,
  55. privKey: privKey,
  56. }
  57. sc.BaseService = *cmn.NewBaseService(logger, "privValidatorsocketClient", sc)
  58. return sc
  59. }
  60. // OnStart implements cmn.Service.
  61. func (sc *socketClient) OnStart() error {
  62. if err := sc.BaseService.OnStart(); err != nil {
  63. return err
  64. }
  65. conn, err := sc.connect()
  66. if err != nil {
  67. return err
  68. }
  69. sc.conn = conn
  70. return nil
  71. }
  72. // OnStop implements cmn.Service.
  73. func (sc *socketClient) OnStop() {
  74. sc.BaseService.OnStop()
  75. if sc.conn != nil {
  76. sc.conn.Close()
  77. }
  78. }
  79. // GetAddress implements PrivValidator.
  80. // TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
  81. func (sc *socketClient) GetAddress() types.Address {
  82. addr, err := sc.Address()
  83. if err != nil {
  84. panic(err)
  85. }
  86. return addr
  87. }
  88. // Address is an alias for PubKey().Address().
  89. func (sc *socketClient) Address() (cmn.HexBytes, error) {
  90. p, err := sc.PubKey()
  91. if err != nil {
  92. return nil, err
  93. }
  94. return p.Address(), nil
  95. }
  96. // GetPubKey implements PrivValidator.
  97. // TODO(xla): Remove when PrivValidator2 replaced PrivValidator.
  98. func (sc *socketClient) GetPubKey() crypto.PubKey {
  99. pubKey, err := sc.PubKey()
  100. if err != nil {
  101. panic(err)
  102. }
  103. return pubKey
  104. }
  105. // PubKey implements PrivValidator2.
  106. func (sc *socketClient) PubKey() (crypto.PubKey, error) {
  107. err := writeMsg(sc.conn, &PubKeyMsg{})
  108. if err != nil {
  109. return crypto.PubKey{}, err
  110. }
  111. res, err := readMsg(sc.conn)
  112. if err != nil {
  113. return crypto.PubKey{}, err
  114. }
  115. return res.(*PubKeyMsg).PubKey, nil
  116. }
  117. // SignVote implements PrivValidator2.
  118. func (sc *socketClient) SignVote(chainID string, vote *types.Vote) error {
  119. err := writeMsg(sc.conn, &SignVoteMsg{Vote: vote})
  120. if err != nil {
  121. return err
  122. }
  123. res, err := readMsg(sc.conn)
  124. if err != nil {
  125. return err
  126. }
  127. *vote = *res.(*SignVoteMsg).Vote
  128. return nil
  129. }
  130. // SignProposal implements PrivValidator2.
  131. func (sc *socketClient) SignProposal(chainID string, proposal *types.Proposal) error {
  132. err := writeMsg(sc.conn, &SignProposalMsg{Proposal: proposal})
  133. if err != nil {
  134. return err
  135. }
  136. res, err := readMsg(sc.conn)
  137. if err != nil {
  138. return err
  139. }
  140. *proposal = *res.(*SignProposalMsg).Proposal
  141. return nil
  142. }
  143. // SignHeartbeat implements PrivValidator2.
  144. func (sc *socketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
  145. err := writeMsg(sc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
  146. if err != nil {
  147. return err
  148. }
  149. res, err := readMsg(sc.conn)
  150. if err != nil {
  151. return err
  152. }
  153. *heartbeat = *res.(*SignHeartbeatMsg).Heartbeat
  154. return nil
  155. }
  156. func (sc *socketClient) connect() (net.Conn, error) {
  157. retries := defaultDialRetryMax
  158. RETRY_LOOP:
  159. for retries > 0 {
  160. if retries != defaultDialRetryMax {
  161. time.Sleep(sc.connectTimeout)
  162. }
  163. retries--
  164. conn, err := cmn.Connect(sc.addr)
  165. if err != nil {
  166. sc.Logger.Error(
  167. "sc connect",
  168. "addr", sc.addr,
  169. "err", errors.Wrap(err, "connection failed"),
  170. )
  171. continue RETRY_LOOP
  172. }
  173. if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
  174. sc.Logger.Error(
  175. "sc connect",
  176. "err", errors.Wrap(err, "setting connection timeout failed"),
  177. )
  178. continue
  179. }
  180. if sc.privKey != nil {
  181. conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey.Wrap())
  182. if err != nil {
  183. sc.Logger.Error(
  184. "sc connect",
  185. "err", errors.Wrap(err, "encrypting connection failed"),
  186. )
  187. continue RETRY_LOOP
  188. }
  189. }
  190. return conn, nil
  191. }
  192. return nil, ErrDialRetryMax
  193. }
  194. //---------------------------------------------------------
  // PrivValidatorSocketServer responds to requests over a socket by delegating
  // them to the wrapped privVal.
  type PrivValidatorSocketServer struct {
  	cmn.BaseService

  	// proto and addr are the two values returned by cmn.ProtocolAndAddress for
  	// the configured socket address; they are passed to net.Listen.
  	proto, addr string
  	// listener is set by OnStart; nil before that.
  	listener net.Listener
  	// maxConnections is the limit handed to netutil.LimitListener.
  	maxConnections int
  	// privKey, when non-nil, is used to upgrade accepted connections via
  	// p2pconn.MakeSecretConnection.
  	privKey *crypto.PrivKeyEd25519

  	// privVal answers the public-key and signing requests.
  	privVal PrivValidator
  	// chainID is passed to privVal on every signing request.
  	chainID string
  }
  206. // NewPrivValidatorSocketServer returns an instance of
  207. // PrivValidatorSocketServer.
  208. func NewPrivValidatorSocketServer(
  209. logger log.Logger,
  210. chainID, socketAddr string,
  211. maxConnections int,
  212. privVal PrivValidator,
  213. privKey *crypto.PrivKeyEd25519,
  214. ) *PrivValidatorSocketServer {
  215. proto, addr := cmn.ProtocolAndAddress(socketAddr)
  216. pvss := &PrivValidatorSocketServer{
  217. proto: proto,
  218. addr: addr,
  219. maxConnections: maxConnections,
  220. privKey: privKey,
  221. privVal: privVal,
  222. chainID: chainID,
  223. }
  224. pvss.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketServer", pvss)
  225. return pvss
  226. }
  227. // OnStart implements cmn.Service.
  228. func (pvss *PrivValidatorSocketServer) OnStart() error {
  229. ln, err := net.Listen(pvss.proto, pvss.addr)
  230. if err != nil {
  231. return err
  232. }
  233. pvss.listener = netutil.LimitListener(ln, pvss.maxConnections)
  234. go pvss.acceptConnections()
  235. return nil
  236. }
  237. // OnStop implements cmn.Service.
  238. func (pvss *PrivValidatorSocketServer) OnStop() {
  239. if pvss.listener == nil {
  240. return
  241. }
  242. if err := pvss.listener.Close(); err != nil {
  243. pvss.Logger.Error("OnStop", "err", errors.Wrap(err, "closing listener failed"))
  244. }
  245. }
  246. func (pvss *PrivValidatorSocketServer) acceptConnections() {
  247. for {
  248. conn, err := pvss.listener.Accept()
  249. if err != nil {
  250. if !pvss.IsRunning() {
  251. return // Ignore error from listener closing.
  252. }
  253. pvss.Logger.Error(
  254. "accpetConnections",
  255. "err", errors.Wrap(err, "failed to accept connection"),
  256. )
  257. continue
  258. }
  259. if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
  260. pvss.Logger.Error(
  261. "acceptConnetions",
  262. "err", errors.Wrap(err, "setting connection timeout failed"),
  263. )
  264. continue
  265. }
  266. if pvss.privKey != nil {
  267. conn, err = p2pconn.MakeSecretConnection(conn, pvss.privKey.Wrap())
  268. if err != nil {
  269. pvss.Logger.Error(
  270. "acceptConnections",
  271. "err", errors.Wrap(err, "secret connection failed"),
  272. )
  273. continue
  274. }
  275. }
  276. go pvss.handleConnection(conn)
  277. }
  278. }
  279. func (pvss *PrivValidatorSocketServer) handleConnection(conn net.Conn) {
  280. defer conn.Close()
  281. for {
  282. if !pvss.IsRunning() {
  283. return // Ignore error from listener closing.
  284. }
  285. req, err := readMsg(conn)
  286. if err != nil {
  287. if err != io.EOF {
  288. pvss.Logger.Error("handleConnection", "err", err)
  289. }
  290. return
  291. }
  292. var res PrivValidatorSocketMsg
  293. switch r := req.(type) {
  294. case *PubKeyMsg:
  295. var p crypto.PubKey
  296. p, err = pvss.privVal.PubKey()
  297. res = &PubKeyMsg{p}
  298. case *SignVoteMsg:
  299. err = pvss.privVal.SignVote(pvss.chainID, r.Vote)
  300. res = &SignVoteMsg{r.Vote}
  301. case *SignProposalMsg:
  302. err = pvss.privVal.SignProposal(pvss.chainID, r.Proposal)
  303. res = &SignProposalMsg{r.Proposal}
  304. case *SignHeartbeatMsg:
  305. err = pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat)
  306. res = &SignHeartbeatMsg{r.Heartbeat}
  307. default:
  308. err = fmt.Errorf("unknown msg: %v", r)
  309. }
  310. if err != nil {
  311. pvss.Logger.Error("handleConnection", "err", err)
  312. return
  313. }
  314. err = writeMsg(conn, res)
  315. if err != nil {
  316. pvss.Logger.Error("handleConnection", "err", err)
  317. return
  318. }
  319. }
  320. }
  321. //---------------------------------------------------------
  // Type bytes under which the concrete message types are registered with
  // go-wire in the wire.RegisterInterface call in this file.
  const (
  	msgTypePubKey = byte(0x01)
  	msgTypeSignVote = byte(0x10)
  	msgTypeSignProposal = byte(0x11)
  	msgTypeSignHeartbeat = byte(0x12)
  )
  // PrivValidatorSocketMsg is a message sent between PrivValidatorSocket client
  // and server. It is an empty interface; the concrete types in use are the ones
  // registered with go-wire in this file.
  type PrivValidatorSocketMsg interface{}
  // Register the concrete message types, each with its type byte, for the
  // struct{ PrivValidatorSocketMsg } wrapper that readMsg and writeMsg use.
  var _ = wire.RegisterInterface(
  	struct{ PrivValidatorSocketMsg }{},
  	wire.ConcreteType{&PubKeyMsg{}, msgTypePubKey},
  	wire.ConcreteType{&SignVoteMsg{}, msgTypeSignVote},
  	wire.ConcreteType{&SignProposalMsg{}, msgTypeSignProposal},
  	wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat},
  )
  // PubKeyMsg is a PrivValidatorSocket message containing the public key.
  // The client sends it empty as a request; the server replies with PubKey set.
  type PubKeyMsg struct {
  	PubKey crypto.PubKey
  }
  // SignVoteMsg is a PrivValidatorSocket message containing a vote.
  // The same type is used for the signing request and for the reply.
  type SignVoteMsg struct {
  	Vote *types.Vote
  }
  // SignProposalMsg is a PrivValidatorSocket message containing a Proposal.
  // The same type is used for the signing request and for the reply.
  type SignProposalMsg struct {
  	Proposal *types.Proposal
  }
  // SignHeartbeatMsg is a PrivValidatorSocket message containing a Heartbeat.
  // The same type is used for the signing request and for the reply.
  type SignHeartbeatMsg struct {
  	Heartbeat *types.Heartbeat
  }
  354. func readMsg(r io.Reader) (PrivValidatorSocketMsg, error) {
  355. var (
  356. n int
  357. err error
  358. )
  359. read := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err)
  360. if err != nil {
  361. return nil, err
  362. }
  363. w, ok := read.(struct{ PrivValidatorSocketMsg })
  364. if !ok {
  365. return nil, errors.New("unknwon type")
  366. }
  367. return w.PrivValidatorSocketMsg, nil
  368. }
  369. func writeMsg(w io.Writer, msg interface{}) error {
  370. var (
  371. err error
  372. n int
  373. )
  374. // TODO(xla): This extra wrap should be gone with the sdk-2 update.
  375. wire.WriteBinary(struct{ PrivValidatorSocketMsg }{msg}, w, &n, &err)
  376. return err
  377. }