You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

227 lines
5.4 KiB

9 years ago
  1. package p2p
  2. import (
  3. "net"
  4. "strings"
  5. "sync"
  6. )
  7. // IPeerSet has a (immutable) subset of the methods of PeerSet.
  8. type IPeerSet interface {
  9. Has(key string) bool
  10. Get(key string) *Peer
  11. List() []*Peer
  12. Size() int
  13. }
  14. //-----------------------------------------------------------------------------
  15. var (
  16. maxPeersPerIPRange = [4]int{11, 7, 5, 3} // ...
  17. )
  18. // PeerSet is a special structure for keeping a table of peers.
  19. // Iteration over the peers is super fast and thread-safe.
  20. // We also track how many peers per IP range and avoid too many
  21. type PeerSet struct {
  22. mtx sync.Mutex
  23. lookup map[string]*peerSetItem
  24. list []*Peer
  25. connectedIPs *nestedCounter
  26. }
  27. type peerSetItem struct {
  28. peer *Peer
  29. index int
  30. }
  31. func NewPeerSet() *PeerSet {
  32. return &PeerSet{
  33. lookup: make(map[string]*peerSetItem),
  34. list: make([]*Peer, 0, 256),
  35. connectedIPs: NewNestedCounter(),
  36. }
  37. }
  38. // Returns false if peer with key (PubKeyEd25519) is already in set
  39. // or if we have too many peers from the peer's IP range
  40. func (ps *PeerSet) Add(peer *Peer) error {
  41. ps.mtx.Lock()
  42. defer ps.mtx.Unlock()
  43. if ps.lookup[peer.Key] != nil {
  44. return ErrSwitchDuplicatePeer
  45. }
  46. // ensure we havent maxed out connections for the peer's IP range yet
  47. // and update the IP range counters
  48. if !ps.incrIPRangeCounts(peer.Host()) {
  49. return ErrSwitchMaxPeersPerIPRange
  50. }
  51. index := len(ps.list)
  52. // Appending is safe even with other goroutines
  53. // iterating over the ps.list slice.
  54. ps.list = append(ps.list, peer)
  55. ps.lookup[peer.Key] = &peerSetItem{peer, index}
  56. return nil
  57. }
  58. func (ps *PeerSet) Has(peerKey string) bool {
  59. ps.mtx.Lock()
  60. defer ps.mtx.Unlock()
  61. _, ok := ps.lookup[peerKey]
  62. return ok
  63. }
  64. func (ps *PeerSet) Get(peerKey string) *Peer {
  65. ps.mtx.Lock()
  66. defer ps.mtx.Unlock()
  67. item, ok := ps.lookup[peerKey]
  68. if ok {
  69. return item.peer
  70. } else {
  71. return nil
  72. }
  73. }
  74. func (ps *PeerSet) Remove(peer *Peer) {
  75. ps.mtx.Lock()
  76. defer ps.mtx.Unlock()
  77. item := ps.lookup[peer.Key]
  78. if item == nil {
  79. return
  80. }
  81. // update the IP range counters
  82. ps.decrIPRangeCounts(peer.Host())
  83. index := item.index
  84. // Copy the list but without the last element.
  85. // (we must copy because we're mutating the list)
  86. newList := make([]*Peer, len(ps.list)-1)
  87. copy(newList, ps.list)
  88. // If it's the last peer, that's an easy special case.
  89. if index == len(ps.list)-1 {
  90. ps.list = newList
  91. delete(ps.lookup, peer.Key)
  92. return
  93. }
  94. // Move the last item from ps.list to "index" in list.
  95. lastPeer := ps.list[len(ps.list)-1]
  96. lastPeerKey := lastPeer.Key
  97. lastPeerItem := ps.lookup[lastPeerKey]
  98. newList[index] = lastPeer
  99. lastPeerItem.index = index
  100. ps.list = newList
  101. delete(ps.lookup, peer.Key)
  102. }
  103. func (ps *PeerSet) Size() int {
  104. ps.mtx.Lock()
  105. defer ps.mtx.Unlock()
  106. return len(ps.list)
  107. }
  108. // threadsafe list of peers.
  109. func (ps *PeerSet) List() []*Peer {
  110. ps.mtx.Lock()
  111. defer ps.mtx.Unlock()
  112. return ps.list
  113. }
  114. //-----------------------------------------------------------------------------
  115. // track the number of IPs we're connected to for each IP address range
  116. // forms an IP address hierarchy tree with counts
  117. // the struct itself is not thread safe and should always only be accessed with the ps.mtx locked
  118. type nestedCounter struct {
  119. count int
  120. children map[string]*nestedCounter
  121. }
  122. func NewNestedCounter() *nestedCounter {
  123. nc := new(nestedCounter)
  124. nc.children = make(map[string]*nestedCounter)
  125. return nc
  126. }
  127. // Check if we have too many IPs in the IP range of the incoming connection
  128. // Thread safe
  129. func (ps *PeerSet) HasMaxForIPRange(conn net.Conn) (ok bool) {
  130. ps.mtx.Lock()
  131. defer ps.mtx.Unlock()
  132. ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
  133. ipBytes := strings.Split(ip, ".")
  134. c := ps.connectedIPs
  135. for i, ipByte := range ipBytes {
  136. if c, ok = c.children[ipByte]; !ok {
  137. return false
  138. }
  139. if maxPeersPerIPRange[i] <= c.count {
  140. return true
  141. }
  142. }
  143. return false
  144. }
  145. // Increments counts for this address' IP range
  146. // Returns false if we already have enough connections
  147. // Not thread safe (only called by ps.Add())
  148. func (ps *PeerSet) incrIPRangeCounts(address string) bool {
  149. addrParts := strings.Split(address, ".")
  150. c := ps.connectedIPs
  151. return incrNestedCounters(c, addrParts, 0)
  152. }
  153. // Recursively descend the IP hierarchy, checking if we have
  154. // max peers for each range and incrementing if not.
  155. // Returns false if incr failed because max peers reached for some range counter.
  156. func incrNestedCounters(c *nestedCounter, ipBytes []string, index int) bool {
  157. ipByte := ipBytes[index]
  158. child := c.children[ipByte]
  159. if child == nil {
  160. child = NewNestedCounter()
  161. c.children[ipByte] = child
  162. }
  163. if index+1 < len(ipBytes) {
  164. if !incrNestedCounters(child, ipBytes, index+1) {
  165. return false
  166. }
  167. }
  168. if maxPeersPerIPRange[index] <= child.count {
  169. return false
  170. } else {
  171. child.count += 1
  172. return true
  173. }
  174. }
  175. // Decrement counts for this address' IP range
  176. func (ps *PeerSet) decrIPRangeCounts(address string) {
  177. addrParts := strings.Split(address, ".")
  178. c := ps.connectedIPs
  179. decrNestedCounters(c, addrParts, 0)
  180. }
  181. // Recursively descend the IP hierarchy, decrementing by one.
  182. // If the counter is zero, deletes the child.
  183. func decrNestedCounters(c *nestedCounter, ipBytes []string, index int) {
  184. ipByte := ipBytes[index]
  185. child := c.children[ipByte]
  186. if child == nil {
  187. log.Error("p2p/peer_set decrNestedCounters encountered a missing child counter")
  188. return
  189. }
  190. if index+1 < len(ipBytes) {
  191. decrNestedCounters(child, ipBytes, index+1)
  192. }
  193. child.count -= 1
  194. if child.count <= 0 {
  195. delete(c.children, ipByte)
  196. }
  197. }