You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

718 lines
18 KiB

Blockchain v2 Scheduler (#4043) * Add processor prototype * Change processor API + expose a simple `handle` function which mutates internal state * schedule event handling * rename schedule -> scheduler * fill in handle function * processor tests * fix gofmt and ohter golangci issues * scopelint var on range scope * add check for short block received * small test reorg * ci fix changes * go.mod revert * some cleanup and review comments * scheduler fixes and unit tests, also small processor changes. changed scPeerPruned to include a list of pruned peers touchPeer to check peer state and remove the blocks from blockStates if the peer removal causes the max peer height to be lower. remove the block at sc.initHeight changed peersInactiveSince, peersSlowerThan, getPeersAtHeight check peer state prunablePeers to return a sorted list of peers lastRate in markReceived() attempted to divide by 0, temp fix. fixed allBlocksProcessed conditions maxHeight() and minHeight() to return sc.initHeight if no ready peers present make selectPeer() deterministic. added handleBlockProcessError() added termination cond. (sc.allBlocksProcessed()) to handleTryPrunePeer() and others. changed pcBlockVerificationFailure to include peer of H+2 block along with the one for H+1 changed the processor to call purgePeer on block verification failure. fixed processor tests added scheduler tests. * typo and ci fixes * remove height from scBlockRequest, golangci fixes * limit on blockState map, updated tests * remove unused * separate test for maxHeight(), used for sched. 
validation * use Math.Min * fix golangci * Document the semantics of blockStates in the scheduler * better docs * distinguish between unknown and invalid blockstate * Standardize peer filtering methods * feedback * s/getPeersAtHeight/getPeersAtHeightOrAbove * small notes * Update blockchain/v2/scheduler.go Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com> * Update comments based on feedback * Add enum offset * panic on nil block in processor * remove unused max height calculation * format shorter line
5 years ago
  1. package v2
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math"
  6. "sort"
  7. "time"
  8. "github.com/tendermint/tendermint/p2p"
  9. "github.com/tendermint/tendermint/types"
  10. )
  11. // Events
  12. // XXX: The handle API would be much simpler if it return a single event, an
  13. // Event, which embeds a terminationEvent if it wants to terminate the routine.
  14. // Input events into the scheduler:
  15. // ticker event for cleaning peers
  16. type tryPrunePeer struct {
  17. priorityHigh
  18. time time.Time
  19. }
  20. // ticker event for scheduling block requests
  21. type trySchedule struct {
  22. priorityHigh
  23. time time.Time
  24. }
  25. // blockResponse message received from a peer
  26. type bcBlockResponse struct {
  27. priorityNormal
  28. time time.Time
  29. peerID p2p.ID
  30. height int64
  31. size int64
  32. block *types.Block
  33. }
  34. // statusResponse message received from a peer
  35. type bcStatusResponse struct {
  36. priorityNormal
  37. time time.Time
  38. peerID p2p.ID
  39. height int64
  40. }
  41. // new peer is connected
  42. type addNewPeer struct {
  43. priorityNormal
  44. peerID p2p.ID
  45. }
  46. // Output events issued by the scheduler:
  47. // all blocks have been processed
  48. type scFinishedEv struct {
  49. priorityNormal
  50. }
  51. // send a blockRequest message
  52. type scBlockRequest struct {
  53. priorityNormal
  54. peerID p2p.ID
  55. height int64
  56. }
  57. // a block has been received and validated by the scheduler
  58. type scBlockReceived struct {
  59. priorityNormal
  60. peerID p2p.ID
  61. block *types.Block
  62. }
  63. // scheduler detected a peer error
  64. type scPeerError struct {
  65. priorityHigh
  66. peerID p2p.ID
  67. reason error
  68. }
  69. // scheduler removed a set of peers (timed out or slow peer)
  70. type scPeersPruned struct {
  71. priorityHigh
  72. peers []p2p.ID
  73. }
  74. // XXX: make this fatal?
  75. // scheduler encountered a fatal error
  76. type scSchedulerFail struct {
  77. priorityHigh
  78. reason error
  79. }
  80. type blockState int
  81. const (
  82. blockStateUnknown blockState = iota + 1 // no known peer has this block
  83. blockStateNew // indicates that a peer has reported having this block
  84. blockStatePending // indicates that this block has been requested from a peer
  85. blockStateReceived // indicates that this block has been received by a peer
  86. blockStateProcessed // indicates that this block has been applied
  87. )
  88. func (e blockState) String() string {
  89. switch e {
  90. case blockStateUnknown:
  91. return "Unknown"
  92. case blockStateNew:
  93. return "New"
  94. case blockStatePending:
  95. return "Pending"
  96. case blockStateReceived:
  97. return "Received"
  98. case blockStateProcessed:
  99. return "Processed"
  100. default:
  101. return fmt.Sprintf("invalid blockState: %d", e)
  102. }
  103. }
  104. type peerState int
  105. const (
  106. peerStateNew = iota + 1
  107. peerStateReady
  108. peerStateRemoved
  109. )
  110. func (e peerState) String() string {
  111. switch e {
  112. case peerStateNew:
  113. return "New"
  114. case peerStateReady:
  115. return "Ready"
  116. case peerStateRemoved:
  117. return "Removed"
  118. default:
  119. panic(fmt.Sprintf("unknown peerState: %d", e))
  120. }
  121. }
  122. type scPeer struct {
  123. peerID p2p.ID
  124. // initialized as New when peer is added, updated to Ready when statusUpdate is received,
  125. // updated to Removed when peer is removed
  126. state peerState
  127. height int64 // updated when statusResponse is received
  128. lastTouched time.Time
  129. lastRate int64 // last receive rate in bytes
  130. }
  131. func (p scPeer) String() string {
  132. return fmt.Sprintf("{state %v, height %d, lastTouched %v, lastRate %d, id %v}",
  133. p.state, p.height, p.lastTouched, p.lastRate, p.peerID)
  134. }
  135. func newScPeer(peerID p2p.ID) *scPeer {
  136. return &scPeer{
  137. peerID: peerID,
  138. state: peerStateNew,
  139. height: -1,
  140. }
  141. }
  142. // The scheduler keep track of the state of each block and each peer. The
  143. // scheduler will attempt to schedule new block requests with `trySchedule`
  144. // events and remove slow peers with `tryPrune` events.
  145. type scheduler struct {
  146. initHeight int64
  147. // next block that needs to be processed. All blocks with smaller height are
  148. // in Processed state.
  149. height int64
  150. // a map of peerID to scheduler specific peer struct `scPeer` used to keep
  151. // track of peer specific state
  152. peers map[p2p.ID]*scPeer
  153. peerTimeout time.Duration
  154. minRecvRate int64 // minimum receive rate from peer otherwise prune
  155. // the maximum number of blocks that should be New, Received or Pending at any point
  156. // in time. This is used to enforce a limit on the blockStates map.
  157. targetPending int
  158. // a list of blocks to be scheduled (New), Pending or Received. Its length should be
  159. // smaller than targetPending.
  160. blockStates map[int64]blockState
  161. // a map of heights to the peer we are waiting a response from
  162. pendingBlocks map[int64]p2p.ID
  163. // the time at which a block was put in blockStatePending
  164. pendingTime map[int64]time.Time
  165. // a map of heights to the peers that put the block in blockStateReceived
  166. receivedBlocks map[int64]p2p.ID
  167. }
  168. func (sc scheduler) String() string {
  169. return fmt.Sprintf("ih: %d, bst: %v, peers: %v, pblks: %v, ptm %v, rblks: %v",
  170. sc.initHeight, sc.blockStates, sc.peers, sc.pendingBlocks, sc.pendingTime, sc.receivedBlocks)
  171. }
  172. func newScheduler(initHeight int64) *scheduler {
  173. sc := scheduler{
  174. initHeight: initHeight,
  175. height: initHeight + 1,
  176. blockStates: make(map[int64]blockState),
  177. peers: make(map[p2p.ID]*scPeer),
  178. pendingBlocks: make(map[int64]p2p.ID),
  179. pendingTime: make(map[int64]time.Time),
  180. receivedBlocks: make(map[int64]p2p.ID),
  181. }
  182. return &sc
  183. }
  184. func (sc *scheduler) addPeer(peerID p2p.ID) error {
  185. if _, ok := sc.peers[peerID]; ok {
  186. // In the future we should be able to add a previously removed peer
  187. return fmt.Errorf("cannot add duplicate peer %s", peerID)
  188. }
  189. sc.peers[peerID] = newScPeer(peerID)
  190. return nil
  191. }
  192. func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error {
  193. peer, ok := sc.peers[peerID]
  194. if !ok {
  195. return fmt.Errorf("couldn't find peer %s", peerID)
  196. }
  197. if peer.state != peerStateReady {
  198. return fmt.Errorf("tried to touch peer in state %s, must be Ready", peer.state)
  199. }
  200. peer.lastTouched = time
  201. return nil
  202. }
  203. func (sc *scheduler) removePeer(peerID p2p.ID) error {
  204. peer, ok := sc.peers[peerID]
  205. if !ok {
  206. return fmt.Errorf("couldn't find peer %s", peerID)
  207. }
  208. if peer.state == peerStateRemoved {
  209. return fmt.Errorf("tried to remove peer %s in peerStateRemoved", peerID)
  210. }
  211. for height, pendingPeerID := range sc.pendingBlocks {
  212. if pendingPeerID == peerID {
  213. sc.setStateAtHeight(height, blockStateNew)
  214. delete(sc.pendingTime, height)
  215. delete(sc.pendingBlocks, height)
  216. }
  217. }
  218. for height, rcvPeerID := range sc.receivedBlocks {
  219. if rcvPeerID == peerID {
  220. sc.setStateAtHeight(height, blockStateNew)
  221. delete(sc.receivedBlocks, height)
  222. }
  223. }
  224. // remove the blocks from blockStates if the peer removal causes the max peer height to be lower.
  225. peer.state = peerStateRemoved
  226. maxPeerHeight := int64(0)
  227. for _, otherPeer := range sc.peers {
  228. if otherPeer.state != peerStateReady {
  229. continue
  230. }
  231. if otherPeer.peerID != peer.peerID && otherPeer.height > maxPeerHeight {
  232. maxPeerHeight = otherPeer.height
  233. }
  234. }
  235. for h := range sc.blockStates {
  236. if h > maxPeerHeight {
  237. delete(sc.blockStates, h)
  238. }
  239. }
  240. return nil
  241. }
  242. // check if the blockPool is running low and add new blocks in New state to be requested.
  243. // This function is called when there is an increase in the maximum peer height or when
  244. // blocks are processed.
  245. func (sc *scheduler) addNewBlocks() {
  246. if len(sc.blockStates) >= sc.targetPending {
  247. return
  248. }
  249. for i := sc.height; i < int64(sc.targetPending)+sc.height; i++ {
  250. if i > sc.maxHeight() {
  251. break
  252. }
  253. if sc.getStateAtHeight(i) == blockStateUnknown {
  254. sc.setStateAtHeight(i, blockStateNew)
  255. }
  256. }
  257. }
  258. func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error {
  259. peer, ok := sc.peers[peerID]
  260. if !ok {
  261. return fmt.Errorf("cannot find peer %s", peerID)
  262. }
  263. if peer.state == peerStateRemoved {
  264. return fmt.Errorf("cannot set peer height for a peer in peerStateRemoved")
  265. }
  266. if height < peer.height {
  267. return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height)
  268. }
  269. peer.height = height
  270. peer.state = peerStateReady
  271. sc.addNewBlocks()
  272. return nil
  273. }
  274. func (sc *scheduler) getStateAtHeight(height int64) blockState {
  275. if height <= sc.initHeight {
  276. return blockStateProcessed
  277. } else if state, ok := sc.blockStates[height]; ok {
  278. return state
  279. } else {
  280. return blockStateUnknown
  281. }
  282. }
  283. func (sc *scheduler) getPeersAtHeightOrAbove(height int64) []p2p.ID {
  284. peers := make([]p2p.ID, 0)
  285. for _, peer := range sc.peers {
  286. if peer.state != peerStateReady {
  287. continue
  288. }
  289. if peer.height >= height {
  290. peers = append(peers, peer.peerID)
  291. }
  292. }
  293. return peers
  294. }
  295. func (sc *scheduler) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID {
  296. peers := []p2p.ID{}
  297. for _, peer := range sc.peers {
  298. if peer.state != peerStateReady {
  299. continue
  300. }
  301. if now.Sub(peer.lastTouched) > duration {
  302. peers = append(peers, peer.peerID)
  303. }
  304. }
  305. // Ensure the order is deterministic for testing
  306. sort.Sort(PeerByID(peers))
  307. return peers
  308. }
  309. // will return peers who's lastRate i slower than minSpeed denominated in bytes
  310. func (sc *scheduler) peersSlowerThan(minSpeed int64) []p2p.ID {
  311. peers := []p2p.ID{}
  312. for peerID, peer := range sc.peers {
  313. if peer.state != peerStateReady {
  314. continue
  315. }
  316. if peer.lastRate < minSpeed {
  317. peers = append(peers, peerID)
  318. }
  319. }
  320. // Ensure the order is deterministic for testing
  321. sort.Sort(PeerByID(peers))
  322. return peers
  323. }
  324. func (sc *scheduler) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID {
  325. prunable := []p2p.ID{}
  326. for peerID, peer := range sc.peers {
  327. if peer.state != peerStateReady {
  328. continue
  329. }
  330. if now.Sub(peer.lastTouched) > peerTimout || peer.lastRate < minRecvRate {
  331. prunable = append(prunable, peerID)
  332. }
  333. }
  334. // Tests for handleTryPrunePeer() may fail without sort due to range non-determinism
  335. sort.Sort(PeerByID(prunable))
  336. return prunable
  337. }
