From 4bd8c5ab6f24e0b4aa611755b9dedbe23543cbb4 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 26 Oct 2021 18:34:44 +0200 Subject: [PATCH] p2p: transport should be captive resposibility of router (#7160) The main (and minor) win of this PR is that the transport is fully the responsibility of the router and the node doesn't need to be responsible for its lifecylce. --- internal/p2p/mocks/transport.go | 14 ++++++++++++ internal/p2p/p2ptest/network.go | 1 + internal/p2p/router.go | 12 +++++++++++ internal/p2p/router_init_test.go | 8 +++---- internal/p2p/router_test.go | 10 +++++++++ internal/p2p/transport.go | 3 +++ internal/p2p/transport_memory.go | 2 ++ node/node.go | 37 ++++++-------------------------- node/setup.go | 21 ++++++++++++------ 9 files changed, 67 insertions(+), 41 deletions(-) diff --git a/internal/p2p/mocks/transport.go b/internal/p2p/mocks/transport.go index 2fc7baa29..eea1de4c5 100644 --- a/internal/p2p/mocks/transport.go +++ b/internal/p2p/mocks/transport.go @@ -98,6 +98,20 @@ func (_m *Transport) Endpoints() []p2p.Endpoint { return r0 } +// Listen provides a mock function with given fields: _a0 +func (_m *Transport) Listen(_a0 p2p.Endpoint) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(p2p.Endpoint) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Protocols provides a mock function with given fields: func (_m *Transport) Protocols() []p2p.Protocol { ret := _m.Called() diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index c808ad3e0..0d92b2619 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -249,6 +249,7 @@ func (n *Network) MakeNode(t *testing.T, opts NodeOptions) *Node { privKey, peerManager, []p2p.Transport{transport}, + transport.Endpoints(), p2p.RouterOptions{DialSleep: func(_ context.Context) {}}, ) require.NoError(t, err) diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 4724e8d9b..efd29c0d4 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -249,6 +249,7 @@ type Router struct { peerManager *PeerManager chDescs []*ChannelDescriptor transports []Transport + endpoints []Endpoint connTracker connectionTracker protocolTransports map[Protocol]Transport stopCh chan struct{} // signals Router shutdown @@ -277,6 +278,7 @@ func NewRouter( privKey crypto.PrivKey, peerManager *PeerManager, transports []Transport, + endpoints []Endpoint, options RouterOptions, ) (*Router, error) { @@ -295,6 +297,7 @@ func NewRouter( ), chDescs: make([]*ChannelDescriptor, 0), transports: transports, + endpoints: endpoints, protocolTransports: map[Protocol]Transport{}, peerManager: peerManager, options: options, @@ -1021,11 +1024,20 @@ func (r *Router) NodeInfo() types.NodeInfo { // OnStart implements service.Service. func (r *Router) OnStart() error { + for _, transport := range r.transports { + for _, endpoint := range r.endpoints { + if err := transport.Listen(endpoint); err != nil { + return err + } + } + } + r.Logger.Info( "starting router", "node_id", r.nodeInfo.NodeID, "channels", r.nodeInfo.Channels, "listen_addr", r.nodeInfo.ListenAddr, + "transports", len(r.transports), ) go r.dialPeers() diff --git a/internal/p2p/router_init_test.go b/internal/p2p/router_init_test.go index b90d2a3dd..c8bef696a 100644 --- a/internal/p2p/router_init_test.go +++ b/internal/p2p/router_init_test.go @@ -18,21 +18,21 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { t.Run("Default", func(t *testing.T) { require.Zero(t, os.Getenv("TM_P2P_QUEUE")) opts := RouterOptions{} - r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) require.NoError(t, err) _, ok := r.queueFactory(1).(*fifoQueue) require.True(t, ok) }) t.Run("Fifo", func(t *testing.T) { opts := RouterOptions{QueueType: queueTypeFifo} - r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) require.NoError(t, err) _, ok := r.queueFactory(1).(*fifoQueue) require.True(t, ok) }) t.Run("Priority", func(t *testing.T) { opts := RouterOptions{QueueType: queueTypePriority} - r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts) + r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) require.NoError(t, err) q, ok := r.queueFactory(1).(*pqScheduler) require.True(t, ok) @@ -40,7 +40,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) { }) t.Run("NonExistant", func(t *testing.T) { opts := RouterOptions{QueueType: "fast"} - _, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts) + _, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts) require.Error(t, err) require.Contains(t, err.Error(), "fast") }) diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 997f02a06..77c6f768e 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -109,6 +109,7 @@ func TestRouter_Channel_Basic(t *testing.T) { selfKey, peerManager, nil, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) @@ -393,6 +394,7 @@ func TestRouter_AcceptPeers(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) @@ -445,6 +447,7 @@ func TestRouter_AcceptPeers_Error(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) @@ -479,6 +482,7 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) @@ -527,6 +531,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) @@ -626,6 +631,7 @@ func TestRouter_DialPeers(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) @@ -709,6 +715,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{ DialSleep: func(_ context.Context) {}, NumConcurrentDials: func() int { @@ -781,6 +788,7 @@ func TestRouter_EvictPeers(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) @@ -842,6 +850,7 @@ func TestRouter_ChannelCompatability(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) @@ -896,6 +905,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { selfKey, peerManager, []p2p.Transport{mockTransport}, + nil, p2p.RouterOptions{}, ) require.NoError(t, err) diff --git a/internal/p2p/transport.go b/internal/p2p/transport.go index e78906362..08de0d3b0 100644 --- a/internal/p2p/transport.go +++ b/internal/p2p/transport.go @@ -23,6 +23,9 @@ type Protocol string // Transport is a connection-oriented mechanism for exchanging data with a peer. type Transport interface { + // Listen starts the transport on the specified endpoint. + Listen(Endpoint) error + // Protocols returns the protocols supported by the transport. The Router // uses this to pick a transport for an Endpoint. Protocols() []Protocol diff --git a/internal/p2p/transport_memory.go b/internal/p2p/transport_memory.go index b4161ecd6..5d9291675 100644 --- a/internal/p2p/transport_memory.go +++ b/internal/p2p/transport_memory.go @@ -117,6 +117,8 @@ func (t *MemoryTransport) String() string { return string(MemoryProtocol) } +func (*MemoryTransport) Listen(Endpoint) error { return nil } + func (t *MemoryTransport) AddChannelDescriptors([]*ChannelDescriptor) {} // Protocols implements Transport. diff --git a/node/node.go b/node/node.go index d329e2494..bfccff6ef 100644 --- a/node/node.go +++ b/node/node.go @@ -52,7 +52,6 @@ type nodeImpl struct { privValidator types.PrivValidator // local node's validator key // network - transport *p2p.MConnTransport peerManager *p2p.PeerManager router *p2p.Router nodeInfo types.NodeInfo @@ -257,9 +256,6 @@ func makeNode(cfg *config.Config, } - p2pLogger := logger.With("module", "p2p") - transport := createTransport(p2pLogger, cfg) - peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID) closers = append(closers, peerCloser) if err != nil { @@ -268,8 +264,8 @@ func makeNode(cfg *config.Config, makeCloser(closers)) } - router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey, - peerManager, transport, getRouterConfig(cfg, proxyApp)) + router, err := createRouter(logger, nodeMetrics.p2p, nodeInfo, nodeKey, + peerManager, cfg, proxyApp) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create router: %w", err), @@ -381,12 +377,9 @@ func makeNode(cfg *config.Config, // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. // Note we currently use the addrBook regardless at least for AddOurAddress - var pexReactor service.Service - - pexReactor, err = createPEXReactor(logger, peerManager, router) + pexReactor, err := createPEXReactor(logger, peerManager, router) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) - } if cfg.RPC.PprofListenAddress != "" { @@ -401,7 +394,6 @@ func makeNode(cfg *config.Config, genesisDoc: genDoc, privValidator: privValidator, - transport: transport, peerManager: peerManager, router: router, nodeInfo: nodeInfo, @@ -479,8 +471,6 @@ func makeSeedNode(cfg *config.Config, // Setup Transport and Switch. p2pMetrics := p2p.PrometheusMetrics(cfg.Instrumentation.Namespace, "chain_id", genDoc.ChainID) - p2pLogger := logger.With("module", "p2p") - transport := createTransport(p2pLogger, cfg) peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID) if err != nil { @@ -489,8 +479,8 @@ func makeSeedNode(cfg *config.Config, closer) } - router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, - peerManager, transport, getRouterConfig(cfg, nil)) + router, err := createRouter(logger, p2pMetrics, nodeInfo, nodeKey, + peerManager, cfg, nil) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create router: %w", err), @@ -516,7 +506,6 @@ func makeSeedNode(cfg *config.Config, config: cfg, genesisDoc: genDoc, - transport: transport, nodeInfo: nodeInfo, nodeKey: nodeKey, peerManager: peerManager, @@ -556,20 +545,11 @@ func (n *nodeImpl) OnStart() error { } // Start the transport. - ep, err := p2p.NewEndpoint(n.nodeKey.ID.AddressString(n.config.P2P.ListenAddress)) - if err != nil { - return err - } - if err := n.transport.Listen(ep); err != nil { + if err := n.router.Start(); err != nil { return err } - n.isListening = true - if err = n.router.Start(); err != nil { - return err - } - if n.config.Mode != config.ModeSeed { if err := n.bcReactor.Start(); err != nil { return err @@ -732,11 +712,6 @@ func (n *nodeImpl) OnStop() { if err := n.router.Stop(); err != nil { n.Logger.Error("failed to stop router", "err", err) } - - if err := n.transport.Close(); err != nil { - n.Logger.Error("Error closing transport", "err", err) - } - n.isListening = false // finally stop the listeners / external services diff --git a/node/setup.go b/node/setup.go index 2fddceac1..f7f6230d1 100644 --- a/node/setup.go +++ b/node/setup.go @@ -486,23 +486,32 @@ func createPeerManager( } func createRouter( - p2pLogger log.Logger, + logger log.Logger, p2pMetrics *p2p.Metrics, nodeInfo types.NodeInfo, - privKey crypto.PrivKey, + nodeKey types.NodeKey, peerManager *p2p.PeerManager, - transport p2p.Transport, - options p2p.RouterOptions, + conf *config.Config, + proxyApp proxy.AppConns, ) (*p2p.Router, error) { + p2pLogger := logger.With("module", "p2p") + transport := createTransport(p2pLogger, conf) + + ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(conf.P2P.ListenAddress)) + if err != nil { + return nil, err + } + return p2p.NewRouter( p2pLogger, p2pMetrics, nodeInfo, - privKey, + nodeKey.PrivKey, peerManager, []p2p.Transport{transport}, - options, + []p2p.Endpoint{ep}, + getRouterConfig(conf, proxyApp), ) }