From 534635aae90a3e77654e1ee0fb9df0d32cb2200f Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 18 Jun 2014 20:48:32 -0700 Subject: [PATCH] working draft of peer --- peer/{addrmanager.go => addrbook.go} | 111 +++-- peer/client.go | 155 ++++++ peer/connection.go | 143 ++++++ peer/connection_test.go | 16 + peer/knownaddress.go | 20 +- peer/listener.go | 60 +++ peer/log.go | 2 +- peer/netaddress.go | 49 +- peer/peer.go | 715 +++++++++++++++++++++++++++ 9 files changed, 1203 insertions(+), 68 deletions(-) rename peer/{addrmanager.go => addrbook.go} (85%) create mode 100644 peer/client.go create mode 100644 peer/connection.go create mode 100644 peer/connection_test.go create mode 100644 peer/listener.go create mode 100644 peer/peer.go diff --git a/peer/addrmanager.go b/peer/addrbook.go similarity index 85% rename from peer/addrmanager.go rename to peer/addrbook.go index c60a70d0d..c5381498e 100644 --- a/peer/addrmanager.go +++ b/peer/addrbook.go @@ -20,8 +20,10 @@ import ( "fmt" ) -/* AddrManager - concurrency safe peer address manager */ -type AddrManager struct { +/* AddrBook - concurrency safe peer address manager */ +type AddrBook struct { + filePath string + mtx sync.Mutex rand *rand.Rand key [32]byte @@ -34,8 +36,9 @@ type AddrManager struct { quit chan struct{} nOld int nNew int + + lamtx sync.Mutex localAddresses map[string]*localAddress - filePath string } const ( @@ -91,8 +94,8 @@ const ( ) // Use Start to begin processing asynchronous address updates. -func NewAddrManager(filePath string) *AddrManager { - am := AddrManager{ +func NewAddrBook(filePath string) *AddrBook { + am := AddrBook{ rand: rand.New(rand.NewSource(time.Now().UnixNano())), quit: make(chan struct{}), localAddresses: make(map[string]*localAddress), @@ -103,7 +106,7 @@ func NewAddrManager(filePath string) *AddrManager { } // When modifying this, don't forget to update loadFromFile() -func (a *AddrManager) init() { +func (a *AddrBook) init() { a.addrIndex = make(map[string]*KnownAddress) io.ReadFull(crand.Reader, a.key[:]) for i := range a.addrNew { @@ -114,37 +117,37 @@ func (a *AddrManager) init() { } } -func (a *AddrManager) Start() { +func (a *AddrBook) Start() { if atomic.AddInt32(&a.started, 1) != 1 { return } - amgrLog.Trace("Starting address manager") + log.Trace("Starting address manager") a.loadFromFile(a.filePath) a.wg.Add(1) go a.addressHandler() } -func (a *AddrManager) Stop() { +func (a *AddrBook) Stop() { if atomic.AddInt32(&a.shutdown, 1) != 1 { return } - amgrLog.Infof("Address manager shutting down") + log.Infof("Address manager shutting down") close(a.quit) a.wg.Wait() } -func (a *AddrManager) AddAddress(addr *NetAddress, src *NetAddress) { +func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) { a.mtx.Lock(); defer a.mtx.Unlock() a.addAddress(addr, src) } -func (a *AddrManager) NeedMoreAddresses() bool { +func (a *AddrBook) NeedMoreAddresses() bool { return a.NumAddresses() < needAddressThreshold } -func (a *AddrManager) NumAddresses() int { +func (a *AddrBook) NumAddresses() int { a.mtx.Lock(); defer a.mtx.Unlock() return a.nOld + a.nNew } // Pick a new address to connect to. -func (a *AddrManager) PickAddress(class string, newBias int) *KnownAddress { +func (a *AddrBook) PickAddress(class string, newBias int) *KnownAddress { a.mtx.Lock(); defer a.mtx.Unlock() if a.nOld == 0 && a.nNew == 0 { return nil } @@ -182,20 +185,26 @@ func (a *AddrManager) PickAddress(class string, newBias int) *KnownAddress { return nil } -func (a *AddrManager) MarkGood(ka *KnownAddress) { +func (a *AddrBook) MarkGood(addr *NetAddress) { a.mtx.Lock(); defer a.mtx.Unlock() + ka := a.addrIndex[addr.String()] + if ka == nil { return } ka.MarkAttempt(true) - a.moveToOld(ka) + if ka.OldBucket == -1 { + a.moveToOld(ka) + } } -func (a *AddrManager) MarkBad(ka *KnownAddress) { +func (a *AddrBook) MarkAttempt(addr *NetAddress) { a.mtx.Lock(); defer a.mtx.Unlock() + ka := a.addrIndex[addr.String()] + if ka == nil { return } ka.MarkAttempt(false) } /* Loading & Saving */ -type addrManagerJSON struct { +type addrBookJSON struct { Key [32]byte AddrNew [newBucketCount]map[string]*KnownAddress AddrOld [oldBucketCount][]*KnownAddress @@ -203,8 +212,8 @@ type addrManagerJSON struct { NNew int } -func (a *AddrManager) saveToFile(filePath string) { - aJSON := &addrManagerJSON{ +func (a *AddrBook) saveToFile(filePath string) { + aJSON := &addrBookJSON{ Key: a.key, AddrNew: a.addrNew, AddrOld: a.addrOld, @@ -214,7 +223,7 @@ func (a *AddrManager) saveToFile(filePath string) { w, err := os.Create(filePath) if err != nil { - amgrLog.Error("Error opening file: ", filePath, err) + log.Error("Error opening file: ", filePath, err) return } enc := json.NewEncoder(w) @@ -223,25 +232,27 @@ func (a *AddrManager) saveToFile(filePath string) { if err != nil { panic(err) } } -func (a *AddrManager) loadFromFile(filePath string) { +func (a *AddrBook) loadFromFile(filePath string) { // If doesn't exist, do nothing. _, err := os.Stat(filePath) if os.IsNotExist(err) { return } + // Load addrBookJSON{} + r, err := os.Open(filePath) if err != nil { panic(fmt.Errorf("%s error opening file: %v", filePath, err)) } defer r.Close() - aJSON := &addrManagerJSON{} + aJSON := &addrBookJSON{} dec := json.NewDecoder(r) err = dec.Decode(aJSON) if err != nil { panic(fmt.Errorf("error reading %s: %v", filePath, err)) } - // Now we need to initialize 'a'. + // Now we need to initialize self. copy(a.key[:], aJSON.Key[:]) a.addrNew = aJSON.AddrNew @@ -262,7 +273,7 @@ func (a *AddrManager) loadFromFile(filePath string) { /* Private methods */ -func (a *AddrManager) addressHandler() { +func (a *AddrBook) addressHandler() { dumpAddressTicker := time.NewTicker(dumpAddressInterval) out: for { @@ -276,10 +287,10 @@ out: dumpAddressTicker.Stop() a.saveToFile(a.filePath) a.wg.Done() - amgrLog.Trace("Address handler done") + log.Trace("Address handler done") } -func (a *AddrManager) addAddress(addr, src *NetAddress) { +func (a *AddrBook) addAddress(addr, src *NetAddress) { if !addr.Routable() { return } key := addr.String() @@ -310,7 +321,7 @@ func (a *AddrManager) addAddress(addr, src *NetAddress) { // Enforce max addresses. if len(a.addrNew[bucket]) > newBucketSize { - amgrLog.Tracef("new bucket is full, expiring old ") + log.Tracef("new bucket is full, expiring old ") a.expireNew(bucket) } @@ -318,17 +329,17 @@ func (a *AddrManager) addAddress(addr, src *NetAddress) { ka.NewRefs++ a.addrNew[bucket][key] = ka - amgrLog.Tracef("Added new address %s for a total of %d addresses", addr, a.nOld+a.nNew) + log.Tracef("Added new address %s for a total of %d addresses", addr, a.nOld+a.nNew) } // Make space in the new buckets by expiring the really bad entries. // If no bad entries are available we look at a few and remove the oldest. -func (a *AddrManager) expireNew(bucket int) { +func (a *AddrBook) expireNew(bucket int) { var oldest *KnownAddress for k, v := range a.addrNew[bucket] { // If an entry is bad, throw it away if v.Bad() { - amgrLog.Tracef("expiring bad address %v", k) + log.Tracef("expiring bad address %v", k) delete(a.addrNew[bucket], k) v.NewRefs-- if v.NewRefs == 0 { @@ -340,7 +351,7 @@ func (a *AddrManager) expireNew(bucket int) { // or, keep track of the oldest entry if oldest == nil { oldest = v - } else if v.LastAttempt < oldest.LastAttempt { + } else if v.LastAttempt.Before(oldest.LastAttempt.Time) { oldest = v } } @@ -348,7 +359,7 @@ func (a *AddrManager) expireNew(bucket int) { // If we haven't thrown out a bad entry, throw out the oldest entry if oldest != nil { key := oldest.Addr.String() - amgrLog.Tracef("expiring oldest address %v", key) + log.Tracef("expiring oldest address %v", key) delete(a.addrNew[bucket], key) oldest.NewRefs-- if oldest.NewRefs == 0 { @@ -358,7 +369,7 @@ func (a *AddrManager) expireNew(bucket int) { } } -func (a *AddrManager) moveToOld(ka *KnownAddress) { +func (a *AddrBook) moveToOld(ka *KnownAddress) { // Remove from all new buckets. // Remember one of those new buckets. addrKey := ka.Addr.String() @@ -403,18 +414,18 @@ func (a *AddrManager) moveToOld(ka *KnownAddress) { // put rmka into new bucket rmkey := rmka.Addr.String() - amgrLog.Tracef("Replacing %s with %s in old", rmkey, addrKey) + log.Tracef("Replacing %s with %s in old", rmkey, addrKey) a.addrNew[newBucket][rmkey] = rmka rmka.NewRefs++ a.nNew++ } // Returns the index in old bucket of oldest entry. -func (a *AddrManager) pickOld(bucket int) int { +func (a *AddrBook) pickOld(bucket int) int { var oldest *KnownAddress var oldestIndex int for i, ka := range a.addrOld[bucket] { - if oldest == nil || ka.LastAttempt < oldest.LastAttempt { + if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt.Time) { oldest = ka oldestIndex = i } @@ -424,7 +435,7 @@ func (a *AddrManager) pickOld(bucket int) int { // doublesha256(key + sourcegroup + // int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckes -func (a *AddrManager) getNewBucket(addr, src *NetAddress) int { +func (a *AddrBook) getNewBucket(addr, src *NetAddress) int { data1 := []byte{} data1 = append(data1, a.key[:]...) data1 = append(data1, []byte(GroupKey(addr))...) @@ -444,7 +455,7 @@ func (a *AddrManager) getNewBucket(addr, src *NetAddress) int { } // doublesha256(key + group + truncate_to_64bits(doublesha256(key + addr))%buckets_per_group) % num_buckets -func (a *AddrManager) getOldBucket(addr *NetAddress) int { +func (a *AddrBook) getOldBucket(addr *NetAddress) int { data1 := []byte{} data1 = append(data1, a.key[:]...) data1 = append(data1, []byte(addr.String())...) @@ -463,7 +474,7 @@ func (a *AddrManager) getOldBucket(addr *NetAddress) int { } -///// LOCAL ADDRESS +/* Local Address */ // addressPrio is an enum type used to describe the heirarchy of local address // discovery methods. @@ -482,15 +493,15 @@ type localAddress struct { Score addressPrio } -// addLocalAddress adds addr to the list of known local addresses to advertise -// with the given priority. -func (a *AddrManager) addLocalAddress(addr *NetAddress, priority addressPrio) { +func (a *AddrBook) AddLocalAddress(addr *NetAddress, priority addressPrio) { + a.mtx.Lock(); defer a.mtx.Unlock() + // sanity check. if !addr.Routable() { - amgrLog.Debugf("rejecting address %s:%d due to routability", addr.IP, addr.Port) + log.Debugf("rejecting address %s:%d due to routability", addr.IP, addr.Port) return } - amgrLog.Debugf("adding address %s:%d", addr.IP, addr.Port) + log.Debugf("adding address %s:%d", addr.IP, addr.Port) key := addr.String() la, ok := a.localAddresses[key] @@ -506,9 +517,11 @@ func (a *AddrManager) addLocalAddress(addr *NetAddress, priority addressPrio) { } } -// getBestLocalAddress returns the most appropriate local address that we know +// Returns the most appropriate local address that we know // of to be contacted by rna (remote net address) -func (a *AddrManager) getBestLocalAddress(rna *NetAddress) *NetAddress { +func (a *AddrBook) GetBestLocalAddress(rna *NetAddress) *NetAddress { + a.mtx.Lock(); defer a.mtx.Unlock() + bestReach := 0 var bestScore addressPrio var bestAddr *NetAddress @@ -522,10 +535,10 @@ func (a *AddrManager) getBestLocalAddress(rna *NetAddress) *NetAddress { } } if bestAddr != nil { - amgrLog.Debugf("Suggesting address %s:%d for %s:%d", + log.Debugf("Suggesting address %s:%d for %s:%d", bestAddr.IP, bestAddr.Port, rna.IP, rna.Port) } else { - amgrLog.Debugf("No worthy address for %s:%d", + log.Debugf("No worthy address for %s:%d", rna.IP, rna.Port) // Send something unroutable if nothing suitable. bestAddr = &NetAddress{ diff --git a/peer/client.go b/peer/client.go new file mode 100644 index 000000000..567f6d776 --- /dev/null +++ b/peer/client.go @@ -0,0 +1,155 @@ +package peer + +import ( + . "github.com/tendermint/tendermint/binary" + "github.com/tendermint/tendermint/merkle" + "sync" + "io" +) + +/* Client */ +type Client struct { + listener *Listener + addrBook AddrBook + strategies map[String]*FilterStrategy + targetNumPeers int + + peersMtx sync.Mutex + peers merkle.Tree // addr -> *Peer + + filtersMtx sync.Mutex + filters merkle.Tree // channelName -> Filter (objects that I know of) +} + +func NewClient(protocol string, laddr string) *Client { + // XXX set the handler + listener := NewListener(protocol, laddr, nil) + c := &Client{ + listener: listener, + peers: merkle.NewIAVLTree(nil), + filters: merkle.NewIAVLTree(nil), + } + return c +} + +func (c *Client) Start() (<-chan *IncomingMsg) { + return nil +} + +func (c *Client) Stop() { + c.listener.Close() +} + +func (c *Client) LocalAddress() *NetAddress { + return c.listener.LocalAddress() +} + +func (c *Client) ConnectTo(addr *NetAddress) (*Peer, error) { + + conn, err := addr.Dial() + if err != nil { return nil, err } + peer := NewPeer(conn) + + // lock + c.peersMtx.Lock() + c.peers.Put(addr, peer) + c.peersMtx.Unlock() + // unlock + + return peer, nil +} + +func (c *Client) Broadcast(channel String, msg Binary) { + for v := range c.peersCopy().Values() { + peer, ok := v.(*Peer) + if !ok { panic("Expected peer but got something else") } + peer.Queue(channel, msg) + } +} + +// Updates the client's filter for a channel & broadcasts it. +func (c *Client) UpdateFilter(channel String, filter Filter) { + c.filtersMtx.Lock() + c.filters.Put(channel, filter) + c.filtersMtx.Unlock() + + c.Broadcast("", &NewFilterMsg{ + Channel: channel, + Filter: filter, + }) +} + +func (c *Client) peersCopy() merkle.Tree { + c.peersMtx.Lock(); defer c.peersMtx.Unlock() + return c.peers.Copy() +} + + +/* Channel */ +type Channel struct { + Name String + Filter Filter + //Stats Stats +} + + +/* Peer */ +type Peer struct { + Conn *Connection + Channels map[String]*Channel +} + +func NewPeer(conn *Connection) *Peer { + return &Peer{ + Conn: conn, + Channels: nil, + } +} + +// Must be quick and nonblocking. +func (p *Peer) Queue(channel String, msg Binary) {} + +func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { + return 0, nil // TODO +} + + +/* IncomingMsg */ +type IncomingMsg struct { + SPeer *Peer + SChan *Channel + + Time Time + + Msg Binary +} + + +/* Filter + + A Filter could be a bloom filter for lossy filtering, or could be a lossless filter. + Either way, it's used to keep track of what a peer knows of. +*/ +type Filter interface { + Binary + Add(ByteSlice) + Has(ByteSlice) bool +} + +/* FilterStrategy + + Defines how filters are generated per peer, and whether they need to get refreshed occasionally. +*/ +type FilterStrategy interface { + LoadFilter(ByteSlice) Filter +} + +/* NewFilterMsg */ +type NewFilterMsg struct { + Channel String + Filter Filter +} + +func (m *NewFilterMsg) WriteTo(w io.Writer) (int64, error) { + return 0, nil // TODO +} diff --git a/peer/connection.go b/peer/connection.go new file mode 100644 index 000000000..6d5b6e532 --- /dev/null +++ b/peer/connection.go @@ -0,0 +1,143 @@ +package peer + +import ( + . "github.com/tendermint/tendermint/common" + . "github.com/tendermint/tendermint/binary" + "sync" + "net" + "runtime" + "fmt" + "time" +) + +const ( + OUT_QUEUE_SIZE = 50 + IDLE_TIMEOUT_MINUTES = 5 + PING_TIMEOUT_MINUTES = 2 +) + +/* Connnection */ +type Connection struct { + ioStats IOStats + + mtx sync.Mutex + outQueue chan ByteSlice + conn net.Conn + quit chan struct{} + disconnected bool + + pingDebouncer *Debouncer + pong chan struct{} +} + +var ( + PACKET_TYPE_PING = UInt8(0x00) + PACKET_TYPE_PONG = UInt8(0x01) + PACKET_TYPE_MSG = UInt8(0x10) +) + +func NewConnection(conn net.Conn) *Connection { + return &Connection{ + outQueue: make(chan ByteSlice, OUT_QUEUE_SIZE), + conn: conn, + quit: make(chan struct{}), + pingDebouncer: NewDebouncer(PING_TIMEOUT_MINUTES * time.Minute), + pong: make(chan struct{}), + } +} + +func (c *Connection) QueueMessage(msg ByteSlice) bool { + c.mtx.Lock(); defer c.mtx.Unlock() + if c.disconnected { return false } + select { + case c.outQueue <- msg: + return true + default: // buffer full + return false + } +} + +func (c *Connection) Start() { + go c.outHandler() + go c.inHandler() +} + +func (c *Connection) Disconnect() { + c.mtx.Lock(); defer c.mtx.Unlock() + close(c.quit) + c.conn.Close() + c.pingDebouncer.Stop() + // do not close c.pong + c.disconnected = true +} + +func (c *Connection) flush() { + // TODO flush? (turn off nagel, turn back on, etc) +} + +func (c *Connection) outHandler() { + + FOR_LOOP: + for { + select { + case <-c.pingDebouncer.Ch: + PACKET_TYPE_PING.WriteTo(c.conn) + case outMsg := <-c.outQueue: + _, err := outMsg.WriteTo(c.conn) + if err != nil { Panicf("TODO: handle error %v", err) } + case <-c.pong: + PACKET_TYPE_PONG.WriteTo(c.conn) + case <-c.quit: + break FOR_LOOP + } + c.flush() + } + + // cleanup + for _ = range c.outQueue { + // do nothing but drain. + } +} + +func (c *Connection) inHandler() { + defer func() { + if e := recover(); e != nil { + // Get stack trace + buf := make([]byte, 1<<16) + runtime.Stack(buf, false) + // TODO do proper logging + fmt.Printf("Disconnecting due to error:\n\n%v\n", string(buf)) + c.Disconnect() + } + }() + + //FOR_LOOP: + for { + msgType := ReadUInt8(c.conn) + + switch msgType { + case PACKET_TYPE_PING: + c.pong <- struct{}{} + case PACKET_TYPE_PONG: + // do nothing + case PACKET_TYPE_MSG: + ReadByteSlice(c.conn) + default: + Panicf("Unknown message type %v", msgType) + } + c.pingDebouncer.Reset() + } +} + + + +/* IOStats */ +type IOStats struct { + TimeConnected Time + LastSent Time + LastRecv Time + BytesRecv UInt64 + BytesSent UInt64 + MsgsRecv UInt64 + MsgsSent UInt64 +} diff --git a/peer/connection_test.go b/peer/connection_test.go new file mode 100644 index 000000000..99f6c00e0 --- /dev/null +++ b/peer/connection_test.go @@ -0,0 +1,16 @@ +package peer + +import ( + "testing" +) + +func TestLocalConnection(t *testing.T) { + + c1 := NewClient("tcp", ":8080") + c2 := NewClient("tcp", ":8081") + + c1.ConnectTo(c2.LocalAddress()) + + c1.Stop() + c2.Stop() +} diff --git a/peer/knownaddress.go b/peer/knownaddress.go index 73a1fc2fa..3baed60f9 100644 --- a/peer/knownaddress.go +++ b/peer/knownaddress.go @@ -16,8 +16,8 @@ type KnownAddress struct { Addr *NetAddress Src *NetAddress Attempts UInt32 - LastAttempt UInt64 - LastSuccess UInt64 + LastAttempt Time + LastSuccess Time NewRefs UInt16 OldBucket Int16 // TODO init to -1 } @@ -27,7 +27,7 @@ func NewKnownAddress(addr *NetAddress, src *NetAddress) *KnownAddress { Addr: addr, Src: src, OldBucket: -1, - LastAttempt: UInt64(time.Now().Unix()), + LastAttempt: Time{time.Now()}, Attempts: 0, } } @@ -37,8 +37,8 @@ func ReadKnownAddress(r io.Reader) *KnownAddress { Addr: ReadNetAddress(r), Src: ReadNetAddress(r), Attempts: ReadUInt32(r), - LastAttempt: ReadUInt64(r), - LastSuccess: ReadUInt64(r), + LastAttempt: ReadTime(r), + LastSuccess: ReadTime(r), NewRefs: ReadUInt16(r), OldBucket: ReadInt16(r), } @@ -56,7 +56,7 @@ func (ka *KnownAddress) WriteTo(w io.Writer) (n int64, err error) { } func (ka *KnownAddress) MarkAttempt(success bool) { - now := UInt64(time.Now().Unix()) + now := Time{time.Now()} ka.LastAttempt = now if success { ka.LastSuccess = now @@ -80,22 +80,22 @@ func (ka *KnownAddress) MarkAttempt(success bool) { */ func (ka *KnownAddress) Bad() bool { // Has been attempted in the last minute --> good - if ka.LastAttempt < UInt64(time.Now().Add(-1 * time.Minute).Unix()) { + if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) { return false } // Over a month old? - if ka.LastAttempt > UInt64(time.Now().Add(-1 * numMissingDays * time.Hour * 24).Unix()) { + if ka.LastAttempt.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) { return true } // Never succeeded? - if ka.LastSuccess == 0 && ka.Attempts >= numRetries { + if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries { return true } // Hasn't succeeded in too long? - if ka.LastSuccess < UInt64(time.Now().Add(-1*minBadDays*time.Hour*24).Unix()) && + if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures { return true } diff --git a/peer/listener.go b/peer/listener.go new file mode 100644 index 000000000..72ec9e406 --- /dev/null +++ b/peer/listener.go @@ -0,0 +1,60 @@ +package peer + +import ( + "sync" + "net" +) + +/* Listener */ + +type Listener struct { + listener net.Listener + handler func(net.Conn) + mtx sync.Mutex + closed bool +} + +func NewListener(protocol string, laddr string, handler func(net.Conn)) *Listener { + ln, err := net.Listen(protocol, laddr) + if err != nil { panic(err) } + + s := &Listener{ + listener: ln, + handler: handler, + } + + go s.listen() + + return s +} + +func (s *Listener) listen() { + for { + conn, err := s.listener.Accept() + if err != nil { + // lock & defer + s.mtx.Lock(); defer s.mtx.Unlock() + if s.closed { + return + } else { + panic(err) + } + // unlock (deferred) + } + + go s.handler(conn) + } +} + +func (s *Listener) LocalAddress() *NetAddress { + return NewNetAddress(s.listener.Addr()) +} + +func (s *Listener) Close() { + // lock + s.mtx.Lock() + s.closed = true + s.mtx.Unlock() + // unlock + s.listener.Close() +} diff --git a/peer/log.go b/peer/log.go index 07b2ed2bd..1ff56693e 100644 --- a/peer/log.go +++ b/peer/log.go @@ -4,4 +4,4 @@ import ( "github.com/tendermint/btclog" ) -var amgrLog = btclog.Disabled +var log = btclog.Disabled diff --git a/peer/netaddress.go b/peer/netaddress.go index a5c42c49c..0011f7deb 100644 --- a/peer/netaddress.go +++ b/peer/netaddress.go @@ -18,6 +18,25 @@ type NetAddress struct { Port UInt16 } +// TODO: socks proxies? +func NewNetAddress(addr net.Addr) *NetAddress { + tcpAddr, ok := addr.(*net.TCPAddr) + if !ok { panic("Only TCPAddrs are supported") } + ip := tcpAddr.IP + port := UInt16(tcpAddr.Port) + return NewNetAddressIPPort(ip, port) +} + +func NewNetAddressString(addr string) *NetAddress { + host, portStr, err := net.SplitHostPort(addr) + if err != nil { panic(err) } + ip := net.ParseIP(host) + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { panic(err) } + na := NewNetAddressIPPort(ip, UInt16(port)) + return na +} + func NewNetAddressIPPort(ip net.IP, port UInt16) *NetAddress { na := NetAddress{ IP: ip, @@ -26,14 +45,6 @@ func NewNetAddressIPPort(ip net.IP, port UInt16) *NetAddress { return &na } -func NewNetAddress(addr net.Addr) *NetAddress { - tcpAddr, ok := addr.(*net.TCPAddr) - if !ok { panic("addr is not a net.TCPAddr") } - - na := NewNetAddressIPPort(tcpAddr.IP, UInt16(tcpAddr.Port)) - return na -} - func ReadNetAddress(r io.Reader) *NetAddress { return &NetAddress{ IP: net.IP(ReadByteSlice(r)), @@ -47,12 +58,34 @@ func (na *NetAddress) WriteTo(w io.Writer) (n int64, err error) { return } +func (na *NetAddress) Equals(other Binary) bool { + if o, ok := other.(*NetAddress); ok { + return na.String() == o.String() + } else { + return false + } +} + +func (na *NetAddress) Less(other Binary) bool { + if o, ok := other.(*NetAddress); ok { + return na.String() < o.String() + } else { + panic("Cannot compare unequal types") + } +} + func (na *NetAddress) String() string { port := strconv.FormatUint(uint64(na.Port), 10) addr := net.JoinHostPort(na.IP.String(), port) return addr } +func (na *NetAddress) Dial() (*Connection, error) { + conn, err := net.Dial("tcp", na.String()) + if err != nil { return nil, err } + return NewConnection(conn), nil +} + func (na *NetAddress) Routable() bool { // TODO(oga) bitcoind doesn't include RFC3849 here, but should we? return na.Valid() && !(na.RFC1918() || na.RFC3927() || na.RFC4862() || diff --git a/peer/peer.go b/peer/peer.go new file mode 100644 index 000000000..38dab2ffd --- /dev/null +++ b/peer/peer.go @@ -0,0 +1,715 @@ +package peer + +import ( + "bytes" + "container/list" + "fmt" + "github.com/davecgh/go-spew/spew" + "github.com/tendermint/btcwire" + "net" + "strconv" + "sync" + "sync/atomic" + "time" +) + +const ( + // max protocol version the peer supports. + maxProtocolVersion = 70001 + + // number of elements the output channels use. + outputBufferSize = 50 + + // number of seconds of inactivity before we timeout a peer + // that hasn't completed the initial version negotiation. + negotiateTimeoutSeconds = 30 + + // number of minutes of inactivity before we time out a peer. + idleTimeoutMinutes = 5 + + // number of minutes since we last sent a message + // requiring a reply before we will ping a host. + pingTimeoutMinutes = 2 +) + +var ( + userAgentName = "tendermintd" + userAgentVersion = fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch) +) + +// zeroHash is the zero value hash (all zeros). It is defined as a convenience. +var zeroHash btcwire.ShaHash + +// minUint32 is a helper function to return the minimum of two uint32s. +// This avoids a math import and the need to cast to floats. +func minUint32(a, b uint32) uint32 { + if a < b { + return a + } + return b +} + +// TODO(davec): Rename and comment this +type outMsg struct { + msg btcwire.Message + doneChan chan bool +} + +/* +The overall data flow is split into 2 goroutines. + +Inbound messages are read via the inHandler goroutine and generally +dispatched to their own handler. + +Outbound messages are queued via QueueMessage. +*/ +type peer struct { + server *server + addr *NetAddress + inbound bool + persistent bool + + started bool // atomic + quit chan bool + + conn net.Conn + connMtx sync.Mutex + disconnected bool // atomic && protected by connMtx + knownAddresses map[string]bool + outputQueue chan outMsg + + statMtx sync.Mutex // protects all below here. + protocolVersion uint32 + timeConnected time.Time + lastSend time.Time + lastRecv time.Time + bytesReceived uint64 + bytesSent uint64 + userAgent string + lastPingNonce uint64 // Set to nonce if we have a pending ping. + lastPingTime time.Time // Time we sent last ping. + lastPingMicros int64 // Time for last ping to return. +} + +// String returns the peer's address and directionality as a human-readable +// string. +func (p *peer) String() string { + return fmt.Sprintf("%s (%s)", p.addr.String(), directionString(p.inbound)) +} + +// VersionKnown returns the whether or not the version of a peer is known locally. +// It is safe for concurrent access. +func (p *peer) VersionKnown() bool { + p.statMtx.Lock(); defer p.statMtx.Unlock() + + return p.protocolVersion != 0 +} + +// ProtocolVersion returns the peer protocol version in a manner that is safe +// for concurrent access. +func (p *peer) ProtocolVersion() uint32 { + p.statMtx.Lock(); defer p.statMtx.Unlock() + + return p.protocolVersion +} + +// pushVersionMsg sends a version message to the connected peer using the +// current state. +func (p *peer) pushVersionMsg() { + _, blockNum, err := p.server.db.NewestSha() + if err != nil { panic(err) } + + // Version message. + // TODO: DisableListen -> send zero address + msg := btcwire.NewMsgVersion( + p.server.addrManager.getBestLocalAddress(p.addr), p.addr, + p.server.nonce, int32(blockNum)) + msg.AddUserAgent(userAgentName, userAgentVersion) + + // Advertise our max supported protocol version. + msg.ProtocolVersion = maxProtocolVersion + + p.QueueMessage(msg, nil) +} + +// handleVersionMsg is invoked when a peer receives a version bitcoin message +// and is used to negotiate the protocol version details as well as kick start +// the communications. +func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { + // Detect self connections. + if msg.Nonce == p.server.nonce { + peerLog.Debugf("Disconnecting peer connected to self %s", p) + p.Disconnect() + return + } + + p.statMtx.Lock() // Updating a bunch of stats. + // Limit to one version message per peer. + if p.protocolVersion != 0 { + p.logError("Only one version message per peer is allowed %s.", p) + p.statMtx.Unlock() + p.Disconnect() + return + } + + // Negotiate the protocol version. + p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) + peerLog.Debugf("Negotiated protocol version %d for peer %s", p.protocolVersion, p) + + // Set the remote peer's user agent. + p.userAgent = msg.UserAgent + + p.statMtx.Unlock() + + // Inbound connections. + if p.inbound { + // Send version. + p.pushVersionMsg() + } + + // Send verack. + p.QueueMessage(btcwire.NewMsgVerAck(), nil) + + if p.inbound { + // A peer might not be advertising the same address that it + // actually connected from. One example of why this can happen + // is with NAT. Only add the address to the address manager if + // the addresses agree. + if msg.AddrMe.String() == p.addr.String() { + p.server.addrManager.AddAddress(p.addr, p.addr) + } + } else { + // Request known addresses from the remote peer. + if !cfg.SimNet && p.server.addrManager.NeedMoreAddresses() { + p.QueueMessage(btcwire.NewMsgGetAddr(), nil) + } + } + + // Mark the address as a known good address. + p.server.addrManager.MarkGood(p.addr) + + // Signal the block manager this peer is a new sync candidate. + p.server.blockManager.NewPeer(p) + + // TODO: Relay alerts. +} + + +// handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message +// and is used to provide the peer with known addresses from the address +// manager. +func (p *peer) handleGetAddrMsg(msg *btcwire.MsgGetAddr) { + // Don't return any addresses when running on the simulation test + // network. This helps prevent the network from becoming another + // public test network since it will not be able to learn about other + // peers that have not specifically been provided. + if cfg.SimNet { + return + } + + // Get the current known addresses from the address manager. + addrCache := p.server.addrManager.AddressCache() + + // Push the addresses. + p.pushAddrMsg(addrCache) +} + +// pushAddrMsg sends one, or more, addr message(s) to the connected peer using +// the provided addresses. +func (p *peer) pushAddrMsg(addresses []*NetAddress) { + // Nothing to send. + if len(addresses) == 0 { return } + + numAdded := 0 + msg := btcwire.NewMsgAddr() + for _, addr := range addresses { + // Filter addresses the peer already knows about. + if p.knownAddresses[addr.String()] { + continue + } + + // Add the address to the message. + err := msg.AddAddress(addr) + if err != nil { panic(err) } // XXX remove error condition + numAdded++ + + // Split into multiple messages as needed. + if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 { + p.QueueMessage(msg, nil) + + // NOTE: This needs to be a new address message and not + // simply call ClearAddresses since the message is a + // pointer and queueing it does not make a copy. + msg = btcwire.NewMsgAddr() + } + } + + // Send message with remaining addresses if needed. + if numAdded%btcwire.MaxAddrPerMsg != 0 { + p.QueueMessage(msg, nil) + } +} + +// handleAddrMsg is invoked when a peer receives an addr bitcoin message and +// is used to notify the server about advertised addresses. +func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { + // Ignore addresses when running on the simulation test network. This + // helps prevent the network from becoming another public test network + // since it will not be able to learn about other peers that have not + // specifically been provided. + if cfg.SimNet { + return + } + + // A message that has no addresses is invalid. + if len(msg.AddrList) == 0 { + p.logError("Command [%s] from %s does not contain any addresses", msg.Command(), p) + p.Disconnect() + return + } + + for _, addr := range msg.AddrList { + // Set the timestamp to 5 days ago if it's more than 24 hours + // in the future so this address is one of the first to be + // removed when space is needed. + now := time.Now() + if addr.Timestamp.After(now.Add(time.Minute * 10)) { + addr.Timestamp = now.Add(-1 * time.Hour * 24 * 5) + } + + // Add address to known addresses for this peer. + p.knownAddresses[addr.String()] = true + } + + // Add addresses to server address manager. The address manager handles + // the details of things such as preventing duplicate addresses, max + // addresses, and last seen updates. + // XXX bitcoind gives a 2 hour time penalty here, do we want to do the + // same? + p.server.addrManager.AddAddresses(msg.AddrList, p.addr) +} + +func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { + // Include nonce from ping so pong can be identified. + p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil) +} + +func (p *peer) handlePongMsg(msg *btcwire.MsgPong) { + p.statMtx.Lock(); defer p.statMtx.Unlock() + + // Arguably we could use a buffered channel here sending data + // in a fifo manner whenever we send a ping, or a list keeping track of + // the times of each ping. For now we just make a best effort and + // only record stats if it was for the last ping sent. Any preceding + // and overlapping pings will be ignored. It is unlikely to occur + // without large usage of the ping rpc call since we ping + // infrequently enough that if they overlap we would have timed out + // the peer. + if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce { + p.lastPingMicros = time.Now().Sub(p.lastPingTime).Nanoseconds() + p.lastPingMicros /= 1000 // convert to usec. + p.lastPingNonce = 0 + } +} + +// readMessage reads the next bitcoin message from the peer with logging. +func (p *peer) readMessage() (btcwire.Message, []byte, error) { + n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.ProtocolVersion()) + p.statMtx.Lock() + p.bytesReceived += uint64(n) + p.statMtx.Unlock() + p.server.AddBytesReceived(uint64(n)) + if err != nil { + return nil, nil, err + } + + // Use closures to log expensive operations so they are only run when + // the logging level requires it. + peerLog.Debugf("%v", newLogClosure(func() string { + // Debug summary of message. + summary := messageSummary(msg) + if len(summary) > 0 { + summary = " (" + summary + ")" + } + return fmt.Sprintf("Received %v%s from %s", msg.Command(), summary, p) + })) + peerLog.Tracef("%v", newLogClosure(func() string { + return spew.Sdump(msg) + })) + peerLog.Tracef("%v", newLogClosure(func() string { + return spew.Sdump(buf) + })) + + return msg, buf, nil +} + +// writeMessage sends a bitcoin Message to the peer with logging. +func (p *peer) writeMessage(msg btcwire.Message) { + if p.Disconnected() { return } + + if !p.VersionKnown() { + switch msg.(type) { + case *btcwire.MsgVersion: + // This is OK. + default: + // We drop all messages other than version if we + // haven't done the handshake already. + return + } + } + + // Use closures to log expensive operations so they are only run when + // the logging level requires it. + peerLog.Debugf("%v", newLogClosure(func() string { + // Debug summary of message. + summary := messageSummary(msg) + if len(summary) > 0 { + summary = " (" + summary + ")" + } + return fmt.Sprintf("Sending %v%s to %s", msg.Command(), summary, p) + })) + peerLog.Tracef("%v", newLogClosure(func() string { + return spew.Sdump(msg) + })) + peerLog.Tracef("%v", newLogClosure(func() string { + var buf bytes.Buffer + err := btcwire.WriteMessage(&buf, msg, p.ProtocolVersion()) + if err != nil { + return err.Error() + } + return spew.Sdump(buf.Bytes()) + })) + + // Write the message to the peer. + n, err := btcwire.WriteMessageN(p.conn, msg, p.ProtocolVersion()) + p.statMtx.Lock() + p.bytesSent += uint64(n) + p.statMtx.Unlock() + p.server.AddBytesSent(uint64(n)) + if err != nil { + p.Disconnect() + p.logError("Can't send message to %s: %v", p, err) + return + } +} + + +// inHandler handles all incoming messages for the peer. It must be run as a +// goroutine. +func (p *peer) inHandler() { + // Peers must complete the initial version negotiation within a shorter + // timeframe than a general idle timeout. The timer is then reset below + // to idleTimeoutMinutes for all future messages. + idleTimer := time.AfterFunc(negotiateTimeoutSeconds*time.Second, func() { + if p.VersionKnown() { + peerLog.Warnf("Peer %s no answer for %d minutes, disconnecting", p, idleTimeoutMinutes) + } + p.Disconnect() + }) +out: + for !p.Disconnected() { + rmsg, buf, err := p.readMessage() + // Stop the timer now, if we go around again we will reset it. + idleTimer.Stop() + if err != nil { + if !p.Disconnected() { + p.logError("Can't read message from %s: %v", p, err) + } + break out + } + p.statMtx.Lock() + p.lastRecv = time.Now() + p.statMtx.Unlock() + + // Ensure version message comes first. + if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.VersionKnown() { + p.logError("A version message must precede all others") + break out + } + + // Handle each supported message type. + markGood := false + switch msg := rmsg.(type) { + case *btcwire.MsgVersion: + p.handleVersionMsg(msg) + + case *btcwire.MsgVerAck: + // Do nothing. + + case *btcwire.MsgGetAddr: + p.handleGetAddrMsg(msg) + + case *btcwire.MsgAddr: + p.handleAddrMsg(msg) + markGood = true + + case *btcwire.MsgPing: + p.handlePingMsg(msg) + markGood = true + + case *btcwire.MsgPong: + p.handlePongMsg(msg) + + case *btcwire.MsgAlert: + p.server.BroadcastMessage(msg, p) + + case *btcwire.MsgNotFound: + // TODO(davec): Ignore this for now, but ultimately + // it should probably be used to detect when something + // we requested needs to be re-requested from another + // peer. + + default: + peerLog.Debugf("Received unhandled message of type %v: Fix Me", rmsg.Command()) + } + + // Mark the address as currently connected and working as of + // now if one of the messages that trigger it was processed. + if markGood && !p.Disconnected() { + if p.addr == nil { + peerLog.Warnf("we're getting stuff before we got a version message. that's bad") + continue + } + p.server.addrManager.MarkGood(p.addr) + } + // ok we got a message, reset the timer. + // timer just calls p.Disconnect() after logging. + idleTimer.Reset(idleTimeoutMinutes * time.Minute) + } + + idleTimer.Stop() + + // Ensure connection is closed and notify the server that the peer is done. + p.Disconnect() + p.server.donePeers <- p + + // Only tell block manager we are gone if we ever told it we existed. + if p.VersionKnown() { + p.server.blockManager.DonePeer(p) + } + + peerLog.Tracef("Peer input handler done for %s", p) +} + +// outHandler handles all outgoing messages for the peer. It must be run as a +// goroutine. It uses a buffered channel to serialize output messages while +// allowing the sender to continue running asynchronously. +func (p *peer) outHandler() { + pingTimer := time.AfterFunc(pingTimeoutMinutes*time.Minute, func() { + nonce, err := btcwire.RandomUint64() + if err != nil { + peerLog.Errorf("Not sending ping on timeout to %s: %v", + p, err) + return + } + p.QueueMessage(btcwire.NewMsgPing(nonce), nil) + }) +out: + for { + select { + case msg := <-p.outputQueue: + // If the message is one we should get a reply for + // then reset the timer, we only want to send pings + // when otherwise we would not receive a reply from + // the peer. + peerLog.Tracef("%s: received from outputQueue", p) + reset := true + switch m := msg.msg.(type) { + case *btcwire.MsgVersion: + // should get an ack + case *btcwire.MsgGetAddr: + // should get addresses + case *btcwire.MsgPing: + // expects pong + // Also set up statistics. + p.statMtx.Lock() + p.lastPingNonce = m.Nonce + p.lastPingTime = time.Now() + p.statMtx.Unlock() + default: + // Not one of the above, no sure reply. + // We want to ping if nothing else + // interesting happens. + reset = false + } + if reset { + pingTimer.Reset(pingTimeoutMinutes * time.Minute) + } + p.writeMessage(msg.msg) + p.statMtx.Lock() + p.lastSend = time.Now() + p.statMtx.Unlock() + if msg.doneChan != nil { + msg.doneChan <- true + } + + case <-p.quit: + break out + } + } + + pingTimer.Stop() + + // Drain outputQueue + for msg := range p.outputQueue { + if msg.doneChan != nil { + msg.doneChan <- false + } + } + peerLog.Tracef("Peer output handler done for %s", p) +} + +// QueueMessage adds the passed bitcoin message to the peer outputQueue. It +// uses a buffered channel to communicate with the output handler goroutine so +// it is automatically rate limited and safe for concurrent access. +func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) { + // Avoid risk of deadlock if goroutine already exited. The goroutine + // we will be sending to hangs around until it knows for a fact that + // it is marked as disconnected. *then* it drains the channels. + if p.Disconnected() { + // avoid deadlock... + if doneChan != nil { + go func() { + doneChan <- false + }() + } + return + } + p.outputQueue <- outMsg{msg: msg, doneChan: doneChan} +} + +// True if is (or will become) disconnected. +func (p *peer) Disconnected() bool { + return atomic.LoadInt32(&p.disconnected) == 1 +} + +// Disconnects the peer by closing the connection. It also sets +// a flag so the impending shutdown can be detected. +func (p *peer) Disconnect() { + p.connMtx.Lock(); defer p.connMtx.Unlock() + // did we win the race? + if atomic.AddInt32(&p.disconnected, 1) != 1 { + return + } + peerLog.Tracef("disconnecting %s", p) + close(p.quit) + if p.conn != nil { + p.conn.Close() + } +} + +// Sets the connection & starts +func (p *peer) StartWithConnection(conn *net.Conn) { + p.connMtx.Lock(); defer p.connMtx.Unlock() + if p.conn != nil { panic("Conn already set") } + if atomic.LoadInt32(&p.disconnected) == 1 { return } + peerLog.Debugf("Connected to %s", conn.RemoteAddr()) + p.timeConnected = time.Now() + p.conn = conn + p.Start() +} + +// Start begins processing input and output messages. It also sends the initial +// version message for outbound connections to start the negotiation process. +func (p *peer) Start() error { + // Already started? + if atomic.AddInt32(&p.started, 1) != 1 { + return nil + } + + peerLog.Tracef("Starting peer %s", p) + + // Send an initial version message if this is an outbound connection. + if !p.inbound { + p.pushVersionMsg() + } + + // Start processing input and output. + go p.inHandler() + go p.outHandler() + + return nil +} + +// Shutdown gracefully shuts down the peer by disconnecting it. +func (p *peer) Shutdown() { + peerLog.Tracef("Shutdown peer %s", p) + p.Disconnect() +} + +// newPeerBase returns a new base peer for the provided server and inbound flag. +// This is used by the newInboundPeer and newOutboundPeer functions to perform +// base setup needed by both types of peers. +func newPeerBase(s *server, inbound bool) *peer { + p := peer{ + server: s, + protocolVersion: maxProtocolVersion, + inbound: inbound, + knownAddresses: make(map[string]bool), + outputQueue: make(chan outMsg, outputBufferSize), + quit: make(chan bool), + } + return &p +} + +// newPeer returns a new inbound bitcoin peer for the provided server and +// connection. Use Start to begin processing incoming and outgoing messages. +func newInboundPeer(s *server, conn net.Conn) *peer { + addr := NewNetAddress(conn.RemoteAddr()) + // XXX What if p.addr doesn't match (to be) reported addr due to NAT? + s.addrManager.MarkAttempt(addr) + + p := newPeerBase(s, true) + p.conn = conn + p.addr = addr + p.timeConnected = time.Now() + return p +} + +// newOutbountPeer returns a new outbound bitcoin peer for the provided server and +// address and connects to it asynchronously. If the connection is successful +// then the peer will also be started. +func newOutboundPeer(s *server, addr *NetAddress, persistent bool) *peer { + p := newPeerBase(s, false) + p.addr = addr + p.persistent = persistent + + go func() { + // Mark this as one attempt, regardless of # of reconnects. + s.addrManager.MarkAttempt(p.addr) + retryCount := 0 + // Attempt to connect to the peer. If the connection fails and + // this is a persistent connection, retry after the retry + // interval. + for { + peerLog.Debugf("Attempting to connect to %s", addr) + conn, err := addr.Dial() + if err == nil { + p.StartWithConnection(conn) + return + } else { + retryCount++ + peerLog.Debugf("Failed to connect to %s: %v", addr, err) + if !persistent { + p.server.donePeers <- p + return + } + scaledInterval := connectionRetryInterval.Nanoseconds() * retryCount / 2 + scaledDuration := time.Duration(scaledInterval) + peerLog.Debugf("Retrying connection to %s in %s", addr, scaledDuration) + time.Sleep(scaledDuration) + continue + } + } + }() + return p +} + +// logError makes sure that we only log errors loudly on user peers. +func (p *peer) logError(fmt string, args ...interface{}) { + if p.persistent { + peerLog.Errorf(fmt, args...) + } else { + peerLog.Debugf(fmt, args...) + } +}