You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

662 lines
19 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "math"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/tendermint/tendermint/config"
  9. "github.com/tendermint/tendermint/p2p/conn"
  10. cmn "github.com/tendermint/tendermint/libs/common"
  11. )
  // Jitter applied before dialing or reconnecting to a peer.
  const (
  	// dialRandomizerIntervalMilliseconds bounds the random delay (in
  	// milliseconds) added before dialing peers or reconnecting, to help
  	// prevent DoS.
  	dialRandomizerIntervalMilliseconds = 3000
  )

  // First reconnect phase: retry at a fixed interval for a few minutes,
  // i.e. 20 attempts * 5s = 100s (plus the random jitter of each sleep).
  const (
  	reconnectAttempts = 20
  	reconnectInterval = 5 * time.Second
  )

  // Second reconnect phase: exponential backoff. Attempt i (starting at 0)
  // is preceded by a sleep of reconnectBackOffBaseSeconds**i seconds, so the
  // last of the 10 sleeps is 3**9s (~5.5h), plus jitter.
  const (
  	reconnectBackOffAttempts    = 10
  	reconnectBackOffBaseSeconds = 3
  )

  const (
  	// DefaultMinNumOutboundPeers is the number of outbound peers to keep at
  	// least.
  	// TODO: move to config
  	DefaultMinNumOutboundPeers = 10
  )
  28. //-----------------------------------------------------------------------------
  29. // An AddrBook represents an address book from the pex package, which is used
  30. // to store peer addresses.
  31. type AddrBook interface {
  32. AddAddress(addr *NetAddress, src *NetAddress) error
  33. AddOurAddress(*NetAddress)
  34. OurAddress(*NetAddress) bool
  35. MarkGood(*NetAddress)
  36. RemoveAddress(*NetAddress)
  37. HasAddress(*NetAddress) bool
  38. Save()
  39. }
  40. //-----------------------------------------------------------------------------
  41. // Switch handles peer connections and exposes an API to receive incoming messages
  42. // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  43. // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  44. // incoming messages are received on the reactor.
  45. type Switch struct {
  46. cmn.BaseService
  47. config *config.P2PConfig
  48. listeners []Listener
  49. reactors map[string]Reactor
  50. chDescs []*conn.ChannelDescriptor
  51. reactorsByCh map[byte]Reactor
  52. peers *PeerSet
  53. dialing *cmn.CMap
  54. reconnecting *cmn.CMap
  55. nodeInfo NodeInfo // our node info
  56. nodeKey *NodeKey // our node privkey
  57. addrBook AddrBook
  58. filterConnByAddr func(net.Addr) error
  59. filterConnByID func(ID) error
  60. mConfig conn.MConnConfig
  61. rng *cmn.Rand // seed for randomizing dial times and orders
  62. metrics *Metrics
  63. }
  64. // SwitchOption sets an optional parameter on the Switch.
  65. type SwitchOption func(*Switch)
  66. // NewSwitch creates a new Switch with the given config.
  67. func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch {
  68. sw := &Switch{
  69. config: cfg,
  70. reactors: make(map[string]Reactor),
  71. chDescs: make([]*conn.ChannelDescriptor, 0),
  72. reactorsByCh: make(map[byte]Reactor),
  73. peers: NewPeerSet(),
  74. dialing: cmn.NewCMap(),
  75. reconnecting: cmn.NewCMap(),
  76. metrics: NopMetrics(),
  77. }
  78. // Ensure we have a completely undeterministic PRNG.
  79. sw.rng = cmn.NewRand()
  80. mConfig := conn.DefaultMConnConfig()
  81. mConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond
  82. mConfig.SendRate = cfg.SendRate
  83. mConfig.RecvRate = cfg.RecvRate
  84. mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
  85. sw.mConfig = mConfig
  86. sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
  87. for _, option := range options {
  88. option(sw)
  89. }
  90. return sw
  91. }
  92. // WithMetrics sets the metrics.
  93. func WithMetrics(metrics *Metrics) SwitchOption {
  94. return func(sw *Switch) { sw.metrics = metrics }
  95. }
  96. //---------------------------------------------------------------------
  97. // Switch setup
  98. // AddReactor adds the given reactor to the switch.
  99. // NOTE: Not goroutine safe.
  100. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  101. // Validate the reactor.
  102. // No two reactors can share the same channel.
  103. reactorChannels := reactor.GetChannels()
  104. for _, chDesc := range reactorChannels {
  105. chID := chDesc.ID
  106. if sw.reactorsByCh[chID] != nil {
  107. cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  108. }
  109. sw.chDescs = append(sw.chDescs, chDesc)
  110. sw.reactorsByCh[chID] = reactor
  111. }
  112. sw.reactors[name] = reactor
  113. reactor.SetSwitch(sw)
  114. return reactor
  115. }
  116. // Reactors returns a map of reactors registered on the switch.
  117. // NOTE: Not goroutine safe.
  118. func (sw *Switch) Reactors() map[string]Reactor {
  119. return sw.reactors
  120. }
  121. // Reactor returns the reactor with the given name.
  122. // NOTE: Not goroutine safe.
  123. func (sw *Switch) Reactor(name string) Reactor {
  124. return sw.reactors[name]
  125. }
  126. // AddListener adds the given listener to the switch for listening to incoming peer connections.
  127. // NOTE: Not goroutine safe.
  128. func (sw *Switch) AddListener(l Listener) {
  129. sw.listeners = append(sw.listeners, l)
  130. }
  131. // Listeners returns the list of listeners the switch listens on.
  132. // NOTE: Not goroutine safe.
  133. func (sw *Switch) Listeners() []Listener {
  134. return sw.listeners
  135. }
  136. // IsListening returns true if the switch has at least one listener.
  137. // NOTE: Not goroutine safe.
  138. func (sw *Switch) IsListening() bool {
  139. return len(sw.listeners) > 0
  140. }
  141. // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
  142. // NOTE: Not goroutine safe.
  143. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
  144. sw.nodeInfo = nodeInfo
  145. }
  146. // NodeInfo returns the switch's NodeInfo.
  147. // NOTE: Not goroutine safe.
  148. func (sw *Switch) NodeInfo() NodeInfo {
  149. return sw.nodeInfo
  150. }
  151. // SetNodeKey sets the switch's private key for authenticated encryption.
  152. // NOTE: Not goroutine safe.
  153. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
  154. sw.nodeKey = nodeKey
  155. }
  156. //---------------------------------------------------------------------
  157. // Service start/stop
  158. // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
  159. func (sw *Switch) OnStart() error {
  160. // Start reactors
  161. for _, reactor := range sw.reactors {
  162. err := reactor.Start()
  163. if err != nil {
  164. return cmn.ErrorWrap(err, "failed to start %v", reactor)
  165. }
  166. }
  167. // Start listeners
  168. for _, listener := range sw.listeners {
  169. go sw.listenerRoutine(listener)
  170. }
  171. return nil
  172. }
  173. // OnStop implements BaseService. It stops all listeners, peers, and reactors.
  174. func (sw *Switch) OnStop() {
  175. // Stop listeners
  176. for _, listener := range sw.listeners {
  177. listener.Stop()
  178. }
  179. sw.listeners = nil
  180. // Stop peers
  181. for _, peer := range sw.peers.List() {
  182. peer.Stop()
  183. sw.peers.Remove(peer)
  184. }
  185. // Stop reactors
  186. sw.Logger.Debug("Switch: Stopping reactors")
  187. for _, reactor := range sw.reactors {
  188. reactor.Stop()
  189. }
  190. }
  191. //---------------------------------------------------------------------
  192. // Peers
  193. // Broadcast runs a go routine for each attempted send, which will block trying
  194. // to send for defaultSendTimeoutSeconds. Returns a channel which receives
  195. // success values for each attempted send (false if times out). Channel will be
  196. // closed once msg bytes are sent to all peers (or time out).
  197. //
  198. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  199. func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
  200. successChan := make(chan bool, len(sw.peers.List()))
  201. sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  202. var wg sync.WaitGroup
  203. for _, peer := range sw.peers.List() {
  204. wg.Add(1)
  205. go func(peer Peer) {
  206. defer wg.Done()
  207. success := peer.Send(chID, msgBytes)
  208. successChan <- success
  209. }(peer)
  210. }
  211. go func() {
  212. wg.Wait()
  213. close(successChan)
  214. }()
  215. return successChan
  216. }
  217. // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
  218. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  219. peers := sw.peers.List()
  220. for _, peer := range peers {
  221. if peer.IsOutbound() {
  222. outbound++
  223. } else {
  224. inbound++
  225. }
  226. }
  227. dialing = sw.dialing.Size()
  228. return
  229. }
  230. // Peers returns the set of peers that are connected to the switch.
  231. func (sw *Switch) Peers() IPeerSet {
  232. return sw.peers
  233. }
  234. // StopPeerForError disconnects from a peer due to external error.
  235. // If the peer is persistent, it will attempt to reconnect.
  236. // TODO: make record depending on reason.
  237. func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
  238. sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
  239. sw.stopAndRemovePeer(peer, reason)
  240. if peer.IsPersistent() {
  241. // NOTE: this is the self-reported addr, not the original we dialed
  242. go sw.reconnectToPeer(peer.NodeInfo().NetAddress())
  243. }
  244. }
  245. // StopPeerGracefully disconnects from a peer gracefully.
  246. // TODO: handle graceful disconnects.
  247. func (sw *Switch) StopPeerGracefully(peer Peer) {
  248. sw.Logger.Info("Stopping peer gracefully")
  249. sw.stopAndRemovePeer(peer, nil)
  250. }
  251. func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
  252. sw.peers.Remove(peer)
  253. sw.metrics.Peers.Add(float64(-1))
  254. peer.Stop()
  255. for _, reactor := range sw.reactors {
  256. reactor.RemovePeer(peer, reason)
  257. }
  258. }
  259. // reconnectToPeer tries to reconnect to the addr, first repeatedly
  260. // with a fixed interval, then with exponential backoff.
  261. // If no success after all that, it stops trying, and leaves it
  262. // to the PEX/Addrbook to find the peer with the addr again
  263. // NOTE: this will keep trying even if the handshake or auth fails.
  264. // TODO: be more explicit with error types so we only retry on certain failures
  265. // - ie. if we're getting ErrDuplicatePeer we can stop
  266. // because the addrbook got us the peer back already
  267. func (sw *Switch) reconnectToPeer(addr *NetAddress) {
  268. if sw.reconnecting.Has(string(addr.ID)) {
  269. return
  270. }
  271. sw.reconnecting.Set(string(addr.ID), addr)
  272. defer sw.reconnecting.Delete(string(addr.ID))
  273. start := time.Now()
  274. sw.Logger.Info("Reconnecting to peer", "addr", addr)
  275. for i := 0; i < reconnectAttempts; i++ {
  276. if !sw.IsRunning() {
  277. return
  278. }
  279. err := sw.DialPeerWithAddress(addr, true)
  280. if err == nil {
  281. return // success
  282. }
  283. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  284. // sleep a set amount
  285. sw.randomSleep(reconnectInterval)
  286. continue
  287. }
  288. sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
  289. "addr", addr, "elapsed", time.Since(start))
  290. for i := 0; i < reconnectBackOffAttempts; i++ {
  291. if !sw.IsRunning() {
  292. return
  293. }
  294. // sleep an exponentially increasing amount
  295. sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
  296. sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
  297. err := sw.DialPeerWithAddress(addr, true)
  298. if err == nil {
  299. return // success
  300. }
  301. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  302. }
  303. sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
  304. }
  305. // SetAddrBook allows to set address book on Switch.
  306. func (sw *Switch) SetAddrBook(addrBook AddrBook) {
  307. sw.addrBook = addrBook
  308. }
  309. // MarkPeerAsGood marks the given peer as good when it did something useful
  310. // like contributed to consensus.
  311. func (sw *Switch) MarkPeerAsGood(peer Peer) {
  312. if sw.addrBook != nil {
  313. sw.addrBook.MarkGood(peer.NodeInfo().NetAddress())
  314. }
  315. }
  316. //---------------------------------------------------------------------
  317. // Dialing
  318. // IsDialing returns true if the switch is currently dialing the given ID.
  319. func (sw *Switch) IsDialing(id ID) bool {
  320. return sw.dialing.Has(string(id))
  321. }
  322. // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
  323. // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
  324. // TODO: remove addrBook arg since it's now set on the switch
  325. func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
  326. netAddrs, errs := NewNetAddressStrings(peers)
  327. // only log errors, dial correct addresses
  328. for _, err := range errs {
  329. sw.Logger.Error("Error in peer's address", "err", err)
  330. }
  331. ourAddr := sw.nodeInfo.NetAddress()
  332. // TODO: this code feels like it's in the wrong place.
  333. // The integration tests depend on the addrBook being saved
  334. // right away but maybe we can change that. Recall that
  335. // the addrBook is only written to disk every 2min
  336. if addrBook != nil {
  337. // add peers to `addrBook`
  338. for _, netAddr := range netAddrs {
  339. // do not add our address or ID
  340. if !netAddr.Same(ourAddr) {
  341. if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
  342. sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
  343. }
  344. }
  345. }
  346. // Persist some peers to disk right away.
  347. // NOTE: integration tests depend on this
  348. addrBook.Save()
  349. }
  350. // permute the list, dial them in random order.
  351. perm := sw.rng.Perm(len(netAddrs))
  352. for i := 0; i < len(perm); i++ {
  353. go func(i int) {
  354. j := perm[i]
  355. addr := netAddrs[j]
  356. // do not dial ourselves
  357. if addr.Same(ourAddr) {
  358. return
  359. }
  360. sw.randomSleep(0)
  361. err := sw.DialPeerWithAddress(addr, persistent)
  362. if err != nil {
  363. switch err.(type) {
  364. case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID:
  365. sw.Logger.Debug("Error dialing peer", "err", err)
  366. default:
  367. sw.Logger.Error("Error dialing peer", "err", err)
  368. }
  369. }
  370. }(i)
  371. }
  372. return nil
  373. }
  374. // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
  375. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
  376. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
  377. sw.dialing.Set(string(addr.ID), addr)
  378. defer sw.dialing.Delete(string(addr.ID))
  379. return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
  380. }
  381. // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
  382. func (sw *Switch) randomSleep(interval time.Duration) {
  383. r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
  384. time.Sleep(r + interval)
  385. }
  386. //------------------------------------------------------------------------------------
  387. // Connection filtering
  388. // FilterConnByAddr returns an error if connecting to the given address is forbidden.
  389. func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
  390. if sw.filterConnByAddr != nil {
  391. return sw.filterConnByAddr(addr)
  392. }
  393. return nil
  394. }
  395. // FilterConnByID returns an error if connecting to the given peer ID is forbidden.
  396. func (sw *Switch) FilterConnByID(id ID) error {
  397. if sw.filterConnByID != nil {
  398. return sw.filterConnByID(id)
  399. }
  400. return nil
  401. }
  402. // SetAddrFilter sets the function for filtering connections by address.
  403. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
  404. sw.filterConnByAddr = f
  405. }
  406. // SetIDFilter sets the function for filtering connections by peer ID.
  407. func (sw *Switch) SetIDFilter(f func(ID) error) {
  408. sw.filterConnByID = f
  409. }
  410. //------------------------------------------------------------------------------------
  411. func (sw *Switch) listenerRoutine(l Listener) {
  412. for {
  413. inConn, ok := <-l.Connections()
  414. if !ok {
  415. break
  416. }
  417. // ignore connection if we already have enough
  418. // leave room for MinNumOutboundPeers
  419. maxPeers := sw.config.MaxNumPeers - DefaultMinNumOutboundPeers
  420. if maxPeers <= sw.peers.Size() {
  421. sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
  422. continue
  423. }
  424. // New inbound connection!
  425. err := sw.addInboundPeerWithConfig(inConn, sw.config)
  426. if err != nil {
  427. sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
  428. continue
  429. }
  430. }
  431. // cleanup
  432. }
  433. func (sw *Switch) addInboundPeerWithConfig(
  434. conn net.Conn,
  435. config *config.P2PConfig,
  436. ) error {
  437. peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey)
  438. if err != nil {
  439. conn.Close() // peer is nil
  440. return err
  441. }
  442. if err = sw.addPeer(peerConn); err != nil {
  443. peerConn.CloseConn()
  444. return err
  445. }
  446. return nil
  447. }
  448. // dial the peer; make secret connection; authenticate against the dialed ID;
  449. // add the peer.
  450. // if dialing fails, start the reconnect loop. If handhsake fails, its over.
  451. // If peer is started succesffuly, reconnectLoop will start when
  452. // StopPeerForError is called
  453. func (sw *Switch) addOutboundPeerWithConfig(
  454. addr *NetAddress,
  455. config *config.P2PConfig,
  456. persistent bool,
  457. ) error {
  458. sw.Logger.Info("Dialing peer", "address", addr)
  459. peerConn, err := newOutboundPeerConn(
  460. addr,
  461. config,
  462. persistent,
  463. sw.nodeKey.PrivKey,
  464. )
  465. if err != nil {
  466. if persistent {
  467. go sw.reconnectToPeer(addr)
  468. }
  469. return err
  470. }
  471. if err := sw.addPeer(peerConn); err != nil {
  472. peerConn.CloseConn()
  473. return err
  474. }
  475. return nil
  476. }
  477. // addPeer performs the Tendermint P2P handshake with a peer
  478. // that already has a SecretConnection. If all goes well,
  479. // it starts the peer and adds it to the switch.
  480. // NOTE: This performs a blocking handshake before the peer is added.
  481. // NOTE: If error is returned, caller is responsible for calling
  482. // peer.CloseConn()
  483. func (sw *Switch) addPeer(pc peerConn) error {
  484. addr := pc.conn.RemoteAddr()
  485. if err := sw.FilterConnByAddr(addr); err != nil {
  486. return err
  487. }
  488. // Exchange NodeInfo on the conn
  489. peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout))
  490. if err != nil {
  491. return err
  492. }
  493. peerID := peerNodeInfo.ID
  494. // ensure connection key matches self reported key
  495. connID := pc.ID()
  496. if peerID != connID {
  497. return fmt.Errorf(
  498. "nodeInfo.ID() (%v) doesn't match conn.ID() (%v)",
  499. peerID,
  500. connID,
  501. )
  502. }
  503. // Validate the peers nodeInfo
  504. if err := peerNodeInfo.Validate(); err != nil {
  505. return err
  506. }
  507. // Avoid self
  508. if sw.nodeKey.ID() == peerID {
  509. addr := peerNodeInfo.NetAddress()
  510. // remove the given address from the address book
  511. // and add to our addresses to avoid dialing again
  512. sw.addrBook.RemoveAddress(addr)
  513. sw.addrBook.AddOurAddress(addr)
  514. return ErrSwitchConnectToSelf{addr}
  515. }
  516. // Avoid duplicate
  517. if sw.peers.Has(peerID) {
  518. return ErrSwitchDuplicatePeerID{peerID}
  519. }
  520. // Check for duplicate connection or peer info IP.
  521. if !sw.config.AllowDuplicateIP &&
  522. (sw.peers.HasIP(pc.RemoteIP()) ||
  523. sw.peers.HasIP(peerNodeInfo.NetAddress().IP)) {
  524. return ErrSwitchDuplicatePeerIP{pc.RemoteIP()}
  525. }
  526. // Filter peer against ID white list
  527. if err := sw.FilterConnByID(peerID); err != nil {
  528. return err
  529. }
  530. // Check version, chain id
  531. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  532. return err
  533. }
  534. peer := newPeer(pc, sw.mConfig, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  535. peer.SetLogger(sw.Logger.With("peer", addr))
  536. peer.Logger.Info("Successful handshake with peer", "peerNodeInfo", peerNodeInfo)
  537. // All good. Start peer
  538. if sw.IsRunning() {
  539. if err = sw.startInitPeer(peer); err != nil {
  540. return err
  541. }
  542. }
  543. // Add the peer to .peers.
  544. // We start it first so that a peer in the list is safe to Stop.
  545. // It should not err since we already checked peers.Has().
  546. if err := sw.peers.Add(peer); err != nil {
  547. return err
  548. }
  549. sw.metrics.Peers.Add(float64(1))
  550. sw.Logger.Info("Added peer", "peer", peer)
  551. return nil
  552. }
  553. func (sw *Switch) startInitPeer(peer *peer) error {
  554. err := peer.Start() // spawn send/recv routines
  555. if err != nil {
  556. // Should never happen
  557. sw.Logger.Error("Error starting peer", "peer", peer, "err", err)
  558. return err
  559. }
  560. for _, reactor := range sw.reactors {
  561. reactor.AddPeer(peer)
  562. }
  563. return nil
  564. }