
cleanup

pull/9/head
Jae Kwon 11 years ago
parent commit ec89eb168d
8 changed files with 153 additions and 126 deletions
  1. +70 -36  peer/addrbook.go
  2. +3 -1  peer/addrbook_test.go
  3. +6 -7  peer/client.go
  4. +2 -2  peer/client_test.go
  5. +20 -22  peer/connection.go
  6. +2 -2  peer/listener.go
  7. +3 -55  peer/msg.go
  8. +47 -1  peer/peer.go

+70 -36  peer/addrbook.go

@@ -25,18 +25,18 @@ import (
type AddrBook struct {
filePath string
mtx sync.Mutex
rand *rand.Rand
key [32]byte
addrNewIndex map[string]*knownAddress // addr.String() -> knownAddress
addrNew [newBucketCount]map[string]*knownAddress
addrOld [oldBucketCount][]*knownAddress
started int32
shutdown int32
wg sync.WaitGroup
quit chan struct{}
nOld int
nNew int
mtx sync.Mutex
rand *rand.Rand
key [32]byte
addrIndex map[string]*knownAddress // new & old
addrNew [newBucketCount]map[string]*knownAddress
addrOld [oldBucketCount][]*knownAddress
started int32
shutdown int32
wg sync.WaitGroup
quit chan struct{}
nOld int
nNew int
}
const (
@@ -80,12 +80,11 @@ const (
// days since the last success before we will consider evicting an address.
minBadDays = 7
// max addresses that we will send in response to a getAddr
// (in practise the most addresses we will return from a call to AddressCache()).
getAddrMax = 2500
// max addresses that we will send in response to a GetSelection
getSelectionMax = 2500
// % of total addresses known that we will share with a call to AddressCache.
getAddrPercent = 23
// % of total addresses known that we will share with a call to GetSelection
getSelectionPercent = 23
// current version of the on-disk format.
serializationVersion = 1
@@ -104,7 +103,7 @@ func NewAddrBook(filePath string) *AddrBook {
// When modifying this, don't forget to update loadFromFile()
func (a *AddrBook) init() {
a.addrNewIndex = make(map[string]*knownAddress)
a.addrIndex = make(map[string]*knownAddress)
io.ReadFull(crand.Reader, a.key[:])
for i := range a.addrNew {
a.addrNew[i] = make(map[string]*knownAddress)
@@ -146,11 +145,15 @@ func (a *AddrBook) NeedMoreAddresses() bool {
func (a *AddrBook) Size() int {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.nOld + a.nNew
return a.size()
}
// Pick a new address to connect to.
func (a *AddrBook) PickAddress(class string, newBias int) *knownAddress {
func (a *AddrBook) size() int {
return a.nNew + a.nOld
}
// Pick an address to connect to with new/old bias.
func (a *AddrBook) PickAddress(newBias int) *knownAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
@@ -198,7 +201,7 @@ func (a *AddrBook) PickAddress(class string, newBias int) *knownAddress {
func (a *AddrBook) MarkGood(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.addrNewIndex[addr.String()]
ka := a.addrIndex[addr.String()]
if ka == nil {
return
}
@@ -211,13 +214,47 @@ func (a *AddrBook) MarkGood(addr *NetAddress) {
func (a *AddrBook) MarkAttempt(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.addrNewIndex[addr.String()]
ka := a.addrIndex[addr.String()]
if ka == nil {
return
}
ka.MarkAttempt()
}
/* Peer exchange */
// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
func (a *AddrBook) GetSelection() []*NetAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
if a.size() == 0 {
return nil
}
allAddr := make([]*NetAddress, a.size())
i := 0
for _, v := range a.addrIndex {
allAddr[i] = v.Addr
i++
}
numAddresses := len(allAddr) * getSelectionPercent / 100
if numAddresses > getSelectionMax {
numAddresses = getSelectionMax
}
// Fisher-Yates shuffle the array. We only need to do the first
// `numAddresses' since we are throwing the rest.
for i := 0; i < numAddresses; i++ {
// pick a number between current index and the end
j := rand.Intn(len(allAddr)-i) + i
allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
}
// slice off the limit we are willing to share.
return allAddr[:numAddresses]
}
/* Loading & Saving */
type addrBookJSON struct {
@@ -290,22 +327,19 @@ func (a *AddrBook) loadFromFile(filePath string) {
for i, newBucket := range aJSON.AddrNew {
for _, ka := range newBucket {
a.addrNew[i][ka.Addr.String()] = ka
a.addrIndex[ka.Addr.String()] = ka
}
}
// Restore .addrOld
for i, oldBucket := range aJSON.AddrOld {
copy(a.addrOld[i], oldBucket)
for _, ka := range oldBucket {
a.addrIndex[ka.Addr.String()] = ka
}
}
// Restore simple fields
a.nNew = aJSON.NumNew
a.nOld = aJSON.NumOld
// Restore addrNewIndex
a.addrNewIndex = make(map[string]*knownAddress)
for _, newBucket := range a.addrNew {
for key, ka := range newBucket {
a.addrNewIndex[key] = ka
}
}
}
/* Private methods */
@@ -333,7 +367,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
}
key := addr.String()
ka := a.addrNewIndex[key]
ka := a.addrIndex[key]
if ka != nil {
// Already added
@@ -351,7 +385,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) {
}
} else {
ka = NewknownAddress(addr, src)
a.addrNewIndex[key] = ka
a.addrIndex[key] = ka
a.nNew++
}
@@ -387,7 +421,7 @@ func (a *AddrBook) expireNew(bucket int) {
v.NewRefs--
if v.NewRefs == 0 {
a.nNew--
delete(a.addrNewIndex, k)
delete(a.addrIndex, k)
}
return
}
@@ -407,7 +441,7 @@ func (a *AddrBook) expireNew(bucket int) {
oldest.NewRefs--
if oldest.NewRefs == 0 {
a.nNew--
delete(a.addrNewIndex, key)
delete(a.addrIndex, key)
}
}
}
@@ -452,12 +486,12 @@ func (a *AddrBook) moveToOld(ka *knownAddress) {
newBucket = freedBucket
}
// replace with ka in list.
// Replace with ka in list.
ka.OldBucket = Int16(oldBucket)
a.addrOld[oldBucket][rmkaIndex] = ka
rmka.OldBucket = -1
// put rmka into new bucket
// Put rmka into new bucket
rmkey := rmka.Addr.String()
log.Tracef("Replacing %s with %s in old", rmkey, addrKey)
a.addrNew[newBucket][rmkey] = rmka
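
For reference, the GetSelection method added above takes getSelectionPercent of the known addresses (capped at getSelectionMax) and runs a partial Fisher-Yates shuffle, randomizing only the prefix it returns. A standalone sketch of that selection pattern, using plain strings in place of *NetAddress (names below are illustrative, not the repository's API):

package main

import (
	"fmt"
	"math/rand"
)

// pickSelection returns sharePercent% of addrs (at most maxShare entries),
// shuffling only the prefix that is actually returned.
func pickSelection(addrs []string, sharePercent, maxShare int) []string {
	if len(addrs) == 0 {
		return nil
	}
	out := append([]string(nil), addrs...) // copy so the caller's slice is left alone
	n := len(out) * sharePercent / 100
	if n > maxShare {
		n = maxShare
	}
	// Partial Fisher-Yates: only the first n slots need to be randomized.
	for i := 0; i < n; i++ {
		j := rand.Intn(len(out)-i) + i // pick from [i, len(out))
		out[i], out[j] = out[j], out[i]
	}
	return out[:n]
}

func main() {
	addrs := []string{"a:1", "b:2", "c:3", "d:4", "e:5", "f:6", "g:7", "h:8", "i:9", "j:10"}
	fmt.Println(pickSelection(addrs, 23, 2500)) // 23% of 10 -> 2 addresses
}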


+3 -1  peer/addrbook_test.go

@@ -96,7 +96,7 @@ func TestSaveAddresses(t *testing.T) {
for _, addrSrc := range randAddrs {
addr := addrSrc.addr
src := addrSrc.src
ka := book.addrNewIndex[addr.String()]
ka := book.addrIndex[addr.String()]
if ka == nil {
t.Fatalf("Expected to find KnownAddress %v but wasn't there.", addr)
}
@@ -156,4 +156,6 @@ func TestPromoteToOld(t *testing.T) {
// TODO: do more testing :)
selection := book.GetSelection()
t.Logf("selection: %v", selection)
}

+6 -7  peer/client.go

@@ -10,8 +10,6 @@ import (
"github.com/tendermint/tendermint/merkle"
)
// BUG(jae) handle peer disconnects
/*
A client is half of a p2p system.
It can reach out to the network and establish connections with other peers.
@@ -41,8 +39,8 @@ type Client struct {
}
var (
CLIENT_STOPPED_ERROR = errors.New("Client already stopped")
CLIENT_DUPLICATE_PEER_ERROR = errors.New("Duplicate peer")
ErrClientStopped = errors.New("Client already stopped")
ErrClientDuplicatePeer = errors.New("Duplicate peer")
)
// "makePeerFn" is a factory method for generating new peers from new *Connections.
@@ -101,7 +99,7 @@ func (c *Client) Stop() {
func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) {
if atomic.LoadUint32(&c.stopped) == 1 {
return nil, CLIENT_STOPPED_ERROR
return nil, ErrClientStopped
}
log.Infof("Adding peer with connection: %v, outgoing: %v", conn, outgoing)
@@ -187,7 +185,7 @@ func (c *Client) addPeer(peer *Peer) error {
c.peersMtx.Lock()
defer c.peersMtx.Unlock()
if c.stopped == 1 {
return CLIENT_STOPPED_ERROR
return ErrClientStopped
}
if !c.peers.Has(addr) {
log.Tracef("Actually putting addr: %v, peer: %v", addr, peer)
@@ -196,7 +194,7 @@ func (c *Client) addPeer(peer *Peer) error {
} else {
// ignore duplicate peer for addr.
log.Infof("Ignoring duplicate peer for addr %v", addr)
return CLIENT_DUPLICATE_PEER_ERROR
return ErrClientDuplicatePeer
}
// unlock deferred
}
@@ -208,6 +206,7 @@ func (c *Client) peerErrorHandler() {
return
case errPeer := <-c.erroredPeers:
log.Infof("%v errored: %v", errPeer.peer, errPeer.err)
// TODO: do more
c.StopPeer(errPeer.peer)
return
}
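
The error-variable rename above (CLIENT_STOPPED_ERROR to ErrClientStopped, and so on) follows the standard Go sentinel-error convention. A minimal sketch of how callers typically check such sentinels (the addPeer stub below is hypothetical, not the repository's function):

package main

import (
	"errors"
	"fmt"
)

var (
	ErrClientStopped       = errors.New("Client already stopped")
	ErrClientDuplicatePeer = errors.New("Duplicate peer")
)

// addPeer is a stand-in that always reports a duplicate peer.
func addPeer(addr string) error {
	return ErrClientDuplicatePeer
}

func main() {
	if err := addPeer("127.0.0.1:46656"); errors.Is(err, ErrClientDuplicatePeer) {
		fmt.Println("ignoring duplicate peer:", err)
	}
}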


+2 -2  peer/client_test.go

@@ -47,7 +47,7 @@ func makeClientPair(t testing.TB, bufferSize int, chNames []String) (*Client, *C
func TestClients(t *testing.T) {
channels := []String{"ch1", "ch2", "ch3"}
channels := []String{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"}
c1, c2 := makeClientPair(t, 10, channels)
defer c1.Stop()
defer c2.Stop()
@@ -87,7 +87,7 @@ func BenchmarkClients(b *testing.B) {
b.StopTimer()
channels := []String{"ch1", "ch2", "ch3"}
channels := []String{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"}
c1, c2 := makeClientPair(b, 10, channels)
defer c1.Stop()
defer c2.Stop()


+20 -22  peer/connection.go

@@ -12,16 +12,14 @@ import (
)
const (
READ_BUFFER_MIN_SIZE = 1024
WRITE_BUFFER_MIN_SIZE = 1024
FLUSH_THROTTLE_MS = 50
OUT_QUEUE_SIZE = 50
IDLE_TIMEOUT_MINUTES = 5
PING_TIMEOUT_MINUTES = 2
MinReadBufferSize = 1024
MinWriteBufferSize = 1024
FlushThrottleMS = 50
OutQueueSize = 50
IdleTimeoutMinutes = 5
PingTimeoutMinutes = 2
)
// BUG(jae): Handle disconnects.
/*
A Connection wraps a network connection and handles buffering and multiplexing.
"Packets" are sent with ".Send(Packet)".
@@ -46,20 +44,20 @@ type Connection struct {
}
var (
PACKET_TYPE_PING = UInt8(0x00)
PACKET_TYPE_PONG = UInt8(0x01)
PACKET_TYPE_MSG = UInt8(0x10)
PacketTypePing = UInt8(0x00)
PacketTypePong = UInt8(0x01)
PacketTypeMessage = UInt8(0x10)
)
func NewConnection(conn net.Conn) *Connection {
return &Connection{
sendQueue: make(chan Packet, OUT_QUEUE_SIZE),
sendQueue: make(chan Packet, OutQueueSize),
conn: conn,
bufReader: bufio.NewReaderSize(conn, READ_BUFFER_MIN_SIZE),
bufWriter: bufio.NewWriterSize(conn, WRITE_BUFFER_MIN_SIZE),
flushThrottler: NewThrottler(FLUSH_THROTTLE_MS * time.Millisecond),
bufReader: bufio.NewReaderSize(conn, MinReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, MinWriteBufferSize),
flushThrottler: NewThrottler(FlushThrottleMS * time.Millisecond),
quit: make(chan struct{}),
pingRepeatTimer: NewRepeatTimer(PING_TIMEOUT_MINUTES * time.Minute),
pingRepeatTimer: NewRepeatTimer(PingTimeoutMinutes * time.Minute),
pong: make(chan struct{}),
}
}
@@ -150,7 +148,7 @@ FOR_LOOP:
select {
case sendPkt := <-c.sendQueue:
log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection")
_, err = PACKET_TYPE_MSG.WriteTo(c.bufWriter)
_, err = PacketTypeMessage.WriteTo(c.bufWriter)
if err != nil {
break
}
@@ -159,10 +157,10 @@ FOR_LOOP:
case <-c.flushThrottler.Ch:
c.flush()
case <-c.pingRepeatTimer.Ch:
_, err = PACKET_TYPE_PING.WriteTo(c.bufWriter)
_, err = PacketTypePing.WriteTo(c.bufWriter)
c.flush()
case <-c.pong:
_, err = PACKET_TYPE_PONG.WriteTo(c.bufWriter)
_, err = PacketTypePong.WriteTo(c.bufWriter)
c.flush()
case <-c.quit:
break FOR_LOOP
@@ -202,13 +200,13 @@ FOR_LOOP:
}
switch pktType {
case PACKET_TYPE_PING:
case PacketTypePing:
// TODO: keep track of these, make sure it isn't abused
// as they cause flush()'s in the send buffer.
c.pong <- struct{}{}
case PACKET_TYPE_PONG:
case PacketTypePong:
// do nothing
case PACKET_TYPE_MSG:
case PacketTypeMessage:
pkt, err := ReadPacketSafe(c.bufReader)
if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 {
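
The connection.go hunks above keep the same wire framing while renaming the constants to MixedCaps: each frame starts with a one-byte packet type (ping, pong, or message), and the read loop dispatches on that byte. A self-contained sketch of this type-byte framing (a toy one-byte length prefix stands in for the tendermint/binary encoding):

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

const (
	packetTypePing    = byte(0x00)
	packetTypePong    = byte(0x01)
	packetTypeMessage = byte(0x10)
)

func main() {
	var wire bytes.Buffer
	w := bufio.NewWriter(&wire)

	// Writer side: a ping frame, then a message frame (type byte, length, payload).
	w.WriteByte(packetTypePing)
	payload := []byte("hello")
	w.WriteByte(packetTypeMessage)
	w.WriteByte(byte(len(payload)))
	w.Write(payload)
	w.Flush()

	// Reader side: dispatch on the leading type byte.
	r := bufio.NewReader(&wire)
	for {
		t, err := r.ReadByte()
		if err != nil {
			break // io.EOF ends the loop
		}
		switch t {
		case packetTypePing:
			fmt.Println("got ping, would queue a pong")
		case packetTypePong:
			fmt.Println("got pong")
		case packetTypeMessage:
			n, _ := r.ReadByte()
			buf := make([]byte, n)
			io.ReadFull(r, buf)
			fmt.Printf("got message: %q\n", buf)
		}
	}
}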


+2 -2  peer/listener.go

@@ -30,7 +30,7 @@ type DefaultListener struct {
}
const (
DEFAULT_BUFFERED_CONNECTIONS = 10
NumBufferedConnections = 10
)
func NewDefaultListener(protocol string, listenAddr string) Listener {
@@ -66,7 +66,7 @@ func NewDefaultListener(protocol string, listenAddr string) Listener {
dl := &DefaultListener{
listener: listener,
extAddr: extAddr,
connections: make(chan *Connection, DEFAULT_BUFFERED_CONNECTIONS),
connections: make(chan *Connection, NumBufferedConnections),
}
go dl.listenHandler()


+3 -55  peer/msg.go

@@ -6,59 +6,7 @@ import (
. "github.com/tendermint/tendermint/binary"
)
/*
Packet encapsulates a ByteSlice on a Channel.
*/
type Packet struct {
Channel String
Bytes ByteSlice
// Hash
}
func NewPacket(chName String, bytes ByteSlice) Packet {
return Packet{
Channel: chName,
Bytes: bytes,
}
}
func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(&p.Channel, w, n, err)
n, err = WriteOnto(&p.Bytes, w, n, err)
return
}
func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
chName, err := ReadStringSafe(r)
if err != nil {
return
}
// TODO: packet length sanity check.
bytes, err := ReadByteSliceSafe(r)
if err != nil {
return
}
return NewPacket(chName, bytes), nil
}
/*
InboundPacket extends Packet with fields relevant to incoming packets.
*/
type InboundPacket struct {
Peer *Peer
Time Time
Packet
}
/*
NewFilterMsg is not implemented. TODO
*/
type NewFilterMsg struct {
ChName String
Filter interface{} // todo
}
func (m *NewFilterMsg) WriteTo(w io.Writer) (int64, error) {
panic("TODO: implement")
return 0, nil // TODO
type Message interface {
Binary
Type() string
}

+47 -1  peer/peer.go

@@ -153,7 +153,7 @@ FOR_LOOP:
// (none)
}
/* Channel */
/* Channel */
type Channel struct {
name String
@@ -182,6 +182,52 @@ func (c *Channel) SendQueue() chan<- Packet {
return c.sendQueue
}
/* Packet */
/*
Packet encapsulates a ByteSlice on a Channel.
*/
type Packet struct {
Channel String
Bytes ByteSlice
// Hash
}
func NewPacket(chName String, bytes ByteSlice) Packet {
return Packet{
Channel: chName,
Bytes: bytes,
}
}
func (p Packet) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteOnto(&p.Channel, w, n, err)
n, err = WriteOnto(&p.Bytes, w, n, err)
return
}
func ReadPacketSafe(r io.Reader) (pkt Packet, err error) {
chName, err := ReadStringSafe(r)
if err != nil {
return
}
// TODO: packet length sanity check.
bytes, err := ReadByteSliceSafe(r)
if err != nil {
return
}
return NewPacket(chName, bytes), nil
}
/*
InboundPacket extends Packet with fields relevant to incoming packets.
*/
type InboundPacket struct {
Peer *Peer
Time Time
Packet
}
/* Misc */
type peerError struct {
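
The Packet type moved into peer.go above pairs WriteTo with ReadPacketSafe so a channel name and payload can round-trip over any stream. A simplified stand-in using encoding/binary length prefixes (the real code relies on the tendermint/binary String and ByteSlice types; everything below is illustrative only):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

type packet struct {
	Channel string
	Bytes   []byte
}

// writePacket emits: uint32 name length, name, uint32 payload length, payload.
func writePacket(w io.Writer, p packet) error {
	for _, field := range [][]byte{[]byte(p.Channel), p.Bytes} {
		if err := binary.Write(w, binary.BigEndian, uint32(len(field))); err != nil {
			return err
		}
		if _, err := w.Write(field); err != nil {
			return err
		}
	}
	return nil
}

// readPacket reads the same framing back.
func readPacket(r io.Reader) (packet, error) {
	read := func() ([]byte, error) {
		var n uint32
		if err := binary.Read(r, binary.BigEndian, &n); err != nil {
			return nil, err
		}
		buf := make([]byte, n)
		_, err := io.ReadFull(r, buf)
		return buf, err
	}
	name, err := read()
	if err != nil {
		return packet{}, err
	}
	body, err := read()
	if err != nil {
		return packet{}, err
	}
	return packet{Channel: string(name), Bytes: body}, nil
}

func main() {
	var buf bytes.Buffer
	writePacket(&buf, packet{Channel: "ch1", Bytes: []byte("hello")})
	p, _ := readPacket(&buf)
	fmt.Printf("%s: %s\n", p.Channel, p.Bytes)
}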

