Browse Source

Send external addresses upon new outbound peer

pull/9/head
Jae Kwon 10 years ago
parent
commit
56a92b512f
9 changed files with 370 additions and 246 deletions
  1. +66
    -107
      main.go
  2. +2
    -2
      p2p/connection.go
  3. +2
    -0
      p2p/listener.go
  4. +34
    -8
      p2p/peer.go
  5. +228
    -0
      p2p/peer_manager.go
  6. +3
    -2
      p2p/peer_set.go
  7. +0
    -118
      p2p/pex.go
  8. +28
    -2
      p2p/switch.go
  9. +7
    -7
      p2p/switch_test.go

+ 66
- 107
main.go View File

@ -1,28 +1,21 @@
package main
// TODO: ensure Mark* gets called.
import (
"os"
"os/signal"
"time"
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
)
const (
minNumPeers = 10
maxNumPeers = 20
ensurePeersPeriodSeconds = 30
peerDialTimeoutSeconds = 30
)
type Node struct {
sw *p2p.Switch
book *p2p.AddrBook
quit chan struct{}
dialing *CMap
lz []p2p.Listener
sw *p2p.Switch
book *p2p.AddrBook
pmgr *p2p.PeerManager
}
func NewNode() *Node {
@ -51,122 +44,83 @@ func NewNode() *Node {
}
sw := p2p.NewSwitch(chDescs)
book := p2p.NewAddrBook(config.AppDir + "/addrbook.json")
pmgr := p2p.NewPeerManager(sw, book)
return &Node{
sw: sw,
book: book,
quit: make(chan struct{}, 0),
dialing: NewCMap(),
sw: sw,
book: book,
pmgr: pmgr,
}
}
func (n *Node) Start() {
log.Infof("Starting node")
for _, l := range n.lz {
go n.inboundConnectionHandler(l)
}
n.sw.Start()
n.book.Start()
go p2p.PexHandler(n.sw, n.book)
go n.ensurePeersHandler()
}
func (n *Node) initPeer(peer *p2p.Peer) {
if peer.IsOutbound() {
// TODO: initiate PEX
}
n.pmgr.Start()
}
// Add a Listener to accept incoming peer connections.
func (n *Node) AddListener(l p2p.Listener) {
log.Infof("Adding listener %v", l)
go func() {
for {
inConn, ok := <-l.Connections()
if !ok {
break
}
peer, err := n.sw.AddPeerWithConnection(inConn, false)
if err != nil {
log.Infof("Ignoring error from incoming connection: %v\n%v",
peer, err)
continue
}
n.initPeer(peer)
}
}()
n.lz = append(n.lz, l)
}
// threadsafe
func (n *Node) DialPeerWithAddress(addr *p2p.NetAddress) (*p2p.Peer, error) {
log.Infof("Dialing peer @ %v", addr)
n.dialing.Set(addr.String(), addr)
n.book.MarkAttempt(addr)
conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
n.dialing.Delete(addr.String())
if err != nil {
return nil, err
}
peer, err := n.sw.AddPeerWithConnection(conn, true)
if err != nil {
return nil, err
}
n.initPeer(peer)
return peer, nil
}
// Ensures that sufficient peers are connected.
func (n *Node) ensurePeers() {
numPeers := n.sw.NumOutboundPeers()
numDialing := n.dialing.Size()
numToDial := minNumPeers - (numPeers + numDialing)
if numToDial <= 0 {
return
}
for i := 0; i < numToDial; i++ {
newBias := MinInt(numPeers, 8)*10 + 10
var picked *p2p.NetAddress
// Try to fetch a new peer 3 times.
// This caps the maximum number of tries to 3 * numToDial.
for j := 0; i < 3; j++ {
picked = n.book.PickAddress(newBias)
if picked == nil {
log.Debug("Empty addrbook.")
return
}
if n.sw.Peers().Has(picked) {
continue
} else {
break
}
func (n *Node) inboundConnectionHandler(l p2p.Listener) {
for {
inConn, ok := <-l.Connections()
if !ok {
break
}
if picked == nil {
// New incoming connection!
peer, err := n.sw.AddPeerWithConnection(inConn, false)
if err != nil {
log.Infof("Ignoring error from incoming connection: %v\n%v",
peer, err)
continue
}
go n.DialPeerWithAddress(picked)
// NOTE: We don't yet have the external address of the
// remote (if they have a listener at all).
// PeerManager's pexHandler will handle that.
}
// cleanup
}
func (n *Node) ensurePeersHandler() {
// fire once immediately.
n.ensurePeers()
// fire periodically
timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second)
FOR_LOOP:
func (n *Node) SendOurExternalAddrs(peer *p2p.Peer) {
// Send listener our external address(es)
addrs := []*p2p.NetAddress{}
for _, l := range n.lz {
addrs = append(addrs, l.ExternalAddress())
}
pexAddrsMsg := &p2p.PexAddrsMessage{Addrs: addrs}
peer.Send(p2p.NewPacket(
p2p.PexCh,
BinaryBytes(pexAddrsMsg),
))
// On the remote end, the pexHandler may choose
// to add these to its book.
}
func (n *Node) newPeersHandler() {
for {
select {
case <-timer.Ch:
n.ensurePeers()
case <-n.quit:
break FOR_LOOP
peer, ok := <-n.pmgr.NewPeers()
if !ok {
break
}
// New outgoing peer!
n.SendOurExternalAddrs(peer)
}
// cleanup
timer.Stop()
}
func (n *Node) Stop() {
log.Infof("Stopping node")
// TODO: gracefully disconnect from peers.
n.sw.Stop()
n.book.Stop()
n.pmgr.Stop()
}
//-----------------------------------------------------------------------------
@ -175,33 +129,38 @@ func main() {
// Create & start node
n := NewNode()
log.Warnf(">> %v", config.Config.LAddr)
l := p2p.NewDefaultListener("tcp", config.Config.LAddr)
n.AddListener(l)
n.Start()
// Seed?
if config.Config.Seed != "" {
peer, err := n.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.Seed))
peer, err := n.sw.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.Seed))
if err != nil {
log.Errorf("Error dialing seed: %v", err)
//n.book.MarkAttempt(addr)
return
} else {
log.Infof("Connected to seed: %v", peer)
n.SendOurExternalAddrs(peer)
}
log.Infof("Connected to seed: %v", peer)
}
// Sleep
trapSignal()
select {}
// Sleep forever and then...
trapSignal(func() {
n.Stop()
})
}
func trapSignal() {
func trapSignal(cb func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for sig := range c {
log.Infof("captured %v, exiting..", sig)
cb()
os.Exit(1)
}
}()
select {}
}

+ 2
- 2
p2p/connection.go View File

@ -158,11 +158,11 @@ FOR_LOOP:
c.flush()
case <-c.pingRepeatTimer.Ch:
_, err = packetTypePing.WriteTo(c.bufWriter)
log.Debugf("[%v] Sending Ping", c)
log.Debugf("Send [Ping] -> %v", c)
c.flush()
case <-c.pong:
_, err = packetTypePong.WriteTo(c.bufWriter)
log.Debugf("[%v] Sending Pong", c)
log.Debugf("Send [Pong] -> %v", c)
c.flush()
case <-c.quit:
break FOR_LOOP


+ 2
- 0
p2p/listener.go View File

@ -110,6 +110,8 @@ func (l *DefaultListener) listenHandler() {
}
}
// A channel of inbound connections.
// It gets closed when the listener closes.
func (l *DefaultListener) Connections() <-chan *Connection {
return l.connections
}


