p2p: make PeerManager.DialNext() and EvictNext() block (#5947)

See #5936 and #5938 for background.

The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`), which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't, since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about.

I therefore simply made `DialNext()` and `EvictNext()` block until the next peer is available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occur. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends, and it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
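For context, this is the same plain blocking-loop pattern the router goroutines below use to consume the API; a minimal sketch, where `dial` stands in for the caller's own dialing logic:

	for {
		peerID, address, err := peerManager.DialNext(ctx)
		if err != nil {
			return // e.g. context canceled on shutdown
		}
		go dial(peerID, address) // dial concurrently so DialNext() can be called again promptly
	}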
package p2p

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/gogo/protobuf/proto"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
)
// Router manages peer connections and routes messages between peers and reactor
// channels. This is an early prototype.
//
// Channels are registered via OpenChannel(). When called, we register an input
// message queue for the channel in channelQueues and spawn off a goroutine for
// Router.routeChannel(). This goroutine reads off outbound messages and puts
// them in the appropriate peer message queue, and processes peer errors which
// will close (and thus disconnect) the appropriate peer queue. It runs until
// either the channel is closed by the caller or the router is stopped, at which
// point the input message queue is closed and removed.
//
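// A hedged sketch of reactor-side usage (the channel ID, message type, and the
// Channel's Close/In/Out names are illustrative assumptions, not defined here):
//
//	channel, err := router.OpenChannel(ChannelID(42), &somepb.Message{})
//	if err != nil {
//		return err
//	}
//	defer channel.Close()
//	channel.Out <- Envelope{To: peerID, Message: &somepb.Message{}}
//	for envelope := range channel.In {
//		// handle envelope.Message, sent by envelope.From
//	}
//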
// On startup, the router spawns off three primary goroutines that maintain
// connections to peers and run for the lifetime of the router:
//
// Router.dialPeers(): in a loop, asks the PeerManager for the next peer
// address to contact, resolves it into endpoints, and attempts to dial
// each one.
//
// Router.acceptPeers(): in a loop, waits for the next inbound connection
// from a peer, and checks with the PeerManager if it should be accepted.
//
// Router.evictPeers(): in a loop, asks the PeerManager for any connected
// peers to evict, and disconnects them.
//
// Once either an inbound or outbound connection has been made, an outbound
// message queue is registered in Router.peerQueues and a goroutine is spawned
// off for Router.routePeer() which will spawn off additional goroutines for
// Router.sendPeer() that sends outbound messages from the peer queue over the
// connection and for Router.receivePeer() that reads inbound messages from
// the connection and places them in the appropriate channel queue. When either
// goroutine exits, the connection and peer queue are closed, which will cause
// the other goroutines to close as well.
//
// The PeerManager is used to coordinate peer connections, by only allowing a peer
// to be claimed (owned) by a single caller at a time (both for outbound and
// inbound connections). Outbound peers are claimed via PeerManager.DialNext(),
// with the result reported back via Dialed() or DialFailed(), while inbound
// connections are claimed via PeerManager.Accepted(). The claim is released by
// calling PeerManager.Disconnected() once the connection is closed. Over time,
// the PeerManager will also do peer scheduling and prioritization, e.g.
// ensuring we do exponential backoff on dial failures and connecting to
// more important peers first (such as persistent peers and validators).
//
// Whenever a peer connection becomes ready or is torn down, the router reports
// this to the PeerManager via Ready() and Disconnected(), which broadcasts the
// resulting peer updates (currently only connections and disconnections) to all
// registered peer update subscribers.
//
// On router shutdown, we close Router.stopCh which will signal to all
// goroutines to terminate. This in turn will cause all pending channel/peer
// queues to close, and we wait for this as a signal that goroutines have ended.
//
// All message scheduling should be limited to the queue implementations used
// for channel queues and peer queues. All message sending throughout the router
// is blocking, and if any messages should be dropped or buffered this is the
// sole responsibility of the queue, such that we can limit this logic to a
// single place. There is currently only a FIFO queue implementation that always
// blocks and never drops messages, but this must be improved with other
// implementations. The only exception is that all message sending must also
// select on appropriate channel/queue/router closure signals, to avoid blocking
// forever on a channel that has no consumer.
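//
// The queue type used throughout this file is defined elsewhere; judging from
// its call sites here, it has roughly the following shape (a sketch inferred
// from usage, not an authoritative definition):
//
//	type queue interface {
//		enqueue() chan<- Envelope // send envelopes into the queue
//		dequeue() <-chan Envelope // receive envelopes from the queue
//		close()                   // close the queue
//		closed() <-chan struct{}  // closed once the queue has been closed
//	}
//
// The concrete newFIFOQueue() implementation additionally exposes a raw queueCh
// field, which OpenChannel() reaches into (see the FIXME there).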
type Router struct {
	*service.BaseService

	logger      log.Logger
	transports  map[Protocol]Transport
	peerManager *PeerManager

	// FIXME: Consider using sync.Map.
	peerMtx    sync.RWMutex
	peerQueues map[NodeID]queue

	// FIXME: We don't strictly need to use a mutex for this if we seal the
	// channels on router start. This depends on whether we want to allow
	// dynamic channels in the future.
	channelMtx      sync.RWMutex
	channelQueues   map[ChannelID]queue
	channelMessages map[ChannelID]proto.Message

	// stopCh is used to signal router shutdown, by closing the channel.
	stopCh chan struct{}
}
// NewRouter creates a new Router, dialing peers as coordinated by the given
// PeerManager and accepting inbound connections on the given transports.
//
// FIXME: providing protocol/transport maps is cumbersome in tests, we should
// consider adding Protocols() to the Transport interface instead and register
// protocol/transport mappings automatically on a first-come basis.
func NewRouter(logger log.Logger, peerManager *PeerManager, transports map[Protocol]Transport) *Router {
	router := &Router{
		logger:          logger,
		transports:      transports,
		peerManager:     peerManager,
		stopCh:          make(chan struct{}),
		channelQueues:   map[ChannelID]queue{},
		channelMessages: map[ChannelID]proto.Message{},
		peerQueues:      map[NodeID]queue{},
	}
	router.BaseService = service.NewBaseService(logger, "router", router)
	return router
}
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, and this must happen before the router stops.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
	// FIXME: NewChannel should take directional channels so we can pass
	// queue.dequeue() instead of reaching inside for queue.queueCh.
	queue := newFIFOQueue()
	channel := NewChannel(id, messageType, queue.queueCh, make(chan Envelope), make(chan PeerError))

	r.channelMtx.Lock()
	defer r.channelMtx.Unlock()

	if _, ok := r.channelQueues[id]; ok {
		return nil, fmt.Errorf("channel %v already exists", id)
	}
	r.channelQueues[id] = queue
	r.channelMessages[id] = messageType

	go func() {
		defer func() {
			r.channelMtx.Lock()
			delete(r.channelQueues, id)
			delete(r.channelMessages, id)
			r.channelMtx.Unlock()
			queue.close()
		}()
		r.routeChannel(channel)
	}()

	return channel, nil
}
// routeChannel receives outbound messages and errors from a channel and routes
// them to the appropriate peer. It returns when either the channel is closed or
// the router is shutting down.
func (r *Router) routeChannel(channel *Channel) {
	for {
		select {
		case envelope, ok := <-channel.outCh:
			if !ok {
				return
			}
			// FIXME: This is a bit unergonomic, maybe it'd be better for Wrap()
			// to return a wrapped copy.
			if _, ok := channel.messageType.(Wrapper); ok {
				wrapper := proto.Clone(channel.messageType)
				if err := wrapper.(Wrapper).Wrap(envelope.Message); err != nil {
					r.logger.Error("failed to wrap message", "err", err)
					continue
				}
				envelope.Message = wrapper
			}
			envelope.channelID = channel.id

			if envelope.Broadcast {
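				// Take a snapshot of the current peer queues while holding the
				// read lock, then release it, so that the blocking sends below
				// do not hold peerMtx.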
				r.peerMtx.RLock()
				peerQueues := make(map[NodeID]queue, len(r.peerQueues))
				for peerID, peerQueue := range r.peerQueues {
					peerQueues[peerID] = peerQueue
				}
				r.peerMtx.RUnlock()

				for peerID, peerQueue := range peerQueues {
					e := envelope
					e.Broadcast = false
					e.To = peerID
					select {
					case peerQueue.enqueue() <- e:
					case <-peerQueue.closed():
					case <-r.stopCh:
						return
					}
				}
			} else {
				r.peerMtx.RLock()
				peerQueue, ok := r.peerQueues[envelope.To]
				r.peerMtx.RUnlock()
				if !ok {
					r.logger.Error("dropping message for non-connected peer",
						"peer", envelope.To, "channel", channel.id)
					continue
				}
				select {
				case peerQueue.enqueue() <- envelope:
				case <-peerQueue.closed():
					r.logger.Error("dropping message for non-connected peer",
						"peer", envelope.To, "channel", channel.id)
				case <-r.stopCh:
					return
				}
			}

		case peerError, ok := <-channel.errCh:
			if !ok {
				return
			}
			// FIXME: We just disconnect the peer for now.
			r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
			r.peerMtx.RLock()
			peerQueue, ok := r.peerQueues[peerError.PeerID]
			r.peerMtx.RUnlock()
			if ok {
				peerQueue.close()
			}

		case <-channel.Done():
			return

		case <-r.stopCh:
			return
		}
	}
}
// acceptPeers accepts inbound connections from peers on the given transport.
func (r *Router) acceptPeers(transport Transport) {
	ctx := r.stopCtx()
	for {
		// FIXME: We may need transports to enforce some sort of rate limiting
		// here (e.g. by IP address), or alternatively have PeerManager.Accepted()
		// do it for us.
		conn, err := transport.Accept(ctx)
		switch err {
		case nil:
		case ErrTransportClosed{}, io.EOF, context.Canceled:
			r.logger.Debug("stopping accept routine", "transport", transport)
			return
		default:
			r.logger.Error("failed to accept connection", "transport", transport, "err", err)
			continue
		}
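		// Handle the new connection in its own goroutine: claim the peer via
		// PeerManager.Accepted(), route messages until the connection or queue
		// closes, and then release the claim via PeerManager.Disconnected().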
		go func() {
			defer func() {
				_ = conn.Close()
			}()

			peerID := conn.NodeInfo().NodeID
			if err := r.peerManager.Accepted(peerID); err != nil {
				r.logger.Error("failed to accept connection", "peer", peerID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()
			r.peerManager.Ready(peerID)

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
				}
			}()

			r.routePeer(peerID, conn, queue)
		}()
	}
}
// dialPeers maintains outbound connections to peers.
func (r *Router) dialPeers() {
	ctx := r.stopCtx()
	for {
		peerID, address, err := r.peerManager.DialNext(ctx)
		switch err {
		case nil:
		case context.Canceled:
			r.logger.Debug("stopping dial routine")
			return
		default:
			r.logger.Error("failed to find next peer to dial", "err", err)
			return
		}
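		// Dial and handshake the peer in its own goroutine, reporting the
		// outcome to the PeerManager via Dialed() or DialFailed(), and
		// releasing the peer via Disconnected() when the connection closes.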
		go func() {
			conn, err := r.dialPeer(address)
			if err != nil {
				r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
				if err = r.peerManager.DialFailed(peerID, address); err != nil {
					r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
				}
				return
			}
			defer conn.Close()

			if err = r.peerManager.Dialed(peerID, address); err != nil {
				r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()
			r.peerManager.Ready(peerID)

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
				}
			}()

			r.routePeer(peerID, conn, queue)
		}()
	}
}
// dialPeer attempts to connect to a peer.
func (r *Router) dialPeer(address PeerAddress) (Connection, error) {
	ctx := context.Background()

	resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	r.logger.Info("resolving peer address", "address", address)
	endpoints, err := address.Resolve(resolveCtx)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
	}

	for _, endpoint := range endpoints {
		t, ok := r.transports[endpoint.Protocol]
		if !ok {
			r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
			continue
		}

		dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		// FIXME: When we dial and handshake the peer, we should pass it
		// appropriate address(es) it can use to dial us back. It can't use our
		// remote endpoint, since TCP uses different port numbers for outbound
		// connections than it does for inbound. Also, we may need to vary this
		// by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
		// on a private address on this endpoint, but a peer on the public
		// Internet can't and needs a different public address.
		conn, err := t.Dial(dialCtx, endpoint)
		if err != nil {
			r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err)
		} else {
			r.logger.Info("connected to peer", "peer", address.ID, "endpoint", endpoint)
			return conn, nil
		}
	}
	return nil, fmt.Errorf("failed to connect to peer via %q", address)
}
// routePeer routes inbound messages from a peer to channels, and also sends
// outbound queued messages to the peer. It will close the connection and send
// queue, using this as a signal to coordinate the internal receivePeer() and
// sendPeer() goroutines. It blocks until the peer is done, e.g. when the
// connection or queue is closed.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
	r.logger.Info("routing peer", "peer", peerID)

	resultsCh := make(chan error, 2)
	go func() {
		resultsCh <- r.receivePeer(peerID, conn)
	}()
	go func() {
		resultsCh <- r.sendPeer(peerID, conn, sendQueue)
	}()
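	// Wait for the first worker to return, then close the connection and send
	// queue so the other worker unblocks and returns as well; the buffered
	// channel lets it report its result without blocking.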
	err := <-resultsCh
	_ = conn.Close()
	sendQueue.close()
	if e := <-resultsCh; err == nil {
		// The first err was nil, so we update it with the second result,
		// which may or may not be nil.
		err = e
	}

	switch err {
	case nil, io.EOF, ErrTransportClosed{}:
		r.logger.Info("peer disconnected", "peer", peerID)
	default:
		r.logger.Error("peer failure", "peer", peerID, "err", err)
	}
}
// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
	for {
		chID, bz, err := conn.ReceiveMessage()
		if err != nil {
			return err
		}

		r.channelMtx.RLock()
		queue, ok := r.channelQueues[ChannelID(chID)]
		messageType := r.channelMessages[ChannelID(chID)]
		r.channelMtx.RUnlock()
		if !ok {
			r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID)
			continue
		}

		msg := proto.Clone(messageType)
		if err := proto.Unmarshal(bz, msg); err != nil {
			r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
			continue
		}
		if wrapper, ok := msg.(Wrapper); ok {
			msg, err = wrapper.Unwrap()
			if err != nil {
				r.logger.Error("failed to unwrap message", "err", err)
				continue
			}
		}

		select {
		// FIXME: ReceiveMessage() should return ChannelID.
		case queue.enqueue() <- Envelope{channelID: ChannelID(chID), From: peerID, Message: msg}:
			r.logger.Debug("received message", "peer", peerID, "message", msg)
		case <-queue.closed():
			r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
		case <-r.stopCh:
			return nil
		}
	}
}
// sendPeer sends queued messages to a peer.
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
	for {
		select {
		case envelope := <-queue.dequeue():
			bz, err := proto.Marshal(envelope.Message)
			if err != nil {
				r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
				continue
			}

			// FIXME: SendMessage() should take ChannelID.
			_, err = conn.SendMessage(byte(envelope.channelID), bz)
			if err != nil {
				return err
			}
			r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)

		case <-queue.closed():
			return nil

		case <-r.stopCh:
			return nil
		}
	}
}
// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers() {
	ctx := r.stopCtx()
	for {
		peerID, err := r.peerManager.EvictNext(ctx)
		switch err {
		case nil:
		case context.Canceled:
			r.logger.Debug("stopping evict routine")
			return
		default:
			r.logger.Error("failed to find next peer to evict", "err", err)
			return
		}

		r.logger.Info("evicting peer", "peer", peerID)
		r.peerMtx.RLock()
		if queue, ok := r.peerQueues[peerID]; ok {
			queue.close()
		}
		r.peerMtx.RUnlock()
	}
}
// OnStart implements service.Service.
func (r *Router) OnStart() error {
	go r.dialPeers()
	for _, transport := range r.transports {
		go r.acceptPeers(transport)
	}
	go r.evictPeers()
	return nil
}
// OnStop implements service.Service.
func (r *Router) OnStop() {
	// Collect all active queues, so we can wait for them to close.
	queues := []queue{}
	r.channelMtx.RLock()
	for _, q := range r.channelQueues {
		queues = append(queues, q)
	}
	r.channelMtx.RUnlock()
	r.peerMtx.RLock()
	for _, q := range r.peerQueues {
		queues = append(queues, q)
	}
	r.peerMtx.RUnlock()

	// Signal router shutdown, and wait for queues (and thus goroutines)
	// to complete.
	close(r.stopCh)
	for _, q := range queues {
		<-q.closed()
	}
}
// stopCtx returns a context that is cancelled when the router stops.
func (r *Router) stopCtx() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-r.stopCh
		cancel()
	}()
	return ctx
}
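// For orientation, a hedged sketch of how the router is wired up elsewhere; the
// logger, peerManager, transports, channel ID, and message type below are
// placeholders constructed outside this file:
//
//	router := NewRouter(logger, peerManager, transports)
//	if err := router.Start(); err != nil {
//		return err
//	}
//	defer func() { _ = router.Stop() }()
//
//	channel, err := router.OpenChannel(someChannelID, &somepb.Message{})
//	if err != nil {
//		return err
//	}
//	// hand the channel to a reactor, and close it before stopping the router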