You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

226 lines
6.7 KiB

  1. package pex
import (
	"context"
	"fmt"
	"net"
	"strconv"
	"time"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
	"github.com/tendermint/tendermint/p2p"
	protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
// Compile-time interface assertions: the build fails if either type stops
// satisfying the interface it is assigned to.
var (
	// ReactorV2 must implement service.Service.
	_ service.Service = (*ReactorV2)(nil)
	// protop2p.PexMessage must implement p2p.Wrapper.
	_ p2p.Wrapper = (*protop2p.PexMessage)(nil)
)
const (
	// maxAddresses is the limit passed to both PeerManager.Advertise and
	// resolve when answering a PexRequest, capping the number of addresses
	// placed in a single PexResponse.
	maxAddresses uint16 = 100

	// resolveTimeout bounds each individual NodeAddress.Resolve call made
	// by resolve.
	resolveTimeout = 3 * time.Second
)
// ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor
// is Reactor.
//
// FIXME: Rename this when Reactor is removed, and consider moving to p2p/.
type ReactorV2 struct {
	service.BaseService

	// peerManager supplies the addresses advertised in response to a
	// PexRequest and receives the addresses learned from a PexResponse.
	peerManager *p2p.PeerManager
	// pexCh is the p2p channel on which PEX envelopes are received and sent,
	// and on which peer errors are reported.
	pexCh *p2p.Channel
	// peerUpdates delivers peer status changes; a PexRequest is sent to each
	// peer reported as up.
	peerUpdates *p2p.PeerUpdatesCh
	// closeCh is closed by OnStop to tell the processing goroutines to exit.
	closeCh chan struct{}
}
  30. // NewReactor returns a reference to a new reactor.
  31. func NewReactorV2(
  32. logger log.Logger,
  33. peerManager *p2p.PeerManager,
  34. pexCh *p2p.Channel,
  35. peerUpdates *p2p.PeerUpdatesCh,
  36. ) *ReactorV2 {
  37. r := &ReactorV2{
  38. peerManager: peerManager,
  39. pexCh: pexCh,
  40. peerUpdates: peerUpdates,
  41. closeCh: make(chan struct{}),
  42. }
  43. r.BaseService = *service.NewBaseService(logger, "PEX", r)
  44. return r
  45. }
  46. // OnStart starts separate go routines for each p2p Channel and listens for
  47. // envelopes on each. In addition, it also listens for peer updates and handles
  48. // messages on that p2p channel accordingly. The caller must be sure to execute
  49. // OnStop to ensure the outbound p2p Channels are closed.
  50. func (r *ReactorV2) OnStart() error {
  51. go r.processPexCh()
  52. go r.processPeerUpdates()
  53. return nil
  54. }
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *ReactorV2) OnStop() {
	// Close closeCh to signal to all spawned goroutines to gracefully exit.
	// Both processPexCh and processPeerUpdates select on closeCh and close
	// their respective channel (via defer) when they return.
	close(r.closeCh)

	// Wait for both channels to report done before returning, so that no
	// processing goroutine is still using them once OnStop has returned.
	<-r.pexCh.Done()
	<-r.peerUpdates.Done()
}
  67. // handlePexMessage handles envelopes sent from peers on the PexChannel.
  68. func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
  69. logger := r.Logger.With("peer", envelope.From)
  70. // FIXME: We may want to add DoS protection here, by rate limiting peers and
  71. // only processing addresses we actually requested.
  72. switch msg := envelope.Message.(type) {
  73. case *protop2p.PexRequest:
  74. pexAddresses := r.resolve(r.peerManager.Advertise(envelope.From, maxAddresses), maxAddresses)
  75. r.pexCh.Out() <- p2p.Envelope{
  76. To: envelope.From,
  77. Message: &protop2p.PexResponse{Addresses: pexAddresses},
  78. }
  79. case *protop2p.PexResponse:
  80. for _, pexAddress := range msg.Addresses {
  81. peerAddress, err := p2p.ParseNodeAddress(
  82. fmt.Sprintf("%s@%s:%d", pexAddress.ID, pexAddress.IP, pexAddress.Port))
  83. if err != nil {
  84. logger.Debug("invalid PEX address", "address", pexAddress, "err", err)
  85. continue
  86. }
  87. if err = r.peerManager.Add(peerAddress); err != nil {
  88. logger.Debug("failed to register PEX address", "address", peerAddress, "err", err)
  89. }
  90. }
  91. default:
  92. return fmt.Errorf("received unknown message: %T", msg)
  93. }
  94. return nil
  95. }
  96. // resolve resolves a set of peer addresses into PEX addresses.
  97. //
  98. // FIXME: This is necessary because the current PEX protocol only supports
  99. // IP/port pairs, while the P2P stack uses NodeAddress URLs. The PEX protocol
  100. // should really use URLs too, to exchange DNS names instead of IPs and allow
  101. // different transport protocols (e.g. QUIC and MemoryTransport).
  102. //
  103. // FIXME: We may want to cache and parallelize this, but for now we'll just rely
  104. // on the operating system to cache it for us.
  105. func (r *ReactorV2) resolve(addresses []p2p.NodeAddress, limit uint16) []protop2p.PexAddress {
  106. pexAddresses := make([]protop2p.PexAddress, 0, len(addresses))
  107. for _, address := range addresses {
  108. ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
  109. endpoints, err := address.Resolve(ctx)
  110. cancel()
  111. if err != nil {
  112. r.Logger.Debug("failed to resolve address", "address", address, "err", err)
  113. continue
  114. }
  115. for _, endpoint := range endpoints {
  116. if len(pexAddresses) >= int(limit) {
  117. return pexAddresses
  118. } else if endpoint.IP != nil {
  119. // PEX currently only supports IP-networked transports (as
  120. // opposed to e.g. p2p.MemoryTransport).
  121. pexAddresses = append(pexAddresses, protop2p.PexAddress{
  122. ID: string(address.NodeID),
  123. IP: endpoint.IP.String(),
  124. Port: uint32(endpoint.Port),
  125. })
  126. }
  127. }
  128. }
  129. return pexAddresses
  130. }
  131. // handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
  132. // It will handle errors and any possible panics gracefully. A caller can handle
  133. // any error returned by sending a PeerError on the respective channel.
  134. func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
  135. defer func() {
  136. if e := recover(); e != nil {
  137. err = fmt.Errorf("panic in processing message: %v", e)
  138. }
  139. }()
  140. r.Logger.Debug("received message", "peer", envelope.From)
  141. switch chID {
  142. case p2p.ChannelID(PexChannel):
  143. err = r.handlePexMessage(envelope)
  144. default:
  145. err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
  146. }
  147. return err
  148. }
  149. // processPexCh implements a blocking event loop where we listen for p2p
  150. // Envelope messages from the pexCh.
  151. func (r *ReactorV2) processPexCh() {
  152. defer r.pexCh.Close()
  153. for {
  154. select {
  155. case envelope := <-r.pexCh.In():
  156. if err := r.handleMessage(r.pexCh.ID(), envelope); err != nil {
  157. r.Logger.Error("failed to process message", "ch_id", r.pexCh.ID(), "envelope", envelope, "err", err)
  158. r.pexCh.Error() <- p2p.PeerError{
  159. PeerID: envelope.From,
  160. Err: err,
  161. Severity: p2p.PeerErrorSeverityLow,
  162. }
  163. }
  164. case <-r.closeCh:
  165. r.Logger.Debug("stopped listening on PEX channel; closing...")
  166. return
  167. }
  168. }
  169. }
  170. // processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we
  171. // send a request for addresses.
  172. func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
  173. r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
  174. if peerUpdate.Status == p2p.PeerStatusUp {
  175. r.pexCh.Out() <- p2p.Envelope{
  176. To: peerUpdate.PeerID,
  177. Message: &protop2p.PexRequest{},
  178. }
  179. }
  180. }
  181. // processPeerUpdates initiates a blocking process where we listen for and handle
  182. // PeerUpdate messages. When the reactor is stopped, we will catch the signal and
  183. // close the p2p PeerUpdatesCh gracefully.
  184. func (r *ReactorV2) processPeerUpdates() {
  185. defer r.peerUpdates.Close()
  186. for {
  187. select {
  188. case peerUpdate := <-r.peerUpdates.Updates():
  189. r.processPeerUpdate(peerUpdate)
  190. case <-r.closeCh:
  191. r.Logger.Debug("stopped listening on peer updates channel; closing...")
  192. return
  193. }
  194. }
  195. }