diff --git a/main.go b/main.go index 75251b52a..e64027466 100644 --- a/main.go +++ b/main.go @@ -1,28 +1,21 @@ package main +// TODO: ensure Mark* gets called. + import ( "os" "os/signal" - "time" - . "github.com/tendermint/tendermint/common" + . "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" ) -const ( - minNumPeers = 10 - maxNumPeers = 20 - - ensurePeersPeriodSeconds = 30 - peerDialTimeoutSeconds = 30 -) - type Node struct { - sw *p2p.Switch - book *p2p.AddrBook - quit chan struct{} - dialing *CMap + lz []p2p.Listener + sw *p2p.Switch + book *p2p.AddrBook + pmgr *p2p.PeerManager } func NewNode() *Node { @@ -51,122 +44,83 @@ func NewNode() *Node { } sw := p2p.NewSwitch(chDescs) book := p2p.NewAddrBook(config.AppDir + "/addrbook.json") + pmgr := p2p.NewPeerManager(sw, book) return &Node{ - sw: sw, - book: book, - quit: make(chan struct{}, 0), - dialing: NewCMap(), + sw: sw, + book: book, + pmgr: pmgr, } } func (n *Node) Start() { log.Infof("Starting node") + for _, l := range n.lz { + go n.inboundConnectionHandler(l) + } n.sw.Start() n.book.Start() - go p2p.PexHandler(n.sw, n.book) - go n.ensurePeersHandler() -} - -func (n *Node) initPeer(peer *p2p.Peer) { - if peer.IsOutbound() { - // TODO: initiate PEX - } + n.pmgr.Start() } // Add a Listener to accept incoming peer connections. func (n *Node) AddListener(l p2p.Listener) { - log.Infof("Adding listener %v", l) - go func() { - for { - inConn, ok := <-l.Connections() - if !ok { - break - } - peer, err := n.sw.AddPeerWithConnection(inConn, false) - if err != nil { - log.Infof("Ignoring error from incoming connection: %v\n%v", - peer, err) - continue - } - n.initPeer(peer) - } - }() + n.lz = append(n.lz, l) } -// threadsafe -func (n *Node) DialPeerWithAddress(addr *p2p.NetAddress) (*p2p.Peer, error) { - log.Infof("Dialing peer @ %v", addr) - n.dialing.Set(addr.String(), addr) - n.book.MarkAttempt(addr) - conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) - n.dialing.Delete(addr.String()) - if err != nil { - return nil, err - } - peer, err := n.sw.AddPeerWithConnection(conn, true) - if err != nil { - return nil, err - } - n.initPeer(peer) - return peer, nil -} - -// Ensures that sufficient peers are connected. -func (n *Node) ensurePeers() { - numPeers := n.sw.NumOutboundPeers() - numDialing := n.dialing.Size() - numToDial := minNumPeers - (numPeers + numDialing) - if numToDial <= 0 { - return - } - for i := 0; i < numToDial; i++ { - newBias := MinInt(numPeers, 8)*10 + 10 - var picked *p2p.NetAddress - // Try to fetch a new peer 3 times. - // This caps the maximum number of tries to 3 * numToDial. - for j := 0; i < 3; j++ { - picked = n.book.PickAddress(newBias) - if picked == nil { - log.Debug("Empty addrbook.") - return - } - if n.sw.Peers().Has(picked) { - continue - } else { - break - } +func (n *Node) inboundConnectionHandler(l p2p.Listener) { + for { + inConn, ok := <-l.Connections() + if !ok { + break } - if picked == nil { + // New incoming connection! + peer, err := n.sw.AddPeerWithConnection(inConn, false) + if err != nil { + log.Infof("Ignoring error from incoming connection: %v\n%v", + peer, err) continue } - go n.DialPeerWithAddress(picked) + // NOTE: We don't yet have the external address of the + // remote (if they have a listener at all). + // PeerManager's pexHandler will handle that. } + + // cleanup } -func (n *Node) ensurePeersHandler() { - // fire once immediately. - n.ensurePeers() - // fire periodically - timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second) -FOR_LOOP: +func (n *Node) SendOurExternalAddrs(peer *p2p.Peer) { + // Send listener our external address(es) + addrs := []*p2p.NetAddress{} + for _, l := range n.lz { + addrs = append(addrs, l.ExternalAddress()) + } + pexAddrsMsg := &p2p.PexAddrsMessage{Addrs: addrs} + peer.Send(p2p.NewPacket( + p2p.PexCh, + BinaryBytes(pexAddrsMsg), + )) + // On the remote end, the pexHandler may choose + // to add these to its book. +} + +func (n *Node) newPeersHandler() { for { - select { - case <-timer.Ch: - n.ensurePeers() - case <-n.quit: - break FOR_LOOP + peer, ok := <-n.pmgr.NewPeers() + if !ok { + break } + // New outgoing peer! + n.SendOurExternalAddrs(peer) } - - // cleanup - timer.Stop() } func (n *Node) Stop() { + log.Infof("Stopping node") // TODO: gracefully disconnect from peers. n.sw.Stop() n.book.Stop() + n.pmgr.Stop() } //----------------------------------------------------------------------------- @@ -175,33 +129,38 @@ func main() { // Create & start node n := NewNode() - log.Warnf(">> %v", config.Config.LAddr) l := p2p.NewDefaultListener("tcp", config.Config.LAddr) n.AddListener(l) n.Start() // Seed? if config.Config.Seed != "" { - peer, err := n.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.Seed)) + peer, err := n.sw.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.Seed)) if err != nil { log.Errorf("Error dialing seed: %v", err) + //n.book.MarkAttempt(addr) return + } else { + log.Infof("Connected to seed: %v", peer) + n.SendOurExternalAddrs(peer) } - log.Infof("Connected to seed: %v", peer) } - // Sleep - trapSignal() - select {} + // Sleep forever and then... + trapSignal(func() { + n.Stop() + }) } -func trapSignal() { +func trapSignal(cb func()) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) go func() { for sig := range c { log.Infof("captured %v, exiting..", sig) + cb() os.Exit(1) } }() + select {} } diff --git a/p2p/connection.go b/p2p/connection.go index 021bfd839..845d8a494 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -158,11 +158,11 @@ FOR_LOOP: c.flush() case <-c.pingRepeatTimer.Ch: _, err = packetTypePing.WriteTo(c.bufWriter) - log.Debugf("[%v] Sending Ping", c) + log.Debugf("Send [Ping] -> %v", c) c.flush() case <-c.pong: _, err = packetTypePong.WriteTo(c.bufWriter) - log.Debugf("[%v] Sending Pong", c) + log.Debugf("Send [Pong] -> %v", c) c.flush() case <-c.quit: break FOR_LOOP diff --git a/p2p/listener.go b/p2p/listener.go index 2335f07ad..196b1f20d 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -110,6 +110,8 @@ func (l *DefaultListener) listenHandler() { } } +// A channel of inbound connections. +// It gets closed when the listener closes. func (l *DefaultListener) Connections() <-chan *Connection { return l.connections } diff --git a/p2p/peer.go b/p2p/peer.go index 2b7395e34..36e7f0c4b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,6 +1,7 @@ package p2p import ( + "bytes" "fmt" "io" "sync/atomic" @@ -71,9 +72,10 @@ func (p *Peer) Channel(chName string) *Channel { return p.channels[chName] } -// TryQueue returns true if the packet was successfully queued. +// TrySend returns true if the packet was successfully queued. // Returning true does not imply that the packet will be sent. -func (p *Peer) TryQueue(pkt Packet) bool { +func (p *Peer) TrySend(pkt Packet) bool { + log.Debugf("TrySend [%v] -> %v", pkt, p) channel := p.Channel(string(pkt.Channel)) sendQueue := channel.sendQueue @@ -81,8 +83,6 @@ func (p *Peer) TryQueue(pkt Packet) bool { return false } - sendQueue <- pkt - return true select { case sendQueue <- pkt: return true @@ -91,6 +91,19 @@ func (p *Peer) TryQueue(pkt Packet) bool { } } +func (p *Peer) Send(pkt Packet) bool { + log.Debugf("Send [%v] -> %v", pkt, p) + channel := p.Channel(string(pkt.Channel)) + sendQueue := channel.sendQueue + + if atomic.LoadUint32(&p.stopped) == 1 { + return false + } + + sendQueue <- pkt + return true +} + func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { return p.RemoteAddress().WriteTo(w) } @@ -159,6 +172,8 @@ FOR_LOOP: // (none) } +//----------------------------------------------------------------------------- + /* ChannelDescriptor */ type ChannelDescriptor struct { @@ -196,7 +211,7 @@ func (c *Channel) SendQueue() chan<- Packet { return c.sendQueue } -/* Packet */ +//----------------------------------------------------------------------------- /* Packet encapsulates a ByteSlice on a Channel. @@ -207,10 +222,12 @@ type Packet struct { // Hash } -func NewPacket(chName String, bytes ByteSlice) Packet { +func NewPacket(chName String, msg Binary) Packet { + msgBytes := BinaryBytes(msg) + log.Tracef("NewPacket msg bytes: %X", msgBytes) return Packet{ Channel: chName, - Bytes: bytes, + Bytes: msgBytes, } } @@ -220,6 +237,14 @@ func (p Packet) WriteTo(w io.Writer) (n int64, err error) { return } +func (p Packet) Reader() io.Reader { + return bytes.NewReader(p.Bytes) +} + +func (p Packet) String() string { + return fmt.Sprintf("%v:%X", p.Channel, p.Bytes) +} + func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { chName, err := ReadStringSafe(r) if err != nil { @@ -230,7 +255,8 @@ func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { if err != nil { return } - return NewPacket(chName, bytes), nil + log.Tracef("ReadPacket* msg bytes: %X", bytes) + return Packet{Channel: chName, Bytes: bytes}, nil } /* diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go new file mode 100644 index 000000000..9a60a2524 --- /dev/null +++ b/p2p/peer_manager.go @@ -0,0 +1,228 @@ +package p2p + +import ( + "bytes" + "errors" + "io" + "sync/atomic" + "time" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" +) + +var pexErrInvalidMessage = errors.New("Invalid PEX message") + +const ( + PexCh = "PEX" + ensurePeersPeriodSeconds = 30 + minNumPeers = 10 + maxNumPeers = 20 +) + +/* +PeerManager handles PEX (peer exchange) and ensures that an +adequate number of peers are connected to the switch. +User must pull from the .NewPeers() channel. +*/ +type PeerManager struct { + sw *Switch + book *AddrBook + quit chan struct{} + newPeers chan *Peer + started uint32 + stopped uint32 +} + +func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager { + pm := &PeerManager{ + sw: sw, + book: book, + quit: make(chan struct{}), + newPeers: make(chan *Peer), + } + return pm +} + +func (pm *PeerManager) Start() { + if atomic.CompareAndSwapUint32(&pm.started, 0, 1) { + log.Infof("Starting peerManager") + go pm.ensurePeersHandler() + go pm.pexHandler() + } +} + +func (pm *PeerManager) Stop() { + if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) { + log.Infof("Stopping peerManager") + close(pm.newPeers) + close(pm.quit) + } +} + +// Closes when PeerManager closes. +func (pm *PeerManager) NewPeers() <-chan *Peer { + return pm.newPeers +} + +func (pm *PeerManager) ensurePeersHandler() { + // fire once immediately. + pm.ensurePeers() + // fire periodically + timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second) +FOR_LOOP: + for { + select { + case <-timer.Ch: + pm.ensurePeers() + case <-pm.quit: + break FOR_LOOP + } + } + + // cleanup + timer.Stop() +} + +// Ensures that sufficient peers are connected. +func (pm *PeerManager) ensurePeers() { + numPeers := pm.sw.NumOutboundPeers() + numDialing := pm.sw.dialing.Size() + numToDial := minNumPeers - (numPeers + numDialing) + if numToDial <= 0 { + return + } + for i := 0; i < numToDial; i++ { + newBias := MinInt(numPeers, 8)*10 + 10 + var picked *NetAddress + // Try to fetch a new peer 3 times. + // This caps the maximum number of tries to 3 * numToDial. + for j := 0; i < 3; j++ { + picked = pm.book.PickAddress(newBias) + if picked == nil { + log.Debug("Empty addrbook.") + return + } + if pm.sw.Peers().Has(picked) { + continue + } else { + break + } + } + if picked == nil { + continue + } + // Dial picked address + go func() { + peer, err := pm.sw.DialPeerWithAddress(picked) + if err != nil { + pm.book.MarkAttempt(picked) + } + // Connection established. + pm.newPeers <- peer + }() + } +} + +func (pm *PeerManager) pexHandler() { + + for { + inPkt := pm.sw.Receive(PexCh) // {Peer, Time, Packet} + if inPkt == nil { + // Client has stopped + break + } + + // decode message + msg := decodeMessage(inPkt.Bytes) + log.Infof("pexHandler received %v", msg) + + switch msg.(type) { + case *PexRequestMessage: + // inPkt.Peer requested some peers. + // TODO: prevent abuse. + addrs := pm.book.GetSelection() + response := &PexAddrsMessage{Addrs: addrs} + pkt := NewPacket(PexCh, BinaryBytes(response)) + queued := inPkt.Peer.TrySend(pkt) + if !queued { + // ignore + } + case *PexAddrsMessage: + // We received some peer addresses from inPkt.Peer. + // TODO: prevent abuse. + // (We don't want to get spammed with bad peers) + srcAddr := inPkt.Peer.RemoteAddress() + for _, addr := range msg.(*PexAddrsMessage).Addrs { + pm.book.AddAddress(addr, srcAddr) + } + default: + // Bad peer. + pm.sw.StopPeerForError(inPkt.Peer, pexErrInvalidMessage) + } + } + + // cleanup + +} + +//----------------------------------------------------------------------------- + +/* Messages */ + +const ( + pexTypeUnknown = Byte(0x00) + pexTypeRequest = Byte(0x01) + pexTypeAddrs = Byte(0x02) +) + +// TODO: check for unnecessary extra bytes at the end. +func decodeMessage(bz ByteSlice) (msg Message) { + switch Byte(bz[0]) { + case pexTypeRequest: + return &PexRequestMessage{} + case pexTypeAddrs: + return readPexAddrsMessage(bytes.NewReader(bz[1:])) + default: + return nil + } +} + +/* +A PexRequestMessage requests additional peer addresses. +*/ +type PexRequestMessage struct { +} + +func (m *PexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(pexTypeRequest, w, n, err) + return +} + +/* +A message with announced peer addresses. +*/ +type PexAddrsMessage struct { + Addrs []*NetAddress +} + +func readPexAddrsMessage(r io.Reader) *PexAddrsMessage { + numAddrs := int(ReadUInt32(r)) + addrs := []*NetAddress{} + for i := 0; i < numAddrs; i++ { + addr := ReadNetAddress(r) + addrs = append(addrs, addr) + } + return &PexAddrsMessage{ + Addrs: addrs, + } +} + +func (m *PexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(pexTypeAddrs, w, n, err) + n, err = WriteOnto(UInt32(len(m.Addrs)), w, n, err) + for _, addr := range m.Addrs { + n, err = WriteOnto(addr, w, n, err) + } + return +} diff --git a/p2p/peer_set.go b/p2p/peer_set.go index e8905ce03..c1404f5f9 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -5,11 +5,12 @@ import ( ) /* -ReadOnlyPeerSet has a subset of the methods of PeerSet. +IPeerSet has a (immutable) subset of the methods of PeerSet. */ -type ReadOnlyPeerSet interface { +type IPeerSet interface { Has(addr *NetAddress) bool List() []*Peer + Size() int } //----------------------------------------------------------------------------- diff --git a/p2p/pex.go b/p2p/pex.go deleted file mode 100644 index a7986f7a8..000000000 --- a/p2p/pex.go +++ /dev/null @@ -1,118 +0,0 @@ -package p2p - -import ( - "bytes" - "errors" - "io" - - . "github.com/tendermint/tendermint/binary" -) - -var pexErrInvalidMessage = errors.New("Invalid PEX message") - -const pexCh = "PEX" - -/* -The PexHandler routine should be started separately from the Switch. -It handles basic PEX communciation. -The application is responsible for sending out a PexRequestMessage. -*/ -func PexHandler(s *Switch, addrBook *AddrBook) { - - for { - inPkt := s.Receive(pexCh) // {Peer, Time, Packet} - if inPkt == nil { - // Client has stopped - break - } - - // decode message - msg := decodeMessage(inPkt.Bytes) - - switch msg.(type) { - case *PexRequestMessage: - // inPkt.Peer requested some peers. - // TODO: prevent abuse. - addrs := addrBook.GetSelection() - response := &pexResponseMessage{Addrs: addrs} - pkt := NewPacket(pexCh, BinaryBytes(response)) - queued := inPkt.Peer.TryQueue(pkt) - if !queued { - // ignore - } - case *pexResponseMessage: - // We received some peer addresses from inPkt.Peer. - // TODO: prevent abuse. - // (We don't want to get spammed with bad peers) - srcAddr := inPkt.Peer.RemoteAddress() - for _, addr := range msg.(*pexResponseMessage).Addrs { - addrBook.AddAddress(addr, srcAddr) - } - default: - // Bad peer. - s.StopPeerForError(inPkt.Peer, pexErrInvalidMessage) - } - } - - // cleanup - -} - -/* Messages */ - -const ( - pexTypeUnknown = Byte(0x00) - pexTypeRequest = Byte(0x01) - pexTypeResponse = Byte(0x02) -) - -// TODO: check for unnecessary extra bytes at the end. -func decodeMessage(bz ByteSlice) (msg Message) { - switch Byte(bz[0]) { - case pexTypeRequest: - return &PexRequestMessage{} - case pexTypeResponse: - return readPexResponseMessage(bytes.NewReader(bz[1:])) - default: - return nil - } -} - -/* -A response with peer addresses -*/ -type PexRequestMessage struct { -} - -func (m *PexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(pexTypeRequest, w, n, err) - return -} - -/* -A response with peer addresses -*/ -type pexResponseMessage struct { - Addrs []*NetAddress -} - -func readPexResponseMessage(r io.Reader) *pexResponseMessage { - numAddrs := int(ReadUInt32(r)) - addrs := []*NetAddress{} - for i := 0; i < numAddrs; i++ { - addr := ReadNetAddress(r) - addrs = append(addrs, addr) - } - return &pexResponseMessage{ - Addrs: addrs, - } -} - -func (m *pexResponseMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(pexTypeResponse, w, n, err) - n, err = WriteOnto(UInt32(len(m.Addrs)), w, n, err) - for _, addr := range m.Addrs { - n, err = WriteOnto(addr, w, n, err) - } - return -} diff --git a/p2p/switch.go b/p2p/switch.go index 7e49a933e..b55119d42 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -3,6 +3,7 @@ package p2p import ( "errors" "sync/atomic" + "time" . "github.com/tendermint/tendermint/common" ) @@ -22,6 +23,7 @@ type Switch struct { channels []ChannelDescriptor pktRecvQueues map[string]chan *InboundPacket peers *PeerSet + dialing *CMap quit chan struct{} started uint32 stopped uint32 @@ -32,6 +34,10 @@ var ( ErrSwitchDuplicatePeer = errors.New("Duplicate peer") ) +const ( + peerDialTimeoutSeconds = 30 +) + func NewSwitch(channels []ChannelDescriptor) *Switch { // make pktRecvQueues... pktRecvQueues := make(map[string]chan *InboundPacket) @@ -43,6 +49,7 @@ func NewSwitch(channels []ChannelDescriptor) *Switch { channels: channels, pktRecvQueues: pktRecvQueues, peers: NewPeerSet(), + dialing: NewCMap(), quit: make(chan struct{}), stopped: 0, } @@ -92,6 +99,25 @@ func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, return peer, nil } +func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { + if atomic.LoadUint32(&s.stopped) == 1 { + return nil, ErrSwitchStopped + } + + log.Infof("Dialing peer @ %v", addr) + s.dialing.Set(addr.String(), addr) + conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) + s.dialing.Delete(addr.String()) + if err != nil { + return nil, err + } + peer, err := s.AddPeerWithConnection(conn, true) + if err != nil { + return nil, err + } + return peer, nil +} + func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { if atomic.LoadUint32(&s.stopped) == 1 { return @@ -99,7 +125,7 @@ func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) for _, peer := range s.peers.List() { - success := peer.TryQueue(pkt) + success := peer.TrySend(pkt) log.Tracef("Broadcast for peer %v success: %v", peer, success) if success { numSuccess += 1 @@ -143,7 +169,7 @@ func (s *Switch) NumOutboundPeers() (count int) { return } -func (s *Switch) Peers() ReadOnlyPeerSet { +func (s *Switch) Peers() IPeerSet { return s.peers } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 57933dded..963f4b97b 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -67,25 +67,25 @@ func TestSwitches(t *testing.T) { } // Broadcast a message on ch1 - s1.Broadcast(NewPacket("ch1", ByteSlice("channel one"))) + s1.Broadcast(NewPacket("ch1", String("channel one"))) // Broadcast a message on ch2 - s1.Broadcast(NewPacket("ch2", ByteSlice("channel two"))) + s1.Broadcast(NewPacket("ch2", String("channel two"))) // Broadcast a message on ch3 - s1.Broadcast(NewPacket("ch3", ByteSlice("channel three"))) + s1.Broadcast(NewPacket("ch3", String("channel three"))) // Wait for things to settle... time.Sleep(100 * time.Millisecond) // Receive message from channel 2 and check inMsg := s2.Receive("ch2") - if string(inMsg.Bytes) != "channel two" { - t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) + if ReadString(inMsg.Reader()) != "channel two" { + t.Errorf("Unexpected received message bytes: %X = [%v]", inMsg.Bytes, ReadString(inMsg.Reader())) } // Receive message from channel 1 and check inMsg = s2.Receive("ch1") - if string(inMsg.Bytes) != "channel one" { - t.Errorf("Unexpected received message bytes: %v", string(inMsg.Bytes)) + if ReadString(inMsg.Reader()) != "channel one" { + t.Errorf("Unexpected received message bytes: %X = [%v]", inMsg.Bytes, ReadString(inMsg.Reader())) } }