You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

374 lines
8.2 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. cmn "github.com/tendermint/tendermint/libs/common"
  7. "github.com/tendermint/tendermint/libs/log"
  8. "github.com/tendermint/tendermint/config"
  9. tmconn "github.com/tendermint/tendermint/p2p/conn"
  10. )
// metricsTickerDuration is the period of the ticker newPeer creates for each
// peer; metricsReporter publishes the pending send-queue size on every tick.
const metricsTickerDuration = 10 * time.Second
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
	cmn.Service

	ID() ID             // peer's cryptographic ID
	RemoteIP() net.IP   // remote IP of the connection
	IsOutbound() bool   // did we dial the peer
	IsPersistent() bool // do we redial this peer when we disconnect
	NodeInfo() NodeInfo // peer's info

	Status() tmconn.ConnectionStatus // status of the underlying connection
	OriginalAddr() *NetAddress       // address used to connect; nil for inbound peers

	Send(byte, []byte) bool    // send bytes on a channel; false if they were not queued
	TrySend(byte, []byte) bool // like Send, but returns false immediately if the queue is full

	Set(string, interface{}) // store user data under a key
	Get(string) interface{}  // fetch user data stored under a key
}
  27. //----------------------------------------------------------
// peerConn contains the raw connection and its config.
type peerConn struct {
	outbound     bool              // reported by peer.IsOutbound
	persistent   bool              // reported by peer.IsPersistent
	config       *config.P2PConfig // P2P configuration passed to newPeerConn
	conn         net.Conn          // source connection
	originalAddr *NetAddress       // nil for inbound connections

	// cached RemoteIP(); RemoteIP returns it directly when it is non-nil
	ip net.IP
}
  38. func newPeerConn(
  39. outbound, persistent bool,
  40. config *config.P2PConfig,
  41. conn net.Conn,
  42. originalAddr *NetAddress,
  43. ) peerConn {
  44. return peerConn{
  45. outbound: outbound,
  46. persistent: persistent,
  47. config: config,
  48. conn: conn,
  49. originalAddr: originalAddr,
  50. }
  51. }
  52. // ID only exists for SecretConnection.
  53. // NOTE: Will panic if conn is not *SecretConnection.
  54. func (pc peerConn) ID() ID {
  55. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  56. }
  57. // Return the IP from the connection RemoteAddr
  58. func (pc peerConn) RemoteIP() net.IP {
  59. if pc.ip != nil {
  60. return pc.ip
  61. }
  62. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  63. if err != nil {
  64. panic(err)
  65. }
  66. ips, err := net.LookupIP(host)
  67. if err != nil {
  68. panic(err)
  69. }
  70. pc.ip = ips[0]
  71. return pc.ip
  72. }
// peer implements Peer.
//
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
	cmn.BaseService

	// raw peerConn and the multiplex connection
	peerConn
	mconn *tmconn.MConnection

	// peer's node info and the channel it knows about
	// channels = nodeInfo.Channels
	// cached to avoid copying nodeInfo in hasChannel
	nodeInfo NodeInfo
	channels []byte

	// User data
	Data *cmn.CMap

	// metrics receives the per-peer sent/received byte totals and the pending
	// send bytes; newPeer initializes it to NopMetrics() and the PeerMetrics
	// option replaces it.
	metrics *Metrics
	// metricsTicker drives metricsReporter and is stopped in OnStop.
	metricsTicker *time.Ticker
}
// PeerOption customizes a peer. newPeer applies the options, in order, after
// the peer and its MConnection have been constructed.
type PeerOption func(*peer)
  92. func newPeer(
  93. pc peerConn,
  94. mConfig tmconn.MConnConfig,
  95. nodeInfo NodeInfo,
  96. reactorsByCh map[byte]Reactor,
  97. chDescs []*tmconn.ChannelDescriptor,
  98. onPeerError func(Peer, interface{}),
  99. options ...PeerOption,
  100. ) *peer {
  101. p := &peer{
  102. peerConn: pc,
  103. nodeInfo: nodeInfo,
  104. channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO
  105. Data: cmn.NewCMap(),
  106. metricsTicker: time.NewTicker(metricsTickerDuration),
  107. metrics: NopMetrics(),
  108. }
  109. p.mconn = createMConnection(
  110. pc.conn,
  111. p,
  112. reactorsByCh,
  113. chDescs,
  114. onPeerError,
  115. mConfig,
  116. )
  117. p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
  118. for _, option := range options {
  119. option(p)
  120. }
  121. return p
  122. }
  123. // String representation.
  124. func (p *peer) String() string {
  125. if p.outbound {
  126. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  127. }
  128. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  129. }
  130. //---------------------------------------------------
  131. // Implements cmn.Service
  132. // SetLogger implements BaseService.
  133. func (p *peer) SetLogger(l log.Logger) {
  134. p.Logger = l
  135. p.mconn.SetLogger(l)
  136. }
  137. // OnStart implements BaseService.
  138. func (p *peer) OnStart() error {
  139. if err := p.BaseService.OnStart(); err != nil {
  140. return err
  141. }
  142. if err := p.mconn.Start(); err != nil {
  143. return err
  144. }
  145. go p.metricsReporter()
  146. return nil
  147. }