// setStateAtHeight unconditionally records `state` for the block at `height`
// in sc.blockStates, creating the entry if it does not exist. No validation
// of the transition is performed here.
func (sc *scheduler) setStateAtHeight(height int64, state blockState) {
	sc.blockStates[height] = state
}
  341. func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
  342. peer, ok := sc.peers[peerID]
  343. if !ok {
  344. return fmt.Errorf("couldn't find peer %s", peerID)
  345. }
  346. if peer.state == peerStateRemoved {
  347. return fmt.Errorf("cannot receive blocks from removed peer %s", peerID)
  348. }
  349. if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID {
  350. return fmt.Errorf("received block %d from peer %s without being requested", height, peerID)
  351. }
  352. pendingTime, ok := sc.pendingTime[height]
  353. if !ok || now.Sub(pendingTime) <= 0 {
  354. return fmt.Errorf("clock error: block %d received at %s but requested at %s",
  355. height, pendingTime, now)
  356. }
  357. peer.lastRate = size / now.Sub(pendingTime).Nanoseconds()
  358. sc.setStateAtHeight(height, blockStateReceived)
  359. delete(sc.pendingBlocks, height)
  360. delete(sc.pendingTime, height)
  361. sc.receivedBlocks[height] = peerID
  362. return nil
  363. }
  364. func (sc *scheduler) markPending(peerID p2p.ID, height int64, time time.Time) error {
  365. state := sc.getStateAtHeight(height)
  366. if state != blockStateNew {
  367. return fmt.Errorf("block %d should be in blockStateNew but is %s", height, state)
  368. }
  369. peer, ok := sc.peers[peerID]
  370. if !ok {
  371. return fmt.Errorf("cannot find peer %s", peerID)
  372. }
  373. if peer.state != peerStateReady {
  374. return fmt.Errorf("cannot schedule %d from %s in %s", height, peerID, peer.state)
  375. }
  376. if height > peer.height {
  377. return fmt.Errorf("cannot request height %d from peer %s that is at height %d",
  378. height, peerID, peer.height)
  379. }
  380. sc.setStateAtHeight(height, blockStatePending)
  381. sc.pendingBlocks[height] = peerID
  382. // XXX: to make this more accurate we can introduce a message from
  383. // the IO routine which indicates the time the request was put on the wire
  384. sc.pendingTime[height] = time
  385. return nil
  386. }
  387. func (sc *scheduler) markProcessed(height int64) error {
  388. state := sc.getStateAtHeight(height)
  389. if state != blockStateReceived {
  390. return fmt.Errorf("cannot mark height %d received from block state %s", height, state)
  391. }
  392. sc.height++
  393. delete(sc.receivedBlocks, height)
  394. delete(sc.blockStates, height)
  395. sc.addNewBlocks()
  396. return nil
  397. }
  398. func (sc *scheduler) allBlocksProcessed() bool {
  399. return sc.height >= sc.maxHeight()
  400. }
  401. // returns max peer height or the last processed block, i.e. sc.height
  402. func (sc *scheduler) maxHeight() int64 {
  403. max := sc.height - 1
  404. for _, peer := range sc.peers {
  405. if peer.state != peerStateReady {
  406. continue
  407. }
  408. if peer.height > max {
  409. max = peer.height
  410. }
  411. }
  412. return max
  413. }
  414. // lowest block in sc.blockStates with state == blockStateNew or -1 if no new blocks
  415. func (sc *scheduler) nextHeightToSchedule() int64 {
  416. var min int64 = math.MaxInt64
  417. for height, state := range sc.blockStates {
  418. if state == blockStateNew && height < min {
  419. min = height
  420. }
  421. }
  422. if min == math.MaxInt64 {
  423. min = -1
  424. }
  425. return min
  426. }
  427. func (sc *scheduler) pendingFrom(peerID p2p.ID) []int64 {
  428. var heights []int64
  429. for height, pendingPeerID := range sc.pendingBlocks {
  430. if pendingPeerID == peerID {
  431. heights = append(heights, height)
  432. }
  433. }
  434. return heights
  435. }
  436. func (sc *scheduler) selectPeer(height int64) (p2p.ID, error) {
  437. peers := sc.getPeersAtHeightOrAbove(height)
  438. if len(peers) == 0 {
  439. return "", fmt.Errorf("cannot find peer for height %d", height)
  440. }
  441. // create a map from number of pending requests to a list
  442. // of peers having that number of pending requests.
  443. pendingFrom := make(map[int][]p2p.ID)
  444. for _, peerID := range peers {
  445. numPending := len(sc.pendingFrom(peerID))
  446. pendingFrom[numPending] = append(pendingFrom[numPending], peerID)
  447. }
  448. // find the set of peers with minimum number of pending requests.
  449. minPending := math.MaxInt64
  450. for mp := range pendingFrom {
  451. if mp < minPending {
  452. minPending = mp
  453. }
  454. }
  455. sort.Sort(PeerByID(pendingFrom[minPending]))
  456. return pendingFrom[minPending][0], nil
  457. }
  458. // PeerByID is a list of peers sorted by peerID.
  459. type PeerByID []p2p.ID
  460. func (peers PeerByID) Len() int {
  461. return len(peers)
  462. }
  463. func (peers PeerByID) Less(i, j int) bool {
  464. return bytes.Compare([]byte(peers[i]), []byte(peers[j])) == -1
  465. }
  466. func (peers PeerByID) Swap(i, j int) {
  467. it := peers[i]
  468. peers[i] = peers[j]
  469. peers[j] = it
  470. }
  471. // Handlers
  472. // This handler gets the block, performs some validation and then passes it on to the processor.
  473. func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) {
  474. err := sc.touchPeer(event.peerID, event.time)
  475. if err != nil {
  476. return scPeerError{peerID: event.peerID, reason: err}, nil
  477. }
  478. err = sc.markReceived(event.peerID, event.block.Height, event.size, event.time)
  479. if err != nil {
  480. return scPeerError{peerID: event.peerID, reason: err}, nil
  481. }
  482. return scBlockReceived{peerID: event.peerID, block: event.block}, nil
  483. }
  484. func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) {
  485. if event.height != sc.height {
  486. panic(fmt.Sprintf("processed height %d but expected height %d", event.height, sc.height))
  487. }
  488. err := sc.markProcessed(event.height)
  489. if err != nil {
  490. // It is possible that a peer error or timeout is handled after the processor
  491. // has processed the block but before the scheduler received this event,
  492. // so when pcBlockProcessed event is received the block had been requested again
  493. return scSchedulerFail{reason: err}, nil
  494. }
  495. if sc.allBlocksProcessed() {
  496. return scFinishedEv{}, nil
  497. }
  498. return noOp, nil
  499. }
  500. // Handles an error from the processor. The processor had already cleaned the blocks from
  501. // the peers included in this event. Just attempt to remove the peers.
  502. func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) (Event, error) {
  503. if len(sc.peers) == 0 {
  504. return noOp, nil
  505. }
  506. // The peers may have been just removed due to errors, low speed or timeouts.
  507. _ = sc.removePeer(event.firstPeerID)
  508. if event.firstPeerID != event.secondPeerID {
  509. _ = sc.removePeer(event.secondPeerID)
  510. }
  511. if sc.allBlocksProcessed() {
  512. return scFinishedEv{}, nil
  513. }
  514. return noOp, nil
  515. }
  516. func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) {
  517. err := sc.addPeer(event.peerID)
  518. if err != nil {
  519. return scSchedulerFail{reason: err}, nil
  520. }
  521. return noOp, nil
  522. }
  523. // XXX: unify types peerError
  524. func (sc *scheduler) handlePeerError(event peerError) (Event, error) {
  525. err := sc.removePeer(event.peerID)
  526. if err != nil {
  527. // XXX - It is possible that the removePeer fails here for legitimate reasons
  528. // for example if a peer timeout or error was handled just before this.
  529. return scSchedulerFail{reason: err}, nil
  530. }
  531. if sc.allBlocksProcessed() {
  532. return scFinishedEv{}, nil
  533. }
  534. return noOp, nil
  535. }
  536. func (sc *scheduler) handleTryPrunePeer(event tryPrunePeer) (Event, error) {
  537. prunablePeers := sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, event.time)
  538. if len(prunablePeers) == 0 {
  539. return noOp, nil
  540. }
  541. for _, peerID := range prunablePeers {
  542. err := sc.removePeer(peerID)
  543. if err != nil {
  544. // Should never happen as prunablePeers() returns only existing peers in Ready state.
  545. panic("scheduler data corruption")
  546. }
  547. }
  548. // If all blocks are processed we should finish even some peers were pruned.
  549. if sc.allBlocksProcessed() {
  550. return scFinishedEv{}, nil
  551. }
  552. return scPeersPruned{peers: prunablePeers}, nil
  553. }
  554. // TODO - Schedule multiple block requests
  555. func (sc *scheduler) handleTrySchedule(event trySchedule) (Event, error) {
  556. nextHeight := sc.nextHeightToSchedule()
  557. if nextHeight == -1 {
  558. return noOp, nil
  559. }
  560. bestPeerID, err := sc.selectPeer(nextHeight)
  561. if err != nil {
  562. return scSchedulerFail{reason: err}, nil
  563. }
  564. if err := sc.markPending(bestPeerID, nextHeight, event.time); err != nil {
  565. return scSchedulerFail{reason: err}, nil // XXX: peerError might be more appropriate
  566. }
  567. return scBlockRequest{peerID: bestPeerID, height: nextHeight}, nil
  568. }
  569. func (sc *scheduler) handleStatusResponse(event bcStatusResponse) (Event, error) {
  570. err := sc.setPeerHeight(event.peerID, event.height)
  571. if err != nil {
  572. return scPeerError{peerID: event.peerID, reason: err}, nil
  573. }
  574. return noOp, nil
  575. }
  576. func (sc *scheduler) handle(event Event) (Event, error) {
  577. switch event := event.(type) {
  578. case bcStatusResponse:
  579. nextEvent, err := sc.handleStatusResponse(event)
  580. return nextEvent, err
  581. case bcBlockResponse:
  582. nextEvent, err := sc.handleBlockResponse(event)
  583. return nextEvent, err
  584. case trySchedule:
  585. nextEvent, err := sc.handleTrySchedule(event)
  586. return nextEvent, err
  587. case addNewPeer:
  588. nextEvent, err := sc.handleAddNewPeer(event)
  589. return nextEvent, err
  590. case tryPrunePeer:
  591. nextEvent, err := sc.handleTryPrunePeer(event)
  592. return nextEvent, err
  593. case peerError:
  594. nextEvent, err := sc.handlePeerError(event)
  595. return nextEvent, err
  596. case pcBlockProcessed:
  597. nextEvent, err := sc.handleBlockProcessed(event)
  598. return nextEvent, err
  599. case pcBlockVerificationFailure:
  600. nextEvent, err := sc.handleBlockProcessError(event)
  601. return nextEvent, err
  602. default:
  603. return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil
  604. }
  605. //return noOp, nil
  606. }