package v2

import (
	"fmt"
	"sync/atomic"

	"github.com/Workiva/go-datastructures/queue"

	"github.com/tendermint/tendermint/libs/log"
)

// handleFunc maps an input event to an output event, or fails with an error.
type handleFunc = func(event Event) (Event, error)

// Routine is a structure that models a finite state machine as a serialized
// stream of events processed by a handle function. This Routine structure
// handles the concurrency and messaging guarantees. Events sent via `send`
// are handled by the `handle` function to produce an iterator `next()`.
// Calling `stop()` on a routine will conclude processing of all sent events
// and produce a `final()` event representing the terminal state.
type Routine struct {
	name    string
	handle  handleFunc
	queue   *queue.PriorityQueue
	out     chan Event
	fin     chan error
	rdy     chan struct{}
	running *uint32
	logger  log.Logger
	metrics *Metrics
}

func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine {
	return &Routine{
		name:    name,
		handle:  handleFunc,
		queue:   queue.NewPriorityQueue(bufferSize, true),
		out:     make(chan Event, bufferSize),
		rdy:     make(chan struct{}, 1),
		fin:     make(chan error, 1),
		running: new(uint32),
		logger:  log.NewNopLogger(),
		metrics: NopMetrics(),
	}
}

func (rt *Routine) setLogger(logger log.Logger) {
	rt.logger = logger
}

// nolint:unused
func (rt *Routine) setMetrics(metrics *Metrics) {
	rt.metrics = metrics
}

// start runs the event loop on the calling goroutine. It panics if the
// routine is already running and returns once the routine has terminated.
func (rt *Routine) start() {
	rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
	started := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
	if !started {
		panic(fmt.Sprintf("%s is already running", rt.name))
	}
	// Signal callers waiting on ready() that events can now be sent.
	close(rt.rdy)
	defer func() {
		stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
		if !stopped {
			panic(fmt.Sprintf("%s failed to stop", rt.name))
		}
	}()

	for {
		events, err := rt.queue.Get(1)
		if err == queue.ErrDisposed {
			// The queue was disposed by stop(): terminate without an error.
			rt.terminate(nil)
			return
		} else if err != nil {
			rt.terminate(err)
			return
		}
		oEvent, err := rt.handle(events[0].(Event))
		rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
		if err != nil {
			rt.terminate(err)
			return
		}
		rt.metrics.EventsOut.With("routine", rt.name).Add(1)
		rt.logger.Debug(fmt.Sprintf("%s: produced %T %+v\n", rt.name, oEvent, oEvent))

		rt.out <- oEvent
	}
}

// send enqueues an event and reports whether it was accepted. It returns
// false if the routine is not running or the queue rejects the event.
// XXX: look into returning OpError in the net package
func (rt *Routine) send(event Event) bool {
	rt.logger.Debug(fmt.Sprintf("%s: received %T %+v", rt.name, event, event))
	if !rt.isRunning() {
		return false
	}
	err := rt.queue.Put(event)
	if err != nil {
		rt.metrics.EventsShed.With("routine", rt.name).Add(1)
		rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name))
		return false
	}

	rt.metrics.EventsSent.With("routine", rt.name).Add(1)
	return true
}

func (rt *Routine) isRunning() bool {
	return atomic.LoadUint32(rt.running) == 1
}

// next returns the channel on which handled events are published.
func (rt *Routine) next() chan Event {
	return rt.out
}

// ready returns a channel that is closed once the routine has started.
func (rt *Routine) ready() chan struct{} {
	return rt.rdy
}

func (rt *Routine) stop() {
	if !rt.isRunning() { // XXX: this should check rt.queue.Disposed()
		return
	}

	rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name))
	rt.queue.Dispose() // this should block until all queue items are free?
}

// final returns the channel that receives the reason for termination
// (nil when the routine was stopped cleanly).
func (rt *Routine) final() chan error {
	return rt.fin
}

// XXX: Maybe get rid of this
func (rt *Routine) terminate(reason error) {
	// We don't close the rt.out channel here, to avoid spinning on the closed channel
	// in the event loop.
	rt.fin <- reason
}
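
The lifecycle described in the `Routine` doc comment (wait on `ready()`, `send` events, read results from `next()`, then `stop()` and read `final()`) may be easier to follow in a standalone sketch. The program below is not the package's code: `miniRoutine`, `newMiniRoutine` and `runDemo` are hypothetical names, events are plain `int`s, and a buffered channel guarded by a mutex stands in for the priority queue, so queue disposal is modelled by closing that channel. Logging and metrics are left out.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniRoutine is a hypothetical, stripped-down analogue of Routine.
// Assumption: a buffered channel replaces the priority queue.
type miniRoutine struct {
	handle  func(int) (int, error)
	mu      sync.Mutex // guards closed and the closing of in
	closed  bool
	in      chan int
	out     chan int
	fin     chan error
	rdy     chan struct{}
	running uint32
}

func newMiniRoutine(handle func(int) (int, error), size int) *miniRoutine {
	return &miniRoutine{
		handle: handle,
		in:     make(chan int, size),
		out:    make(chan int, size),
		fin:    make(chan error, 1),
		rdy:    make(chan struct{}),
	}
}

// start runs the event loop; the compare-and-swap rejects a second start.
func (rt *miniRoutine) start() {
	if !atomic.CompareAndSwapUint32(&rt.running, 0, 1) {
		panic("already running")
	}
	close(rt.rdy) // signal readiness to waiting senders
	defer atomic.StoreUint32(&rt.running, 0)
	for ev := range rt.in { // drains pending events after stop()
		res, err := rt.handle(ev)
		if err != nil {
			rt.fin <- err
			return
		}
		rt.out <- res
	}
	rt.fin <- nil // clean shutdown
}

// send enqueues without blocking and reports whether the event was accepted.
func (rt *miniRoutine) send(ev int) bool {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	if rt.closed || atomic.LoadUint32(&rt.running) != 1 {
		return false
	}
	select {
	case rt.in <- ev:
		return true
	default: // buffer full: shed the event
		return false
	}
}

// stop closes the input once; already-queued events are still handled.
func (rt *miniRoutine) stop() {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	if !rt.closed {
		rt.closed = true
		close(rt.in)
	}
}

// runDemo walks through the full lifecycle with a doubling handler.
func runDemo() ([]int, error) {
	double := func(n int) (int, error) {
		if n < 0 {
			return 0, fmt.Errorf("negative event %d", n)
		}
		return 2 * n, nil
	}
	rt := newMiniRoutine(double, 4)
	go rt.start()
	<-rt.rdy
	for _, ev := range []int{1, 2, 3} {
		if !rt.send(ev) {
			return nil, fmt.Errorf("send of %d failed", ev)
		}
	}
	rt.stop()
	results := []int{}
	for i := 0; i < 3; i++ {
		results = append(results, <-rt.out)
	}
	return results, <-rt.fin
}

func main() {
	results, err := runDemo()
	fmt.Println(results, err) // prints "[2 4 6] <nil>"
}
```

One difference worth noting: in this sketch, events that were accepted before `stop()` are always drained, because the loop ranges over the closed channel. Whether `Dispose()` on the real priority queue gives the same guarantee is exactly what the `XXX` comments in `stop()` leave open.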