From 1789f39a3e0c1f326cf19b2085006c9d3b841efe Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 27 Jun 2014 17:01:59 -0700 Subject: [PATCH] Channels do not have integrated filters. They're only for multiplexing. --- peer/README.md | 6 +++- peer/client.go | 15 +-------- peer/connection.go | 1 + peer/connection_test.go | 38 +++++++---------------- peer/filter.go | 20 ------------ peer/listener.go | 57 ++++++++++++++++++++++++++++++++--- peer/peer.go | 46 +++------------------------- peer/server.go | 9 +++++- peer/set.go | 67 +++++++++++++++++++++++++++++++++++++++++ peer/upnp.go | 1 + peer/upnp_test.go | 48 +++++++++++++++++++++++++++++ 11 files changed, 200 insertions(+), 108 deletions(-) delete mode 100644 peer/filter.go create mode 100644 peer/set.go create mode 100644 peer/upnp_test.go diff --git a/peer/README.md b/peer/README.md index ee6da1677..8527fb073 100644 --- a/peer/README.md +++ b/peer/README.md @@ -99,7 +99,7 @@ The consensus channel broadcasts all information used in the rounds of the Tende Filter Bitarray filter
- Each validator has a predetermined index in teh bitarray
+ Each validator has a predetermined index in the bitarray
Refreshes every new consensus round @@ -114,3 +114,7 @@ The consensus channel broadcasts all information used in the rounds of the Tende + +## Resources + +* http://www.upnp-hacks.org/upnp.html diff --git a/peer/client.go b/peer/client.go index 2cc7171ce..bb3d91c1d 100644 --- a/peer/client.go +++ b/peer/client.go @@ -113,7 +113,7 @@ func (c *Client) PopMessage(chName String) *InboundMsg { return nil case inMsg := <-q: // skip if known. - if channel.Filter().Has(inMsg.Msg) { + if channel.Has(inMsg.Msg) { continue } return inMsg @@ -121,19 +121,6 @@ func (c *Client) PopMessage(chName String) *InboundMsg { } } -// Updates self's filter for a channel & broadcasts it. -// TODO: rename, same name is confusing. -func (c *Client) UpdateFilter(chName String, filter Filter) { - if atomic.LoadUint32(&c.stopped) == 1 { return } - - c.self.Channel(chName).UpdateFilter(filter) - - c.Broadcast("", &NewFilterMsg{ - ChName: chName, - Filter: filter, - }) -} - func (c *Client) StopPeer(peer *Peer) { // lock c.mtx.Lock() diff --git a/peer/connection.go b/peer/connection.go index a7eb53c8f..b03311f43 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -141,6 +141,7 @@ func (c *Connection) inHandler() { } // What to do? // TODO + log.Tracef("%v", msg) default: Panicf("Unknown message type %v", msgType) diff --git a/peer/connection_test.go b/peer/connection_test.go index f7b931f4f..b124fe9db 100644 --- a/peer/connection_test.go +++ b/peer/connection_test.go @@ -8,40 +8,24 @@ import ( func TestLocalConnection(t *testing.T) { - c1 := NewClient(func(conn *Connection) *Peer { + makePeer := func(conn *Connection) *Peer { + bufferSize := 10 p := &Peer{conn: conn} - - ch1 := NewChannel(String("ch1"), - nil, - // XXX these channels should be buffered. - make(chan Msg), - make(chan Msg), - ) - - ch2 := NewChannel(String("ch2"), - nil, - make(chan Msg), - make(chan Msg), - ) - - channels := make(map[String]*Channel) - channels[ch1.Name()] = ch1 - channels[ch2.Name()] = ch2 - p.channels = channels - + p.channels := map[String]*Channel{} + p.channels["ch1"] = NewChannel("ch1", bufferSize) + p.channels["ch2"] = NewChannel("ch2", bufferSize) return p - }) + } - // XXX make c2 like c1. + c1 := NewClient(makePeer) + c2 := NewClient(makePeer) - c2 := NewClient(func(conn *Connection) *Peer { - return nil - }) + s1 := NewServer("tcp", "127.0.0.1:8001", c1) - // XXX clients don't have "local addresses" - c1.ConnectTo(c2.LocalAddress()) + c2.ConnectTo(c1.LocalAddress()) // lets send a message from c1 to c2. + // XXX do we even want a broadcast function? c1.Broadcast(String(""), String("message")) time.Sleep(500 * time.Millisecond) diff --git a/peer/filter.go b/peer/filter.go deleted file mode 100644 index aecde4eec..000000000 --- a/peer/filter.go +++ /dev/null @@ -1,20 +0,0 @@ -package peer - -import ( - . "github.com/tendermint/tendermint/binary" -) - -/* Filter - - A Filter could be a bloom filter for lossy filtering, or could be a lossless filter. - Either way, it's used to keep track of what a peer knows of. -*/ -type Filter interface { - Binary - Add(Msg) - Has(Msg) bool - - // Loads a new filter. - // Convenience factory method - Load(ByteSlice) Filter -} diff --git a/peer/listener.go b/peer/listener.go index 46f5f9a89..a314c599f 100644 --- a/peer/listener.go +++ b/peer/listener.go @@ -5,6 +5,10 @@ import ( "net" ) +const ( + DEFAULT_PORT = 8001 +) + /* Listener */ type Listener interface { @@ -26,12 +30,12 @@ const ( DEFAULT_BUFFERED_CONNECTIONS = 10 ) -func NewListener(protocol string, laddr string) Listener { - ln, err := net.Listen(protocol, laddr) +func NewDefaultListener(protocol string, listenAddr string) Listener { + listener, err := net.Listen(protocol, listenAddr) if err != nil { panic(err) } dl := &DefaultListener{ - listener: ln, + listener: listener, connections: make(chan *Connection, DEFAULT_BUFFERED_CONNECTIONS), } @@ -66,7 +70,7 @@ func (l *DefaultListener) Connections() <-chan *Connection { } func (l *DefaultListener) LocalAddress() *NetAddress { - return NewNetAddress(l.listener.Addr()) + return GetLocalAddress() } func (l *DefaultListener) Stop() { @@ -74,3 +78,48 @@ func (l *DefaultListener) Stop() { l.listener.Close() } } + + +/* local address helpers */ + +func GetLocalAddress() *NetAddress { + laddr := GetUPNPLocalAddress() + if laddr != nil { return laddr } + + laddr = GetDefaultLocalAddress() + if laddr != nil { return laddr } + + panic("Could not determine local address") +} + +// UPNP external address discovery & port mapping +// TODO: more flexible internal & external ports +func GetUPNPLocalAddress() *NetAddress { + nat, err := Discover() + if err != nil { return nil } + + ext, err := nat.GetExternalAddress() + if err != nil { return nil } + + _, err := nat.AddPortMapping("tcp", DEFAULT_PORT, DEFAULT_PORT, "tendermint", 0) + if err != nil { return nil } + + return NewNetAddressIPPort(ext, DEFAULT_PORT) +} + +// Naive local IPv4 interface address detection +// TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh +func GetDefaultLocalAddress() *NetAddress { + addrs, err := net.InterfaceAddrs() + if err != nil { panic("Wtf") } + for _, a := range addrs { + ipnet, ok := a.(*net.IPNet) + if !ok { continue } + v4 := ipnet.IP.To4() + if v4 == nil || v4[0] == 127 { continue } // loopback + return NewNetAddress(a) + } + return nil +} + + diff --git a/peer/peer.go b/peer/peer.go index 30bb00ddb..8c84c0350 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -50,18 +50,12 @@ func (p *Peer) Channel(chName String) *Channel { return p.channels[chName] } -// If msg isn't already in the peer's filter, then -// queue the msg for output. +// Queue the msg for output. // If the queue is full, just return false. func (p *Peer) TryQueueOut(chName String, msg Msg) bool { channel := p.Channel(chName) outQueue := channel.OutQueue() - // just return if already in filter - if channel.Filter().Has(msg) { - return true - } - // lock & defer p.mtx.Lock(); defer p.mtx.Unlock() if p.stopped == 1 { return false } @@ -88,8 +82,6 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) { case <-p.quit: break FOR_LOOP case msg := <-inQueue: - // add to channel filter - channel.Filter().Add(msg) // send to inboundMsgQueue inboundMsg := &InboundMsg{ Peer: p, @@ -133,21 +125,16 @@ func (p *Peer) outHandler(chName String) { type Channel struct { name String - - mtx sync.Mutex - filter Filter - inQueue chan Msg outQueue chan Msg //stats Stats } -func NewChannel(name String, filter Filter, in, out chan Msg) *Channel { +func NewChannel(name string, bufferSize int) *Channel { return &Channel{ - name: name, - filter: filter, - inQueue: in, - outQueue: out, + name: String(name), + inQueue: make(chan Msg, bufferSize), + outQueue: make(chan Msg, buffersize), } } @@ -162,26 +149,3 @@ func (c *Channel) InQueue() <-chan Msg { func (c *Channel) OutQueue() chan<- Msg { return c.outQueue } - -func (c *Channel) Add(msg Msg) { - c.Filter().Add(msg) -} - -func (c *Channel) Has(msg Msg) bool { - return c.Filter().Has(msg) -} - -func (c *Channel) Filter() Filter { - // lock & defer - c.mtx.Lock(); defer c.mtx.Unlock() - return c.filter - // unlock deferred -} - -func (c *Channel) UpdateFilter(filter Filter) { - // lock - c.mtx.Lock() - c.filter = filter - c.mtx.Unlock() - // unlock -} diff --git a/peer/server.go b/peer/server.go index de168c9b6..65ea30f72 100644 --- a/peer/server.go +++ b/peer/server.go @@ -1,6 +1,8 @@ package peer import ( + "sync/atomic" + "net" ) /* Server */ @@ -10,7 +12,8 @@ type Server struct { client *Client } -func NewServer(l Listener, c *Client) *Server { +func NewServer(protocol string, laddr string, c *Client) *Server { + l := NewListener(protocol, laddr) s := &Server{ listener: l, client: c, @@ -19,6 +22,10 @@ func NewServer(l Listener, c *Client) *Server { return s } +func (s *Server) LocalAddress() *NetAddress { + return s.listener.LocalAddress() +} + // meant to run in a goroutine func (s *Server) IncomingConnectionHandler() { for conn := range s.listener.Connections() { diff --git a/peer/set.go b/peer/set.go new file mode 100644 index 000000000..d832830c2 --- /dev/null +++ b/peer/set.go @@ -0,0 +1,67 @@ +package peer + +import ( + . "github.com/tendermint/tendermint/binary" + "io" +) + +/* Set + + A Set could be a bloom filter for lossy filtering, or could be a lossless filter. +*/ +type Set interface { + Binary + Add(Msg) + Has(Msg) bool + + // Loads a new set. + // Convenience factory method + Load(ByteSlice) Set +} + + +/* BloomFilterSet */ + +type BloomFilterSet struct { + lastBlockHeight UInt64 + lastHeaderHeight UInt64 +} + +func (bs *BloomFilterSet) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(String("block"), w, n, err) + n, err = WriteOnto(bs.lastBlockHeight, w, n, err) + n, err = WriteOnto(bs.lastHeaderHeight, w, n, err) + return +} + +func (bs *BloomFilterSet) Add(msg Msg) { +} + +func (bs *BloomFilterSet) Has(msg Msg) bool { + return false +} + +func (bs *BloomFilterSet) Load(bytes ByteSlice) Set { + return nil +} + + +/* BitarraySet */ + +type BlockSet struct { +} + +func (bs *BlockSet) WriteTo(w io.Writer) (n int64, err error) { + return +} + +func (bs *BlockSet) Add(msg Msg) { +} + +func (bs *BlockSet) Has(msg Msg) bool { + return false +} + +func (bs *BlockSet) Load(bytes ByteSlice) Set { + return nil +} diff --git a/peer/upnp.go b/peer/upnp.go index 084780001..920bc329e 100644 --- a/peer/upnp.go +++ b/peer/upnp.go @@ -1,4 +1,5 @@ // from taipei-torrent +// TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh package peer diff --git a/peer/upnp_test.go b/peer/upnp_test.go new file mode 100644 index 000000000..9735949f8 --- /dev/null +++ b/peer/upnp_test.go @@ -0,0 +1,48 @@ +package peer + +import ( + "testing" + "time" +) + +/* +This is a manual test. +TODO: set up or find a service to probe open ports. +*/ + +func TestUPNP(t *testing.T) { + t.Log("hello!") + + nat, err := Discover() + if err != nil { + t.Fatalf("NAT upnp could not be discovered: %v", err) + } + + t.Log("ourIP: ", nat.(*upnpNAT).ourIP) + + ext, err := nat.GetExternalAddress() + if err != nil { + t.Fatalf("External address error: %v", err) + } + t.Logf("External address: %v", ext) + + port, err := nat.AddPortMapping("tcp", 8001, 8001, "testing", 0) + if err != nil { + t.Fatalf("Port mapping error: %v", err) + } + t.Logf("Port mapping mapped: %v", port) + + // also run the listener, open for all remote addresses. + listener := NewDefaultListener("tcp", "0.0.0.0:8001") + + // now sleep for 10 seconds + time.Sleep(10 * time.Second) + + err = nat.DeletePortMapping("tcp", 8001, 8001) + if err != nil { + t.Fatalf("Port mapping delete error: %v", err) + } + t.Logf("Port mapping deleted") + + listener.Stop() +}