package p2p

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/gogo/protobuf/proto"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
)

// Router manages peer connections and routes messages between peers and reactor
// channels. This is an early prototype.
//
// Channels are registered via OpenChannel(). When called, we register an input
// message queue for the channel in channelQueues and spawn off a goroutine for
// Router.routeChannel(). This goroutine reads off outbound messages and puts
// them in the appropriate peer message queue, and processes peer errors which
// will close (and thus disconnect) the appropriate peer queue. It runs until
// either the channel is closed by the caller or the router is stopped, at which
// point the input message queue is closed and removed.
//
// On startup, the router spawns off two primary goroutines that maintain
// connections to peers and run for the lifetime of the router:
//
// Router.dialPeers(): in a loop, asks the peerManager for the next eligible
// peer to connect to, and attempts to resolve and dial each address until
// successful.
//
// Router.acceptPeers(): in a loop, waits for the next inbound connection
// from a peer, and registers it with the peerManager.
//
// Once either an inbound or outbound connection has been made, an outbound
// message queue is registered in Router.peerQueues and a goroutine is spawned
// off for Router.routePeer() which will spawn off additional goroutines for
// Router.sendPeer() that sends outbound messages from the peer queue over the
// connection and for Router.receivePeer() that reads inbound messages from
// the connection and places them in the appropriate channel queue. When either
// goroutine exits, the connection and peer queue are closed, which will cause
// the other goroutines to close as well.
//
// The peerManager is used to coordinate peer connections, so that a peer is
// only used by a single connection at a time (whether outbound or inbound).
// DialNext() dispenses the next eligible peer to dial, Dialed() and Accepted()
// claim a peer for an outbound or inbound connection respectively, Ready()
// marks it as connected, and Disconnected() releases the claim again. Over
// time, the peerManager will also do peer scheduling and prioritization, e.g.
// ensuring we do exponential backoff on dial failures and connecting to more
// important peers first (such as persistent peers and validators).
//
// Peer updates (currently only connections and disconnections) are broadcast
// by the peerManager to all peer update subscriptions registered via
// SubscribePeerUpdates().
//
// On router shutdown, we close Router.stopCh which will signal to all
// goroutines to terminate. This in turn will cause all pending channel/peer
// queues to close, and we wait for this as a signal that goroutines have ended.
//
// All message scheduling should be limited to the queue implementations used
// for channel queues and peer queues. All message sending throughout the router
// is blocking, and if any messages should be dropped or buffered this is the
// sole responsibility of the queue, such that we can limit this logic to a
// single place. There is currently only a FIFO queue implementation that always
// blocks and never drops messages, but this must be improved with other
// implementations. The only exception is that all message sending must also
// select on appropriate channel/queue/router closure signals, to avoid blocking
// forever on a channel that has no consumer.
type Router struct {
	*service.BaseService

	logger      log.Logger
	transports  map[Protocol]Transport
	peerManager *peerManager

	// FIXME: Consider using sync.Map.
	peerMtx    sync.RWMutex
	peerQueues map[NodeID]queue

	// FIXME: We don't strictly need to use a mutex for this if we seal the
	// channels on router start. This depends on whether we want to allow
	// dynamic channels in the future.
	channelMtx      sync.RWMutex
	channelQueues   map[ChannelID]queue
	channelMessages map[ChannelID]proto.Message

	// stopCh is used to signal router shutdown, by closing the channel.
	stopCh chan struct{}
}
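
// The queue abstraction used throughout this file is defined elsewhere in the
// package; the interface below is only an inferred sketch of how it is used
// here (send side via enqueue(), receive side via dequeue(), and closure
// signalling via close()/closed()). It is illustrative only and is not
// referenced by the router code.
type queueUsageSketch interface {
	enqueue() chan<- Envelope // send end, written to by routeChannel() and receivePeer()
	dequeue() <-chan Envelope // receive end, read by sendPeer()
	close()                   // closes the queue, releasing blocked senders and receivers
	closed() <-chan struct{}  // closed once the queue has been closed
}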

// NewRouter creates a new Router, dialing the given peers.
//
// FIXME: providing protocol/transport maps is cumbersome in tests, we should
// consider adding Protocols() to the Transport interface instead and register
// protocol/transport mappings automatically on a first-come basis.
func NewRouter(logger log.Logger, transports map[Protocol]Transport, peers []PeerAddress) *Router {
	router := &Router{
		logger:          logger,
		transports:      transports,
		peerManager:     newPeerManager(newPeerStore()),
		stopCh:          make(chan struct{}),
		channelQueues:   map[ChannelID]queue{},
		channelMessages: map[ChannelID]proto.Message{},
		peerQueues:      map[NodeID]queue{},
	}
	router.BaseService = service.NewBaseService(logger, "router", router)

	for _, address := range peers {
		if err := router.peerManager.Add(address); err != nil {
			logger.Error("failed to add peer", "address", address, "err", err)
		}
	}

	return router
}
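
// exampleRouterSetup is a minimal wiring sketch and is not called anywhere in
// this package: it shows how a caller might construct and run a Router, given
// a logger, a protocol/transport map, and a list of peer addresses to dial.
// Start() and Stop() are provided by the embedded service.BaseService.
func exampleRouterSetup(logger log.Logger, transports map[Protocol]Transport, peers []PeerAddress) error {
	router := NewRouter(logger, transports, peers)
	if err := router.Start(); err != nil {
		return err
	}
	// Reactors would now open channels via OpenChannel() and exchange messages.
	// When shutting down, channels must be closed before stopping the router.
	return router.Stop()
}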

// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, and this must happen before the router stops.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
	// FIXME: NewChannel should take directional channels so we can pass
	// queue.dequeue() instead of reaching inside for queue.queueCh.
	queue := newFIFOQueue()
	channel := NewChannel(id, messageType, queue.queueCh, make(chan Envelope), make(chan PeerError))

	r.channelMtx.Lock()
	defer r.channelMtx.Unlock()

	if _, ok := r.channelQueues[id]; ok {
		return nil, fmt.Errorf("channel %v already exists", id)
	}
	r.channelQueues[id] = queue
	r.channelMessages[id] = messageType

	go func() {
		defer func() {
			r.channelMtx.Lock()
			delete(r.channelQueues, id)
			delete(r.channelMessages, id)
			r.channelMtx.Unlock()
			queue.close()
		}()
		r.routeChannel(channel)
	}()

	return channel, nil
}
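
// exampleOpenChannel is a minimal sketch of the OpenChannel() contract and is
// not used by the router itself. It assumes Channel exposes a Close() method,
// per the "caller must close the channel" requirement above; the message type
// would be whatever proto.Message the reactor exchanges on this channel.
func exampleOpenChannel(router *Router, id ChannelID, messageType proto.Message) error {
	channel, err := router.OpenChannel(id, messageType)
	if err != nil {
		// Opening the same channel ID twice returns an error.
		return err
	}
	// The channel must be closed before the router stops.
	defer channel.Close()

	// A reactor would send and receive Envelopes on the channel here.
	return nil
}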

// routeChannel receives outbound messages and errors from a channel and routes
// them to the appropriate peer. It returns when either the channel is closed or
// the router is shutting down.
func (r *Router) routeChannel(channel *Channel) {
	for {
		select {
		case envelope, ok := <-channel.outCh:
			if !ok {
				return
			}

			// FIXME: This is a bit unergonomic, maybe it'd be better for Wrap()
			// to return a wrapped copy.
			if _, ok := channel.messageType.(Wrapper); ok {
				wrapper := proto.Clone(channel.messageType)
				if err := wrapper.(Wrapper).Wrap(envelope.Message); err != nil {
					r.logger.Error("failed to wrap message", "err", err)
					continue
				}
				envelope.Message = wrapper
			}
			envelope.channelID = channel.id

			if envelope.Broadcast {
				r.peerMtx.RLock()
				peerQueues := make(map[NodeID]queue, len(r.peerQueues))
				for peerID, peerQueue := range r.peerQueues {
					peerQueues[peerID] = peerQueue
				}
				r.peerMtx.RUnlock()

				for peerID, peerQueue := range peerQueues {
					e := envelope
					e.Broadcast = false
					e.To = peerID

					select {
					case peerQueue.enqueue() <- e:
					case <-peerQueue.closed():
					case <-r.stopCh:
						return
					}
				}
			} else {
				r.peerMtx.RLock()
				peerQueue, ok := r.peerQueues[envelope.To]
				r.peerMtx.RUnlock()
				if !ok {
					r.logger.Error("dropping message for non-connected peer",
						"peer", envelope.To, "channel", channel.id)
					continue
				}

				select {
				case peerQueue.enqueue() <- envelope:
				case <-peerQueue.closed():
					r.logger.Error("dropping message for non-connected peer",
						"peer", envelope.To, "channel", channel.id)
				case <-r.stopCh:
					return
				}
			}

		case peerError, ok := <-channel.errCh:
			if !ok {
				return
			}

			// FIXME: We just disconnect the peer for now.
			r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
			r.peerMtx.RLock()
			peerQueue, ok := r.peerQueues[peerError.PeerID]
			r.peerMtx.RUnlock()
			if ok {
				peerQueue.close()
			}

		case <-channel.Done():
			return

		case <-r.stopCh:
			return
		}
	}
}

// acceptPeers accepts inbound connections from peers on the given transport.
func (r *Router) acceptPeers(transport Transport) {
	for {
		select {
		case <-r.stopCh:
			return
		default:
		}

		conn, err := transport.Accept(context.Background())
		switch err {
		case nil:
		case ErrTransportClosed{}, io.EOF:
			r.logger.Info("transport closed; stopping accept routine", "transport", transport)
			return
		default:
			r.logger.Error("failed to accept connection", "transport", transport, "err", err)
			continue
		}

		go func() {
			defer func() {
				_ = conn.Close()
			}()

			peerID := conn.NodeInfo().NodeID
			if err := r.peerManager.Accepted(peerID); err != nil {
				r.logger.Error("failed to accept connection", "peer", peerID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()
			r.peerManager.Ready(peerID)

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
				}
			}()

			r.routePeer(peerID, conn, queue)
		}()
	}
}

// dialPeers maintains outbound connections to peers.
func (r *Router) dialPeers() {
	for {
		select {
		case <-r.stopCh:
			return
		default:
		}

		peerID, address, err := r.peerManager.DialNext()
		if err != nil {
			r.logger.Error("failed to find next peer to dial", "err", err)
			return
		} else if peerID == "" {
			r.logger.Debug("no eligible peers, sleeping")
			select {
			case <-time.After(time.Second):
				continue
			case <-r.stopCh:
				return
			}
		}

		go func() {
			conn, err := r.dialPeer(address)
			if err != nil {
				r.logger.Error("failed to dial peer, will retry", "peer", peerID, "err", err)
				if err = r.peerManager.DialFailed(peerID, address); err != nil {
					r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
				}
				return
			}
			defer conn.Close()

			if err = r.peerManager.Dialed(peerID, address); err != nil {
				r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()
			r.peerManager.Ready(peerID)

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
				}
			}()

			r.routePeer(peerID, conn, queue)
		}()
	}
}

// dialPeer attempts to connect to a peer.
func (r *Router) dialPeer(address PeerAddress) (Connection, error) {
	ctx := context.Background()

	resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	r.logger.Info("resolving peer address", "address", address)
	endpoints, err := address.Resolve(resolveCtx)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
	}

	for _, endpoint := range endpoints {
		t, ok := r.transports[endpoint.Protocol]
		if !ok {
			r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
			continue
		}

		dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		conn, err := t.Dial(dialCtx, endpoint)
		if err != nil {
			r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err)
		} else {
			r.logger.Info("connected to peer", "peer", address.NodeID(), "endpoint", endpoint)
			return conn, nil
		}
	}

	return nil, fmt.Errorf("failed to connect to peer via %q", address)
}

// routePeer routes inbound messages from a peer to channels, and also sends
// outbound queued messages to the peer. It will close the connection and send
// queue, using this as a signal to coordinate the internal receivePeer() and
// sendPeer() goroutines. It blocks until the peer is done, e.g. when the
// connection or queue is closed.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
	r.logger.Info("routing peer", "peer", peerID)

	resultsCh := make(chan error, 2)
	go func() {
		resultsCh <- r.receivePeer(peerID, conn)
	}()
	go func() {
		resultsCh <- r.sendPeer(peerID, conn, sendQueue)
	}()

	err := <-resultsCh
	_ = conn.Close()
	sendQueue.close()
	if e := <-resultsCh; err == nil {
		// The first err was nil, so we update it with the second result,
		// which may or may not be nil.
		err = e
	}

	switch err {
	case nil, io.EOF, ErrTransportClosed{}:
		r.logger.Info("peer disconnected", "peer", peerID)
	default:
		r.logger.Error("peer failure", "peer", peerID, "err", err)
	}
}

// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
	for {
		chID, bz, err := conn.ReceiveMessage()
		if err != nil {
			return err
		}

		r.channelMtx.RLock()
		queue, ok := r.channelQueues[ChannelID(chID)]
		messageType := r.channelMessages[ChannelID(chID)]
		r.channelMtx.RUnlock()
		if !ok {
			r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID)
			continue
		}

		msg := proto.Clone(messageType)
		if err := proto.Unmarshal(bz, msg); err != nil {
			r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
			continue
		}
		if wrapper, ok := msg.(Wrapper); ok {
			msg, err = wrapper.Unwrap()
			if err != nil {
				r.logger.Error("failed to unwrap message", "err", err)
				continue
			}
		}

		select {
		// FIXME: ReceiveMessage() should return ChannelID.
		case queue.enqueue() <- Envelope{channelID: ChannelID(chID), From: peerID, Message: msg}:
			r.logger.Debug("received message", "peer", peerID, "message", msg)
		case <-queue.closed():
			r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
		case <-r.stopCh:
			return nil
		}
	}
}

// sendPeer sends queued messages to a peer.
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
	for {
		select {
		case envelope := <-queue.dequeue():
			bz, err := proto.Marshal(envelope.Message)
			if err != nil {
				r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
				continue
			}

			// FIXME: SendMessage() should take ChannelID.
			_, err = conn.SendMessage(byte(envelope.channelID), bz)
			if err != nil {
				return err
			}
			r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)

		case <-queue.closed():
			return nil

		case <-r.stopCh:
			return nil
		}
	}
}

// SubscribePeerUpdates creates a new peer updates subscription. The caller must
// consume the peer updates in a timely fashion and close the subscription when
// done, since delivery is guaranteed and will otherwise block peer
// connection/disconnection.
//
// FIXME: Consider having callers just use peerManager.Subscribe() directly, if
// we export peerManager and make it an injected dependency (which we probably
// should).
func (r *Router) SubscribePeerUpdates() *PeerUpdatesCh {
	return r.peerManager.Subscribe()
}
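
// examplePeerUpdateConsumer is a minimal consumption sketch and is not used by
// the router. It assumes PeerUpdatesCh exposes an Updates() channel and a
// Close() method (as in the current peer manager prototype); if those names
// differ, this sketch must change with them. Updates are drained promptly and
// the subscription is closed when done, per the contract above.
func examplePeerUpdateConsumer(router *Router, done <-chan struct{}) {
	peerUpdates := router.SubscribePeerUpdates()
	defer peerUpdates.Close()

	for {
		select {
		case update := <-peerUpdates.Updates():
			router.logger.Debug("peer update", "update", update)
		case <-done:
			return
		}
	}
}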

// OnStart implements service.Service.
func (r *Router) OnStart() error {
	go r.dialPeers()
	for _, transport := range r.transports {
		go r.acceptPeers(transport)
	}
	return nil
}

// OnStop implements service.Service.
func (r *Router) OnStop() {
	// Collect all active queues, so we can wait for them to close.
	queues := []queue{}
	r.channelMtx.RLock()
	for _, q := range r.channelQueues {
		queues = append(queues, q)
	}
	r.channelMtx.RUnlock()
	r.peerMtx.RLock()
	for _, q := range r.peerQueues {
		queues = append(queues, q)
	}
	r.peerMtx.RUnlock()

	// Signal router shutdown, and wait for queues (and thus goroutines)
	// to complete.
	close(r.stopCh)
	for _, q := range queues {
		<-q.closed()
	}
}