You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

431 lines
10 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. crypto "github.com/tendermint/tendermint/crypto"
  8. cmn "github.com/tendermint/tendermint/libs/common"
  9. "github.com/tendermint/tendermint/libs/log"
  10. "github.com/tendermint/tendermint/config"
  11. tmconn "github.com/tendermint/tendermint/p2p/conn"
  12. )
// testIPSuffix is a package-wide counter that peerConn.RemoteIP increments
// atomically to derive the last octet of the fake 172.16.0.x address it
// returns when no real remote address is available (test setups).
var testIPSuffix uint32
  14. // Peer is an interface representing a peer connected on a reactor.
  15. type Peer interface {
  16. cmn.Service
  17. ID() ID // peer's cryptographic ID
  18. RemoteIP() net.IP // remote IP of the connection
  19. IsOutbound() bool // did we dial the peer
  20. IsPersistent() bool // do we redial this peer when we disconnect
  21. NodeInfo() NodeInfo // peer's info
  22. Status() tmconn.ConnectionStatus
  23. Send(byte, []byte) bool
  24. TrySend(byte, []byte) bool
  25. Set(string, interface{})
  26. Get(string) interface{}
  27. }
  28. //----------------------------------------------------------
  29. // peerConn contains the raw connection and its config.
  30. type peerConn struct {
  31. outbound bool
  32. persistent bool
  33. config *config.P2PConfig
  34. conn net.Conn // source connection
  35. ip net.IP
  36. }
  37. // ID only exists for SecretConnection.
  38. // NOTE: Will panic if conn is not *SecretConnection.
  39. func (pc peerConn) ID() ID {
  40. return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
  41. }
  42. // Return the IP from the connection RemoteAddr
  43. func (pc peerConn) RemoteIP() net.IP {
  44. if pc.ip != nil {
  45. return pc.ip
  46. }
  47. // In test cases a conn could not be present at all or be an in-memory
  48. // implementation where we want to return a fake ip.
  49. if pc.conn == nil || pc.conn.RemoteAddr().String() == "pipe" {
  50. pc.ip = net.IP{172, 16, 0, byte(atomic.AddUint32(&testIPSuffix, 1))}
  51. return pc.ip
  52. }
  53. host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
  54. if err != nil {
  55. panic(err)
  56. }
  57. ips, err := net.LookupIP(host)
  58. if err != nil {
  59. panic(err)
  60. }
  61. pc.ip = ips[0]
  62. return pc.ip
  63. }
