You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1014 lines
28 KiB

p2p: make PeerManager.DialNext() and EvictNext() block (#5947) See #5936 and #5938 for background. The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`) which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about. I therefore simply made `DialNext()` and `EvictNext()` block until the next peer was available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occurred. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends, and it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
4 years ago
p2p: make PeerManager.DialNext() and EvictNext() block (#5947) See #5936 and #5938 for background. The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`) which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about. I therefore simply made `DialNext()` and `EvictNext()` block until the next peer was available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occurred. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends, and it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
4 years ago
p2p: make PeerManager.DialNext() and EvictNext() block (#5947) See #5936 and #5938 for background. The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`) which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about. I therefore simply made `DialNext()` and `EvictNext()` block until the next peer was available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occurred. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends, and it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
4 years ago
  1. package p2p
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math/rand"
  8. "net"
  9. "runtime"
  10. "sync"
  11. "time"
  12. "github.com/gogo/protobuf/proto"
  13. "github.com/tendermint/tendermint/crypto"
  14. "github.com/tendermint/tendermint/libs/log"
  15. "github.com/tendermint/tendermint/libs/service"
  16. "github.com/tendermint/tendermint/types"
  17. )
  18. const queueBufferDefault = 32
  19. // RouterOptions specifies options for a Router.
  20. type RouterOptions struct {
  21. // ResolveTimeout is the timeout for resolving NodeAddress URLs.
  22. // 0 means no timeout.
  23. ResolveTimeout time.Duration
  24. // DialTimeout is the timeout for dialing a peer. 0 means no timeout.
  25. DialTimeout time.Duration
  26. // HandshakeTimeout is the timeout for handshaking with a peer. 0 means
  27. // no timeout.
  28. HandshakeTimeout time.Duration
  29. // QueueType must be, "priority", or "fifo". Defaults to
  30. // "fifo".
  31. QueueType string
  32. // MaxIncomingConnectionAttempts rate limits the number of incoming connection
  33. // attempts per IP address. Defaults to 100.
  34. MaxIncomingConnectionAttempts uint
  35. // IncomingConnectionWindow describes how often an IP address
  36. // can attempt to create a new connection. Defaults to 10
  37. // milliseconds, and cannot be less than 1 millisecond.
  38. IncomingConnectionWindow time.Duration
  39. // FilterPeerByIP is used by the router to inject filtering
  40. // behavior for new incoming connections. The router passes
  41. // the remote IP of the incoming connection the port number as
  42. // arguments. Functions should return an error to reject the
  43. // peer.
  44. FilterPeerByIP func(context.Context, net.IP, uint16) error
  45. // FilterPeerByID is used by the router to inject filtering
  46. // behavior for new incoming connections. The router passes
  47. // the NodeID of the node before completing the connection,
  48. // but this occurs after the handshake is complete. Filter by
  49. // IP address to filter before the handshake. Functions should
  50. // return an error to reject the peer.
  51. FilterPeerByID func(context.Context, types.NodeID) error
  52. // DialSleep controls the amount of time that the router
  53. // sleeps between dialing peers. If not set, a default value
  54. // is used that sleeps for a (random) amount of time up to 3
  55. // seconds between submitting each peer to be dialed.
  56. DialSleep func(context.Context)
  57. // NumConcrruentDials controls how many parallel go routines
  58. // are used to dial peers. This defaults to the value of
  59. // runtime.NumCPU.
  60. NumConcurrentDials func() int
  61. }
  62. const (
  63. queueTypeFifo = "fifo"
  64. queueTypePriority = "priority"
  65. )
  66. // Validate validates router options.
  67. func (o *RouterOptions) Validate() error {
  68. switch o.QueueType {
  69. case "":
  70. o.QueueType = queueTypeFifo
  71. case queueTypeFifo, queueTypePriority:
  72. // pass
  73. default:
  74. return fmt.Errorf("queue type %q is not supported", o.QueueType)
  75. }
  76. switch {
  77. case o.IncomingConnectionWindow == 0:
  78. o.IncomingConnectionWindow = 100 * time.Millisecond
  79. case o.IncomingConnectionWindow < time.Millisecond:
  80. return fmt.Errorf("incomming connection window must be grater than 1m [%s]",
  81. o.IncomingConnectionWindow)
  82. }
  83. if o.MaxIncomingConnectionAttempts == 0 {
  84. o.MaxIncomingConnectionAttempts = 100
  85. }
  86. return nil
  87. }
  88. // Router manages peer connections and routes messages between peers and reactor
  89. // channels. It takes a PeerManager for peer lifecycle management (e.g. which
  90. // peers to dial and when) and a set of Transports for connecting and
  91. // communicating with peers.
  92. //
  93. // On startup, three main goroutines are spawned to maintain peer connections:
  94. //
  95. // dialPeers(): in a loop, calls PeerManager.DialNext() to get the next peer
  96. // address to dial and spawns a goroutine that dials the peer, handshakes
  97. // with it, and begins to route messages if successful.
  98. //
  99. // acceptPeers(): in a loop, waits for an inbound connection via
  100. // Transport.Accept() and spawns a goroutine that handshakes with it and
  101. // begins to route messages if successful.
  102. //
  103. // evictPeers(): in a loop, calls PeerManager.EvictNext() to get the next
  104. // peer to evict, and disconnects it by closing its message queue.
  105. //
  106. // When a peer is connected, an outbound peer message queue is registered in
  107. // peerQueues, and routePeer() is called to spawn off two additional goroutines:
  108. //
  109. // sendPeer(): waits for an outbound message from the peerQueues queue,
  110. // marshals it, and passes it to the peer transport which delivers it.
  111. //
  112. // receivePeer(): waits for an inbound message from the peer transport,
  113. // unmarshals it, and passes it to the appropriate inbound channel queue
  114. // in channelQueues.
  115. //
  116. // When a reactor opens a channel via OpenChannel, an inbound channel message
  117. // queue is registered in channelQueues, and a channel goroutine is spawned:
  118. //
  119. // routeChannel(): waits for an outbound message from the channel, looks
  120. // up the recipient peer's outbound message queue in peerQueues, and submits
  121. // the message to it.
  122. //
  123. // All channel sends in the router are blocking. It is the responsibility of the
  124. // queue interface in peerQueues and channelQueues to prioritize and drop
  125. // messages as appropriate during contention to prevent stalls and ensure good
  126. // quality of service.
  127. type Router struct {
  128. *service.BaseService
  129. logger log.Logger
  130. metrics *Metrics
  131. options RouterOptions
  132. nodeInfo types.NodeInfo
  133. privKey crypto.PrivKey
  134. peerManager *PeerManager
  135. chDescs []*ChannelDescriptor
  136. transports []Transport
  137. endpoints []Endpoint
  138. connTracker connectionTracker
  139. protocolTransports map[Protocol]Transport
  140. peerMtx sync.RWMutex
  141. peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
  142. // the channels that the peer queue has open
  143. peerChannels map[types.NodeID]channelIDs
  144. queueFactory func(int) queue
  145. // FIXME: We don't strictly need to use a mutex for this if we seal the
  146. // channels on router start. This depends on whether we want to allow
  147. // dynamic channels in the future.
  148. channelMtx sync.RWMutex
  149. channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel
  150. channelMessages map[ChannelID]proto.Message
  151. }
  152. // NewRouter creates a new Router. The given Transports must already be
  153. // listening on appropriate interfaces, and will be closed by the Router when it
  154. // stops.
  155. func NewRouter(
  156. ctx context.Context,
  157. logger log.Logger,
  158. metrics *Metrics,
  159. nodeInfo types.NodeInfo,
  160. privKey crypto.PrivKey,
  161. peerManager *PeerManager,
  162. transports []Transport,
  163. endpoints []Endpoint,
  164. options RouterOptions,
  165. ) (*Router, error) {
  166. if err := options.Validate(); err != nil {
  167. return nil, err
  168. }
  169. router := &Router{
  170. logger: logger,
  171. metrics: metrics,
  172. nodeInfo: nodeInfo,
  173. privKey: privKey,
  174. connTracker: newConnTracker(
  175. options.MaxIncomingConnectionAttempts,
  176. options.IncomingConnectionWindow,
  177. ),
  178. chDescs: make([]*ChannelDescriptor, 0),
  179. transports: transports,
  180. endpoints: endpoints,
  181. protocolTransports: map[Protocol]Transport{},
  182. peerManager: peerManager,
  183. options: options,
  184. channelQueues: map[ChannelID]queue{},
  185. channelMessages: map[ChannelID]proto.Message{},
  186. peerQueues: map[types.NodeID]queue{},
  187. peerChannels: make(map[types.NodeID]channelIDs),
  188. }
  189. router.BaseService = service.NewBaseService(logger, "router", router)
  190. qf, err := router.createQueueFactory(ctx)
  191. if err != nil {
  192. return nil, err
  193. }
  194. router.queueFactory = qf
  195. for _, transport := range transports {
  196. for _, protocol := range transport.Protocols() {
  197. if _, ok := router.protocolTransports[protocol]; !ok {
  198. router.protocolTransports[protocol] = transport
  199. }
  200. }
  201. }
  202. return router, nil
  203. }
  204. func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error) {
  205. switch r.options.QueueType {
  206. case queueTypeFifo:
  207. return newFIFOQueue, nil
  208. case queueTypePriority:
  209. return func(size int) queue {
  210. if size%2 != 0 {
  211. size++
  212. }
  213. q := newPQScheduler(r.logger, r.metrics, r.chDescs, uint(size)/2, uint(size)/2, defaultCapacity)
  214. q.start(ctx)
  215. return q
  216. }, nil
  217. default:
  218. return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType)
  219. }
  220. }
  221. // ChannelCreator allows routers to construct their own channels,
  222. // either by receiving a reference to Router.OpenChannel or using some
  223. // kind shim for testing purposes.
  224. type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
  225. // OpenChannel opens a new channel for the given message type. The caller must
  226. // close the channel when done, before stopping the Router. messageType is the
  227. // type of message passed through the channel (used for unmarshaling), which can
  228. // implement Wrapper to automatically (un)wrap multiple message types in a
  229. // wrapper message. The caller may provide a size to make the channel buffered,
  230. // which internally makes the inbound, outbound, and error channel buffered.
  231. func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*Channel, error) {
  232. r.channelMtx.Lock()
  233. defer r.channelMtx.Unlock()
  234. id := chDesc.ID
  235. if _, ok := r.channelQueues[id]; ok {
  236. return nil, fmt.Errorf("channel %v already exists", id)
  237. }
  238. r.chDescs = append(r.chDescs, chDesc)
  239. messageType := chDesc.MessageType
  240. queue := r.queueFactory(chDesc.RecvBufferCapacity)
  241. outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
  242. errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
  243. channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)
  244. var wrapper Wrapper
  245. if w, ok := messageType.(Wrapper); ok {
  246. wrapper = w
  247. }
  248. r.channelQueues[id] = queue
  249. r.channelMessages[id] = messageType
  250. // add the channel to the nodeInfo if it's not already there.
  251. r.nodeInfo.AddChannel(uint16(chDesc.ID))
  252. for _, t := range r.transports {
  253. t.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
  254. }
  255. go func() {
  256. defer func() {
  257. r.channelMtx.Lock()
  258. delete(r.channelQueues, id)
  259. delete(r.channelMessages, id)
  260. r.channelMtx.Unlock()
  261. queue.close()
  262. }()
  263. r.routeChannel(ctx, id, outCh, errCh, wrapper)
  264. }()
  265. return channel, nil
  266. }
  267. // routeChannel receives outbound channel messages and routes them to the
  268. // appropriate peer. It also receives peer errors and reports them to the peer
  269. // manager. It returns when either the outbound channel or error channel is
  270. // closed, or the Router is stopped. wrapper is an optional message wrapper
  271. // for messages, see Wrapper for details.
  272. func (r *Router) routeChannel(
  273. ctx context.Context,
  274. chID ChannelID,
  275. outCh <-chan Envelope,
  276. errCh <-chan PeerError,
  277. wrapper Wrapper,
  278. ) {
  279. for {
  280. select {
  281. case envelope, ok := <-outCh:
  282. if !ok {
  283. return
  284. }
  285. // Mark the envelope with the channel ID to allow sendPeer() to pass
  286. // it on to Transport.SendMessage().
  287. envelope.ChannelID = chID
  288. // wrap the message in a wrapper message, if requested
  289. if wrapper != nil {
  290. msg := proto.Clone(wrapper)
  291. if err := msg.(Wrapper).Wrap(envelope.Message); err != nil {
  292. r.logger.Error("failed to wrap message", "channel", chID, "err", err)
  293. continue
  294. }
  295. envelope.Message = msg
  296. }
  297. // collect peer queues to pass the message via
  298. var queues []queue
  299. if envelope.Broadcast {
  300. r.peerMtx.RLock()
  301. queues = make([]queue, 0, len(r.peerQueues))
  302. for nodeID, q := range r.peerQueues {
  303. peerChs := r.peerChannels[nodeID]
  304. // check whether the peer is receiving on that channel
  305. if _, ok := peerChs[chID]; ok {
  306. queues = append(queues, q)
  307. }
  308. }
  309. r.peerMtx.RUnlock()
  310. } else {
  311. r.peerMtx.RLock()
  312. q, ok := r.peerQueues[envelope.To]
  313. contains := false
  314. if ok {
  315. peerChs := r.peerChannels[envelope.To]
  316. // check whether the peer is receiving on that channel
  317. _, contains = peerChs[chID]
  318. }
  319. r.peerMtx.RUnlock()
  320. if !ok {
  321. r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
  322. continue
  323. }
  324. if !contains {
  325. // reactor tried to send a message across a channel that the
  326. // peer doesn't have available. This is a known issue due to
  327. // how peer subscriptions work:
  328. // https://github.com/tendermint/tendermint/issues/6598
  329. continue
  330. }
  331. queues = []queue{q}
  332. }
  333. // send message to peers
  334. for _, q := range queues {
  335. start := time.Now().UTC()
  336. select {
  337. case q.enqueue() <- envelope:
  338. r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds())
  339. case <-q.closed():
  340. r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
  341. case <-ctx.Done():
  342. return
  343. }
  344. }
  345. case peerError, ok := <-errCh:
  346. if !ok {
  347. return
  348. }
  349. r.logger.Error("peer error, evicting", "peer", peerError.NodeID, "err", peerError.Err)
  350. r.peerManager.Errored(peerError.NodeID, peerError.Err)
  351. case <-ctx.Done():
  352. return
  353. }
  354. }
  355. }
  356. func (r *Router) numConccurentDials() int {
  357. if r.options.NumConcurrentDials == nil {
  358. return runtime.NumCPU()
  359. }
  360. return r.options.NumConcurrentDials()
  361. }
  362. func (r *Router) filterPeersIP(ctx context.Context, ip net.IP, port uint16) error {
  363. if r.options.FilterPeerByIP == nil {
  364. return nil
  365. }
  366. return r.options.FilterPeerByIP(ctx, ip, port)
  367. }
  368. func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
  369. if r.options.FilterPeerByID == nil {
  370. return nil
  371. }
  372. return r.options.FilterPeerByID(ctx, id)
  373. }
  374. func (r *Router) dialSleep(ctx context.Context) {
  375. if r.options.DialSleep == nil {
  376. const (
  377. maxDialerInterval = 3000
  378. minDialerInterval = 250
  379. )
  380. // nolint:gosec // G404: Use of weak random number generator
  381. dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval)
  382. timer := time.NewTimer(dur * time.Millisecond)
  383. defer timer.Stop()
  384. select {
  385. case <-ctx.Done():
  386. case <-timer.C:
  387. }
  388. return
  389. }
  390. r.options.DialSleep(ctx)
  391. }
  392. // acceptPeers accepts inbound connections from peers on the given transport,
  393. // and spawns goroutines that route messages to/from them.
  394. func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
  395. for {
  396. conn, err := transport.Accept(ctx)
  397. switch err {
  398. case nil:
  399. case io.EOF:
  400. r.logger.Debug("stopping accept routine", "transport", transport)
  401. return
  402. default:
  403. r.logger.Error("failed to accept connection", "transport", transport, "err", err)
  404. return
  405. }
  406. incomingIP := conn.RemoteEndpoint().IP
  407. if err := r.connTracker.AddConn(incomingIP); err != nil {
  408. closeErr := conn.Close()
  409. r.logger.Debug("rate limiting incoming peer",
  410. "err", err,
  411. "ip", incomingIP.String(),
  412. "close_err", closeErr,
  413. )
  414. return
  415. }
  416. // Spawn a goroutine for the handshake, to avoid head-of-line blocking.
  417. go r.openConnection(ctx, conn)
  418. }
  419. }
  420. func (r *Router) openConnection(ctx context.Context, conn Connection) {
  421. defer conn.Close()
  422. defer r.connTracker.RemoveConn(conn.RemoteEndpoint().IP)
  423. re := conn.RemoteEndpoint()
  424. incomingIP := re.IP
  425. if err := r.filterPeersIP(ctx, incomingIP, re.Port); err != nil {
  426. r.logger.Debug("peer filtered by IP", "ip", incomingIP.String(), "err", err)
  427. return
  428. }
  429. // FIXME: The peer manager may reject the peer during Accepted()
  430. // after we've handshaked with the peer (to find out which peer it
  431. // is). However, because the handshake has no ack, the remote peer
  432. // will think the handshake was successful and start sending us
  433. // messages.
  434. //
  435. // This can cause problems in tests, where a disconnection can cause
  436. // the local node to immediately redial, while the remote node may
  437. // not have completed the disconnection yet and therefore reject the
  438. // reconnection attempt (since it thinks we're still connected from
  439. // before).
  440. //
  441. // The Router should do the handshake and have a final ack/fail
  442. // message to make sure both ends have accepted the connection, such
  443. // that it can be coordinated with the peer manager.
  444. peerInfo, err := r.handshakePeer(ctx, conn, "")
  445. switch {
  446. case errors.Is(err, context.Canceled):
  447. return
  448. case err != nil:
  449. r.logger.Error("peer handshake failed", "endpoint", conn, "err", err)
  450. return
  451. }
  452. if err := r.filterPeersID(ctx, peerInfo.NodeID); err != nil {
  453. r.logger.Debug("peer filtered by node ID", "node", peerInfo.NodeID, "err", err)
  454. return
  455. }
  456. if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil {
  457. r.logger.Error("failed to accept connection",
  458. "op", "incoming/accepted", "peer", peerInfo.NodeID, "err", err)
  459. return
  460. }
  461. r.routePeer(ctx, peerInfo.NodeID, conn, toChannelIDs(peerInfo.Channels))
  462. }
  463. // dialPeers maintains outbound connections to peers by dialing them.
  464. func (r *Router) dialPeers(ctx context.Context) {
  465. addresses := make(chan NodeAddress)
  466. wg := &sync.WaitGroup{}
  467. // Start a limited number of goroutines to dial peers in
  468. // parallel. the goal is to avoid starting an unbounded number
  469. // of goroutines thereby spamming the network, but also being
  470. // able to add peers at a reasonable pace, though the number
  471. // is somewhat arbitrary. The action is further throttled by a
  472. // sleep after sending to the addresses channel.
  473. for i := 0; i < r.numConccurentDials(); i++ {
  474. wg.Add(1)
  475. go func() {
  476. defer wg.Done()
  477. for {
  478. select {
  479. case <-ctx.Done():
  480. return
  481. case address := <-addresses:
  482. r.connectPeer(ctx, address)
  483. }
  484. }
  485. }()
  486. }
  487. LOOP:
  488. for {
  489. address, err := r.peerManager.DialNext(ctx)
  490. switch {
  491. case errors.Is(err, context.Canceled):
  492. break LOOP
  493. case err != nil:
  494. r.logger.Error("failed to find next peer to dial", "err", err)
  495. break LOOP
  496. }
  497. select {
  498. case addresses <- address:
  499. // this jitters the frequency that we call
  500. // DialNext and prevents us from attempting to
  501. // create connections too quickly.
  502. r.dialSleep(ctx)
  503. continue
  504. case <-ctx.Done():
  505. close(addresses)
  506. break LOOP
  507. }
  508. }
  509. wg.Wait()
  510. }
  511. func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
  512. conn, err := r.dialPeer(ctx, address)
  513. switch {
  514. case errors.Is(err, context.Canceled):
  515. return
  516. case err != nil:
  517. r.logger.Error("failed to dial peer", "peer", address, "err", err)
  518. if err = r.peerManager.DialFailed(ctx, address); err != nil {
  519. r.logger.Error("failed to report dial failure", "peer", address, "err", err)
  520. }
  521. return
  522. }
  523. peerInfo, err := r.handshakePeer(ctx, conn, address.NodeID)
  524. switch {
  525. case errors.Is(err, context.Canceled):
  526. conn.Close()
  527. return
  528. case err != nil:
  529. r.logger.Error("failed to handshake with peer", "peer", address, "err", err)
  530. if err = r.peerManager.DialFailed(ctx, address); err != nil {
  531. r.logger.Error("failed to report dial failure", "peer", address, "err", err)
  532. }
  533. conn.Close()
  534. return
  535. }
  536. if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
  537. r.logger.Error("failed to dial peer",
  538. "op", "outgoing/dialing", "peer", address.NodeID, "err", err)
  539. conn.Close()
  540. return
  541. }
  542. // routePeer (also) calls connection close
  543. go r.routePeer(ctx, address.NodeID, conn, toChannelIDs(peerInfo.Channels))
  544. }
  545. func (r *Router) getOrMakeQueue(peerID types.NodeID, channels channelIDs) queue {
  546. r.peerMtx.Lock()
  547. defer r.peerMtx.Unlock()
  548. if peerQueue, ok := r.peerQueues[peerID]; ok {
  549. return peerQueue
  550. }
  551. peerQueue := r.queueFactory(queueBufferDefault)
  552. r.peerQueues[peerID] = peerQueue
  553. r.peerChannels[peerID] = channels
  554. return peerQueue
  555. }
  556. // dialPeer connects to a peer by dialing it.
  557. func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection, error) {
  558. resolveCtx := ctx
  559. if r.options.ResolveTimeout > 0 {
  560. var cancel context.CancelFunc
  561. resolveCtx, cancel = context.WithTimeout(resolveCtx, r.options.ResolveTimeout)
  562. defer cancel()
  563. }
  564. r.logger.Debug("resolving peer address", "peer", address)
  565. endpoints, err := address.Resolve(resolveCtx)
  566. switch {
  567. case err != nil:
  568. return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
  569. case len(endpoints) == 0:
  570. return nil, fmt.Errorf("address %q did not resolve to any endpoints", address)
  571. }
  572. for _, endpoint := range endpoints {
  573. transport, ok := r.protocolTransports[endpoint.Protocol]
  574. if !ok {
  575. r.logger.Error("no transport found for protocol", "endpoint", endpoint)
  576. continue
  577. }
  578. dialCtx := ctx
  579. if r.options.DialTimeout > 0 {
  580. var cancel context.CancelFunc
  581. dialCtx, cancel = context.WithTimeout(dialCtx, r.options.DialTimeout)
  582. defer cancel()
  583. }
  584. // FIXME: When we dial and handshake the peer, we should pass it
  585. // appropriate address(es) it can use to dial us back. It can't use our
  586. // remote endpoint, since TCP uses different port numbers for outbound
  587. // connections than it does for inbound. Also, we may need to vary this
  588. // by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
  589. // on a private address on this endpoint, but a peer on the public
  590. // Internet can't and needs a different public address.
  591. conn, err := transport.Dial(dialCtx, endpoint)
  592. if err != nil {
  593. r.logger.Error("failed to dial endpoint", "peer", address.NodeID, "endpoint", endpoint, "err", err)
  594. } else {
  595. r.logger.Debug("dialed peer", "peer", address.NodeID, "endpoint", endpoint)
  596. return conn, nil
  597. }
  598. }
  599. return nil, errors.New("all endpoints failed")
  600. }
  601. // handshakePeer handshakes with a peer, validating the peer's information. If
  602. // expectID is given, we check that the peer's info matches it.
  603. func (r *Router) handshakePeer(
  604. ctx context.Context,
  605. conn Connection,
  606. expectID types.NodeID,
  607. ) (types.NodeInfo, error) {
  608. if r.options.HandshakeTimeout > 0 {
  609. var cancel context.CancelFunc
  610. ctx, cancel = context.WithTimeout(ctx, r.options.HandshakeTimeout)
  611. defer cancel()
  612. }
  613. peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey)
  614. if err != nil {
  615. return peerInfo, err
  616. }
  617. if err = peerInfo.Validate(); err != nil {
  618. return peerInfo, fmt.Errorf("invalid handshake NodeInfo: %w", err)
  619. }
  620. if types.NodeIDFromPubKey(peerKey) != peerInfo.NodeID {
  621. return peerInfo, fmt.Errorf("peer's public key did not match its node ID %q (expected %q)",
  622. peerInfo.NodeID, types.NodeIDFromPubKey(peerKey))
  623. }
  624. if expectID != "" && expectID != peerInfo.NodeID {
  625. return peerInfo, fmt.Errorf("expected to connect with peer %q, got %q",
  626. expectID, peerInfo.NodeID)
  627. }
  628. if err := r.nodeInfo.CompatibleWith(peerInfo); err != nil {
  629. return peerInfo, ErrRejected{
  630. err: err,
  631. id: peerInfo.ID(),
  632. isIncompatible: true,
  633. }
  634. }
  635. return peerInfo, nil
  636. }
  637. func (r *Router) runWithPeerMutex(fn func() error) error {
  638. r.peerMtx.Lock()
  639. defer r.peerMtx.Unlock()
  640. return fn()
  641. }
  642. // routePeer routes inbound and outbound messages between a peer and the reactor
  643. // channels. It will close the given connection and send queue when done, or if
  644. // they are closed elsewhere it will cause this method to shut down and return.
  645. func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels channelIDs) {
  646. r.metrics.Peers.Add(1)
  647. r.peerManager.Ready(ctx, peerID)
  648. sendQueue := r.getOrMakeQueue(peerID, channels)
  649. defer func() {
  650. r.peerMtx.Lock()
  651. delete(r.peerQueues, peerID)
  652. delete(r.peerChannels, peerID)
  653. r.peerMtx.Unlock()
  654. sendQueue.close()
  655. r.peerManager.Disconnected(ctx, peerID)
  656. r.metrics.Peers.Add(-1)
  657. }()
  658. r.logger.Info("peer connected", "peer", peerID, "endpoint", conn)
  659. errCh := make(chan error, 2)
  660. go func() {
  661. select {
  662. case errCh <- r.receivePeer(ctx, peerID, conn):
  663. case <-ctx.Done():
  664. }
  665. }()
  666. go func() {
  667. select {
  668. case errCh <- r.sendPeer(ctx, peerID, conn, sendQueue):
  669. case <-ctx.Done():
  670. }
  671. }()
  672. var err error
  673. select {
  674. case err = <-errCh:
  675. case <-ctx.Done():
  676. }
  677. _ = conn.Close()
  678. sendQueue.close()
  679. select {
  680. case <-ctx.Done():
  681. case e := <-errCh:
  682. // The first err was nil, so we update it with the second err, which may
  683. // or may not be nil.
  684. if err == nil {
  685. err = e
  686. }
  687. }
  688. // if the context was canceled
  689. if e := ctx.Err(); err == nil && e != nil {
  690. err = e
  691. }
  692. switch err {
  693. case nil, io.EOF:
  694. r.logger.Info("peer disconnected", "peer", peerID, "endpoint", conn)
  695. default:
  696. r.logger.Error("peer failure", "peer", peerID, "endpoint", conn, "err", err)
  697. }
  698. }
  699. // receivePeer receives inbound messages from a peer, deserializes them and
  700. // passes them on to the appropriate channel.
  701. func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Connection) error {
  702. for {
  703. chID, bz, err := conn.ReceiveMessage(ctx)
  704. if err != nil {
  705. return err
  706. }
  707. r.channelMtx.RLock()
  708. queue, ok := r.channelQueues[chID]
  709. messageType := r.channelMessages[chID]
  710. r.channelMtx.RUnlock()
  711. if !ok {
  712. r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
  713. continue
  714. }
  715. msg := proto.Clone(messageType)
  716. if err := proto.Unmarshal(bz, msg); err != nil {
  717. r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
  718. continue
  719. }
  720. if wrapper, ok := msg.(Wrapper); ok {
  721. msg, err = wrapper.Unwrap()
  722. if err != nil {
  723. r.logger.Error("failed to unwrap message", "err", err)
  724. continue
  725. }
  726. }
  727. start := time.Now().UTC()
  728. select {
  729. case queue.enqueue() <- Envelope{From: peerID, Message: msg, ChannelID: chID}:
  730. r.metrics.PeerReceiveBytesTotal.With(
  731. "chID", fmt.Sprint(chID),
  732. "peer_id", string(peerID),
  733. "message_type", r.metrics.ValueToMetricLabel(msg)).Add(float64(proto.Size(msg)))
  734. r.metrics.RouterChannelQueueSend.Observe(time.Since(start).Seconds())
  735. r.logger.Debug("received message", "peer", peerID, "message", msg)
  736. case <-queue.closed():
  737. r.logger.Debug("channel closed, dropping message", "peer", peerID, "channel", chID)
  738. case <-ctx.Done():
  739. return nil
  740. }
  741. }
  742. }
  743. // sendPeer sends queued messages to a peer.
  744. func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connection, peerQueue queue) error {
  745. for {
  746. start := time.Now().UTC()
  747. select {
  748. case envelope := <-peerQueue.dequeue():
  749. r.metrics.RouterPeerQueueRecv.Observe(time.Since(start).Seconds())
  750. if envelope.Message == nil {
  751. r.logger.Error("dropping nil message", "peer", peerID)
  752. continue
  753. }
  754. bz, err := proto.Marshal(envelope.Message)
  755. if err != nil {
  756. r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
  757. continue
  758. }
  759. if err = conn.SendMessage(ctx, envelope.ChannelID, bz); err != nil {
  760. return err
  761. }
  762. r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)
  763. case <-peerQueue.closed():
  764. return nil
  765. case <-ctx.Done():
  766. return nil
  767. }
  768. }
  769. }
  770. // evictPeers evicts connected peers as requested by the peer manager.
  771. func (r *Router) evictPeers(ctx context.Context) {
  772. for {
  773. peerID, err := r.peerManager.EvictNext(ctx)
  774. switch {
  775. case errors.Is(err, context.Canceled):
  776. return
  777. case err != nil:
  778. r.logger.Error("failed to find next peer to evict", "err", err)
  779. return
  780. }
  781. r.logger.Info("evicting peer", "peer", peerID)
  782. r.peerMtx.RLock()
  783. queue, ok := r.peerQueues[peerID]
  784. r.peerMtx.RUnlock()
  785. if ok {
  786. queue.close()
  787. }
  788. }
  789. }
  790. // NodeInfo returns a copy of the current NodeInfo. Used for testing.
  791. func (r *Router) NodeInfo() types.NodeInfo {
  792. return r.nodeInfo.Copy()
  793. }
  794. // OnStart implements service.Service.
  795. func (r *Router) OnStart(ctx context.Context) error {
  796. for _, transport := range r.transports {
  797. for _, endpoint := range r.endpoints {
  798. if err := transport.Listen(endpoint); err != nil {
  799. return err
  800. }
  801. }
  802. }
  803. r.logger.Info(
  804. "starting router",
  805. "node_id", r.nodeInfo.NodeID,
  806. "channels", r.nodeInfo.Channels,
  807. "listen_addr", r.nodeInfo.ListenAddr,
  808. "transports", len(r.transports),
  809. )
  810. go r.dialPeers(ctx)
  811. go r.evictPeers(ctx)
  812. for _, transport := range r.transports {
  813. go r.acceptPeers(ctx, transport)
  814. }
  815. return nil
  816. }
  817. // OnStop implements service.Service.
  818. //
  819. // All channels must be closed by OpenChannel() callers before stopping the
  820. // router, to prevent blocked channel sends in reactors. Channels are not closed
  821. // here, since that would cause any reactor senders to panic, so it is the
  822. // sender's responsibility.
  823. func (r *Router) OnStop() {
  824. // Close transport listeners (unblocks Accept calls).
  825. for _, transport := range r.transports {
  826. if err := transport.Close(); err != nil {
  827. r.logger.Error("failed to close transport", "transport", transport, "err", err)
  828. }
  829. }
  830. // Collect all remaining queues, and wait for them to close.
  831. queues := []queue{}
  832. r.channelMtx.RLock()
  833. for _, q := range r.channelQueues {
  834. queues = append(queues, q)
  835. }
  836. r.channelMtx.RUnlock()
  837. r.peerMtx.RLock()
  838. for _, q := range r.peerQueues {
  839. queues = append(queues, q)
  840. }
  841. r.peerMtx.RUnlock()
  842. for _, q := range queues {
  843. q.close()
  844. <-q.closed()
  845. }
  846. }
  847. type channelIDs map[ChannelID]struct{}
  848. func toChannelIDs(bytes []byte) channelIDs {
  849. c := make(map[ChannelID]struct{}, len(bytes))
  850. for _, b := range bytes {
  851. c[ChannelID(b)] = struct{}{}
  852. }
  853. return c
  854. }