# ADR 043: Blockchain Reactor Riri-Org

## Changelog

* 18-06-2019: Initial draft
* 08-07-2019: Reviewed
* 29-11-2019: Implemented
## Context

The blockchain reactor is responsible for two high-level processes: sending/receiving blocks from peers and FastSync-ing blocks to catch up nodes that are far behind. The goal of [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) was to refactor these two processes by separating business logic currently wrapped up in go-channels into pure `handle*` functions. While the ADR specified what the final form of the reactor might look like, it lacked guidance on intermediary steps to get there.

The following diagram illustrates the state of the [blockchain-reorg](https://github.com/tendermint/tendermint/pull/3561) reactor, which will be referred to as `v1`.

![v1 Blockchain Reactor Architecture Diagram](https://github.com/tendermint/tendermint/blob/f9e556481654a24aeb689bdadaf5eab3ccd66829/docs/architecture/img/blockchain-reactor-v1.png)

While `v1` of the blockchain reactor has shown significant improvements in terms of simplifying the concurrency model, the current PR has run into a few roadblocks:

* The current PR is large and difficult to review.
* Block gossiping and fast sync processes are highly coupled to the shared `Pool` data structure.
* Peer communication is spread over multiple components, creating a complex dependency graph which must be mocked out during testing.
* Timeouts modeled as stateful tickers introduce non-determinism in tests.

This ADR is meant to specify the missing components and control necessary to achieve [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md).
## Decision

Partition the responsibilities of the blockchain reactor into a set of components which communicate exclusively with events. Events will contain timestamps, allowing each component to track time as internal state. The internal state will be mutated by a set of `handle*` functions which will produce event(s). The integration between components will happen in the reactor, and reactor tests will then become integration tests between components. This design will be known as `v2`.

![v2 Blockchain Reactor Architecture Diagram](https://github.com/tendermint/tendermint/blob/f9e556481654a24aeb689bdadaf5eab3ccd66829/docs/architecture/img/blockchain-reactor-v2.png)
### Reactor changes in detail

The reactor will include a demultiplexing routine which will send each message to each sub-routine for independent processing. Each sub-routine will then select the messages it's interested in and call the specific handle function specified in [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md). The demuxRoutine acts as a "pacemaker", setting the time in which events are expected to be handled.
```go
func demuxRoutine(msgs, schedulerMsgs, processorMsgs, ioMsgs chan Message) {
	timer := time.NewTicker(interval)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			now := evTimeCheck{time.Now()}
			schedulerMsgs <- now
			processorMsgs <- now
			ioMsgs <- now
		case msg := <-msgs:
			msg.time = time.Now()
			// These channels should produce backpressure before
			// being full to avoid starving each other
			schedulerMsgs <- msg
			processorMsgs <- msg
			ioMsgs <- msg
			if msg == stop {
				// return, because a bare break would only leave the select
				return
			}
		}
	}
}

func processRoutine(input chan Message, output chan Message) {
	processor := NewProcessor(..)
	for {
		msg := <-input
		switch msg := msg.(type) {
		case bcBlockRequestMessage:
			output <- processor.handleBlockRequest(msg)
		...
		case stop:
			processor.stop()
			return
		}
	}
}

func scheduleRoutine(input chan Message, output chan Message) {
	scheduler := NewScheduler(...)
	for {
		msg := <-input
		switch msg := msg.(type) {
		case bcBlockResponseMessage:
			output <- scheduler.handleBlockResponse(msg)
		...
		case stop:
			scheduler.stop()
			return
		}
	}
}
```
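
The fan-out and shutdown behaviour described above can be tried out in isolation. The following is a small self-contained sketch, not the reactor's code: `Message`, `stopMsg`, `demux`, `worker` and `runDemo` are hypothetical names, the ticker is left out, and the workers merely count what they receive.

```go
package main

// Message is a hypothetical event type used only in this sketch.
type Message struct {
	Kind string
}

// stopMsg marks shutdown for every routine.
var stopMsg = Message{Kind: "stop"}

// demux forwards every message from in to each output, in order, and
// returns once it has forwarded the stop message.
func demux(in <-chan Message, outs ...chan<- Message) {
	for msg := range in {
		for _, out := range outs {
			out <- msg
		}
		if msg == stopMsg {
			return
		}
	}
}

// worker counts the messages it sees until stop, then reports the count.
func worker(in <-chan Message, done chan<- int) {
	n := 0
	for msg := range in {
		if msg == stopMsg {
			break
		}
		n++
	}
	done <- n
}

// runDemo wires one demultiplexer to two workers, feeds it the given
// message kinds followed by stop, and returns how many messages each saw.
func runDemo(kinds []string) (int, int) {
	in := make(chan Message)
	a, b := make(chan Message, 1), make(chan Message, 1)
	doneA, doneB := make(chan int), make(chan int)
	go demux(in, a, b)
	go worker(a, doneA)
	go worker(b, doneB)
	for _, k := range kinds {
		in <- Message{Kind: k}
	}
	in <- stopMsg
	return <-doneA, <-doneB
}
```

Calling `runDemo([]string{"status", "block"})` delivers both messages to both workers. Note that a slow worker blocks the demultiplexer once its buffer is full, which is the backpressure the comment in the routine above refers to.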
## Lifecycle management

A set of routines for individual processes allows processes to run in parallel with clear lifecycle management. The `Start`, `Stop`, and `AddPeer` hooks currently present in the reactor will delegate to the sub-routines, allowing them to manage internal state independently without further coupling to the reactor.
```go
func (r *BlockchainReactor) Start() {
	r.msgs = make(chan Message, maxInFlight)
	schedulerMsgs := make(chan Message)
	processorMsgs := make(chan Message)
	ioMsgs := make(chan Message)

	go processRoutine(processorMsgs, r.msgs)
	go scheduleRoutine(schedulerMsgs, r.msgs)
	go r.ioRoutine(ioMsgs, r.msgs)
	...
}

func (r *BlockchainReactor) Receive(...) {
	...
	r.msgs <- msg
	...
}

func (r *BlockchainReactor) Stop() {
	...
	r.msgs <- stop
	...
}

...
func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
	...
	r.msgs <- bcAddPeerEv{peer.ID}
	...
}
```
## IO handling

An IO handling routine within the reactor will isolate peer communication. Messages going through the ioRoutine will usually be one-way, using `p2p` APIs. In the case in which a `p2p` API such as `trySend` returns errors, the ioRoutine can funnel those messages back to the demuxRoutine for distribution to the other routines. For instance, errors from the ioRoutine can be consumed by the scheduler to inform better peer selection implementations.
```go
func (r *BlockchainReactor) ioRoutine(ioMsgs chan Message, outMsgs chan Message) {
	...
	for {
		msg := <-ioMsgs
		switch msg := msg.(type) {
		case scBlockRequestMessage:
			queued := r.sendBlockRequestToPeer(...)
			if queued {
				outMsgs <- ioSendQueued{...}
			}
		case scStatusRequestMessage:
			r.sendStatusRequestToPeer(...)
		case bcPeerError:
			r.Switch.StopPeerForError(msg.src)
			...
		...
		case bcFinished:
			return
		}
	}
}
```
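
One way to picture the feedback path from the ioRoutine is a single step that attempts a send and turns a failure into an event for the other routines. The sketch below assumes a hypothetical `sender` interface in place of the real `p2p` API, and the `blockRequest`, `sendFailed`, `ioStep` and `flakySender` names are likewise made up for illustration.

```go
package main

// sender is a hypothetical stand-in for the p2p send API; it reports
// whether the message was queued for delivery.
type sender interface {
	TrySend(peerID string, height int64) bool
}

// blockRequest is an illustrative outgoing request.
type blockRequest struct {
	peerID string
	height int64
}

// sendFailed is an illustrative feedback event for the scheduler.
type sendFailed struct {
	peerID string
	height int64
}

// ioStep performs one send and returns a feedback event on failure,
// or nil when the request was queued.
func ioStep(s sender, req blockRequest) *sendFailed {
	if s.TrySend(req.peerID, req.height) {
		return nil
	}
	return &sendFailed{peerID: req.peerID, height: req.height}
}

// flakySender is a test double that refuses sends to peers listed in down.
type flakySender struct {
	down map[string]bool
}

func (f flakySender) TrySend(peerID string, height int64) bool {
	return !f.down[peerID]
}
```

Keeping the network call behind a narrow interface means the scheduler and processor can be tested against a double such as `flakySender` without a running switch.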
### Processor Internals

The processor is responsible for ordering, verifying and executing blocks. The Processor will maintain an internal cursor `height` referring to the last processed block. As a set of blocks arrives unordered, the Processor will check if it has the block at `height+1` necessary to process the next block. The Processor also maintains the map `blockPeers` from height to peer, to keep track of which peer provided the block at `height`. `blockPeers` can be used in `handleRemovePeer(...)` to reschedule all unprocessed blocks provided by a peer who has errored.
```go
type Processor struct {
	height     int64 // the height cursor
	state      ...
	blocks     map[height]*Block // keep a set of blocks in memory until they are processed
	blockPeers map[height]PeerID // keep track of which heights came from which peerID
	lastTouch  timestamp
}

func (proc *Processor) handleBlockResponse(peerID, block) {
	if block.height <= height {
		// Block at or below the cursor: nothing to store
	} else if blocks[block.height] != nil {
		return errDuplicateBlock{}
	} else {
		blocks[block.height] = block
		blockPeers[block.height] = peerID
	}

	if blocks[height] != nil && blocks[height+1] != nil {
		... = state.Validators.VerifyCommit(...)
		... = store.SaveBlock(...)
		state, err = blockExec.ApplyBlock(...)
		...
		if err == nil {
			delete(blocks, height)
			height++
			lastTouch = msg.time
			return pcBlockProcessed{height-1}
		} else {
			... // Delete all unprocessed blocks from the peer
			return pcBlockProcessError{peerID, height}
		}
	}
}

func (proc *Processor) handleRemovePeer(peerID) {
	events = []
	// Delete all unprocessed blocks from peerID
	for h, id := range blockPeers {
		if h >= height && id == peerID {
			events = append(events, pcBlockReschedule{h})
			delete(blocks, h)
		}
	}
	return events
}

func handleTimeCheckEv(time) {
	if time - lastTouch > timeout {
		// Timeout the processor
		...
	}
}
```
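
The ordering behaviour of the processor can be exercised without any verification or execution logic. The following sketch is a simplified, runnable stand-in under stated assumptions: the cursor is taken to be the next height to process, a block is consumed only once its successor is also buffered (mirroring the `blocks[height] && blocks[height+1]` check above), and `block`, `processor`, `add` and `removePeer` are hypothetical names.

```go
package main

import "sort"

// block is a stand-in for a real block; only the height matters here.
type block struct {
	height int64
}

// processor buffers out-of-order blocks and consumes them in height order.
type processor struct {
	height     int64            // assumed here: the next height to process
	blocks     map[int64]block  // blocks received but not yet processed
	blockPeers map[int64]string // which peer supplied each buffered block
}

func newProcessor(start int64) *processor {
	return &processor{
		height:     start,
		blocks:     map[int64]block{},
		blockPeers: map[int64]string{},
	}
}

func (p *processor) has(h int64) bool {
	_, ok := p.blocks[h]
	return ok
}

// add buffers a block and returns the heights processed as a result.
func (p *processor) add(peerID string, b block) []int64 {
	if b.height < p.height || p.has(b.height) {
		return nil // already processed, or a duplicate delivery
	}
	p.blocks[b.height] = b
	p.blockPeers[b.height] = peerID

	var done []int64
	for p.has(p.height) && p.has(p.height+1) {
		// Real code would verify, save and apply the block here.
		delete(p.blocks, p.height)
		delete(p.blockPeers, p.height)
		done = append(done, p.height)
		p.height++
	}
	return done
}

// removePeer drops all buffered blocks supplied by peerID and returns
// their heights, sorted, so that they can be rescheduled.
func (p *processor) removePeer(peerID string) []int64 {
	var resched []int64
	for h, id := range p.blockPeers {
		if id == peerID {
			resched = append(resched, h)
			delete(p.blocks, h)
			delete(p.blockPeers, h)
		}
	}
	sort.Slice(resched, func(i, j int) bool { return resched[i] < resched[j] })
	return resched
}
```

For example, delivering heights 3, 2 and then 1 to a processor started at height 1 processes nothing until height 1 arrives, at which point heights 1 and 2 are consumed and height 3 stays buffered until height 4 shows up.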
## Schedule

The Schedule maintains the internal state used for scheduling blockRequestMessages based on some scheduling algorithm. The schedule needs to maintain state on:

* The state `blockState` of every block seen up to the height of maxHeight
* The set of peers and their peer state `peerState`
* Which peers have which blocks
* Which blocks have been requested from which peers
```go
type blockState int

const (
	blockStateNew blockState = iota
	blockStatePending
	blockStateReceived
	blockStateProcessed
)

type schedule struct {
	// the blockState of each block, indexed by height
	blockStates map[height]blockState

	// a map of which blocks are available from which peers
	blockPeers map[height]map[p2p.ID]scPeer

	// a map of peerID to schedule specific peer struct `scPeer`
	peers map[p2p.ID]scPeer

	// a map of heights to the peer we are waiting for a response from
	pending map[height]scPeer

	targetPending  int // the number of blocks we want in blockStatePending
	targetReceived int // the number of blocks we want in blockStateReceived

	peerTimeout  int
	peerMinSpeed int
}

func (sc *schedule) numBlockInState(state blockState) int {
	num := 0
	for i := sc.minHeight(); i <= sc.maxHeight(); i++ {
		if sc.blockStates[i] == state {
			num++
		}
	}
	return num
}

func (sc *schedule) popSchedule(maxRequest int) []scBlockRequestMessage {
	// We only want to schedule requests such that we have less than sc.targetPending and sc.targetReceived
	// This ensures we don't saturate the network or flood the processor with unprocessed blocks
	todo := min(sc.targetPending-sc.numBlockInState(blockStatePending), sc.targetReceived-sc.numBlockInState(blockStateReceived))
	events := []scBlockRequestMessage{}
	for i := sc.minHeight(); i <= sc.maxHeight(); i++ {
		if todo <= 0 {
			break
		}
		if sc.blockStates[i] == blockStateNew {
			peer := sc.selectPeer(sc.blockPeers[i])
			sc.blockStates[i] = blockStatePending
			sc.pending[i] = peer
			events = append(events, scBlockRequestMessage{peerID: peer.peerID, height: i})
			todo--
		}
	}
	return events
}
...

type scPeer struct {
	peerID                p2p.ID
	numOutstandingRequest int
	lastTouched           time.Time
	monitor               flow.Monitor
}
```
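
The budget calculation in `popSchedule` is easier to follow with concrete numbers. Below is a reduced, runnable sketch under a few assumptions: peer selection simply picks the first known peer for a height, heights with no known peer are skipped, the request budget is the smaller of the remaining pending and received headroom, and the `request` type and `count` helper are hypothetical.

```go
package main

type blockState int

const (
	blockStateNew blockState = iota
	blockStatePending
	blockStateReceived
	blockStateProcessed
)

// request is an illustrative block request event.
type request struct {
	peerID string
	height int64
}

// schedule is a reduced version of the structure described above.
type schedule struct {
	minHeight, maxHeight int64
	blockStates          map[int64]blockState // missing heights count as blockStateNew
	blockPeers           map[int64][]string   // peers known to have each height
	targetPending        int
	targetReceived       int
}

// count returns how many heights in [minHeight, maxHeight] are in state.
func (sc *schedule) count(state blockState) int {
	n := 0
	for h := sc.minHeight; h <= sc.maxHeight; h++ {
		if sc.blockStates[h] == state {
			n++
		}
	}
	return n
}

// popSchedule marks new blocks as pending, lowest height first, while
// staying within the pending and received targets.
func (sc *schedule) popSchedule() []request {
	todo := sc.targetPending - sc.count(blockStatePending)
	if room := sc.targetReceived - sc.count(blockStateReceived); room < todo {
		todo = room
	}
	var reqs []request
	for h := sc.minHeight; h <= sc.maxHeight && todo > 0; h++ {
		peers := sc.blockPeers[h]
		if sc.blockStates[h] != blockStateNew || len(peers) == 0 {
			continue
		}
		sc.blockStates[h] = blockStatePending
		// Trivial peer selection, assumed for this sketch only.
		reqs = append(reqs, request{peerID: peers[0], height: h})
		todo--
	}
	return reqs
}
```

With `targetPending` set to 2, `targetReceived` set to 3 and one block already received, the first call schedules the two lowest new heights and a second call schedules nothing, because the pending target is already met.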
## Scheduler

The scheduler is configured to maintain a target `n` of in-flight messages and will use feedback from `_blockResponseMessage`, `_statusResponseMessage` and `_peerError` to produce an optimal assignment of scBlockRequestMessage at each `timeCheckEv`.
```go
func handleStatusResponse(peerID, height, time) {
	schedule.touchPeer(peerID, time)
	schedule.setPeerHeight(peerID, height)
}

func handleBlockResponseMessage(peerID, height, block, time) {
	schedule.touchPeer(peerID, time)
	schedule.markReceived(peerID, height, size(block))
}

func handleNoBlockResponseMessage(peerID, height, time) {
	schedule.touchPeer(peerID, time)
	// reschedule that block, punish peer...
	...
}

func handlePeerError(peerID) {
	// Remove the peer, reschedule the requests
	...
}

func handleTimeCheckEv(time) {
	// clean peer list
	events = []
	for _, peerID := range schedule.peersNotTouchedSince(time) {
		pending = schedule.pendingFrom(peerID)
		schedule.setPeerState(peerID, timedout)
		schedule.resetBlocks(pending)
		events = append(events, peerTimeout{peerID})
	}
	events = append(events, schedule.popSchedule()...)
	return events
}
```
## Peer

The Peer stores per-peer state based on messages received by the scheduler.
```go
type Peer struct {
	lastTouched    timestamp
	lastDownloaded timestamp
	pending        map[height]struct{}
	height         height // max height for the peer
	state {
		pending, // we know the peer but not the height
		active,  // we know the height
		timeout  // the peer has timed out
	}
}
```
## Status

This design is under active development. The implementation has been staged in the following PRs:

* [Routine](https://github.com/tendermint/tendermint/pull/3878)
* [Processor](https://github.com/tendermint/tendermint/pull/4012)
* [Scheduler](https://github.com/tendermint/tendermint/pull/4043)
* [Reactor](https://github.com/tendermint/tendermint/pull/4067)
## Consequences

### Positive

* Tests become deterministic
* Simulation becomes a-temporal: no need to wait for a wall-time timeout
* Peer selection can be independently tested/simulated
* Develop a general approach to refactoring reactors

### Negative

### Neutral

### Implementation Path

* Implement the scheduler, test the scheduler, review the scheduler
* Implement the processor, test the processor, review the processor
* Implement the demuxer, write integration tests, review integration tests
## References

* [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md): The original blockchain reactor re-org proposal
* [Blockchain re-org](https://github.com/tendermint/tendermint/pull/3561): The current blockchain reactor re-org implementation (v1)