From 017bc2794efc36ff76183e7a6817b47888e21200 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 3 Aug 2014 18:16:32 -0700 Subject: [PATCH] with pull --- sim/bench.go | 112 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 97 insertions(+), 15 deletions(-) diff --git a/sim/bench.go b/sim/bench.go index 7188d1a04..162b5d141 100644 --- a/sim/bench.go +++ b/sim/bench.go @@ -4,15 +4,16 @@ import ( "container/heap" "fmt" "math/rand" + "strings" ) const seed = 0 -const numNodes = 50000 // Total number of nodes to simulate -const minNumPeers = 10 // Each node should be connected to at least this many peers -const maxNumPeers = 15 // ... and at most this many +const numNodes = 30000 // Total number of nodes to simulate +const minNumPeers = 7 // Each node should be connected to at least this many peers +const maxNumPeers = 10 // ... and at most this many const latencyMS = int32(500) // One way packet latency -const partTxMS = int32(10) // Transmission time per peer of 4KB of data. -const sendQueueCapacity = 100 // Amount of messages to queue between peers. +const partTxMS = int32(100) // Transmission time per peer of 4KB of data. +const sendQueueCapacity = 5 // Amount of messages to queue between peers. func init() { rand.Seed(seed) @@ -49,6 +50,11 @@ func (p *Peer) sendEventData(event EventData) bool { } } +// Returns true if the sendQueue is not "full" +func (p *Peer) canSendData(now int32) bool { + return (p.sent - now) < sendQueueCapacity +} + // Since EventPart events are much smaller, we don't consider the transmit time, // and assume that the sendQueue is always free. func (p *Peer) sendEventParts(event EventParts) { @@ -110,6 +116,44 @@ func (n *Node) isFull() bool { return true } +func (n *Node) pickRandomForPeer(peer *Peer) (part uint8, ok bool) { + peerParts := peer.parts + nodeParts := n.parts + randStart := rand.Intn(32) + for i:=0; i<32; i++ { + bytei := uint8((i+randStart) % 32) + nByte := nodeParts[bytei] + pByte := peerParts[bytei] + iHas := nByte & ^pByte + if iHas > 0 { + randBitStart := rand.Intn(8) + //fmt.Println("//--") + for j:=0; j<8; j++ { + biti := uint8((j+randBitStart) % 8) + //fmt.Printf("%X %v %v %v\n", iHas, j, biti, randBitStart) + if (iHas & (1< 0 { + return 8*bytei + biti, true + } + } + panic("should not happen") + } + } + return 0, false +} + +func (n *Node) debug() { + lines := []string{} + lines = append(lines, n.String()) + lines = append(lines, fmt.Sprintf("events: %v, parts: %X", n.events.Len(), n.parts)) + for _, p := range n.peers { + part, ok := n.pickRandomForPeer(p) + lines = append(lines, fmt.Sprintf("peer sent: %v, parts: %X, (%v/%v)", p.sent, p.parts, part, ok)) + } + fmt.Println("//---------------") + fmt.Println(strings.Join(lines, "\n")) + fmt.Println("//---------------") +} + func (n *Node) String() string { return fmt.Sprintf("{N:%d}", n.index) } @@ -123,6 +167,7 @@ type Event interface { type EventData struct { time int32 // time of receipt. + src int // src node's peer index on destination node part uint8 } @@ -135,7 +180,7 @@ func (e EventData) SetRecvTime(time int32) { } func (e EventData) String() string { - return fmt.Sprintf("[%d:%d]", e.time, e.part) + return fmt.Sprintf("[%d:%d:%d]", e.time, e.src, e.part) } type EventParts struct { @@ -230,6 +275,7 @@ func main() { recvTime := timeMS + latencyMS + partTxMS event := EventData{ time: recvTime, + src: peer.remote, part: part, } peer.sendEventData(event) @@ -246,7 +292,7 @@ func main() { // Print out the network for debugging if true { - for i := 40000; i < 40050; i++ { + for i := 0; i < 40; i++ { node := nodes[i] fmt.Printf("[%v] parts: %X\n", node.index, node.parts) } @@ -274,20 +320,47 @@ func main() { } // Let's iterate over peers & see which needs this piece. - recvTime := event.time + latencyMS + partTxMS for _, peer := range node.peers { - if peer.knownToHave(event.part) { - continue - } - peer.sendEventData(EventData{ - time: recvTime, - part: event.part, - }) + if !peer.knownToHave(event.part) { + peer.sendEventData(EventData{ + time: event.time + latencyMS + partTxMS, + src: peer.remote, + part: event.part, + }) + } else { + continue + } } case EventParts: event := _event.(EventParts) node.peers[event.src].parts = event.parts + peer := node.peers[event.src] + + // Lets blast the peer with random parts. + randomSent := 0 + randomSentErr := 0 + for peer.canSendData(event.time) { + part, ok := node.pickRandomForPeer(peer) + if ok { + randomSent += 1 + sent := peer.sendEventData(EventData{ + time: event.time + latencyMS + partTxMS, + src: peer.remote, + part: part, + }) + if !sent { + randomSentErr += 1 + } + } else { + break + } + } + /* + if randomSent > 0 { + fmt.Printf("radom sent: %v %v", randomSent, randomSentErr) + } + */ } } @@ -302,6 +375,15 @@ func main() { // Lets increment the timeMS now timeMS += latencyMS / 2 + // Debug + if timeMS >= 25000 { + nodes[1].debug() + for e := nodes[1].events.Pop(); e != nil; e = nodes[1].events.Pop() { + fmt.Println(e) + } + return + } + // Send EventParts rather frequently. It's cheap. for _, node := range nodes { for _, peer := range node.peers {