You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

350 lines
8.2 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. cmn "github.com/tendermint/tendermint/libs/common"
  8. "github.com/tendermint/tendermint/libs/log"
  9. "github.com/tendermint/tendermint/config"
  10. tmconn "github.com/tendermint/tendermint/p2p/conn"
  11. )
  // testIPSuffix is incremented atomically by peerConn.RemoteIP; each new value
  // becomes the last octet of a fake 172.16.0.x address handed out when there
  // is no real connection to read an IP from.
  var testIPSuffix uint32
  13. // Peer is an interface representing a peer connected on a reactor.
  14. type Peer interface {
  15. cmn.Service
  16. ID() ID // peer's cryptographic ID
  17. RemoteIP() net.IP // remote IP of the connection
  18. IsOutbound() bool // did we dial the peer
  19. IsPersistent() bool // do we redial this peer when we disconnect
  20. NodeInfo() NodeInfo // peer's info
  21. Status() tmconn.ConnectionStatus
  22. OriginalAddr() *NetAddress
  23. Send(byte, []byte) bool
  24. TrySend(byte, []byte) bool
  25. Set(string, interface{})
  26. Get(string) interface{}
  27. }
  28. //----------------------------------------------------------
  29. // peerConn contains the raw connection and its config.
  30. type peerConn struct {
  31. outbound bool
  32. persistent bool
  33. config *config.P2PConfig
  34. conn net.Conn // source connection
  35. ip net.IP
  36. originalAddr *NetAddress // nil for inbound connections
  37. }
  38. // ID only exists for SecretConnection.
  39. // NOTE: Will panic if conn is not *SecretConnection.
  40. func (pc peerConn) ID() ID {
  41. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  42. }
  43. // Return the IP from the connection RemoteAddr
  44. func (pc peerConn) RemoteIP() net.IP {
  45. if pc.ip != nil {
  46. return pc.ip
  47. }
  48. // In test cases a conn could not be present at all or be an in-memory
  49. // implementation where we want to return a fake ip.
  50. if pc.conn == nil || pc.conn.RemoteAddr().String() == "pipe" {
  51. pc.ip = net.IP{172, 16, 0, byte(atomic.AddUint32(&testIPSuffix, 1))}
  52. return pc.ip
  53. }
  54. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  55. if err != nil {
  56. panic(err)
  57. }
  58. ips, err := net.LookupIP(host)
  59. if err != nil {
  60. panic(err)
  61. }
  62. pc.ip = ips[0]
  63. return pc.ip
  64. }
  65. // peer implements Peer.
  66. //
  67. // Before using a peer, you will need to perform a handshake on connection.
  68. type peer struct {
  69. cmn.BaseService
  70. // raw peerConn and the multiplex connection
  71. peerConn
  72. mconn *tmconn.MConnection
  73. // peer's node info and the channel it knows about
  74. // channels = nodeInfo.Channels
  75. // cached to avoid copying nodeInfo in hasChannel
  76. nodeInfo NodeInfo
  77. channels []byte
  78. // User data
  79. Data *cmn.CMap
  80. }
  81. func newPeer(
  82. pc peerConn,
  83. mConfig tmconn.MConnConfig,
  84. nodeInfo NodeInfo,
  85. reactorsByCh map[byte]Reactor,
  86. chDescs []*tmconn.ChannelDescriptor,
  87. onPeerError func(Peer, interface{}),
  88. ) *peer {
  89. p := &peer{
  90. peerConn: pc,
  91. nodeInfo: nodeInfo,
  92. channels: nodeInfo.Channels,
  93. Data: cmn.NewCMap(),
  94. }
  95. p.mconn = createMConnection(
  96. pc.conn,
  97. p,
  98. reactorsByCh,
  99. chDescs,
  100. onPeerError,
  101. mConfig,
  102. )
  103. p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
  104. return p
  105. }
  106. //---------------------------------------------------
  107. // Implements cmn.Service
  108. // SetLogger implements BaseService.
  109. func (p *peer) SetLogger(l log.Logger) {
  110. p.Logger = l
  111. p.mconn.SetLogger(l)
  112. }
  113. // OnStart implements BaseService.
  114. func (p *peer) OnStart() error {
  115. if err := p.BaseService.OnStart(); err != nil {
  116. return err
  117. }
  118. err := p.mconn.Start()
  119. return err
  120. }
  121. // OnStop implements BaseService.
  122. func (p *peer) OnStop() {
  123. p.BaseService.OnStop()
  124. p.mconn.Stop() // stop everything and close the conn
  125. }
  126. //---------------------------------------------------
  127. // Implements Peer
  128. // ID returns the peer's ID - the hex encoded hash of its pubkey.
  129. func (p *peer) ID() ID {
  130. return p.nodeInfo.ID
  131. }
  132. // IsOutbound returns true if the connection is outbound, false otherwise.
  133. func (p *peer) IsOutbound() bool {
  134. return p.peerConn.outbound
  135. }
  136. // IsPersistent returns true if the peer is persitent, false otherwise.
  137. func (p *peer) IsPersistent() bool {
  138. return p.peerConn.persistent
  139. }
  140. // NodeInfo returns a copy of the peer's NodeInfo.
  141. func (p *peer) NodeInfo() NodeInfo {
  142. return p.nodeInfo
  143. }
  144. // OriginalAddr returns the original address, which was used to connect with
  145. // the peer. Returns nil for inbound peers.
  146. func (p *peer) OriginalAddr() *NetAddress {
  147. if p.peerConn.outbound {
  148. return p.peerConn.originalAddr
  149. }
  150. return nil
  151. }
  152. // Status returns the peer's ConnectionStatus.
  153. func (p *peer) Status() tmconn.ConnectionStatus {
  154. return p.mconn.Status()
  155. }
  156. // Send msg bytes to the channel identified by chID byte. Returns false if the
  157. // send queue is full after timeout, specified by MConnection.
  158. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  159. if !p.IsRunning() {
  160. // see Switch#Broadcast, where we fetch the list of peers and loop over
  161. // them - while we're looping, one peer may be removed and stopped.
  162. return false
  163. } else if !p.hasChannel(chID) {
  164. return false
  165. }
  166. return p.mconn.Send(chID, msgBytes)
  167. }
  168. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  169. // false if the send queue is full.
  170. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  171. if !p.IsRunning() {
  172. return false
  173. } else if !p.hasChannel(chID) {
  174. return false
  175. }
  176. return p.mconn.TrySend(chID, msgBytes)
  177. }
  178. // Get the data for a given key.
  179. func (p *peer) Get(key string) interface{} {
  180. return p.Data.Get(key)
  181. }
  182. // Set sets the data for the given key.
  183. func (p *peer) Set(key string, data interface{}) {
  184. p.Data.Set(key, data)
  185. }
  186. // hasChannel returns true if the peer reported
  187. // knowing about the given chID.
  188. func (p *peer) hasChannel(chID byte) bool {
  189. for _, ch := range p.channels {
  190. if ch == chID {
  191. return true
  192. }
  193. }
  194. // NOTE: probably will want to remove this
  195. // but could be helpful while the feature is new
  196. p.Logger.Debug(
  197. "Unknown channel for peer",
  198. "channel",
  199. chID,
  200. "channels",
  201. p.channels,
  202. )
  203. return false
  204. }
  205. //---------------------------------------------------
  206. // methods used by the Switch
  207. // CloseConn should be called by the Switch if the peer was created but never
  208. // started.
  209. func (pc *peerConn) CloseConn() {
  210. pc.conn.Close() // nolint: errcheck
  211. }
  212. // HandshakeTimeout performs the Tendermint P2P handshake between a given node
  213. // and the peer by exchanging their NodeInfo. It sets the received nodeInfo on
  214. // the peer.
  215. // NOTE: blocking
  216. func (pc *peerConn) HandshakeTimeout(
  217. ourNodeInfo NodeInfo,
  218. timeout time.Duration,
  219. ) (peerNodeInfo NodeInfo, err error) {
  220. // Set deadline for handshake so we don't block forever on conn.ReadFull
  221. if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
  222. return peerNodeInfo, cmn.ErrorWrap(err, "Error setting deadline")
  223. }
  224. var trs, _ = cmn.Parallel(
  225. func(_ int) (val interface{}, err error, abort bool) {
  226. _, err = cdc.MarshalBinaryWriter(pc.conn, ourNodeInfo)
  227. return
  228. },
  229. func(_ int) (val interface{}, err error, abort bool) {
  230. _, err = cdc.UnmarshalBinaryReader(
  231. pc.conn,
  232. &peerNodeInfo,
  233. int64(MaxNodeInfoSize()),
  234. )
  235. return
  236. },
  237. )
  238. if err := trs.FirstError(); err != nil {
  239. return peerNodeInfo, cmn.ErrorWrap(err, "Error during handshake")
  240. }
  241. // Remove deadline
  242. if err := pc.conn.SetDeadline(time.Time{}); err != nil {
  243. return peerNodeInfo, cmn.ErrorWrap(err, "Error removing deadline")
  244. }
  245. return peerNodeInfo, nil
  246. }
  247. // Addr returns peer's remote network address.
  248. func (p *peer) Addr() net.Addr {
  249. return p.peerConn.conn.RemoteAddr()
  250. }
  251. // CanSend returns true if the send queue is not full, false otherwise.
  252. func (p *peer) CanSend(chID byte) bool {
  253. if !p.IsRunning() {
  254. return false
  255. }
  256. return p.mconn.CanSend(chID)
  257. }
  258. // String representation.
  259. func (p *peer) String() string {
  260. if p.outbound {
  261. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  262. }
  263. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  264. }
  265. //------------------------------------------------------------------
  266. // helper funcs
  267. func createMConnection(
  268. conn net.Conn,
  269. p *peer,
  270. reactorsByCh map[byte]Reactor,
  271. chDescs []*tmconn.ChannelDescriptor,
  272. onPeerError func(Peer, interface{}),
  273. config tmconn.MConnConfig,
  274. ) *tmconn.MConnection {
  275. onReceive := func(chID byte, msgBytes []byte) {
  276. reactor := reactorsByCh[chID]
  277. if reactor == nil {
  278. // Note that its ok to panic here as it's caught in the conn._recover,
  279. // which does onPeerError.
  280. panic(fmt.Sprintf("Unknown channel %X", chID))
  281. }
  282. reactor.Receive(chID, p, msgBytes)
  283. }
  284. onError := func(r interface{}) {
  285. onPeerError(p, r)
  286. }
  287. return tmconn.NewMConnectionWithConfig(
  288. conn,
  289. chDescs,
  290. onReceive,
  291. onError,
  292. config,
  293. )
  294. }