From bcfc889f252e13fbe345ef985bdf9e41be2544d3 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 15 Dec 2020 16:08:16 +0100 Subject: [PATCH] p2p: implement new Transport interface (#5791) This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation -- in order to allow alternative non-multiplexed transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability, this is not considered a breaking change, and not listed in the changelog. The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large amount of changes to existing P2P code because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in the MConnection behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack. The low-level transport code and protocol (e.g. MConnection, SecretConnection and so on) has not been significantly changed, and refactoring this is not a priority until we come up with a plan for QUIC adoption, as we may end up discarding the MConnection code entirely. There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meanwhile. --- consensus/byzantine_test.go | 65 ++-- libs/log/testing_logger.go | 8 +- node/node.go | 53 ++- p2p/conn/connection.go | 9 +- p2p/netaddress.go | 10 + p2p/node_info.go | 2 +- p2p/peer.go | 183 ++++------ p2p/peer_test.go | 36 +- p2p/pex/pex_reactor_test.go | 21 +- p2p/switch.go | 68 ++-- p2p/switch_test.go | 75 ++-- p2p/test_util.go | 45 +-- p2p/transport.go | 690 +++++++---------------------------- p2p/transport_mconn.go | 675 ++++++++++++++++++++++++++++++++++ p2p/transport_test.go | 696 ------------------------------------ test/maverick/node/node.go | 53 ++- 16 files changed, 1098 insertions(+), 1591 deletions(-) create mode 100644 p2p/transport_mconn.go delete mode 100644 p2p/transport_test.go diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 4e31df98d..52357008d 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -224,18 +224,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { ticker.SetLogger(css[0].Logger) css[0].SetTimeoutTicker(ticker) - switches := make([]*p2p.Switch, N) p2pLogger := logger.With("module", "p2p") - for i := 0; i < N; i++ { - switches[i] = p2p.MakeSwitch( - config.P2P, - i, - "foo", "1.0.0", - func(i int, sw *p2p.Switch) *p2p.Switch { - return sw - }) - switches[i].SetLogger(p2pLogger.With("validator", i)) - } blocksSubs := make([]types.Subscription, N) reactors := make([]p2p.Reactor, N) @@ -243,20 +232,6 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { // enable txs so we can create different proposals assertMempool(css[i].txNotifier).EnableTxsAvailable() - // make first val byzantine - if i == 0 { - // NOTE: Now, test validators are MockPV, which by default doesn't - // do any safety checks. - css[i].privValidator.(types.MockPV).DisableChecks() - css[i].decideProposal = func(j int32) func(int64, int32) { - return func(height int64, round int32) { - byzantineDecideProposalFunc(t, height, round, css[j], switches[j]) - } - }(int32(i)) - // We are setting the prevote function to do nothing because the prevoting - // and precommitting are done alongside the proposal. - css[i].doPrevote = func(height int64, round int32) {} - } eventBus := css[i].eventBus eventBus.SetLogger(logger.With("module", "events", "validator", i)) @@ -281,22 +256,10 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { require.NoError(t, err) } - defer func() { - for _, r := range reactors { - if rr, ok := r.(*ByzantineReactor); ok { - err := rr.reactor.Switch.Stop() - require.NoError(t, err) - } else { - err := r.(*Reactor).Switch.Stop() - require.NoError(t, err) - } - } - }() - - p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { - // ignore new switch s, we already made ours - switches[i].AddReactor("CONSENSUS", reactors[i]) - return switches[i] + switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch { + sw.SetLogger(p2pLogger.With("validator", i)) + sw.AddReactor("CONSENSUS", reactors[i]) + return sw }, func(sws []*p2p.Switch, i, j int) { // the network starts partitioned with globally active adversary if i != 0 { @@ -305,6 +268,26 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) { p2p.Connect2Switches(sws, i, j) }) + // make first val byzantine + // NOTE: Now, test validators are MockPV, which by default doesn't + // do any safety checks. + css[0].privValidator.(types.MockPV).DisableChecks() + css[0].decideProposal = func(j int32) func(int64, int32) { + return func(height int64, round int32) { + byzantineDecideProposalFunc(t, height, round, css[j], switches[j]) + } + }(int32(0)) + // We are setting the prevote function to do nothing because the prevoting + // and precommitting are done alongside the proposal. + css[0].doPrevote = func(height int64, round int32) {} + + defer func() { + for _, sw := range switches { + err := sw.Stop() + require.NoError(t, err) + } + }() + // start the non-byz state machines. // note these must be started before the byz for i := 1; i < N; i++ { diff --git a/libs/log/testing_logger.go b/libs/log/testing_logger.go index 8914bd81f..95a7e3f7c 100644 --- a/libs/log/testing_logger.go +++ b/libs/log/testing_logger.go @@ -3,6 +3,7 @@ package log import ( "io" "os" + "sync" "testing" "github.com/go-kit/kit/log/term" @@ -10,7 +11,8 @@ import ( var ( // reuse the same logger across all tests - _testingLogger Logger + _testingLoggerMutex = sync.Mutex{} + _testingLogger Logger ) // TestingLogger returns a TMLogger which writes to STDOUT if testing being run @@ -30,6 +32,8 @@ func TestingLogger() Logger { // inside a test (not in the init func) because // verbose flag only set at the time of testing. func TestingLoggerWithOutput(w io.Writer) Logger { + _testingLoggerMutex.Lock() + defer _testingLoggerMutex.Unlock() if _testingLogger != nil { return _testingLogger } @@ -46,6 +50,8 @@ func TestingLoggerWithOutput(w io.Writer) Logger { // TestingLoggerWithColorFn allow you to provide your own color function. See // TestingLogger for documentation. func TestingLoggerWithColorFn(colorFn func(keyvals ...interface{}) term.FgBgColor) Logger { + _testingLoggerMutex.Lock() + defer _testingLoggerMutex.Unlock() if _testingLogger != nil { return _testingLogger } diff --git a/node/node.go b/node/node.go index cc047db9c..8d1a9795f 100644 --- a/node/node.go +++ b/node/node.go @@ -180,7 +180,7 @@ type Node struct { privValidator types.PrivValidator // local node's validator key // network - transport *p2p.MultiplexTransport + transport *p2p.MConnTransport sw *p2p.Switch // p2p connections addrBook pex.AddrBook // known peers nodeInfo p2p.NodeInfo @@ -409,23 +409,22 @@ func createConsensusReactor(config *cfg.Config, } func createTransport( + logger log.Logger, config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey p2p.NodeKey, proxyApp proxy.AppConns, ) ( - *p2p.MultiplexTransport, + *p2p.MConnTransport, []p2p.PeerFilterFunc, ) { var ( - mConnConfig = p2p.MConnConfig(config.P2P) - transport = p2p.NewMultiplexTransport(nodeInfo, nodeKey, mConnConfig) connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} ) if !config.P2P.AllowDuplicateIP { - connFilters = append(connFilters, p2p.ConnDuplicateIPFilter()) + connFilters = append(connFilters, p2p.ConnDuplicateIPFilter) } // Filter peers by addr or pubkey with an ABCI query. @@ -468,11 +467,12 @@ func createTransport( ) } - p2p.MultiplexTransportConnFilters(connFilters...)(transport) - - // Limit the number of incoming connections. - max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - p2p.MultiplexTransportMaxIncomingConnections(max)(transport) + transport := p2p.NewMConnTransport( + logger, nodeInfo, nodeKey.PrivKey, p2p.MConnConfig(config.P2P), + p2p.MConnTransportConnFilters(connFilters...), + p2p.MConnTransportMaxIncomingConnections(config.P2P.MaxNumInboundPeers+ + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))), + ) return transport, peerFilters } @@ -764,11 +764,9 @@ func NewNode(config *cfg.Config, return nil, err } - // Setup Transport. - transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp) - - // Setup Switch. + // Setup Transport and Switch. p2pLogger := logger.With("module", "p2p") + transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) sw := createSwitch( config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, @@ -877,29 +875,30 @@ func (n *Node) OnStart() error { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) } - // Start the transport. - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) - if err != nil { - return err - } - if err := n.transport.Listen(*addr); err != nil { - return err - } - - n.isListening = true - + // Start the mempool. if n.config.Mempool.WalEnabled() { - err = n.mempool.InitWAL() + err := n.mempool.InitWAL() if err != nil { return fmt.Errorf("init mempool WAL: %w", err) } } // Start the switch (the P2P server). - err = n.sw.Start() + err := n.sw.Start() + if err != nil { + return err + } + + // Start the transport. + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) if err != nil { return err } + if err := n.transport.Listen(addr.Endpoint()); err != nil { + return err + } + + n.isListening = true // Start the real state sync reactor separately since the switch uses the shim. if err := n.stateSyncReactor.Start(); err != nil { diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index b5290116d..e597b4c63 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -848,7 +848,7 @@ func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { } // Handles incoming PacketMsgs. It returns a message bytes if message is -// complete. NOTE message bytes may change on next call to recvPacketMsg. +// complete, which is owned by the caller and will not be modified. // Not goroutine-safe func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet) @@ -859,12 +859,7 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { ch.recving = append(ch.recving, packet.Data...) if packet.EOF { msgBytes := ch.recving - - // clear the slice without re-allocating. - // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go - // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes, - // at which point the recving slice stops being used and should be garbage collected - ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity) + ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity) return msgBytes, nil } return nil, nil diff --git a/p2p/netaddress.go b/p2p/netaddress.go index 77209217b..c14c3753a 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -281,6 +281,16 @@ func (na *NetAddress) HasID() bool { return string(na.ID) != "" } +// Endpoint converts the address to an MConnection endpoint. +func (na *NetAddress) Endpoint() Endpoint { + return Endpoint{ + Protocol: MConnProtocol, + PeerID: na.ID, + IP: na.IP, + Port: na.Port, + } +} + // Local returns true if it is a local address. func (na *NetAddress) Local() bool { return na.IP.IsLoopback() || zero4.Contains(na.IP) diff --git a/p2p/node_info.go b/p2p/node_info.go index 8acc23676..511716afb 100644 --- a/p2p/node_info.go +++ b/p2p/node_info.go @@ -245,7 +245,7 @@ func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo { return dni } -func DefaultNodeInfoFromToProto(pb *tmp2p.DefaultNodeInfo) (DefaultNodeInfo, error) { +func DefaultNodeInfoFromProto(pb *tmp2p.DefaultNodeInfo) (DefaultNodeInfo, error) { if pb == nil { return DefaultNodeInfo{}, errors.New("nil node info") } diff --git a/p2p/peer.go b/p2p/peer.go index 43a1e7c9c..3ef8240c5 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -4,7 +4,9 @@ import ( "bytes" "encoding/hex" "fmt" + "io" "net" + "runtime/debug" "strings" "sync" "time" @@ -178,52 +180,28 @@ type Peer interface { type peerConn struct { outbound bool persistent bool - conn net.Conn // source connection - - socketAddr *NetAddress - - // cached RemoteIP() - ip net.IP + conn Connection + ip net.IP // cached RemoteIP() } -func newPeerConn( - outbound, persistent bool, - conn net.Conn, - socketAddr *NetAddress, -) peerConn { - +func newPeerConn(outbound, persistent bool, conn Connection) peerConn { return peerConn{ outbound: outbound, persistent: persistent, conn: conn, - socketAddr: socketAddr, } } // ID only exists for SecretConnection. -// NOTE: Will panic if conn is not *SecretConnection. func (pc peerConn) ID() ID { - return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey()) + return PubKeyToID(pc.conn.PubKey()) } // Return the IP from the connection RemoteAddr func (pc peerConn) RemoteIP() net.IP { - if pc.ip != nil { - return pc.ip - } - - host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String()) - if err != nil { - panic(err) - } - - ips, err := net.LookupIP(host) - if err != nil { - panic(err) + if pc.ip == nil { + pc.ip = pc.conn.RemoteEndpoint().IP } - - pc.ip = ips[0] - return pc.ip } @@ -235,13 +213,14 @@ type peer struct { // raw peerConn and the multiplex connection peerConn - mconn *tmconn.MConnection // peer's node info and the channel it knows about // channels = nodeInfo.Channels // cached to avoid copying nodeInfo in hasChannel - nodeInfo NodeInfo - channels []byte + nodeInfo NodeInfo + channels []byte + reactors map[byte]Reactor + onPeerError func(Peer, interface{}) // User data Data *cmap.CMap @@ -254,30 +233,22 @@ type PeerOption func(*peer) func newPeer( pc peerConn, - mConfig tmconn.MConnConfig, - nodeInfo NodeInfo, reactorsByCh map[byte]Reactor, - chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), options ...PeerOption, ) *peer { + nodeInfo := pc.conn.NodeInfo() p := &peer{ peerConn: pc, nodeInfo: nodeInfo, - channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO + channels: nodeInfo.Channels, // TODO + reactors: reactorsByCh, + onPeerError: onPeerError, Data: cmap.NewCMap(), metricsTicker: time.NewTicker(metricsTickerDuration), metrics: NopMetrics(), } - p.mconn = createMConnection( - pc.conn, - p, - reactorsByCh, - chDescs, - onPeerError, - mConfig, - ) p.BaseService = *service.NewBaseService(nil, "Peer", p) for _, option := range options { option(p) @@ -286,13 +257,18 @@ func newPeer( return p } +// onError calls the peer error callback. +func (p *peer) onError(err interface{}) { + p.onPeerError(p, err) +} + // String representation. func (p *peer) String() string { if p.outbound { - return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID()) + return fmt.Sprintf("Peer{%v %v out}", p.conn, p.ID()) } - return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) + return fmt.Sprintf("Peer{%v %v in}", p.conn, p.ID()) } //--------------------------------------------------- @@ -301,7 +277,6 @@ func (p *peer) String() string { // SetLogger implements BaseService. func (p *peer) SetLogger(l log.Logger) { p.Logger = l - p.mconn.SetLogger(l) } // OnStart implements BaseService. @@ -310,29 +285,53 @@ func (p *peer) OnStart() error { return err } - if err := p.mconn.Start(); err != nil { - return err - } - + go p.processMessages() go p.metricsReporter() + return nil } +// processMessages processes messages received from the connection. +func (p *peer) processMessages() { + defer func() { + if r := recover(); r != nil { + p.Logger.Error("peer message processing panic", "err", r, "stack", string(debug.Stack())) + p.onError(fmt.Errorf("panic during peer message processing: %v", r)) + } + }() + + for { + chID, msg, err := p.conn.ReceiveMessage() + if err != nil { + p.onError(err) + return + } + reactor, ok := p.reactors[chID] + if !ok { + p.onError(fmt.Errorf("unknown channel %v", chID)) + return + } + reactor.Receive(chID, p, msg) + } +} + // FlushStop mimics OnStop but additionally ensures that all successful // .Send() calls will get flushed before closing the connection. // NOTE: it is not safe to call this method more than once. func (p *peer) FlushStop() { p.metricsTicker.Stop() p.BaseService.OnStop() - p.mconn.FlushStop() // stop everything and close the conn + if err := p.conn.FlushClose(); err != nil { + p.Logger.Debug("error while stopping peer", "err", err) + } } // OnStop implements BaseService. func (p *peer) OnStop() { p.metricsTicker.Stop() p.BaseService.OnStop() - if err := p.mconn.Stop(); err != nil { // stop everything and close the conn - p.Logger.Debug("Error while stopping peer", "err", err) + if err := p.conn.Close(); err != nil { + p.Logger.Debug("error while stopping peer", "err", err) } } @@ -364,12 +363,12 @@ func (p *peer) NodeInfo() NodeInfo { // For inbound peers, it's the address returned by the underlying connection // (not what's reported in the peer's NodeInfo). func (p *peer) SocketAddr() *NetAddress { - return p.peerConn.socketAddr + return p.peerConn.conn.RemoteEndpoint().NetAddress() } // Status returns the peer's ConnectionStatus. func (p *peer) Status() tmconn.ConnectionStatus { - return p.mconn.Status() + return p.conn.Status() } // Send msg bytes to the channel identified by chID byte. Returns false if the @@ -382,7 +381,13 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - res := p.mconn.Send(chID, msgBytes) + res, err := p.conn.SendMessage(chID, msgBytes) + if err == io.EOF { + return false + } else if err != nil { + p.onError(err) + return false + } if res { labels := []string{ "peer_id", string(p.ID()), @@ -401,7 +406,13 @@ func (p *peer) TrySend(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - res := p.mconn.TrySend(chID, msgBytes) + res, err := p.conn.TrySendMessage(chID, msgBytes) + if err == io.EOF { + return false + } else if err != nil { + p.onError(err) + return false + } if res { labels := []string{ "peer_id", string(p.ID()), @@ -458,15 +469,11 @@ func (pc *peerConn) CloseConn() { // RemoteAddr returns peer's remote network address. func (p *peer) RemoteAddr() net.Addr { - return p.peerConn.conn.RemoteAddr() -} - -// CanSend returns true if the send queue is not full, false otherwise. -func (p *peer) CanSend(chID byte) bool { - if !p.IsRunning() { - return false + endpoint := p.conn.RemoteEndpoint() + return &net.TCPAddr{ + IP: endpoint.IP, + Port: int(endpoint.Port), } - return p.mconn.CanSend(chID) } //--------------------------------------------------- @@ -481,7 +488,7 @@ func (p *peer) metricsReporter() { for { select { case <-p.metricsTicker.C: - status := p.mconn.Status() + status := p.conn.Status() var sendQueueSize float64 for _, chStatus := range status.Channels { sendQueueSize += float64(chStatus.SendQueueSize) @@ -493,43 +500,3 @@ func (p *peer) metricsReporter() { } } } - -//------------------------------------------------------------------ -// helper funcs - -func createMConnection( - conn net.Conn, - p *peer, - reactorsByCh map[byte]Reactor, - chDescs []*tmconn.ChannelDescriptor, - onPeerError func(Peer, interface{}), - config tmconn.MConnConfig, -) *tmconn.MConnection { - - onReceive := func(chID byte, msgBytes []byte) { - reactor := reactorsByCh[chID] - if reactor == nil { - // Note that its ok to panic here as it's caught in the conn._recover, - // which does onPeerError. - panic(fmt.Sprintf("Unknown channel %X", chID)) - } - labels := []string{ - "peer_id", string(p.ID()), - "chID", fmt.Sprintf("%#x", chID), - } - p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) - reactor.Receive(chID, p, msgBytes) - } - - onError := func(r interface{}) { - onPeerError(p, r) - } - - return tmconn.NewMConnectionWithConfig( - conn, - chDescs, - onReceive, - onError, - config, - ) -} diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 77f40d1b3..958d22729 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -114,7 +114,6 @@ func TestPeerSend(t *testing.T) { } }) - assert.True(p.CanSend(testCh)) assert.True(p.Send(testCh, []byte("Asylum"))) } @@ -126,20 +125,17 @@ func createOutboundPeerAndPerformHandshake( chDescs := []*tmconn.ChannelDescriptor{ {ID: testCh, Priority: 1}, } - reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)} pk := ed25519.GenPrivKey() - pc, err := testOutboundPeerConn(addr, config, false, pk) - if err != nil { - return nil, err - } - timeout := 1 * time.Second - ourNodeInfo := testNodeInfo(addr.ID, "host_peer") - peerNodeInfo, err := handshake(pc.conn, timeout, ourNodeInfo) + ourNodeInfo := testNodeInfo(PubKeyToID(pk.PubKey()), "host_peer") + transport := NewMConnTransport(log.TestingLogger(), ourNodeInfo, pk, mConfig) + transport.SetChannelDescriptors(chDescs) + reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)} + pc, err := testOutboundPeerConn(transport, addr, config, false, pk) if err != nil { return nil, err } - p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {}) + p := newPeer(pc, reactorsByCh, func(p Peer, r interface{}) {}) p.SetLogger(log.TestingLogger().With("peer", addr)) return p, nil } @@ -157,6 +153,7 @@ func testDial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) { } func testOutboundPeerConn( + transport *MConnTransport, addr *NetAddress, config *config.P2PConfig, persistent bool, @@ -169,7 +166,7 @@ func testOutboundPeerConn( return pc, fmt.Errorf("error creating peer: %w", err) } - pc, err = testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr) + pc, err = testPeerConn(transport, conn, true, persistent) if err != nil { if cerr := conn.Close(); cerr != nil { return pc, fmt.Errorf("%v: %w", cerr.Error(), err) @@ -227,15 +224,12 @@ func (rp *remotePeer) Stop() { } func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { + transport := NewMConnTransport(log.TestingLogger(), rp.nodeInfo(), rp.PrivKey, MConnConfig(rp.Config)) conn, err := addr.DialTimeout(1 * time.Second) if err != nil { return nil, err } - pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey) - if err != nil { - return nil, err - } - _, err = handshake(pc.conn, time.Second, rp.nodeInfo()) + _, err = testInboundPeerConn(transport, conn) if err != nil { return nil, err } @@ -243,6 +237,7 @@ func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) { } func (rp *remotePeer) accept() { + transport := NewMConnTransport(log.TestingLogger(), rp.nodeInfo(), rp.PrivKey, MConnConfig(rp.Config)) conns := []net.Conn{} for { @@ -255,14 +250,9 @@ func (rp *remotePeer) accept() { return } - pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey) - if err != nil { - golog.Fatalf("Failed to create a peer: %+v", err) - } - - _, err = handshake(pc.conn, time.Second, rp.nodeInfo()) + _, err = testInboundPeerConn(transport, conn) if err != nil { - golog.Fatalf("Failed to perform handshake: %+v", err) + golog.Printf("Failed to create a peer: %+v", err) } conns = append(conns, conn) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index fd3c15bc6..a32164628 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -126,7 +126,9 @@ func TestPEXReactorReceive(t *testing.T) { r.RequestAddrs(peer) size := book.Size() - msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) + na, err := peer.NodeInfo().NetAddress() + require.NoError(t, err) + msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{na.ToProto()}}) r.Receive(PexChannel, peer, msg) assert.Equal(t, size+1, book.Size()) @@ -327,7 +329,7 @@ func TestPEXReactorDoesNotDisconnectFromPersistentPeerInSeedMode(t *testing.T) { assert.Zero(t, sw.Peers().Size()) - peerSwitch := testCreateDefaultPeer(dir, 1) + peerSwitch := testCreatePeerWithConfig(dir, 1, pexRConfig) require.NoError(t, peerSwitch.Start()) t.Cleanup(func() { _ = peerSwitch.Stop() }) @@ -452,7 +454,9 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) { pexR.RequestAddrs(peer) size := book.Size() - msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) + na, err := peer.NodeInfo().NetAddress() + require.NoError(t, err) + msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{na.ToProto()}}) pexR.Receive(PexChannel, peer, msg) assert.Equal(t, size, book.Size()) @@ -619,12 +623,13 @@ func createReactor(t *testing.T, conf *ReactorConfig) (r *Reactor, book AddrBook } func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch { - sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) + sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { + for _, r := range reactors { + sw.AddReactor(r.String(), r) + } + return sw + }) sw.SetLogger(log.TestingLogger()) - for _, r := range reactors { - sw.AddReactor(r.String(), r) - r.SetSwitch(sw) - } return sw } diff --git a/p2p/switch.go b/p2p/switch.go index 785b7f064..99cc04868 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -1,6 +1,7 @@ package p2p import ( + "context" "fmt" "math" "sync" @@ -92,10 +93,14 @@ type Switch struct { metrics *Metrics } -// NetAddress returns the address the switch is listening on. +// NetAddress returns the first address the switch is listening on, +// or nil if no addresses are found. func (sw *Switch) NetAddress() *NetAddress { - addr := sw.transport.NetAddress() - return &addr + endpoints := sw.transport.Endpoints() + if len(endpoints) == 0 { + return nil + } + return endpoints[0].NetAddress() } // SwitchOption sets an optional parameter on the Switch. @@ -221,6 +226,12 @@ func (sw *Switch) SetNodeKey(nodeKey NodeKey) { // OnStart implements BaseService. It starts all the reactors and peers. func (sw *Switch) OnStart() error { + + // FIXME: Temporary hack to pass channel descriptors to MConn transport, + // since they are not available when it is constructed. This will be + // fixed when we implement the new router abstraction. + sw.transport.SetChannelDescriptors(sw.chDescs) + // Start reactors for _, reactor := range sw.reactors { err := reactor.Start() @@ -246,7 +257,7 @@ func (sw *Switch) OnStop() { sw.Logger.Debug("Switch: Stopping reactors") for _, reactor := range sw.reactors { if err := reactor.Stop(); err != nil { - sw.Logger.Error("error while stopped reactor", "reactor", reactor, "error", err) + sw.Logger.Error("error while stopping reactor", "reactor", reactor, "error", err) } } } @@ -354,7 +365,6 @@ func (sw *Switch) StopPeerGracefully(peer Peer) { } func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { - sw.transport.Cleanup(peer) if err := peer.Stop(); err != nil { sw.Logger.Error("error while stopping peer", "error", err) // TODO: should return error to be handled accordingly } @@ -617,13 +627,7 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool { func (sw *Switch) acceptRoutine() { for { - p, err := sw.transport.Accept(peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - reactorsByCh: sw.reactorsByCh, - metrics: sw.metrics, - isPersistent: sw.IsPeerPersistent, - }) + c, err := sw.transport.Accept(context.Background()) if err != nil { switch err := err.(type) { case ErrRejected: @@ -671,6 +675,20 @@ func (sw *Switch) acceptRoutine() { break } + peerNodeInfo := c.NodeInfo() + isPersistent := false + addr, err := peerNodeInfo.NetAddress() + if err == nil { + isPersistent = sw.IsPeerPersistent(addr) + } + + p := newPeer( + newPeerConn(false, isPersistent, c), + sw.reactorsByCh, + sw.StopPeerForError, + PeerMetrics(sw.metrics), + ) + if !sw.IsPeerUnconditional(p.NodeInfo().ID()) { // Ignore connection if we already have enough peers. _, in, _ := sw.NumPeers() @@ -681,16 +699,14 @@ func (sw *Switch) acceptRoutine() { "have", in, "max", sw.config.MaxNumInboundPeers, ) - - sw.transport.Cleanup(p) - + _ = p.CloseConn() continue } } if err := sw.addPeer(p); err != nil { - sw.transport.Cleanup(p) + _ = p.CloseConn() if p.IsRunning() { _ = p.Stop() } @@ -720,12 +736,11 @@ func (sw *Switch) addOutboundPeerWithConfig( return fmt.Errorf("dial err (peerConfig.DialFail == true)") } - p, err := sw.transport.Dial(*addr, peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - metrics: sw.metrics, + c, err := sw.transport.Dial(context.Background(), Endpoint{ + Protocol: MConnProtocol, + PeerID: addr.ID, + IP: addr.IP, + Port: addr.Port, }) if err != nil { if e, ok := err.(ErrRejected); ok { @@ -748,8 +763,15 @@ func (sw *Switch) addOutboundPeerWithConfig( return err } + p := newPeer( + newPeerConn(true, sw.IsPeerPersistent(c.RemoteEndpoint().NetAddress()), c), + sw.reactorsByCh, + sw.StopPeerForError, + PeerMetrics(sw.metrics), + ) + if err := sw.addPeer(p); err != nil { - sw.transport.Cleanup(p) + _ = p.CloseConn() if p.IsRunning() { _ = p.Stop() } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 538ccfcc3..01dce5c8b 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -2,6 +2,7 @@ package p2p import ( "bytes" + "context" "errors" "fmt" "io" @@ -28,6 +29,7 @@ import ( var ( cfg *config.P2PConfig + ctx = context.Background() ) func init() { @@ -240,15 +242,15 @@ func TestSwitchPeerFilter(t *testing.T) { rp.Start() t.Cleanup(rp.Stop) - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - }) + c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint()) if err != nil { t.Fatal(err) } + p := newPeer( + newPeerConn(true, false, c), + sw.reactorsByCh, + sw.StopPeerForError, + ) err = sw.addPeer(p) if err, ok := err.(ErrRejected); ok { @@ -291,15 +293,15 @@ func TestSwitchPeerFilterTimeout(t *testing.T) { rp.Start() defer rp.Stop() - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - }) + c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint()) if err != nil { t.Fatal(err) } + p := newPeer( + newPeerConn(true, false, c), + sw.reactorsByCh, + sw.StopPeerForError, + ) err = sw.addPeer(p) if _, ok := err.(ErrFilterTimeout); !ok { @@ -322,15 +324,15 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) { rp.Start() defer rp.Stop() - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - }) + c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint()) if err != nil { t.Fatal(err) } + p := newPeer( + newPeerConn(true, false, c), + sw.reactorsByCh, + sw.StopPeerForError, + ) if err := sw.addPeer(p); err != nil { t.Fatal(err) @@ -372,13 +374,15 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { rp.Start() defer rp.Stop() - p, err := sw.transport.Dial(*rp.Addr(), peerConfig{ - chDescs: sw.chDescs, - onPeerError: sw.StopPeerForError, - isPersistent: sw.IsPeerPersistent, - reactorsByCh: sw.reactorsByCh, - }) - require.Nil(err) + c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint()) + if err != nil { + t.Fatal(err) + } + p := newPeer( + newPeerConn(true, false, c), + sw.reactorsByCh, + sw.StopPeerForError, + ) err = sw.addPeer(p) require.Nil(err) @@ -386,7 +390,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { require.NotNil(sw.Peers().Get(rp.ID())) // simulate failure by closing connection - err = p.(*peer).CloseConn() + err = p.CloseConn() require.NoError(err) assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond) @@ -677,19 +681,16 @@ type errorTransport struct { acceptErr error } -func (et errorTransport) NetAddress() NetAddress { - panic("not implemented") -} - -func (et errorTransport) Accept(c peerConfig) (Peer, error) { +func (et errorTransport) Accept(context.Context) (Connection, error) { return nil, et.acceptErr } -func (errorTransport) Dial(NetAddress, peerConfig) (Peer, error) { - panic("not implemented") -} -func (errorTransport) Cleanup(Peer) { +func (errorTransport) Dial(context.Context, Endpoint) (Connection, error) { panic("not implemented") } +func (errorTransport) Close() error { panic("not implemented") } +func (errorTransport) FlushClose() error { panic("not implemented") } +func (errorTransport) Endpoints() []Endpoint { panic("not implemented") } +func (errorTransport) SetChannelDescriptors([]*ChannelDescriptor) {} func TestSwitchAcceptRoutineErrorCases(t *testing.T) { sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}}) @@ -728,6 +729,10 @@ type mockReactor struct { initCalledBeforeRemoveFinished uint32 } +func (r *mockReactor) GetChannels() []*ChannelDescriptor { + return []*ChannelDescriptor{{ID: testCh, Priority: 10}} +} + func (r *mockReactor) RemovePeer(peer Peer, reason interface{}) { atomic.StoreUint32(&r.removePeerInProgress, 1) defer atomic.StoreUint32(&r.removePeerInProgress, 0) diff --git a/p2p/test_util.go b/p2p/test_util.go index 9ae42a599..d24cf896a 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -3,9 +3,7 @@ package p2p import ( "fmt" "net" - "time" - "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/libs/log" tmnet "github.com/tendermint/tendermint/libs/net" tmrand "github.com/tendermint/tendermint/libs/rand" @@ -34,12 +32,8 @@ func AddPeerToSwitchPeerSet(sw *Switch, peer Peer) { func CreateRandomPeer(outbound bool) Peer { addr, netAddr := CreateRoutableAddr() p := &peer{ - peerConn: peerConn{ - outbound: outbound, - socketAddr: netAddr, - }, + peerConn: peerConn{outbound: outbound}, nodeInfo: mockNodeInfo{netAddr}, - mconn: &conn.MConnection{}, metrics: NopMetrics(), } p.SetLogger(log.TestingLogger().With("peer", addr)) @@ -127,15 +121,7 @@ func Connect2Switches(switches []*Switch, i, j int) { } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - pc, err := testInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey) - if err != nil { - if err := conn.Close(); err != nil { - sw.Logger.Error("Error closing connection", "err", err) - } - return err - } - - ni, err := handshake(conn, time.Second, sw.nodeInfo) + pc, err := testInboundPeerConn(sw.transport.(*MConnTransport), conn) if err != nil { if err := conn.Close(); err != nil { sw.Logger.Error("Error closing connection", "err", err) @@ -145,10 +131,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { p := newPeer( pc, - MConnConfig(sw.config), - ni, sw.reactorsByCh, - sw.chDescs, sw.StopPeerForError, ) @@ -189,9 +172,10 @@ func MakeSwitch( panic(err) } - t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg)) + logger := log.TestingLogger().With("switch", i) + t := NewMConnTransport(logger, nodeInfo, nodeKey.PrivKey, MConnConfig(cfg)) - if err := t.Listen(*addr); err != nil { + if err := t.Listen(addr.Endpoint()); err != nil { panic(err) } @@ -201,6 +185,7 @@ func MakeSwitch( sw.SetNodeKey(nodeKey) ni := nodeInfo.(DefaultNodeInfo) + ni.Channels = []byte{} for ch := range sw.reactorsByCh { ni.Channels = append(ni.Channels, ch) } @@ -208,37 +193,31 @@ func MakeSwitch( // TODO: We need to setup reactors ahead of time so the NodeInfo is properly // populated and we don't have to do those awkward overrides and setters. - t.nodeInfo = nodeInfo + t.nodeInfo = nodeInfo.(DefaultNodeInfo) sw.SetNodeInfo(nodeInfo) return sw } func testInboundPeerConn( + transport *MConnTransport, conn net.Conn, - config *config.P2PConfig, - ourNodePrivKey crypto.PrivKey, ) (peerConn, error) { - return testPeerConn(conn, config, false, false, ourNodePrivKey, nil) + return testPeerConn(transport, conn, false, false) } func testPeerConn( + transport *MConnTransport, rawConn net.Conn, - cfg *config.P2PConfig, outbound, persistent bool, - ourNodePrivKey crypto.PrivKey, - socketAddr *NetAddress, ) (pc peerConn, err error) { - conn := rawConn - // Encrypt connection - conn, err = upgradeSecretConn(conn, cfg.HandshakeTimeout, ourNodePrivKey) + conn, err := newMConnConnection(transport, rawConn, "") if err != nil { return pc, fmt.Errorf("error creating peer: %w", err) } - // Only the information we already have - return newPeerConn(outbound, persistent, conn, socketAddr), nil + return newPeerConn(outbound, persistent, conn), nil } //---------------------------------------------------------------- diff --git a/p2p/transport.go b/p2p/transport.go index e597ac0a1..bbb607700 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -2,593 +2,161 @@ package p2p import ( "context" + "errors" "fmt" "net" - "time" - - "golang.org/x/net/netutil" + "net/url" "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/libs/protoio" "github.com/tendermint/tendermint/p2p/conn" - tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" -) - -const ( - defaultDialTimeout = time.Second - defaultFilterTimeout = 5 * time.Second - defaultHandshakeTimeout = 3 * time.Second ) -// IPResolver is a behaviour subset of net.Resolver. -type IPResolver interface { - LookupIPAddr(context.Context, string) ([]net.IPAddr, error) -} - -// accept is the container to carry the upgraded connection and NodeInfo from an -// asynchronously running routine to the Accept method. -type accept struct { - netAddr *NetAddress - conn net.Conn - nodeInfo NodeInfo - err error -} - -// peerConfig is used to bundle data we need to fully setup a Peer with an -// MConn, provided by the caller of Accept and Dial (currently the Switch). This -// a temporary measure until reactor setup is less dynamic and we introduce the -// concept of PeerBehaviour to communicate about significant Peer lifecycle -// events. -// TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour. -type peerConfig struct { - chDescs []*conn.ChannelDescriptor - onPeerError func(Peer, interface{}) - outbound bool - // isPersistent allows you to set a function, which, given socket address - // (for outbound peers) OR self-reported address (for inbound peers), tells - // if the peer is persistent or not. - isPersistent func(*NetAddress) bool - reactorsByCh map[byte]Reactor - metrics *Metrics -} - -// Transport emits and connects to Peers. The implementation of Peer is left to -// the transport. Each transport is also responsible to filter establishing -// peers specific to its domain. +// Transport is an arbitrary mechanism for exchanging bytes with a peer. type Transport interface { - // Listening address. - NetAddress() NetAddress + // Accept waits for the next inbound connection on a listening endpoint. + Accept(context.Context) (Connection, error) - // Accept returns a newly connected Peer. - Accept(peerConfig) (Peer, error) + // Dial creates an outbound connection to an endpoint. + Dial(context.Context, Endpoint) (Connection, error) - // Dial connects to the Peer for the address. - Dial(NetAddress, peerConfig) (Peer, error) + // Endpoints lists endpoints the transport is listening on. Any endpoint IP + // addresses do not need to be normalized in any way (e.g. 0.0.0.0 is + // valid), as they should be preprocessed before being advertised. + Endpoints() []Endpoint - // Cleanup any resources associated with Peer. - Cleanup(Peer) -} - -// transportLifecycle bundles the methods for callers to control start and stop -// behaviour. -type transportLifecycle interface { + // Close stops accepting new connections, but does not close active connections. Close() error - Listen(NetAddress) error -} - -// ConnFilterFunc to be implemented by filter hooks after a new connection has -// been established. The set of exisiting connections is passed along together -// with all resolved IPs for the new connection. -type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error - -// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection -// and refuses new ones if they come from a known ip. -func ConnDuplicateIPFilter() ConnFilterFunc { - return func(cs ConnSet, c net.Conn, ips []net.IP) error { - for _, ip := range ips { - if cs.HasIP(ip) { - return ErrRejected{ - conn: c, - err: fmt.Errorf("ip<%v> already connected", ip), - isDuplicate: true, - } - } - } - - return nil - } -} - -// MultiplexTransportOption sets an optional parameter on the -// MultiplexTransport. -type MultiplexTransportOption func(*MultiplexTransport) - -// MultiplexTransportConnFilters sets the filters for rejection new connections. -func MultiplexTransportConnFilters( - filters ...ConnFilterFunc, -) MultiplexTransportOption { - return func(mt *MultiplexTransport) { mt.connFilters = filters } -} - -// MultiplexTransportFilterTimeout sets the timeout waited for filter calls to -// return. -func MultiplexTransportFilterTimeout( - timeout time.Duration, -) MultiplexTransportOption { - return func(mt *MultiplexTransport) { mt.filterTimeout = timeout } -} - -// MultiplexTransportResolver sets the Resolver used for ip lokkups, defaults to -// net.DefaultResolver. -func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption { - return func(mt *MultiplexTransport) { mt.resolver = resolver } -} - -// MultiplexTransportMaxIncomingConnections sets the maximum number of -// simultaneous connections (incoming). Default: 0 (unlimited) -func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption { - return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n } -} - -// MultiplexTransport accepts and dials tcp connections and upgrades them to -// multiplexed peers. -type MultiplexTransport struct { - netAddr NetAddress - listener net.Listener - maxIncomingConnections int // see MaxIncomingConnections - - acceptc chan accept - closec chan struct{} - - // Lookup table for duplicate ip and id checks. - conns ConnSet - connFilters []ConnFilterFunc - - dialTimeout time.Duration - filterTimeout time.Duration - handshakeTimeout time.Duration - nodeInfo NodeInfo - nodeKey NodeKey - resolver IPResolver - - // TODO(xla): This config is still needed as we parameterise peerConn and - // peer currently. All relevant configuration should be refactored into options - // with sane defaults. - mConfig conn.MConnConfig -} - -// Test multiplexTransport for interface completeness. -var _ Transport = (*MultiplexTransport)(nil) -var _ transportLifecycle = (*MultiplexTransport)(nil) - -// NewMultiplexTransport returns a tcp connected multiplexed peer. -func NewMultiplexTransport( - nodeInfo NodeInfo, - nodeKey NodeKey, - mConfig conn.MConnConfig, -) *MultiplexTransport { - return &MultiplexTransport{ - acceptc: make(chan accept), - closec: make(chan struct{}), - dialTimeout: defaultDialTimeout, - filterTimeout: defaultFilterTimeout, - handshakeTimeout: defaultHandshakeTimeout, - mConfig: mConfig, - nodeInfo: nodeInfo, - nodeKey: nodeKey, - conns: NewConnSet(), - resolver: net.DefaultResolver, - } -} - -// NetAddress implements Transport. -func (mt *MultiplexTransport) NetAddress() NetAddress { - return mt.netAddr -} - -// Accept implements Transport. -func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) { - select { - // This case should never have any side-effectful/blocking operations to - // ensure that quality peers are ready to be used. - case a := <-mt.acceptc: - if a.err != nil { - return nil, a.err - } - - cfg.outbound = false - - return mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr), nil - case <-mt.closec: - return nil, ErrTransportClosed{} - } -} - -// Dial implements Transport. -func (mt *MultiplexTransport) Dial( - addr NetAddress, - cfg peerConfig, -) (Peer, error) { - c, err := addr.DialTimeout(mt.dialTimeout) - if err != nil { - return nil, err - } - - // TODO(xla): Evaluate if we should apply filters if we explicitly dial. - if err := mt.filterConn(c); err != nil { - return nil, err - } - - secretConn, nodeInfo, err := mt.upgrade(c, &addr) - if err != nil { - return nil, err - } - - cfg.outbound = true - - p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr) - - return p, nil -} - -// Close implements transportLifecycle. -func (mt *MultiplexTransport) Close() error { - close(mt.closec) - - if mt.listener != nil { - return mt.listener.Close() - } - - return nil -} - -// Listen implements transportLifecycle. -func (mt *MultiplexTransport) Listen(addr NetAddress) error { - ln, err := net.Listen("tcp", addr.DialString()) - if err != nil { - return err - } - - if mt.maxIncomingConnections > 0 { - ln = netutil.LimitListener(ln, mt.maxIncomingConnections) - } - - mt.netAddr = addr - mt.listener = ln - - go mt.acceptPeers() - - return nil -} - -func (mt *MultiplexTransport) acceptPeers() { - for { - c, err := mt.listener.Accept() - if err != nil { - // If Close() has been called, silently exit. - select { - case _, ok := <-mt.closec: - if !ok { - return - } - default: - // Transport is not closed - } - - mt.acceptc <- accept{err: err} - return - } - // Connection upgrade and filtering should be asynchronous to avoid - // Head-of-line blocking[0]. - // Reference: https://github.com/tendermint/tendermint/issues/2047 - // - // [0] https://en.wikipedia.org/wiki/Head-of-line_blocking - go func(c net.Conn) { - defer func() { - if r := recover(); r != nil { - err := ErrRejected{ - conn: c, - err: fmt.Errorf("recovered from panic: %v", r), - isAuthFailure: true, - } - select { - case mt.acceptc <- accept{err: err}: - case <-mt.closec: - // Give up if the transport was closed. - _ = c.Close() - return - } - } - }() - - var ( - nodeInfo NodeInfo - secretConn *conn.SecretConnection - netAddr *NetAddress - ) - - err := mt.filterConn(c) - if err == nil { - secretConn, nodeInfo, err = mt.upgrade(c, nil) - if err == nil { - addr := c.RemoteAddr() - id := PubKeyToID(secretConn.RemotePubKey()) - netAddr = NewNetAddress(id, addr) - } - } - - select { - case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}: - // Make the upgraded peer available. - case <-mt.closec: - // Give up if the transport was closed. - _ = c.Close() - return - } - }(c) - } -} - -// Cleanup removes the given address from the connections set and -// closes the connection. -func (mt *MultiplexTransport) Cleanup(p Peer) { - mt.conns.RemoveAddr(p.RemoteAddr()) - _ = p.CloseConn() -} - -func (mt *MultiplexTransport) cleanup(c net.Conn) error { - mt.conns.Remove(c) - - return c.Close() -} - -func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) { - defer func() { - if err != nil { - _ = c.Close() - } - }() - - // Reject if connection is already present. - if mt.conns.Has(c) { - return ErrRejected{conn: c, isDuplicate: true} - } - - // Resolve ips for incoming conn. - ips, err := resolveIPs(mt.resolver, c) - if err != nil { - return err - } - - errc := make(chan error, len(mt.connFilters)) - - for _, f := range mt.connFilters { - go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) { - errc <- f(mt.conns, c, ips) - }(f, c, ips, errc) - } - - for i := 0; i < cap(errc); i++ { - select { - case err := <-errc: - if err != nil { - return ErrRejected{conn: c, err: err, isFiltered: true} - } - case <-time.After(mt.filterTimeout): - return ErrFilterTimeout{} - } - - } - - mt.conns.Set(c, ips) - - return nil + // SetChannelDescriptors sets the channel descriptors for the transport. + // FIXME: This is only here for compatibility with the current Switch code. + SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) } -func (mt *MultiplexTransport) upgrade( - c net.Conn, - dialedAddr *NetAddress, -) (secretConn *conn.SecretConnection, nodeInfo NodeInfo, err error) { - defer func() { - if err != nil { - _ = mt.cleanup(c) - } - }() - - secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey) - if err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: fmt.Errorf("secret conn failed: %v", err), - isAuthFailure: true, - } - } - - // For outgoing conns, ensure connection key matches dialed key. - connID := PubKeyToID(secretConn.RemotePubKey()) - if dialedAddr != nil { - if dialedID := dialedAddr.ID; connID != dialedID { - return nil, nil, ErrRejected{ - conn: c, - id: connID, - err: fmt.Errorf( - "conn.ID (%v) dialed ID (%v) mismatch", - connID, - dialedID, - ), - isAuthFailure: true, - } - } - } - - nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo) - if err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: fmt.Errorf("handshake failed: %v", err), - isAuthFailure: true, - } - } - - if err := nodeInfo.Validate(); err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: err, - isNodeInfoInvalid: true, - } - } - - // Ensure connection key matches self reported key. - if connID != nodeInfo.ID() { - return nil, nil, ErrRejected{ - conn: c, - id: connID, - err: fmt.Errorf( - "conn.ID (%v) NodeInfo.ID (%v) mismatch", - connID, - nodeInfo.ID(), - ), - isAuthFailure: true, - } - } - - // Reject self. - if mt.nodeInfo.ID() == nodeInfo.ID() { - return nil, nil, ErrRejected{ - addr: *NewNetAddress(nodeInfo.ID(), c.RemoteAddr()), - conn: c, - id: nodeInfo.ID(), - isSelf: true, - } - } - - if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil { - return nil, nil, ErrRejected{ - conn: c, - err: err, - id: nodeInfo.ID(), - isIncompatible: true, - } - } +// Protocol identifies a transport protocol. +type Protocol string - return secretConn, nodeInfo, nil -} +// Endpoint represents a transport connection endpoint, either local or remote. +type Endpoint struct { + // PeerID specifies the peer ID of the endpoint. + // + // FIXME: This is here for backwards-compatibility with the existing MConn + // protocol, we should consider moving this higher in the stack (i.e. to + // the router). + PeerID ID -func (mt *MultiplexTransport) wrapPeer( - c net.Conn, - ni NodeInfo, - cfg peerConfig, - socketAddr *NetAddress, -) Peer { + // Protocol specifies the transport protocol, used by the router to pick a + // transport for an endpoint. + Protocol Protocol - persistent := false - if cfg.isPersistent != nil { - if cfg.outbound { - persistent = cfg.isPersistent(socketAddr) - } else { - selfReportedAddr, err := ni.NetAddress() - if err == nil { - persistent = cfg.isPersistent(selfReportedAddr) - } - } - } + // Path is an optional, arbitrary transport-specific path or identifier. + Path string - peerConn := newPeerConn( - cfg.outbound, - persistent, - c, - socketAddr, - ) + // IP is an IP address (v4 or v6) to connect to. If set, this defines the + // endpoint as a networked endpoint. + IP net.IP - p := newPeer( - peerConn, - mt.mConfig, - ni, - cfg.reactorsByCh, - cfg.chDescs, - cfg.onPeerError, - PeerMetrics(cfg.metrics), - ) - - return p + // Port is a network port (either TCP or UDP). If not set, a default port + // may be used depending on the protocol. + Port uint16 } -func handshake( - c net.Conn, - timeout time.Duration, - nodeInfo NodeInfo, -) (NodeInfo, error) { - if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { - return nil, err +// String formats an endpoint as a URL string. +func (e Endpoint) String() string { + u := url.URL{Scheme: string(e.Protocol)} + if e.PeerID != "" { + u.User = url.User(string(e.PeerID)) } - - var ( - errc = make(chan error, 2) - - pbpeerNodeInfo tmp2p.DefaultNodeInfo - peerNodeInfo DefaultNodeInfo - ourNodeInfo = nodeInfo.(DefaultNodeInfo) - ) - - go func(errc chan<- error, c net.Conn) { - _, err := protoio.NewDelimitedWriter(c).WriteMsg(ourNodeInfo.ToProto()) - errc <- err - }(errc, c) - go func(errc chan<- error, c net.Conn) { - protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) - err := protoReader.ReadMsg(&pbpeerNodeInfo) - errc <- err - }(errc, c) - - for i := 0; i < cap(errc); i++ { - err := <-errc - if err != nil { - return nil, err + if len(e.IP) > 0 { + u.Host = e.IP.String() + if e.Port > 0 { + u.Host += fmt.Sprintf(":%v", e.Port) } + } else if e.Path != "" { + u.Opaque = e.Path + } + return u.String() +} + +// Validate validates an endpoint. +func (e Endpoint) Validate() error { + switch { + case e.PeerID == "": + return errors.New("endpoint has no peer ID") + case e.Protocol == "": + return errors.New("endpoint has no protocol") + case len(e.IP) == 0 && len(e.Path) == 0: + return errors.New("endpoint must have either IP or path") + case e.Port > 0 && len(e.IP) == 0: + return fmt.Errorf("endpoint has port %v but no IP", e.Port) + default: + return nil } - - peerNodeInfo, err := DefaultNodeInfoFromToProto(&pbpeerNodeInfo) - if err != nil { - return nil, err - } - - return peerNodeInfo, c.SetDeadline(time.Time{}) -} - -func upgradeSecretConn( - c net.Conn, - timeout time.Duration, - privKey crypto.PrivKey, -) (*conn.SecretConnection, error) { - if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { - return nil, err - } - - sc, err := conn.MakeSecretConnection(c, privKey) - if err != nil { - return nil, err - } - - return sc, sc.SetDeadline(time.Time{}) } -func resolveIPs(resolver IPResolver, c net.Conn) ([]net.IP, error) { - host, _, err := net.SplitHostPort(c.RemoteAddr().String()) - if err != nil { - return nil, err - } - - addrs, err := resolver.LookupIPAddr(context.Background(), host) - if err != nil { - return nil, err - } - - ips := []net.IP{} - - for _, addr := range addrs { - ips = append(ips, addr.IP) - } +// NetAddress returns a NetAddress for the endpoint. +// FIXME: This is temporary for compatibility with the old P2P stack. +func (e Endpoint) NetAddress() *NetAddress { + return &NetAddress{ + ID: e.PeerID, + IP: e.IP, + Port: e.Port, + } +} + +// Connection represents an established connection between two endpoints. +// +// FIXME: This is a temporary interface while we figure out whether we'll be +// adopting QUIC or not. If we do, this should be a byte-oriented multi-stream +// interface with one goroutine consuming each stream, and the MConnection +// transport either needs protocol changes or a shim. For details, see: +// https://github.com/tendermint/spec/pull/227 +// +// FIXME: The interface is currently very broad in order to accommodate +// MConnection behavior that the rest of the P2P stack relies on. This should be +// removed once the P2P core is rewritten. +type Connection interface { + // ReceiveMessage returns the next message received on the connection, + // blocking until one is available. io.EOF is returned when closed. + ReceiveMessage() (chID byte, msg []byte, err error) + + // SendMessage sends a message on the connection. + // FIXME: For compatibility with the current Peer, it returns an additional + // boolean false if the message timed out waiting to be accepted into the + // send buffer. + SendMessage(chID byte, msg []byte) (bool, error) + + // TrySendMessage is a non-blocking version of SendMessage that returns + // immediately if the message buffer is full. It returns true if the message + // was accepted. + // + // FIXME: This is here for backwards-compatibility with the current Peer + // code, and should be removed when possible. + TrySendMessage(chID byte, msg []byte) (bool, error) + + // LocalEndpoint returns the local endpoint for the connection. + LocalEndpoint() Endpoint + + // RemoteEndpoint returns the remote endpoint for the connection. + RemoteEndpoint() Endpoint + + // PubKey returns the remote peer's public key. + PubKey() crypto.PubKey + + // NodeInfo returns the remote peer's node info. + NodeInfo() DefaultNodeInfo + + // Close closes the connection. + Close() error - return ips, nil + // FlushClose flushes all pending sends and then closes the connection. + // + // FIXME: This only exists for backwards-compatibility with the current + // MConnection implementation. There should really be a separate Flush() + // method, but there is no easy way to synchronously flush pending data with + // the current MConnection structure. + FlushClose() error + + // Status returns the current connection status. + // FIXME: Only here for compatibility with the current Peer code. + Status() conn.ConnectionStatus } diff --git a/p2p/transport_mconn.go b/p2p/transport_mconn.go new file mode 100644 index 000000000..24ed4db3d --- /dev/null +++ b/p2p/transport_mconn.go @@ -0,0 +1,675 @@ +package p2p + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/protoio" + "github.com/tendermint/tendermint/p2p/conn" + p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p" + + "golang.org/x/net/netutil" +) + +const ( + defaultDialTimeout = time.Second + defaultFilterTimeout = 5 * time.Second + defaultHandshakeTimeout = 3 * time.Second +) + +// MConnProtocol is the MConn protocol identifier. +const MConnProtocol Protocol = "mconn" + +// MConnTransportOption sets an option for MConnTransport. +type MConnTransportOption func(*MConnTransport) + +// MConnTransportMaxIncomingConnections sets the maximum number of +// simultaneous incoming connections. Default: 0 (unlimited) +func MConnTransportMaxIncomingConnections(max int) MConnTransportOption { + return func(mt *MConnTransport) { mt.maxIncomingConnections = max } +} + +// MConnTransportFilterTimeout sets the timeout for filter callbacks. +func MConnTransportFilterTimeout(timeout time.Duration) MConnTransportOption { + return func(mt *MConnTransport) { mt.filterTimeout = timeout } +} + +// MConnTransportConnFilters sets connection filters. +func MConnTransportConnFilters(filters ...ConnFilterFunc) MConnTransportOption { + return func(mt *MConnTransport) { mt.connFilters = filters } +} + +// ConnFilterFunc is a callback for connection filtering. If it returns an +// error, the connection is rejected. The set of existing connections is passed +// along with the new connection and all resolved IPs. +type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error + +// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection +// and refuses new ones if they come from a known ip. +var ConnDuplicateIPFilter ConnFilterFunc = func(cs ConnSet, c net.Conn, ips []net.IP) error { + for _, ip := range ips { + if cs.HasIP(ip) { + return ErrRejected{ + conn: c, + err: fmt.Errorf("ip<%v> already connected", ip), + isDuplicate: true, + } + } + } + return nil +} + +// MConnTransport is a Transport implementation using the current multiplexed +// Tendermint protocol ("MConn"). It inherits lots of code and logic from the +// previous implementation for parity with the current P2P stack (such as +// connection filtering, peer verification, and panic handling), which should be +// moved out of the transport once the rest of the P2P stack is rewritten. +type MConnTransport struct { + privKey crypto.PrivKey + nodeInfo DefaultNodeInfo + channelDescs []*ChannelDescriptor + mConnConfig conn.MConnConfig + + maxIncomingConnections int + dialTimeout time.Duration + handshakeTimeout time.Duration + filterTimeout time.Duration + + logger log.Logger + listener net.Listener + + closeOnce sync.Once + chAccept chan *mConnConnection + chError chan error + chClose chan struct{} + + // FIXME: This is a vestige from the old transport, and should be managed + // by the router once we rewrite the P2P core. + conns ConnSet + connFilters []ConnFilterFunc +} + +// NewMConnTransport sets up a new MConn transport. +func NewMConnTransport( + logger log.Logger, + nodeInfo NodeInfo, // FIXME: should use DefaultNodeInfo, left for code compatibility + privKey crypto.PrivKey, + mConnConfig conn.MConnConfig, + opts ...MConnTransportOption, +) *MConnTransport { + m := &MConnTransport{ + privKey: privKey, + nodeInfo: nodeInfo.(DefaultNodeInfo), + mConnConfig: mConnConfig, + channelDescs: []*ChannelDescriptor{}, + + dialTimeout: defaultDialTimeout, + handshakeTimeout: defaultHandshakeTimeout, + filterTimeout: defaultFilterTimeout, + + logger: logger, + chAccept: make(chan *mConnConnection), + chError: make(chan error), + chClose: make(chan struct{}), + + conns: NewConnSet(), + connFilters: []ConnFilterFunc{}, + } + for _, opt := range opts { + opt(m) + } + return m +} + +// SetChannelDescriptors implements Transport. +func (m *MConnTransport) SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) { + m.channelDescs = chDescs +} + +// Listen asynchronously listens for inbound connections on the given endpoint. +// It must be called exactly once before calling Accept(), and the caller must +// call Close() to shut down the listener. +func (m *MConnTransport) Listen(endpoint Endpoint) error { + if m.listener != nil { + return errors.New("MConn transport is already listening") + } + err := m.normalizeEndpoint(&endpoint) + if err != nil { + return fmt.Errorf("invalid MConn listen endpoint %q: %w", endpoint, err) + } + + m.listener, err = net.Listen("tcp", fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port)) + if err != nil { + return err + } + if m.maxIncomingConnections > 0 { + m.listener = netutil.LimitListener(m.listener, m.maxIncomingConnections) + } + + // Spawn a goroutine to accept inbound connections asynchronously. + go m.accept() + + return nil +} + +// accept accepts inbound connections in a loop, and asynchronously handshakes +// with the peer to avoid head-of-line blocking. Established connections are +// passed to Accept() via chAccept. +// See: https://github.com/tendermint/tendermint/issues/204 +func (m *MConnTransport) accept() { + for { + tcpConn, err := m.listener.Accept() + if err != nil { + // We have to check for closure first, since we don't want to + // propagate "use of closed network connection" errors. + select { + case <-m.chClose: + default: + // We also select on chClose here, in case the transport closes + // while we're blocked on error propagation. + select { + case m.chError <- err: + case <-m.chClose: + } + } + return + } + + go func() { + err := m.filterTCPConn(tcpConn) + if err != nil { + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + select { + case m.chError <- err: + case <-m.chClose: + } + return + } + + conn, err := newMConnConnection(m, tcpConn, "") + if err != nil { + m.conns.Remove(tcpConn) + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + select { + case m.chError <- err: + case <-m.chClose: + } + } else { + select { + case m.chAccept <- conn: + case <-m.chClose: + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + } + } + }() + } +} + +// Accept implements Transport. +// +// accept() runs a concurrent accept loop that accepts inbound connections +// and then handshakes in a non-blocking fashion. The handshaked and validated +// connections are returned via this call, picking them off of the chAccept +// channel (or the handshake error, if any). +func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) { + select { + case conn := <-m.chAccept: + return conn, nil + case err := <-m.chError: + return nil, err + case <-m.chClose: + return nil, ErrTransportClosed{} + case <-ctx.Done(): + return nil, nil + } +} + +// Dial implements Transport. +func (m *MConnTransport) Dial(ctx context.Context, endpoint Endpoint) (Connection, error) { + err := m.normalizeEndpoint(&endpoint) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(ctx, m.dialTimeout) + defer cancel() + + dialer := net.Dialer{} + tcpConn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port)) + if err != nil { + return nil, err + } + + err = m.filterTCPConn(tcpConn) + if err != nil { + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + return nil, err + } + + conn, err := newMConnConnection(m, tcpConn, endpoint.PeerID) + if err != nil { + m.conns.Remove(tcpConn) + if err := tcpConn.Close(); err != nil { + m.logger.Debug("failed to close TCP connection", "err", err) + } + return nil, err + } + + return conn, nil +} + +// Endpoints implements Transport. +func (m *MConnTransport) Endpoints() []Endpoint { + if m.listener == nil { + return []Endpoint{} + } + addr := m.listener.Addr().(*net.TCPAddr) + return []Endpoint{{ + Protocol: MConnProtocol, + PeerID: m.nodeInfo.ID(), + IP: addr.IP, + Port: uint16(addr.Port), + }} +} + +// Close implements Transport. +func (m *MConnTransport) Close() error { + var err error + m.closeOnce.Do(func() { + // We have to close chClose first, so that accept() will detect + // the closure and not propagate the error. + close(m.chClose) + if m.listener != nil { + err = m.listener.Close() + } + }) + return err +} + +// filterTCPConn filters a TCP connection, rejecting it if this function errors. +func (m *MConnTransport) filterTCPConn(tcpConn net.Conn) error { + if m.conns.Has(tcpConn) { + return ErrRejected{conn: tcpConn, isDuplicate: true} + } + + host, _, err := net.SplitHostPort(tcpConn.RemoteAddr().String()) + if err != nil { + return err + } + ip := net.ParseIP(host) + if ip == nil { + return fmt.Errorf("connection address has invalid IP address %q", host) + } + + // Apply filter callbacks. + chErr := make(chan error, len(m.connFilters)) + for _, connFilter := range m.connFilters { + go func(connFilter ConnFilterFunc) { + chErr <- connFilter(m.conns, tcpConn, []net.IP{ip}) + }(connFilter) + } + + for i := 0; i < cap(chErr); i++ { + select { + case err := <-chErr: + if err != nil { + return ErrRejected{conn: tcpConn, err: err, isFiltered: true} + } + case <-time.After(m.filterTimeout): + return ErrFilterTimeout{} + } + + } + + // FIXME: Doesn't really make sense to set this here, but we preserve the + // behavior from the previous P2P transport implementation. This should + // be moved to the router. + m.conns.Set(tcpConn, []net.IP{ip}) + return nil +} + +// normalizeEndpoint normalizes and validates an endpoint. +func (m *MConnTransport) normalizeEndpoint(endpoint *Endpoint) error { + if endpoint == nil { + return errors.New("nil endpoint") + } + if err := endpoint.Validate(); err != nil { + return err + } + if endpoint.Protocol == "" { + endpoint.Protocol = MConnProtocol + } + if endpoint.Protocol != MConnProtocol { + return fmt.Errorf("unsupported protocol %q", endpoint.Protocol) + } + if len(endpoint.IP) == 0 { + return errors.New("endpoint must have an IP address") + } + if endpoint.Path != "" { + return fmt.Errorf("endpoint cannot have path (got %q)", endpoint.Path) + } + if endpoint.Port == 0 { + endpoint.Port = 26657 + } + return nil +} + +// mConnConnection implements Connection for MConnTransport. It takes a base TCP +// connection and upgrades it to MConnection over an encrypted SecretConnection. +type mConnConnection struct { + logger log.Logger + transport *MConnTransport + secretConn *conn.SecretConnection + mConn *conn.MConnection + + peerInfo DefaultNodeInfo + + closeOnce sync.Once + chReceive chan mConnMessage + chError chan error + chClose chan struct{} +} + +// mConnMessage passes MConnection messages through internal channels. +type mConnMessage struct { + channelID byte + payload []byte +} + +// newMConnConnection creates a new mConnConnection by handshaking +// with a peer. +func newMConnConnection( + transport *MConnTransport, + tcpConn net.Conn, + expectPeerID ID, +) (c *mConnConnection, err error) { + // FIXME: Since the MConnection code panics, we need to recover here + // and turn it into an error. Be careful not to alias err, so we can + // update it from within this function. We should remove panics instead. + defer func() { + if r := recover(); r != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("recovered from panic: %v", r), + isAuthFailure: true, + } + } + }() + + err = tcpConn.SetDeadline(time.Now().Add(transport.handshakeTimeout)) + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("secret conn failed: %v", err), + isAuthFailure: true, + } + return + } + + c = &mConnConnection{ + transport: transport, + chReceive: make(chan mConnMessage), + chError: make(chan error), + chClose: make(chan struct{}), + } + c.secretConn, err = conn.MakeSecretConnection(tcpConn, transport.privKey) + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("secret conn failed: %v", err), + isAuthFailure: true, + } + return + } + c.peerInfo, err = c.handshake() + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("handshake failed: %v", err), + isAuthFailure: true, + } + return + } + + // Validate node info. + // FIXME: All of the ID verification code below should be moved to the + // router once implemented. + err = c.peerInfo.Validate() + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: err, + isNodeInfoInvalid: true, + } + return + } + + // For outgoing conns, ensure connection key matches dialed key. + if expectPeerID != "" { + peerID := PubKeyToID(c.PubKey()) + if expectPeerID != peerID { + err = ErrRejected{ + conn: tcpConn, + id: peerID, + err: fmt.Errorf( + "conn.ID (%v) dialed ID (%v) mismatch", + peerID, + expectPeerID, + ), + isAuthFailure: true, + } + return + } + } + + // Reject self. + if transport.nodeInfo.ID() == c.peerInfo.ID() { + err = ErrRejected{ + addr: *NewNetAddress(c.peerInfo.ID(), c.secretConn.RemoteAddr()), + conn: tcpConn, + id: c.peerInfo.ID(), + isSelf: true, + } + return + } + + err = transport.nodeInfo.CompatibleWith(c.peerInfo) + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: err, + id: c.peerInfo.ID(), + isIncompatible: true, + } + return + } + + err = tcpConn.SetDeadline(time.Time{}) + if err != nil { + err = ErrRejected{ + conn: tcpConn, + err: fmt.Errorf("secret conn failed: %v", err), + isAuthFailure: true, + } + return + } + + // Set up the MConnection wrapper + c.mConn = conn.NewMConnectionWithConfig( + c.secretConn, + transport.channelDescs, + c.onReceive, + c.onError, + transport.mConnConfig, + ) + // FIXME: Log format is set up for compatibility with existing peer code. + c.logger = transport.logger.With("peer", c.RemoteEndpoint().NetAddress()) + c.mConn.SetLogger(c.logger) + err = c.mConn.Start() + return c, err +} + +// handshake performs an MConn handshake, returning the peer's node info. +func (c *mConnConnection) handshake() (DefaultNodeInfo, error) { + var pbNodeInfo p2pproto.DefaultNodeInfo + chErr := make(chan error, 2) + go func() { + _, err := protoio.NewDelimitedWriter(c.secretConn).WriteMsg(c.transport.nodeInfo.ToProto()) + chErr <- err + }() + go func() { + chErr <- protoio.NewDelimitedReader(c.secretConn, MaxNodeInfoSize()).ReadMsg(&pbNodeInfo) + }() + for i := 0; i < cap(chErr); i++ { + if err := <-chErr; err != nil { + return DefaultNodeInfo{}, err + } + } + + return DefaultNodeInfoFromProto(&pbNodeInfo) +} + +// onReceive is a callback for MConnection received messages. +func (c *mConnConnection) onReceive(channelID byte, payload []byte) { + select { + case c.chReceive <- mConnMessage{channelID: channelID, payload: payload}: + case <-c.chClose: + } +} + +// onError is a callback for MConnection errors. The error is passed to +// chError, which is only consumed by ReceiveMessage() for parity with +// the old MConnection behavior. +func (c *mConnConnection) onError(e interface{}) { + err, ok := e.(error) + if !ok { + err = fmt.Errorf("%v", err) + } + select { + case c.chError <- err: + case <-c.chClose: + } +} + +// String displays connection information. +// FIXME: This is here for backwards compatibility with existing code, +// it should probably just return RemoteEndpoint().String(), if anything. +func (c *mConnConnection) String() string { + endpoint := c.RemoteEndpoint() + return fmt.Sprintf("MConn{%v:%v}", endpoint.IP, endpoint.Port) +} + +// SendMessage implements Connection. +func (c *mConnConnection) SendMessage(channelID byte, msg []byte) (bool, error) { + // We don't check chError here, to preserve old MConnection behavior. + select { + case <-c.chClose: + return false, io.EOF + default: + return c.mConn.Send(channelID, msg), nil + } +} + +// TrySendMessage implements Connection. +func (c *mConnConnection) TrySendMessage(channelID byte, msg []byte) (bool, error) { + // We don't check chError here, to preserve old MConnection behavior. + select { + case <-c.chClose: + return false, io.EOF + default: + return c.mConn.TrySend(channelID, msg), nil + } +} + +// ReceiveMessage implements Connection. +func (c *mConnConnection) ReceiveMessage() (byte, []byte, error) { + select { + case err := <-c.chError: + return 0, nil, err + case <-c.chClose: + return 0, nil, io.EOF + case msg := <-c.chReceive: + return msg.channelID, msg.payload, nil + } +} + +// NodeInfo implements Connection. +func (c *mConnConnection) NodeInfo() DefaultNodeInfo { + return c.peerInfo +} + +// PubKey implements Connection. +func (c *mConnConnection) PubKey() crypto.PubKey { + return c.secretConn.RemotePubKey() +} + +// LocalEndpoint implements Connection. +func (c *mConnConnection) LocalEndpoint() Endpoint { + // FIXME: For compatibility with existing P2P tests we need to + // handle non-TCP connections. This should be removed. + endpoint := Endpoint{ + Protocol: MConnProtocol, + PeerID: c.transport.nodeInfo.ID(), + } + if addr, ok := c.secretConn.LocalAddr().(*net.TCPAddr); ok { + endpoint.IP = addr.IP + endpoint.Port = uint16(addr.Port) + } + return endpoint +} + +// RemoteEndpoint implements Connection. +func (c *mConnConnection) RemoteEndpoint() Endpoint { + // FIXME: For compatibility with existing P2P tests we need to + // handle non-TCP connections. This should be removed. + endpoint := Endpoint{ + Protocol: MConnProtocol, + PeerID: c.peerInfo.ID(), + } + if addr, ok := c.secretConn.RemoteAddr().(*net.TCPAddr); ok { + endpoint.IP = addr.IP + endpoint.Port = uint16(addr.Port) + } + return endpoint +} + +// Status implements Connection. +func (c *mConnConnection) Status() conn.ConnectionStatus { + return c.mConn.Status() +} + +// Close implements Connection. +func (c *mConnConnection) Close() error { + c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr()) + var err error + c.closeOnce.Do(func() { + err = c.mConn.Stop() + close(c.chClose) + }) + return err +} + +// FlushClose implements Connection. +func (c *mConnConnection) FlushClose() error { + c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr()) + c.closeOnce.Do(func() { + c.mConn.FlushStop() + close(c.chClose) + }) + return nil +} diff --git a/p2p/transport_test.go b/p2p/transport_test.go deleted file mode 100644 index 349291f47..000000000 --- a/p2p/transport_test.go +++ /dev/null @@ -1,696 +0,0 @@ -package p2p - -import ( - "fmt" - "math/rand" - "net" - "reflect" - "runtime" - "strings" - "testing" - "time" - - "github.com/tendermint/tendermint/crypto/ed25519" - "github.com/tendermint/tendermint/libs/protoio" - "github.com/tendermint/tendermint/p2p/conn" - tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p" -) - -var defaultNodeName = "host_peer" - -func emptyNodeInfo() NodeInfo { - return DefaultNodeInfo{} -} - -// newMultiplexTransport returns a tcp connected multiplexed peer -// using the default MConnConfig. It's a convenience function used -// for testing. -func newMultiplexTransport( - nodeInfo NodeInfo, - nodeKey NodeKey, -) *MultiplexTransport { - return NewMultiplexTransport( - nodeInfo, nodeKey, conn.DefaultMConnConfig(), - ) -} - -func TestTransportMultiplexConnFilter(t *testing.T) { - mt := newMultiplexTransport( - emptyNodeInfo(), - GenNodeKey(), - ) - id := mt.nodeKey.ID - - MultiplexTransportConnFilters( - func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil }, - func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil }, - func(_ ConnSet, _ net.Conn, _ []net.IP) error { - return fmt.Errorf("rejected") - }, - )(mt) - - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } - - errc := make(chan error) - - go func() { - addr := NewNetAddress(id, mt.listener.Addr()) - - _, err := addr.Dial() - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - t.Errorf("connection failed: %v", err) - } - - _, err = mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsFiltered() { - t.Errorf("expected peer to be filtered, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } -} - -func TestTransportMultiplexConnFilterTimeout(t *testing.T) { - mt := newMultiplexTransport( - emptyNodeInfo(), - GenNodeKey(), - ) - id := mt.nodeKey.ID - - MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt) - MultiplexTransportConnFilters( - func(_ ConnSet, _ net.Conn, _ []net.IP) error { - time.Sleep(1 * time.Second) - return nil - }, - )(mt) - - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } - - errc := make(chan error) - go func() { - addr := NewNetAddress(id, mt.listener.Addr()) - - _, err := addr.Dial() - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - t.Errorf("connection failed: %v", err) - } - - _, err = mt.Accept(peerConfig{}) - if _, ok := err.(ErrFilterTimeout); !ok { - t.Errorf("expected ErrFilterTimeout, got %v", err) - } -} - -func TestTransportMultiplexMaxIncomingConnections(t *testing.T) { - pv := ed25519.GenPrivKey() - id := PubKeyToID(pv.PubKey()) - mt := newMultiplexTransport( - testNodeInfo( - id, "transport", - ), - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - - MultiplexTransportMaxIncomingConnections(0)(mt) - - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - const maxIncomingConns = 2 - MultiplexTransportMaxIncomingConnections(maxIncomingConns)(mt) - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } - - laddr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - // Connect more peers than max - for i := 0; i <= maxIncomingConns; i++ { - errc := make(chan error) - go testDialer(*laddr, errc) - - err = <-errc - if i < maxIncomingConns { - if err != nil { - t.Errorf("dialer connection failed: %v", err) - } - _, err = mt.Accept(peerConfig{}) - if err != nil { - t.Errorf("connection failed: %v", err) - } - } else if err == nil || !strings.Contains(err.Error(), "i/o timeout") { - // mt actually blocks forever on trying to accept a new peer into a full channel so - // expect the dialer to encounter a timeout error. Calling mt.Accept will block until - // mt is closed. - t.Errorf("expected i/o timeout error, got %v", err) - } - } -} - -func TestTransportMultiplexAcceptMultiple(t *testing.T) { - mt := testSetupMultiplexTransport(t) - laddr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - var ( - seed = rand.New(rand.NewSource(time.Now().UnixNano())) - nDialers = seed.Intn(64) + 64 - errc = make(chan error, nDialers) - ) - - // Setup dialers. - for i := 0; i < nDialers; i++ { - go testDialer(*laddr, errc) - } - - // Catch connection errors. - for i := 0; i < nDialers; i++ { - if err := <-errc; err != nil { - t.Fatal(err) - } - } - - ps := []Peer{} - - // Accept all peers. - for i := 0; i < cap(errc); i++ { - p, err := mt.Accept(peerConfig{}) - if err != nil { - t.Fatal(err) - } - - if err := p.Start(); err != nil { - t.Fatal(err) - } - - ps = append(ps, p) - } - - if have, want := len(ps), cap(errc); have != want { - t.Errorf("have %v, want %v", have, want) - } - - // Stop all peers. - for _, p := range ps { - if err := p.Stop(); err != nil { - t.Fatal(err) - } - } - - if err := mt.Close(); err != nil { - t.Errorf("close errored: %v", err) - } -} - -func testDialer(dialAddr NetAddress, errc chan error) { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - dialer = newMultiplexTransport( - testNodeInfo(id, defaultNodeName), - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - - _, err := dialer.Dial(dialAddr, peerConfig{}) - if err != nil { - errc <- err - return - } - - // Signal that the connection was established. - errc <- nil -} - -func TestTransportMultiplexAcceptNonBlocking(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - var ( - fastNodePV = ed25519.GenPrivKey() - fastNodeInfo = testNodeInfo(PubKeyToID(fastNodePV.PubKey()), "fastnode") - errc = make(chan error) - fastc = make(chan struct{}) - slowc = make(chan struct{}) - slowdonec = make(chan struct{}) - ) - - // Simulate slow Peer. - go func() { - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - c, err := addr.Dial() - if err != nil { - errc <- err - return - } - - close(slowc) - defer func() { - close(slowdonec) - }() - - // Make sure we switch to fast peer goroutine. - runtime.Gosched() - - select { - case <-fastc: - // Fast peer connected. - case <-time.After(200 * time.Millisecond): - // We error if the fast peer didn't succeed. - errc <- fmt.Errorf("fast peer timed out") - } - - sc, err := upgradeSecretConn(c, 200*time.Millisecond, ed25519.GenPrivKey()) - if err != nil { - errc <- err - return - } - - _, err = handshake(sc, 200*time.Millisecond, - testNodeInfo( - PubKeyToID(ed25519.GenPrivKey().PubKey()), - "slow_peer", - )) - if err != nil { - errc <- err - } - }() - - // Simulate fast Peer. - go func() { - <-slowc - - var ( - dialer = newMultiplexTransport( - fastNodeInfo, - NodeKey{ - ID: PubKeyToID(fastNodePV.PubKey()), - PrivKey: fastNodePV, - }, - ) - ) - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(fastc) - <-slowdonec - close(errc) - }() - - if err := <-errc; err != nil { - t.Logf("connection failed: %v", err) - } - - p, err := mt.Accept(peerConfig{}) - if err != nil { - t.Fatal(err) - } - - if have, want := p.NodeInfo(), fastNodeInfo; !reflect.DeepEqual(have, want) { - t.Errorf("have %v, want %v", have, want) - } -} - -func TestTransportMultiplexValidateNodeInfo(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - errc := make(chan error) - - go func() { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - dialer = newMultiplexTransport( - testNodeInfo(id, ""), // Should not be empty - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - t.Errorf("connection failed: %v", err) - } - - _, err := mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsNodeInfoInvalid() { - t.Errorf("expected NodeInfo to be invalid, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } -} - -func TestTransportMultiplexRejectMissmatchID(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - errc := make(chan error) - - go func() { - dialer := newMultiplexTransport( - testNodeInfo( - PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer", - ), - GenNodeKey(), - ) - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - t.Errorf("connection failed: %v", err) - } - - _, err := mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsAuthFailure() { - t.Errorf("expected auth failure, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } -} - -func TestTransportMultiplexDialRejectWrongID(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - dialer = newMultiplexTransport( - testNodeInfo(id, ""), // Should not be empty - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - - wrongID := PubKeyToID(ed25519.GenPrivKey().PubKey()) - addr := NewNetAddress(wrongID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - t.Logf("connection failed: %v", err) - if err, ok := err.(ErrRejected); ok { - if !err.IsAuthFailure() { - t.Errorf("expected auth failure, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } - } -} - -func TestTransportMultiplexRejectIncompatible(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - errc := make(chan error) - - go func() { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - dialer = newMultiplexTransport( - testNodeInfoWithNetwork(id, "dialer", "incompatible-network"), - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := dialer.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(errc) - }() - - _, err := mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsIncompatible() { - t.Errorf("expected to reject incompatible, got %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } -} - -func TestTransportMultiplexRejectSelf(t *testing.T) { - mt := testSetupMultiplexTransport(t) - - errc := make(chan error) - - go func() { - addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr()) - - _, err := mt.Dial(*addr, peerConfig{}) - if err != nil { - errc <- err - return - } - - close(errc) - }() - - if err := <-errc; err != nil { - if err, ok := err.(ErrRejected); ok { - if !err.IsSelf() { - t.Errorf("expected to reject self, got: %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", err) - } - } else { - t.Errorf("expected connection failure") - } - - _, err := mt.Accept(peerConfig{}) - if err, ok := err.(ErrRejected); ok { - if !err.IsSelf() { - t.Errorf("expected to reject self, got: %v", err) - } - } else { - t.Errorf("expected ErrRejected, got %v", nil) - } -} - -func TestTransportConnDuplicateIPFilter(t *testing.T) { - filter := ConnDuplicateIPFilter() - - if err := filter(nil, &testTransportConn{}, nil); err != nil { - t.Fatal(err) - } - - var ( - c = &testTransportConn{} - cs = NewConnSet() - ) - - cs.Set(c, []net.IP{ - {10, 0, 10, 1}, - {10, 0, 10, 2}, - {10, 0, 10, 3}, - }) - - if err := filter(cs, c, []net.IP{ - {10, 0, 10, 2}, - }); err == nil { - t.Errorf("expected Peer to be rejected as duplicate") - } -} - -func TestTransportHandshake(t *testing.T) { - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - - var ( - peerPV = ed25519.GenPrivKey() - peerNodeInfo = testNodeInfo(PubKeyToID(peerPV.PubKey()), defaultNodeName) - ) - - go func() { - c, err := net.Dial(ln.Addr().Network(), ln.Addr().String()) - if err != nil { - t.Error(err) - return - } - - go func(c net.Conn) { - _, err := protoio.NewDelimitedWriter(c).WriteMsg(peerNodeInfo.(DefaultNodeInfo).ToProto()) - if err != nil { - t.Error(err) - } - }(c) - go func(c net.Conn) { - var ( - // ni DefaultNodeInfo - pbni tmp2p.DefaultNodeInfo - ) - - protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) - err := protoReader.ReadMsg(&pbni) - if err != nil { - t.Error(err) - } - - _, err = DefaultNodeInfoFromToProto(&pbni) - if err != nil { - t.Error(err) - } - }(c) - }() - - c, err := ln.Accept() - if err != nil { - t.Fatal(err) - } - - ni, err := handshake(c, 20*time.Millisecond, emptyNodeInfo()) - if err != nil { - t.Fatal(err) - } - - if have, want := ni, peerNodeInfo; !reflect.DeepEqual(have, want) { - t.Errorf("have %v, want %v", have, want) - } -} - -// create listener -func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport { - var ( - pv = ed25519.GenPrivKey() - id = PubKeyToID(pv.PubKey()) - mt = newMultiplexTransport( - testNodeInfo( - id, "transport", - ), - NodeKey{ - ID: id, - PrivKey: pv, - }, - ) - ) - - addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) - if err != nil { - t.Fatal(err) - } - - if err := mt.Listen(*addr); err != nil { - t.Fatal(err) - } - - // give the listener some time to get ready - time.Sleep(20 * time.Millisecond) - - return mt -} - -type testTransportAddr struct{} - -func (a *testTransportAddr) Network() string { return "tcp" } -func (a *testTransportAddr) String() string { return "test.local:1234" } - -type testTransportConn struct{} - -func (c *testTransportConn) Close() error { - return fmt.Errorf("close() not implemented") -} - -func (c *testTransportConn) LocalAddr() net.Addr { - return &testTransportAddr{} -} - -func (c *testTransportConn) RemoteAddr() net.Addr { - return &testTransportAddr{} -} - -func (c *testTransportConn) Read(_ []byte) (int, error) { - return -1, fmt.Errorf("read() not implemented") -} - -func (c *testTransportConn) SetDeadline(_ time.Time) error { - return fmt.Errorf("setDeadline() not implemented") -} - -func (c *testTransportConn) SetReadDeadline(_ time.Time) error { - return fmt.Errorf("setReadDeadline() not implemented") -} - -func (c *testTransportConn) SetWriteDeadline(_ time.Time) error { - return fmt.Errorf("setWriteDeadline() not implemented") -} - -func (c *testTransportConn) Write(_ []byte) (int, error) { - return -1, fmt.Errorf("write() not implemented") -} diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 3a818f54f..44d7a3dd6 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -222,7 +222,7 @@ type Node struct { privValidator types.PrivValidator // local node's validator key // network - transport *p2p.MultiplexTransport + transport *p2p.MConnTransport sw *p2p.Switch // p2p connections addrBook pex.AddrBook // known peers nodeInfo p2p.NodeInfo @@ -453,23 +453,22 @@ func createConsensusReactor(config *cfg.Config, } func createTransport( + logger log.Logger, config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey p2p.NodeKey, proxyApp proxy.AppConns, ) ( - *p2p.MultiplexTransport, + *p2p.MConnTransport, []p2p.PeerFilterFunc, ) { var ( - mConnConfig = p2p.MConnConfig(config.P2P) - transport = p2p.NewMultiplexTransport(nodeInfo, nodeKey, mConnConfig) connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} ) if !config.P2P.AllowDuplicateIP { - connFilters = append(connFilters, p2p.ConnDuplicateIPFilter()) + connFilters = append(connFilters, p2p.ConnDuplicateIPFilter) } // Filter peers by addr or pubkey with an ABCI query. @@ -512,11 +511,12 @@ func createTransport( ) } - p2p.MultiplexTransportConnFilters(connFilters...)(transport) - - // Limit the number of incoming connections. - max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) - p2p.MultiplexTransportMaxIncomingConnections(max)(transport) + transport := p2p.NewMConnTransport( + logger, nodeInfo, nodeKey.PrivKey, p2p.MConnConfig(config.P2P), + p2p.MConnTransportConnFilters(connFilters...), + p2p.MConnTransportMaxIncomingConnections(config.P2P.MaxNumInboundPeers+ + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))), + ) return transport, peerFilters } @@ -808,11 +808,9 @@ func NewNode(config *cfg.Config, return nil, err } - // Setup Transport. - transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp) - - // Setup Switch. + // Setup Transport and Switch. p2pLogger := logger.With("module", "p2p") + transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) sw := createSwitch( config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, @@ -921,29 +919,30 @@ func (n *Node) OnStart() error { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) } - // Start the transport. - addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) - if err != nil { - return err - } - if err := n.transport.Listen(*addr); err != nil { - return err - } - - n.isListening = true - + // Start the mempool. if n.config.Mempool.WalEnabled() { - err = n.mempool.InitWAL() + err := n.mempool.InitWAL() if err != nil { return fmt.Errorf("init mempool WAL: %w", err) } } // Start the switch (the P2P server). - err = n.sw.Start() + err := n.sw.Start() + if err != nil { + return err + } + + // Start the transport. + addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress)) if err != nil { return err } + if err := n.transport.Listen(addr.Endpoint()); err != nil { + return err + } + + n.isListening = true // Start the real state sync reactor separately since the switch uses the shim. if err := n.stateSyncReactor.Start(); err != nil {