You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

402 lines
9.0 KiB

10 years ago
10 years ago
  1. package blockchain
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. . "github.com/tendermint/tendermint/common"
  7. "github.com/tendermint/tendermint/types"
  8. )
  9. const (
  10. maxTries = 3
  11. inputsChannelCapacity = 200
  12. requestIntervalMS = 500
  13. maxPendingRequests = 200
  14. maxTotalRequests = 300
  15. maxRequestsPerPeer = 300
  16. )
  17. var (
  18. requestTimeoutSeconds = time.Duration(3)
  19. )
  20. /*
  21. Peers self report their heights when we join the block pool.
  22. Starting from our latest pool.height, we request blocks
  23. in sequence from peers that reported higher heights than ours.
  24. Every so often we ask peers what height they're on so we can keep going.
  25. Requests are continuously made for blocks of heigher heights until
  26. the limits. If most of the requests have no available peers, and we
  27. are not at peer limits, we can probably switch to consensus reactor
  28. */
  29. type BlockPool struct {
  30. // block requests
  31. requestsMtx sync.Mutex
  32. requests map[int]*bpRequest
  33. height int // the lowest key in requests.
  34. numUnassigned int32 // number of requests not yet assigned to a peer
  35. numPending int32 // number of requests pending assignment or block response
  36. // peers
  37. peersMtx sync.Mutex
  38. peers map[string]*bpPeer
  39. requestsCh chan<- BlockRequest
  40. timeoutsCh chan<- string
  41. repeater *RepeatTimer
  42. running int32 // atomic
  43. }
  44. func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
  45. return &BlockPool{
  46. peers: make(map[string]*bpPeer),
  47. requests: make(map[int]*bpRequest),
  48. height: start,
  49. numUnassigned: 0,
  50. numPending: 0,
  51. requestsCh: requestsCh,
  52. timeoutsCh: timeoutsCh,
  53. repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
  54. running: 0,
  55. }
  56. }
  57. func (pool *BlockPool) Start() {
  58. if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
  59. log.Notice("Starting BlockPool")
  60. go pool.run()
  61. }
  62. }
  63. func (pool *BlockPool) Stop() {
  64. if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
  65. log.Notice("Stopping BlockPool")
  66. pool.repeater.Stop()
  67. }
  68. }
  69. func (pool *BlockPool) IsRunning() bool {
  70. return atomic.LoadInt32(&pool.running) == 1
  71. }
  72. // Run spawns requests as needed.
  73. func (pool *BlockPool) run() {
  74. RUN_LOOP:
  75. for {
  76. if atomic.LoadInt32(&pool.running) == 0 {
  77. break RUN_LOOP
  78. }
  79. _, numPending, _ := pool.GetStatus()
  80. if numPending >= maxPendingRequests {
  81. // sleep for a bit.
  82. time.Sleep(requestIntervalMS * time.Millisecond)
  83. } else if len(pool.requests) >= maxTotalRequests {
  84. // sleep for a bit.
  85. time.Sleep(requestIntervalMS * time.Millisecond)
  86. } else {
  87. // request for more blocks.
  88. pool.makeNextRequest()
  89. }
  90. }
  91. }
  92. func (pool *BlockPool) GetStatus() (height int, numPending int32, numUnssigned int32) {
  93. pool.requestsMtx.Lock() // Lock
  94. defer pool.requestsMtx.Unlock()
  95. return pool.height, pool.numPending, pool.numUnassigned
  96. }
  97. // We need to see the second block's Validation to validate the first block.
  98. // So we peek two blocks at a time.
  99. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
  100. pool.requestsMtx.Lock() // Lock
  101. defer pool.requestsMtx.Unlock()
  102. if r := pool.requests[pool.height]; r != nil {
  103. first = r.block
  104. }
  105. if r := pool.requests[pool.height+1]; r != nil {
  106. second = r.block
  107. }
  108. return
  109. }
  110. // Pop the first block at pool.height
  111. // It must have been validated by 'second'.Validation from PeekTwoBlocks().
  112. func (pool *BlockPool) PopRequest() {
  113. pool.requestsMtx.Lock() // Lock
  114. defer pool.requestsMtx.Unlock()
  115. // SANITY CHECK
  116. if r := pool.requests[pool.height]; r == nil || r.block == nil {
  117. panic("PopRequest() requires a valid block")
  118. }
  119. // SANITY CHECK END
  120. delete(pool.requests, pool.height)
  121. pool.height++
  122. }
  123. // Invalidates the block at pool.height.
  124. // Remove the peer and request from others.
  125. func (pool *BlockPool) RedoRequest(height int) {
  126. pool.requestsMtx.Lock() // Lock
  127. defer pool.requestsMtx.Unlock()
  128. request := pool.requests[height]
  129. // SANITY CHECK
  130. if request.block == nil {
  131. panic("Expected block to be non-nil")
  132. }
  133. // SANITY CHECK END
  134. // TODO: record this malfeasance
  135. // maybe punish peer on switch (an invalid block!)
  136. pool.RemovePeer(request.peerId) // Lock on peersMtx.
  137. request.block = nil
  138. request.peerId = ""
  139. pool.numPending++
  140. pool.numUnassigned++
  141. go requestRoutine(pool, height)
  142. }
  143. func (pool *BlockPool) hasBlock(height int) bool {
  144. pool.requestsMtx.Lock() // Lock
  145. defer pool.requestsMtx.Unlock()
  146. request := pool.requests[height]
  147. return request != nil && request.block != nil
  148. }
  149. func (pool *BlockPool) setPeerForRequest(height int, peerId string) {
  150. pool.requestsMtx.Lock() // Lock
  151. defer pool.requestsMtx.Unlock()
  152. request := pool.requests[height]
  153. if request == nil {
  154. return
  155. }
  156. pool.numUnassigned--
  157. request.peerId = peerId
  158. }
  159. func (pool *BlockPool) removePeerForRequest(height int, peerId string) {
  160. pool.requestsMtx.Lock() // Lock
  161. defer pool.requestsMtx.Unlock()
  162. request := pool.requests[height]
  163. if request == nil {
  164. return
  165. }
  166. pool.numUnassigned++
  167. request.peerId = ""
  168. }
  169. func (pool *BlockPool) AddBlock(block *types.Block, peerId string) {
  170. pool.requestsMtx.Lock() // Lock
  171. defer pool.requestsMtx.Unlock()
  172. request := pool.requests[block.Height]
  173. if request == nil {
  174. return
  175. }
  176. if request.peerId != peerId {
  177. return
  178. }
  179. if request.block != nil {
  180. return
  181. }
  182. request.block = block
  183. pool.numPending--
  184. }
  185. func (pool *BlockPool) getPeer(peerId string) *bpPeer {
  186. pool.peersMtx.Lock() // Lock
  187. defer pool.peersMtx.Unlock()
  188. peer := pool.peers[peerId]
  189. return peer
  190. }
  191. // Sets the peer's alleged blockchain height.
  192. func (pool *BlockPool) SetPeerHeight(peerId string, height int) {
  193. pool.peersMtx.Lock() // Lock
  194. defer pool.peersMtx.Unlock()
  195. peer := pool.peers[peerId]
  196. if peer != nil {
  197. peer.height = height
  198. } else {
  199. peer = &bpPeer{
  200. height: height,
  201. id: peerId,
  202. numRequests: 0,
  203. }
  204. pool.peers[peerId] = peer
  205. }
  206. }
  207. func (pool *BlockPool) RemovePeer(peerId string) {
  208. pool.peersMtx.Lock() // Lock
  209. defer pool.peersMtx.Unlock()
  210. delete(pool.peers, peerId)
  211. }
  212. // Pick an available peer with at least the given minHeight.
  213. // If no peers are available, returns nil.
  214. func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
  215. pool.peersMtx.Lock()
  216. defer pool.peersMtx.Unlock()
  217. for _, peer := range pool.peers {
  218. if peer.numRequests >= maxRequestsPerPeer {
  219. continue
  220. }
  221. if peer.height < minHeight {
  222. continue
  223. }
  224. peer.numRequests++
  225. return peer
  226. }
  227. return nil
  228. }
  229. func (pool *BlockPool) decrPeer(peerId string) {
  230. pool.peersMtx.Lock()
  231. defer pool.peersMtx.Unlock()
  232. peer := pool.peers[peerId]
  233. if peer == nil {
  234. return
  235. }
  236. peer.numRequests--
  237. }
  238. func (pool *BlockPool) makeNextRequest() {
  239. pool.requestsMtx.Lock() // Lock
  240. defer pool.requestsMtx.Unlock()
  241. nextHeight := pool.height + len(pool.requests)
  242. request := &bpRequest{
  243. height: nextHeight,
  244. peerId: "",
  245. block: nil,
  246. }
  247. pool.requests[nextHeight] = request
  248. pool.numUnassigned++
  249. pool.numPending++
  250. go requestRoutine(pool, nextHeight)
  251. }
  252. func (pool *BlockPool) sendRequest(height int, peerId string) {
  253. if atomic.LoadInt32(&pool.running) == 0 {
  254. return
  255. }
  256. pool.requestsCh <- BlockRequest{height, peerId}
  257. }
  258. func (pool *BlockPool) sendTimeout(peerId string) {
  259. if atomic.LoadInt32(&pool.running) == 0 {
  260. return
  261. }
  262. pool.timeoutsCh <- peerId
  263. }
  264. func (pool *BlockPool) debug() string {
  265. pool.requestsMtx.Lock() // Lock
  266. defer pool.requestsMtx.Unlock()
  267. str := ""
  268. for h := pool.height; h < pool.height+len(pool.requests); h++ {
  269. if pool.requests[h] == nil {
  270. str += Fmt("H(%v):X ", h)
  271. } else {
  272. str += Fmt("H(%v):", h)
  273. str += Fmt("B?(%v) ", pool.requests[h].block != nil)
  274. }
  275. }
  276. return str
  277. }
  278. //-------------------------------------
  279. type bpPeer struct {
  280. id string
  281. height int
  282. numRequests int32
  283. }
  284. type bpRequest struct {
  285. height int
  286. peerId string
  287. block *types.Block
  288. }
  289. //-------------------------------------
  290. // Responsible for making more requests as necessary
  291. // Returns only when a block is found (e.g. AddBlock() is called)
  292. func requestRoutine(pool *BlockPool, height int) {
  293. for {
  294. var peer *bpPeer = nil
  295. PICK_LOOP:
  296. for {
  297. if !pool.IsRunning() {
  298. log.Info("BlockPool not running. Stopping requestRoutine", "height", height)
  299. return
  300. }
  301. peer = pool.pickIncrAvailablePeer(height)
  302. if peer == nil {
  303. //log.Info("No peers available", "height", height)
  304. time.Sleep(requestIntervalMS * time.Millisecond)
  305. continue PICK_LOOP
  306. }
  307. break PICK_LOOP
  308. }
  309. // set the peer, decrement numUnassigned
  310. pool.setPeerForRequest(height, peer.id)
  311. for try := 0; try < maxTries; try++ {
  312. pool.sendRequest(height, peer.id)
  313. time.Sleep(requestTimeoutSeconds * time.Second)
  314. // if successful the block is either in the pool,
  315. if pool.hasBlock(height) {
  316. pool.decrPeer(peer.id)
  317. return
  318. }
  319. // or already processed and we've moved past it
  320. bpHeight, _, _ := pool.GetStatus()
  321. if height < bpHeight {
  322. pool.decrPeer(peer.id)
  323. return
  324. }
  325. }
  326. // unset the peer, increment numUnassigned
  327. pool.removePeerForRequest(height, peer.id)
  328. // this peer failed us, try again
  329. pool.RemovePeer(peer.id)
  330. pool.sendTimeout(peer.id)
  331. }
  332. }
  333. //-------------------------------------
  334. type BlockRequest struct {
  335. Height int
  336. PeerId string
  337. }