You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

682 lines
19 KiB

  1. package p2p
  2. import (
  3. "fmt"
  4. "math"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/tendermint/tendermint/config"
  9. cmn "github.com/tendermint/tendermint/libs/common"
  10. "github.com/tendermint/tendermint/p2p/conn"
  11. )
  // Jitter applied before dialing or reconnecting.
  const (
  	// dialRandomizerIntervalMilliseconds bounds the random delay (in ms)
  	// added before dialing peers or reconnecting, to help prevent DoS.
  	dialRandomizerIntervalMilliseconds = 3000
  )

  // Fixed-interval reconnect phase: reconnectAttempts tries spaced
  // reconnectInterval apart, i.e. 20 * 5s = 100s (plus jitter).
  const (
  	reconnectAttempts = 20
  	reconnectInterval = 5 * time.Second
  )

  // Exponential-backoff reconnect phase that follows the fixed-interval one.
  // Attempt i (0-based) first sleeps reconnectBackOffBaseSeconds**i seconds,
  // so the longest single sleep is 3**9s (~5.5h) and all ten sleeps add up to
  // (3**10 - 1) / 2 seconds (~8.2h), plus jitter.
  const (
  	reconnectBackOffAttempts    = 10
  	reconnectBackOffBaseSeconds = 3
  )
  25. //-----------------------------------------------------------------------------
  26. // An AddrBook represents an address book from the pex package, which is used
  27. // to store peer addresses.
  28. type AddrBook interface {
  29. AddAddress(addr *NetAddress, src *NetAddress) error
  30. AddOurAddress(*NetAddress)
  31. OurAddress(*NetAddress) bool
  32. MarkGood(*NetAddress)
  33. RemoveAddress(*NetAddress)
  34. HasAddress(*NetAddress) bool
  35. Save()
  36. }
  37. //-----------------------------------------------------------------------------
  38. // Switch handles peer connections and exposes an API to receive incoming messages
  39. // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
  40. // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
  41. // incoming messages are received on the reactor.
  42. type Switch struct {
  43. cmn.BaseService
  44. config *config.P2PConfig
  45. listeners []Listener
  46. reactors map[string]Reactor
  47. chDescs []*conn.ChannelDescriptor
  48. reactorsByCh map[byte]Reactor
  49. peers *PeerSet
  50. dialing *cmn.CMap
  51. reconnecting *cmn.CMap
  52. nodeInfo NodeInfo // our node info
  53. nodeKey *NodeKey // our node privkey
  54. addrBook AddrBook
  55. filterConnByAddr func(net.Addr) error
  56. filterConnByID func(ID) error
  57. mConfig conn.MConnConfig
  58. rng *cmn.Rand // seed for randomizing dial times and orders
  59. metrics *Metrics
  60. }
  61. // SwitchOption sets an optional parameter on the Switch.
  62. type SwitchOption func(*Switch)
  63. // NewSwitch creates a new Switch with the given config.
  64. func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch {
  65. sw := &Switch{
  66. config: cfg,
  67. reactors: make(map[string]Reactor),
  68. chDescs: make([]*conn.ChannelDescriptor, 0),
  69. reactorsByCh: make(map[byte]Reactor),
  70. peers: NewPeerSet(),
  71. dialing: cmn.NewCMap(),
  72. reconnecting: cmn.NewCMap(),
  73. metrics: NopMetrics(),
  74. }
  75. // Ensure we have a completely undeterministic PRNG.
  76. sw.rng = cmn.NewRand()
  77. mConfig := conn.DefaultMConnConfig()
  78. mConfig.FlushThrottle = time.Duration(cfg.FlushThrottleTimeout) * time.Millisecond
  79. mConfig.SendRate = cfg.SendRate
  80. mConfig.RecvRate = cfg.RecvRate
  81. mConfig.MaxPacketMsgPayloadSize = cfg.MaxPacketMsgPayloadSize
  82. sw.mConfig = mConfig
  83. sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
  84. for _, option := range options {
  85. option(sw)
  86. }
  87. return sw
  88. }
  89. // WithMetrics sets the metrics.
  90. func WithMetrics(metrics *Metrics) SwitchOption {
  91. return func(sw *Switch) { sw.metrics = metrics }
  92. }
  93. //---------------------------------------------------------------------
  94. // Switch setup
  95. // AddReactor adds the given reactor to the switch.
  96. // NOTE: Not goroutine safe.
  97. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
  98. // Validate the reactor.
  99. // No two reactors can share the same channel.
  100. reactorChannels := reactor.GetChannels()
  101. for _, chDesc := range reactorChannels {
  102. chID := chDesc.ID
  103. if sw.reactorsByCh[chID] != nil {
  104. cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
  105. }
  106. sw.chDescs = append(sw.chDescs, chDesc)
  107. sw.reactorsByCh[chID] = reactor
  108. }
  109. sw.reactors[name] = reactor
  110. reactor.SetSwitch(sw)
  111. return reactor
  112. }
  113. // Reactors returns a map of reactors registered on the switch.
  114. // NOTE: Not goroutine safe.
  115. func (sw *Switch) Reactors() map[string]Reactor {
  116. return sw.reactors
  117. }
  118. // Reactor returns the reactor with the given name.
  119. // NOTE: Not goroutine safe.
  120. func (sw *Switch) Reactor(name string) Reactor {
  121. return sw.reactors[name]
  122. }
  123. // AddListener adds the given listener to the switch for listening to incoming peer connections.
  124. // NOTE: Not goroutine safe.
  125. func (sw *Switch) AddListener(l Listener) {
  126. sw.listeners = append(sw.listeners, l)
  127. }
  128. // Listeners returns the list of listeners the switch listens on.
  129. // NOTE: Not goroutine safe.
  130. func (sw *Switch) Listeners() []Listener {
  131. return sw.listeners
  132. }
  133. // IsListening returns true if the switch has at least one listener.
  134. // NOTE: Not goroutine safe.
  135. func (sw *Switch) IsListening() bool {
  136. return len(sw.listeners) > 0
  137. }
  138. // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
  139. // NOTE: Not goroutine safe.
  140. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
  141. sw.nodeInfo = nodeInfo
  142. }
  143. // NodeInfo returns the switch's NodeInfo.
  144. // NOTE: Not goroutine safe.
  145. func (sw *Switch) NodeInfo() NodeInfo {
  146. return sw.nodeInfo
  147. }
  148. // SetNodeKey sets the switch's private key for authenticated encryption.
  149. // NOTE: Not goroutine safe.
  150. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
  151. sw.nodeKey = nodeKey
  152. }
  153. //---------------------------------------------------------------------
  154. // Service start/stop
  155. // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
  156. func (sw *Switch) OnStart() error {
  157. // Start reactors
  158. for _, reactor := range sw.reactors {
  159. err := reactor.Start()
  160. if err != nil {
  161. return cmn.ErrorWrap(err, "failed to start %v", reactor)
  162. }
  163. }
  164. // Start listeners
  165. for _, listener := range sw.listeners {
  166. go sw.listenerRoutine(listener)
  167. }
  168. return nil
  169. }
  170. // OnStop implements BaseService. It stops all listeners, peers, and reactors.
  171. func (sw *Switch) OnStop() {
  172. // Stop listeners
  173. for _, listener := range sw.listeners {
  174. listener.Stop()
  175. }
  176. sw.listeners = nil
  177. // Stop peers
  178. for _, peer := range sw.peers.List() {
  179. peer.Stop()
  180. sw.peers.Remove(peer)
  181. }
  182. // Stop reactors
  183. sw.Logger.Debug("Switch: Stopping reactors")
  184. for _, reactor := range sw.reactors {
  185. reactor.Stop()
  186. }
  187. }
  188. //---------------------------------------------------------------------
  189. // Peers
  190. // Broadcast runs a go routine for each attempted send, which will block trying
  191. // to send for defaultSendTimeoutSeconds. Returns a channel which receives
  192. // success values for each attempted send (false if times out). Channel will be
  193. // closed once msg bytes are sent to all peers (or time out).
  194. //
  195. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
  196. func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
  197. successChan := make(chan bool, len(sw.peers.List()))
  198. sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))
  199. var wg sync.WaitGroup
  200. for _, peer := range sw.peers.List() {
  201. wg.Add(1)
  202. go func(peer Peer) {
  203. defer wg.Done()
  204. success := peer.Send(chID, msgBytes)
  205. successChan <- success
  206. }(peer)
  207. }
  208. go func() {
  209. wg.Wait()
  210. close(successChan)
  211. }()
  212. return successChan
  213. }
  214. // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
  215. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
  216. peers := sw.peers.List()
  217. for _, peer := range peers {
  218. if peer.IsOutbound() {
  219. outbound++
  220. } else {
  221. inbound++
  222. }
  223. }
  224. dialing = sw.dialing.Size()
  225. return
  226. }
  227. // MaxNumOutboundPeers returns a maximum number of outbound peers.
  228. func (sw *Switch) MaxNumOutboundPeers() int {
  229. return sw.config.MaxNumOutboundPeers
  230. }
  231. // Peers returns the set of peers that are connected to the switch.
  232. func (sw *Switch) Peers() IPeerSet {
  233. return sw.peers
  234. }
  235. // StopPeerForError disconnects from a peer due to external error.
  236. // If the peer is persistent, it will attempt to reconnect.
  237. // TODO: make record depending on reason.
  238. func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
  239. sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
  240. sw.stopAndRemovePeer(peer, reason)
  241. if peer.IsPersistent() {
  242. addr := peer.OriginalAddr()
  243. if addr == nil {
  244. // FIXME: persistent peers can't be inbound right now.
  245. // self-reported address for inbound persistent peers
  246. addr = peer.NodeInfo().NetAddress()
  247. }
  248. go sw.reconnectToPeer(addr)
  249. }
  250. }
  251. // StopPeerGracefully disconnects from a peer gracefully.
  252. // TODO: handle graceful disconnects.
  253. func (sw *Switch) StopPeerGracefully(peer Peer) {
  254. sw.Logger.Info("Stopping peer gracefully")
  255. sw.stopAndRemovePeer(peer, nil)
  256. }
  257. func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
  258. sw.peers.Remove(peer)
  259. sw.metrics.Peers.Add(float64(-1))
  260. peer.Stop()
  261. for _, reactor := range sw.reactors {
  262. reactor.RemovePeer(peer, reason)
  263. }
  264. }
  265. // reconnectToPeer tries to reconnect to the addr, first repeatedly
  266. // with a fixed interval, then with exponential backoff.
  267. // If no success after all that, it stops trying, and leaves it
  268. // to the PEX/Addrbook to find the peer with the addr again
  269. // NOTE: this will keep trying even if the handshake or auth fails.
  270. // TODO: be more explicit with error types so we only retry on certain failures
  271. // - ie. if we're getting ErrDuplicatePeer we can stop
  272. // because the addrbook got us the peer back already
  273. func (sw *Switch) reconnectToPeer(addr *NetAddress) {
  274. if sw.reconnecting.Has(string(addr.ID)) {
  275. return
  276. }
  277. sw.reconnecting.Set(string(addr.ID), addr)
  278. defer sw.reconnecting.Delete(string(addr.ID))
  279. start := time.Now()
  280. sw.Logger.Info("Reconnecting to peer", "addr", addr)
  281. for i := 0; i < reconnectAttempts; i++ {
  282. if !sw.IsRunning() {
  283. return
  284. }
  285. err := sw.DialPeerWithAddress(addr, true)
  286. if err == nil {
  287. return // success
  288. }
  289. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  290. // sleep a set amount
  291. sw.randomSleep(reconnectInterval)
  292. continue
  293. }
  294. sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
  295. "addr", addr, "elapsed", time.Since(start))
  296. for i := 0; i < reconnectBackOffAttempts; i++ {
  297. if !sw.IsRunning() {
  298. return
  299. }
  300. // sleep an exponentially increasing amount
  301. sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
  302. sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
  303. err := sw.DialPeerWithAddress(addr, true)
  304. if err == nil {
  305. return // success
  306. }
  307. sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
  308. }
  309. sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
  310. }
  311. // SetAddrBook allows to set address book on Switch.
  312. func (sw *Switch) SetAddrBook(addrBook AddrBook) {
  313. sw.addrBook = addrBook
  314. }
  315. // MarkPeerAsGood marks the given peer as good when it did something useful
  316. // like contributed to consensus.
  317. func (sw *Switch) MarkPeerAsGood(peer Peer) {
  318. if sw.addrBook != nil {
  319. sw.addrBook.MarkGood(peer.NodeInfo().NetAddress())
  320. }
  321. }
  322. //---------------------------------------------------------------------
  323. // Dialing
  324. // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
  325. // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
  326. // TODO: remove addrBook arg since it's now set on the switch
  327. func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
  328. netAddrs, errs := NewNetAddressStrings(peers)
  329. // only log errors, dial correct addresses
  330. for _, err := range errs {
  331. sw.Logger.Error("Error in peer's address", "err", err)
  332. }
  333. ourAddr := sw.nodeInfo.NetAddress()
  334. // TODO: this code feels like it's in the wrong place.
  335. // The integration tests depend on the addrBook being saved
  336. // right away but maybe we can change that. Recall that
  337. // the addrBook is only written to disk every 2min
  338. if addrBook != nil {
  339. // add peers to `addrBook`
  340. for _, netAddr := range netAddrs {
  341. // do not add our address or ID
  342. if !netAddr.Same(ourAddr) {
  343. if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
  344. sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
  345. }
  346. }
  347. }
  348. // Persist some peers to disk right away.
  349. // NOTE: integration tests depend on this
  350. addrBook.Save()
  351. }
  352. // permute the list, dial them in random order.
  353. perm := sw.rng.Perm(len(netAddrs))
  354. for i := 0; i < len(perm); i++ {
  355. go func(i int) {
  356. j := perm[i]
  357. addr := netAddrs[j]
  358. if addr.Same(ourAddr) {
  359. sw.Logger.Debug("Ignore attempt to connect to ourselves", "addr", addr, "ourAddr", ourAddr)
  360. return
  361. } else if sw.IsDialingOrExistingAddress(addr) {
  362. sw.Logger.Debug("Ignore attempt to connect to an existing peer", "addr", addr)
  363. return
  364. }
  365. sw.randomSleep(0)
  366. err := sw.DialPeerWithAddress(addr, persistent)
  367. if err != nil {
  368. switch err.(type) {
  369. case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID:
  370. sw.Logger.Debug("Error dialing peer", "err", err)
  371. default:
  372. sw.Logger.Error("Error dialing peer", "err", err)
  373. }
  374. }
  375. }(i)
  376. }
  377. return nil
  378. }
  379. // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
  380. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
  381. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
  382. sw.dialing.Set(string(addr.ID), addr)
  383. defer sw.dialing.Delete(string(addr.ID))
  384. return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
  385. }
  386. // sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
  387. func (sw *Switch) randomSleep(interval time.Duration) {
  388. r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
  389. time.Sleep(r + interval)
  390. }
  391. // IsDialingOrExistingAddress returns true if switch has a peer with the given
  392. // address or dialing it at the moment.
  393. func (sw *Switch) IsDialingOrExistingAddress(addr *NetAddress) bool {
  394. return sw.dialing.Has(string(addr.ID)) ||
  395. sw.peers.Has(addr.ID) ||
  396. (!sw.config.AllowDuplicateIP && sw.peers.HasIP(addr.IP))
  397. }
  398. //------------------------------------------------------------------------------------
  399. // Connection filtering
  400. // FilterConnByAddr returns an error if connecting to the given address is forbidden.
  401. func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
  402. if sw.filterConnByAddr != nil {
  403. return sw.filterConnByAddr(addr)
  404. }
  405. return nil
  406. }
  407. // FilterConnByID returns an error if connecting to the given peer ID is forbidden.
  408. func (sw *Switch) FilterConnByID(id ID) error {
  409. if sw.filterConnByID != nil {
  410. return sw.filterConnByID(id)
  411. }
  412. return nil
  413. }
  414. // SetAddrFilter sets the function for filtering connections by address.
  415. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
  416. sw.filterConnByAddr = f
  417. }
  418. // SetIDFilter sets the function for filtering connections by peer ID.
  419. func (sw *Switch) SetIDFilter(f func(ID) error) {
  420. sw.filterConnByID = f
  421. }
  422. //------------------------------------------------------------------------------------
  423. func (sw *Switch) listenerRoutine(l Listener) {
  424. for {
  425. inConn, ok := <-l.Connections()
  426. if !ok {
  427. break
  428. }
  429. // Ignore connection if we already have enough peers.
  430. _, in, _ := sw.NumPeers()
  431. if in >= sw.config.MaxNumInboundPeers {
  432. sw.Logger.Info(
  433. "Ignoring inbound connection: already have enough inbound peers",
  434. "address", inConn.RemoteAddr().String(),
  435. "have", in,
  436. "max", sw.config.MaxNumInboundPeers,
  437. )
  438. inConn.Close()
  439. continue
  440. }
  441. // New inbound connection!
  442. err := sw.addInboundPeerWithConfig(inConn, sw.config)
  443. if err != nil {
  444. sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
  445. continue
  446. }
  447. }
  448. // cleanup
  449. }
  450. // closes conn if err is returned
  451. func (sw *Switch) addInboundPeerWithConfig(
  452. conn net.Conn,
  453. config *config.P2PConfig,
  454. ) error {
  455. peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey)
  456. if err != nil {
  457. conn.Close() // peer is nil
  458. return err
  459. }
  460. if err = sw.addPeer(peerConn); err != nil {
  461. peerConn.CloseConn()
  462. return err
  463. }
  464. return nil
  465. }
  466. // dial the peer; make secret connection; authenticate against the dialed ID;
  467. // add the peer.
  468. // if dialing fails, start the reconnect loop. If handhsake fails, its over.
  469. // If peer is started succesffuly, reconnectLoop will start when
  470. // StopPeerForError is called
  471. func (sw *Switch) addOutboundPeerWithConfig(
  472. addr *NetAddress,
  473. config *config.P2PConfig,
  474. persistent bool,
  475. ) error {
  476. sw.Logger.Info("Dialing peer", "address", addr)
  477. peerConn, err := newOutboundPeerConn(
  478. addr,
  479. config,
  480. persistent,
  481. sw.nodeKey.PrivKey,
  482. )
  483. if err != nil {
  484. if persistent {
  485. go sw.reconnectToPeer(addr)
  486. }
  487. return err
  488. }
  489. if err := sw.addPeer(peerConn); err != nil {
  490. peerConn.CloseConn()
  491. return err
  492. }
  493. return nil
  494. }
  495. // addPeer performs the Tendermint P2P handshake with a peer
  496. // that already has a SecretConnection. If all goes well,
  497. // it starts the peer and adds it to the switch.
  498. // NOTE: This performs a blocking handshake before the peer is added.
  499. // NOTE: If error is returned, caller is responsible for calling
  500. // peer.CloseConn()
  501. func (sw *Switch) addPeer(pc peerConn) error {
  502. addr := pc.conn.RemoteAddr()
  503. if err := sw.FilterConnByAddr(addr); err != nil {
  504. return err
  505. }
  506. // Exchange NodeInfo on the conn
  507. peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout))
  508. if err != nil {
  509. return err
  510. }
  511. peerID := peerNodeInfo.ID
  512. // ensure connection key matches self reported key
  513. connID := pc.ID()
  514. if peerID != connID {
  515. return fmt.Errorf(
  516. "nodeInfo.ID() (%v) doesn't match conn.ID() (%v)",
  517. peerID,
  518. connID,
  519. )
  520. }
  521. // Validate the peers nodeInfo
  522. if err := peerNodeInfo.Validate(); err != nil {
  523. return err
  524. }
  525. // Avoid self
  526. if sw.nodeKey.ID() == peerID {
  527. addr := peerNodeInfo.NetAddress()
  528. // remove the given address from the address book
  529. // and add to our addresses to avoid dialing again
  530. if sw.addrBook != nil {
  531. sw.addrBook.RemoveAddress(addr)
  532. sw.addrBook.AddOurAddress(addr)
  533. }
  534. return ErrSwitchConnectToSelf{addr}
  535. }
  536. // Avoid duplicate
  537. if sw.peers.Has(peerID) {
  538. return ErrSwitchDuplicatePeerID{peerID}
  539. }
  540. // Check for duplicate connection or peer info IP.
  541. if !sw.config.AllowDuplicateIP &&
  542. (sw.peers.HasIP(pc.RemoteIP()) ||
  543. sw.peers.HasIP(peerNodeInfo.NetAddress().IP)) {
  544. return ErrSwitchDuplicatePeerIP{pc.RemoteIP()}
  545. }
  546. // Filter peer against ID white list
  547. if err := sw.FilterConnByID(peerID); err != nil {
  548. return err
  549. }
  550. // Check version, chain id
  551. if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
  552. return err
  553. }
  554. peer := newPeer(pc, sw.mConfig, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
  555. peer.SetLogger(sw.Logger.With("peer", addr))
  556. peer.Logger.Info("Successful handshake with peer", "peerNodeInfo", peerNodeInfo)
  557. // All good. Start peer
  558. if sw.IsRunning() {
  559. if err = sw.startInitPeer(peer); err != nil {
  560. return err
  561. }
  562. }
  563. // Add the peer to .peers.
  564. // We start it first so that a peer in the list is safe to Stop.
  565. // It should not err since we already checked peers.Has().
  566. if err := sw.peers.Add(peer); err != nil {
  567. return err
  568. }
  569. sw.metrics.Peers.Add(float64(1))
  570. sw.Logger.Info("Added peer", "peer", peer)
  571. return nil
  572. }
  573. func (sw *Switch) startInitPeer(peer *peer) error {
  574. err := peer.Start() // spawn send/recv routines
  575. if err != nil {
  576. // Should never happen
  577. sw.Logger.Error("Error starting peer", "peer", peer, "err", err)
  578. return err
  579. }
  580. for _, reactor := range sw.reactors {
  581. reactor.AddPeer(peer)
  582. }
  583. return nil
  584. }