You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

667 lines
19 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "math"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/tendermint/tendermint/config"
  9. cmn "github.com/tendermint/tendermint/libs/common"
  10. "github.com/tendermint/tendermint/p2p/conn"
  11. )
  // Tunables for dialing peers and for the reconnect schedule.
  const (
  	// Upper bound, in milliseconds, of the random delay added before dialing
  	// or reconnecting to a peer. The jitter helps prevent DoS.
  	dialRandomizerIntervalMilliseconds = 3000

  	// Phase one of reconnecting: retry at a fixed interval,
  	// i.e. 20 attempts * 5s = 100s (plus the random jitter above).
  	reconnectAttempts = 20
  	reconnectInterval = time.Second * 5

  	// Phase two of reconnecting: exponential backoff. Attempt i (i = 0..9)
  	// is preceded by a sleep of about reconnectBackOffBaseSeconds**i seconds,
  	// so the last sleep is 3**9 s (~5.5h) and all ten sum to roughly 8h.
  	reconnectBackOffAttempts    = 10
  	reconnectBackOffBaseSeconds = 3

  	// DefaultMinNumOutboundPeers is the number of peer slots reserved for
  	// outbound connections (keep at least this many outbound peers).
  	// TODO: move to config
  	DefaultMinNumOutboundPeers = 10
  )
  28. //-----------------------------------------------------------------------------
  29. // An AddrBook represents an address book from the pex package, which is used
  30. // to store peer addresses.
  31. type AddrBook interface {
  32. AddAddress(addr *NetAddress, src *NetAddress) error
  33. AddOurAddress(*NetAddress)
  34. OurAddress(*NetAddress) bool
  35. MarkGood(*NetAddress)
  36. RemoveAddress(*NetAddress)
  37. HasAddress(*NetAddress) bool
  38. Save()
  39. }
  40. //-----------------------------------------------------------------------------
  41. // Switch handles peer connections and exposes an API to receive incoming messages
  42. // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  43. // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  44. // incoming messages are received on the reactor.
  45. type Switch struct {
  46. cmn.BaseService
  47. config *config.P2PConfig
  48. listeners []Listener
  49. reactors map[string]Reactor
  50. chDescs []*conn.ChannelDescriptor
  51. reactorsByCh map[byte]Reactor
  52. peers *PeerSet
  53. dialing *cmn.CMap
  54. reconnecting *cmn.CMap
  55. nodeInfo NodeInfo // our node info
  56. nodeKey *NodeKey // our node privkey
  57. addrBook AddrBook
  58. filterConnByAddr func(net.Addr) error
  59. filterConnByID func(ID) error
  60. mConfig conn.MConnConfig
  61. rng *cmn.Rand // seed for randomizing dial times and orders
  62. metrics *Metrics
  63. }
  64. // SwitchOption sets an optional parameter on the Switch.
  65. type SwitchOption func(*Switch)
  66. // NewSwitch creates a new Switch with the given config.
  67. func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch {
  68. sw := &Switch{
  69. config: cfg,
  70. reactors: make(map[string]Reactor),
  71. chDescs: make([]*conn.ChannelDescriptor, 0),
  72. reactorsByCh: make(map[byte]Reactor),
  73. peers: NewPeerSet(),
  74. dialing: cmn.NewCMap(),
  75. reconnecting: cmn.NewCMap(),
  76. metrics: NopMetrics(),
  77. }
  78. // Ensure we have a completely undeterministic PRNG.
  79. sw.rng = cmn.NewRand()
  80. mConfig := conn.DefaultMConnConfig()
  81. mConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond
  82. mConfig.SendRate = cfg.SendRate
  83. mConfig.RecvRate = cfg.RecvRate
  84. mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
  85. sw.mConfig = mConfig
  86. sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
  87. for _, option := range options {
  88. option(sw)
  89. }
  90. return sw
  91. }
  92. // WithMetrics sets the metrics.
  93. func WithMetrics(metrics *Metrics) SwitchOption {
  94. return func(sw *Switch) { sw.metrics = metrics }
  95. }
  96. //---------------------------------------------------------------------
  97. // Switch setup
  98. // AddReactor adds the given reactor to the switch.
  99. // NOTE: Not goroutine safe.
  100. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  101. // Validate the reactor.
  102. // No two reactors can share the same channel.
  103. reactorChannels := reactor.GetChannels()
  104. for _, chDesc := range reactorChannels {
  105. chID := chDesc.ID
  106. if sw.reactorsByCh[chID] != nil {
  107. cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  108. }
  109. sw.chDescs = append(sw.chDescs, chDesc)
  110. sw.reactorsByCh[chID] = reactor
  111. }
  112. sw.reactors[name] = reactor
  113. reactor.SetSwitch(sw)
  114. return reactor
  115. }
  116. // Reactors returns a map of reactors registered on the switch.
  117. // NOTE: Not goroutine safe.
  118. func (sw *Switch) Reactors() map[string]Reactor {
  119. return sw.reactors
  120. }
  121. // Reactor returns the reactor with the given name.
  122. // NOTE: Not goroutine safe.
  123. func (sw *Switch) Reactor(name string) Reactor {
  124. return sw.reactors[name]
  125. }
  126. // AddListener adds the given listener to the switch for listening to incoming peer connections.
  127. // NOTE: Not goroutine safe.
  128. func (sw *Switch) AddListener(l Listener) {
  129. sw.listeners = append(sw.listeners, l)
  130. }
  131. // Listeners returns the list of listeners the switch listens on.
  132. // NOTE: Not goroutine safe.
  133. func (sw *Switch) Listeners() []Listener {
  134. return sw.listeners
  135. }
  136. // IsListening returns true if the switch has at least one listener.
  137. // NOTE: Not goroutine safe.
  138. func (sw *Switch) IsListening() bool {
  139. return len(sw.listeners) > 0
  140. }
  141. // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
  142. // NOTE: Not goroutine safe.
  143. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
  144. sw.nodeInfo = nodeInfo
  145. }
  146. // NodeInfo returns the switch's NodeInfo.
  147. // NOTE: Not goroutine safe.
  148. func (sw *Switch) NodeInfo() NodeInfo {
  149. return sw.nodeInfo
  150. }
  151. // SetNodeKey sets the switch's private key for authenticated encryption.
  152. // NOTE: Not goroutine safe.
  153. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
  154. sw.nodeKey = nodeKey
  155. }
  156. //---------------------------------------------------------------------
  157. // Service start/stop
  158. // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
  159. func (sw *Switch) OnStart() error {
  160. // Start reactors
  161. for _, reactor := range sw.reactors {
  162. err := reactor.Start()
  163. if err != nil {
  164. return cmn.ErrorWrap(err, "failed to start %v", reactor)
  165. }
  166. }
  167. // Start listeners
  168. for _, listener := range sw.listeners {
  169. go sw.listenerRoutine(listener)
  170. }
  171. return nil
  172. }
  173. // OnStop implements BaseService. It stops all listeners, peers, and reactors.
  174. func (sw *Switch) OnStop() {
  175. // Stop listeners
  176. for _, listener := range sw.listeners {
  177. listener.Stop()
  178. }
  179. sw.listeners = nil
  180. // Stop peers
  181. for _, peer := range sw.peers.List() {
  182. peer.Stop()
  183. sw.peers.Remove(peer)
  184. }
  185. // Stop reactors
  186. sw.Logger.Debug("Switch: Stopping reactors")
  187. for _, reactor := range sw.reactors {
  188. reactor.Stop()
  189. }
  190. }
  191. //---------------------------------------------------------------------
  192. // Peers
  193. // Broadcast runs a go routine for each attempted send, which will block trying
  194. // to send for defaultSendTimeoutSeconds. Returns a channel which receives
  195. // success values for each attempted send (false if times out). Channel will be
  196. // closed once msg bytes are sent to all peers (or time out).
  197. //
  198. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  199. func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
  200. successChan := make(chan bool, len(sw.peers.List()))
  201. sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  202. var wg sync.WaitGroup
  203. for _, peer := range sw.peers.List() {
  204. wg.Add(1)
  205. go func(peer Peer) {
  206. defer wg.Done()
  207. success := peer.Send(chID, msgBytes)
  208. successChan <- success
  209. }(peer)
  210. }
  211. go func() {
  212. wg.Wait()
  213. close(successChan)
  214. }()
  215. return successChan
  216. }
  217. // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
  218. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  219. peers := sw.peers.List()
  220. for _, peer := range peers {
  221. if peer.IsOutbound() {
  222. outbound++
  223. } else {
  224. inbound++
  225. }
  226. }
  227. dialing = sw.dialing.Size()
  228. return
  229. }
  230. // Peers returns the set of peers that are connected to the switch.
  231. func (sw *Switch) Peers() IPeerSet {
  232. return sw.peers
  233. }
  234. // StopPeerForError disconnects from a peer due to external error.
  235. // If the peer is persistent, it will attempt to reconnect.
  236. // TODO: make record depending on reason.
  237. func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
  238. sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
  239. sw.stopAndRemovePeer(peer, reason)
  240. if peer.IsPersistent() {
  241. addr := peer.OriginalAddr()
  242. if addr == nil {
  243. // FIXME: persistent peers can't be inbound right now.
  244. // self-reported address for inbound persistent peers
  245. addr = peer.NodeInfo().NetAddress()
  246. }
  247. go sw.reconnectToPeer(addr)
  248. }
  249. }
  250. // StopPeerGracefully disconnects from a peer gracefully.
  251. // TODO: handle graceful disconnects.
  252. func (sw *Switch) StopPeerGracefully(peer Peer) {
  253. sw.Logger.Info("Stopping peer gracefully")
  254. sw.stopAndRemovePeer(peer, nil)
  255. }
  256. func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
  257. sw.peers.Remove(peer)
  258. sw.metrics.Peers.Add(float64(-1))
  259. peer.Stop()
  260. for _, reactor := range sw.reactors {
  261. reactor.RemovePeer(peer, reason)
  262. }
  263. }
  264. // reconnectToPeer tries to reconnect to the addr, first repeatedly
  265. // with a fixed interval, then with exponential backoff.
  266. // If no success after all that, it stops trying, and leaves it
  267. // to the PEX/Addrbook to find the peer with the addr again
  268. // NOTE: this will keep trying even if the handshake or auth fails.
  269. // TODO: be more explicit with error types so we only retry on certain failures
  270. // - ie. if we're getting ErrDuplicatePeer we can stop
  271. // because the addrbook got us the peer back already
  272. func (sw *Switch) reconnectToPeer(addr *NetAddress) {
  273. if sw.reconnecting.Has(string(addr.ID)) {
  274. return
  275. }
  276. sw.reconnecting.Set(string(addr.ID), addr)
  277. defer sw.reconnecting.Delete(string(addr.ID))
  278. start := time.Now()
  279. sw.Logger.Info("Reconnecting to peer", "addr", addr)
  280. for i := 0; i < reconnectAttempts; i++ {
  281. if !sw.IsRunning() {
  282. return
  283. }
  284. err := sw.DialPeerWithAddress(addr, true)
  285. if err == nil {
  286. return // success
  287. }
  288. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  289. // sleep a set amount
  290. sw.randomSleep(reconnectInterval)
  291. continue
  292. }
  293. sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
  294. "addr", addr, "elapsed", time.Since(start))
  295. for i := 0; i < reconnectBackOffAttempts; i++ {
  296. if !sw.IsRunning() {
  297. return
  298. }
  299. // sleep an exponentially increasing amount
  300. sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
  301. sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
  302. err := sw.DialPeerWithAddress(addr, true)
  303. if err == nil {
  304. return // success
  305. }
  306. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  307. }
  308. sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
  309. }
  310. // SetAddrBook allows to set address book on Switch.
  311. func (sw *Switch) SetAddrBook(addrBook AddrBook) {
  312. sw.addrBook = addrBook
  313. }
  314. // MarkPeerAsGood marks the given peer as good when it did something useful
  315. // like contributed to consensus.
  316. func (sw *Switch) MarkPeerAsGood(peer Peer) {
  317. if sw.addrBook != nil {
  318. sw.addrBook.MarkGood(peer.NodeInfo().NetAddress())
  319. }
  320. }
  321. //---------------------------------------------------------------------
  322. // Dialing
  323. // IsDialing returns true if the switch is currently dialing the given ID.
  324. func (sw *Switch) IsDialing(id ID) bool {
  325. return sw.dialing.Has(string(id))
  326. }
  327. // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
  328. // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
  329. // TODO: remove addrBook arg since it's now set on the switch
  330. func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
  331. netAddrs, errs := NewNetAddressStrings(peers)
  332. // only log errors, dial correct addresses
  333. for _, err := range errs {
  334. sw.Logger.Error("Error in peer's address", "err", err)
  335. }
  336. ourAddr := sw.nodeInfo.NetAddress()
  337. // TODO: this code feels like it's in the wrong place.
  338. // The integration tests depend on the addrBook being saved
  339. // right away but maybe we can change that. Recall that
  340. // the addrBook is only written to disk every 2min
  341. if addrBook != nil {
  342. // add peers to `addrBook`
  343. for _, netAddr := range netAddrs {
  344. // do not add our address or ID
  345. if !netAddr.Same(ourAddr) {
  346. if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
  347. sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
  348. }
  349. }
  350. }
  351. // Persist some peers to disk right away.
  352. // NOTE: integration tests depend on this
  353. addrBook.Save()
  354. }
  355. // permute the list, dial them in random order.
  356. perm := sw.rng.Perm(len(netAddrs))
  357. for i := 0; i < len(perm); i++ {
  358. go func(i int) {
  359. j := perm[i]
  360. addr := netAddrs[j]
  361. // do not dial ourselves
  362. if addr.Same(ourAddr) {
  363. return
  364. }
  365. sw.randomSleep(0)
  366. err := sw.DialPeerWithAddress(addr, persistent)
  367. if err != nil {
  368. switch err.(type) {
  369. case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID:
  370. sw.Logger.Debug("Error dialing peer", "err", err)
  371. default:
  372. sw.Logger.Error("Error dialing peer", "err", err)
  373. }
  374. }
  375. }(i)
  376. }
  377. return nil
  378. }
  379. // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
  380. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
  381. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
  382. sw.dialing.Set(string(addr.ID), addr)
  383. defer sw.dialing.Delete(string(addr.ID))
  384. return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
  385. }
  386. // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
  387. func (sw *Switch) randomSleep(interval time.Duration) {
  388. r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
  389. time.Sleep(r + interval)
  390. }
  391. //------------------------------------------------------------------------------------
  392. // Connection filtering
  393. // FilterConnByAddr returns an error if connecting to the given address is forbidden.
  394. func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
  395. if sw.filterConnByAddr != nil {
  396. return sw.filterConnByAddr(addr)
  397. }
  398. return nil
  399. }
  400. // FilterConnByID returns an error if connecting to the given peer ID is forbidden.
  401. func (sw *Switch) FilterConnByID(id ID) error {
  402. if sw.filterConnByID != nil {
  403. return sw.filterConnByID(id)
  404. }
  405. return nil
  406. }
  407. // SetAddrFilter sets the function for filtering connections by address.
  408. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
  409. sw.filterConnByAddr = f
  410. }
  411. // SetIDFilter sets the function for filtering connections by peer ID.
  412. func (sw *Switch) SetIDFilter(f func(ID) error) {
  413. sw.filterConnByID = f
  414. }
  415. //------------------------------------------------------------------------------------
  416. func (sw *Switch) listenerRoutine(l Listener) {
  417. for {
  418. inConn, ok := <-l.Connections()
  419. if !ok {
  420. break
  421. }
  422. // ignore connection if we already have enough
  423. // leave room for MinNumOutboundPeers
  424. maxPeers := sw.config.MaxNumPeers - DefaultMinNumOutboundPeers
  425. if maxPeers <= sw.peers.Size() {
  426. sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
  427. continue
  428. }
  429. // New inbound connection!
  430. err := sw.addInboundPeerWithConfig(inConn, sw.config)
  431. if err != nil {
  432. sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
  433. continue
  434. }
  435. }
  436. // cleanup
  437. }
  438. func (sw *Switch) addInboundPeerWithConfig(
  439. conn net.Conn,
  440. config *config.P2PConfig,
  441. ) error {
  442. peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey)
  443. if err != nil {
  444. conn.Close() // peer is nil
  445. return err
  446. }
  447. if err = sw.addPeer(peerConn); err != nil {
  448. peerConn.CloseConn()
  449. return err
  450. }
  451. return nil
  452. }
  453. // dial the peer; make secret connection; authenticate against the dialed ID;
  454. // add the peer.
  455. // if dialing fails, start the reconnect loop. If handhsake fails, its over.
  456. // If peer is started succesffuly, reconnectLoop will start when
  457. // StopPeerForError is called
  458. func (sw *Switch) addOutboundPeerWithConfig(
  459. addr *NetAddress,
  460. config *config.P2PConfig,
  461. persistent bool,
  462. ) error {
  463. sw.Logger.Info("Dialing peer", "address", addr)
  464. peerConn, err := newOutboundPeerConn(
  465. addr,
  466. config,
  467. persistent,
  468. sw.nodeKey.PrivKey,
  469. )
  470. if err != nil {
  471. if persistent {
  472. go sw.reconnectToPeer(addr)
  473. }
  474. return err
  475. }
  476. if err := sw.addPeer(peerConn); err != nil {
  477. peerConn.CloseConn()
  478. return err
  479. }
  480. return nil
  481. }
  482. // addPeer performs the Tendermint P2P handshake with a peer
  483. // that already has a SecretConnection. If all goes well,
  484. // it starts the peer and adds it to the switch.
  485. // NOTE: This performs a blocking handshake before the peer is added.
  486. // NOTE: If error is returned, caller is responsible for calling
  487. // peer.CloseConn()
  488. func (sw *Switch) addPeer(pc peerConn) error {
  489. addr := pc.conn.RemoteAddr()
  490. if err := sw.FilterConnByAddr(addr); err != nil {
  491. return err
  492. }
  493. // Exchange NodeInfo on the conn
  494. peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout))
  495. if err != nil {
  496. return err
  497. }
  498. peerID := peerNodeInfo.ID
  499. // ensure connection key matches self reported key
  500. connID := pc.ID()
  501. if peerID != connID {
  502. return fmt.Errorf(
  503. "nodeInfo.ID() (%v) doesn't match conn.ID() (%v)",
  504. peerID,
  505. connID,
  506. )
  507. }
  508. // Validate the peers nodeInfo
  509. if err := peerNodeInfo.Validate(); err != nil {
  510. return err
  511. }
  512. // Avoid self
  513. if sw.nodeKey.ID() == peerID {
  514. addr := peerNodeInfo.NetAddress()
  515. // remove the given address from the address book
  516. // and add to our addresses to avoid dialing again
  517. sw.addrBook.RemoveAddress(addr)
  518. sw.addrBook.AddOurAddress(addr)
  519. return ErrSwitchConnectToSelf{addr}
  520. }
  521. // Avoid duplicate
  522. if sw.peers.Has(peerID) {
  523. return ErrSwitchDuplicatePeerID{peerID}
  524. }
  525. // Check for duplicate connection or peer info IP.
  526. if !sw.config.AllowDuplicateIP &&
  527. (sw.peers.HasIP(pc.RemoteIP()) ||
  528. sw.peers.HasIP(peerNodeInfo.NetAddress().IP)) {
  529. return ErrSwitchDuplicatePeerIP{pc.RemoteIP()}
  530. }
  531. // Filter peer against ID white list
  532. if err := sw.FilterConnByID(peerID); err != nil {
  533. return err
  534. }
  535. // Check version, chain id
  536. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  537. return err
  538. }
  539. peer := newPeer(pc, sw.mConfig, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  540. peer.SetLogger(sw.Logger.With("peer", addr))
  541. peer.Logger.Info("Successful handshake with peer", "peerNodeInfo", peerNodeInfo)
  542. // All good. Start peer
  543. if sw.IsRunning() {
  544. if err = sw.startInitPeer(peer); err != nil {
  545. return err
  546. }
  547. }
  548. // Add the peer to .peers.
  549. // We start it first so that a peer in the list is safe to Stop.
  550. // It should not err since we already checked peers.Has().
  551. if err := sw.peers.Add(peer); err != nil {
  552. return err
  553. }
  554. sw.metrics.Peers.Add(float64(1))
  555. sw.Logger.Info("Added peer", "peer", peer)
  556. return nil
  557. }
  558. func (sw *Switch) startInitPeer(peer *peer) error {
  559. err := peer.Start() // spawn send/recv routines
  560. if err != nil {
  561. // Should never happen
  562. sw.Logger.Error("Error starting peer", "peer", peer, "err", err)
  563. return err
  564. }
  565. for _, reactor := range sw.reactors {
  566. reactor.AddPeer(peer)
  567. }
  568. return nil
  569. }