You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

346 lines
7.6 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "net"
  5. "time"
  6. cmn "github.com/tendermint/tendermint/libs/common"
  7. "github.com/tendermint/tendermint/libs/log"
  8. "github.com/tendermint/tendermint/config"
  9. tmconn "github.com/tendermint/tendermint/p2p/conn"
  10. )
  11. const metricsTickerDuration = 10 * time.Second
  12. // Peer is an interface representing a peer connected on a reactor.
  13. type Peer interface {
  14. cmn.Service
  15. ID() ID // peer's cryptographic ID
  16. RemoteIP() net.IP // remote IP of the connection
  17. IsOutbound() bool // did we dial the peer
  18. IsPersistent() bool // do we redial this peer when we disconnect
  19. NodeInfo() NodeInfo // peer's info
  20. Status() tmconn.ConnectionStatus
  21. Send(byte, []byte) bool
  22. TrySend(byte, []byte) bool
  23. Set(string, interface{})
  24. Get(string) interface{}
  25. }
  26. //----------------------------------------------------------
  27. // peerConn contains the raw connection and its config.
  28. type peerConn struct {
  29. outbound bool
  30. persistent bool
  31. config *config.P2PConfig
  32. conn net.Conn // source connection
  33. // cached RemoteIP()
  34. ip net.IP
  35. }
  36. // ID only exists for SecretConnection.
  37. // NOTE: Will panic if conn is not *SecretConnection.
  38. func (pc peerConn) ID() ID {
  39. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  40. }
  41. // Return the IP from the connection RemoteAddr
  42. func (pc peerConn) RemoteIP() net.IP {
  43. if pc.ip != nil {
  44. return pc.ip
  45. }
  46. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  47. if err != nil {
  48. panic(err)
  49. }
  50. ips, err := net.LookupIP(host)
  51. if err != nil {
  52. panic(err)
  53. }
  54. pc.ip = ips[0]
  55. return pc.ip
  56. }
  57. // peer implements Peer.
  58. //
  59. // Before using a peer, you will need to perform a handshake on connection.
  60. type peer struct {
  61. cmn.BaseService
  62. // raw peerConn and the multiplex connection
  63. peerConn
  64. mconn *tmconn.MConnection
  65. // peer's node info and the channel it knows about
  66. // channels = nodeInfo.Channels
  67. // cached to avoid copying nodeInfo in hasChannel
  68. nodeInfo NodeInfo
  69. channels []byte
  70. // User data
  71. Data *cmn.CMap
  72. metrics *Metrics
  73. metricsTicker *time.Ticker
  74. }
  75. type PeerOption func(*peer)
  76. func newPeer(
  77. pc peerConn,
  78. mConfig tmconn.MConnConfig,
  79. nodeInfo NodeInfo,
  80. reactorsByCh map[byte]Reactor,
  81. chDescs []*tmconn.ChannelDescriptor,
  82. onPeerError func(Peer, interface{}),
  83. options ...PeerOption,
  84. ) *peer {
  85. p := &peer{
  86. peerConn: pc,
  87. nodeInfo: nodeInfo,
  88. channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO
  89. Data: cmn.NewCMap(),
  90. metricsTicker: time.NewTicker(metricsTickerDuration),
  91. metrics: NopMetrics(),
  92. }
  93. p.mconn = createMConnection(
  94. pc.conn,
  95. p,
  96. reactorsByCh,
  97. chDescs,
  98. onPeerError,
  99. mConfig,
  100. )
  101. p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
  102. for _, option := range options {
  103. option(p)
  104. }
  105. return p
  106. }
  107. // String representation.
  108. func (p *peer) String() string {
  109. if p.outbound {
  110. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  111. }
  112. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  113. }
  114. //---------------------------------------------------
  115. // Implements cmn.Service
  116. // SetLogger implements BaseService.
  117. func (p *peer) SetLogger(l log.Logger) {
  118. p.Logger = l
  119. p.mconn.SetLogger(l)
  120. }
  121. // OnStart implements BaseService.
  122. func (p *peer) OnStart() error {
  123. if err := p.BaseService.OnStart(); err != nil {
  124. return err
  125. }
  126. if err := p.mconn.Start(); err != nil {
  127. return err
  128. }
  129. go p.metricsReporter()
  130. return nil
  131. }
  132. // OnStop implements BaseService.
  133. func (p *peer) OnStop() {
  134. p.metricsTicker.Stop()
  135. p.BaseService.OnStop()
  136. p.mconn.Stop() // stop everything and close the conn
  137. }
  138. //---------------------------------------------------
  139. // Implements Peer
  140. // ID returns the peer's ID - the hex encoded hash of its pubkey.
  141. func (p *peer) ID() ID {
  142. return p.nodeInfo.ID()
  143. }
  144. // IsOutbound returns true if the connection is outbound, false otherwise.
  145. func (p *peer) IsOutbound() bool {
  146. return p.peerConn.outbound
  147. }
  148. // IsPersistent returns true if the peer is persitent, false otherwise.
  149. func (p *peer) IsPersistent() bool {
  150. return p.peerConn.persistent
  151. }
  152. // NodeInfo returns a copy of the peer's NodeInfo.
  153. func (p *peer) NodeInfo() NodeInfo {
  154. return p.nodeInfo
  155. }
  156. // Status returns the peer's ConnectionStatus.
  157. func (p *peer) Status() tmconn.ConnectionStatus {
  158. return p.mconn.Status()
  159. }
  160. // Send msg bytes to the channel identified by chID byte. Returns false if the
  161. // send queue is full after timeout, specified by MConnection.
  162. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  163. if !p.IsRunning() {
  164. // see Switch#Broadcast, where we fetch the list of peers and loop over
  165. // them - while we're looping, one peer may be removed and stopped.
  166. return false
  167. } else if !p.hasChannel(chID) {
  168. return false
  169. }
  170. res := p.mconn.Send(chID, msgBytes)
  171. if res {
  172. p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes)))
  173. }
  174. return res
  175. }
  176. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  177. // false if the send queue is full.
  178. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  179. if !p.IsRunning() {
  180. return false
  181. } else if !p.hasChannel(chID) {
  182. return false
  183. }
  184. res := p.mconn.TrySend(chID, msgBytes)
  185. if res {
  186. p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes)))
  187. }
  188. return res
  189. }
  190. // Get the data for a given key.
  191. func (p *peer) Get(key string) interface{} {
  192. return p.Data.Get(key)
  193. }
  194. // Set sets the data for the given key.
  195. func (p *peer) Set(key string, data interface{}) {
  196. p.Data.Set(key, data)
  197. }
  198. // hasChannel returns true if the peer reported
  199. // knowing about the given chID.
  200. func (p *peer) hasChannel(chID byte) bool {
  201. for _, ch := range p.channels {
  202. if ch == chID {
  203. return true
  204. }
  205. }
  206. // NOTE: probably will want to remove this
  207. // but could be helpful while the feature is new
  208. p.Logger.Debug(
  209. "Unknown channel for peer",
  210. "channel",
  211. chID,
  212. "channels",
  213. p.channels,
  214. )
  215. return false
  216. }
  217. //---------------------------------------------------
  218. // methods only used for testing
  219. // TODO: can we remove these?
  220. // CloseConn closes the underlying connection
  221. func (pc *peerConn) CloseConn() {
  222. pc.conn.Close() // nolint: errcheck
  223. }
  224. // Addr returns peer's remote network address.
  225. func (p *peer) Addr() net.Addr {
  226. return p.peerConn.conn.RemoteAddr()
  227. }
  228. // CanSend returns true if the send queue is not full, false otherwise.
  229. func (p *peer) CanSend(chID byte) bool {
  230. if !p.IsRunning() {
  231. return false
  232. }
  233. return p.mconn.CanSend(chID)
  234. }
  235. //---------------------------------------------------
  236. func PeerMetrics(metrics *Metrics) PeerOption {
  237. return func(p *peer) {
  238. p.metrics = metrics
  239. }
  240. }
  241. func (p *peer) metricsReporter() {
  242. for {
  243. select {
  244. case <-p.metricsTicker.C:
  245. status := p.mconn.Status()
  246. var sendQueueSize float64
  247. for _, chStatus := range status.Channels {
  248. sendQueueSize += float64(chStatus.SendQueueSize)
  249. }
  250. p.metrics.PeerPendingSendBytes.With("peer-id", string(p.ID())).Set(sendQueueSize)
  251. case <-p.Quit():
  252. return
  253. }
  254. }
  255. }
  256. //------------------------------------------------------------------
  257. // helper funcs
  258. func createMConnection(
  259. conn net.Conn,
  260. p *peer,
  261. reactorsByCh map[byte]Reactor,
  262. chDescs []*tmconn.ChannelDescriptor,
  263. onPeerError func(Peer, interface{}),
  264. config tmconn.MConnConfig,
  265. ) *tmconn.MConnection {
  266. onReceive := func(chID byte, msgBytes []byte) {
  267. reactor := reactorsByCh[chID]
  268. if reactor == nil {
  269. // Note that its ok to panic here as it's caught in the conn._recover,
  270. // which does onPeerError.
  271. panic(fmt.Sprintf("Unknown channel %X", chID))
  272. }
  273. p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
  274. reactor.Receive(chID, p, msgBytes)
  275. }
  276. onError := func(r interface{}) {
  277. onPeerError(p, r)
  278. }
  279. return tmconn.NewMConnectionWithConfig(
  280. conn,
  281. chDescs,
  282. onReceive,
  283. onError,
  284. config,
  285. )
  286. }