You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

387 lines
8.8 KiB

  1. // nolint:unused
  2. package v2
  3. import (
  4. "fmt"
  5. "math"
  6. "math/rand"
  7. "time"
  8. "github.com/tendermint/tendermint/p2p"
  9. )
  10. type Event interface{}
  11. type blockState int
  12. const (
  13. blockStateUnknown blockState = iota
  14. blockStateNew
  15. blockStatePending
  16. blockStateReceived
  17. blockStateProcessed
  18. )
  19. func (e blockState) String() string {
  20. switch e {
  21. case blockStateUnknown:
  22. return "Unknown"
  23. case blockStateNew:
  24. return "New"
  25. case blockStatePending:
  26. return "Pending"
  27. case blockStateReceived:
  28. return "Received"
  29. case blockStateProcessed:
  30. return "Processed"
  31. default:
  32. return fmt.Sprintf("unknown blockState: %d", e)
  33. }
  34. }
  35. type peerState int
  36. const (
  37. peerStateNew = iota
  38. peerStateReady
  39. peerStateRemoved
  40. )
  41. func (e peerState) String() string {
  42. switch e {
  43. case peerStateNew:
  44. return "New"
  45. case peerStateReady:
  46. return "Ready"
  47. case peerStateRemoved:
  48. return "Removed"
  49. default:
  50. return fmt.Sprintf("unknown peerState: %d", e)
  51. }
  52. }
  53. type scPeer struct {
  54. peerID p2p.ID
  55. state peerState
  56. height int64
  57. lastTouched time.Time
  58. lastRate int64
  59. }
  60. func newScPeer(peerID p2p.ID) *scPeer {
  61. return &scPeer{
  62. peerID: peerID,
  63. state: peerStateNew,
  64. height: -1,
  65. lastTouched: time.Time{},
  66. }
  67. }
  68. // The schedule is a composite data structure which allows a scheduler to keep
  69. // track of which blocks have been scheduled into which state.
  70. type schedule struct {
  71. initHeight int64
  72. // a list of blocks in which blockState
  73. blockStates map[int64]blockState
  74. // a map of peerID to schedule specific peer struct `scPeer` used to keep
  75. // track of peer specific state
  76. peers map[p2p.ID]*scPeer
  77. // a map of heights to the peer we are waiting for a response from
  78. pendingBlocks map[int64]p2p.ID
  79. // the time at which a block was put in blockStatePending
  80. pendingTime map[int64]time.Time
  81. // the peerID of the peer which put the block in blockStateReceived
  82. receivedBlocks map[int64]p2p.ID
  83. }
  84. func newSchedule(initHeight int64) *schedule {
  85. sc := schedule{
  86. initHeight: initHeight,
  87. blockStates: make(map[int64]blockState),
  88. peers: make(map[p2p.ID]*scPeer),
  89. pendingBlocks: make(map[int64]p2p.ID),
  90. pendingTime: make(map[int64]time.Time),
  91. receivedBlocks: make(map[int64]p2p.ID),
  92. }
  93. sc.setStateAtHeight(initHeight, blockStateNew)
  94. return &sc
  95. }
  96. func (sc *schedule) addPeer(peerID p2p.ID) error {
  97. if _, ok := sc.peers[peerID]; ok {
  98. return fmt.Errorf("Cannot add duplicate peer %s", peerID)
  99. }
  100. sc.peers[peerID] = newScPeer(peerID)
  101. return nil
  102. }
  103. func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error {
  104. peer, ok := sc.peers[peerID]
  105. if !ok {
  106. return fmt.Errorf("Couldn't find peer %s", peerID)
  107. }
  108. if peer.state == peerStateRemoved {
  109. return fmt.Errorf("Tried to touch peer in peerStateRemoved")
  110. }
  111. peer.lastTouched = time
  112. return nil
  113. }
  114. func (sc *schedule) removePeer(peerID p2p.ID) error {
  115. peer, ok := sc.peers[peerID]
  116. if !ok {
  117. return fmt.Errorf("Couldn't find peer %s", peerID)
  118. }
  119. if peer.state == peerStateRemoved {
  120. return fmt.Errorf("Tried to remove peer %s in peerStateRemoved", peerID)
  121. }
  122. for height, pendingPeerID := range sc.pendingBlocks {
  123. if pendingPeerID == peerID {
  124. sc.setStateAtHeight(height, blockStateNew)
  125. delete(sc.pendingTime, height)
  126. delete(sc.pendingBlocks, height)
  127. }
  128. }
  129. for height, rcvPeerID := range sc.receivedBlocks {
  130. if rcvPeerID == peerID {
  131. sc.setStateAtHeight(height, blockStateNew)
  132. delete(sc.receivedBlocks, height)
  133. }
  134. }
  135. peer.state = peerStateRemoved
  136. return nil
  137. }
  138. func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
  139. peer, ok := sc.peers[peerID]
  140. if !ok {
  141. return fmt.Errorf("Can't find peer %s", peerID)
  142. }
  143. if peer.state == peerStateRemoved {
  144. return fmt.Errorf("Cannot set peer height for a peer in peerStateRemoved")
  145. }
  146. if height < peer.height {
  147. return fmt.Errorf("Cannot move peer height lower. from %d to %d", peer.height, height)
  148. }
  149. peer.height = height
  150. peer.state = peerStateReady
  151. for i := sc.minHeight(); i <= height; i++ {
  152. if sc.getStateAtHeight(i) == blockStateUnknown {
  153. sc.setStateAtHeight(i, blockStateNew)
  154. }
  155. }
  156. return nil
  157. }
  158. func (sc *schedule) getStateAtHeight(height int64) blockState {
  159. if height < sc.initHeight {
  160. return blockStateProcessed
  161. } else if state, ok := sc.blockStates[height]; ok {
  162. return state
  163. } else {
  164. return blockStateUnknown
  165. }
  166. }
  167. func (sc *schedule) getPeersAtHeight(height int64) []*scPeer {
  168. peers := []*scPeer{}
  169. for _, peer := range sc.peers {
  170. if peer.height >= height {
  171. peers = append(peers, peer)
  172. }
  173. }
  174. return peers
  175. }
  176. func (sc *schedule) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID {
  177. peers := []p2p.ID{}
  178. for _, peer := range sc.peers {
  179. if now.Sub(peer.lastTouched) > duration {
  180. peers = append(peers, peer.peerID)
  181. }
  182. }
  183. return peers
  184. }
  185. func (sc *schedule) peersSlowerThan(minSpeed int64) []p2p.ID {
  186. peers := []p2p.ID{}
  187. for _, peer := range sc.peers {
  188. if peer.lastRate < minSpeed {
  189. peers = append(peers, peer.peerID)
  190. }
  191. }
  192. return peers
  193. }
  194. func (sc *schedule) setStateAtHeight(height int64, state blockState) {
  195. sc.blockStates[height] = state
  196. }
  197. func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
  198. peer, ok := sc.peers[peerID]
  199. if !ok {
  200. return fmt.Errorf("Can't find peer %s", peerID)
  201. }
  202. if peer.state == peerStateRemoved {
  203. return fmt.Errorf("Cannot receive blocks from removed peer %s", peerID)
  204. }
  205. if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID {
  206. return fmt.Errorf("Received block %d from peer %s without being requested", height, peerID)
  207. }
  208. pendingTime, ok := sc.pendingTime[height]
  209. if !ok || now.Sub(pendingTime) <= 0 {
  210. return fmt.Errorf("Clock error. Block %d received at %s but requested at %s",
  211. height, pendingTime, now)
  212. }
  213. peer.lastRate = size / int64(now.Sub(pendingTime).Seconds())
  214. sc.setStateAtHeight(height, blockStateReceived)
  215. delete(sc.pendingBlocks, height)
  216. delete(sc.pendingTime, height)
  217. sc.receivedBlocks[height] = peerID
  218. return nil
  219. }
  220. func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error {
  221. peer, ok := sc.peers[peerID]
  222. if !ok {
  223. return fmt.Errorf("Can't find peer %s", peerID)
  224. }
  225. state := sc.getStateAtHeight(height)
  226. if state != blockStateNew {
  227. return fmt.Errorf("Block %d should be in blockStateNew but was %s", height, state)
  228. }
  229. if peer.state != peerStateReady {
  230. return fmt.Errorf("Cannot schedule %d from %s in %s", height, peerID, peer.state)
  231. }
  232. if height > peer.height {
  233. return fmt.Errorf("Cannot request height %d from peer %s who is at height %d",
  234. height, peerID, peer.height)
  235. }
  236. sc.setStateAtHeight(height, blockStatePending)
  237. sc.pendingBlocks[height] = peerID
  238. // XXX: to make this more accurate we can introduce a message from
  239. // the IO routine which indicates the time the request was put on the wire
  240. sc.pendingTime[height] = time
  241. return nil
  242. }
  243. func (sc *schedule) markProcessed(height int64) error {
  244. state := sc.getStateAtHeight(height)
  245. if state != blockStateReceived {
  246. return fmt.Errorf("Can't mark height %d received from block state %s", height, state)
  247. }
  248. delete(sc.receivedBlocks, height)
  249. sc.setStateAtHeight(height, blockStateProcessed)
  250. return nil
  251. }
  252. // allBlockProcessed returns true if all blocks are in blockStateProcessed and
  253. // determines if the schedule has been completed
  254. func (sc *schedule) allBlocksProcessed() bool {
  255. for _, state := range sc.blockStates {
  256. if state != blockStateProcessed {
  257. return false
  258. }
  259. }
  260. return true
  261. }
  262. // highest block | state == blockStateNew
  263. func (sc *schedule) maxHeight() int64 {
  264. var max int64 = 0
  265. for height, state := range sc.blockStates {
  266. if state == blockStateNew && height > max {
  267. max = height
  268. }
  269. }
  270. return max
  271. }
  272. // lowest block | state == blockStateNew
  273. func (sc *schedule) minHeight() int64 {
  274. var min int64 = math.MaxInt64
  275. for height, state := range sc.blockStates {
  276. if state == blockStateNew && height < min {
  277. min = height
  278. }
  279. }
  280. return min
  281. }
  282. func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 {
  283. heights := []int64{}
  284. for height, pendingPeerID := range sc.pendingBlocks {
  285. if pendingPeerID == peerID {
  286. heights = append(heights, height)
  287. }
  288. }
  289. return heights
  290. }
  291. func (sc *schedule) selectPeer(peers []*scPeer) *scPeer {
  292. // FIXME: properPeerSelector
  293. s := rand.NewSource(time.Now().Unix())
  294. r := rand.New(s)
  295. return peers[r.Intn(len(peers))]
  296. }
  297. // XXX: this duplicates the logic of peersInactiveSince and peersSlowerThan
  298. func (sc *schedule) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID {
  299. prunable := []p2p.ID{}
  300. for peerID, peer := range sc.peers {
  301. if now.Sub(peer.lastTouched) > peerTimout || peer.lastRate < minRecvRate {
  302. prunable = append(prunable, peerID)
  303. }
  304. }
  305. return prunable
  306. }
  307. func (sc *schedule) numBlockInState(targetState blockState) uint32 {
  308. var num uint32 = 0
  309. for _, state := range sc.blockStates {
  310. if state == targetState {
  311. num++
  312. }
  313. }
  314. return num
  315. }