
p2p: implement new Transport interface (#5791)

This implements a new `Transport` interface and related types for the P2P refactor in #5670. Previously, `conn.MConnection` was very tightly coupled to the `Peer` implementation. To allow alternative transports (e.g. QUIC), MConnection has now been moved below the `Transport` interface, as `MConnTransport`, and decoupled from the peer. Since the `p2p` package is not covered by our Go API stability guarantees, this is not considered a breaking change and is not listed in the changelog.

The initial approach was to implement the new interface in its final form (which also involved possible protocol changes, see https://github.com/tendermint/spec/pull/227). However, it turned out that this would require a large number of changes to existing P2P code, because of the previous tight coupling between `Peer` and `MConnection` and the reliance on subtleties in MConnection's behavior. Instead, I have broadened the `Transport` interface to expose much of the existing MConnection interface, preserved much of the existing MConnection logic and behavior in the transport implementation, and tried to make as few changes to the rest of the P2P stack as possible. We will instead reduce this interface gradually as we refactor other parts of the P2P stack.

The low-level transport code and protocol (e.g. MConnection, SecretConnection, and so on) have not been significantly changed, and refactoring them is not a priority until we have a plan for QUIC adoption, since we may end up discarding the MConnection code entirely.

There are no tests of the new `MConnTransport`, as this code is likely to evolve as we proceed with the P2P refactor, but tests should be added before a final release. The E2E tests are sufficient for basic validation in the meantime.
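For orientation, a minimal sketch of how a caller drives the new interface, loosely mirroring the Switch's new dial path (the `handle` callback and surrounding setup are hypothetical, and error handling is abbreviated):

func dialAndReceive(transport p2p.Transport, peerAddr *p2p.NetAddress) error {
	// Dial upgrades the TCP connection (secret connection handshake and
	// node info exchange) and returns a Connection to the verified peer.
	c, err := transport.Dial(context.Background(), peerAddr.Endpoint())
	if err != nil {
		return err
	}
	defer c.Close()
	for {
		// ReceiveMessage blocks until the next message on any channel;
		// it returns io.EOF once the connection is closed.
		chID, msg, err := c.ReceiveMessage()
		if err != nil {
			return err
		}
		handle(chID, msg) // hypothetical application callback
	}
}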
Erik Grinaker, 4 years ago (committed by GitHub)
commit bcfc889f25
16 changed files with 1098 additions and 1591 deletions
  1. consensus/byzantine_test.go (+24, -41)
  2. libs/log/testing_logger.go (+7, -1)
  3. node/node.go (+26, -27)
  4. p2p/conn/connection.go (+2, -7)
  5. p2p/netaddress.go (+10, -0)
  6. p2p/node_info.go (+1, -1)
  7. p2p/peer.go (+75, -108)
  8. p2p/peer_test.go (+13, -23)
  9. p2p/pex/pex_reactor_test.go (+13, -8)
  10. p2p/switch.go (+45, -23)
  11. p2p/switch_test.go (+40, -35)
  12. p2p/test_util.go (+12, -33)
  13. p2p/transport.go (+129, -561)
  14. p2p/transport_mconn.go (+675, -0)
  15. p2p/transport_test.go (+0, -696)
  16. test/maverick/node/node.go (+26, -27)

consensus/byzantine_test.go (+24, -41)

@ -224,18 +224,7 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
ticker.SetLogger(css[0].Logger)
css[0].SetTimeoutTicker(ticker)
switches := make([]*p2p.Switch, N)
p2pLogger := logger.With("module", "p2p")
for i := 0; i < N; i++ {
switches[i] = p2p.MakeSwitch(
config.P2P,
i,
"foo", "1.0.0",
func(i int, sw *p2p.Switch) *p2p.Switch {
return sw
})
switches[i].SetLogger(p2pLogger.With("validator", i))
}
blocksSubs := make([]types.Subscription, N)
reactors := make([]p2p.Reactor, N)
@ -243,20 +232,6 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
// enable txs so we can create different proposals
assertMempool(css[i].txNotifier).EnableTxsAvailable()
// make first val byzantine
if i == 0 {
// NOTE: Now, test validators are MockPV, which by default doesn't
// do any safety checks.
css[i].privValidator.(types.MockPV).DisableChecks()
css[i].decideProposal = func(j int32) func(int64, int32) {
return func(height int64, round int32) {
byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
}
}(int32(i))
// We are setting the prevote function to do nothing because the prevoting
// and precommitting are done alongside the proposal.
css[i].doPrevote = func(height int64, round int32) {}
}
eventBus := css[i].eventBus
eventBus.SetLogger(logger.With("module", "events", "validator", i))
@ -281,22 +256,10 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
require.NoError(t, err)
}
defer func() {
for _, r := range reactors {
if rr, ok := r.(*ByzantineReactor); ok {
err := rr.reactor.Switch.Stop()
require.NoError(t, err)
} else {
err := r.(*Reactor).Switch.Stop()
require.NoError(t, err)
}
}
}()
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
// ignore new switch s, we already made ours
switches[i].AddReactor("CONSENSUS", reactors[i])
return switches[i]
switches := p2p.MakeConnectedSwitches(config.P2P, N, func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(p2pLogger.With("validator", i))
sw.AddReactor("CONSENSUS", reactors[i])
return sw
}, func(sws []*p2p.Switch, i, j int) {
// the network starts partitioned with globally active adversary
if i != 0 {
@ -305,6 +268,26 @@ func TestByzantineConflictingProposalsWithPartition(t *testing.T) {
p2p.Connect2Switches(sws, i, j)
})
// make first val byzantine
// NOTE: Now, test validators are MockPV, which by default doesn't
// do any safety checks.
css[0].privValidator.(types.MockPV).DisableChecks()
css[0].decideProposal = func(j int32) func(int64, int32) {
return func(height int64, round int32) {
byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
}
}(int32(0))
// We are setting the prevote function to do nothing because the prevoting
// and precommitting are done alongside the proposal.
css[0].doPrevote = func(height int64, round int32) {}
defer func() {
for _, sw := range switches {
err := sw.Stop()
require.NoError(t, err)
}
}()
// start the non-byz state machines.
// note these must be started before the byz
for i := 1; i < N; i++ {


libs/log/testing_logger.go (+7, -1)

@ -3,6 +3,7 @@ package log
import (
"io"
"os"
"sync"
"testing"
"github.com/go-kit/kit/log/term"
@ -10,7 +11,8 @@ import (
var (
// reuse the same logger across all tests
_testingLogger Logger
_testingLoggerMutex = sync.Mutex{}
_testingLogger Logger
)
// TestingLogger returns a TMLogger which writes to STDOUT if testing being run
@ -30,6 +32,8 @@ func TestingLogger() Logger {
// inside a test (not in the init func) because
// verbose flag only set at the time of testing.
func TestingLoggerWithOutput(w io.Writer) Logger {
_testingLoggerMutex.Lock()
defer _testingLoggerMutex.Unlock()
if _testingLogger != nil {
return _testingLogger
}
@ -46,6 +50,8 @@ func TestingLoggerWithOutput(w io.Writer) Logger {
// TestingLoggerWithColorFn allow you to provide your own color function. See
// TestingLogger for documentation.
func TestingLoggerWithColorFn(colorFn func(keyvals ...interface{}) term.FgBgColor) Logger {
_testingLoggerMutex.Lock()
defer _testingLoggerMutex.Unlock()
if _testingLogger != nil {
return _testingLogger
}


node/node.go (+26, -27)

@ -180,7 +180,7 @@ type Node struct {
privValidator types.PrivValidator // local node's validator key
// network
transport *p2p.MultiplexTransport
transport *p2p.MConnTransport
sw *p2p.Switch // p2p connections
addrBook pex.AddrBook // known peers
nodeInfo p2p.NodeInfo
@ -409,23 +409,22 @@ func createConsensusReactor(config *cfg.Config,
}
func createTransport(
logger log.Logger,
config *cfg.Config,
nodeInfo p2p.NodeInfo,
nodeKey p2p.NodeKey,
proxyApp proxy.AppConns,
) (
*p2p.MultiplexTransport,
*p2p.MConnTransport,
[]p2p.PeerFilterFunc,
) {
var (
mConnConfig = p2p.MConnConfig(config.P2P)
transport = p2p.NewMultiplexTransport(nodeInfo, nodeKey, mConnConfig)
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)
if !config.P2P.AllowDuplicateIP {
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
}
// Filter peers by addr or pubkey with an ABCI query.
@ -468,11 +467,12 @@ func createTransport(
)
}
p2p.MultiplexTransportConnFilters(connFilters...)(transport)
// Limit the number of incoming connections.
max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)
transport := p2p.NewMConnTransport(
logger, nodeInfo, nodeKey.PrivKey, p2p.MConnConfig(config.P2P),
p2p.MConnTransportConnFilters(connFilters...),
p2p.MConnTransportMaxIncomingConnections(config.P2P.MaxNumInboundPeers+
len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))),
)
return transport, peerFilters
}
@ -764,11 +764,9 @@ func NewNode(config *cfg.Config,
return nil, err
}
// Setup Transport.
transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)
// Setup Switch.
// Setup Transport and Switch.
p2pLogger := logger.With("module", "p2p")
transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
@ -877,29 +875,30 @@ func (n *Node) OnStart() error {
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
}
// Start the transport.
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress))
if err != nil {
return err
}
if err := n.transport.Listen(*addr); err != nil {
return err
}
n.isListening = true
// Start the mempool.
if n.config.Mempool.WalEnabled() {
err = n.mempool.InitWAL()
err := n.mempool.InitWAL()
if err != nil {
return fmt.Errorf("init mempool WAL: %w", err)
}
}
// Start the switch (the P2P server).
err = n.sw.Start()
err := n.sw.Start()
if err != nil {
return err
}
// Start the transport.
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress))
if err != nil {
return err
}
if err := n.transport.Listen(addr.Endpoint()); err != nil {
return err
}
n.isListening = true
// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(); err != nil {


p2p/conn/connection.go (+2, -7)

@ -848,7 +848,7 @@ func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
}
// Handles incoming PacketMsgs. It returns the message bytes if the message is
// complete. NOTE message bytes may change on next call to recvPacketMsg.
// complete, which is owned by the caller and will not be modified.
// Not goroutine-safe
func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
@ -859,12 +859,7 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
ch.recving = append(ch.recving, packet.Data...)
if packet.EOF {
msgBytes := ch.recving
// clear the slice without re-allocating.
// http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
// suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
// at which point the recving slice stops being used and should be garbage collected
ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity)
return msgBytes, nil
}
return nil, nil
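The buffer change above is subtle: with the old `ch.recving[:0]` reuse, the returned message bytes aliased the channel's receive buffer, so the next packet could overwrite data a reactor was still holding. A standalone sketch of the hazard (illustration only, not MConnection code):

package main

import "fmt"

func main() {
	buf := make([]byte, 0, 8)
	buf = append(buf, "abc"...)
	msg := buf                  // handed to the caller; aliases buf's array
	buf = buf[:0]               // old behavior: reuse the backing array
	buf = append(buf, "xyz"...) // next message overwrites msg in place
	fmt.Println(string(msg))    // prints "xyz", not "abc"
}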


p2p/netaddress.go (+10, -0)

@ -281,6 +281,16 @@ func (na *NetAddress) HasID() bool {
return string(na.ID) != ""
}
// Endpoint converts the address to an MConnection endpoint.
func (na *NetAddress) Endpoint() Endpoint {
return Endpoint{
Protocol: MConnProtocol,
PeerID: na.ID,
IP: na.IP,
Port: na.Port,
}
}
// Local returns true if it is a local address.
func (na *NetAddress) Local() bool {
return na.IP.IsLoopback() || zero4.Contains(na.IP)
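A brief usage sketch of the new helper (field values are made up for illustration):

// NetAddress -> Endpoint mapping used by the MConn transport.
na := &p2p.NetAddress{ID: "nodeid", IP: net.IPv4(127, 0, 0, 1), Port: 26656}
ep := na.Endpoint()
// ep.Protocol == p2p.MConnProtocol; ep.PeerID, ep.IP and ep.Port carry
// over from the NetAddress unchanged.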


p2p/node_info.go (+1, -1)

@ -245,7 +245,7 @@ func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo {
return dni
}
func DefaultNodeInfoFromToProto(pb *tmp2p.DefaultNodeInfo) (DefaultNodeInfo, error) {
func DefaultNodeInfoFromProto(pb *tmp2p.DefaultNodeInfo) (DefaultNodeInfo, error) {
if pb == nil {
return DefaultNodeInfo{}, errors.New("nil node info")
}


p2p/peer.go (+75, -108)

@ -4,7 +4,9 @@ import (
"bytes"
"encoding/hex"
"fmt"
"io"
"net"
"runtime/debug"
"strings"
"sync"
"time"
@ -178,52 +180,28 @@ type Peer interface {
type peerConn struct {
outbound bool
persistent bool
conn net.Conn // source connection
socketAddr *NetAddress
// cached RemoteIP()
ip net.IP
conn Connection
ip net.IP // cached RemoteIP()
}
func newPeerConn(
outbound, persistent bool,
conn net.Conn,
socketAddr *NetAddress,
) peerConn {
func newPeerConn(outbound, persistent bool, conn Connection) peerConn {
return peerConn{
outbound: outbound,
persistent: persistent,
conn: conn,
socketAddr: socketAddr,
}
}
// ID only exists for SecretConnection.
// NOTE: Will panic if conn is not *SecretConnection.
func (pc peerConn) ID() ID {
return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
return PubKeyToID(pc.conn.PubKey())
}
// Return the IP from the connection RemoteAddr
func (pc peerConn) RemoteIP() net.IP {
if pc.ip != nil {
return pc.ip
}
host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
if err != nil {
panic(err)
}
ips, err := net.LookupIP(host)
if err != nil {
panic(err)
if pc.ip == nil {
pc.ip = pc.conn.RemoteEndpoint().IP
}
pc.ip = ips[0]
return pc.ip
}
@ -235,13 +213,14 @@ type peer struct {
// raw peerConn and the multiplex connection
peerConn
mconn *tmconn.MConnection
// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
nodeInfo NodeInfo
channels []byte
nodeInfo NodeInfo
channels []byte
reactors map[byte]Reactor
onPeerError func(Peer, interface{})
// User data
Data *cmap.CMap
@ -254,30 +233,22 @@ type PeerOption func(*peer)
func newPeer(
pc peerConn,
mConfig tmconn.MConnConfig,
nodeInfo NodeInfo,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
nodeInfo := pc.conn.NodeInfo()
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.(DefaultNodeInfo).Channels, // TODO
channels: nodeInfo.Channels, // TODO
reactors: reactorsByCh,
onPeerError: onPeerError,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
}
p.mconn = createMConnection(
pc.conn,
p,
reactorsByCh,
chDescs,
onPeerError,
mConfig,
)
p.BaseService = *service.NewBaseService(nil, "Peer", p)
for _, option := range options {
option(p)
@ -286,13 +257,18 @@ func newPeer(
return p
}
// onError calls the peer error callback.
func (p *peer) onError(err interface{}) {
p.onPeerError(p, err)
}
// String representation.
func (p *peer) String() string {
if p.outbound {
return fmt.Sprintf("Peer{%v %v out}", p.mconn, p.ID())
return fmt.Sprintf("Peer{%v %v out}", p.conn, p.ID())
}
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
return fmt.Sprintf("Peer{%v %v in}", p.conn, p.ID())
}
//---------------------------------------------------
@ -301,7 +277,6 @@ func (p *peer) String() string {
// SetLogger implements BaseService.
func (p *peer) SetLogger(l log.Logger) {
p.Logger = l
p.mconn.SetLogger(l)
}
// OnStart implements BaseService.
@ -310,29 +285,53 @@ func (p *peer) OnStart() error {
return err
}
if err := p.mconn.Start(); err != nil {
return err
}
go p.processMessages()
go p.metricsReporter()
return nil
}
// processMessages processes messages received from the connection.
func (p *peer) processMessages() {
defer func() {
if r := recover(); r != nil {
p.Logger.Error("peer message processing panic", "err", r, "stack", string(debug.Stack()))
p.onError(fmt.Errorf("panic during peer message processing: %v", r))
}
}()
for {
chID, msg, err := p.conn.ReceiveMessage()
if err != nil {
p.onError(err)
return
}
reactor, ok := p.reactors[chID]
if !ok {
p.onError(fmt.Errorf("unknown channel %v", chID))
return
}
reactor.Receive(chID, p, msg)
}
}
// FlushStop mimics OnStop but additionally ensures that all successful
// .Send() calls will get flushed before closing the connection.
// NOTE: it is not safe to call this method more than once.
func (p *peer) FlushStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
p.mconn.FlushStop() // stop everything and close the conn
if err := p.conn.FlushClose(); err != nil {
p.Logger.Debug("error while stopping peer", "err", err)
}
}
// OnStop implements BaseService.
func (p *peer) OnStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
if err := p.mconn.Stop(); err != nil { // stop everything and close the conn
p.Logger.Debug("Error while stopping peer", "err", err)
if err := p.conn.Close(); err != nil {
p.Logger.Debug("error while stopping peer", "err", err)
}
}
@ -364,12 +363,12 @@ func (p *peer) NodeInfo() NodeInfo {
// For inbound peers, it's the address returned by the underlying connection
// (not what's reported in the peer's NodeInfo).
func (p *peer) SocketAddr() *NetAddress {
return p.peerConn.socketAddr
return p.peerConn.conn.RemoteEndpoint().NetAddress()
}
// Status returns the peer's ConnectionStatus.
func (p *peer) Status() tmconn.ConnectionStatus {
return p.mconn.Status()
return p.conn.Status()
}
// Send msg bytes to the channel identified by chID byte. Returns false if the
@ -382,7 +381,13 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool {
} else if !p.hasChannel(chID) {
return false
}
res := p.mconn.Send(chID, msgBytes)
res, err := p.conn.SendMessage(chID, msgBytes)
if err == io.EOF {
return false
} else if err != nil {
p.onError(err)
return false
}
if res {
labels := []string{
"peer_id", string(p.ID()),
@ -401,7 +406,13 @@ func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
} else if !p.hasChannel(chID) {
return false
}
res := p.mconn.TrySend(chID, msgBytes)
res, err := p.conn.TrySendMessage(chID, msgBytes)
if err == io.EOF {
return false
} else if err != nil {
p.onError(err)
return false
}
if res {
labels := []string{
"peer_id", string(p.ID()),
@ -458,15 +469,11 @@ func (pc *peerConn) CloseConn() {
// RemoteAddr returns peer's remote network address.
func (p *peer) RemoteAddr() net.Addr {
return p.peerConn.conn.RemoteAddr()
}
// CanSend returns true if the send queue is not full, false otherwise.
func (p *peer) CanSend(chID byte) bool {
if !p.IsRunning() {
return false
endpoint := p.conn.RemoteEndpoint()
return &net.TCPAddr{
IP: endpoint.IP,
Port: int(endpoint.Port),
}
return p.mconn.CanSend(chID)
}
//---------------------------------------------------
@ -481,7 +488,7 @@ func (p *peer) metricsReporter() {
for {
select {
case <-p.metricsTicker.C:
status := p.mconn.Status()
status := p.conn.Status()
var sendQueueSize float64
for _, chStatus := range status.Channels {
sendQueueSize += float64(chStatus.SendQueueSize)
@ -493,43 +500,3 @@ func (p *peer) metricsReporter() {
}
}
}
//------------------------------------------------------------------
// helper funcs
func createMConnection(
conn net.Conn,
p *peer,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
config tmconn.MConnConfig,
) *tmconn.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
// Note that it's ok to panic here as it's caught in the conn._recover,
// which does onPeerError.
panic(fmt.Sprintf("Unknown channel %X", chID))
}
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
onPeerError(p, r)
}
return tmconn.NewMConnectionWithConfig(
conn,
chDescs,
onReceive,
onError,
config,
)
}
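For clarity, these are the send semantics the peer now relies on, sketched against any `Connection` from this PR (`c`, `chID` and `msgBytes` are assumed): `SendMessage` returns false if the message timed out waiting for buffer space, and `io.EOF` once the connection has closed, which the peer treats as a normal shutdown rather than a peer error.

ok, err := c.SendMessage(chID, msgBytes)
switch {
case err == io.EOF:
	// connection closed; treat as a clean stop, not a peer error
case err != nil:
	// unexpected failure; report via the peer error callback
case !ok:
	// send buffer full and the send timed out; message was dropped
default:
	// message queued; update send metrics
}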

p2p/peer_test.go (+13, -23)

@ -114,7 +114,6 @@ func TestPeerSend(t *testing.T) {
}
})
assert.True(p.CanSend(testCh))
assert.True(p.Send(testCh, []byte("Asylum")))
}
@ -126,20 +125,17 @@ func createOutboundPeerAndPerformHandshake(
chDescs := []*tmconn.ChannelDescriptor{
{ID: testCh, Priority: 1},
}
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
pk := ed25519.GenPrivKey()
pc, err := testOutboundPeerConn(addr, config, false, pk)
if err != nil {
return nil, err
}
timeout := 1 * time.Second
ourNodeInfo := testNodeInfo(addr.ID, "host_peer")
peerNodeInfo, err := handshake(pc.conn, timeout, ourNodeInfo)
ourNodeInfo := testNodeInfo(PubKeyToID(pk.PubKey()), "host_peer")
transport := NewMConnTransport(log.TestingLogger(), ourNodeInfo, pk, mConfig)
transport.SetChannelDescriptors(chDescs)
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
pc, err := testOutboundPeerConn(transport, addr, config, false, pk)
if err != nil {
return nil, err
}
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {})
p := newPeer(pc, reactorsByCh, func(p Peer, r interface{}) {})
p.SetLogger(log.TestingLogger().With("peer", addr))
return p, nil
}
@ -157,6 +153,7 @@ func testDial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) {
}
func testOutboundPeerConn(
transport *MConnTransport,
addr *NetAddress,
config *config.P2PConfig,
persistent bool,
@ -169,7 +166,7 @@ func testOutboundPeerConn(
return pc, fmt.Errorf("error creating peer: %w", err)
}
pc, err = testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr)
pc, err = testPeerConn(transport, conn, true, persistent)
if err != nil {
if cerr := conn.Close(); cerr != nil {
return pc, fmt.Errorf("%v: %w", cerr.Error(), err)
@ -227,15 +224,12 @@ func (rp *remotePeer) Stop() {
}
func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) {
transport := NewMConnTransport(log.TestingLogger(), rp.nodeInfo(), rp.PrivKey, MConnConfig(rp.Config))
conn, err := addr.DialTimeout(1 * time.Second)
if err != nil {
return nil, err
}
pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
if err != nil {
return nil, err
}
_, err = handshake(pc.conn, time.Second, rp.nodeInfo())
_, err = testInboundPeerConn(transport, conn)
if err != nil {
return nil, err
}
@ -243,6 +237,7 @@ func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) {
}
func (rp *remotePeer) accept() {
transport := NewMConnTransport(log.TestingLogger(), rp.nodeInfo(), rp.PrivKey, MConnConfig(rp.Config))
conns := []net.Conn{}
for {
@ -255,14 +250,9 @@ func (rp *remotePeer) accept() {
return
}
pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
if err != nil {
golog.Fatalf("Failed to create a peer: %+v", err)
}
_, err = handshake(pc.conn, time.Second, rp.nodeInfo())
_, err = testInboundPeerConn(transport, conn)
if err != nil {
golog.Fatalf("Failed to perform handshake: %+v", err)
golog.Printf("Failed to create a peer: %+v", err)
}
conns = append(conns, conn)


p2p/pex/pex_reactor_test.go (+13, -8)

@ -126,7 +126,9 @@ func TestPEXReactorReceive(t *testing.T) {
r.RequestAddrs(peer)
size := book.Size()
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
na, err := peer.NodeInfo().NetAddress()
require.NoError(t, err)
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{na.ToProto()}})
r.Receive(PexChannel, peer, msg)
assert.Equal(t, size+1, book.Size())
@ -327,7 +329,7 @@ func TestPEXReactorDoesNotDisconnectFromPersistentPeerInSeedMode(t *testing.T) {
assert.Zero(t, sw.Peers().Size())
peerSwitch := testCreateDefaultPeer(dir, 1)
peerSwitch := testCreatePeerWithConfig(dir, 1, pexRConfig)
require.NoError(t, peerSwitch.Start())
t.Cleanup(func() { _ = peerSwitch.Stop() })
@ -452,7 +454,9 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
pexR.RequestAddrs(peer)
size := book.Size()
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
na, err := peer.NodeInfo().NetAddress()
require.NoError(t, err)
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{na.ToProto()}})
pexR.Receive(PexChannel, peer, msg)
assert.Equal(t, size, book.Size())
@ -619,12 +623,13 @@ func createReactor(t *testing.T, conf *ReactorConfig) (r *Reactor, book AddrBook
}
func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
for _, r := range reactors {
sw.AddReactor(r.String(), r)
}
return sw
})
sw.SetLogger(log.TestingLogger())
for _, r := range reactors {
sw.AddReactor(r.String(), r)
r.SetSwitch(sw)
}
return sw
}


p2p/switch.go (+45, -23)

@ -1,6 +1,7 @@
package p2p
import (
"context"
"fmt"
"math"
"sync"
@ -92,10 +93,14 @@ type Switch struct {
metrics *Metrics
}
// NetAddress returns the address the switch is listening on.
// NetAddress returns the first address the switch is listening on,
// or nil if no addresses are found.
func (sw *Switch) NetAddress() *NetAddress {
addr := sw.transport.NetAddress()
return &addr
endpoints := sw.transport.Endpoints()
if len(endpoints) == 0 {
return nil
}
return endpoints[0].NetAddress()
}
// SwitchOption sets an optional parameter on the Switch.
@ -221,6 +226,12 @@ func (sw *Switch) SetNodeKey(nodeKey NodeKey) {
// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
// FIXME: Temporary hack to pass channel descriptors to MConn transport,
// since they are not available when it is constructed. This will be
// fixed when we implement the new router abstraction.
sw.transport.SetChannelDescriptors(sw.chDescs)
// Start reactors
for _, reactor := range sw.reactors {
err := reactor.Start()
@ -246,7 +257,7 @@ func (sw *Switch) OnStop() {
sw.Logger.Debug("Switch: Stopping reactors")
for _, reactor := range sw.reactors {
if err := reactor.Stop(); err != nil {
sw.Logger.Error("error while stopped reactor", "reactor", reactor, "error", err)
sw.Logger.Error("error while stopping reactor", "reactor", reactor, "error", err)
}
}
}
@ -354,7 +365,6 @@ func (sw *Switch) StopPeerGracefully(peer Peer) {
}
func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
sw.transport.Cleanup(peer)
if err := peer.Stop(); err != nil {
sw.Logger.Error("error while stopping peer", "error", err) // TODO: should return error to be handled accordingly
}
@ -617,13 +627,7 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool {
func (sw *Switch) acceptRoutine() {
for {
p, err := sw.transport.Accept(peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
isPersistent: sw.IsPeerPersistent,
})
c, err := sw.transport.Accept(context.Background())
if err != nil {
switch err := err.(type) {
case ErrRejected:
@ -671,6 +675,20 @@ func (sw *Switch) acceptRoutine() {
break
}
peerNodeInfo := c.NodeInfo()
isPersistent := false
addr, err := peerNodeInfo.NetAddress()
if err == nil {
isPersistent = sw.IsPeerPersistent(addr)
}
p := newPeer(
newPeerConn(false, isPersistent, c),
sw.reactorsByCh,
sw.StopPeerForError,
PeerMetrics(sw.metrics),
)
if !sw.IsPeerUnconditional(p.NodeInfo().ID()) {
// Ignore connection if we already have enough peers.
_, in, _ := sw.NumPeers()
@ -681,16 +699,14 @@ func (sw *Switch) acceptRoutine() {
"have", in,
"max", sw.config.MaxNumInboundPeers,
)
sw.transport.Cleanup(p)
_ = p.CloseConn()
continue
}
}
if err := sw.addPeer(p); err != nil {
sw.transport.Cleanup(p)
_ = p.CloseConn()
if p.IsRunning() {
_ = p.Stop()
}
@ -720,12 +736,11 @@ func (sw *Switch) addOutboundPeerWithConfig(
return fmt.Errorf("dial err (peerConfig.DialFail == true)")
}
p, err := sw.transport.Dial(*addr, peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
c, err := sw.transport.Dial(context.Background(), Endpoint{
Protocol: MConnProtocol,
PeerID: addr.ID,
IP: addr.IP,
Port: addr.Port,
})
if err != nil {
if e, ok := err.(ErrRejected); ok {
@ -748,8 +763,15 @@ func (sw *Switch) addOutboundPeerWithConfig(
return err
}
p := newPeer(
newPeerConn(true, sw.IsPeerPersistent(c.RemoteEndpoint().NetAddress()), c),
sw.reactorsByCh,
sw.StopPeerForError,
PeerMetrics(sw.metrics),
)
if err := sw.addPeer(p); err != nil {
sw.transport.Cleanup(p)
_ = p.CloseConn()
if p.IsRunning() {
_ = p.Stop()
}


p2p/switch_test.go (+40, -35)

@ -2,6 +2,7 @@ package p2p
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@ -28,6 +29,7 @@ import (
var (
cfg *config.P2PConfig
ctx = context.Background()
)
func init() {
@ -240,15 +242,15 @@ func TestSwitchPeerFilter(t *testing.T) {
rp.Start()
t.Cleanup(rp.Stop)
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
})
c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint())
if err != nil {
t.Fatal(err)
}
p := newPeer(
newPeerConn(true, false, c),
sw.reactorsByCh,
sw.StopPeerForError,
)
err = sw.addPeer(p)
if err, ok := err.(ErrRejected); ok {
@ -291,15 +293,15 @@ func TestSwitchPeerFilterTimeout(t *testing.T) {
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
})
c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint())
if err != nil {
t.Fatal(err)
}
p := newPeer(
newPeerConn(true, false, c),
sw.reactorsByCh,
sw.StopPeerForError,
)
err = sw.addPeer(p)
if _, ok := err.(ErrFilterTimeout); !ok {
@ -322,15 +324,15 @@ func TestSwitchPeerFilterDuplicate(t *testing.T) {
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
})
c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint())
if err != nil {
t.Fatal(err)
}
p := newPeer(
newPeerConn(true, false, c),
sw.reactorsByCh,
sw.StopPeerForError,
)
if err := sw.addPeer(p); err != nil {
t.Fatal(err)
@ -372,13 +374,15 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
})
require.Nil(err)
c, err := sw.transport.Dial(ctx, rp.Addr().Endpoint())
if err != nil {
t.Fatal(err)
}
p := newPeer(
newPeerConn(true, false, c),
sw.reactorsByCh,
sw.StopPeerForError,
)
err = sw.addPeer(p)
require.Nil(err)
@ -386,7 +390,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
require.NotNil(sw.Peers().Get(rp.ID()))
// simulate failure by closing connection
err = p.(*peer).CloseConn()
err = p.CloseConn()
require.NoError(err)
assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
@ -677,19 +681,16 @@ type errorTransport struct {
acceptErr error
}
func (et errorTransport) NetAddress() NetAddress {
panic("not implemented")
}
func (et errorTransport) Accept(c peerConfig) (Peer, error) {
func (et errorTransport) Accept(context.Context) (Connection, error) {
return nil, et.acceptErr
}
func (errorTransport) Dial(NetAddress, peerConfig) (Peer, error) {
panic("not implemented")
}
func (errorTransport) Cleanup(Peer) {
func (errorTransport) Dial(context.Context, Endpoint) (Connection, error) {
panic("not implemented")
}
func (errorTransport) Close() error { panic("not implemented") }
func (errorTransport) FlushClose() error { panic("not implemented") }
func (errorTransport) Endpoints() []Endpoint { panic("not implemented") }
func (errorTransport) SetChannelDescriptors([]*ChannelDescriptor) {}
func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
@ -728,6 +729,10 @@ type mockReactor struct {
initCalledBeforeRemoveFinished uint32
}
func (r *mockReactor) GetChannels() []*ChannelDescriptor {
return []*ChannelDescriptor{{ID: testCh, Priority: 10}}
}
func (r *mockReactor) RemovePeer(peer Peer, reason interface{}) {
atomic.StoreUint32(&r.removePeerInProgress, 1)
defer atomic.StoreUint32(&r.removePeerInProgress, 0)


p2p/test_util.go (+12, -33)

@ -3,9 +3,7 @@ package p2p
import (
"fmt"
"net"
"time"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
tmrand "github.com/tendermint/tendermint/libs/rand"
@ -34,12 +32,8 @@ func AddPeerToSwitchPeerSet(sw *Switch, peer Peer) {
func CreateRandomPeer(outbound bool) Peer {
addr, netAddr := CreateRoutableAddr()
p := &peer{
peerConn: peerConn{
outbound: outbound,
socketAddr: netAddr,
},
peerConn: peerConn{outbound: outbound},
nodeInfo: mockNodeInfo{netAddr},
mconn: &conn.MConnection{},
metrics: NopMetrics(),
}
p.SetLogger(log.TestingLogger().With("peer", addr))
@ -127,15 +121,7 @@ func Connect2Switches(switches []*Switch, i, j int) {
}
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
pc, err := testInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey)
if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)
}
return err
}
ni, err := handshake(conn, time.Second, sw.nodeInfo)
pc, err := testInboundPeerConn(sw.transport.(*MConnTransport), conn)
if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)
@ -145,10 +131,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
p := newPeer(
pc,
MConnConfig(sw.config),
ni,
sw.reactorsByCh,
sw.chDescs,
sw.StopPeerForError,
)
@ -189,9 +172,10 @@ func MakeSwitch(
panic(err)
}
t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg))
logger := log.TestingLogger().With("switch", i)
t := NewMConnTransport(logger, nodeInfo, nodeKey.PrivKey, MConnConfig(cfg))
if err := t.Listen(*addr); err != nil {
if err := t.Listen(addr.Endpoint()); err != nil {
panic(err)
}
@ -201,6 +185,7 @@ func MakeSwitch(
sw.SetNodeKey(nodeKey)
ni := nodeInfo.(DefaultNodeInfo)
ni.Channels = []byte{}
for ch := range sw.reactorsByCh {
ni.Channels = append(ni.Channels, ch)
}
@ -208,37 +193,31 @@ func MakeSwitch(
// TODO: We need to setup reactors ahead of time so the NodeInfo is properly
// populated and we don't have to do those awkward overrides and setters.
t.nodeInfo = nodeInfo
t.nodeInfo = nodeInfo.(DefaultNodeInfo)
sw.SetNodeInfo(nodeInfo)
return sw
}
func testInboundPeerConn(
transport *MConnTransport,
conn net.Conn,
config *config.P2PConfig,
ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
return testPeerConn(conn, config, false, false, ourNodePrivKey, nil)
return testPeerConn(transport, conn, false, false)
}
func testPeerConn(
transport *MConnTransport,
rawConn net.Conn,
cfg *config.P2PConfig,
outbound, persistent bool,
ourNodePrivKey crypto.PrivKey,
socketAddr *NetAddress,
) (pc peerConn, err error) {
conn := rawConn
// Encrypt connection
conn, err = upgradeSecretConn(conn, cfg.HandshakeTimeout, ourNodePrivKey)
conn, err := newMConnConnection(transport, rawConn, "")
if err != nil {
return pc, fmt.Errorf("error creating peer: %w", err)
}
// Only the information we already have
return newPeerConn(outbound, persistent, conn, socketAddr), nil
return newPeerConn(outbound, persistent, conn), nil
}
//----------------------------------------------------------------


p2p/transport.go (+129, -561)

@ -2,593 +2,161 @@ package p2p
import (
"context"
"errors"
"fmt"
"net"
"time"
"golang.org/x/net/netutil"
"net/url"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
const (
defaultDialTimeout = time.Second
defaultFilterTimeout = 5 * time.Second
defaultHandshakeTimeout = 3 * time.Second
)
// IPResolver is a behaviour subset of net.Resolver.
type IPResolver interface {
LookupIPAddr(context.Context, string) ([]net.IPAddr, error)
}
// accept is the container to carry the upgraded connection and NodeInfo from an
// asynchronously running routine to the Accept method.
type accept struct {
netAddr *NetAddress
conn net.Conn
nodeInfo NodeInfo
err error
}
// peerConfig is used to bundle data we need to fully setup a Peer with an
// MConn, provided by the caller of Accept and Dial (currently the Switch). This
// a temporary measure until reactor setup is less dynamic and we introduce the
// concept of PeerBehaviour to communicate about significant Peer lifecycle
// events.
// TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour.
type peerConfig struct {
chDescs []*conn.ChannelDescriptor
onPeerError func(Peer, interface{})
outbound bool
// isPersistent allows you to set a function, which, given socket address
// (for outbound peers) OR self-reported address (for inbound peers), tells
// if the peer is persistent or not.
isPersistent func(*NetAddress) bool
reactorsByCh map[byte]Reactor
metrics *Metrics
}
// Transport emits and connects to Peers. The implementation of Peer is left to
// the transport. Each transport is also responsible to filter establishing
// peers specific to its domain.
// Transport is an arbitrary mechanism for exchanging bytes with a peer.
type Transport interface {
// Listening address.
NetAddress() NetAddress
// Accept waits for the next inbound connection on a listening endpoint.
Accept(context.Context) (Connection, error)
// Accept returns a newly connected Peer.
Accept(peerConfig) (Peer, error)
// Dial creates an outbound connection to an endpoint.
Dial(context.Context, Endpoint) (Connection, error)
// Dial connects to the Peer for the address.
Dial(NetAddress, peerConfig) (Peer, error)
// Endpoints lists endpoints the transport is listening on. Any endpoint IP
// addresses do not need to be normalized in any way (e.g. 0.0.0.0 is
// valid), as they should be preprocessed before being advertised.
Endpoints() []Endpoint
// Cleanup any resources associated with Peer.
Cleanup(Peer)
}
// transportLifecycle bundles the methods for callers to control start and stop
// behaviour.
type transportLifecycle interface {
// Close stops accepting new connections, but does not close active connections.
Close() error
Listen(NetAddress) error
}
// ConnFilterFunc to be implemented by filter hooks after a new connection has
// been established. The set of existing connections is passed along together
// with all resolved IPs for the new connection.
type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error
// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection
// and refuses new ones if they come from a known ip.
func ConnDuplicateIPFilter() ConnFilterFunc {
return func(cs ConnSet, c net.Conn, ips []net.IP) error {
for _, ip := range ips {
if cs.HasIP(ip) {
return ErrRejected{
conn: c,
err: fmt.Errorf("ip<%v> already connected", ip),
isDuplicate: true,
}
}
}
return nil
}
}
// MultiplexTransportOption sets an optional parameter on the
// MultiplexTransport.
type MultiplexTransportOption func(*MultiplexTransport)
// MultiplexTransportConnFilters sets the filters for rejecting new connections.
func MultiplexTransportConnFilters(
filters ...ConnFilterFunc,
) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.connFilters = filters }
}
// MultiplexTransportFilterTimeout sets the timeout waited for filter calls to
// return.
func MultiplexTransportFilterTimeout(
timeout time.Duration,
) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.filterTimeout = timeout }
}
// MultiplexTransportResolver sets the Resolver used for IP lookups, defaults to
// net.DefaultResolver.
func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.resolver = resolver }
}
// MultiplexTransportMaxIncomingConnections sets the maximum number of
// simultaneous connections (incoming). Default: 0 (unlimited)
func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n }
}
// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
netAddr NetAddress
listener net.Listener
maxIncomingConnections int // see MaxIncomingConnections
acceptc chan accept
closec chan struct{}
// Lookup table for duplicate ip and id checks.
conns ConnSet
connFilters []ConnFilterFunc
dialTimeout time.Duration
filterTimeout time.Duration
handshakeTimeout time.Duration
nodeInfo NodeInfo
nodeKey NodeKey
resolver IPResolver
// TODO(xla): This config is still needed as we parameterise peerConn and
// peer currently. All relevant configuration should be refactored into options
// with sane defaults.
mConfig conn.MConnConfig
}
// Test multiplexTransport for interface completeness.
var _ Transport = (*MultiplexTransport)(nil)
var _ transportLifecycle = (*MultiplexTransport)(nil)
// NewMultiplexTransport returns a tcp connected multiplexed peer.
func NewMultiplexTransport(
nodeInfo NodeInfo,
nodeKey NodeKey,
mConfig conn.MConnConfig,
) *MultiplexTransport {
return &MultiplexTransport{
acceptc: make(chan accept),
closec: make(chan struct{}),
dialTimeout: defaultDialTimeout,
filterTimeout: defaultFilterTimeout,
handshakeTimeout: defaultHandshakeTimeout,
mConfig: mConfig,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
conns: NewConnSet(),
resolver: net.DefaultResolver,
}
}
// NetAddress implements Transport.
func (mt *MultiplexTransport) NetAddress() NetAddress {
return mt.netAddr
}
// Accept implements Transport.
func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
select {
// This case should never have any side-effectful/blocking operations to
// ensure that quality peers are ready to be used.
case a := <-mt.acceptc:
if a.err != nil {
return nil, a.err
}
cfg.outbound = false
return mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr), nil
case <-mt.closec:
return nil, ErrTransportClosed{}
}
}
// Dial implements Transport.
func (mt *MultiplexTransport) Dial(
addr NetAddress,
cfg peerConfig,
) (Peer, error) {
c, err := addr.DialTimeout(mt.dialTimeout)
if err != nil {
return nil, err
}
// TODO(xla): Evaluate if we should apply filters if we explicitly dial.
if err := mt.filterConn(c); err != nil {
return nil, err
}
secretConn, nodeInfo, err := mt.upgrade(c, &addr)
if err != nil {
return nil, err
}
cfg.outbound = true
p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr)
return p, nil
}
// Close implements transportLifecycle.
func (mt *MultiplexTransport) Close() error {
close(mt.closec)
if mt.listener != nil {
return mt.listener.Close()
}
return nil
}
// Listen implements transportLifecycle.
func (mt *MultiplexTransport) Listen(addr NetAddress) error {
ln, err := net.Listen("tcp", addr.DialString())
if err != nil {
return err
}
if mt.maxIncomingConnections > 0 {
ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
}
mt.netAddr = addr
mt.listener = ln
go mt.acceptPeers()
return nil
}
func (mt *MultiplexTransport) acceptPeers() {
for {
c, err := mt.listener.Accept()
if err != nil {
// If Close() has been called, silently exit.
select {
case _, ok := <-mt.closec:
if !ok {
return
}
default:
// Transport is not closed
}
mt.acceptc <- accept{err: err}
return
}
// Connection upgrade and filtering should be asynchronous to avoid
// Head-of-line blocking[0].
// Reference: https://github.com/tendermint/tendermint/issues/2047
//
// [0] https://en.wikipedia.org/wiki/Head-of-line_blocking
go func(c net.Conn) {
defer func() {
if r := recover(); r != nil {
err := ErrRejected{
conn: c,
err: fmt.Errorf("recovered from panic: %v", r),
isAuthFailure: true,
}
select {
case mt.acceptc <- accept{err: err}:
case <-mt.closec:
// Give up if the transport was closed.
_ = c.Close()
return
}
}
}()
var (
nodeInfo NodeInfo
secretConn *conn.SecretConnection
netAddr *NetAddress
)
err := mt.filterConn(c)
if err == nil {
secretConn, nodeInfo, err = mt.upgrade(c, nil)
if err == nil {
addr := c.RemoteAddr()
id := PubKeyToID(secretConn.RemotePubKey())
netAddr = NewNetAddress(id, addr)
}
}
select {
case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}:
// Make the upgraded peer available.
case <-mt.closec:
// Give up if the transport was closed.
_ = c.Close()
return
}
}(c)
}
}
// Cleanup removes the given address from the connections set and
// closes the connection.
func (mt *MultiplexTransport) Cleanup(p Peer) {
mt.conns.RemoveAddr(p.RemoteAddr())
_ = p.CloseConn()
}
func (mt *MultiplexTransport) cleanup(c net.Conn) error {
mt.conns.Remove(c)
return c.Close()
}
func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) {
defer func() {
if err != nil {
_ = c.Close()
}
}()
// Reject if connection is already present.
if mt.conns.Has(c) {
return ErrRejected{conn: c, isDuplicate: true}
}
// Resolve ips for incoming conn.
ips, err := resolveIPs(mt.resolver, c)
if err != nil {
return err
}
errc := make(chan error, len(mt.connFilters))
for _, f := range mt.connFilters {
go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) {
errc <- f(mt.conns, c, ips)
}(f, c, ips, errc)
}
for i := 0; i < cap(errc); i++ {
select {
case err := <-errc:
if err != nil {
return ErrRejected{conn: c, err: err, isFiltered: true}
}
case <-time.After(mt.filterTimeout):
return ErrFilterTimeout{}
}
}
mt.conns.Set(c, ips)
return nil
// SetChannelDescriptors sets the channel descriptors for the transport.
// FIXME: This is only here for compatibility with the current Switch code.
SetChannelDescriptors(chDescs []*conn.ChannelDescriptor)
}
func (mt *MultiplexTransport) upgrade(
c net.Conn,
dialedAddr *NetAddress,
) (secretConn *conn.SecretConnection, nodeInfo NodeInfo, err error) {
defer func() {
if err != nil {
_ = mt.cleanup(c)
}
}()
secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey)
if err != nil {
return nil, nil, ErrRejected{
conn: c,
err: fmt.Errorf("secret conn failed: %v", err),
isAuthFailure: true,
}
}
// For outgoing conns, ensure connection key matches dialed key.
connID := PubKeyToID(secretConn.RemotePubKey())
if dialedAddr != nil {
if dialedID := dialedAddr.ID; connID != dialedID {
return nil, nil, ErrRejected{
conn: c,
id: connID,
err: fmt.Errorf(
"conn.ID (%v) dialed ID (%v) mismatch",
connID,
dialedID,
),
isAuthFailure: true,
}
}
}
nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo)
if err != nil {
return nil, nil, ErrRejected{
conn: c,
err: fmt.Errorf("handshake failed: %v", err),
isAuthFailure: true,
}
}
if err := nodeInfo.Validate(); err != nil {
return nil, nil, ErrRejected{
conn: c,
err: err,
isNodeInfoInvalid: true,
}
}
// Ensure connection key matches self reported key.
if connID != nodeInfo.ID() {
return nil, nil, ErrRejected{
conn: c,
id: connID,
err: fmt.Errorf(
"conn.ID (%v) NodeInfo.ID (%v) mismatch",
connID,
nodeInfo.ID(),
),
isAuthFailure: true,
}
}
// Reject self.
if mt.nodeInfo.ID() == nodeInfo.ID() {
return nil, nil, ErrRejected{
addr: *NewNetAddress(nodeInfo.ID(), c.RemoteAddr()),
conn: c,
id: nodeInfo.ID(),
isSelf: true,
}
}
if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil {
return nil, nil, ErrRejected{
conn: c,
err: err,
id: nodeInfo.ID(),
isIncompatible: true,
}
}
// Protocol identifies a transport protocol.
type Protocol string
return secretConn, nodeInfo, nil
}
// Endpoint represents a transport connection endpoint, either local or remote.
type Endpoint struct {
// PeerID specifies the peer ID of the endpoint.
//
// FIXME: This is here for backwards-compatibility with the existing MConn
// protocol, we should consider moving this higher in the stack (i.e. to
// the router).
PeerID ID
func (mt *MultiplexTransport) wrapPeer(
c net.Conn,
ni NodeInfo,
cfg peerConfig,
socketAddr *NetAddress,
) Peer {
// Protocol specifies the transport protocol, used by the router to pick a
// transport for an endpoint.
Protocol Protocol
persistent := false
if cfg.isPersistent != nil {
if cfg.outbound {
persistent = cfg.isPersistent(socketAddr)
} else {
selfReportedAddr, err := ni.NetAddress()
if err == nil {
persistent = cfg.isPersistent(selfReportedAddr)
}
}
}
// Path is an optional, arbitrary transport-specific path or identifier.
Path string
peerConn := newPeerConn(
cfg.outbound,
persistent,
c,
socketAddr,
)
// IP is an IP address (v4 or v6) to connect to. If set, this defines the
// endpoint as a networked endpoint.
IP net.IP
p := newPeer(
peerConn,
mt.mConfig,
ni,
cfg.reactorsByCh,
cfg.chDescs,
cfg.onPeerError,
PeerMetrics(cfg.metrics),
)
return p
// Port is a network port (either TCP or UDP). If not set, a default port
// may be used depending on the protocol.
Port uint16
}
func handshake(
c net.Conn,
timeout time.Duration,
nodeInfo NodeInfo,
) (NodeInfo, error) {
if err := c.SetDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
// String formats an endpoint as a URL string.
func (e Endpoint) String() string {
u := url.URL{Scheme: string(e.Protocol)}
if e.PeerID != "" {
u.User = url.User(string(e.PeerID))
}
var (
errc = make(chan error, 2)
pbpeerNodeInfo tmp2p.DefaultNodeInfo
peerNodeInfo DefaultNodeInfo
ourNodeInfo = nodeInfo.(DefaultNodeInfo)
)
go func(errc chan<- error, c net.Conn) {
_, err := protoio.NewDelimitedWriter(c).WriteMsg(ourNodeInfo.ToProto())
errc <- err
}(errc, c)
go func(errc chan<- error, c net.Conn) {
protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize())
err := protoReader.ReadMsg(&pbpeerNodeInfo)
errc <- err
}(errc, c)
for i := 0; i < cap(errc); i++ {
err := <-errc
if err != nil {
return nil, err
if len(e.IP) > 0 {
u.Host = e.IP.String()
if e.Port > 0 {
u.Host += fmt.Sprintf(":%v", e.Port)
}
} else if e.Path != "" {
u.Opaque = e.Path
}
return u.String()
}
// Validate validates an endpoint.
func (e Endpoint) Validate() error {
switch {
case e.PeerID == "":
return errors.New("endpoint has no peer ID")
case e.Protocol == "":
return errors.New("endpoint has no protocol")
case len(e.IP) == 0 && len(e.Path) == 0:
return errors.New("endpoint must have either IP or path")
case e.Port > 0 && len(e.IP) == 0:
return fmt.Errorf("endpoint has port %v but no IP", e.Port)
default:
return nil
}
peerNodeInfo, err := DefaultNodeInfoFromToProto(&pbpeerNodeInfo)
if err != nil {
return nil, err
}
return peerNodeInfo, c.SetDeadline(time.Time{})
}
func upgradeSecretConn(
c net.Conn,
timeout time.Duration,
privKey crypto.PrivKey,
) (*conn.SecretConnection, error) {
if err := c.SetDeadline(time.Now().Add(timeout)); err != nil {
return nil, err
}
sc, err := conn.MakeSecretConnection(c, privKey)
if err != nil {
return nil, err
}
return sc, sc.SetDeadline(time.Time{})
}
func resolveIPs(resolver IPResolver, c net.Conn) ([]net.IP, error) {
host, _, err := net.SplitHostPort(c.RemoteAddr().String())
if err != nil {
return nil, err
}
addrs, err := resolver.LookupIPAddr(context.Background(), host)
if err != nil {
return nil, err
}
ips := []net.IP{}
for _, addr := range addrs {
ips = append(ips, addr.IP)
}
// NetAddress returns a NetAddress for the endpoint.
// FIXME: This is temporary for compatibility with the old P2P stack.
func (e Endpoint) NetAddress() *NetAddress {
return &NetAddress{
ID: e.PeerID,
IP: e.IP,
Port: e.Port,
}
}
// Connection represents an established connection between two endpoints.
//
// FIXME: This is a temporary interface while we figure out whether we'll be
// adopting QUIC or not. If we do, this should be a byte-oriented multi-stream
// interface with one goroutine consuming each stream, and the MConnection
// transport either needs protocol changes or a shim. For details, see:
// https://github.com/tendermint/spec/pull/227
//
// FIXME: The interface is currently very broad in order to accommodate
// MConnection behavior that the rest of the P2P stack relies on. This should be
// removed once the P2P core is rewritten.
type Connection interface {
// ReceiveMessage returns the next message received on the connection,
// blocking until one is available. io.EOF is returned when closed.
ReceiveMessage() (chID byte, msg []byte, err error)
// SendMessage sends a message on the connection.
// FIXME: For compatibility with the current Peer, it returns an additional
// boolean false if the message timed out waiting to be accepted into the
// send buffer.
SendMessage(chID byte, msg []byte) (bool, error)
// TrySendMessage is a non-blocking version of SendMessage that returns
// immediately if the message buffer is full. It returns true if the message
// was accepted.
//
// FIXME: This is here for backwards-compatibility with the current Peer
// code, and should be removed when possible.
TrySendMessage(chID byte, msg []byte) (bool, error)
// LocalEndpoint returns the local endpoint for the connection.
LocalEndpoint() Endpoint
// RemoteEndpoint returns the remote endpoint for the connection.
RemoteEndpoint() Endpoint
// PubKey returns the remote peer's public key.
PubKey() crypto.PubKey
// NodeInfo returns the remote peer's node info.
NodeInfo() DefaultNodeInfo
// Close closes the connection.
Close() error
return ips, nil
// FlushClose flushes all pending sends and then closes the connection.
//
// FIXME: This only exists for backwards-compatibility with the current
// MConnection implementation. There should really be a separate Flush()
// method, but there is no easy way to synchronously flush pending data with
// the current MConnection structure.
FlushClose() error
// Status returns the current connection status.
// FIXME: Only here for compatibility with the current Peer code.
Status() conn.ConnectionStatus
}
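To illustrate the new `Endpoint` type, a small example of its URL form and validation (the peer ID is a made-up placeholder):

ep := p2p.Endpoint{
	Protocol: p2p.MConnProtocol,
	PeerID:   "f00dbabe", // hypothetical node ID
	IP:       net.IPv4(192, 0, 2, 1),
	Port:     26656,
}
fmt.Println(ep)                        // mconn://f00dbabe@192.0.2.1:26656
fmt.Println(ep.Validate())             // <nil>
fmt.Println(p2p.Endpoint{}.Validate()) // endpoint has no peer ID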

p2p/transport_mconn.go (+675, -0)

@ -0,0 +1,675 @@
package p2p
import (
"context"
"errors"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/p2p/conn"
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
"golang.org/x/net/netutil"
)
const (
defaultDialTimeout = time.Second
defaultFilterTimeout = 5 * time.Second
defaultHandshakeTimeout = 3 * time.Second
)
// MConnProtocol is the MConn protocol identifier.
const MConnProtocol Protocol = "mconn"
// MConnTransportOption sets an option for MConnTransport.
type MConnTransportOption func(*MConnTransport)
// MConnTransportMaxIncomingConnections sets the maximum number of
// simultaneous incoming connections. Default: 0 (unlimited)
func MConnTransportMaxIncomingConnections(max int) MConnTransportOption {
return func(mt *MConnTransport) { mt.maxIncomingConnections = max }
}
// MConnTransportFilterTimeout sets the timeout for filter callbacks.
func MConnTransportFilterTimeout(timeout time.Duration) MConnTransportOption {
return func(mt *MConnTransport) { mt.filterTimeout = timeout }
}
// MConnTransportConnFilters sets connection filters.
func MConnTransportConnFilters(filters ...ConnFilterFunc) MConnTransportOption {
return func(mt *MConnTransport) { mt.connFilters = filters }
}
// ConnFilterFunc is a callback for connection filtering. If it returns an
// error, the connection is rejected. The set of existing connections is passed
// along with the new connection and all resolved IPs.
type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error
// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection
// and refuses new ones if they come from a known ip.
var ConnDuplicateIPFilter ConnFilterFunc = func(cs ConnSet, c net.Conn, ips []net.IP) error {
for _, ip := range ips {
if cs.HasIP(ip) {
return ErrRejected{
conn: c,
err: fmt.Errorf("ip<%v> already connected", ip),
isDuplicate: true,
}
}
}
return nil
}
// MConnTransport is a Transport implementation using the current multiplexed
// Tendermint protocol ("MConn"). It inherits lots of code and logic from the
// previous implementation for parity with the current P2P stack (such as
// connection filtering, peer verification, and panic handling), which should be
// moved out of the transport once the rest of the P2P stack is rewritten.
type MConnTransport struct {
privKey crypto.PrivKey
nodeInfo DefaultNodeInfo
channelDescs []*ChannelDescriptor
mConnConfig conn.MConnConfig
maxIncomingConnections int
dialTimeout time.Duration
handshakeTimeout time.Duration
filterTimeout time.Duration
logger log.Logger
listener net.Listener
closeOnce sync.Once
chAccept chan *mConnConnection
chError chan error
chClose chan struct{}
// FIXME: This is a vestige from the old transport, and should be managed
// by the router once we rewrite the P2P core.
conns ConnSet
connFilters []ConnFilterFunc
}
// NewMConnTransport sets up a new MConn transport.
func NewMConnTransport(
logger log.Logger,
nodeInfo NodeInfo, // FIXME: should use DefaultNodeInfo, left for code compatibility
privKey crypto.PrivKey,
mConnConfig conn.MConnConfig,
opts ...MConnTransportOption,
) *MConnTransport {
m := &MConnTransport{
privKey: privKey,
nodeInfo: nodeInfo.(DefaultNodeInfo),
mConnConfig: mConnConfig,
channelDescs: []*ChannelDescriptor{},
dialTimeout: defaultDialTimeout,
handshakeTimeout: defaultHandshakeTimeout,
filterTimeout: defaultFilterTimeout,
logger: logger,
chAccept: make(chan *mConnConnection),
chError: make(chan error),
chClose: make(chan struct{}),
conns: NewConnSet(),
connFilters: []ConnFilterFunc{},
}
for _, opt := range opts {
opt(m)
}
return m
}
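For reference, a caller would wire the transport up roughly as below; logger,
nodeInfo and nodeKey are assumed to already exist, and the option values are
illustrative rather than recommendations:

transport := NewMConnTransport(
    logger,                    // a log.Logger
    nodeInfo,                  // this node's DefaultNodeInfo
    nodeKey.PrivKey,           // the node's private key
    conn.DefaultMConnConfig(), // stock MConnection settings
    MConnTransportMaxIncomingConnections(64),
    MConnTransportConnFilters(ConnDuplicateIPFilter),
)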
// SetChannelDescriptors implements Transport.
func (m *MConnTransport) SetChannelDescriptors(chDescs []*conn.ChannelDescriptor) {
m.channelDescs = chDescs
}
// Listen asynchronously listens for inbound connections on the given endpoint.
// It must be called exactly once before calling Accept(), and the caller must
// call Close() to shut down the listener.
func (m *MConnTransport) Listen(endpoint Endpoint) error {
if m.listener != nil {
return errors.New("MConn transport is already listening")
}
err := m.normalizeEndpoint(&endpoint)
if err != nil {
return fmt.Errorf("invalid MConn listen endpoint %q: %w", endpoint, err)
}
m.listener, err = net.Listen("tcp", fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port))
if err != nil {
return err
}
if m.maxIncomingConnections > 0 {
m.listener = netutil.LimitListener(m.listener, m.maxIncomingConnections)
}
// Spawn a goroutine to accept inbound connections asynchronously.
go m.accept()
return nil
}
// accept accepts inbound connections in a loop, and asynchronously handshakes
// with the peer to avoid head-of-line blocking. Established connections are
// passed to Accept() via chAccept.
// See: https://github.com/tendermint/tendermint/issues/204
func (m *MConnTransport) accept() {
for {
tcpConn, err := m.listener.Accept()
if err != nil {
// We have to check for closure first, since we don't want to
// propagate "use of closed network connection" errors.
select {
case <-m.chClose:
default:
// We also select on chClose here, in case the transport closes
// while we're blocked on error propagation.
select {
case m.chError <- err:
case <-m.chClose:
}
}
return
}
go func() {
err := m.filterTCPConn(tcpConn)
if err != nil {
if err := tcpConn.Close(); err != nil {
m.logger.Debug("failed to close TCP connection", "err", err)
}
select {
case m.chError <- err:
case <-m.chClose:
}
return
}
conn, err := newMConnConnection(m, tcpConn, "")
if err != nil {
m.conns.Remove(tcpConn)
if err := tcpConn.Close(); err != nil {
m.logger.Debug("failed to close TCP connection", "err", err)
}
select {
case m.chError <- err:
case <-m.chClose:
}
} else {
select {
case m.chAccept <- conn:
case <-m.chClose:
if err := tcpConn.Close(); err != nil {
m.logger.Debug("failed to close TCP connection", "err", err)
}
}
}
}()
}
}
// Accept implements Transport.
//
// accept() runs a concurrent accept loop that accepts inbound connections
// and then handshakes in a non-blocking fashion. The handshaked and validated
connections are returned via this call, which picks them off the chAccept
channel (or picks up the handshake error, if any).
func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) {
select {
case conn := <-m.chAccept:
return conn, nil
case err := <-m.chError:
return nil, err
case <-m.chClose:
return nil, ErrTransportClosed{}
case <-ctx.Done():
return nil, ctx.Err()
}
}
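Putting Listen and Accept together, a minimal accept loop over this interface
could look like the sketch below; the endpoint values and the handlePeer
consumer are illustrative assumptions:

if err := transport.Listen(Endpoint{
    Protocol: MConnProtocol,
    PeerID:   nodeInfo.ID(),
    IP:       net.IPv4zero, // all interfaces
    Port:     26656,
}); err != nil {
    return err
}
for {
    c, err := transport.Accept(context.Background())
    if err != nil {
        return err // e.g. ErrTransportClosed{} after Close()
    }
    go handlePeer(c) // hypothetical consumer of the Connection
}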
// Dial implements Transport.
func (m *MConnTransport) Dial(ctx context.Context, endpoint Endpoint) (Connection, error) {
err := m.normalizeEndpoint(&endpoint)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, m.dialTimeout)
defer cancel()
dialer := net.Dialer{}
tcpConn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("%v:%v", endpoint.IP, endpoint.Port))
if err != nil {
return nil, err
}
err = m.filterTCPConn(tcpConn)
if err != nil {
if err := tcpConn.Close(); err != nil {
m.logger.Debug("failed to close TCP connection", "err", err)
}
return nil, err
}
conn, err := newMConnConnection(m, tcpConn, endpoint.PeerID)
if err != nil {
m.conns.Remove(tcpConn)
if err := tcpConn.Close(); err != nil {
m.logger.Debug("failed to close TCP connection", "err", err)
}
return nil, err
}
return conn, nil
}
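Dialing mirrors this; the caller puts the expected peer ID in the endpoint so
the handshake can verify the remote key. In the sketch below, peerID and the
address are assumed values:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c, err := transport.Dial(ctx, Endpoint{
    Protocol: MConnProtocol,
    PeerID:   peerID, // ID we expect the remote node to prove
    IP:       net.ParseIP("203.0.113.1"),
    Port:     26656,
})
if err != nil {
    return err
}
defer c.Close()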
// Endpoints implements Transport.
func (m *MConnTransport) Endpoints() []Endpoint {
if m.listener == nil {
return []Endpoint{}
}
addr := m.listener.Addr().(*net.TCPAddr)
return []Endpoint{{
Protocol: MConnProtocol,
PeerID: m.nodeInfo.ID(),
IP: addr.IP,
Port: uint16(addr.Port),
}}
}
// Close implements Transport.
func (m *MConnTransport) Close() error {
var err error
m.closeOnce.Do(func() {
// We have to close chClose first, so that accept() will detect
// the closure and not propagate the error.
close(m.chClose)
if m.listener != nil {
err = m.listener.Close()
}
})
return err
}
// filterTCPConn filters a TCP connection, rejecting it if any filter callback
// returns an error or times out.
func (m *MConnTransport) filterTCPConn(tcpConn net.Conn) error {
if m.conns.Has(tcpConn) {
return ErrRejected{conn: tcpConn, isDuplicate: true}
}
host, _, err := net.SplitHostPort(tcpConn.RemoteAddr().String())
if err != nil {
return err
}
ip := net.ParseIP(host)
if ip == nil {
return fmt.Errorf("connection address has invalid IP address %q", host)
}
// Apply filter callbacks.
chErr := make(chan error, len(m.connFilters))
for _, connFilter := range m.connFilters {
go func(connFilter ConnFilterFunc) {
chErr <- connFilter(m.conns, tcpConn, []net.IP{ip})
}(connFilter)
}
for i := 0; i < cap(chErr); i++ {
select {
case err := <-chErr:
if err != nil {
return ErrRejected{conn: tcpConn, err: err, isFiltered: true}
}
case <-time.After(m.filterTimeout):
return ErrFilterTimeout{}
}
}
// FIXME: Doesn't really make sense to set this here, but we preserve the
// behavior from the previous P2P transport implementation. This should
// be moved to the router.
m.conns.Set(tcpConn, []net.IP{ip})
return nil
}
// normalizeEndpoint normalizes and validates an endpoint.
func (m *MConnTransport) normalizeEndpoint(endpoint *Endpoint) error {
if endpoint == nil {
return errors.New("nil endpoint")
}
if err := endpoint.Validate(); err != nil {
return err
}
if endpoint.Protocol == "" {
endpoint.Protocol = MConnProtocol
}
if endpoint.Protocol != MConnProtocol {
return fmt.Errorf("unsupported protocol %q", endpoint.Protocol)
}
if len(endpoint.IP) == 0 {
return errors.New("endpoint must have an IP address")
}
if endpoint.Path != "" {
return fmt.Errorf("endpoint cannot have path (got %q)", endpoint.Path)
}
if endpoint.Port == 0 {
endpoint.Port = 26656 // default Tendermint P2P port
}
return nil
}
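So, for instance, an endpoint carrying only a peer ID and an IP comes back with
the protocol and port defaulted. A package-internal illustration, reusing the
assumed transport and nodeInfo from the examples above:

ep := Endpoint{PeerID: nodeInfo.ID(), IP: net.ParseIP("10.0.0.1")}
if err := transport.normalizeEndpoint(&ep); err != nil {
    return err
}
// ep.Protocol is now MConnProtocol and ep.Port is 26656.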
// mConnConnection implements Connection for MConnTransport. It takes a base TCP
// connection and upgrades it to MConnection over an encrypted SecretConnection.
type mConnConnection struct {
logger log.Logger
transport *MConnTransport
secretConn *conn.SecretConnection
mConn *conn.MConnection
peerInfo DefaultNodeInfo
closeOnce sync.Once
chReceive chan mConnMessage
chError chan error
chClose chan struct{}
}
// mConnMessage passes MConnection messages through internal channels.
type mConnMessage struct {
channelID byte
payload []byte
}
// newMConnConnection creates a new mConnConnection by handshaking
// with a peer.
func newMConnConnection(
transport *MConnTransport,
tcpConn net.Conn,
expectPeerID ID,
) (c *mConnConnection, err error) {
// FIXME: Since the MConnection code panics, we need to recover here
// and turn it into an error. Be careful not to shadow err, since we
// update the named return value from within this function. We should
// remove the panics instead.
defer func() {
if r := recover(); r != nil {
err = ErrRejected{
conn: tcpConn,
err: fmt.Errorf("recovered from panic: %v", r),
isAuthFailure: true,
}
}
}()
err = tcpConn.SetDeadline(time.Now().Add(transport.handshakeTimeout))
if err != nil {
err = ErrRejected{
conn: tcpConn,
err: fmt.Errorf("secret conn failed: %v", err),
isAuthFailure: true,
}
return
}
c = &mConnConnection{
transport: transport,
chReceive: make(chan mConnMessage),
chError: make(chan error),
chClose: make(chan struct{}),
}
c.secretConn, err = conn.MakeSecretConnection(tcpConn, transport.privKey)
if err != nil {
err = ErrRejected{
conn: tcpConn,
err: fmt.Errorf("secret conn failed: %v", err),
isAuthFailure: true,
}
return
}
c.peerInfo, err = c.handshake()
if err != nil {
err = ErrRejected{
conn: tcpConn,
err: fmt.Errorf("handshake failed: %v", err),
isAuthFailure: true,
}
return
}
// Validate node info.
// FIXME: All of the ID verification code below should be moved to the
// router once implemented.
err = c.peerInfo.Validate()
if err != nil {
err = ErrRejected{
conn: tcpConn,
err: err,
isNodeInfoInvalid: true,
}
return
}
// For outgoing conns, ensure connection key matches dialed key.
if expectPeerID != "" {
peerID := PubKeyToID(c.PubKey())
if expectPeerID != peerID {
err = ErrRejected{
conn: tcpConn,
id: peerID,
err: fmt.Errorf(
"conn.ID (%v) dialed ID (%v) mismatch",
peerID,
expectPeerID,
),
isAuthFailure: true,
}
return
}
}
// Reject self.
if transport.nodeInfo.ID() == c.peerInfo.ID() {
err = ErrRejected{
addr: *NewNetAddress(c.peerInfo.ID(), c.secretConn.RemoteAddr()),
conn: tcpConn,
id: c.peerInfo.ID(),
isSelf: true,
}
return
}
err = transport.nodeInfo.CompatibleWith(c.peerInfo)
if err != nil {
err = ErrRejected{
conn: tcpConn,
err: err,
id: c.peerInfo.ID(),
isIncompatible: true,
}
return
}
err = tcpConn.SetDeadline(time.Time{})
if err != nil {
err = ErrRejected{
conn: tcpConn,
err: fmt.Errorf("secret conn failed: %v", err),
isAuthFailure: true,
}
return
}
// Set up the MConnection wrapper
c.mConn = conn.NewMConnectionWithConfig(
c.secretConn,
transport.channelDescs,
c.onReceive,
c.onError,
transport.mConnConfig,
)
// FIXME: Log format is set up for compatibility with existing peer code.
c.logger = transport.logger.With("peer", c.RemoteEndpoint().NetAddress())
c.mConn.SetLogger(c.logger)
err = c.mConn.Start()
return c, err
}
// handshake performs an MConn handshake, returning the peer's node info. The
// node info messages are written and read concurrently: both peers send theirs
// immediately rather than taking turns, so neither side blocks on the other.
func (c *mConnConnection) handshake() (DefaultNodeInfo, error) {
var pbNodeInfo p2pproto.DefaultNodeInfo
chErr := make(chan error, 2)
go func() {
_, err := protoio.NewDelimitedWriter(c.secretConn).WriteMsg(c.transport.nodeInfo.ToProto())
chErr <- err
}()
go func() {
chErr <- protoio.NewDelimitedReader(c.secretConn, MaxNodeInfoSize()).ReadMsg(&pbNodeInfo)
}()
for i := 0; i < cap(chErr); i++ {
if err := <-chErr; err != nil {
return DefaultNodeInfo{}, err
}
}
return DefaultNodeInfoFromProto(&pbNodeInfo)
}
// onReceive is a callback for MConnection received messages.
func (c *mConnConnection) onReceive(channelID byte, payload []byte) {
select {
case c.chReceive <- mConnMessage{channelID: channelID, payload: payload}:
case <-c.chClose:
}
}
// onError is a callback for MConnection errors. The error is passed to
// chError, which is only consumed by ReceiveMessage() for parity with
// the old MConnection behavior.
func (c *mConnConnection) onError(e interface{}) {
err, ok := e.(error)
if !ok {
err = fmt.Errorf("%v", e)
}
select {
case c.chError <- err:
case <-c.chClose:
}
}
// String displays connection information.
// FIXME: This is here for backwards compatibility with existing code,
// it should probably just return RemoteEndpoint().String(), if anything.
func (c *mConnConnection) String() string {
endpoint := c.RemoteEndpoint()
return fmt.Sprintf("MConn{%v:%v}", endpoint.IP, endpoint.Port)
}
// SendMessage implements Connection.
func (c *mConnConnection) SendMessage(channelID byte, msg []byte) (bool, error) {
// We don't check chError here, to preserve old MConnection behavior.
select {
case <-c.chClose:
return false, io.EOF
default:
return c.mConn.Send(channelID, msg), nil
}
}
// TrySendMessage implements Connection.
func (c *mConnConnection) TrySendMessage(channelID byte, msg []byte) (bool, error) {
// We don't check chError here, to preserve old MConnection behavior.
select {
case <-c.chClose:
return false, io.EOF
default:
return c.mConn.TrySend(channelID, msg), nil
}
}
// ReceiveMessage implements Connection.
func (c *mConnConnection) ReceiveMessage() (byte, []byte, error) {
select {
case err := <-c.chError:
return 0, nil, err
case <-c.chClose:
return 0, nil, io.EOF
case msg := <-c.chReceive:
return msg.channelID, msg.payload, nil
}
}
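A Connection consumer then exchanges raw channel messages. A brief sketch, in
which channel ID 0x40 and the payloads are illustrative:

// Queue a message; ok is false if the send queue was full or the
// connection already closed.
ok, err := c.SendMessage(0x40, []byte("ping"))
if err != nil || !ok {
    return fmt.Errorf("send failed: closed or backpressured")
}

// Block until a message arrives, an MConnection error surfaces, or the
// connection closes (io.EOF).
chID, payload, err := c.ReceiveMessage()
if err != nil {
    return err
}
fmt.Printf("channel %x: %x\n", chID, payload)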
// NodeInfo implements Connection.
func (c *mConnConnection) NodeInfo() DefaultNodeInfo {
return c.peerInfo
}
// PubKey implements Connection.
func (c *mConnConnection) PubKey() crypto.PubKey {
return c.secretConn.RemotePubKey()
}
// LocalEndpoint implements Connection.
func (c *mConnConnection) LocalEndpoint() Endpoint {
// FIXME: For compatibility with existing P2P tests we need to
// handle non-TCP connections. This should be removed.
endpoint := Endpoint{
Protocol: MConnProtocol,
PeerID: c.transport.nodeInfo.ID(),
}
if addr, ok := c.secretConn.LocalAddr().(*net.TCPAddr); ok {
endpoint.IP = addr.IP
endpoint.Port = uint16(addr.Port)
}
return endpoint
}
// RemoteEndpoint implements Connection.
func (c *mConnConnection) RemoteEndpoint() Endpoint {
// FIXME: For compatibility with existing P2P tests we need to
// handle non-TCP connections. This should be removed.
endpoint := Endpoint{
Protocol: MConnProtocol,
PeerID: c.peerInfo.ID(),
}
if addr, ok := c.secretConn.RemoteAddr().(*net.TCPAddr); ok {
endpoint.IP = addr.IP
endpoint.Port = uint16(addr.Port)
}
return endpoint
}
// Status implements Connection.
func (c *mConnConnection) Status() conn.ConnectionStatus {
return c.mConn.Status()
}
// Close implements Connection.
func (c *mConnConnection) Close() error {
c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr())
var err error
c.closeOnce.Do(func() {
err = c.mConn.Stop()
close(c.chClose)
})
return err
}
// FlushClose implements Connection.
func (c *mConnConnection) FlushClose() error {
c.transport.conns.RemoveAddr(c.secretConn.RemoteAddr())
c.closeOnce.Do(func() {
c.mConn.FlushStop()
close(c.chClose)
})
return nil
}
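The difference between the two matters to callers that have just queued a final
message: FlushClose drains pending writes before stopping the MConnection,
while Close tears the connection down immediately. A hypothetical farewell:

_, _ = c.SendMessage(0x40, []byte("goodbye")) // channel ID illustrative
_ = c.FlushClose()                            // flush the send queue, then stop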

+0 -696 p2p/transport_test.go

@@ -1,696 +0,0 @@
package p2p
import (
"fmt"
"math/rand"
"net"
"reflect"
"runtime"
"strings"
"testing"
"time"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
var defaultNodeName = "host_peer"
func emptyNodeInfo() NodeInfo {
return DefaultNodeInfo{}
}
// newMultiplexTransport returns a tcp connected multiplexed peer
// using the default MConnConfig. It's a convenience function used
// for testing.
func newMultiplexTransport(
nodeInfo NodeInfo,
nodeKey NodeKey,
) *MultiplexTransport {
return NewMultiplexTransport(
nodeInfo, nodeKey, conn.DefaultMConnConfig(),
)
}
func TestTransportMultiplexConnFilter(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
GenNodeKey(),
)
id := mt.nodeKey.ID
MultiplexTransportConnFilters(
func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
func(_ ConnSet, _ net.Conn, _ []net.IP) error {
return fmt.Errorf("rejected")
},
)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
errc := make(chan error)
go func() {
addr := NewNetAddress(id, mt.listener.Addr())
_, err := addr.Dial()
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
_, err = mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsFiltered() {
t.Errorf("expected peer to be filtered, got %v", err)
}
} else {
t.Errorf("expected ErrRejected, got %v", err)
}
}
func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
GenNodeKey(),
)
id := mt.nodeKey.ID
MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt)
MultiplexTransportConnFilters(
func(_ ConnSet, _ net.Conn, _ []net.IP) error {
time.Sleep(1 * time.Second)
return nil
},
)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
errc := make(chan error)
go func() {
addr := NewNetAddress(id, mt.listener.Addr())
_, err := addr.Dial()
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
_, err = mt.Accept(peerConfig{})
if _, ok := err.(ErrFilterTimeout); !ok {
t.Errorf("expected ErrFilterTimeout, got %v", err)
}
}
func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
pv := ed25519.GenPrivKey()
id := PubKeyToID(pv.PubKey())
mt := newMultiplexTransport(
testNodeInfo(
id, "transport",
),
NodeKey{
ID: id,
PrivKey: pv,
},
)
MultiplexTransportMaxIncomingConnections(0)(mt)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
const maxIncomingConns = 2
MultiplexTransportMaxIncomingConnections(maxIncomingConns)(mt)
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
laddr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr())
// Connect more peers than max
for i := 0; i <= maxIncomingConns; i++ {
errc := make(chan error)
go testDialer(*laddr, errc)
err = <-errc
if i < maxIncomingConns {
if err != nil {
t.Errorf("dialer connection failed: %v", err)
}
_, err = mt.Accept(peerConfig{})
if err != nil {
t.Errorf("connection failed: %v", err)
}
} else if err == nil || !strings.Contains(err.Error(), "i/o timeout") {
// mt actually blocks forever on trying to accept a new peer into a full channel so
// expect the dialer to encounter a timeout error. Calling mt.Accept will block until
// mt is closed.
t.Errorf("expected i/o timeout error, got %v", err)
}
}
}
func TestTransportMultiplexAcceptMultiple(t *testing.T) {
mt := testSetupMultiplexTransport(t)
laddr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr())
var (
seed = rand.New(rand.NewSource(time.Now().UnixNano()))
nDialers = seed.Intn(64) + 64
errc = make(chan error, nDialers)
)
// Setup dialers.
for i := 0; i < nDialers; i++ {
go testDialer(*laddr, errc)
}
// Catch connection errors.
for i := 0; i < nDialers; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
}
ps := []Peer{}
// Accept all peers.
for i := 0; i < cap(errc); i++ {
p, err := mt.Accept(peerConfig{})
if err != nil {
t.Fatal(err)
}
if err := p.Start(); err != nil {
t.Fatal(err)
}
ps = append(ps, p)
}
if have, want := len(ps), cap(errc); have != want {
t.Errorf("have %v, want %v", have, want)
}
// Stop all peers.
for _, p := range ps {
if err := p.Stop(); err != nil {
t.Fatal(err)
}
}
if err := mt.Close(); err != nil {
t.Errorf("close errored: %v", err)
}
}
func testDialer(dialAddr NetAddress, errc chan error) {
var (
pv = ed25519.GenPrivKey()
id = PubKeyToID(pv.PubKey())
dialer = newMultiplexTransport(
testNodeInfo(id, defaultNodeName),
NodeKey{
ID: id,
PrivKey: pv,
},
)
)
_, err := dialer.Dial(dialAddr, peerConfig{})
if err != nil {
errc <- err
return
}
// Signal that the connection was established.
errc <- nil
}
func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
mt := testSetupMultiplexTransport(t)
var (
fastNodePV = ed25519.GenPrivKey()
fastNodeInfo = testNodeInfo(PubKeyToID(fastNodePV.PubKey()), "fastnode")
errc = make(chan error)
fastc = make(chan struct{})
slowc = make(chan struct{})
slowdonec = make(chan struct{})
)
// Simulate slow Peer.
go func() {
addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr())
c, err := addr.Dial()
if err != nil {
errc <- err
return
}
close(slowc)
defer func() {
close(slowdonec)
}()
// Make sure we switch to fast peer goroutine.
runtime.Gosched()
select {
case <-fastc:
// Fast peer connected.
case <-time.After(200 * time.Millisecond):
// We error if the fast peer didn't succeed.
errc <- fmt.Errorf("fast peer timed out")
}
sc, err := upgradeSecretConn(c, 200*time.Millisecond, ed25519.GenPrivKey())
if err != nil {
errc <- err
return
}
_, err = handshake(sc, 200*time.Millisecond,
testNodeInfo(
PubKeyToID(ed25519.GenPrivKey().PubKey()),
"slow_peer",
))
if err != nil {
errc <- err
}
}()
// Simulate fast Peer.
go func() {
<-slowc
var (
dialer = newMultiplexTransport(
fastNodeInfo,
NodeKey{
ID: PubKeyToID(fastNodePV.PubKey()),
PrivKey: fastNodePV,
},
)
)
addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(fastc)
<-slowdonec
close(errc)
}()
if err := <-errc; err != nil {
t.Logf("connection failed: %v", err)
}
p, err := mt.Accept(peerConfig{})
if err != nil {
t.Fatal(err)
}
if have, want := p.NodeInfo(), fastNodeInfo; !reflect.DeepEqual(have, want) {
t.Errorf("have %v, want %v", have, want)
}
}
func TestTransportMultiplexValidateNodeInfo(t *testing.T) {
mt := testSetupMultiplexTransport(t)
errc := make(chan error)
go func() {
var (
pv = ed25519.GenPrivKey()
id = PubKeyToID(pv.PubKey())
dialer = newMultiplexTransport(
testNodeInfo(id, ""), // Should not be empty
NodeKey{
ID: id,
PrivKey: pv,
},
)
)
addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
_, err := mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsNodeInfoInvalid() {
t.Errorf("expected NodeInfo to be invalid, got %v", err)
}
} else {
t.Errorf("expected ErrRejected, got %v", err)
}
}
func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
mt := testSetupMultiplexTransport(t)
errc := make(chan error)
go func() {
dialer := newMultiplexTransport(
testNodeInfo(
PubKeyToID(ed25519.GenPrivKey().PubKey()), "dialer",
),
GenNodeKey(),
)
addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}
_, err := mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsAuthFailure() {
t.Errorf("expected auth failure, got %v", err)
}
} else {
t.Errorf("expected ErrRejected, got %v", err)
}
}
func TestTransportMultiplexDialRejectWrongID(t *testing.T) {
mt := testSetupMultiplexTransport(t)
var (
pv = ed25519.GenPrivKey()
id = PubKeyToID(pv.PubKey())
dialer = newMultiplexTransport(
testNodeInfo(id, ""), // Should not be empty
NodeKey{
ID: id,
PrivKey: pv,
},
)
)
wrongID := PubKeyToID(ed25519.GenPrivKey().PubKey())
addr := NewNetAddress(wrongID, mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
t.Logf("connection failed: %v", err)
if err, ok := err.(ErrRejected); ok {
if !err.IsAuthFailure() {
t.Errorf("expected auth failure, got %v", err)
}
} else {
t.Errorf("expected ErrRejected, got %v", err)
}
}
}
func TestTransportMultiplexRejectIncompatible(t *testing.T) {
mt := testSetupMultiplexTransport(t)
errc := make(chan error)
go func() {
var (
pv = ed25519.GenPrivKey()
id = PubKeyToID(pv.PubKey())
dialer = newMultiplexTransport(
testNodeInfoWithNetwork(id, "dialer", "incompatible-network"),
NodeKey{
ID: id,
PrivKey: pv,
},
)
)
addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr())
_, err := dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
}()
_, err := mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsIncompatible() {
t.Errorf("expected to reject incompatible, got %v", err)
}
} else {
t.Errorf("expected ErrRejected, got %v", err)
}
}
func TestTransportMultiplexRejectSelf(t *testing.T) {
mt := testSetupMultiplexTransport(t)
errc := make(chan error)
go func() {
addr := NewNetAddress(mt.nodeKey.ID, mt.listener.Addr())
_, err := mt.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}
close(errc)
}()
if err := <-errc; err != nil {
if err, ok := err.(ErrRejected); ok {
if !err.IsSelf() {
t.Errorf("expected to reject self, got: %v", err)
}
} else {
t.Errorf("expected ErrRejected, got %v", err)
}
} else {
t.Errorf("expected connection failure")
}
_, err := mt.Accept(peerConfig{})
if err, ok := err.(ErrRejected); ok {
if !err.IsSelf() {
t.Errorf("expected to reject self, got: %v", err)
}
} else {
t.Errorf("expected ErrRejected, got %v", nil)
}
}
func TestTransportConnDuplicateIPFilter(t *testing.T) {
filter := ConnDuplicateIPFilter()
if err := filter(nil, &testTransportConn{}, nil); err != nil {
t.Fatal(err)
}
var (
c = &testTransportConn{}
cs = NewConnSet()
)
cs.Set(c, []net.IP{
{10, 0, 10, 1},
{10, 0, 10, 2},
{10, 0, 10, 3},
})
if err := filter(cs, c, []net.IP{
{10, 0, 10, 2},
}); err == nil {
t.Errorf("expected Peer to be rejected as duplicate")
}
}
func TestTransportHandshake(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
var (
peerPV = ed25519.GenPrivKey()
peerNodeInfo = testNodeInfo(PubKeyToID(peerPV.PubKey()), defaultNodeName)
)
go func() {
c, err := net.Dial(ln.Addr().Network(), ln.Addr().String())
if err != nil {
t.Error(err)
return
}
go func(c net.Conn) {
_, err := protoio.NewDelimitedWriter(c).WriteMsg(peerNodeInfo.(DefaultNodeInfo).ToProto())
if err != nil {
t.Error(err)
}
}(c)
go func(c net.Conn) {
var (
// ni DefaultNodeInfo
pbni tmp2p.DefaultNodeInfo
)
protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize())
err := protoReader.ReadMsg(&pbni)
if err != nil {
t.Error(err)
}
_, err = DefaultNodeInfoFromToProto(&pbni)
if err != nil {
t.Error(err)
}
}(c)
}()
c, err := ln.Accept()
if err != nil {
t.Fatal(err)
}
ni, err := handshake(c, 20*time.Millisecond, emptyNodeInfo())
if err != nil {
t.Fatal(err)
}
if have, want := ni, peerNodeInfo; !reflect.DeepEqual(have, want) {
t.Errorf("have %v, want %v", have, want)
}
}
// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
pv = ed25519.GenPrivKey()
id = PubKeyToID(pv.PubKey())
mt = newMultiplexTransport(
testNodeInfo(
id, "transport",
),
NodeKey{
ID: id,
PrivKey: pv,
},
)
)
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}
if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}
// give the listener some time to get ready
time.Sleep(20 * time.Millisecond)
return mt
}
type testTransportAddr struct{}
func (a *testTransportAddr) Network() string { return "tcp" }
func (a *testTransportAddr) String() string { return "test.local:1234" }
type testTransportConn struct{}
func (c *testTransportConn) Close() error {
return fmt.Errorf("close() not implemented")
}
func (c *testTransportConn) LocalAddr() net.Addr {
return &testTransportAddr{}
}
func (c *testTransportConn) RemoteAddr() net.Addr {
return &testTransportAddr{}
}
func (c *testTransportConn) Read(_ []byte) (int, error) {
return -1, fmt.Errorf("read() not implemented")
}
func (c *testTransportConn) SetDeadline(_ time.Time) error {
return fmt.Errorf("setDeadline() not implemented")
}
func (c *testTransportConn) SetReadDeadline(_ time.Time) error {
return fmt.Errorf("setReadDeadline() not implemented")
}
func (c *testTransportConn) SetWriteDeadline(_ time.Time) error {
return fmt.Errorf("setWriteDeadline() not implemented")
}
func (c *testTransportConn) Write(_ []byte) (int, error) {
return -1, fmt.Errorf("write() not implemented")
}

+26 -27 test/maverick/node/node.go

@@ -222,7 +222,7 @@ type Node struct {
privValidator types.PrivValidator // local node's validator key
// network
-transport *p2p.MultiplexTransport
+transport *p2p.MConnTransport
sw *p2p.Switch // p2p connections
addrBook pex.AddrBook // known peers
nodeInfo p2p.NodeInfo
@@ -453,23 +453,22 @@
}
func createTransport(
+logger log.Logger,
config *cfg.Config,
nodeInfo p2p.NodeInfo,
nodeKey p2p.NodeKey,
proxyApp proxy.AppConns,
) (
-*p2p.MultiplexTransport,
+*p2p.MConnTransport,
[]p2p.PeerFilterFunc,
) {
var (
-mConnConfig = p2p.MConnConfig(config.P2P)
-transport = p2p.NewMultiplexTransport(nodeInfo, nodeKey, mConnConfig)
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)
if !config.P2P.AllowDuplicateIP {
-connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
+connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
}
// Filter peers by addr or pubkey with an ABCI query.
@@ -512,11 +511,12 @@ func createTransport(
)
}
-p2p.MultiplexTransportConnFilters(connFilters...)(transport)
-// Limit the number of incoming connections.
-max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
-p2p.MultiplexTransportMaxIncomingConnections(max)(transport)
+transport := p2p.NewMConnTransport(
+logger, nodeInfo, nodeKey.PrivKey, p2p.MConnConfig(config.P2P),
+p2p.MConnTransportConnFilters(connFilters...),
+p2p.MConnTransportMaxIncomingConnections(config.P2P.MaxNumInboundPeers+
+len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))),
+)
return transport, peerFilters
}
@@ -808,11 +808,9 @@ func NewNode(config *cfg.Config,
return nil, err
}
-// Setup Transport.
-transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)
-// Setup Switch.
+// Setup Transport and Switch.
p2pLogger := logger.With("module", "p2p")
+transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactorShim, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
@@ -921,29 +919,30 @@ func (n *Node) OnStart() error {
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
}
-// Start the transport.
-addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress))
-if err != nil {
-return err
-}
-if err := n.transport.Listen(*addr); err != nil {
-return err
-}
-n.isListening = true
// Start the mempool.
if n.config.Mempool.WalEnabled() {
-err = n.mempool.InitWAL()
+err := n.mempool.InitWAL()
if err != nil {
return fmt.Errorf("init mempool WAL: %w", err)
}
}
// Start the switch (the P2P server).
-err = n.sw.Start()
+err := n.sw.Start()
if err != nil {
return err
}
+// Start the transport.
+addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID, n.config.P2P.ListenAddress))
+if err != nil {
+return err
+}
+if err := n.transport.Listen(addr.Endpoint()); err != nil {
+return err
+}
+n.isListening = true
// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(); err != nil {

