You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

415 lines
9.2 KiB

10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. . "github.com/tendermint/tendermint/common"
  7. "github.com/tendermint/tendermint/types"
  8. )
const (
	// maxTries is the number of times requestRoutine sends a request to the
	// same peer before giving up on that peer.
	maxTries = 3
	// NOTE(review): inputsChannelCapacity is not referenced in the visible
	// part of this file; presumably used by the caller — confirm.
	inputsChannelCapacity = 200
	// requestIntervalMS is the sleep, in milliseconds, used by run() when the
	// request limits are reached and by requestRoutine() when no peer is
	// available. It is also the interval passed to the repeat timer.
	requestIntervalMS = 500
	// maxPendingRequests is the numPending threshold at which run() stops
	// issuing new requests and sleeps.
	maxPendingRequests = 200
	// maxTotalRequests is the numTotal threshold at which run() stops
	// issuing new requests and sleeps.
	maxTotalRequests = 300
	// maxRequestsPerPeer is the numRequests threshold at which a peer is no
	// longer picked by pickIncrAvailablePeer().
	maxRequestsPerPeer = 300
)
// numTotal = numPending + blocks in the pool we haven't synced yet
var (
	// requestTimeoutSeconds is a bare count of seconds stored as a
	// time.Duration; it is multiplied by time.Second at the point of use
	// to obtain the actual wait between request attempts.
	requestTimeoutSeconds = time.Duration(3)
)
/*
	Peers self report their heights when a new peer joins the block pool.
	Starting from whatever we've got (pool.height), we request blocks
	in sequence from peers that reported higher heights than ours.
	Every so often we ask peers what height they're on so we can keep going.

	Requests are continuously made for blocks of higher heights until
	the limits. If most of the requests have no available peers, and we
	are not at peer limits, we can probably switch to consensus reactor
*/
type BlockPool struct {
	// block requests
	// requestsMtx guards requests, peerless, height, numPending and numTotal.
	requestsMtx sync.Mutex
	requests    map[uint]*bpRequest // outstanding requests, keyed by block height
	peerless    int32               // number of requests without peers
	height      uint                // the lowest key in requests.
	numPending  int32               // incremented when a request is made or redone, decremented by AddBlock
	numTotal    int32               // incremented when a request is made, decremented by PopRequest

	// peers
	// peersMtx guards peers.
	peersMtx sync.Mutex
	peers    map[string]*bpPeer // known peers, keyed by peer id

	requestsCh chan<- BlockRequest // receives a BlockRequest for every request attempt
	timeoutsCh chan<- string       // receives the id of a peer that failed to deliver a block
	repeater   *RepeatTimer        // created in NewBlockPool and stopped in Stop
	running    int32               // atomic; 1 while the pool is running, 0 otherwise
}
  46. func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
  47. return &BlockPool{
  48. peers: make(map[string]*bpPeer),
  49. requests: make(map[uint]*bpRequest),
  50. height: start,
  51. numPending: 0,
  52. numTotal: 0,
  53. requestsCh: requestsCh,
  54. timeoutsCh: timeoutsCh,
  55. repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
  56. running: 0,
  57. }
  58. }
  59. func (pool *BlockPool) Start() {
  60. if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
  61. log.Info("Starting BlockPool")
  62. go pool.run()
  63. }
  64. }
  65. func (pool *BlockPool) Stop() {
  66. if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
  67. log.Info("Stopping BlockPool")
  68. pool.repeater.Stop()
  69. }
  70. }
  71. func (pool *BlockPool) IsRunning() bool {
  72. return atomic.LoadInt32(&pool.running) == 1
  73. }
  74. // Run spawns requests as needed.
  75. func (pool *BlockPool) run() {
  76. RUN_LOOP:
  77. for {
  78. if atomic.LoadInt32(&pool.running) == 0 {
  79. break RUN_LOOP
  80. }
  81. _, numPending, numTotal := pool.GetStatus()
  82. if numPending >= maxPendingRequests {
  83. // sleep for a bit.
  84. time.Sleep(requestIntervalMS * time.Millisecond)
  85. } else if numTotal >= maxTotalRequests {
  86. // sleep for a bit.
  87. time.Sleep(requestIntervalMS * time.Millisecond)
  88. } else {
  89. // request for more blocks.
  90. height := pool.nextHeight()
  91. pool.makeRequest(height)
  92. }
  93. }
  94. }
  95. func (pool *BlockPool) GetStatus() (uint, int32, int32) {
  96. pool.requestsMtx.Lock() // Lock
  97. defer pool.requestsMtx.Unlock()
  98. return pool.height, pool.numPending, pool.numTotal
  99. }
  100. // We need to see the second block's Validation to validate the first block.
  101. // So we peek two blocks at a time.
  102. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
  103. pool.requestsMtx.Lock() // Lock
  104. defer pool.requestsMtx.Unlock()
  105. if r := pool.requests[pool.height]; r != nil {
  106. first = r.block
  107. }
  108. if r := pool.requests[pool.height+1]; r != nil {
  109. second = r.block
  110. }
  111. return
  112. }
  113. // Pop the first block at pool.height
  114. // It must have been validated by 'second'.Validation from PeekTwoBlocks().
  115. func (pool *BlockPool) PopRequest() {
  116. pool.requestsMtx.Lock() // Lock
  117. defer pool.requestsMtx.Unlock()
  118. if r := pool.requests[pool.height]; r == nil || r.block == nil {
  119. panic("PopRequest() requires a valid block")
  120. }
  121. delete(pool.requests, pool.height)
  122. pool.height++
  123. pool.numTotal--
  124. }
  125. // Invalidates the block at pool.height.
  126. // Remove the peer and request from others.
  127. func (pool *BlockPool) RedoRequest(height uint) {
  128. pool.requestsMtx.Lock() // Lock
  129. defer pool.requestsMtx.Unlock()
  130. request := pool.requests[height]
  131. if request.block == nil {
  132. panic("Expected block to be non-nil")
  133. }
  134. // TODO: record this malfeasance
  135. // maybe punish peer on switch (an invalid block!)
  136. pool.RemovePeer(request.peerId) // Lock on peersMtx.
  137. request.block = nil
  138. request.peerId = ""
  139. pool.numPending++
  140. pool.peerless++
  141. go requestRoutine(pool, height)
  142. }
  143. func (pool *BlockPool) hasBlock(height uint) bool {
  144. pool.requestsMtx.Lock() // Lock
  145. defer pool.requestsMtx.Unlock()
  146. request := pool.requests[height]
  147. return request != nil && request.block != nil
  148. }
  149. func (pool *BlockPool) setPeerForRequest(height uint, peerId string) {
  150. pool.requestsMtx.Lock() // Lock
  151. defer pool.requestsMtx.Unlock()
  152. request := pool.requests[height]
  153. if request == nil {
  154. return
  155. }
  156. pool.peerless--
  157. request.peerId = peerId
  158. }
  159. func (pool *BlockPool) removePeerForRequest(height uint, peerId string) {
  160. pool.requestsMtx.Lock() // Lock
  161. defer pool.requestsMtx.Unlock()
  162. request := pool.requests[height]
  163. if request == nil {
  164. return
  165. }
  166. pool.peerless++
  167. request.peerId = ""
  168. }
  169. func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
  170. pool.requestsMtx.Lock() // Lock
  171. defer pool.requestsMtx.Unlock()
  172. request := pool.requests[block.Height]
  173. if request == nil {
  174. return
  175. }
  176. if request.peerId != peerId {
  177. return
  178. }
  179. if request.block != nil {
  180. return
  181. }
  182. request.block = block
  183. pool.numPending--
  184. }
  185. func (pool *BlockPool) getPeer(peerId string) *bpPeer {
  186. pool.peersMtx.Lock() // Lock
  187. defer pool.peersMtx.Unlock()
  188. peer := pool.peers[peerId]
  189. return peer
  190. }
  191. // Sets the peer's alleged blockchain height.
  192. func (pool *BlockPool) SetPeerHeight(peerId string, height uint) {
  193. pool.peersMtx.Lock() // Lock
  194. defer pool.peersMtx.Unlock()
  195. peer := pool.peers[peerId]
  196. if peer != nil {
  197. peer.height = height
  198. } else {
  199. peer = &bpPeer{
  200. height: height,
  201. id: peerId,
  202. numRequests: 0,
  203. }
  204. pool.peers[peerId] = peer
  205. }
  206. }
  207. func (pool *BlockPool) RemovePeer(peerId string) {
  208. pool.peersMtx.Lock() // Lock
  209. defer pool.peersMtx.Unlock()
  210. delete(pool.peers, peerId)
  211. }
  212. // Pick an available peer with at least the given minHeight.
  213. // If no peers are available, returns nil.
  214. func (pool *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
  215. pool.peersMtx.Lock()
  216. defer pool.peersMtx.Unlock()
  217. for _, peer := range pool.peers {
  218. if peer.numRequests >= maxRequestsPerPeer {
  219. continue
  220. }
  221. if peer.height < minHeight {
  222. continue
  223. }
  224. peer.numRequests++
  225. return peer
  226. }
  227. return nil
  228. }
  229. func (pool *BlockPool) decrPeer(peerId string) {
  230. pool.peersMtx.Lock()
  231. defer pool.peersMtx.Unlock()
  232. peer := pool.peers[peerId]
  233. if peer == nil {
  234. return
  235. }
  236. peer.numRequests--
  237. }
  238. func (pool *BlockPool) nextHeight() uint {
  239. pool.requestsMtx.Lock() // Lock
  240. defer pool.requestsMtx.Unlock()
  241. // we make one request per height.
  242. return pool.height + uint(pool.numTotal)
  243. }
  244. func (pool *BlockPool) makeRequest(height uint) {
  245. pool.requestsMtx.Lock() // Lock
  246. defer pool.requestsMtx.Unlock()
  247. request := &bpRequest{
  248. height: height,
  249. peerId: "",
  250. block: nil,
  251. }
  252. pool.requests[height] = request
  253. pool.peerless++
  254. nextHeight := pool.height + uint(pool.numTotal)
  255. if nextHeight == height {
  256. pool.numTotal++
  257. pool.numPending++
  258. }
  259. go requestRoutine(pool, height)
  260. }
  261. func (pool *BlockPool) sendRequest(height uint, peerId string) {
  262. if atomic.LoadInt32(&pool.running) == 0 {
  263. return
  264. }
  265. pool.requestsCh <- BlockRequest{height, peerId}
  266. }
  267. func (pool *BlockPool) sendTimeout(peerId string) {
  268. if atomic.LoadInt32(&pool.running) == 0 {
  269. return
  270. }
  271. pool.timeoutsCh <- peerId
  272. }
  273. func (pool *BlockPool) debug() string {
  274. pool.requestsMtx.Lock() // Lock
  275. defer pool.requestsMtx.Unlock()
  276. str := ""
  277. for h := pool.height; h < pool.height+uint(pool.numTotal); h++ {
  278. if pool.requests[h] == nil {
  279. str += Fmt("H(%v):X ", h)
  280. } else {
  281. str += Fmt("H(%v):", h)
  282. str += Fmt("B?(%v) ", pool.requests[h].block != nil)
  283. }
  284. }
  285. return str
  286. }
  287. //-------------------------------------
