You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

516 lines
12 KiB

10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "math"
  4. "sync"
  5. "time"
  6. flow "github.com/tendermint/flowcontrol"
  7. . "github.com/tendermint/go-common"
  8. "github.com/tendermint/tendermint/types"
  9. )
  10. const (
  11. requestIntervalMS = 250
  12. maxTotalRequesters = 300
  13. maxPendingRequests = maxTotalRequesters
  14. maxPendingRequestsPerPeer = 75
  15. peerTimeoutSeconds = 15
  16. minRecvRate = 10240 // 10Kb/s
  17. )
  18. /*
  19. Peers self report their heights when we join the block pool.
  20. Starting from our latest pool.height, we request blocks
  21. in sequence from peers that reported higher heights than ours.
  22. Every so often we ask peers what height they're on so we can keep going.
  23. Requests are continuously made for blocks of heigher heights until
  24. the limits. If most of the requests have no available peers, and we
  25. are not at peer limits, we can probably switch to consensus reactor
  26. */
  27. type BlockPool struct {
  28. QuitService
  29. startTime time.Time
  30. // block requests
  31. mtx sync.Mutex
  32. requesters map[int]*bpRequester
  33. height int // the lowest key in requesters.
  34. numPending int32 // number of requests pending assignment or block response
  35. // peers
  36. peersMtx sync.Mutex
  37. peers map[string]*bpPeer
  38. requestsCh chan<- BlockRequest
  39. timeoutsCh chan<- string
  40. }
  41. func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
  42. bp := &BlockPool{
  43. peers: make(map[string]*bpPeer),
  44. requesters: make(map[int]*bpRequester),
  45. height: start,
  46. numPending: 0,
  47. requestsCh: requestsCh,
  48. timeoutsCh: timeoutsCh,
  49. }
  50. bp.QuitService = *NewQuitService(log, "BlockPool", bp)
  51. return bp
  52. }
  53. func (pool *BlockPool) OnStart() error {
  54. pool.QuitService.OnStart()
  55. go pool.makeRequestersRoutine()
  56. pool.startTime = time.Now()
  57. return nil
  58. }
  59. func (pool *BlockPool) OnStop() {
  60. pool.QuitService.OnStop()
  61. }
  62. // Run spawns requesters as needed.
  63. func (pool *BlockPool) makeRequestersRoutine() {
  64. for {
  65. if !pool.IsRunning() {
  66. break
  67. }
  68. _, numPending := pool.GetStatus()
  69. if numPending >= maxPendingRequests {
  70. // sleep for a bit.
  71. time.Sleep(requestIntervalMS * time.Millisecond)
  72. // check for timed out peers
  73. pool.removeTimedoutPeers()
  74. } else if len(pool.requesters) >= maxTotalRequesters {
  75. // sleep for a bit.
  76. time.Sleep(requestIntervalMS * time.Millisecond)
  77. // check for timed out peers
  78. pool.removeTimedoutPeers()
  79. } else {
  80. // request for more blocks.
  81. pool.makeNextRequester()
  82. }
  83. }
  84. }
  85. func (pool *BlockPool) removeTimedoutPeers() {
  86. for _, peer := range pool.peers {
  87. if !peer.didTimeout && peer.numPending > 0 {
  88. curRate := peer.recvMonitor.Status().CurRate
  89. // XXX remove curRate != 0
  90. if curRate != 0 && curRate < minRecvRate {
  91. pool.sendTimeout(peer.id)
  92. log.Warn("SendTimeout", "peer", peer.id, "reason", "curRate too low")
  93. peer.didTimeout = true
  94. }
  95. }
  96. if peer.didTimeout {
  97. pool.peersMtx.Lock() // Lock
  98. pool.removePeer(peer.id)
  99. pool.peersMtx.Unlock()
  100. }
  101. }
  102. }
  103. func (pool *BlockPool) GetStatus() (height int, numPending int32) {
  104. pool.mtx.Lock() // Lock
  105. defer pool.mtx.Unlock()
  106. return pool.height, pool.numPending
  107. }
  108. // TODO: relax conditions, prevent abuse.
  109. func (pool *BlockPool) IsCaughtUp() bool {
  110. pool.mtx.Lock()
  111. height := pool.height
  112. pool.mtx.Unlock()
  113. // Need at least 1 peer to be considered caught up.
  114. if len(pool.peers) == 0 {
  115. return false
  116. }
  117. pool.peersMtx.Lock()
  118. maxPeerHeight := 0
  119. for _, peer := range pool.peers {
  120. maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
  121. }
  122. pool.peersMtx.Unlock()
  123. return (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height == maxPeerHeight)
  124. }
  125. // We need to see the second block's Validation to validate the first block.
  126. // So we peek two blocks at a time.
  127. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
  128. pool.mtx.Lock() // Lock
  129. defer pool.mtx.Unlock()
  130. if r := pool.requesters[pool.height]; r != nil {
  131. first = r.getBlock()
  132. }
  133. if r := pool.requesters[pool.height+1]; r != nil {
  134. second = r.getBlock()
  135. }
  136. return
  137. }
  138. // Pop the first block at pool.height
  139. // It must have been validated by 'second'.Validation from PeekTwoBlocks().
  140. func (pool *BlockPool) PopRequest() {
  141. pool.mtx.Lock() // Lock
  142. defer pool.mtx.Unlock()
  143. if r := pool.requesters[pool.height]; r != nil {
  144. /* The block can disappear at any time, due to removePeer().
  145. if r := pool.requesters[pool.height]; r == nil || r.block == nil {
  146. PanicSanity("PopRequest() requires a valid block")
  147. }
  148. */
  149. r.Stop()
  150. delete(pool.requesters, pool.height)
  151. pool.height++
  152. } else {
  153. PanicSanity(Fmt("Expected requester to pop, got nothing at height %v", pool.height))
  154. }
  155. }
  156. // Invalidates the block at pool.height,
  157. // Remove the peer and redo request from others.
  158. func (pool *BlockPool) RedoRequest(height int) {
  159. pool.mtx.Lock() // Lock
  160. defer pool.mtx.Unlock()
  161. request := pool.requesters[height]
  162. if request.block == nil {
  163. PanicSanity("Expected block to be non-nil")
  164. }
  165. // RemovePeer will redo all requesters associated with this peer.
  166. // TODO: record this malfeasance
  167. pool.RemovePeer(request.peerID) // Lock on peersMtx.
  168. }
  169. // TODO: ensure that blocks come in order for each peer.
  170. func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
  171. pool.mtx.Lock() // Lock
  172. defer pool.mtx.Unlock()
  173. requester := pool.requesters[block.Height]
  174. if requester == nil {
  175. return
  176. }
  177. if requester.setBlock(block, peerID) {
  178. pool.numPending--
  179. peer := pool.getPeer(peerID)
  180. peer.decrPending(blockSize)
  181. } else {
  182. // Bad peer?
  183. }
  184. }
  185. // Sets the peer's alleged blockchain height.
  186. func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
  187. pool.peersMtx.Lock() // Lock
  188. defer pool.peersMtx.Unlock()
  189. peer := pool.peers[peerID]
  190. if peer != nil {
  191. peer.height = height
  192. } else {
  193. peer = newBPPeer(pool, peerID, height)
  194. pool.peers[peerID] = peer
  195. }
  196. }
  197. func (pool *BlockPool) RemovePeer(peerID string) {
  198. pool.peersMtx.Lock() // Lock
  199. defer pool.peersMtx.Unlock()
  200. pool.removePeer(peerID)
  201. }
  202. func (pool *BlockPool) removePeer(peerID string) {
  203. for _, requester := range pool.requesters {
  204. if requester.getPeerID() == peerID {
  205. pool.numPending++
  206. go requester.redo() // pick another peer and ...
  207. }
  208. }
  209. delete(pool.peers, peerID)
  210. }
  211. func (pool *BlockPool) getPeer(peerID string) *bpPeer {
  212. pool.peersMtx.Lock() // Lock
  213. defer pool.peersMtx.Unlock()
  214. peer := pool.peers[peerID]
  215. return peer
  216. }
  217. // Pick an available peer with at least the given minHeight.
  218. // If no peers are available, returns nil.
  219. func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
  220. pool.peersMtx.Lock()
  221. defer pool.peersMtx.Unlock()
  222. for _, peer := range pool.peers {
  223. if peer.isBad() {
  224. pool.removePeer(peer.id)
  225. continue
  226. } else {
  227. }
  228. if peer.numPending >= maxPendingRequestsPerPeer {
  229. continue
  230. }
  231. if peer.height < minHeight {
  232. continue
  233. }
  234. peer.incrPending()
  235. return peer
  236. }
  237. return nil
  238. }
  239. func (pool *BlockPool) makeNextRequester() {
  240. pool.mtx.Lock() // Lock
  241. defer pool.mtx.Unlock()
  242. nextHeight := pool.height + len(pool.requesters)
  243. request := newBPRequester(pool, nextHeight)
  244. pool.requesters[nextHeight] = request
  245. pool.numPending++
  246. request.Start()
  247. }
  248. func (pool *BlockPool) sendRequest(height int, peerID string) {
  249. if !pool.IsRunning() {
  250. return
  251. }
  252. pool.requestsCh <- BlockRequest{height, peerID}
  253. }
  254. func (pool *BlockPool) sendTimeout(peerID string) {
  255. if !pool.IsRunning() {
  256. return
  257. }
  258. pool.timeoutsCh <- peerID
  259. }
  260. func (pool *BlockPool) debug() string {
  261. pool.mtx.Lock() // Lock
  262. defer pool.mtx.Unlock()
  263. str := ""
  264. for h := pool.height; h < pool.height+len(pool.requesters); h++ {
  265. if pool.requesters[h] == nil {
  266. str += Fmt("H(%v):X ", h)
  267. } else {
  268. str += Fmt("H(%v):", h)
  269. str += Fmt("B?(%v) ", pool.requesters[h].block != nil)
  270. }
  271. }
  272. return str
  273. }
  274. //-------------------------------------
  275. type bpPeer struct {
  276. pool *BlockPool
  277. id string
  278. height int
  279. numPending int32
  280. recvMonitor *flow.Monitor
  281. timeout *time.Timer
  282. didTimeout bool
  283. }
  284. func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
  285. peer := &bpPeer{
  286. pool: pool,
  287. id: peerID,
  288. height: height,
  289. numPending: 0,
  290. }
  291. return peer
  292. }
  293. func (peer *bpPeer) resetMonitor() {
  294. peer.recvMonitor = flow.New(time.Second, time.Second*40)
  295. var initialValue = float64(minRecvRate) * math.E
  296. peer.recvMonitor.SetREMA(initialValue)
  297. }
  298. func (peer *bpPeer) resetTimeout() {
  299. if peer.timeout == nil {
  300. peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout)
  301. } else {
  302. peer.timeout.Reset(time.Second * peerTimeoutSeconds)
  303. }
  304. }
  305. func (peer *bpPeer) incrPending() {
  306. if peer.numPending == 0 {
  307. peer.resetMonitor()
  308. peer.resetTimeout()
  309. }
  310. peer.numPending++
  311. }
  312. func (peer *bpPeer) decrPending(recvSize int) {
  313. peer.numPending--
  314. if peer.numPending == 0 {
  315. peer.timeout.Stop()
  316. } else {
  317. peer.recvMonitor.Update(recvSize)
  318. peer.resetTimeout()
  319. }
  320. }
  321. func (peer *bpPeer) onTimeout() {
  322. peer.pool.sendTimeout(peer.id)
  323. log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout")
  324. peer.didTimeout = true
  325. }
  326. func (peer *bpPeer) isBad() bool {
  327. return peer.didTimeout
  328. }
  329. //-------------------------------------
  330. type bpRequester struct {
  331. QuitService
  332. pool *BlockPool
  333. height int
  334. gotBlockCh chan struct{}
  335. redoCh chan struct{}
  336. mtx sync.Mutex
  337. peerID string
  338. block *types.Block
  339. }
  340. func newBPRequester(pool *BlockPool, height int) *bpRequester {
  341. bpr := &bpRequester{
  342. pool: pool,
  343. height: height,
  344. gotBlockCh: make(chan struct{}),
  345. redoCh: make(chan struct{}),
  346. peerID: "",
  347. block: nil,
  348. }
  349. bpr.QuitService = *NewQuitService(nil, "bpRequester", bpr)
  350. return bpr
  351. }
  352. func (bpr *bpRequester) OnStart() error {
  353. bpr.QuitService.OnStart()
  354. go bpr.requestRoutine()
  355. return nil
  356. }
  357. // Returns true if the peer matches
  358. func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool {
  359. bpr.mtx.Lock()
  360. if bpr.block != nil || bpr.peerID != peerID {
  361. bpr.mtx.Unlock()
  362. return false
  363. }
  364. bpr.block = block
  365. bpr.mtx.Unlock()
  366. bpr.gotBlockCh <- struct{}{}
  367. return true
  368. }
  369. func (bpr *bpRequester) getBlock() *types.Block {
  370. bpr.mtx.Lock()
  371. defer bpr.mtx.Unlock()
  372. return bpr.block
  373. }
  374. func (bpr *bpRequester) getPeerID() string {
  375. bpr.mtx.Lock()
  376. defer bpr.mtx.Unlock()
  377. return bpr.peerID
  378. }
  379. func (bpr *bpRequester) reset() {
  380. bpr.mtx.Lock()
  381. bpr.peerID = ""
  382. bpr.block = nil
  383. bpr.mtx.Unlock()
  384. }
  385. // Tells bpRequester to pick another peer and try again.
  386. // NOTE: blocking
  387. func (bpr *bpRequester) redo() {
  388. bpr.redoCh <- struct{}{}
  389. }
  390. // Responsible for making more requests as necessary
  391. // Returns only when a block is found (e.g. AddBlock() is called)
  392. func (bpr *bpRequester) requestRoutine() {
  393. OUTER_LOOP:
  394. for {
  395. // Pick a peer to send request to.
  396. var peer *bpPeer = nil
  397. PICK_PEER_LOOP:
  398. for {
  399. if !bpr.IsRunning() || !bpr.pool.IsRunning() {
  400. return
  401. }
  402. peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
  403. if peer == nil {
  404. //log.Info("No peers available", "height", height)
  405. time.Sleep(requestIntervalMS * time.Millisecond)
  406. continue PICK_PEER_LOOP
  407. }
  408. break PICK_PEER_LOOP
  409. }
  410. bpr.mtx.Lock()
  411. bpr.peerID = peer.id
  412. bpr.mtx.Unlock()
  413. // Send request and wait.
  414. bpr.pool.sendRequest(bpr.height, peer.id)
  415. select {
  416. case <-bpr.pool.Quit:
  417. bpr.Stop()
  418. return
  419. case <-bpr.Quit:
  420. return
  421. case <-bpr.redoCh:
  422. bpr.reset()
  423. continue OUTER_LOOP // When peer is removed
  424. case <-bpr.gotBlockCh:
  425. // We got the block, now see if it's good.
  426. select {
  427. case <-bpr.pool.Quit:
  428. bpr.Stop()
  429. return
  430. case <-bpr.Quit:
  431. return
  432. case <-bpr.redoCh:
  433. bpr.reset()
  434. continue OUTER_LOOP
  435. }
  436. }
  437. }
  438. }
  439. //-------------------------------------
  440. type BlockRequest struct {
  441. Height int
  442. PeerID string
  443. }