You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

674 lines
19 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "math"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/tendermint/tendermint/config"
  9. cmn "github.com/tendermint/tendermint/libs/common"
  10. "github.com/tendermint/tendermint/p2p/conn"
  11. )
  // Timing parameters for dialing peers and reconnecting to them.
  const (
  	// dialRandomizerIntervalMilliseconds bounds the random jitter, in
  	// milliseconds, that is added before dialing peers or reconnecting, to help
  	// prevent DoS.
  	dialRandomizerIntervalMilliseconds = 3000

  	// First, up to reconnectAttempts reconnects are tried, each failure
  	// followed by a sleep of reconnectInterval plus jitter: at least
  	// 20 * 5s = 100s in total.
  	reconnectAttempts = 20
  	reconnectInterval = 5 * time.Second

  	// Then up to reconnectBackOffAttempts more tries are made, try i (0-based)
  	// preceded by a sleep of reconnectBackOffBaseSeconds**i seconds plus
  	// jitter. With base 3 and 10 attempts, the longest single wait is
  	// 3**9 s (~5.5h), and the whole backoff phase sleeps
  	// sum(3**i, i=0..9) = 29524 s (~8.2h).
  	// Keep these untyped: callers pass the base straight to math.Pow.
  	reconnectBackOffAttempts    = 10
  	reconnectBackOffBaseSeconds = 3
  )
  25. //-----------------------------------------------------------------------------
  26. // An AddrBook represents an address book from the pex package, which is used
  27. // to store peer addresses.
  28. type AddrBook interface {
  29. AddAddress(addr *NetAddress, src *NetAddress) error
  30. AddOurAddress(*NetAddress)
  31. OurAddress(*NetAddress) bool
  32. MarkGood(*NetAddress)
  33. RemoveAddress(*NetAddress)
  34. HasAddress(*NetAddress) bool
  35. Save()
  36. }
  37. //-----------------------------------------------------------------------------
  38. // Switch handles peer connections and exposes an API to receive incoming messages
  39. // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  40. // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  41. // incoming messages are received on the reactor.
  42. type Switch struct {
  43. cmn.BaseService
  44. config *config.P2PConfig
  45. listeners []Listener
  46. reactors map[string]Reactor
  47. chDescs []*conn.ChannelDescriptor
  48. reactorsByCh map[byte]Reactor
  49. peers *PeerSet
  50. dialing *cmn.CMap
  51. reconnecting *cmn.CMap
  52. nodeInfo NodeInfo // our node info
  53. nodeKey *NodeKey // our node privkey
  54. addrBook AddrBook
  55. filterConnByAddr func(net.Addr) error
  56. filterConnByID func(ID) error
  57. mConfig conn.MConnConfig
  58. rng *cmn.Rand // seed for randomizing dial times and orders
  59. metrics *Metrics
  60. }
  61. // SwitchOption sets an optional parameter on the Switch.
  62. type SwitchOption func(*Switch)
  63. // NewSwitch creates a new Switch with the given config.
  64. func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch {
  65. sw := &Switch{
  66. config: cfg,
  67. reactors: make(map[string]Reactor),
  68. chDescs: make([]*conn.ChannelDescriptor, 0),
  69. reactorsByCh: make(map[byte]Reactor),
  70. peers: NewPeerSet(),
  71. dialing: cmn.NewCMap(),
  72. reconnecting: cmn.NewCMap(),
  73. metrics: NopMetrics(),
  74. }
  75. // Ensure we have a completely undeterministic PRNG.
  76. sw.rng = cmn.NewRand()
  77. mConfig := conn.DefaultMConnConfig()
  78. mConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond
  79. mConfig.SendRate = cfg.SendRate
  80. mConfig.RecvRate = cfg.RecvRate
  81. mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
  82. sw.mConfig = mConfig
  83. sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
  84. for _, option := range options {
  85. option(sw)
  86. }
  87. return sw
  88. }
  89. // WithMetrics sets the metrics.
  90. func WithMetrics(metrics *Metrics) SwitchOption {
  91. return func(sw *Switch) { sw.metrics = metrics }
  92. }
  93. //---------------------------------------------------------------------
  94. // Switch setup
  95. // AddReactor adds the given reactor to the switch.
  96. // NOTE: Not goroutine safe.
  97. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  98. // Validate the reactor.
  99. // No two reactors can share the same channel.
  100. reactorChannels := reactor.GetChannels()
  101. for _, chDesc := range reactorChannels {
  102. chID := chDesc.ID
  103. if sw.reactorsByCh[chID] != nil {
  104. cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  105. }
  106. sw.chDescs = append(sw.chDescs, chDesc)
  107. sw.reactorsByCh[chID] = reactor
  108. }
  109. sw.reactors[name] = reactor
  110. reactor.SetSwitch(sw)
  111. return reactor
  112. }
  113. // Reactors returns a map of reactors registered on the switch.
  114. // NOTE: Not goroutine safe.
  115. func (sw *Switch) Reactors() map[string]Reactor {
  116. return sw.reactors
  117. }
  118. // Reactor returns the reactor with the given name.
  119. // NOTE: Not goroutine safe.
  120. func (sw *Switch) Reactor(name string) Reactor {
  121. return sw.reactors[name]
  122. }
  123. // AddListener adds the given listener to the switch for listening to incoming peer connections.
  124. // NOTE: Not goroutine safe.
  125. func (sw *Switch) AddListener(l Listener) {
  126. sw.listeners = append(sw.listeners, l)
  127. }
  128. // Listeners returns the list of listeners the switch listens on.
  129. // NOTE: Not goroutine safe.
  130. func (sw *Switch) Listeners() []Listener {
  131. return sw.listeners
  132. }
  133. // IsListening returns true if the switch has at least one listener.
  134. // NOTE: Not goroutine safe.
  135. func (sw *Switch) IsListening() bool {
  136. return len(sw.listeners) > 0
  137. }
  138. // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
  139. // NOTE: Not goroutine safe.
  140. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
  141. sw.nodeInfo = nodeInfo
  142. }
  143. // NodeInfo returns the switch's NodeInfo.
  144. // NOTE: Not goroutine safe.
  145. func (sw *Switch) NodeInfo() NodeInfo {
  146. return sw.nodeInfo
  147. }
  148. // SetNodeKey sets the switch's private key for authenticated encryption.
  149. // NOTE: Not goroutine safe.
  150. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
  151. sw.nodeKey = nodeKey
  152. }
  153. //---------------------------------------------------------------------
  154. // Service start/stop
  155. // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
  156. func (sw *Switch) OnStart() error {
  157. // Start reactors
  158. for _, reactor := range sw.reactors {
  159. err := reactor.Start()
  160. if err != nil {
  161. return cmn.ErrorWrap(err, "failed to start %v", reactor)
  162. }
  163. }
  164. // Start listeners
  165. for _, listener := range sw.listeners {
  166. go sw.listenerRoutine(listener)
  167. }
  168. return nil
  169. }
  170. // OnStop implements BaseService. It stops all listeners, peers, and reactors.
  171. func (sw *Switch) OnStop() {
  172. // Stop listeners
  173. for _, listener := range sw.listeners {
  174. listener.Stop()
  175. }
  176. sw.listeners = nil
  177. // Stop peers
  178. for _, peer := range sw.peers.List() {
  179. peer.Stop()
  180. sw.peers.Remove(peer)
  181. }
  182. // Stop reactors
  183. sw.Logger.Debug("Switch: Stopping reactors")
  184. for _, reactor := range sw.reactors {
  185. reactor.Stop()
  186. }
  187. }
  188. //---------------------------------------------------------------------
  189. // Peers
  190. // Broadcast runs a go routine for each attempted send, which will block trying
  191. // to send for defaultSendTimeoutSeconds. Returns a channel which receives
  192. // success values for each attempted send (false if times out). Channel will be
  193. // closed once msg bytes are sent to all peers (or time out).
  194. //
  195. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  196. func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
  197. successChan := make(chan bool, len(sw.peers.List()))
  198. sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  199. var wg sync.WaitGroup
  200. for _, peer := range sw.peers.List() {
  201. wg.Add(1)
  202. go func(peer Peer) {
  203. defer wg.Done()
  204. success := peer.Send(chID, msgBytes)
  205. successChan <- success
  206. }(peer)
  207. }
  208. go func() {
  209. wg.Wait()
  210. close(successChan)
  211. }()
  212. return successChan
  213. }
  214. // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
  215. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  216. peers := sw.peers.List()
  217. for _, peer := range peers {
  218. if peer.IsOutbound() {
  219. outbound++
  220. } else {
  221. inbound++
  222. }
  223. }
  224. dialing = sw.dialing.Size()
  225. return
  226. }
  227. // MaxNumOutboundPeers returns a maximum number of outbound peers.
  228. func (sw *Switch) MaxNumOutboundPeers() int {
  229. return sw.config.MaxNumOutboundPeers
  230. }
  231. // Peers returns the set of peers that are connected to the switch.
  232. func (sw *Switch) Peers() IPeerSet {
  233. return sw.peers
  234. }
  235. // StopPeerForError disconnects from a peer due to external error.
  236. // If the peer is persistent, it will attempt to reconnect.
  237. // TODO: make record depending on reason.
  238. func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
  239. sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
  240. sw.stopAndRemovePeer(peer, reason)
  241. if peer.IsPersistent() {
  242. addr := peer.OriginalAddr()
  243. if addr == nil {
  244. // FIXME: persistent peers can't be inbound right now.
  245. // self-reported address for inbound persistent peers
  246. addr = peer.NodeInfo().NetAddress()
  247. }
  248. go sw.reconnectToPeer(addr)
  249. }
  250. }
  251. // StopPeerGracefully disconnects from a peer gracefully.
  252. // TODO: handle graceful disconnects.
  253. func (sw *Switch) StopPeerGracefully(peer Peer) {
  254. sw.Logger.Info("Stopping peer gracefully")
  255. sw.stopAndRemovePeer(peer, nil)
  256. }
  257. func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
  258. sw.peers.Remove(peer)
  259. sw.metrics.Peers.Add(float64(-1))
  260. peer.Stop()
  261. for _, reactor := range sw.reactors {
  262. reactor.RemovePeer(peer, reason)
  263. }
  264. }
  265. // reconnectToPeer tries to reconnect to the addr, first repeatedly
  266. // with a fixed interval, then with exponential backoff.
  267. // If no success after all that, it stops trying, and leaves it
  268. // to the PEX/Addrbook to find the peer with the addr again
  269. // NOTE: this will keep trying even if the handshake or auth fails.
  270. // TODO: be more explicit with error types so we only retry on certain failures
  271. // - ie. if we're getting ErrDuplicatePeer we can stop
  272. // because the addrbook got us the peer back already
  273. func (sw *Switch) reconnectToPeer(addr *NetAddress) {
  274. if sw.reconnecting.Has(string(addr.ID)) {
  275. return
  276. }
  277. sw.reconnecting.Set(string(addr.ID), addr)
  278. defer sw.reconnecting.Delete(string(addr.ID))
  279. start := time.Now()
  280. sw.Logger.Info("Reconnecting to peer", "addr", addr)
  281. for i := 0; i < reconnectAttempts; i++ {
  282. if !sw.IsRunning() {
  283. return
  284. }
  285. err := sw.DialPeerWithAddress(addr, true)
  286. if err == nil {
  287. return // success
  288. }
  289. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  290. // sleep a set amount
  291. sw.randomSleep(reconnectInterval)
  292. continue
  293. }
  294. sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
  295. "addr", addr, "elapsed", time.Since(start))
  296. for i := 0; i < reconnectBackOffAttempts; i++ {
  297. if !sw.IsRunning() {
  298. return
  299. }
  300. // sleep an exponentially increasing amount
  301. sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
  302. sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
  303. err := sw.DialPeerWithAddress(addr, true)
  304. if err == nil {
  305. return // success
  306. }
  307. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  308. }
  309. sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
  310. }
  311. // SetAddrBook allows to set address book on Switch.
  312. func (sw *Switch) SetAddrBook(addrBook AddrBook) {
  313. sw.addrBook = addrBook
  314. }
  315. // MarkPeerAsGood marks the given peer as good when it did something useful
  316. // like contributed to consensus.
  317. func (sw *Switch) MarkPeerAsGood(peer Peer) {
  318. if sw.addrBook != nil {
  319. sw.addrBook.MarkGood(peer.NodeInfo().NetAddress())
  320. }
  321. }
  322. //---------------------------------------------------------------------
  323. // Dialing
  324. // IsDialing returns true if the switch is currently dialing the given ID.
  325. func (sw *Switch) IsDialing(id ID) bool {
  326. return sw.dialing.Has(string(id))
  327. }
  328. // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
  329. // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
  330. // TODO: remove addrBook arg since it's now set on the switch
  331. func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
  332. netAddrs, errs := NewNetAddressStrings(peers)
  333. // only log errors, dial correct addresses
  334. for _, err := range errs {
  335. sw.Logger.Error("Error in peer's address", "err", err)
  336. }
  337. ourAddr := sw.nodeInfo.NetAddress()
  338. // TODO: this code feels like it's in the wrong place.
  339. // The integration tests depend on the addrBook being saved
  340. // right away but maybe we can change that. Recall that
  341. // the addrBook is only written to disk every 2min
  342. if addrBook != nil {
  343. // add peers to `addrBook`
  344. for _, netAddr := range netAddrs {
  345. // do not add our address or ID
  346. if !netAddr.Same(ourAddr) {
  347. if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
  348. sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
  349. }
  350. }
  351. }
  352. // Persist some peers to disk right away.
  353. // NOTE: integration tests depend on this
  354. addrBook.Save()
  355. }
  356. // permute the list, dial them in random order.
  357. perm := sw.rng.Perm(len(netAddrs))
  358. for i := 0; i < len(perm); i++ {
  359. go func(i int) {
  360. j := perm[i]
  361. addr := netAddrs[j]
  362. // do not dial ourselves
  363. if addr.Same(ourAddr) {
  364. return
  365. }
  366. sw.randomSleep(0)
  367. err := sw.DialPeerWithAddress(addr, persistent)
  368. if err != nil {
  369. switch err.(type) {
  370. case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID:
  371. sw.Logger.Debug("Error dialing peer", "err", err)
  372. default:
  373. sw.Logger.Error("Error dialing peer", "err", err)
  374. }
  375. }
  376. }(i)
  377. }
  378. return nil
  379. }
  380. // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
  381. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
  382. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
  383. sw.dialing.Set(string(addr.ID), addr)
  384. defer sw.dialing.Delete(string(addr.ID))
  385. return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
  386. }
  387. // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
  388. func (sw *Switch) randomSleep(interval time.Duration) {
  389. r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
  390. time.Sleep(r + interval)
  391. }
  392. //------------------------------------------------------------------------------------
  393. // Connection filtering
  394. // FilterConnByAddr returns an error if connecting to the given address is forbidden.
  395. func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
  396. if sw.filterConnByAddr != nil {
  397. return sw.filterConnByAddr(addr)
  398. }
  399. return nil
  400. }
  401. // FilterConnByID returns an error if connecting to the given peer ID is forbidden.
  402. func (sw *Switch) FilterConnByID(id ID) error {
  403. if sw.filterConnByID != nil {
  404. return sw.filterConnByID(id)
  405. }
  406. return nil
  407. }
  408. // SetAddrFilter sets the function for filtering connections by address.
  409. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
  410. sw.filterConnByAddr = f
  411. }
  412. // SetIDFilter sets the function for filtering connections by peer ID.
  413. func (sw *Switch) SetIDFilter(f func(ID) error) {
  414. sw.filterConnByID = f
  415. }
  416. //------------------------------------------------------------------------------------
  417. func (sw *Switch) listenerRoutine(l Listener) {
  418. for {
  419. inConn, ok := <-l.Connections()
  420. if !ok {
  421. break
  422. }
  423. // Ignore connection if we already have enough peers.
  424. _, in, _ := sw.NumPeers()
  425. if in >= sw.config.MaxNumInboundPeers {
  426. sw.Logger.Info(
  427. "Ignoring inbound connection: already have enough inbound peers",
  428. "address", inConn.RemoteAddr().String(),
  429. "have", in,
  430. "max", sw.config.MaxNumInboundPeers,
  431. )
  432. inConn.Close()
  433. continue
  434. }
  435. // New inbound connection!
  436. err := sw.addInboundPeerWithConfig(inConn, sw.config)
  437. if err != nil {
  438. sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
  439. continue
  440. }
  441. }
  442. // cleanup
  443. }
  444. // closes conn if err is returned
  445. func (sw *Switch) addInboundPeerWithConfig(
  446. conn net.Conn,
  447. config *config.P2PConfig,
  448. ) error {
  449. peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey)
  450. if err != nil {
  451. conn.Close() // peer is nil
  452. return err
  453. }
  454. if err = sw.addPeer(peerConn); err != nil {
  455. peerConn.CloseConn()
  456. return err
  457. }
  458. return nil
  459. }
  460. // dial the peer; make secret connection; authenticate against the dialed ID;
  461. // add the peer.
  462. // if dialing fails, start the reconnect loop. If handhsake fails, its over.
  463. // If peer is started succesffuly, reconnectLoop will start when
  464. // StopPeerForError is called
  465. func (sw *Switch) addOutboundPeerWithConfig(
  466. addr *NetAddress,
  467. config *config.P2PConfig,
  468. persistent bool,
  469. ) error {
  470. sw.Logger.Info("Dialing peer", "address", addr)
  471. peerConn, err := newOutboundPeerConn(
  472. addr,
  473. config,
  474. persistent,
  475. sw.nodeKey.PrivKey,
  476. )
  477. if err != nil {
  478. if persistent {
  479. go sw.reconnectToPeer(addr)
  480. }
  481. return err
  482. }
  483. if err := sw.addPeer(peerConn); err != nil {
  484. peerConn.CloseConn()
  485. return err
  486. }
  487. return nil
  488. }
  489. // addPeer performs the Tendermint P2P handshake with a peer
  490. // that already has a SecretConnection. If all goes well,
  491. // it starts the peer and adds it to the switch.
  492. // NOTE: This performs a blocking handshake before the peer is added.
  493. // NOTE: If error is returned, caller is responsible for calling
  494. // peer.CloseConn()
  495. func (sw *Switch) addPeer(pc peerConn) error {
  496. addr := pc.conn.RemoteAddr()
  497. if err := sw.FilterConnByAddr(addr); err != nil {
  498. return err
  499. }
  500. // Exchange NodeInfo on the conn
  501. peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout))
  502. if err != nil {
  503. return err
  504. }
  505. peerID := peerNodeInfo.ID
  506. // ensure connection key matches self reported key
  507. connID := pc.ID()
  508. if peerID != connID {
  509. return fmt.Errorf(
  510. "nodeInfo.ID() (%v) doesn't match conn.ID() (%v)",
  511. peerID,
  512. connID,
  513. )
  514. }
  515. // Validate the peers nodeInfo
  516. if err := peerNodeInfo.Validate(); err != nil {
  517. return err
  518. }
  519. // Avoid self
  520. if sw.nodeKey.ID() == peerID {
  521. addr := peerNodeInfo.NetAddress()
  522. // remove the given address from the address book
  523. // and add to our addresses to avoid dialing again
  524. sw.addrBook.RemoveAddress(addr)
  525. sw.addrBook.AddOurAddress(addr)
  526. return ErrSwitchConnectToSelf{addr}
  527. }
  528. // Avoid duplicate
  529. if sw.peers.Has(peerID) {
  530. return ErrSwitchDuplicatePeerID{peerID}
  531. }
  532. // Check for duplicate connection or peer info IP.
  533. if !sw.config.AllowDuplicateIP &&
  534. (sw.peers.HasIP(pc.RemoteIP()) ||
  535. sw.peers.HasIP(peerNodeInfo.NetAddress().IP)) {
  536. return ErrSwitchDuplicatePeerIP{pc.RemoteIP()}
  537. }
  538. // Filter peer against ID white list
  539. if err := sw.FilterConnByID(peerID); err != nil {
  540. return err
  541. }
  542. // Check version, chain id
  543. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  544. return err
  545. }
  546. peer := newPeer(pc, sw.mConfig, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  547. peer.SetLogger(sw.Logger.With("peer", addr))
  548. peer.Logger.Info("Successful handshake with peer", "peerNodeInfo", peerNodeInfo)
  549. // All good. Start peer
  550. if sw.IsRunning() {
  551. if err = sw.startInitPeer(peer); err != nil {
  552. return err
  553. }
  554. }
  555. // Add the peer to .peers.
  556. // We start it first so that a peer in the list is safe to Stop.
  557. // It should not err since we already checked peers.Has().
  558. if err := sw.peers.Add(peer); err != nil {
  559. return err
  560. }
  561. sw.metrics.Peers.Add(float64(1))
  562. sw.Logger.Info("Added peer", "peer", peer)
  563. return nil
  564. }
  565. func (sw *Switch) startInitPeer(peer *peer) error {
  566. err := peer.Start() // spawn send/recv routines
  567. if err != nil {
  568. // Should never happen
  569. sw.Logger.Error("Error starting peer", "peer", peer, "err", err)
  570. return err
  571. }
  572. for _, reactor := range sw.reactors {
  573. reactor.AddPeer(peer)
  574. }
  575. return nil
  576. }