+ 34
- 8
p2p/peer.go View File

@ -1,6 +1,7 @@
package p2p
import (
"bytes"
"fmt"
"io"
"sync/atomic"
@ -71,9 +72,10 @@ func (p *Peer) Channel(chName string) *Channel {
return p.channels[chName]
}
// TryQueue returns true if the packet was successfully queued.
// TrySend returns true if the packet was successfully queued.
// Returning true does not imply that the packet will be sent.
func (p *Peer) TryQueue(pkt Packet) bool {
func (p *Peer) TrySend(pkt Packet) bool {
log.Debugf("TrySend [%v] -> %v", pkt, p)
channel := p.Channel(string(pkt.Channel))
sendQueue := channel.sendQueue
@ -81,8 +83,6 @@ func (p *Peer) TryQueue(pkt Packet) bool {
return false
}
sendQueue <- pkt
return true
select {
case sendQueue <- pkt:
return true
@ -91,6 +91,19 @@ func (p *Peer) TryQueue(pkt Packet) bool {
}
}
func (p *Peer) Send(pkt Packet) bool {
log.Debugf("Send [%v] -> %v", pkt, p)
channel := p.Channel(string(pkt.Channel))
sendQueue := channel.sendQueue
if atomic.LoadUint32(&p.stopped) == 1 {
return false
}
sendQueue <- pkt
return true
}
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
return p.RemoteAddress().WriteTo(w)
}
@ -159,6 +172,8 @@ FOR_LOOP:
// (none)
}
//-----------------------------------------------------------------------------
/* ChannelDescriptor */
type ChannelDescriptor struct {
@ -196,7 +211,7 @@ func (c *Channel) SendQueue() chan<- Packet {
return c.sendQueue
}
/* Packet */
//-----------------------------------------------------------------------------
/*
Packet encapsulates a ByteSlice on a Channel.
@ -207,10 +222,12 @@ type Packet struct {
// Hash
}
func NewPacket(chName String, bytes ByteSlice) Packet {
func NewPacket(chName String, msg Binary) Packet {
msgBytes := BinaryBytes(msg)
log.Tracef("NewPacket msg bytes: %X", msgBytes)
return Packet{
Channel: chName,
Bytes: bytes,
Bytes: msgBytes,
}
}
@ -220,6 +237,14 @@ func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
return
}
func (p Packet) Reader() io.Reader {
return bytes.NewReader(p.Bytes)
}
func (p Packet) String() string {
return fmt.Sprintf("%v:%X", p.Channel, p.Bytes)
}
func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
chName, err := ReadStringSafe(r)
if err != nil {
@ -230,7 +255,8 @@ func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
if err != nil {
return
}
return NewPacket(chName, bytes), nil
log.Tracef("ReadPacket* msg bytes: %X", bytes)
return Packet{Channel: chName, Bytes: bytes}, nil
}
/*


+ 228
- 0
p2p/peer_manager.go View File

@ -0,0 +1,228 @@
package p2p
import (
"bytes"
"errors"
"io"
"sync/atomic"
"time"
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
)
var pexErrInvalidMessage = errors.New("Invalid PEX message")
const (
PexCh = "PEX"
ensurePeersPeriodSeconds = 30
minNumPeers = 10
maxNumPeers = 20
)
/*
PeerManager handles PEX (peer exchange) and ensures that an
adequate number of peers are connected to the switch.
User must pull from the .NewPeers() channel.
*/
type PeerManager struct {
sw *Switch
book *AddrBook
quit chan struct{}
newPeers chan *Peer
started uint32
stopped uint32
}
func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager {
pm := &PeerManager{
sw: sw,
book: book,
quit: make(chan struct{}),
newPeers: make(chan *Peer),
}
return pm
}
func (pm *PeerManager) Start() {
if atomic.CompareAndSwapUint32(&pm.started, 0, 1) {
log.Infof("Starting peerManager")
go pm.ensurePeersHandler()
go pm.pexHandler()
}
}
func (pm *PeerManager) Stop() {
if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) {
log.Infof("Stopping peerManager")
close(pm.newPeers)
close(pm.quit)
}
}
// Closes when PeerManager closes.
func (pm *PeerManager) NewPeers() <-chan *Peer {
return pm.newPeers
}
func (pm *PeerManager) ensurePeersHandler() {
// fire once immediately.
pm.ensurePeers()
// fire periodically
timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second)
FOR_LOOP:
for {
select {
case <-timer.Ch:
pm.ensurePeers()
case <-pm.quit:
break FOR_LOOP
}
}
// cleanup
timer.Stop()
}
// Ensures that sufficient peers are connected.
func (pm *PeerManager) ensurePeers() {
numPeers := pm.sw.NumOutboundPeers()
numDialing := pm.sw.dialing.Size()
numToDial := minNumPeers - (numPeers + numDialing)
if numToDial <= 0 {
return
}
for i := 0; i < numToDial; i++ {
newBias := MinInt(numPeers, 8)*10 + 10
var picked *NetAddress
// Try to fetch a new peer 3 times.
// This caps the maximum number of tries to 3 * numToDial.
for j := 0; i < 3; j++ {
picked = pm.book.PickAddress(newBias)
if picked == nil {
log.Debug("Empty addrbook.")
return
}
if pm.sw.Peers().Has(picked) {
continue
} else {
break
}
}
if picked == nil {
continue
}
// Dial picked address
go func() {
peer, err := pm.sw.DialPeerWithAddress(picked)
if err != nil {
pm.book.MarkAttempt(picked)
}
// Connection established.
pm.newPeers <- peer
}()
}
}
func (pm *PeerManager) pexHandler() {
for {
inPkt := pm.sw.Receive(PexCh) // {Peer, Time, Packet}
if inPkt == nil {
// Client has stopped
break
}
// decode message
msg := decodeMessage(inPkt.Bytes)
log.Infof("pexHandler received %v", msg)
switch msg.(type) {
case *PexRequestMessage:
// inPkt.Peer requested some peers.
// TODO: prevent abuse.
addrs := pm.book.GetSelection()
response := &PexAddrsMessage{Addrs: addrs}
pkt := NewPacket(PexCh, BinaryBytes(response))
queued := inPkt.Peer.TrySend(pkt)
if !queued {
// ignore
}
case *PexAddrsMessage:
// We received some peer addresses from inPkt.Peer.
// TODO: prevent abuse.
// (We don't want to get spammed with bad peers)
srcAddr := inPkt.Peer.RemoteAddress()
for _, addr := range msg.(*PexAddrsMessage).Addrs {
pm.book.AddAddress(addr, srcAddr)
}
default:
// Bad peer.
pm.sw.StopPeerForError(inPkt.Peer, pexErrInvalidMessage)
}
}
// cleanup
}
//-----------------------------------------------------------------------------
/* Messages */
const (
pexTypeUnknown = Byte(0x00)
pexTypeRequest = Byte(0x01)
pexTypeAddrs = Byte(0x02)
)
// TODO: check for unnecessary extra bytes at the end.
func decodeMessage(bz ByteSlice) (msg Message) {
switch Byte(bz[0]) {
case pexTypeRequest:
return &PexRequestMessage{}
case pexTypeAddrs:
return readPexAddrsMessage(bytes.NewReader(bz[1:]))
default:
return nil
}
}
/*
A PexRequestMessage requests additional peer addresses.
*/
type PexRequestMessage struct {
}
func (m *PexRequestMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(pexTypeRequest, w, n, err)
return
}
/*
A message with announced peer addresses.
*/
type PexAddrsMessage struct {
Addrs []*NetAddress
}
func readPexAddrsMessage(r io.Reader) *PexAddrsMessage {
numAddrs := int(ReadUInt32(r))
addrs := []*NetAddress{}
for i := 0; i < numAddrs; i++ {
addr := ReadNetAddress(r)
addrs = append(addrs, addr)
}
return &PexAddrsMessage{
Addrs: addrs,
}
}
func (m *PexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(pexTypeAddrs, w, n, err)
n, err = WriteOnto(UInt32(len(m.Addrs)), w, n, err)
for _, addr := range m.Addrs {
n, err = WriteOnto(addr, w, n, err)
}
return
}

