- package p2p
-
- import (
- "context"
- "fmt"
- "net"
- "time"
-
- "golang.org/x/net/netutil"
-
- "github.com/tendermint/tendermint/crypto"
- "github.com/tendermint/tendermint/libs/protoio"
- "github.com/tendermint/tendermint/p2p/conn"
- tmp2p "github.com/tendermint/tendermint/proto/p2p"
- )
-
- const (
- defaultDialTimeout = time.Second
- defaultFilterTimeout = 5 * time.Second
- defaultHandshakeTimeout = 3 * time.Second
- )
-
- // IPResolver is a behaviour subset of net.Resolver.
- type IPResolver interface {
- LookupIPAddr(context.Context, string) ([]net.IPAddr, error)
- }
-
- // accept is the container to carry the upgraded connection and NodeInfo from an
- // asynchronously running routine to the Accept method.
- type accept struct {
- netAddr *NetAddress
- conn net.Conn
- nodeInfo NodeInfo
- err error
- }
-
- // peerConfig is used to bundle data we need to fully setup a Peer with an
- // MConn, provided by the caller of Accept and Dial (currently the Switch). This
- // a temporary measure until reactor setup is less dynamic and we introduce the
- // concept of PeerBehaviour to communicate about significant Peer lifecycle
- // events.
- // TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour.
- type peerConfig struct {
- chDescs []*conn.ChannelDescriptor
- onPeerError func(Peer, interface{})
- outbound bool
- // isPersistent allows you to set a function, which, given socket address
- // (for outbound peers) OR self-reported address (for inbound peers), tells
- // if the peer is persistent or not.
- isPersistent func(*NetAddress) bool
- reactorsByCh map[byte]Reactor
- metrics *Metrics
- }
-
- // Transport emits and connects to Peers. The implementation of Peer is left to
- // the transport. Each transport is also responsible to filter establishing
- // peers specific to its domain.
- type Transport interface {
- // Listening address.
- NetAddress() NetAddress
-
- // Accept returns a newly connected Peer.
- Accept(peerConfig) (Peer, error)
-
- // Dial connects to the Peer for the address.
- Dial(NetAddress, peerConfig) (Peer, error)
-
- // Cleanup any resources associated with Peer.
- Cleanup(Peer)
- }
-
- // transportLifecycle bundles the methods for callers to control start and stop
- // behaviour.
- type transportLifecycle interface {
- Close() error
- Listen(NetAddress) error
- }
-
- // ConnFilterFunc to be implemented by filter hooks after a new connection has
- // been established. The set of exisiting connections is passed along together
- // with all resolved IPs for the new connection.
- type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error
-
- // ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection
- // and refuses new ones if they come from a known ip.
- func ConnDuplicateIPFilter() ConnFilterFunc {
- return func(cs ConnSet, c net.Conn, ips []net.IP) error {
- for _, ip := range ips {
- if cs.HasIP(ip) {
- return ErrRejected{
- conn: c,
- err: fmt.Errorf("ip<%v> already connected", ip),
- isDuplicate: true,
- }
- }
- }
-
- return nil
- }
- }
-
- // MultiplexTransportOption sets an optional parameter on the
- // MultiplexTransport.
- type MultiplexTransportOption func(*MultiplexTransport)
-
- // MultiplexTransportConnFilters sets the filters for rejection new connections.
- func MultiplexTransportConnFilters(
- filters ...ConnFilterFunc,
- ) MultiplexTransportOption {
- return func(mt *MultiplexTransport) { mt.connFilters = filters }
- }
-
- // MultiplexTransportFilterTimeout sets the timeout waited for filter calls to
- // return.
- func MultiplexTransportFilterTimeout(
- timeout time.Duration,
- ) MultiplexTransportOption {
- return func(mt *MultiplexTransport) { mt.filterTimeout = timeout }
- }
-
- // MultiplexTransportResolver sets the Resolver used for ip lokkups, defaults to
- // net.DefaultResolver.
- func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
- return func(mt *MultiplexTransport) { mt.resolver = resolver }
- }
-
- // MultiplexTransportMaxIncomingConnections sets the maximum number of
- // simultaneous connections (incoming). Default: 0 (unlimited)
- func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
- return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n }
- }
-
- // MultiplexTransport accepts and dials tcp connections and upgrades them to
- // multiplexed peers.
- type MultiplexTransport struct {
- netAddr NetAddress
- listener net.Listener
- maxIncomingConnections int // see MaxIncomingConnections
-
- acceptc chan accept
- closec chan struct{}
-
- // Lookup table for duplicate ip and id checks.
- conns ConnSet
- connFilters []ConnFilterFunc
-
- dialTimeout time.Duration
- filterTimeout time.Duration
- handshakeTimeout time.Duration
- nodeInfo NodeInfo
- nodeKey NodeKey
- resolver IPResolver
-
- // TODO(xla): This config is still needed as we parameterise peerConn and
- // peer currently. All relevant configuration should be refactored into options
- // with sane defaults.
- mConfig conn.MConnConfig
- }
-
- // Test multiplexTransport for interface completeness.
- var _ Transport = (*MultiplexTransport)(nil)
- var _ transportLifecycle = (*MultiplexTransport)(nil)
-
- // NewMultiplexTransport returns a tcp connected multiplexed peer.
- func NewMultiplexTransport(
- nodeInfo NodeInfo,
- nodeKey NodeKey,
- mConfig conn.MConnConfig,
- ) *MultiplexTransport {
- return &MultiplexTransport{
- acceptc: make(chan accept),
- closec: make(chan struct{}),
- dialTimeout: defaultDialTimeout,
- filterTimeout: defaultFilterTimeout,
- handshakeTimeout: defaultHandshakeTimeout,
- mConfig: mConfig,
- nodeInfo: nodeInfo,
- nodeKey: nodeKey,
- conns: NewConnSet(),
- resolver: net.DefaultResolver,
- }
- }
-
- // NetAddress implements Transport.
- func (mt *MultiplexTransport) NetAddress() NetAddress {
- return mt.netAddr
- }
-
- // Accept implements Transport.
- func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
- select {
- // This case should never have any side-effectful/blocking operations to
- // ensure that quality peers are ready to be used.
- case a := <-mt.acceptc:
- if a.err != nil {
- return nil, a.err
- }
-
- cfg.outbound = false
-
- return mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr), nil
- case <-mt.closec:
- return nil, ErrTransportClosed{}
- }
- }
-
- // Dial implements Transport.
- func (mt *MultiplexTransport) Dial(
- addr NetAddress,
- cfg peerConfig,
- ) (Peer, error) {
- c, err := addr.DialTimeout(mt.dialTimeout)
- if err != nil {
- return nil, err
- }
-
- // TODO(xla): Evaluate if we should apply filters if we explicitly dial.
- if err := mt.filterConn(c); err != nil {
- return nil, err
- }
-
- secretConn, nodeInfo, err := mt.upgrade(c, &addr)
- if err != nil {
- return nil, err
- }
-
- cfg.outbound = true
-
- p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr)
-
- return p, nil
- }
-
- // Close implements transportLifecycle.
- func (mt *MultiplexTransport) Close() error {
- close(mt.closec)
-
- if mt.listener != nil {
- return mt.listener.Close()
- }
-
- return nil
- }
-
- // Listen implements transportLifecycle.
- func (mt *MultiplexTransport) Listen(addr NetAddress) error {
- ln, err := net.Listen("tcp", addr.DialString())
- if err != nil {
- return err
- }
-
- if mt.maxIncomingConnections > 0 {
- ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
- }
-
- mt.netAddr = addr
- mt.listener = ln
-
- go mt.acceptPeers()
-
- return nil
- }
-
- func (mt *MultiplexTransport) acceptPeers() {
- for {
- c, err := mt.listener.Accept()
- if err != nil {
- // If Close() has been called, silently exit.
- select {
- case _, ok := <-mt.closec:
- if !ok {
- return
- }
- default:
- // Transport is not closed
- }
-
- mt.acceptc <- accept{err: err}
- return
- }
-
- // Connection upgrade and filtering should be asynchronous to avoid
- // Head-of-line blocking[0].
- // Reference: https://github.com/tendermint/tendermint/issues/2047
- //
- // [0] https://en.wikipedia.org/wiki/Head-of-line_blocking
- go func(c net.Conn) {
- defer func() {
- if r := recover(); r != nil {
- err := ErrRejected{
- conn: c,
- err: fmt.Errorf("recovered from panic: %v", r),
- isAuthFailure: true,
- }
- select {
- case mt.acceptc <- accept{err: err}:
- case <-mt.closec:
- // Give up if the transport was closed.
- _ = c.Close()
- return
- }
- }
- }()
-
- var (
- nodeInfo NodeInfo
- secretConn *conn.SecretConnection
- netAddr *NetAddress
- )
-
- err := mt.filterConn(c)
- if err == nil {
- secretConn, nodeInfo, err = mt.upgrade(c, nil)
- if err == nil {
- addr := c.RemoteAddr()
- id := PubKeyToID(secretConn.RemotePubKey())
- netAddr = NewNetAddress(id, addr)
- }
- }
-
- select {
- case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}:
- // Make the upgraded peer available.
- case <-mt.closec:
- // Give up if the transport was closed.
- _ = c.Close()
- return
- }
- }(c)
- }
- }
-
- // Cleanup removes the given address from the connections set and
- // closes the connection.
- func (mt *MultiplexTransport) Cleanup(p Peer) {
- mt.conns.RemoveAddr(p.RemoteAddr())
- _ = p.CloseConn()
- }
-
- func (mt *MultiplexTransport) cleanup(c net.Conn) error {
- mt.conns.Remove(c)
-
- return c.Close()
- }
-
- func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) {
- defer func() {
- if err != nil {
- _ = c.Close()
- }
- }()
-
- // Reject if connection is already present.
- if mt.conns.Has(c) {
- return ErrRejected{conn: c, isDuplicate: true}
- }
-
- // Resolve ips for incoming conn.
- ips, err := resolveIPs(mt.resolver, c)
- if err != nil {
- return err
- }
-
- errc := make(chan error, len(mt.connFilters))
-
- for _, f := range mt.connFilters {
- go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) {
- errc <- f(mt.conns, c, ips)
- }(f, c, ips, errc)
- }
-
- for i := 0; i < cap(errc); i++ {
- select {
- case err := <-errc:
- if err != nil {
- return ErrRejected{conn: c, err: err, isFiltered: true}
- }
- case <-time.After(mt.filterTimeout):
- return ErrFilterTimeout{}
- }
-
- }
-
- mt.conns.Set(c, ips)
-
- return nil
- }
-
- func (mt *MultiplexTransport) upgrade(
- c net.Conn,
- dialedAddr *NetAddress,
- ) (secretConn *conn.SecretConnection, nodeInfo NodeInfo, err error) {
- defer func() {
- if err != nil {
- _ = mt.cleanup(c)
- }
- }()
-
- secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey)
- if err != nil {
- return nil, nil, ErrRejected{
- conn: c,
- err: fmt.Errorf("secret conn failed: %v", err),
- isAuthFailure: true,
- }
- }
-
- // For outgoing conns, ensure connection key matches dialed key.
- connID := PubKeyToID(secretConn.RemotePubKey())
- if dialedAddr != nil {
- if dialedID := dialedAddr.ID; connID != dialedID {
- return nil, nil, ErrRejected{
- conn: c,
- id: connID,
- err: fmt.Errorf(
- "conn.ID (%v) dialed ID (%v) mismatch",
- connID,
- dialedID,
- ),
- isAuthFailure: true,
- }
- }
- }
-
- nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo)
- if err != nil {
- return nil, nil, ErrRejected{
- conn: c,
- err: fmt.Errorf("handshake failed: %v", err),
- isAuthFailure: true,
- }
- }
-
- if err := nodeInfo.Validate(); err != nil {
- return nil, nil, ErrRejected{
- conn: c,
- err: err,
- isNodeInfoInvalid: true,
- }
- }
-
- // Ensure connection key matches self reported key.
- if connID != nodeInfo.ID() {
- return nil, nil, ErrRejected{
- conn: c,
- id: connID,
- err: fmt.Errorf(
- "conn.ID (%v) NodeInfo.ID (%v) mismatch",
- connID,
- nodeInfo.ID(),
- ),
- isAuthFailure: true,
- }
- }
-
- // Reject self.
- if mt.nodeInfo.ID() == nodeInfo.ID() {
- return nil, nil, ErrRejected{
- addr: *NewNetAddress(nodeInfo.ID(), c.RemoteAddr()),
- conn: c,
- id: nodeInfo.ID(),
- isSelf: true,
- }
- }
-
- if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil {
- return nil, nil, ErrRejected{
- conn: c,
- err: err,
- id: nodeInfo.ID(),
- isIncompatible: true,
- }
- }
-
- return secretConn, nodeInfo, nil
- }
-
- func (mt *MultiplexTransport) wrapPeer(
- c net.Conn,
- ni NodeInfo,
- cfg peerConfig,
- socketAddr *NetAddress,
- ) Peer {
-
- persistent := false
- if cfg.isPersistent != nil {
- if cfg.outbound {
- persistent = cfg.isPersistent(socketAddr)
- } else {
- selfReportedAddr, err := ni.NetAddress()
- if err == nil {
- persistent = cfg.isPersistent(selfReportedAddr)
- }
- }
- }
-
- peerConn := newPeerConn(
- cfg.outbound,
- persistent,
- c,
- socketAddr,
- )
-
- p := newPeer(
- peerConn,
- mt.mConfig,
- ni,
- cfg.reactorsByCh,
- cfg.chDescs,
- cfg.onPeerError,
- PeerMetrics(cfg.metrics),
- )
-
- return p
- }
-
- func handshake(
- c net.Conn,
- timeout time.Duration,
- nodeInfo NodeInfo,
- ) (NodeInfo, error) {
- if err := c.SetDeadline(time.Now().Add(timeout)); err != nil {
- return nil, err
- }
-
- var (
- errc = make(chan error, 2)
-
- pbpeerNodeInfo tmp2p.DefaultNodeInfo
- peerNodeInfo DefaultNodeInfo
- ourNodeInfo = nodeInfo.(DefaultNodeInfo)
- )
-
- go func(errc chan<- error, c net.Conn) {
- _, err := protoio.NewDelimitedWriter(c).WriteMsg(ourNodeInfo.ToProto())
- errc <- err
- }(errc, c)
- go func(errc chan<- error, c net.Conn) {
- protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize())
- err := protoReader.ReadMsg(&pbpeerNodeInfo)
- errc <- err
- }(errc, c)
-
- for i := 0; i < cap(errc); i++ {
- err := <-errc
- if err != nil {
- return nil, err
- }
- }
-
- peerNodeInfo, err := DefaultNodeInfoFromToProto(&pbpeerNodeInfo)
- if err != nil {
- return nil, err
- }
-
- return peerNodeInfo, c.SetDeadline(time.Time{})
- }
-
- func upgradeSecretConn(
- c net.Conn,
- timeout time.Duration,
- privKey crypto.PrivKey,
- ) (*conn.SecretConnection, error) {
- if err := c.SetDeadline(time.Now().Add(timeout)); err != nil {
- return nil, err
- }
-
- sc, err := conn.MakeSecretConnection(c, privKey)
- if err != nil {
- return nil, err
- }
-
- return sc, sc.SetDeadline(time.Time{})
- }
-
- func resolveIPs(resolver IPResolver, c net.Conn) ([]net.IP, error) {
- host, _, err := net.SplitHostPort(c.RemoteAddr().String())
- if err != nil {
- return nil, err
- }
-
- addrs, err := resolver.LookupIPAddr(context.Background(), host)
- if err != nil {
- return nil, err
- }
-
- ips := []net.IP{}
-
- for _, addr := range addrs {
- ips = append(ips, addr.IP)
- }
-
- return ips, nil
- }
|