From 0f973c29ca08559c201d823f147b5c5996d7398a Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 4 Jul 2014 14:29:27 -0700 Subject: [PATCH] Hide internal methods to make GoDoc more user-friendly --- peer/client.go | 100 ++++++++++++++++++---------- peer/client_test.go | 11 ++- peer/connection.go | 77 ++++++++++++++------- peer/listener.go | 15 +++-- peer/msg.go | 23 ++++--- peer/peer.go | 125 ++++++++++++++++++++--------------- peer/{ => upnp}/upnp.go | 11 +-- peer/{ => upnp}/upnp_test.go | 11 +-- 8 files changed, 229 insertions(+), 144 deletions(-) rename peer/{ => upnp}/upnp.go (98%) rename peer/{ => upnp}/upnp_test.go (88%) diff --git a/peer/client.go b/peer/client.go index 7689ee8bc..75d33a2e8 100644 --- a/peer/client.go +++ b/peer/client.go @@ -10,28 +10,34 @@ import ( "github.com/tendermint/tendermint/merkle" ) -/* Client +// BUG(jae) handle peer disconnects - A client is half of a p2p system. - It can reach out to the network and establish connections with servers. - A client doesn't listen for incoming connections -- that's done by the server. +/* +A client is half of a p2p system. +It can reach out to the network and establish connections with other peers. +A client doesn't listen for incoming connections -- that's done by the server. - makePeerFn is a factory method for generating new peers from new *Connections. - makePeerFn(nil) must return a prototypical peer that represents the self "peer". +All communication amongst peers are multiplexed by "channels". +(Not the same as Go "channels") - XXX what about peer disconnects? +To send a message, encapsulate it into a "Packet" and send it to each peer. +You can find all connected and active peers by iterating over ".Peers()". +".Broadcast()" is provided for convenience, but by iterating over +the peers manually the caller can decide which subset receives a message. + +Incoming messages are received by calling ".Receive()". */ type Client struct { addrBook *AddrBook targetNumPeers int makePeerFn func(*Connection) *Peer self *Peer - recvQueues map[String]chan *InboundPacket - - mtx sync.Mutex - peers merkle.Tree // addr -> *Peer - quit chan struct{} - stopped uint32 + pktRecvQueues map[String]chan *InboundPacket + peersMtx sync.Mutex + peers merkle.Tree // addr -> *Peer + quit chan struct{} + erroredPeers chan peerError + stopped uint32 } var ( @@ -39,15 +45,17 @@ var ( CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer") ) +// "makePeerFn" is a factory method for generating new peers from new *Connections. +// "makePeerFn(nil)" must return a prototypical peer that represents the self "peer". func NewClient(makePeerFn func(*Connection) *Peer) *Client { self := makePeerFn(nil) if self == nil { Panicf("makePeerFn(nil) must return a prototypical peer for self") } - recvQueues := make(map[String]chan *InboundPacket) + pktRecvQueues := make(map[String]chan *InboundPacket) for chName, _ := range self.channels { - recvQueues[chName] = make(chan *InboundPacket) + pktRecvQueues[chName] = make(chan *InboundPacket) } c := &Client{ @@ -55,30 +63,39 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client { targetNumPeers: 0, // TODO makePeerFn: makePeerFn, self: self, - recvQueues: recvQueues, - - peers: merkle.NewIAVLTree(nil), - quit: make(chan struct{}), - stopped: 0, + pktRecvQueues: pktRecvQueues, + peers: merkle.NewIAVLTree(nil), + quit: make(chan struct{}), + erroredPeers: make(chan peerError), + stopped: 0, } + + // automatically start + c.start() + return c } +func (c *Client) start() { + // Handle peer disconnects & errors + go c.peerErrorHandler() +} + func (c *Client) Stop() { log.Infof("Stopping client") // lock - c.mtx.Lock() + c.peersMtx.Lock() if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { close(c.quit) // stop each peer. for peerValue := range c.peers.Values() { peer := peerValue.(*Peer) - peer.Stop() + peer.stop() } // empty tree. c.peers = merkle.NewIAVLTree(nil) } - c.mtx.Unlock() + c.peersMtx.Unlock() // unlock } @@ -95,7 +112,7 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, return nil, err } - go peer.Start(c.recvQueues) + go peer.start(c.pktRecvQueues, c.erroredPeers) return peer, nil } @@ -120,16 +137,18 @@ func (c *Client) Broadcast(pkt Packet) (numSuccess, numFailure int) { } -// blocks until a message is popped. +/* +Receive blocks on a channel until a message is found. +*/ func (c *Client) Receive(chName String) *InboundPacket { if atomic.LoadUint32(&c.stopped) == 1 { return nil } log.Tracef("Receive on [%v]", chName) - q := c.recvQueues[chName] + q := c.pktRecvQueues[chName] if q == nil { - Panicf("Expected recvQueues[%f], found none", chName) + Panicf("Expected pktRecvQueues[%f], found none", chName) } select { @@ -142,22 +161,22 @@ func (c *Client) Receive(chName String) *InboundPacket { func (c *Client) Peers() merkle.Tree { // lock & defer - c.mtx.Lock() - defer c.mtx.Unlock() + c.peersMtx.Lock() + defer c.peersMtx.Unlock() return c.peers.Copy() // unlock deferred } func (c *Client) StopPeer(peer *Peer) { // lock - c.mtx.Lock() + c.peersMtx.Lock() peerValue, _ := c.peers.Remove(peer.RemoteAddress()) - c.mtx.Unlock() + c.peersMtx.Unlock() // unlock peer_ := peerValue.(*Peer) if peer_ != nil { - peer_.Stop() + peer_.stop() } } @@ -165,8 +184,8 @@ func (c *Client) addPeer(peer *Peer) error { addr := peer.RemoteAddress() // lock & defer - c.mtx.Lock() - defer c.mtx.Unlock() + c.peersMtx.Lock() + defer c.peersMtx.Unlock() if c.stopped == 1 { return CLIENT_STOPPED_ERROR } @@ -181,3 +200,16 @@ func (c *Client) addPeer(peer *Peer) error { } // unlock deferred } + +func (c *Client) peerErrorHandler() { + for { + select { + case <-c.quit: + return + case errPeer := <-c.erroredPeers: + // TODO do something + c.StopPeer(errPeer.peer) + return + } + } +} diff --git a/peer/client_test.go b/peer/client_test.go index 54c1e291e..31e8cb685 100644 --- a/peer/client_test.go +++ b/peer/client_test.go @@ -8,15 +8,14 @@ import ( ) // convenience method for creating two clients connected to each other. -func makeClientPair(t testing.TB, bufferSize int, channels []String) (*Client, *Client) { +func makeClientPair(t testing.TB, bufferSize int, chNames []String) (*Client, *Client) { peerMaker := func(conn *Connection) *Peer { - p := NewPeer(conn) - p.channels = map[String]*Channel{} - for _, chName := range channels { - p.channels[chName] = NewChannel(chName, bufferSize) + channels := map[String]*Channel{} + for _, chName := range chNames { + channels[chName] = NewChannel(chName, bufferSize) } - return p + return NewPeer(conn, channels) } // Create two clients that will be interconnected. diff --git a/peer/connection.go b/peer/connection.go index 6b4e86152..86cf77d3c 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -20,7 +20,13 @@ const ( PING_TIMEOUT_MINUTES = 2 ) -/* Connnection */ +// BUG(jae): Handle disconnects. + +/* +A Connection wraps a network connection and handles buffering and multiplexing. +"Packets" are sent with ".Send(Packet)". +Packets received are sent to channels as commanded by the ".Start(...)" method. +*/ type Connection struct { ioStats IOStats @@ -30,9 +36,13 @@ type Connection struct { bufWriter *bufio.Writer flushThrottler *Throttler quit chan struct{} - stopped uint32 pingRepeatTimer *RepeatTimer pong chan struct{} + channels map[String]*Channel + onError func(interface{}) + started uint32 + stopped uint32 + errored uint32 } var ( @@ -54,22 +64,16 @@ func NewConnection(conn net.Conn) *Connection { } } -// returns true if successfully queued, -// returns false if connection was closed. -// blocks. -func (c *Connection) Send(pkt Packet) bool { - select { - case c.sendQueue <- pkt: - return true - case <-c.quit: - return false - } -} - -func (c *Connection) Start(channels map[String]*Channel) { +// .Start() begins multiplexing packets to and from "channels". +// If an error occurs, the recovered reason is passed to "onError". +func (c *Connection) Start(channels map[String]*Channel, onError func(interface{})) { log.Debugf("Starting %v", c) - go c.sendHandler() - go c.recvHandler(channels) + if atomic.CompareAndSwapUint32(&c.started, 0, 1) { + c.channels = channels + c.onError = onError + go c.sendHandler() + go c.recvHandler() + } } func (c *Connection) Stop() { @@ -95,6 +99,18 @@ func (c *Connection) RemoteAddress() *NetAddress { return NewNetAddress(c.conn.RemoteAddr()) } +// Returns true if successfully queued, +// Returns false if connection was closed. +// Blocks. +func (c *Connection) Send(pkt Packet) bool { + select { + case c.sendQueue <- pkt: + return true + case <-c.quit: + return false + } +} + func (c *Connection) String() string { return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr()) } @@ -111,10 +127,22 @@ func (c *Connection) flush() { } } +// Catch panics, usually caused by remote disconnects. +func (c *Connection) _recover() { + if r := recover(); r != nil { + c.Stop() + if atomic.CompareAndSwapUint32(&c.errored, 0, 1) { + if c.onError != nil { + c.onError(r) + } + } + } +} + +// sendHandler pulls from .sendQueue and writes to .bufWriter func (c *Connection) sendHandler() { log.Tracef("%v sendHandler", c) - - // TODO: catch panics & stop connection. + defer c._recover() FOR_LOOP: for { @@ -154,10 +182,11 @@ FOR_LOOP: // cleanup } -func (c *Connection) recvHandler(channels map[String]*Channel) { - log.Tracef("%v recvHandler with %v channels", c, len(channels)) - - // TODO: catch panics & stop connection. +// recvHandler reads from .bufReader and pushes to the appropriate +// channel's recvQueue. +func (c *Connection) recvHandler() { + log.Tracef("%v recvHandler", c) + defer c._recover() FOR_LOOP: for { @@ -188,7 +217,7 @@ FOR_LOOP: } break FOR_LOOP } - channel := channels[pkt.Channel] + channel := c.channels[pkt.Channel] if channel == nil { Panicf("Unknown channel %v", pkt.Channel) } diff --git a/peer/listener.go b/peer/listener.go index 5060d434d..fac97601f 100644 --- a/peer/listener.go +++ b/peer/listener.go @@ -5,23 +5,26 @@ import ( "sync/atomic" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/peer/upnp" ) const ( - // TODO REMOVE + // BUG(jae) Remove DEFAULT_PORT DEFAULT_PORT = 8001 ) -/* Listener */ - +/* +Listener is part of a Server. +*/ type Listener interface { Connections() <-chan *Connection LocalAddress() *NetAddress Stop() } -/* DefaultListener */ - +/* +DefaultListener is an implementation that works on the golang network stack. +*/ type DefaultListener struct { listener net.Listener connections chan *Connection @@ -110,7 +113,7 @@ func GetUPNPLocalAddress() *NetAddress { // removed because this takes too long. return nil log.Infof("Getting UPNP local address") - nat, err := Discover() + nat, err := upnp.Discover() if err != nil { log.Infof("Could not get UPNP local address: %v", err) return nil diff --git a/peer/msg.go b/peer/msg.go index 9f07ef738..bf9e6420d 100644 --- a/peer/msg.go +++ b/peer/msg.go @@ -1,12 +1,14 @@ package peer import ( - . "github.com/tendermint/tendermint/binary" "io" -) -/* Packet */ + . "github.com/tendermint/tendermint/binary" +) +/* +Packet encapsulates a ByteSlice on a Channel. +*/ type Packet struct { Channel String Bytes ByteSlice @@ -39,17 +41,18 @@ func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { return NewPacket(chName, bytes), nil } -/* InboundPacket */ - +/* +InboundPacket extends Packet with fields relevant to incoming packets. +*/ type InboundPacket struct { - Peer *Peer - Channel *Channel - Time Time + Peer *Peer + Time Time Packet } -/* NewFilterMsg */ - +/* +NewFilterMsg is not implemented. TODO +*/ type NewFilterMsg struct { ChName String Filter interface{} // todo diff --git a/peer/peer.go b/peer/peer.go index a36d90f02..bb25b5b18 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -3,7 +3,6 @@ package peer import ( "fmt" "io" - "sync" "sync/atomic" "time" @@ -16,39 +15,44 @@ type Peer struct { outgoing bool conn *Connection channels map[String]*Channel - - mtx sync.Mutex - quit chan struct{} - stopped uint32 + quit chan struct{} + started uint32 + stopped uint32 } -func NewPeer(conn *Connection) *Peer { +func NewPeer(conn *Connection, channels map[String]*Channel) *Peer { return &Peer{ - conn: conn, - quit: make(chan struct{}), - stopped: 0, + conn: conn, + channels: channels, + quit: make(chan struct{}), + stopped: 0, } } -func (p *Peer) Start(peerRecvQueues map[String]chan *InboundPacket) { +func (p *Peer) start(pktRecvQueues map[String]chan *InboundPacket, erroredPeers chan peerError) { log.Debugf("Starting %v", p) - p.conn.Start(p.channels) - for chName, _ := range p.channels { - go p.recvHandler(chName, peerRecvQueues[chName]) - go p.sendHandler(chName) + + if atomic.CompareAndSwapUint32(&p.started, 0, 1) { + // on connection error + onError := func(r interface{}) { + p.stop() + erroredPeers <- peerError{p, r} + } + p.conn.Start(p.channels, onError) + for chName, _ := range p.channels { + chInQueue := pktRecvQueues[chName] + go p.recvHandler(chName, chInQueue) + go p.sendHandler(chName) + } } } -func (p *Peer) Stop() { - // lock - p.mtx.Lock() +func (p *Peer) stop() { if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { log.Debugf("Stopping %v", p) close(p.quit) p.conn.Stop() } - p.mtx.Unlock() - // unlock } func (p *Peer) LocalAddress() *NetAddress { @@ -63,25 +67,22 @@ func (p *Peer) Channel(chName String) *Channel { return p.channels[chName] } -// If the channel's queue is full, just return false. -// Later the sendHandler will send the pkt to the underlying connection. +// TrySend returns true if the packet was successfully queued. +// Returning true does not imply that the packet will be sent. func (p *Peer) TrySend(pkt Packet) bool { channel := p.Channel(pkt.Channel) - sendQueue := channel.SendQueue() + sendQueue := channel.sendQueue - // lock & defer - p.mtx.Lock() - defer p.mtx.Unlock() - if p.stopped == 1 { + if atomic.LoadUint32(&p.stopped) == 1 { return false } + select { case sendQueue <- pkt: return true default: // buffer full return false } - // unlock deferred } func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { @@ -92,55 +93,62 @@ func (p *Peer) String() string { return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing) } -func (p *Peer) recvHandler(chName String, inboundPacketQueue chan<- *InboundPacket) { - log.Tracef("%v recvHandler [%v]", p, chName) +// sendHandler pulls from a channel and pushes to the connection. +// Each channel gets its own sendHandler goroutine; +// Golang's channel implementation handles the scheduling. +func (p *Peer) sendHandler(chName String) { + log.Tracef("%v sendHandler [%v]", p, chName) channel := p.channels[chName] - recvQueue := channel.RecvQueue() - + sendQueue := channel.sendQueue FOR_LOOP: for { select { case <-p.quit: break FOR_LOOP - case pkt := <-recvQueue: - // send to inboundPacketQueue - inboundPacket := &InboundPacket{ - Peer: p, - Channel: channel, - Time: Time{time.Now()}, - Packet: pkt, - } - select { - case <-p.quit: - break FOR_LOOP - case inboundPacketQueue <- inboundPacket: - continue - } + case pkt := <-sendQueue: + log.Tracef("Sending packet to peer sendQueue") + // blocks until the connection is Stop'd, + // which happens when this peer is Stop'd. + p.conn.Send(pkt) } } - log.Tracef("%v recvHandler [%v] closed", p, chName) + log.Tracef("%v sendHandler [%v] closed", p, chName) // cleanup // (none) } -func (p *Peer) sendHandler(chName String) { - log.Tracef("%v sendHandler [%v]", p, chName) - chSendQueue := p.channels[chName].sendQueue +// recvHandler pulls from a channel and pushes to the given pktRecvQueue. +// Each channel gets its own recvHandler goroutine. +// Many peers have goroutines that push to the same pktRecvQueue. +// Golang's channel implementation handles the scheduling. +func (p *Peer) recvHandler(chName String, pktRecvQueue chan<- *InboundPacket) { + log.Tracef("%v recvHandler [%v]", p, chName) + channel := p.channels[chName] + recvQueue := channel.recvQueue + FOR_LOOP: for { select { case <-p.quit: break FOR_LOOP - case pkt := <-chSendQueue: - log.Tracef("Sending packet to peer chSendQueue") - // blocks until the connection is Stop'd, - // which happens when this peer is Stop'd. - p.conn.Send(pkt) + case pkt := <-recvQueue: + // send to pktRecvQueue + inboundPacket := &InboundPacket{ + Peer: p, + Time: Time{time.Now()}, + Packet: pkt, + } + select { + case <-p.quit: + break FOR_LOOP + case pktRecvQueue <- inboundPacket: + continue + } } } - log.Tracef("%v sendHandler [%v] closed", p, chName) + log.Tracef("%v recvHandler [%v] closed", p, chName) // cleanup // (none) } @@ -173,3 +181,10 @@ func (c *Channel) RecvQueue() <-chan Packet { func (c *Channel) SendQueue() chan<- Packet { return c.sendQueue } + +/* Misc */ + +type peerError struct { + peer *Peer + err interface{} +} diff --git a/peer/upnp.go b/peer/upnp/upnp.go similarity index 98% rename from peer/upnp.go rename to peer/upnp/upnp.go index db8d66f99..2f3e7a9ab 100644 --- a/peer/upnp.go +++ b/peer/upnp/upnp.go @@ -1,10 +1,11 @@ -// from taipei-torrent -// TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh +/* +Taken from taipei-torrent -package peer +Just enough UPnP to be able to forward ports +*/ +package upnp -// Just enough UPnP to be able to forward ports -// +// BUG(jae): TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh import ( "bytes" diff --git a/peer/upnp_test.go b/peer/upnp/upnp_test.go similarity index 88% rename from peer/upnp_test.go rename to peer/upnp/upnp_test.go index 082ed4ffa..1e5a3f456 100644 --- a/peer/upnp_test.go +++ b/peer/upnp/upnp_test.go @@ -1,6 +1,7 @@ -package peer +package upnp import ( + "net" "testing" "time" ) @@ -9,7 +10,6 @@ import ( This is a manual test. TODO: set up or find a service to probe open ports. */ - func TestUPNP(t *testing.T) { t.Log("hello!") @@ -33,7 +33,10 @@ func TestUPNP(t *testing.T) { t.Logf("Port mapping mapped: %v", port) // also run the listener, open for all remote addresses. - listener := NewDefaultListener("tcp", "0.0.0.0:8001") + listener, err := net.Listen("tcp", ":8001") + if err != nil { + panic(err) + } // now sleep for 10 seconds time.Sleep(10 * time.Second) @@ -44,5 +47,5 @@ func TestUPNP(t *testing.T) { } t.Logf("Port mapping deleted") - listener.Stop() + listener.Close() }