blockchain: add v2 reactor (#4361) The work includes the reactor which ties together all the seperate routines involved in the design of the blockchain v2 refactor. This PR replaces #4067 which got far too large and messy after a failed attempt to rebase. ## Commits: * Blockchainv 2 reactor: + I cleaner copy of the work done in #4067 which fell too far behind and was a nightmare to rebase. + The work includes the reactor which ties together all the seperate routines involved in the design of the blockchain v2 refactor. * fixes after merge * reorder iIO interface methodset * change iO -> IO * panic before send nil block * rename switchToConsensus -> trySwitchToConsensus * rename tdState -> tmState * Update blockchain/v2/reactor.go Co-Authored-By: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com> * remove peer when it sends a block unsolicited * check for not ready in markReceived * fix error * fix the pcFinished event * typo fix * add documentation for processor fields * simplify time.Since * try and make the linter happy * some doc updates * fix channel diagram * Update adr-043-blockchain-riri-org.md * panic on nil switch * liting fixes * account for nil block in bBlockResponseMessage * panic on duplicate block enqueued by processor * linting * goimport reactor_test.go Co-authored-by: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com> Co-authored-by: Anca Zamfir <ancazamfir@users.noreply.github.com> Co-authored-by: Marko <marbar3778@yahoo.com> Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
5 years ago
# ADR 043: Blockchain Reactor Riri-Org

## Changelog

* 18-06-2019: Initial draft
* 08-07-2019: Reviewed
* 29-11-2019: Implemented
* 14-02-2020: Updated with the implementation details

## Context

The blockchain reactor is responsible for two high-level processes: sending/receiving blocks from peers and FastSync-ing blocks to catch up a node that is far behind. The goal of [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) was to refactor these two processes by separating the business logic currently wrapped up in go-channels into pure `handle*` functions. While the ADR specified what the final form of the reactor might look like, it lacked guidance on the intermediary steps to get there.
The following diagram illustrates the state of the [blockchain-reorg](https://github.com/tendermint/tendermint/pull/3561) reactor, which will be referred to as `v1`.

![v1 Blockchain Reactor Architecture Diagram](https://github.com/tendermint/tendermint/blob/f9e556481654a24aeb689bdadaf5eab3ccd66829/docs/architecture/img/blockchain-reactor-v1.png)

While `v1` of the blockchain reactor has shown significant improvements in terms of simplifying the concurrency model, the current PR has run into a few roadblocks.

* The current PR is large and difficult to review.
* Block gossiping and fast sync processes are highly coupled to the shared `Pool` data structure.
* Peer communication is spread over multiple components, creating a complex dependency graph which must be mocked out during testing.
* Timeouts modeled as stateful tickers introduce non-determinism in tests.

This ADR is meant to specify the missing components and control flow necessary to achieve [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md).
## Decision

Partition the responsibilities of the blockchain reactor into a set of components which communicate exclusively via events. Events will contain timestamps, allowing each component to track time as internal state. The internal state will be mutated by a set of `handle*` functions which will produce event(s). The integration between components will happen in the reactor, and reactor tests will then become integration tests between components. This design will be known as `v2`.

![v2 Blockchain Reactor Architecture Diagram](https://github.com/tendermint/tendermint/blob/584e67ac3fac220c5c3e0652e3582eca8231e814/docs/architecture/img/blockchain-reactor-v2.png)
### Fast Sync Related Communication Channels

The diagram below shows the fast sync routines and the types of channels and queues used to communicate with each other. In addition, the per-reactor channels used by the sendRoutine to send messages over the Peer MConnection are shown.

![v2 Blockchain Channels and Queues Diagram](https://github.com/tendermint/tendermint/blob/5cf570690f989646fb3b615b734da503f038891f/docs/architecture/img/blockchain-v2-channels.png)

### Reactor changes in detail

The reactor will include a demultiplexing routine which will send each message to each sub-routine for independent processing. Each sub-routine will then select the messages it is interested in and call the specific handle function specified in [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md). The demuxRoutine acts as a "pacemaker", setting the time in which events are expected to be handled.
```go
func demuxRoutine(msgs, schedulerMsgs, processorMsgs, ioMsgs chan Message) {
	timer := time.NewTicker(interval)
	for {
		select {
		case <-timer.C:
			now := evTimeCheck{time.Now()}
			schedulerMsgs <- now
			processorMsgs <- now
			ioMsgs <- now
		case msg := <-msgs:
			msg.time = time.Now()
			// These channels should produce backpressure before
			// being full to avoid starving each other
			schedulerMsgs <- msg
			processorMsgs <- msg
			ioMsgs <- msg
			if msg == stop {
				break
			}
		}
	}
}

func processRoutine(input chan Message, output chan Message) {
	processor := NewProcessor(..)
	for {
		msg := <-input
		switch msg := msg.(type) {
		case bcBlockRequestMessage:
			output <- processor.handleBlockRequest(msg)
			...
		case stop:
			processor.stop()
			break
		}
	}
}

func scheduleRoutine(input chan Message, output chan Message) {
	scheduler := NewScheduler(...)
	for {
		msg := <-input
		switch msg := msg.(type) {
		case bcBlockResponseMessage:
			output <- scheduler.handleBlockResponse(msg)
			...
		case stop:
			scheduler.stop()
			break
		}
	}
}
```
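The fan-out that the demuxRoutine performs can be exercised in a runnable miniature. The `message` type and buffered subscriber channels below are simplified stand-ins for the per-routine queues:

```go
package main

import (
	"fmt"
	"time"
)

type message struct {
	payload string
	time    time.Time
}

// demux stamps each incoming message with the current time and fans it out
// to every subscriber channel, mirroring the demuxRoutine sketched above.
// Buffered subscriber channels give some slack before backpressure kicks in.
func demux(in <-chan string, subs ...chan message) {
	for payload := range in {
		msg := message{payload: payload, time: time.Now()}
		for _, sub := range subs {
			sub <- msg
		}
	}
	for _, sub := range subs {
		close(sub)
	}
}

func main() {
	in := make(chan string)
	schedulerMsgs := make(chan message, 1)
	processorMsgs := make(chan message, 1)
	go demux(in, schedulerMsgs, processorMsgs)

	in <- "bcBlockResponse"
	close(in)

	fmt.Println((<-schedulerMsgs).payload) // prints "bcBlockResponse"
	fmt.Println((<-processorMsgs).payload) // prints "bcBlockResponse"
}
```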
## Lifecycle management

A set of routines for individual processes allows those processes to run in parallel with clear lifecycle management. The `Start`, `Stop`, and `AddPeer` hooks currently present in the reactor will delegate to the sub-routines, allowing them to manage internal state independently without further coupling to the reactor.
```go
func (r *BlockchainReactor) Start() {
	r.msgs = make(chan Message, maxInFlight)
	schedulerMsgs := make(chan Message)
	processorMsgs := make(chan Message)
	ioMsgs := make(chan Message)

	go processorRoutine(processorMsgs, r.msgs)
	go scheduleRoutine(schedulerMsgs, r.msgs)
	go ioRoutine(ioMsgs, r.msgs)
	...
}

func (r *BlockchainReactor) Receive(...) {
	...
	r.msgs <- msg
	...
}

func (r *BlockchainReactor) Stop() {
	...
	r.msgs <- stop
	...
}

func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
	...
	r.msgs <- bcAddPeerEv{peer.ID}
	...
}
```
## IO handling

An IO handling routine within the reactor will isolate peer communication. Messages going through the ioRoutine will usually be one-way, using `p2p` APIs. In cases where a `p2p` API such as `trySend` returns an error, the ioRoutine can funnel those messages back to the demuxRoutine for distribution to the other routines. For instance, errors from the ioRoutine can be consumed by the scheduler to inform better peer selection implementations.
```go
func (r *BlockchainReactor) ioRoutine(ioMsgs chan Message, outMsgs chan Message) {
	...
	for {
		msg := <-ioMsgs
		switch msg := msg.(type) {
		case scBlockRequestMessage:
			queued := r.sendBlockRequestToPeer(...)
			if queued {
				outMsgs <- ioSendQueued{...}
			}
		case scStatusRequestMessage:
			r.sendStatusRequestToPeer(...)
		case bcPeerError:
			r.Switch.StopPeerForError(msg.src)
			...
		case bcFinished:
			break
		}
	}
}
```
### Processor Internals

The processor is responsible for ordering, verifying and executing blocks. The Processor will maintain an internal cursor `height` referring to the last processed block. As a set of blocks arrives unordered, the Processor will check if it has the block at `height+1` necessary to process the next block. The processor also maintains the map `blockPeers` of heights to peers, to keep track of which peer provided the block at `height`. `blockPeers` can be used in `handleRemovePeer(...)` to reschedule all unprocessed blocks provided by a peer who has errored.
```go
type Processor struct {
	height int64 // the height cursor
	state ...
	blocks map[height]*Block // keep a set of blocks in memory until they are processed
	blockPeers map[height]PeerID // keep track of which heights came from which peerID
	lastTouch timestamp
}

func (proc *Processor) handleBlockResponse(peerID, block) {
	if block.height <= height {
		return nil
	} else if blocks[block.height] {
		return errDuplicateBlock{}
	} else {
		blocks[block.height] = block
	}

	if blocks[height] && blocks[height+1] {
		... = state.Validators.VerifyCommit(...)
		... = store.SaveBlock(...)
		state, err = blockExec.ApplyBlock(...)
		...
		if err == nil {
			delete(blocks, height)
			height++
			lastTouch = msg.time
			return pcBlockProcessed{height-1}
		} else {
			... // Delete all unprocessed blocks from the peer
			return pcBlockProcessError{peerID, height}
		}
	}
}

func (proc *Processor) handleRemovePeer(peerID) {
	events = []
	// Delete all unprocessed blocks from peerID
	for i = height; i < len(blocks); i++ {
		if blockPeers[i] == peerID {
			events = append(events, pcBlockReschedule{i})
			delete(blocks, i)
		}
	}
	return events
}

func handleTimeCheckEv(time) {
	if time - lastTouch > timeout {
		// Timeout the processor
		...
	}
}
```
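The height-cursor bookkeeping above can be exercised in a self-contained sketch. The `processor` type, `addBlock`, and `advance` below are simplified stand-ins (commit verification and block execution are elided):

```go
package main

import "fmt"

// block is a stand-in for the real block type; only the height matters here.
type block struct{ height int64 }

// processor keeps a height cursor and an out-of-order buffer of blocks,
// mirroring the Processor sketch above.
type processor struct {
	height int64            // last processed height
	blocks map[int64]*block // unprocessed blocks keyed by height
}

// addBlock buffers a block, rejecting already-processed heights and duplicates.
func (p *processor) addBlock(b *block) error {
	if b.height <= p.height {
		return fmt.Errorf("height %d already processed", b.height)
	}
	if p.blocks[b.height] != nil {
		return fmt.Errorf("duplicate block at height %d", b.height)
	}
	p.blocks[b.height] = b
	return nil
}

// advance processes blocks while both of the next two heights are buffered,
// reflecting the lookahead the processor needs to verify a block's commit.
func (p *processor) advance() {
	for p.blocks[p.height+1] != nil && p.blocks[p.height+2] != nil {
		delete(p.blocks, p.height+1)
		p.height++
	}
}

func main() {
	p := &processor{blocks: map[int64]*block{}}
	p.addBlock(&block{height: 2}) // arrives out of order
	p.addBlock(&block{height: 1})
	p.advance()
	fmt.Println(p.height) // prints 1: block 1 processed, block 2 awaits block 3
}
```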
## Schedule

The Schedule maintains the internal state used for scheduling blockRequestMessages based on some scheduling algorithm. The schedule needs to maintain state on:

* The state `blockState` of every block seen up to the height of `maxHeight`
* The set of peers and their peer state `peerState`
* Which peers have which blocks
* Which blocks have been requested from which peers
```go
type blockState int

const (
	blockStateNew = iota
	blockStatePending
	blockStateReceived
	blockStateProcessed
)

type schedule struct {
	// a map of heights to blockState
	blockStates map[height]blockState

	// a map of which blocks are available from which peers
	blockPeers map[height]map[p2p.ID]scPeer

	// a map of peerID to schedule specific peer struct `scPeer`
	peers map[p2p.ID]scPeer

	// a map of heights to the peer we are waiting for a response from
	pending map[height]scPeer

	targetPending int // the number of blocks we want in blockStatePending
	targetReceived int // the number of blocks we want in blockStateReceived

	peerTimeout int
	peerMinSpeed int
}

func (sc *schedule) numBlockInState(state blockState) uint32 {
	num := 0
	for i := sc.minHeight(); i <= sc.maxHeight(); i++ {
		if sc.blockStates[i] == state {
			num++
		}
	}
	return num
}

func (sc *schedule) popSchedule(maxRequest int) []scBlockRequestMessage {
	// We only want to schedule requests such that we stay below sc.targetPending and sc.targetReceived.
	// This ensures we don't saturate the network or flood the processor with unprocessed blocks.
	todo := min(sc.targetPending-sc.numBlockInState(blockStatePending), sc.targetReceived-sc.numBlockInState(blockStateReceived))
	events := []scBlockRequestMessage{}
	for i := sc.minHeight(); i < sc.maxHeight(); i++ {
		if todo == 0 {
			break
		}
		if sc.blockStates[i] == blockStateNew {
			peer := sc.selectPeer(sc.blockPeers[i])
			sc.blockStates[i] = blockStatePending
			sc.pending[i] = peer
			events = append(events, scBlockRequestMessage{peerID: peer.peerID, height: i})
			todo--
		}
	}
	return events
}
...

type scPeer struct {
	peerID p2p.ID
	numOutstandingRequests int
	lastTouched time.Time
	monitor flow.Monitor
}
```
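The pending-budget logic in `popSchedule` can be stripped down to a runnable example. The names below (`popPending`) and the reduced field set are hypothetical, kept only to demonstrate the budget calculation:

```go
package main

import "fmt"

type blockState int

const (
	blockStateNew blockState = iota
	blockStatePending
	blockStateReceived
	blockStateProcessed
)

// schedule tracks per-height block state; a stripped-down version of the
// struct above, with only what the budget logic needs.
type schedule struct {
	blockStates   map[int64]blockState
	targetPending int
}

func (sc *schedule) numBlocksInState(state blockState) int {
	num := 0
	for _, s := range sc.blockStates {
		if s == state {
			num++
		}
	}
	return num
}

// popPending marks up to the remaining pending budget of New blocks as
// Pending and returns the heights chosen, lowest first.
func (sc *schedule) popPending() []int64 {
	todo := sc.targetPending - sc.numBlocksInState(blockStatePending)
	var heights []int64
	for h := int64(1); todo > 0 && h <= int64(len(sc.blockStates)); h++ {
		if sc.blockStates[h] == blockStateNew {
			sc.blockStates[h] = blockStatePending
			heights = append(heights, h)
			todo--
		}
	}
	return heights
}

func main() {
	sc := &schedule{
		blockStates: map[int64]blockState{
			1: blockStateNew, 2: blockStatePending, 3: blockStateNew,
		},
		targetPending: 2,
	}
	fmt.Println(sc.popPending()) // prints "[1]": only one slot left in the budget
}
```

With one block already pending and a target of two, only a single new request is scheduled, which is exactly the backpressure property the comment in `popSchedule` describes.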
## Scheduler

The scheduler is configured to maintain a target `n` of in-flight messages and will use feedback from `_blockResponseMessage`, `_statusResponseMessage` and `_peerError` to produce an optimal assignment of scBlockRequestMessage at each `timeCheckEv`.
```go
func handleStatusResponse(peerID, height, time) {
	schedule.touchPeer(peerID, time)
	schedule.setPeerHeight(peerID, height)
}

func handleBlockResponseMessage(peerID, height, block, time) {
	schedule.touchPeer(peerID, time)
	schedule.markReceived(peerID, height, size(block))
}

func handleNoBlockResponseMessage(peerID, height, time) {
	schedule.touchPeer(peerID, time)
	// reschedule that block, punish peer...
	...
}

func handlePeerError(peerID) {
	// Remove the peer, reschedule the requests
	...
}

func handleTimeCheckEv(time) {
	// clean peer list
	events = []
	for peerID := range schedule.peersNotTouchedSince(time) {
		pending = schedule.pendingFrom(peerID)
		schedule.setPeerState(peerID, timedout)
		schedule.resetBlocks(pending)
		events = append(events, peerTimeout{peerID})
	}

	events = append(events, schedule.popSchedule()...)
	return events
}
```
## Peer

The Peer struct stores per-peer state based on messages received by the scheduler.
```go
type Peer struct {
	lastTouched timestamp
	lastDownloaded timestamp
	pending map[height]struct{}
	height height // max height for the peer
	state {
		pending, // we know the peer but not the height
		active, // we know the height
		timeout // the peer has timed out
	}
}
```
## Status

This design is under active development. The implementation has been staged in the following PRs:

* [Routine](https://github.com/tendermint/tendermint/pull/3878)
* [Processor](https://github.com/tendermint/tendermint/pull/4012)
* [Scheduler](https://github.com/tendermint/tendermint/pull/4043)
* [Reactor](https://github.com/tendermint/tendermint/pull/4067)
## Consequences

### Positive

* Tests become deterministic
* Simulation becomes atemporal: no need to wait for a wall-time timeout
* Peer selection can be independently tested/simulated
* Develop a general approach to refactoring reactors
### Negative

### Neutral

### Implementation Path

* Implement the scheduler, test the scheduler, review the scheduler
* Implement the processor, test the processor, review the processor
* Implement the demuxer, write integration tests, review integration tests
## References

* [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md): The original blockchain reactor re-org proposal
* [Blockchain re-org](https://github.com/tendermint/tendermint/pull/3561): The current blockchain reactor re-org implementation (v1)