p2p: make PeerManager.DialNext() and EvictNext() block (#5947)

See #5936 and #5938 for background. The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`), which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't, since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about.

I therefore simply made `DialNext()` and `EvictNext()` block until the next peer was available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occurred. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends; it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
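The consumer side of this blocking pattern is visible in Router.dialPeers() below. For the producer side, a minimal sketch of the wakeup mechanism might look as follows; the dialScheduler type, its fields, and its methods are hypothetical illustrations of the technique only, not the actual PeerManager internals, and the sketch assumes this package's NodeID type plus the context and sync imports:

// Sketch only: a blocking DialNext() woken by a non-blocking internal trigger,
// as described in the commit message above. Names are hypothetical.
type dialScheduler struct {
    mtx    sync.Mutex
    ready  []NodeID      // hypothetical set of peers ready to be dialed
    wakeCh chan struct{} // buffered with capacity 1
}

func newDialScheduler() *dialScheduler {
    return &dialScheduler{wakeCh: make(chan struct{}, 1)}
}

// notify wakes a blocked DialNext() without ever blocking the caller.
func (s *dialScheduler) notify() {
    select {
    case s.wakeCh <- struct{}{}:
    default: // a wakeup is already pending, nothing to do
    }
}

// DialNext blocks until a peer is available to dial or ctx is canceled.
func (s *dialScheduler) DialNext(ctx context.Context) (NodeID, error) {
    for {
        s.mtx.Lock()
        if len(s.ready) > 0 {
            id := s.ready[0]
            s.ready = s.ready[1:]
            s.mtx.Unlock()
            return id, nil
        }
        s.mtx.Unlock()
        select {
        case <-s.wakeCh:
            // State may have changed; loop and check again.
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }
}
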
package p2p

import (
    "context"
    "errors"
    "fmt"
    "io"
    "sync"
    "time"

    "github.com/gogo/protobuf/proto"

    "github.com/tendermint/tendermint/libs/log"
    "github.com/tendermint/tendermint/libs/service"
)

// Router manages peer connections and routes messages between peers and reactor
// channels. This is an early prototype.
//
// Channels are registered via OpenChannel(). When called, we register an input
// message queue for the channel in channelQueues and spawn off a goroutine for
// Router.routeChannel(). This goroutine reads off outbound messages and puts
// them in the appropriate peer message queue, and processes peer errors which
// will close (and thus disconnect) the appropriate peer queue. It runs until
// either the channel is closed by the caller or the router is stopped, at which
// point the input message queue is closed and removed.
//
// On startup, the router spawns off three primary goroutines that maintain
// connections to peers and run for the lifetime of the router:
//
// Router.dialPeers(): in a loop, asks the PeerManager for the next peer
// address to contact, resolves it into endpoints, and attempts to dial
// each one.
//
// Router.acceptPeers(): in a loop, waits for the next inbound connection
// from a peer, and checks with the PeerManager if it should be accepted.
//
// Router.evictPeers(): in a loop, asks the PeerManager for any connected
// peers to evict, and disconnects them.
//
// Once either an inbound or outbound connection has been made, an outbound
// message queue is registered in Router.peerQueues and a goroutine is spawned
// off for Router.routePeer() which will spawn off additional goroutines for
// Router.sendPeer() that sends outbound messages from the peer queue over the
// connection and for Router.receivePeer() that reads inbound messages from
// the connection and places them in the appropriate channel queue. When either
// goroutine exits, the connection and peer queue is closed, which will cause
// the other goroutines to close as well.
//
// The peerStore is used to coordinate peer connections, by only allowing a peer
// to be claimed (owned) by a single caller at a time (both for outbound and
// inbound connections). This is done either via peerStore.Dispense() which
// dispenses and claims an eligible peer to dial, or via peerStore.Claim() which
// attempts to claim a given peer for an inbound connection. Peers must be
// returned to the peerStore with peerStore.Return() to release the claim. Over
// time, the peerStore will also do peer scheduling and prioritization, e.g.
// ensuring we do exponential backoff on dial failures and connecting to
// more important peers first (such as persistent peers and validators).
//
// An additional goroutine Router.broadcastPeerUpdates() is also spawned off
// on startup, which consumes peer updates from Router.peerUpdatesCh (currently
// only connections and disconnections), and broadcasts them to all peer update
// subscriptions registered via SubscribePeerUpdates().
//
// On router shutdown, we close Router.stopCh which will signal to all
// goroutines to terminate. This in turn will cause all pending channel/peer
// queues to close, and we wait for this as a signal that goroutines have ended.
//
// All message scheduling should be limited to the queue implementations used
// for channel queues and peer queues. All message sending throughout the router
// is blocking, and if any messages should be dropped or buffered this is the
// sole responsibility of the queue, such that we can limit this logic to a
// single place. There is currently only a FIFO queue implementation that always
// blocks and never drops messages, but this must be improved with other
// implementations. The only exception is that all message sending must also
// select on appropriate channel/queue/router closure signals, to avoid blocking
// forever on a channel that has no consumer.
type Router struct {
    *service.BaseService

    logger      log.Logger
    transports  map[Protocol]Transport
    peerManager *PeerManager

    // FIXME: Consider using sync.Map.
    peerMtx    sync.RWMutex
    peerQueues map[NodeID]queue

    // FIXME: We don't strictly need to use a mutex for this if we seal the
    // channels on router start. This depends on whether we want to allow
    // dynamic channels in the future.
    channelMtx      sync.RWMutex
    channelQueues   map[ChannelID]queue
    channelMessages map[ChannelID]proto.Message

    // stopCh is used to signal router shutdown, by closing the channel.
    stopCh chan struct{}
}
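
// NOTE: The queue type referenced throughout this file is defined elsewhere in
// the package. As a rough sketch of the surface this file relies on (the exact
// interface may differ), a queue provides roughly:
//
//     enqueue() chan<- Envelope  // send messages into the queue
//     dequeue() <-chan Envelope  // receive messages from the queue
//     close()                    // close the queue
//     closed() <-chan struct{}   // closed once the queue has been closed
//
// newFIFOQueue() below is assumed to return an unbuffered implementation of
// this, with queueCh as its underlying channel (see the FIXME in OpenChannel).
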
// NewRouter creates a new Router, dialing the given peers.
//
// FIXME: providing protocol/transport maps is cumbersome in tests, we should
// consider adding Protocols() to the Transport interface instead and register
// protocol/transport mappings automatically on a first-come basis.
func NewRouter(logger log.Logger, peerManager *PeerManager, transports map[Protocol]Transport) *Router {
    router := &Router{
        logger:          logger,
        transports:      transports,
        peerManager:     peerManager,
        stopCh:          make(chan struct{}),
        channelQueues:   map[ChannelID]queue{},
        channelMessages: map[ChannelID]proto.Message{},
        peerQueues:      map[NodeID]queue{},
    }
    router.BaseService = service.NewBaseService(logger, "router", router)
    return router
}
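
// As an illustrative sketch only (the setup helpers and protocol name shown
// here are assumptions, not this package's verified API), a router might be
// wired up roughly like this:
//
//     peerManager := newPeerManagerSomehow() // hypothetical setup
//     transport := newTCPTransportSomehow()  // hypothetical setup
//     router := NewRouter(logger, peerManager, map[Protocol]Transport{
//         "tcp": transport,
//     })
//     if err := router.Start(); err != nil { ... }
//     defer func() { _ = router.Stop() }()
//
// Start() and Stop() come from the embedded service.BaseService and invoke the
// OnStart()/OnStop() methods defined at the bottom of this file.
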
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, and this must happen before the router stops.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
    // FIXME: NewChannel should take directional channels so we can pass
    // queue.dequeue() instead of reaching inside for queue.queueCh.
    queue := newFIFOQueue()
    channel := NewChannel(id, messageType, queue.queueCh, make(chan Envelope), make(chan PeerError))

    r.channelMtx.Lock()
    defer r.channelMtx.Unlock()

    if _, ok := r.channelQueues[id]; ok {
        return nil, fmt.Errorf("channel %v already exists", id)
    }
    r.channelQueues[id] = queue
    r.channelMessages[id] = messageType

    go func() {
        defer func() {
            r.channelMtx.Lock()
            delete(r.channelQueues, id)
            delete(r.channelMessages, id)
            r.channelMtx.Unlock()
            queue.close()
        }()
        r.routeChannel(channel)
    }()

    return channel, nil
}
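
// A rough usage sketch for OpenChannel. The Channel field and method names used
// here (Out, In, Close) are assumptions about the prototype's Channel API and
// may not match channel.go exactly:
//
//     channel, err := router.OpenChannel(ChannelID(1), &somepb.Message{}) // hypothetical message type
//     if err != nil { ... }
//     defer channel.Close()
//     channel.Out <- Envelope{To: peerID, Message: &somepb.Message{}} // send to a specific peer
//     envelope := <-channel.In                                        // receive from any peer
//
// Outbound envelopes pass through the FIFO queue registered above, and
// routeChannel() below moves them onto the appropriate per-peer queues.
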
// routeChannel receives outbound messages and errors from a channel and routes
// them to the appropriate peer. It returns when either the channel is closed or
// the router is shutting down.
func (r *Router) routeChannel(channel *Channel) {
    for {
        select {
        case envelope, ok := <-channel.outCh:
            if !ok {
                return
            }

            // FIXME: This is a bit unergonomic, maybe it'd be better for Wrap()
            // to return a wrapped copy.
            if _, ok := channel.messageType.(Wrapper); ok {
                wrapper := proto.Clone(channel.messageType)
                if err := wrapper.(Wrapper).Wrap(envelope.Message); err != nil {
                    r.logger.Error("failed to wrap message", "err", err)
                    continue
                }
                envelope.Message = wrapper
            }
            envelope.channelID = channel.id

            if envelope.Broadcast {
                r.peerMtx.RLock()
                peerQueues := make(map[NodeID]queue, len(r.peerQueues))
                for peerID, peerQueue := range r.peerQueues {
                    peerQueues[peerID] = peerQueue
                }
                r.peerMtx.RUnlock()

                for peerID, peerQueue := range peerQueues {
                    e := envelope
                    e.Broadcast = false
                    e.To = peerID
                    select {
                    case peerQueue.enqueue() <- e:
                    case <-peerQueue.closed():
                    case <-r.stopCh:
                        return
                    }
                }
            } else {
                r.peerMtx.RLock()
                peerQueue, ok := r.peerQueues[envelope.To]
                r.peerMtx.RUnlock()
                if !ok {
                    r.logger.Error("dropping message for non-connected peer",
                        "peer", envelope.To, "channel", channel.id)
                    continue
                }
                select {
                case peerQueue.enqueue() <- envelope:
                case <-peerQueue.closed():
                    r.logger.Error("dropping message for non-connected peer",
                        "peer", envelope.To, "channel", channel.id)
                case <-r.stopCh:
                    return
                }
            }

        case peerError, ok := <-channel.errCh:
            if !ok {
                return
            }
            // FIXME: We just disconnect the peer for now.
            r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
            r.peerMtx.RLock()
            peerQueue, ok := r.peerQueues[peerError.PeerID]
            r.peerMtx.RUnlock()
            if ok {
                peerQueue.close()
            }

        case <-channel.Done():
            return
        case <-r.stopCh:
            return
        }
    }
}

// acceptPeers accepts inbound connections from peers on the given transport.
func (r *Router) acceptPeers(transport Transport) {
    ctx := r.stopCtx()
    for {
        // FIXME: We may need transports to enforce some sort of rate limiting
        // here (e.g. by IP address), or alternatively have PeerManager.Accepted()
        // do it for us.
        conn, err := transport.Accept(ctx)
        switch err {
        case nil:
        case ErrTransportClosed{}, io.EOF, context.Canceled:
            r.logger.Debug("stopping accept routine", "transport", transport)
            return
        default:
            r.logger.Error("failed to accept connection", "transport", transport, "err", err)
            continue
        }

        go func() {
            defer func() {
                _ = conn.Close()
            }()

            peerID := conn.NodeInfo().NodeID
            if err := r.peerManager.Accepted(peerID); err != nil {
                r.logger.Error("failed to accept connection", "peer", peerID, "err", err)
                return
            }

            queue := newFIFOQueue()
            r.peerMtx.Lock()
            r.peerQueues[peerID] = queue
            r.peerMtx.Unlock()
            r.peerManager.Ready(peerID)

            defer func() {
                r.peerMtx.Lock()
                delete(r.peerQueues, peerID)
                r.peerMtx.Unlock()
                queue.close()
                if err := r.peerManager.Disconnected(peerID); err != nil {
                    r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
                }
            }()

            r.routePeer(peerID, conn, queue)
        }()
    }
}

// dialPeers maintains outbound connections to peers.
func (r *Router) dialPeers() {
    ctx := r.stopCtx()
    for {
        peerID, address, err := r.peerManager.DialNext(ctx)
        switch err {
        case nil:
        case context.Canceled:
            r.logger.Debug("stopping dial routine")
            return
        default:
            r.logger.Error("failed to find next peer to dial", "err", err)
            return
        }

        go func() {
            conn, err := r.dialPeer(ctx, address)
            if errors.Is(err, context.Canceled) {
                return
            } else if err != nil {
                r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
                if err = r.peerManager.DialFailed(peerID, address); err != nil {
                    r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
                }
                return
            }
            defer conn.Close()

            if err = r.peerManager.Dialed(peerID, address); err != nil {
                r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
                return
            }

            queue := newFIFOQueue()
            r.peerMtx.Lock()
            r.peerQueues[peerID] = queue
            r.peerMtx.Unlock()
            r.peerManager.Ready(peerID)

            defer func() {
                r.peerMtx.Lock()
                delete(r.peerQueues, peerID)
                r.peerMtx.Unlock()
                queue.close()
                if err := r.peerManager.Disconnected(peerID); err != nil {
                    r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
                }
            }()

            r.routePeer(peerID, conn, queue)
        }()
    }
}

// dialPeer attempts to connect to a peer.
func (r *Router) dialPeer(ctx context.Context, address PeerAddress) (Connection, error) {
    resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    r.logger.Info("resolving peer address", "address", address)
    endpoints, err := address.Resolve(resolveCtx)
    if err != nil {
        return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
    }

    for _, endpoint := range endpoints {
        t, ok := r.transports[endpoint.Protocol]
        if !ok {
            r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
            continue
        }

        dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
        defer cancel()

        // FIXME: When we dial and handshake the peer, we should pass it
        // appropriate address(es) it can use to dial us back. It can't use our
        // remote endpoint, since TCP uses different port numbers for outbound
        // connections than it does for inbound. Also, we may need to vary this
        // by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
        // on a private address on this endpoint, but a peer on the public
        // Internet can't and needs a different public address.
        conn, err := t.Dial(dialCtx, endpoint)
        if err != nil {
            r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err)
        } else {
            r.logger.Info("connected to peer", "peer", address.ID, "endpoint", endpoint)
            return conn, nil
        }
    }
    return nil, fmt.Errorf("failed to connect to peer via %q", address)
}

// routePeer routes inbound messages from a peer to channels, and also sends
// outbound queued messages to the peer. When either of its internal
// receivePeer() and sendPeer() goroutines exits, it closes the connection and
// send queue, using this as a signal for the other goroutine to exit as well.
// It blocks until the peer is done, e.g. when the connection or queue is closed.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
    r.logger.Info("routing peer", "peer", peerID)
    resultsCh := make(chan error, 2)

    go func() {
        resultsCh <- r.receivePeer(peerID, conn)
    }()
    go func() {
        resultsCh <- r.sendPeer(peerID, conn, sendQueue)
    }()

    err := <-resultsCh
    _ = conn.Close()
    sendQueue.close()
    if e := <-resultsCh; err == nil {
        // The first err was nil, so we update it with the second result,
        // which may or may not be nil.
        err = e
    }
    switch err {
    case nil, io.EOF, ErrTransportClosed{}:
        r.logger.Info("peer disconnected", "peer", peerID)
    default:
        r.logger.Error("peer failure", "peer", peerID, "err", err)
    }
}

// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
    for {
        chID, bz, err := conn.ReceiveMessage()
        if err != nil {
            return err
        }

        r.channelMtx.RLock()
        queue, ok := r.channelQueues[ChannelID(chID)]
        messageType := r.channelMessages[ChannelID(chID)]
        r.channelMtx.RUnlock()
        if !ok {
            r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID)
            continue
        }

        msg := proto.Clone(messageType)
        if err := proto.Unmarshal(bz, msg); err != nil {
            r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
            continue
        }
        if wrapper, ok := msg.(Wrapper); ok {
            msg, err = wrapper.Unwrap()
            if err != nil {
                r.logger.Error("failed to unwrap message", "err", err)
                continue
            }
        }

        select {
        // FIXME: ReceiveMessage() should return ChannelID.
        case queue.enqueue() <- Envelope{channelID: ChannelID(chID), From: peerID, Message: msg}:
            r.logger.Debug("received message", "peer", peerID, "message", msg)
        case <-queue.closed():
            r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
        case <-r.stopCh:
            return nil
        }
    }
}

// sendPeer sends queued messages to a peer.
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
    for {
        select {
        case envelope := <-queue.dequeue():
            bz, err := proto.Marshal(envelope.Message)
            if err != nil {
                r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
                continue
            }

            // FIXME: SendMessage() should take ChannelID.
            _, err = conn.SendMessage(byte(envelope.channelID), bz)
            if err != nil {
                return err
            }
            r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)

        case <-queue.closed():
            return nil
        case <-r.stopCh:
            return nil
        }
    }
}

// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers() {
    ctx := r.stopCtx()
    for {
        peerID, err := r.peerManager.EvictNext(ctx)
        switch err {
        case nil:
        case context.Canceled:
            r.logger.Debug("stopping evict routine")
            return
        default:
            r.logger.Error("failed to find next peer to evict", "err", err)
            return
        }

        r.logger.Info("evicting peer", "peer", peerID)
        r.peerMtx.RLock()
        if queue, ok := r.peerQueues[peerID]; ok {
            queue.close()
        }
        r.peerMtx.RUnlock()
    }
}

// OnStart implements service.Service.
func (r *Router) OnStart() error {
    go r.dialPeers()
    for _, transport := range r.transports {
        go r.acceptPeers(transport)
    }
    go r.evictPeers()
    return nil
}

// OnStop implements service.Service.
func (r *Router) OnStop() {
    // Collect all active queues, so we can wait for them to close.
    queues := []queue{}
    r.channelMtx.RLock()
    for _, q := range r.channelQueues {
        queues = append(queues, q)
    }
    r.channelMtx.RUnlock()
    r.peerMtx.RLock()
    for _, q := range r.peerQueues {
        queues = append(queues, q)
    }
    r.peerMtx.RUnlock()

    // Signal router shutdown, and wait for queues (and thus goroutines)
    // to complete.
    close(r.stopCh)
    for _, q := range queues {
        <-q.closed()
    }
}

// stopCtx returns a context that is cancelled when the router stops.
func (r *Router) stopCtx() context.Context {
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        <-r.stopCh
        cancel()
    }()
    return ctx
}