Browse Source

Don't dial dupes

pull/9/head
Jae Kwon 11 years ago
parent
commit
ac668d4d14
6 changed files with 45 additions and 12 deletions
  1. +10
    -0
      common/cmap.go
  2. +5
    -3
      p2p/connection.go
  3. +1
    -0
      p2p/listener.go
  4. +16
    -6
      p2p/peer_manager.go
  5. +1
    -0
      p2p/peer_set.go
  6. +12
    -3
      p2p/switch.go

+ 10
- 0
common/cmap.go View File

@ -52,3 +52,13 @@ func (cm *CMap) Clear() {
defer cm.l.Unlock() defer cm.l.Unlock()
cm.m = make(map[string]interface{}, 0) cm.m = make(map[string]interface{}, 0)
} }
func (cm *CMap) Values() []interface{} {
cm.l.Lock()
defer cm.l.Unlock()
items := []interface{}{}
for _, v := range cm.m {
items = append(items, v)
}
return items
}

+ 5
- 3
p2p/connection.go View File

@ -191,16 +191,18 @@ func (c *Connection) recvHandler() {
FOR_LOOP: FOR_LOOP:
for { for {
pktType, err := ReadUInt8Safe(c.bufReader)
if log.IsEnabledFor(logging.DEBUG) { if log.IsEnabledFor(logging.DEBUG) {
// peeking into bufReader // peeking into bufReader
numBytes := c.bufReader.Buffered() numBytes := c.bufReader.Buffered()
bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
log.Debug("recvHandler peeked: %X\nerr:%v", bytes, err)
if err != nil {
log.Debug("recvHandler packet type %X, peeked: %X", pktType, bytes)
}
} }
pktType, err := ReadUInt8Safe(c.bufReader)
if err != nil { if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 { if atomic.LoadUint32(&c.stopped) != 1 {
log.Info("%v failed @ recvHandler", c)
log.Info("%v failed @ recvHandler with err: %v", c, err)
c.Stop() c.Stop()
} }
break FOR_LOOP break FOR_LOOP


+ 1
- 0
p2p/listener.go View File

@ -85,6 +85,7 @@ func NewDefaultListener(protocol string, lAddr string) Listener {
return dl return dl
} }
// TODO: prevent abuse, esp a bunch of connections coming from the same IP range.
func (l *DefaultListener) listenHandler() { func (l *DefaultListener) listenHandler() {
for { for {
conn, err := l.listener.Accept() conn, err := l.listener.Accept()


+ 16
- 6
p2p/peer_manager.go View File

@ -86,14 +86,17 @@ FOR_LOOP:
// Ensures that sufficient peers are connected. // Ensures that sufficient peers are connected.
func (pm *PeerManager) ensurePeers() { func (pm *PeerManager) ensurePeers() {
numPeers := pm.sw.NumOutboundPeers()
numDialing := pm.sw.dialing.Size()
numToDial := minNumPeers - (numPeers + numDialing)
numOutPeers, _, numDialing := pm.sw.NumPeers()
numToDial := minNumPeers - (numOutPeers + numDialing)
if numToDial <= 0 { if numToDial <= 0 {
return return
} }
toDial := NewCMap()
// Try to pick numToDial addresses to dial.
// TODO: improve logic.
for i := 0; i < numToDial; i++ { for i := 0; i < numToDial; i++ {
newBias := MinInt(numPeers, 8)*10 + 10
newBias := MinInt(numOutPeers, 8)*10 + 10
var picked *NetAddress var picked *NetAddress
// Try to fetch a new peer 3 times. // Try to fetch a new peer 3 times.
// This caps the maximum number of tries to 3 * numToDial. // This caps the maximum number of tries to 3 * numToDial.
@ -103,7 +106,9 @@ func (pm *PeerManager) ensurePeers() {
log.Debug("Empty addrbook.") log.Debug("Empty addrbook.")
return return
} }
if pm.sw.Peers().Has(picked) {
if toDial.Has(picked.String()) ||
pm.sw.IsDialing(picked) ||
pm.sw.Peers().Has(picked) {
continue continue
} else { } else {
break break
@ -112,7 +117,12 @@ func (pm *PeerManager) ensurePeers() {
if picked == nil { if picked == nil {
continue continue
} }
// Dial picked address
toDial.Set(picked.String(), picked)
}
// Dial picked addresses
for _, item := range toDial.Values() {
picked := item.(*NetAddress)
go func() { go func() {
peer, err := pm.sw.DialPeerWithAddress(picked) peer, err := pm.sw.DialPeerWithAddress(picked)
if err != nil { if err != nil {


+ 1
- 0
p2p/peer_set.go View File

@ -37,6 +37,7 @@ func NewPeerSet() *PeerSet {
} }
} }
// Returns false if peer with address is already in set.
func (ps *PeerSet) Add(peer *Peer) bool { func (ps *PeerSet) Add(peer *Peer) bool {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()


+ 12
- 3
p2p/switch.go View File

@ -118,6 +118,10 @@ func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
return peer, nil return peer, nil
} }
func (s *Switch) IsDialing(addr *NetAddress) bool {
return s.dialing.Has(addr.String())
}
func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) {
if atomic.LoadUint32(&s.stopped) == 1 { if atomic.LoadUint32(&s.stopped) == 1 {
return return
@ -145,7 +149,7 @@ func (s *Switch) Receive(chName string) *InboundPacket {
return nil return nil
} }
log.Debug("Receive on [%v]", chName)
log.Debug("Waiting for [%v]", chName)
q := s.pktRecvQueues[chName] q := s.pktRecvQueues[chName]
if q == nil { if q == nil {
Panicf("Expected pktRecvQueues[%f], found none", chName) Panicf("Expected pktRecvQueues[%f], found none", chName)
@ -155,17 +159,22 @@ func (s *Switch) Receive(chName string) *InboundPacket {
case <-s.quit: case <-s.quit:
return nil return nil
case inPacket := <-q: case inPacket := <-q:
log.Debug("Received packet on [%v]", chName)
return inPacket return inPacket
} }
} }
func (s *Switch) NumOutboundPeers() (count int) {
// Returns the count of outbound/inbound and outbound-dialing peers.
func (s *Switch) NumPeers() (outbound, inbound, dialing int) {
peers := s.peers.List() peers := s.peers.List()
for _, peer := range peers { for _, peer := range peers {
if peer.outbound { if peer.outbound {
count++
outbound++
} else {
inbound++
} }
} }
dialing = s.dialing.Size()
return return
} }


Loading…
Cancel
Save