You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

665 lines
19 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "math"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/tendermint/tendermint/config"
  9. cmn "github.com/tendermint/tendermint/libs/common"
  10. "github.com/tendermint/tendermint/p2p/conn"
  11. )
  12. const (
  13. // randomSleep adds a random delay of up to this many milliseconds
  14. // before dialing peers or reconnecting to help prevent DoS
  15. dialRandomizerIntervalMilliseconds = 3000
  16. // first reconnect phase: retry at a fixed interval,
  17. // ie. 20 attempts * 5s = 100s (plus the random delay added on every attempt)
  18. reconnectAttempts = 20
  19. reconnectInterval = 5 * time.Second
  20. // second reconnect phase: exponential backoff, sleeping 3**i seconds before
  21. // attempt i for i in [0, 10), ie. roughly 8.2hrs in total (the last wait alone is 3**9s, about 5.5hrs)
  22. reconnectBackOffAttempts = 10
  23. reconnectBackOffBaseSeconds = 3
  24. // number of peer slots listenerRoutine holds back from inbound connections (MaxNumPeers minus this value)
  25. // TODO: move to config
  26. DefaultMinNumOutboundPeers = 10
  27. )
  28. //-----------------------------------------------------------------------------
  29. // An AddrBook represents an address book from the pex package, which is used
  30. // to store peer addresses. The Switch treats it as optional: see SetAddrBook and MarkPeerAsGood.
  31. type AddrBook interface {
  32. AddAddress(addr *NetAddress, src *NetAddress) error // DialPeersAsync passes our own address as src
  33. AddOurAddress(*NetAddress) // called by addPeer when a connection turns out to be to ourselves
  34. OurAddress(*NetAddress) bool
  35. MarkGood(*NetAddress) // called by MarkPeerAsGood
  36. RemoveAddress(*NetAddress) // called by addPeer for an address that turned out to be our own
  37. HasAddress(*NetAddress) bool
  38. Save() // called by DialPeersAsync right after adding addresses
  39. }
  40. //-----------------------------------------------------------------------------
  41. // Switch handles peer connections and exposes an API to receive incoming messages
  42. // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  43. // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  44. // incoming messages are received on the reactor.
  45. type Switch struct {
  46. cmn.BaseService
  47. config *config.P2PConfig // P2P settings given to NewSwitch
  48. listeners []Listener // registered via AddListener; each one is served by listenerRoutine
  49. reactors map[string]Reactor // reactors keyed by the name passed to AddReactor
  50. chDescs []*conn.ChannelDescriptor // channel descriptors collected from every added reactor
  51. reactorsByCh map[byte]Reactor // channel ID -> reactor owning that channel
  52. peers *PeerSet // peers currently added to the switch
  53. dialing *cmn.CMap // peer ID -> address, present while DialPeerWithAddress is running
  54. reconnecting *cmn.CMap // peer ID -> address, present while reconnectToPeer is running
  55. nodeInfo NodeInfo // our node info
  56. nodeKey *NodeKey // our node privkey
  57. addrBook AddrBook // optional; stays nil until SetAddrBook is called
  58. filterConnByAddr func(net.Addr) error // optional filter, see SetAddrFilter
  59. filterConnByID func(ID) error // optional filter, see SetIDFilter
  60. mConfig conn.MConnConfig // connection config handed to every new peer
  61. rng *cmn.Rand // seed for randomizing dial times and orders
  62. metrics *Metrics // NopMetrics() unless overridden with WithMetrics
  63. }
  64. // SwitchOption sets an optional parameter on the Switch; NewSwitch applies options in the order given.
  65. type SwitchOption func(*Switch)
  66. // NewSwitch creates a new Switch with the given config.
  67. func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch {
  68. sw := &Switch{
  69. config: cfg,
  70. reactors: make(map[string]Reactor),
  71. chDescs: make([]*conn.ChannelDescriptor, 0),
  72. reactorsByCh: make(map[byte]Reactor),
  73. peers: NewPeerSet(),
  74. dialing: cmn.NewCMap(),
  75. reconnecting: cmn.NewCMap(),
  76. metrics: NopMetrics(),
  77. }
  78. // Ensure we have a completely undeterministic PRNG.
  79. sw.rng = cmn.NewRand()
  80. mConfig := conn.DefaultMConnConfig()
  81. mConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond
  82. mConfig.SendRate = cfg.SendRate
  83. mConfig.RecvRate = cfg.RecvRate
  84. mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
  85. sw.mConfig = mConfig
  86. sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
  87. for _, option := range options {
  88. option(sw)
  89. }
  90. return sw
  91. }
  92. // WithMetrics returns a SwitchOption that replaces the switch's metrics with the given ones.
  93. func WithMetrics(metrics *Metrics) SwitchOption {
  94. return func(sw *Switch) { sw.metrics = metrics }
  95. }
  96. //---------------------------------------------------------------------
  97. // Switch setup
  98. // AddReactor adds the given reactor to the switch.
  99. // NOTE: Not goroutine safe.
  100. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  101. // Validate the reactor.
  102. // No two reactors can share the same channel.
  103. reactorChannels := reactor.GetChannels()
  104. for _, chDesc := range reactorChannels {
  105. chID := chDesc.ID
  106. if sw.reactorsByCh[chID] != nil {
  107. cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  108. }
  109. sw.chDescs = append(sw.chDescs, chDesc)
  110. sw.reactorsByCh[chID] = reactor
  111. }
  112. sw.reactors[name] = reactor
  113. reactor.SetSwitch(sw)
  114. return reactor
  115. }
  116. // Reactors returns the switch's own name -> reactor map (not a copy).
  117. // NOTE: Not goroutine safe.
  118. func (sw *Switch) Reactors() map[string]Reactor {
  119. return sw.reactors
  120. }
  121. // Reactor returns the reactor registered under the given name, or nil if there is none.
  122. // NOTE: Not goroutine safe.
  123. func (sw *Switch) Reactor(name string) Reactor {
  124. return sw.reactors[name]
  125. }
  126. // AddListener appends the given listener to the switch; OnStart spawns a listenerRoutine for each registered listener.
  127. // NOTE: Not goroutine safe.
  128. func (sw *Switch) AddListener(l Listener) {
  129. sw.listeners = append(sw.listeners, l)
  130. }
  131. // Listeners returns the switch's own listener slice (not a copy); it is nil after OnStop.
  132. // NOTE: Not goroutine safe.
  133. func (sw *Switch) Listeners() []Listener {
  134. return sw.listeners
  135. }
  136. // IsListening returns true if the switch has at least one registered listener.
  137. // NOTE: Not goroutine safe.
  138. func (sw *Switch) IsListening() bool {
  139. return len(sw.listeners) > 0
  140. }
  141. // SetNodeInfo sets the switch's NodeInfo, which addPeer sends in the handshake and checks peers against for compatibility.
  142. // NOTE: Not goroutine safe.
  143. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
  144. sw.nodeInfo = nodeInfo
  145. }
  146. // NodeInfo returns the NodeInfo last stored with SetNodeInfo.
  147. // NOTE: Not goroutine safe.
  148. func (sw *Switch) NodeInfo() NodeInfo {
  149. return sw.nodeInfo
  150. }
  151. // SetNodeKey sets the switch's node key; its PrivKey is passed to new peer connections and its ID is used to detect self-connections.
  152. // NOTE: Not goroutine safe.
  153. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
  154. sw.nodeKey = nodeKey
  155. }
  156. //---------------------------------------------------------------------
  157. // Service start/stop
  158. // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
  159. func (sw *Switch) OnStart() error {
  160. // Start reactors
  161. for _, reactor := range sw.reactors {
  162. err := reactor.Start()
  163. if err != nil {
  164. return cmn.ErrorWrap(err, "failed to start %v", reactor)
  165. }
  166. }
  167. // Start listeners
  168. for _, listener := range sw.listeners {
  169. go sw.listenerRoutine(listener)
  170. }
  171. return nil
  172. }
  173. // OnStop implements BaseService. It stops all listeners, peers, and reactors.
  174. func (sw *Switch) OnStop() {
  175. // Stop listeners
  176. for _, listener := range sw.listeners {
  177. listener.Stop()
  178. }
  179. sw.listeners = nil
  180. // Stop peers
  181. for _, peer := range sw.peers.List() {
  182. peer.Stop()
  183. sw.peers.Remove(peer)
  184. }
  185. // Stop reactors
  186. sw.Logger.Debug("Switch: Stopping reactors")
  187. for _, reactor := range sw.reactors {
  188. reactor.Stop()
  189. }
  190. }
  191. //---------------------------------------------------------------------
  192. // Peers
  193. // Broadcast runs a go routine for each attempted send, which will block trying
  194. // to send for defaultSendTimeoutSeconds. Returns a channel which receives
  195. // success values for each attempted send (false if times out). Channel will be
  196. // closed once msg bytes are sent to all peers (or time out).
  197. //
  198. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  199. func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
  200. successChan := make(chan bool, len(sw.peers.List()))
  201. sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  202. var wg sync.WaitGroup
  203. for _, peer := range sw.peers.List() {
  204. wg.Add(1)
  205. go func(peer Peer) {
  206. defer wg.Done()
  207. success := peer.Send(chID, msgBytes)
  208. successChan <- success
  209. }(peer)
  210. }
  211. go func() {
  212. wg.Wait()
  213. close(successChan)
  214. }()
  215. return successChan
  216. }
  217. // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
  218. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  219. peers := sw.peers.List()
  220. for _, peer := range peers {
  221. if peer.IsOutbound() {
  222. outbound++
  223. } else {
  224. inbound++
  225. }
  226. }
  227. dialing = sw.dialing.Size()
  228. return
  229. }
  230. // Peers returns the switch's peer set, exposed through the IPeerSet interface.
  231. func (sw *Switch) Peers() IPeerSet {
  232. return sw.peers
  233. }
  234. // StopPeerForError disconnects from a peer due to external error.
  235. // If the peer is persistent, it will attempt to reconnect.
  236. // TODO: make record depending on reason.
  237. func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
  238. sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
  239. sw.stopAndRemovePeer(peer, reason)
  240. if peer.IsPersistent() {
  241. addr := peer.OriginalAddr()
  242. if addr == nil {
  243. panic(fmt.Sprintf("persistent peer %v with no original address", peer))
  244. }
  245. go sw.reconnectToPeer(addr)
  246. }
  247. }
  248. // StopPeerGracefully stops and removes the peer with a nil reason; unlike StopPeerForError it never starts a reconnect.
  249. // TODO: handle graceful disconnects.
  250. func (sw *Switch) StopPeerGracefully(peer Peer) {
  251. sw.Logger.Info("Stopping peer gracefully")
  252. sw.stopAndRemovePeer(peer, nil)
  253. }
  254. func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
  255. sw.peers.Remove(peer)
  256. sw.metrics.Peers.Add(float64(-1))
  257. peer.Stop()
  258. for _, reactor := range sw.reactors {
  259. reactor.RemovePeer(peer, reason)
  260. }
  261. }
  262. // reconnectToPeer tries to reconnect to the addr, first repeatedly
  263. // with a fixed interval, then with exponential backoff.
  264. // If no success after all that, it stops trying, and leaves it
  265. // to the PEX/Addrbook to find the peer with the addr again
  266. // NOTE: this will keep trying even if the handshake or auth fails.
  267. // TODO: be more explicit with error types so we only retry on certain failures
  268. // - ie. if we're getting ErrDuplicatePeer we can stop
  269. // because the addrbook got us the peer back already
  270. func (sw *Switch) reconnectToPeer(addr *NetAddress) {
  271. if sw.reconnecting.Has(string(addr.ID)) {
  272. return
  273. }
  274. sw.reconnecting.Set(string(addr.ID), addr)
  275. defer sw.reconnecting.Delete(string(addr.ID))
  276. start := time.Now()
  277. sw.Logger.Info("Reconnecting to peer", "addr", addr)
  278. for i := 0; i < reconnectAttempts; i++ {
  279. if !sw.IsRunning() {
  280. return
  281. }
  282. err := sw.DialPeerWithAddress(addr, true)
  283. if err == nil {
  284. return // success
  285. }
  286. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  287. // sleep a set amount
  288. sw.randomSleep(reconnectInterval)
  289. continue
  290. }
  291. sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
  292. "addr", addr, "elapsed", time.Since(start))
  293. for i := 0; i < reconnectBackOffAttempts; i++ {
  294. if !sw.IsRunning() {
  295. return
  296. }
  297. // sleep an exponentially increasing amount
  298. sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
  299. sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
  300. err := sw.DialPeerWithAddress(addr, true)
  301. if err == nil {
  302. return // success
  303. }
  304. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  305. }
  306. sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
  307. }
  308. // SetAddrBook sets the address book on the Switch; it is used by MarkPeerAsGood and by addPeer on self-connections.
  309. func (sw *Switch) SetAddrBook(addrBook AddrBook) {
  310. sw.addrBook = addrBook
  311. }
  312. // MarkPeerAsGood marks the given peer as good when it did something useful
  313. // like contributed to consensus.
  314. func (sw *Switch) MarkPeerAsGood(peer Peer) {
  315. if sw.addrBook != nil {
  316. sw.addrBook.MarkGood(peer.NodeInfo().NetAddress())
  317. }
  318. }
  319. //---------------------------------------------------------------------
  320. // Dialing
  321. // IsDialing returns true if the switch is currently dialing the given ID, ie. a DialPeerWithAddress call for it has not returned yet.
  322. func (sw *Switch) IsDialing(id ID) bool {
  323. return sw.dialing.Has(string(id))
  324. }
  325. // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
  326. // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
  327. // TODO: remove addrBook arg since it's now set on the switch
  328. func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
  329. netAddrs, errs := NewNetAddressStrings(peers)
  330. // only log errors, dial correct addresses
  331. for _, err := range errs {
  332. sw.Logger.Error("Error in peer's address", "err", err)
  333. }
  334. ourAddr := sw.nodeInfo.NetAddress()
  335. // TODO: this code feels like it's in the wrong place.
  336. // The integration tests depend on the addrBook being saved
  337. // right away but maybe we can change that. Recall that
  338. // the addrBook is only written to disk every 2min
  339. if addrBook != nil {
  340. // add peers to `addrBook`
  341. for _, netAddr := range netAddrs {
  342. // do not add our address or ID
  343. if !netAddr.Same(ourAddr) {
  344. if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
  345. sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
  346. }
  347. }
  348. }
  349. // Persist some peers to disk right away.
  350. // NOTE: integration tests depend on this
  351. addrBook.Save()
  352. }
  353. // permute the list, dial them in random order.
  354. perm := sw.rng.Perm(len(netAddrs))
  355. for i := 0; i < len(perm); i++ {
  356. go func(i int) {
  357. j := perm[i]
  358. addr := netAddrs[j]
  359. // do not dial ourselves
  360. if addr.Same(ourAddr) {
  361. return
  362. }
  363. sw.randomSleep(0)
  364. err := sw.DialPeerWithAddress(addr, persistent)
  365. if err != nil {
  366. switch err.(type) {
  367. case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID:
  368. sw.Logger.Debug("Error dialing peer", "err", err)
  369. default:
  370. sw.Logger.Error("Error dialing peer", "err", err)
  371. }
  372. }
  373. }(i)
  374. }
  375. return nil
  376. }
  377. // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
  378. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
  379. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
  380. sw.dialing.Set(string(addr.ID), addr)
  381. defer sw.dialing.Delete(string(addr.ID))
  382. return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
  383. }
  384. // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
  385. func (sw *Switch) randomSleep(interval time.Duration) {
  386. r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
  387. time.Sleep(r + interval)
  388. }
  389. //------------------------------------------------------------------------------------
  390. // Connection filtering
  391. // FilterConnByAddr returns an error if connecting to the given address is forbidden.
  392. func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
  393. if sw.filterConnByAddr != nil {
  394. return sw.filterConnByAddr(addr)
  395. }
  396. return nil
  397. }
  398. // FilterConnByID returns an error if connecting to the given peer ID is forbidden.
  399. func (sw *Switch) FilterConnByID(id ID) error {
  400. if sw.filterConnByID != nil {
  401. return sw.filterConnByID(id)
  402. }
  403. return nil
  404. }
  405. // SetAddrFilter sets the function for filtering connections by address; with a nil f, FilterConnByAddr accepts every address.
  406. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
  407. sw.filterConnByAddr = f
  408. }
  409. // SetIDFilter sets the function for filtering connections by peer ID; with a nil f, FilterConnByID accepts every ID.
  410. func (sw *Switch) SetIDFilter(f func(ID) error) {
  411. sw.filterConnByID = f
  412. }
  413. //------------------------------------------------------------------------------------
  414. func (sw *Switch) listenerRoutine(l Listener) {
  415. for {
  416. inConn, ok := <-l.Connections()
  417. if !ok {
  418. break
  419. }
  420. // ignore connection if we already have enough
  421. // leave room for MinNumOutboundPeers
  422. maxPeers := sw.config.MaxNumPeers - DefaultMinNumOutboundPeers
  423. if maxPeers <= sw.peers.Size() {
  424. sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
  425. continue
  426. }
  427. // New inbound connection!
  428. err := sw.addInboundPeerWithConfig(inConn, sw.config)
  429. if err != nil {
  430. sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
  431. continue
  432. }
  433. }
  434. // cleanup
  435. }
  436. func (sw *Switch) addInboundPeerWithConfig(
  437. conn net.Conn,
  438. config *config.P2PConfig,
  439. ) error {
  440. peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey)
  441. if err != nil {
  442. conn.Close() // peer is nil
  443. return err
  444. }
  445. if err = sw.addPeer(peerConn); err != nil {
  446. peerConn.CloseConn()
  447. return err
  448. }
  449. return nil
  450. }
  451. // dial the peer; make secret connection; authenticate against the dialed ID;
  452. // add the peer.
  453. // if dialing fails, start the reconnect loop. If handhsake fails, its over.
  454. // If peer is started succesffuly, reconnectLoop will start when
  455. // StopPeerForError is called
  456. func (sw *Switch) addOutboundPeerWithConfig(
  457. addr *NetAddress,
  458. config *config.P2PConfig,
  459. persistent bool,
  460. ) error {
  461. sw.Logger.Info("Dialing peer", "address", addr)
  462. peerConn, err := newOutboundPeerConn(
  463. addr,
  464. config,
  465. persistent,
  466. sw.nodeKey.PrivKey,
  467. )
  468. if err != nil {
  469. if persistent {
  470. go sw.reconnectToPeer(addr)
  471. }
  472. return err
  473. }
  474. if err := sw.addPeer(peerConn); err != nil {
  475. peerConn.CloseConn()
  476. return err
  477. }
  478. return nil
  479. }
  480. // addPeer performs the Tendermint P2P handshake with a peer
  481. // that already has a SecretConnection. If all goes well,
  482. // it starts the peer and adds it to the switch.
  483. // NOTE: This performs a blocking handshake before the peer is added.
  484. // NOTE: If error is returned, caller is responsible for calling
  485. // peer.CloseConn()
  486. func (sw *Switch) addPeer(pc peerConn) error {
  487. addr := pc.conn.RemoteAddr()
  488. if err := sw.FilterConnByAddr(addr); err != nil {
  489. return err
  490. }
  491. // Exchange NodeInfo on the conn
  492. peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout))
  493. if err != nil {
  494. return err
  495. }
  496. peerID := peerNodeInfo.ID
  497. // ensure connection key matches self reported key
  498. connID := pc.ID()
  499. if peerID != connID {
  500. return fmt.Errorf(
  501. "nodeInfo.ID() (%v) doesn't match conn.ID() (%v)",
  502. peerID,
  503. connID,
  504. )
  505. }
  506. // Validate the peers nodeInfo
  507. if err := peerNodeInfo.Validate(); err != nil {
  508. return err
  509. }
  510. // Avoid self
  511. if sw.nodeKey.ID() == peerID {
  512. addr := peerNodeInfo.NetAddress()
  513. // remove the given address from the address book
  514. // and add to our addresses to avoid dialing again
  515. sw.addrBook.RemoveAddress(addr)
  516. sw.addrBook.AddOurAddress(addr)
  517. return ErrSwitchConnectToSelf{addr}
  518. }
  519. // Avoid duplicate
  520. if sw.peers.Has(peerID) {
  521. return ErrSwitchDuplicatePeerID{peerID}
  522. }
  523. // Check for duplicate connection or peer info IP.
  524. if !sw.config.AllowDuplicateIP &&
  525. (sw.peers.HasIP(pc.RemoteIP()) ||
  526. sw.peers.HasIP(peerNodeInfo.NetAddress().IP)) {
  527. return ErrSwitchDuplicatePeerIP{pc.RemoteIP()}
  528. }
  529. // Filter peer against ID white list
  530. if err := sw.FilterConnByID(peerID); err != nil {
  531. return err
  532. }
  533. // Check version, chain id
  534. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  535. return err
  536. }
  537. peer := newPeer(pc, sw.mConfig, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  538. peer.SetLogger(sw.Logger.With("peer", addr))
  539. peer.Logger.Info("Successful handshake with peer", "peerNodeInfo", peerNodeInfo)
  540. // All good. Start peer
  541. if sw.IsRunning() {
  542. if err = sw.startInitPeer(peer); err != nil {
  543. return err
  544. }
  545. }
  546. // Add the peer to .peers.
  547. // We start it first so that a peer in the list is safe to Stop.
  548. // It should not err since we already checked peers.Has().
  549. if err := sw.peers.Add(peer); err != nil {
  550. return err
  551. }
  552. sw.metrics.Peers.Add(float64(1))
  553. sw.Logger.Info("Added peer", "peer", peer)
  554. return nil
  555. }
  556. func (sw *Switch) startInitPeer(peer *peer) error {
  557. err := peer.Start() // spawn send/recv routines
  558. if err != nil {
  559. // Should never happen
  560. sw.Logger.Error("Error starting peer", "peer", peer, "err", err)
  561. return err
  562. }
  563. for _, reactor := range sw.reactors {
  564. reactor.AddPeer(peer)
  565. }
  566. return nil
  567. }