diff --git a/p2p/listener.go b/p2p/listener.go index 06ff02863..790caa41c 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -98,7 +98,9 @@ SKIP_UPNP: return dl } -// TODO: prevent abuse, esp a bunch of connections coming from the same IP range. +// Accept connections and pass on the channel +// Reading from the channel blocks on the peerHandshake for each connection +// Connection is ignored if we have too many connections to that ip range func (l *DefaultListener) listenRoutine() { for { conn, err := l.listener.Accept() diff --git a/p2p/peer_set.go b/p2p/peer_set.go index 4b43defec..a0e434751 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -1,6 +1,8 @@ package p2p import ( + "net" + "strings" "sync" ) @@ -14,12 +16,18 @@ type IPeerSet interface { //----------------------------------------------------------------------------- +var ( + maxPeersPerIPRange = [4]int{11, 7, 5, 3} // ... +) + // PeerSet is a special structure for keeping a table of peers. // Iteration over the peers is super fast and thread-safe. +// We also track how many peers per ip range and avoid too many type PeerSet struct { - mtx sync.Mutex - lookup map[string]*peerSetItem - list []*Peer + mtx sync.Mutex + lookup map[string]*peerSetItem + list []*Peer + connectedIPs *nestedCounter } type peerSetItem struct { @@ -29,24 +37,33 @@ type peerSetItem struct { func NewPeerSet() *PeerSet { return &PeerSet{ - lookup: make(map[string]*peerSetItem), - list: make([]*Peer, 0, 256), + lookup: make(map[string]*peerSetItem), + list: make([]*Peer, 0, 256), + connectedIPs: NewNestedCounter(), } } -// Returns false if peer with key (uuid) is already in set. -func (ps *PeerSet) Add(peer *Peer) bool { +// Returns false if peer with key (uuid) is already in set +// or if we have too many peers from the peer's ip range +func (ps *PeerSet) Add(peer *Peer) error { ps.mtx.Lock() defer ps.mtx.Unlock() if ps.lookup[peer.Key] != nil { - return false + return ErrSwitchDuplicatePeer + } + + // ensure we havent maxed out connections for the peer's ip range yet + // and update the ip range counters + if !ps.updateIPRangeCounts(peer.Host) { + return ErrSwitchMaxPeersPerIPRange } + index := len(ps.list) // Appending is safe even with other goroutines // iterating over the ps.list slice. ps.list = append(ps.list, peer) ps.lookup[peer.Key] = &peerSetItem{peer, index} - return true + return nil } func (ps *PeerSet) Has(peerKey string) bool { @@ -107,3 +124,73 @@ func (ps *PeerSet) List() []*Peer { defer ps.mtx.Unlock() return ps.list } + +//----------------------------------------------------------------------------- +// track the number of ips we're connected to for each ip address range + +// forms an ip address hierarchy tree with counts +// the struct itself is not thread safe and should always only be accessed with the ps.mtx locked +type nestedCounter struct { + count int + children map[string]*nestedCounter +} + +func NewNestedCounter() *nestedCounter { + nc := new(nestedCounter) + nc.children = make(map[string]*nestedCounter) + return nc +} + +// Check if we have too many ips in the ip range of the incoming connection +// Thread safe +func (ps *PeerSet) HasMaxForIPRange(conn net.Conn) (ok bool) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + spl := strings.Split(ip, ".") + + c := ps.connectedIPs + for i, ipByte := range spl { + if c, ok = c.children[ipByte]; !ok { + return false + } + if c.count == maxPeersPerIPRange[i] { + return true + } + } + return false +} + +// Update counts for this address' ip range +// Returns false if we already have enough connections +// Not thread safe (only called by ps.Add()) +func (ps *PeerSet) updateIPRangeCounts(address string) bool { + spl := strings.Split(address, ".") + + c := ps.connectedIPs + return updateNestedCountRecursive(c, spl, 0) +} + +// recursively descend the ip hierarchy, checking if we have +// max peers for each range and updating if not +func updateNestedCountRecursive(c *nestedCounter, ipBytes []string, index int) bool { + if index == len(ipBytes) { + return true + } + ipByte := ipBytes[index] + if c2, ok := c.children[ipByte]; !ok { + c2 = NewNestedCounter() + c.children[ipByte] = c2 + c = c2 + } else { + c = c2 + if c.count == maxPeersPerIPRange[index] { + return false + } + } + if !updateNestedCountRecursive(c, ipBytes, index+1) { + return false + } + c.count += 1 + return true +} diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index d322c13ac..dfd8ee153 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -1,15 +1,22 @@ package p2p import ( + "math/rand" + "strings" "testing" "github.com/tendermint/tendermint/Godeps/_workspace/src/code.google.com/p/go-uuid/uuid" + . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/types" ) // Returns an empty dummy peer func randPeer() *Peer { return &Peer{ Key: uuid.New(), + NodeInfo: &types.NodeInfo{ + Host: Fmt("%v.%v.%v.%v", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), + }, } } @@ -18,7 +25,7 @@ func TestAddRemoveOne(t *testing.T) { peer := randPeer() added := peerSet.Add(peer) - if !added { + if err := added; err != nil { t.Errorf("Failed to add new peer") } if peerSet.Size() != 1 { @@ -38,10 +45,11 @@ func TestAddRemoveMany(t *testing.T) { peerSet := NewPeerSet() peers := []*Peer{} - for i := 0; i < 100; i++ { + N := 100 + maxPeersPerIPRange = [4]int{N, N, N, N} + for i := 0; i < N; i++ { peer := randPeer() - added := peerSet.Add(peer) - if !added { + if err := peerSet.Add(peer); err != nil { t.Errorf("Failed to add new peer") } if peerSet.Size() != i+1 { @@ -60,3 +68,105 @@ func TestAddRemoveMany(t *testing.T) { } } } + +func newPeerInIPRange(ipBytes ...string) *Peer { + ips := make([]string, 4) + for i, ipByte := range ipBytes { + ips[i] = ipByte + } + + for i := len(ipBytes); i < 4; i++ { + ips[i] = Fmt("%v", rand.Int()%256) + } + + ipS := strings.Join(ips, ".") + return &Peer{ + Key: uuid.New(), + NodeInfo: &types.NodeInfo{ + Host: ipS, + }, + } +} + +func TestIPRanges(t *testing.T) { + peerSet := NewPeerSet() + + // test /8 + maxPeersPerIPRange = [4]int{2, 2, 2, 2} + peer := newPeerInIPRange("54") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + peer = newPeerInIPRange("54") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + peer = newPeerInIPRange("54") + if err := peerSet.Add(peer); err == nil { + t.Errorf("Added peer when we shouldn't have") + } + peer = newPeerInIPRange("55") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + + // test /16 + peerSet = NewPeerSet() + maxPeersPerIPRange = [4]int{3, 2, 1, 1} + peer = newPeerInIPRange("54", "112") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + peer = newPeerInIPRange("54", "112") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + peer = newPeerInIPRange("54", "112") + if err := peerSet.Add(peer); err == nil { + t.Errorf("Added peer when we shouldn't have") + } + peer = newPeerInIPRange("54", "113") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + + // test /24 + peerSet = NewPeerSet() + maxPeersPerIPRange = [4]int{5, 3, 2, 1} + peer = newPeerInIPRange("54", "112", "11") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + peer = newPeerInIPRange("54", "112", "11") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + peer = newPeerInIPRange("54", "112", "11") + if err := peerSet.Add(peer); err == nil { + t.Errorf("Added peer when we shouldn't have") + } + peer = newPeerInIPRange("54", "112", "12") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + + // test /32 + peerSet = NewPeerSet() + maxPeersPerIPRange = [4]int{11, 7, 5, 2} + peer = newPeerInIPRange("54", "112", "11", "10") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + peer = newPeerInIPRange("54", "112", "11", "10") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } + peer = newPeerInIPRange("54", "112", "11", "10") + if err := peerSet.Add(peer); err == nil { + t.Errorf("Added peer when we shouldn't have") + } + peer = newPeerInIPRange("54", "112", "11", "11") + if err := peerSet.Add(peer); err != nil { + t.Errorf("Failed to add new peer") + } +} diff --git a/p2p/switch.go b/p2p/switch.go index b2584c8c0..bc5c004fc 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -52,8 +52,8 @@ type Switch struct { } var ( - ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrSwitchMaxPeersPerIP = errors.New("IP has too many peers") + ErrSwitchDuplicatePeer = errors.New("Duplicate peer") + ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers") ) const ( @@ -194,24 +194,12 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er } peer := newPeer(conn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) - // restrict the number of peers we're willing to connect to behind a single IP - var numPeersOnThisIP int - peers := sw.Peers().List() - for _, p := range peers { - if p.Host == peerNodeInfo.Host { - numPeersOnThisIP += 1 - } - } - if numPeersOnThisIP == maxPeersPerIP { - log.Info("Ignoring peer as we have the max allowed for that IP", "IP", peerNodeInfo.Host, "peer", peer, "max", maxPeersPerIP) - return nil, ErrSwitchMaxPeersPerIP - } - // Add the peer to .peers - if !sw.peers.Add(peer) { - log.Info("Ignoring duplicate peer", "peer", peer) + // ignore if duplicate or if we already have too many for that ip range + if err := sw.peers.Add(peer); err != nil { + log.Info("Ignoring peer", "error", err, "peer", peer) peer.stop() // will also close conn - return nil, ErrSwitchDuplicatePeer + return nil, err } if atomic.LoadUint32(&sw.running) == 1 { @@ -319,6 +307,12 @@ func (sw *Switch) listenerRoutine(l Listener) { if !ok { break } + // Ignore connections from ip ranges for which we have too many + if sw.peers.HasMaxForIPRange(inConn) { + log.Debug("Already have enough peers for that IP range", "address", inConn.RemoteAddr().String()) + continue + } + // New inbound connection! peer, err := sw.AddPeerWithConnection(inConn, false) if err != nil {