package pex

import (
	"context"
	"fmt"
	"net"
	"strconv"
	"time"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
	"github.com/tendermint/tendermint/p2p"
	protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

// Compile-time interface satisfaction checks.
var (
	_ service.Service = (*ReactorV2)(nil)
	_ p2p.Wrapper     = (*protop2p.PexMessage)(nil)
)

const (
	// maxAddresses is the number of addresses requested from the peer manager
	// and the cap on the addresses returned in a single PEX response.
	maxAddresses uint16 = 100

	// resolveTimeout bounds each individual address resolution.
	resolveTimeout = 3 * time.Second
)

// ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor
// is Reactor.
//
// FIXME: Rename this when Reactor is removed, and consider moving to p2p/.
type ReactorV2 struct {
	service.BaseService

	// peerManager supplies the addresses we advertise and receives the
	// addresses we learn from peers.
	peerManager *p2p.PeerManager
	// pexCh is the p2p channel PEX requests and responses travel on.
	pexCh *p2p.Channel
	// peerUpdates delivers peer status changes to the reactor.
	peerUpdates *p2p.PeerUpdates
	// closeCh is closed by OnStop to tell the spawned goroutines to exit.
	closeCh chan struct{}
}

// NewReactorV2 returns a reference to a new reactor.
func NewReactorV2(
	logger log.Logger,
	peerManager *p2p.PeerManager,
	pexCh *p2p.Channel,
	peerUpdates *p2p.PeerUpdates,
) *ReactorV2 {
	r := &ReactorV2{
		peerManager: peerManager,
		pexCh:       pexCh,
		peerUpdates: peerUpdates,
		closeCh:     make(chan struct{}),
	}

	r.BaseService = *service.NewBaseService(logger, "PEX", r)
	return r
}

// OnStart starts separate go routines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed.
func (r *ReactorV2) OnStart() error {
	go r.processPexCh()
	go r.processPeerUpdates()

	return nil
}

// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *ReactorV2) OnStop() {
	// Close closeCh to signal to all spawned goroutines to gracefully exit. All
	// p2p Channels should execute Close().
	close(r.closeCh)

	// Wait for all p2p Channels to be closed before returning. This ensures we
	// can easily reason about synchronization of all p2p Channels and ensure no
	// panics will occur.
	<-r.pexCh.Done()
	<-r.peerUpdates.Done()
}

// handlePexMessage handles envelopes sent from peers on the PexChannel.
//
// A PexRequest is answered with up to maxAddresses resolved addresses taken
// from the peer manager. A PexResponse has each of its addresses parsed and
// handed to the peer manager; addresses that fail to parse or register are
// logged at debug level and skipped. Any other message type yields an error.
func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error {
	logger := r.Logger.With("peer", envelope.From)

	// FIXME: We may want to add DoS protection here, by rate limiting peers and
	// only processing addresses we actually requested.
	switch msg := envelope.Message.(type) {
	case *protop2p.PexRequest:
		pexAddresses := r.resolve(
			r.peerManager.Advertise(envelope.From, maxAddresses), maxAddresses)
		response := p2p.Envelope{
			To:      envelope.From,
			Message: &protop2p.PexResponse{Addresses: pexAddresses},
		}

		// Do not block forever on the outbound queue: OnStop waits for this
		// goroutine, so a send that can never complete would hang shutdown.
		select {
		case r.pexCh.Out <- response:
		case <-r.closeCh:
			// Shutting down; the response is dropped.
		}

	case *protop2p.PexResponse:
		for _, pexAddress := range msg.Addresses {
			// JoinHostPort brackets IPv6 literals, so the host/port boundary
			// stays unambiguous when the address is parsed back.
			hostPort := net.JoinHostPort(
				pexAddress.IP, strconv.FormatUint(uint64(pexAddress.Port), 10))

			peerAddress, err := p2p.ParseNodeAddress(
				fmt.Sprintf("%s@%s", pexAddress.ID, hostPort))
			if err != nil {
				logger.Debug("invalid PEX address", "address", pexAddress, "err", err)
				continue
			}

			if err = r.peerManager.Add(peerAddress); err != nil {
				logger.Debug("failed to register PEX address", "address", peerAddress, "err", err)
			}
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}

// resolve resolves a set of peer addresses into PEX addresses.
//
// FIXME: This is necessary because the current PEX protocol only supports
// IP/port pairs, while the P2P stack uses NodeAddress URLs. The PEX protocol
// should really use URLs too, to exchange DNS names instead of IPs and allow
// different transport protocols (e.g. QUIC and MemoryTransport).
//
// FIXME: We may want to cache and parallelize this, but for now we'll just rely
// on the operating system to cache it for us.
func (r *ReactorV2) resolve(addresses []p2p.NodeAddress, limit uint16) []protop2p.PexAddress { pexAddresses := make([]protop2p.PexAddress, 0, len(addresses)) for _, address := range addresses { ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout) endpoints, err := address.Resolve(ctx) cancel() if err != nil { r.Logger.Debug("failed to resolve address", "address", address, "err", err) continue } for _, endpoint := range endpoints { if len(pexAddresses) >= int(limit) { return pexAddresses } else if endpoint.IP != nil { // PEX currently only supports IP-networked transports (as // opposed to e.g. p2p.MemoryTransport). pexAddresses = append(pexAddresses, protop2p.PexAddress{ ID: string(address.NodeID), IP: endpoint.IP.String(), Port: uint32(endpoint.Port), }) } } } return pexAddresses } // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) } }() r.Logger.Debug("received message", "peer", envelope.From) switch chID { case p2p.ChannelID(PexChannel): err = r.handlePexMessage(envelope) default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) } return err } // processPexCh implements a blocking event loop where we listen for p2p // Envelope messages from the pexCh. 
func (r *ReactorV2) processPexCh() { defer r.pexCh.Close() for { select { case envelope := <-r.pexCh.In: if err := r.handleMessage(r.pexCh.ID, envelope); err != nil { r.Logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err) r.pexCh.Error <- p2p.PeerError{ NodeID: envelope.From, Err: err, } } case <-r.closeCh: r.Logger.Debug("stopped listening on PEX channel; closing...") return } } } // processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we // send a request for addresses. func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) { r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) if peerUpdate.Status == p2p.PeerStatusUp { r.pexCh.Out <- p2p.Envelope{ To: peerUpdate.NodeID, Message: &protop2p.PexRequest{}, } } } // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. func (r *ReactorV2) processPeerUpdates() { defer r.peerUpdates.Close() for { select { case peerUpdate := <-r.peerUpdates.Updates(): r.processPeerUpdate(peerUpdate) case <-r.closeCh: r.Logger.Debug("stopped listening on peer updates channel; closing...") return } } }