Browse Source

node: feature flag for legacy p2p support (#6056)

pull/6132/head
Erik Grinaker 3 years ago
committed by GitHub
parent
commit
b6be889b97
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 285 additions and 50 deletions
  1. +274
    -50
      node/node.go
  2. +8
    -0
      p2p/switch.go
  3. +1
    -0
      test/e2e/docker/Dockerfile
  4. +2
    -0
      test/e2e/runner/rpc.go

+ 274
- 50
node/node.go View File

@ -8,6 +8,8 @@ import (
"net"
"net/http"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"os"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -34,6 +36,7 @@ import (
"github.com/tendermint/tendermint/p2p/pex"
"github.com/tendermint/tendermint/privval"
tmgrpc "github.com/tendermint/tendermint/privval/grpc"
protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
rpccore "github.com/tendermint/tendermint/rpc/core"
grpccore "github.com/tendermint/tendermint/rpc/grpc"
@ -49,7 +52,13 @@ import (
"github.com/tendermint/tendermint/version"
)
//------------------------------------------------------------------------------
var useLegacyP2P = true
func init() {
if v := os.Getenv("TM_LEGACY_P2P"); len(v) > 0 {
useLegacyP2P, _ = strconv.ParseBool(v)
}
}
// DBContext specifies config information for loading a new DB.
type DBContext struct {
@ -182,7 +191,9 @@ type Node struct {
// network
transport *p2p.MConnTransport
sw *p2p.Switch // p2p connections
sw *p2p.Switch // p2p connections
peerManager *p2p.PeerManager
router *p2p.Router
addrBook pex.AddrBook // known peers
nodeInfo p2p.NodeInfo
nodeKey p2p.NodeKey // our node privkey
@ -202,6 +213,7 @@ type Node struct {
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses
evidenceReactor *evidence.Reactor
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
@ -324,7 +336,8 @@ func createMempoolReactor(
proxyApp proxy.AppConns,
state sm.State,
memplMetrics *mempl.Metrics,
peerMgr *p2p.PeerManager,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, *mempl.Reactor, *mempl.CListMempool) {
@ -340,14 +353,29 @@ func createMempoolReactor(
mempool.SetLogger(logger)
reactorShim := p2p.NewReactorShim(logger, "MempoolShim", mempl.GetChannelShims(config.Mempool))
channelShims := mempl.GetChannelShims(config.Mempool)
reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if useLegacyP2P {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
}
reactor := mempl.NewReactor(
logger,
config.Mempool,
peerMgr,
peerManager,
mempool,
reactorShim.GetChannel(mempl.MempoolChannel),
reactorShim.PeerUpdates,
channels[mempl.MempoolChannel],
peerUpdates,
)
if config.Consensus.WaitForTxs() {
@ -362,6 +390,8 @@ func createEvidenceReactor(
dbProvider DBProvider,
stateDB dbm.DB,
blockStore *store.BlockStore,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&DBContext{"evidence", config})
@ -370,21 +400,34 @@ func createEvidenceReactor(
}
logger = logger.With("module", "evidence")
reactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore)
if err != nil {
return nil, nil, nil, err
}
evidenceReactorShim := p2p.NewReactorShim(logger, "EvidenceShim", evidence.ChannelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if useLegacyP2P {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
evidenceReactor := evidence.NewReactor(
logger,
evidenceReactorShim.GetChannel(evidence.EvidenceChannel),
evidenceReactorShim.PeerUpdates,
channels[evidence.EvidenceChannel],
peerUpdates,
evidencePool,
)
return evidenceReactorShim, evidenceReactor, evidencePool, nil
return reactorShim, evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(
@ -394,6 +437,8 @@ func createBlockchainReactor(
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
csReactor *cs.Reactor,
peerManager *p2p.PeerManager,
router *p2p.Router,
fastSync bool,
) (*p2p.ReactorShim, service.Service, error) {
@ -403,9 +448,22 @@ func createBlockchainReactor(
case "v0":
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if useLegacyP2P {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
reactorShim.GetChannel(bcv0.BlockchainChannel), reactorShim.PeerUpdates, fastSync,
channels[bcv0.BlockchainChannel], peerUpdates, fastSync,
)
if err != nil {
return nil, nil, err
@ -435,6 +493,8 @@ func createConsensusReactor(
csMetrics *cs.Metrics,
waitSync bool,
eventBus *types.EventBus,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
@ -455,14 +515,28 @@ func createConsensusReactor(
}
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if useLegacyP2P {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
reactor := cs.NewReactor(
logger,
consensusState,
reactorShim.GetChannel(cs.StateChannel),
reactorShim.GetChannel(cs.DataChannel),
reactorShim.GetChannel(cs.VoteChannel),
reactorShim.GetChannel(cs.VoteSetBitsChannel),
reactorShim.PeerUpdates,
channels[cs.StateChannel],
channels[cs.DataChannel],
channels[cs.VoteChannel],
channels[cs.VoteSetBitsChannel],
peerUpdates,
waitSync,
cs.ReactorMetrics(csMetrics),
)
@ -474,10 +548,7 @@ func createConsensusReactor(
return reactorShim, reactor, consensusState
}
func createTransport(
logger log.Logger,
config *cfg.Config,
) *p2p.MConnTransport {
func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
return p2p.NewMConnTransport(
logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
@ -488,6 +559,52 @@ func createTransport(
)
}
func createPeerManager(config *cfg.Config, p2pLogger log.Logger, nodeID p2p.NodeID) (*p2p.PeerManager, error) {
options := p2p.PeerManagerOptions{
MaxConnected: 64,
MaxConnectedUpgrade: 4,
MaxPeers: 1000,
MinRetryTime: 100 * time.Millisecond,
MaxRetryTime: 8 * time.Hour,
MaxRetryTimePersistent: 5 * time.Minute,
RetryTimeJitter: 3 * time.Second,
}
peers := []p2p.NodeAddress{}
for _, p := range splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
}
peers = append(peers, address)
options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
}
peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), options)
if err != nil {
return nil, fmt.Errorf("failed to create peer manager: %w", err)
}
for _, peer := range peers {
if err := peerManager.Add(peer); err != nil {
return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
}
}
return peerManager, nil
}
func createRouter(
p2pLogger log.Logger,
nodeInfo p2p.NodeInfo,
privKey crypto.PrivKey,
peerManager *p2p.PeerManager,
transport p2p.Transport,
) (*p2p.Router, error) {
return p2p.NewRouter(p2pLogger, nodeInfo, privKey, peerManager, []p2p.Transport{transport}, p2p.RouterOptions{})
}
func createSwitch(config *cfg.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
@ -499,7 +616,8 @@ func createSwitch(config *cfg.Config,
proxyApp proxy.AppConns,
nodeInfo p2p.NodeInfo,
nodeKey p2p.NodeKey,
p2pLogger log.Logger) *p2p.Switch {
p2pLogger log.Logger,
) *p2p.Switch {
var (
connFilters = []p2p.ConnFilterFunc{}
@ -557,6 +675,7 @@ func createSwitch(config *cfg.Config,
p2p.SwitchPeerFilters(peerFilters...),
p2p.SwitchConnFilters(connFilters...),
)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
@ -567,6 +686,9 @@ func createSwitch(config *cfg.Config,
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
// XXX: needed to support old/new P2P stacks
sw.PutChannelDescsIntoTransport()
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
return sw
}
@ -619,6 +741,22 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
return pexReactor
}
func createPEXReactorV2(
config *cfg.Config,
logger log.Logger,
peerManager *p2p.PeerManager,
router *p2p.Router,
) (*pex.ReactorV2, error) {
channel, err := router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 0)
if err != nil {
return nil, err
}
peerUpdates := peerManager.Subscribe()
return pex.NewReactorV2(logger, peerManager, channel, peerUpdates), nil
}
// startStateSync starts an asynchronous state sync process, then switches to fast sync mode.
func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reactor,
stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool,
@ -775,15 +913,32 @@ func NewNode(config *cfg.Config,
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
peerMgr, err := p2p.NewPeerManager(nodeKey.ID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
if err != nil {
return nil, err
}
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, config)
peerManager, err := createPeerManager(config, p2pLogger, nodeKey.ID)
if err != nil {
return nil, fmt.Errorf("failed to create peer manager: %w", err)
}
router, err := createRouter(p2pLogger, nodeInfo, nodeKey.PrivKey, peerManager, transport)
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
}
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
mpReactorShim, mpReactor, mempool := createMempoolReactor(config, proxyApp, state, memplMetrics, peerMgr, logger)
mpReactorShim, mpReactor, mempool := createMempoolReactor(
config, proxyApp, state, memplMetrics, peerManager, router, logger,
)
evReactorShim, evReactor, evPool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
evReactorShim, evReactor, evPool, err := createEvidenceReactor(
config, dbProvider, stateDB, blockStore, peerManager, router, logger,
)
if err != nil {
return nil, err
}
@ -800,13 +955,15 @@ func NewNode(config *cfg.Config,
csReactorShim, csReactor, csState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
privValidator, csMetrics, stateSync || fastSync, eventBus,
peerManager, router, consensusLogger,
)
// Create the blockchain reactor. Note, we do not start fast sync if we're
// doing a state sync first.
bcReactorShim, bcReactor, err := createBlockchainReactor(
logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
logger, config, state, blockExec, blockStore, csReactor,
peerManager, router, fastSync && !stateSync,
)
if err != nil {
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
@ -834,29 +991,34 @@ func NewNode(config *cfg.Config,
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactorShim := p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
var (
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
if useLegacyP2P {
channels = getChannelsFromShim(stateSyncReactorShim)
peerUpdates = stateSyncReactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates = peerManager.Subscribe()
}
stateSyncReactor := statesync.NewReactor(
stateSyncReactorShim.Logger,
proxyApp.Snapshot(),
proxyApp.Query(),
stateSyncReactorShim.GetChannel(statesync.SnapshotChannel),
stateSyncReactorShim.GetChannel(statesync.ChunkChannel),
stateSyncReactorShim.PeerUpdates,
channels[statesync.SnapshotChannel],
channels[statesync.ChunkChannel],
peerUpdates,
config.StateSync.TempDir,
)
nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
if err != nil {
return nil, err
}
// Setup Transport and Switch.
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, config)
sw := createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
@ -884,9 +1046,17 @@ func NewNode(config *cfg.Config,
//
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
var pexReactor *pex.Reactor
var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
)
if config.P2P.PexReactor {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
}
if config.RPC.PprofListenAddress != "" {
@ -901,11 +1071,13 @@ func NewNode(config *cfg.Config,
genesisDoc: genDoc,
privValidator: privValidator,
transport: transport,
sw: sw,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
transport: transport,
sw: sw,
peerManager: peerManager,
router: router,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
stateStore: stateStore,
blockStore: blockStore,
@ -918,6 +1090,7 @@ func NewNode(config *cfg.Config,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
pexReactor: pexReactor,
pexReactorV2: pexReactorV2,
evidenceReactor: evReactor,
evidencePool: evPool,
proxyApp: proxyApp,
@ -980,8 +1153,13 @@ func (n *Node) OnStart() error {
n.isListening = true
// Start the switch (the P2P server).
err = n.sw.Start()
n.Logger.Info("p2p service", "legacy_enabled", useLegacyP2P)
if useLegacyP2P {
err = n.sw.Start()
} else {
err = n.router.Start()
}
if err != nil {
return err
}
@ -1013,6 +1191,12 @@ func (n *Node) OnStart() error {
return err
}
if !useLegacyP2P && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Start(); err != nil {
return err
}
}
// Always connect to persistent peers
err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
@ -1050,10 +1234,6 @@ func (n *Node) OnStop() {
}
// now stop the reactors
if err := n.sw.Stop(); err != nil {
n.Logger.Error("Error closing switch", "err", err)
}
if n.config.FastSync.Version == "v0" {
// Stop the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Stop(); err != nil {
@ -1081,6 +1261,22 @@ func (n *Node) OnStop() {
n.Logger.Error("failed to stop the evidence reactor", "err", err)
}
if !useLegacyP2P && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Stop(); err != nil {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
}
if useLegacyP2P {
if err := n.sw.Stop(); err != nil {
n.Logger.Error("failed to stop switch", "err", err)
}
} else {
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
}
// stop mempool WAL
if n.config.Mempool.WalEnabled() {
n.mempool.CloseWAL()
@ -1547,3 +1743,31 @@ func createAndStartPrivValidatorGRPCClient(
return pvsc, nil
}
// FIXME: Temporary helper function, shims should be removed.
func makeChannelsFromShims(
router *p2p.Router,
chShims map[p2p.ChannelID]*p2p.ChannelDescriptorShim,
) map[p2p.ChannelID]*p2p.Channel {
channels := map[p2p.ChannelID]*p2p.Channel{}
for chID, chShim := range chShims {
ch, err := router.OpenChannel(chID, chShim.MsgType, 0)
if err != nil {
panic(fmt.Sprintf("failed to open channel %v: %v", chID, err))
}
channels[chID] = ch
}
return channels
}
func getChannelsFromShim(reactorShim *p2p.ReactorShim) map[p2p.ChannelID]*p2p.Channel {
channels := map[p2p.ChannelID]*p2p.Channel{}
for chID := range reactorShim.Channels {
channels[chID] = reactorShim.GetChannel(chID)
}
return channels
}

+ 8
- 0
p2p/switch.go View File

@ -1037,3 +1037,11 @@ func (sw *Switch) addPeer(p Peer) error {
return nil
}
// FIXME: Eww, needed to wire up the new P2P stack along with the old one. This
// should be passed into the transport when it's constructed.
func (sw *Switch) PutChannelDescsIntoTransport() {
if t, ok := sw.transport.(*MConnTransport); ok {
t.channelDescs = sw.chDescs
}
}

+ 1
- 0
test/e2e/docker/Dockerfile View File

@ -27,6 +27,7 @@ RUN cd test/e2e && make app && cp build/app /usr/bin/app
WORKDIR /tendermint
VOLUME /tendermint
ENV TMHOME=/tendermint
ENV TM_LEGACY_P2P=true
EXPOSE 26656 26657 26660 6060
ENTRYPOINT ["/usr/bin/entrypoint"]


+ 2
- 0
test/e2e/runner/rpc.go View File

@ -71,8 +71,10 @@ func waitForNode(node *e2e.Node, height int64, timeout time.Duration) (*rpctypes
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
status, err := client.Status(ctx)
switch {


Loading…
Cancel
Save