diff --git a/common/cmap.go b/common/cmap.go index 384ca8c47..421b9c54a 100644 --- a/common/cmap.go +++ b/common/cmap.go @@ -52,3 +52,13 @@ func (cm *CMap) Clear() { defer cm.l.Unlock() cm.m = make(map[string]interface{}, 0) } + +func (cm *CMap) Values() []interface{} { + cm.l.Lock() + defer cm.l.Unlock() + items := []interface{}{} + for _, v := range cm.m { + items = append(items, v) + } + return items +} diff --git a/p2p/connection.go b/p2p/connection.go index 3f40b5c12..20d8f382a 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -191,16 +191,18 @@ func (c *Connection) recvHandler() { FOR_LOOP: for { + pktType, err := ReadUInt8Safe(c.bufReader) if log.IsEnabledFor(logging.DEBUG) { // peeking into bufReader numBytes := c.bufReader.Buffered() bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) - log.Debug("recvHandler peeked: %X\nerr:%v", bytes, err) + if err != nil { + log.Debug("recvHandler packet type %X, peeked: %X", pktType, bytes) + } } - pktType, err := ReadUInt8Safe(c.bufReader) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Info("%v failed @ recvHandler", c) + log.Info("%v failed @ recvHandler with err: %v", c, err) c.Stop() } break FOR_LOOP diff --git a/p2p/listener.go b/p2p/listener.go index 6ebe28f83..a9b3d1b89 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -85,6 +85,7 @@ func NewDefaultListener(protocol string, lAddr string) Listener { return dl } +// TODO: prevent abuse, esp a bunch of connections coming from the same IP range. func (l *DefaultListener) listenHandler() { for { conn, err := l.listener.Accept() diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index da0bc57fb..ad65e6999 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -86,14 +86,17 @@ FOR_LOOP: // Ensures that sufficient peers are connected. func (pm *PeerManager) ensurePeers() { - numPeers := pm.sw.NumOutboundPeers() - numDialing := pm.sw.dialing.Size() - numToDial := minNumPeers - (numPeers + numDialing) + numOutPeers, _, numDialing := pm.sw.NumPeers() + numToDial := minNumPeers - (numOutPeers + numDialing) if numToDial <= 0 { return } + toDial := NewCMap() + + // Try to pick numToDial addresses to dial. + // TODO: improve logic. for i := 0; i < numToDial; i++ { - newBias := MinInt(numPeers, 8)*10 + 10 + newBias := MinInt(numOutPeers, 8)*10 + 10 var picked *NetAddress // Try to fetch a new peer 3 times. // This caps the maximum number of tries to 3 * numToDial. @@ -103,7 +106,9 @@ func (pm *PeerManager) ensurePeers() { log.Debug("Empty addrbook.") return } - if pm.sw.Peers().Has(picked) { + if toDial.Has(picked.String()) || + pm.sw.IsDialing(picked) || + pm.sw.Peers().Has(picked) { continue } else { break @@ -112,7 +117,12 @@ func (pm *PeerManager) ensurePeers() { if picked == nil { continue } - // Dial picked address + toDial.Set(picked.String(), picked) + } + + // Dial picked addresses + for _, item := range toDial.Values() { + picked := item.(*NetAddress) go func() { peer, err := pm.sw.DialPeerWithAddress(picked) if err != nil { diff --git a/p2p/peer_set.go b/p2p/peer_set.go index c1404f5f9..b3bdfa8df 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -37,6 +37,7 @@ func NewPeerSet() *PeerSet { } } +// Returns false if peer with address is already in set. func (ps *PeerSet) Add(peer *Peer) bool { ps.mtx.Lock() defer ps.mtx.Unlock() diff --git a/p2p/switch.go b/p2p/switch.go index 68c29da01..f3a41bd16 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -118,6 +118,10 @@ func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { return peer, nil } +func (s *Switch) IsDialing(addr *NetAddress) bool { + return s.dialing.Has(addr.String()) +} + func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { if atomic.LoadUint32(&s.stopped) == 1 { return @@ -145,7 +149,7 @@ func (s *Switch) Receive(chName string) *InboundPacket { return nil } - log.Debug("Receive on [%v]", chName) + log.Debug("Waiting for [%v]", chName) q := s.pktRecvQueues[chName] if q == nil { Panicf("Expected pktRecvQueues[%f], found none", chName) @@ -155,17 +159,22 @@ func (s *Switch) Receive(chName string) *InboundPacket { case <-s.quit: return nil case inPacket := <-q: + log.Debug("Received packet on [%v]", chName) return inPacket } } -func (s *Switch) NumOutboundPeers() (count int) { +// Returns the count of outbound/inbound and outbound-dialing peers. +func (s *Switch) NumPeers() (outbound, inbound, dialing int) { peers := s.peers.List() for _, peer := range peers { if peer.outbound { - count++ + outbound++ + } else { + inbound++ } } + dialing = s.dialing.Size() return }