Browse Source

Hide internal methods to make GoDoc more user-friendly

pull/9/head
Jae Kwon 11 years ago
parent
commit
0f973c29ca
8 changed files with 229 additions and 144 deletions
  1. +66
    -34
      peer/client.go
  2. +5
    -6
      peer/client_test.go
  3. +53
    -24
      peer/connection.go
  4. +9
    -6
      peer/listener.go
  5. +13
    -10
      peer/msg.go
  6. +70
    -55
      peer/peer.go
  7. +6
    -5
      peer/upnp/upnp.go
  8. +7
    -4
      peer/upnp/upnp_test.go

+ 66
- 34
peer/client.go View File

@ -10,28 +10,34 @@ import (
"github.com/tendermint/tendermint/merkle" "github.com/tendermint/tendermint/merkle"
) )
/* Client
// BUG(jae) handle peer disconnects
A client is half of a p2p system.
It can reach out to the network and establish connections with servers.
A client doesn't listen for incoming connections -- that's done by the server.
/*
A client is half of a p2p system.
It can reach out to the network and establish connections with other peers.
A client doesn't listen for incoming connections -- that's done by the server.
makePeerFn is a factory method for generating new peers from new *Connections.
makePeerFn(nil) must return a prototypical peer that represents the self "peer".
All communication amongst peers are multiplexed by "channels".
(Not the same as Go "channels")
XXX what about peer disconnects?
To send a message, encapsulate it into a "Packet" and send it to each peer.
You can find all connected and active peers by iterating over ".Peers()".
".Broadcast()" is provided for convenience, but by iterating over
the peers manually the caller can decide which subset receives a message.
Incoming messages are received by calling ".Receive()".
*/ */
type Client struct { type Client struct {
addrBook *AddrBook addrBook *AddrBook
targetNumPeers int targetNumPeers int
makePeerFn func(*Connection) *Peer makePeerFn func(*Connection) *Peer
self *Peer self *Peer
recvQueues map[String]chan *InboundPacket
mtx sync.Mutex
peers merkle.Tree // addr -> *Peer
quit chan struct{}
stopped uint32
pktRecvQueues map[String]chan *InboundPacket
peersMtx sync.Mutex
peers merkle.Tree // addr -> *Peer
quit chan struct{}
erroredPeers chan peerError
stopped uint32
} }
var ( var (
@ -39,15 +45,17 @@ var (
CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer") CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer")
) )
// "makePeerFn" is a factory method for generating new peers from new *Connections.
// "makePeerFn(nil)" must return a prototypical peer that represents the self "peer".
func NewClient(makePeerFn func(*Connection) *Peer) *Client { func NewClient(makePeerFn func(*Connection) *Peer) *Client {
self := makePeerFn(nil) self := makePeerFn(nil)
if self == nil { if self == nil {
Panicf("makePeerFn(nil) must return a prototypical peer for self") Panicf("makePeerFn(nil) must return a prototypical peer for self")
} }
recvQueues := make(map[String]chan *InboundPacket)
pktRecvQueues := make(map[String]chan *InboundPacket)
for chName, _ := range self.channels { for chName, _ := range self.channels {
recvQueues[chName] = make(chan *InboundPacket)
pktRecvQueues[chName] = make(chan *InboundPacket)
} }
c := &Client{ c := &Client{
@ -55,30 +63,39 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client {
targetNumPeers: 0, // TODO targetNumPeers: 0, // TODO
makePeerFn: makePeerFn, makePeerFn: makePeerFn,
self: self, self: self,
recvQueues: recvQueues,
peers: merkle.NewIAVLTree(nil),
quit: make(chan struct{}),
stopped: 0,
pktRecvQueues: pktRecvQueues,
peers: merkle.NewIAVLTree(nil),
quit: make(chan struct{}),
erroredPeers: make(chan peerError),
stopped: 0,
} }
// automatically start
c.start()
return c return c
} }
func (c *Client) start() {
// Handle peer disconnects & errors
go c.peerErrorHandler()
}
func (c *Client) Stop() { func (c *Client) Stop() {
log.Infof("Stopping client") log.Infof("Stopping client")
// lock // lock
c.mtx.Lock()
c.peersMtx.Lock()
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
close(c.quit) close(c.quit)
// stop each peer. // stop each peer.
for peerValue := range c.peers.Values() { for peerValue := range c.peers.Values() {
peer := peerValue.(*Peer) peer := peerValue.(*Peer)
peer.Stop()
peer.stop()
} }
// empty tree. // empty tree.
c.peers = merkle.NewIAVLTree(nil) c.peers = merkle.NewIAVLTree(nil)
} }
c.mtx.Unlock()
c.peersMtx.Unlock()
// unlock // unlock
} }
@ -95,7 +112,7 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer,
return nil, err return nil, err
} }
go peer.Start(c.recvQueues)
go peer.start(c.pktRecvQueues, c.erroredPeers)
return peer, nil return peer, nil
} }
@ -120,16 +137,18 @@ func (c *Client) Broadcast(pkt Packet) (numSuccess, numFailure int) {
} }
// blocks until a message is popped.
/*
Receive blocks on a channel until a message is found.
*/
func (c *Client) Receive(chName String) *InboundPacket { func (c *Client) Receive(chName String) *InboundPacket {
if atomic.LoadUint32(&c.stopped) == 1 { if atomic.LoadUint32(&c.stopped) == 1 {
return nil return nil
} }
log.Tracef("Receive on [%v]", chName) log.Tracef("Receive on [%v]", chName)
q := c.recvQueues[chName]
q := c.pktRecvQueues[chName]
if q == nil { if q == nil {
Panicf("Expected recvQueues[%f], found none", chName)
Panicf("Expected pktRecvQueues[%f], found none", chName)
} }
select { select {
@ -142,22 +161,22 @@ func (c *Client) Receive(chName String) *InboundPacket {
func (c *Client) Peers() merkle.Tree { func (c *Client) Peers() merkle.Tree {
// lock & defer // lock & defer
c.mtx.Lock()
defer c.mtx.Unlock()
c.peersMtx.Lock()
defer c.peersMtx.Unlock()
return c.peers.Copy() return c.peers.Copy()
// unlock deferred // unlock deferred
} }
func (c *Client) StopPeer(peer *Peer) { func (c *Client) StopPeer(peer *Peer) {
// lock // lock
c.mtx.Lock()
c.peersMtx.Lock()
peerValue, _ := c.peers.Remove(peer.RemoteAddress()) peerValue, _ := c.peers.Remove(peer.RemoteAddress())
c.mtx.Unlock()
c.peersMtx.Unlock()
// unlock // unlock
peer_ := peerValue.(*Peer) peer_ := peerValue.(*Peer)
if peer_ != nil { if peer_ != nil {
peer_.Stop()
peer_.stop()
} }
} }
@ -165,8 +184,8 @@ func (c *Client) addPeer(peer *Peer) error {
addr := peer.RemoteAddress() addr := peer.RemoteAddress()
// lock & defer // lock & defer
c.mtx.Lock()
defer c.mtx.Unlock()
c.peersMtx.Lock()
defer c.peersMtx.Unlock()
if c.stopped == 1 { if c.stopped == 1 {
return CLIENT_STOPPED_ERROR return CLIENT_STOPPED_ERROR
} }
@ -181,3 +200,16 @@ func (c *Client) addPeer(peer *Peer) error {
} }
// unlock deferred // unlock deferred
} }
func (c *Client) peerErrorHandler() {
for {
select {
case <-c.quit:
return
case errPeer := <-c.erroredPeers:
// TODO do something
c.StopPeer(errPeer.peer)
return
}
}
}

+ 5
- 6
peer/client_test.go View File

@ -8,15 +8,14 @@ import (
) )
// convenience method for creating two clients connected to each other. // convenience method for creating two clients connected to each other.
func makeClientPair(t testing.TB, bufferSize int, channels []String) (*Client, *Client) {
func makeClientPair(t testing.TB, bufferSize int, chNames []String) (*Client, *Client) {
peerMaker := func(conn *Connection) *Peer { peerMaker := func(conn *Connection) *Peer {
p := NewPeer(conn)
p.channels = map[String]*Channel{}
for _, chName := range channels {
p.channels[chName] = NewChannel(chName, bufferSize)
channels := map[String]*Channel{}
for _, chName := range chNames {
channels[chName] = NewChannel(chName, bufferSize)
} }
return p
return NewPeer(conn, channels)
} }
// Create two clients that will be interconnected. // Create two clients that will be interconnected.


+ 53
- 24
peer/connection.go View File

@ -20,7 +20,13 @@ const (
PING_TIMEOUT_MINUTES = 2 PING_TIMEOUT_MINUTES = 2
) )
/* Connnection */
// BUG(jae): Handle disconnects.
/*
A Connection wraps a network connection and handles buffering and multiplexing.
"Packets" are sent with ".Send(Packet)".
Packets received are sent to channels as commanded by the ".Start(...)" method.
*/
type Connection struct { type Connection struct {
ioStats IOStats ioStats IOStats
@ -30,9 +36,13 @@ type Connection struct {
bufWriter *bufio.Writer bufWriter *bufio.Writer
flushThrottler *Throttler flushThrottler *Throttler
quit chan struct{} quit chan struct{}
stopped uint32
pingRepeatTimer *RepeatTimer pingRepeatTimer *RepeatTimer
pong chan struct{} pong chan struct{}
channels map[String]*Channel
onError func(interface{})
started uint32
stopped uint32
errored uint32
} }
var ( var (
@ -54,22 +64,16 @@ func NewConnection(conn net.Conn) *Connection {
} }
} }
// returns true if successfully queued,
// returns false if connection was closed.
// blocks.
func (c *Connection) Send(pkt Packet) bool {
select {
case c.sendQueue <- pkt:
return true
case <-c.quit:
return false
}
}
func (c *Connection) Start(channels map[String]*Channel) {
// .Start() begins multiplexing packets to and from "channels".
// If an error occurs, the recovered reason is passed to "onError".
func (c *Connection) Start(channels map[String]*Channel, onError func(interface{})) {
log.Debugf("Starting %v", c) log.Debugf("Starting %v", c)
go c.sendHandler()
go c.recvHandler(channels)
if atomic.CompareAndSwapUint32(&c.started, 0, 1) {
c.channels = channels
c.onError = onError
go c.sendHandler()
go c.recvHandler()
}
} }
func (c *Connection) Stop() { func (c *Connection) Stop() {
@ -95,6 +99,18 @@ func (c *Connection) RemoteAddress() *NetAddress {
return NewNetAddress(c.conn.RemoteAddr()) return NewNetAddress(c.conn.RemoteAddr())
} }
// Returns true if successfully queued,
// Returns false if connection was closed.
// Blocks.
func (c *Connection) Send(pkt Packet) bool {
select {
case c.sendQueue <- pkt:
return true
case <-c.quit:
return false
}
}
func (c *Connection) String() string { func (c *Connection) String() string {
return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr()) return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr())
} }
@ -111,10 +127,22 @@ func (c *Connection) flush() {
} }
} }
// Catch panics, usually caused by remote disconnects.
func (c *Connection) _recover() {
if r := recover(); r != nil {
c.Stop()
if atomic.CompareAndSwapUint32(&c.errored, 0, 1) {
if c.onError != nil {
c.onError(r)
}
}
}
}
// sendHandler pulls from .sendQueue and writes to .bufWriter
func (c *Connection) sendHandler() { func (c *Connection) sendHandler() {
log.Tracef("%v sendHandler", c) log.Tracef("%v sendHandler", c)
// TODO: catch panics & stop connection.
defer c._recover()
FOR_LOOP: FOR_LOOP:
for { for {
@ -154,10 +182,11 @@ FOR_LOOP:
// cleanup // cleanup
} }
func (c *Connection) recvHandler(channels map[String]*Channel) {
log.Tracef("%v recvHandler with %v channels", c, len(channels))
// TODO: catch panics & stop connection.
// recvHandler reads from .bufReader and pushes to the appropriate
// channel's recvQueue.
func (c *Connection) recvHandler() {
log.Tracef("%v recvHandler", c)
defer c._recover()
FOR_LOOP: FOR_LOOP:
for { for {
@ -188,7 +217,7 @@ FOR_LOOP:
} }
break FOR_LOOP break FOR_LOOP
} }
channel := channels[pkt.Channel]
channel := c.channels[pkt.Channel]
if channel == nil { if channel == nil {
Panicf("Unknown channel %v", pkt.Channel) Panicf("Unknown channel %v", pkt.Channel)
} }


+ 9
- 6
peer/listener.go View File

@ -5,23 +5,26 @@ import (
"sync/atomic" "sync/atomic"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/peer/upnp"
) )
const ( const (
// TODO REMOVE
// BUG(jae) Remove DEFAULT_PORT
DEFAULT_PORT = 8001 DEFAULT_PORT = 8001
) )
/* Listener */
/*
Listener is part of a Server.
*/
type Listener interface { type Listener interface {
Connections() <-chan *Connection Connections() <-chan *Connection
LocalAddress() *NetAddress LocalAddress() *NetAddress
Stop() Stop()
} }
/* DefaultListener */
/*
DefaultListener is an implementation that works on the golang network stack.
*/
type DefaultListener struct { type DefaultListener struct {
listener net.Listener listener net.Listener
connections chan *Connection connections chan *Connection
@ -110,7 +113,7 @@ func GetUPNPLocalAddress() *NetAddress {
// removed because this takes too long. // removed because this takes too long.
return nil return nil
log.Infof("Getting UPNP local address") log.Infof("Getting UPNP local address")
nat, err := Discover()
nat, err := upnp.Discover()
if err != nil { if err != nil {
log.Infof("Could not get UPNP local address: %v", err) log.Infof("Could not get UPNP local address: %v", err)
return nil return nil


+ 13
- 10
peer/msg.go View File

@ -1,12 +1,14 @@
package peer package peer
import ( import (
. "github.com/tendermint/tendermint/binary"
"io" "io"
)
/* Packet */
. "github.com/tendermint/tendermint/binary"
)
/*
Packet encapsulates a ByteSlice on a Channel.
*/
type Packet struct { type Packet struct {
Channel String Channel String
Bytes ByteSlice Bytes ByteSlice
@ -39,17 +41,18 @@ func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
return NewPacket(chName, bytes), nil return NewPacket(chName, bytes), nil
} }
/* InboundPacket */
/*
InboundPacket extends Packet with fields relevant to incoming packets.
*/
type InboundPacket struct { type InboundPacket struct {
Peer *Peer
Channel *Channel
Time Time
Peer *Peer
Time Time
Packet Packet
} }
/* NewFilterMsg */
/*
NewFilterMsg is not implemented. TODO
*/
type NewFilterMsg struct { type NewFilterMsg struct {
ChName String ChName String
Filter interface{} // todo Filter interface{} // todo


+ 70
- 55
peer/peer.go View File

@ -3,7 +3,6 @@ package peer
import ( import (
"fmt" "fmt"
"io" "io"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -16,39 +15,44 @@ type Peer struct {
outgoing bool outgoing bool
conn *Connection conn *Connection
channels map[String]*Channel channels map[String]*Channel
mtx sync.Mutex
quit chan struct{}
stopped uint32
quit chan struct{}
started uint32
stopped uint32
} }
func NewPeer(conn *Connection) *Peer {
func NewPeer(conn *Connection, channels map[String]*Channel) *Peer {
return &Peer{ return &Peer{
conn: conn,
quit: make(chan struct{}),
stopped: 0,
conn: conn,
channels: channels,
quit: make(chan struct{}),
stopped: 0,
} }
} }
func (p *Peer) Start(peerRecvQueues map[String]chan *InboundPacket) {
func (p *Peer) start(pktRecvQueues map[String]chan *InboundPacket, erroredPeers chan peerError) {
log.Debugf("Starting %v", p) log.Debugf("Starting %v", p)
p.conn.Start(p.channels)
for chName, _ := range p.channels {
go p.recvHandler(chName, peerRecvQueues[chName])
go p.sendHandler(chName)
if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
// on connection error
onError := func(r interface{}) {
p.stop()
erroredPeers <- peerError{p, r}
}
p.conn.Start(p.channels, onError)
for chName, _ := range p.channels {
chInQueue := pktRecvQueues[chName]
go p.recvHandler(chName, chInQueue)
go p.sendHandler(chName)
}
} }
} }
func (p *Peer) Stop() {
// lock
p.mtx.Lock()
func (p *Peer) stop() {
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
log.Debugf("Stopping %v", p) log.Debugf("Stopping %v", p)
close(p.quit) close(p.quit)
p.conn.Stop() p.conn.Stop()
} }
p.mtx.Unlock()
// unlock
} }
func (p *Peer) LocalAddress() *NetAddress { func (p *Peer) LocalAddress() *NetAddress {
@ -63,25 +67,22 @@ func (p *Peer) Channel(chName String) *Channel {
return p.channels[chName] return p.channels[chName]
} }
// If the channel's queue is full, just return false.
// Later the sendHandler will send the pkt to the underlying connection.
// TrySend returns true if the packet was successfully queued.
// Returning true does not imply that the packet will be sent.
func (p *Peer) TrySend(pkt Packet) bool { func (p *Peer) TrySend(pkt Packet) bool {
channel := p.Channel(pkt.Channel) channel := p.Channel(pkt.Channel)
sendQueue := channel.SendQueue()
sendQueue := channel.sendQueue
// lock & defer
p.mtx.Lock()
defer p.mtx.Unlock()
if p.stopped == 1 {
if atomic.LoadUint32(&p.stopped) == 1 {
return false return false
} }
select { select {
case sendQueue <- pkt: case sendQueue <- pkt:
return true return true
default: // buffer full default: // buffer full
return false return false
} }
// unlock deferred
} }
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
@ -92,55 +93,62 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing) return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
} }
func (p *Peer) recvHandler(chName String, inboundPacketQueue chan<- *InboundPacket) {
log.Tracef("%v recvHandler [%v]", p, chName)
// sendHandler pulls from a channel and pushes to the connection.
// Each channel gets its own sendHandler goroutine;
// Golang's channel implementation handles the scheduling.
func (p *Peer) sendHandler(chName String) {
log.Tracef("%v sendHandler [%v]", p, chName)
channel := p.channels[chName] channel := p.channels[chName]
recvQueue := channel.RecvQueue()
sendQueue := channel.sendQueue
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-p.quit: case <-p.quit:
break FOR_LOOP break FOR_LOOP
case pkt := <-recvQueue:
// send to inboundPacketQueue
inboundPacket := &InboundPacket{
Peer: p,
Channel: channel,
Time: Time{time.Now()},
Packet: pkt,
}
select {
case <-p.quit:
break FOR_LOOP
case inboundPacketQueue <- inboundPacket:
continue
}
case pkt := <-sendQueue:
log.Tracef("Sending packet to peer sendQueue")
// blocks until the connection is Stop'd,
// which happens when this peer is Stop'd.
p.conn.Send(pkt)
} }
} }
log.Tracef("%v recvHandler [%v] closed", p, chName)
log.Tracef("%v sendHandler [%v] closed", p, chName)
// cleanup // cleanup
// (none) // (none)
} }
func (p *Peer) sendHandler(chName String) {
log.Tracef("%v sendHandler [%v]", p, chName)
chSendQueue := p.channels[chName].sendQueue
// recvHandler pulls from a channel and pushes to the given pktRecvQueue.
// Each channel gets its own recvHandler goroutine.
// Many peers have goroutines that push to the same pktRecvQueue.
// Golang's channel implementation handles the scheduling.
func (p *Peer) recvHandler(chName String, pktRecvQueue chan<- *InboundPacket) {
log.Tracef("%v recvHandler [%v]", p, chName)
channel := p.channels[chName]
recvQueue := channel.recvQueue
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-p.quit: case <-p.quit:
break FOR_LOOP break FOR_LOOP
case pkt := <-chSendQueue:
log.Tracef("Sending packet to peer chSendQueue")
// blocks until the connection is Stop'd,
// which happens when this peer is Stop'd.
p.conn.Send(pkt)
case pkt := <-recvQueue:
// send to pktRecvQueue
inboundPacket := &InboundPacket{
Peer: p,
Time: Time{time.Now()},
Packet: pkt,
}
select {
case <-p.quit:
break FOR_LOOP
case pktRecvQueue <- inboundPacket:
continue
}
} }
} }
log.Tracef("%v sendHandler [%v] closed", p, chName)
log.Tracef("%v recvHandler [%v] closed", p, chName)
// cleanup // cleanup
// (none) // (none)
} }
@ -173,3 +181,10 @@ func (c *Channel) RecvQueue() <-chan Packet {
func (c *Channel) SendQueue() chan<- Packet { func (c *Channel) SendQueue() chan<- Packet {
return c.sendQueue return c.sendQueue
} }
/* Misc */
type peerError struct {
peer *Peer
err interface{}
}

peer/upnp.go → peer/upnp/upnp.go View File


peer/upnp_test.go → peer/upnp/upnp_test.go View File


Loading…
Cancel
Save