Browse Source

p2p: filter peers by IP address and ID (#6300)

pull/6302/head
Sam Kleinman 3 years ago
committed by GitHub
parent
commit
d0b513c182
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 174 additions and 60 deletions
  1. +34
    -3
      node/node.go
  2. +106
    -57
      p2p/router.go
  3. +34
    -0
      p2p/router_filter_test.go

+ 34
- 3
node/node.go View File

@ -917,7 +917,7 @@ func NewSeedNode(config *cfg.Config,
}
router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
peerManager, transport, getRouterConfig(config))
peerManager, transport, getRouterConfig(config, nil))
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
}
@ -1080,7 +1080,7 @@ func NewNode(config *cfg.Config,
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
peerManager, transport, getRouterConfig(config))
peerManager, transport, getRouterConfig(config, proxyApp))
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
}
@ -1963,7 +1963,7 @@ func createAndStartPrivValidatorGRPCClient(
return pvsc, nil
}
func getRouterConfig(conf *cfg.Config) p2p.RouterOptions {
func getRouterConfig(conf *cfg.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
opts := p2p.RouterOptions{
QueueType: p2pRouterQueueType,
}
@ -1972,6 +1972,37 @@ func getRouterConfig(conf *cfg.Config) p2p.RouterOptions {
opts.MaxIncommingConnectionsPerIP = uint(conf.P2P.MaxNumInboundPeers)
}
if conf.FilterPeers && proxyApp != nil {
opts.FilterPeerByID = func(ctx context.Context, id p2p.NodeID) error {
res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", id),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}
return nil
}
opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error {
res, err := proxyApp.Query().QuerySync(ctx, abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}
return nil
}
}
return opts
}


+ 106
- 57
p2p/router.go View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"net"
"sync"
"time"
@ -139,6 +140,21 @@ type RouterOptions struct {
// can attempt to create a new connection. Defaults to 10
// milliseconds, and cannot be less than 1 millisecond.
IncomingConnectionWindow time.Duration
// FilterPeerByIP is used by the router to inject filtering
// behavior for new incoming connections. The router passes
// the remote IP of the incoming connection the port number as
// arguments. Functions should return an error to reject the
// peer.
FilterPeerByIP func(context.Context, net.IP, uint16) error
// FilterPeerByID is used by the router to inject filtering
// behavior for new incoming connections. The router passes
// the NodeID of the node before completing the connection,
// but this occurs after the handshake is complete. Filter by
// IP address to filter before the handshake. Functions should
// return an error to reject the peer.
FilterPeerByID func(context.Context, NodeID) error
}
const (
@ -471,15 +487,28 @@ func (r *Router) routeChannel(
}
}
func (r *Router) filterPeersIP(ctx context.Context, ip net.IP, port uint16) error {
if r.options.FilterPeerByIP == nil {
return nil
}
return r.options.FilterPeerByIP(ctx, ip, port)
}
func (r *Router) filterPeersID(ctx context.Context, id NodeID) error {
if r.options.FilterPeerByID == nil {
return nil
}
return r.options.FilterPeerByID(ctx, id)
}
// acceptPeers accepts inbound connections from peers on the given transport,
// and spawns goroutines that route messages to/from them.
func (r *Router) acceptPeers(transport Transport) {
r.logger.Debug("starting accept routine", "transport", transport)
ctx := r.stopCtx()
for {
// FIXME: The old P2P stack supported ABCI-based IP address filtering via
// /p2p/filter/addr/<ip> queries, do we want to implement this here as well?
// Filtering by node ID is probably better.
conn, err := transport.Accept()
switch err {
case nil:
@ -499,72 +528,92 @@ func (r *Router) acceptPeers(transport Transport) {
"ip", incomingIP.String(),
"closeErr", closeErr)
continue
return
}
// Spawn a goroutine for the handshake, to avoid head-of-line blocking.
go func() {
defer conn.Close()
defer r.connTracker.RemoveConn(incomingIP)
// FIXME: The peer manager may reject the peer during Accepted()
// after we've handshaked with the peer (to find out which peer it
// is). However, because the handshake has no ack, the remote peer
// will think the handshake was successful and start sending us
// messages.
//
// This can cause problems in tests, where a disconnection can cause
// the local node to immediately redial, while the remote node may
// not have completed the disconnection yet and therefore reject the
// reconnection attempt (since it thinks we're still connected from
// before).
//
// The Router should do the handshake and have a final ack/fail
// message to make sure both ends have accepted the connection, such
// that it can be coordinated with the peer manager.
peerInfo, _, err := r.handshakePeer(ctx, conn, "")
switch {
case errors.Is(err, context.Canceled):
return
case err != nil:
r.logger.Error("peer handshake failed", "endpoint", conn, "err", err)
return
}
go r.openConnection(ctx, conn)
if err := r.peerManager.Accepted(peerInfo.NodeID); err != nil {
r.logger.Error("failed to accept connection", "peer", peerInfo.NodeID, "err", err)
return
}
}
}
r.metrics.Peers.Add(1)
queue := r.queueFactory(queueBufferDefault)
func (r *Router) openConnection(ctx context.Context, conn Connection) {
defer conn.Close()
defer r.connTracker.RemoveConn(conn.RemoteEndpoint().IP)
r.peerMtx.Lock()
r.peerQueues[peerInfo.NodeID] = queue
r.peerMtx.Unlock()
re := conn.RemoteEndpoint()
incomingIP := re.IP
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerInfo.NodeID)
r.peerMtx.Unlock()
if err := r.filterPeersIP(ctx, incomingIP, re.Port); err != nil {
r.logger.Debug("peer filtered by IP",
"ip", incomingIP.String(),
"err", err)
return
}
queue.close()
// FIXME: The peer manager may reject the peer during Accepted()
// after we've handshaked with the peer (to find out which peer it
// is). However, because the handshake has no ack, the remote peer
// will think the handshake was successful and start sending us
// messages.
//
// This can cause problems in tests, where a disconnection can cause
// the local node to immediately redial, while the remote node may
// not have completed the disconnection yet and therefore reject the
// reconnection attempt (since it thinks we're still connected from
// before).
//
// The Router should do the handshake and have a final ack/fail
// message to make sure both ends have accepted the connection, such
// that it can be coordinated with the peer manager.
peerInfo, _, err := r.handshakePeer(ctx, conn, "")
switch {
case errors.Is(err, context.Canceled):
return
case err != nil:
r.logger.Error("peer handshake failed", "endpoint", conn, "err", err)
return
}
if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil {
r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err)
} else {
r.metrics.Peers.Add(-1)
}
}()
if err := r.filterPeersID(ctx, peerInfo.NodeID); err != nil {
r.logger.Debug("peer filtered by node ID",
"node", peerInfo.NodeID,
"err", err)
return
}
if err := r.peerManager.Ready(peerInfo.NodeID); err != nil {
r.logger.Error("failed to mark peer as ready", "peer", peerInfo.NodeID, "err", err)
return
}
if err := r.peerManager.Accepted(peerInfo.NodeID); err != nil {
r.logger.Error("failed to accept connection", "peer", peerInfo.NodeID, "err", err)
return
}
r.routePeer(peerInfo.NodeID, conn, queue)
}()
r.metrics.Peers.Add(1)
queue := r.queueFactory(queueBufferDefault)
r.peerMtx.Lock()
r.peerQueues[peerInfo.NodeID] = queue
r.peerMtx.Unlock()
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerInfo.NodeID)
r.peerMtx.Unlock()
queue.close()
if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil {
r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err)
} else {
r.metrics.Peers.Add(-1)
}
}()
if err := r.peerManager.Ready(peerInfo.NodeID); err != nil {
r.logger.Error("failed to mark peer as ready", "peer", peerInfo.NodeID, "err", err)
return
}
r.routePeer(peerInfo.NodeID, conn, queue)
}
// dialPeers maintains outbound connections to peers by dialing them.


+ 34
- 0
p2p/router_filter_test.go View File

@ -0,0 +1,34 @@
package p2p
import (
"context"
"errors"
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/sync"
)
func TestConnectionFiltering(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
filterByIPCount := 0
router := &Router{
logger: logger,
connTracker: newConnTracker(1, time.Second),
options: RouterOptions{
FilterPeerByIP: func(ctx context.Context, ip net.IP, port uint16) error {
filterByIPCount++
return errors.New("mock")
},
},
}
require.Equal(t, 0, filterByIPCount)
router.openConnection(ctx, &MemoryConnection{logger: logger, closer: sync.NewCloser()})
require.Equal(t, 1, filterByIPCount)
}

Loading…
Cancel
Save