// OnStop implements BaseService.
//
// Shutdown order: the metrics ticker is stopped first, then the base service,
// and finally the MConnection.
func (p *peer) OnStop() {
	p.metricsTicker.Stop()
	p.BaseService.OnStop()
	p.mconn.Stop() // stop everything and close the conn
}
  154. //---------------------------------------------------
  155. // Implements Peer
  156. // ID returns the peer's ID - the hex encoded hash of its pubkey.
  157. func (p *peer) ID() ID {
  158. return p.nodeInfo.ID()
  159. }
  160. // IsOutbound returns true if the connection is outbound, false otherwise.
  161. func (p *peer) IsOutbound() bool {
  162. return p.peerConn.outbound
  163. }
  164. // IsPersistent returns true if the peer is persitent, false otherwise.
  165. func (p *peer) IsPersistent() bool {
  166. return p.peerConn.persistent
  167. }
  168. // NodeInfo returns a copy of the peer's NodeInfo.
  169. func (p *peer) NodeInfo() NodeInfo {
  170. return p.nodeInfo
  171. }
  172. // OriginalAddr returns the original address, which was used to connect with
  173. // the peer. Returns nil for inbound peers.
  174. func (p *peer) OriginalAddr() *NetAddress {
  175. if p.peerConn.outbound {
  176. return p.peerConn.originalAddr
  177. }
  178. return nil
  179. }
  180. // Status returns the peer's ConnectionStatus.
  181. func (p *peer) Status() tmconn.ConnectionStatus {
  182. return p.mconn.Status()
  183. }
  184. // Send msg bytes to the channel identified by chID byte. Returns false if the
  185. // send queue is full after timeout, specified by MConnection.
  186. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  187. if !p.IsRunning() {
  188. // see Switch#Broadcast, where we fetch the list of peers and loop over
  189. // them - while we're looping, one peer may be removed and stopped.
  190. return false
  191. } else if !p.hasChannel(chID) {
  192. return false
  193. }
  194. res := p.mconn.Send(chID, msgBytes)
  195. if res {
  196. p.metrics.PeerSendBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
  197. }
  198. return res
  199. }
  200. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  201. // false if the send queue is full.
  202. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  203. if !p.IsRunning() {
  204. return false
  205. } else if !p.hasChannel(chID) {
  206. return false
  207. }
  208. res := p.mconn.TrySend(chID, msgBytes)
  209. if res {
  210. p.metrics.PeerSendBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
  211. }
  212. return res
  213. }
  214. // Get the data for a given key.
  215. func (p *peer) Get(key string) interface{} {
  216. return p.Data.Get(key)
  217. }
  218. // Set sets the data for the given key.
  219. func (p *peer) Set(key string, data interface{}) {
  220. p.Data.Set(key, data)
  221. }
  222. // hasChannel returns true if the peer reported
  223. // knowing about the given chID.
  224. func (p *peer) hasChannel(chID byte) bool {
  225. for _, ch := range p.channels {
  226. if ch == chID {
  227. return true
  228. }
  229. }
  230. // NOTE: probably will want to remove this
  231. // but could be helpful while the feature is new
  232. p.Logger.Debug(
  233. "Unknown channel for peer",
  234. "channel",
  235. chID,
  236. "channels",
  237. p.channels,
  238. )
  239. return false
  240. }
  241. //---------------------------------------------------
  242. // methods only used for testing
  243. // TODO: can we remove these?
  244. // CloseConn closes the underlying connection
  245. func (pc *peerConn) CloseConn() {
  246. pc.conn.Close() // nolint: errcheck
  247. }
  248. // Addr returns peer's remote network address.
  249. func (p *peer) Addr() net.Addr {
  250. return p.peerConn.conn.RemoteAddr()
  251. }
  252. // CanSend returns true if the send queue is not full, false otherwise.
  253. func (p *peer) CanSend(chID byte) bool {
  254. if !p.IsRunning() {
  255. return false
  256. }
  257. return p.mconn.CanSend(chID)
  258. }
  259. //---------------------------------------------------
  260. func PeerMetrics(metrics *Metrics) PeerOption {
  261. return func(p *peer) {
  262. p.metrics = metrics
  263. }
  264. }
  265. func (p *peer) metricsReporter() {
  266. for {
  267. select {
  268. case <-p.metricsTicker.C:
  269. status := p.mconn.Status()
  270. var sendQueueSize float64
  271. for _, chStatus := range status.Channels {
  272. sendQueueSize += float64(chStatus.SendQueueSize)
  273. }
  274. p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
  275. case <-p.Quit():
  276. return
  277. }
  278. }
  279. }
  280. //------------------------------------------------------------------
  281. // helper funcs
  282. func createMConnection(
  283. conn net.Conn,
  284. p *peer,
  285. reactorsByCh map[byte]Reactor,
  286. chDescs []*tmconn.ChannelDescriptor,
  287. onPeerError func(Peer, interface{}),
  288. config tmconn.MConnConfig,
  289. ) *tmconn.MConnection {
  290. onReceive := func(chID byte, msgBytes []byte) {
  291. reactor := reactorsByCh[chID]
  292. if reactor == nil {
  293. // Note that its ok to panic here as it's caught in the conn._recover,
  294. // which does onPeerError.
  295. panic(fmt.Sprintf("Unknown channel %X", chID))
  296. }
  297. p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
  298. reactor.Receive(chID, p, msgBytes)
  299. }
  300. onError := func(r interface{}) {
  301. onPeerError(p, r)
  302. }
  303. return tmconn.NewMConnectionWithConfig(
  304. conn,
  305. chDescs,
  306. onReceive,
  307. onError,
  308. config,
  309. )
  310. }