diff --git a/peer/README.md b/peer/README.md
index ee6da1677..8527fb073 100644
--- a/peer/README.md
+++ b/peer/README.md
@@ -99,7 +99,7 @@ The consensus channel broadcasts all information used in the rounds of the Tende
Filter |
Bitarray filter
- Each validator has a predetermined index in teh bitarray
+ Each validator has a predetermined index in the bitarray
Refreshes every new consensus round
|
@@ -114,3 +114,7 @@ The consensus channel broadcasts all information used in the rounds of the Tende
+
+## Resources
+
+* http://www.upnp-hacks.org/upnp.html
diff --git a/peer/client.go b/peer/client.go
index 2cc7171ce..bb3d91c1d 100644
--- a/peer/client.go
+++ b/peer/client.go
@@ -113,7 +113,7 @@ func (c *Client) PopMessage(chName String) *InboundMsg {
return nil
case inMsg := <-q:
// skip if known.
- if channel.Filter().Has(inMsg.Msg) {
+ if channel.Has(inMsg.Msg) {
continue
}
return inMsg
@@ -121,19 +121,6 @@ func (c *Client) PopMessage(chName String) *InboundMsg {
}
}
-// Updates self's filter for a channel & broadcasts it.
-// TODO: rename, same name is confusing.
-func (c *Client) UpdateFilter(chName String, filter Filter) {
- if atomic.LoadUint32(&c.stopped) == 1 { return }
-
- c.self.Channel(chName).UpdateFilter(filter)
-
- c.Broadcast("", &NewFilterMsg{
- ChName: chName,
- Filter: filter,
- })
-}
-
func (c *Client) StopPeer(peer *Peer) {
// lock
c.mtx.Lock()
diff --git a/peer/connection.go b/peer/connection.go
index a7eb53c8f..b03311f43 100644
--- a/peer/connection.go
+++ b/peer/connection.go
@@ -141,6 +141,7 @@ func (c *Connection) inHandler() {
}
// What to do?
// TODO
+ log.Tracef("%v", msg)
default:
Panicf("Unknown message type %v", msgType)
diff --git a/peer/connection_test.go b/peer/connection_test.go
index f7b931f4f..b124fe9db 100644
--- a/peer/connection_test.go
+++ b/peer/connection_test.go
@@ -8,40 +8,24 @@ import (
func TestLocalConnection(t *testing.T) {
- c1 := NewClient(func(conn *Connection) *Peer {
+ makePeer := func(conn *Connection) *Peer {
+ bufferSize := 10
p := &Peer{conn: conn}
-
- ch1 := NewChannel(String("ch1"),
- nil,
- // XXX these channels should be buffered.
- make(chan Msg),
- make(chan Msg),
- )
-
- ch2 := NewChannel(String("ch2"),
- nil,
- make(chan Msg),
- make(chan Msg),
- )
-
- channels := make(map[String]*Channel)
- channels[ch1.Name()] = ch1
- channels[ch2.Name()] = ch2
- p.channels = channels
-
+ p.channels := map[String]*Channel{}
+ p.channels["ch1"] = NewChannel("ch1", bufferSize)
+ p.channels["ch2"] = NewChannel("ch2", bufferSize)
return p
- })
+ }
- // XXX make c2 like c1.
+ c1 := NewClient(makePeer)
+ c2 := NewClient(makePeer)
- c2 := NewClient(func(conn *Connection) *Peer {
- return nil
- })
+ s1 := NewServer("tcp", "127.0.0.1:8001", c1)
- // XXX clients don't have "local addresses"
- c1.ConnectTo(c2.LocalAddress())
+ c2.ConnectTo(c1.LocalAddress())
// lets send a message from c1 to c2.
+ // XXX do we even want a broadcast function?
c1.Broadcast(String(""), String("message"))
time.Sleep(500 * time.Millisecond)
diff --git a/peer/filter.go b/peer/filter.go
deleted file mode 100644
index aecde4eec..000000000
--- a/peer/filter.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package peer
-
-import (
- . "github.com/tendermint/tendermint/binary"
-)
-
-/* Filter
-
- A Filter could be a bloom filter for lossy filtering, or could be a lossless filter.
- Either way, it's used to keep track of what a peer knows of.
-*/
-type Filter interface {
- Binary
- Add(Msg)
- Has(Msg) bool
-
- // Loads a new filter.
- // Convenience factory method
- Load(ByteSlice) Filter
-}
diff --git a/peer/listener.go b/peer/listener.go
index 46f5f9a89..a314c599f 100644
--- a/peer/listener.go
+++ b/peer/listener.go
@@ -5,6 +5,10 @@ import (
"net"
)
+const (
+ DEFAULT_PORT = 8001
+)
+
/* Listener */
type Listener interface {
@@ -26,12 +30,12 @@ const (
DEFAULT_BUFFERED_CONNECTIONS = 10
)
-func NewListener(protocol string, laddr string) Listener {
- ln, err := net.Listen(protocol, laddr)
+func NewDefaultListener(protocol string, listenAddr string) Listener {
+ listener, err := net.Listen(protocol, listenAddr)
if err != nil { panic(err) }
dl := &DefaultListener{
- listener: ln,
+ listener: listener,
connections: make(chan *Connection, DEFAULT_BUFFERED_CONNECTIONS),
}
@@ -66,7 +70,7 @@ func (l *DefaultListener) Connections() <-chan *Connection {
}
func (l *DefaultListener) LocalAddress() *NetAddress {
- return NewNetAddress(l.listener.Addr())
+ return GetLocalAddress()
}
func (l *DefaultListener) Stop() {
@@ -74,3 +78,48 @@ func (l *DefaultListener) Stop() {
l.listener.Close()
}
}
+
+
+/* local address helpers */
+
+func GetLocalAddress() *NetAddress {
+ laddr := GetUPNPLocalAddress()
+ if laddr != nil { return laddr }
+
+ laddr = GetDefaultLocalAddress()
+ if laddr != nil { return laddr }
+
+ panic("Could not determine local address")
+}
+
+// UPNP external address discovery & port mapping
+// TODO: more flexible internal & external ports
+func GetUPNPLocalAddress() *NetAddress {
+ nat, err := Discover()
+ if err != nil { return nil }
+
+ ext, err := nat.GetExternalAddress()
+ if err != nil { return nil }
+
+ _, err := nat.AddPortMapping("tcp", DEFAULT_PORT, DEFAULT_PORT, "tendermint", 0)
+ if err != nil { return nil }
+
+ return NewNetAddressIPPort(ext, DEFAULT_PORT)
+}
+
+// Naive local IPv4 interface address detection
+// TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh
+func GetDefaultLocalAddress() *NetAddress {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil { panic("Wtf") }
+ for _, a := range addrs {
+ ipnet, ok := a.(*net.IPNet)
+ if !ok { continue }
+ v4 := ipnet.IP.To4()
+ if v4 == nil || v4[0] == 127 { continue } // loopback
+ return NewNetAddress(a)
+ }
+ return nil
+}
+
+
diff --git a/peer/peer.go b/peer/peer.go
index 30bb00ddb..8c84c0350 100644
--- a/peer/peer.go
+++ b/peer/peer.go
@@ -50,18 +50,12 @@ func (p *Peer) Channel(chName String) *Channel {
return p.channels[chName]
}
-// If msg isn't already in the peer's filter, then
-// queue the msg for output.
+// Queue the msg for output.
// If the queue is full, just return false.
func (p *Peer) TryQueueOut(chName String, msg Msg) bool {
channel := p.Channel(chName)
outQueue := channel.OutQueue()
- // just return if already in filter
- if channel.Filter().Has(msg) {
- return true
- }
-
// lock & defer
p.mtx.Lock(); defer p.mtx.Unlock()
if p.stopped == 1 { return false }
@@ -88,8 +82,6 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) {
case <-p.quit:
break FOR_LOOP
case msg := <-inQueue:
- // add to channel filter
- channel.Filter().Add(msg)
// send to inboundMsgQueue
inboundMsg := &InboundMsg{
Peer: p,
@@ -133,21 +125,16 @@ func (p *Peer) outHandler(chName String) {
type Channel struct {
name String
-
- mtx sync.Mutex
- filter Filter
-
inQueue chan Msg
outQueue chan Msg
//stats Stats
}
-func NewChannel(name String, filter Filter, in, out chan Msg) *Channel {
+func NewChannel(name string, bufferSize int) *Channel {
return &Channel{
- name: name,
- filter: filter,
- inQueue: in,
- outQueue: out,
+ name: String(name),
+ inQueue: make(chan Msg, bufferSize),
+ outQueue: make(chan Msg, buffersize),
}
}
@@ -162,26 +149,3 @@ func (c *Channel) InQueue() <-chan Msg {
func (c *Channel) OutQueue() chan<- Msg {
return c.outQueue
}
-
-func (c *Channel) Add(msg Msg) {
- c.Filter().Add(msg)
-}
-
-func (c *Channel) Has(msg Msg) bool {
- return c.Filter().Has(msg)
-}
-
-func (c *Channel) Filter() Filter {
- // lock & defer
- c.mtx.Lock(); defer c.mtx.Unlock()
- return c.filter
- // unlock deferred
-}
-
-func (c *Channel) UpdateFilter(filter Filter) {
- // lock
- c.mtx.Lock()
- c.filter = filter
- c.mtx.Unlock()
- // unlock
-}
diff --git a/peer/server.go b/peer/server.go
index de168c9b6..65ea30f72 100644
--- a/peer/server.go
+++ b/peer/server.go
@@ -1,6 +1,8 @@
package peer
import (
+ "sync/atomic"
+ "net"
)
/* Server */
@@ -10,7 +12,8 @@ type Server struct {
client *Client
}
-func NewServer(l Listener, c *Client) *Server {
+func NewServer(protocol string, laddr string, c *Client) *Server {
+ l := NewListener(protocol, laddr)
s := &Server{
listener: l,
client: c,
@@ -19,6 +22,10 @@ func NewServer(l Listener, c *Client) *Server {
return s
}
+func (s *Server) LocalAddress() *NetAddress {
+ return s.listener.LocalAddress()
+}
+
// meant to run in a goroutine
func (s *Server) IncomingConnectionHandler() {
for conn := range s.listener.Connections() {
diff --git a/peer/set.go b/peer/set.go
new file mode 100644
index 000000000..d832830c2
--- /dev/null
+++ b/peer/set.go
@@ -0,0 +1,67 @@
+package peer
+
+import (
+ . "github.com/tendermint/tendermint/binary"
+ "io"
+)
+
+/* Set
+
+ A Set could be a bloom filter for lossy filtering, or could be a lossless filter.
+*/
+type Set interface {
+ Binary
+ Add(Msg)
+ Has(Msg) bool
+
+ // Loads a new set.
+ // Convenience factory method
+ Load(ByteSlice) Set
+}
+
+
+/* BloomFilterSet */
+
+type BloomFilterSet struct {
+ lastBlockHeight UInt64
+ lastHeaderHeight UInt64
+}
+
+func (bs *BloomFilterSet) WriteTo(w io.Writer) (n int64, err error) {
+ n, err = WriteOnto(String("block"), w, n, err)
+ n, err = WriteOnto(bs.lastBlockHeight, w, n, err)
+ n, err = WriteOnto(bs.lastHeaderHeight, w, n, err)
+ return
+}
+
+func (bs *BloomFilterSet) Add(msg Msg) {
+}
+
+func (bs *BloomFilterSet) Has(msg Msg) bool {
+ return false
+}
+
+func (bs *BloomFilterSet) Load(bytes ByteSlice) Set {
+ return nil
+}
+
+
+/* BitarraySet */
+
+type BlockSet struct {
+}
+
+func (bs *BlockSet) WriteTo(w io.Writer) (n int64, err error) {
+ return
+}
+
+func (bs *BlockSet) Add(msg Msg) {
+}
+
+func (bs *BlockSet) Has(msg Msg) bool {
+ return false
+}
+
+func (bs *BlockSet) Load(bytes ByteSlice) Set {
+ return nil
+}
diff --git a/peer/upnp.go b/peer/upnp.go
index 084780001..920bc329e 100644
--- a/peer/upnp.go
+++ b/peer/upnp.go
@@ -1,4 +1,5 @@
// from taipei-torrent
+// TODO: use syscalls to get actual ourIP. http://pastebin.com/9exZG4rh
package peer
diff --git a/peer/upnp_test.go b/peer/upnp_test.go
new file mode 100644
index 000000000..9735949f8
--- /dev/null
+++ b/peer/upnp_test.go
@@ -0,0 +1,48 @@
+package peer
+
+import (
+ "testing"
+ "time"
+)
+
+/*
+This is a manual test.
+TODO: set up or find a service to probe open ports.
+*/
+
+func TestUPNP(t *testing.T) {
+ t.Log("hello!")
+
+ nat, err := Discover()
+ if err != nil {
+ t.Fatalf("NAT upnp could not be discovered: %v", err)
+ }
+
+ t.Log("ourIP: ", nat.(*upnpNAT).ourIP)
+
+ ext, err := nat.GetExternalAddress()
+ if err != nil {
+ t.Fatalf("External address error: %v", err)
+ }
+ t.Logf("External address: %v", ext)
+
+ port, err := nat.AddPortMapping("tcp", 8001, 8001, "testing", 0)
+ if err != nil {
+ t.Fatalf("Port mapping error: %v", err)
+ }
+ t.Logf("Port mapping mapped: %v", port)
+
+ // also run the listener, open for all remote addresses.
+ listener := NewDefaultListener("tcp", "0.0.0.0:8001")
+
+ // now sleep for 10 seconds
+ time.Sleep(10 * time.Second)
+
+ err = nat.DeletePortMapping("tcp", 8001, 8001)
+ if err != nil {
+ t.Fatalf("Port mapping delete error: %v", err)
+ }
+ t.Logf("Port mapping deleted")
+
+ listener.Stop()
+}