Browse Source

p2p: wire pex v2 reactor to router (#6407)

pull/6426/head
Callum Waters 4 years ago
committed by GitHub
parent
commit
ec5e3b0b02
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 27 deletions
  1. +36
    -15
      node/node.go
  2. +0
    -1
      p2p/node_info.go
  3. +1
    -1
      p2p/peer.go
  4. +20
    -2
      p2p/pex/reactor.go
  5. +0
    -8
      p2p/switch.go
  6. +11
    -0
      p2p/transport_mconn.go
  7. +9
    -0
      test/e2e/runner/main.go

+ 36
- 15
node/node.go View File

@ -770,9 +770,6 @@ func createSwitch(
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
// XXX: needed to support old/new P2P stacks
sw.PutChannelDescsIntoTransport()
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
return sw
}
@ -955,11 +952,24 @@ func NewSeedNode(config *cfg.Config,
return nil, fmt.Errorf("failed to create router: %w", err)
}
// start the pex reactor
pexReactor := createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
pexReactorV2, err := createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
)
// add the pex reactor
// FIXME: we add channel descriptors to both the router and the transport but only the router
// should be aware of channel info. We should remove this from transport once the legacy
// p2p stack is removed.
router.AddChannelDescriptors(pex.ChannelDescriptors())
transport.AddChannelDescriptors(pex.ChannelDescriptors())
if config.P2P.DisableLegacy {
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}
if config.RPC.PprofListenAddress != "" {
@ -1203,12 +1213,20 @@ func NewNode(config *cfg.Config,
config.StateSync.TempDir,
)
// add the channel descriptors to both the router and the underlying
// transports
router.AddChannelDescriptors(mpReactorShim.GetChannels())
router.AddChannelDescriptors(bcReactorForSwitch.GetChannels())
router.AddChannelDescriptors(csReactorShim.GetChannels())
router.AddChannelDescriptors(evReactorShim.GetChannels())
router.AddChannelDescriptors(stateSyncReactorShim.GetChannels())
transport.AddChannelDescriptors(mpReactorShim.GetChannels())
transport.AddChannelDescriptors(bcReactorForSwitch.GetChannels())
transport.AddChannelDescriptors(csReactorShim.GetChannels())
transport.AddChannelDescriptors(evReactorShim.GetChannels())
transport.AddChannelDescriptors(stateSyncReactorShim.GetChannels())
// setup Transport and Switch
sw := createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
@ -1248,13 +1266,16 @@ func NewNode(config *cfg.Config,
)
if config.P2P.PexReactor {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
router.AddChannelDescriptors(pex.ChannelDescriptors())
transport.AddChannelDescriptors(pex.ChannelDescriptors())
if config.P2P.DisableLegacy {
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}
router.AddChannelDescriptors(pexReactor.GetChannels())
}
if config.RPC.PprofListenAddress != "" {
@ -1428,7 +1449,7 @@ func (n *Node) OnStop() {
if n.config.Mode != cfg.ModeSeed {
// now stop the reactors
if n.config.FastSync.Version == "v0" {
if n.config.FastSync.Version == cfg.BlockchainV0 {
// Stop the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the blockchain reactor", "err", err)


+ 0
- 1
p2p/node_info.go View File

@ -52,7 +52,6 @@ type NodeInfo struct {
ProtocolVersion ProtocolVersion `json:"protocol_version"`
// Authenticate
// TODO: replace with NetAddress
NodeID NodeID `json:"id"` // authenticated identifier
ListenAddr string `json:"listen_addr"` // accepting incoming


+ 1
- 1
p2p/peer.go View File

@ -104,7 +104,7 @@ func newPeer(
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.Channels, // TODO
channels: nodeInfo.Channels,
reactors: reactorsByCh,
onPeerError: onPeerError,
Data: cmap.NewCMap(),


+ 20
- 2
p2p/pex/reactor.go View File

@ -11,6 +11,7 @@ import (
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
@ -40,6 +41,23 @@ const (
fullCapacityInterval = 10 * time.Minute
)
// TODO: We should decide whether we want channel descriptors to be housed
// within each reactor (as they are now) or, considering that the reactor doesn't
// really need to care about the channel descriptors, if they should be housed
// in the node module.
func ChannelDescriptors() []*conn.ChannelDescriptor {
return []*conn.ChannelDescriptor{
{
ID: PexChannel,
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 200,
},
}
}
// ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor
// is Reactor.
//
@ -389,6 +407,8 @@ func (r *ReactorV2) waitUntilNextRequest() <-chan time.Time {
// peer into the requestsSent bucket and calculates when the next request
// time should be
func (r *ReactorV2) sendRequestForPeers() {
r.mtx.Lock()
defer r.mtx.Unlock()
peer := r.availablePeers.Front()
if peer == nil {
// no peers are available
@ -414,9 +434,7 @@ func (r *ReactorV2) sendRequestForPeers() {
// remove the peer from the available peers list and mark it in the requestsSent map
r.availablePeers.Remove(peer)
peer.DetachPrev()
r.mtx.Lock()
r.requestsSent[peerID] = struct{}{}
r.mtx.Unlock()
r.calculateNextRequestTime()
r.Logger.Debug("peer request sent", "next_request_time", r.nextRequestTime)


+ 0
- 8
p2p/switch.go View File

@ -1037,11 +1037,3 @@ func (sw *Switch) addPeer(p Peer) error {
return nil
}
// FIXME: Eww, needed to wire up the new P2P stack along with the old one. This
// should be passed into the transport when it's constructed.
func (sw *Switch) PutChannelDescsIntoTransport() {
if t, ok := sw.transport.(*MConnTransport); ok {
t.channelDescs = sw.chDescs
}
}

+ 11
- 0
p2p/transport_mconn.go View File

@ -185,6 +185,17 @@ func (m *MConnTransport) Close() error {
return err
}
// SetChannels sets the channel descriptors to be used when
// establishing a connection.
//
// FIXME: To be removed when the legacy p2p stack is removed. Channel
// descriptors should be managed by the router. The underlying transport and
// connections should be agnostic to everything but the channel ID's which are
// initialized in the handshake.
func (m *MConnTransport) AddChannelDescriptors(channelDesc []*ChannelDescriptor) {
m.channelDescs = append(m.channelDescs, channelDesc...)
}
// validateEndpoint validates an endpoint.
func (m *MConnTransport) validateEndpoint(endpoint Endpoint) error {
if err := endpoint.Validate(); err != nil {


+ 9
- 0
test/e2e/runner/main.go View File

@ -171,6 +171,15 @@ func NewCLI() *CLI {
},
})
cli.root.AddCommand(&cobra.Command{
Use: "resume",
Short: "Resumes the Docker testnet",
RunE: func(cmd *cobra.Command, args []string) error {
logger.Info("Resuming testnet")
return execCompose(cli.testnet.Dir, "up")
},
})
cli.root.AddCommand(&cobra.Command{
Use: "load [multiplier]",
Args: cobra.MaximumNArgs(1),


Loading…
Cancel
Save