diff --git a/README.md b/README.md index cf614c1e6..747dbeb3d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,8 @@ TenderMint - proof of concept ### Status -* Implement basic peer exchange -* Implemented the basics of peer/* +* Node & testnet *now* +* PEX peer exchange *complete* +* p2p/* *complete* * Ed25519 bindings *complete* * merkle/* *complete* diff --git a/common/math.go b/common/math.go new file mode 100644 index 000000000..cc9c31ade --- /dev/null +++ b/common/math.go @@ -0,0 +1,115 @@ +package common + +func MaxInt8(a, b int8) int8 { + if a > b { + return a + } + return b +} + +func MaxUint8(a, b uint8) uint8 { + if a > b { + return a + } + return b +} + +func MaxInt16(a, b int16) int16 { + if a > b { + return a + } + return b +} + +func MaxUint16(a, b uint16) uint16 { + if a > b { + return a + } + return b +} + +func MaxInt32(a, b int32) int32 { + if a > b { + return a + } + return b +} + +func MaxUint32(a, b uint32) uint32 { + if a > b { + return a + } + return b +} + +func MaxInt(a, b int) int { + if a > b { + return a + } + return b +} + +func MaxUint(a, b uint) uint { + if a > b { + return a + } + return b +} + +//----------------------------------------------------------------------------- + +func MinInt8(a, b int8) int8 { + if a < b { + return a + } + return b +} + +func MinUint8(a, b uint8) uint8 { + if a < b { + return a + } + return b +} + +func MinInt16(a, b int16) int16 { + if a < b { + return a + } + return b +} + +func MinUint16(a, b uint16) uint16 { + if a < b { + return a + } + return b +} + +func MinInt32(a, b int32) int32 { + if a < b { + return a + } + return b +} + +func MinUint32(a, b uint32) uint32 { + if a < b { + return a + } + return b +} + +func MinInt(a, b int) int { + if a < b { + return a + } + return b +} + +func MinUint(a, b uint) uint { + if a < b { + return a + } + return b +} diff --git a/main.go b/main.go index 182496444..f08868621 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,9 @@ package main import ( "os" "os/signal" + "time" + . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" ) @@ -18,7 +20,7 @@ const ( type Node struct { sw *p2p.Switch - book *p2p.AddressBook + book *p2p.AddrBook quit chan struct{} dialing *CMap } @@ -50,7 +52,7 @@ func NewNode() *Node { sw := p2p.NewSwitch(chDescs) book := p2p.NewAddrBook(config.AppDir + "/addrbook.json") - return &New{ + return &Node{ sw: sw, book: book, quit: make(chan struct{}, 0), @@ -59,20 +61,21 @@ func NewNode() *Node { } func (n *Node) Start() { + log.Infof("Starting node") n.sw.Start() n.book.Start() - go p2p.PexHandler(sw, book) - go n.ensurePeersHandler(sw, book) + go p2p.PexHandler(n.sw, n.book) + go n.ensurePeersHandler() } -func (n *Node) initPeer(peer *Peer) { - if peer.IsOutgoing() { +func (n *Node) initPeer(peer *p2p.Peer) { + if peer.IsOutbound() { // TODO: initiate PEX } } // Add a Listener to accept incoming peer connections. -func (n *Node) AddListener(l Listener) { +func (n *Node) AddListener(l p2p.Listener) { go func() { for { inConn, ok := <-l.Connections() @@ -92,18 +95,56 @@ func (n *Node) AddListener(l Listener) { // Ensures that sufficient peers are connected. func (n *Node) ensurePeers() { - numPeers := len(n.sw.Peers()) + numPeers := n.sw.NumOutboundPeers() numDialing := n.dialing.Size() - numToDial = minNumPeers - (numPeers + numDialing) + numToDial := minNumPeers - (numPeers + numDialing) if numToDial <= 0 { return } for i := 0; i < numToDial; i++ { - // XXX + newBias := MinInt(numPeers, 8)*10 + 10 + var picked *p2p.NetAddress + // Try to fetch a new peer 3 times. + // This caps the maximum number of tries to 3 * numToDial. + for j := 0; i < 3; j++ { + picked = n.book.PickAddress(newBias) + if picked == nil { + log.Infof("Empty addrbook.") + return + } + if n.sw.Peers().Has(picked) { + continue + } else { + break + } + } + if picked == nil { + continue + } + n.dialing.Set(picked.String(), picked) + n.book.MarkAttempt(picked) + go func() { + log.Infof("Dialing addr: %v", picked) + conn, err := picked.DialTimeout(peerDialTimeoutSeconds * time.Second) + n.dialing.Delete(picked.String()) + if err != nil { + // ignore error. + return + } + peer, err := n.sw.AddPeerWithConnection(conn, true) + if err != nil { + log.Warnf("Error trying to add new outbound peer connection:%v", err) + return + } + n.initPeer(peer) + }() } } func (n *Node) ensurePeersHandler() { + // fire once immediately. + n.ensurePeers() + // fire periodically timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second) FOR_LOOP: for { @@ -131,7 +172,8 @@ func main() { n := NewNode() l := p2p.NewDefaultListener("tcp", ":8001") - n.AddListener() + n.AddListener(l) + n.Start() if false { // TODO remove @@ -141,7 +183,7 @@ func main() { log.Infof("Error connecting to it: %v", err) return } - peer, err := sw.AddPeerWithConnection(conn, true) + peer, err := n.sw.AddPeerWithConnection(conn, true) if err != nil { log.Infof("Error adding peer with connection: %v", err) return diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 2cdb9683e..066427536 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -123,7 +123,7 @@ func (a *AddrBook) init() { func (a *AddrBook) Start() { if atomic.CompareAndSwapUint32(&a.started, 0, 1) { - log.Trace("Starting address manager") + log.Infof("Starting address manager") a.loadFromFile(a.filePath) a.wg.Add(1) go a.saveHandler() @@ -367,7 +367,7 @@ out: dumpAddressTicker.Stop() a.saveToFile(a.filePath) a.wg.Done() - log.Trace("Address handler done") + log.Info("Address handler done") } func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress { @@ -399,7 +399,7 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { // Enforce max addresses. if len(bucket) > newBucketSize { - log.Tracef("new bucket is full, expiring old ") + log.Infof("new bucket is full, expiring old ") a.expireNew(bucketIdx) } @@ -519,7 +519,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { bucket := a.calcNewBucket(addr, src) a.addToNewBucket(ka, bucket) - log.Tracef("Added new address %s for a total of %d addresses", addr, a.size()) + log.Infof("Added new address %s for a total of %d addresses", addr, a.size()) } // Make space in the new buckets by expiring the really bad entries. @@ -527,8 +527,8 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { func (a *AddrBook) expireNew(bucketIdx int) { for key, ka := range a.addrNew[bucketIdx] { // If an entry is bad, throw it away - if ka.IsBad() { - log.Tracef("expiring bad address %v", key) + if ka.isBad() { + log.Infof("expiring bad address %v", key) a.removeFromBucket(ka, bucketTypeNew, bucketIdx) return } @@ -756,7 +756,7 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int { All addresses that meet these criteria are assumed to be worthless and not worth keeping hold of. */ -func (ka *knownAddress) IsBad() bool { +func (ka *knownAddress) isBad() bool { // Has been attempted in the last minute --> good if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) { return false diff --git a/p2p/peer.go b/p2p/peer.go index 039662f38..2b7395e34 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -12,7 +12,7 @@ import ( /* Peer */ type Peer struct { - outgoing bool + outbound bool conn *Connection channels map[string]*Channel quit chan struct{} @@ -55,8 +55,8 @@ func (p *Peer) stop() { } } -func (p *Peer) IsOutgoing() bool { - return p.outgoing +func (p *Peer) IsOutbound() bool { + return p.outbound } func (p *Peer) LocalAddress() *NetAddress { @@ -96,7 +96,7 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { } func (p *Peer) String() string { - return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing) + return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outbound) } // sendHandler pulls from a channel and pushes to the connection. diff --git a/p2p/peer_set.go b/p2p/peer_set.go index ade9abcac..e8905ce03 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -4,6 +4,16 @@ import ( "sync" ) +/* +ReadOnlyPeerSet has a subset of the methods of PeerSet. +*/ +type ReadOnlyPeerSet interface { + Has(addr *NetAddress) bool + List() []*Peer +} + +//----------------------------------------------------------------------------- + /* PeerSet is a special structure for keeping a table of peers. Iteration over the peers is super fast and thread-safe. @@ -41,6 +51,13 @@ func (ps *PeerSet) Add(peer *Peer) bool { return true } +func (ps *PeerSet) Has(addr *NetAddress) bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + _, ok := ps.lookup[addr.String()] + return ok +} + func (ps *PeerSet) Remove(peer *Peer) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -69,6 +86,12 @@ func (ps *PeerSet) Remove(peer *Peer) { delete(ps.lookup, addr) } +func (ps *PeerSet) Size() int { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return len(ps.list) +} + // threadsafe list of peers. func (ps *PeerSet) List() []*Peer { ps.mtx.Lock() diff --git a/p2p/switch.go b/p2p/switch.go index 14af7833b..7e49a933e 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -12,7 +12,7 @@ All communication amongst peers are multiplexed by "channels". (Not the same as Go "channels") To send a message, encapsulate it into a "Packet" and send it to each peer. -You can find all connected and active peers by iterating over ".Peers()". +You can find all connected and active peers by iterating over ".Peers().List()". ".Broadcast()" is provided for convenience, but by iterating over the peers manually the caller can decide which subset receives a message. @@ -69,19 +69,19 @@ func (s *Switch) Stop() { } } -func (s *Switch) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) { +func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, error) { if atomic.LoadUint32(&s.stopped) == 1 { return nil, ErrSwitchStopped } - log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing) + log.Infof("Adding peer with connection: %v, outbound: %v", conn, outbound) // Create channels for peer channels := map[string]*Channel{} for _, chDesc := range s.channels { channels[chDesc.Name] = newChannel(chDesc) } peer := newPeer(conn, channels) - peer.outgoing = outgoing + peer.outbound = outbound err := s.addPeer(peer) if err != nil { return nil, err @@ -133,8 +133,18 @@ func (s *Switch) Receive(chName string) *InboundPacket { } } -func (s *Switch) Peers() []*Peer { - return s.peers.List() +func (s *Switch) NumOutboundPeers() (count int) { + peers := s.peers.List() + for _, peer := range peers { + if peer.outbound { + count++ + } + } + return +} + +func (s *Switch) Peers() ReadOnlyPeerSet { + return s.peers } // Disconnect from a peer due to external error. diff --git a/p2p/switch_test.go b/p2p/switch_test.go index fc50c84ef..57933dded 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -59,11 +59,11 @@ func TestSwitches(t *testing.T) { defer s2.Stop() // Lets send a message from s1 to s2. - if len(s1.Peers()) != 1 { - t.Errorf("Expected exactly 1 peer in s1, got %v", len(s1.Peers())) + if s1.Peers().Size() != 1 { + t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size()) } - if len(s2.Peers()) != 1 { - t.Errorf("Expected exactly 1 peer in s2, got %v", len(s2.Peers())) + if s2.Peers().Size() != 1 { + t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size()) } // Broadcast a message on ch1