You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

380 lines
8.5 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. cmn "github.com/tendermint/tendermint/libs/common"
  7. "github.com/tendermint/tendermint/libs/log"
  8. tmconn "github.com/tendermint/tendermint/p2p/conn"
  9. )
  10. const metricsTickerDuration = 10 * time.Second
  11. // Peer is an interface representing a peer connected on a reactor.
  12. type Peer interface {
  13. cmn.Service
  14. FlushStop()
  15. ID() ID // peer's cryptographic ID
  16. RemoteIP() net.IP // remote IP of the connection
  17. IsOutbound() bool // did we dial the peer
  18. IsPersistent() bool // do we redial this peer when we disconnect
  19. NodeInfo() NodeInfo // peer's info
  20. Status() tmconn.ConnectionStatus
  21. OriginalAddr() *NetAddress
  22. Send(byte, []byte) bool
  23. TrySend(byte, []byte) bool
  24. Set(string, interface{})
  25. Get(string) interface{}
  26. }
  27. //----------------------------------------------------------
  28. // peerConn contains the raw connection and its config.
  29. type peerConn struct {
  30. outbound bool
  31. persistent bool
  32. conn net.Conn // source connection
  33. originalAddr *NetAddress // nil for inbound connections
  34. // cached RemoteIP()
  35. ip net.IP
  36. }
  37. func newPeerConn(
  38. outbound, persistent bool,
  39. conn net.Conn,
  40. originalAddr *NetAddress,
  41. ) peerConn {
  42. return peerConn{
  43. outbound: outbound,
  44. persistent: persistent,
  45. conn: conn,
  46. originalAddr: originalAddr,
  47. }
  48. }
  49. // ID only exists for SecretConnection.
  50. // NOTE: Will panic if conn is not *SecretConnection.
  51. func (pc peerConn) ID() ID {
  52. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  53. }
  54. // Return the IP from the connection RemoteAddr
  55. func (pc peerConn) RemoteIP() net.IP {
  56. if pc.ip != nil {
  57. return pc.ip
  58. }
  59. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  60. if err != nil {
  61. panic(err)
  62. }
  63. ips, err := net.LookupIP(host)
  64. if err != nil {
  65. panic(err)
  66. }
  67. pc.ip = ips[0]
  68. return pc.ip
  69. }
  70. // peer implements Peer.
  71. //
  72. // Before using a peer, you will need to perform a handshake on connection.
  73. type peer struct {
  74. cmn.BaseService
  75. // raw peerConn and the multiplex connection
  76. peerConn
  77. mconn *tmconn.MConnection
  78. // peer's node info and the channel it knows about
  79. // channels = nodeInfo.Channels
  80. // cached to avoid copying nodeInfo in hasChannel
  81. nodeInfo NodeInfo
  82. channels []byte
  83. // User data
  84. Data *cmn.CMap
  85. metrics *Metrics
  86. metricsTicker *time.Ticker
  87. }
  88. type PeerOption func(*peer)
  89. func newPeer(
  90. pc peerConn,
  91. mConfig tmconn.MConnConfig,
  92. nodeInfo NodeInfo,
  93. reactorsByCh map[byte]Reactor,
  94. chDescs []*tmconn.ChannelDescriptor,
  95. onPeerError func(Peer, interface{}),
  96. options ...PeerOption,
  97. ) *peer {
  98. p := &peer{
  99. peerConn: pc,
  100. nodeInfo: nodeInfo,
  101. channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO
  102. Data: cmn.NewCMap(),
  103. metricsTicker: time.NewTicker(metricsTickerDuration),
  104. metrics: NopMetrics(),
  105. }
  106. p.mconn = createMConnection(
  107. pc.conn,
  108. p,
  109. reactorsByCh,
  110. chDescs,
  111. onPeerError,
  112. mConfig,
  113. )
  114. p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
  115. for _, option := range options {
  116. option(p)
  117. }
  118. return p
  119. }
  120. // String representation.
  121. func (p *peer) String() string {
  122. if p.outbound {
  123. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  124. }
  125. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  126. }
  127. //---------------------------------------------------
  128. // Implements cmn.Service
  129. // SetLogger implements BaseService.
  130. func (p *peer) SetLogger(l log.Logger) {
  131. p.Logger = l
  132. p.mconn.SetLogger(l)
  133. }
  134. // OnStart implements BaseService.
  135. func (p *peer) OnStart() error {
  136. if err := p.BaseService.OnStart(); err != nil {
  137. return err
  138. }
  139. if err := p.mconn.Start(); err != nil {
  140. return err
  141. }
  142. go p.metricsReporter()
  143. return nil
  144. }
  145. // FlushStop mimics OnStop but additionally ensures that all successful
  146. // .Send() calls will get flushed before closing the connection.
  147. // NOTE: it is not safe to call this method more than once.
  148. func (p *peer) FlushStop() {
  149. p.metricsTicker.Stop()
  150. p.BaseService.OnStop()
  151. p.mconn.FlushStop() // stop everything and close the conn
  152. }
  153. // OnStop implements BaseService.
  154. func (p *peer) OnStop() {
  155. p.metricsTicker.Stop()
  156. p.BaseService.OnStop()
  157. p.mconn.Stop() // stop everything and close the conn
  158. }
  159. //---------------------------------------------------
  160. // Implements Peer
  161. // ID returns the peer's ID - the hex encoded hash of its pubkey.
  162. func (p *peer) ID() ID {
  163. return p.nodeInfo.ID()
  164. }
  165. // IsOutbound returns true if the connection is outbound, false otherwise.
  166. func (p *peer) IsOutbound() bool {
  167. return p.peerConn.outbound
  168. }
  169. // IsPersistent returns true if the peer is persitent, false otherwise.
  170. func (p *peer) IsPersistent() bool {
  171. return p.peerConn.persistent
  172. }
  173. // NodeInfo returns a copy of the peer's NodeInfo.
  174. func (p *peer) NodeInfo() NodeInfo {
  175. return p.nodeInfo
  176. }
  177. // OriginalAddr returns the original address, which was used to connect with
  178. // the peer. Returns nil for inbound peers.
  179. func (p *peer) OriginalAddr() *NetAddress {
  180. if p.peerConn.outbound {
  181. return p.peerConn.originalAddr
  182. }
  183. return nil
  184. }
  185. // Status returns the peer's ConnectionStatus.
  186. func (p *peer) Status() tmconn.ConnectionStatus {
  187. return p.mconn.Status()
  188. }
  189. // Send msg bytes to the channel identified by chID byte. Returns false if the
  190. // send queue is full after timeout, specified by MConnection.
  191. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  192. if !p.IsRunning() {
  193. // see Switch#Broadcast, where we fetch the list of peers and loop over
  194. // them - while we're looping, one peer may be removed and stopped.
  195. return false
  196. } else if !p.hasChannel(chID) {
  197. return false
  198. }
  199. res := p.mconn.Send(chID, msgBytes)
  200. if res {
  201. p.metrics.PeerSendBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
  202. }
  203. return res
  204. }
  205. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  206. // false if the send queue is full.
  207. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  208. if !p.IsRunning() {
  209. return false
  210. } else if !p.hasChannel(chID) {
  211. return false
  212. }
  213. res := p.mconn.TrySend(chID, msgBytes)
  214. if res {
  215. p.metrics.PeerSendBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
  216. }
  217. return res
  218. }
  219. // Get the data for a given key.
  220. func (p *peer) Get(key string) interface{} {
  221. return p.Data.Get(key)
  222. }
  223. // Set sets the data for the given key.
  224. func (p *peer) Set(key string, data interface{}) {
  225. p.Data.Set(key, data)
  226. }
  227. // hasChannel returns true if the peer reported
  228. // knowing about the given chID.
  229. func (p *peer) hasChannel(chID byte) bool {
  230. for _, ch := range p.channels {
  231. if ch == chID {
  232. return true
  233. }
  234. }
  235. // NOTE: probably will want to remove this
  236. // but could be helpful while the feature is new
  237. p.Logger.Debug(
  238. "Unknown channel for peer",
  239. "channel",
  240. chID,
  241. "channels",
  242. p.channels,
  243. )
  244. return false
  245. }
  246. //---------------------------------------------------
  247. // methods only used for testing
  248. // TODO: can we remove these?
  249. // CloseConn closes the underlying connection
  250. func (pc *peerConn) CloseConn() {
  251. pc.conn.Close() // nolint: errcheck
  252. }
  253. // Addr returns peer's remote network address.
  254. func (p *peer) Addr() net.Addr {
  255. return p.peerConn.conn.RemoteAddr()
  256. }
  257. // CanSend returns true if the send queue is not full, false otherwise.
  258. func (p *peer) CanSend(chID byte) bool {
  259. if !p.IsRunning() {
  260. return false
  261. }
  262. return p.mconn.CanSend(chID)
  263. }
  264. //---------------------------------------------------
  265. func PeerMetrics(metrics *Metrics) PeerOption {
  266. return func(p *peer) {
  267. p.metrics = metrics
  268. }
  269. }
  270. func (p *peer) metricsReporter() {
  271. for {
  272. select {
  273. case <-p.metricsTicker.C:
  274. status := p.mconn.Status()
  275. var sendQueueSize float64
  276. for _, chStatus := range status.Channels {
  277. sendQueueSize += float64(chStatus.SendQueueSize)
  278. }
  279. p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
  280. case <-p.Quit():
  281. return
  282. }
  283. }
  284. }
  285. //------------------------------------------------------------------
  286. // helper funcs
  287. func createMConnection(
  288. conn net.Conn,
  289. p *peer,
  290. reactorsByCh map[byte]Reactor,
  291. chDescs []*tmconn.ChannelDescriptor,
  292. onPeerError func(Peer, interface{}),
  293. config tmconn.MConnConfig,
  294. ) *tmconn.MConnection {
  295. onReceive := func(chID byte, msgBytes []byte) {
  296. reactor := reactorsByCh[chID]
  297. if reactor == nil {
  298. // Note that its ok to panic here as it's caught in the conn._recover,
  299. // which does onPeerError.
  300. panic(fmt.Sprintf("Unknown channel %X", chID))
  301. }
  302. p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
  303. reactor.Receive(chID, p, msgBytes)
  304. }
  305. onError := func(r interface{}) {
  306. onPeerError(p, r)
  307. }
  308. return tmconn.NewMConnectionWithConfig(
  309. conn,
  310. chDescs,
  311. onReceive,
  312. onError,
  313. config,
  314. )
  315. }