You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

369 lines
11 KiB

blockchain: Reorg reactor (#3561) * go routines in blockchain reactor * Added reference to the go routine diagram * Initial commit * cleanup * Undo testing_logger change, committed by mistake * Fix the test loggers * pulled some fsm code into pool.go * added pool tests * changes to the design added block requests under peer moved the request trigger in the reactor poolRoutine, triggered now by a ticker in general moved everything required for making block requests smarter in the poolRoutine added a simple map of heights to keep track of what will need to be requested next added a few more tests * send errors to FSM in a different channel than blocks send errors (RemovePeer) from switch on a different channel than the one receiving blocks renamed channels added more pool tests * more pool tests * lint errors * more tests * more tests * switch fast sync to new implementation * fixed data race in tests * cleanup * finished fsm tests * address golangci comments :) * address golangci comments :) * Added timeout on next block needed to advance * updating docs and cleanup * fix issue in test from previous cleanup * cleanup * Added termination scenarios, tests and more cleanup * small fixes to adr, comments and cleanup * Fix bug in sendRequest() If we tried to send a request to a peer not present in the switch, a missing continue statement caused the request to be blackholed in a peer that was removed and never retried. While this bug was manifesting, the reactor kept asking for other blocks that would be stored and never consumed. Added the number of unconsumed blocks in the math for requesting blocks ahead of current processing height so eventually there will be no more blocks requested until the already received ones are consumed. * remove bpPeer's didTimeout field * Use distinct err codes for peer timeout and FSM timeouts * Don't allow peers to update with lower height * review comments from Ethan and Zarko * some cleanup, renaming, comments * Move block execution in separate goroutine * Remove pool's numPending * review comments * fix lint, remove old blockchain reactor and duplicates in fsm tests * small reorg around peer after review comments * add the reactor spec * verify block only once * review comments * change to int for max number of pending requests * cleanup and godoc * Add configuration flag fast sync version * golangci fixes * fix config template * move both reactor versions under blockchain * cleanup, golint, renaming stuff * updated documentation, fixed more golint warnings * integrate with behavior package * sync with master * gofmt * add changelog_pending entry * move to improvments * suggestion to changelog entry
6 years ago
  1. package v1
  2. import (
  3. "sort"
  4. "github.com/tendermint/tendermint/libs/log"
  5. "github.com/tendermint/tendermint/p2p"
  6. "github.com/tendermint/tendermint/types"
  7. )
  8. // BlockPool keeps track of the fast sync peers, block requests and block responses.
  9. type BlockPool struct {
  10. logger log.Logger
  11. // Set of peers that have sent status responses, with height bigger than pool.Height
  12. peers map[p2p.ID]*BpPeer
  13. // Set of block heights and the corresponding peers from where a block response is expected or has been received.
  14. blocks map[int64]p2p.ID
  15. plannedRequests map[int64]struct{} // list of blocks to be assigned peers for blockRequest
  16. nextRequestHeight int64 // next height to be added to plannedRequests
  17. Height int64 // height of next block to execute
  18. MaxPeerHeight int64 // maximum height of all peers
  19. toBcR bcReactor
  20. }
  21. // NewBlockPool creates a new BlockPool.
  22. func NewBlockPool(height int64, toBcR bcReactor) *BlockPool {
  23. return &BlockPool{
  24. Height: height,
  25. MaxPeerHeight: 0,
  26. peers: make(map[p2p.ID]*BpPeer),
  27. blocks: make(map[int64]p2p.ID),
  28. plannedRequests: make(map[int64]struct{}),
  29. nextRequestHeight: height,
  30. toBcR: toBcR,
  31. }
  32. }
  33. // SetLogger sets the logger of the pool.
  34. func (pool *BlockPool) SetLogger(l log.Logger) {
  35. pool.logger = l
  36. }
  37. // ReachedMaxHeight check if the pool has reached the maximum peer height.
  38. func (pool *BlockPool) ReachedMaxHeight() bool {
  39. return pool.Height >= pool.MaxPeerHeight
  40. }
  41. func (pool *BlockPool) rescheduleRequest(peerID p2p.ID, height int64) {
  42. pool.logger.Info("reschedule requests made to peer for height ", "peerID", peerID, "height", height)
  43. pool.plannedRequests[height] = struct{}{}
  44. delete(pool.blocks, height)
  45. pool.peers[peerID].RemoveBlock(height)
  46. }
  47. // Updates the pool's max height. If no peers are left MaxPeerHeight is set to 0.
  48. func (pool *BlockPool) updateMaxPeerHeight() {
  49. var newMax int64
  50. for _, peer := range pool.peers {
  51. peerHeight := peer.Height
  52. if peerHeight > newMax {
  53. newMax = peerHeight
  54. }
  55. }
  56. pool.MaxPeerHeight = newMax
  57. }
  58. // UpdatePeer adds a new peer or updates an existing peer with a new height.
  59. // If a peer is short it is not added.
  60. func (pool *BlockPool) UpdatePeer(peerID p2p.ID, height int64) error {
  61. peer := pool.peers[peerID]
  62. if peer == nil {
  63. if height < pool.Height {
  64. pool.logger.Info("Peer height too small",
  65. "peer", peerID, "height", height, "fsm_height", pool.Height)
  66. return errPeerTooShort
  67. }
  68. // Add new peer.
  69. peer = NewBpPeer(peerID, height, pool.toBcR.sendPeerError, nil)
  70. peer.SetLogger(pool.logger.With("peer", peerID))
  71. pool.peers[peerID] = peer
  72. pool.logger.Info("added peer", "peerID", peerID, "height", height, "num_peers", len(pool.peers))
  73. } else {
  74. // Check if peer is lowering its height. This is not allowed.
  75. if height < peer.Height {
  76. pool.RemovePeer(peerID, errPeerLowersItsHeight)
  77. return errPeerLowersItsHeight
  78. }
  79. // Update existing peer.
  80. peer.Height = height
  81. }
  82. // Update the pool's MaxPeerHeight if needed.
  83. pool.updateMaxPeerHeight()
  84. return nil
  85. }
  86. // Cleans and deletes the peer. Recomputes the max peer height.
  87. func (pool *BlockPool) deletePeer(peer *BpPeer) {
  88. if peer == nil {
  89. return
  90. }
  91. peer.Cleanup()
  92. delete(pool.peers, peer.ID)
  93. if peer.Height == pool.MaxPeerHeight {
  94. pool.updateMaxPeerHeight()
  95. }
  96. }
  97. // RemovePeer removes the blocks and requests from the peer, reschedules them and deletes the peer.
  98. func (pool *BlockPool) RemovePeer(peerID p2p.ID, err error) {
  99. peer := pool.peers[peerID]
  100. if peer == nil {
  101. return
  102. }
  103. pool.logger.Info("removing peer", "peerID", peerID, "error", err)
  104. // Reschedule the block requests made to the peer, or received and not processed yet.
  105. // Note that some of the requests may be removed further down.
  106. for h := range pool.peers[peerID].blocks {
  107. pool.rescheduleRequest(peerID, h)
  108. }
  109. oldMaxPeerHeight := pool.MaxPeerHeight
  110. // Delete the peer. This operation may result in the pool's MaxPeerHeight being lowered.
  111. pool.deletePeer(peer)
  112. // Check if the pool's MaxPeerHeight has been lowered.
  113. // This may happen if the tallest peer has been removed.
  114. if oldMaxPeerHeight > pool.MaxPeerHeight {
  115. // Remove any planned requests for heights over the new MaxPeerHeight.
  116. for h := range pool.plannedRequests {
  117. if h > pool.MaxPeerHeight {
  118. delete(pool.plannedRequests, h)
  119. }
  120. }
  121. // Adjust the nextRequestHeight to the new max plus one.
  122. if pool.nextRequestHeight > pool.MaxPeerHeight {
  123. pool.nextRequestHeight = pool.MaxPeerHeight + 1
  124. }
  125. }
  126. }
  127. func (pool *BlockPool) removeShortPeers() {
  128. for _, peer := range pool.peers {
  129. if peer.Height < pool.Height {
  130. pool.RemovePeer(peer.ID, nil)
  131. }
  132. }
  133. }
  134. func (pool *BlockPool) removeBadPeers() {
  135. pool.removeShortPeers()
  136. for _, peer := range pool.peers {
  137. if err := peer.CheckRate(); err != nil {
  138. pool.RemovePeer(peer.ID, err)
  139. pool.toBcR.sendPeerError(err, peer.ID)
  140. }
  141. }
  142. }
  143. // MakeNextRequests creates more requests if the block pool is running low.
  144. func (pool *BlockPool) MakeNextRequests(maxNumRequests int) {
  145. heights := pool.makeRequestBatch(maxNumRequests)
  146. if len(heights) != 0 {
  147. pool.logger.Info("makeNextRequests will make following requests",
  148. "number", len(heights), "heights", heights)
  149. }
  150. for _, height := range heights {
  151. h := int64(height)
  152. if !pool.sendRequest(h) {
  153. // If a good peer was not found for sending the request at height h then return,
  154. // as it shouldn't be possible to find a peer for h+1.
  155. return
  156. }
  157. delete(pool.plannedRequests, h)
  158. }
  159. }
  160. // Makes a batch of requests sorted by height such that the block pool has up to maxNumRequests entries.
  161. func (pool *BlockPool) makeRequestBatch(maxNumRequests int) []int {
  162. pool.removeBadPeers()
  163. // At this point pool.requests may include heights for requests to be redone due to removal of peers:
  164. // - peers timed out or were removed by switch
  165. // - FSM timed out on waiting to advance the block execution due to missing blocks at h or h+1
  166. // Determine the number of requests needed by subtracting the number of requests already made from the maximum
  167. // allowed
  168. numNeeded := int(maxNumRequests) - len(pool.blocks)
  169. for len(pool.plannedRequests) < numNeeded {
  170. if pool.nextRequestHeight > pool.MaxPeerHeight {
  171. break
  172. }
  173. pool.plannedRequests[pool.nextRequestHeight] = struct{}{}
  174. pool.nextRequestHeight++
  175. }
  176. heights := make([]int, 0, len(pool.plannedRequests))
  177. for k := range pool.plannedRequests {
  178. heights = append(heights, int(k))
  179. }
  180. sort.Ints(heights)
  181. return heights
  182. }
  183. func (pool *BlockPool) sendRequest(height int64) bool {
  184. for _, peer := range pool.peers {
  185. if peer.NumPendingBlockRequests >= maxRequestsPerPeer {
  186. continue
  187. }
  188. if peer.Height < height {
  189. continue
  190. }
  191. err := pool.toBcR.sendBlockRequest(peer.ID, height)
  192. if err == errNilPeerForBlockRequest {
  193. // Switch does not have this peer, remove it and continue to look for another peer.
  194. pool.logger.Error("switch does not have peer..removing peer selected for height", "peer",
  195. peer.ID, "height", height)
  196. pool.RemovePeer(peer.ID, err)
  197. continue
  198. }
  199. if err == errSendQueueFull {
  200. pool.logger.Error("peer queue is full", "peer", peer.ID, "height", height)
  201. continue
  202. }
  203. pool.logger.Info("assigned request to peer", "peer", peer.ID, "height", height)
  204. pool.blocks[height] = peer.ID
  205. peer.RequestSent(height)
  206. return true
  207. }
  208. pool.logger.Error("could not find peer to send request for block at height", "height", height)
  209. return false
  210. }
  211. // AddBlock validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
  212. func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) error {
  213. peer, ok := pool.peers[peerID]
  214. if !ok {
  215. pool.logger.Error("block from unknown peer", "height", block.Height, "peer", peerID)
  216. return errBadDataFromPeer
  217. }
  218. if wantPeerID, ok := pool.blocks[block.Height]; ok && wantPeerID != peerID {
  219. pool.logger.Error("block received from wrong peer", "height", block.Height,
  220. "peer", peerID, "expected_peer", wantPeerID)
  221. return errBadDataFromPeer
  222. }
  223. return peer.AddBlock(block, blockSize)
  224. }
  225. // BlockData stores the peer responsible to deliver a block and the actual block if delivered.
  226. type BlockData struct {
  227. block *types.Block
  228. peer *BpPeer
  229. }
  230. // BlockAndPeerAtHeight retrieves the block and delivery peer at specified height.
  231. // Returns errMissingBlock if a block was not found
  232. func (pool *BlockPool) BlockAndPeerAtHeight(height int64) (bData *BlockData, err error) {
  233. peerID := pool.blocks[height]
  234. peer := pool.peers[peerID]
  235. if peer == nil {
  236. return nil, errMissingBlock
  237. }
  238. block, err := peer.BlockAtHeight(height)
  239. if err != nil {
  240. return nil, err
  241. }
  242. return &BlockData{peer: peer, block: block}, nil
  243. }
  244. // FirstTwoBlocksAndPeers returns the blocks and the delivery peers at pool's height H and H+1.
  245. func (pool *BlockPool) FirstTwoBlocksAndPeers() (first, second *BlockData, err error) {
  246. first, err = pool.BlockAndPeerAtHeight(pool.Height)
  247. second, err2 := pool.BlockAndPeerAtHeight(pool.Height + 1)
  248. if err == nil {
  249. err = err2
  250. }
  251. return
  252. }
  253. // InvalidateFirstTwoBlocks removes the peers that sent us the first two blocks, blocks are removed by RemovePeer().
  254. func (pool *BlockPool) InvalidateFirstTwoBlocks(err error) {
  255. first, err1 := pool.BlockAndPeerAtHeight(pool.Height)
  256. second, err2 := pool.BlockAndPeerAtHeight(pool.Height + 1)
  257. if err1 == nil {
  258. pool.RemovePeer(first.peer.ID, err)
  259. }
  260. if err2 == nil {
  261. pool.RemovePeer(second.peer.ID, err)
  262. }
  263. }
  264. // ProcessedCurrentHeightBlock performs cleanup after a block is processed. It removes block at pool height and
  265. // the peers that are now short.
  266. func (pool *BlockPool) ProcessedCurrentHeightBlock() {
  267. peerID, peerOk := pool.blocks[pool.Height]
  268. if peerOk {
  269. pool.peers[peerID].RemoveBlock(pool.Height)
  270. }
  271. delete(pool.blocks, pool.Height)
  272. pool.logger.Debug("removed block at height", "height", pool.Height)
  273. pool.Height++
  274. pool.removeShortPeers()
  275. }
  276. // RemovePeerAtCurrentHeights checks if a block at pool's height H exists and if not, it removes the
  277. // delivery peer and returns. If a block at height H exists then the check and peer removal is done for H+1.
  278. // This function is called when the FSM is not able to make progress for some time.
  279. // This happens if either the block H or H+1 have not been delivered.
  280. func (pool *BlockPool) RemovePeerAtCurrentHeights(err error) {
  281. peerID := pool.blocks[pool.Height]
  282. peer, ok := pool.peers[peerID]
  283. if ok {
  284. if _, err := peer.BlockAtHeight(pool.Height); err != nil {
  285. pool.logger.Info("remove peer that hasn't sent block at pool.Height",
  286. "peer", peerID, "height", pool.Height)
  287. pool.RemovePeer(peerID, err)
  288. return
  289. }
  290. }
  291. peerID = pool.blocks[pool.Height+1]
  292. peer, ok = pool.peers[peerID]
  293. if ok {
  294. if _, err := peer.BlockAtHeight(pool.Height + 1); err != nil {
  295. pool.logger.Info("remove peer that hasn't sent block at pool.Height+1",
  296. "peer", peerID, "height", pool.Height+1)
  297. pool.RemovePeer(peerID, err)
  298. return
  299. }
  300. }
  301. }
  302. // Cleanup performs pool and peer cleanup
  303. func (pool *BlockPool) Cleanup() {
  304. for id, peer := range pool.peers {
  305. peer.Cleanup()
  306. delete(pool.peers, id)
  307. }
  308. pool.plannedRequests = make(map[int64]struct{})
  309. pool.blocks = make(map[int64]p2p.ID)
  310. pool.nextRequestHeight = 0
  311. pool.Height = 0
  312. pool.MaxPeerHeight = 0
  313. }
  314. // NumPeers returns the number of peers in the pool
  315. func (pool *BlockPool) NumPeers() int {
  316. return len(pool.peers)
  317. }
  318. // NeedsBlocks returns true if more blocks are required.
  319. func (pool *BlockPool) NeedsBlocks() bool {
  320. return len(pool.blocks) < maxNumRequests
  321. }