diff --git a/peer/addrbook.go b/peer/addrbook.go index 5e6d0f68e..872b4a39b 100644 --- a/peer/addrbook.go +++ b/peer/addrbook.go @@ -25,18 +25,18 @@ import ( type AddrBook struct { filePath string - mtx sync.Mutex - rand *rand.Rand - key [32]byte - addrNewIndex map[string]*knownAddress // addr.String() -> knownAddress - addrNew [newBucketCount]map[string]*knownAddress - addrOld [oldBucketCount][]*knownAddress - started int32 - shutdown int32 - wg sync.WaitGroup - quit chan struct{} - nOld int - nNew int + mtx sync.Mutex + rand *rand.Rand + key [32]byte + addrIndex map[string]*knownAddress // new & old + addrNew [newBucketCount]map[string]*knownAddress + addrOld [oldBucketCount][]*knownAddress + started int32 + shutdown int32 + wg sync.WaitGroup + quit chan struct{} + nOld int + nNew int } const ( @@ -80,12 +80,11 @@ const ( // days since the last success before we will consider evicting an address. minBadDays = 7 - // max addresses that we will send in response to a getAddr - // (in practise the most addresses we will return from a call to AddressCache()). - getAddrMax = 2500 + // max addresses that we will send in response to a GetSelection + getSelectionMax = 2500 - // % of total addresses known that we will share with a call to AddressCache. - getAddrPercent = 23 + // % of total addresses known that we will share with a call to GetSelection + getSelectionPercent = 23 // current version of the on-disk format. serializationVersion = 1 @@ -104,7 +103,7 @@ func NewAddrBook(filePath string) *AddrBook { // When modifying this, don't forget to update loadFromFile() func (a *AddrBook) init() { - a.addrNewIndex = make(map[string]*knownAddress) + a.addrIndex = make(map[string]*knownAddress) io.ReadFull(crand.Reader, a.key[:]) for i := range a.addrNew { a.addrNew[i] = make(map[string]*knownAddress) @@ -146,11 +145,15 @@ func (a *AddrBook) NeedMoreAddresses() bool { func (a *AddrBook) Size() int { a.mtx.Lock() defer a.mtx.Unlock() - return a.nOld + a.nNew + return a.size() } -// Pick a new address to connect to. -func (a *AddrBook) PickAddress(class string, newBias int) *knownAddress { +func (a *AddrBook) size() int { + return a.nNew + a.nOld +} + +// Pick an address to connect to with new/old bias. +func (a *AddrBook) PickAddress(newBias int) *knownAddress { a.mtx.Lock() defer a.mtx.Unlock() @@ -198,7 +201,7 @@ func (a *AddrBook) PickAddress(class string, newBias int) *knownAddress { func (a *AddrBook) MarkGood(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrNewIndex[addr.String()] + ka := a.addrIndex[addr.String()] if ka == nil { return } @@ -211,13 +214,47 @@ func (a *AddrBook) MarkGood(addr *NetAddress) { func (a *AddrBook) MarkAttempt(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - ka := a.addrNewIndex[addr.String()] + ka := a.addrIndex[addr.String()] if ka == nil { return } ka.MarkAttempt() } +/* Peer exchange */ + +// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols. +func (a *AddrBook) GetSelection() []*NetAddress { + a.mtx.Lock() + defer a.mtx.Unlock() + if a.size() == 0 { + return nil + } + + allAddr := make([]*NetAddress, a.size()) + i := 0 + for _, v := range a.addrIndex { + allAddr[i] = v.Addr + i++ + } + + numAddresses := len(allAddr) * getSelectionPercent / 100 + if numAddresses > getSelectionMax { + numAddresses = getSelectionMax + } + + // Fisher-Yates shuffle the array. We only need to do the first + // `numAddresses' since we are throwing the rest. + for i := 0; i < numAddresses; i++ { + // pick a number between current index and the end + j := rand.Intn(len(allAddr)-i) + i + allAddr[i], allAddr[j] = allAddr[j], allAddr[i] + } + + // slice off the limit we are willing to share. + return allAddr[:numAddresses] +} + /* Loading & Saving */ type addrBookJSON struct { @@ -290,22 +327,19 @@ func (a *AddrBook) loadFromFile(filePath string) { for i, newBucket := range aJSON.AddrNew { for _, ka := range newBucket { a.addrNew[i][ka.Addr.String()] = ka + a.addrIndex[ka.Addr.String()] = ka } } // Restore .addrOld for i, oldBucket := range aJSON.AddrOld { copy(a.addrOld[i], oldBucket) + for _, ka := range oldBucket { + a.addrIndex[ka.Addr.String()] = ka + } } // Restore simple fields a.nNew = aJSON.NumNew a.nOld = aJSON.NumOld - // Restore addrNewIndex - a.addrNewIndex = make(map[string]*knownAddress) - for _, newBucket := range a.addrNew { - for key, ka := range newBucket { - a.addrNewIndex[key] = ka - } - } } /* Private methods */ @@ -333,7 +367,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { } key := addr.String() - ka := a.addrNewIndex[key] + ka := a.addrIndex[key] if ka != nil { // Already added @@ -351,7 +385,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { } } else { ka = NewknownAddress(addr, src) - a.addrNewIndex[key] = ka + a.addrIndex[key] = ka a.nNew++ } @@ -387,7 +421,7 @@ func (a *AddrBook) expireNew(bucket int) { v.NewRefs-- if v.NewRefs == 0 { a.nNew-- - delete(a.addrNewIndex, k) + delete(a.addrIndex, k) } return } @@ -407,7 +441,7 @@ func (a *AddrBook) expireNew(bucket int) { oldest.NewRefs-- if oldest.NewRefs == 0 { a.nNew-- - delete(a.addrNewIndex, key) + delete(a.addrIndex, key) } } } @@ -452,12 +486,12 @@ func (a *AddrBook) moveToOld(ka *knownAddress) { newBucket = freedBucket } - // replace with ka in list. + // Replace with ka in list. ka.OldBucket = Int16(oldBucket) a.addrOld[oldBucket][rmkaIndex] = ka rmka.OldBucket = -1 - // put rmka into new bucket + // Put rmka into new bucket rmkey := rmka.Addr.String() log.Tracef("Replacing %s with %s in old", rmkey, addrKey) a.addrNew[newBucket][rmkey] = rmka diff --git a/peer/addrbook_test.go b/peer/addrbook_test.go index d03d53366..a91410985 100644 --- a/peer/addrbook_test.go +++ b/peer/addrbook_test.go @@ -96,7 +96,7 @@ func TestSaveAddresses(t *testing.T) { for _, addrSrc := range randAddrs { addr := addrSrc.addr src := addrSrc.src - ka := book.addrNewIndex[addr.String()] + ka := book.addrIndex[addr.String()] if ka == nil { t.Fatalf("Expected to find KnownAddress %v but wasn't there.", addr) } @@ -156,4 +156,6 @@ func TestPromoteToOld(t *testing.T) { // TODO: do more testing :) + selection := book.GetSelection() + t.Logf("selection: %v", selection) } diff --git a/peer/client.go b/peer/client.go index 6d4be538f..7619c8f1d 100644 --- a/peer/client.go +++ b/peer/client.go @@ -10,8 +10,6 @@ import ( "github.com/tendermint/tendermint/merkle" ) -// BUG(jae) handle peer disconnects - /* A client is half of a p2p system. It can reach out to the network and establish connections with other peers. @@ -41,8 +39,8 @@ type Client struct { } var ( - CLIENT_STOPPED_ERROR = errors.New("Client already stopped") - CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer") + ErrClientStopped = errors.New("Client already stopped") + ErrClientDuplicatePeer = errors.New("Duplicate peer") ) // "makePeerFn" is a factory method for generating new peers from new *Connections. @@ -101,7 +99,7 @@ func (c *Client) Stop() { func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) { if atomic.LoadUint32(&c.stopped) == 1 { - return nil, CLIENT_STOPPED_ERROR + return nil, ErrClientStopped } log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing) @@ -187,7 +185,7 @@ func (c *Client) addPeer(peer *Peer) error { c.peersMtx.Lock() defer c.peersMtx.Unlock() if c.stopped == 1 { - return CLIENT_STOPPED_ERROR + return ErrClientStopped } if !c.peers.Has(addr) { log.Tracef("Actually putting addr: %v, peer: %v", addr, peer) @@ -196,7 +194,7 @@ func (c *Client) addPeer(peer *Peer) error { } else { // ignore duplicate peer for addr. log.Infof("Ignoring duplicate peer for addr %v", addr) - return CLIENT_DUPLICATE_PEER_ERROR + return ErrClientDuplicatePeer } // unlock deferred } @@ -208,6 +206,7 @@ func (c *Client) peerErrorHandler() { return case errPeer := <-c.erroredPeers: log.Infof("%v errored: %v", errPeer.peer, errPeer.err) + // TODO: do more c.StopPeer(errPeer.peer) return } diff --git a/peer/client_test.go b/peer/client_test.go index a2043f17c..69b0ba053 100644 --- a/peer/client_test.go +++ b/peer/client_test.go @@ -47,7 +47,7 @@ func makeClientPair(t testing.TB, bufferSize int, chNames []String) (*Client, *C func TestClients(t *testing.T) { - channels := []String{"ch1", "ch2", "ch3"} + channels := []String{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"} c1, c2 := makeClientPair(t, 10, channels) defer c1.Stop() defer c2.Stop() @@ -87,7 +87,7 @@ func BenchmarkClients(b *testing.B) { b.StopTimer() - channels := []String{"ch1", "ch2", "ch3"} + channels := []String{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"} c1, c2 := makeClientPair(b, 10, channels) defer c1.Stop() defer c2.Stop() diff --git a/peer/connection.go b/peer/connection.go index 86cf77d3c..cd218ac2e 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -12,16 +12,14 @@ import ( ) const ( - READ_BUFFER_MIN_SIZE = 1024 - WRITE_BUFFER_MIN_SIZE = 1024 - FLUSH_THROTTLE_MS = 50 - OUT_QUEUE_SIZE = 50 - IDLE_TIMEOUT_MINUTES = 5 - PING_TIMEOUT_MINUTES = 2 + MinReadBufferSize = 1024 + MinWriteBufferSize = 1024 + FlushThrottleMS = 50 + OutQueueSize = 50 + IdleTimeoutMinutes = 5 + PingTimeoutMinutes = 2 ) -// BUG(jae): Handle disconnects. - /* A Connection wraps a network connection and handles buffering and multiplexing. "Packets" are sent with ".Send(Packet)". @@ -46,20 +44,20 @@ type Connection struct { } var ( - PACKET_TYPE_PING = UInt8(0x00) - PACKET_TYPE_PONG = UInt8(0x01) - PACKET_TYPE_MSG = UInt8(0x10) + PacketTypePing = UInt8(0x00) + PacketTypePong = UInt8(0x01) + PacketTypeMessage = UInt8(0x10) ) func NewConnection(conn net.Conn) *Connection { return &Connection{ - sendQueue: make(chan Packet, OUT_QUEUE_SIZE), + sendQueue: make(chan Packet, OutQueueSize), conn: conn, - bufReader: bufio.NewReaderSize(conn, READ_BUFFER_MIN_SIZE), - bufWriter: bufio.NewWriterSize(conn, WRITE_BUFFER_MIN_SIZE), - flushThrottler: NewThrottler(FLUSH_THROTTLE_MS * time.Millisecond), + bufReader: bufio.NewReaderSize(conn, MinReadBufferSize), + bufWriter: bufio.NewWriterSize(conn, MinWriteBufferSize), + flushThrottler: NewThrottler(FlushThrottleMS * time.Millisecond), quit: make(chan struct{}), - pingRepeatTimer: NewRepeatTimer(PING_TIMEOUT_MINUTES * time.Minute), + pingRepeatTimer: NewRepeatTimer(PingTimeoutMinutes * time.Minute), pong: make(chan struct{}), } } @@ -150,7 +148,7 @@ FOR_LOOP: select { case sendPkt := <-c.sendQueue: log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection") - _, err = PACKET_TYPE_MSG.WriteTo(c.bufWriter) + _, err = PacketTypeMessage.WriteTo(c.bufWriter) if err != nil { break } @@ -159,10 +157,10 @@ FOR_LOOP: case <-c.flushThrottler.Ch: c.flush() case <-c.pingRepeatTimer.Ch: - _, err = PACKET_TYPE_PING.WriteTo(c.bufWriter) + _, err = PacketTypePing.WriteTo(c.bufWriter) c.flush() case <-c.pong: - _, err = PACKET_TYPE_PONG.WriteTo(c.bufWriter) + _, err = PacketTypePong.WriteTo(c.bufWriter) c.flush() case <-c.quit: break FOR_LOOP @@ -202,13 +200,13 @@ FOR_LOOP: } switch pktType { - case PACKET_TYPE_PING: + case PacketTypePing: // TODO: keep track of these, make sure it isn't abused // as they cause flush()'s in the send buffer. c.pong <- struct{}{} - case PACKET_TYPE_PONG: + case PacketTypePong: // do nothing - case PACKET_TYPE_MSG: + case PacketTypeMessage: pkt, err := ReadPacketSafe(c.bufReader) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { diff --git a/peer/listener.go b/peer/listener.go index 0cc5b476a..0440a9383 100644 --- a/peer/listener.go +++ b/peer/listener.go @@ -30,7 +30,7 @@ type DefaultListener struct { } const ( - DEFAULT_BUFFERED_CONNECTIONS = 10 + NumBufferedConnections = 10 ) func NewDefaultListener(protocol string, listenAddr string) Listener { @@ -66,7 +66,7 @@ func NewDefaultListener(protocol string, listenAddr string) Listener { dl := &DefaultListener{ listener: listener, extAddr: extAddr, - connections: make(chan *Connection, DEFAULT_BUFFERED_CONNECTIONS), + connections: make(chan *Connection, NumBufferedConnections), } go dl.listenHandler() diff --git a/peer/msg.go b/peer/msg.go index bf9e6420d..3ee4cf380 100644 --- a/peer/msg.go +++ b/peer/msg.go @@ -6,59 +6,7 @@ import ( . "github.com/tendermint/tendermint/binary" ) -/* -Packet encapsulates a ByteSlice on a Channel. -*/ -type Packet struct { - Channel String - Bytes ByteSlice - // Hash -} - -func NewPacket(chName String, bytes ByteSlice) Packet { - return Packet{ - Channel: chName, - Bytes: bytes, - } -} - -func (p Packet) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(&p.Channel, w, n, err) - n, err = WriteOnto(&p.Bytes, w, n, err) - return -} - -func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { - chName, err := ReadStringSafe(r) - if err != nil { - return - } - // TODO: packet length sanity check. - bytes, err := ReadByteSliceSafe(r) - if err != nil { - return - } - return NewPacket(chName, bytes), nil -} - -/* -InboundPacket extends Packet with fields relevant to incoming packets. -*/ -type InboundPacket struct { - Peer *Peer - Time Time - Packet -} - -/* -NewFilterMsg is not implemented. TODO -*/ -type NewFilterMsg struct { - ChName String - Filter interface{} // todo -} - -func (m *NewFilterMsg) WriteTo(w io.Writer) (int64, error) { - panic("TODO: implement") - return 0, nil // TODO +type Message interface { + Binary + Type() string } diff --git a/peer/peer.go b/peer/peer.go index bb25b5b18..5fbf47686 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -153,7 +153,7 @@ FOR_LOOP: // (none) } -/* Channel */ +/* Channel */ type Channel struct { name String @@ -182,6 +182,52 @@ func (c *Channel) SendQueue() chan<- Packet { return c.sendQueue } +/* Packet */ + +/* +Packet encapsulates a ByteSlice on a Channel. +*/ +type Packet struct { + Channel String + Bytes ByteSlice + // Hash +} + +func NewPacket(chName String, bytes ByteSlice) Packet { + return Packet{ + Channel: chName, + Bytes: bytes, + } +} + +func (p Packet) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteOnto(&p.Channel, w, n, err) + n, err = WriteOnto(&p.Bytes, w, n, err) + return +} + +func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { + chName, err := ReadStringSafe(r) + if err != nil { + return + } + // TODO: packet length sanity check. + bytes, err := ReadByteSliceSafe(r) + if err != nil { + return + } + return NewPacket(chName, bytes), nil +} + +/* +InboundPacket extends Packet with fields relevant to incoming packets. +*/ +type InboundPacket struct { + Peer *Peer + Time Time + Packet +} + /* Misc */ type peerError struct {