Browse Source

.

pull/9/head
Jae Kwon 11 years ago
parent
commit
0b1265dc10
8 changed files with 226 additions and 35 deletions
  1. +3
    -2
      README.md
  2. +115
    -0
      common/math.go
  3. +54
    -12
      main.go
  4. +7
    -7
      p2p/addrbook.go
  5. +4
    -4
      p2p/peer.go
  6. +23
    -0
      p2p/peer_set.go
  7. +16
    -6
      p2p/switch.go
  8. +4
    -4
      p2p/switch_test.go

+ 3
- 2
README.md View File

@ -6,7 +6,8 @@ TenderMint - proof of concept
### Status
* Implement basic peer exchange
* Implemented the basics of peer/*
* Node & testnet *now*
* PEX peer exchange *complete*
* p2p/* *complete*
* Ed25519 bindings *complete*
* merkle/* *complete*

+ 115
- 0
common/math.go View File

@ -0,0 +1,115 @@
package common
func MaxInt8(a, b int8) int8 {
if a > b {
return a
}
return b
}
func MaxUint8(a, b uint8) uint8 {
if a > b {
return a
}
return b
}
func MaxInt16(a, b int16) int16 {
if a > b {
return a
}
return b
}
func MaxUint16(a, b uint16) uint16 {
if a > b {
return a
}
return b
}
func MaxInt32(a, b int32) int32 {
if a > b {
return a
}
return b
}
func MaxUint32(a, b uint32) uint32 {
if a > b {
return a
}
return b
}
func MaxInt(a, b int) int {
if a > b {
return a
}
return b
}
func MaxUint(a, b uint) uint {
if a > b {
return a
}
return b
}
//-----------------------------------------------------------------------------
func MinInt8(a, b int8) int8 {
if a < b {
return a
}
return b
}
func MinUint8(a, b uint8) uint8 {
if a < b {
return a
}
return b
}
func MinInt16(a, b int16) int16 {
if a < b {
return a
}
return b
}
func MinUint16(a, b uint16) uint16 {
if a < b {
return a
}
return b
}
func MinInt32(a, b int32) int32 {
if a < b {
return a
}
return b
}
func MinUint32(a, b uint32) uint32 {
if a < b {
return a
}
return b
}
func MinInt(a, b int) int {
if a < b {
return a
}
return b
}
func MinUint(a, b uint) uint {
if a < b {
return a
}
return b
}

+ 54
- 12
main.go View File

@ -3,7 +3,9 @@ package main
import (
"os"
"os/signal"
"time"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
)
@ -18,7 +20,7 @@ const (
type Node struct {
sw *p2p.Switch
book *p2p.AddressBook
book *p2p.AddrBook
quit chan struct{}
dialing *CMap
}
@ -50,7 +52,7 @@ func NewNode() *Node {
sw := p2p.NewSwitch(chDescs)
book := p2p.NewAddrBook(config.AppDir + "/addrbook.json")
return &New{
return &Node{
sw: sw,
book: book,
quit: make(chan struct{}, 0),
@ -59,20 +61,21 @@ func NewNode() *Node {
}
func (n *Node) Start() {
log.Infof("Starting node")
n.sw.Start()
n.book.Start()
go p2p.PexHandler(sw, book)
go n.ensurePeersHandler(sw, book)
go p2p.PexHandler(n.sw, n.book)
go n.ensurePeersHandler()
}
func (n *Node) initPeer(peer *Peer) {
if peer.IsOutgoing() {
func (n *Node) initPeer(peer *p2p.Peer) {
if peer.IsOutbound() {
// TODO: initiate PEX
}
}
// Add a Listener to accept incoming peer connections.
func (n *Node) AddListener(l Listener) {
func (n *Node) AddListener(l p2p.Listener) {
go func() {
for {
inConn, ok := <-l.Connections()
@ -92,18 +95,56 @@ func (n *Node) AddListener(l Listener) {
// Ensures that sufficient peers are connected.
func (n *Node) ensurePeers() {
numPeers := len(n.sw.Peers())
numPeers := n.sw.NumOutboundPeers()
numDialing := n.dialing.Size()
numToDial = minNumPeers - (numPeers + numDialing)
numToDial := minNumPeers - (numPeers + numDialing)
if numToDial <= 0 {
return
}
for i := 0; i < numToDial; i++ {
// XXX
newBias := MinInt(numPeers, 8)*10 + 10
var picked *p2p.NetAddress
// Try to fetch a new peer 3 times.
// This caps the maximum number of tries to 3 * numToDial.
for j := 0; i < 3; j++ {
picked = n.book.PickAddress(newBias)
if picked == nil {
log.Infof("Empty addrbook.")
return
}
if n.sw.Peers().Has(picked) {
continue
} else {
break
}
}
if picked == nil {
continue
}
n.dialing.Set(picked.String(), picked)
n.book.MarkAttempt(picked)
go func() {
log.Infof("Dialing addr: %v", picked)
conn, err := picked.DialTimeout(peerDialTimeoutSeconds * time.Second)
n.dialing.Delete(picked.String())
if err != nil {
// ignore error.
return
}
peer, err := n.sw.AddPeerWithConnection(conn, true)
if err != nil {
log.Warnf("Error trying to add new outbound peer connection:%v", err)
return
}
n.initPeer(peer)
}()
}
}
func (n *Node) ensurePeersHandler() {
// fire once immediately.
n.ensurePeers()
// fire periodically
timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second)
FOR_LOOP:
for {
@ -131,7 +172,8 @@ func main() {
n := NewNode()
l := p2p.NewDefaultListener("tcp", ":8001")
n.AddListener()
n.AddListener(l)
n.Start()
if false {
// TODO remove
@ -141,7 +183,7 @@ func main() {
log.Infof("Error connecting to it: %v", err)
return
}
peer, err := sw.AddPeerWithConnection(conn, true)
peer, err := n.sw.AddPeerWithConnection(conn, true)
if err != nil {
log.Infof("Error adding peer with connection: %v", err)
return


+ 7
- 7
p2p/addrbook.go View File

@ -123,7 +123,7 @@ func (a *AddrBook) init() {
func (a *AddrBook) Start() {
if atomic.CompareAndSwapUint32(&a.started, 0, 1) {
log.Trace("Starting address manager")
log.Infof("Starting address manager")
a.loadFromFile(a.filePath)
a.wg.Add(1)
go a.saveHandler()
@ -367,7 +367,7 @@ out:
dumpAddressTicker.Stop()
a.saveToFile(a.filePath)
a.wg.Done()
log.Trace("Address handler done")
log.Info("Address handler done")
}
func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
@ -399,7 +399,7 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
// Enforce max addresses.
if len(bucket) > newBucketSize {
log.Tracef("new bucket is full, expiring old ")
log.Infof("new bucket is full, expiring old ")
a.expireNew(bucketIdx)
}
@ -519,7 +519,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
bucket := a.calcNewBucket(addr, src)
a.addToNewBucket(ka, bucket)
log.Tracef("Added new address %s for a total of %d addresses", addr, a.size())
log.Infof("Added new address %s for a total of %d addresses", addr, a.size())
}
// Make space in the new buckets by expiring the really bad entries.
@ -527,8 +527,8 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
func (a *AddrBook) expireNew(bucketIdx int) {
for key, ka := range a.addrNew[bucketIdx] {
// If an entry is bad, throw it away
if ka.IsBad() {
log.Tracef("expiring bad address %v", key)
if ka.isBad() {
log.Infof("expiring bad address %v", key)
a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
return
}
@ -756,7 +756,7 @@ func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
All addresses that meet these criteria are assumed to be worthless and not
worth keeping hold of.
*/
func (ka *knownAddress) IsBad() bool {
func (ka *knownAddress) isBad() bool {
// Has been attempted in the last minute --> good
if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) {
return false


+ 4
- 4
p2p/peer.go View File

@ -12,7 +12,7 @@ import (
/* Peer */
type Peer struct {
outgoing bool
outbound bool
conn *Connection
channels map[string]*Channel
quit chan struct{}
@ -55,8 +55,8 @@ func (p *Peer) stop() {
}
}
func (p *Peer) IsOutgoing() bool {
return p.outgoing
func (p *Peer) IsOutbound() bool {
return p.outbound
}
func (p *Peer) LocalAddress() *NetAddress {
@ -96,7 +96,7 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
}
func (p *Peer) String() string {
return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outbound)
}
// sendHandler pulls from a channel and pushes to the connection.


+ 23
- 0
p2p/peer_set.go View File

@ -4,6 +4,16 @@ import (
"sync"
)
/*
ReadOnlyPeerSet has a subset of the methods of PeerSet.
*/
type ReadOnlyPeerSet interface {
Has(addr *NetAddress) bool
List() []*Peer
}
//-----------------------------------------------------------------------------
/*
PeerSet is a special structure for keeping a table of peers.
Iteration over the peers is super fast and thread-safe.
@ -41,6 +51,13 @@ func (ps *PeerSet) Add(peer *Peer) bool {
return true
}
func (ps *PeerSet) Has(addr *NetAddress) bool {
ps.mtx.Lock()
defer ps.mtx.Unlock()
_, ok := ps.lookup[addr.String()]
return ok
}
func (ps *PeerSet) Remove(peer *Peer) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
@ -69,6 +86,12 @@ func (ps *PeerSet) Remove(peer *Peer) {
delete(ps.lookup, addr)
}
func (ps *PeerSet) Size() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return len(ps.list)
}
// threadsafe list of peers.
func (ps *PeerSet) List() []*Peer {
ps.mtx.Lock()


+ 16
- 6
p2p/switch.go View File

@ -12,7 +12,7 @@ All communication amongst peers are multiplexed by "channels".
(Not the same as Go "channels")
To send a message, encapsulate it into a "Packet" and send it to each peer.
You can find all connected and active peers by iterating over ".Peers()".
You can find all connected and active peers by iterating over ".Peers().List()".
".Broadcast()" is provided for convenience, but by iterating over
the peers manually the caller can decide which subset receives a message.
@ -69,19 +69,19 @@ func (s *Switch) Stop() {
}
}
func (s *Switch) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) {
func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, error) {
if atomic.LoadUint32(&s.stopped) == 1 {
return nil, ErrSwitchStopped
}
log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing)
log.Infof("Adding peer with connection: %v, outbound: %v", conn, outbound)
// Create channels for peer
channels := map[string]*Channel{}
for _, chDesc := range s.channels {
channels[chDesc.Name] = newChannel(chDesc)
}
peer := newPeer(conn, channels)
peer.outgoing = outgoing
peer.outbound = outbound
err := s.addPeer(peer)
if err != nil {
return nil, err
@ -133,8 +133,18 @@ func (s *Switch) Receive(chName string) *InboundPacket {
}
}
func (s *Switch) Peers() []*Peer {
return s.peers.List()
func (s *Switch) NumOutboundPeers() (count int) {
peers := s.peers.List()
for _, peer := range peers {
if peer.outbound {
count++
}
}
return
}
func (s *Switch) Peers() ReadOnlyPeerSet {
return s.peers
}
// Disconnect from a peer due to external error.


+ 4
- 4
p2p/switch_test.go View File

@ -59,11 +59,11 @@ func TestSwitches(t *testing.T) {
defer s2.Stop()
// Lets send a message from s1 to s2.
if len(s1.Peers()) != 1 {
t.Errorf("Expected exactly 1 peer in s1, got %v", len(s1.Peers()))
if s1.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
}
if len(s2.Peers()) != 1 {
t.Errorf("Expected exactly 1 peer in s2, got %v", len(s2.Peers()))
if s2.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
}
// Broadcast a message on ch1


Loading…
Cancel
Save