You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

666 lines
18 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "math"
  5. "sync"
  6. "time"
  7. "github.com/tendermint/tendermint/config"
  8. cmn "github.com/tendermint/tendermint/libs/common"
  9. "github.com/tendermint/tendermint/p2p/conn"
  10. )
  // Tunables for dialing peers and for the retry schedule of reconnectToPeer.
  const (
  	// Upper bound, in milliseconds, of the random delay added before dialing
  	// peers or reconnecting; the jitter helps prevent DoS.
  	dialRandomizerIntervalMilliseconds = 3000

  	// Phase one of reconnecting: retry at a fixed interval,
  	// i.e. 20 attempts * 5s = 100s (plus the random delay on every attempt).
  	reconnectAttempts = 20
  	reconnectInterval = 5 * time.Second

  	// Phase two of reconnecting: exponential backoff. Attempt i (starting
  	// at 0) waits reconnectBackOffBaseSeconds^i seconds, so the longest
  	// single wait is 3^9s (about 5.5 hours) and all ten waits add up to
  	// roughly 8 hours.
  	reconnectBackOffAttempts    = 10
  	reconnectBackOffBaseSeconds = 3
  )
  24. //-----------------------------------------------------------------------------
  25. // An AddrBook represents an address book from the pex package, which is used
  26. // to store peer addresses.
  27. type AddrBook interface {
  28. AddAddress(addr *NetAddress, src *NetAddress) error
  29. AddOurAddress(*NetAddress)
  30. OurAddress(*NetAddress) bool
  31. MarkGood(*NetAddress)
  32. RemoveAddress(*NetAddress)
  33. HasAddress(*NetAddress) bool
  34. Save()
  35. }
  36. // PeerFilterFunc to be implemented by filter hooks after a new Peer has been
  37. // fully setup.
  38. type PeerFilterFunc func(IPeerSet, Peer) error
  39. //-----------------------------------------------------------------------------
  40. // Switch handles peer connections and exposes an API to receive incoming messages
  41. // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  42. // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  43. // incoming messages are received on the reactor.
  44. type Switch struct {
  45. cmn.BaseService
  46. config *config.P2PConfig
  47. reactors map[string]Reactor
  48. chDescs []*conn.ChannelDescriptor
  49. reactorsByCh map[byte]Reactor
  50. peers *PeerSet
  51. dialing *cmn.CMap
  52. reconnecting *cmn.CMap
  53. nodeInfo NodeInfo // our node info
  54. nodeKey *NodeKey // our node privkey
  55. addrBook AddrBook
  56. transport Transport
  57. filterTimeout time.Duration
  58. peerFilters []PeerFilterFunc
  59. mConfig conn.MConnConfig
  60. rng *cmn.Rand // seed for randomizing dial times and orders
  61. metrics *Metrics
  62. }
  63. // SwitchOption sets an optional parameter on the Switch.
  64. type SwitchOption func(*Switch)
  65. // NewSwitch creates a new Switch with the given config.
  66. func NewSwitch(
  67. cfg *config.P2PConfig,
  68. transport Transport,
  69. options ...SwitchOption,
  70. ) *Switch {
  71. sw := &Switch{
  72. config: cfg,
  73. reactors: make(map[string]Reactor),
  74. chDescs: make([]*conn.ChannelDescriptor, 0),
  75. reactorsByCh: make(map[byte]Reactor),
  76. peers: NewPeerSet(),
  77. dialing: cmn.NewCMap(),
  78. reconnecting: cmn.NewCMap(),
  79. metrics: NopMetrics(),
  80. transport: transport,
  81. filterTimeout: defaultFilterTimeout,
  82. }
  83. // Ensure we have a completely undeterministic PRNG.
  84. sw.rng = cmn.NewRand()
  85. mConfig := conn.DefaultMConnConfig()
  86. mConfig.FlushThrottle = cfg.FlushThrottleTimeout
  87. mConfig.SendRate = cfg.SendRate
  88. mConfig.RecvRate = cfg.RecvRate
  89. mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
  90. sw.mConfig = mConfig
  91. sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
  92. for _, option := range options {
  93. option(sw)
  94. }
  95. return sw
  96. }
  97. // SwitchFilterTimeout sets the timeout used for peer filters.
  98. func SwitchFilterTimeout(timeout time.Duration) SwitchOption {
  99. return func(sw *Switch) { sw.filterTimeout = timeout }
  100. }
  101. // SwitchPeerFilters sets the filters for rejection of new peers.
  102. func SwitchPeerFilters(filters ...PeerFilterFunc) SwitchOption {
  103. return func(sw *Switch) { sw.peerFilters = filters }
  104. }
  105. // WithMetrics sets the metrics.
  106. func WithMetrics(metrics *Metrics) SwitchOption {
  107. return func(sw *Switch) { sw.metrics = metrics }
  108. }
  109. //---------------------------------------------------------------------
  110. // Switch setup
  111. // AddReactor adds the given reactor to the switch.
  112. // NOTE: Not goroutine safe.
  113. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  114. // Validate the reactor.
  115. // No two reactors can share the same channel.
  116. reactorChannels := reactor.GetChannels()
  117. for _, chDesc := range reactorChannels {
  118. chID := chDesc.ID
  119. if sw.reactorsByCh[chID] != nil {
  120. cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  121. }
  122. sw.chDescs = append(sw.chDescs, chDesc)
  123. sw.reactorsByCh[chID] = reactor
  124. }
  125. sw.reactors[name] = reactor
  126. reactor.SetSwitch(sw)
  127. return reactor
  128. }
  129. // Reactors returns a map of reactors registered on the switch.
  130. // NOTE: Not goroutine safe.
  131. func (sw *Switch) Reactors() map[string]Reactor {
  132. return sw.reactors
  133. }
  134. // Reactor returns the reactor with the given name.
  135. // NOTE: Not goroutine safe.
  136. func (sw *Switch) Reactor(name string) Reactor {
  137. return sw.reactors[name]
  138. }
  139. // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
  140. // NOTE: Not goroutine safe.
  141. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
  142. sw.nodeInfo = nodeInfo
  143. }
  144. // NodeInfo returns the switch's NodeInfo.
  145. // NOTE: Not goroutine safe.
  146. func (sw *Switch) NodeInfo() NodeInfo {
  147. return sw.nodeInfo
  148. }
  149. // SetNodeKey sets the switch's private key for authenticated encryption.
  150. // NOTE: Not goroutine safe.
  151. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
  152. sw.nodeKey = nodeKey
  153. }
  154. //---------------------------------------------------------------------
  155. // Service start/stop
  156. // OnStart implements BaseService. It starts all the reactors and peers.
  157. func (sw *Switch) OnStart() error {
  158. // Start reactors
  159. for _, reactor := range sw.reactors {
  160. err := reactor.Start()
  161. if err != nil {
  162. return cmn.ErrorWrap(err, "failed to start %v", reactor)
  163. }
  164. }
  165. // Start accepting Peers.
  166. go sw.acceptRoutine()
  167. return nil
  168. }
  169. // OnStop implements BaseService. It stops all peers and reactors.
  170. func (sw *Switch) OnStop() {
  171. // Stop peers
  172. for _, p := range sw.peers.List() {
  173. p.Stop()
  174. sw.peers.Remove(p)
  175. }
  176. // Stop reactors
  177. sw.Logger.Debug("Switch: Stopping reactors")
  178. for _, reactor := range sw.reactors {
  179. reactor.Stop()
  180. }
  181. }
  182. //---------------------------------------------------------------------
  183. // Peers
  184. // Broadcast runs a go routine for each attempted send, which will block trying
  185. // to send for defaultSendTimeoutSeconds. Returns a channel which receives
  186. // success values for each attempted send (false if times out). Channel will be
  187. // closed once msg bytes are sent to all peers (or time out).
  188. //
  189. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  190. func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
  191. successChan := make(chan bool, len(sw.peers.List()))
  192. sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  193. var wg sync.WaitGroup
  194. for _, peer := range sw.peers.List() {
  195. wg.Add(1)
  196. go func(peer Peer) {
  197. defer wg.Done()
  198. success := peer.Send(chID, msgBytes)
  199. successChan <- success
  200. }(peer)
  201. }
  202. go func() {
  203. wg.Wait()
  204. close(successChan)
  205. }()
  206. return successChan
  207. }
  208. // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
  209. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  210. peers := sw.peers.List()
  211. for _, peer := range peers {
  212. if peer.IsOutbound() {
  213. outbound++
  214. } else {
  215. inbound++
  216. }
  217. }
  218. dialing = sw.dialing.Size()
  219. return
  220. }
  221. // MaxNumOutboundPeers returns a maximum number of outbound peers.
  222. func (sw *Switch) MaxNumOutboundPeers() int {
  223. return sw.config.MaxNumOutboundPeers
  224. }
  225. // Peers returns the set of peers that are connected to the switch.
  226. func (sw *Switch) Peers() IPeerSet {
  227. return sw.peers
  228. }
  229. // StopPeerForError disconnects from a peer due to external error.
  230. // If the peer is persistent, it will attempt to reconnect.
  231. // TODO: make record depending on reason.
  232. func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
  233. sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
  234. sw.stopAndRemovePeer(peer, reason)
  235. if peer.IsPersistent() {
  236. addr := peer.OriginalAddr()
  237. if addr == nil {
  238. // FIXME: persistent peers can't be inbound right now.
  239. // self-reported address for inbound persistent peers
  240. addr = peer.NodeInfo().NetAddress()
  241. }
  242. go sw.reconnectToPeer(addr)
  243. }
  244. }
  245. // StopPeerGracefully disconnects from a peer gracefully.
  246. // TODO: handle graceful disconnects.
  247. func (sw *Switch) StopPeerGracefully(peer Peer) {
  248. sw.Logger.Info("Stopping peer gracefully")
  249. sw.stopAndRemovePeer(peer, nil)
  250. }
  251. func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
  252. sw.peers.Remove(peer)
  253. sw.metrics.Peers.Add(float64(-1))
  254. peer.Stop()
  255. for _, reactor := range sw.reactors {
  256. reactor.RemovePeer(peer, reason)
  257. }
  258. }
  259. // reconnectToPeer tries to reconnect to the addr, first repeatedly
  260. // with a fixed interval, then with exponential backoff.
  261. // If no success after all that, it stops trying, and leaves it
  262. // to the PEX/Addrbook to find the peer with the addr again
  263. // NOTE: this will keep trying even if the handshake or auth fails.
  264. // TODO: be more explicit with error types so we only retry on certain failures
  265. // - ie. if we're getting ErrDuplicatePeer we can stop
  266. // because the addrbook got us the peer back already
  267. func (sw *Switch) reconnectToPeer(addr *NetAddress) {
  268. if sw.reconnecting.Has(string(addr.ID)) {
  269. return
  270. }
  271. sw.reconnecting.Set(string(addr.ID), addr)
  272. defer sw.reconnecting.Delete(string(addr.ID))
  273. start := time.Now()
  274. sw.Logger.Info("Reconnecting to peer", "addr", addr)
  275. for i := 0; i < reconnectAttempts; i++ {
  276. if !sw.IsRunning() {
  277. return
  278. }
  279. if sw.IsDialingOrExistingAddress(addr) {
  280. sw.Logger.Debug("Peer connection has been established or dialed while we waiting next try", "addr", addr)
  281. return
  282. }
  283. err := sw.DialPeerWithAddress(addr, true)
  284. if err == nil {
  285. return // success
  286. }
  287. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  288. // sleep a set amount
  289. sw.randomSleep(reconnectInterval)
  290. continue
  291. }
  292. sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
  293. "addr", addr, "elapsed", time.Since(start))
  294. for i := 0; i < reconnectBackOffAttempts; i++ {
  295. if !sw.IsRunning() {
  296. return
  297. }
  298. // sleep an exponentially increasing amount
  299. sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
  300. sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
  301. err := sw.DialPeerWithAddress(addr, true)
  302. if err == nil {
  303. return // success
  304. }
  305. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  306. }
  307. sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
  308. }
  309. // SetAddrBook allows to set address book on Switch.
  310. func (sw *Switch) SetAddrBook(addrBook AddrBook) {
  311. sw.addrBook = addrBook
  312. }
  313. // MarkPeerAsGood marks the given peer as good when it did something useful
  314. // like contributed to consensus.
  315. func (sw *Switch) MarkPeerAsGood(peer Peer) {
  316. if sw.addrBook != nil {
  317. sw.addrBook.MarkGood(peer.NodeInfo().NetAddress())
  318. }
  319. }
  320. //---------------------------------------------------------------------
  321. // Dialing
  322. // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
  323. // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
  324. // TODO: remove addrBook arg since it's now set on the switch
  325. func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
  326. netAddrs, errs := NewNetAddressStrings(peers)
  327. // only log errors, dial correct addresses
  328. for _, err := range errs {
  329. sw.Logger.Error("Error in peer's address", "err", err)
  330. }
  331. ourAddr := sw.nodeInfo.NetAddress()
  332. // TODO: this code feels like it's in the wrong place.
  333. // The integration tests depend on the addrBook being saved
  334. // right away but maybe we can change that. Recall that
  335. // the addrBook is only written to disk every 2min
  336. if addrBook != nil {
  337. // add peers to `addrBook`
  338. for _, netAddr := range netAddrs {
  339. // do not add our address or ID
  340. if !netAddr.Same(ourAddr) {
  341. if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
  342. sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
  343. }
  344. }
  345. }
  346. // Persist some peers to disk right away.
  347. // NOTE: integration tests depend on this
  348. addrBook.Save()
  349. }
  350. // permute the list, dial them in random order.
  351. perm := sw.rng.Perm(len(netAddrs))
  352. for i := 0; i < len(perm); i++ {
  353. go func(i int) {
  354. j := perm[i]
  355. addr := netAddrs[j]
  356. if addr.Same(ourAddr) {
  357. sw.Logger.Debug("Ignore attempt to connect to ourselves", "addr", addr, "ourAddr", ourAddr)
  358. return
  359. }
  360. sw.randomSleep(0)
  361. if sw.IsDialingOrExistingAddress(addr) {
  362. sw.Logger.Debug("Ignore attempt to connect to an existing peer", "addr", addr)
  363. return
  364. }
  365. err := sw.DialPeerWithAddress(addr, persistent)
  366. if err != nil {
  367. switch err.(type) {
  368. case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID:
  369. sw.Logger.Debug("Error dialing peer", "err", err)
  370. default:
  371. sw.Logger.Error("Error dialing peer", "err", err)
  372. }
  373. }
  374. }(i)
  375. }
  376. return nil
  377. }
  378. // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
  379. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
  380. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
  381. sw.dialing.Set(string(addr.ID), addr)
  382. defer sw.dialing.Delete(string(addr.ID))
  383. return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
  384. }
  385. // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
  386. func (sw *Switch) randomSleep(interval time.Duration) {
  387. r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
  388. time.Sleep(r + interval)
  389. }
  390. // IsDialingOrExistingAddress returns true if switch has a peer with the given
  391. // address or dialing it at the moment.
  392. func (sw *Switch) IsDialingOrExistingAddress(addr *NetAddress) bool {
  393. return sw.dialing.Has(string(addr.ID)) ||
  394. sw.peers.Has(addr.ID) ||
  395. (!sw.config.AllowDuplicateIP && sw.peers.HasIP(addr.IP))
  396. }
  397. func (sw *Switch) acceptRoutine() {
  398. for {
  399. p, err := sw.transport.Accept(peerConfig{
  400. chDescs: sw.chDescs,
  401. onPeerError: sw.StopPeerForError,
  402. reactorsByCh: sw.reactorsByCh,
  403. metrics: sw.metrics,
  404. })
  405. if err != nil {
  406. switch err.(type) {
  407. case ErrRejected:
  408. rErr := err.(ErrRejected)
  409. if rErr.IsSelf() {
  410. // Remove the given address from the address book and add to our addresses
  411. // to avoid dialing in the future.
  412. addr := rErr.Addr()
  413. sw.addrBook.RemoveAddress(&addr)
  414. sw.addrBook.AddOurAddress(&addr)
  415. }
  416. sw.Logger.Info(
  417. "Inbound Peer rejected",
  418. "err", err,
  419. "numPeers", sw.peers.Size(),
  420. )
  421. continue
  422. case *ErrTransportClosed:
  423. sw.Logger.Error(
  424. "Stopped accept routine, as transport is closed",
  425. "numPeers", sw.peers.Size(),
  426. )
  427. default:
  428. sw.Logger.Error(
  429. "Accept on transport errored",
  430. "err", err,
  431. "numPeers", sw.peers.Size(),
  432. )
  433. }
  434. break
  435. }
  436. // Ignore connection if we already have enough peers.
  437. _, in, _ := sw.NumPeers()
  438. if in >= sw.config.MaxNumInboundPeers {
  439. sw.Logger.Info(
  440. "Ignoring inbound connection: already have enough inbound peers",
  441. "address", p.NodeInfo().NetAddress().String(),
  442. "have", in,
  443. "max", sw.config.MaxNumInboundPeers,
  444. )
  445. _ = p.Stop()
  446. continue
  447. }
  448. if err := sw.addPeer(p); err != nil {
  449. _ = p.Stop()
  450. sw.Logger.Info(
  451. "Ignoring inbound connection: error while adding peer",
  452. "err", err,
  453. "id", p.ID(),
  454. )
  455. }
  456. }
  457. }
  458. // dial the peer; make secret connection; authenticate against the dialed ID;
  459. // add the peer.
  460. // if dialing fails, start the reconnect loop. If handhsake fails, its over.
  461. // If peer is started succesffuly, reconnectLoop will start when
  462. // StopPeerForError is called
  463. func (sw *Switch) addOutboundPeerWithConfig(
  464. addr *NetAddress,
  465. cfg *config.P2PConfig,
  466. persistent bool,
  467. ) error {
  468. sw.Logger.Info("Dialing peer", "address", addr)
  469. // XXX(xla): Remove the leakage of test concerns in implementation.
  470. if cfg.TestDialFail {
  471. go sw.reconnectToPeer(addr)
  472. return fmt.Errorf("dial err (peerConfig.DialFail == true)")
  473. }
  474. p, err := sw.transport.Dial(*addr, peerConfig{
  475. chDescs: sw.chDescs,
  476. onPeerError: sw.StopPeerForError,
  477. persistent: persistent,
  478. reactorsByCh: sw.reactorsByCh,
  479. metrics: sw.metrics,
  480. })
  481. if err != nil {
  482. switch e := err.(type) {
  483. case ErrRejected:
  484. if e.IsSelf() {
  485. // Remove the given address from the address book and add to our addresses
  486. // to avoid dialing in the future.
  487. sw.addrBook.RemoveAddress(addr)
  488. sw.addrBook.AddOurAddress(addr)
  489. return err
  490. }
  491. }
  492. // retry persistent peers after
  493. // any dial error besides IsSelf()
  494. if persistent {
  495. go sw.reconnectToPeer(addr)
  496. }
  497. return err
  498. }
  499. if err := sw.addPeer(p); err != nil {
  500. _ = p.Stop()
  501. return err
  502. }
  503. return nil
  504. }
  505. func (sw *Switch) filterPeer(p Peer) error {
  506. // Avoid duplicate
  507. if sw.peers.Has(p.ID()) {
  508. return ErrRejected{id: p.ID(), isDuplicate: true}
  509. }
  510. errc := make(chan error, len(sw.peerFilters))
  511. for _, f := range sw.peerFilters {
  512. go func(f PeerFilterFunc, p Peer, errc chan<- error) {
  513. errc <- f(sw.peers, p)
  514. }(f, p, errc)
  515. }
  516. for i := 0; i < cap(errc); i++ {
  517. select {
  518. case err := <-errc:
  519. if err != nil {
  520. return ErrRejected{id: p.ID(), err: err, isFiltered: true}
  521. }
  522. case <-time.After(sw.filterTimeout):
  523. return ErrFilterTimeout{}
  524. }
  525. }
  526. return nil
  527. }
  528. // addPeer starts up the Peer and adds it to the Switch.
  529. func (sw *Switch) addPeer(p Peer) error {
  530. if err := sw.filterPeer(p); err != nil {
  531. return err
  532. }
  533. p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress()))
  534. // All good. Start peer
  535. if sw.IsRunning() {
  536. if err := sw.startInitPeer(p); err != nil {
  537. return err
  538. }
  539. }
  540. // Add the peer to .peers.
  541. // We start it first so that a peer in the list is safe to Stop.
  542. // It should not err since we already checked peers.Has().
  543. if err := sw.peers.Add(p); err != nil {
  544. return err
  545. }
  546. sw.Logger.Info("Added peer", "peer", p)
  547. sw.metrics.Peers.Add(float64(1))
  548. return nil
  549. }
  550. func (sw *Switch) startInitPeer(p Peer) error {
  551. err := p.Start() // spawn send/recv routines
  552. if err != nil {
  553. // Should never happen
  554. sw.Logger.Error(
  555. "Error starting peer",
  556. "err", err,
  557. "peer", p,
  558. )
  559. return err
  560. }
  561. for _, reactor := range sw.reactors {
  562. reactor.AddPeer(p)
  563. }
  564. return nil
  565. }