---
order: 2
---

# Reactor

The Blocksync Reactor's high-level responsibility is to enable peers who are
far behind the current state of the consensus to quickly catch up by downloading
many blocks in parallel, verifying their commits, and executing them against the
ABCI application.

Tendermint full nodes run the Blocksync Reactor as a service to provide blocks
to new nodes. New nodes run the Blocksync Reactor in "fast_sync" mode,
where they actively make requests for more blocks until they sync up.
Once caught up, "fast_sync" mode is disabled and the node switches to
using (and turns on) the Consensus Reactor.

## Architecture and algorithm

The Blocksync reactor is organised as a set of concurrent tasks:

- Receive routine of Blocksync Reactor
- Task for creating Requesters
- Set of Requester tasks
- Controller task

![Blocksync Reactor Architecture Diagram](img/bc-reactor.png)

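Conceptually, each of these is a goroutine that shares the pool and runs until the reactor is stopped, while individual Requester tasks are spawned on demand, one per in-flight height. The sketch below illustrates that shape only; `runTask` and the task names are hypothetical stand-ins, not the actual reactor code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runTask is a hypothetical stand-in for one of the reactor's concurrent
// tasks (receive routine, requester-creation task, controller). A real task
// would loop here, reading from its channels, instead of just waiting.
func runTask(name string, wg *sync.WaitGroup, quit <-chan struct{}) {
	defer wg.Done()
	fmt.Println(name, "started")
	<-quit
	fmt.Println(name, "stopped")
}

func main() {
	quit := make(chan struct{})
	var wg sync.WaitGroup

	// One goroutine per long-lived task; Requester tasks are spawned later
	// by the requester-creation task, one per in-flight block height.
	for _, name := range []string{"receive routine", "requester creator", "controller"} {
		wg.Add(1)
		go runTask(name, &wg, quit)
	}

	time.Sleep(10 * time.Millisecond) // let the tasks start
	close(quit)                       // signal all tasks to stop
	wg.Wait()
}
```
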
### Data structures

These are the core data structures necessary to provide the Blocksync Reactor logic.

The Requester data structure is used to track the assignment of a request for a `block` at position `height` to a peer whose id equals `peerID`.

```go
type Requester struct {
    mtx         Mutex
    block       Block
    height      int64
    peerID      p2p.ID
    redoChannel chan p2p.ID // redo may be signalled multiple times; peerID identifies which peer to retry
}
```

Pool is the core data structure that stores the last executed block (`height`), the assignment of requests to peers (`requesters`), the current height and number of pending requests for each peer (`peers`), the maximum peer height, etc.

```go
type Pool struct {
    mtx             Mutex
    requesters      map[int64]*Requester
    height          int64
    peers           map[p2p.ID]*Peer
    maxPeerHeight   int64
    numPending      int32
    store           BlockStore
    requestsChannel chan<- BlockRequest
    errorsChannel   chan<- peerError
}
```

The Peer data structure stores, for each peer, its current `height`, the number of pending requests sent to it (`numPending`), etc.

```go
type Peer struct {
    id         p2p.ID
    height     int64
    numPending int32
    timeout    *time.Timer
    didTimeout bool
}
```

BlockRequest is an internal data structure used to denote the current mapping of a request for a block at some `height` to a peer (`PeerID`).

```go
type BlockRequest struct {
    Height int64
    PeerID p2p.ID
}
```

### Receive routine of Blocksync Reactor

It is executed upon message reception on the BlocksyncChannel inside the p2p receive routine. There is a separate p2p receive routine (and therefore a separate receive routine of the Blocksync Reactor) executed for each peer. Note that `try to send` does not block (it returns immediately) if the outgoing buffer is full.

```go
handleMsg(pool, m):
    upon receiving bcBlockRequestMessage m from peer p:
        block = load block for height m.Height from pool.store
        if block != nil then
            try to send BlockResponseMessage(block) to p
        else
            try to send bcNoBlockResponseMessage(m.Height) to p

    upon receiving bcBlockResponseMessage m from peer p:
        pool.mtx.Lock()
        requester = pool.requesters[m.Height]
        if requester == nil then
            error("peer sent us a block we didn't expect")
            continue

        if requester.block == nil and requester.peerID == p then
            requester.block = m
            pool.numPending -= 1 // atomic decrement
            peer = pool.peers[p]
            if peer != nil then
                peer.numPending--
                if peer.numPending == 0 then
                    peer.timeout.Stop()
                    // NOTE: we don't send Quit signal to the corresponding requester task!
                else
                    trigger peer timeout to expire after peerTimeout
        pool.mtx.Unlock()

    upon receiving bcStatusRequestMessage m from peer p:
        try to send bcStatusResponseMessage(pool.store.Height) to p

    upon receiving bcStatusResponseMessage m from peer p:
        pool.mtx.Lock()
        peer = pool.peers[p]
        if peer != nil then
            peer.height = m.Height
        else
            peer = create new Peer data structure with id = p and height = m.Height
            pool.peers[p] = peer
        if m.Height > pool.maxPeerHeight then
            pool.maxPeerHeight = m.Height
        pool.mtx.Unlock()

onTimeout(p):
    send error message to pool error channel
    peer = pool.peers[p]
    peer.didTimeout = true
```

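The `try to send` steps above are non-blocking: if the peer's outgoing buffer is full, the send is skipped rather than stalling the receive routine. A minimal sketch of that behaviour, modelling the peer's send queue as a buffered channel (the `trySend` helper and `peerSendQueue` are illustrative names, not the real p2p API):

```go
package main

import "fmt"

// trySend attempts to enqueue msg on the peer's send queue and returns
// immediately, reporting false instead of blocking when the queue is full.
func trySend(peerSendQueue chan<- []byte, msg []byte) bool {
	select {
	case peerSendQueue <- msg:
		return true
	default: // outgoing buffer is full; drop this send attempt
		return false
	}
}

func main() {
	queue := make(chan []byte, 1)
	fmt.Println(trySend(queue, []byte("response #1"))) // true: the buffer had room
	fmt.Println(trySend(queue, []byte("response #2"))) // false: the buffer is full
}
```
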
### Requester tasks

A Requester task is responsible for fetching a single block at position `height`.

```go
fetchBlock(height, pool):
    while true do {
        peerID = nil
        block = nil
        peer = pickAvailablePeer(height)
        peerID = peer.id

        enqueue BlockRequest(height, peerID) to pool.requestsChannel
        redo = false
        while !redo do
            select {
                upon receiving Quit message do
                    return
                upon receiving redo message with id on redoChannel do
                    if peerID == id {
                        pool.mtx.Lock()
                        pool.numPending++
                        redo = true
                        pool.mtx.Unlock()
                    }
            }
    }

pickAvailablePeer(height):
    selectedPeer = nil
    while selectedPeer = nil do
        pool.mtx.Lock()
        for each peer in pool.peers do
            if !peer.didTimeout and peer.numPending < maxPendingRequestsPerPeer and peer.height >= height then
                peer.numPending++
                selectedPeer = peer
                break
        pool.mtx.Unlock()
        if selectedPeer = nil then
            sleep requestIntervalMS
    return selectedPeer
```

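In Go, the inner wait on the Quit and redo signals maps naturally onto a `select` over two channels, with the peer id check deciding whether this requester is affected. Below is a minimal, self-contained sketch of that loop; the `requestFromPeer` callback and the use of plain strings as peer ids are simplifications for illustration, not the reactor's actual interfaces.

```go
package main

import "fmt"

// fetchBlock keeps a request for one height outstanding, re-issuing it
// whenever a redo signal names the peer the request is currently assigned to.
func fetchBlock(height int64, quit <-chan struct{}, redo <-chan string,
	requestFromPeer func(height int64) (peerID string)) {
	for {
		peerID := requestFromPeer(height) // pick a peer and enqueue the request
		waiting := true
		for waiting {
			select {
			case <-quit:
				return
			case id := <-redo:
				if id == peerID { // only redo if the failing peer owns this request
					waiting = false
				}
			}
		}
	}
}

func main() {
	quit := make(chan struct{})
	redo := make(chan string)
	done := make(chan struct{})
	go func() {
		fetchBlock(10, quit, redo, func(h int64) string {
			fmt.Println("requesting height", h)
			return "peer-1"
		})
		close(done)
	}()
	redo <- "peer-1" // the assigned peer failed: force one retry
	close(quit)      // then shut the requester down
	<-done
}
```
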
### Task for creating Requesters

This task is responsible for continuously creating and starting Requester tasks.

```go
createRequesters(pool):
    while true do
        if !pool.isRunning then break
        if pool.numPending < maxPendingRequests or size(pool.requesters) < maxTotalRequesters then
            pool.mtx.Lock()
            nextHeight = pool.height + size(pool.requesters)
            requester = create new requester for height nextHeight
            pool.requesters[nextHeight] = requester
            pool.numPending += 1 // atomic increment
            start requester task
            pool.mtx.Unlock()
        else
            sleep requestIntervalMS

        pool.mtx.Lock()
        for each peer in pool.peers do
            if !peer.didTimeout && peer.numPending > 0 && peer.curRate < minRecvRate then
                send error on pool error channel
                peer.didTimeout = true
            if peer.didTimeout then
                for each requester in pool.requesters do
                    if requester.getPeerID() == peer then
                        enqueue msg on requester's redoChannel
                delete(pool.peers, peer.id)
        pool.mtx.Unlock()
```

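The second half of the loop polices slow peers: a peer that still has pending requests but whose receive rate has fallen below `minRecvRate` is flagged as timed out, so its outstanding requests get redone elsewhere. A small illustrative sketch of that check (the `peerStatus` type, the `flagSlowPeers` helper, and the threshold value are made up for this example):

```go
package main

import "fmt"

// peerStatus holds the fields the monitor loop cares about; curRate is the
// peer's measured receive rate in bytes per second.
type peerStatus struct {
	numPending int32
	curRate    int64
	didTimeout bool
}

const minRecvRate = 7680 // illustrative threshold, bytes per second

// flagSlowPeers marks peers whose receive rate dropped below minRecvRate
// while they still have pending requests, and returns their ids so the
// caller can redo the corresponding requesters and drop the peers.
func flagSlowPeers(peers map[string]*peerStatus) (slow []string) {
	for id, p := range peers {
		if !p.didTimeout && p.numPending > 0 && p.curRate < minRecvRate {
			p.didTimeout = true
			slow = append(slow, id)
		}
	}
	return slow
}

func main() {
	peers := map[string]*peerStatus{
		"peer-1": {numPending: 3, curRate: 1024},   // slow and still owes us blocks
		"peer-2": {numPending: 1, curRate: 50_000}, // fast enough
	}
	fmt.Println(flagSlowPeers(peers)) // [peer-1]
}
```
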
### Main blocksync reactor controller task

```go
main(pool):
    create trySyncTicker with interval trySyncIntervalMS
    create statusUpdateTicker with interval statusUpdateIntervalSeconds
    create switchToConsensusTicker with interval switchToConsensusIntervalSeconds

    while true do
        select {
            upon receiving BlockRequest(Height, Peer) on pool.requestsChannel:
                try to send bcBlockRequestMessage(Height) to Peer

            upon receiving error(peer) on errorsChannel:
                stop peer for error

            upon receiving message on statusUpdateTickerChannel:
                broadcast bcStatusRequestMessage(bcR.store.Height) // message sent in a separate routine

            upon receiving message on switchToConsensusTickerChannel:
                pool.mtx.Lock()
                receivedBlockOrTimedOut = pool.height > 0 || (time.Now() - pool.startTime) > 5 Seconds
                ourChainIsLongestAmongPeers = pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight
                haveSomePeers = size of pool.peers > 0
                pool.mtx.Unlock()
                if haveSomePeers && receivedBlockOrTimedOut && ourChainIsLongestAmongPeers then
                    switch to consensus mode

            upon receiving message on trySyncTickerChannel:
                for i = 0; i < 10; i++ do
                    pool.mtx.Lock()
                    firstBlock = pool.requesters[pool.height].block
                    secondBlock = pool.requesters[pool.height+1].block
                    pool.mtx.Unlock()
                    if firstBlock == nil or secondBlock == nil then continue

                    verify firstBlock using LastCommit from secondBlock
                    if verification failed
                        pool.mtx.Lock()
                        peerID = pool.requesters[pool.height].peerID
                        redoRequestsForPeer(pool, peerID)
                        delete(pool.peers, peerID)
                        stop peer peerID for error
                        pool.mtx.Unlock()
                    else
                        delete(pool.requesters, pool.height)
                        save firstBlock to store
                        pool.height++
                        execute firstBlock
        }

redoRequestsForPeer(pool, peerID):
    for each requester in pool.requesters do
        if requester.getPeerID() == peerID
            enqueue msg on redoChannel for requester
```

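The key step above is that the block at height `H` is verified using the `LastCommit` carried by the block at height `H+1`, which is why two consecutive blocks must be available before anything is saved or executed. A toy sketch of that relationship follows; the types are stand-ins, not the real block and commit types, and the hash comparison stands in for verifying the commit signatures against the validator set.

```go
package main

import (
	"errors"
	"fmt"
)

// Toy stand-ins for the real block types: the block at height H+1 carries,
// in its LastCommit, the votes that commit the block at height H.
type commit struct {
	height          int64
	signedBlockHash string
}

type block struct {
	height     int64
	hash       string
	lastCommit commit
}

// verifyWithNextBlock checks block `first` (height H) using the LastCommit
// carried by block `second` (height H+1). The hash comparison stands in for
// real signature verification against the validator set.
func verifyWithNextBlock(first, second block) error {
	if second.height != first.height+1 {
		return errors.New("second block is not the direct successor of the first")
	}
	if second.lastCommit.height != first.height || second.lastCommit.signedBlockHash != first.hash {
		return errors.New("LastCommit of the second block does not commit the first block")
	}
	return nil
}

func main() {
	first := block{height: 10, hash: "AB12"}
	second := block{height: 11, hash: "CD34", lastCommit: commit{height: 10, signedBlockHash: "AB12"}}
	fmt.Println(verifyWithNextBlock(first, second)) // <nil>: second's LastCommit commits first
}
```

On failure, the controller redoes all requests assigned to the peer that supplied the first block, removes that peer, and stops it for error, as in the pseudocode above.
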
## Channels

Defines `maxMsgSize` for the maximum size of incoming messages, and
`SendQueueCapacity` and `RecvBufferCapacity` for the maximum sending and
receiving buffers respectively. These are supposed to prevent amplification
attacks by setting an upper limit on how much data we can receive from and
send to a peer.

Sending incorrectly encoded data will result in stopping the peer.
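
As an illustration of how these limits fit together, here is a small sketch with a locally defined descriptor type; the field names mirror the per-channel limits mentioned above, while the numeric values are invented for the example and are not the reactor's defaults.

```go
package main

import "fmt"

// channelLimits is a local illustration of per-channel limits; it is not
// the actual p2p descriptor type used by the reactor.
type channelLimits struct {
	SendQueueCapacity   int // max number of messages queued for sending to a peer
	RecvBufferCapacity  int // max bytes buffered from a peer
	RecvMessageCapacity int // max size of a single incoming message (maxMsgSize)
}

func main() {
	// Illustrative values only: bounding both directions prevents a single
	// peer from making us buffer or send arbitrarily large amounts of data.
	limits := channelLimits{
		SendQueueCapacity:   1000,
		RecvBufferCapacity:  1024 * 1024,      // 1 MiB
		RecvMessageCapacity: 10 * 1024 * 1024, // 10 MiB
	}
	fmt.Printf("%+v\n", limits)
}
```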