package p2p

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/gogo/protobuf/proto"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
)

// Router manages peer connections and routes messages between peers and reactor
// channels. This is an early prototype.
//
// Channels are registered via OpenChannel(). When called, we register an input
// message queue for the channel in channelQueues and spawn off a goroutine for
// Router.routeChannel(). This goroutine reads off outbound messages and puts
// them in the appropriate peer message queue, and processes peer errors which
// will close (and thus disconnect) the appropriate peer queue. It runs until
// either the channel is closed by the caller or the router is stopped, at which
// point the input message queue is closed and removed.
//
// On startup, the router spawns off three primary goroutines that maintain
// connections to peers and run for the lifetime of the router:
//
// Router.dialPeers(): in a loop, asks the PeerManager for the next peer
// address to contact, resolves it into endpoints, and attempts to dial
// each one.
//
// Router.acceptPeers(): in a loop, waits for the next inbound connection
// from a peer, and checks with the PeerManager if it should be accepted.
//
// Router.evictPeers(): in a loop, asks the PeerManager for any connected
// peers to evict, and disconnects them.
//
// Once either an inbound or outbound connection has been made, an outbound
// message queue is registered in Router.peerQueues and a goroutine is spawned
// off for Router.routePeer() which will spawn off additional goroutines for
// Router.sendPeer() that sends outbound messages from the peer queue over the
// connection and for Router.receivePeer() that reads inbound messages from
// the connection and places them in the appropriate channel queue. When either
// goroutine exits, the connection and peer queue are closed, which will cause
// the other goroutines to close as well.
//
// The PeerManager is used to coordinate peer connections: it decides which
// peer to dial next (via DialNext()), whether an inbound connection should be
// accepted (via Accepted()), and which connected peers to evict (via
// EvictNext()). The router reports connection lifecycle events back to it via
// Dialed(), DialFailed(), Ready(), and Disconnected(). Over time, the
// PeerManager will also do peer scheduling and prioritization, e.g. ensuring
// we do exponential backoff on dial failures and connecting to more important
// peers first (such as persistent peers and validators).
//
// Peer updates (currently only connections and disconnections) are reported to
// the PeerManager via the lifecycle calls above, which in turn broadcasts them
// to any registered peer update subscriptions.
//
// On router shutdown, we close Router.stopCh which will signal to all
// goroutines to terminate. This in turn will cause all pending channel/peer
// queues to close, and we wait for this as a signal that goroutines have ended.
//
// All message scheduling should be limited to the queue implementations used
// for channel queues and peer queues. All message sending throughout the router
// is blocking, and if any messages should be dropped or buffered this is the
// sole responsibility of the queue, such that we can limit this logic to a
// single place. There is currently only a FIFO queue implementation that always
// blocks and never drops messages, but this must be improved with other
// implementations. The only exception is that all message sending must also
// select on appropriate channel/queue/router closure signals, to avoid blocking
// forever on a channel that has no consumer.
type Router struct {
	*service.BaseService

	logger      log.Logger
	transports  map[Protocol]Transport
	peerManager *PeerManager

	// FIXME: Consider using sync.Map.
	peerMtx    sync.RWMutex
	peerQueues map[NodeID]queue

	// FIXME: We don't strictly need to use a mutex for this if we seal the
	// channels on router start. This depends on whether we want to allow
	// dynamic channels in the future.
	channelMtx      sync.RWMutex
	channelQueues   map[ChannelID]queue
	channelMessages map[ChannelID]proto.Message

	// stopCh is used to signal router shutdown, by closing the channel.
	stopCh chan struct{}
}

// NewRouter creates a new Router, which dials and accepts peer connections as
// coordinated by the given PeerManager, over the given transports.
//
// FIXME: providing protocol/transport maps is cumbersome in tests; we should
// consider adding Protocols() to the Transport interface instead and register
// protocol/transport mappings automatically on a first-come basis.
func NewRouter(logger log.Logger, peerManager *PeerManager, transports map[Protocol]Transport) *Router {
	router := &Router{
		logger:          logger,
		transports:      transports,
		peerManager:     peerManager,
		stopCh:          make(chan struct{}),
		channelQueues:   map[ChannelID]queue{},
		channelMessages: map[ChannelID]proto.Message{},
		peerQueues:      map[NodeID]queue{},
	}
	router.BaseService = service.NewBaseService(logger, "router", router)
	return router
}
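
// A minimal usage sketch (illustrative only, not part of this package's API:
// the transport, peer manager, channel ID, and message type below are
// placeholders):
//
//	router := NewRouter(logger, peerManager, map[Protocol]Transport{protocol: transport})
//	channel, err := router.OpenChannel(chID, &somepb.Message{})
//	if err != nil {
//		return err
//	}
//	if err := router.Start(); err != nil { // Start()/Stop() are provided by service.BaseService
//		return err
//	}
//	defer router.Stop()
//	// The caller must close the channel before the router stops (see
//	// OpenChannel below).
//	_ = channel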

// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, and this must happen before the router stops.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
	// FIXME: NewChannel should take directional channels so we can pass
	// queue.dequeue() instead of reaching inside for queue.queueCh.
	queue := newFIFOQueue()
	channel := NewChannel(id, messageType, queue.queueCh, make(chan Envelope), make(chan PeerError))

	r.channelMtx.Lock()
	defer r.channelMtx.Unlock()

	if _, ok := r.channelQueues[id]; ok {
		return nil, fmt.Errorf("channel %v already exists", id)
	}
	r.channelQueues[id] = queue
	r.channelMessages[id] = messageType

	go func() {
		defer func() {
			r.channelMtx.Lock()
			delete(r.channelQueues, id)
			delete(r.channelMessages, id)
			r.channelMtx.Unlock()
			queue.close()
		}()
		r.routeChannel(channel)
	}()

	return channel, nil
}
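
// For illustration: an Envelope submitted to a channel is either addressed to
// a single peer or broadcast to all connected peers; routeChannel (below) fans
// broadcast envelopes out by copying them once per connected peer. The field
// names are those used by routeChannel:
//
//	Envelope{To: peerID, Message: msg}      // routed to one peer's queue
//	Envelope{Broadcast: true, Message: msg} // copied to every connected peer's queue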

// routeChannel receives outbound messages and errors from a channel and routes
// them to the appropriate peer. It returns when either the channel is closed or
// the router is shutting down.
func (r *Router) routeChannel(channel *Channel) {
	for {
		select {
		case envelope, ok := <-channel.outCh:
			if !ok {
				return
			}

			// FIXME: This is a bit unergonomic; maybe it'd be better for Wrap()
			// to return a wrapped copy.
			if _, ok := channel.messageType.(Wrapper); ok {
				wrapper := proto.Clone(channel.messageType)
				if err := wrapper.(Wrapper).Wrap(envelope.Message); err != nil {
					r.logger.Error("failed to wrap message", "err", err)
					continue
				}
				envelope.Message = wrapper
			}
			envelope.channelID = channel.id

			if envelope.Broadcast {
				r.peerMtx.RLock()
				peerQueues := make(map[NodeID]queue, len(r.peerQueues))
				for peerID, peerQueue := range r.peerQueues {
					peerQueues[peerID] = peerQueue
				}
				r.peerMtx.RUnlock()

				for peerID, peerQueue := range peerQueues {
					e := envelope
					e.Broadcast = false
					e.To = peerID

					select {
					case peerQueue.enqueue() <- e:
					case <-peerQueue.closed():
					case <-r.stopCh:
						return
					}
				}
			} else {
				r.peerMtx.RLock()
				peerQueue, ok := r.peerQueues[envelope.To]
				r.peerMtx.RUnlock()
				if !ok {
					r.logger.Error("dropping message for non-connected peer",
						"peer", envelope.To, "channel", channel.id)
					continue
				}

				select {
				case peerQueue.enqueue() <- envelope:
				case <-peerQueue.closed():
					r.logger.Error("dropping message for non-connected peer",
						"peer", envelope.To, "channel", channel.id)
				case <-r.stopCh:
					return
				}
			}

		case peerError, ok := <-channel.errCh:
			if !ok {
				return
			}

			// FIXME: We just disconnect the peer for now.
			r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
			r.peerMtx.RLock()
			peerQueue, ok := r.peerQueues[peerError.PeerID]
			r.peerMtx.RUnlock()
			if ok {
				peerQueue.close()
			}

		case <-channel.Done():
			return

		case <-r.stopCh:
			return
		}
	}
}
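
// For illustration, the Wrapper round-trip used above and in receivePeer
// (below): when a channel's registered message type implements Wrapper,
// outbound payloads are wrapped into a clone of that container type before
// being queued, and inbound containers are unwrapped after unmarshaling:
//
//	wrapper := proto.Clone(channel.messageType).(Wrapper)
//	_ = wrapper.Wrap(msg)        // outbound: payload wrapped into container
//	inner, _ := wrapper.Unwrap() // inbound: container unwrapped into payload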

// acceptPeers accepts inbound connections from peers on the given transport.
func (r *Router) acceptPeers(transport Transport) {
	for {
		select {
		case <-r.stopCh:
			return
		default:
		}

		// FIXME: We may need transports to enforce some sort of rate limiting
		// here (e.g. by IP address), or alternatively have PeerManager.Accepted()
		// do it for us.
		conn, err := transport.Accept(context.Background())
		switch err {
		case nil:
		case ErrTransportClosed{}, io.EOF:
			r.logger.Info("transport closed; stopping accept routine", "transport", transport)
			return
		default:
			r.logger.Error("failed to accept connection", "transport", transport, "err", err)
			continue
		}

		go func() {
			defer func() {
				_ = conn.Close()
			}()

			peerID := conn.NodeInfo().NodeID
			if err := r.peerManager.Accepted(peerID); err != nil {
				r.logger.Error("failed to accept connection", "peer", peerID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()
			r.peerManager.Ready(peerID)

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
				}
			}()

			r.routePeer(peerID, conn, queue)
		}()
	}
}

// dialPeers maintains outbound connections to peers.
func (r *Router) dialPeers() {
	for {
		select {
		case <-r.stopCh:
			return
		default:
		}

		peerID, address, err := r.peerManager.DialNext()
		if err != nil {
			r.logger.Error("failed to find next peer to dial", "err", err)
			return
		} else if peerID == "" {
			r.logger.Debug("no eligible peers, sleeping")
			select {
			case <-time.After(time.Second):
				continue
			case <-r.stopCh:
				return
			}
		}

		go func() {
			conn, err := r.dialPeer(address)
			if err != nil {
				r.logger.Error("failed to dial peer, will retry", "peer", peerID)
				if err = r.peerManager.DialFailed(peerID, address); err != nil {
					r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
				}
				return
			}
			defer conn.Close()

			if err = r.peerManager.Dialed(peerID, address); err != nil {
				r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()
			r.peerManager.Ready(peerID)

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
				}
			}()

			r.routePeer(peerID, conn, queue)
		}()
	}
}

// dialPeer attempts to connect to a peer.
func (r *Router) dialPeer(address PeerAddress) (Connection, error) {
	ctx := context.Background()

	resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	r.logger.Info("resolving peer address", "address", address)
	endpoints, err := address.Resolve(resolveCtx)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
	}

	for _, endpoint := range endpoints {
		t, ok := r.transports[endpoint.Protocol]
		if !ok {
			r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
			continue
		}

		dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		conn, err := t.Dial(dialCtx, endpoint)
		if err != nil {
			r.logger.Error("failed to dial endpoint", "endpoint", endpoint)
		} else {
			r.logger.Info("connected to peer", "peer", address.NodeID(), "endpoint", endpoint)
			return conn, nil
		}
	}
	return nil, fmt.Errorf("failed to connect to peer via %q", address)
}

// routePeer routes inbound messages from a peer to channels, and also sends
// outbound queued messages to the peer. Once either of the internal
// receivePeer() or sendPeer() goroutines exits, it closes both the connection
// and the send queue, using this as a signal for the other goroutine to stop
// as well. It blocks until the peer is done, e.g. when the connection or
// queue is closed.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
	r.logger.Info("routing peer", "peer", peerID)

	resultsCh := make(chan error, 2)
	go func() {
		resultsCh <- r.receivePeer(peerID, conn)
	}()
	go func() {
		resultsCh <- r.sendPeer(peerID, conn, sendQueue)
	}()

	err := <-resultsCh
	_ = conn.Close()
	sendQueue.close()
	if e := <-resultsCh; err == nil {
		// The first error was nil, so we update it with the second result,
		// which may or may not be nil.
		err = e
	}

	switch err {
	case nil, io.EOF, ErrTransportClosed{}:
		r.logger.Info("peer disconnected", "peer", peerID)
	default:
		r.logger.Error("peer failure", "peer", peerID, "err", err)
	}
}

// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
	for {
		chID, bz, err := conn.ReceiveMessage()
		if err != nil {
			return err
		}

		r.channelMtx.RLock()
		queue, ok := r.channelQueues[ChannelID(chID)]
		messageType := r.channelMessages[ChannelID(chID)]
		r.channelMtx.RUnlock()
		if !ok {
			r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID)
			continue
		}

		msg := proto.Clone(messageType)
		if err := proto.Unmarshal(bz, msg); err != nil {
			r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
			continue
		}

		if wrapper, ok := msg.(Wrapper); ok {
			msg, err = wrapper.Unwrap()
			if err != nil {
				r.logger.Error("failed to unwrap message", "err", err)
				continue
			}
		}

		select {
		// FIXME: ReceiveMessage() should return ChannelID.
		case queue.enqueue() <- Envelope{channelID: ChannelID(chID), From: peerID, Message: msg}:
			r.logger.Debug("received message", "peer", peerID, "message", msg)
		case <-queue.closed():
			r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
		case <-r.stopCh:
			return nil
		}
	}
}

// sendPeer sends queued messages to a peer.
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
	for {
		select {
		case envelope := <-queue.dequeue():
			bz, err := proto.Marshal(envelope.Message)
			if err != nil {
				r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
				continue
			}

			// FIXME: SendMessage() should take ChannelID.
			_, err = conn.SendMessage(byte(envelope.channelID), bz)
			if err != nil {
				return err
			}
			r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)

		case <-queue.closed():
			return nil

		case <-r.stopCh:
			return nil
		}
	}
}

// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers() {
	for {
		select {
		case <-r.stopCh:
			return
		default:
		}

		peerID, err := r.peerManager.EvictNext()
		if err != nil {
			r.logger.Error("failed to find next peer to evict", "err", err)
			return
		} else if peerID == "" {
			r.logger.Debug("no evictable peers, sleeping")
			select {
			case <-time.After(time.Second):
				continue
			case <-r.stopCh:
				return
			}
		}

		r.logger.Info("evicting peer", "peer", peerID)
		r.peerMtx.RLock()
		queue, ok := r.peerQueues[peerID]
		r.peerMtx.RUnlock()
		if ok {
			queue.close()
		}
	}
}

// OnStart implements service.Service.
func (r *Router) OnStart() error {
	go r.dialPeers()
	for _, transport := range r.transports {
		go r.acceptPeers(transport)
	}
	go r.evictPeers()
	return nil
}

// OnStop implements service.Service.
func (r *Router) OnStop() {
	// Collect all active queues, so we can wait for them to close.
	queues := []queue{}
	r.channelMtx.RLock()
	for _, q := range r.channelQueues {
		queues = append(queues, q)
	}
	r.channelMtx.RUnlock()
	r.peerMtx.RLock()
	for _, q := range r.peerQueues {
		queues = append(queues, q)
	}
	r.peerMtx.RUnlock()

	// Signal router shutdown, and wait for queues (and thus goroutines)
	// to complete.
	close(r.stopCh)
	for _, q := range queues {
		<-q.closed()
	}
}