blockchain: Reorg reactor (#3561)
* go routines in blockchain reactor
* Added reference to the go routine diagram
* Initial commit
* cleanup
* Undo testing_logger change, committed by mistake
* Fix the test loggers
* pulled some fsm code into pool.go
* added pool tests
* changes to the design
added block requests under peer
moved the request trigger in the reactor poolRoutine, triggered now by a ticker
in general moved everything required for making block requests smarter in the poolRoutine
added a simple map of heights to keep track of what will need to be requested next
added a few more tests
* send errors to FSM in a different channel than blocks
send errors (RemovePeer) from switch on a different channel than the
one receiving blocks
renamed channels
added more pool tests
* more pool tests
* lint errors
* more tests
* more tests
* switch fast sync to new implementation
* fixed data race in tests
* cleanup
* finished fsm tests
* address golangci comments :)
* address golangci comments :)
* Added timeout on next block needed to advance
* updating docs and cleanup
* fix issue in test from previous cleanup
* cleanup
* Added termination scenarios, tests and more cleanup
* small fixes to adr, comments and cleanup
* Fix bug in sendRequest()
If we tried to send a request to a peer not present in the switch, a
missing continue statement caused the request to be blackholed in a peer
that was removed and never retried.
While this bug was manifesting, the reactor kept asking for other
blocks that would be stored and never consumed. Added the number of
unconsumed blocks in the math for requesting blocks ahead of current
processing height so eventually there will be no more blocks requested
until the already received ones are consumed.
* remove bpPeer's didTimeout field
* Use distinct err codes for peer timeout and FSM timeouts
* Don't allow peers to update with lower height
* review comments from Ethan and Zarko
* some cleanup, renaming, comments
* Move block execution in separate goroutine
* Remove pool's numPending
* review comments
* fix lint, remove old blockchain reactor and duplicates in fsm tests
* small reorg around peer after review comments
* add the reactor spec
* verify block only once
* review comments
* change to int for max number of pending requests
* cleanup and godoc
* Add configuration flag fast sync version
* golangci fixes
* fix config template
* move both reactor versions under blockchain
* cleanup, golint, renaming stuff
* updated documentation, fixed more golint warnings
* integrate with behavior package
* sync with master
* gofmt
* add changelog_pending entry
* move to improvments
* suggestion to changelog entry
6 years ago |
|
- package v1
-
- import (
- "errors"
- "fmt"
- "sync"
- "time"
-
- "github.com/tendermint/tendermint/libs/log"
- "github.com/tendermint/tendermint/p2p"
- "github.com/tendermint/tendermint/types"
- )
-
- // Blockchain Reactor State
- type bcReactorFSMState struct {
- name string
-
- // called when transitioning out of current state
- handle func(*BcReactorFSM, bReactorEvent, bReactorEventData) (next *bcReactorFSMState, err error)
- // called when entering the state
- enter func(fsm *BcReactorFSM)
-
- // timeout to ensure FSM is not stuck in a state forever
- // the timer is owned and run by the fsm instance
- timeout time.Duration
- }
-
- func (s *bcReactorFSMState) String() string {
- return s.name
- }
-
- // BcReactorFSM is the datastructure for the Blockchain Reactor State Machine
- type BcReactorFSM struct {
- logger log.Logger
- mtx sync.Mutex
-
- startTime time.Time
-
- state *bcReactorFSMState
- stateTimer *time.Timer
- pool *BlockPool
-
- // interface used to call the Blockchain reactor to send StatusRequest, BlockRequest, reporting errors, etc.
- toBcR bcReactor
- }
-
- // NewFSM creates a new reactor FSM.
- func NewFSM(height int64, toBcR bcReactor) *BcReactorFSM {
- return &BcReactorFSM{
- state: unknown,
- startTime: time.Now(),
- pool: NewBlockPool(height, toBcR),
- toBcR: toBcR,
- }
- }
-
- // bReactorEventData is part of the message sent by the reactor to the FSM and used by the state handlers.
- type bReactorEventData struct {
- peerID p2p.ID
- err error // for peer error: timeout, slow; for processed block event if error occurred
- height int64 // for status response; for processed block event
- block *types.Block // for block response
- stateName string // for state timeout events
- length int // for block response event, length of received block, used to detect slow peers
- maxNumRequests int // for request needed event, maximum number of pending requests
- }
-
- // Blockchain Reactor Events (the input to the state machine)
- type bReactorEvent uint
-
- const (
- // message type events
- startFSMEv = iota + 1
- statusResponseEv
- blockResponseEv
- processedBlockEv
- makeRequestsEv
- stopFSMEv
-
- // other events
- peerRemoveEv = iota + 256
- stateTimeoutEv
- )
-
- func (msg *bcReactorMessage) String() string {
- var dataStr string
-
- switch msg.event {
- case startFSMEv:
- dataStr = ""
- case statusResponseEv:
- dataStr = fmt.Sprintf("peer=%v height=%v", msg.data.peerID, msg.data.height)
- case blockResponseEv:
- dataStr = fmt.Sprintf("peer=%v block.height=%v length=%v",
- msg.data.peerID, msg.data.block.Height, msg.data.length)
- case processedBlockEv:
- dataStr = fmt.Sprintf("error=%v", msg.data.err)
- case makeRequestsEv:
- dataStr = ""
- case stopFSMEv:
- dataStr = ""
- case peerRemoveEv:
- dataStr = fmt.Sprintf("peer: %v is being removed by the switch", msg.data.peerID)
- case stateTimeoutEv:
- dataStr = fmt.Sprintf("state=%v", msg.data.stateName)
- default:
- dataStr = fmt.Sprintf("cannot interpret message data")
- }
-
- return fmt.Sprintf("%v: %v", msg.event, dataStr)
- }
-
- func (ev bReactorEvent) String() string {
- switch ev {
- case startFSMEv:
- return "startFSMEv"
- case statusResponseEv:
- return "statusResponseEv"
- case blockResponseEv:
- return "blockResponseEv"
- case processedBlockEv:
- return "processedBlockEv"
- case makeRequestsEv:
- return "makeRequestsEv"
- case stopFSMEv:
- return "stopFSMEv"
- case peerRemoveEv:
- return "peerRemoveEv"
- case stateTimeoutEv:
- return "stateTimeoutEv"
- default:
- return "event unknown"
- }
-
- }
-
- // states
- var (
- unknown *bcReactorFSMState
- waitForPeer *bcReactorFSMState
- waitForBlock *bcReactorFSMState
- finished *bcReactorFSMState
- )
-
- // timeouts for state timers
- const (
- waitForPeerTimeout = 3 * time.Second
- waitForBlockAtCurrentHeightTimeout = 10 * time.Second
- )
-
- // errors
- var (
- // internal to the package
- errNoErrorFinished = errors.New("fast sync is finished")
- errInvalidEvent = errors.New("invalid event in current state")
- errMissingBlock = errors.New("missing blocks")
- errNilPeerForBlockRequest = errors.New("peer for block request does not exist in the switch")
- errSendQueueFull = errors.New("block request not made, send-queue is full")
- errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added")
- errSwitchRemovesPeer = errors.New("switch is removing peer")
- errTimeoutEventWrongState = errors.New("timeout event for a state different than the current one")
- errNoTallerPeer = errors.New("fast sync timed out on waiting for a peer taller than this node")
-
- // reported eventually to the switch
- errPeerLowersItsHeight = errors.New("fast sync peer reports a height lower than previous") // handle return
- errNoPeerResponseForCurrentHeights = errors.New("fast sync timed out on peer block response for current heights") // handle return
- errNoPeerResponse = errors.New("fast sync timed out on peer block response") // xx
- errBadDataFromPeer = errors.New("fast sync received block from wrong peer or block is bad") // xx
- errDuplicateBlock = errors.New("fast sync received duplicate block from peer")
- errBlockVerificationFailure = errors.New("fast sync block verification failure") // xx
- errSlowPeer = errors.New("fast sync peer is not sending us data fast enough") // xx
-
- )
-
- func init() {
- unknown = &bcReactorFSMState{
- name: "unknown",
- handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) {
- switch ev {
- case startFSMEv:
- // Broadcast Status message. Currently doesn't return non-nil error.
- fsm.toBcR.sendStatusRequest()
- return waitForPeer, nil
-
- case stopFSMEv:
- return finished, errNoErrorFinished
-
- default:
- return unknown, errInvalidEvent
- }
- },
- }
-
- waitForPeer = &bcReactorFSMState{
- name: "waitForPeer",
- timeout: waitForPeerTimeout,
- enter: func(fsm *BcReactorFSM) {
- // Stop when leaving the state.
- fsm.resetStateTimer()
- },
- handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) {
- switch ev {
- case stateTimeoutEv:
- if data.stateName != "waitForPeer" {
- fsm.logger.Error("received a state timeout event for different state",
- "state", data.stateName)
- return waitForPeer, errTimeoutEventWrongState
- }
- // There was no statusResponse received from any peer.
- // Should we send status request again?
- return finished, errNoTallerPeer
-
- case statusResponseEv:
- if err := fsm.pool.UpdatePeer(data.peerID, data.height); err != nil {
- if fsm.pool.NumPeers() == 0 {
- return waitForPeer, err
- }
- }
- if fsm.stateTimer != nil {
- fsm.stateTimer.Stop()
- }
- return waitForBlock, nil
-
- case stopFSMEv:
- if fsm.stateTimer != nil {
- fsm.stateTimer.Stop()
- }
- return finished, errNoErrorFinished
-
- default:
- return waitForPeer, errInvalidEvent
- }
- },
- }
-
- waitForBlock = &bcReactorFSMState{
- name: "waitForBlock",
- timeout: waitForBlockAtCurrentHeightTimeout,
- enter: func(fsm *BcReactorFSM) {
- // Stop when leaving the state.
- fsm.resetStateTimer()
- },
- handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) {
- switch ev {
-
- case statusResponseEv:
- err := fsm.pool.UpdatePeer(data.peerID, data.height)
- if fsm.pool.NumPeers() == 0 {
- return waitForPeer, err
- }
- if fsm.pool.ReachedMaxHeight() {
- return finished, err
- }
- return waitForBlock, err
-
- case blockResponseEv:
- fsm.logger.Debug("blockResponseEv", "H", data.block.Height)
- err := fsm.pool.AddBlock(data.peerID, data.block, data.length)
- if err != nil {
- // A block was received that was unsolicited, from unexpected peer, or that we already have it.
- // Ignore block, remove peer and send error to switch.
- fsm.pool.RemovePeer(data.peerID, err)
- fsm.toBcR.sendPeerError(err, data.peerID)
- }
- if fsm.pool.NumPeers() == 0 {
- return waitForPeer, err
- }
- return waitForBlock, err
-
- case processedBlockEv:
- if data.err != nil {
- first, second, _ := fsm.pool.FirstTwoBlocksAndPeers()
- fsm.logger.Error("error processing block", "err", data.err,
- "first", first.block.Height, "second", second.block.Height)
- fsm.logger.Error("send peer error for", "peer", first.peer.ID)
- fsm.toBcR.sendPeerError(data.err, first.peer.ID)
- fsm.logger.Error("send peer error for", "peer", second.peer.ID)
- fsm.toBcR.sendPeerError(data.err, second.peer.ID)
- // Remove the first two blocks. This will also remove the peers
- fsm.pool.InvalidateFirstTwoBlocks(data.err)
- } else {
- fsm.pool.ProcessedCurrentHeightBlock()
- // Since we advanced one block reset the state timer
- fsm.resetStateTimer()
- }
-
- // Both cases above may result in achieving maximum height.
- if fsm.pool.ReachedMaxHeight() {
- return finished, nil
- }
-
- return waitForBlock, data.err
-
- case peerRemoveEv:
- // This event is sent by the switch to remove disconnected and errored peers.
- fsm.pool.RemovePeer(data.peerID, data.err)
- if fsm.pool.NumPeers() == 0 {
- return waitForPeer, nil
- }
- if fsm.pool.ReachedMaxHeight() {
- return finished, nil
- }
- return waitForBlock, nil
-
- case makeRequestsEv:
- fsm.makeNextRequests(data.maxNumRequests)
- return waitForBlock, nil
-
- case stateTimeoutEv:
- if data.stateName != "waitForBlock" {
- fsm.logger.Error("received a state timeout event for different state",
- "state", data.stateName)
- return waitForBlock, errTimeoutEventWrongState
- }
- // We haven't received the block at current height or height+1. Remove peer.
- fsm.pool.RemovePeerAtCurrentHeights(errNoPeerResponseForCurrentHeights)
- fsm.resetStateTimer()
- if fsm.pool.NumPeers() == 0 {
- return waitForPeer, errNoPeerResponseForCurrentHeights
- }
- if fsm.pool.ReachedMaxHeight() {
- return finished, nil
- }
- return waitForBlock, errNoPeerResponseForCurrentHeights
-
- case stopFSMEv:
- if fsm.stateTimer != nil {
- fsm.stateTimer.Stop()
- }
- return finished, errNoErrorFinished
-
- default:
- return waitForBlock, errInvalidEvent
- }
- },
- }
-
- finished = &bcReactorFSMState{
- name: "finished",
- enter: func(fsm *BcReactorFSM) {
- fsm.logger.Info("Time to switch to consensus reactor!", "height", fsm.pool.Height)
- fsm.toBcR.switchToConsensus()
- fsm.cleanup()
- },
- handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) {
- return finished, nil
- },
- }
- }
-
- // Interface used by FSM for sending Block and Status requests,
- // informing of peer errors and state timeouts
- // Implemented by BlockchainReactor and tests
- type bcReactor interface {
- sendStatusRequest()
- sendBlockRequest(peerID p2p.ID, height int64) error
- sendPeerError(err error, peerID p2p.ID)
- resetStateTimer(name string, timer **time.Timer, timeout time.Duration)
- switchToConsensus()
- }
-
- // SetLogger sets the FSM logger.
- func (fsm *BcReactorFSM) SetLogger(l log.Logger) {
- fsm.logger = l
- fsm.pool.SetLogger(l)
- }
-
- // Start starts the FSM.
- func (fsm *BcReactorFSM) Start() {
- _ = fsm.Handle(&bcReactorMessage{event: startFSMEv})
- }
-
- // Handle processes messages and events sent to the FSM.
- func (fsm *BcReactorFSM) Handle(msg *bcReactorMessage) error {
- fsm.mtx.Lock()
- defer fsm.mtx.Unlock()
- fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state)
-
- if fsm.state == nil {
- fsm.state = unknown
- }
- next, err := fsm.state.handle(fsm, msg.event, msg.data)
- if err != nil {
- fsm.logger.Error("FSM event handler returned", "err", err,
- "state", fsm.state, "event", msg.event)
- }
-
- oldState := fsm.state.name
- fsm.transition(next)
- if oldState != fsm.state.name {
- fsm.logger.Info("FSM changed state", "new_state", fsm.state)
- }
- return err
- }
-
- func (fsm *BcReactorFSM) transition(next *bcReactorFSMState) {
- if next == nil {
- return
- }
- if fsm.state != next {
- fsm.state = next
- if next.enter != nil {
- next.enter(fsm)
- }
- }
- }
-
- // Called when entering an FSM state in order to detect lack of progress in the state machine.
- // Note the use of the 'bcr' interface to facilitate testing without timer expiring.
- func (fsm *BcReactorFSM) resetStateTimer() {
- fsm.toBcR.resetStateTimer(fsm.state.name, &fsm.stateTimer, fsm.state.timeout)
- }
-
- func (fsm *BcReactorFSM) isCaughtUp() bool {
- return fsm.state == finished
- }
-
- func (fsm *BcReactorFSM) makeNextRequests(maxNumRequests int) {
- fsm.pool.MakeNextRequests(maxNumRequests)
- }
-
- func (fsm *BcReactorFSM) cleanup() {
- fsm.pool.Cleanup()
- }
-
- // NeedsBlocks checks if more block requests are required.
- func (fsm *BcReactorFSM) NeedsBlocks() bool {
- fsm.mtx.Lock()
- defer fsm.mtx.Unlock()
- return fsm.state.name == "waitForBlock" && fsm.pool.NeedsBlocks()
- }
-
- // FirstTwoBlocks returns the two blocks at pool height and height+1
- func (fsm *BcReactorFSM) FirstTwoBlocks() (first, second *types.Block, err error) {
- fsm.mtx.Lock()
- defer fsm.mtx.Unlock()
- firstBP, secondBP, err := fsm.pool.FirstTwoBlocksAndPeers()
- if err == nil {
- first = firstBP.block
- second = secondBP.block
- }
- return
- }
-
- // Status returns the pool's height and the maximum peer height.
- func (fsm *BcReactorFSM) Status() (height, maxPeerHeight int64) {
- fsm.mtx.Lock()
- defer fsm.mtx.Unlock()
- return fsm.pool.Height, fsm.pool.MaxPeerHeight
- }
|