+ 3
- 2
p2p/peer_set.go View File

@ -5,11 +5,12 @@ import (
)
/*
ReadOnlyPeerSet has a subset of the methods of PeerSet.
IPeerSet has a (immutable) subset of the methods of PeerSet.
*/
type ReadOnlyPeerSet interface {
type IPeerSet interface {
Has(addr *NetAddress) bool
List() []*Peer
Size() int
}
//-----------------------------------------------------------------------------


+ 0
- 118
p2p/pex.go View File

@ -1,118 +0,0 @@
package p2p
import (
"bytes"
"errors"
"io"
. "github.com/tendermint/tendermint/binary"
)
var pexErrInvalidMessage = errors.New("Invalid PEX message")
const pexCh = "PEX"
/*
The PexHandler routine should be started separately from the Switch.
It handles basic PEX communciation.
The application is responsible for sending out a PexRequestMessage.
*/
func PexHandler(s *Switch, addrBook *AddrBook) {
for {
inPkt := s.Receive(pexCh) // {Peer, Time, Packet}
if inPkt == nil {
// Client has stopped
break
}
// decode message
msg := decodeMessage(inPkt.Bytes)
switch msg.(type) {
case *PexRequestMessage:
// inPkt.Peer requested some peers.
// TODO: prevent abuse.
addrs := addrBook.GetSelection()
response := &pexResponseMessage{Addrs: addrs}
pkt := NewPacket(pexCh, BinaryBytes(response))
queued := inPkt.Peer.TryQueue(pkt)
if !queued {
// ignore
}
case *pexResponseMessage:
// We received some peer addresses from inPkt.Peer.
// TODO: prevent abuse.
// (We don't want to get spammed with bad peers)
srcAddr := inPkt.Peer.RemoteAddress()
for _, addr := range msg.(*pexResponseMessage).Addrs {
addrBook.AddAddress(addr, srcAddr)
}
default:
// Bad peer.
s.StopPeerForError(inPkt.Peer, pexErrInvalidMessage)
}
}
// cleanup
}
/* Messages */
const (
pexTypeUnknown = Byte(0x00)
pexTypeRequest = Byte(0x01)
pexTypeResponse = Byte(0x02)
)
// TODO: check for unnecessary extra bytes at the end.
func decodeMessage(bz ByteSlice) (msg Message) {
switch Byte(bz[0]) {
case pexTypeRequest:
return &PexRequestMessage{}
case pexTypeResponse:
return readPexResponseMessage(bytes.NewReader(bz[1:]))
default:
return nil
}
}
/*
A response with peer addresses
*/
type PexRequestMessage struct {
}
func (m *PexRequestMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(pexTypeRequest, w, n, err)
return
}
/*
A response with peer addresses
*/
type pexResponseMessage struct {
Addrs []*NetAddress
}
func readPexResponseMessage(r io.Reader) *pexResponseMessage {
numAddrs := int(ReadUInt32(r))
addrs := []*NetAddress{}
for i := 0; i < numAddrs; i++ {
addr := ReadNetAddress(r)
addrs = append(addrs, addr)
}
return &pexResponseMessage{
Addrs: addrs,
}
}
func (m *pexResponseMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(pexTypeResponse, w, n, err)
n, err = WriteOnto(UInt32(len(m.Addrs)), w, n, err)
for _, addr := range m.Addrs {
n, err = WriteOnto(addr, w, n, err)
}
return
}

+ 28
- 2
p2p/switch.go View File

@ -3,6 +3,7 @@ package p2p
import (
"errors"
"sync/atomic"
"time"
. "github.com/tendermint/tendermint/common"
)
@ -22,6 +23,7 @@ type Switch struct {
channels []ChannelDescriptor
pktRecvQueues map[string]chan *InboundPacket
peers *PeerSet
dialing *CMap
quit chan struct{}
started uint32
stopped uint32
@ -32,6 +34,10 @@ var (
ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
)
const (
peerDialTimeoutSeconds = 30
)
func NewSwitch(channels []ChannelDescriptor) *Switch {
// make pktRecvQueues...
pktRecvQueues := make(map[string]chan *InboundPacket)
@ -43,6 +49,7 @@ func NewSwitch(channels []ChannelDescriptor) *Switch {
channels: channels,
pktRecvQueues: pktRecvQueues,
peers: NewPeerSet(),
dialing: NewCMap(),
quit: make(chan struct{}),
stopped: 0,
}
@ -92,6 +99,25 @@ func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer,
return peer, nil
}
func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
if atomic.LoadUint32(&s.stopped) == 1 {
return nil, ErrSwitchStopped
}
log.Infof("Dialing peer @ %v", addr)
s.dialing.Set(addr.String(), addr)
conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
s.dialing.Delete(addr.String())
if err != nil {
return nil, err
}
peer, err := s.AddPeerWithConnection(conn, true)
if err != nil {
return nil, err
}
return peer, nil
}
func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) {
if atomic.LoadUint32(&s.stopped) == 1 {
return
@ -99,7 +125,7 @@ func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) {
log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes))
for _, peer := range s.peers.List() {
success := peer.TryQueue(pkt)
success := peer.TrySend(pkt)
log.Tracef("Broadcast for peer %v success: %v", peer, success)
if success {
numSuccess += 1
@ -143,7 +169,7 @@ func (s *Switch) NumOutboundPeers() (count int) {
return
}
func (s *Switch) Peers() ReadOnlyPeerSet {
func (s *Switch) Peers() IPeerSet {
return s.peers
}


+ 7
- 7
p2p/switch_test.go View File

@ -67,25 +67,25 @@ func TestSwitches(t *testing.T) {
}
// Broadcast a message on ch1
s1.Broadcast(NewPacket("ch1", ByteSlice("channel one")))
s1.Broadcast(NewPacket("ch1", String("channel one")))
// Broadcast a message on ch2
s1.Broadcast(NewPacket("ch2", ByteSlice("channel two")))
s1.Broadcast(NewPacket("ch2", String("channel two")))
// Broadcast a message on ch3
s1.Broadcast(NewPacket("ch3", ByteSlice("channel three")))
s1.Broadcast(NewPacket("ch3", String("channel three")))
// Wait for things to settle...
time.Sleep(100 * time.Millisecond)
// Receive message from channel 2 and check
inMsg := s2.Receive("ch2")
if string(inMsg.Bytes) != "channel two" {
t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes))
if ReadString(inMsg.Reader()) != "channel two" {
t.Errorf("Unexpected received message bytes: %X = [%v]", inMsg.Bytes, ReadString(inMsg.Reader()))
}
// Receive message from channel 1 and check
inMsg = s2.Receive("ch1")
if string(inMsg.Bytes) != "channel one" {
t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes))
if ReadString(inMsg.Reader()) != "channel one" {
t.Errorf("Unexpected received message bytes: %X = [%v]", inMsg.Bytes, ReadString(inMsg.Reader()))
}
}


Loading…
Cancel
Save