diff --git a/node/node.go b/node/node.go index 62d0bb166..1086d40be 100644 --- a/node/node.go +++ b/node/node.go @@ -460,13 +460,30 @@ func createConsensusReactor(config *cfg.Config, func createTransport( logger log.Logger, config *cfg.Config, +) *p2p.MConnTransport { + return p2p.NewMConnTransport( + logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{}, + p2p.MConnTransportOptions{ + MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers + + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), + ), + }, + ) +} + +func createSwitch(config *cfg.Config, + transport p2p.Transport, + p2pMetrics *p2p.Metrics, + mempoolReactor *p2p.ReactorShim, + bcReactor p2p.Reactor, + stateSyncReactor *p2p.ReactorShim, + consensusReactor *cs.Reactor, + evidenceReactor *p2p.ReactorShim, + proxyApp proxy.AppConns, nodeInfo p2p.NodeInfo, nodeKey p2p.NodeKey, - proxyApp proxy.AppConns, -) ( - *p2p.MConnTransport, - []p2p.PeerFilterFunc, -) { + p2pLogger log.Logger) *p2p.Switch { + var ( connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} @@ -516,34 +533,12 @@ func createTransport( ) } - transport := p2p.NewMConnTransport( - logger, nodeInfo, nodeKey.PrivKey, p2p.MConnConfig(config.P2P), - p2p.MConnTransportConnFilters(connFilters...), - p2p.MConnTransportMaxIncomingConnections(config.P2P.MaxNumInboundPeers+ - len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))), - ) - - return transport, peerFilters -} - -func createSwitch(config *cfg.Config, - transport p2p.Transport, - p2pMetrics *p2p.Metrics, - peerFilters []p2p.PeerFilterFunc, - mempoolReactor *p2p.ReactorShim, - bcReactor p2p.Reactor, - stateSyncReactor *p2p.ReactorShim, - consensusReactor *cs.Reactor, - evidenceReactor *p2p.ReactorShim, - nodeInfo p2p.NodeInfo, - nodeKey p2p.NodeKey, - p2pLogger log.Logger) *p2p.Switch { - sw := p2p.NewSwitch( config.P2P, transport, p2p.WithMetrics(p2pMetrics), p2p.SwitchPeerFilters(peerFilters...), + p2p.SwitchConnFilters(connFilters...), ) sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) @@ -839,10 +834,10 @@ func NewNode(config *cfg.Config, // Setup Transport and Switch. p2pLogger := logger.With("module", "p2p") - transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) + transport := createTransport(p2pLogger, config) sw := createSwitch( - config, transport, p2pMetrics, peerFilters, mpReactorShim, bcReactorForSwitch, - stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger, + config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, + stateSyncReactorShim, csReactor, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, ) err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) @@ -957,12 +952,6 @@ func (n *Node) OnStart() error { } } - // Start the switch (the P2P server). - err := n.sw.Start() - if err != nil { - return err - } - // Start the transport. addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) if err != nil { @@ -974,6 +963,12 @@ func (n *Node) OnStart() error { n.isListening = true + // Start the switch (the P2P server). + err = n.sw.Start() + if err != nil { + return err + } + if n.config.FastSync.Version == "v0" { // Start the real blockchain reactor separately since the switch uses the shim. if err := n.bcReactor.Start(); err != nil { diff --git a/p2p/netaddress.go b/p2p/netaddress.go index a9bd72315..adf30bad5 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -292,7 +292,6 @@ func (na *NetAddress) HasID() bool { func (na *NetAddress) Endpoint() Endpoint { return Endpoint{ Protocol: MConnProtocol, - PeerID: na.ID, IP: na.IP, Port: na.Port, } diff --git a/p2p/peer.go b/p2p/peer.go index d0b46e9aa..0a70a0f3b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -29,13 +29,13 @@ import ( // PeerAddress is a peer address URL. It differs from Endpoint in that the // address hostname may be expanded into multiple IP addresses (thus multiple -// endpoints). +// endpoints), and that it knows the node's ID. // // If the URL is opaque, i.e. of the form "scheme:", then the opaque // part has to contain either the node ID or a node ID and path in the form // "scheme:@". type PeerAddress struct { - ID NodeID + NodeID NodeID Protocol Protocol Hostname string Port uint16 @@ -60,7 +60,7 @@ func ParsePeerAddress(urlString string) (PeerAddress, error) { if len(parts) > 2 { return PeerAddress{}, fmt.Errorf("invalid address format %q, unexpected @", urlString) } - address.ID, err = NewNodeID(parts[0]) + address.NodeID, err = NewNodeID(parts[0]) if err != nil { return PeerAddress{}, fmt.Errorf("invalid peer ID %q: %w", parts[0], err) } @@ -71,7 +71,7 @@ func ParsePeerAddress(urlString string) (PeerAddress, error) { } // Otherwise, just parse a normal networked URL. - address.ID, err = NewNodeID(url.User.Username()) + address.NodeID, err = NewNodeID(url.User.Username()) if err != nil { return PeerAddress{}, fmt.Errorf("invalid peer ID %q: %w", url.User.Username(), err) } @@ -117,7 +117,6 @@ func (a PeerAddress) Resolve(ctx context.Context) ([]Endpoint, error) { // "scheme:". if a.Hostname == "" { return []Endpoint{{ - PeerID: a.ID, Protocol: a.Protocol, Path: a.Path, }}, nil @@ -130,7 +129,6 @@ func (a PeerAddress) Resolve(ctx context.Context) ([]Endpoint, error) { endpoints := make([]Endpoint, len(ips)) for i, ip := range ips { endpoints[i] = Endpoint{ - PeerID: a.ID, Protocol: a.Protocol, IP: ip, Port: a.Port, @@ -145,9 +143,9 @@ func (a PeerAddress) Validate() error { if a.Protocol == "" { return errors.New("no protocol") } - if a.ID == "" { + if a.NodeID == "" { return errors.New("no peer ID") - } else if err := a.ID.Validate(); err != nil { + } else if err := a.NodeID.Validate(); err != nil { return fmt.Errorf("invalid peer ID: %w", err) } if a.Port > 0 && a.Hostname == "" { @@ -160,14 +158,14 @@ func (a PeerAddress) Validate() error { func (a PeerAddress) String() string { // Handle opaque URLs. if a.Hostname == "" { - s := fmt.Sprintf("%s:%s", a.Protocol, a.ID) + s := fmt.Sprintf("%s:%s", a.Protocol, a.NodeID) if a.Path != "" { s += "@" + a.Path } return s } - s := fmt.Sprintf("%s://%s@%s", a.Protocol, a.ID, a.Hostname) + s := fmt.Sprintf("%s://%s@%s", a.Protocol, a.NodeID, a.Hostname) if a.Port > 0 { s += ":" + strconv.Itoa(int(a.Port)) } @@ -316,6 +314,11 @@ const ( // lower-scored to evict. // - EvictNext: pick peer from evict, mark as evicting. // - Disconnected: unmark connected, upgrading[from]=to, evict, evicting. +// +// FIXME: The old stack supports ABCI-based peer ID filtering via +// /p2p/filter/id/ queries, we should implement this here as well by taking +// a peer ID filtering callback in PeerManagerOptions and configuring it during +// Node setup. type PeerManager struct { options PeerManagerOptions wakeDialCh chan struct{} // wakes up DialNext() on relevant peer changes @@ -476,9 +479,9 @@ func (m *PeerManager) Add(address PeerAddress) error { m.mtx.Lock() defer m.mtx.Unlock() - peer, ok := m.store.Get(address.ID) + peer, ok := m.store.Get(address.NodeID) if !ok { - peer = m.makePeerInfo(address.ID) + peer = m.makePeerInfo(address.NodeID) } if _, ok := peer.AddressInfo[address.String()]; !ok { peer.AddressInfo[address.String()] = &peerAddressInfo{Address: address} @@ -1363,11 +1366,6 @@ func newPeerConn(outbound, persistent bool, conn Connection) peerConn { } } -// ID only exists for SecretConnection. -func (pc peerConn) ID() NodeID { - return NodeIDFromPubKey(pc.conn.PubKey()) -} - // Return the IP from the connection RemoteAddr func (pc peerConn) RemoteIP() net.IP { if pc.ip == nil { @@ -1403,12 +1401,12 @@ type peer struct { type PeerOption func(*peer) func newPeer( + nodeInfo NodeInfo, pc peerConn, reactorsByCh map[byte]Reactor, onPeerError func(Peer, interface{}), options ...PeerOption, ) *peer { - nodeInfo := pc.conn.NodeInfo() p := &peer{ peerConn: pc, nodeInfo: nodeInfo, @@ -1534,7 +1532,12 @@ func (p *peer) NodeInfo() NodeInfo { // For inbound peers, it's the address returned by the underlying connection // (not what's reported in the peer's NodeInfo). func (p *peer) SocketAddr() *NetAddress { - return p.peerConn.conn.RemoteEndpoint().NetAddress() + endpoint := p.peerConn.conn.RemoteEndpoint() + return &NetAddress{ + ID: p.ID(), + IP: endpoint.IP, + Port: endpoint.Port, + } } // Status returns the peer's ConnectionStatus. diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 92023c43f..665c46706 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -1,6 +1,7 @@ package p2p import ( + "context" "fmt" golog "log" "net" @@ -82,15 +83,18 @@ func createOutboundPeerAndPerformHandshake( } pk := ed25519.GenPrivKey() ourNodeInfo := testNodeInfo(NodeIDFromPubKey(pk.PubKey()), "host_peer") - transport := NewMConnTransport(log.TestingLogger(), ourNodeInfo, pk, mConfig) - transport.SetChannelDescriptors(chDescs) + transport := NewMConnTransport(log.TestingLogger(), mConfig, chDescs, MConnTransportOptions{}) reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)} pc, err := testOutboundPeerConn(transport, addr, config, false, pk) if err != nil { return nil, err } + peerInfo, _, err := pc.conn.Handshake(context.Background(), ourNodeInfo, pk) + if err != nil { + return nil, err + } - p := newPeer(pc, reactorsByCh, func(p Peer, r interface{}) {}) + p := newPeer(peerInfo, pc, reactorsByCh, func(p Peer, r interface{}) {}) p.SetLogger(log.TestingLogger().With("peer", addr)) return p, nil } @@ -129,14 +133,6 @@ func testOutboundPeerConn( return pc, err } - // ensure dialed ID matches connection ID - if addr.ID != pc.ID() { - if cerr := conn.Close(); cerr != nil { - return pc, fmt.Errorf("%v: %w", cerr.Error(), err) - } - return pc, ErrSwitchAuthenticationFailure{addr, pc.ID()} - } - return pc, nil } @@ -179,12 +175,17 @@ func (rp *remotePeer) Stop() { } func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { - transport := NewMConnTransport(log.TestingLogger(), rp.nodeInfo(), rp.PrivKey, MConnConfig(rp.Config)) + transport := NewMConnTransport(log.TestingLogger(), MConnConfig(rp.Config), + []*ChannelDescriptor{}, MConnTransportOptions{}) conn, err := addr.DialTimeout(1 * time.Second) if err != nil { return nil, err } - _, err = testInboundPeerConn(transport, conn) + pc, err := testInboundPeerConn(transport, conn) + if err != nil { + return nil, err + } + _, _, err = pc.conn.Handshake(context.Background(), rp.nodeInfo(), rp.PrivKey) if err != nil { return nil, err } @@ -192,7 +193,8 @@ func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { } func (rp *remotePeer) accept() { - transport := NewMConnTransport(log.TestingLogger(), rp.nodeInfo(), rp.PrivKey, MConnConfig(rp.Config)) + transport := NewMConnTransport(log.TestingLogger(), MConnConfig(rp.Config), + []*ChannelDescriptor{}, MConnTransportOptions{}) conns := []net.Conn{} for { @@ -205,10 +207,14 @@ func (rp *remotePeer) accept() { return } - _, err = testInboundPeerConn(transport, conn) + pc, err := testInboundPeerConn(transport, conn) if err != nil { golog.Printf("Failed to create a peer: %+v", err) } + _, _, err = pc.conn.Handshake(context.Background(), rp.nodeInfo(), rp.PrivKey) + if err != nil { + golog.Printf("Failed to handshake a peer: %+v", err) + } conns = append(conns, conn) } diff --git a/p2p/pex/reactor.go b/p2p/pex/reactor.go index c9c5b6779..782dadbb9 100644 --- a/p2p/pex/reactor.go +++ b/p2p/pex/reactor.go @@ -137,7 +137,7 @@ func (r *ReactorV2) resolve(addresses []p2p.PeerAddress, limit uint16) []protop2 // PEX currently only supports IP-networked transports (as // opposed to e.g. p2p.MemoryTransport). pexAddresses = append(pexAddresses, protop2p.PexAddress{ - ID: string(endpoint.PeerID), + ID: string(address.NodeID), IP: endpoint.IP.String(), Port: uint32(endpoint.Port), }) diff --git a/p2p/router.go b/p2p/router.go index 7158464a0..4f8be6924 100644 --- a/p2p/router.go +++ b/p2p/router.go @@ -9,10 +9,30 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" ) +// RouterOptions specifies options for a Router. +type RouterOptions struct { + // ResolveTimeout is the timeout for resolving a PeerAddress URLs. + // 0 means no timeout. + ResolveTimeout time.Duration + + // DialTimeout is the timeout for dialing a peer. 0 means no timeout. + DialTimeout time.Duration + + // HandshakeTimeout is the timeout for handshaking with a peer. 0 means + // no timeout. + HandshakeTimeout time.Duration +} + +// Validate validates the options. +func (o *RouterOptions) Validate() error { + return nil +} + // Router manages peer connections and routes messages between peers and reactor // channels. This is an early prototype. // @@ -76,9 +96,13 @@ import ( // forever on a channel that has no consumer. type Router struct { *service.BaseService + logger log.Logger + nodeInfo NodeInfo + privKey crypto.PrivKey transports map[Protocol]Transport peerManager *PeerManager + options RouterOptions // FIXME: Consider using sync.Map. peerMtx sync.RWMutex @@ -95,23 +119,42 @@ type Router struct { stopCh chan struct{} } -// NewRouter creates a new Router, dialing the given peers. -// -// FIXME: providing protocol/transport maps is cumbersome in tests, we should -// consider adding Protocols() to the Transport interface instead and register -// protocol/transport mappings automatically on a first-come basis. -func NewRouter(logger log.Logger, peerManager *PeerManager, transports map[Protocol]Transport) *Router { +// NewRouter creates a new Router. +func NewRouter( + logger log.Logger, + nodeInfo NodeInfo, + privKey crypto.PrivKey, + peerManager *PeerManager, + transports []Transport, + options RouterOptions, +) (*Router, error) { + if err := options.Validate(); err != nil { + return nil, err + } + router := &Router{ logger: logger, - transports: transports, + nodeInfo: nodeInfo, + privKey: privKey, + transports: map[Protocol]Transport{}, peerManager: peerManager, + options: options, stopCh: make(chan struct{}), channelQueues: map[ChannelID]queue{}, channelMessages: map[ChannelID]proto.Message{}, peerQueues: map[NodeID]queue{}, } router.BaseService = service.NewBaseService(logger, "router", router) - return router + + for _, transport := range transports { + for _, protocol := range transport.Protocols() { + if _, ok := router.transports[protocol]; !ok { + router.transports[protocol] = transport + } + } + } + + return router, nil } // OpenChannel opens a new channel for the given message type. The caller must @@ -236,6 +279,20 @@ func (r *Router) acceptPeers(transport Transport) { // FIXME: We may need transports to enforce some sort of rate limiting // here (e.g. by IP address), or alternatively have PeerManager.Accepted() // do it for us. + // + // FIXME: Even though PeerManager enforces MaxConnected, we may want to + // limit the maximum number of active connections here too, since e.g. + // an adversary can open a ton of connections and then just hang during + // the handshake, taking up TCP socket descriptors. + // + // FIXME: The old P2P stack rejected multiple connections for the same IP + // unless P2PConfig.AllowDuplicateIP is true -- it's better to limit this + // by peer ID rather than IP address, so this hasn't been implemented and + // probably shouldn't (?). + // + // FIXME: The old P2P stack supported ABCI-based IP address filtering via + // /p2p/filter/addr/ queries, do we want to implement this here as well? + // Filtering by node ID is probably better. conn, err := transport.Accept(ctx) switch err { case nil: @@ -265,29 +322,35 @@ func (r *Router) acceptPeers(transport Transport) { // peer manager before completing the handshake -- this probably // requires protocol changes to send an additional message when the // handshake is accepted. - peerID := conn.NodeInfo().NodeID - if err := r.peerManager.Accepted(peerID); err != nil { - r.logger.Error("failed to accept connection", "peer", peerID, "err", err) + peerInfo, _, err := r.handshakePeer(ctx, conn, "") + if err == context.Canceled { + return + } else if err != nil { + r.logger.Error("failed to handshake with peer", "err", err) + return + } + if err := r.peerManager.Accepted(peerInfo.NodeID); err != nil { + r.logger.Error("failed to accept connection", "peer", peerInfo.NodeID, "err", err) return } queue := newFIFOQueue() r.peerMtx.Lock() - r.peerQueues[peerID] = queue + r.peerQueues[peerInfo.NodeID] = queue r.peerMtx.Unlock() - r.peerManager.Ready(peerID) + r.peerManager.Ready(peerInfo.NodeID) defer func() { r.peerMtx.Lock() - delete(r.peerQueues, peerID) + delete(r.peerQueues, peerInfo.NodeID) r.peerMtx.Unlock() queue.close() - if err := r.peerManager.Disconnected(peerID); err != nil { - r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err) + if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil { + r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err) } }() - r.routePeer(peerID, conn, queue) + r.routePeer(peerInfo.NodeID, conn, queue) }() } } @@ -312,7 +375,7 @@ func (r *Router) dialPeers() { if errors.Is(err, context.Canceled) { return } else if err != nil { - r.logger.Error("failed to dial peer", "peer", peerID) + r.logger.Error("failed to dial peer", "peer", peerID, "err", err) if err = r.peerManager.DialFailed(peerID, address); err != nil { r.logger.Error("failed to report dial failure", "peer", peerID, "err", err) } @@ -320,6 +383,17 @@ func (r *Router) dialPeers() { } defer conn.Close() + _, _, err = r.handshakePeer(ctx, conn, peerID) + if errors.Is(err, context.Canceled) { + return + } else if err != nil { + r.logger.Error("failed to handshake with peer", "peer", peerID, "err", err) + if err = r.peerManager.DialFailed(peerID, address); err != nil { + r.logger.Error("failed to report dial failure", "peer", peerID, "err", err) + } + return + } + if err = r.peerManager.Dialed(peerID, address); err != nil { r.logger.Error("failed to dial peer", "peer", peerID, "err", err) return @@ -346,27 +420,33 @@ func (r *Router) dialPeers() { } } -// dialPeer attempts to connect to a peer. +// dialPeer connects to a peer by dialing it. func (r *Router) dialPeer(ctx context.Context, address PeerAddress) (Connection, error) { - resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - r.logger.Info("resolving peer address", "address", address) - + resolveCtx := ctx + if r.options.ResolveTimeout > 0 { + var cancel context.CancelFunc + resolveCtx, cancel = context.WithTimeout(resolveCtx, r.options.ResolveTimeout) + defer cancel() + } endpoints, err := address.Resolve(resolveCtx) if err != nil { return nil, fmt.Errorf("failed to resolve address %q: %w", address, err) } for _, endpoint := range endpoints { - t, ok := r.transports[endpoint.Protocol] + transport, ok := r.transports[endpoint.Protocol] if !ok { - r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol) + r.logger.Error("no transport found for endpoint protocol", "endpoint", endpoint) continue } - dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() + dialCtx := ctx + if r.options.DialTimeout > 0 { + var cancel context.CancelFunc + dialCtx, cancel = context.WithTimeout(dialCtx, r.options.DialTimeout) + defer cancel() + } // FIXME: When we dial and handshake the peer, we should pass it // appropriate address(es) it can use to dial us back. It can't use our @@ -375,17 +455,46 @@ func (r *Router) dialPeer(ctx context.Context, address PeerAddress) (Connection, // by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us // on a private address on this endpoint, but a peer on the public // Internet can't and needs a different public address. - conn, err := t.Dial(dialCtx, endpoint) + conn, err := transport.Dial(dialCtx, endpoint) if err != nil { r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err) } else { - r.logger.Info("connected to peer", "peer", address.ID, "endpoint", endpoint) + r.logger.Info("connected to peer", "peer", address.NodeID, "endpoint", endpoint) return conn, nil } } return nil, fmt.Errorf("failed to connect to peer via %q", address) } +// handshakePeer handshakes with a peer, validating the peer's information. If +// expectID is given, we check that the peer's public key matches it. +func (r *Router) handshakePeer(ctx context.Context, conn Connection, expectID NodeID) (NodeInfo, crypto.PubKey, error) { + if r.options.HandshakeTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, r.options.HandshakeTimeout) + defer cancel() + } + peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey) + if err != nil { + return peerInfo, peerKey, err + } + if err = peerInfo.Validate(); err != nil { + return peerInfo, peerKey, fmt.Errorf("invalid handshake NodeInfo: %w", err) + } + if expectID != "" && expectID != peerInfo.NodeID { + return peerInfo, peerKey, fmt.Errorf("expected to connect with peer %q, got %q", + expectID, peerInfo.NodeID) + } + if NodeIDFromPubKey(peerKey) != peerInfo.NodeID { + return peerInfo, peerKey, fmt.Errorf("peer's public key did not match its node ID %q (expected %q)", + peerInfo.NodeID, NodeIDFromPubKey(peerKey)) + } + if peerInfo.NodeID == r.nodeInfo.NodeID { + return peerInfo, peerKey, errors.New("rejecting handshake with self") + } + return peerInfo, peerKey, nil +} + // routePeer routes inbound messages from a peer to channels, and also sends // outbound queued messages to the peer. It will close the connection and send // queue, using this as a signal to coordinate the internal receivePeer() and diff --git a/p2p/router_test.go b/p2p/router_test.go index c42ef72d3..f1a18bf10 100644 --- a/p2p/router_test.go +++ b/p2p/router_test.go @@ -10,12 +10,26 @@ import ( "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" ) type TestMessage = gogotypes.StringValue +func generateNode() (p2p.NodeInfo, crypto.PrivKey) { + privKey := ed25519.GenPrivKey() + nodeID := p2p.NodeIDFromPubKey(privKey.PubKey()) + nodeInfo := p2p.NodeInfo{ + NodeID: nodeID, + // FIXME: We have to fake a ListenAddr for now. + ListenAddr: "127.0.0.1:1234", + Moniker: "foo", + } + return nodeInfo, privKey +} + func echoReactor(channel *p2p.Channel) { for { select { @@ -35,7 +49,9 @@ func TestRouter(t *testing.T) { logger := log.TestingLogger() network := p2p.NewMemoryNetwork(logger) - transport := network.GenerateTransport() + nodeInfo, privKey := generateNode() + transport, err := network.CreateTransport(nodeInfo.NodeID) + require.NoError(t, err) defer transport.Close() chID := p2p.ChannelID(1) @@ -45,16 +61,20 @@ func TestRouter(t *testing.T) { for i := 0; i < 3; i++ { peerManager, err := p2p.NewPeerManager(dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) - peerTransport := network.GenerateTransport() + peerInfo, peerKey := generateNode() + peerTransport, err := network.CreateTransport(peerInfo.NodeID) + require.NoError(t, err) defer peerTransport.Close() - peerRouter := p2p.NewRouter( + peerRouter, err := p2p.NewRouter( logger.With("peerID", i), + peerInfo, + peerKey, peerManager, - map[p2p.Protocol]p2p.Transport{ - p2p.MemoryProtocol: peerTransport, - }, + []p2p.Transport{peerTransport}, + p2p.RouterOptions{}, ) - peers = append(peers, peerTransport.Endpoints()[0].PeerAddress()) + require.NoError(t, err) + peers = append(peers, peerTransport.Endpoints()[0].PeerAddress(peerInfo.NodeID)) channel, err := peerRouter.OpenChannel(chID, &TestMessage{}) require.NoError(t, err) @@ -77,10 +97,8 @@ func TestRouter(t *testing.T) { peerUpdates := peerManager.Subscribe() defer peerUpdates.Close() - router := p2p.NewRouter(logger, peerManager, map[p2p.Protocol]p2p.Transport{ - p2p.MemoryProtocol: transport, - }) - + router, err := p2p.NewRouter(logger, nodeInfo, privKey, peerManager, []p2p.Transport{transport}, p2p.RouterOptions{}) + require.NoError(t, err) channel, err := router.OpenChannel(chID, &TestMessage{}) require.NoError(t, err) defer channel.Close() @@ -124,13 +142,13 @@ func TestRouter(t *testing.T) { // We then submit an error for a peer, and watch it get disconnected. channel.Error() <- p2p.PeerError{ - PeerID: peers[0].ID, + PeerID: peers[0].NodeID, Err: errors.New("test error"), Severity: p2p.PeerErrorSeverityCritical, } peerUpdate := <-peerUpdates.Updates() require.Equal(t, p2p.PeerUpdate{ - PeerID: peers[0].ID, + PeerID: peers[0].NodeID, Status: p2p.PeerStatusDown, }, peerUpdate) @@ -138,7 +156,7 @@ func TestRouter(t *testing.T) { // for that to happen. peerUpdate = <-peerUpdates.Updates() require.Equal(t, p2p.PeerUpdate{ - PeerID: peers[0].ID, + PeerID: peers[0].NodeID, Status: p2p.PeerStatusUp, }, peerUpdate) } diff --git a/p2p/switch.go b/p2p/switch.go index d4c76cd38..e345b3572 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -3,11 +3,14 @@ package p2p import ( "context" "fmt" + "io" "math" + "net" "sync" "time" "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/libs/cmap" "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/libs/service" @@ -28,6 +31,8 @@ const ( // ie. 3**10 = 16hrs reconnectBackOffAttempts = 10 reconnectBackOffBaseSeconds = 3 + + defaultFilterTimeout = 5 * time.Second ) // MConnConfig returns an MConnConfig with fields updated @@ -56,10 +61,30 @@ type AddrBook interface { Save() } +// ConnFilterFunc is a callback for connection filtering. If it returns an +// error, the connection is rejected. The set of existing connections is passed +// along with the new connection and all resolved IPs. +type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error + // PeerFilterFunc to be implemented by filter hooks after a new Peer has been // fully setup. type PeerFilterFunc func(IPeerSet, Peer) error +// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection +// and refuses new ones if they come from a known ip. +var ConnDuplicateIPFilter ConnFilterFunc = func(cs ConnSet, c net.Conn, ips []net.IP) error { + for _, ip := range ips { + if cs.HasIP(ip) { + return ErrRejected{ + conn: c, + err: fmt.Errorf("ip<%v> already connected", ip), + isDuplicate: true, + } + } + } + return nil +} + //----------------------------------------------------------------------------- // Switch handles peer connections and exposes an API to receive incoming messages @@ -87,6 +112,8 @@ type Switch struct { filterTimeout time.Duration peerFilters []PeerFilterFunc + connFilters []ConnFilterFunc + conns ConnSet rng *rand.Rand // seed for randomizing dial times and orders @@ -100,7 +127,11 @@ func (sw *Switch) NetAddress() *NetAddress { if len(endpoints) == 0 { return nil } - return endpoints[0].NetAddress() + return &NetAddress{ + ID: sw.nodeInfo.NodeID, + IP: endpoints[0].IP, + Port: endpoints[0].Port, + } } // SwitchOption sets an optional parameter on the Switch. @@ -122,9 +153,10 @@ func NewSwitch( reconnecting: cmap.NewCMap(), metrics: NopMetrics(), transport: transport, - filterTimeout: defaultFilterTimeout, persistentPeersAddrs: make([]*NetAddress, 0), unconditionalPeerIDs: make(map[NodeID]struct{}), + filterTimeout: defaultFilterTimeout, + conns: NewConnSet(), } // Ensure we have a completely undeterministic PRNG. @@ -149,6 +181,11 @@ func SwitchPeerFilters(filters ...PeerFilterFunc) SwitchOption { return func(sw *Switch) { sw.peerFilters = filters } } +// SwitchConnFilters sets the filters for rejection of connections. +func SwitchConnFilters(filters ...ConnFilterFunc) SwitchOption { + return func(sw *Switch) { sw.connFilters = filters } +} + // WithMetrics sets the metrics. func WithMetrics(metrics *Metrics) SwitchOption { return func(sw *Switch) { sw.metrics = metrics } @@ -230,7 +267,9 @@ func (sw *Switch) OnStart() error { // FIXME: Temporary hack to pass channel descriptors to MConn transport, // since they are not available when it is constructed. This will be // fixed when we implement the new router abstraction. - sw.transport.SetChannelDescriptors(sw.chDescs) + if t, ok := sw.transport.(*MConnTransport); ok { + t.channelDescs = sw.chDescs + } // Start reactors for _, reactor := range sw.reactors { @@ -380,6 +419,8 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { if sw.peers.Remove(peer) { sw.metrics.Peers.Add(float64(-1)) } + + sw.conns.RemoveAddr(peer.RemoteAddr()) } // reconnectToPeer tries to reconnect to the addr, first repeatedly @@ -627,8 +668,26 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool { func (sw *Switch) acceptRoutine() { for { - c, err := sw.transport.Accept(context.Background()) + var peerNodeInfo NodeInfo + ctx := context.Background() + c, err := sw.transport.Accept(ctx) + if err == nil { + // NOTE: The legacy MConn transport did handshaking in Accept(), + // which was asynchronous and avoided head-of-line-blocking. + // However, as handshakes are being migrated out from the transport, + // we just do it synchronously here for now. + peerNodeInfo, _, err = sw.handshakePeer(c, "") + } + if err == nil { + err = sw.filterConn(c.(*mConnConnection).conn) + } if err != nil { + if c != nil { + _ = c.Close() + } + if err == io.EOF { + err = ErrTransportClosed{} + } switch err := err.(type) { case ErrRejected: if err.IsSelf() { @@ -675,7 +734,6 @@ func (sw *Switch) acceptRoutine() { break } - peerNodeInfo := c.NodeInfo() isPersistent := false addr, err := peerNodeInfo.NetAddress() if err == nil { @@ -683,6 +741,7 @@ func (sw *Switch) acceptRoutine() { } p := newPeer( + peerNodeInfo, newPeerConn(false, isPersistent, c), sw.reactorsByCh, sw.StopPeerForError, @@ -710,6 +769,7 @@ func (sw *Switch) acceptRoutine() { if p.IsRunning() { _ = p.Stop() } + sw.conns.RemoveAddr(p.RemoteAddr()) sw.Logger.Info( "Ignoring inbound connection: error while adding peer", "err", err, @@ -736,13 +796,26 @@ func (sw *Switch) addOutboundPeerWithConfig( return fmt.Errorf("dial err (peerConfig.DialFail == true)") } - c, err := sw.transport.Dial(context.Background(), Endpoint{ + // Hardcoded timeout moved from MConn transport during refactoring. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var peerNodeInfo NodeInfo + c, err := sw.transport.Dial(ctx, Endpoint{ Protocol: MConnProtocol, - PeerID: addr.ID, IP: addr.IP, Port: addr.Port, }) + if err == nil { + peerNodeInfo, _, err = sw.handshakePeer(c, addr.ID) + } + if err == nil { + err = sw.filterConn(c.(*mConnConnection).conn) + } if err != nil { + if c != nil { + _ = c.Close() + } if e, ok := err.(ErrRejected); ok { if e.IsSelf() { // Remove the given address from the address book and add to our addresses @@ -764,7 +837,8 @@ func (sw *Switch) addOutboundPeerWithConfig( } p := newPeer( - newPeerConn(true, sw.IsPeerPersistent(c.RemoteEndpoint().NetAddress()), c), + peerNodeInfo, + newPeerConn(true, sw.IsPeerPersistent(addr), c), sw.reactorsByCh, sw.StopPeerForError, PeerMetrics(sw.metrics), @@ -775,12 +849,73 @@ func (sw *Switch) addOutboundPeerWithConfig( if p.IsRunning() { _ = p.Stop() } + sw.conns.RemoveAddr(p.RemoteAddr()) return err } return nil } +func (sw *Switch) handshakePeer(c Connection, expectPeerID NodeID) (NodeInfo, crypto.PubKey, error) { + // Moved from transport and hardcoded until legacy P2P stack removal. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + peerInfo, peerKey, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + if err != nil { + return peerInfo, peerKey, ErrRejected{ + conn: c.(*mConnConnection).conn, + err: fmt.Errorf("handshake failed: %v", err), + isAuthFailure: true, + } + } + + if err = peerInfo.Validate(); err != nil { + return peerInfo, peerKey, ErrRejected{ + conn: c.(*mConnConnection).conn, + err: err, + isNodeInfoInvalid: true, + } + } + + // For outgoing conns, ensure connection key matches dialed key. + if expectPeerID != "" { + peerID := NodeIDFromPubKey(peerKey) + if expectPeerID != peerID { + return peerInfo, peerKey, ErrRejected{ + conn: c.(*mConnConnection).conn, + id: peerID, + err: fmt.Errorf( + "conn.ID (%v) dialed ID (%v) mismatch", + peerID, + expectPeerID, + ), + isAuthFailure: true, + } + } + } + + if sw.nodeInfo.ID() == peerInfo.ID() { + return peerInfo, peerKey, ErrRejected{ + addr: *NewNetAddress(peerInfo.ID(), c.(*mConnConnection).conn.RemoteAddr()), + conn: c.(*mConnConnection).conn, + id: peerInfo.ID(), + isSelf: true, + } + } + + if err = sw.nodeInfo.CompatibleWith(peerInfo); err != nil { + return peerInfo, peerKey, ErrRejected{ + conn: c.(*mConnConnection).conn, + err: err, + id: peerInfo.ID(), + isIncompatible: true, + } + } + + return peerInfo, peerKey, nil +} + func (sw *Switch) filterPeer(p Peer) error { // Avoid duplicate if sw.peers.Has(p.ID()) { @@ -809,6 +944,51 @@ func (sw *Switch) filterPeer(p Peer) error { return nil } +// filterConn filters a connection, rejecting it if this function errors. +// +// FIXME: This is only here for compatibility with the current Switch code. In +// the new P2P stack, peer/connection filtering should be moved into the Router +// or PeerManager and removed from here. +func (sw *Switch) filterConn(conn net.Conn) error { + if sw.conns.Has(conn) { + return ErrRejected{conn: conn, isDuplicate: true} + } + + host, _, err := net.SplitHostPort(conn.RemoteAddr().String()) + if err != nil { + return err + } + ip := net.ParseIP(host) + if ip == nil { + return fmt.Errorf("connection address has invalid IP address %q", host) + } + + // Apply filter callbacks. + chErr := make(chan error, len(sw.connFilters)) + for _, connFilter := range sw.connFilters { + go func(connFilter ConnFilterFunc) { + chErr <- connFilter(sw.conns, conn, []net.IP{ip}) + }(connFilter) + } + + for i := 0; i < cap(chErr); i++ { + select { + case err := <-chErr: + if err != nil { + return ErrRejected{conn: conn, err: err, isFiltered: true} + } + case <-time.After(sw.filterTimeout): + return ErrFilterTimeout{} + } + + } + + // FIXME: Doesn't really make sense to set this here, but we preserve the + // behavior from the previous P2P transport implementation. + sw.conns.Set(conn, []net.IP{ip}) + return nil +} + // addPeer starts up the Peer and adds it to the Switch. Error is returned if // the peer is filtered out or failed to start or can't be added. func (sw *Switch) addPeer(p Peer) error { diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 432586cfe..ab8882f83 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -245,7 +245,12 @@ func TestSwitchPeerFilter(t *testing.T) { if err != nil { t.Fatal(err) } + peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + if err != nil { + t.Fatal(err) + } p := newPeer( + peerInfo, newPeerConn(true, false, c), sw.reactorsByCh, sw.StopPeerForError, @@ -296,7 +301,12 @@ func TestSwitchPeerFilterTimeout(t *testing.T) { if err != nil { t.Fatal(err) } + peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + if err != nil { + t.Fatal(err) + } p := newPeer( + peerInfo, newPeerConn(true, false, c), sw.reactorsByCh, sw.StopPeerForError, @@ -327,7 +337,12 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) { if err != nil { t.Fatal(err) } + peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + if err != nil { + t.Fatal(err) + } p := newPeer( + peerInfo, newPeerConn(true, false, c), sw.reactorsByCh, sw.StopPeerForError, @@ -377,7 +392,12 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { if err != nil { t.Fatal(err) } + peerInfo, _, err := c.Handshake(ctx, sw.nodeInfo, sw.nodeKey.PrivKey) + if err != nil { + t.Fatal(err) + } p := newPeer( + peerInfo, newPeerConn(true, false, c), sw.reactorsByCh, sw.StopPeerForError, @@ -678,16 +698,23 @@ type errorTransport struct { acceptErr error } +func (et errorTransport) String() string { + return "error" +} + +func (et errorTransport) Protocols() []Protocol { + return []Protocol{"error"} +} + func (et errorTransport) Accept(context.Context) (Connection, error) { return nil, et.acceptErr } func (errorTransport) Dial(context.Context, Endpoint) (Connection, error) { panic("not implemented") } -func (errorTransport) Close() error { panic("not implemented") } -func (errorTransport) FlushClose() error { panic("not implemented") } -func (errorTransport) Endpoints() []Endpoint { panic("not implemented") } -func (errorTransport) SetChannelDescriptors([]*ChannelDescriptor) {} +func (errorTransport) Close() error { panic("not implemented") } +func (errorTransport) FlushClose() error { panic("not implemented") } +func (errorTransport) Endpoints() []Endpoint { panic("not implemented") } func TestSwitchAcceptRoutineErrorCases(t *testing.T) { sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}}) diff --git a/p2p/test_util.go b/p2p/test_util.go index 04091a991..8b80034c3 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -1,6 +1,7 @@ package p2p import ( + "context" "fmt" "net" @@ -122,8 +123,16 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { } return err } + peerNodeInfo, _, err := pc.conn.Handshake(context.Background(), sw.nodeInfo, sw.nodeKey.PrivKey) + if err != nil { + if err := conn.Close(); err != nil { + sw.Logger.Error("Error closing connection", "err", err) + } + return err + } p := newPeer( + peerNodeInfo, pc, sw.reactorsByCh, sw.StopPeerForError, @@ -167,7 +176,8 @@ func MakeSwitch( } logger := log.TestingLogger().With("switch", i) - t := NewMConnTransport(logger, nodeInfo, nodeKey.PrivKey, MConnConfig(cfg)) + t := NewMConnTransport(logger, MConnConfig(cfg), + []*ChannelDescriptor{}, MConnTransportOptions{}) // TODO: let the config be passed in? sw := initSwitch(i, NewSwitch(cfg, t, opts...)) @@ -187,7 +197,6 @@ func MakeSwitch( // TODO: We need to setup reactors ahead of time so the NodeInfo is properly // populated and we don't have to do those awkward overrides and setters. - t.nodeInfo = nodeInfo sw.SetNodeInfo(nodeInfo) return sw @@ -206,10 +215,7 @@ func testPeerConn( outbound, persistent bool, ) (pc peerConn, err error) { - conn, err := newMConnConnection(transport, rawConn, "") - if err != nil { - return pc, fmt.Errorf("error creating peer: %w", err) - } + conn := newMConnConnection(transport.logger, rawConn, transport.mConnConfig, transport.channelDescs) return newPeerConn(outbound, persistent, conn), nil } diff --git a/p2p/transport.go b/p2p/transport.go index 2e98c5d16..d278cbeae 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "strconv" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/p2p/conn" @@ -14,98 +15,34 @@ const ( defaultProtocol Protocol = MConnProtocol ) -// Transport is an arbitrary mechanism for exchanging bytes with a peer. +// Protocol identifies a transport protocol. +type Protocol string + +// Transport is a connection-oriented mechanism for exchanging data with a peer. type Transport interface { - // Accept waits for the next inbound connection on a listening endpoint. If - // this returns io.EOF or ErrTransportClosed the transport should be - // considered closed and further Accept() calls are futile. + // Protocols returns the protocols the transport supports, which the + // router uses to pick a transport for a PeerAddress. + Protocols() []Protocol + + // Accept waits for the next inbound connection on a listening endpoint, or + // returns io.EOF if the transport is closed. Accept(context.Context) (Connection, error) // Dial creates an outbound connection to an endpoint. Dial(context.Context, Endpoint) (Connection, error) - // Endpoints lists endpoints the transport is listening on. Any endpoint IP - // addresses do not need to be normalized in any way (e.g. 0.0.0.0 is - // valid), as they should be preprocessed before being advertised. + // Endpoints lists endpoints the transport is listening on. Endpoints() []Endpoint // Close stops accepting new connections, but does not close active connections. Close() error - // SetChannelDescriptors sets the channel descriptors for the transport. - // FIXME: This is only here for compatibility with the current Switch code. - SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) -} - -// Protocol identifies a transport protocol. -type Protocol string - -// Endpoint represents a transport connection endpoint, either local or remote. -type Endpoint struct { - // PeerID specifies the peer ID of the endpoint. + // Stringer is used to display the transport, e.g. in logs. // - // FIXME: This is here for backwards-compatibility with the existing MConn - // protocol, we should consider moving this higher in the stack (i.e. to - // the router). - PeerID NodeID - - // Protocol specifies the transport protocol, used by the router to pick a - // transport for an endpoint. - Protocol Protocol - - // Path is an optional, arbitrary transport-specific path or identifier. - Path string - - // IP is an IP address (v4 or v6) to connect to. If set, this defines the - // endpoint as a networked endpoint. - IP net.IP - - // Port is a network port (either TCP or UDP). If not set, a default port - // may be used depending on the protocol. - Port uint16 -} - -// PeerAddress converts the endpoint into a peer address. -func (e Endpoint) PeerAddress() PeerAddress { - address := PeerAddress{ - ID: e.PeerID, - Protocol: e.Protocol, - Path: e.Path, - } - if e.IP != nil { - address.Hostname = e.IP.String() - address.Port = e.Port - } - return address -} - -// String formats an endpoint as a URL string. -func (e Endpoint) String() string { - return e.PeerAddress().String() -} - -// Validate validates an endpoint. -func (e Endpoint) Validate() error { - switch { - case e.PeerID == "": - return errors.New("endpoint has no peer ID") - case e.Protocol == "": - return errors.New("endpoint has no protocol") - case e.Port > 0 && len(e.IP) == 0: - return fmt.Errorf("endpoint has port %v but no IP", e.Port) - default: - return nil - } -} - -// NetAddress returns a NetAddress for the endpoint. -// FIXME: This is temporary for compatibility with the old P2P stack. -func (e Endpoint) NetAddress() *NetAddress { - return &NetAddress{ - ID: e.PeerID, - IP: e.IP, - Port: e.Port, - } + // Without this, the logger may use reflection to access and display + // internal fields -- these are written concurrently, which can trigger the + // race detector or even cause a panic. + fmt.Stringer } // Connection represents an established connection between two endpoints. @@ -120,6 +57,15 @@ func (e Endpoint) NetAddress() *NetAddress { // MConnection behavior that the rest of the P2P stack relies on. This should be // removed once the P2P core is rewritten. type Connection interface { + // Handshake handshakes with the remote peer. It must be called immediately + // after the connection is established, and returns the remote peer's node + // info and public key. The caller is responsible for validation. + // + // FIXME: The handshaking should really be the Router's responsibility, but + // that requires the connection interface to be byte-oriented rather than + // message-oriented (see comment above). + Handshake(context.Context, NodeInfo, crypto.PrivKey) (NodeInfo, crypto.PubKey, error) + // ReceiveMessage returns the next message received on the connection, // blocking until one is available. io.EOF is returned when closed. ReceiveMessage() (chID byte, msg []byte, err error) @@ -144,12 +90,6 @@ type Connection interface { // RemoteEndpoint returns the remote endpoint for the connection. RemoteEndpoint() Endpoint - // PubKey returns the remote peer's public key. - PubKey() crypto.PubKey - - // NodeInfo returns the remote peer's node info. - NodeInfo() NodeInfo - // Close closes the connection. Close() error @@ -165,3 +105,62 @@ type Connection interface { // FIXME: Only here for compatibility with the current Peer code. Status() conn.ConnectionStatus } + +// Endpoint represents a transport connection endpoint, either local or remote. +type Endpoint struct { + // Protocol specifies the transport protocol, used by the router to pick a + // transport for an endpoint. + Protocol Protocol + + // Path is an optional, arbitrary transport-specific path or identifier. + Path string + + // IP is an IP address (v4 or v6) to connect to. If set, this defines the + // endpoint as a networked endpoint. + IP net.IP + + // Port is a network port (either TCP or UDP). If not set, a default port + // may be used depending on the protocol. + Port uint16 +} + +// PeerAddress converts the endpoint into a peer address for a given node ID. +func (e Endpoint) PeerAddress(nodeID NodeID) PeerAddress { + address := PeerAddress{ + NodeID: nodeID, + Protocol: e.Protocol, + Path: e.Path, + } + if e.IP != nil { + address.Hostname = e.IP.String() + address.Port = e.Port + } + return address +} + +// String formats an endpoint as a URL string. +func (e Endpoint) String() string { + if e.IP == nil { + return fmt.Sprintf("%s:%s", e.Protocol, e.Path) + } + s := fmt.Sprintf("%s://%s", e.Protocol, e.IP) + if e.Port > 0 { + s += strconv.Itoa(int(e.Port)) + } + s += e.Path + return s +} + +// Validate validates an endpoint. +func (e Endpoint) Validate() error { + switch { + case e.Protocol == "": + return errors.New("endpoint has no protocol") + case e.Port > 0 && len(e.IP) == 0: + return fmt.Errorf("endpoint has port %v but no IP", e.Port) + case len(e.IP) == 0 && e.Path == "": + return errors.New("endpoint has neither path nor IP") + default: + return nil + } +} diff --git a/p2p/transport_mconn.go b/p2p/transport_mconn.go index 928c63adf..567bdec6f 100644 --- a/p2p/transport_mconn.go +++ b/p2p/transport_mconn.go @@ -9,133 +9,71 @@ import ( "sync" "time" + "golang.org/x/net/netutil" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/protoio" "github.com/tendermint/tendermint/p2p/conn" p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p" - - "golang.org/x/net/netutil" ) const ( - defaultDialTimeout = time.Second - defaultFilterTimeout = 5 * time.Second - defaultHandshakeTimeout = 3 * time.Second + MConnProtocol Protocol = "mconn" + TCPProtocol Protocol = "tcp" ) -// MConnProtocol is the MConn protocol identifier. -const MConnProtocol Protocol = "mconn" - -// MConnTransportOption sets an option for MConnTransport. -type MConnTransportOption func(*MConnTransport) - -// MConnTransportMaxIncomingConnections sets the maximum number of -// simultaneous incoming connections. Default: 0 (unlimited) -func MConnTransportMaxIncomingConnections(max int) MConnTransportOption { - return func(mt *MConnTransport) { mt.maxIncomingConnections = max } -} - -// MConnTransportFilterTimeout sets the timeout for filter callbacks. -func MConnTransportFilterTimeout(timeout time.Duration) MConnTransportOption { - return func(mt *MConnTransport) { mt.filterTimeout = timeout } -} - -// MConnTransportConnFilters sets connection filters. -func MConnTransportConnFilters(filters ...ConnFilterFunc) MConnTransportOption { - return func(mt *MConnTransport) { mt.connFilters = filters } -} - -// ConnFilterFunc is a callback for connection filtering. If it returns an -// error, the connection is rejected. The set of existing connections is passed -// along with the new connection and all resolved IPs. -type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error - -// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection -// and refuses new ones if they come from a known ip. -var ConnDuplicateIPFilter ConnFilterFunc = func(cs ConnSet, c net.Conn, ips []net.IP) error { - for _, ip := range ips { - if cs.HasIP(ip) { - return ErrRejected{ - conn: c, - err: fmt.Errorf("ip<%v> already connected", ip), - isDuplicate: true, - } - } - } - return nil +// MConnTransportOptions sets options for MConnTransport. +type MConnTransportOptions struct { + // MaxAcceptedConnections is the maximum number of simultaneous accepted + // (incoming) connections. Beyond this, new connections will block until + // a slot is free. 0 means unlimited. + // + // FIXME: We may want to replace this with connection accounting in the + // Router, since it will need to do e.g. rate limiting and such as well. + // But it might also make sense to have per-transport limits. + MaxAcceptedConnections uint32 } // MConnTransport is a Transport implementation using the current multiplexed -// Tendermint protocol ("MConn"). It inherits lots of code and logic from the -// previous implementation for parity with the current P2P stack (such as -// connection filtering, peer verification, and panic handling), which should be -// moved out of the transport once the rest of the P2P stack is rewritten. +// Tendermint protocol ("MConn"). type MConnTransport struct { - privKey crypto.PrivKey - nodeInfo NodeInfo - channelDescs []*ChannelDescriptor + logger log.Logger + options MConnTransportOptions mConnConfig conn.MConnConfig + channelDescs []*ChannelDescriptor + closeCh chan struct{} + closeOnce sync.Once - maxIncomingConnections int - dialTimeout time.Duration - handshakeTimeout time.Duration - filterTimeout time.Duration - - logger log.Logger listener net.Listener - - closeOnce sync.Once - chAccept chan *mConnConnection - chError chan error - chClose chan struct{} - - // FIXME: This is a vestige from the old transport, and should be managed - // by the router once we rewrite the P2P core. - conns ConnSet - connFilters []ConnFilterFunc } -// NewMConnTransport sets up a new MConn transport. +// NewMConnTransport sets up a new MConnection transport. This uses the +// proprietary Tendermint MConnection protocol, which is implemented as +// conn.MConnection. func NewMConnTransport( logger log.Logger, - nodeInfo NodeInfo, - privKey crypto.PrivKey, mConnConfig conn.MConnConfig, - opts ...MConnTransportOption, + channelDescs []*ChannelDescriptor, + options MConnTransportOptions, ) *MConnTransport { - m := &MConnTransport{ - privKey: privKey, - nodeInfo: nodeInfo, + return &MConnTransport{ + logger: logger, + options: options, mConnConfig: mConnConfig, - channelDescs: []*ChannelDescriptor{}, - - dialTimeout: defaultDialTimeout, - handshakeTimeout: defaultHandshakeTimeout, - filterTimeout: defaultFilterTimeout, - - logger: logger, - chAccept: make(chan *mConnConnection), - chError: make(chan error), - chClose: make(chan struct{}), - - conns: NewConnSet(), - connFilters: []ConnFilterFunc{}, - } - for _, opt := range opts { - opt(m) + closeCh: make(chan struct{}), + channelDescs: channelDescs, } - return m } -// SetChannelDescriptors implements Transport. -// -// This is not concurrency-safe, and must be called before listening. -// -// FIXME: This is here for compatibility with existing switch code, -// it should be passed via the constructor instead. -func (m *MConnTransport) SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) { - m.channelDescs = chDescs +// String implements Transport. +func (m *MConnTransport) String() string { + return string(MConnProtocol) +} + +// Protocols implements Transport. We support tcp for backwards-compatibility. +func (m *MConnTransport) Protocols() []Protocol { + return []Protocol{MConnProtocol, TCPProtocol} } // Listen asynchronously listens for inbound connections on the given endpoint. @@ -143,9 +81,9 @@ func (m *MConnTransport) SetChannelDescriptors(chDescs []*conn.ChannelDescriptor // call Close() to shut down the listener. func (m *MConnTransport) Listen(endpoint Endpoint) error { if m.listener != nil { - return errors.New("MConn transport is already listening") + return errors.New("transport is already listening") } - err := m.normalizeEndpoint(&endpoint) + endpoint, err := m.normalizeEndpoint(endpoint) if err != nil { return fmt.Errorf("invalid MConn listen endpoint %q: %w", endpoint, err) } @@ -154,104 +92,50 @@ func (m *MConnTransport) Listen(endpoint Endpoint) error { if err != nil { return err } - if m.maxIncomingConnections > 0 { - m.listener = netutil.LimitListener(m.listener, m.maxIncomingConnections) + if m.options.MaxAcceptedConnections > 0 { + m.listener = netutil.LimitListener(m.listener, int(m.options.MaxAcceptedConnections)) } - - // Spawn a goroutine to accept inbound connections asynchronously. - go m.accept() - return nil } -// accept accepts inbound connections in a loop, and asynchronously handshakes -// with the peer to avoid head-of-line blocking. Established connections are -// passed to Accept() via chAccept. -// See: https://github.com/tendermint/tendermint/issues/204 -func (m *MConnTransport) accept() { - for { - tcpConn, err := m.listener.Accept() - if err != nil { - // We have to check for closure first, since we don't want to - // propagate "use of closed network connection" errors. - select { - case <-m.chClose: - default: - // We also select on chClose here, in case the transport closes - // while we're blocked on error propagation. - select { - case m.chError <- err: - case <-m.chClose: - } - } - return - } - - go func() { - err := m.filterTCPConn(tcpConn) - if err != nil { - if err := tcpConn.Close(); err != nil { - m.logger.Debug("failed to close TCP connection", "err", err) - } - select { - case m.chError <- err: - case <-m.chClose: - } - return - } +// Accept implements Transport. +func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) { + if m.listener == nil { + return nil, errors.New("transport is not listening") + } - conn, err := newMConnConnection(m, tcpConn, "") - if err != nil { - m.conns.Remove(tcpConn) - if err := tcpConn.Close(); err != nil { - m.logger.Debug("failed to close TCP connection", "err", err) - } - select { - case m.chError <- err: - case <-m.chClose: - } - } else { - select { - case m.chAccept <- conn: - case <-m.chClose: - if err := tcpConn.Close(); err != nil { - m.logger.Debug("failed to close TCP connection", "err", err) - } - } + if deadline, ok := ctx.Deadline(); ok { + if tcpListener, ok := m.listener.(*net.TCPListener); ok { + // FIXME: This probably needs to have a goroutine that overrides the + // deadline on context cancellation as well. + if err := tcpListener.SetDeadline(deadline); err != nil { + return nil, err } - }() + } } -} -// Accept implements Transport. -// -// accept() runs a concurrent accept loop that accepts inbound connections -// and then handshakes in a non-blocking fashion. The handshaked and validated -// connections are returned via this call, picking them off of the chAccept -// channel (or the handshake error, if any). -func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) { - select { - case conn := <-m.chAccept: - return conn, nil - case err := <-m.chError: - return nil, err - case <-m.chClose: - return nil, ErrTransportClosed{} - case <-ctx.Done(): - return nil, ctx.Err() + tcpConn, err := m.listener.Accept() + if err != nil { + select { + case <-m.closeCh: + return nil, io.EOF + case <-ctx.Done(): + return nil, ctx.Err() + default: + return nil, err + } } + + return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil } // Dial implements Transport. func (m *MConnTransport) Dial(ctx context.Context, endpoint Endpoint) (Connection, error) { - err := m.normalizeEndpoint(&endpoint) + endpoint, err := m.normalizeEndpoint(endpoint) if err != nil { return nil, err } - ctx, cancel := context.WithTimeout(ctx, m.dialTimeout) - defer cancel() - dialer := net.Dialer{} tcpConn, err := dialer.DialContext(ctx, "tcp", net.JoinHostPort(endpoint.IP.String(), fmt.Sprintf("%v", endpoint.Port))) @@ -259,24 +143,7 @@ func (m *MConnTransport) Dial(ctx context.Context, endpoint Endpoint) (Connectio return nil, err } - err = m.filterTCPConn(tcpConn) - if err != nil { - if err := tcpConn.Close(); err != nil { - m.logger.Debug("failed to close TCP connection", "err", err) - } - return nil, err - } - - conn, err := newMConnConnection(m, tcpConn, endpoint.PeerID) - if err != nil { - m.conns.Remove(tcpConn) - if err := tcpConn.Close(); err != nil { - m.logger.Debug("failed to close TCP connection", "err", err) - } - return nil, err - } - - return conn, nil + return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil } // Endpoints implements Transport. @@ -284,22 +151,21 @@ func (m *MConnTransport) Endpoints() []Endpoint { if m.listener == nil { return []Endpoint{} } - addr := m.listener.Addr().(*net.TCPAddr) - return []Endpoint{{ + endpoint := Endpoint{ Protocol: MConnProtocol, - PeerID: m.nodeInfo.ID(), - IP: addr.IP, - Port: uint16(addr.Port), - }} + } + if addr, ok := m.listener.Addr().(*net.TCPAddr); ok { + endpoint.IP = addr.IP + endpoint.Port = uint16(addr.Port) + } + return []Endpoint{endpoint} } // Close implements Transport. func (m *MConnTransport) Close() error { var err error m.closeOnce.Do(func() { - // We have to close chClose first, so that accept() will detect - // the closure and not propagate the error. - close(m.chClose) + close(m.closeCh) // must be closed first, to handle error in Accept() if m.listener != nil { err = m.listener.Close() } @@ -307,88 +173,38 @@ func (m *MConnTransport) Close() error { return err } -// filterTCPConn filters a TCP connection, rejecting it if this function errors. -func (m *MConnTransport) filterTCPConn(tcpConn net.Conn) error { - if m.conns.Has(tcpConn) { - return ErrRejected{conn: tcpConn, isDuplicate: true} - } - - host, _, err := net.SplitHostPort(tcpConn.RemoteAddr().String()) - if err != nil { - return err - } - ip := net.ParseIP(host) - if ip == nil { - return fmt.Errorf("connection address has invalid IP address %q", host) - } - - // Apply filter callbacks. - chErr := make(chan error, len(m.connFilters)) - for _, connFilter := range m.connFilters { - go func(connFilter ConnFilterFunc) { - chErr <- connFilter(m.conns, tcpConn, []net.IP{ip}) - }(connFilter) - } - - for i := 0; i < cap(chErr); i++ { - select { - case err := <-chErr: - if err != nil { - return ErrRejected{conn: tcpConn, err: err, isFiltered: true} - } - case <-time.After(m.filterTimeout): - return ErrFilterTimeout{} - } - - } - - // FIXME: Doesn't really make sense to set this here, but we preserve the - // behavior from the previous P2P transport implementation. This should - // be moved to the router. - m.conns.Set(tcpConn, []net.IP{ip}) - return nil -} - // normalizeEndpoint normalizes and validates an endpoint. -func (m *MConnTransport) normalizeEndpoint(endpoint *Endpoint) error { - if endpoint == nil { - return errors.New("nil endpoint") - } +func (m *MConnTransport) normalizeEndpoint(endpoint Endpoint) (Endpoint, error) { if err := endpoint.Validate(); err != nil { - return err + return Endpoint{}, err } - if endpoint.Protocol == "" { - endpoint.Protocol = MConnProtocol - } - if endpoint.Protocol != MConnProtocol { - return fmt.Errorf("unsupported protocol %q", endpoint.Protocol) + if endpoint.Protocol != MConnProtocol && endpoint.Protocol != TCPProtocol { + return Endpoint{}, fmt.Errorf("unsupported protocol %q", endpoint.Protocol) } if len(endpoint.IP) == 0 { - return errors.New("endpoint must have an IP address") + return Endpoint{}, errors.New("endpoint must have an IP address") } if endpoint.Path != "" { - return fmt.Errorf("endpoint cannot have path (got %q)", endpoint.Path) + return Endpoint{}, fmt.Errorf("endpoint cannot have path (got %q)", endpoint.Path) } if endpoint.Port == 0 { endpoint.Port = 26657 } - return nil + return endpoint, nil } -// mConnConnection implements Connection for MConnTransport. It takes a base TCP -// connection and upgrades it to MConnection over an encrypted SecretConnection. +// mConnConnection implements Connection for MConnTransport. type mConnConnection struct { - logger log.Logger - transport *MConnTransport - secretConn *conn.SecretConnection - mConn *conn.MConnection - - peerInfo NodeInfo + logger log.Logger + conn net.Conn + mConnConfig conn.MConnConfig + channelDescs []*ChannelDescriptor + receiveCh chan mConnMessage + errorCh chan error + closeCh chan struct{} + closeOnce sync.Once - closeOnce sync.Once - chReceive chan mConnMessage - chError chan error - chClose chan struct{} + mconn *conn.MConnection // set during Handshake() } // mConnMessage passes MConnection messages through internal channels. @@ -397,184 +213,133 @@ type mConnMessage struct { payload []byte } -// newMConnConnection creates a new mConnConnection by handshaking -// with a peer. +// newMConnConnection creates a new mConnConnection. func newMConnConnection( - transport *MConnTransport, - tcpConn net.Conn, - expectPeerID NodeID, -) (c *mConnConnection, err error) { - // FIXME: Since the MConnection code panics, we need to recover here - // and turn it into an error. Be careful not to alias err, so we can - // update it from within this function. We should remove panics instead. + logger log.Logger, + conn net.Conn, + mConnConfig conn.MConnConfig, + channelDescs []*ChannelDescriptor, +) *mConnConnection { + return &mConnConnection{ + logger: logger, + conn: conn, + mConnConfig: mConnConfig, + channelDescs: channelDescs, + receiveCh: make(chan mConnMessage), + errorCh: make(chan error), + closeCh: make(chan struct{}), + } +} + +// Handshake implements Connection. +// +// FIXME: Since the MConnection code panics, we need to recover it and turn it +// into an error. We should remove panics instead. +func (c *mConnConnection) Handshake( + ctx context.Context, + nodeInfo NodeInfo, + privKey crypto.PrivKey, +) (peerInfo NodeInfo, peerKey crypto.PubKey, err error) { defer func() { if r := recover(); r != nil { - err = ErrRejected{ - conn: tcpConn, - err: fmt.Errorf("recovered from panic: %v", r), - isAuthFailure: true, - } + err = fmt.Errorf("recovered from panic: %v", r) } }() - err = tcpConn.SetDeadline(time.Now().Add(transport.handshakeTimeout)) - if err != nil { - err = ErrRejected{ - conn: tcpConn, - err: fmt.Errorf("secret conn failed: %v", err), - isAuthFailure: true, - } - return - } + peerInfo, peerKey, err = c.handshake(ctx, nodeInfo, privKey) + return +} - c = &mConnConnection{ - transport: transport, - chReceive: make(chan mConnMessage), - chError: make(chan error), - chClose: make(chan struct{}), - } - c.secretConn, err = conn.MakeSecretConnection(tcpConn, transport.privKey) - if err != nil { - err = ErrRejected{ - conn: tcpConn, - err: fmt.Errorf("secret conn failed: %v", err), - isAuthFailure: true, - } - return +// handshake is a helper for Handshake, simplifying error handling so we can +// keep panic recovery in Handshake. It sets c.mconn. +// +// FIXME: Move this into Handshake() when MConnection no longer panics. +func (c *mConnConnection) handshake( + ctx context.Context, + nodeInfo NodeInfo, + privKey crypto.PrivKey, +) (NodeInfo, crypto.PubKey, error) { + if c.mconn != nil { + return NodeInfo{}, nil, errors.New("connection is already handshaked") } - c.peerInfo, err = c.handshake() - if err != nil { - err = ErrRejected{ - conn: tcpConn, - err: fmt.Errorf("handshake failed: %v", err), - isAuthFailure: true, + + if deadline, ok := ctx.Deadline(); ok { + if err := c.conn.SetDeadline(deadline); err != nil { + return NodeInfo{}, nil, err } - return } - // Validate node info. - // FIXME: All of the ID verification code below should be moved to the - // router once implemented. - err = c.peerInfo.Validate() + secretConn, err := conn.MakeSecretConnection(c.conn, privKey) if err != nil { - err = ErrRejected{ - conn: tcpConn, - err: err, - isNodeInfoInvalid: true, - } - return - } - - // For outgoing conns, ensure connection key matches dialed key. - if expectPeerID != "" { - peerID := NodeIDFromPubKey(c.PubKey()) - if expectPeerID != peerID { - err = ErrRejected{ - conn: tcpConn, - id: peerID, - err: fmt.Errorf( - "conn.ID (%v) dialed ID (%v) mismatch", - peerID, - expectPeerID, - ), - isAuthFailure: true, - } - return - } + return NodeInfo{}, nil, err } - // Reject self. - if transport.nodeInfo.ID() == c.peerInfo.ID() { - err = ErrRejected{ - addr: *NewNetAddress(c.peerInfo.ID(), c.secretConn.RemoteAddr()), - conn: tcpConn, - id: c.peerInfo.ID(), - isSelf: true, + var pbPeerInfo p2pproto.NodeInfo + errCh := make(chan error, 2) + go func() { + _, err := protoio.NewDelimitedWriter(secretConn).WriteMsg(nodeInfo.ToProto()) + errCh <- err + }() + go func() { + _, err := protoio.NewDelimitedReader(secretConn, MaxNodeInfoSize()).ReadMsg(&pbPeerInfo) + errCh <- err + }() + for i := 0; i < cap(errCh); i++ { + if err = <-errCh; err != nil { + return NodeInfo{}, nil, err } - return } - - err = transport.nodeInfo.CompatibleWith(c.peerInfo) + peerInfo, err := NodeInfoFromProto(&pbPeerInfo) if err != nil { - err = ErrRejected{ - conn: tcpConn, - err: err, - id: c.peerInfo.ID(), - isIncompatible: true, - } - return + return NodeInfo{}, nil, err } - err = tcpConn.SetDeadline(time.Time{}) - if err != nil { - err = ErrRejected{ - conn: tcpConn, - err: fmt.Errorf("secret conn failed: %v", err), - isAuthFailure: true, - } - return + if err = c.conn.SetDeadline(time.Time{}); err != nil { + return NodeInfo{}, nil, err } - // Set up the MConnection wrapper - c.mConn = conn.NewMConnectionWithConfig( - c.secretConn, - transport.channelDescs, + c.logger = c.logger.With("peer", c.RemoteEndpoint().PeerAddress(peerInfo.NodeID)) + + mconn := conn.NewMConnectionWithConfig( + secretConn, + c.channelDescs, c.onReceive, c.onError, - transport.mConnConfig, + c.mConnConfig, ) - // FIXME: Log format is set up for compatibility with existing peer code. - c.logger = transport.logger.With("peer", c.RemoteEndpoint().NetAddress()) - c.mConn.SetLogger(c.logger) - err = c.mConn.Start() - return c, err -} - -// handshake performs an MConn handshake, returning the peer's node info. -func (c *mConnConnection) handshake() (NodeInfo, error) { - var pbNodeInfo p2pproto.NodeInfo - chErr := make(chan error, 2) - go func() { - _, err := protoio.NewDelimitedWriter(c.secretConn).WriteMsg(c.transport.nodeInfo.ToProto()) - chErr <- err - }() - go func() { - _, err := protoio.NewDelimitedReader(c.secretConn, MaxNodeInfoSize()).ReadMsg(&pbNodeInfo) - chErr <- err - }() - for i := 0; i < cap(chErr); i++ { - if err := <-chErr; err != nil { - return NodeInfo{}, err - } + mconn.SetLogger(c.logger) + if err = mconn.Start(); err != nil { + return NodeInfo{}, nil, err } + c.mconn = mconn - return NodeInfoFromProto(&pbNodeInfo) + return peerInfo, secretConn.RemotePubKey(), nil } // onReceive is a callback for MConnection received messages. func (c *mConnConnection) onReceive(channelID byte, payload []byte) { select { - case c.chReceive <- mConnMessage{channelID: channelID, payload: payload}: - case <-c.chClose: + case c.receiveCh <- mConnMessage{channelID: channelID, payload: payload}: + case <-c.closeCh: } } -// onError is a callback for MConnection errors. The error is passed to -// chError, which is only consumed by ReceiveMessage() for parity with -// the old MConnection behavior. +// onError is a callback for MConnection errors. The error is passed to errorCh, +// which is only consumed by ReceiveMessage() for parity with the old +// MConnection behavior. func (c *mConnConnection) onError(e interface{}) { err, ok := e.(error) if !ok { err = fmt.Errorf("%v", err) } select { - case c.chError <- err: - case <-c.chClose: + case c.errorCh <- err: + case <-c.closeCh: } } // String displays connection information. -// FIXME: This is here for backwards compatibility with existing code, +// FIXME: This is here for backwards compatibility with existing logging, // it should probably just return RemoteEndpoint().String(), if anything. func (c *mConnConnection) String() string { endpoint := c.RemoteEndpoint() @@ -583,57 +348,44 @@ func (c *mConnConnection) String() string { // SendMessage implements Connection. func (c *mConnConnection) SendMessage(channelID byte, msg []byte) (bool, error) { - // We don't check chError here, to preserve old MConnection behavior. + // We don't check errorCh here, to preserve old MConnection behavior. select { - case <-c.chClose: + case <-c.closeCh: return false, io.EOF default: - return c.mConn.Send(channelID, msg), nil + return c.mconn.Send(channelID, msg), nil } } // TrySendMessage implements Connection. func (c *mConnConnection) TrySendMessage(channelID byte, msg []byte) (bool, error) { - // We don't check chError here, to preserve old MConnection behavior. + // We don't check errorCh here, to preserve old MConnection behavior. select { - case <-c.chClose: + case <-c.closeCh: return false, io.EOF default: - return c.mConn.TrySend(channelID, msg), nil + return c.mconn.TrySend(channelID, msg), nil } } // ReceiveMessage implements Connection. func (c *mConnConnection) ReceiveMessage() (byte, []byte, error) { select { - case err := <-c.chError: + case err := <-c.errorCh: return 0, nil, err - case <-c.chClose: + case <-c.closeCh: return 0, nil, io.EOF - case msg := <-c.chReceive: + case msg := <-c.receiveCh: return msg.channelID, msg.payload, nil } } -// NodeInfo implements Connection. -func (c *mConnConnection) NodeInfo() NodeInfo { - return c.peerInfo -} - -// PubKey implements Connection. -func (c *mConnConnection) PubKey() crypto.PubKey { - return c.secretConn.RemotePubKey() -} - // LocalEndpoint implements Connection. func (c *mConnConnection) LocalEndpoint() Endpoint { - // FIXME: For compatibility with existing P2P tests we need to - // handle non-TCP connections. This should be removed. endpoint := Endpoint{ Protocol: MConnProtocol, - PeerID: c.transport.nodeInfo.ID(), } - if addr, ok := c.secretConn.LocalAddr().(*net.TCPAddr); ok { + if addr, ok := c.conn.LocalAddr().(*net.TCPAddr); ok { endpoint.IP = addr.IP endpoint.Port = uint16(addr.Port) } @@ -642,13 +394,10 @@ func (c *mConnConnection) LocalEndpoint() Endpoint { // RemoteEndpoint implements Connection. func (c *mConnConnection) RemoteEndpoint() Endpoint { - // FIXME: For compatibility with existing P2P tests we need to - // handle non-TCP connections. This should be removed. endpoint := Endpoint{ Protocol: MConnProtocol, - PeerID: c.peerInfo.ID(), } - if addr, ok := c.secretConn.RemoteAddr().(*net.TCPAddr); ok { + if addr, ok := c.conn.RemoteAddr().(*net.TCPAddr); ok { endpoint.IP = addr.IP endpoint.Port = uint16(addr.Port) } @@ -657,26 +406,36 @@ func (c *mConnConnection) RemoteEndpoint() Endpoint { // Status implements Connection. func (c *mConnConnection) Status() conn.ConnectionStatus { - return c.mConn.Status() + if c.mconn == nil { + return conn.ConnectionStatus{} + } + return c.mconn.Status() } // Close implements Connection. func (c *mConnConnection) Close() error { - c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr()) var err error c.closeOnce.Do(func() { - err = c.mConn.Stop() - close(c.chClose) + if c.mconn != nil { + err = c.mconn.Stop() + } else { + err = c.conn.Close() + } + close(c.closeCh) }) return err } // FlushClose implements Connection. func (c *mConnConnection) FlushClose() error { - c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr()) + var err error c.closeOnce.Do(func() { - c.mConn.FlushStop() - close(c.chClose) + if c.mconn != nil { + c.mconn.FlushStop() + } else { + err = c.conn.Close() + } + close(c.closeCh) }) - return nil + return err } diff --git a/p2p/transport_memory.go b/p2p/transport_memory.go index 96a948ce1..7faa46860 100644 --- a/p2p/transport_memory.go +++ b/p2p/transport_memory.go @@ -8,8 +8,8 @@ import ( "sync" "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/libs/log" + tmsync "github.com/tendermint/tendermint/libs/sync" "github.com/tendermint/tendermint/p2p/conn" ) @@ -35,21 +35,11 @@ func NewMemoryNetwork(logger log.Logger) *MemoryNetwork { } } -// CreateTransport creates a new memory transport and endpoint for the given -// NodeInfo and private key. Use GenerateTransport() to autogenerate a random -// key and node info. -// -// The transport immediately begins listening on the endpoint "memory:", and +// CreateTransport creates a new memory transport and endpoint with the given +// node ID. It immediately begins listening on the endpoint "memory:", and // can be accessed by other transports in the same memory network. -func (n *MemoryNetwork) CreateTransport( - nodeInfo NodeInfo, - privKey crypto.PrivKey, -) (*MemoryTransport, error) { - nodeID := nodeInfo.NodeID - if nodeID == "" { - return nil, errors.New("no node ID") - } - t := newMemoryTransport(n, nodeInfo, privKey) +func (n *MemoryNetwork) CreateTransport(nodeID NodeID) (*MemoryTransport, error) { + t := newMemoryTransport(n, nodeID) n.mtx.Lock() defer n.mtx.Unlock() @@ -60,25 +50,6 @@ func (n *MemoryNetwork) CreateTransport( return t, nil } -// GenerateTransport generates a new transport endpoint by generating a random -// private key and node info. The endpoint address can be obtained via -// Transport.Endpoints(). -func (n *MemoryNetwork) GenerateTransport() *MemoryTransport { - privKey := ed25519.GenPrivKey() - nodeID := NodeIDFromPubKey(privKey.PubKey()) - nodeInfo := NodeInfo{ - NodeID: nodeID, - ListenAddr: fmt.Sprintf("%v:%v", MemoryProtocol, nodeID), - } - t, err := n.CreateTransport(nodeInfo, privKey) - if err != nil { - // GenerateTransport is only used for testing, and the likelihood of - // generating a duplicate node ID is very low, so we'll panic. - panic(err) - } - return t -} - // GetTransport looks up a transport in the network, returning nil if not found. func (n *MemoryNetwork) GetTransport(id NodeID) *MemoryTransport { n.mtx.RLock() @@ -105,10 +76,9 @@ func (n *MemoryNetwork) RemoveTransport(id NodeID) error { // It communicates between endpoints using Go channels. To dial a different // endpoint, both endpoints/transports must be in the same MemoryNetwork. type MemoryTransport struct { - network *MemoryNetwork - nodeInfo NodeInfo - privKey crypto.PrivKey - logger log.Logger + network *MemoryNetwork + nodeID NodeID + logger log.Logger acceptCh chan *MemoryConnection closeCh chan struct{} @@ -118,17 +88,11 @@ type MemoryTransport struct { // newMemoryTransport creates a new in-memory transport in the given network. // Callers should use MemoryNetwork.CreateTransport() or GenerateTransport() // to create transports, this is for internal use by MemoryNetwork. -func newMemoryTransport( - network *MemoryNetwork, - nodeInfo NodeInfo, - privKey crypto.PrivKey, -) *MemoryTransport { +func newMemoryTransport(network *MemoryNetwork, nodeID NodeID) *MemoryTransport { return &MemoryTransport{ - network: network, - nodeInfo: nodeInfo, - privKey: privKey, - logger: network.logger.With("local", - fmt.Sprintf("%v:%v", MemoryProtocol, nodeInfo.NodeID)), + network: network, + nodeID: nodeID, + logger: network.logger.With("local", fmt.Sprintf("%v:%v", MemoryProtocol, nodeID)), acceptCh: make(chan *MemoryConnection), closeCh: make(chan struct{}), @@ -136,13 +100,13 @@ func newMemoryTransport( } // String displays the transport. -// -// FIXME: The Transport interface should either have Name() or embed -// fmt.Stringer. This is necessary since we log the transport (to know which one -// it is), and if it doesn't implement fmt.Stringer then it inspects all struct -// contents via reflect, which triggers the race detector. func (t *MemoryTransport) String() string { - return "memory" + return string(MemoryProtocol) +} + +// Protocols implements Transport. +func (t *MemoryTransport) Protocols() []Protocol { + return []Protocol{MemoryProtocol} } // Accept implements Transport. @@ -152,7 +116,7 @@ func (t *MemoryTransport) Accept(ctx context.Context) (Connection, error) { t.logger.Info("accepted connection from peer", "remote", conn.RemoteEndpoint()) return conn, nil case <-t.closeCh: - return nil, ErrTransportClosed{} + return nil, io.EOF case <-ctx.Done(): return nil, ctx.Err() } @@ -163,30 +127,25 @@ func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connecti if endpoint.Protocol != MemoryProtocol { return nil, fmt.Errorf("invalid protocol %q", endpoint.Protocol) } - if endpoint.PeerID == "" { - return nil, errors.New("no peer ID") + if endpoint.Path == "" { + return nil, errors.New("no path") + } + nodeID, err := NewNodeID(endpoint.Path) + if err != nil { + return nil, err } t.logger.Info("dialing peer", "remote", endpoint) - peerTransport := t.network.GetTransport(endpoint.PeerID) + peerTransport := t.network.GetTransport(nodeID) if peerTransport == nil { - return nil, fmt.Errorf("unknown peer %q", endpoint.PeerID) + return nil, fmt.Errorf("unknown peer %q", nodeID) } inCh := make(chan memoryMessage, 1) outCh := make(chan memoryMessage, 1) - closeCh := make(chan struct{}) - closeOnce := sync.Once{} - closer := func() bool { - closed := false - closeOnce.Do(func() { - close(closeCh) - closed = true - }) - return closed - } + closer := tmsync.NewCloser() - outConn := newMemoryConnection(t, peerTransport, inCh, outCh, closeCh, closer) - inConn := newMemoryConnection(peerTransport, t, outCh, inCh, closeCh, closer) + outConn := newMemoryConnection(t, peerTransport, inCh, outCh, closer) + inConn := newMemoryConnection(peerTransport, t, outCh, inCh, closer) select { case peerTransport.acceptCh <- inConn: @@ -206,7 +165,7 @@ func (t *MemoryTransport) DialAccept( ) (Connection, Connection, error) { endpoints := peer.Endpoints() if len(endpoints) == 0 { - return nil, nil, fmt.Errorf("peer %q not listening on any endpoints", peer.nodeInfo.NodeID) + return nil, nil, fmt.Errorf("peer %q not listening on any endpoints", peer.nodeID) } acceptCh := make(chan Connection, 1) @@ -231,7 +190,7 @@ func (t *MemoryTransport) DialAccept( // Close implements Transport. func (t *MemoryTransport) Close() error { - err := t.network.RemoveTransport(t.nodeInfo.NodeID) + err := t.network.RemoveTransport(t.nodeID) t.closeOnce.Do(func() { close(t.closeCh) }) @@ -247,15 +206,11 @@ func (t *MemoryTransport) Endpoints() []Endpoint { default: return []Endpoint{{ Protocol: MemoryProtocol, - PeerID: t.nodeInfo.NodeID, + Path: string(t.nodeID), }} } } -// SetChannelDescriptors implements Transport. -func (t *MemoryTransport) SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) { -} - // MemoryConnection is an in-memory connection between two transports (nodes). type MemoryConnection struct { logger log.Logger @@ -264,14 +219,18 @@ type MemoryConnection struct { receiveCh <-chan memoryMessage sendCh chan<- memoryMessage - closeCh <-chan struct{} - close func() bool + closer *tmsync.Closer } // memoryMessage is used to pass messages internally in the connection. +// For handshakes, nodeInfo and pubKey are set instead of channel and message. type memoryMessage struct { channel byte message []byte + + // For handshakes. + nodeInfo NodeInfo + pubKey crypto.PubKey } // newMemoryConnection creates a new MemoryConnection. It takes all channels @@ -282,26 +241,49 @@ func newMemoryConnection( remote *MemoryTransport, receiveCh <-chan memoryMessage, sendCh chan<- memoryMessage, - closeCh <-chan struct{}, - close func() bool, + closer *tmsync.Closer, ) *MemoryConnection { c := &MemoryConnection{ local: local, remote: remote, receiveCh: receiveCh, sendCh: sendCh, - closeCh: closeCh, - close: close, + closer: closer, } c.logger = c.local.logger.With("remote", c.RemoteEndpoint()) return c } +// Handshake implements Connection. +func (c *MemoryConnection) Handshake( + ctx context.Context, + nodeInfo NodeInfo, + privKey crypto.PrivKey, +) (NodeInfo, crypto.PubKey, error) { + select { + case c.sendCh <- memoryMessage{nodeInfo: nodeInfo, pubKey: privKey.PubKey()}: + case <-ctx.Done(): + return NodeInfo{}, nil, ctx.Err() + case <-c.closer.Done(): + return NodeInfo{}, nil, io.EOF + } + + select { + case msg := <-c.receiveCh: + c.logger.Debug("handshake complete") + return msg.nodeInfo, msg.pubKey, nil + case <-ctx.Done(): + return NodeInfo{}, nil, ctx.Err() + case <-c.closer.Done(): + return NodeInfo{}, nil, io.EOF + } +} + // ReceiveMessage implements Connection. func (c *MemoryConnection) ReceiveMessage() (chID byte, msg []byte, err error) { // check close first, since channels are buffered select { - case <-c.closeCh: + case <-c.closer.Done(): return 0, nil, io.EOF default: } @@ -310,7 +292,7 @@ func (c *MemoryConnection) ReceiveMessage() (chID byte, msg []byte, err error) { case msg := <-c.receiveCh: c.logger.Debug("received message", "channel", msg.channel, "message", msg.message) return msg.channel, msg.message, nil - case <-c.closeCh: + case <-c.closer.Done(): return 0, nil, io.EOF } } @@ -319,7 +301,7 @@ func (c *MemoryConnection) ReceiveMessage() (chID byte, msg []byte, err error) { func (c *MemoryConnection) SendMessage(chID byte, msg []byte) (bool, error) { // check close first, since channels are buffered select { - case <-c.closeCh: + case <-c.closer.Done(): return false, io.EOF default: } @@ -328,7 +310,7 @@ func (c *MemoryConnection) SendMessage(chID byte, msg []byte) (bool, error) { case c.sendCh <- memoryMessage{channel: chID, message: msg}: c.logger.Debug("sent message", "channel", chID, "message", msg) return true, nil - case <-c.closeCh: + case <-c.closer.Done(): return false, io.EOF } } @@ -337,7 +319,7 @@ func (c *MemoryConnection) SendMessage(chID byte, msg []byte) (bool, error) { func (c *MemoryConnection) TrySendMessage(chID byte, msg []byte) (bool, error) { // check close first, since channels are buffered select { - case <-c.closeCh: + case <-c.closer.Done(): return false, io.EOF default: } @@ -346,7 +328,7 @@ func (c *MemoryConnection) TrySendMessage(chID byte, msg []byte) (bool, error) { case c.sendCh <- memoryMessage{channel: chID, message: msg}: c.logger.Debug("sent message", "channel", chID, "message", msg) return true, nil - case <-c.closeCh: + case <-c.closer.Done(): return false, io.EOF default: return false, nil @@ -355,9 +337,8 @@ func (c *MemoryConnection) TrySendMessage(chID byte, msg []byte) (bool, error) { // Close closes the connection. func (c *MemoryConnection) Close() error { - if c.close() { - c.logger.Info("closed connection") - } + c.closer.Close() + c.logger.Info("closed connection") return nil } @@ -369,29 +350,19 @@ func (c *MemoryConnection) FlushClose() error { // LocalEndpoint returns the local endpoint for the connection. func (c *MemoryConnection) LocalEndpoint() Endpoint { return Endpoint{ - PeerID: c.local.nodeInfo.NodeID, Protocol: MemoryProtocol, + Path: string(c.local.nodeID), } } // RemoteEndpoint returns the remote endpoint for the connection. func (c *MemoryConnection) RemoteEndpoint() Endpoint { return Endpoint{ - PeerID: c.remote.nodeInfo.NodeID, Protocol: MemoryProtocol, + Path: string(c.remote.nodeID), } } -// PubKey returns the remote peer's public key. -func (c *MemoryConnection) PubKey() crypto.PubKey { - return c.remote.privKey.PubKey() -} - -// NodeInfo returns the remote peer's node info. -func (c *MemoryConnection) NodeInfo() NodeInfo { - return c.remote.nodeInfo -} - // Status returns the current connection status. func (c *MemoryConnection) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} diff --git a/p2p/transport_memory_test.go b/p2p/transport_memory_test.go index a9f8761ae..6b79005e8 100644 --- a/p2p/transport_memory_test.go +++ b/p2p/transport_memory_test.go @@ -13,14 +13,16 @@ import ( func TestMemoryTransport(t *testing.T) { ctx := context.Background() network := p2p.NewMemoryNetwork(log.TestingLogger()) - a := network.GenerateTransport() - b := network.GenerateTransport() - c := network.GenerateTransport() + a, err := network.CreateTransport("0a") + require.NoError(t, err) + b, err := network.CreateTransport("0b") + require.NoError(t, err) + c, err := network.CreateTransport("0c") + require.NoError(t, err) // Dialing a missing endpoint should fail. - _, err := a.Dial(ctx, p2p.Endpoint{ + _, err = a.Dial(ctx, p2p.Endpoint{ Protocol: p2p.MemoryProtocol, - PeerID: p2p.NodeID("foo"), Path: "foo", }) require.Error(t, err) @@ -37,68 +39,68 @@ func TestMemoryTransport(t *testing.T) { defer cToA.Close() // Send and receive a message both ways a→b and b→a - sent, err := aToB.SendMessage(1, []byte("hi!")) + sent, err := aToB.SendMessage(1, []byte{0x01}) require.NoError(t, err) require.True(t, sent) ch, msg, err := bToA.ReceiveMessage() require.NoError(t, err) require.EqualValues(t, 1, ch) - require.EqualValues(t, "hi!", msg) + require.EqualValues(t, []byte{0x01}, msg) - sent, err = bToA.SendMessage(1, []byte("hello")) + sent, err = bToA.SendMessage(1, []byte{0x02}) require.NoError(t, err) require.True(t, sent) ch, msg, err = aToB.ReceiveMessage() require.NoError(t, err) require.EqualValues(t, 1, ch) - require.EqualValues(t, "hello", msg) + require.EqualValues(t, []byte{0x02}, msg) // Send and receive a message both ways a→c and c→a - sent, err = aToC.SendMessage(1, []byte("foo")) + sent, err = aToC.SendMessage(1, []byte{0x03}) require.NoError(t, err) require.True(t, sent) ch, msg, err = cToA.ReceiveMessage() require.NoError(t, err) require.EqualValues(t, 1, ch) - require.EqualValues(t, "foo", msg) + require.EqualValues(t, []byte{0x03}, msg) - sent, err = cToA.SendMessage(1, []byte("bar")) + sent, err = cToA.SendMessage(1, []byte{0x04}) require.NoError(t, err) require.True(t, sent) ch, msg, err = aToC.ReceiveMessage() require.NoError(t, err) require.EqualValues(t, 1, ch) - require.EqualValues(t, "bar", msg) + require.EqualValues(t, []byte{0x04}, msg) // If we close aToB, sending and receiving on either end will error. err = aToB.Close() require.NoError(t, err) - _, err = aToB.SendMessage(1, []byte("foo")) + _, err = aToB.SendMessage(1, []byte{0x05}) require.Equal(t, io.EOF, err) _, _, err = aToB.ReceiveMessage() require.Equal(t, io.EOF, err) - _, err = bToA.SendMessage(1, []byte("foo")) + _, err = bToA.SendMessage(1, []byte{0x06}) require.Equal(t, io.EOF, err) _, _, err = bToA.ReceiveMessage() require.Equal(t, io.EOF, err) // We can still send aToC. - sent, err = aToC.SendMessage(1, []byte("foo")) + sent, err = aToC.SendMessage(1, []byte{0x07}) require.NoError(t, err) require.True(t, sent) ch, msg, err = cToA.ReceiveMessage() require.NoError(t, err) require.EqualValues(t, 1, ch) - require.EqualValues(t, "foo", msg) + require.EqualValues(t, []byte{0x07}, msg) // If we close the c transport, it will no longer accept connections, // but we can still use the open connection. @@ -110,12 +112,12 @@ func TestMemoryTransport(t *testing.T) { _, err = a.Dial(ctx, endpoint) require.Error(t, err) - sent, err = aToC.SendMessage(1, []byte("bar")) + sent, err = aToC.SendMessage(1, []byte{0x08}) require.NoError(t, err) require.True(t, sent) ch, msg, err = cToA.ReceiveMessage() require.NoError(t, err) require.EqualValues(t, 1, ch) - require.EqualValues(t, "bar", msg) + require.EqualValues(t, []byte{0x08}, msg) } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 14f8896ae..4ad921e9e 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -503,13 +503,30 @@ func createConsensusReactor(config *cfg.Config, func createTransport( logger log.Logger, config *cfg.Config, +) *p2p.MConnTransport { + return p2p.NewMConnTransport( + logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{}, + p2p.MConnTransportOptions{ + MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers + + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")), + ), + }, + ) +} + +func createSwitch(config *cfg.Config, + transport p2p.Transport, + p2pMetrics *p2p.Metrics, + mempoolReactor *p2p.ReactorShim, + bcReactor p2p.Reactor, + stateSyncReactor *p2p.ReactorShim, + consensusReactor *cs.Reactor, + evidenceReactor *p2p.ReactorShim, + proxyApp proxy.AppConns, nodeInfo p2p.NodeInfo, nodeKey p2p.NodeKey, - proxyApp proxy.AppConns, -) ( - *p2p.MConnTransport, - []p2p.PeerFilterFunc, -) { + p2pLogger log.Logger) *p2p.Switch { + var ( connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} @@ -559,34 +576,12 @@ func createTransport( ) } - transport := p2p.NewMConnTransport( - logger, nodeInfo, nodeKey.PrivKey, p2p.MConnConfig(config.P2P), - p2p.MConnTransportConnFilters(connFilters...), - p2p.MConnTransportMaxIncomingConnections(config.P2P.MaxNumInboundPeers+ - len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))), - ) - - return transport, peerFilters -} - -func createSwitch(config *cfg.Config, - transport p2p.Transport, - p2pMetrics *p2p.Metrics, - peerFilters []p2p.PeerFilterFunc, - mempoolReactor *p2p.ReactorShim, - bcReactor p2p.Reactor, - stateSyncReactor *p2p.ReactorShim, - consensusReactor *cs.Reactor, - evidenceReactor *p2p.ReactorShim, - nodeInfo p2p.NodeInfo, - nodeKey p2p.NodeKey, - p2pLogger log.Logger) *p2p.Switch { - sw := p2p.NewSwitch( config.P2P, transport, p2p.WithMetrics(p2pMetrics), p2p.SwitchPeerFilters(peerFilters...), + p2p.SwitchConnFilters(connFilters...), ) sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) @@ -872,10 +867,10 @@ func NewNode(config *cfg.Config, // Setup Transport and Switch. p2pLogger := logger.With("module", "p2p") - transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) + transport := createTransport(p2pLogger, config) sw := createSwitch( - config, transport, p2pMetrics, peerFilters, mpReactorShim, bcReactorForSwitch, - stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger, + config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, + stateSyncReactorShim, csReactor, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, ) err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) @@ -990,12 +985,6 @@ func (n *Node) OnStart() error { } } - // Start the switch (the P2P server). - err := n.sw.Start() - if err != nil { - return err - } - // Start the transport. addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) if err != nil { @@ -1007,6 +996,12 @@ func (n *Node) OnStart() error { n.isListening = true + // Start the switch (the P2P server). + err = n.sw.Start() + if err != nil { + return err + } + if n.config.FastSync.Version == "v0" { // Start the real blockchain reactor separately since the switch uses the shim. if err := n.bcReactor.Start(); err != nil {