
p2p: extend e2e tests for new p2p framework (#6323)

pull/6332/head
Sam Kleinman 4 years ago
committed by GitHub
commit 0f41f7465c
13 changed files with 118 additions and 64 deletions:

  1. config/config.go (+10, -0)
  2. config/toml.go (+6, -1)
  3. node/node.go (+32, -48)
  4. p2p/peermanager.go (+0, -5)
  5. p2p/pqueue.go (+3, -1)
  6. p2p/router.go (+3, -1)
  7. p2p/wdrr_queue.go (+4, -1)
  8. test/e2e/docker/Dockerfile (+0, -2)
  9. test/e2e/generator/generate.go (+38, -5)
 10. test/e2e/networks/ci.toml (+2, -0)
 11. test/e2e/pkg/manifest.go (+14, -0)
 12. test/e2e/pkg/testnet.go (+4, -0)
 13. test/e2e/runner/setup.go (+2, -0)

config/config.go (+10, -0)

@@ -607,6 +607,15 @@ type P2PConfig struct { //nolint: maligned
 	// Testing params.
 	// Force dial to fail
 	TestDialFail bool `mapstructure:"test-dial-fail"`
+
+	// Mostly for testing, use rather than environment variables
+	// to turn on the new P2P stack.
+	UseNewP2P bool `mapstructure:"use-new-p2p"`
+
+	// Makes it possible to configure which queue backend the p2p
+	// layer uses. Options are: "fifo", "priority" and "wdrr",
+	// with the default being "fifo".
+	QueueType string `mapstructure:"queue-type"`
 }

@@ -634,6 +643,7 @@ func DefaultP2PConfig() *P2PConfig {
 		HandshakeTimeout: 20 * time.Second,
 		DialTimeout:      3 * time.Second,
 		TestDialFail:     false,
+		QueueType:        "priority",
 	}
 }
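Note that the new QueueType comment names "fifo" as the default while DefaultP2PConfig actually sets "priority", and the hunk adds no validation of the value. A minimal validation sketch, assuming a helper in the config package (validateQueueType is hypothetical, not part of this commit):

package config

import "fmt"

// validateQueueType is a hypothetical helper (not part of this commit)
// that rejects queue-type values the p2p layer does not recognize.
func validateQueueType(queueType string) error {
	switch queueType {
	case "fifo", "priority", "wdrr":
		return nil
	default:
		return fmt.Errorf("unsupported p2p queue-type %q", queueType)
	}
}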


config/toml.go (+6, -1)

@@ -168,7 +168,6 @@ abci = "{{ .BaseConfig.ABCI }}"
 # so the app can decide if we should keep the connection or not
 filter-peers = {{ .BaseConfig.FilterPeers }}
-
 #######################################################################
 ###                 Advanced Configuration Options                  ###
 #######################################################################

@@ -262,6 +261,12 @@ pprof-laddr = "{{ .RPC.PprofListenAddress }}"
 #######################################################
 [p2p]

+# Enable the new p2p layer.
+use-new-p2p = {{ .P2P.UseNewP2P }}
+
+# Select the p2p internal queue
+queue-type = "{{ .P2P.QueueType }}"
+
 # Address to listen for incoming connections
 laddr = "{{ .P2P.ListenAddress }}"


node/node.go (+32, -48)

@@ -8,7 +8,6 @@ import (
 	"net"
 	"net/http"
 	_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
-	"os"
 	"strconv"
 	"time"

@@ -54,21 +53,6 @@ import (
 	"github.com/tendermint/tendermint/version"
 )

-var (
-	useLegacyP2P       = true
-	p2pRouterQueueType string
-)
-
-func init() {
-	if v := os.Getenv("TM_LEGACY_P2P"); len(v) > 0 {
-		useLegacyP2P, _ = strconv.ParseBool(v)
-	}
-
-	if v := os.Getenv("TM_P2P_QUEUE"); len(v) > 0 {
-		p2pRouterQueueType = v
-	}
-}
-
 // DBContext specifies config information for loading a new DB.
 type DBContext struct {
 	ID string

@@ -403,12 +387,12 @@ func createMempoolReactor(
 		peerUpdates *p2p.PeerUpdates
 	)

-	if useLegacyP2P {
-		channels = getChannelsFromShim(reactorShim)
-		peerUpdates = reactorShim.PeerUpdates
-	} else {
+	if config.P2P.UseNewP2P {
 		channels = makeChannelsFromShims(router, channelShims)
 		peerUpdates = peerManager.Subscribe()
+	} else {
+		channels = getChannelsFromShim(reactorShim)
+		peerUpdates = reactorShim.PeerUpdates
 	}

 	reactor := mempl.NewReactor(

@@ -454,12 +438,12 @@ func createEvidenceReactor(
 		peerUpdates *p2p.PeerUpdates
 	)

-	if useLegacyP2P {
-		channels = getChannelsFromShim(reactorShim)
-		peerUpdates = reactorShim.PeerUpdates
-	} else {
+	if config.P2P.UseNewP2P {
 		channels = makeChannelsFromShims(router, evidence.ChannelShims)
 		peerUpdates = peerManager.Subscribe()
+	} else {
+		channels = getChannelsFromShim(reactorShim)
+		peerUpdates = reactorShim.PeerUpdates
 	}

 	evidenceReactor := evidence.NewReactor(

@@ -495,12 +479,12 @@ func createBlockchainReactor(
 		peerUpdates *p2p.PeerUpdates
 	)

-	if useLegacyP2P {
-		channels = getChannelsFromShim(reactorShim)
-		peerUpdates = reactorShim.PeerUpdates
-	} else {
+	if config.P2P.UseNewP2P {
 		channels = makeChannelsFromShims(router, bcv0.ChannelShims)
 		peerUpdates = peerManager.Subscribe()
+	} else {
+		channels = getChannelsFromShim(reactorShim)
+		peerUpdates = reactorShim.PeerUpdates
 	}

 	reactor, err := bcv0.NewReactor(

@@ -561,12 +545,12 @@ func createConsensusReactor(
 		peerUpdates *p2p.PeerUpdates
 	)

-	if useLegacyP2P {
-		channels = getChannelsFromShim(reactorShim)
-		peerUpdates = reactorShim.PeerUpdates
-	} else {
+	if config.P2P.UseNewP2P {
 		channels = makeChannelsFromShims(router, cs.ChannelShims)
 		peerUpdates = peerManager.Subscribe()
+	} else {
+		channels = getChannelsFromShim(reactorShim)
+		peerUpdates = reactorShim.PeerUpdates
 	}

 	reactor := cs.NewReactor(

@@ -1154,12 +1138,12 @@ func NewNode(config *cfg.Config,
 		stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)

-		if useLegacyP2P {
-			channels = getChannelsFromShim(stateSyncReactorShim)
-			peerUpdates = stateSyncReactorShim.PeerUpdates
-		} else {
+		if config.P2P.UseNewP2P {
 			channels = makeChannelsFromShims(router, statesync.ChannelShims)
 			peerUpdates = peerManager.Subscribe()
+		} else {
+			channels = getChannelsFromShim(stateSyncReactorShim)
+			peerUpdates = stateSyncReactorShim.PeerUpdates
 		}

 		stateSyncReactor = statesync.NewReactor(

@@ -1321,12 +1305,12 @@ func (n *Node) OnStart() error {
 	n.isListening = true

-	n.Logger.Info("p2p service", "legacy_enabled", useLegacyP2P)
+	n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.UseNewP2P)

-	if useLegacyP2P {
-		err = n.sw.Start()
-	} else {
+	if n.config.P2P.UseNewP2P {
 		err = n.router.Start()
+	} else {
+		err = n.sw.Start()
 	}

 	if err != nil {
 		return err

@@ -1361,7 +1345,7 @@ func (n *Node) OnStart() error {
 		}
 	}

-	if !useLegacyP2P && n.pexReactorV2 != nil {
+	if n.config.P2P.UseNewP2P && n.pexReactorV2 != nil {
 		if err := n.pexReactorV2.Start(); err != nil {
 			return err
 		}

@@ -1434,20 +1418,20 @@ func (n *Node) OnStop() {
 		}
 	}

-	if !useLegacyP2P && n.pexReactorV2 != nil {
+	if n.config.P2P.UseNewP2P && n.pexReactorV2 != nil {
 		if err := n.pexReactorV2.Stop(); err != nil {
 			n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
 		}
 	}

-	if useLegacyP2P {
-		if err := n.sw.Stop(); err != nil {
-			n.Logger.Error("failed to stop switch", "err", err)
-		}
-	} else {
+	if n.config.P2P.UseNewP2P {
 		if err := n.router.Stop(); err != nil {
 			n.Logger.Error("failed to stop router", "err", err)
 		}
+	} else {
+		if err := n.sw.Stop(); err != nil {
+			n.Logger.Error("failed to stop switch", "err", err)
+		}
 	}

 	// stop mempool WAL

@@ -1967,7 +1951,7 @@ func createAndStartPrivValidatorGRPCClient(
 func getRouterConfig(conf *cfg.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
 	opts := p2p.RouterOptions{
-		QueueType: p2pRouterQueueType,
+		QueueType: conf.P2P.QueueType,
 	}

 	if conf.P2P.MaxNumInboundPeers > 0 {
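The UseNewP2P branch above is repeated verbatim in createMempoolReactor, createEvidenceReactor, createBlockchainReactor, createConsensusReactor, and the state sync setup. A possible consolidation for the node package, sketched under assumed type names (makeReactorChannels is hypothetical; the channel-shim map type is inferred from the makeChannelsFromShims call sites):

// makeReactorChannels is a hypothetical helper (not part of this commit)
// that centralizes the repeated shim-versus-router selection. It assumes
// the node package's existing makeChannelsFromShims and getChannelsFromShim
// helpers and the p2p types used above.
func makeReactorChannels(
	conf *cfg.Config,
	router *p2p.Router,
	shim *p2p.ReactorShim,
	peerManager *p2p.PeerManager,
	channelShims map[p2p.ChannelID]*p2p.ChannelDescriptorShim,
) (map[p2p.ChannelID]*p2p.Channel, *p2p.PeerUpdates) {
	if conf.P2P.UseNewP2P {
		return makeChannelsFromShims(router, channelShims), peerManager.Subscribe()
	}
	return getChannelsFromShim(shim), shim.PeerUpdates
}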


p2p/peermanager.go (+0, -5)

@@ -257,11 +257,6 @@ func (o *PeerManagerOptions) optimize() {
 //   lower-scored to evict.
 // - EvictNext: pick peer from evict, mark as evicting.
 // - Disconnected: unmark connected, upgrading[from]=to, evict, evicting.
-//
-// FIXME: The old stack supports ABCI-based peer ID filtering via
-// /p2p/filter/id/<ID> queries, we should implement this here as well by taking
-// a peer ID filtering callback in PeerManagerOptions and configuring it during
-// Node setup.
 type PeerManager struct {
 	selfID  NodeID
 	options PeerManagerOptions


p2p/pqueue.go (+3, -1)

@@ -254,7 +254,9 @@ func (s *pqScheduler) process() {
 					s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
 				}

-				s.metrics.PeerSendBytesTotal.With("peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
+				s.metrics.PeerSendBytesTotal.With(
+					"chID", chIDStr,
+					"peer_id", string(pqEnv.envelope.To)).Add(float64(pqEnv.size))
 				s.dequeueCh <- pqEnv.envelope
 			}
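Adding the "chID" label here (and in router.go and wdrr_queue.go below) only works if the counter itself is declared with that label name; the matching declaration lives in p2p/metrics.go, which this diff does not touch. A self-contained sketch of declaring and using such a counter with go-kit's Prometheus wrapper, the alternating key/value With(...) style visible at these call sites (namespace, name, and help text are assumptions, not the actual Tendermint declaration):

package main

import (
	"fmt"

	kitprom "github.com/go-kit/kit/metrics/prometheus"
	stdprom "github.com/prometheus/client_golang/prometheus"
)

func main() {
	// The counter must list every label name used at the call sites;
	// Prometheus panics if With supplies a label name the underlying
	// vector was not declared with.
	peerSendBytes := kitprom.NewCounterFrom(stdprom.CounterOpts{
		Namespace: "tendermint",
		Subsystem: "p2p",
		Name:      "peer_send_bytes_total",
		Help:      "Bytes sent to a given peer, per channel.",
	}, []string{"peer_id", "chID"})

	// The schedulers format the channel ID as a decimal string
	// (fmt.Sprint(chID) in router.go and wdrr_queue.go).
	peerSendBytes.With("chID", fmt.Sprint(64), "peer_id", "abc123").Add(512)
}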


p2p/router.go (+3, -1)

@@ -855,7 +855,9 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
 		select {
 		case queue.enqueue() <- Envelope{From: peerID, Message: msg}:
-			r.metrics.PeerReceiveBytesTotal.With("peer_id", string(peerID)).Add(float64(proto.Size(msg)))
+			r.metrics.PeerReceiveBytesTotal.With(
+				"chID", fmt.Sprint(chID),
+				"peer_id", string(peerID)).Add(float64(proto.Size(msg)))
 			r.metrics.RouterChannelQueueSend.Observe(time.Since(start).Seconds())
 			r.logger.Debug("received message", "peer", peerID, "message", msg)


p2p/wdrr_queue.go (+4, -1)

@@ -1,6 +1,7 @@
 package p2p

 import (
+	"fmt"
 	"sort"
 	"strconv"

@@ -263,7 +264,9 @@ func (s *wdrrScheduler) process() {
 			// 4. remove from the flow's queue
 			// 5. grab the next HoQ Envelope and flow's deficit
 			for len(s.buffer[chID]) > 0 && d >= we.size {
-				s.metrics.PeerSendBytesTotal.With("peer_id", string(we.envelope.To)).Add(float64(we.size))
+				s.metrics.PeerSendBytesTotal.With(
+					"chID", fmt.Sprint(chID),
+					"peer_id", string(we.envelope.To)).Add(float64(we.size))
 				s.dequeueCh <- we.envelope
 				s.size -= we.size
 				s.deficits[chID] -= we.size


test/e2e/docker/Dockerfile (+0, -2)

@@ -26,8 +26,6 @@ RUN cd test/e2e && make app && cp build/app /usr/bin/app
 WORKDIR /tendermint
 VOLUME /tendermint
 ENV TMHOME=/tendermint
-ENV TM_LEGACY_P2P=true
-ENV TM_P2P_QUEUE="priority"

 EXPOSE 26656 26657 26660 6060
 ENTRYPOINT ["/usr/bin/entrypoint"]


test/e2e/generator/generate.go (+38, -5)

@@ -16,6 +16,8 @@ var (
 	testnetCombinations = map[string][]interface{}{
 		"topology":      {"single", "quad", "large"},
 		"ipv6":          {false, true},
+		"useNewP2P":     {false, true, 2},
+		"queueType":     {"priority"}, // "fifo", "wdrr"
 		"initialHeight": {0, 1000},
 		"initialState": {
 			map[string]string{},

@@ -69,6 +71,17 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, error) {
 		Nodes:     map[string]*e2e.ManifestNode{},
 		KeyType:   opt["keyType"].(string),
 		Evidence:  evidence.Choose(r).(int),
+		QueueType: opt["queueType"].(string),
+	}
+
+	var p2pNodeFactor int
+
+	switch p2pInfo := opt["useNewP2P"].(type) {
+	case bool:
+		manifest.UseNewP2P = p2pInfo
+	case int:
+		manifest.UseNewP2P = false
+		p2pNodeFactor = p2pInfo
 	}

 	var numSeeds, numValidators, numFulls, numLightClients int

@@ -89,8 +102,14 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, error) {
 	// First we generate seed nodes, starting at the initial height.
 	for i := 1; i <= numSeeds; i++ {
-		manifest.Nodes[fmt.Sprintf("seed%02d", i)] = generateNode(
-			r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
+		node := generateNode(r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
+		node.QueueType = manifest.QueueType
+		if p2pNodeFactor == 0 {
+			node.UseNewP2P = manifest.UseNewP2P
+		} else if p2pNodeFactor%i == 0 {
+			node.UseNewP2P = !manifest.UseNewP2P
+		}
+
+		manifest.Nodes[fmt.Sprintf("seed%02d", i)] = node
 	}

 	// Next, we generate validators. We make sure a BFT quorum of validators start

@@ -105,9 +124,17 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, error) {
 			nextStartAt += 5
 		}
 		name := fmt.Sprintf("validator%02d", i)
-		manifest.Nodes[name] = generateNode(
+		node := generateNode(
 			r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2)
+		node.QueueType = manifest.QueueType
+		if p2pNodeFactor == 0 {
+			node.UseNewP2P = manifest.UseNewP2P
+		} else if p2pNodeFactor%i == 0 {
+			node.UseNewP2P = !manifest.UseNewP2P
+		}
+
+		manifest.Nodes[name] = node

 		if startAt == 0 {
 			(*manifest.Validators)[name] = int64(30 + r.Intn(71))
 		} else {

@@ -134,8 +161,14 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, error) {
 			startAt = nextStartAt
 			nextStartAt += 5
 		}
-		manifest.Nodes[fmt.Sprintf("full%02d", i)] = generateNode(
-			r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
+		node := generateNode(r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
+		node.QueueType = manifest.QueueType
+		if p2pNodeFactor == 0 {
+			node.UseNewP2P = manifest.UseNewP2P
+		} else if p2pNodeFactor%i == 0 {
+			node.UseNewP2P = !manifest.UseNewP2P
+		}
+
+		manifest.Nodes[fmt.Sprintf("full%02d", i)] = node
 	}

 	// We now set up peer discovery for nodes. Seed nodes are fully meshed with
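When useNewP2P comes out of testnetCombinations as an int rather than a bool, the manifest-wide flag stays false and every node whose 1-based index divides the factor gets the opposite setting, yielding a mixed legacy/new network. A standalone illustration of that selection, using the factor 2 that the combinations above produce:

package main

import "fmt"

func main() {
	// Mirrors the generator's per-node selection for the int case:
	// manifest-wide UseNewP2P is false and p2pNodeFactor is 2.
	const p2pNodeFactor = 2
	const manifestUseNewP2P = false

	for i := 1; i <= 4; i++ {
		useNewP2P := manifestUseNewP2P
		if p2pNodeFactor%i == 0 { // true for nodes 1 and 2
			useNewP2P = !manifestUseNewP2P
		}
		fmt.Printf("node%02d: UseNewP2P=%v\n", i, useNewP2P)
	}
	// node01: UseNewP2P=true
	// node02: UseNewP2P=true
	// node03: UseNewP2P=false
	// node04: UseNewP2P=false
}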


test/e2e/networks/ci.toml (+2, -0)

@@ -4,6 +4,8 @@
 initial_height = 1000
 evidence = 0
 initial_state = { initial01 = "a", initial02 = "b", initial03 = "c" }
+use_new_p2p = false
+queue_type = "priority"

 [validators]
 validator01 = 100

test/e2e/pkg/manifest.go (+14, -0)

@@ -58,6 +58,13 @@ type Manifest struct {
 	// LogLevel sets the log level of the entire testnet. This can be overridden
 	// by individual nodes.
 	LogLevel string `toml:"log_level"`
+
+	// UseNewP2P enables use of the new p2p layer for all nodes in
+	// a test.
+	UseNewP2P bool `toml:"use_new_p2p"`
+
+	// QueueType describes the type of queue that the system uses internally
+	QueueType string `toml:"queue_type"`
 }

 // ManifestNode represents a node in a testnet manifest.

@@ -134,6 +141,13 @@ type ManifestNode struct {
 	// This is helpful when debugging a specific problem. This overrides the network
 	// level.
 	LogLevel string `toml:"log_level"`
+
+	// UseNewP2P enables use of the new p2p layer for this node.
+	UseNewP2P bool `toml:"use_new_p2p"`
+
+	// QueueType describes the type of queue that the p2p layer
+	// uses internally.
+	QueueType string `toml:"queue_type"`
 }

 // Save saves the testnet manifest to a file.
// Save saves the testnet manifest to a file. // Save saves the testnet manifest to a file.


test/e2e/pkg/testnet.go (+4, -0)

@@ -90,6 +90,8 @@ type Node struct {
 	PersistentPeers []*Node
 	Perturbations   []Perturbation
 	LogLevel        string
+	UseNewP2P       bool
+	QueueType       string
 }

 // LoadTestnet loads a testnet from a manifest file, using the filename to

@@ -167,6 +169,8 @@ func LoadTestnet(file string) (*Testnet, error) {
 			RetainBlocks:  nodeManifest.RetainBlocks,
 			Perturbations: []Perturbation{},
 			LogLevel:      manifest.LogLevel,
+			UseNewP2P:     manifest.UseNewP2P,
+			QueueType:     manifest.QueueType,
 		}

 		if node.StartAt == testnet.InitialHeight {
 			node.StartAt = 0 // normalize to 0 for initial nodes, since code expects this


test/e2e/runner/setup.go (+2, -0)

@@ -239,6 +239,8 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
 	cfg.RPC.PprofListenAddress = ":6060"
 	cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false))
 	cfg.P2P.AddrBookStrict = false
+	cfg.P2P.UseNewP2P = node.UseNewP2P
+	cfg.P2P.QueueType = node.QueueType
 	cfg.DBBackend = node.Database
 	cfg.StateSync.DiscoveryTime = 5 * time.Second

 	if node.Mode != e2e.ModeLight {