// bpPeer is the pool's record of a single peer.
type bpPeer struct {
	id          string // key under which the peer is stored in BlockPool.peers
	height      uint   // height last reported via SetPeerHeight
	numRequests int32  // incremented by pickIncrAvailablePeer, decremented by decrPeer
}
// bpRequest tracks the request for the block at one height.
type bpRequest struct {
	height uint         // height of the requested block
	peerId string       // id of the peer currently assigned to the request; "" when none
	block  *types.Block // the block once received via AddBlock; nil until then
}
  298. //-------------------------------------
  299. // Responsible for making more requests as necessary
  300. // Returns only when a block is found (e.g. AddBlock() is called)
  301. func requestRoutine(pool *BlockPool, height uint) {
  302. for {
  303. var peer *bpPeer = nil
  304. PICK_LOOP:
  305. for {
  306. if !pool.IsRunning() {
  307. log.Debug("BlockPool not running. Stopping requestRoutine", "height", height)
  308. return
  309. }
  310. peer = pool.pickIncrAvailablePeer(height)
  311. if peer == nil {
  312. //log.Debug("No peers available", "height", height)
  313. time.Sleep(requestIntervalMS * time.Millisecond)
  314. continue PICK_LOOP
  315. }
  316. break PICK_LOOP
  317. }
  318. // set the peer, decrement peerless
  319. pool.setPeerForRequest(height, peer.id)
  320. for try := 0; try < maxTries; try++ {
  321. pool.sendRequest(height, peer.id)
  322. time.Sleep(requestTimeoutSeconds * time.Second)
  323. // if successful the block is either in the pool,
  324. if pool.hasBlock(height) {
  325. pool.decrPeer(peer.id)
  326. return
  327. }
  328. // or already processed and we've moved past it
  329. bpHeight, _, _ := pool.GetStatus()
  330. if height < bpHeight {
  331. pool.decrPeer(peer.id)
  332. return
  333. }
  334. }
  335. // unset the peer, increment peerless
  336. pool.removePeerForRequest(height, peer.id)
  337. // this peer failed us, try again
  338. pool.RemovePeer(peer.id)
  339. pool.sendTimeout(peer.id)
  340. }
  341. }
  342. //-------------------------------------
// BlockRequest is the message sent on the pool's requests channel for every
// request attempt. Field order matters: it is constructed positionally.
type BlockRequest struct {
	Height uint   // height of the block being requested
	PeerId string // id of the peer the request is addressed to
}