You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

522 lines
12 KiB

8 years ago
8 years ago
8 years ago
10 years ago
8 years ago
8 years ago
8 years ago
8 years ago
10 years ago
9 years ago
8 years ago
9 years ago
10 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
  1. package blockchain
  2. import (
  3. "math"
  4. "sync"
  5. "time"
  6. "github.com/tendermint/tendermint/types"
  7. . "github.com/tendermint/tmlibs/common"
  8. flow "github.com/tendermint/tmlibs/flowrate"
  9. "github.com/tendermint/tmlibs/log"
  10. )
  11. const (
  12. requestIntervalMS = 250
  13. maxTotalRequesters = 300
  14. maxPendingRequests = maxTotalRequesters
  15. maxPendingRequestsPerPeer = 75
  16. minRecvRate = 10240 // 10Kb/s
  17. )
  18. var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests
  19. /*
  20. Peers self report their heights when we join the block pool.
  21. Starting from our latest pool.height, we request blocks
  22. in sequence from peers that reported higher heights than ours.
  23. Every so often we ask peers what height they're on so we can keep going.
  24. Requests are continuously made for blocks of higher heights until
  25. the limits. If most of the requests have no available peers, and we
  26. are not at peer limits, we can probably switch to consensus reactor
  27. */
  28. type BlockPool struct {
  29. BaseService
  30. startTime time.Time
  31. mtx sync.Mutex
  32. // block requests
  33. requesters map[int]*bpRequester
  34. height int // the lowest key in requesters.
  35. numPending int32 // number of requests pending assignment or block response
  36. // peers
  37. peers map[string]*bpPeer
  38. requestsCh chan<- BlockRequest
  39. timeoutsCh chan<- string
  40. }
  41. func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
  42. bp := &BlockPool{
  43. peers: make(map[string]*bpPeer),
  44. requesters: make(map[int]*bpRequester),
  45. height: start,
  46. numPending: 0,
  47. requestsCh: requestsCh,
  48. timeoutsCh: timeoutsCh,
  49. }
  50. bp.BaseService = *NewBaseService(nil, "BlockPool", bp)
  51. return bp
  52. }
  53. func (pool *BlockPool) OnStart() error {
  54. go pool.makeRequestersRoutine()
  55. pool.startTime = time.Now()
  56. return nil
  57. }
  58. func (pool *BlockPool) OnStop() {
  59. pool.BaseService.OnStop()
  60. }
  61. // Run spawns requesters as needed.
  62. func (pool *BlockPool) makeRequestersRoutine() {
  63. for {
  64. if !pool.IsRunning() {
  65. break
  66. }
  67. _, numPending, lenRequesters := pool.GetStatus()
  68. if numPending >= maxPendingRequests {
  69. // sleep for a bit.
  70. time.Sleep(requestIntervalMS * time.Millisecond)
  71. // check for timed out peers
  72. pool.removeTimedoutPeers()
  73. } else if lenRequesters >= maxTotalRequesters {
  74. // sleep for a bit.
  75. time.Sleep(requestIntervalMS * time.Millisecond)
  76. // check for timed out peers
  77. pool.removeTimedoutPeers()
  78. } else {
  79. // request for more blocks.
  80. pool.makeNextRequester()
  81. }
  82. }
  83. }
  84. func (pool *BlockPool) removeTimedoutPeers() {
  85. pool.mtx.Lock()
  86. defer pool.mtx.Unlock()
  87. for _, peer := range pool.peers {
  88. if !peer.didTimeout && peer.numPending > 0 {
  89. curRate := peer.recvMonitor.Status().CurRate
  90. // XXX remove curRate != 0
  91. if curRate != 0 && curRate < minRecvRate {
  92. pool.sendTimeout(peer.id)
  93. pool.Logger.Error("SendTimeout", "peer", peer.id, "reason", "curRate too low")
  94. peer.didTimeout = true
  95. }
  96. }
  97. if peer.didTimeout {
  98. pool.removePeer(peer.id)
  99. }
  100. }
  101. }
  102. func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) {
  103. pool.mtx.Lock()
  104. defer pool.mtx.Unlock()
  105. return pool.height, pool.numPending, len(pool.requesters)
  106. }
  107. // TODO: relax conditions, prevent abuse.
  108. func (pool *BlockPool) IsCaughtUp() bool {
  109. pool.mtx.Lock()
  110. defer pool.mtx.Unlock()
  111. height := pool.height
  112. // Need at least 1 peer to be considered caught up.
  113. if len(pool.peers) == 0 {
  114. pool.Logger.Debug("Blockpool has no peers")
  115. return false
  116. }
  117. maxPeerHeight := 0
  118. for _, peer := range pool.peers {
  119. maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
  120. }
  121. isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight)
  122. pool.Logger.Info(Fmt("IsCaughtUp: %v", isCaughtUp), "height", height, "maxPeerHeight", maxPeerHeight)
  123. return isCaughtUp
  124. }
  125. // We need to see the second block's Commit to validate the first block.
  126. // So we peek two blocks at a time.
  127. // The caller will verify the commit.
  128. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
  129. pool.mtx.Lock()
  130. defer pool.mtx.Unlock()
  131. if r := pool.requesters[pool.height]; r != nil {
  132. first = r.getBlock()
  133. }
  134. if r := pool.requesters[pool.height+1]; r != nil {
  135. second = r.getBlock()
  136. }
  137. return
  138. }
  139. // Pop the first block at pool.height
  140. // It must have been validated by 'second'.Commit from PeekTwoBlocks().
  141. func (pool *BlockPool) PopRequest() {
  142. pool.mtx.Lock()
  143. defer pool.mtx.Unlock()
  144. if r := pool.requesters[pool.height]; r != nil {
  145. /* The block can disappear at any time, due to removePeer().
  146. if r := pool.requesters[pool.height]; r == nil || r.block == nil {
  147. PanicSanity("PopRequest() requires a valid block")
  148. }
  149. */
  150. r.Stop()
  151. delete(pool.requesters, pool.height)
  152. pool.height++
  153. } else {
  154. PanicSanity(Fmt("Expected requester to pop, got nothing at height %v", pool.height))
  155. }
  156. }
  157. // Invalidates the block at pool.height,
  158. // Remove the peer and redo request from others.
  159. func (pool *BlockPool) RedoRequest(height int) {
  160. pool.mtx.Lock()
  161. request := pool.requesters[height]
  162. pool.mtx.Unlock()
  163. if request.block == nil {
  164. PanicSanity("Expected block to be non-nil")
  165. }
  166. // RemovePeer will redo all requesters associated with this peer.
  167. // TODO: record this malfeasance
  168. pool.RemovePeer(request.peerID)
  169. }
  170. // TODO: ensure that blocks come in order for each peer.
  171. func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
  172. pool.mtx.Lock()
  173. defer pool.mtx.Unlock()
  174. requester := pool.requesters[block.Height]
  175. if requester == nil {
  176. return
  177. }
  178. if requester.setBlock(block, peerID) {
  179. pool.numPending--
  180. peer := pool.peers[peerID]
  181. peer.decrPending(blockSize)
  182. } else {
  183. // Bad peer?
  184. }
  185. }
  186. // Sets the peer's alleged blockchain height.
  187. func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
  188. pool.mtx.Lock()
  189. defer pool.mtx.Unlock()
  190. peer := pool.peers[peerID]
  191. if peer != nil {
  192. peer.height = height
  193. } else {
  194. peer = newBPPeer(pool, peerID, height)
  195. peer.setLogger(pool.Logger.With("peer", peerID))
  196. pool.peers[peerID] = peer
  197. }
  198. }
  199. func (pool *BlockPool) RemovePeer(peerID string) {
  200. pool.mtx.Lock()
  201. defer pool.mtx.Unlock()
  202. pool.removePeer(peerID)
  203. }
  204. func (pool *BlockPool) removePeer(peerID string) {
  205. for _, requester := range pool.requesters {
  206. if requester.getPeerID() == peerID {
  207. if requester.getBlock() != nil {
  208. pool.numPending++
  209. }
  210. go requester.redo() // pick another peer and ...
  211. }
  212. }
  213. delete(pool.peers, peerID)
  214. }
  215. // Pick an available peer with at least the given minHeight.
  216. // If no peers are available, returns nil.
  217. func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
  218. pool.mtx.Lock()
  219. defer pool.mtx.Unlock()
  220. for _, peer := range pool.peers {
  221. if peer.didTimeout {
  222. pool.removePeer(peer.id)
  223. continue
  224. } else {
  225. }
  226. if peer.numPending >= maxPendingRequestsPerPeer {
  227. continue
  228. }
  229. if peer.height < minHeight {
  230. continue
  231. }
  232. peer.incrPending()
  233. return peer
  234. }
  235. return nil
  236. }
  237. func (pool *BlockPool) makeNextRequester() {
  238. pool.mtx.Lock()
  239. defer pool.mtx.Unlock()
  240. nextHeight := pool.height + len(pool.requesters)
  241. request := newBPRequester(pool, nextHeight)
  242. request.SetLogger(pool.Logger.With("height", nextHeight))
  243. pool.requesters[nextHeight] = request
  244. pool.numPending++
  245. request.Start()
  246. }
  247. func (pool *BlockPool) sendRequest(height int, peerID string) {
  248. if !pool.IsRunning() {
  249. return
  250. }
  251. pool.requestsCh <- BlockRequest{height, peerID}
  252. }
  253. func (pool *BlockPool) sendTimeout(peerID string) {
  254. if !pool.IsRunning() {
  255. return
  256. }
  257. pool.timeoutsCh <- peerID
  258. }
  259. func (pool *BlockPool) debug() string {
  260. pool.mtx.Lock() // Lock
  261. defer pool.mtx.Unlock()
  262. str := ""
  263. for h := pool.height; h < pool.height+len(pool.requesters); h++ {
  264. if pool.requesters[h] == nil {
  265. str += Fmt("H(%v):X ", h)
  266. } else {
  267. str += Fmt("H(%v):", h)
  268. str += Fmt("B?(%v) ", pool.requesters[h].block != nil)
  269. }
  270. }
  271. return str
  272. }
  273. //-------------------------------------
  274. type bpPeer struct {
  275. pool *BlockPool
  276. id string
  277. recvMonitor *flow.Monitor
  278. mtx sync.Mutex
  279. height int
  280. numPending int32
  281. timeout *time.Timer
  282. didTimeout bool
  283. logger log.Logger
  284. }
  285. func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
  286. peer := &bpPeer{
  287. pool: pool,
  288. id: peerID,
  289. height: height,
  290. numPending: 0,
  291. logger: log.NewNopLogger(),
  292. }
  293. return peer
  294. }
  295. func (peer *bpPeer) setLogger(l log.Logger) {
  296. peer.logger = l
  297. }
  298. func (peer *bpPeer) resetMonitor() {
  299. peer.recvMonitor = flow.New(time.Second, time.Second*40)
  300. var initialValue = float64(minRecvRate) * math.E
  301. peer.recvMonitor.SetREMA(initialValue)
  302. }
  303. func (peer *bpPeer) resetTimeout() {
  304. if peer.timeout == nil {
  305. peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout)
  306. } else {
  307. peer.timeout.Reset(time.Second * peerTimeoutSeconds)
  308. }
  309. }
  310. func (peer *bpPeer) incrPending() {
  311. if peer.numPending == 0 {
  312. peer.resetMonitor()
  313. peer.resetTimeout()
  314. }
  315. peer.numPending++
  316. }
  317. func (peer *bpPeer) decrPending(recvSize int) {
  318. peer.numPending--
  319. if peer.numPending == 0 {
  320. peer.timeout.Stop()
  321. } else {
  322. peer.recvMonitor.Update(recvSize)
  323. peer.resetTimeout()
  324. }
  325. }
  326. func (peer *bpPeer) onTimeout() {
  327. peer.pool.mtx.Lock()
  328. defer peer.pool.mtx.Unlock()
  329. peer.pool.sendTimeout(peer.id)
  330. peer.logger.Error("SendTimeout", "reason", "onTimeout")
  331. peer.didTimeout = true
  332. }
  333. //-------------------------------------
  334. type bpRequester struct {
  335. BaseService
  336. pool *BlockPool
  337. height int
  338. gotBlockCh chan struct{}
  339. redoCh chan struct{}
  340. mtx sync.Mutex
  341. peerID string
  342. block *types.Block
  343. }
  344. func newBPRequester(pool *BlockPool, height int) *bpRequester {
  345. bpr := &bpRequester{
  346. pool: pool,
  347. height: height,
  348. gotBlockCh: make(chan struct{}),
  349. redoCh: make(chan struct{}),
  350. peerID: "",
  351. block: nil,
  352. }
  353. bpr.BaseService = *NewBaseService(nil, "bpRequester", bpr)
  354. return bpr
  355. }
  356. func (bpr *bpRequester) OnStart() error {
  357. go bpr.requestRoutine()
  358. return nil
  359. }
  360. // Returns true if the peer matches
  361. func (bpr *bpRequester) setBlock(block *types.Block, peerID string) bool {
  362. bpr.mtx.Lock()
  363. if bpr.block != nil || bpr.peerID != peerID {
  364. bpr.mtx.Unlock()
  365. return false
  366. }
  367. bpr.block = block
  368. bpr.mtx.Unlock()
  369. bpr.gotBlockCh <- struct{}{}
  370. return true
  371. }
  372. func (bpr *bpRequester) getBlock() *types.Block {
  373. bpr.mtx.Lock()
  374. defer bpr.mtx.Unlock()
  375. return bpr.block
  376. }
  377. func (bpr *bpRequester) getPeerID() string {
  378. bpr.mtx.Lock()
  379. defer bpr.mtx.Unlock()
  380. return bpr.peerID
  381. }
  382. func (bpr *bpRequester) reset() {
  383. bpr.mtx.Lock()
  384. bpr.peerID = ""
  385. bpr.block = nil
  386. bpr.mtx.Unlock()
  387. }
  388. // Tells bpRequester to pick another peer and try again.
  389. // NOTE: blocking
  390. func (bpr *bpRequester) redo() {
  391. bpr.redoCh <- struct{}{}
  392. }
  393. // Responsible for making more requests as necessary
  394. // Returns only when a block is found (e.g. AddBlock() is called)
  395. func (bpr *bpRequester) requestRoutine() {
  396. OUTER_LOOP:
  397. for {
  398. // Pick a peer to send request to.
  399. var peer *bpPeer = nil
  400. PICK_PEER_LOOP:
  401. for {
  402. if !bpr.IsRunning() || !bpr.pool.IsRunning() {
  403. return
  404. }
  405. peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
  406. if peer == nil {
  407. //log.Info("No peers available", "height", height)
  408. time.Sleep(requestIntervalMS * time.Millisecond)
  409. continue PICK_PEER_LOOP
  410. }
  411. break PICK_PEER_LOOP
  412. }
  413. bpr.mtx.Lock()
  414. bpr.peerID = peer.id
  415. bpr.mtx.Unlock()
  416. // Send request and wait.
  417. bpr.pool.sendRequest(bpr.height, peer.id)
  418. select {
  419. case <-bpr.pool.Quit:
  420. bpr.Stop()
  421. return
  422. case <-bpr.Quit:
  423. return
  424. case <-bpr.redoCh:
  425. bpr.reset()
  426. continue OUTER_LOOP // When peer is removed
  427. case <-bpr.gotBlockCh:
  428. // We got the block, now see if it's good.
  429. select {
  430. case <-bpr.pool.Quit:
  431. bpr.Stop()
  432. return
  433. case <-bpr.Quit:
  434. return
  435. case <-bpr.redoCh:
  436. bpr.reset()
  437. continue OUTER_LOOP
  438. }
  439. }
  440. }
  441. }
  442. //-------------------------------------
  443. type BlockRequest struct {
  444. Height int
  445. PeerID string
  446. }