You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

111 lines
2.9 KiB

blockchain: add v2 reactor (#4361) The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor. This PR replaces #4067 which got far too large and messy after a failed attempt to rebase. ## Commits: * Blockchain v2 reactor: + A cleaner copy of the work done in #4067 which fell too far behind and was a nightmare to rebase. + The work includes the reactor which ties together all the separate routines involved in the design of the blockchain v2 refactor. * fixes after merge * reorder iIO interface methodset * change iO -> IO * panic before send nil block * rename switchToConsensus -> trySwitchToConsensus * rename tdState -> tmState * Update blockchain/v2/reactor.go Co-Authored-By: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com> * remove peer when it sends a block unsolicited * check for not ready in markReceived * fix error * fix the pcFinished event * typo fix * add documentation for processor fields * simplify time.Since * try and make the linter happy * some doc updates * fix channel diagram * Update adr-043-blockchain-riri-org.md * panic on nil switch * linting fixes * account for nil block in bcBlockResponseMessage * panic on duplicate block enqueued by processor * linting * goimport reactor_test.go Co-authored-by: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com> Co-authored-by: Anca Zamfir <ancazamfir@users.noreply.github.com> Co-authored-by: Marko <marbar3778@yahoo.com> Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
5 years ago
  1. package v2
  2. import (
  3. "fmt"
  4. "github.com/tendermint/tendermint/p2p"
  5. "github.com/tendermint/tendermint/state"
  6. "github.com/tendermint/tendermint/types"
  7. )
  8. type iIO interface {
  9. sendBlockRequest(peerID p2p.ID, height int64) error
  10. sendBlockToPeer(block *types.Block, peerID p2p.ID) error
  11. sendBlockNotFound(height int64, peerID p2p.ID) error
  12. sendStatusResponse(height int64, peerID p2p.ID) error
  13. broadcastStatusRequest(height int64)
  14. trySwitchToConsensus(state state.State, blocksSynced int)
  15. }
  16. type switchIO struct {
  17. sw *p2p.Switch
  18. }
  19. func newSwitchIo(sw *p2p.Switch) *switchIO {
  20. return &switchIO{
  21. sw: sw,
  22. }
  23. }
  // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height).
  // It is the channel ID passed to every TrySend and Broadcast call in this file.
  const BlockchainChannel byte = 0x40
  28. type consensusReactor interface {
  29. // for when we switch from blockchain reactor and fast sync to
  30. // the consensus machine
  31. SwitchToConsensus(state.State, int)
  32. }
  33. func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error {
  34. peer := sio.sw.Peers().Get(peerID)
  35. if peer == nil {
  36. return fmt.Errorf("peer not found")
  37. }
  38. msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{Height: height})
  39. queued := peer.TrySend(BlockchainChannel, msgBytes)
  40. if !queued {
  41. return fmt.Errorf("send queue full")
  42. }
  43. return nil
  44. }
  45. func (sio *switchIO) sendStatusResponse(height int64, peerID p2p.ID) error {
  46. peer := sio.sw.Peers().Get(peerID)
  47. if peer == nil {
  48. return fmt.Errorf("peer not found")
  49. }
  50. msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{Height: height})
  51. if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
  52. return fmt.Errorf("peer queue full")
  53. }
  54. return nil
  55. }
  56. func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
  57. peer := sio.sw.Peers().Get(peerID)
  58. if peer == nil {
  59. return fmt.Errorf("peer not found")
  60. }
  61. if block == nil {
  62. panic("trying to send nil block")
  63. }
  64. msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block})
  65. if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
  66. return fmt.Errorf("peer queue full")
  67. }
  68. return nil
  69. }
  70. func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error {
  71. peer := sio.sw.Peers().Get(peerID)
  72. if peer == nil {
  73. return fmt.Errorf("peer not found")
  74. }
  75. msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: height})
  76. if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
  77. return fmt.Errorf("peer queue full")
  78. }
  79. return nil
  80. }
  81. func (sio *switchIO) trySwitchToConsensus(state state.State, blocksSynced int) {
  82. conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor)
  83. if ok {
  84. conR.SwitchToConsensus(state, blocksSynced)
  85. }
  86. }
  87. func (sio *switchIO) broadcastStatusRequest(height int64) {
  88. msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{height})
  89. // XXX: maybe we should use an io specific peer list here
  90. sio.sw.Broadcast(BlockchainChannel, msgBytes)
  91. }