# Blockchain Reactor

The Blockchain Reactor's high-level responsibility is to enable peers who are
far behind the current state of consensus to quickly catch up by downloading
many blocks in parallel, verifying their commits, and executing them against the
ABCI application.

Tendermint full nodes run the Blockchain Reactor as a service to provide blocks
to new nodes. New nodes run the Blockchain Reactor in "fast_sync" mode,
where they actively request more blocks until they are synced up.
Once caught up, the node disables "fast_sync" mode and switches to
(turns on) the Consensus Reactor.

## Message Types

```go
const (
    msgTypeBlockRequest    = byte(0x10)
    msgTypeBlockResponse   = byte(0x11)
    msgTypeNoBlockResponse = byte(0x12)
    msgTypeStatusResponse  = byte(0x20)
    msgTypeStatusRequest   = byte(0x21)
)
```

```go
type bcBlockRequestMessage struct {
    Height int64
}

type bcNoBlockResponseMessage struct {
    Height int64
}

type bcBlockResponseMessage struct {
    Block Block
}

type bcStatusRequestMessage struct {
    Height int64
}

type bcStatusResponseMessage struct {
    Height int64
}
```
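
To make the role of the type bytes concrete, here is a minimal sketch that frames the four height-only messages as one type byte followed by the height. The signed-varint framing and the `encodeHeightMsg`/`decodeHeightMsg` helpers are hypothetical; this section does not describe the reactor's actual wire encoding, and the sketch does not reproduce it.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Type bytes copied from the constants above. The block response is omitted
// because it carries a full Block rather than just a height.
const (
	msgTypeBlockRequest    = byte(0x10)
	msgTypeNoBlockResponse = byte(0x12)
	msgTypeStatusResponse  = byte(0x20)
	msgTypeStatusRequest   = byte(0x21)
)

// encodeHeightMsg uses a hypothetical framing: one type byte followed by the
// height as a signed varint. It is not the reactor's real wire format.
func encodeHeightMsg(msgType byte, height int64) []byte {
	buf := make([]byte, 1+binary.MaxVarintLen64)
	buf[0] = msgType
	n := binary.PutVarint(buf[1:], height)
	return buf[:1+n]
}

// decodeHeightMsg reverses encodeHeightMsg, rejecting unknown type bytes,
// truncated or overflowing varints, and trailing bytes.
func decodeHeightMsg(bz []byte) (byte, int64, error) {
	if len(bz) < 2 {
		return 0, 0, errors.New("message too short")
	}
	switch bz[0] {
	case msgTypeBlockRequest, msgTypeNoBlockResponse, msgTypeStatusRequest, msgTypeStatusResponse:
		// known height-only message
	default:
		return 0, 0, fmt.Errorf("unknown message type 0x%02x", bz[0])
	}
	height, n := binary.Varint(bz[1:])
	if n <= 0 {
		return 0, 0, errors.New("malformed height")
	}
	if 1+n != len(bz) {
		return 0, 0, errors.New("trailing bytes after height")
	}
	return bz[0], height, nil
}

func main() {
	bz := encodeHeightMsg(msgTypeBlockRequest, 42)
	typ, height, err := decodeHeightMsg(bz)
	fmt.Printf("type=0x%02x height=%d err=%v\n", typ, height, err) // type=0x10 height=42 err=<nil>
}
```

Rejecting unknown type bytes up front mirrors the rule stated under Channels that incorrectly encoded data leads to the peer being stopped.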

## Architecture and algorithm

The Blockchain Reactor is organised as a set of concurrent tasks:

- Receive routine of the Blockchain Reactor
- Task for creating Requesters
- Set of Requester tasks
- Controller task

![Blockchain Reactor Architecture Diagram](img/bc-reactor.png)
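
A minimal sketch of how these tasks communicate, assuming only plain Go goroutines and channels: each Requester task enqueues a `BlockRequest` on a shared channel, and the controller task drains it. The round-robin peer assignment and the `runRequesters` helper are hypothetical simplifications; in the reactor, peer selection follows `pickAvailablePeer`, and the controller sends a `bcBlockRequestMessage` for each request.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// BlockRequest mirrors the structure in this document; PeerID is simplified
// to a string (the document uses p2p.ID).
type BlockRequest struct {
	Height int64
	PeerID string
}

// runRequesters starts one goroutine per height, standing in for the
// Requester tasks, and drains their requests like the controller task.
// Peer assignment is a hypothetical round-robin over peers.
func runRequesters(startHeight int64, n int, peers []string) []BlockRequest {
	requestsCh := make(chan BlockRequest)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(h int64, peer string) {
			defer wg.Done()
			requestsCh <- BlockRequest{Height: h, PeerID: peer}
		}(startHeight+int64(i), peers[i%len(peers)])
	}
	// Close the channel once every requester has enqueued its request.
	go func() {
		wg.Wait()
		close(requestsCh)
	}()

	// Controller loop: this is where bcBlockRequestMessage would be sent.
	var sent []BlockRequest
	for req := range requestsCh {
		sent = append(sent, req)
	}
	// Goroutines run in arbitrary order, so sort for a deterministic result.
	sort.Slice(sent, func(i, j int) bool { return sent[i].Height < sent[j].Height })
	return sent
}

func main() {
	for _, r := range runRequesters(1, 4, []string{"peerA", "peerB"}) {
		fmt.Printf("request height %d from %s\n", r.Height, r.PeerID)
	}
}
```

Channels keep each Requester independent of the network layer: a Requester only decides which height to ask for and from whom, while the controller owns all outgoing sends.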

### Data structures

These are the core data structures needed to implement the Blockchain Reactor logic.

The Requester data structure tracks the assignment of the request for the `block` at position `height` to the
peer whose ID is `peerID`.

```go
type Requester struct {
    mtx         Mutex
    block       Block
    height      int64
    peerID      p2p.ID
    redoChannel chan struct{}
}
```

Pool is the core data structure. It stores the height of the next block to be executed (`height`), the assignment of requests to peers (`requesters`),
the current height and number of pending requests for each peer (`peers`), the maximum peer height, etc.

```go
type Pool struct {
    mtx             Mutex
    requesters      map[int64]*Requester
    height          int64
    peers           map[p2p.ID]*Peer
    maxPeerHeight   int64
    numPending      int32
    store           BlockStore
    requestsChannel chan<- BlockRequest
    errorsChannel   chan<- peerError
}
```

The Peer data structure stores, for each peer, its current `height`, the number of pending requests sent to
the peer (`numPending`), etc.

```go
type Peer struct {
    id         p2p.ID
    height     int64
    numPending int32
    timeout    *time.Timer
    didTimeout bool
}
```

BlockRequest is an internal data structure that denotes the current mapping of a request for the block at some `height` to
a peer (`PeerID`).

```go
type BlockRequest struct {
    Height int64
    PeerID p2p.ID
}
```

### Receive routine of Blockchain Reactor

It is executed upon message reception on the BlockchainChannel inside the p2p receive routine. There is a separate p2p
receive routine (and therefore a separate receive routine of the Blockchain Reactor) for each peer. Note that
"try to send" does not block: it returns immediately if the outgoing buffer is full.

```go
handleMsg(pool, m):
    upon receiving bcBlockRequestMessage m from peer p:
        block = load block for height m.Height from pool.store
        if block != nil then
            try to send bcBlockResponseMessage(block) to p
        else
            try to send bcNoBlockResponseMessage(m.Height) to p

    upon receiving bcBlockResponseMessage m from peer p:
        pool.mtx.Lock()
        requester = pool.requesters[m.Block.Height]
        if requester == nil then
            error("peer sent us a block we didn't expect")
            pool.mtx.Unlock()
            return
        if requester.block == nil and requester.peerID == p then
            requester.block = m.Block
            pool.numPending -= 1 // atomic decrement
            peer = pool.peers[p]
            if peer != nil then
                peer.numPending--
                if peer.numPending == 0 then
                    peer.timeout.Stop()
                    // NOTE: we don't send Quit signal to the corresponding requester task!
                else
                    trigger peer timeout to expire after peerTimeout
        pool.mtx.Unlock()

    upon receiving bcStatusRequestMessage m from peer p:
        try to send bcStatusResponseMessage(pool.store.Height) to p

    upon receiving bcStatusResponseMessage m from peer p:
        pool.mtx.Lock()
        peer = pool.peers[p]
        if peer != nil then
            peer.height = m.Height
        else
            peer = create new Peer data structure with id = p and height = m.Height
            pool.peers[p] = peer
        if m.Height > pool.maxPeerHeight then
            pool.maxPeerHeight = m.Height
        pool.mtx.Unlock()

onTimeout(p):
    send error message to pool error channel
    peer = pool.peers[p]
    peer.didTimeout = true
```
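
The two state-changing branches of `handleMsg` can be sketched in Go as follows, with `Block`, `Peer`, and `Requester` reduced to the fields the handlers touch and peer IDs simplified to strings. Unlike the pseudocode, this sketch returns `false` for any block it does not accept (unknown height, already filled, or sent by a peer other than the assigned one) instead of logging an error, and it omits the peer timer handling.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins: Block carries only its height, peer IDs are strings.
type Block struct{ Height int64 }

type Requester struct {
	block  *Block
	height int64
	peerID string
}

type Peer struct {
	id         string
	height     int64
	numPending int32
}

type Pool struct {
	mtx           sync.Mutex
	requesters    map[int64]*Requester
	peers         map[string]*Peer
	maxPeerHeight int64
	numPending    int32
}

// handleBlockResponse accepts a block only if a requester exists for its
// height, has no block yet, and was assigned to the sending peer.
func (pool *Pool) handleBlockResponse(from string, block *Block) bool {
	pool.mtx.Lock()
	defer pool.mtx.Unlock()
	r := pool.requesters[block.Height]
	if r == nil || r.block != nil || r.peerID != from {
		return false
	}
	r.block = block
	pool.numPending--
	if p := pool.peers[from]; p != nil {
		p.numPending--
		// Timer handling (stop at zero, otherwise re-arm) is omitted here.
	}
	return true
}

// handleStatusResponse records the peer's reported height, creating the
// peer entry if needed, and tracks the maximum height seen so far.
func (pool *Pool) handleStatusResponse(from string, height int64) {
	pool.mtx.Lock()
	defer pool.mtx.Unlock()
	if p := pool.peers[from]; p != nil {
		p.height = height
	} else {
		pool.peers[from] = &Peer{id: from, height: height}
	}
	if height > pool.maxPeerHeight {
		pool.maxPeerHeight = height
	}
}

func main() {
	pool := &Pool{requesters: map[int64]*Requester{}, peers: map[string]*Peer{}}
	pool.handleStatusResponse("peerA", 100)
	pool.requesters[1] = &Requester{height: 1, peerID: "peerA"}
	pool.numPending = 1
	pool.peers["peerA"].numPending = 1
	fmt.Println(pool.handleBlockResponse("peerB", &Block{Height: 1})) // false: wrong peer
	fmt.Println(pool.handleBlockResponse("peerA", &Block{Height: 1})) // true
	fmt.Println(pool.maxPeerHeight, pool.numPending)                  // 100 0
}
```

Checking `requester.peerID` before storing the block prevents a peer from filling a slot it was never asked to fill.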

### Requester tasks

A Requester task is responsible for fetching a single block at position `height`.

```go
fetchBlock(height, pool):
    while true do
        peerID = nil
        block = nil
        peer = pickAvailablePeer(height)
        peerID = peer.id
        enqueue BlockRequest(height, peerID) to pool.requestsChannel
        redo = false
        while !redo do
            select {
                upon receiving Quit message do
                    return
                upon receiving message on redoChannel do
                    pool.mtx.Lock()
                    pool.numPending++
                    redo = true
                    pool.mtx.Unlock()
            }

pickAvailablePeer(height):
    selectedPeer = nil
    while selectedPeer == nil do
        pool.mtx.Lock()
        for each peer in pool.peers do
            if !peer.didTimeout and peer.numPending < maxPendingRequestsPerPeer and peer.height >= height then
                peer.numPending++
                selectedPeer = peer
                break
        pool.mtx.Unlock()
        if selectedPeer == nil then
            sleep requestIntervalMS
    return selectedPeer
```
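
A Go sketch of `pickAvailablePeer`, split into a single locked pass (`tryPickPeer`) and a retry loop. The values of `maxPendingRequestsPerPeer` and the retry interval are assumptions, and so is the `quit` channel: the pseudocode loops without checking for Quit, but adding the check lets a Requester stop while it is still waiting for a peer. Peers are kept in a slice only so the example iterates in a predictable order.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Assumed values for illustration; the document does not fix them.
const (
	maxPendingRequestsPerPeer = 2
	requestInterval           = 2 * time.Millisecond
)

type Peer struct {
	id         string
	height     int64
	numPending int32
	didTimeout bool
}

type Pool struct {
	mtx   sync.Mutex
	peers []*Peer // a slice keeps iteration order deterministic for the example
}

// tryPickPeer performs one pass: it returns the first peer that has not
// timed out, has spare request capacity, and reports a height of at least
// height, reserving one request slot on it. It returns nil if none qualifies.
func (pool *Pool) tryPickPeer(height int64) *Peer {
	pool.mtx.Lock()
	defer pool.mtx.Unlock()
	for _, p := range pool.peers {
		if !p.didTimeout && p.numPending < maxPendingRequestsPerPeer && p.height >= height {
			p.numPending++
			return p
		}
	}
	return nil
}

// pickAvailablePeer retries tryPickPeer, sleeping between attempts, until a
// peer is found or quit is closed (in which case it returns nil).
func (pool *Pool) pickAvailablePeer(height int64, quit <-chan struct{}) *Peer {
	for {
		if p := pool.tryPickPeer(height); p != nil {
			return p
		}
		select {
		case <-quit:
			return nil
		case <-time.After(requestInterval):
		}
	}
}

func main() {
	pool := &Pool{peers: []*Peer{{id: "short", height: 5}, {id: "tall", height: 50}}}
	quit := make(chan struct{})
	fmt.Println(pool.pickAvailablePeer(10, quit).id) // tall
	fmt.Println(pool.pickAvailablePeer(10, quit).id) // tall
	fmt.Println(pool.tryPickPeer(10) == nil)         // true: tall is at capacity
}
```

Incrementing `numPending` inside the same critical section as the capacity check ensures two Requesters cannot both claim a peer's last free slot.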

### Task for creating Requesters

This task is responsible for continuously creating and starting Requester tasks.

```go
createRequesters(pool):
    while true do
        if !pool.isRunning then break
        if pool.numPending < maxPendingRequests and size(pool.requesters) < maxTotalRequesters then
            pool.mtx.Lock()
            nextHeight = pool.height + size(pool.requesters)
            requester = create new requester for height nextHeight
            pool.requesters[nextHeight] = requester
            pool.numPending += 1 // atomic increment
            start requester task
            pool.mtx.Unlock()
        else
            sleep requestIntervalMS
            pool.mtx.Lock()
            for each peer in pool.peers do
                if !peer.didTimeout && peer.numPending > 0 && peer.curRate < minRecvRate then
                    send error on pool error channel
                    peer.didTimeout = true
                if peer.didTimeout then
                    for each requester in pool.requesters do
                        if requester.getPeerID() == peer.id then
                            enqueue msg on requester's redoChannel
                    delete(pool.peers, peer.id)
            pool.mtx.Unlock()
```
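
The creation condition and the timed-out-peer cleanup can be sketched as below. The limits (`maxPendingRequests`, `maxTotalRequesters`, `minRecvRate`) are assumed values, the redo channel is given a buffer of one so a repeated redo never blocks the cleanup loop, and error reporting is reduced to a comment. Go permits deleting map entries while ranging over the map, which keeps the cleanup loop simple.

```go
package main

import (
	"fmt"
	"sort"
)

// Assumed limits for illustration only.
const (
	maxPendingRequests = 4
	maxTotalRequesters = 6
	minRecvRate        = 1024.0 // bytes per second
)

type Peer struct {
	id         string
	numPending int32
	curRate    float64 // measured receive rate in bytes per second
	didTimeout bool
}

type Requester struct {
	height int64
	peerID string
	redo   chan struct{}
}

type Pool struct {
	height     int64 // height of the next block to execute
	numPending int32
	requesters map[int64]*Requester
	peers      map[string]*Peer
}

// shouldCreateRequester reports whether both limits still allow a new requester.
func (pool *Pool) shouldCreateRequester() bool {
	return pool.numPending < maxPendingRequests && len(pool.requesters) < maxTotalRequesters
}

// makeNextRequester creates the requester for the next unrequested height.
func (pool *Pool) makeNextRequester() *Requester {
	next := pool.height + int64(len(pool.requesters))
	r := &Requester{height: next, redo: make(chan struct{}, 1)}
	pool.requesters[next] = r
	pool.numPending++
	return r
}

// removeTimedOutPeers marks slow peers as timed out, asks every requester
// assigned to a timed-out peer to redo its request, and drops the peer.
// It returns the redone heights in ascending order.
func (pool *Pool) removeTimedOutPeers() []int64 {
	var redone []int64
	for id, p := range pool.peers {
		if !p.didTimeout && p.numPending > 0 && p.curRate < minRecvRate {
			p.didTimeout = true // the pseudocode also sends an error here
		}
		if p.didTimeout {
			for h, r := range pool.requesters {
				if r.peerID == id {
					select {
					case r.redo <- struct{}{}:
					default: // a redo is already queued
					}
					redone = append(redone, h)
				}
			}
			delete(pool.peers, id)
		}
	}
	sort.Slice(redone, func(i, j int) bool { return redone[i] < redone[j] })
	return redone
}

func main() {
	pool := &Pool{height: 1, requesters: map[int64]*Requester{}, peers: map[string]*Peer{}}
	for pool.shouldCreateRequester() {
		pool.makeNextRequester()
	}
	fmt.Println(len(pool.requesters), pool.numPending) // 4 4
	pool.peers["slow"] = &Peer{id: "slow", numPending: 2, curRate: 10}
	pool.requesters[1].peerID = "slow"
	pool.requesters[3].peerID = "slow"
	fmt.Println(pool.removeTimedOutPeers(), len(pool.peers)) // [1 3] 0
}
```

Requiring both limits to hold bounds the number of in-flight requests as well as the window of heights held in memory ahead of `pool.height`.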

### Main blockchain reactor controller task

```go
main(pool):
    create trySyncTicker with interval trySyncIntervalMS
    create statusUpdateTicker with interval statusUpdateIntervalSeconds
    create switchToConsensusTicker with interval switchToConsensusIntervalSeconds

    while true do
        select {
            upon receiving BlockRequest(Height, Peer) on pool.requestsChannel:
                try to send bcBlockRequestMessage(Height) to Peer

            upon receiving error(peer) on errorsChannel:
                stop peer for error

            upon receiving message on statusUpdateTickerChannel:
                broadcast bcStatusRequestMessage(pool.store.Height) // message sent in a separate routine

            upon receiving message on switchToConsensusTickerChannel:
                pool.mtx.Lock()
                receivedBlockOrTimedOut = pool.height > 0 || (time.Now() - pool.startTime) > 5 Seconds
                ourChainIsLongestAmongPeers = pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight
                haveSomePeers = size of pool.peers > 0
                pool.mtx.Unlock()
                if haveSomePeers && receivedBlockOrTimedOut && ourChainIsLongestAmongPeers then
                    switch to consensus mode

            upon receiving message on trySyncTickerChannel:
                for i = 0; i < 10; i++ do
                    pool.mtx.Lock()
                    firstBlock = pool.requesters[pool.height].block
                    secondBlock = pool.requesters[pool.height+1].block
                    pool.mtx.Unlock()
                    if firstBlock == nil or secondBlock == nil then break // both are needed to sync firstBlock
                    verify firstBlock using LastCommit from secondBlock
                    if verification failed then
                        pool.mtx.Lock()
                        peerID = pool.requesters[pool.height].peerID
                        redoRequestsForPeer(pool, peerID)
                        delete(pool.peers, peerID)
                        stop peer peerID for error
                        pool.mtx.Unlock()
                    else
                        delete(pool.requesters, pool.height)
                        save firstBlock to store
                        pool.height++
                        execute firstBlock
        }

redoRequestsForPeer(pool, peerID):
    for each requester in pool.requesters do
        if requester.getPeerID() == peerID then
            enqueue msg on redoChannel for requester
```

## Channels

The reactor defines `maxMsgSize` for the maximum size of incoming messages,
and `SendQueueCapacity` and `RecvBufferCapacity` for the maximum sizes of the sending and
receiving buffers, respectively. These are meant to prevent amplification
attacks by setting an upper limit on how much data we can receive from and send to
a peer.

Sending incorrectly encoded data will result in stopping the peer.