Browse Source

.

pull/9/head
Jae Kwon 11 years ago
parent
commit
c895c6a586
12 changed files with 78 additions and 48 deletions
  1. +7
    -0
      binary/int.go
  2. +4
    -0
      merkle/iavl_tree.go
  3. +0
    -1
      peer/addrbook.go
  4. +15
    -14
      peer/client.go
  5. +3
    -6
      peer/connection.go
  6. +8
    -7
      peer/connection_test.go
  7. +4
    -0
      peer/filter.go
  8. +10
    -10
      peer/listener.go
  9. +5
    -0
      peer/msg.go
  10. +13
    -8
      peer/peer.go
  11. +2
    -2
      peer/server.go
  12. +7
    -0
      peer/upnp.go

+ 7
- 0
binary/int.go View File

@ -117,6 +117,13 @@ func ReadUInt8(r io.Reader) UInt8 {
return UInt8(buf[0]) return UInt8(buf[0])
} }
func ReadUInt8Safe(r io.Reader) (UInt8, error) {
buf := [1]byte{0}
_, err := io.ReadFull(r, buf[:])
if err != nil { return UInt8(0), err }
return UInt8(buf[0]), nil
}
// Int16 // Int16


+ 4
- 0
merkle/iavl_tree.go View File

@ -1,5 +1,9 @@
package merkle package merkle
import (
. "github.com/tendermint/tendermint/binary"
)
const HASH_BYTE_SIZE int = 4+32 const HASH_BYTE_SIZE int = 4+32
/* /*


+ 0
- 1
peer/addrbook.go View File

@ -95,7 +95,6 @@ func NewAddrBook(filePath string) *AddrBook {
am := AddrBook{ am := AddrBook{
rand: rand.New(rand.NewSource(time.Now().UnixNano())), rand: rand.New(rand.NewSource(time.Now().UnixNano())),
quit: make(chan struct{}), quit: make(chan struct{}),
localAddresses: make(map[string]*localAddress),
filePath: filePath, filePath: filePath,
} }
am.init() am.init()


+ 15
- 14
peer/client.go View File

@ -1,11 +1,11 @@
package peer package peer
import ( import (
. "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/merkle" "github.com/tendermint/tendermint/merkle"
"atomic"
"sync/atomic"
"sync" "sync"
"io"
"errors" "errors"
) )
@ -38,14 +38,14 @@ var (
CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer") CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer")
) )
func NewClient(newPeerCb func(*Connect) *Peer) *Client {
func NewClient(newPeerCb func(*Connection) *Peer) *Client {
self := newPeerCb(nil) self := newPeerCb(nil)
if self == nil { if self == nil {
Panicf("newPeerCb(nil) must return a prototypical peer for self") Panicf("newPeerCb(nil) must return a prototypical peer for self")
} }
inQueues := make(map[String]chan *InboundMsg) inQueues := make(map[String]chan *InboundMsg)
for chName, channel := peer.channels {
for chName, channel := range self.channels {
inQueues[chName] = make(chan *InboundMsg) inQueues[chName] = make(chan *InboundMsg)
} }
@ -103,33 +103,33 @@ func (c *Client) Broadcast(chName String, msg Msg) {
func (c *Client) PopMessage(chName String) *InboundMsg { func (c *Client) PopMessage(chName String) *InboundMsg {
if atomic.LoadUint32(&c.stopped) == 1 { return nil } if atomic.LoadUint32(&c.stopped) == 1 { return nil }
channel := c.Channel(chName)
channel := c.self.Channel(chName)
q := c.inQueues[chName] q := c.inQueues[chName]
if q == nil { Panicf("Expected inQueues[%f], found none", chName) } if q == nil { Panicf("Expected inQueues[%f], found none", chName) }
for { for {
select { select {
case <-quit:
case <-c.quit:
return nil return nil
case msg := <-q:
case inMsg := <-q:
// skip if known. // skip if known.
if channel.Filter().Has(msg) {
if channel.Filter().Has(inMsg.Msg) {
continue continue
} }
return msg
return inMsg
} }
} }
} }
// Updates self's filter for a channel & broadcasts it. // Updates self's filter for a channel & broadcasts it.
// TODO: maybe don't expose this
// TODO: rename, same name is confusing.
func (c *Client) UpdateFilter(chName String, filter Filter) { func (c *Client) UpdateFilter(chName String, filter Filter) {
if atomic.LoadUint32(&c.stopped) == 1 { return } if atomic.LoadUint32(&c.stopped) == 1 { return }
c.self.Channel(chName).UpdateFilter(filter) c.self.Channel(chName).UpdateFilter(filter)
c.Broadcast("", &NewFilterMsg{ c.Broadcast("", &NewFilterMsg{
Channel: chName,
ChName: chName,
Filter: filter, Filter: filter,
}) })
} }
@ -137,12 +137,13 @@ func (c *Client) UpdateFilter(chName String, filter Filter) {
func (c *Client) StopPeer(peer *Peer) { func (c *Client) StopPeer(peer *Peer) {
// lock // lock
c.mtx.Lock() c.mtx.Lock()
p, _ := c.peers.Remove(peer.RemoteAddress())
peerValue, _ := c.peers.Remove(peer.RemoteAddress())
c.mtx.Unlock() c.mtx.Unlock()
// unlock // unlock
if p != nil {
p.Stop()
peer_ := peerValue.(*Peer)
if peer_ != nil {
peer_.Stop()
} }
} }


+ 3
- 6
peer/connection.go View File

@ -3,11 +3,8 @@ package peer
import ( import (
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
. "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/binary"
"atomic"
"sync"
"sync/atomic"
"net" "net"
"runtime"
"fmt"
"time" "time"
) )
@ -24,7 +21,7 @@ type Connection struct {
outQueue chan ByteSlice // never closes. outQueue chan ByteSlice // never closes.
conn net.Conn conn net.Conn
quit chan struct{} quit chan struct{}
stopped int32
stopped uint32
pingDebouncer *Debouncer pingDebouncer *Debouncer
pong chan struct{} pong chan struct{}
} }
@ -63,7 +60,7 @@ func (c *Connection) Start() {
} }
func (c *Connection) Stop() { func (c *Connection) Stop() {
if atomic.SwapAndCompare(&c.stopped, 0, 1) {
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
close(c.quit) close(c.quit)
c.conn.Close() c.conn.Close()
c.pingDebouncer.Stop() c.pingDebouncer.Stop()


+ 8
- 7
peer/connection_test.go View File

@ -1,6 +1,7 @@
package peer package peer
import ( import (
. "github.com/tendermint/tendermint/binary"
"testing" "testing"
"time" "time"
) )
@ -13,19 +14,19 @@ func TestLocalConnection(t *testing.T) {
ch1 := NewChannel(String("ch1"), ch1 := NewChannel(String("ch1"),
nil, nil,
// XXX these channels should be buffered. // XXX these channels should be buffered.
make(chan ByteSlice),
make(chan ByteSlice),
make(chan Msg),
make(chan Msg),
) )
ch2 := NewChannel(String("ch2"), ch2 := NewChannel(String("ch2"),
nil, nil,
make(chan ByteSlice),
make(chan ByteSlice),
make(chan Msg),
make(chan Msg),
) )
channels := make(map[String]*Channel) channels := make(map[String]*Channel)
channels[ch1.Name] = ch1
channels[ch2.Name] = ch2
channels[ch1.Name()] = ch1
channels[ch2.Name()] = ch2
p.channels = channels p.channels = channels
return p return p
@ -44,7 +45,7 @@ func TestLocalConnection(t *testing.T) {
c1.Broadcast(String(""), String("message")) c1.Broadcast(String(""), String("message"))
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
inMsg := c2.PopMessage()
inMsg := c2.PopMessage(String(""))
c1.Stop() c1.Stop()
c2.Stop() c2.Stop()


+ 4
- 0
peer/filter.go View File

@ -1,5 +1,9 @@
package peer package peer
import (
. "github.com/tendermint/tendermint/binary"
)
/* Filter /* Filter
A Filter could be a bloom filter for lossy filtering, or could be a lossless filter. A Filter could be a bloom filter for lossy filtering, or could be a lossless filter.


+ 10
- 10
peer/listener.go View File

@ -1,7 +1,7 @@
package peer package peer
import ( import (
"atomic"
"sync/atomic"
"net" "net"
) )
@ -26,21 +26,21 @@ const (
DEFAULT_BUFFERED_CONNECTIONS = 10 DEFAULT_BUFFERED_CONNECTIONS = 10
) )
func NewListener(protocol string, laddr string) *Listener {
func NewListener(protocol string, laddr string) Listener {
ln, err := net.Listen(protocol, laddr) ln, err := net.Listen(protocol, laddr)
if err != nil { panic(err) } if err != nil { panic(err) }
s := &Listener{
dl := &DefaultListener{
listener: ln, listener: ln,
connections: make(chan *Connection, DEFAULT_BUFFERED_CONNECTIONS), connections: make(chan *Connection, DEFAULT_BUFFERED_CONNECTIONS),
} }
go l.listenHandler()
go dl.listenHandler()
return s
return dl
} }
func (l *Listener) listenHandler() {
func (l *DefaultListener) listenHandler() {
for { for {
conn, err := l.listener.Accept() conn, err := l.listener.Accept()
@ -50,7 +50,7 @@ func (l *Listener) listenHandler() {
// yet we encountered an error. // yet we encountered an error.
if err != nil { panic(err) } if err != nil { panic(err) }
c := NewConnection(con)
c := NewConnection(conn)
l.connections <- c l.connections <- c
} }
@ -61,15 +61,15 @@ func (l *Listener) listenHandler() {
} }
} }
func (l *Listener) Connections() <-chan *Connection {
func (l *DefaultListener) Connections() <-chan *Connection {
return l.connections return l.connections
} }
func (l *Listener) LocalAddress() *NetAddress {
func (l *DefaultListener) LocalAddress() *NetAddress {
return NewNetAddress(l.listener.Addr()) return NewNetAddress(l.listener.Addr())
} }
func (l *Listener) Stop() {
func (l *DefaultListener) Stop() {
if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) {
l.listener.Close() l.listener.Close()
} }


+ 5
- 0
peer/msg.go View File

@ -1,5 +1,10 @@
package peer package peer
import (
. "github.com/tendermint/tendermint/binary"
"io"
)
/* Msg */ /* Msg */
type Msg struct { type Msg struct {


+ 13
- 8
peer/peer.go View File

@ -1,8 +1,11 @@
package peer package peer
import ( import (
"atomic"
. "github.com/tendermint/tendermint/binary"
"sync/atomic"
"sync" "sync"
"io"
"time"
) )
/* Peer */ /* Peer */
@ -82,7 +85,7 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) {
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-quit:
case <-p.quit:
break FOR_LOOP break FOR_LOOP
case msg := <-inQueue: case msg := <-inQueue:
// add to channel filter // add to channel filter
@ -91,11 +94,11 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) {
inboundMsg := &InboundMsg{ inboundMsg := &InboundMsg{
Peer: p, Peer: p,
Channel: channel, Channel: channel,
Time: Time(time.Now()),
Time: Time{time.Now()},
Msg: msg, Msg: msg,
} }
select { select {
case <-quit:
case <-p.quit:
break FOR_LOOP break FOR_LOOP
case inboundMsgQueue <- inboundMsg: case inboundMsgQueue <- inboundMsg:
continue continue
@ -108,11 +111,11 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) {
} }
func (p *Peer) outHandler(chName String) { func (p *Peer) outHandler(chName String) {
outQueue := p.channels[chName].OutQueue()
outQueue := p.channels[chName].outQueue
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case <-quit:
case <-p.quit:
break FOR_LOOP break FOR_LOOP
case msg := <-outQueue: case msg := <-outQueue:
// blocks until the connection is Stop'd, // blocks until the connection is Stop'd,
@ -148,6 +151,10 @@ func NewChannel(name String, filter Filter, in, out chan Msg) *Channel {
} }
} }
func (c *Channel) Name() String {
return c.name
}
func (c *Channel) InQueue() <-chan Msg { func (c *Channel) InQueue() <-chan Msg {
return c.inQueue return c.inQueue
} }
@ -164,7 +171,6 @@ func (c *Channel) Has(msg Msg) bool {
return c.Filter().Has(msg) return c.Filter().Has(msg)
} }
// TODO: maybe don't expose this
func (c *Channel) Filter() Filter { func (c *Channel) Filter() Filter {
// lock & defer // lock & defer
c.mtx.Lock(); defer c.mtx.Unlock() c.mtx.Lock(); defer c.mtx.Unlock()
@ -172,7 +178,6 @@ func (c *Channel) Filter() Filter {
// unlock deferred // unlock deferred
} }
// TODO: maybe don't expose this
func (c *Channel) UpdateFilter(filter Filter) { func (c *Channel) UpdateFilter(filter Filter) {
// lock // lock
c.mtx.Lock() c.mtx.Lock()


+ 2
- 2
peer/server.go View File

@ -15,14 +15,14 @@ func NewServer(l Listener, c *Client) *Server {
listener: l, listener: l,
client: c, client: c,
} }
go s.IncomingConnectionsHandler()
go s.IncomingConnectionHandler()
return s return s
} }
// meant to run in a goroutine // meant to run in a goroutine
func (s *Server) IncomingConnectionHandler() { func (s *Server) IncomingConnectionHandler() {
for conn := range s.listener.Connections() { for conn := range s.listener.Connections() {
s.client.AddIncomingConnection(conn)
s.client.AddPeerWithConnection(conn, false)
} }
} }


+ 7
- 0
peer/upnp.go View File

@ -23,6 +23,13 @@ type upnpNAT struct {
urnDomain string urnDomain string
} }
// protocol is either "udp" or "tcp"
type NAT interface {
GetExternalAddress() (addr net.IP, err error)
AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error)
DeletePortMapping(protocol string, externalPort, internalPort int) (err error)
}
func Discover() (nat NAT, err error) { func Discover() (nat NAT, err error) {
ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900") ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900")
if err != nil { if err != nil {


Loading…
Cancel
Save