You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

404 lines
9.2 KiB

p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. "github.com/tendermint/tendermint/libs/cmap"
  7. "github.com/tendermint/tendermint/libs/log"
  8. "github.com/tendermint/tendermint/libs/service"
  9. tmconn "github.com/tendermint/tendermint/p2p/conn"
  10. )
  11. //go:generate mockery --case underscore --name Peer
  12. const metricsTickerDuration = 10 * time.Second
  13. // Peer is an interface representing a peer connected on a reactor.
  14. type Peer interface {
  15. service.Service
  16. FlushStop()
  17. ID() ID // peer's cryptographic ID
  18. RemoteIP() net.IP // remote IP of the connection
  19. RemoteAddr() net.Addr // remote address of the connection
  20. IsOutbound() bool // did we dial the peer
  21. IsPersistent() bool // do we redial this peer when we disconnect
  22. CloseConn() error // close original connection
  23. NodeInfo() NodeInfo // peer's info
  24. Status() tmconn.ConnectionStatus
  25. SocketAddr() *NetAddress // actual address of the socket
  26. Send(byte, []byte) bool
  27. TrySend(byte, []byte) bool
  28. Set(string, interface{})
  29. Get(string) interface{}
  30. }
  31. //----------------------------------------------------------
  32. // peerConn contains the raw connection and its config.
  33. type peerConn struct {
  34. outbound bool
  35. persistent bool
  36. conn net.Conn // source connection
  37. socketAddr *NetAddress
  38. // cached RemoteIP()
  39. ip net.IP
  40. }
  41. func newPeerConn(
  42. outbound, persistent bool,
  43. conn net.Conn,
  44. socketAddr *NetAddress,
  45. ) peerConn {
  46. return peerConn{
  47. outbound: outbound,
  48. persistent: persistent,
  49. conn: conn,
  50. socketAddr: socketAddr,
  51. }
  52. }
  53. // ID only exists for SecretConnection.
  54. // NOTE: Will panic if conn is not *SecretConnection.
  55. func (pc peerConn) ID() ID {
  56. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  57. }
  58. // Return the IP from the connection RemoteAddr
  59. func (pc peerConn) RemoteIP() net.IP {
  60. if pc.ip != nil {
  61. return pc.ip
  62. }
  63. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  64. if err != nil {
  65. panic(err)
  66. }
  67. ips, err := net.LookupIP(host)
  68. if err != nil {
  69. panic(err)
  70. }
  71. pc.ip = ips[0]
  72. return pc.ip
  73. }
  74. // peer implements Peer.
  75. //
  76. // Before using a peer, you will need to perform a handshake on connection.
  77. type peer struct {
  78. service.BaseService
  79. // raw peerConn and the multiplex connection
  80. peerConn
  81. mconn *tmconn.MConnection
  82. // peer's node info and the channel it knows about
  83. // channels = nodeInfo.Channels
  84. // cached to avoid copying nodeInfo in hasChannel
  85. nodeInfo NodeInfo
  86. channels []byte
  87. // User data
  88. Data *cmap.CMap
  89. metrics *Metrics
  90. metricsTicker *time.Ticker
  91. }
  92. type PeerOption func(*peer)
  93. func newPeer(
  94. pc peerConn,
  95. mConfig tmconn.MConnConfig,
  96. nodeInfo NodeInfo,
  97. reactorsByCh map[byte]Reactor,
  98. chDescs []*tmconn.ChannelDescriptor,
  99. onPeerError func(Peer, interface{}),
  100. options ...PeerOption,
  101. ) *peer {
  102. p := &peer{
  103. peerConn: pc,
  104. nodeInfo: nodeInfo,
  105. channels: nodeInfo.(DefaultNodeInfo).Channels,
  106. Data: cmap.NewCMap(),
  107. metricsTicker: time.NewTicker(metricsTickerDuration),
  108. metrics: NopMetrics(),
  109. }
  110. p.mconn = createMConnection(
  111. pc.conn,
  112. p,
  113. reactorsByCh,
  114. chDescs,
  115. onPeerError,
  116. mConfig,
  117. )
  118. p.BaseService = *service.NewBaseService(nil, "Peer", p)
  119. for _, option := range options {
  120. option(p)
  121. }
  122. return p
  123. }
  124. // String representation.
  125. func (p *peer) String() string {
  126. if p.outbound {
  127. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  128. }
  129. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  130. }
  131. //---------------------------------------------------
  132. // Implements service.Service
  133. // SetLogger implements BaseService.
  134. func (p *peer) SetLogger(l log.Logger) {
  135. p.Logger = l
  136. p.mconn.SetLogger(l)
  137. }
  138. // OnStart implements BaseService.
  139. func (p *peer) OnStart() error {
  140. if err := p.BaseService.OnStart(); err != nil {
  141. return err
  142. }
  143. if err := p.mconn.Start(); err != nil {
  144. return err
  145. }
  146. go p.metricsReporter()
  147. return nil
  148. }
  149. // FlushStop mimics OnStop but additionally ensures that all successful
  150. // .Send() calls will get flushed before closing the connection.
  151. // NOTE: it is not safe to call this method more than once.
  152. func (p *peer) FlushStop() {
  153. p.metricsTicker.Stop()
  154. p.BaseService.OnStop()
  155. p.mconn.FlushStop() // stop everything and close the conn
  156. }
  157. // OnStop implements BaseService.
  158. func (p *peer) OnStop() {
  159. p.metricsTicker.Stop()
  160. p.BaseService.OnStop()
  161. if err := p.mconn.Stop(); err != nil { // stop everything and close the conn
  162. p.Logger.Debug("Error while stopping peer", "err", err)
  163. }
  164. }
  165. //---------------------------------------------------
  166. // Implements Peer
  167. // ID returns the peer's ID - the hex encoded hash of its pubkey.
  168. func (p *peer) ID() ID {
  169. return p.nodeInfo.ID()
  170. }
  171. // IsOutbound returns true if the connection is outbound, false otherwise.
  172. func (p *peer) IsOutbound() bool {
  173. return p.peerConn.outbound
  174. }
  175. // IsPersistent returns true if the peer is persitent, false otherwise.
  176. func (p *peer) IsPersistent() bool {
  177. return p.peerConn.persistent
  178. }
  179. // NodeInfo returns a copy of the peer's NodeInfo.
  180. func (p *peer) NodeInfo() NodeInfo {
  181. return p.nodeInfo
  182. }
  183. // SocketAddr returns the address of the socket.
  184. // For outbound peers, it's the address dialed (after DNS resolution).
  185. // For inbound peers, it's the address returned by the underlying connection
  186. // (not what's reported in the peer's NodeInfo).
  187. func (p *peer) SocketAddr() *NetAddress {
  188. return p.peerConn.socketAddr
  189. }
  190. // Status returns the peer's ConnectionStatus.
  191. func (p *peer) Status() tmconn.ConnectionStatus {
  192. return p.mconn.Status()
  193. }
  194. // Send msg bytes to the channel identified by chID byte. Returns false if the
  195. // send queue is full after timeout, specified by MConnection.
  196. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  197. if !p.IsRunning() {
  198. // see Switch#Broadcast, where we fetch the list of peers and loop over
  199. // them - while we're looping, one peer may be removed and stopped.
  200. return false
  201. } else if !p.hasChannel(chID) {
  202. return false
  203. }
  204. res := p.mconn.Send(chID, msgBytes)
  205. if res {
  206. labels := []string{
  207. "peer_id", string(p.ID()),
  208. "chID", fmt.Sprintf("%#x", chID),
  209. }
  210. p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  211. }
  212. return res
  213. }
  214. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  215. // false if the send queue is full.
  216. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  217. if !p.IsRunning() {
  218. return false
  219. } else if !p.hasChannel(chID) {
  220. return false
  221. }
  222. res := p.mconn.TrySend(chID, msgBytes)
  223. if res {
  224. labels := []string{
  225. "peer_id", string(p.ID()),
  226. "chID", fmt.Sprintf("%#x", chID),
  227. }
  228. p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  229. }
  230. return res
  231. }
  232. // Get the data for a given key.
  233. func (p *peer) Get(key string) interface{} {
  234. return p.Data.Get(key)
  235. }
  236. // Set sets the data for the given key.
  237. func (p *peer) Set(key string, data interface{}) {
  238. p.Data.Set(key, data)
  239. }
  240. // hasChannel returns true if the peer reported
  241. // knowing about the given chID.
  242. func (p *peer) hasChannel(chID byte) bool {
  243. for _, ch := range p.channels {
  244. if ch == chID {
  245. return true
  246. }
  247. }
  248. // NOTE: probably will want to remove this
  249. // but could be helpful while the feature is new
  250. p.Logger.Debug(
  251. "Unknown channel for peer",
  252. "channel",
  253. chID,
  254. "channels",
  255. p.channels,
  256. )
  257. return false
  258. }
  259. // CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all.
  260. func (p *peer) CloseConn() error {
  261. return p.peerConn.conn.Close()
  262. }
  263. //---------------------------------------------------
  264. // methods only used for testing
  265. // TODO: can we remove these?
  266. // CloseConn closes the underlying connection
  267. func (pc *peerConn) CloseConn() {
  268. pc.conn.Close()
  269. }
  270. // RemoteAddr returns peer's remote network address.
  271. func (p *peer) RemoteAddr() net.Addr {
  272. return p.peerConn.conn.RemoteAddr()
  273. }
  274. // CanSend returns true if the send queue is not full, false otherwise.
  275. func (p *peer) CanSend(chID byte) bool {
  276. if !p.IsRunning() {
  277. return false
  278. }
  279. return p.mconn.CanSend(chID)
  280. }
  281. //---------------------------------------------------
  282. func PeerMetrics(metrics *Metrics) PeerOption {
  283. return func(p *peer) {
  284. p.metrics = metrics
  285. }
  286. }
  287. func (p *peer) metricsReporter() {
  288. for {
  289. select {
  290. case <-p.metricsTicker.C:
  291. status := p.mconn.Status()
  292. var sendQueueSize float64
  293. for _, chStatus := range status.Channels {
  294. sendQueueSize += float64(chStatus.SendQueueSize)
  295. }
  296. p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
  297. case <-p.Quit():
  298. return
  299. }
  300. }
  301. }
  302. //------------------------------------------------------------------
  303. // helper funcs
  304. func createMConnection(
  305. conn net.Conn,
  306. p *peer,
  307. reactorsByCh map[byte]Reactor,
  308. chDescs []*tmconn.ChannelDescriptor,
  309. onPeerError func(Peer, interface{}),
  310. config tmconn.MConnConfig,
  311. ) *tmconn.MConnection {
  312. onReceive := func(chID byte, msgBytes []byte) {
  313. reactor := reactorsByCh[chID]
  314. if reactor == nil {
  315. // Note that its ok to panic here as it's caught in the conn._recover,
  316. // which does onPeerError.
  317. panic(fmt.Sprintf("Unknown channel %X", chID))
  318. }
  319. labels := []string{
  320. "peer_id", string(p.ID()),
  321. "chID", fmt.Sprintf("%#x", chID),
  322. }
  323. p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  324. reactor.Receive(chID, p, msgBytes)
  325. }
  326. onError := func(r interface{}) {
  327. onPeerError(p, r)
  328. }
  329. return tmconn.NewMConnectionWithConfig(
  330. conn,
  331. chDescs,
  332. onReceive,
  333. onError,
  334. config,
  335. )
  336. }