// peer implements Peer.
//
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
	// Service lifecycle (start/stop/running state) and the Logger.
	cmn.BaseService

	// raw peerConn and the multiplex connection
	peerConn
	mconn *tmconn.MConnection

	// peer's node info and the channel it knows about
	// channels = nodeInfo.Channels
	// cached to avoid copying nodeInfo in hasChannel
	nodeInfo NodeInfo
	channels []byte

	// User data: key/value store backing the Get and Set methods.
	Data *cmn.CMap
}
  80. func newPeer(
  81. pc peerConn,
  82. mConfig tmconn.MConnConfig,
  83. nodeInfo NodeInfo,
  84. reactorsByCh map[byte]Reactor,
  85. chDescs []*tmconn.ChannelDescriptor,
  86. onPeerError func(Peer, interface{}),
  87. ) *peer {
  88. p := &peer{
  89. peerConn: pc,
  90. nodeInfo: nodeInfo,
  91. channels: nodeInfo.Channels,
  92. Data: cmn.NewCMap(),
  93. }
  94. p.mconn = createMConnection(
  95. pc.conn,
  96. p,
  97. reactorsByCh,
  98. chDescs,
  99. onPeerError,
  100. mConfig,
  101. )
  102. p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
  103. return p
  104. }
  105. func newOutboundPeerConn(
  106. addr *NetAddress,
  107. config *config.P2PConfig,
  108. persistent bool,
  109. ourNodePrivKey crypto.PrivKey,
  110. ) (peerConn, error) {
  111. conn, err := dial(addr, config)
  112. if err != nil {
  113. return peerConn{}, cmn.ErrorWrap(err, "Error creating peer")
  114. }
  115. pc, err := newPeerConn(conn, config, true, persistent, ourNodePrivKey)
  116. if err != nil {
  117. if cerr := conn.Close(); cerr != nil {
  118. return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
  119. }
  120. return peerConn{}, err
  121. }
  122. // ensure dialed ID matches connection ID
  123. if addr.ID != pc.ID() {
  124. if cerr := conn.Close(); cerr != nil {
  125. return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
  126. }
  127. return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()}
  128. }
  129. return pc, nil
  130. }
  131. func newInboundPeerConn(
  132. conn net.Conn,
  133. config *config.P2PConfig,
  134. ourNodePrivKey crypto.PrivKey,
  135. ) (peerConn, error) {
  136. // TODO: issue PoW challenge
  137. return newPeerConn(conn, config, false, false, ourNodePrivKey)
  138. }
  139. func newPeerConn(
  140. rawConn net.Conn,
  141. cfg *config.P2PConfig,
  142. outbound, persistent bool,
  143. ourNodePrivKey crypto.PrivKey,
  144. ) (pc peerConn, err error) {
  145. conn := rawConn
  146. // Fuzz connection
  147. if cfg.TestFuzz {
  148. // so we have time to do peer handshakes and get set up
  149. conn = FuzzConnAfterFromConfig(conn, 10*time.Second, cfg.TestFuzzConfig)
  150. }
  151. // Set deadline for secret handshake
  152. dl := time.Now().Add(cfg.HandshakeTimeout)
  153. if err := conn.SetDeadline(dl); err != nil {
  154. return pc, cmn.ErrorWrap(
  155. err,
  156. "Error setting deadline while encrypting connection",
  157. )
  158. }
  159. // Encrypt connection
  160. conn, err = tmconn.MakeSecretConnection(conn, ourNodePrivKey)
  161. if err != nil {
  162. return pc, cmn.ErrorWrap(err, "Error creating peer")
  163. }
  164. // Only the information we already have
  165. return peerConn{
  166. config: cfg,
  167. outbound: outbound,
  168. persistent: persistent,
  169. conn: conn,
  170. }, nil
  171. }
  172. //---------------------------------------------------
  173. // Implements cmn.Service
  174. // SetLogger implements BaseService.
  175. func (p *peer) SetLogger(l log.Logger) {
  176. p.Logger = l
  177. p.mconn.SetLogger(l)
  178. }
  179. // OnStart implements BaseService.
  180. func (p *peer) OnStart() error {
  181. if err := p.BaseService.OnStart(); err != nil {
  182. return err
  183. }
  184. err := p.mconn.Start()
  185. return err
  186. }
  187. // OnStop implements BaseService.
  188. func (p *peer) OnStop() {
  189. p.BaseService.OnStop()
  190. p.mconn.Stop() // stop everything and close the conn
  191. }
  192. //---------------------------------------------------
  193. // Implements Peer
  194. // ID returns the peer's ID - the hex encoded hash of its pubkey.
  195. func (p *peer) ID() ID {
  196. return p.nodeInfo.ID
  197. }
  198. // IsOutbound returns true if the connection is outbound, false otherwise.
  199. func (p *peer) IsOutbound() bool {
  200. return p.peerConn.outbound
  201. }
  202. // IsPersistent returns true if the peer is persitent, false otherwise.
  203. func (p *peer) IsPersistent() bool {
  204. return p.peerConn.persistent
  205. }
  206. // NodeInfo returns a copy of the peer's NodeInfo.
  207. func (p *peer) NodeInfo() NodeInfo {
  208. return p.nodeInfo
  209. }
  210. // Status returns the peer's ConnectionStatus.
  211. func (p *peer) Status() tmconn.ConnectionStatus {
  212. return p.mconn.Status()
  213. }
  214. // Send msg bytes to the channel identified by chID byte. Returns false if the
  215. // send queue is full after timeout, specified by MConnection.
  216. func (p *peer) Send(chID byte, msgBytes []byte) bool {
  217. if !p.IsRunning() {
  218. // see Switch#Broadcast, where we fetch the list of peers and loop over
  219. // them - while we're looping, one peer may be removed and stopped.
  220. return false
  221. } else if !p.hasChannel(chID) {
  222. return false
  223. }
  224. return p.mconn.Send(chID, msgBytes)
  225. }
  226. // TrySend msg bytes to the channel identified by chID byte. Immediately returns
  227. // false if the send queue is full.
  228. func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
  229. if !p.IsRunning() {
  230. return false
  231. } else if !p.hasChannel(chID) {
  232. return false
  233. }
  234. return p.mconn.TrySend(chID, msgBytes)
  235. }
  236. // Get the data for a given key.
  237. func (p *peer) Get(key string) interface{} {
  238. return p.Data.Get(key)
  239. }
  240. // Set sets the data for the given key.
  241. func (p *peer) Set(key string, data interface{}) {
  242. p.Data.Set(key, data)
  243. }
  244. // hasChannel returns true if the peer reported
  245. // knowing about the given chID.
  246. func (p *peer) hasChannel(chID byte) bool {
  247. for _, ch := range p.channels {
  248. if ch == chID {
  249. return true
  250. }
  251. }
  252. // NOTE: probably will want to remove this
  253. // but could be helpful while the feature is new
  254. p.Logger.Debug(
  255. "Unknown channel for peer",
  256. "channel",
  257. chID,
  258. "channels",
  259. p.channels,
  260. )
  261. return false
  262. }
  263. //---------------------------------------------------
  264. // methods used by the Switch
  265. // CloseConn should be called by the Switch if the peer was created but never
  266. // started.
  267. func (pc *peerConn) CloseConn() {
  268. pc.conn.Close() // nolint: errcheck
  269. }
  270. // HandshakeTimeout performs the Tendermint P2P handshake between a given node
  271. // and the peer by exchanging their NodeInfo. It sets the received nodeInfo on
  272. // the peer.
  273. // NOTE: blocking
  274. func (pc *peerConn) HandshakeTimeout(
  275. ourNodeInfo NodeInfo,
  276. timeout time.Duration,
  277. ) (peerNodeInfo NodeInfo, err error) {
  278. // Set deadline for handshake so we don't block forever on conn.ReadFull
  279. if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
  280. return peerNodeInfo, cmn.ErrorWrap(err, "Error setting deadline")
  281. }
  282. var trs, _ = cmn.Parallel(
  283. func(_ int) (val interface{}, err error, abort bool) {
  284. _, err = cdc.MarshalBinaryWriter(pc.conn, ourNodeInfo)
  285. return
  286. },
  287. func(_ int) (val interface{}, err error, abort bool) {
  288. _, err = cdc.UnmarshalBinaryReader(
  289. pc.conn,
  290. &peerNodeInfo,
  291. int64(MaxNodeInfoSize()),
  292. )
  293. return
  294. },
  295. )
  296. if err := trs.FirstError(); err != nil {
  297. return peerNodeInfo, cmn.ErrorWrap(err, "Error during handshake")
  298. }
  299. // Remove deadline
  300. if err := pc.conn.SetDeadline(time.Time{}); err != nil {
  301. return peerNodeInfo, cmn.ErrorWrap(err, "Error removing deadline")
  302. }
  303. return peerNodeInfo, nil
  304. }
  305. // Addr returns peer's remote network address.
  306. func (p *peer) Addr() net.Addr {
  307. return p.peerConn.conn.RemoteAddr()
  308. }
  309. // CanSend returns true if the send queue is not full, false otherwise.
  310. func (p *peer) CanSend(chID byte) bool {
  311. if !p.IsRunning() {
  312. return false
  313. }
  314. return p.mconn.CanSend(chID)
  315. }
  316. // String representation.
  317. func (p *peer) String() string {
  318. if p.outbound {
  319. return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
  320. }
  321. return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
  322. }
  323. //------------------------------------------------------------------
  324. // helper funcs
  325. func dial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) {
  326. if cfg.TestDialFail {
  327. return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)")
  328. }
  329. conn, err := addr.DialTimeout(cfg.DialTimeout)
  330. if err != nil {
  331. return nil, err
  332. }
  333. return conn, nil
  334. }
  335. func createMConnection(
  336. conn net.Conn,
  337. p *peer,
  338. reactorsByCh map[byte]Reactor,
  339. chDescs []*tmconn.ChannelDescriptor,
  340. onPeerError func(Peer, interface{}),
  341. config tmconn.MConnConfig,
  342. ) *tmconn.MConnection {
  343. onReceive := func(chID byte, msgBytes []byte) {
  344. reactor := reactorsByCh[chID]
  345. if reactor == nil {
  346. // Note that its ok to panic here as it's caught in the conn._recover,
  347. // which does onPeerError.
  348. panic(cmn.Fmt("Unknown channel %X", chID))
  349. }
  350. reactor.Receive(chID, p, msgBytes)
  351. }
  352. onError := func(r interface{}) {
  353. onPeerError(p, r)
  354. }
  355. return tmconn.NewMConnectionWithConfig(
  356. conn,
  357. chDescs,
  358. onReceive,
  359. onError,
  360. config,
  361. )
  362. }