diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 4e31df98d..52357008d 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -224,18 +224,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { ticker.SetLogger(css[0].Logger) css[0].SetTimeoutTicker(ticker) - switches := make([]*p2p.Switch, N) p2pLogger := logger.With("module", "p2p") - for i := 0; i < N; i++ { - switches[i] = p2p.MakeSwitch( - config.P2P, - i, - "foo", "1.0.0", - func(i int, sw *p2p.Switch) *p2p.Switch { - return sw - }) - switches[i].SetLogger(p2pLogger.With("validator", i)) - } blocksSubs := make([]types.Subscription, N) reactors := make([]p2p.Reactor, N) @@ -243,20 +232,6 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { // enable txs so we can create different proposals assertMempool(css[i].txNotifier).EnableTxsAvailable() - // make first val byzantine - if i == 0 { - // NOTE: Now, test validators are MockPV, which by default doesn't - // do any safety checks. - css[i].privValidator.(types.MockPV).DisableChecks() - css[i].decideProposal = func(j int32) func(int64, int32) { - return func(height int64, round int32) { - byzantineDecideProposalFunc(t, height, round, css[j], switches[j]) - } - }(int32(i)) - // We are setting the prevote function to do nothing because the prevoting - // and precommitting are done alongside the proposal. - css[i].doPrevote = func(height int64, round int32) {} - } eventBus := css[i].eventBus eventBus.SetLogger(logger.With("module", "events", "validator", i)) @@ -281,22 +256,10 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { require.NoError(t, err) } - defer func() { - for _, r := range reactors { - if rr, ok := r.(*ByzantineReactor); ok { - err := rr.reactor.Switch.Stop() - require.NoError(t, err) - } else { - err := r.(*Reactor).Switch.Stop() - require.NoError(t, err) - } - } - }() - - p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { - // ignore new switch s, we already made ours - switches[i].AddReactor("CONSENSUS", reactors[i]) - return switches[i] + switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch { + sw.SetLogger(p2pLogger.With("validator", i)) + sw.AddReactor("CONSENSUS", reactors[i]) + return sw }, func(sws []*p2p.Switch, i, j int) { // the network starts partitioned with globally active adversary if i != 0 { @@ -305,6 +268,26 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { p2p.Connect2Switches(sws, i, j) }) + // make first val byzantine + // NOTE: Now, test validators are MockPV, which by default doesn't + // do any safety checks. + css[0].privValidator.(types.MockPV).DisableChecks() + css[0].decideProposal = func(j int32) func(int64, int32) { + return func(height int64, round int32) { + byzantineDecideProposalFunc(t, height, round, css[j], switches[j]) + } + }(int32(0)) + // We are setting the prevote function to do nothing because the prevoting + // and precommitting are done alongside the proposal. + css[0].doPrevote = func(height int64, round int32) {} + + defer func() { + for _, sw := range switches { + err := sw.Stop() + require.NoError(t, err) + } + }() + // start the non-byz state machines. // note these must be started before the byz for i := 1; i < N; i++ { diff --git a/libs/log/testing_logger.go b/libs/log/testing_logger.go index 8914bd81f..95a7e3f7c 100644 --- a/libs/log/testing_logger.go +++ b/libs/log/testing_logger.go @@ -3,6 +3,7 @@ package log import ( "io" "os" + "sync" "testing" "github.com/go-kit/kit/log/term" @@ -10,7 +11,8 @@ import ( var ( // reuse the same logger across all tests - _testingLogger Logger + _testingLoggerMutex = sync.Mutex{} + _testingLogger Logger ) // TestingLogger returns a TMLogger which writes to STDOUT if testing being run @@ -30,6 +32,8 @@ func TestingLogger() Logger { // inside a test (not in the init func) because // verbose flag only set at the time of testing. func TestingLoggerWithOutput(w io.Writer) Logger { + _testingLoggerMutex.Lock() + defer _testingLoggerMutex.Unlock() if _testingLogger != nil { return _testingLogger } @@ -46,6 +50,8 @@ func TestingLoggerWithOutput(w io.Writer) Logger { // TestingLoggerWithColorFn allow you to provide your own color function. See // TestingLogger for documentation. func TestingLoggerWithColorFn(colorFn func(keyvals ...interface{}) term.FgBgColor) Logger { + _testingLoggerMutex.Lock() + defer _testingLoggerMutex.Unlock() if _testingLogger != nil { return _testingLogger } diff --git a/node/node.go b/node/node.go index cc047db9c..8d1a9795f 100644 --- a/node/node.go +++ b/node/node.go @@ -180,7 +180,7 @@ type Node struct { privValidator types.PrivValidator // local node's validator key // network - transport *p2p.MultiplexTransport + transport *p2p.MConnTransport sw *p2p.Switch // p2p connections addrBook pex.AddrBook // known peers nodeInfo p2p.NodeInfo @@ -409,23 +409,22 @@ func createConsensusReactor(config *cfg.Config, } func createTransport( + logger log.Logger, config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey p2p.NodeKey, proxyApp proxy.AppConns, ) ( - *p2p.MultiplexTransport, + *p2p.MConnTransport, []p2p.PeerFilterFunc, ) { var ( - mConnConfig = p2p.MConnConfig(config.P2P) - transport = p2p.NewMultiplexTransport(nodeInfo, nodeKey, mConnConfig) connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} ) if !config.P2P.AllowDuplicateIP { - connFilters = append(connFilters, p2p.ConnDuplicateIPFilter()) + connFilters = append(connFilters, p2p.ConnDuplicateIPFilter) } // Filter peers by addr or pubkey with an ABCI query. @@ -468,11 +467,12 @@ func createTransport( ) } - p2p.MultiplexTransportConnFilters(connFilters...)(transport) - - // Limit the number of incoming connections. - max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - p2p.MultiplexTransportMaxIncomingConnections(max)(transport) + transport := p2p.NewMConnTransport( + logger, nodeInfo, nodeKey.PrivKey, p2p.MConnConfig(config.P2P), + p2p.MConnTransportConnFilters(connFilters...), + p2p.MConnTransportMaxIncomingConnections(config.P2P.MaxNumInboundPeers+ + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))), + ) return transport, peerFilters } @@ -764,11 +764,9 @@ func NewNode(config *cfg.Config, return nil, err } - // Setup Transport. - transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp) - - // Setup Switch. + // Setup Transport and Switch. p2pLogger := logger.With("module", "p2p") + transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) sw := createSwitch( config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, @@ -877,29 +875,30 @@ func (n *Node) OnStart() error { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) } - // Start the transport. - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) - if err != nil { - return err - } - if err := n.transport.Listen(*addr); err != nil { - return err - } - - n.isListening = true - + // Start the mempool. if n.config.Mempool.WalEnabled() { - err = n.mempool.InitWAL() + err := n.mempool.InitWAL() if err != nil { return fmt.Errorf("init mempool WAL: %w", err) } } // Start the switch (the P2P server). - err = n.sw.Start() + err := n.sw.Start() + if err != nil { + return err + } + + // Start the transport. + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) if err != nil { return err } + if err := n.transport.Listen(addr.Endpoint()); err != nil { + return err + } + + n.isListening = true // Start the real state sync reactor separately since the switch uses the shim. if err := n.stateSyncReactor.Start(); err != nil { diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index b5290116d..e597b4c63 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -848,7 +848,7 @@ func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { } // Handles incoming PacketMsgs. It returns a message bytes if message is -// complete. NOTE message bytes may change on next call to recvPacketMsg. +// complete, which is owned by the caller and will not be modified. // Not goroutine-safe func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet) @@ -859,12 +859,7 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { ch.recving = append(ch.recving, packet.Data...) if packet.EOF { msgBytes := ch.recving - - // clear the slice without re-allocating. - // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go - // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes, - // at which point the recving slice stops being used and should be garbage collected - ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity) + ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity) return msgBytes, nil } return nil, nil diff --git a/p2p/netaddress.go b/p2p/netaddress.go index 77209217b..c14c3753a 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -281,6 +281,16 @@ func (na *NetAddress) HasID() bool { return string(na.ID) != "" } +// Endpoint converts the address to an MConnection endpoint. +func (na *NetAddress) Endpoint() Endpoint { + return Endpoint{ + Protocol: MConnProtocol, + PeerID: na.ID, + IP: na.IP, + Port: na.Port, + } +} + // Local returns true if it is a local address. func (na *NetAddress) Local() bool { return na.IP.IsLoopback() || zero4.Contains(na.IP) diff --git a/p2p/node_info.go b/p2p/node_info.go index 8acc23676..511716afb 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -245,7 +245,7 @@ func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo { return dni } -func DefaultNodeInfoFromToProto(pb *tmp2p.DefaultNodeInfo) (DefaultNodeInfo, error) { +func DefaultNodeInfoFromProto(pb *tmp2p.DefaultNodeInfo) (DefaultNodeInfo, error) { if pb == nil { return DefaultNodeInfo{}, errors.New("nil node info") } diff --git a/p2p/peer.go b/p2p/peer.go index 43a1e7c9c..3ef8240c5 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -4,7 +4,9 @@ import ( "bytes" "encoding/hex" "fmt" + "io" "net" + "runtime/debug" "strings" "sync" "time" @@ -178,52 +180,28 @@ type Peer interface { type peerConn struct { outbound bool persistent bool - conn net.Conn // source connection - - socketAddr *NetAddress - - // cached RemoteIP() - ip net.IP + conn Connection + ip net.IP // cached RemoteIP() } -func newPeerConn( - outbound, persistent bool, - conn net.Conn, - socketAddr *NetAddress, -) peerConn { - +func newPeerConn(outbound, persistent bool, conn Connection) peerConn { return peerConn{ outbound: outbound, persistent: persistent, conn: conn, - socketAddr: socketAddr, } } // ID only exists for SecretConnection. -// NOTE: Will panic if conn is not *SecretConnection. func (pc peerConn) ID() ID { - return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey()) + return PubKeyToID(pc.conn.PubKey()) } // Return the IP from the connection RemoteAddr func (pc peerConn) RemoteIP() net.IP { - if pc.ip != nil { - return pc.ip - } - - host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String()) - if err != nil { - panic(err) - } - - ips, err := net.LookupIP(host) - if err != nil { - panic(err) + if pc.ip == nil { + pc.ip = pc.conn.RemoteEndpoint().IP } - - pc.ip = ips[0] - return pc.ip } @@ -235,13 +213,14 @@ type peer struct { // raw peerConn and the multiplex connection peerConn - mconn *tmconn.MConnection // peer's node info and the channel it knows about // channels = nodeInfo.Channels // cached to avoid copying nodeInfo in hasChannel - nodeInfo NodeInfo - channels []byte + nodeInfo NodeInfo + channels []byte + reactors map[byte]Reactor + onPeerError func(Peer, interface{}) // User data Data *cmap.CMap @@ -254,30 +233,22 @@ type PeerOption func(*peer) func newPeer( pc peerConn, - mConfig tmconn.MConnConfig, - nodeInfo NodeInfo, reactorsByCh map[byte]Reactor, - chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), options ...PeerOption, ) *peer { + nodeInfo := pc.conn.NodeInfo() p := &peer{ peerConn: pc, nodeInfo: nodeInfo, - channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO + channels: nodeInfo.Channels, // TODO + reactors: reactorsByCh, + onPeerError: onPeerError, Data: cmap.NewCMap(), metricsTicker: time.NewTicker(metricsTickerDuration), metrics: NopMetrics(), } - p.mconn = createMConnection( - pc.conn, - p, - reactorsByCh, - chDescs, - onPeerError, - mConfig, - ) p.BaseService = *service.NewBaseService(nil, "Peer", p) for _, option := range options { option(p) @@ -286,13 +257,18 @@ func newPeer( return p } +// onError calls the peer error callback. +func (p *peer) onError(err interface{}) { + p.onPeerError(p, err) +} + // String representation. func (p *peer) String() string { if p.outbound { - return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID()) + return fmt.Sprintf("Peer{%v %v out}", p.conn, p.ID()) } - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) + return fmt.Sprintf("Peer{%v %v in}", p.conn, p.ID()) } //--------------------------------------------------- @@ -301,7 +277,6 @@ func (p *peer) String() string { // SetLogger implements BaseService. func (p *peer) SetLogger(l log.Logger) { p.Logger = l - p.mconn.SetLogger(l) } // OnStart implements BaseService. @@ -310,29 +285,53 @@ func (p *peer) OnStart() error { return err } - if err := p.mconn.Start(); err != nil { - return err - } - + go p.processMessages() go p.metricsReporter() + return nil } +// processMessages processes messages received from the connection. +func (p *peer) processMessages() { + defer func() { + if r := recover(); r != nil { + p.Logger.Error("peer message processing panic", "err", r, "stack", string(debug.Stack())) + p.onError(fmt.Errorf("panic during peer message processing: %v", r)) + } + }() + + for { + chID, msg, err := p.conn.ReceiveMessage() + if err != nil { + p.onError(err) + return + } + reactor, ok := p.reactors[chID] + if !ok { + p.onError(fmt.Errorf("unknown channel %v", chID)) + return + } + reactor.Receive(chID, p, msg) + } +} + // FlushStop mimics OnStop but additionally ensures that all successful // .Send() calls will get flushed before closing the connection. // NOTE: it is not safe to call this method more than once. func (p *peer) FlushStop() { p.metricsTicker.Stop() p.BaseService.OnStop() - p.mconn.FlushStop() // stop everything and close the conn + if err := p.conn.FlushClose(); err != nil { + p.Logger.Debug("error while stopping peer", "err", err) + } } // OnStop implements BaseService. func (p *peer) OnStop() { p.metricsTicker.Stop() p.BaseService.OnStop() - if err := p.mconn.Stop(); err != nil { // stop everything and close the conn - p.Logger.Debug("Error while stopping peer", "err", err) + if err := p.conn.Close(); err != nil { + p.Logger.Debug("error while stopping peer", "err", err) } } @@ -364,12 +363,12 @@ func (p *peer) NodeInfo() NodeInfo { // For inbound peers, it's the address returned by the underlying connection // (not what's reported in the peer's NodeInfo). func (p *peer) SocketAddr() *NetAddress { - return p.peerConn.socketAddr + return p.peerConn.conn.RemoteEndpoint().NetAddress() } // Status returns the peer's ConnectionStatus. func (p *peer) Status() tmconn.ConnectionStatus { - return p.mconn.Status() + return p.conn.Status() } // Send msg bytes to the channel identified by chID byte. Returns false if the @@ -382,7 +381,13 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - res := p.mconn.Send(chID, msgBytes) + res, err := p.conn.SendMessage(chID, msgBytes) + if err == io.EOF { + return false + } else if err != nil { + p.onError(err) + return false + } if res { labels := []string{ "peer_id", string(p.ID()), @@ -401,7 +406,13 @@ func (p *peer) TrySend(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - res := p.mconn.TrySend(chID, msgBytes) + res, err := p.conn.TrySendMessage(chID, msgBytes) + if err == io.EOF { + return false + } else if err != nil { + p.onError(err) + return false + } if res { labels := []string{ "peer_id", string(p.ID()), @@ -458,15 +469,11 @@ func (pc *peerConn) CloseConn() { // RemoteAddr returns peer's remote network address. func (p *peer) RemoteAddr() net.Addr { - return p.peerConn.conn.RemoteAddr() -} - -// CanSend returns true if the send queue is not full, false otherwise. -func (p *peer) CanSend(chID byte) bool { - if !p.IsRunning() { - return false + endpoint := p.conn.RemoteEndpoint() + return &net.TCPAddr{ + IP: endpoint.IP, + Port: int(endpoint.Port), } - return p.mconn.CanSend(chID) } //--------------------------------------------------- @@ -481,7 +488,7 @@ func (p *peer) metricsReporter() { for { select { case <-p.metricsTicker.C: - status := p.mconn.Status() + status := p.conn.Status() var sendQueueSize float64 for _, chStatus := range status.Channels { sendQueueSize += float64(chStatus.SendQueueSize) @@ -493,43 +500,3 @@ func (p *peer) metricsReporter() { } } } - -//------------------------------------------------------------------ -// helper funcs - -func createMConnection( - conn net.Conn, - p *peer, - reactorsByCh map[byte]Reactor, - chDescs []*tmconn.ChannelDescriptor, - onPeerError func(Peer, interface{}), - config tmconn.MConnConfig, -) *tmconn.MConnection { - - onReceive := func(chID byte, msgBytes []byte) { - reactor := reactorsByCh[chID] - if reactor == nil { - // Note that its ok to panic here as it's caught in the conn._recover, - // which does onPeerError. - panic(fmt.Sprintf("Unknown channel %X", chID)) - } - labels := []string{ - "peer_id", string(p.ID()), - "chID", fmt.Sprintf("%#x", chID), - } - p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) - reactor.Receive(chID, p, msgBytes) - } - - onError := func(r interface{}) { - onPeerError(p, r) - } - - return tmconn.NewMConnectionWithConfig( - conn, - chDescs, - onReceive, - onError, - config, - ) -} diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 77f40d1b3..958d22729 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -114,7 +114,6 @@ func TestPeerSend(t *testing.T) { } }) - assert.True(p.CanSend(testCh)) assert.True(p.Send(testCh, []byte("Asylum"))) } @@ -126,20 +125,17 @@ func createOutboundPeerAndPerformHandshake( chDescs := []*tmconn.ChannelDescriptor{ {ID: testCh, Priority: 1}, } - reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)} pk := ed25519.GenPrivKey() - pc, err := testOutboundPeerConn(addr, config, false, pk) - if err != nil { - return nil, err - } - timeout := 1 * time.Second - ourNodeInfo := testNodeInfo(addr.ID, "host_peer") - peerNodeInfo, err := handshake(pc.conn, timeout, ourNodeInfo) + ourNodeInfo := testNodeInfo(PubKeyToID(pk.PubKey()), "host_peer") + transport := NewMConnTransport(log.TestingLogger(), ourNodeInfo, pk, mConfig) + transport.SetChannelDescriptors(chDescs) + reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)} + pc, err := testOutboundPeerConn(transport, addr, config, false, pk) if err != nil { return nil, err } - p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {}) + p := newPeer(pc, reactorsByCh, func(p Peer, r interface{}) {}) p.SetLogger(log.TestingLogger().With("peer", addr)) return p, nil } @@ -157,6 +153,7 @@ func testDial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) { } func testOutboundPeerConn( + transport *MConnTransport, addr *NetAddress, config *config.P2PConfig, persistent bool, @@ -169,7 +166,7 @@ func testOutboundPeerConn( return pc, fmt.Errorf("error creating peer: %w", err) } - pc, err = testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr) + pc, err = testPeerConn(transport, conn, true, persistent) if err != nil { if cerr := conn.Close(); cerr != nil { return pc, fmt.Errorf("%v: %w", cerr.Error(), err) @@ -227,15 +224,12 @@ func (rp *remotePeer) Stop() { } func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { + transport := NewMConnTransport(log.TestingLogger(), rp.nodeInfo(), rp.PrivKey, MConnConfig(rp.Config)) conn, err := addr.DialTimeout(1 * time.Second) if err != nil { return nil, err } - pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey) - if err != nil { - return nil, err - } - _, err = handshake(pc.conn, time.Second, rp.nodeInfo()) + _, err = testInboundPeerConn(transport, conn) if err != nil { return nil, err } @@ -243,6 +237,7 @@ func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { } func (rp *remotePeer) accept() { + transport := NewMConnTransport(log.TestingLogger(), rp.nodeInfo(), rp.PrivKey, MConnConfig(rp.Config)) conns := []net.Conn{} for { @@ -255,14 +250,9 @@ func (rp *remotePeer) accept() { return } - pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey) - if err != nil { - golog.Fatalf("Failed to create a peer: %+v", err) - } - - _, err = handshake(pc.conn, time.Second, rp.nodeInfo()) + _, err = testInboundPeerConn(transport, conn) if err != nil { - golog.Fatalf("Failed to perform handshake: %+v", err) + golog.Printf("Failed to create a peer: %+v", err) } conns = append(conns, conn) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index fd3c15bc6..a32164628 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -126,7 +126,9 @@ func TestPEXReactorReceive(t *testing.T) { r.RequestAddrs(peer) size := book.Size() - msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) + na, err := peer.NodeInfo().NetAddress() + require.NoError(t, err) + msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{na.ToProto()}}) r.Receive(PexChannel, peer, msg) assert.Equal(t, size+1, book.Size()) @@ -327,7 +329,7 @@ func TestPEXReactorDoesNotDisconnectFromPersistentPeerInSeedMode(t *testing.T) { assert.Zero(t, sw.Peers().Size()) - peerSwitch := testCreateDefaultPeer(dir, 1) + peerSwitch := testCreatePeerWithConfig(dir, 1, pexRConfig) require.NoError(t, peerSwitch.Start()) t.Cleanup(func() { _ = peerSwitch.Stop() }) @@ -452,7 +454,9 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) { pexR.RequestAddrs(peer) size := book.Size() - msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) + na, err := peer.NodeInfo().NetAddress() + require.NoError(t, err) + msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{na.ToProto()}}) pexR.Receive(PexChannel, peer, msg) assert.Equal(t, size, book.Size()) @@ -619,12 +623,13 @@ func createReactor(t *testing.T, conf *ReactorConfig) (r *Reactor, book AddrBook } func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch { - sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { + for _, r := range reactors { + sw.AddReactor(r.String(), r) + } + return sw + }) sw.SetLogger(log.TestingLogger()) - for _, r := range reactors { - sw.AddReactor(r.String(), r) - r.SetSwitch(sw) - } return sw } diff --git a/p2p/switch.go b/p2p/switch.go index 785b7f064..99cc04868 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -1,6 +1,7 @@ package p2p import ( + "context" "fmt" "math" "sync" @@ -92,10 +93,14 @@ type Switch struct { metrics *Metrics } -// NetAddress returns the address the switch is listening on. +// NetAddress returns the first address the switch is listening on, +// or nil if no addresses are found. func (sw *Switch) NetAddress() *NetAddress { - addr := sw.transport.NetAddress() - return &addr + endpoints := sw.transport.Endpoints() + if len(endpoints) == 0 { + return nil + } + return endpoints[0].NetAddress() } // SwitchOption sets an optional parameter on the Switch. @@ -221,6 +226,12 @@ func (sw *Switch) SetNodeKey(nodeKey NodeKey) { // OnStart implements BaseService. It starts all the reactors and peers. func (sw *Switch) OnStart() error { + + // FIXME: Temporary hack to pass channel descriptors to MConn transport, + // since they are not available when it is constructed. This will be + // fixed when we implement the new router abstraction. + sw.transport.SetChannelDescriptors(sw.chDescs) + // Start reactors for _, reactor := range sw.reactors { err := reactor.Start() @@ -246,7 +257,7 @@ func (sw *Switch) OnStop() { sw.Logger.Debug("Switch: Stopping reactors") for _, reactor := range sw.reactors { if err := reactor.Stop(); err != nil { - sw.Logger.Error("error while stopped reactor", "reactor", reactor, "error", err) + sw.Logger.Error("error while stopping reactor", "reactor", reactor, "error", err) } } } @@ -354,7 +365,6 @@ func (sw *Switch) StopPeerGracefully(peer Peer) { } func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { - sw.transport.Cleanup(peer) if err := peer.Stop(); err != nil { sw.Logger.Error("error while stopping peer", "error", err) // TODO: should return error to be handled accordingly } @@ -617,13 +627,7 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool { func (sw *Switch) acceptRoutine() { for { - p, err := sw.transport.Accept(peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - reactorsByCh: sw.reactorsByCh, - metrics: sw.metrics, - isPersistent: sw.IsPeerPersistent, - }) + c, err := sw.transport.Accept(context.Background()) if err != nil { switch err := err.(type) { case ErrRejected: @@ -671,6 +675,20 @@ func (sw *Switch) acceptRoutine() { break } + peerNodeInfo := c.NodeInfo() + isPersistent := false + addr, err := peerNodeInfo.NetAddress() + if err == nil { + isPersistent = sw.IsPeerPersistent(addr) + } + + p := newPeer( + newPeerConn(false, isPersistent, c), + sw.reactorsByCh, + sw.StopPeerForError, + PeerMetrics(sw.metrics), + ) + if !sw.IsPeerUnconditional(p.NodeInfo().ID()) { // Ignore connection if we already have enough peers. _, in, _ := sw.NumPeers() @@ -681,16 +699,14 @@ func (sw *Switch) acceptRoutine() { "have", in, "max", sw.config.MaxNumInboundPeers, ) - - sw.transport.Cleanup(p) - + _ = p.CloseConn() continue } } if err := sw.addPeer(p); err != nil { - sw.transport.Cleanup(p) + _ = p.CloseConn() if p.IsRunning() { _ = p.Stop() } @@ -720,12 +736,11 @@ func (sw *Switch) addOutboundPeerWithConfig( return fmt.Errorf("dial err (peerConfig.DialFail == true)") } - p, err := sw.transport.Dial(*addr, peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - metrics: sw.metrics, + c, err := sw.transport.Dial(context.Background(), Endpoint{ + Protocol: MConnProtocol, + PeerID: addr.ID, + IP: addr.IP, + Port: addr.Port, }) if err != nil { if e, ok := err.(ErrRejected); ok { @@ -748,8 +763,15 @@ func (sw *Switch) addOutboundPeerWithConfig( return err } + p := newPeer( + newPeerConn(true, sw.IsPeerPersistent(c.RemoteEndpoint().NetAddress()), c), + sw.reactorsByCh, + sw.StopPeerForError, + PeerMetrics(sw.metrics), + ) + if err := sw.addPeer(p); err != nil { - sw.transport.Cleanup(p) + _ = p.CloseConn() if p.IsRunning() { _ = p.Stop() } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 538ccfcc3..01dce5c8b 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -2,6 +2,7 @@ package p2p import ( "bytes" + "context" "errors" "fmt" "io" @@ -28,6 +29,7 @@ import ( var ( cfg *config.P2PConfig + ctx = context.Background() ) func init() { @@ -240,15 +242,15 @@ func TestSwitchPeerFilter(t *testing.T) { rp.Start() t.Cleanup(rp.Stop) - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - }) + c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint()) if err != nil { t.Fatal(err) } + p := newPeer( + newPeerConn(true, false, c), + sw.reactorsByCh, + sw.StopPeerForError, + ) err = sw.addPeer(p) if err, ok := err.(ErrRejected); ok { @@ -291,15 +293,15 @@ func TestSwitchPeerFilterTimeout(t *testing.T) { rp.Start() defer rp.Stop() - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - }) + c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint()) if err != nil { t.Fatal(err) } + p := newPeer( + newPeerConn(true, false, c), + sw.reactorsByCh, + sw.StopPeerForError, + ) err = sw.addPeer(p) if _, ok := err.(ErrFilterTimeout); !ok { @@ -322,15 +324,15 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) { rp.Start() defer rp.Stop() - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - }) + c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint()) if err != nil { t.Fatal(err) } + p := newPeer( + newPeerConn(true, false, c), + sw.reactorsByCh, + sw.StopPeerForError, + ) if err := sw.addPeer(p); err != nil { t.Fatal(err) @@ -372,13 +374,15 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { rp.Start() defer rp.Stop() - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - }) - require.Nil(err) + c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint()) + if err != nil { + t.Fatal(err) + } + p := newPeer( + newPeerConn(true, false, c), + sw.reactorsByCh, + sw.StopPeerForError, + ) err = sw.addPeer(p) require.Nil(err) @@ -386,7 +390,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { require.NotNil(sw.Peers().Get(rp.ID())) // simulate failure by closing connection - err = p.(*peer).CloseConn() + err = p.CloseConn() require.NoError(err) assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond) @@ -677,19 +681,16 @@ type errorTransport struct { acceptErr error } -func (et errorTransport) NetAddress() NetAddress { - panic("not implemented") -} - -func (et errorTransport) Accept(c peerConfig) (Peer, error) { +func (et errorTransport) Accept(context.Context) (Connection, error) { return nil, et.acceptErr } -func (errorTransport) Dial(NetAddress, peerConfig) (Peer, error) { - panic("not implemented") -} -func (errorTransport) Cleanup(Peer) { +func (errorTransport) Dial(context.Context, Endpoint) (Connection, error) { panic("not implemented") } +func (errorTransport) Close() error { panic("not implemented") } +func (errorTransport) FlushClose() error { panic("not implemented") } +func (errorTransport) Endpoints() []Endpoint { panic("not implemented") } +func (errorTransport) SetChannelDescriptors([]*ChannelDescriptor) {} func TestSwitchAcceptRoutineErrorCases(t *testing.T) { sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}}) @@ -728,6 +729,10 @@ type mockReactor struct { initCalledBeforeRemoveFinished uint32 } +func (r *mockReactor) GetChannels() []*ChannelDescriptor { + return []*ChannelDescriptor{{ID: testCh, Priority: 10}} +} + func (r *mockReactor) RemovePeer(peer Peer, reason interface{}) { atomic.StoreUint32(&r.removePeerInProgress, 1) defer atomic.StoreUint32(&r.removePeerInProgress, 0) diff --git a/p2p/test_util.go b/p2p/test_util.go index 9ae42a599..d24cf896a 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -3,9 +3,7 @@ package p2p import ( "fmt" "net" - "time" - "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/libs/log" tmnet "github.com/tendermint/tendermint/libs/net" tmrand "github.com/tendermint/tendermint/libs/rand" @@ -34,12 +32,8 @@ func AddPeerToSwitchPeerSet(sw *Switch, peer Peer) { func CreateRandomPeer(outbound bool) Peer { addr, netAddr := CreateRoutableAddr() p := &peer{ - peerConn: peerConn{ - outbound: outbound, - socketAddr: netAddr, - }, + peerConn: peerConn{outbound: outbound}, nodeInfo: mockNodeInfo{netAddr}, - mconn: &conn.MConnection{}, metrics: NopMetrics(), } p.SetLogger(log.TestingLogger().With("peer", addr)) @@ -127,15 +121,7 @@ func Connect2Switches(switches []*Switch, i, j int) { } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - pc, err := testInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey) - if err != nil { - if err := conn.Close(); err != nil { - sw.Logger.Error("Error closing connection", "err", err) - } - return err - } - - ni, err := handshake(conn, time.Second, sw.nodeInfo) + pc, err := testInboundPeerConn(sw.transport.(*MConnTransport), conn) if err != nil { if err := conn.Close(); err != nil { sw.Logger.Error("Error closing connection", "err", err) @@ -145,10 +131,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { p := newPeer( pc, - MConnConfig(sw.config), - ni, sw.reactorsByCh, - sw.chDescs, sw.StopPeerForError, ) @@ -189,9 +172,10 @@ func MakeSwitch( panic(err) } - t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg)) + logger := log.TestingLogger().With("switch", i) + t := NewMConnTransport(logger, nodeInfo, nodeKey.PrivKey, MConnConfig(cfg)) - if err := t.Listen(*addr); err != nil { + if err := t.Listen(addr.Endpoint()); err != nil { panic(err) } @@ -201,6 +185,7 @@ func MakeSwitch( sw.SetNodeKey(nodeKey) ni := nodeInfo.(DefaultNodeInfo) + ni.Channels = []byte{} for ch := range sw.reactorsByCh { ni.Channels = append(ni.Channels, ch) } @@ -208,37 +193,31 @@ func MakeSwitch( // TODO: We need to setup reactors ahead of time so the NodeInfo is properly // populated and we don't have to do those awkward overrides and setters. - t.nodeInfo = nodeInfo + t.nodeInfo = nodeInfo.(DefaultNodeInfo) sw.SetNodeInfo(nodeInfo) return sw } func testInboundPeerConn( + transport *MConnTransport, conn net.Conn, - config *config.P2PConfig, - ourNodePrivKey crypto.PrivKey, ) (peerConn, error) { - return testPeerConn(conn, config, false, false, ourNodePrivKey, nil) + return testPeerConn(transport, conn, false, false) } func testPeerConn( + transport *MConnTransport, rawConn net.Conn, - cfg *config.P2PConfig, outbound, persistent bool, - ourNodePrivKey crypto.PrivKey, - socketAddr *NetAddress, ) (pc peerConn, err error) { - conn := rawConn - // Encrypt connection - conn, err = upgradeSecretConn(conn, cfg.HandshakeTimeout, ourNodePrivKey) + conn, err := newMConnConnection(transport, rawConn, "") if err != nil { return pc, fmt.Errorf("error creating peer: %w", err) } - // Only the information we already have - return newPeerConn(outbound, persistent, conn, socketAddr), nil + return newPeerConn(outbound, persistent, conn), nil } //---------------------------------------------------------------- diff --git a/p2p/transport.go b/p2p/transport.go index e597ac0a1..bbb607700 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -2,593 +2,161 @@ package p2p import ( "context" + "errors" "fmt" "net" - "time" - - "golang.org/x/net/netutil" + "net/url" "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/libs/protoio" "github.com/tendermint/tendermint/p2p/conn" - tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" -) - -const ( - defaultDialTimeout = time.Second - defaultFilterTimeout = 5 * time.Second - defaultHandshakeTimeout = 3 * time.Second ) -// IPResolver is a behaviour subset of net.Resolver. -type IPResolver interface { - LookupIPAddr(context.Context, string) ([]net.IPAddr, error) -} - -// accept is the container to carry the upgraded connection and NodeInfo from an -// asynchronously running routine to the Accept method. -type accept struct { - netAddr *NetAddress - conn net.Conn - nodeInfo NodeInfo - err error -} - -// peerConfig is used to bundle data we need to fully setup a Peer with an -// MConn, provided by the caller of Accept and Dial (currently the Switch). This -// a temporary measure until reactor setup is less dynamic and we introduce the -// concept of PeerBehaviour to communicate about significant Peer lifecycle -// events. -// TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour. -type peerConfig struct { - chDescs []*conn.ChannelDescriptor - onPeerError func(Peer, interface{}) - outbound bool - // isPersistent allows you to set a function, which, given socket address - // (for outbound peers) OR self-reported address (for inbound peers), tells - // if the peer is persistent or not. - isPersistent func(*NetAddress) bool - reactorsByCh map[byte]Reactor - metrics *Metrics -} - -// Transport emits and connects to Peers. The implementation of Peer is left to -// the transport. Each transport is also responsible to filter establishing -// peers specific to its domain. +// Transport is an arbitrary mechanism for exchanging bytes with a peer. type Transport interface { - // Listening address. - NetAddress() NetAddress + // Accept waits for the next inbound connection on a listening endpoint. + Accept(context.Context) (Connection, error) - // Accept returns a newly connected Peer. - Accept(peerConfig) (Peer, error) + // Dial creates an outbound connection to an endpoint. + Dial(context.Context, Endpoint) (Connection, error) - // Dial connects to the Peer for the address. - Dial(NetAddress, peerConfig) (Peer, error) + // Endpoints lists endpoints the transport is listening on. Any endpoint IP + // addresses do not need to be normalized in any way (e.g. 0.0.0.0 is + // valid), as they should be preprocessed before being advertised. + Endpoints() []Endpoint - // Cleanup any resources associated with Peer. - Cleanup(Peer) -} - -// transportLifecycle bundles the methods for callers to control start and stop -// behaviour. -type transportLifecycle interface { + // Close stops accepting new connections, but does not close active connections. Close() error - Listen(NetAddress) error -} - -// ConnFilterFunc to be implemented by filter hooks after a new connection has -// been established. The set of exisiting connections is passed along together -// with all resolved IPs for the new connection. -type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error - -// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection -// and refuses new ones if they come from a known ip. -func ConnDuplicateIPFilter() ConnFilterFunc { - return func(cs ConnSet, c net.Conn, ips []net.IP) error { - for _, ip := range ips { - if cs.HasIP(ip) { - return ErrRejected{ - conn: c, - err: fmt.Errorf("ip<%v> already connected", ip), - isDuplicate: true, - } - } - } - - return nil - } -} - -// MultiplexTransportOption sets an optional parameter on the -// MultiplexTransport. -type MultiplexTransportOption func(*MultiplexTransport) - -// MultiplexTransportConnFilters sets the filters for rejection new connections. -func MultiplexTransportConnFilters( - filters ...ConnFilterFunc, -) MultiplexTransportOption { - return func(mt *MultiplexTransport) { mt.connFilters = filters } -} - -// MultiplexTransportFilterTimeout sets the timeout waited for filter calls to -// return. -func MultiplexTransportFilterTimeout( - timeout time.Duration, -) MultiplexTransportOption { - return func(mt *MultiplexTransport) { mt.filterTimeout = timeout } -} - -// MultiplexTransportResolver sets the Resolver used for ip lokkups, defaults to -// net.DefaultResolver. -func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption { - return func(mt *MultiplexTransport) { mt.resolver = resolver } -} - -// MultiplexTransportMaxIncomingConnections sets the maximum number of -// simultaneous connections (incoming). Default: 0 (unlimited) -func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption { - return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n } -} - -// MultiplexTransport accepts and dials tcp connections and upgrades them to -// multiplexed peers. -type MultiplexTransport struct { - netAddr NetAddress - listener net.Listener - maxIncomingConnections int // see MaxIncomingConnections - - acceptc chan accept - closec chan struct{} - - // Lookup table for duplicate ip and id checks. - conns ConnSet - connFilters []ConnFilterFunc - - dialTimeout time.Duration - filterTimeout time.Duration - handshakeTimeout time.Duration - nodeInfo NodeInfo - nodeKey NodeKey - resolver IPResolver - - // TODO(xla): This config is still needed as we parameterise peerConn and - // peer currently. All relevant configuration should be refactored into options - // with sane defaults. - mConfig conn.MConnConfig -} - -// Test multiplexTransport for interface completeness. -var _ Transport = (*MultiplexTransport)(nil) -var _ transportLifecycle = (*MultiplexTransport)(nil) - -// NewMultiplexTransport returns a tcp connected multiplexed peer. -func NewMultiplexTransport( - nodeInfo NodeInfo, - nodeKey NodeKey, - mConfig conn.MConnConfig, -) *MultiplexTransport { - return &MultiplexTransport{ - acceptc: make(chan accept), - closec: make(chan struct{}), - dialTimeout: defaultDialTimeout, - filterTimeout: defaultFilterTimeout, - handshakeTimeout: defaultHandshakeTimeout, - mConfig: mConfig, - nodeInfo: nodeInfo, - nodeKey: nodeKey, - conns: NewConnSet(), - resolver: net.DefaultResolver, - } -} - -// NetAddress implements Transport. -func (mt *MultiplexTransport) NetAddress() NetAddress { - return mt.netAddr -} - -// Accept implements Transport. -func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) { - select { - // This case should never have any side-effectful/blocking operations to - // ensure that quality peers are ready to be used. - case a := <-mt.acceptc: - if a.err != nil { - return nil, a.err - } - - cfg.outbound = false - - return mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr), nil - case <-mt.closec: - return nil, ErrTransportClosed{} - } -} - -// Dial implements Transport. -func (mt *MultiplexTransport) Dial( - addr NetAddress, - cfg peerConfig, -) (Peer, error) { - c, err := addr.DialTimeout(mt.dialTimeout) - if err != nil { - return nil, err - } - - // TODO(xla): Evaluate if we should apply filters if we explicitly dial. - if err := mt.filterConn(c); err != nil { - return nil, err - } - - secretConn, nodeInfo, err := mt.upgrade(c, &addr) - if err != nil { - return nil, err - } - - cfg.outbound = true - - p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr) - - return p, nil -} - -// Close implements transportLifecycle. -func (mt *MultiplexTransport) Close() error { - close(mt.closec) - - if mt.listener != nil { - return mt.listener.Close() - } - - return nil -} - -// Listen implements transportLifecycle. -func (mt *MultiplexTransport) Listen(addr NetAddress) error { - ln, err := net.Listen("tcp", addr.DialString()) - if err != nil { - return err - } - - if mt.maxIncomingConnections > 0 { - ln = netutil.LimitListener(ln, mt.maxIncomingConnections) - } - - mt.netAddr = addr - mt.listener = ln - - go mt.acceptPeers() - - return nil -} - -func (mt *MultiplexTransport) acceptPeers() { - for { - c, err := mt.listener.Accept() - if err != nil { - // If Close() has been called, silently exit. - select { - case _, ok := <-mt.closec: - if !ok { - return - } - default: - // Transport is not closed - } - - mt.acceptc <- accept{err: err} - return - } - // Connection upgrade and filtering should be asynchronous to avoid - // Head-of-line blocking[0]. - // Reference: https://github.com/tendermint/tendermint/issues/2047 - // - // [0] https://en.wikipedia.org/wiki/Head-of-line_blocking - go func(c net.Conn) { - defer func() { - if r := recover(); r != nil { - err := ErrRejected{ - conn: c, - err: fmt.Errorf("recovered from panic: %v", r), - isAuthFailure: true, - } - select { - case mt.acceptc <- accept{err: err}: - case <-mt.closec: - // Give up if the transport was closed. - _ = c.Close() - return - } - } - }() - - var ( - nodeInfo NodeInfo - secretConn *conn.SecretConnection - netAddr *NetAddress - ) - - err := mt.filterConn(c) - if err == nil { - secretConn, nodeInfo, err = mt.upgrade(c, nil) - if err == nil { - addr := c.RemoteAddr() - id := PubKeyToID(secretConn.RemotePubKey()) - netAddr = NewNetAddress(id, addr) - } - } - - select { - case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}: - // Make the upgraded peer available. - case <-mt.closec: - // Give up if the transport was closed. - _ = c.Close() - return - } - }(c) - } -} - -// Cleanup removes the given address from the connections set and -// closes the connection. -func (mt *MultiplexTransport) Cleanup(p Peer) { - mt.conns.RemoveAddr(p.RemoteAddr()) - _ = p.CloseConn() -} - -func (mt *MultiplexTransport) cleanup(c net.Conn) error { - mt.conns.Remove(c) - - return c.Close() -} - -func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) { - defer func() { - if err != nil { - _ = c.Close() - } - }() - - // Reject if connection is already present. - if mt.conns.Has(c) { - return ErrRejected{conn: c, isDuplicate: true} - } - - // Resolve ips for incoming conn. - ips, err := resolveIPs(mt.resolver, c) - if err != nil { - return err - } - - errc := make(chan error, len(mt.connFilters)) - - for _, f := range mt.connFilters { - go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) { - errc <- f(mt.conns, c, ips) - }(f, c, ips, errc) - } - - for i := 0; i < cap(errc); i++ { - select { - case err := <-errc: - if err != nil { - return ErrRejected{conn: c, err: err, isFiltered: true} - } - case <-time.After(mt.filterTimeout): - return ErrFilterTimeout{} - } - - } - - mt.conns.Set(c, ips) - - return nil + // SetChannelDescriptors sets the channel descriptors for the transport. + // FIXME: This is only here for compatibility with the current Switch code. + SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) } -func (mt *MultiplexTransport) upgrade( - c net.Conn, - dialedAddr *NetAddress, -) (secretConn *conn.SecretConnection, nodeInfo NodeInfo, err error) { - defer func() { - if err != nil { - _ = mt.cleanup(c) - } - }() - - secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey) - if err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: fmt.Errorf("secret conn failed: %v", err), - isAuthFailure: true, - } - } - - // For outgoing conns, ensure connection key matches dialed key. - connID := PubKeyToID(secretConn.RemotePubKey()) - if dialedAddr != nil { - if dialedID := dialedAddr.ID; connID != dialedID { - return nil, nil, ErrRejected{ - conn: c, - id: connID, - err: fmt.Errorf( - "conn.ID (%v) dialed ID (%v) mismatch", - connID, - dialedID, - ), - isAuthFailure: true, - } - } - } - - nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo) - if err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: fmt.Errorf("handshake failed: %v", err), - isAuthFailure: true, - } - } - - if err := nodeInfo.Validate(); err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: err, - isNodeInfoInvalid: true, - } - } - - // Ensure connection key matches self reported key. - if connID != nodeInfo.ID() { - return nil, nil, ErrRejected{ - conn: c, - id: connID, - err: fmt.Errorf( - "conn.ID (%v) NodeInfo.ID (%v) mismatch", - connID, - nodeInfo.ID(), - ), - isAuthFailure: true, - } - } - - // Reject self. - if mt.nodeInfo.ID() == nodeInfo.ID() { - return nil, nil, ErrRejected{ - addr: *NewNetAddress(nodeInfo.ID(), c.RemoteAddr()), - conn: c, - id: nodeInfo.ID(), - isSelf: true, - } - } - - if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: err, - id: nodeInfo.ID(), - isIncompatible: true, - } - } +// Protocol identifies a transport protocol. +type Protocol string - return secretConn, nodeInfo, nil -} +// Endpoint represents a transport connection endpoint, either local or remote. +type Endpoint struct { + // PeerID specifies the peer ID of the endpoint. + // + // FIXME: This is here for backwards-compatibility with the existing MConn + // protocol, we should consider moving this higher in the stack (i.e. to + // the router). + PeerID ID -func (mt *MultiplexTransport) wrapPeer( - c net.Conn, - ni NodeInfo, - cfg peerConfig, - socketAddr *NetAddress, -) Peer { + // Protocol specifies the transport protocol, used by the router to pick a + // transport for an endpoint. + Protocol Protocol - persistent := false - if cfg.isPersistent != nil { - if cfg.outbound { - persistent = cfg.isPersistent(socketAddr) - } else { - selfReportedAddr, err := ni.NetAddress() - if err == nil { - persistent = cfg.isPersistent(selfReportedAddr) - } - } - } + // Path is an optional, arbitrary transport-specific path or identifier. + Path string - peerConn := newPeerConn( - cfg.outbound, - persistent, - c, - socketAddr, - ) + // IP is an IP address (v4 or v6) to connect to. If set, this defines the + // endpoint as a networked endpoint. + IP net.IP - p := newPeer( - peerConn, - mt.mConfig, - ni, - cfg.reactorsByCh, - cfg.chDescs, - cfg.onPeerError, - PeerMetrics(cfg.metrics), - ) - - return p + // Port is a network port (either TCP or UDP). If not set, a default port + // may be used depending on the protocol. + Port uint16 } -func handshake( - c net.Conn, - timeout time.Duration, - nodeInfo NodeInfo, -) (NodeInfo, error) { - if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { - return nil, err +// String formats an endpoint as a URL string. +func (e Endpoint) String() string { + u := url.URL{Scheme: string(e.Protocol)} + if e.PeerID != "" { + u.User = url.User(string(e.PeerID)) } - - var ( - errc = make(chan error, 2) - - pbpeerNodeInfo tmp2p.DefaultNodeInfo - peerNodeInfo DefaultNodeInfo - ourNodeInfo = nodeInfo.(DefaultNodeInfo) - ) - - go func(errc chan<- error, c net.Conn) { - _, err := protoio.NewDelimitedWriter(c).WriteMsg(ourNodeInfo.ToProto()) - errc <- err - }(errc, c) - go func(errc chan<- error, c net.Conn) { - protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) - err := protoReader.ReadMsg(&pbpeerNodeInfo) - errc <- err - }(errc, c) - - for i := 0; i < cap(errc); i++ { - err := <-errc - if err != nil { - return nil, err + if len(e.IP) > 0 { + u.Host = e.IP.String() + if e.Port > 0 { + u.Host += fmt.Sprintf(":%v", e.Port) } + } else if e.Path != "" { + u.Opaque = e.Path + } + return u.String() +} + +// Validate validates an endpoint. +func (e Endpoint) Validate() error { + switch { + case e.PeerID == "": + return errors.New("endpoint has no peer ID") + case e.Protocol == "": + return errors.New("endpoint has no protocol") + case len(e.IP) == 0 && len(e.Path) == 0: + return errors.New("endpoint must have either IP or path") + case e.Port > 0 && len(e.IP) == 0: + return fmt.Errorf("endpoint has port %v but no IP", e.Port) + default: + return nil } - - peerNodeInfo, err := DefaultNodeInfoFromToProto(&pbpeerNodeInfo) - if err != nil { - return nil, err - } - - return peerNodeInfo, c.SetDeadline(time.Time{}) -} - -func upgradeSecretConn( - c net.Conn, - timeout time.Duration, - privKey crypto.PrivKey, -) (*conn.SecretConnection, error) { - if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { - return nil, err - } - - sc, err := conn.MakeSecretConnection(c, privKey) - if err != nil { - return nil, err - } - - return sc, sc.SetDeadline(time.Time{}) } -func resolveIPs(resolver IPResolver, c net.Conn) ([]net.IP, error) { - host, _, err := net.SplitHostPort(c.RemoteAddr().String()) - if err != nil { - return nil, err - } - - addrs, err := resolver.LookupIPAddr(context.Background(), host) - if err != nil { - return nil, err - } - - ips := []net.IP{} - - for _, addr := range addrs { - ips = append(ips, addr.IP) - } +// NetAddress returns a NetAddress for the endpoint. +// FIXME: This is temporary for compatibility with the old P2P stack. +func (e Endpoint) NetAddress() *NetAddress { + return &NetAddress{ + ID: e.PeerID, + IP: e.IP, + Port: e.Port, + } +} + +// Connection represents an established connection between two endpoints. +// +// FIXME: This is a temporary interface while we figure out whether we'll be +// adopting QUIC or not. If we do, this should be a byte-oriented multi-stream +// interface with one goroutine consuming each stream, and the MConnection +// transport either needs protocol changes or a shim. For details, see: +// https://github.com/tendermint/spec/pull/227 +// +// FIXME: The interface is currently very broad in order to accommodate +// MConnection behavior that the rest of the P2P stack relies on. This should be +// removed once the P2P core is rewritten. +type Connection interface { + // ReceiveMessage returns the next message received on the connection, + // blocking until one is available. io.EOF is returned when closed. + ReceiveMessage() (chID byte, msg []byte, err error) + + // SendMessage sends a message on the connection. + // FIXME: For compatibility with the current Peer, it returns an additional + // boolean false if the message timed out waiting to be accepted into the + // send buffer. + SendMessage(chID byte, msg []byte) (bool, error) + + // TrySendMessage is a non-blocking version of SendMessage that returns + // immediately if the message buffer is full. It returns true if the message + // was accepted. + // + // FIXME: This is here for backwards-compatibility with the current Peer + // code, and should be removed when possible. + TrySendMessage(chID byte, msg []byte) (bool, error) + + // LocalEndpoint returns the local endpoint for the connection. + LocalEndpoint() Endpoint + + // RemoteEndpoint returns the remote endpoint for the connection. + RemoteEndpoint() Endpoint + + // PubKey returns the remote peer's public key. + PubKey() crypto.PubKey + + // NodeInfo returns the remote peer's node info. + NodeInfo() DefaultNodeInfo + + // Close closes the connection. + Close() error - return ips, nil + // FlushClose flushes all pending sends and then closes the connection. + // + // FIXME: This only exists for backwards-compatibility with the current + // MConnection implementation. There should really be a separate Flush() + // method, but there is no easy way to synchronously flush pending data with + // the current MConnection structure. + FlushClose() error + + // Status returns the current connection status. + // FIXME: Only here for compatibility with the current Peer code. + Status() conn.ConnectionStatus } diff --git a/p2p/transport_mconn.go b/p2p/transport_mconn.go new file mode 100644 index 000000000..24ed4db3d --- /dev/null +++ b/p2p/transport_mconn.go @@ -0,0 +1,675 @@ +package p2p + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/protoio" + "github.com/tendermint/tendermint/p2p/conn" + p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p" + + "golang.org/x/net/netutil" +) + +const ( + defaultDialTimeout = time.Second + defaultFilterTimeout = 5 * time.Second + defaultHandshakeTimeout = 3 * time.Second +) + +// MConnProtocol is the MConn protocol identifier. +const MConnProtocol Protocol = "mconn" + +// MConnTransportOption sets an option for MConnTransport. +type MConnTransportOption func(*MConnTransport) + +// MConnTransportMaxIncomingConnections sets the maximum number of +// simultaneous incoming connections. Default: 0 (unlimited) +func MConnTransportMaxIncomingConnections(max int) MConnTransportOption { + return func(mt *MConnTransport) { mt.maxIncomingConnections = max } +} + +// MConnTransportFilterTimeout sets the timeout for filter callbacks. +func MConnTransportFilterTimeout(timeout time.Duration) MConnTransportOption { + return func(mt *MConnTransport) { mt.filterTimeout = timeout } +} + +// MConnTransportConnFilters sets connection filters. +func MConnTransportConnFilters(filters ...ConnFilterFunc) MConnTransportOption { + return func(mt *MConnTransport) { mt.connFilters = filters } +} + +// ConnFilterFunc is a callback for connection filtering. If it returns an +// error, the connection is rejected. The set of existing connections is passed +// along with the new connection and all resolved IPs. +type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error + +// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection +// and refuses new ones if they come from a known ip. +var ConnDuplicateIPFilter ConnFilterFunc = func(cs ConnSet, c net.Conn, ips []net.IP) error { + for _, ip := range ips { + if cs.HasIP(ip) { + return ErrRejected{ + conn: c, + err: fmt.Errorf("ip<%v> already connected", ip), + isDuplicate: true, + } + } + } + return nil +} + +// MConnTransport is a Transport implementation using the current multiplexed +// Tendermint protocol ("MConn"). It inherits lots of code and logic from the +// previous implementation for parity with the current P2P stack (such as +// connection filtering, peer verification, and panic handling), which should be +// moved out of the transport once the rest of the P2P stack is rewritten. +type MConnTransport struct { + privKey crypto.PrivKey + nodeInfo DefaultNodeInfo + channelDescs []*ChannelDescriptor + mConnConfig conn.MConnConfig + + maxIncomingConnections int + dialTimeout time.Duration + handshakeTimeout time.Duration + filterTimeout time.Duration + + logger log.Logger + listener net.Listener + + closeOnce sync.Once + chAccept chan *mConnConnection + chError chan error + chClose chan struct{} + + // FIXME: This is a vestige from the old transport, and should be managed + // by the router once we rewrite the P2P core. + conns ConnSet + connFilters []ConnFilterFunc +} + +// NewMConnTransport sets up a new MConn transport. +func NewMConnTransport( + logger log.Logger, + nodeInfo NodeInfo, // FIXME: should use DefaultNodeInfo, left for code compatibility + privKey crypto.PrivKey, + mConnConfig conn.MConnConfig, + opts ...MConnTransportOption, +) *MConnTransport { + m := &MConnTransport{ + privKey: privKey, + nodeInfo: nodeInfo.(DefaultNodeInfo), + mConnConfig: mConnConfig, + channelDescs: []*ChannelDescriptor{}, + + dialTimeout: defaultDialTimeout, + handshakeTimeout: defaultHandshakeTimeout, + filterTimeout: defaultFilterTimeout, + + logger: logger, + chAccept: make(chan *mConnConnection), + chError: make(chan error), + chClose: make(chan struct{}), + + conns: NewConnSet(), + connFilters: []ConnFilterFunc{}, + } + for _, opt := range opts { + opt(m) + } + return m +} + +// SetChannelDescriptors implements Transport. +func (m *MConnTransport) SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) { + m.channelDescs = chDescs +} + +// Listen asynchronously listens for inbound connections on the given endpoint. +// It must be called exactly once before calling Accept(), and the caller must +// call Close() to shut down the listener. +func (m *MConnTransport) Listen(endpoint Endpoint) error { + if m.listener != nil { + return errors.New("MConn transport is already listening") + } + err := m.normalizeEndpoint(&endpoint) + if err != nil { + return fmt.Errorf("invalid MConn listen endpoint %q: %w", endpoint, err) + } + + m.listener, err = net.Listen("tcp", fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port)) + if err != nil { + return err + } + if m.maxIncomingConnections > 0 { + m.listener = netutil.LimitListener(m.listener, m.maxIncomingConnections) + } + + // Spawn a goroutine to accept inbound connections asynchronously. + go m.accept() + + return nil +} + +// accept accepts inbound connections in a loop, and asynchronously handshakes +// with the peer to avoid head-of-line blocking. Established connections are +// passed to Accept() via chAccept. +// See: https://github.com/tendermint/tendermint/issues/204 +func (m *MConnTransport) accept() { + for { + tcpConn, err := m.listener.Accept() + if err != nil { + // We have to check for closure first, since we don't want to + // propagate "use of closed network connection" errors. + select { + case <-m.chClose: + default: + // We also select on chClose here, in case the transport closes + // while we're blocked on error propagation. + select { + case m.chError <- err: + case <-m.chClose: + } + } + return + } + + go func() { + err := m.filterTCPConn(tcpConn) + if err != nil { + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + select { + case m.chError <- err: + case <-m.chClose: + } + return + } + + conn, err := newMConnConnection(m, tcpConn, "") + if err != nil { + m.conns.Remove(tcpConn) + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + select { + case m.chError <- err: + case <-m.chClose: + } + } else { + select { + case m.chAccept <- conn: + case <-m.chClose: + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + } + } + }() + } +} + +// Accept implements Transport. +// +// accept() runs a concurrent accept loop that accepts inbound connections +// and then handshakes in a non-blocking fashion. The handshaked and validated +// connections are returned via this call, picking them off of the chAccept +// channel (or the handshake error, if any). +func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) { + select { + case conn := <-m.chAccept: + return conn, nil + case err := <-m.chError: + return nil, err + case <-m.chClose: + return nil, ErrTransportClosed{} + case <-ctx.Done(): + return nil, nil + } +} + +// Dial implements Transport. +func (m *MConnTransport) Dial(ctx context.Context, endpoint Endpoint) (Connection, error) { + err := m.normalizeEndpoint(&endpoint) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(ctx, m.dialTimeout) + defer cancel() + + dialer := net.Dialer{} + tcpConn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port)) + if err != nil { + return nil, err + } + + err = m.filterTCPConn(tcpConn) + if err != nil { + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + return nil, err + } + + conn, err := newMConnConnection(m, tcpConn, endpoint.PeerID) + if err != nil { + m.conns.Remove(tcpConn) + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + return nil, err + } + + return conn, nil +} + +// Endpoints implements Transport. +func (m *MConnTransport) Endpoints() []Endpoint { + if m.listener == nil { + return []Endpoint{} + } + addr := m.listener.Addr().(*net.TCPAddr) + return []Endpoint{{ + Protocol: MConnProtocol, + PeerID: m.nodeInfo.ID(), + IP: addr.IP, + Port: uint16(addr.Port), + }} +} + +// Close implements Transport. +func (m *MConnTransport) Close() error { + var err error + m.closeOnce.Do(func() { + // We have to close chClose first, so that accept() will detect + // the closure and not propagate the error. + close(m.chClose) + if m.listener != nil { + err = m.listener.Close() + } + }) + return err +} + +// filterTCPConn filters a TCP connection, rejecting it if this function errors. +func (m *MConnTransport) filterTCPConn(tcpConn net.Conn) error { + if m.conns.Has(tcpConn) { + return ErrRejected{conn: tcpConn, isDuplicate: true} + } + + host, _, err := net.SplitHostPort(tcpConn.RemoteAddr().String()) + if err != nil { + return err + } + ip := net.ParseIP(host) + if ip == nil { + return fmt.Errorf("connection address has invalid IP address %q", host) + } + + // Apply filter callbacks. + chErr := make(chan error, len(m.connFilters)) + for _, connFilter := range m.connFilters { + go func(connFilter ConnFilterFunc) { + chErr <- connFilter(m.conns, tcpConn, []net.IP{ip}) + }(connFilter) + } + + for i := 0; i < cap(chErr); i++ { + select { + case err := <-chErr: + if err != nil { + return ErrRejected{conn: tcpConn, err: err, isFiltered: true} + } + case <-time.After(m.filterTimeout): + return ErrFilterTimeout{} + } + + } + + // FIXME: Doesn't really make sense to set this here, but we preserve the + // behavior from the previous P2P transport implementation. This should + // be moved to the router. + m.conns.Set(tcpConn, []net.IP{ip}) + return nil +} + +// normalizeEndpoint normalizes and validates an endpoint. +func (m *MConnTransport) normalizeEndpoint(endpoint *Endpoint) error { + if endpoint == nil { + return errors.New("nil endpoint") + } + if err := endpoint.Validate(); err != nil { + return err + } + if endpoint.Protocol == "" { + endpoint.Protocol = MConnProtocol + } + if endpoint.Protocol != MConnProtocol { + return fmt.Errorf("unsupported protocol %q", endpoint.Protocol) + } + if len(endpoint.IP) == 0 { + return errors.New("endpoint must have an IP address") + } + if endpoint.Path != "" { + return fmt.Errorf("endpoint cannot have path (got %q)", endpoint.Path) + } + if endpoint.Port == 0 { + endpoint.Port = 26657 + } + return nil +} + +// mConnConnection implements Connection for MConnTransport. It takes a base TCP +// connection and upgrades it to MConnection over an encrypted SecretConnection. +type mConnConnection struct { + logger log.Logger + transport *MConnTransport + secretConn *conn.SecretConnection + mConn *conn.MConnection + + peerInfo DefaultNodeInfo + + closeOnce sync.Once + chReceive chan mConnMessage + chError chan error + chClose chan struct{} +} + +// mConnMessage passes MConnection messages through internal channels. +type mConnMessage struct { + channelID byte + payload []byte +} + +// newMConnConnection creates a new mConnConnection by handshaking +// with a peer. +func newMConnConnection( + transport *MConnTransport, + tcpConn net.Conn, + expectPeerID ID, +) (c *mConnConnection, err error) { + // FIXME: Since the MConnection code panics, we need to recover here + // and turn it into an error. Be careful not to alias err, so we can + // update it from within this function. We should remove panics instead. + defer func() { + if r := recover(); r != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("recovered from panic: %v", r), + isAuthFailure: true, + } + } + }() + + err = tcpConn.SetDeadline(time.Now().Add(transport.handshakeTimeout)) + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("secret conn failed: %v", err), + isAuthFailure: true, + } + return + } + + c = &mConnConnection{ + transport: transport, + chReceive: make(chan mConnMessage), + chError: make(chan error), + chClose: make(chan struct{}), + } + c.secretConn, err = conn.MakeSecretConnection(tcpConn, transport.privKey) + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("secret conn failed: %v", err), + isAuthFailure: true, + } + return + } + c.peerInfo, err = c.handshake() + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("handshake failed: %v", err), + isAuthFailure: true, + } + return + } + + // Validate node info. + // FIXME: All of the ID verification code below should be moved to the + // router once implemented. + err = c.peerInfo.Validate() + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: err, + isNodeInfoInvalid: true, + } + return + } + + // For outgoing conns, ensure connection key matches dialed key. + if expectPeerID != "" { + peerID := PubKeyToID(c.PubKey()) + if expectPeerID != peerID { + err = ErrRejected{ + conn: tcpConn, + id: peerID, + err: fmt.Errorf( + "conn.ID (%v) dialed ID (%v) mismatch", + peerID, + expectPeerID, + ), + isAuthFailure: true, + } + return + } + } + + // Reject self. + if transport.nodeInfo.ID() == c.peerInfo.ID() { + err = ErrRejected{ + addr: *NewNetAddress(c.peerInfo.ID(), c.secretConn.RemoteAddr()), + conn: tcpConn, + id: c.peerInfo.ID(), + isSelf: true, + } + return + } + + err = transport.nodeInfo.CompatibleWith(c.peerInfo) + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: err, + id: c.peerInfo.ID(), + isIncompatible: true, + } + return + } + + err = tcpConn.SetDeadline(time.Time{}) + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("secret conn failed: %v", err), + isAuthFailure: true, + } + return + } + + // Set up the MConnection wrapper + c.mConn = conn.NewMConnectionWithConfig( + c.secretConn, + transport.channelDescs, + c.onReceive, + c.onError, + transport.mConnConfig, + ) + // FIXME: Log format is set up for compatibility with existing peer code. + c.logger = transport.logger.With("peer", c.RemoteEndpoint().NetAddress()) + c.mConn.SetLogger(c.logger) + err = c.mConn.Start() + return c, err +} + +// handshake performs an MConn handshake, returning the peer's node info. +func (c *mConnConnection) handshake() (DefaultNodeInfo, error) { + var pbNodeInfo p2pproto.DefaultNodeInfo + chErr := make(chan error, 2) + go func() { + _, err := protoio.NewDelimitedWriter(c.secretConn).WriteMsg(c.transport.nodeInfo.ToProto()) + chErr <- err + }() + go func() { + chErr <- protoio.NewDelimitedReader(c.secretConn, MaxNodeInfoSize()).ReadMsg(&pbNodeInfo) + }() + for i := 0; i < cap(chErr); i++ { + if err := <-chErr; err != nil { + return DefaultNodeInfo{}, err + } + } + + return DefaultNodeInfoFromProto(&pbNodeInfo) +} + +// onReceive is a callback for MConnection received messages. +func (c *mConnConnection) onReceive(channelID byte, payload []byte) { + select { + case c.chReceive <- mConnMessage{channelID: channelID, payload: payload}: + case <-c.chClose: + } +} + +// onError is a callback for MConnection errors. The error is passed to +// chError, which is only consumed by ReceiveMessage() for parity with +// the old MConnection behavior. +func (c *mConnConnection) onError(e interface{}) { + err, ok := e.(error) + if !ok { + err = fmt.Errorf("%v", err) + } + select { + case c.chError <- err: + case <-c.chClose: + } +} + +// String displays connection information. +// FIXME: This is here for backwards compatibility with existing code, +// it should probably just return RemoteEndpoint().String(), if anything. +func (c *mConnConnection) String() string { + endpoint := c.RemoteEndpoint() + return fmt.Sprintf("MConn{%v:%v}", endpoint.IP, endpoint.Port) +} + +// SendMessage implements Connection. +func (c *mConnConnection) SendMessage(channelID byte, msg []byte) (bool, error) { + // We don't check chError here, to preserve old MConnection behavior. + select { + case <-c.chClose: + return false, io.EOF + default: + return c.mConn.Send(channelID, msg), nil + } +} + +// TrySendMessage implements Connection. +func (c *mConnConnection) TrySendMessage(channelID byte, msg []byte) (bool, error) { + // We don't check chError here, to preserve old MConnection behavior. + select { + case <-c.chClose: + return false, io.EOF + default: + return c.mConn.TrySend(channelID, msg), nil + } +} + +// ReceiveMessage implements Connection. +func (c *mConnConnection) ReceiveMessage() (byte, []byte, error) { + select { + case err := <-c.chError: + return 0, nil, err + case <-c.chClose: + return 0, nil, io.EOF + case msg := <-c.chReceive: + return msg.channelID, msg.payload, nil + } +} + +// NodeInfo implements Connection. +func (c *mConnConnection) NodeInfo() DefaultNodeInfo { + return c.peerInfo +} + +// PubKey implements Connection. +func (c *mConnConnection) PubKey() crypto.PubKey { + return c.secretConn.RemotePubKey() +} + +// LocalEndpoint implements Connection. +func (c *mConnConnection) LocalEndpoint() Endpoint { + // FIXME: For compatibility with existing P2P tests we need to + // handle non-TCP connections. This should be removed. + endpoint := Endpoint{ + Protocol: MConnProtocol, + PeerID: c.transport.nodeInfo.ID(), + } + if addr, ok := c.secretConn.LocalAddr().(*net.TCPAddr); ok { + endpoint.IP = addr.IP + endpoint.Port = uint16(addr.Port) + } + return endpoint +} + +// RemoteEndpoint implements Connection. +func (c *mConnConnection) RemoteEndpoint() Endpoint { + // FIXME: For compatibility with existing P2P tests we need to + // handle non-TCP connections. This should be removed. + endpoint := Endpoint{ + Protocol: MConnProtocol, + PeerID: c.peerInfo.ID(), + } + if addr, ok := c.secretConn.RemoteAddr().(*net.TCPAddr); ok { + endpoint.IP = addr.IP + endpoint.Port = uint16(addr.Port) + } + return endpoint +} + +// Status implements Connection. +func (c *mConnConnection) Status() conn.ConnectionStatus { + return c.mConn.Status() +} + +// Close implements Connection. +func (c *mConnConnection) Close() error { + c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr()) + var err error + c.closeOnce.Do(func() { + err = c.mConn.Stop() + close(c.chClose) + }) + return err +} + +// FlushClose implements Connection. +func (c *mConnConnection) FlushClose() error { + c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr()) + c.closeOnce.Do(func() { + c.mConn.FlushStop() + close(c.chClose) + }) + return nil +} diff --git a/p2p/transport_test.go b/p2p/transport_test.go deleted file mode 100644 index 349291f47..000000000 --- a/p2p/transport_test.go +++ /dev/null @@ -1,696 +0,0 @@ -package p2p - -import ( - "fmt" - "math/rand" - "net" - "reflect" - "runtime" - "strings" - "testing" - "time" - - "github.com/tendermint/tendermint/crypto/ed25519" - "github.com/tendermint/tendermint/libs/protoio" - "github.com/tendermint/tendermint/p2p/conn" - tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" -) - -var defaultNodeName = "host_peer" - -func emptyNodeInfo() NodeInfo { - return DefaultNodeInfo{} -} - -// newMultiplexTransport returns a tcp connected multiplexed peer -// using the default MConnConfig. It's a convenience function used -// for testing. -func newMultiplexTransport( - nodeInfo NodeInfo, - nodeKey NodeKey, -) *MultiplexTransport { - return NewMultiplexTransport( - nodeInfo, nodeKey, conn.DefaultMConnConfig(), - ) -} - -func TestTransportMultiplexConnFilter(t *testing.T) { - mt := newMultiplexTransport( - emptyNodeInfo(), - GenNodeKey(), - ) - id := mt.nodeKey.ID - - MultiplexTransportConnFilters( - func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil }, - func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil }, - func(_ ConnSet, _ net.Conn, _ []net.IP) error { - return fmt.Errorf("rejected") - }, - )(mt) - - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } - - errc := make(chan error) - - go func() { - addr := NewNetAddress(id, mt.listener.Addr()) - - _, err := addr.Dial() - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - t.Errorf("connection failed: %v", err) - } - - _, err = mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsFiltered() { - t.Errorf("expected peer to be filtered, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } -} - -func TestTransportMultiplexConnFilterTimeout(t *testing.T) { - mt := newMultiplexTransport( - emptyNodeInfo(), - GenNodeKey(), - ) - id := mt.nodeKey.ID - - MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt) - MultiplexTransportConnFilters( - func(_ ConnSet, _ net.Conn, _ []net.IP) error { - time.Sleep(1 * time.Second) - return nil - }, - )(mt) - - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } - - errc := make(chan error) - go func() { - addr := NewNetAddress(id, mt.listener.Addr()) - - _, err := addr.Dial() - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - t.Errorf("connection failed: %v", err) - } - - _, err = mt.Accept(peerConfig{}) - if _, ok := err.(ErrFilterTimeout); !ok { - t.Errorf("expected ErrFilterTimeout, got %v", err) - } -} - -func TestTransportMultiplexMaxIncomingConnections(t *testing.T) { - pv := ed25519.GenPrivKey() - id := PubKeyToID(pv.PubKey()) - mt := newMultiplexTransport( - testNodeInfo( - id, "transport", - ), - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - - MultiplexTransportMaxIncomingConnections(0)(mt) - - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - const maxIncomingConns = 2 - MultiplexTransportMaxIncomingConnections(maxIncomingConns)(mt) - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } - - laddr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - // Connect more peers than max - for i := 0; i <= maxIncomingConns; i++ { - errc := make(chan error) - go testDialer(*laddr, errc) - - err = <-errc - if i < maxIncomingConns { - if err != nil { - t.Errorf("dialer connection failed: %v", err) - } - _, err = mt.Accept(peerConfig{}) - if err != nil { - t.Errorf("connection failed: %v", err) - } - } else if err == nil || !strings.Contains(err.Error(), "i/o timeout") { - // mt actually blocks forever on trying to accept a new peer into a full channel so - // expect the dialer to encounter a timeout error. Calling mt.Accept will block until - // mt is closed. - t.Errorf("expected i/o timeout error, got %v", err) - } - } -} - -func TestTransportMultiplexAcceptMultiple(t *testing.T) { - mt := testSetupMultiplexTransport(t) - laddr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - var ( - seed = rand.New(rand.NewSource(time.Now().UnixNano())) - nDialers = seed.Intn(64) + 64 - errc = make(chan error, nDialers) - ) - - // Setup dialers. - for i := 0; i < nDialers; i++ { - go testDialer(*laddr, errc) - } - - // Catch connection errors. - for i := 0; i < nDialers; i++ { - if err := <-errc; err != nil { - t.Fatal(err) - } - } - - ps := []Peer{} - - // Accept all peers. - for i := 0; i < cap(errc); i++ { - p, err := mt.Accept(peerConfig{}) - if err != nil { - t.Fatal(err) - } - - if err := p.Start(); err != nil { - t.Fatal(err) - } - - ps = append(ps, p) - } - - if have, want := len(ps), cap(errc); have != want { - t.Errorf("have %v, want %v", have, want) - } - - // Stop all peers. - for _, p := range ps { - if err := p.Stop(); err != nil { - t.Fatal(err) - } - } - - if err := mt.Close(); err != nil { - t.Errorf("close errored: %v", err) - } -} - -func testDialer(dialAddr NetAddress, errc chan error) { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - dialer = newMultiplexTransport( - testNodeInfo(id, defaultNodeName), - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - - _, err := dialer.Dial(dialAddr, peerConfig{}) - if err != nil { - errc <- err - return - } - - // Signal that the connection was established. - errc <- nil -} - -func TestTransportMultiplexAcceptNonBlocking(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - var ( - fastNodePV = ed25519.GenPrivKey() - fastNodeInfo = testNodeInfo(PubKeyToID(fastNodePV.PubKey()), "fastnode") - errc = make(chan error) - fastc = make(chan struct{}) - slowc = make(chan struct{}) - slowdonec = make(chan struct{}) - ) - - // Simulate slow Peer. - go func() { - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - c, err := addr.Dial() - if err != nil { - errc <- err - return - } - - close(slowc) - defer func() { - close(slowdonec) - }() - - // Make sure we switch to fast peer goroutine. - runtime.Gosched() - - select { - case <-fastc: - // Fast peer connected. - case <-time.After(200 * time.Millisecond): - // We error if the fast peer didn't succeed. - errc <- fmt.Errorf("fast peer timed out") - } - - sc, err := upgradeSecretConn(c, 200*time.Millisecond, ed25519.GenPrivKey()) - if err != nil { - errc <- err - return - } - - _, err = handshake(sc, 200*time.Millisecond, - testNodeInfo( - PubKeyToID(ed25519.GenPrivKey().PubKey()), - "slow_peer", - )) - if err != nil { - errc <- err - } - }() - - // Simulate fast Peer. - go func() { - <-slowc - - var ( - dialer = newMultiplexTransport( - fastNodeInfo, - NodeKey{ - ID: PubKeyToID(fastNodePV.PubKey()), - PrivKey: fastNodePV, - }, - ) - ) - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(fastc) - <-slowdonec - close(errc) - }() - - if err := <-errc; err != nil { - t.Logf("connection failed: %v", err) - } - - p, err := mt.Accept(peerConfig{}) - if err != nil { - t.Fatal(err) - } - - if have, want := p.NodeInfo(), fastNodeInfo; !reflect.DeepEqual(have, want) { - t.Errorf("have %v, want %v", have, want) - } -} - -func TestTransportMultiplexValidateNodeInfo(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - errc := make(chan error) - - go func() { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - dialer = newMultiplexTransport( - testNodeInfo(id, ""), // Should not be empty - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - t.Errorf("connection failed: %v", err) - } - - _, err := mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsNodeInfoInvalid() { - t.Errorf("expected NodeInfo to be invalid, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } -} - -func TestTransportMultiplexRejectMissmatchID(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - errc := make(chan error) - - go func() { - dialer := newMultiplexTransport( - testNodeInfo( - PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer", - ), - GenNodeKey(), - ) - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - t.Errorf("connection failed: %v", err) - } - - _, err := mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsAuthFailure() { - t.Errorf("expected auth failure, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } -} - -func TestTransportMultiplexDialRejectWrongID(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - dialer = newMultiplexTransport( - testNodeInfo(id, ""), // Should not be empty - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - - wrongID := PubKeyToID(ed25519.GenPrivKey().PubKey()) - addr := NewNetAddress(wrongID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - t.Logf("connection failed: %v", err) - if err, ok := err.(ErrRejected); ok { - if !err.IsAuthFailure() { - t.Errorf("expected auth failure, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } - } -} - -func TestTransportMultiplexRejectIncompatible(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - errc := make(chan error) - - go func() { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - dialer = newMultiplexTransport( - testNodeInfoWithNetwork(id, "dialer", "incompatible-network"), - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(errc) - }() - - _, err := mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsIncompatible() { - t.Errorf("expected to reject incompatible, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } -} - -func TestTransportMultiplexRejectSelf(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - errc := make(chan error) - - go func() { - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := mt.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - if err, ok := err.(ErrRejected); ok { - if !err.IsSelf() { - t.Errorf("expected to reject self, got: %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } - } else { - t.Errorf("expected connection failure") - } - - _, err := mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsSelf() { - t.Errorf("expected to reject self, got: %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", nil) - } -} - -func TestTransportConnDuplicateIPFilter(t *testing.T) { - filter := ConnDuplicateIPFilter() - - if err := filter(nil, &testTransportConn{}, nil); err != nil { - t.Fatal(err) - } - - var ( - c = &testTransportConn{} - cs = NewConnSet() - ) - - cs.Set(c, []net.IP{ - {10, 0, 10, 1}, - {10, 0, 10, 2}, - {10, 0, 10, 3}, - }) - - if err := filter(cs, c, []net.IP{ - {10, 0, 10, 2}, - }); err == nil { - t.Errorf("expected Peer to be rejected as duplicate") - } -} - -func TestTransportHandshake(t *testing.T) { - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - - var ( - peerPV = ed25519.GenPrivKey() - peerNodeInfo = testNodeInfo(PubKeyToID(peerPV.PubKey()), defaultNodeName) - ) - - go func() { - c, err := net.Dial(ln.Addr().Network(), ln.Addr().String()) - if err != nil { - t.Error(err) - return - } - - go func(c net.Conn) { - _, err := protoio.NewDelimitedWriter(c).WriteMsg(peerNodeInfo.(DefaultNodeInfo).ToProto()) - if err != nil { - t.Error(err) - } - }(c) - go func(c net.Conn) { - var ( - // ni DefaultNodeInfo - pbni tmp2p.DefaultNodeInfo - ) - - protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) - err := protoReader.ReadMsg(&pbni) - if err != nil { - t.Error(err) - } - - _, err = DefaultNodeInfoFromToProto(&pbni) - if err != nil { - t.Error(err) - } - }(c) - }() - - c, err := ln.Accept() - if err != nil { - t.Fatal(err) - } - - ni, err := handshake(c, 20*time.Millisecond, emptyNodeInfo()) - if err != nil { - t.Fatal(err) - } - - if have, want := ni, peerNodeInfo; !reflect.DeepEqual(have, want) { - t.Errorf("have %v, want %v", have, want) - } -} - -// create listener -func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - mt = newMultiplexTransport( - testNodeInfo( - id, "transport", - ), - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } - - // give the listener some time to get ready - time.Sleep(20 * time.Millisecond) - - return mt -} - -type testTransportAddr struct{} - -func (a *testTransportAddr) Network() string { return "tcp" } -func (a *testTransportAddr) String() string { return "test.local:1234" } - -type testTransportConn struct{} - -func (c *testTransportConn) Close() error { - return fmt.Errorf("close() not implemented") -} - -func (c *testTransportConn) LocalAddr() net.Addr { - return &testTransportAddr{} -} - -func (c *testTransportConn) RemoteAddr() net.Addr { - return &testTransportAddr{} -} - -func (c *testTransportConn) Read(_ []byte) (int, error) { - return -1, fmt.Errorf("read() not implemented") -} - -func (c *testTransportConn) SetDeadline(_ time.Time) error { - return fmt.Errorf("setDeadline() not implemented") -} - -func (c *testTransportConn) SetReadDeadline(_ time.Time) error { - return fmt.Errorf("setReadDeadline() not implemented") -} - -func (c *testTransportConn) SetWriteDeadline(_ time.Time) error { - return fmt.Errorf("setWriteDeadline() not implemented") -} - -func (c *testTransportConn) Write(_ []byte) (int, error) { - return -1, fmt.Errorf("write() not implemented") -} diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 3a818f54f..44d7a3dd6 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -222,7 +222,7 @@ type Node struct { privValidator types.PrivValidator // local node's validator key // network - transport *p2p.MultiplexTransport + transport *p2p.MConnTransport sw *p2p.Switch // p2p connections addrBook pex.AddrBook // known peers nodeInfo p2p.NodeInfo @@ -453,23 +453,22 @@ func createConsensusReactor(config *cfg.Config, } func createTransport( + logger log.Logger, config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey p2p.NodeKey, proxyApp proxy.AppConns, ) ( - *p2p.MultiplexTransport, + *p2p.MConnTransport, []p2p.PeerFilterFunc, ) { var ( - mConnConfig = p2p.MConnConfig(config.P2P) - transport = p2p.NewMultiplexTransport(nodeInfo, nodeKey, mConnConfig) connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} ) if !config.P2P.AllowDuplicateIP { - connFilters = append(connFilters, p2p.ConnDuplicateIPFilter()) + connFilters = append(connFilters, p2p.ConnDuplicateIPFilter) } // Filter peers by addr or pubkey with an ABCI query. @@ -512,11 +511,12 @@ func createTransport( ) } - p2p.MultiplexTransportConnFilters(connFilters...)(transport) - - // Limit the number of incoming connections. - max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - p2p.MultiplexTransportMaxIncomingConnections(max)(transport) + transport := p2p.NewMConnTransport( + logger, nodeInfo, nodeKey.PrivKey, p2p.MConnConfig(config.P2P), + p2p.MConnTransportConnFilters(connFilters...), + p2p.MConnTransportMaxIncomingConnections(config.P2P.MaxNumInboundPeers+ + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))), + ) return transport, peerFilters } @@ -808,11 +808,9 @@ func NewNode(config *cfg.Config, return nil, err } - // Setup Transport. - transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp) - - // Setup Switch. + // Setup Transport and Switch. p2pLogger := logger.With("module", "p2p") + transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) sw := createSwitch( config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, @@ -921,29 +919,30 @@ func (n *Node) OnStart() error { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) } - // Start the transport. - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) - if err != nil { - return err - } - if err := n.transport.Listen(*addr); err != nil { - return err - } - - n.isListening = true - + // Start the mempool. if n.config.Mempool.WalEnabled() { - err = n.mempool.InitWAL() + err := n.mempool.InitWAL() if err != nil { return fmt.Errorf("init mempool WAL: %w", err) } } // Start the switch (the P2P server). - err = n.sw.Start() + err := n.sw.Start() + if err != nil { + return err + } + + // Start the transport. + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) if err != nil { return err } + if err := n.transport.Listen(addr.Endpoint()); err != nil { + return err + } + + n.isListening = true // Start the real state sync reactor separately since the switch uses the shim. if err := n.stateSyncReactor.Start(); err != nil {