Browse Source

.

pull/9/head
Jae Kwon 11 years ago
parent
commit
3cc385e881
8 changed files with 73 additions and 27 deletions
  1. +19
    -16
      peer/client.go
  2. +16
    -5
      peer/client_test.go
  3. +5
    -1
      peer/connection.go
  4. +4
    -2
      peer/listener.go
  5. +20
    -2
      peer/log.go
  6. +2
    -1
      peer/netaddress.go
  7. +5
    -0
      peer/peer.go
  8. +2
    -0
      peer/server.go

+ 19
- 16
peer/client.go View File

@ -15,15 +15,15 @@ import (
It can reach out to the network and establish connections with servers.
A client doesn't listen for incoming connections -- that's done by the server.
peerMaker is a factory method for generating new peers from new *Connections.
peerMaker(nil) must return a prototypical peer that represents the self "peer".
makePeerFn is a factory method for generating new peers from new *Connections.
makePeerFn(nil) must return a prototypical peer that represents the self "peer".
XXX what about peer disconnects?
*/
type Client struct {
addrBook *AddrBook
targetNumPeers int
peerMaker func(*Connection) *Peer
makePeerFn func(*Connection) *Peer
self *Peer
inQueues map[String]chan *InboundMsg
@ -38,10 +38,10 @@ var (
CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer")
)
func NewClient(peerMaker func(*Connection) *Peer) *Client {
self := peerMaker(nil)
func NewClient(makePeerFn func(*Connection) *Peer) *Client {
self := makePeerFn(nil)
if self == nil {
Panicf("peerMaker(nil) must return a prototypical peer for self")
Panicf("makePeerFn(nil) must return a prototypical peer for self")
}
inQueues := make(map[String]chan *InboundMsg)
@ -52,7 +52,7 @@ func NewClient(peerMaker func(*Connection) *Peer) *Client {
c := &Client{
addrBook: nil, // TODO
targetNumPeers: 0, // TODO
peerMaker: peerMaker,
makePeerFn: makePeerFn,
self: self,
inQueues: inQueues,
@ -64,6 +64,7 @@ func NewClient(peerMaker func(*Connection) *Peer) *Client {
}
func (c *Client) Stop() {
log.Infof("Stopping client")
// lock
c.mtx.Lock()
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
@ -83,7 +84,8 @@ func (c *Client) Stop() {
func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) {
if atomic.LoadUint32(&c.stopped) == 1 { return nil, CLIENT_STOPPED_ERROR }
peer := c.peerMaker(conn)
log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing)
peer := c.makePeerFn(conn)
peer.outgoing = outgoing
err := c.addPeer(peer)
if err != nil { return nil, err }
@ -96,7 +98,7 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer,
func (c *Client) Broadcast(chName String, msg Msg) {
if atomic.LoadUint32(&c.stopped) == 1 { return }
for v := range c.peersCopy().Values() {
for v := range c.Peers().Values() {
peer := v.(*Peer)
success := peer.TryQueueOut(chName , msg)
if !success {
@ -121,6 +123,13 @@ func (c *Client) PopMessage(chName String) *InboundMsg {
}
}
func (c *Client) Peers() merkle.Tree {
// lock & defer
c.mtx.Lock(); defer c.mtx.Unlock()
return c.peers.Copy()
// unlock deferred
}
func (c *Client) StopPeer(peer *Peer) {
// lock
c.mtx.Lock()
@ -141,6 +150,7 @@ func (c *Client) addPeer(peer *Peer) error {
c.mtx.Lock(); defer c.mtx.Unlock()
if c.stopped == 1 { return CLIENT_STOPPED_ERROR }
if !c.peers.Has(addr) {
log.Tracef("Actually putting addr: %v, peer: %v", addr, peer)
c.peers.Put(addr, peer)
return nil
} else {
@ -150,10 +160,3 @@ func (c *Client) addPeer(peer *Peer) error {
}
// unlock deferred
}
func (c *Client) peersCopy() merkle.Tree {
// lock & defer
c.mtx.Lock(); defer c.mtx.Unlock()
return c.peers.Copy()
// unlock deferred
}

+ 16
- 5
peer/client_test.go View File

@ -21,19 +21,30 @@ func TestConnection(t *testing.T) {
c2 := NewClient(peerMaker)
s1 := NewServer("tcp", ":8001", c1)
s1laddr := s1.LocalAddress()
conn, err := s1.LocalAddress().Dial()
conn, err := s1laddr.Dial()
if err != nil {
t.Fatalf("Could not connect to server address %v", s1.LocalAddress())
t.Fatalf("Could not connect to server address %v", s1laddr)
} else {
t.Logf("Created a connection to local server address %v", s1laddr)
}
c2.AddPeerWithConnection(conn, true)
// Wait for things to happen, peers to get added...
time.Sleep(100 * time.Millisecond)
// lets send a message from c1 to c2.
// XXX do we even want a broadcast function?
//c1.Broadcast(String(""), String("message"))
time.Sleep(500 * time.Millisecond)
if c1.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in c1, got %v", c1.Peers().Size())
}
if c2.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in c2, got %v", c2.Peers().Size())
}
// TODO: test the transmission of information on channels.
time.Sleep(500 * time.Millisecond)
//inMsg := c2.PopMessage(String(""))
s1.Stop()


+ 5
- 1
peer/connection.go View File

@ -6,6 +6,7 @@ import (
"sync/atomic"
"net"
"time"
"fmt"
)
const (
@ -80,6 +81,10 @@ func (c *Connection) RemoteAddress() *NetAddress {
return NewNetAddress(c.conn.RemoteAddr())
}
func (c *Connection) String() string {
return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr())
}
func (c *Connection) flush() {
// TODO flush? (turn off nagel, turn back on, etc)
}
@ -158,7 +163,6 @@ func (c *Connection) inHandler() {
}
/* IOStats */
type IOStats struct {
TimeConnected Time


+ 4
- 2
peer/listener.go View File

@ -1,6 +1,7 @@
package peer
import (
. "github.com/tendermint/tendermint/common"
"sync/atomic"
"net"
)
@ -111,13 +112,14 @@ func GetUPNPLocalAddress() *NetAddress {
// TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh
func GetDefaultLocalAddress() *NetAddress {
addrs, err := net.InterfaceAddrs()
if err != nil { panic("Wtf") }
if err != nil { Panicf("Unexpected error fetching interface addresses: %v", err) }
for _, a := range addrs {
ipnet, ok := a.(*net.IPNet)
if !ok { continue }
v4 := ipnet.IP.To4()
if v4 == nil || v4[0] == 127 { continue } // loopback
return NewNetAddress(a)
return NewNetAddressIPPort(ipnet.IP, DEFAULT_PORT)
}
return nil
}


+ 20
- 2
peer/log.go View File

@ -1,7 +1,25 @@
package peer
import (
"github.com/tendermint/btclog"
"github.com/cihub/seelog"
)
var log = btclog.Disabled
var log seelog.LoggerInterface
func init() {
// TODO: replace with configuration file in the ~/.tendermint directory.
config := `
<seelog type="sync">
<outputs formatid="colored">
<console/>
</outputs>
<formats>
<format id="main" format="%Date/%Time [%LEV] %Msg%n"/>
<format id="colored" format="%EscM(46)%Level%EscM(49) %EscM(36)%File%EscM(39) %Msg%n%EscM(0)"/>
</formats>
</seelog>`
var err error
log, err = seelog.LoggerFromConfigAsBytes([]byte(config))
if err != nil { panic(err) }
}

+ 2
- 1
peer/netaddress.go View File

@ -5,6 +5,7 @@
package peer
import (
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/binary"
"io"
"net"
@ -21,7 +22,7 @@ type NetAddress struct {
// TODO: socks proxies?
func NewNetAddress(addr net.Addr) *NetAddress {
tcpAddr, ok := addr.(*net.TCPAddr)
if !ok { panic("Only TCPAddrs are supported") }
if !ok { Panicf("Only TCPAddrs are supported. Got: %v", addr) }
ip := tcpAddr.IP
port := UInt16(tcpAddr.Port)
return NewNetAddressIPPort(ip, port)


+ 5
- 0
peer/peer.go View File

@ -6,6 +6,7 @@ import (
"sync"
"io"
"time"
"fmt"
)
/* Peer */
@ -80,6 +81,10 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
return p.RemoteAddress().WriteTo(w)
}
func (p *Peer) String() string {
return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing)
}
func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) {
channel := p.channels[chName]
inQueue := channel.InQueue()


+ 2
- 0
peer/server.go View File

@ -27,11 +27,13 @@ func (s *Server) LocalAddress() *NetAddress {
// meant to run in a goroutine
func (s *Server) IncomingConnectionHandler() {
for conn := range s.listener.Connections() {
log.Infof("New connection found: %v", conn)
s.client.AddPeerWithConnection(conn, false)
}
}
func (s *Server) Stop() {
log.Infof("Stopping server")
s.listener.Stop()
s.client.Stop()
}

Loading…
Cancel
Save