p2p: make PeerManager.DialNext() and EvictNext() block (#5947), 4 years ago

See #5936 and #5938 for background. The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`) which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about. I therefore simply made `DialNext()` and `EvictNext()` block until the next peer was available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occurred. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends, and it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
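The upshot for callers is a loop in the style of `net.Listener.Accept()`: block on the next item, then handle it concurrently. A minimal sketch of that pattern (the `handleDial` helper is hypothetical; the real loop is `Router.dialPeers()` in the source below):

```go
for {
    // DialNext blocks until the PeerManager has a peer to dial,
    // or returns an error when ctx is canceled on shutdown.
    peerID, address, err := peerManager.DialNext(ctx)
    if err != nil {
        return // context.Canceled on shutdown, or an internal error
    }
    go handleDial(ctx, peerID, address) // dial concurrently; report Dialed()/DialFailed()
}
```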
package p2p

import (
    "context"
    "errors"
    "fmt"
    "io"
    "sync"
    "time"

    "github.com/gogo/protobuf/proto"

    "github.com/tendermint/tendermint/libs/log"
    "github.com/tendermint/tendermint/libs/service"
)
// Router manages peer connections and routes messages between peers and reactor
// channels. This is an early prototype.
//
// Channels are registered via OpenChannel(). When called, we register an input
// message queue for the channel in channelQueues and spawn off a goroutine for
// Router.routeChannel(). This goroutine reads off outbound messages and puts
// them in the appropriate peer message queue, and processes peer errors which
// will close (and thus disconnect) the appropriate peer queue. It runs until
// either the channel is closed by the caller or the router is stopped, at which
// point the input message queue is closed and removed.
//
// On startup, the router spawns off three primary goroutines that maintain
// connections to peers and run for the lifetime of the router:
//
// Router.dialPeers(): in a loop, asks the PeerManager for the next peer
// address to contact, resolves it into endpoints, and attempts to dial
// each one.
//
// Router.acceptPeers(): in a loop, waits for the next inbound connection
// from a peer, and checks with the PeerManager if it should be accepted.
//
// Router.evictPeers(): in a loop, asks the PeerManager for any connected
// peers to evict, and disconnects them.
//
// Once either an inbound or outbound connection has been made, an outbound
// message queue is registered in Router.peerQueues and a goroutine is spawned
// off for Router.routePeer() which will spawn off additional goroutines for
// Router.sendPeer() that sends outbound messages from the peer queue over the
// connection and for Router.receivePeer() that reads inbound messages from
// the connection and places them in the appropriate channel queue. When either
// goroutine exits, the connection and peer queue is closed, which will cause
// the other goroutines to close as well.
//
// The peerStore is used to coordinate peer connections, by only allowing a peer
// to be claimed (owned) by a single caller at a time (both for outbound and
// inbound connections). This is done either via peerStore.Dispense() which
// dispenses and claims an eligible peer to dial, or via peerStore.Claim() which
// attempts to claim a given peer for an inbound connection. Peers must be
// returned to the peerStore with peerStore.Return() to release the claim. Over
// time, the peerStore will also do peer scheduling and prioritization, e.g.
// ensuring we do exponential backoff on dial failures and connecting to
// more important peers first (such as persistent peers and validators).
//
// An additional goroutine Router.broadcastPeerUpdates() is also spawned off
// on startup, which consumes peer updates from Router.peerUpdatesCh (currently
// only connections and disconnections), and broadcasts them to all peer update
// subscriptions registered via SubscribePeerUpdates().
//
// On router shutdown, we close Router.stopCh which will signal to all
// goroutines to terminate. This in turn will cause all pending channel/peer
// queues to close, and we wait for this as a signal that goroutines have ended.
//
// All message scheduling should be limited to the queue implementations used
// for channel queues and peer queues. All message sending throughout the router
// is blocking, and if any messages should be dropped or buffered this is the
// sole responsibility of the queue, such that we can limit this logic to a
// single place. There is currently only a FIFO queue implementation that always
// blocks and never drops messages, but this must be improved with other
// implementations. The only exception is that all message sending must also
// select on appropriate channel/queue/router closure signals, to avoid blocking
// forever on a channel that has no consumer.
type Router struct {
    *service.BaseService

    logger      log.Logger
    transports  map[Protocol]Transport
    peerManager *PeerManager

    // FIXME: Consider using sync.Map.
    peerMtx    sync.RWMutex
    peerQueues map[NodeID]queue

    // FIXME: We don't strictly need to use a mutex for this if we seal the
    // channels on router start. This depends on whether we want to allow
    // dynamic channels in the future.
    channelMtx      sync.RWMutex
    channelQueues   map[ChannelID]queue
    channelMessages map[ChannelID]proto.Message

    // stopCh is used to signal router shutdown, by closing the channel.
    stopCh chan struct{}
}

// NewRouter creates a new Router, dialing the given peers.
//
// FIXME: providing protocol/transport maps is cumbersome in tests, we should
// consider adding Protocols() to the Transport interface instead and register
// protocol/transport mappings automatically on a first-come basis.
func NewRouter(logger log.Logger, peerManager *PeerManager, transports map[Protocol]Transport) *Router {
    router := &Router{
        logger:          logger,
        transports:      transports,
        peerManager:     peerManager,
        stopCh:          make(chan struct{}),
        channelQueues:   map[ChannelID]queue{},
        channelMessages: map[ChannelID]proto.Message{},
        peerQueues:      map[NodeID]queue{},
    }
    router.BaseService = service.NewBaseService(logger, "router", router)
    return router
}
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, and this must happen before the router stops.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
    // FIXME: NewChannel should take directional channels so we can pass
    // queue.dequeue() instead of reaching inside for queue.queueCh.
    queue := newFIFOQueue()
    channel := NewChannel(id, messageType, queue.queueCh, make(chan Envelope), make(chan PeerError))

    r.channelMtx.Lock()
    defer r.channelMtx.Unlock()

    if _, ok := r.channelQueues[id]; ok {
        return nil, fmt.Errorf("channel %v already exists", id)
    }
    r.channelQueues[id] = queue
    r.channelMessages[id] = messageType

    go func() {
        defer func() {
            r.channelMtx.Lock()
            delete(r.channelQueues, id)
            delete(r.channelMessages, id)
            r.channelMtx.Unlock()
            queue.close()
        }()
        r.routeChannel(channel)
    }()

    return channel, nil
}
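// Caller-side sketch (hypothetical, not part of this file): a reactor opens its
// channel once during setup and must close it before the router stops, assuming
// the Channel type exposes a Close() method as the doc comment above implies.
// The channel ID and message type below are placeholders:
//
//     channel, err := router.OpenChannel(ChannelID(0x42), &SomeMessage{})
//     if err != nil {
//         return err
//     }
//     defer channel.Close()
//
// The messageType argument acts as a prototype: inbound payloads on this
// channel are decoded into clones of it (see receivePeer() below).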
// routeChannel receives outbound messages and errors from a channel and routes
// them to the appropriate peer. It returns when either the channel is closed or
// the router is shutting down.
func (r *Router) routeChannel(channel *Channel) {
    for {
        select {
        case envelope, ok := <-channel.outCh:
            if !ok {
                return
            }

            // FIXME: This is a bit unergonomic, maybe it'd be better for Wrap()
            // to return a wrapped copy.
            if _, ok := channel.messageType.(Wrapper); ok {
                wrapper := proto.Clone(channel.messageType)
                if err := wrapper.(Wrapper).Wrap(envelope.Message); err != nil {
                    r.logger.Error("failed to wrap message", "err", err)
                    continue
                }
                envelope.Message = wrapper
            }
            envelope.channelID = channel.id

            if envelope.Broadcast {
                r.peerMtx.RLock()
                peerQueues := make(map[NodeID]queue, len(r.peerQueues))
                for peerID, peerQueue := range r.peerQueues {
                    peerQueues[peerID] = peerQueue
                }
                r.peerMtx.RUnlock()

                for peerID, peerQueue := range peerQueues {
                    e := envelope
                    e.Broadcast = false
                    e.To = peerID
                    select {
                    case peerQueue.enqueue() <- e:
                    case <-peerQueue.closed():
                    case <-r.stopCh:
                        return
                    }
                }
            } else {
                r.peerMtx.RLock()
                peerQueue, ok := r.peerQueues[envelope.To]
                r.peerMtx.RUnlock()
                if !ok {
                    r.logger.Error("dropping message for non-connected peer",
                        "peer", envelope.To, "channel", channel.id)
                    continue
                }
                select {
                case peerQueue.enqueue() <- envelope:
                case <-peerQueue.closed():
                    r.logger.Error("dropping message for non-connected peer",
                        "peer", envelope.To, "channel", channel.id)
                case <-r.stopCh:
                    return
                }
            }

        case peerError, ok := <-channel.errCh:
            if !ok {
                return
            }
            // FIXME: We just disconnect the peer for now
            r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
            r.peerMtx.RLock()
            peerQueue, ok := r.peerQueues[peerError.PeerID]
            r.peerMtx.RUnlock()
            if ok {
                peerQueue.close()
            }

        case <-channel.Done():
            return
        case <-r.stopCh:
            return
        }
    }
}
// acceptPeers accepts inbound connections from peers on the given transport.
func (r *Router) acceptPeers(transport Transport) {
    ctx := r.stopCtx()
    for {
        // FIXME: We may need transports to enforce some sort of rate limiting
        // here (e.g. by IP address), or alternatively have PeerManager.Accepted()
        // do it for us.
        conn, err := transport.Accept(ctx)
        switch err {
        case nil:
        case ErrTransportClosed{}, io.EOF, context.Canceled:
            r.logger.Debug("stopping accept routine", "transport", transport)
            return
        default:
            r.logger.Error("failed to accept connection", "transport", transport, "err", err)
            continue
        }

        go func() {
            defer func() {
                _ = conn.Close()
            }()

            // FIXME: Because we do the handshake in each transport, rather than
            // here in the Router, the remote peer will think they've
            // successfully connected and start sending us messages, although we
            // can end up rejecting the connection here. This can e.g. cause
            // problems in tests, where because of race conditions a
            // disconnection can cause the local node to immediately redial,
            // while the remote node may not have completed the disconnection
            // registration yet and reject the accept below.
            //
            // The Router should do the handshake, and we should check with the
            // peer manager before completing the handshake -- this probably
            // requires protocol changes to send an additional message when the
            // handshake is accepted.
            peerID := conn.NodeInfo().NodeID
            if err := r.peerManager.Accepted(peerID); err != nil {
                r.logger.Error("failed to accept connection", "peer", peerID, "err", err)
                return
            }

            queue := newFIFOQueue()
            r.peerMtx.Lock()
            r.peerQueues[peerID] = queue
            r.peerMtx.Unlock()
            r.peerManager.Ready(peerID)

            defer func() {
                r.peerMtx.Lock()
                delete(r.peerQueues, peerID)
                r.peerMtx.Unlock()
                queue.close()
                if err := r.peerManager.Disconnected(peerID); err != nil {
                    r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
                }
            }()

            r.routePeer(peerID, conn, queue)
        }()
    }
}
// dialPeers maintains outbound connections to peers.
func (r *Router) dialPeers() {
    ctx := r.stopCtx()
    for {
        peerID, address, err := r.peerManager.DialNext(ctx)
        switch err {
        case nil:
        case context.Canceled:
            r.logger.Debug("stopping dial routine")
            return
        default:
            r.logger.Error("failed to find next peer to dial", "err", err)
            return
        }

        go func() {
            conn, err := r.dialPeer(ctx, address)
            if errors.Is(err, context.Canceled) {
                return
            } else if err != nil {
                r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
                if err = r.peerManager.DialFailed(peerID, address); err != nil {
                    r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
                }
                return
            }
            defer conn.Close()

            if err = r.peerManager.Dialed(peerID, address); err != nil {
                r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
                return
            }

            queue := newFIFOQueue()
            r.peerMtx.Lock()
            r.peerQueues[peerID] = queue
            r.peerMtx.Unlock()
            r.peerManager.Ready(peerID)

            defer func() {
                r.peerMtx.Lock()
                delete(r.peerQueues, peerID)
                r.peerMtx.Unlock()
                queue.close()
                if err := r.peerManager.Disconnected(peerID); err != nil {
                    r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
                }
            }()

            r.routePeer(peerID, conn, queue)
        }()
    }
}
// dialPeer attempts to connect to a peer.
func (r *Router) dialPeer(ctx context.Context, address PeerAddress) (Connection, error) {
    resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    r.logger.Info("resolving peer address", "address", address)
    endpoints, err := address.Resolve(resolveCtx)
    if err != nil {
        return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
    }

    for _, endpoint := range endpoints {
        t, ok := r.transports[endpoint.Protocol]
        if !ok {
            r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
            continue
        }

        dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()

        // FIXME: When we dial and handshake the peer, we should pass it
        // appropriate address(es) it can use to dial us back. It can't use our
        // remote endpoint, since TCP uses different port numbers for outbound
        // connections than it does for inbound. Also, we may need to vary this
        // by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
        // on a private address on this endpoint, but a peer on the public
        // Internet can't and needs a different public address.
        conn, err := t.Dial(dialCtx, endpoint)
        if err != nil {
            r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err)
        } else {
            r.logger.Info("connected to peer", "peer", address.ID, "endpoint", endpoint)
            return conn, nil
        }
    }
    return nil, fmt.Errorf("failed to connect to peer via %q", address)
}
// routePeer routes inbound messages from a peer to channels, and also sends
// outbound queued messages to the peer. It will close the connection and send
// queue, using this as a signal to coordinate the internal receivePeer() and
// sendPeer() goroutines. It blocks until the peer is done, e.g. when the
// connection or queue is closed.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
    r.logger.Info("routing peer", "peer", peerID)
    resultsCh := make(chan error, 2)

    go func() {
        resultsCh <- r.receivePeer(peerID, conn)
    }()
    go func() {
        resultsCh <- r.sendPeer(peerID, conn, sendQueue)
    }()

    err := <-resultsCh
    _ = conn.Close()
    sendQueue.close()
    if e := <-resultsCh; err == nil {
        // The first err was nil, so we update it with the second result,
        // which may or may not be nil.
        err = e
    }
    switch err {
    case nil, io.EOF, ErrTransportClosed{}:
        r.logger.Info("peer disconnected", "peer", peerID)
    default:
        r.logger.Error("peer failure", "peer", peerID, "err", err)
    }
}
// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
    for {
        chID, bz, err := conn.ReceiveMessage()
        if err != nil {
            return err
        }

        r.channelMtx.RLock()
        queue, ok := r.channelQueues[ChannelID(chID)]
        messageType := r.channelMessages[ChannelID(chID)]
        r.channelMtx.RUnlock()
        if !ok {
            r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID)
            continue
        }

        msg := proto.Clone(messageType)
        if err := proto.Unmarshal(bz, msg); err != nil {
            r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
            continue
        }
        if wrapper, ok := msg.(Wrapper); ok {
            msg, err = wrapper.Unwrap()
            if err != nil {
                r.logger.Error("failed to unwrap message", "err", err)
                continue
            }
        }

        select {
        // FIXME: ReceiveMessage() should return ChannelID.
        case queue.enqueue() <- Envelope{channelID: ChannelID(chID), From: peerID, Message: msg}:
            r.logger.Debug("received message", "peer", peerID, "message", msg)
        case <-queue.closed():
            r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
        case <-r.stopCh:
            return nil
        }
    }
}
// sendPeer sends queued messages to a peer.
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
    for {
        select {
        case envelope := <-queue.dequeue():
            bz, err := proto.Marshal(envelope.Message)
            if err != nil {
                r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
                continue
            }

            // FIXME: SendMessage() should take ChannelID.
            _, err = conn.SendMessage(byte(envelope.channelID), bz)
            if err != nil {
                return err
            }
            r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)

        case <-queue.closed():
            return nil
        case <-r.stopCh:
            return nil
        }
    }
}
// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers() {
    ctx := r.stopCtx()
    for {
        peerID, err := r.peerManager.EvictNext(ctx)
        switch err {
        case nil:
        case context.Canceled:
            r.logger.Debug("stopping evict routine")
            return
        default:
            r.logger.Error("failed to find next peer to evict", "err", err)
            return
        }

        r.logger.Info("evicting peer", "peer", peerID)
        r.peerMtx.RLock()
        if queue, ok := r.peerQueues[peerID]; ok {
            queue.close()
        }
        r.peerMtx.RUnlock()
    }
}
// OnStart implements service.Service.
func (r *Router) OnStart() error {
    go r.dialPeers()
    for _, transport := range r.transports {
        go r.acceptPeers(transport)
    }
    go r.evictPeers()
    return nil
}

// OnStop implements service.Service.
func (r *Router) OnStop() {
    // Collect all active queues, so we can wait for them to close.
    queues := []queue{}
    r.channelMtx.RLock()
    for _, q := range r.channelQueues {
        queues = append(queues, q)
    }
    r.channelMtx.RUnlock()
    r.peerMtx.RLock()
    for _, q := range r.peerQueues {
        queues = append(queues, q)
    }
    r.peerMtx.RUnlock()

    // Signal router shutdown, and wait for queues (and thus goroutines)
    // to complete.
    close(r.stopCh)
    for _, q := range queues {
        <-q.closed()
    }
}
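// Lifecycle sketch (hypothetical caller, assuming the usual service.BaseService
// wiring in which Start() and Stop() invoke OnStart() and OnStop()):
//
//     router := NewRouter(logger, peerManager, transports)
//     if err := router.Start(); err != nil { // spawns dialPeers, acceptPeers, evictPeers
//         return err
//     }
//     defer func() { _ = router.Stop() }() // closes stopCh and waits for all queues to close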
// stopCtx returns a context that is cancelled when the router stops.
func (r *Router) stopCtx() context.Context {
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        <-r.stopCh
        cancel()
    }()
    return ctx
}