You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

398 lines
8.9 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. . "github.com/tendermint/tendermint/common"
  7. "github.com/tendermint/tendermint/types"
  8. )
  // Retry and pacing settings.
  const (
  	// maxTries is the number of times requestRoutine sends a request to the
  	// same peer before giving up on that peer.
  	maxTries = 3
  	// requestIntervalMS is the pause, in milliseconds, used when the pool is
  	// at a limit or no peer is available, and the interval of the repeater.
  	requestIntervalMS = 500
  )

  // Capacity limits.
  const (
  	inputsChannelCapacity = 200
  	// maxPendingRequests is compared against numPending in run().
  	maxPendingRequests = 200
  	// maxTotalRequests is compared against the size of the requests map in run().
  	maxTotalRequests = 300
  	// maxRequestsPerPeer caps numRequests of a peer in pickIncrAvailablePeer().
  	maxRequestsPerPeer = 300
  )
  // requestTimeoutSeconds is the number of seconds requestRoutine sleeps after
  // sending a request before checking whether the block arrived. It is typed as
  // time.Duration so it can be multiplied by time.Second directly.
  var requestTimeoutSeconds time.Duration = 3
  20. /*
  21. Peers self report their heights when a new peer joins the block pool.
  22. Starting from pool.height (inclusive), we request blocks
  23. in sequence from peers that reported higher heights than ours.
  24. Every so often we ask peers what height they're on so we can keep going.
  25. Requests are continuously made for blocks of higher heights until
  26. the limits are reached. If most of the requests have no available peers, and we
  27. are not at peer limits, we can probably switch to the consensus reactor.
  28. */
  29. type BlockPool struct {
  30. // block requests
  31. requestsMtx sync.Mutex
  32. requests map[uint]*bpRequest
  33. height uint // the lowest key in requests.
  34. numUnassigned int32 // number of requests not yet assigned to a peer
  35. numPending int32 // number of requests pending assignment or block response
  36. // peers
  37. peersMtx sync.Mutex
  38. peers map[string]*bpPeer
  39. requestsCh chan<- BlockRequest
  40. timeoutsCh chan<- string
  41. repeater *RepeatTimer
  42. running int32 // atomic
  43. }
  44. func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
  45. return &BlockPool{
  46. peers: make(map[string]*bpPeer),
  47. requests: make(map[uint]*bpRequest),
  48. height: start,
  49. numUnassigned: 0,
  50. numPending: 0,
  51. requestsCh: requestsCh,
  52. timeoutsCh: timeoutsCh,
  53. repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
  54. running: 0,
  55. }
  56. }
  57. func (pool *BlockPool) Start() {
  58. if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
  59. log.Info("Starting BlockPool")
  60. go pool.run()
  61. }
  62. }
  63. func (pool *BlockPool) Stop() {
  64. if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
  65. log.Info("Stopping BlockPool")
  66. pool.repeater.Stop()
  67. }
  68. }
  69. func (pool *BlockPool) IsRunning() bool {
  70. return atomic.LoadInt32(&pool.running) == 1
  71. }
  72. // Run spawns requests as needed.
  73. func (pool *BlockPool) run() {
  74. RUN_LOOP:
  75. for {
  76. if atomic.LoadInt32(&pool.running) == 0 {
  77. break RUN_LOOP
  78. }
  79. _, numPending := pool.GetStatus()
  80. if numPending >= maxPendingRequests {
  81. // sleep for a bit.
  82. time.Sleep(requestIntervalMS * time.Millisecond)
  83. } else if len(pool.requests) >= maxTotalRequests {
  84. // sleep for a bit.
  85. time.Sleep(requestIntervalMS * time.Millisecond)
  86. } else {
  87. // request for more blocks.
  88. pool.makeNextRequest()
  89. }
  90. }
  91. }
  92. func (pool *BlockPool) GetStatus() (uint, int32) {
  93. pool.requestsMtx.Lock() // Lock
  94. defer pool.requestsMtx.Unlock()
  95. return pool.height, pool.numPending
  96. }
  97. // We need to see the second block's Validation to validate the first block.
  98. // So we peek two blocks at a time.
  99. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
  100. pool.requestsMtx.Lock() // Lock
  101. defer pool.requestsMtx.Unlock()
  102. if r := pool.requests[pool.height]; r != nil {
  103. first = r.block
  104. }
  105. if r := pool.requests[pool.height+1]; r != nil {
  106. second = r.block
  107. }
  108. return
  109. }
  110. // Pop the first block at pool.height
  111. // It must have been validated by 'second'.Validation from PeekTwoBlocks().
  112. func (pool *BlockPool) PopRequest() {
  113. pool.requestsMtx.Lock() // Lock
  114. defer pool.requestsMtx.Unlock()
  115. if r := pool.requests[pool.height]; r == nil || r.block == nil {
  116. panic("PopRequest() requires a valid block")
  117. }
  118. delete(pool.requests, pool.height)
  119. pool.height++
  120. }
  121. // Invalidates the block at pool.height.
  122. // Remove the peer and request from others.
  123. func (pool *BlockPool) RedoRequest(height uint) {
  124. pool.requestsMtx.Lock() // Lock
  125. defer pool.requestsMtx.Unlock()
  126. request := pool.requests[height]
  127. if request.block == nil {
  128. panic("Expected block to be non-nil")
  129. }
  130. // TODO: record this malfeasance
  131. // maybe punish peer on switch (an invalid block!)
  132. pool.RemovePeer(request.peerId) // Lock on peersMtx.
  133. request.block = nil
  134. request.peerId = ""
  135. pool.numPending++
  136. pool.numUnassigned++
  137. go requestRoutine(pool, height)
  138. }
  139. func (pool *BlockPool) hasBlock(height uint) bool {
  140. pool.requestsMtx.Lock() // Lock
  141. defer pool.requestsMtx.Unlock()
  142. request := pool.requests[height]
  143. return request != nil && request.block != nil
  144. }
  145. func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
  146. pool.requestsMtx.Lock() // Lock
  147. defer pool.requestsMtx.Unlock()
  148. request := pool.requests[height]
  149. if request == nil {
  150. return
  151. }
  152. pool.numUnassigned--
  153. request.peerId = peerId
  154. }
  155. func (pool *BlockPool) removePeerForRequest(height uint, peerId string) {
  156. pool.requestsMtx.Lock() // Lock
  157. defer pool.requestsMtx.Unlock()
  158. request := pool.requests[height]
  159. if request == nil {
  160. return
  161. }
  162. pool.numUnassigned++
  163. request.peerId = ""
  164. }
  165. func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
  166. pool.requestsMtx.Lock() // Lock
  167. defer pool.requestsMtx.Unlock()
  168. request := pool.requests[block.Height]
  169. if request == nil {
  170. return
  171. }
  172. if request.peerId != peerId {
  173. return
  174. }
  175. if request.block != nil {
  176. return
  177. }
  178. request.block = block
  179. pool.numPending--
  180. }
  181. func (pool *BlockPool) getPeer(peerId string) *bpPeer {
  182. pool.peersMtx.Lock() // Lock
  183. defer pool.peersMtx.Unlock()
  184. peer := pool.peers[peerId]
  185. return peer
  186. }
  187. // Sets the peer's alleged blockchain height.
  188. func (pool *BlockPool) SetPeerHeight(peerId string, height uint) {
  189. pool.peersMtx.Lock() // Lock
  190. defer pool.peersMtx.Unlock()
  191. peer := pool.peers[peerId]
  192. if peer != nil {
  193. peer.height = height
  194. } else {
  195. peer = &bpPeer{
  196. height: height,
  197. id: peerId,
  198. numRequests: 0,
  199. }
  200. pool.peers[peerId] = peer
  201. }
  202. }
  203. func (pool *BlockPool) RemovePeer(peerId string) {
  204. pool.peersMtx.Lock() // Lock
  205. defer pool.peersMtx.Unlock()
  206. delete(pool.peers, peerId)
  207. }
  208. // Pick an available peer with at least the given minHeight.
  209. // If no peers are available, returns nil.
  210. func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
  211. pool.peersMtx.Lock()
  212. defer pool.peersMtx.Unlock()
  213. for _, peer := range pool.peers {
  214. if peer.numRequests >= maxRequestsPerPeer {
  215. continue
  216. }
  217. if peer.height < minHeight {
  218. continue
  219. }
  220. peer.numRequests++
  221. return peer
  222. }
  223. return nil
  224. }
  225. func (pool *BlockPool) decrPeer(peerId string) {
  226. pool.peersMtx.Lock()
  227. defer pool.peersMtx.Unlock()
  228. peer := pool.peers[peerId]
  229. if peer == nil {
  230. return
  231. }
  232. peer.numRequests--
  233. }
  234. func (pool *BlockPool) makeNextRequest() {
  235. pool.requestsMtx.Lock() // Lock
  236. defer pool.requestsMtx.Unlock()
  237. nextHeight := pool.height + uint(len(pool.requests))
  238. request := &bpRequest{
  239. height: nextHeight,
  240. peerId: "",
  241. block: nil,
  242. }
  243. pool.requests[nextHeight] = request
  244. pool.numUnassigned++
  245. pool.numPending++
  246. go requestRoutine(pool, nextHeight)
  247. }
  248. func (pool *BlockPool) sendRequest(height uint, peerId string) {
  249. if atomic.LoadInt32(&pool.running) == 0 {
  250. return
  251. }
  252. pool.requestsCh <- BlockRequest{height, peerId}
  253. }
  254. func (pool *BlockPool) sendTimeout(peerId string) {
  255. if atomic.LoadInt32(&pool.running) == 0 {
  256. return
  257. }
  258. pool.timeoutsCh <- peerId
  259. }
  260. func (pool *BlockPool) debug() string {
  261. pool.requestsMtx.Lock() // Lock
  262. defer pool.requestsMtx.Unlock()
  263. str := ""
  264. for h := pool.height; h < pool.height+uint(len(pool.requests)); h++ {
  265. if pool.requests[h] == nil {
  266. str += Fmt("H(%v):X ", h)
  267. } else {
  268. str += Fmt("H(%v):", h)
  269. str += Fmt("B?(%v) ", pool.requests[h].block != nil)
  270. }
  271. }
  272. return str
  273. }
  274. //-------------------------------------
  // bpPeer is the pool's record of a single peer.
  type bpPeer struct {
  	// id is the key under which the peer is stored in BlockPool.peers.
  	id string
  	// height is the height last reported for the peer via SetPeerHeight.
  	height uint
  	// numRequests is incremented by pickIncrAvailablePeer and decremented by decrPeer.
  	numRequests int32
  }
  280. type bpRequest struct {
  281. height uint
  282. peerId string
  283. block *types.Block
  284. }
  285. //-------------------------------------
  286. // Responsible for making more requests as necessary
  287. // Returns only when a block is found (e.g. AddBlock() is called)
  288. func requestRoutine(pool *BlockPool, height uint) {
  289. for {
  290. var peer *bpPeer = nil
  291. PICK_LOOP:
  292. for {
  293. if !pool.IsRunning() {
  294. log.Debug("BlockPool not running. Stopping requestRoutine", "height", height)
  295. return
  296. }
  297. peer = pool.pickIncrAvailablePeer(height)
  298. if peer == nil {
  299. //log.Debug("No peers available", "height", height)
  300. time.Sleep(requestIntervalMS * time.Millisecond)
  301. continue PICK_LOOP
  302. }
  303. break PICK_LOOP
  304. }
  305. // set the peer, decrement numUnassigned
  306. pool.setPeerForRequest(height, peer.id)
  307. for try := 0; try < maxTries; try++ {
  308. pool.sendRequest(height, peer.id)
  309. time.Sleep(requestTimeoutSeconds * time.Second)
  310. // if successful the block is either in the pool,
  311. if pool.hasBlock(height) {
  312. pool.decrPeer(peer.id)
  313. return
  314. }
  315. // or already processed and we've moved past it
  316. bpHeight, _ := pool.GetStatus()
  317. if height < bpHeight {
  318. pool.decrPeer(peer.id)
  319. return
  320. }
  321. }
  322. // unset the peer, increment numUnassigned
  323. pool.removePeerForRequest(height, peer.id)
  324. // this peer failed us, try again
  325. pool.RemovePeer(peer.id)
  326. pool.sendTimeout(peer.id)
  327. }
  328. }
  329. //-------------------------------------
  // BlockRequest is the message the pool sends on its requests channel: a
  // request for the block at Height, addressed to the peer PeerId.
  type BlockRequest struct {
  	// Height is the height of the requested block.
  	Height uint
  	// PeerId is the id of the peer the request is addressed to.
  	PeerId string
  }