Browse Source

p2p: transport should be captive resposibility of router (#7160)

The main (and minor) win of this PR is that the transport is fully the
responsibility of the router and the node doesn't need to be responsible for its lifecylce.
pull/7168/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
4bd8c5ab6f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 67 additions and 41 deletions
  1. +14
    -0
      internal/p2p/mocks/transport.go
  2. +1
    -0
      internal/p2p/p2ptest/network.go
  3. +12
    -0
      internal/p2p/router.go
  4. +4
    -4
      internal/p2p/router_init_test.go
  5. +10
    -0
      internal/p2p/router_test.go
  6. +3
    -0
      internal/p2p/transport.go
  7. +2
    -0
      internal/p2p/transport_memory.go
  8. +6
    -31
      node/node.go
  9. +15
    -6
      node/setup.go

+ 14
- 0
internal/p2p/mocks/transport.go View File

@ -98,6 +98,20 @@ func (_m *Transport) Endpoints() []p2p.Endpoint {
return r0
}
// Listen provides a mock function with given fields: _a0
func (_m *Transport) Listen(_a0 p2p.Endpoint) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(p2p.Endpoint) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// Protocols provides a mock function with given fields:
func (_m *Transport) Protocols() []p2p.Protocol {
ret := _m.Called()


+ 1
- 0
internal/p2p/p2ptest/network.go View File

@ -249,6 +249,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node {
privKey,
peerManager,
[]p2p.Transport{transport},
transport.Endpoints(),
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
)
require.NoError(t, err)


+ 12
- 0
internal/p2p/router.go View File

@ -249,6 +249,7 @@ type Router struct {
peerManager *PeerManager
chDescs []*ChannelDescriptor
transports []Transport
endpoints []Endpoint
connTracker connectionTracker
protocolTransports map[Protocol]Transport
stopCh chan struct{} // signals Router shutdown
@ -277,6 +278,7 @@ func NewRouter(
privKey crypto.PrivKey,
peerManager *PeerManager,
transports []Transport,
endpoints []Endpoint,
options RouterOptions,
) (*Router, error) {
@ -295,6 +297,7 @@ func NewRouter(
),
chDescs: make([]*ChannelDescriptor, 0),
transports: transports,
endpoints: endpoints,
protocolTransports: map[Protocol]Transport{},
peerManager: peerManager,
options: options,
@ -1021,11 +1024,20 @@ func (r *Router) NodeInfo() types.NodeInfo {
// OnStart implements service.Service.
func (r *Router) OnStart() error {
for _, transport := range r.transports {
for _, endpoint := range r.endpoints {
if err := transport.Listen(endpoint); err != nil {
return err
}
}
}
r.Logger.Info(
"starting router",
"node_id", r.nodeInfo.NodeID,
"channels", r.nodeInfo.Channels,
"listen_addr", r.nodeInfo.ListenAddr,
"transports", len(r.transports),
)
go r.dialPeers()


+ 4
- 4
internal/p2p/router_init_test.go View File

@ -18,21 +18,21 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.NoError(t, err)
q, ok := r.queueFactory(1).(*pqScheduler)
require.True(t, ok)
@ -40,7 +40,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts)
_, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})


+ 10
- 0
internal/p2p/router_test.go View File

@ -109,6 +109,7 @@ func TestRouter_Channel_Basic(t *testing.T) {
selfKey,
peerManager,
nil,
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
@ -393,6 +394,7 @@ func TestRouter_AcceptPeers(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
@ -445,6 +447,7 @@ func TestRouter_AcceptPeers_Error(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
@ -479,6 +482,7 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
@ -527,6 +531,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
@ -626,6 +631,7 @@ func TestRouter_DialPeers(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
@ -709,6 +715,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{
DialSleep: func(_ context.Context) {},
NumConcurrentDials: func() int {
@ -781,6 +788,7 @@ func TestRouter_EvictPeers(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
@ -842,6 +850,7 @@ func TestRouter_ChannelCompatability(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
@ -896,6 +905,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)


+ 3
- 0
internal/p2p/transport.go View File

@ -23,6 +23,9 @@ type Protocol string
// Transport is a connection-oriented mechanism for exchanging data with a peer.
type Transport interface {
// Listen starts the transport on the specified endpoint.
Listen(Endpoint) error
// Protocols returns the protocols supported by the transport. The Router
// uses this to pick a transport for an Endpoint.
Protocols() []Protocol


+ 2
- 0
internal/p2p/transport_memory.go View File

@ -117,6 +117,8 @@ func (t *MemoryTransport) String() string {
return string(MemoryProtocol)
}
func (*MemoryTransport) Listen(Endpoint) error { return nil }
func (t *MemoryTransport) AddChannelDescriptors([]*ChannelDescriptor) {}
// Protocols implements Transport.


+ 6
- 31
node/node.go View File

@ -52,7 +52,6 @@ type nodeImpl struct {
privValidator types.PrivValidator // local node's validator key
// network
transport *p2p.MConnTransport
peerManager *p2p.PeerManager
router *p2p.Router
nodeInfo types.NodeInfo
@ -257,9 +256,6 @@ func makeNode(cfg *config.Config,
}
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, cfg)
peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
closers = append(closers, peerCloser)
if err != nil {
@ -268,8 +264,8 @@ func makeNode(cfg *config.Config,
makeCloser(closers))
}
router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
peerManager, transport, getRouterConfig(cfg, proxyApp))
router, err := createRouter(logger, nodeMetrics.p2p, nodeInfo, nodeKey,
peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
@ -381,12 +377,9 @@ func makeNode(cfg *config.Config,
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
var pexReactor service.Service
pexReactor, err = createPEXReactor(logger, peerManager, router)
pexReactor, err := createPEXReactor(logger, peerManager, router)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
if cfg.RPC.PprofListenAddress != "" {
@ -401,7 +394,6 @@ func makeNode(cfg *config.Config,
genesisDoc: genDoc,
privValidator: privValidator,
transport: transport,
peerManager: peerManager,
router: router,
nodeInfo: nodeInfo,
@ -479,8 +471,6 @@ func makeSeedNode(cfg *config.Config,
// Setup Transport and Switch.
p2pMetrics := p2p.PrometheusMetrics(cfg.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, cfg)
peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
if err != nil {
@ -489,8 +479,8 @@ func makeSeedNode(cfg *config.Config,
closer)
}
router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
peerManager, transport, getRouterConfig(cfg, nil))
router, err := createRouter(logger, p2pMetrics, nodeInfo, nodeKey,
peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
@ -516,7 +506,6 @@ func makeSeedNode(cfg *config.Config,
config: cfg,
genesisDoc: genDoc,
transport: transport,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
peerManager: peerManager,
@ -556,20 +545,11 @@ func (n *nodeImpl) OnStart() error {
}
// Start the transport.
ep, err := p2p.NewEndpoint(n.nodeKey.ID.AddressString(n.config.P2P.ListenAddress))
if err != nil {
return err
}
if err := n.transport.Listen(ep); err != nil {
if err := n.router.Start(); err != nil {
return err
}
n.isListening = true
if err = n.router.Start(); err != nil {
return err
}
if n.config.Mode != config.ModeSeed {
if err := n.bcReactor.Start(); err != nil {
return err
@ -732,11 +712,6 @@ func (n *nodeImpl) OnStop() {
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err)
}
n.isListening = false
// finally stop the listeners / external services


+ 15
- 6
node/setup.go View File

@ -486,23 +486,32 @@ func createPeerManager(
}
func createRouter(
p2pLogger log.Logger,
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfo types.NodeInfo,
privKey crypto.PrivKey,
nodeKey types.NodeKey,
peerManager *p2p.PeerManager,
transport p2p.Transport,
options p2p.RouterOptions,
conf *config.Config,
proxyApp proxy.AppConns,
) (*p2p.Router, error) {
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, conf)
ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(conf.P2P.ListenAddress))
if err != nil {
return nil, err
}
return p2p.NewRouter(
p2pLogger,
p2pMetrics,
nodeInfo,
privKey,
nodeKey.PrivKey,
peerManager,
[]p2p.Transport{transport},
options,
[]p2p.Endpoint{ep},
getRouterConfig(conf, proxyApp),
)
}


Loading…
Cancel
Save