diff --git a/sim/bench.go b/sim/bench_block.go similarity index 100% rename from sim/bench.go rename to sim/bench_block.go diff --git a/sim/bench_votes.go b/sim/bench_votes.go new file mode 100644 index 000000000..86ac62b0e --- /dev/null +++ b/sim/bench_votes.go @@ -0,0 +1,619 @@ +package main + +import ( + "container/heap" + "fmt" + "math/rand" + "strings" +) + +const seed = 0 +const numNodes = 6400 // Total number of nodes to simulate +const numNodes8 = (numNodes + 7) / 8 +const minNumPeers = 7 // Each node should be connected to at least this many peers +const maxNumPeers = 10 // ... and at most this many +const latencyMS = int32(500) // One way packet latency +const partTxMS = int32(3) // Transmission time per peer of 100B of data. +const sendQueueCapacity = 40 // Amount of messages to queue between peers. +const maxAllowableRank = 2 // After this, the data is considered waste. +const tryUnsolicited = 0.1 // Chance of sending an unsolicited piece of data. + +func init() { + rand.Seed(seed) +} + +//----------------------------------------------------------------------------- + +type Peer struct { + node *Node // Pointer to node + sent int32 // Time of last packet send, including transmit time. + remote int // SomeNode.peers[x].node.peers[remote].node is SomeNode for all x. + wanted []byte // Bitarray of wanted pieces. + given []byte // Bitarray of given pieces. +} + +func newPeer(pNode *Node, remote int) *Peer { + peer := &Peer{ + node: pNode, + remote: remote, + wanted: make([]byte, numNodes8), + given: make([]byte, numNodes8), + } + for i := 0; i < numNodes8; i++ { + peer.wanted[i] = byte(0xff) + } + return peer +} + +// Send a data event to the peer, or return false if queue is "full". +// Depending on how many event packets are "queued" for peer, +// the actual recvTime may be adjusted to be later. +func (p *Peer) sendEventData(event EventData) bool { + desiredRecvTime := event.RecvTime() + minRecvTime := p.sent + partTxMS + latencyMS + if desiredRecvTime >= minRecvTime { + p.node.sendEvent(event) + p.sent += partTxMS + return true + } else { + if (minRecvTime-desiredRecvTime)/partTxMS > sendQueueCapacity { + return false + } else { + event.SetRecvTime(minRecvTime) // Adjust recvTime + p.node.sendEvent(event) + p.sent += partTxMS + return true + } + } +} + +// Returns true if the sendQueue is not "full" +func (p *Peer) canSendData(now int32) bool { + return (p.sent - now) < sendQueueCapacity +} + +// Since EventPart events are much smaller, we don't consider the transmit time, +// and assume that the sendQueue is always free. +func (p *Peer) sendEventDataResponse(event EventDataResponse) { + p.node.sendEvent(event) +} + +// Does the peer's .wanted (as received by an EventDataResponse event) contain part? +func (p *Peer) wants(part uint16) bool { + return p.wanted[part/8]&(1<<(part%8)) > 0 +} + +func (p *Peer) setWants(part uint16, want bool) { + if want { + p.wanted[part/8] |= (1 << (part % 8)) + } else { + p.wanted[part/8] &= ^(1 << (part % 8)) + } +} + +// Did we give peer a part +func (p *Peer) gave(part uint16) bool { + return p.given[part/8]&(1<<(part%8)) > 0 +} + +func (p *Peer) setGave(part uint16) { + p.given[part/8] |= (1 << (part % 8)) +} + +// Reset state in preparation for new "round" +func (p *Peer) reset() { + for i := 0; i < numNodes8; i++ { + p.given[i] = byte(0x00) + } + p.sent = 0 +} + +//----------------------------------------------------------------------------- + +type Node struct { + index int + peers []*Peer + parts []byte // Bitarray of received parts. + partsCount []uint8 // Count of how many times parts were received. + events *Heap +} + +// Reset state in preparation for new "round" +func (n *Node) reset() { + for i := 0; i < numNodes8; i++ { + n.parts[i] = byte(0x00) + } + for i := 0; i < numNodes; i++ { + n.partsCount[i] = uint8(0) + } + n.events = NewHeap() + for _, peer := range n.peers { + peer.reset() + } +} + +func (n *Node) fill() float64 { + gotten := 0 + for _, count := range n.partsCount { + if count > 0 { + gotten += 1 + } + } + return float64(gotten) / float64(numNodes) +} + +func (n *Node) sendEvent(event Event) { + n.events.Push(event, event.RecvTime()) +} + +func (n *Node) recvEvent() Event { + return n.events.Pop().(Event) +} + +func (n *Node) receive(part uint16) uint8 { + /* + defer func() { + e := recover() + if e != nil { + fmt.Println(part, len(n.parts), len(n.partsCount), part/8) + panic(e) + } + }() + */ + n.parts[part/8] |= (1 << (part % 8)) + n.partsCount[part] += 1 + return n.partsCount[part] +} + +// returns false if already connected, or remote node has too many connections. +func (n *Node) canConnectTo(node *Node) bool { + if len(n.peers) > maxNumPeers { + return false + } + for _, peer := range n.peers { + if peer.node == node { + return false + } + } + return true +} + +func (n *Node) isFull() bool { + for _, count := range n.partsCount { + if count == 0 { + return false + } + } + return true +} + +func (n *Node) pickRandomForPeer(peer *Peer) (part uint16, ok bool) { + // Pick a random piece that node has and give it to peer. + // Do not pick a piece that has already been given. + peerGiven := peer.given + nodeParts := n.parts + randStart := rand.Intn(numNodes8) + for i := 0; i < numNodes8; i++ { + bytei := uint16((i + randStart) % numNodes8) + pByte := peerGiven[bytei] + nByte := nodeParts[bytei] + iHas := nByte & ^pByte // iHas something to give :) + if iHas > 0 { + randBitStart := rand.Intn(8) + //fmt.Println("//--") + for j := 0; j < 8; j++ { + biti := uint16((j + randBitStart) % 8) + //fmt.Printf("%X %v %v %v\n", iHas, j, biti, randBitStart) + if (iHas & (1 << biti)) > 0 { + return 8*bytei + biti, true + } + } + panic("should not happen") + } + } + return 0, false +} + +func (n *Node) debug() { + lines := []string{} + lines = append(lines, n.String()) + lines = append(lines, fmt.Sprintf("events: %v, parts: %X", n.events.Len(), n.parts)) + for _, p := range n.peers { + part, ok := n.pickRandomForPeer(p) + lines = append(lines, fmt.Sprintf("peer sent: %v, given: %X, (%v/%v)", p.sent, p.given, part, ok)) + } + fmt.Println("//---------------") + fmt.Println(strings.Join(lines, "\n")) + fmt.Println("//---------------") +} + +func (n *Node) String() string { + return fmt.Sprintf("{N:%d}", n.index) +} + +//----------------------------------------------------------------------------- + +type Event interface { + RecvTime() int32 + SetRecvTime(int32) +} + +type EventData struct { + time int32 // time of receipt. + src int // src node's peer index on destination node + part uint16 +} + +func (e EventData) RecvTime() int32 { + return e.time +} + +func (e EventData) SetRecvTime(time int32) { + e.time = time +} + +func (e EventData) String() string { + return fmt.Sprintf("[%d:%d:%d]", e.time, e.src, e.part) +} + +type EventDataResponse struct { + time int32 // time of receipt. + src int // src node's peer index on destination node. + part uint16 // in response to given part + rank uint8 // if this is 1, node was first to give peer part. +} + +func (e EventDataResponse) RecvTime() int32 { + return e.time +} + +func (e EventDataResponse) SetRecvTime(time int32) { + e.time = time +} + +func (e EventDataResponse) String() string { + return fmt.Sprintf("[%d:%d:%d:%d]", e.time, e.src, e.part, e.rank) +} + +//----------------------------------------------------------------------------- + +func createNetwork() []*Node { + nodes := make([]*Node, numNodes) + for i := 0; i < numNodes; i++ { + n := &Node{ + index: i, + peers: []*Peer{}, + parts: make([]byte, numNodes8), + partsCount: make([]uint8, numNodes), + events: NewHeap(), + } + nodes[i] = n + } + for i := 0; i < numNodes; i++ { + n := nodes[i] + for j := 0; j < minNumPeers; j++ { + if len(n.peers) > j { + // Already set, continue + continue + } + pidx := rand.Intn(numNodes) + for !n.canConnectTo(nodes[pidx]) { + pidx = rand.Intn(numNodes) + } + // connect to nodes[pidx] + remote := nodes[pidx] + remote_j := len(remote.peers) + n.peers = append(n.peers, newPeer(remote, remote_j)) + remote.peers = append(remote.peers, newPeer(n, j)) + } + } + return nodes +} + +func printNodes(nodes []*Node) { + for _, node := range nodes { + peerStr := "" + for _, peer := range node.peers { + peerStr += fmt.Sprintf(" %v", peer.node.index) + } + fmt.Printf("[%v] peers: %v\n", node.index, peerStr) + } +} + +func countFull(nodes []*Node) (fullCount int) { + for _, node := range nodes { + if node.isFull() { + fullCount += 1 + } + } + return fullCount +} + +type runStat struct { + time int32 // time for all events to propagate + fill float64 // avg % of pieces gotten + full float64 // % of times the sendQueue was full +} + +func (s runStat) String() string { + return fmt.Sprintf("{t:%v/fi:%.5f/fu:%.5f}", s.time, s.fill, s.full) +} + +func main() { + + // Global vars + nodes := createNetwork() + runStats := []runStat{} + //printNodes(nodes[:]) + + // Keep iterating and improving .wanted + for { + timeMS := int32(0) + + // Each node sends a part to its peers. + for _, node := range nodes { + // reset all node state. + node.reset() + } + + // Each node sends a part to its peers. + for i, node := range nodes { + // TODO: make it staggered. + timeMS := int32(0) // scoped + for _, peer := range node.peers { + recvTime := timeMS + latencyMS + partTxMS + event := EventData{ + time: recvTime, + src: peer.remote, + part: uint16(i), + } + peer.sendEventData(event) + //timeMS += partTxMS + } + } + + numEventsZero := 0 // times no events have occured + numSendSuccess := 0 // times data send was successful + numSendFailure := 0 // times data send failed due to queue being full + + // Run simulation + for { + // Lets run the simulation for each user until endTimeMS + // We use latencyMS/2 since causality has at least this much lag. + endTimeMS := timeMS + latencyMS/2 + + // Print out the network for debugging + /* + fmt.Printf("simulating until %v\n", endTimeMS) + if true { + for i := 0; i < 40; i++ { + node := nodes[i] + fmt.Printf("[%v] parts: %X\n", node.index, node.parts) + } + } + */ + + numEvents := 0 + for _, node := range nodes { + + // Iterate over the events of this node until event.time >= endTimeMS + for { + _event, ok := node.events.Peek().(Event) + if !ok || _event.RecvTime() >= endTimeMS { + break + } else { + node.events.Pop() + } + + switch _event.(type) { + case EventData: + event := _event.(EventData) + + // Process this event + rank := node.receive(event.part) + // Send rank back to peer + // NOTE: in reality, maybe this doesn't always happen. + srcPeer := node.peers[event.src] + srcPeer.setGave(event.part) // HACK + srcPeer.sendEventDataResponse(EventDataResponse{ + time: event.time + latencyMS, // TODO: responseTxMS ? + src: srcPeer.remote, + part: event.part, + rank: rank, + }) + numEvents++ + + if rank > 1 { + // Already has this part, ignore this event. + continue + } + + // Let's iterate over peers & see which wants this piece. + for _, peer := range node.peers { + if peer.wants(event.part) { + //fmt.Print("w") + sent := peer.sendEventData(EventData{ + time: event.time + latencyMS + partTxMS, + src: peer.remote, + part: event.part, + }) + if sent { + peer.setGave(event.part) + numSendSuccess++ + } else { + numSendFailure++ + } + } else { + //fmt.Print("!") + // Peer doesn't want it, but sporadically we'll try sending it anyways. + if peer.canSendData(event.time) && + rand.Float32() < tryUnsolicited { + sent := peer.sendEventData(EventData{ + time: event.time + latencyMS + partTxMS, + src: peer.remote, + part: event.part, + }) + if sent { + peer.setGave(event.part) + numSendSuccess++ + } else { + numSendFailure++ + } + } + } + } + + case EventDataResponse: + event := _event.(EventDataResponse) + peer := node.peers[event.src] + + // Adjust peer.wanted accordingly + if event.rank <= maxAllowableRank { + peer.setWants(event.part, true) + } else { + peer.setWants(event.part, false) + } + } + + } + } + + if numEvents == 0 { + numEventsZero++ + } else { + numEventsZero = 0 + } + // If network is full or numEventsZero > 3, quit. + if countFull(nodes) == numNodes || numEventsZero > 3 { + fmt.Printf("Done! took %v ms. Past: %v\n", timeMS, runStats) + fillSum := 0.0 + for _, node := range nodes { + fillSum += node.fill() + } + runStats = append(runStats, runStat{timeMS, fillSum / float64(numNodes), float64(numSendSuccess) / float64(numSendSuccess+numSendFailure)}) + for i := 0; i < 20; i++ { + node := nodes[i] + fmt.Printf("[%v] parts: %X (%f)\n", node.index, node.parts[:80], node.fill()) + } + break + } else { + fmt.Printf("simulated %v ms. numEvents: %v Past: %v\n", timeMS, numEvents, runStats) + for i := 0; i < 2; i++ { + peer := nodes[0].peers[i] + fmt.Printf("[0].[%v] wanted: %X\n", i, peer.wanted[:80]) + fmt.Printf("[0].[%v] given: %X\n", i, peer.given[:80]) + } + for i := 0; i < 5; i++ { + node := nodes[i] + fmt.Printf("[%v] parts: %X (%f)\n", node.index, node.parts[:80], node.fill()) + } + } + + // Lets increment the timeMS now + timeMS += latencyMS / 2 + + /* + // Debug + if timeMS >= 25000 { + nodes[1].debug() + for e := nodes[1].events.Pop(); e != nil; e = nodes[1].events.Pop() { + fmt.Println(e) + } + return + } */ + } // end simulation + } // forever loop +} + +// ---------------------------------------------------------------------------- + +type Heap struct { + pq priorityQueue +} + +func NewHeap() *Heap { + return &Heap{pq: make([]*pqItem, 0)} +} + +func (h *Heap) Len() int { + return len(h.pq) +} + +func (h *Heap) Peek() interface{} { + if len(h.pq) == 0 { + return nil + } + return h.pq[0].value +} + +func (h *Heap) Push(value interface{}, priority int32) { + heap.Push(&h.pq, &pqItem{value: value, priority: priority}) +} + +func (h *Heap) Pop() interface{} { + if len(h.pq) == 0 { + return nil + } + item := heap.Pop(&h.pq).(*pqItem) + return item.value +} + +/* +func main() { + h := NewHeap() + + h.Push(String("msg1"), 1) + h.Push(String("msg3"), 3) + h.Push(String("msg2"), 2) + + fmt.Println(h.Pop()) + fmt.Println(h.Pop()) + fmt.Println(h.Pop()) +} +*/ + +/////////////////////// +// From: http://golang.org/pkg/container/heap/#example__priorityQueue + +type pqItem struct { + value interface{} + priority int32 + index int +} + +type priorityQueue []*pqItem + +func (pq priorityQueue) Len() int { return len(pq) } + +func (pq priorityQueue) Less(i, j int) bool { + return pq[i].priority < pq[j].priority +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*pqItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int32) { + heap.Remove(pq, item.index) + item.value = value + item.priority = priority + heap.Push(pq, item) +}