You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

493 lines
11 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
  1. package main
  2. import (
  3. "container/heap"
  4. "fmt"
  5. "math/rand"
  6. "strings"
  7. )
  8. const seed = 0
  9. const numNodes = 30000 // Total number of nodes to simulate
  10. const minNumPeers = 7 // Each node should be connected to at least this many peers
  11. const maxNumPeers = 10 // ... and at most this many
  12. const latencyMS = int32(500) // One way packet latency
  13. const partTxMS = int32(100) // Transmission time per peer of 4KB of data.
  14. const sendQueueCapacity = 5 // Amount of messages to queue between peers.
  15. func init() {
  16. rand.Seed(seed)
  17. }
  18. //-----------------------------------------------------------------------------
  19. type Peer struct {
  20. node *Node // Pointer to node
  21. sent int32 // Time of last packet send, including transmit time.
  22. remote int // SomeNode.peers[x].node.peers[remote].node is SomeNode for all x.
  23. parts []byte // [32]byte{} bitarray of received block pieces.
  24. }
  25. // Send a data event to the peer, or return false if queue is "full".
  26. // Depending on how many event packets are "queued" for peer,
  27. // the actual recvTime may be adjusted to be later.
  28. func (p *Peer) sendEventData(event EventData) bool {
  29. desiredRecvTime := event.RecvTime()
  30. minRecvTime := p.sent + partTxMS + latencyMS
  31. if desiredRecvTime >= minRecvTime {
  32. p.node.sendEvent(event)
  33. p.sent += partTxMS
  34. return true
  35. } else {
  36. if (minRecvTime-desiredRecvTime)/partTxMS > sendQueueCapacity {
  37. return false
  38. } else {
  39. event.SetRecvTime(minRecvTime) // Adjust recvTime
  40. p.node.sendEvent(event)
  41. p.sent += partTxMS
  42. return true
  43. }
  44. }
  45. }
  46. // Returns true if the sendQueue is not "full"
  47. func (p *Peer) canSendData(now int32) bool {
  48. return (p.sent - now) < sendQueueCapacity
  49. }
  50. // Since EventPart events are much smaller, we don't consider the transmit time,
  51. // and assume that the sendQueue is always free.
  52. func (p *Peer) sendEventParts(event EventParts) {
  53. p.node.sendEvent(event)
  54. }
  55. // Does the peer's .parts (as received by an EventParts event) contain part?
  56. func (p *Peer) knownToHave(part uint8) bool {
  57. return p.parts[part/8]&(1<<(part%8)) > 0
  58. }
  59. //-----------------------------------------------------------------------------
  60. type Node struct {
  61. index int
  62. peers []*Peer
  63. parts []byte
  64. events *Heap
  65. }
  66. func (n *Node) sendEvent(event Event) {
  67. n.events.Push(event, event.RecvTime())
  68. }
  69. func (n *Node) recvEvent() Event {
  70. return n.events.Pop().(Event)
  71. }
  72. func (n *Node) receive(part uint8) bool {
  73. x := n.parts[part/8]
  74. nx := x | (1 << (part % 8))
  75. if x == nx {
  76. return false
  77. } else {
  78. n.parts[part/8] = nx
  79. return true
  80. }
  81. }
  82. // returns false if already connected, or remote node has too many connections.
  83. func (n *Node) canConnectTo(node *Node) bool {
  84. if len(n.peers) > maxNumPeers {
  85. return false
  86. }
  87. for _, peer := range n.peers {
  88. if peer.node == node {
  89. return false
  90. }
  91. }
  92. return true
  93. }
  94. func (n *Node) isFull() bool {
  95. for _, part := range n.parts {
  96. if part != byte(0xff) {
  97. return false
  98. }
  99. }
  100. return true
  101. }
  102. func (n *Node) pickRandomForPeer(peer *Peer) (part uint8, ok bool) {
  103. peerParts := peer.parts
  104. nodeParts := n.parts
  105. randStart := rand.Intn(32)
  106. for i := 0; i < 32; i++ {
  107. bytei := uint8((i + randStart) % 32)
  108. nByte := nodeParts[bytei]
  109. pByte := peerParts[bytei]
  110. iHas := nByte & ^pByte
  111. if iHas > 0 {
  112. randBitStart := rand.Intn(8)
  113. //fmt.Println("//--")
  114. for j := 0; j < 8; j++ {
  115. biti := uint8((j + randBitStart) % 8)
  116. //fmt.Printf("%X %v %v %v\n", iHas, j, biti, randBitStart)
  117. if (iHas & (1 << biti)) > 0 {
  118. return 8*bytei + biti, true
  119. }
  120. }
  121. panic("should not happen")
  122. }
  123. }
  124. return 0, false
  125. }
  126. func (n *Node) debug() {
  127. lines := []string{}
  128. lines = append(lines, n.String())
  129. lines = append(lines, fmt.Sprintf("events: %v, parts: %X", n.events.Len(), n.parts))
  130. for _, p := range n.peers {
  131. part, ok := n.pickRandomForPeer(p)
  132. lines = append(lines, fmt.Sprintf("peer sent: %v, parts: %X, (%v/%v)", p.sent, p.parts, part, ok))
  133. }
  134. fmt.Println("//---------------")
  135. fmt.Println(strings.Join(lines, "\n"))
  136. fmt.Println("//---------------")
  137. }
  138. func (n *Node) String() string {
  139. return fmt.Sprintf("{N:%d}", n.index)
  140. }
  141. //-----------------------------------------------------------------------------
  142. type Event interface {
  143. RecvTime() int32
  144. SetRecvTime(int32)
  145. }
  146. type EventData struct {
  147. time int32 // time of receipt.
  148. src int // src node's peer index on destination node
  149. part uint8
  150. }
  151. func (e EventData) RecvTime() int32 {
  152. return e.time
  153. }
  154. func (e EventData) SetRecvTime(time int32) {
  155. e.time = time
  156. }
  157. func (e EventData) String() string {
  158. return fmt.Sprintf("[%d:%d:%d]", e.time, e.src, e.part)
  159. }
  160. type EventParts struct {
  161. time int32 // time of receipt.
  162. src int // src node's peer index on destination node.
  163. parts []byte
  164. }
  165. func (e EventParts) RecvTime() int32 {
  166. return e.time
  167. }
  168. func (e EventParts) SetRecvTime(time int32) {
  169. e.time = time
  170. }
  171. func (e EventParts) String() string {
  172. return fmt.Sprintf("[%d:%d:%d]", e.time, e.src, e.parts)
  173. }
  174. //-----------------------------------------------------------------------------
  175. func createNetwork() []*Node {
  176. nodes := make([]*Node, numNodes)
  177. for i := 0; i < numNodes; i++ {
  178. n := &Node{
  179. index: i,
  180. peers: []*Peer{},
  181. parts: make([]byte, 32),
  182. events: NewHeap(),
  183. }
  184. nodes[i] = n
  185. }
  186. for i := 0; i < numNodes; i++ {
  187. n := nodes[i]
  188. for j := 0; j < minNumPeers; j++ {
  189. if len(n.peers) > j {
  190. // Already set, continue
  191. continue
  192. }
  193. pidx := rand.Intn(numNodes)
  194. for !n.canConnectTo(nodes[pidx]) {
  195. pidx = rand.Intn(numNodes)
  196. }
  197. // connect to nodes[pidx]
  198. remote := nodes[pidx]
  199. remote_j := len(remote.peers)
  200. n.peers = append(n.peers, &Peer{node: remote, remote: remote_j, parts: make([]byte, 32)})
  201. remote.peers = append(remote.peers, &Peer{node: n, remote: j, parts: make([]byte, 32)})
  202. }
  203. }
  204. return nodes
  205. }
  206. func printNodes(nodes []*Node) {
  207. for _, node := range nodes {
  208. peerStr := ""
  209. for _, peer := range node.peers {
  210. peerStr += fmt.Sprintf(" %v", peer.node.index)
  211. }
  212. fmt.Printf("[%v] peers: %v\n", node.index, peerStr)
  213. }
  214. }
  215. func countFull(nodes []*Node) (fullCount int) {
  216. for _, node := range nodes {
  217. if node.isFull() {
  218. fullCount += 1
  219. }
  220. }
  221. return fullCount
  222. }
  223. func main() {
  224. // Global vars
  225. nodes := createNetwork()
  226. timeMS := int32(0)
  227. proposer := nodes[0]
  228. for i := 0; i < 32; i++ {
  229. proposer.parts[i] = byte(0xff)
  230. }
  231. //printNodes(nodes[:])
  232. // The proposer sends parts to all of its peers.
  233. for i := 0; i < len(proposer.peers); i++ {
  234. timeMS := int32(0) // scoped
  235. peer := proposer.peers[i]
  236. for j := 0; j < 256; j++ {
  237. // Send each part to a peer, but each peer starts at a different offset.
  238. part := uint8((j + i*(256/len(proposer.peers))) % 256)
  239. recvTime := timeMS + latencyMS + partTxMS
  240. event := EventData{
  241. time: recvTime,
  242. src: peer.remote,
  243. part: part,
  244. }
  245. peer.sendEventData(event)
  246. timeMS += partTxMS
  247. }
  248. }
  249. // Run simulation
  250. for {
  251. // Lets run the simulation for each user until endTimeMS
  252. // We use latencyMS/2 since causality has at least this much lag.
  253. endTimeMS := timeMS + latencyMS/2
  254. fmt.Printf("simulating until %v\n", endTimeMS)
  255. // Print out the network for debugging
  256. if true {
  257. for i := 0; i < 40; i++ {
  258. node := nodes[i]
  259. fmt.Printf("[%v] parts: %X\n", node.index, node.parts)
  260. }
  261. }
  262. for _, node := range nodes {
  263. // Iterate over the events of this node until event.time >= endTimeMS
  264. for {
  265. _event, ok := node.events.Peek().(Event)
  266. if !ok || _event.RecvTime() >= endTimeMS {
  267. break
  268. } else {
  269. node.events.Pop()
  270. }
  271. switch _event.(type) {
  272. case EventData:
  273. event := _event.(EventData)
  274. // Process this event
  275. if !node.receive(event.part) {
  276. // Already has this part, ignore this event.
  277. continue
  278. }
  279. // Let's iterate over peers & see which needs this piece.
  280. for _, peer := range node.peers {
  281. if !peer.knownToHave(event.part) {
  282. peer.sendEventData(EventData{
  283. time: event.time + latencyMS + partTxMS,
  284. src: peer.remote,
  285. part: event.part,
  286. })
  287. } else {
  288. continue
  289. }
  290. }
  291. case EventParts:
  292. event := _event.(EventParts)
  293. node.peers[event.src].parts = event.parts
  294. peer := node.peers[event.src]
  295. // Lets blast the peer with random parts.
  296. randomSent := 0
  297. randomSentErr := 0
  298. for peer.canSendData(event.time) {
  299. part, ok := node.pickRandomForPeer(peer)
  300. if ok {
  301. randomSent += 1
  302. sent := peer.sendEventData(EventData{
  303. time: event.time + latencyMS + partTxMS,
  304. src: peer.remote,
  305. part: part,
  306. })
  307. if !sent {
  308. randomSentErr += 1
  309. }
  310. } else {
  311. break
  312. }
  313. }
  314. /*
  315. if randomSent > 0 {
  316. fmt.Printf("radom sent: %v %v", randomSent, randomSentErr)
  317. }
  318. */
  319. }
  320. }
  321. }
  322. // If network is full, quit.
  323. if countFull(nodes) == numNodes {
  324. fmt.Printf("Done! took %v ms", timeMS)
  325. break
  326. }
  327. // Lets increment the timeMS now
  328. timeMS += latencyMS / 2
  329. // Debug
  330. if timeMS >= 25000 {
  331. nodes[1].debug()
  332. for e := nodes[1].events.Pop(); e != nil; e = nodes[1].events.Pop() {
  333. fmt.Println(e)
  334. }
  335. return
  336. }
  337. // Send EventParts rather frequently. It's cheap.
  338. for _, node := range nodes {
  339. for _, peer := range node.peers {
  340. peer.sendEventParts(EventParts{
  341. time: timeMS + latencyMS,
  342. src: peer.remote,
  343. parts: node.parts,
  344. })
  345. }
  346. newParts := make([]byte, 32)
  347. copy(newParts, node.parts)
  348. node.parts = newParts
  349. }
  350. }
  351. }
  352. // ----------------------------------------------------------------------------
  353. type Heap struct {
  354. pq priorityQueue
  355. }
  356. func NewHeap() *Heap {
  357. return &Heap{pq: make([]*pqItem, 0)}
  358. }
  359. func (h *Heap) Len() int {
  360. return len(h.pq)
  361. }
  362. func (h *Heap) Peek() interface{} {
  363. if len(h.pq) == 0 {
  364. return nil
  365. }
  366. return h.pq[0].value
  367. }
  368. func (h *Heap) Push(value interface{}, priority int32) {
  369. heap.Push(&h.pq, &pqItem{value: value, priority: priority})
  370. }
  371. func (h *Heap) Pop() interface{} {
  372. item := heap.Pop(&h.pq).(*pqItem)
  373. return item.value
  374. }
  375. /*
  376. func main() {
  377. h := NewHeap()
  378. h.Push(String("msg1"), 1)
  379. h.Push(String("msg3"), 3)
  380. h.Push(String("msg2"), 2)
  381. fmt.Println(h.Pop())
  382. fmt.Println(h.Pop())
  383. fmt.Println(h.Pop())
  384. }
  385. */
  386. ///////////////////////
  387. // From: http://golang.org/pkg/container/heap/#example__priorityQueue
  388. type pqItem struct {
  389. value interface{}
  390. priority int32
  391. index int
  392. }
  393. type priorityQueue []*pqItem
  394. func (pq priorityQueue) Len() int { return len(pq) }
  395. func (pq priorityQueue) Less(i, j int) bool {
  396. return pq[i].priority < pq[j].priority
  397. }
  398. func (pq priorityQueue) Swap(i, j int) {
  399. pq[i], pq[j] = pq[j], pq[i]
  400. pq[i].index = i
  401. pq[j].index = j
  402. }
  403. func (pq *priorityQueue) Push(x interface{}) {
  404. n := len(*pq)
  405. item := x.(*pqItem)
  406. item.index = n
  407. *pq = append(*pq, item)
  408. }
  409. func (pq *priorityQueue) Pop() interface{} {
  410. old := *pq
  411. n := len(old)
  412. item := old[n-1]
  413. item.index = -1 // for safety
  414. *pq = old[0 : n-1]
  415. return item
  416. }
  417. func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int32) {
  418. heap.Remove(pq, item.index)
  419. item.value = value
  420. item.priority = priority
  421. heap.Push(pq, item)
  422. }