From c895c6a5862f21c57a283cf86851fec4ad841a74 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Jun 2014 21:37:20 -0700 Subject: [PATCH] . --- binary/int.go | 7 +++++++ merkle/iavl_tree.go | 4 ++++ peer/addrbook.go | 1 - peer/client.go | 29 +++++++++++++++-------------- peer/connection.go | 9 +++------ peer/connection_test.go | 15 ++++++++------- peer/filter.go | 4 ++++ peer/listener.go | 20 ++++++++++---------- peer/msg.go | 5 +++++ peer/peer.go | 21 +++++++++++++-------- peer/server.go | 4 ++-- peer/upnp.go | 7 +++++++ 12 files changed, 78 insertions(+), 48 deletions(-) diff --git a/binary/int.go b/binary/int.go index 85bc21c78..351b2a23b 100644 --- a/binary/int.go +++ b/binary/int.go @@ -117,6 +117,13 @@ func ReadUInt8(r io.Reader) UInt8 { return UInt8(buf[0]) } +func ReadUInt8Safe(r io.Reader) (UInt8, error) { + buf := [1]byte{0} + _, err := io.ReadFull(r, buf[:]) + if err != nil { return UInt8(0), err } + return UInt8(buf[0]), nil +} + // Int16 diff --git a/merkle/iavl_tree.go b/merkle/iavl_tree.go index de2c6930c..a6c5493ee 100644 --- a/merkle/iavl_tree.go +++ b/merkle/iavl_tree.go @@ -1,5 +1,9 @@ package merkle +import ( + . "github.com/tendermint/tendermint/binary" +) + const HASH_BYTE_SIZE int = 4+32 /* diff --git a/peer/addrbook.go b/peer/addrbook.go index a833de8b5..d84712fa3 100644 --- a/peer/addrbook.go +++ b/peer/addrbook.go @@ -95,7 +95,6 @@ func NewAddrBook(filePath string) *AddrBook { am := AddrBook{ rand: rand.New(rand.NewSource(time.Now().UnixNano())), quit: make(chan struct{}), - localAddresses: make(map[string]*localAddress), filePath: filePath, } am.init() diff --git a/peer/client.go b/peer/client.go index c0962b289..2cc7171ce 100644 --- a/peer/client.go +++ b/peer/client.go @@ -1,11 +1,11 @@ package peer import ( + . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/merkle" - "atomic" + "sync/atomic" "sync" - "io" "errors" ) @@ -38,14 +38,14 @@ var ( CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer") ) -func NewClient(newPeerCb func(*Connect) *Peer) *Client { +func NewClient(newPeerCb func(*Connection) *Peer) *Client { self := newPeerCb(nil) if self == nil { Panicf("newPeerCb(nil) must return a prototypical peer for self") } inQueues := make(map[String]chan *InboundMsg) - for chName, channel := peer.channels { + for chName, channel := range self.channels { inQueues[chName] = make(chan *InboundMsg) } @@ -103,33 +103,33 @@ func (c *Client) Broadcast(chName String, msg Msg) { func (c *Client) PopMessage(chName String) *InboundMsg { if atomic.LoadUint32(&c.stopped) == 1 { return nil } - channel := c.Channel(chName) + channel := c.self.Channel(chName) q := c.inQueues[chName] if q == nil { Panicf("Expected inQueues[%f], found none", chName) } for { select { - case <-quit: + case <-c.quit: return nil - case msg := <-q: + case inMsg := <-q: // skip if known. - if channel.Filter().Has(msg) { + if channel.Filter().Has(inMsg.Msg) { continue } - return msg + return inMsg } } } // Updates self's filter for a channel & broadcasts it. -// TODO: maybe don't expose this +// TODO: rename, same name is confusing. func (c *Client) UpdateFilter(chName String, filter Filter) { if atomic.LoadUint32(&c.stopped) == 1 { return } c.self.Channel(chName).UpdateFilter(filter) c.Broadcast("", &NewFilterMsg{ - Channel: chName, + ChName: chName, Filter: filter, }) } @@ -137,12 +137,13 @@ func (c *Client) UpdateFilter(chName String, filter Filter) { func (c *Client) StopPeer(peer *Peer) { // lock c.mtx.Lock() - p, _ := c.peers.Remove(peer.RemoteAddress()) + peerValue, _ := c.peers.Remove(peer.RemoteAddress()) c.mtx.Unlock() // unlock - if p != nil { - p.Stop() + peer_ := peerValue.(*Peer) + if peer_ != nil { + peer_.Stop() } } diff --git a/peer/connection.go b/peer/connection.go index 27e2f57c7..a7eb53c8f 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -3,11 +3,8 @@ package peer import ( . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/binary" - "atomic" - "sync" + "sync/atomic" "net" - "runtime" - "fmt" "time" ) @@ -24,7 +21,7 @@ type Connection struct { outQueue chan ByteSlice // never closes. conn net.Conn quit chan struct{} - stopped int32 + stopped uint32 pingDebouncer *Debouncer pong chan struct{} } @@ -63,7 +60,7 @@ func (c *Connection) Start() { } func (c *Connection) Stop() { - if atomic.SwapAndCompare(&c.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { close(c.quit) c.conn.Close() c.pingDebouncer.Stop() diff --git a/peer/connection_test.go b/peer/connection_test.go index e72e15d81..f7b931f4f 100644 --- a/peer/connection_test.go +++ b/peer/connection_test.go @@ -1,6 +1,7 @@ package peer import ( + . "github.com/tendermint/tendermint/binary" "testing" "time" ) @@ -13,19 +14,19 @@ func TestLocalConnection(t *testing.T) { ch1 := NewChannel(String("ch1"), nil, // XXX these channels should be buffered. - make(chan ByteSlice), - make(chan ByteSlice), + make(chan Msg), + make(chan Msg), ) ch2 := NewChannel(String("ch2"), nil, - make(chan ByteSlice), - make(chan ByteSlice), + make(chan Msg), + make(chan Msg), ) channels := make(map[String]*Channel) - channels[ch1.Name] = ch1 - channels[ch2.Name] = ch2 + channels[ch1.Name()] = ch1 + channels[ch2.Name()] = ch2 p.channels = channels return p @@ -44,7 +45,7 @@ func TestLocalConnection(t *testing.T) { c1.Broadcast(String(""), String("message")) time.Sleep(500 * time.Millisecond) - inMsg := c2.PopMessage() + inMsg := c2.PopMessage(String("")) c1.Stop() c2.Stop() diff --git a/peer/filter.go b/peer/filter.go index c0177e8ab..aecde4eec 100644 --- a/peer/filter.go +++ b/peer/filter.go @@ -1,5 +1,9 @@ package peer +import ( + . "github.com/tendermint/tendermint/binary" +) + /* Filter A Filter could be a bloom filter for lossy filtering, or could be a lossless filter. diff --git a/peer/listener.go b/peer/listener.go index c19a5a2e1..46f5f9a89 100644 --- a/peer/listener.go +++ b/peer/listener.go @@ -1,7 +1,7 @@ package peer import ( - "atomic" + "sync/atomic" "net" ) @@ -26,21 +26,21 @@ const ( DEFAULT_BUFFERED_CONNECTIONS = 10 ) -func NewListener(protocol string, laddr string) *Listener { +func NewListener(protocol string, laddr string) Listener { ln, err := net.Listen(protocol, laddr) if err != nil { panic(err) } - s := &Listener{ + dl := &DefaultListener{ listener: ln, connections: make(chan *Connection, DEFAULT_BUFFERED_CONNECTIONS), } - go l.listenHandler() + go dl.listenHandler() - return s + return dl } -func (l *Listener) listenHandler() { +func (l *DefaultListener) listenHandler() { for { conn, err := l.listener.Accept() @@ -50,7 +50,7 @@ func (l *Listener) listenHandler() { // yet we encountered an error. if err != nil { panic(err) } - c := NewConnection(con) + c := NewConnection(conn) l.connections <- c } @@ -61,15 +61,15 @@ func (l *Listener) listenHandler() { } } -func (l *Listener) Connections() <-chan *Connection { +func (l *DefaultListener) Connections() <-chan *Connection { return l.connections } -func (l *Listener) LocalAddress() *NetAddress { +func (l *DefaultListener) LocalAddress() *NetAddress { return NewNetAddress(l.listener.Addr()) } -func (l *Listener) Stop() { +func (l *DefaultListener) Stop() { if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) { l.listener.Close() } diff --git a/peer/msg.go b/peer/msg.go index 6581c56f7..bf6642e8f 100644 --- a/peer/msg.go +++ b/peer/msg.go @@ -1,5 +1,10 @@ package peer +import ( + . "github.com/tendermint/tendermint/binary" + "io" +) + /* Msg */ type Msg struct { diff --git a/peer/peer.go b/peer/peer.go index e7f84d4a6..30bb00ddb 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -1,8 +1,11 @@ package peer import ( - "atomic" + . "github.com/tendermint/tendermint/binary" + "sync/atomic" "sync" + "io" + "time" ) /* Peer */ @@ -82,7 +85,7 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) { FOR_LOOP: for { select { - case <-quit: + case <-p.quit: break FOR_LOOP case msg := <-inQueue: // add to channel filter @@ -91,11 +94,11 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) { inboundMsg := &InboundMsg{ Peer: p, Channel: channel, - Time: Time(time.Now()), + Time: Time{time.Now()}, Msg: msg, } select { - case <-quit: + case <-p.quit: break FOR_LOOP case inboundMsgQueue <- inboundMsg: continue @@ -108,11 +111,11 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) { } func (p *Peer) outHandler(chName String) { - outQueue := p.channels[chName].OutQueue() + outQueue := p.channels[chName].outQueue FOR_LOOP: for { select { - case <-quit: + case <-p.quit: break FOR_LOOP case msg := <-outQueue: // blocks until the connection is Stop'd, @@ -148,6 +151,10 @@ func NewChannel(name String, filter Filter, in, out chan Msg) *Channel { } } +func (c *Channel) Name() String { + return c.name +} + func (c *Channel) InQueue() <-chan Msg { return c.inQueue } @@ -164,7 +171,6 @@ func (c *Channel) Has(msg Msg) bool { return c.Filter().Has(msg) } -// TODO: maybe don't expose this func (c *Channel) Filter() Filter { // lock & defer c.mtx.Lock(); defer c.mtx.Unlock() @@ -172,7 +178,6 @@ func (c *Channel) Filter() Filter { // unlock deferred } -// TODO: maybe don't expose this func (c *Channel) UpdateFilter(filter Filter) { // lock c.mtx.Lock() diff --git a/peer/server.go b/peer/server.go index 787e5f7ac..de168c9b6 100644 --- a/peer/server.go +++ b/peer/server.go @@ -15,14 +15,14 @@ func NewServer(l Listener, c *Client) *Server { listener: l, client: c, } - go s.IncomingConnectionsHandler() + go s.IncomingConnectionHandler() return s } // meant to run in a goroutine func (s *Server) IncomingConnectionHandler() { for conn := range s.listener.Connections() { - s.client.AddIncomingConnection(conn) + s.client.AddPeerWithConnection(conn, false) } } diff --git a/peer/upnp.go b/peer/upnp.go index 711440c92..084780001 100644 --- a/peer/upnp.go +++ b/peer/upnp.go @@ -23,6 +23,13 @@ type upnpNAT struct { urnDomain string } +// protocol is either "udp" or "tcp" +type NAT interface { + GetExternalAddress() (addr net.IP, err error) + AddPortMapping(protocol string, externalPort, internalPort int, description string, timeout int) (mappedExternalPort int, err error) + DeletePortMapping(protocol string, externalPort, internalPort int) (err error) +} + func Discover() (nat NAT, err error) { ssdp, err := net.ResolveUDPAddr("udp4", "239.255.255.250:1900") if err != nil {