You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

401 lines
9.3 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. cmn "github.com/tendermint/tendermint/libs/common"
  8. "github.com/tendermint/tendermint/libs/log"
  9. "github.com/tendermint/tendermint/config"
  10. tmconn "github.com/tendermint/tendermint/p2p/conn"
  11. )
  12. const metricsTickerDuration = 10 * time.Second
  13. var testIPSuffix uint32
  14. // Peer is an interface representing a peer connected on a reactor.
  15. type Peer interface {
  16. cmn.Service
  17. ID() ID // peer's cryptographic ID
  18. RemoteIP() net.IP // remote IP of the connection
  19. IsOutbound() bool // did we dial the peer
  20. IsPersistent() bool // do we redial this peer when we disconnect
  21. NodeInfo() NodeInfo // peer's info
  22. Status() tmconn.ConnectionStatus
  23. OriginalAddr() *NetAddress
  24. Send(byte, []byte) bool
  25. TrySend(byte, []byte) bool
  26. Set(string, interface{})
  27. Get(string) interface{}
  28. }
  29. //----------------------------------------------------------
  30. // peerConn contains the raw connection and its config.
  31. type peerConn struct {
  32. outbound bool
  33. persistent bool
  34. config *config.P2PConfig
  35. conn net.Conn // source connection
  36. ip net.IP
  37. originalAddr *NetAddress // nil for inbound connections
  38. }
  39. // ID only exists for SecretConnection.
  40. // NOTE: Will panic if conn is not *SecretConnection.
  41. func (pc peerConn) ID() ID {
  42. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  43. }
  44. // Return the IP from the connection RemoteAddr
  45. func (pc peerConn) RemoteIP() net.IP {
  46. if pc.ip != nil {
  47. return pc.ip
  48. }
  49. // In test cases a conn could not be present at all or be an in-memory
  50. // implementation where we want to return a fake ip.
  51. if pc.conn == nil || pc.conn.RemoteAddr().String() == "pipe" {
  52. pc.ip = net.IP{172, 16, 0, byte(atomic.AddUint32(&testIPSuffix, 1))}
  53. return pc.ip
  54. }
  55. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  56. if err != nil {
  57. panic(err)
  58. }
  59. ips, err := net.LookupIP(host)
  60. if err != nil {
  61. panic(err)
  62. }
  63. pc.ip = ips[0]
  64. return pc.ip
  65. }
  66. // peer implements Peer.
  67. //
  68. // Before using a peer, you will need to perform a handshake on connection.
  69. type peer struct {
  70. cmn.BaseService
  71. // raw peerConn and the multiplex connection
  72. peerConn
  73. mconn *tmconn.MConnection
  74. // peer's node info and the channel it knows about
  75. // channels = nodeInfo.Channels
  76. // cached to avoid copying nodeInfo in hasChannel
  77. nodeInfo NodeInfo
  78. channels []byte
  79. // User data
  80. Data *cmn.CMap
  81. metrics *Metrics
  82. metricsTicker *time.Ticker
  83. }
  84. type PeerOption func(*peer)
  85. func newPeer(
  86. pc peerConn,
  87. mConfig tmconn.MConnConfig,
  88. nodeInfo NodeInfo,
  89. reactorsByCh map[byte]Reactor,
  90. chDescs []*tmconn.ChannelDescriptor,
  91. onPeerError func(Peer, interface{}),
  92. options ...PeerOption,
  93. ) *peer {
  94. p := &peer{
  95. peerConn: pc,
  96. nodeInfo: nodeInfo,
  97. channels: nodeInfo.Channels,
  98. Data: cmn.NewCMap(),
  99. metricsTicker: time.NewTicker(metricsTickerDuration),
  100. metrics: NopMetrics(),
  101. }
  102. p.mconn = createMConnection(
  103. pc.conn,
  104. p,
  105. reactorsByCh,
  106. chDescs,
  107. onPeerError,
  108. mConfig,
  109. )
  110. p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
  111. for _, option := range options {
  112. option(p)
  113. }
  114. return p
  115. }
  116. //---------------------------------------------------
  117. // Implements cmn.Service
  118. // SetLogger implements BaseService.
  119. func (p *peer) SetLogger(l log.Logger) {
  120. p.Logger = l
  121. p.mconn.SetLogger(l)
  122. }
  123. // OnStart implements BaseService.
  124. func (p *peer) OnStart() error {
  125. if err := p.BaseService.OnStart(); err != nil {
  126. return err
  127. }
  128. if err := p.mconn.Start(); err != nil {
  129. return err
  130. }
  131. go p.metricsReporter()
  132. return nil
  133. }
  134. // OnStop implements BaseService.
  135. func (p *peer) OnStop() {
  136. p.metricsTicker.Stop()
  137. p.BaseService.OnStop()
  138. p.mconn.Stop() // stop everything and close the conn
  139. }
  140. //---------------------------------------------------
  141. // Implements Peer
  142. // ID returns the peer's ID - the hex encoded hash of its pubkey.
  143. func (p *peer) ID() ID {
  144. return p.nodeInfo.ID
  145. }
  146. // IsOutbound returns true if the connection is outbound, false otherwise.
  147. func (p *peer) IsOutbound() bool {
  148. return p.peerConn.outbound
  149. }
  150. // IsPersistent returns true if the peer is persitent, false otherwise.
  151. func (p *peer) IsPersistent() bool {
  152. return p.peerConn.persistent
  153. }
  154. // NodeInfo returns a copy of the peer's NodeInfo.
  155. func (p *peer) NodeInfo() NodeInfo {
  156. return p.nodeInfo
  157. }
  158. // OriginalAddr returns the original address, which was used to connect with
  159. // the peer. Returns nil for inbound peers.
  160. func (p *peer) OriginalAddr() *NetAddress {
  161. if p.peerConn.outbound {
  162. return p.peerConn.originalAddr
  163. }
  164. return nil
  165. }
  166. // Status returns the peer's ConnectionStatus.
  167. func (p *peer) Status() tmconn.ConnectionStatus {
  168. return p.mconn.Status()
  169. }
  170. // Send msg bytes to the channel identified by chID byte. Returns false if the
  171. // send queue is full after timeout, specified by MConnection.
  172. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  173. if !p.IsRunning() {
  174. // see Switch#Broadcast, where we fetch the list of peers and loop over
  175. // them - while we're looping, one peer may be removed and stopped.
  176. return false
  177. } else if !p.hasChannel(chID) {
  178. return false
  179. }
  180. res := p.mconn.Send(chID, msgBytes)
  181. if res {
  182. p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes)))
  183. }
  184. return res
  185. }
  186. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  187. // false if the send queue is full.
  188. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  189. if !p.IsRunning() {
  190. return false
  191. } else if !p.hasChannel(chID) {
  192. return false
  193. }
  194. res := p.mconn.TrySend(chID, msgBytes)
  195. if res {
  196. p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes)))
  197. }
  198. return res
  199. }
  200. // Get the data for a given key.
  201. func (p *peer) Get(key string) interface{} {
  202. return p.Data.Get(key)
  203. }
  204. // Set sets the data for the given key.
  205. func (p *peer) Set(key string, data interface{}) {
  206. p.Data.Set(key, data)
  207. }
  208. // hasChannel returns true if the peer reported
  209. // knowing about the given chID.
  210. func (p *peer) hasChannel(chID byte) bool {
  211. for _, ch := range p.channels {
  212. if ch == chID {
  213. return true
  214. }
  215. }
  216. // NOTE: probably will want to remove this
  217. // but could be helpful while the feature is new
  218. p.Logger.Debug(
  219. "Unknown channel for peer",
  220. "channel",
  221. chID,
  222. "channels",
  223. p.channels,
  224. )
  225. return false
  226. }
  227. //---------------------------------------------------
  228. // methods used by the Switch
  229. // CloseConn should be called by the Switch if the peer was created but never
  230. // started.
  231. func (pc *peerConn) CloseConn() {
  232. pc.conn.Close() // nolint: errcheck
  233. }
  234. // HandshakeTimeout performs the Tendermint P2P handshake between a given node
  235. // and the peer by exchanging their NodeInfo. It sets the received nodeInfo on
  236. // the peer.
  237. // NOTE: blocking
  238. func (pc *peerConn) HandshakeTimeout(
  239. ourNodeInfo NodeInfo,
  240. timeout time.Duration,
  241. ) (peerNodeInfo NodeInfo, err error) {
  242. // Set deadline for handshake so we don't block forever on conn.ReadFull
  243. if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
  244. return peerNodeInfo, cmn.ErrorWrap(err, "Error setting deadline")
  245. }
  246. var trs, _ = cmn.Parallel(
  247. func(_ int) (val interface{}, err error, abort bool) {
  248. _, err = cdc.MarshalBinaryWriter(pc.conn, ourNodeInfo)
  249. return
  250. },
  251. func(_ int) (val interface{}, err error, abort bool) {
  252. _, err = cdc.UnmarshalBinaryReader(
  253. pc.conn,
  254. &peerNodeInfo,
  255. int64(MaxNodeInfoSize()),
  256. )
  257. return
  258. },
  259. )
  260. if err := trs.FirstError(); err != nil {
  261. return peerNodeInfo, cmn.ErrorWrap(err, "Error during handshake")
  262. }
  263. // Remove deadline
  264. if err := pc.conn.SetDeadline(time.Time{}); err != nil {
  265. return peerNodeInfo, cmn.ErrorWrap(err, "Error removing deadline")
  266. }
  267. return peerNodeInfo, nil
  268. }
  269. // Addr returns peer's remote network address.
  270. func (p *peer) Addr() net.Addr {
  271. return p.peerConn.conn.RemoteAddr()
  272. }
  273. // CanSend returns true if the send queue is not full, false otherwise.
  274. func (p *peer) CanSend(chID byte) bool {
  275. if !p.IsRunning() {
  276. return false
  277. }
  278. return p.mconn.CanSend(chID)
  279. }
  280. // String representation.
  281. func (p *peer) String() string {
  282. if p.outbound {
  283. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  284. }
  285. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  286. }
  287. func PeerMetrics(metrics *Metrics) PeerOption {
  288. return func(p *peer) {
  289. p.metrics = metrics
  290. }
  291. }
  292. func (p *peer) metricsReporter() {
  293. for {
  294. select {
  295. case <-p.metricsTicker.C:
  296. status := p.mconn.Status()
  297. var sendQueueSize float64
  298. for _, chStatus := range status.Channels {
  299. sendQueueSize += float64(chStatus.SendQueueSize)
  300. }
  301. p.metrics.PeerPendingSendBytes.With("peer-id", string(p.ID())).Set(sendQueueSize)
  302. case <-p.Quit():
  303. return
  304. }
  305. }
  306. }
  307. //------------------------------------------------------------------
  308. // helper funcs
  309. func createMConnection(
  310. conn net.Conn,
  311. p *peer,
  312. reactorsByCh map[byte]Reactor,
  313. chDescs []*tmconn.ChannelDescriptor,
  314. onPeerError func(Peer, interface{}),
  315. config tmconn.MConnConfig,
  316. ) *tmconn.MConnection {
  317. onReceive := func(chID byte, msgBytes []byte) {
  318. reactor := reactorsByCh[chID]
  319. if reactor == nil {
  320. // Note that its ok to panic here as it's caught in the conn._recover,
  321. // which does onPeerError.
  322. panic(fmt.Sprintf("Unknown channel %X", chID))
  323. }
  324. p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
  325. reactor.Receive(chID, p, msgBytes)
  326. }
  327. onError := func(r interface{}) {
  328. onPeerError(p, r)
  329. }
  330. return tmconn.NewMConnectionWithConfig(
  331. conn,
  332. chDescs,
  333. onReceive,
  334. onError,
  335. config,
  336. )
  337. }