You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

535 lines
13 KiB

p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
p2p: file descriptor leaks (#3150) * close peer's connection to avoid fd leak Fixes #2967 * rename peer#Addr to RemoteAddr * fix test * fixes after Ethan's review * bring back the check * changelog entry * write a test for switch#acceptRoutine * increase timeouts? :( * remove extra assertNPeersWithTimeout * simplify test * assert number of peers (just to be safe) * Cleanup in OnStop * run tests with verbose flag on CircleCI * spawn a reading routine to prevent connection from closing * get port from the listener random port is faster, but often results in ``` panic: listen tcp 127.0.0.1:44068: bind: address already in use [recovered] panic: listen tcp 127.0.0.1:44068: bind: address already in use goroutine 79 [running]: testing.tRunner.func1(0xc0001bd600) /usr/local/go/src/testing/testing.go:792 +0x387 panic(0x974d20, 0xc0001b0500) /usr/local/go/src/runtime/panic.go:513 +0x1b9 github.com/tendermint/tendermint/p2p.MakeSwitch(0xc0000f42a0, 0x0, 0x9fb9cc, 0x9, 0x9fc346, 0xb, 0xb42128, 0x0, 0x0, 0x0, ...) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:182 +0xa28 github.com/tendermint/tendermint/p2p.MakeConnectedSwitches(0xc0000f42a0, 0x2, 0xb42128, 0xb41eb8, 0x4f1205, 0xc0001bed80, 0x4f16ed) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/test_util.go:75 +0xf9 github.com/tendermint/tendermint/p2p.MakeSwitchPair(0xbb8d20, 0xc0001bd600, 0xb42128, 0x2f7, 0x4f16c0) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:94 +0x4c github.com/tendermint/tendermint/p2p.TestSwitches(0xc0001bd600) /home/vagrant/go/src/github.com/tendermint/tendermint/p2p/switch_test.go:117 +0x58 testing.tRunner(0xc0001bd600, 0xb42038) /usr/local/go/src/testing/testing.go:827 +0xbf created by testing.(*T).Run /usr/local/go/src/testing/testing.go:878 +0x353 exit status 2 FAIL github.com/tendermint/tendermint/p2p 0.350s ```
6 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "encoding/hex"
  5. "fmt"
  6. "net"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/tendermint/tendermint/libs/cmap"
  11. "github.com/tendermint/tendermint/libs/log"
  12. "github.com/tendermint/tendermint/libs/service"
  13. tmconn "github.com/tendermint/tendermint/p2p/conn"
  14. )
  // PeerID uniquely identifies a peer. It is a raw byte slice that is usually
  // rendered in hex form.
  type PeerID []byte

  // String renders the PeerID as a lower-case hex string. It implements the
  // fmt.Stringer interface.
  func (pid PeerID) String() string {
  	encoded := hex.EncodeToString(pid)
  	return strings.ToLower(encoded)
  }

  // Empty reports whether the PeerID holds no bytes (nil or zero length).
  func (pid PeerID) Empty() bool {
  	return len(pid) < 1
  }

  // PeerIDFromString decodes a hex-encoded string into a PeerID. When decoding
  // fails, a nil PeerID is returned together with an error that wraps the
  // decoder's error and mentions the offending input.
  func PeerIDFromString(s string) (PeerID, error) {
  	raw, decodeErr := hex.DecodeString(s)
  	if decodeErr != nil {
  		return nil, fmt.Errorf("failed to decode PeerID (%s): %w", s, decodeErr)
  	}

  	return PeerID(raw), nil
  }

  // Equal reports whether pid and other contain exactly the same bytes.
  func (pid PeerID) Equal(other PeerID) bool {
  	return bytes.Equal(pid, other)
  }
  // PeerStatus is the string-typed status of a peer.
  type PeerStatus string

  // The peer statuses defined by this package.
  const (
  	// PeerStatusNew is a new peer which we haven't tried to contact yet.
  	PeerStatusNew PeerStatus = "new"
  	// PeerStatusUp is a peer which we have an active connection to.
  	PeerStatusUp PeerStatus = "up"
  	// PeerStatusDown is a peer which we're temporarily disconnected from.
  	PeerStatusDown PeerStatus = "down"
  	// PeerStatusRemoved is a peer which has been removed.
  	PeerStatusRemoved PeerStatus = "removed"
  	// PeerStatusBanned is a peer which is banned for misbehavior.
  	PeerStatusBanned PeerStatus = "banned"
  )
  // PeerPriority is the integer-typed priority of a peer.
  type PeerPriority int

  // The named priorities start at 1, so the zero value of PeerPriority matches
  // none of them.
  const (
  	_ PeerPriority = iota
  	PeerPriorityNormal
  	PeerPriorityValidator
  	PeerPriorityPersistent
  )
  // PeerError is a peer error reported by a reactor via the Error channel. The
  // severity may cause the peer to be disconnected or banned depending on policy.
  type PeerError struct {
  	// PeerID identifies the peer the error is about.
  	PeerID PeerID
  	// Err is the reported error.
  	Err error
  	// Severity classifies the error; see the PeerErrorSeverity constants.
  	Severity PeerErrorSeverity
  }
  // PeerErrorSeverity is the string-typed severity attached to a PeerError.
  type PeerErrorSeverity string

  // The severities defined by this package.
  const (
  	// PeerErrorSeverityLow is mostly ignored.
  	PeerErrorSeverityLow = PeerErrorSeverity("low")
  	// PeerErrorSeverityHigh may disconnect.
  	PeerErrorSeverityHigh = PeerErrorSeverity("high")
  	// PeerErrorSeverityCritical means ban.
  	PeerErrorSeverityCritical = PeerErrorSeverity("critical")
  )
  68. // PeerUpdatesCh defines a wrapper around a PeerUpdate go channel that allows
  69. // a reactor to listen for peer updates and safely close it when stopping.
  70. type PeerUpdatesCh struct {
  71. closeOnce sync.Once
  72. // updatesCh defines the go channel in which the router sends peer updates to
  73. // reactors. Each reactor will have its own PeerUpdatesCh to listen for updates
  74. // from.
  75. updatesCh chan PeerUpdate
  76. // doneCh is used to signal that a PeerUpdatesCh is closed. It is the
  77. // reactor's responsibility to invoke Close.
  78. doneCh chan struct{}
  79. }
  80. // NewPeerUpdates returns a reference to a new PeerUpdatesCh.
  81. func NewPeerUpdates() *PeerUpdatesCh {
  82. return &PeerUpdatesCh{
  83. updatesCh: make(chan PeerUpdate),
  84. doneCh: make(chan struct{}),
  85. }
  86. }
  87. // Updates returns a read-only go channel where a consuming reactor can listen
  88. // for peer updates sent from the router.
  89. func (puc *PeerUpdatesCh) Updates() <-chan PeerUpdate {
  90. return puc.updatesCh
  91. }
  92. // Close closes the PeerUpdatesCh channel. It should only be closed by the respective
  93. // reactor when stopping and ensure nothing is listening for updates.
  94. //
  95. // NOTE: After a PeerUpdatesCh is closed, the router may safely assume it can no
  96. // longer send on the internal updatesCh, however it should NEVER explicitly close
  97. // it as that could result in panics by sending on a closed channel.
  98. func (puc *PeerUpdatesCh) Close() {
  99. puc.closeOnce.Do(func() {
  100. close(puc.doneCh)
  101. })
  102. }
  103. // Done returns a read-only version of the PeerUpdatesCh's internal doneCh go
  104. // channel that should be used by a router to signal when it is safe to explicitly
  105. // not send any peer updates.
  106. func (puc *PeerUpdatesCh) Done() <-chan struct{} {
  107. return puc.doneCh
  108. }
  // PeerUpdate is a peer status update for reactors.
  type PeerUpdate struct {
  	// PeerID identifies the peer the update is about.
  	PeerID PeerID
  	// Status is the status carried by this update; see the PeerStatus constants.
  	Status PeerStatus
  }
  114. // ============================================================================
  115. // Types and business logic below may be deprecated.
  116. //
  117. // TODO: Rename once legacy p2p types are removed.
  118. // ref: https://github.com/tendermint/tendermint/issues/5670
  119. // ============================================================================
  120. //go:generate mockery --case underscore --name Peer
  // metricsTickerDuration is the period of the ticker that newPeer creates and
  // that metricsReporter waits on between two metric samples.
  const metricsTickerDuration = 10 * time.Second
  // Peer is an interface representing a peer connected on a reactor.
  //
  // The unexported peer type in this file is an implementation; its methods
  // carry the detailed documentation.
  type Peer interface {
  	service.Service
  	// FlushStop stops the peer like OnStop does, but (per the peer
  	// implementation in this file) flushes successful sends first.
  	FlushStop()

  	ID() ID               // peer's cryptographic ID
  	RemoteIP() net.IP     // remote IP of the connection
  	RemoteAddr() net.Addr // remote address of the connection

  	IsOutbound() bool   // did we dial the peer
  	IsPersistent() bool // do we redial this peer when we disconnect

  	CloseConn() error // close original connection

  	NodeInfo() NodeInfo // peer's info
  	Status() tmconn.ConnectionStatus
  	SocketAddr() *NetAddress // actual address of the socket

  	// Send and TrySend queue message bytes on the channel identified by the
  	// byte argument and report whether the message was accepted.
  	Send(byte, []byte) bool
  	TrySend(byte, []byte) bool

  	// Set and Get store and retrieve arbitrary per-peer data under a string key.
  	Set(string, interface{})
  	Get(string) interface{}
  }
  140. //----------------------------------------------------------
  // peerConn contains the raw connection and its config.
  type peerConn struct {
  	outbound   bool     // reported by IsOutbound: did we dial the peer
  	persistent bool     // reported by IsPersistent: do we redial this peer when we disconnect
  	conn       net.Conn // source connection

  	socketAddr *NetAddress // reported by SocketAddr

  	// cached RemoteIP()
  	// NOTE(review): nothing in this file stores into ip in a way that outlives
  	// a RemoteIP call (RemoteIP has a value receiver) — confirm whether any
  	// other code populates it.
  	ip net.IP
  }
  150. func newPeerConn(
  151. outbound, persistent bool,
  152. conn net.Conn,
  153. socketAddr *NetAddress,
  154. ) peerConn {
  155. return peerConn{
  156. outbound: outbound,
  157. persistent: persistent,
  158. conn: conn,
  159. socketAddr: socketAddr,
  160. }
  161. }
  162. // ID only exists for SecretConnection.
  163. // NOTE: Will panic if conn is not *SecretConnection.
  164. func (pc peerConn) ID() ID {
  165. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  166. }
  167. // Return the IP from the connection RemoteAddr
  168. func (pc peerConn) RemoteIP() net.IP {
  169. if pc.ip != nil {
  170. return pc.ip
  171. }
  172. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  173. if err != nil {
  174. panic(err)
  175. }
  176. ips, err := net.LookupIP(host)
  177. if err != nil {
  178. panic(err)
  179. }
  180. pc.ip = ips[0]
  181. return pc.ip
  182. }
  // peer implements Peer.
  //
  // Before using a peer, you will need to perform a handshake on connection.
  type peer struct {
  	service.BaseService

  	// raw peerConn and the multiplex connection
  	peerConn
  	mconn *tmconn.MConnection

  	// peer's node info and the channel it knows about
  	// channels = nodeInfo.Channels
  	// cached to avoid copying nodeInfo in hasChannel
  	nodeInfo NodeInfo
  	channels []byte

  	// User data, accessed through Get and Set.
  	Data *cmap.CMap

  	// metrics receives the per-peer byte counters and the pending-send gauge.
  	// newPeer initialises it with NopMetrics(); the PeerMetrics option replaces it.
  	metrics *Metrics
  	// metricsTicker paces metricsReporter; it is stopped in OnStop and FlushStop.
  	metricsTicker *time.Ticker
  }
  // PeerOption customises a peer. newPeer applies the given options, in order,
  // after the peer has been fully constructed.
  type PeerOption func(*peer)
  202. func newPeer(
  203. pc peerConn,
  204. mConfig tmconn.MConnConfig,
  205. nodeInfo NodeInfo,
  206. reactorsByCh map[byte]Reactor,
  207. chDescs []*tmconn.ChannelDescriptor,
  208. onPeerError func(Peer, interface{}),
  209. options ...PeerOption,
  210. ) *peer {
  211. p := &peer{
  212. peerConn: pc,
  213. nodeInfo: nodeInfo,
  214. channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO
  215. Data: cmap.NewCMap(),
  216. metricsTicker: time.NewTicker(metricsTickerDuration),
  217. metrics: NopMetrics(),
  218. }
  219. p.mconn = createMConnection(
  220. pc.conn,
  221. p,
  222. reactorsByCh,
  223. chDescs,
  224. onPeerError,
  225. mConfig,
  226. )
  227. p.BaseService = *service.NewBaseService(nil, "Peer", p)
  228. for _, option := range options {
  229. option(p)
  230. }
  231. return p
  232. }
  233. // String representation.
  234. func (p *peer) String() string {
  235. if p.outbound {
  236. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  237. }
  238. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  239. }
  240. //---------------------------------------------------
  241. // Implements service.Service
  // SetLogger implements BaseService. It stores l as the peer's logger and
  // passes the same logger on to the multiplex connection.
  func (p *peer) SetLogger(l log.Logger) {
  	p.Logger = l
  	p.mconn.SetLogger(l)
  }
  247. // OnStart implements BaseService.
  248. func (p *peer) OnStart() error {
  249. if err := p.BaseService.OnStart(); err != nil {
  250. return err
  251. }
  252. if err := p.mconn.Start(); err != nil {
  253. return err
  254. }
  255. go p.metricsReporter()
  256. return nil
  257. }
  // FlushStop mimics OnStop but additionally ensures that all successful
  // .Send() calls will get flushed before closing the connection.
  // It differs from OnStop only in calling mconn.FlushStop instead of
  // mconn.Stop.
  // NOTE: it is not safe to call this method more than once.
  func (p *peer) FlushStop() {
  	p.metricsTicker.Stop()
  	p.BaseService.OnStop()
  	p.mconn.FlushStop() // stop everything and close the conn
  }
  266. // OnStop implements BaseService.
  267. func (p *peer) OnStop() {
  268. p.metricsTicker.Stop()
  269. p.BaseService.OnStop()
  270. if err := p.mconn.Stop(); err != nil { // stop everything and close the conn
  271. p.Logger.Debug("Error while stopping peer", "err", err)
  272. }
  273. }
  274. //---------------------------------------------------
  275. // Implements Peer
  // ID returns the peer's ID as reported by its NodeInfo. Per the original
  // documentation this is the hex encoded hash of the peer's pubkey.
  func (p *peer) ID() ID {
  	return p.nodeInfo.ID()
  }
  280. // IsOutbound returns true if the connection is outbound, false otherwise.
  281. func (p *peer) IsOutbound() bool {
  282. return p.peerConn.outbound
  283. }
  284. // IsPersistent returns true if the peer is persitent, false otherwise.
  285. func (p *peer) IsPersistent() bool {
  286. return p.peerConn.persistent
  287. }
  // NodeInfo returns the NodeInfo value the peer was created with.
  // NOTE(review): the previous comment promised "a copy"; whether the caller
  // gets an independent copy depends on the concrete NodeInfo type — confirm.
  func (p *peer) NodeInfo() NodeInfo {
  	return p.nodeInfo
  }
  292. // SocketAddr returns the address of the socket.
  293. // For outbound peers, it's the address dialed (after DNS resolution).
  294. // For inbound peers, it's the address returned by the underlying connection
  295. // (not what's reported in the peer's NodeInfo).
  296. func (p *peer) SocketAddr() *NetAddress {
  297. return p.peerConn.socketAddr
  298. }
  // Status returns the peer's ConnectionStatus, as reported by the underlying
  // multiplex connection.
  func (p *peer) Status() tmconn.ConnectionStatus {
  	return p.mconn.Status()
  }
  303. // Send msg bytes to the channel identified by chID byte. Returns false if the
  304. // send queue is full after timeout, specified by MConnection.
  305. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  306. if !p.IsRunning() {
  307. // see Switch#Broadcast, where we fetch the list of peers and loop over
  308. // them - while we're looping, one peer may be removed and stopped.
  309. return false
  310. } else if !p.hasChannel(chID) {
  311. return false
  312. }
  313. res := p.mconn.Send(chID, msgBytes)
  314. if res {
  315. labels := []string{
  316. "peer_id", string(p.ID()),
  317. "chID", fmt.Sprintf("%#x", chID),
  318. }
  319. p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  320. }
  321. return res
  322. }
  323. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  324. // false if the send queue is full.
  325. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  326. if !p.IsRunning() {
  327. return false
  328. } else if !p.hasChannel(chID) {
  329. return false
  330. }
  331. res := p.mconn.TrySend(chID, msgBytes)
  332. if res {
  333. labels := []string{
  334. "peer_id", string(p.ID()),
  335. "chID", fmt.Sprintf("%#x", chID),
  336. }
  337. p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  338. }
  339. return res
  340. }
  // Get returns the user data stored under key in the peer's Data map.
  func (p *peer) Get(key string) interface{} {
  	return p.Data.Get(key)
  }
  // Set stores data under key in the peer's Data map.
  func (p *peer) Set(key string, data interface{}) {
  	p.Data.Set(key, data)
  }
  349. // hasChannel returns true if the peer reported
  350. // knowing about the given chID.
  351. func (p *peer) hasChannel(chID byte) bool {
  352. for _, ch := range p.channels {
  353. if ch == chID {
  354. return true
  355. }
  356. }
  357. // NOTE: probably will want to remove this
  358. // but could be helpful while the feature is new
  359. p.Logger.Debug(
  360. "Unknown channel for peer",
  361. "channel",
  362. chID,
  363. "channels",
  364. p.channels,
  365. )
  366. return false
  367. }
  368. // CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all.
  369. func (p *peer) CloseConn() error {
  370. return p.peerConn.conn.Close()
  371. }
  372. //---------------------------------------------------
  373. // methods only used for testing
  374. // TODO: can we remove these?
  375. // CloseConn closes the underlying connection
  376. func (pc *peerConn) CloseConn() {
  377. pc.conn.Close()
  378. }
  379. // RemoteAddr returns peer's remote network address.
  380. func (p *peer) RemoteAddr() net.Addr {
  381. return p.peerConn.conn.RemoteAddr()
  382. }
  383. // CanSend returns true if the send queue is not full, false otherwise.
  384. func (p *peer) CanSend(chID byte) bool {
  385. if !p.IsRunning() {
  386. return false
  387. }
  388. return p.mconn.CanSend(chID)
  389. }
  390. //---------------------------------------------------
  391. func PeerMetrics(metrics *Metrics) PeerOption {
  392. return func(p *peer) {
  393. p.metrics = metrics
  394. }
  395. }
  396. func (p *peer) metricsReporter() {
  397. for {
  398. select {
  399. case <-p.metricsTicker.C:
  400. status := p.mconn.Status()
  401. var sendQueueSize float64
  402. for _, chStatus := range status.Channels {
  403. sendQueueSize += float64(chStatus.SendQueueSize)
  404. }
  405. p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
  406. case <-p.Quit():
  407. return
  408. }
  409. }
  410. }
  411. //------------------------------------------------------------------
  412. // helper funcs
  413. func createMConnection(
  414. conn net.Conn,
  415. p *peer,
  416. reactorsByCh map[byte]Reactor,
  417. chDescs []*tmconn.ChannelDescriptor,
  418. onPeerError func(Peer, interface{}),
  419. config tmconn.MConnConfig,
  420. ) *tmconn.MConnection {
  421. onReceive := func(chID byte, msgBytes []byte) {
  422. reactor := reactorsByCh[chID]
  423. if reactor == nil {
  424. // Note that its ok to panic here as it's caught in the conn._recover,
  425. // which does onPeerError.
  426. panic(fmt.Sprintf("Unknown channel %X", chID))
  427. }
  428. labels := []string{
  429. "peer_id", string(p.ID()),
  430. "chID", fmt.Sprintf("%#x", chID),
  431. }
  432. p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
  433. reactor.Receive(chID, p, msgBytes)
  434. }
  435. onError := func(r interface{}) {
  436. onPeerError(p, r)
  437. }
  438. return tmconn.NewMConnectionWithConfig(
  439. conn,
  440. chDescs,
  441. onReceive,
  442. onError,
  443. config,
  444. )
  445. }