
p2p: make PeerManager.DialNext() and EvictNext() block (#5947)

See #5936 and #5938 for background.

The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`), which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't, since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about.

I therefore simply made `DialNext()` and `EvictNext()` block until the next peer is available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occur. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends; it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
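The blocking pattern the commit message describes can be sketched as follows. This is a minimal illustration, not the actual PeerManager implementation; the wakeup channel and the tryDialNext helper are assumed names for this sketch.

	// A wakeup channel with capacity 1 lets any state change nudge a blocked
	// DialNext() without blocking the notifier and without extra goroutines.
	type manager struct {
		mtx    sync.Mutex
		wakeup chan struct{} // buffered with capacity 1
	}

	// notify wakes a blocked DialNext(), if any; it never blocks the caller.
	func (m *manager) notify() {
		select {
		case m.wakeup <- struct{}{}:
		default: // a wakeup is already pending
		}
	}

	// DialNext blocks until a peer address is available or ctx is canceled,
	// mirroring the blocking Transport.Accept() convention.
	func (m *manager) DialNext(ctx context.Context) (NodeAddress, error) {
		for {
			if address, ok := m.tryDialNext(); ok { // hypothetical helper
				return address, nil
			}
			select {
			case <-m.wakeup:
			case <-ctx.Done():
				return NodeAddress{}, ctx.Err()
			}
		}
	}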
package p2p

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/gogo/protobuf/proto"

	"github.com/tendermint/tendermint/crypto"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
)
// ChannelID is an arbitrary channel ID.
type ChannelID uint16

// Envelope contains a message with sender/receiver routing info.
type Envelope struct {
	From      NodeID        // sender (empty if outbound)
	To        NodeID        // receiver (empty if inbound)
	Broadcast bool          // send to all connected peers (ignores To)
	Message   proto.Message // message payload

	// channelID is for internal Router use, set on outbound messages to inform
	// the sendPeer() goroutine which transport channel to use.
	//
	// FIXME: If we migrate the Transport API to a byte-oriented multi-stream
	// API, this will no longer be necessary since each channel will be mapped
	// onto a stream during channel/peer setup. See:
	// https://github.com/tendermint/spec/pull/227
	channelID ChannelID
}
// PeerError is a peer error reported via Channel.Error.
//
// FIXME: This currently just disconnects the peer, which is too simplistic.
// For example, some errors should be logged, some should cause disconnects,
// and some should ban the peer.
//
// FIXME: This should probably be replaced by a more general PeerBehavior
// concept that can mark good and bad behavior and contributes to peer scoring.
// It should possibly also allow reactors to request explicit actions, e.g.
// disconnection or banning, in addition to doing this based on aggregates.
type PeerError struct {
	NodeID NodeID
	Err    error
}
// Channel is a bidirectional channel to exchange Protobuf messages with peers,
// wrapped in Envelope to specify routing info (i.e. sender/receiver).
type Channel struct {
	ID    ChannelID
	In    <-chan Envelope  // inbound messages (peers to reactors)
	Out   chan<- Envelope  // outbound messages (reactors to peers)
	Error chan<- PeerError // peer error reporting

	messageType proto.Message // the channel's message type, used for unmarshaling

	closeCh   chan struct{}
	closeOnce sync.Once
}
// NewChannel creates a new channel. It is primarily for internal and test
// use; reactors should use Router.OpenChannel().
func NewChannel(
	id ChannelID,
	messageType proto.Message,
	inCh <-chan Envelope,
	outCh chan<- Envelope,
	errCh chan<- PeerError,
) *Channel {
	return &Channel{
		ID:          id,
		messageType: messageType,
		In:          inCh,
		Out:         outCh,
		Error:       errCh,
		closeCh:     make(chan struct{}),
	}
}
// Close closes the channel. Future sends on Out and Error will panic. The In
// channel remains open to avoid having to synchronize Router senders, which
// should use Done() to detect channel closure instead.
func (c *Channel) Close() {
	c.closeOnce.Do(func() {
		close(c.closeCh)
		close(c.Out)
		close(c.Error)
	})
}

// Done returns a channel that's closed when Channel.Close() is called.
func (c *Channel) Done() <-chan struct{} {
	return c.closeCh
}
// Wrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
// Router will automatically wrap outbound messages and unwrap inbound messages,
// such that reactors do not have to do this themselves.
type Wrapper interface {
	proto.Message

	// Wrap will take a message and wrap it in this one if possible.
	Wrap(proto.Message) error

	// Unwrap will unwrap the inner message contained in this message.
	Unwrap() (proto.Message, error)
}
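// As an illustration (a hypothetical sketch, not part of this package), a
// protobuf wrapper message with a oneof field named Sum might implement
// Wrapper roughly as follows:
//
//	func (m *ExampleMessage) Wrap(inner proto.Message) error {
//		switch inner := inner.(type) {
//		case *ExamplePing:
//			m.Sum = &ExampleMessage_Ping{Ping: inner}
//		case *ExamplePong:
//			m.Sum = &ExampleMessage_Pong{Pong: inner}
//		default:
//			return fmt.Errorf("unknown message: %T", inner)
//		}
//		return nil
//	}
//
//	func (m *ExampleMessage) Unwrap() (proto.Message, error) {
//		switch inner := m.Sum.(type) {
//		case *ExampleMessage_Ping:
//			return inner.Ping, nil
//		case *ExampleMessage_Pong:
//			return inner.Pong, nil
//		default:
//			return nil, fmt.Errorf("unknown message: %T", inner)
//		}
//	}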
// RouterOptions specifies options for a Router.
type RouterOptions struct {
	// ResolveTimeout is the timeout for resolving NodeAddress URLs.
	// 0 means no timeout.
	ResolveTimeout time.Duration

	// DialTimeout is the timeout for dialing a peer. 0 means no timeout.
	DialTimeout time.Duration

	// HandshakeTimeout is the timeout for handshaking with a peer. 0 means
	// no timeout.
	HandshakeTimeout time.Duration
}

// Validate validates router options.
func (o *RouterOptions) Validate() error {
	return nil
}
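// For example, a caller might configure the router with explicit timeouts
// (the values here are illustrative, not recommendations):
//
//	options := RouterOptions{
//		ResolveTimeout:   10 * time.Second,
//		DialTimeout:      10 * time.Second,
//		HandshakeTimeout: 20 * time.Second,
//	}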
// Router manages peer connections and routes messages between peers and reactor
// channels. It takes a PeerManager for peer lifecycle management (e.g. which
// peers to dial and when) and a set of Transports for connecting to and
// communicating with peers.
//
// On startup, three main goroutines are spawned to maintain peer connections:
//
//	dialPeers(): in a loop, calls PeerManager.DialNext() to get the next peer
//	address to dial, and spawns a goroutine that dials the peer, handshakes
//	with it, and begins to route messages if successful.
//
//	acceptPeers(): in a loop, waits for an inbound connection via
//	Transport.Accept() and spawns a goroutine that handshakes with it and
//	begins to route messages if successful.
//
//	evictPeers(): in a loop, calls PeerManager.EvictNext() to get the next
//	peer to evict, and disconnects it by closing its message queue.
//
// When a peer is connected, an outbound peer message queue is registered in
// peerQueues, and routePeer() is called to spawn off two additional goroutines:
//
//	sendPeer(): waits for an outbound message from the peerQueues queue,
//	marshals it, and passes it to the peer transport, which delivers it.
//
//	receivePeer(): waits for an inbound message from the peer transport,
//	unmarshals it, and passes it to the appropriate inbound channel queue
//	in channelQueues.
//
// When a reactor opens a channel via OpenChannel, an inbound channel message
// queue is registered in channelQueues, and a channel goroutine is spawned:
//
//	routeChannel(): waits for an outbound message from the channel, looks
//	up the recipient peer's outbound message queue in peerQueues, and submits
//	the message to it.
//
// All channel sends in the router are blocking. It is the responsibility of the
// queue interface in peerQueues and channelQueues to prioritize and drop
// messages as appropriate during contention, to prevent stalls and ensure good
// quality of service.
type Router struct {
	*service.BaseService

	logger             log.Logger
	options            RouterOptions
	nodeInfo           NodeInfo
	privKey            crypto.PrivKey
	peerManager        *PeerManager
	transports         []Transport
	protocolTransports map[Protocol]Transport
	stopCh             chan struct{} // signals Router shutdown

	peerMtx    sync.RWMutex
	peerQueues map[NodeID]queue

	// FIXME: We don't strictly need to use a mutex for this if we seal the
	// channels on router start. This depends on whether we want to allow
	// dynamic channels in the future.
	channelMtx      sync.RWMutex
	channelQueues   map[ChannelID]queue
	channelMessages map[ChannelID]proto.Message
}
// NewRouter creates a new Router. The given Transports must already be
// listening on appropriate interfaces, and will be closed by the Router when it
// stops.
func NewRouter(
	logger log.Logger,
	nodeInfo NodeInfo,
	privKey crypto.PrivKey,
	peerManager *PeerManager,
	transports []Transport,
	options RouterOptions,
) (*Router, error) {
	if err := options.Validate(); err != nil {
		return nil, err
	}

	router := &Router{
		logger:             logger,
		nodeInfo:           nodeInfo,
		privKey:            privKey,
		transports:         transports,
		protocolTransports: map[Protocol]Transport{},
		peerManager:        peerManager,
		options:            options,
		stopCh:             make(chan struct{}),
		channelQueues:      map[ChannelID]queue{},
		channelMessages:    map[ChannelID]proto.Message{},
		peerQueues:         map[NodeID]queue{},
	}
	router.BaseService = service.NewBaseService(logger, "router", router)

	// Register the first transport claiming each protocol; later transports
	// do not override earlier ones.
	for _, transport := range transports {
		for _, protocol := range transport.Protocols() {
			if _, ok := router.protocolTransports[protocol]; !ok {
				router.protocolTransports[protocol] = transport
			}
		}
	}

	return router, nil
}
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, before stopping the Router. messageType is the
// type of message passed through the channel (used for unmarshaling), which can
// implement Wrapper to automatically (un)wrap multiple message types in a
// wrapper message.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
	queue := newFIFOQueue()
	outCh := make(chan Envelope)
	errCh := make(chan PeerError)
	channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)

	var wrapper Wrapper
	if w, ok := messageType.(Wrapper); ok {
		wrapper = w
	}

	r.channelMtx.Lock()
	defer r.channelMtx.Unlock()

	if _, ok := r.channelQueues[id]; ok {
		return nil, fmt.Errorf("channel %v already exists", id)
	}
	r.channelQueues[id] = queue
	r.channelMessages[id] = messageType

	go func() {
		defer func() {
			r.channelMtx.Lock()
			delete(r.channelQueues, id)
			delete(r.channelMessages, id)
			r.channelMtx.Unlock()
			queue.close()
		}()
		r.routeChannel(id, outCh, errCh, wrapper)
	}()

	return channel, nil
}
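// A reactor's receive loop over a channel returned by OpenChannel might look
// roughly like this (an illustrative sketch, not part of this package;
// response stands in for whatever message the reactor constructs):
//
//	for {
//		select {
//		case envelope := <-channel.In:
//			// Handle envelope.Message, e.g. reply to the sender:
//			channel.Out <- Envelope{To: envelope.From, Message: response}
//		case <-channel.Done():
//			return
//		}
//	}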
// routeChannel receives outbound channel messages and routes them to the
// appropriate peer. It also receives peer errors and reports them to the peer
// manager. It returns when either the outbound channel or the error channel is
// closed, or when the Router is stopped. wrapper is an optional message
// wrapper; see Wrapper for details.
func (r *Router) routeChannel(
	chID ChannelID,
	outCh <-chan Envelope,
	errCh <-chan PeerError,
	wrapper Wrapper,
) {
	for {
		select {
		case envelope, ok := <-outCh:
			if !ok {
				return
			}

			// Mark the envelope with the channel ID to allow sendPeer() to pass
			// it on to Transport.SendMessage().
			envelope.channelID = chID

			// Wrap the message in a wrapper message, if requested.
			if wrapper != nil {
				msg := proto.Clone(wrapper)
				if err := msg.(Wrapper).Wrap(envelope.Message); err != nil {
					r.logger.Error("failed to wrap message", "channel", chID, "err", err)
					continue
				}
				envelope.Message = msg
			}

			// Collect peer queues to pass the message via.
			var queues []queue
			if envelope.Broadcast {
				r.peerMtx.RLock()
				queues = make([]queue, 0, len(r.peerQueues))
				for _, q := range r.peerQueues {
					queues = append(queues, q)
				}
				r.peerMtx.RUnlock()
			} else {
				r.peerMtx.RLock()
				q, ok := r.peerQueues[envelope.To]
				r.peerMtx.RUnlock()
				if !ok {
					r.logger.Debug("dropping message for unconnected peer",
						"peer", envelope.To, "channel", chID)
					continue
				}
				queues = []queue{q}
			}

			// Send message to peers.
			for _, q := range queues {
				select {
				case q.enqueue() <- envelope:
				case <-q.closed():
					r.logger.Debug("dropping message for unconnected peer",
						"peer", envelope.To, "channel", chID)
				case <-r.stopCh:
					return
				}
			}

		case peerError, ok := <-errCh:
			if !ok {
				return
			}
			r.logger.Error("peer error, evicting", "peer", peerError.NodeID, "err", peerError.Err)
			if err := r.peerManager.Errored(peerError.NodeID, peerError.Err); err != nil {
				r.logger.Error("failed to report peer error", "peer", peerError.NodeID, "err", err)
			}

		case <-r.stopCh:
			return
		}
	}
}
// acceptPeers accepts inbound connections from peers on the given transport,
// and spawns goroutines that route messages to/from them.
func (r *Router) acceptPeers(transport Transport) {
	r.logger.Debug("starting accept routine", "transport", transport)
	ctx := r.stopCtx()
	for {
		// FIXME: We may need transports to enforce some sort of rate limiting
		// here (e.g. by IP address), or alternatively have PeerManager.Accepted()
		// do it for us.
		//
		// FIXME: Even though PeerManager enforces MaxConnected, we may want to
		// limit the maximum number of active connections here too, since e.g.
		// an adversary can open a ton of connections and then just hang during
		// the handshake, taking up TCP socket descriptors.
		//
		// FIXME: The old P2P stack rejected multiple connections for the same IP
		// unless P2PConfig.AllowDuplicateIP is true -- it's better to limit this
		// by peer ID rather than IP address, so this hasn't been implemented and
		// probably shouldn't be (?).
		//
		// FIXME: The old P2P stack supported ABCI-based IP address filtering via
		// /p2p/filter/addr/<ip> queries, do we want to implement this here as well?
		// Filtering by node ID is probably better.
		conn, err := transport.Accept()
		switch err {
		case nil:
		case io.EOF:
			r.logger.Debug("stopping accept routine", "transport", transport)
			return
		default:
			r.logger.Error("failed to accept connection", "transport", transport, "err", err)
			return
		}

		// Spawn a goroutine for the handshake, to avoid head-of-line blocking.
		go func() {
			defer conn.Close()

			// FIXME: The peer manager may reject the peer during Accepted()
			// after we've handshaked with the peer (to find out which peer it
			// is). However, because the handshake has no ack, the remote peer
			// will think the handshake was successful and start sending us
			// messages.
			//
			// This can cause problems in tests, where a disconnection can cause
			// the local node to immediately redial, while the remote node may
			// not have completed the disconnection yet and therefore reject the
			// reconnection attempt (since it thinks we're still connected from
			// before).
			//
			// The Router should do the handshake and have a final ack/fail
			// message to make sure both ends have accepted the connection, such
			// that it can be coordinated with the peer manager.
			peerInfo, _, err := r.handshakePeer(ctx, conn, "")
			switch {
			case errors.Is(err, context.Canceled):
				return
			case err != nil:
				r.logger.Error("peer handshake failed", "endpoint", conn, "err", err)
				return
			}

			if err := r.peerManager.Accepted(peerInfo.NodeID); err != nil {
				r.logger.Error("failed to accept connection", "peer", peerInfo.NodeID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerInfo.NodeID] = queue
			r.peerMtx.Unlock()

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerInfo.NodeID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err)
				}
			}()

			if err := r.peerManager.Ready(peerInfo.NodeID); err != nil {
				r.logger.Error("failed to mark peer as ready", "peer", peerInfo.NodeID, "err", err)
				return
			}

			r.routePeer(peerInfo.NodeID, conn, queue)
		}()
	}
}
// dialPeers maintains outbound connections to peers by dialing them.
func (r *Router) dialPeers() {
	r.logger.Debug("starting dial routine")
	ctx := r.stopCtx()
	for {
		address, err := r.peerManager.DialNext(ctx)
		switch {
		case errors.Is(err, context.Canceled):
			r.logger.Debug("stopping dial routine")
			return
		case err != nil:
			r.logger.Error("failed to find next peer to dial", "err", err)
			return
		}

		// Spawn off a goroutine to actually dial the peer, so that we can
		// dial multiple peers in parallel.
		go func() {
			conn, err := r.dialPeer(ctx, address)
			switch {
			case errors.Is(err, context.Canceled):
				return
			case err != nil:
				r.logger.Error("failed to dial peer", "peer", address, "err", err)
				if err = r.peerManager.DialFailed(address); err != nil {
					r.logger.Error("failed to report dial failure", "peer", address, "err", err)
				}
				return
			}
			defer conn.Close()

			peerID := address.NodeID
			_, _, err = r.handshakePeer(ctx, conn, peerID)
			switch {
			case errors.Is(err, context.Canceled):
				return
			case err != nil:
				r.logger.Error("failed to handshake with peer", "peer", address, "err", err)
				if err = r.peerManager.DialFailed(address); err != nil {
					r.logger.Error("failed to report dial failure", "peer", address, "err", err)
				}
				return
			}

			if err = r.peerManager.Dialed(address); err != nil {
				r.logger.Error("failed to dial peer", "peer", address, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", address, "err", err)
				}
			}()

			if err := r.peerManager.Ready(peerID); err != nil {
				r.logger.Error("failed to mark peer as ready", "peer", address, "err", err)
				return
			}

			r.routePeer(peerID, conn, queue)
		}()
	}
}
// dialPeer connects to a peer by dialing it.
func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection, error) {
	resolveCtx := ctx
	if r.options.ResolveTimeout > 0 {
		var cancel context.CancelFunc
		resolveCtx, cancel = context.WithTimeout(resolveCtx, r.options.ResolveTimeout)
		defer cancel()
	}

	r.logger.Debug("resolving peer address", "peer", address)
	endpoints, err := address.Resolve(resolveCtx)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
	case len(endpoints) == 0:
		return nil, fmt.Errorf("address %q did not resolve to any endpoints", address)
	}

	for _, endpoint := range endpoints {
		transport, ok := r.protocolTransports[endpoint.Protocol]
		if !ok {
			r.logger.Error("no transport found for protocol", "endpoint", endpoint)
			continue
		}

		dialCtx := ctx
		if r.options.DialTimeout > 0 {
			var cancel context.CancelFunc
			dialCtx, cancel = context.WithTimeout(dialCtx, r.options.DialTimeout)
			defer cancel()
		}

		// FIXME: When we dial and handshake the peer, we should pass it
		// appropriate address(es) it can use to dial us back. It can't use our
		// remote endpoint, since TCP uses different port numbers for outbound
		// connections than it does for inbound. Also, we may need to vary this
		// by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
		// on a private address on this endpoint, but a peer on the public
		// Internet can't and needs a different public address.
		conn, err := transport.Dial(dialCtx, endpoint)
		if err != nil {
			r.logger.Error("failed to dial endpoint", "peer", address.NodeID, "endpoint", endpoint, "err", err)
		} else {
			r.logger.Debug("dialed peer", "peer", address.NodeID, "endpoint", endpoint)
			return conn, nil
		}
	}
	return nil, errors.New("all endpoints failed")
}
// handshakePeer handshakes with a peer, validating the peer's information. If
// expectID is given, it checks that the peer's ID matches it.
func (r *Router) handshakePeer(ctx context.Context, conn Connection, expectID NodeID) (NodeInfo, crypto.PubKey, error) {
	if r.options.HandshakeTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, r.options.HandshakeTimeout)
		defer cancel()
	}
	peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey)
	if err != nil {
		return peerInfo, peerKey, err
	}
	if err = peerInfo.Validate(); err != nil {
		return peerInfo, peerKey, fmt.Errorf("invalid handshake NodeInfo: %w", err)
	}
	if NodeIDFromPubKey(peerKey) != peerInfo.NodeID {
		return peerInfo, peerKey, fmt.Errorf("peer's public key did not match its node ID %q (expected %q)",
			peerInfo.NodeID, NodeIDFromPubKey(peerKey))
	}
	if expectID != "" && expectID != peerInfo.NodeID {
		return peerInfo, peerKey, fmt.Errorf("expected to connect with peer %q, got %q",
			expectID, peerInfo.NodeID)
	}
	return peerInfo, peerKey, nil
}
// routePeer routes inbound and outbound messages between a peer and the reactor
// channels. It closes the given connection and send queue when done; if they
// are closed elsewhere, this method shuts down and returns.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
	r.logger.Info("peer connected", "peer", peerID, "endpoint", conn)

	// Run the send and receive directions concurrently; the first one to
	// return tears down the connection and queue, which unblocks the other.
	errCh := make(chan error, 2)
	go func() {
		errCh <- r.receivePeer(peerID, conn)
	}()
	go func() {
		errCh <- r.sendPeer(peerID, conn, sendQueue)
	}()

	err := <-errCh
	_ = conn.Close()
	sendQueue.close()
	if e := <-errCh; err == nil {
		// The first err was nil, so we update it with the second err, which may
		// or may not be nil.
		err = e
	}

	switch err {
	case nil, io.EOF:
		r.logger.Info("peer disconnected", "peer", peerID, "endpoint", conn)
	default:
		r.logger.Error("peer failure", "peer", peerID, "endpoint", conn, "err", err)
	}
}
// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
	for {
		chID, bz, err := conn.ReceiveMessage()
		if err != nil {
			return err
		}

		r.channelMtx.RLock()
		queue, ok := r.channelQueues[chID]
		messageType := r.channelMessages[chID]
		r.channelMtx.RUnlock()
		if !ok {
			r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
			continue
		}

		msg := proto.Clone(messageType)
		if err := proto.Unmarshal(bz, msg); err != nil {
			r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
			continue
		}
		if wrapper, ok := msg.(Wrapper); ok {
			msg, err = wrapper.Unwrap()
			if err != nil {
				r.logger.Error("failed to unwrap message", "err", err)
				continue
			}
		}

		select {
		case queue.enqueue() <- Envelope{From: peerID, Message: msg}:
			r.logger.Debug("received message", "peer", peerID, "message", msg)
		case <-queue.closed():
			r.logger.Debug("channel closed, dropping message", "peer", peerID, "channel", chID)
		case <-r.stopCh:
			return nil
		}
	}
}
// sendPeer sends queued messages to a peer.
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
	for {
		select {
		case envelope := <-queue.dequeue():
			if envelope.Message == nil {
				r.logger.Error("dropping nil message", "peer", peerID)
				continue
			}
			bz, err := proto.Marshal(envelope.Message)
			if err != nil {
				r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
				continue
			}

			_, err = conn.SendMessage(envelope.channelID, bz)
			if err != nil {
				return err
			}
			r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)

		case <-queue.closed():
			return nil

		case <-r.stopCh:
			return nil
		}
	}
}
// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers() {
	r.logger.Debug("starting evict routine")
	ctx := r.stopCtx()
	for {
		peerID, err := r.peerManager.EvictNext(ctx)
		switch {
		case errors.Is(err, context.Canceled):
			r.logger.Debug("stopping evict routine")
			return
		case err != nil:
			r.logger.Error("failed to find next peer to evict", "err", err)
			return
		}

		r.logger.Info("evicting peer", "peer", peerID)
		r.peerMtx.RLock()
		queue, ok := r.peerQueues[peerID]
		r.peerMtx.RUnlock()
		if ok {
			// Closing the queue makes sendPeer() return, after which routePeer()
			// tears down the connection.
			queue.close()
		}
	}
}
// OnStart implements service.Service.
func (r *Router) OnStart() error {
	go r.dialPeers()
	go r.evictPeers()
	for _, transport := range r.transports {
		go r.acceptPeers(transport)
	}
	return nil
}
// OnStop implements service.Service.
//
// All channels must be closed by OpenChannel() callers before stopping the
// router, to prevent blocked channel sends in reactors. Channels are not closed
// here, since that would cause any reactor senders to panic, so it is the
// sender's responsibility.
func (r *Router) OnStop() {
	// Signal router shutdown.
	close(r.stopCh)

	// Close transport listeners (unblocks Accept calls).
	for _, transport := range r.transports {
		if err := transport.Close(); err != nil {
			r.logger.Error("failed to close transport", "transport", transport, "err", err)
		}
	}

	// Collect all remaining queues, and wait for them to close.
	queues := []queue{}
	r.channelMtx.RLock()
	for _, q := range r.channelQueues {
		queues = append(queues, q)
	}
	r.channelMtx.RUnlock()
	r.peerMtx.RLock()
	for _, q := range r.peerQueues {
		queues = append(queues, q)
	}
	r.peerMtx.RUnlock()
	for _, q := range queues {
		<-q.closed()
	}
}
// stopCtx returns a new context that is cancelled when the router stops.
func (r *Router) stopCtx() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-r.stopCh
		cancel()
	}()
	return ctx
}
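// For orientation, a caller might wire the router up roughly as follows
// (an illustrative sketch; logger, nodeInfo, privKey, peerManager, transport,
// and ExampleMessage are assumed to be defined elsewhere):
//
//	router, err := NewRouter(logger, nodeInfo, privKey, peerManager,
//		[]Transport{transport}, RouterOptions{})
//	if err != nil {
//		return err
//	}
//	channel, err := router.OpenChannel(ChannelID(1), &ExampleMessage{})
//	if err != nil {
//		return err
//	}
//	if err := router.Start(); err != nil {
//		return err
//	}
//	defer func() {
//		channel.Close() // close channels before stopping the router
//		_ = router.Stop()
//	}()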