You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

400 lines
9.1 KiB

p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
  1. package p2p
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. "github.com/tendermint/tendermint/libs/cmap"
  7. "github.com/tendermint/tendermint/libs/log"
  8. "github.com/tendermint/tendermint/libs/service"
  9. tmconn "github.com/tendermint/tendermint/p2p/conn"
  10. )
  // metricsTickerDuration is the period of the ticker that newPeer creates for
  // every peer; metricsReporter publishes its gauge each time that ticker fires.
  const metricsTickerDuration = 10 * time.Second
  // Peer is an interface representing a peer connected on a reactor.
  //
  // Besides the embedded service.Service, it exposes identity and address
  // accessors, message sending on a channel, and a string-keyed data store.
  type Peer interface {
  	service.Service
  	FlushStop() // stop the peer, flushing successful sends before the connection closes
  	ID() ID // peer's cryptographic ID
  	RemoteIP() net.IP // remote IP of the connection
  	RemoteAddr() net.Addr // remote address of the connection
  	IsOutbound() bool // did we dial the peer
  	IsPersistent() bool // do we redial this peer when we disconnect
  	CloseConn() error // close original connection
  	NodeInfo() NodeInfo // peer's info
  	Status() tmconn.ConnectionStatus // status of the peer's connection
  	SocketAddr() *NetAddress // actual address of the socket
  	Send(byte, []byte) bool // channel ID and message bytes; false if the message was not sent
  	TrySend(byte, []byte) bool // like Send, but gives up immediately if the send queue is full
  	Set(string, interface{}) // store a value under a key
  	Get(string) interface{} // fetch the value stored under a key
  }
  30. //----------------------------------------------------------
  31. // peerConn contains the raw connection and its config.
  32. type peerConn struct {
  33. outbound bool
  34. persistent bool
  35. conn net.Conn // source connection
  36. socketAddr *NetAddress
  37. // cached RemoteIP()
  38. ip net.IP
  39. }
  40. func newPeerConn(
  41. outbound, persistent bool,
  42. conn net.Conn,
  43. socketAddr *NetAddress,
  44. ) peerConn {
  45. return peerConn{
  46. outbound: outbound,
  47. persistent: persistent,
  48. conn: conn,
  49. socketAddr: socketAddr,
  50. }
  51. }
  52. // ID only exists for SecretConnection.
  53. // NOTE: Will panic if conn is not *SecretConnection.
  54. func (pc peerConn) ID() ID {
  55. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  56. }
  57. // Return the IP from the connection RemoteAddr
  58. func (pc peerConn) RemoteIP() net.IP {
  59. if pc.ip != nil {
  60. return pc.ip
  61. }
  62. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  63. if err != nil {
  64. panic(err)
  65. }
  66. ips, err := net.LookupIP(host)
  67. if err != nil {
  68. panic(err)
  69. }
  70. pc.ip = ips[0]
  71. return pc.ip
  72. }
  // peer implements Peer.
  //
  // Before using a peer, you will need to perform a handshake on connection.
  type peer struct {
  	service.BaseService
  	// raw peerConn and the multiplex connection
  	peerConn
  	mconn *tmconn.MConnection // built by createMConnection in newPeer
  	// peer's node info and the channel it knows about
  	// channels = nodeInfo.Channels
  	// cached to avoid copying nodeInfo in hasChannel
  	nodeInfo NodeInfo
  	channels []byte
  	// User data, read and written through Get and Set.
  	Data *cmap.CMap
  	// metrics defaults to NopMetrics() and can be replaced with the
  	// PeerMetrics option.
  	metrics *Metrics
  	// metricsTicker fires every metricsTickerDuration and drives
  	// metricsReporter; it is stopped in OnStop and FlushStop.
  	metricsTicker *time.Ticker
  }
  // PeerOption is a function that customises a peer. newPeer applies every
  // option it is given, in order, as its last step before returning.
  type PeerOption func(*peer)
  92. func newPeer(
  93. pc peerConn,
  94. mConfig tmconn.MConnConfig,
  95. nodeInfo NodeInfo,
  96. reactorsByCh map[byte]Reactor,
  97. chDescs []*tmconn.ChannelDescriptor,
  98. onPeerError func(Peer, interface{}),
  99. options ...PeerOption,
  100. ) *peer {
  101. p := &peer{
  102. peerConn: pc,
  103. nodeInfo: nodeInfo,
  104. channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO
  105. Data: cmap.NewCMap(),
  106. metricsTicker: time.NewTicker(metricsTickerDuration),
  107. metrics: NopMetrics(),
  108. }
  109. p.mconn = createMConnection(
  110. pc.conn,
  111. p,
  112. reactorsByCh,
  113. chDescs,
  114. onPeerError,
  115. mConfig,
  116. )
  117. p.BaseService = *service.NewBaseService(nil, "Peer", p)
  118. for _, option := range options {
  119. option(p)
  120. }
  121. return p
  122. }
  123. // String representation.
  124. func (p *peer) String() string {
  125. if p.outbound {
  126. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  127. }
  128. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  129. }
  130. //---------------------------------------------------
  131. // Implements service.Service
  132. // SetLogger implements BaseService.
  133. func (p *peer) SetLogger(l log.Logger) {
  134. p.Logger = l
  135. p.mconn.SetLogger(l)
  136. }
  137. // OnStart implements BaseService.
  138. func (p *peer) OnStart() error {
  139. if err := p.BaseService.OnStart(); err != nil {
  140. return err
  141. }
  142. if err := p.mconn.Start(); err != nil {
  143. return err
  144. }
  145. go p.metricsReporter()
  146. return nil
  147. }
  // FlushStop mimics OnStop but additionally ensures that all successful
  // .Send() calls will get flushed before closing the connection.
  // It differs from OnStop only in calling mconn.FlushStop instead of
  // mconn.Stop.
  // NOTE: it is not safe to call this method more than once.
  func (p *peer) FlushStop() {
  	p.metricsTicker.Stop() // no further metrics ticks
  	p.BaseService.OnStop()
  	p.mconn.FlushStop() // stop everything and close the conn
  }
  // OnStop implements BaseService.
  // It stops the metrics ticker, runs the embedded BaseService's OnStop and
  // then stops the multiplex connection.
  func (p *peer) OnStop() {
  	p.metricsTicker.Stop() // no further metrics ticks
  	p.BaseService.OnStop()
  	p.mconn.Stop() // stop everything and close the conn
  }
  162. //---------------------------------------------------
  163. // Implements Peer
  // ID returns the peer's ID - the hex encoded hash of its pubkey.
  // The value is read from the peer's nodeInfo, not from the connection.
  func (p *peer) ID() ID {
  	return p.nodeInfo.ID()
  }
  168. // IsOutbound returns true if the connection is outbound, false otherwise.
  169. func (p *peer) IsOutbound() bool {
  170. return p.peerConn.outbound
  171. }
  172. // IsPersistent returns true if the peer is persitent, false otherwise.
  173. func (p *peer) IsPersistent() bool {
  174. return p.peerConn.persistent
  175. }
  // NodeInfo returns the NodeInfo value the peer was created with (as stored
  // by newPeer).
  func (p *peer) NodeInfo() NodeInfo {
  	return p.nodeInfo
  }
  180. // SocketAddr returns the address of the socket.
  181. // For outbound peers, it's the address dialed (after DNS resolution).
  182. // For inbound peers, it's the address returned by the underlying connection
  183. // (not what's reported in the peer's NodeInfo).
  184. func (p *peer) SocketAddr() *NetAddress {
  185. return p.peerConn.socketAddr
  186. }
  // Status returns the peer's ConnectionStatus, as reported by its multiplex
  // connection.
  func (p *peer) Status() tmconn.ConnectionStatus {
  	return p.mconn.Status()
  }
  191. // Send msg bytes to the channel identified by chID byte. Returns false if the
  192. // send queue is full after timeout, specified by MConnection.
  193. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  194. if !p.IsRunning() {
  195. // see Switch#Broadcast, where we fetch the list of peers and loop over
  196. // them - while we're looping, one peer may be removed and stopped.
  197. return false
  198. } else if !p.hasChannel(chID) {
  199. return false
  200. }
  201. res := p.mconn.Send(chID, msgBytes)
  202. if res {
  203. labels := []string{
  204. "peer_id", string(p.ID()),
  205. "chID", fmt.Sprintf("%#x", chID),
  206. }
  207. p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  208. }
  209. return res
  210. }
  211. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  212. // false if the send queue is full.
  213. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  214. if !p.IsRunning() {
  215. return false
  216. } else if !p.hasChannel(chID) {
  217. return false
  218. }
  219. res := p.mconn.TrySend(chID, msgBytes)
  220. if res {
  221. labels := []string{
  222. "peer_id", string(p.ID()),
  223. "chID", fmt.Sprintf("%#x", chID),
  224. }
  225. p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  226. }
  227. return res
  228. }
  // Get returns the value stored under key in the peer's Data map.
  func (p *peer) Get(key string) interface{} {
  	return p.Data.Get(key)
  }
  // Set stores data under key in the peer's Data map.
  func (p *peer) Set(key string, data interface{}) {
  	p.Data.Set(key, data)
  }
  237. // hasChannel returns true if the peer reported
  238. // knowing about the given chID.
  239. func (p *peer) hasChannel(chID byte) bool {
  240. for _, ch := range p.channels {
  241. if ch == chID {
  242. return true
  243. }
  244. }
  245. // NOTE: probably will want to remove this
  246. // but could be helpful while the feature is new
  247. p.Logger.Debug(
  248. "Unknown channel for peer",
  249. "channel",
  250. chID,
  251. "channels",
  252. p.channels,
  253. )
  254. return false
  255. }
  256. // CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all.
  257. func (p *peer) CloseConn() error {
  258. return p.peerConn.conn.Close()
  259. }
  260. //---------------------------------------------------
  261. // methods only used for testing
  262. // TODO: can we remove these?
  263. // CloseConn closes the underlying connection
  264. func (pc *peerConn) CloseConn() {
  265. pc.conn.Close() // nolint: errcheck
  266. }
  267. // RemoteAddr returns peer's remote network address.
  268. func (p *peer) RemoteAddr() net.Addr {
  269. return p.peerConn.conn.RemoteAddr()
  270. }
  271. // CanSend returns true if the send queue is not full, false otherwise.
  272. func (p *peer) CanSend(chID byte) bool {
  273. if !p.IsRunning() {
  274. return false
  275. }
  276. return p.mconn.CanSend(chID)
  277. }
  278. //---------------------------------------------------
  279. func PeerMetrics(metrics *Metrics) PeerOption {
  280. return func(p *peer) {
  281. p.metrics = metrics
  282. }
  283. }
  284. func (p *peer) metricsReporter() {
  285. for {
  286. select {
  287. case <-p.metricsTicker.C:
  288. status := p.mconn.Status()
  289. var sendQueueSize float64
  290. for _, chStatus := range status.Channels {
  291. sendQueueSize += float64(chStatus.SendQueueSize)
  292. }
  293. p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
  294. case <-p.Quit():
  295. return
  296. }
  297. }
  298. }
  299. //------------------------------------------------------------------
  300. // helper funcs
  301. func createMConnection(
  302. conn net.Conn,
  303. p *peer,
  304. reactorsByCh map[byte]Reactor,
  305. chDescs []*tmconn.ChannelDescriptor,
  306. onPeerError func(Peer, interface{}),
  307. config tmconn.MConnConfig,
  308. ) *tmconn.MConnection {
  309. onReceive := func(chID byte, msgBytes []byte) {
  310. reactor := reactorsByCh[chID]
  311. if reactor == nil {
  312. // Note that its ok to panic here as it's caught in the conn._recover,
  313. // which does onPeerError.
  314. panic(fmt.Sprintf("Unknown channel %X", chID))
  315. }
  316. labels := []string{
  317. "peer_id", string(p.ID()),
  318. "chID", fmt.Sprintf("%#x", chID),
  319. }
  320. p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  321. reactor.Receive(chID, p, msgBytes)
  322. }
  323. onError := func(r interface{}) {
  324. onPeerError(p, r)
  325. }
  326. return tmconn.NewMConnectionWithConfig(
  327. conn,
  328. chDescs,
  329. onReceive,
  330. onError,
  331. config,
  332. )
  333. }