You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

505 lines
14 KiB

privval: improve Remote Signer implementation (#3351). This issue is related to #3107. This is a first renaming/refactoring step before reworking and removing heartbeats. As discussed with @Liamsi, we preferred to go for a couple of independent and separate PRs to simplify review work. The changes: help to clarify the relation between the validator and remote signer endpoints; differentiate between timeouts and deadlines; prepare to encapsulate networking-related code behind RemoteSigner in the next PR. My intention is to separate and encapsulate the "network-related" code from the actual signer. SignerRemote ---(uses/contains)--> SignerValidatorEndpoint <--(connects to)--> SignerServiceEndpoint ---> SignerService (future; not here yet, but I would like to decouple it too). All reconnection/heartbeat/whatever code goes in the endpoints; Signer[Remote/Service] do not need to know about that. I agree Endpoint may not be the perfect name. I tried to find something "Go-ish" enough; it is a common name in go-kit, Kubernetes, etc. Right now: SignerValidatorEndpoint: handles the listener; contains SignerRemote; implements the PrivValidator interface; connects and sets a connection object in a contained SignerRemote; delegates some PrivValidator calls to SignerRemote, which in turn uses the conn object that was set externally. SignerRemote: implements the PrivValidator interface; reads/writes from a connection object directly; handles heartbeats. SignerServiceEndpoint: does most things in a single place; delegates to a PrivValidator, IIRC. * cleanup * Refactoring step 1 * Refactoring step 2 * move messages to another file * mark for future work / next steps * mark deprecated classes in docs * Fix linter problems * additional linter fixes
6 years ago
  1. package privval
  2. import (
  3. "fmt"
  4. "net"
  5. "testing"
  6. "time"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/stretchr/testify/require"
  9. "github.com/tendermint/tendermint/crypto/ed25519"
  10. cmn "github.com/tendermint/tendermint/libs/common"
  11. "github.com/tendermint/tendermint/libs/log"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. var (
  15. testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second
  16. testTimeoutReadWrite = 100 * time.Millisecond
  17. testTimeoutReadWrite2o3 = 66 * time.Millisecond // 2/3 of the other one
  18. testTimeoutHeartbeat = 10 * time.Millisecond
  19. testTimeoutHeartbeat3o2 = 6 * time.Millisecond // 3/2 of the other one
  20. )
  21. type socketTestCase struct {
  22. addr string
  23. dialer SocketDialer
  24. }
  25. func socketTestCases(t *testing.T) []socketTestCase {
  26. tcpAddr := fmt.Sprintf("tcp://%s", testFreeTCPAddr(t))
  27. unixFilePath, err := testUnixAddr()
  28. require.NoError(t, err)
  29. unixAddr := fmt.Sprintf("unix://%s", unixFilePath)
  30. return []socketTestCase{
  31. {
  32. addr: tcpAddr,
  33. dialer: DialTCPFn(tcpAddr, testTimeoutReadWrite, ed25519.GenPrivKey()),
  34. },
  35. {
  36. addr: unixAddr,
  37. dialer: DialUnixFn(unixFilePath),
  38. },
  39. }
  40. }
  41. func TestSocketPVAddress(t *testing.T) {
  42. for _, tc := range socketTestCases(t) {
  43. // Execute the test within a closure to ensure the deferred statements
  44. // are called between each for loop iteration, for isolated test cases.
  45. func() {
  46. var (
  47. chainID = cmn.RandStr(12)
  48. validatorEndpoint, serviceEndpoint = testSetupSocketPair(t, chainID, types.NewMockPV(), tc.addr, tc.dialer)
  49. )
  50. defer validatorEndpoint.Stop()
  51. defer serviceEndpoint.Stop()
  52. serviceAddr := serviceEndpoint.privVal.GetPubKey().Address()
  53. validatorAddr := validatorEndpoint.GetPubKey().Address()
  54. assert.Equal(t, serviceAddr, validatorAddr)
  55. }()
  56. }
  57. }
  58. func TestSocketPVPubKey(t *testing.T) {
  59. for _, tc := range socketTestCases(t) {
  60. func() {
  61. var (
  62. chainID = cmn.RandStr(12)
  63. validatorEndpoint, serviceEndpoint = testSetupSocketPair(
  64. t,
  65. chainID,
  66. types.NewMockPV(),
  67. tc.addr,
  68. tc.dialer)
  69. )
  70. defer validatorEndpoint.Stop()
  71. defer serviceEndpoint.Stop()
  72. clientKey := validatorEndpoint.GetPubKey()
  73. privvalPubKey := serviceEndpoint.privVal.GetPubKey()
  74. assert.Equal(t, privvalPubKey, clientKey)
  75. }()
  76. }
  77. }
  78. func TestSocketPVProposal(t *testing.T) {
  79. for _, tc := range socketTestCases(t) {
  80. func() {
  81. var (
  82. chainID = cmn.RandStr(12)
  83. validatorEndpoint, serviceEndpoint = testSetupSocketPair(
  84. t,
  85. chainID,
  86. types.NewMockPV(),
  87. tc.addr,
  88. tc.dialer)
  89. ts = time.Now()
  90. privProposal = &types.Proposal{Timestamp: ts}
  91. clientProposal = &types.Proposal{Timestamp: ts}
  92. )
  93. defer validatorEndpoint.Stop()
  94. defer serviceEndpoint.Stop()
  95. require.NoError(t, serviceEndpoint.privVal.SignProposal(chainID, privProposal))
  96. require.NoError(t, validatorEndpoint.SignProposal(chainID, clientProposal))
  97. assert.Equal(t, privProposal.Signature, clientProposal.Signature)
  98. }()
  99. }
  100. }
  101. func TestSocketPVVote(t *testing.T) {
  102. for _, tc := range socketTestCases(t) {
  103. func() {
  104. var (
  105. chainID = cmn.RandStr(12)
  106. validatorEndpoint, serviceEndpoint = testSetupSocketPair(
  107. t,
  108. chainID,
  109. types.NewMockPV(),
  110. tc.addr,
  111. tc.dialer)
  112. ts = time.Now()
  113. vType = types.PrecommitType
  114. want = &types.Vote{Timestamp: ts, Type: vType}
  115. have = &types.Vote{Timestamp: ts, Type: vType}
  116. )
  117. defer validatorEndpoint.Stop()
  118. defer serviceEndpoint.Stop()
  119. require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
  120. require.NoError(t, validatorEndpoint.SignVote(chainID, have))
  121. assert.Equal(t, want.Signature, have.Signature)
  122. }()
  123. }
  124. }
  125. func TestSocketPVVoteResetDeadline(t *testing.T) {
  126. for _, tc := range socketTestCases(t) {
  127. func() {
  128. var (
  129. chainID = cmn.RandStr(12)
  130. validatorEndpoint, serviceEndpoint = testSetupSocketPair(
  131. t,
  132. chainID,
  133. types.NewMockPV(),
  134. tc.addr,
  135. tc.dialer)
  136. ts = time.Now()
  137. vType = types.PrecommitType
  138. want = &types.Vote{Timestamp: ts, Type: vType}
  139. have = &types.Vote{Timestamp: ts, Type: vType}
  140. )
  141. defer validatorEndpoint.Stop()
  142. defer serviceEndpoint.Stop()
  143. time.Sleep(testTimeoutReadWrite2o3)
  144. require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
  145. require.NoError(t, validatorEndpoint.SignVote(chainID, have))
  146. assert.Equal(t, want.Signature, have.Signature)
  147. // This would exceed the deadline if it was not extended by the previous message
  148. time.Sleep(testTimeoutReadWrite2o3)
  149. require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
  150. require.NoError(t, validatorEndpoint.SignVote(chainID, have))
  151. assert.Equal(t, want.Signature, have.Signature)
  152. }()
  153. }
  154. }
  155. func TestSocketPVVoteKeepalive(t *testing.T) {
  156. for _, tc := range socketTestCases(t) {
  157. func() {
  158. var (
  159. chainID = cmn.RandStr(12)
  160. validatorEndpoint, serviceEndpoint = testSetupSocketPair(
  161. t,
  162. chainID,
  163. types.NewMockPV(),
  164. tc.addr,
  165. tc.dialer)
  166. ts = time.Now()
  167. vType = types.PrecommitType
  168. want = &types.Vote{Timestamp: ts, Type: vType}
  169. have = &types.Vote{Timestamp: ts, Type: vType}
  170. )
  171. defer validatorEndpoint.Stop()
  172. defer serviceEndpoint.Stop()
  173. time.Sleep(testTimeoutReadWrite * 2)
  174. require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
  175. require.NoError(t, validatorEndpoint.SignVote(chainID, have))
  176. assert.Equal(t, want.Signature, have.Signature)
  177. }()
  178. }
  179. }
  180. func TestSocketPVDeadline(t *testing.T) {
  181. for _, tc := range socketTestCases(t) {
  182. func() {
  183. var (
  184. listenc = make(chan struct{})
  185. thisConnTimeout = 100 * time.Millisecond
  186. validatorEndpoint = newSignerValidatorEndpoint(log.TestingLogger(), tc.addr, thisConnTimeout)
  187. )
  188. go func(sc *SignerValidatorEndpoint) {
  189. defer close(listenc)
  190. // Note: the TCP connection times out at the accept() phase,
  191. // whereas the Unix domain sockets connection times out while
  192. // attempting to fetch the remote signer's public key.
  193. assert.True(t, IsConnTimeout(sc.Start()))
  194. assert.False(t, sc.IsRunning())
  195. }(validatorEndpoint)
  196. for {
  197. _, err := cmn.Connect(tc.addr)
  198. if err == nil {
  199. break
  200. }
  201. }
  202. <-listenc
  203. }()
  204. }
  205. }
  206. func TestRemoteSignVoteErrors(t *testing.T) {
  207. for _, tc := range socketTestCases(t) {
  208. func() {
  209. var (
  210. chainID = cmn.RandStr(12)
  211. validatorEndpoint, serviceEndpoint = testSetupSocketPair(
  212. t,
  213. chainID,
  214. types.NewErroringMockPV(),
  215. tc.addr,
  216. tc.dialer)
  217. ts = time.Now()
  218. vType = types.PrecommitType
  219. vote = &types.Vote{Timestamp: ts, Type: vType}
  220. )
  221. defer validatorEndpoint.Stop()
  222. defer serviceEndpoint.Stop()
  223. err := validatorEndpoint.SignVote("", vote)
  224. require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
  225. err = serviceEndpoint.privVal.SignVote(chainID, vote)
  226. require.Error(t, err)
  227. err = validatorEndpoint.SignVote(chainID, vote)
  228. require.Error(t, err)
  229. }()
  230. }
  231. }
  232. func TestRemoteSignProposalErrors(t *testing.T) {
  233. for _, tc := range socketTestCases(t) {
  234. func() {
  235. var (
  236. chainID = cmn.RandStr(12)
  237. validatorEndpoint, serviceEndpoint = testSetupSocketPair(
  238. t,
  239. chainID,
  240. types.NewErroringMockPV(),
  241. tc.addr,
  242. tc.dialer)
  243. ts = time.Now()
  244. proposal = &types.Proposal{Timestamp: ts}
  245. )
  246. defer validatorEndpoint.Stop()
  247. defer serviceEndpoint.Stop()
  248. err := validatorEndpoint.SignProposal("", proposal)
  249. require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
  250. err = serviceEndpoint.privVal.SignProposal(chainID, proposal)
  251. require.Error(t, err)
  252. err = validatorEndpoint.SignProposal(chainID, proposal)
  253. require.Error(t, err)
  254. }()
  255. }
  256. }
  257. func TestErrUnexpectedResponse(t *testing.T) {
  258. for _, tc := range socketTestCases(t) {
  259. func() {
  260. var (
  261. logger = log.TestingLogger()
  262. chainID = cmn.RandStr(12)
  263. readyCh = make(chan struct{})
  264. errCh = make(chan error, 1)
  265. serviceEndpoint = NewSignerServiceEndpoint(
  266. logger,
  267. chainID,
  268. types.NewMockPV(),
  269. tc.dialer,
  270. )
  271. validatorEndpoint = newSignerValidatorEndpoint(
  272. logger,
  273. tc.addr,
  274. testTimeoutReadWrite)
  275. )
  276. testStartEndpoint(t, readyCh, validatorEndpoint)
  277. defer validatorEndpoint.Stop()
  278. SignerServiceEndpointTimeoutReadWrite(time.Millisecond)(serviceEndpoint)
  279. SignerServiceEndpointConnRetries(100)(serviceEndpoint)
  280. // we do not want to Start() the remote signer here and instead use the connection to
  281. // reply with intentionally wrong replies below:
  282. rsConn, err := serviceEndpoint.connect()
  283. defer rsConn.Close()
  284. require.NoError(t, err)
  285. require.NotNil(t, rsConn)
  286. // send over public key to get the remote signer running:
  287. go testReadWriteResponse(t, &PubKeyResponse{}, rsConn)
  288. <-readyCh
  289. // Proposal:
  290. go func(errc chan error) {
  291. errc <- validatorEndpoint.SignProposal(chainID, &types.Proposal{})
  292. }(errCh)
  293. // read request and write wrong response:
  294. go testReadWriteResponse(t, &SignedVoteResponse{}, rsConn)
  295. err = <-errCh
  296. require.Error(t, err)
  297. require.Equal(t, err, ErrUnexpectedResponse)
  298. // Vote:
  299. go func(errc chan error) {
  300. errc <- validatorEndpoint.SignVote(chainID, &types.Vote{})
  301. }(errCh)
  302. // read request and write wrong response:
  303. go testReadWriteResponse(t, &SignedProposalResponse{}, rsConn)
  304. err = <-errCh
  305. require.Error(t, err)
  306. require.Equal(t, err, ErrUnexpectedResponse)
  307. }()
  308. }
  309. }
  310. func TestRetryConnToRemoteSigner(t *testing.T) {
  311. for _, tc := range socketTestCases(t) {
  312. func() {
  313. var (
  314. logger = log.TestingLogger()
  315. chainID = cmn.RandStr(12)
  316. readyCh = make(chan struct{})
  317. serviceEndpoint = NewSignerServiceEndpoint(
  318. logger,
  319. chainID,
  320. types.NewMockPV(),
  321. tc.dialer,
  322. )
  323. thisConnTimeout = testTimeoutReadWrite
  324. validatorEndpoint = newSignerValidatorEndpoint(logger, tc.addr, thisConnTimeout)
  325. )
  326. // Ping every:
  327. SignerValidatorEndpointSetHeartbeat(testTimeoutHeartbeat)(validatorEndpoint)
  328. SignerServiceEndpointTimeoutReadWrite(testTimeoutReadWrite)(serviceEndpoint)
  329. SignerServiceEndpointConnRetries(10)(serviceEndpoint)
  330. testStartEndpoint(t, readyCh, validatorEndpoint)
  331. defer validatorEndpoint.Stop()
  332. require.NoError(t, serviceEndpoint.Start())
  333. assert.True(t, serviceEndpoint.IsRunning())
  334. <-readyCh
  335. time.Sleep(testTimeoutHeartbeat * 2)
  336. serviceEndpoint.Stop()
  337. rs2 := NewSignerServiceEndpoint(
  338. logger,
  339. chainID,
  340. types.NewMockPV(),
  341. tc.dialer,
  342. )
  343. // let some pings pass
  344. time.Sleep(testTimeoutHeartbeat3o2)
  345. require.NoError(t, rs2.Start())
  346. assert.True(t, rs2.IsRunning())
  347. defer rs2.Stop()
  348. // give the client some time to re-establish the conn to the remote signer
  349. // should see sth like this in the logs:
  350. //
  351. // E[10016-01-10|17:12:46.128] Ping err="remote signer timed out"
  352. // I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal
  353. time.Sleep(testTimeoutReadWrite * 2)
  354. }()
  355. }
  356. }
  357. func newSignerValidatorEndpoint(logger log.Logger, addr string, timeoutReadWrite time.Duration) *SignerValidatorEndpoint {
  358. proto, address := cmn.ProtocolAndAddress(addr)
  359. ln, err := net.Listen(proto, address)
  360. logger.Info("Listening at", "proto", proto, "address", address)
  361. if err != nil {
  362. panic(err)
  363. }
  364. var listener net.Listener
  365. if proto == "unix" {
  366. unixLn := NewUnixListener(ln)
  367. UnixListenerTimeoutAccept(testTimeoutAccept)(unixLn)
  368. UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn)
  369. listener = unixLn
  370. } else {
  371. tcpLn := NewTCPListener(ln, ed25519.GenPrivKey())
  372. TCPListenerTimeoutAccept(testTimeoutAccept)(tcpLn)
  373. TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn)
  374. listener = tcpLn
  375. }
  376. return NewSignerValidatorEndpoint(logger, listener)
  377. }
  378. func testSetupSocketPair(
  379. t *testing.T,
  380. chainID string,
  381. privValidator types.PrivValidator,
  382. addr string,
  383. socketDialer SocketDialer,
  384. ) (*SignerValidatorEndpoint, *SignerServiceEndpoint) {
  385. var (
  386. logger = log.TestingLogger()
  387. privVal = privValidator
  388. readyc = make(chan struct{})
  389. serviceEndpoint = NewSignerServiceEndpoint(
  390. logger,
  391. chainID,
  392. privVal,
  393. socketDialer,
  394. )
  395. thisConnTimeout = testTimeoutReadWrite
  396. validatorEndpoint = newSignerValidatorEndpoint(logger, addr, thisConnTimeout)
  397. )
  398. SignerValidatorEndpointSetHeartbeat(testTimeoutHeartbeat)(validatorEndpoint)
  399. SignerServiceEndpointTimeoutReadWrite(testTimeoutReadWrite)(serviceEndpoint)
  400. SignerServiceEndpointConnRetries(1e6)(serviceEndpoint)
  401. testStartEndpoint(t, readyc, validatorEndpoint)
  402. require.NoError(t, serviceEndpoint.Start())
  403. assert.True(t, serviceEndpoint.IsRunning())
  404. <-readyc
  405. return validatorEndpoint, serviceEndpoint
  406. }
  407. func testReadWriteResponse(t *testing.T, resp RemoteSignerMsg, rsConn net.Conn) {
  408. _, err := readMsg(rsConn)
  409. require.NoError(t, err)
  410. err = writeMsg(rsConn, resp)
  411. require.NoError(t, err)
  412. }
  413. func testStartEndpoint(t *testing.T, readyCh chan struct{}, sc *SignerValidatorEndpoint) {
  414. go func(sc *SignerValidatorEndpoint) {
  415. require.NoError(t, sc.Start())
  416. assert.True(t, sc.IsRunning())
  417. readyCh <- struct{}{}
  418. }(sc)
  419. }
  420. // testFreeTCPAddr claims a free port so we don't block on listener being ready.
  421. func testFreeTCPAddr(t *testing.T) string {
  422. ln, err := net.Listen("tcp", "127.0.0.1:0")
  423. require.NoError(t, err)
  424. defer ln.Close()
  425. return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port)
  426. }