From d0b513c182ee777ffed3bef983e07981416f9a36 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 30 Mar 2021 18:00:43 -0400 Subject: [PATCH] p2p: filter peers by IP address and ID (#6300) --- node/node.go | 37 ++++++++- p2p/router.go | 163 +++++++++++++++++++++++++------------- p2p/router_filter_test.go | 34 ++++++++ 3 files changed, 174 insertions(+), 60 deletions(-) create mode 100644 p2p/router_filter_test.go diff --git a/node/node.go b/node/node.go index 9304bf41c..4b2b584ae 100644 --- a/node/node.go +++ b/node/node.go @@ -917,7 +917,7 @@ func NewSeedNode(config *cfg.Config, } router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, - peerManager, transport, getRouterConfig(config)) + peerManager, transport, getRouterConfig(config, nil)) if err != nil { return nil, fmt.Errorf("failed to create router: %w", err) } @@ -1080,7 +1080,7 @@ func NewNode(config *cfg.Config, csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, - peerManager, transport, getRouterConfig(config)) + peerManager, transport, getRouterConfig(config, proxyApp)) if err != nil { return nil, fmt.Errorf("failed to create router: %w", err) } @@ -1963,7 +1963,7 @@ func createAndStartPrivValidatorGRPCClient( return pvsc, nil } -func getRouterConfig(conf *cfg.Config) p2p.RouterOptions { +func getRouterConfig(conf *cfg.Config, proxyApp proxy.AppConns) p2p.RouterOptions { opts := p2p.RouterOptions{ QueueType: p2pRouterQueueType, } @@ -1972,6 +1972,37 @@ func getRouterConfig(conf *cfg.Config) p2p.RouterOptions { opts.MaxIncommingConnectionsPerIP = uint(conf.P2P.MaxNumInboundPeers) } + if conf.FilterPeers && proxyApp != nil { + opts.FilterPeerByID = func(ctx context.Context, id p2p.NodeID) error { + res, err := proxyApp.Query().QuerySync(context.Background(), abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/id/%s", id), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("error querying abci app: %v", res) + } + + return nil + } + + opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error { + res, err := proxyApp.Query().QuerySync(ctx, abci.RequestQuery{ + Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))), + }) + if err != nil { + return err + } + if res.IsErr() { + return fmt.Errorf("error querying abci app: %v", res) + } + + return nil + } + + } + return opts } diff --git a/p2p/router.go b/p2p/router.go index 8fbeccc8b..5c55164f3 100644 --- a/p2p/router.go +++ b/p2p/router.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "net" "sync" "time" @@ -139,6 +140,21 @@ type RouterOptions struct { // can attempt to create a new connection. Defaults to 10 // milliseconds, and cannot be less than 1 millisecond. IncomingConnectionWindow time.Duration + + // FilterPeerByIP is used by the router to inject filtering + // behavior for new incoming connections. The router passes + // the remote IP of the incoming connection the port number as + // arguments. Functions should return an error to reject the + // peer. + FilterPeerByIP func(context.Context, net.IP, uint16) error + + // FilterPeerByID is used by the router to inject filtering + // behavior for new incoming connections. The router passes + // the NodeID of the node before completing the connection, + // but this occurs after the handshake is complete. Filter by + // IP address to filter before the handshake. Functions should + // return an error to reject the peer. + FilterPeerByID func(context.Context, NodeID) error } const ( @@ -471,15 +487,28 @@ func (r *Router) routeChannel( } } +func (r *Router) filterPeersIP(ctx context.Context, ip net.IP, port uint16) error { + if r.options.FilterPeerByIP == nil { + return nil + } + + return r.options.FilterPeerByIP(ctx, ip, port) +} + +func (r *Router) filterPeersID(ctx context.Context, id NodeID) error { + if r.options.FilterPeerByID == nil { + return nil + } + + return r.options.FilterPeerByID(ctx, id) +} + // acceptPeers accepts inbound connections from peers on the given transport, // and spawns goroutines that route messages to/from them. func (r *Router) acceptPeers(transport Transport) { r.logger.Debug("starting accept routine", "transport", transport) ctx := r.stopCtx() for { - // FIXME: The old P2P stack supported ABCI-based IP address filtering via - // /p2p/filter/addr/ queries, do we want to implement this here as well? - // Filtering by node ID is probably better. conn, err := transport.Accept() switch err { case nil: @@ -499,72 +528,92 @@ func (r *Router) acceptPeers(transport Transport) { "ip", incomingIP.String(), "closeErr", closeErr) - continue + return } // Spawn a goroutine for the handshake, to avoid head-of-line blocking. - go func() { - defer conn.Close() - defer r.connTracker.RemoveConn(incomingIP) - - // FIXME: The peer manager may reject the peer during Accepted() - // after we've handshaked with the peer (to find out which peer it - // is). However, because the handshake has no ack, the remote peer - // will think the handshake was successful and start sending us - // messages. - // - // This can cause problems in tests, where a disconnection can cause - // the local node to immediately redial, while the remote node may - // not have completed the disconnection yet and therefore reject the - // reconnection attempt (since it thinks we're still connected from - // before). - // - // The Router should do the handshake and have a final ack/fail - // message to make sure both ends have accepted the connection, such - // that it can be coordinated with the peer manager. - peerInfo, _, err := r.handshakePeer(ctx, conn, "") - switch { - case errors.Is(err, context.Canceled): - return - case err != nil: - r.logger.Error("peer handshake failed", "endpoint", conn, "err", err) - return - } + go r.openConnection(ctx, conn) - if err := r.peerManager.Accepted(peerInfo.NodeID); err != nil { - r.logger.Error("failed to accept connection", "peer", peerInfo.NodeID, "err", err) - return - } + } +} - r.metrics.Peers.Add(1) - queue := r.queueFactory(queueBufferDefault) +func (r *Router) openConnection(ctx context.Context, conn Connection) { + defer conn.Close() + defer r.connTracker.RemoveConn(conn.RemoteEndpoint().IP) - r.peerMtx.Lock() - r.peerQueues[peerInfo.NodeID] = queue - r.peerMtx.Unlock() + re := conn.RemoteEndpoint() + incomingIP := re.IP - defer func() { - r.peerMtx.Lock() - delete(r.peerQueues, peerInfo.NodeID) - r.peerMtx.Unlock() + if err := r.filterPeersIP(ctx, incomingIP, re.Port); err != nil { + r.logger.Debug("peer filtered by IP", + "ip", incomingIP.String(), + "err", err) + return + } - queue.close() + // FIXME: The peer manager may reject the peer during Accepted() + // after we've handshaked with the peer (to find out which peer it + // is). However, because the handshake has no ack, the remote peer + // will think the handshake was successful and start sending us + // messages. + // + // This can cause problems in tests, where a disconnection can cause + // the local node to immediately redial, while the remote node may + // not have completed the disconnection yet and therefore reject the + // reconnection attempt (since it thinks we're still connected from + // before). + // + // The Router should do the handshake and have a final ack/fail + // message to make sure both ends have accepted the connection, such + // that it can be coordinated with the peer manager. + peerInfo, _, err := r.handshakePeer(ctx, conn, "") + switch { + case errors.Is(err, context.Canceled): + return + case err != nil: + r.logger.Error("peer handshake failed", "endpoint", conn, "err", err) + return + } - if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil { - r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err) - } else { - r.metrics.Peers.Add(-1) - } - }() + if err := r.filterPeersID(ctx, peerInfo.NodeID); err != nil { + r.logger.Debug("peer filtered by node ID", + "node", peerInfo.NodeID, + "err", err) + return + } - if err := r.peerManager.Ready(peerInfo.NodeID); err != nil { - r.logger.Error("failed to mark peer as ready", "peer", peerInfo.NodeID, "err", err) - return - } + if err := r.peerManager.Accepted(peerInfo.NodeID); err != nil { + r.logger.Error("failed to accept connection", "peer", peerInfo.NodeID, "err", err) + return + } - r.routePeer(peerInfo.NodeID, conn, queue) - }() + r.metrics.Peers.Add(1) + queue := r.queueFactory(queueBufferDefault) + + r.peerMtx.Lock() + r.peerQueues[peerInfo.NodeID] = queue + r.peerMtx.Unlock() + + defer func() { + r.peerMtx.Lock() + delete(r.peerQueues, peerInfo.NodeID) + r.peerMtx.Unlock() + + queue.close() + + if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil { + r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err) + } else { + r.metrics.Peers.Add(-1) + } + }() + + if err := r.peerManager.Ready(peerInfo.NodeID); err != nil { + r.logger.Error("failed to mark peer as ready", "peer", peerInfo.NodeID, "err", err) + return } + + r.routePeer(peerInfo.NodeID, conn, queue) } // dialPeers maintains outbound connections to peers by dialing them. diff --git a/p2p/router_filter_test.go b/p2p/router_filter_test.go new file mode 100644 index 000000000..51c4947b4 --- /dev/null +++ b/p2p/router_filter_test.go @@ -0,0 +1,34 @@ +package p2p + +import ( + "context" + "errors" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/sync" +) + +func TestConnectionFiltering(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := log.TestingLogger() + + filterByIPCount := 0 + router := &Router{ + logger: logger, + connTracker: newConnTracker(1, time.Second), + options: RouterOptions{ + FilterPeerByIP: func(ctx context.Context, ip net.IP, port uint16) error { + filterByIPCount++ + return errors.New("mock") + }, + }, + } + require.Equal(t, 0, filterByIPCount) + router.openConnection(ctx, &MemoryConnection{logger: logger, closer: sync.NewCloser()}) + require.Equal(t, 1, filterByIPCount) +}