package v2 import ( "fmt" "github.com/tendermint/tendermint/p2p" tmState "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) // Events generated by the processor: // block execution failure, event will indicate the peer(s) that caused the error type pcBlockVerificationFailure struct { priorityNormal height int64 firstPeerID p2p.ID secondPeerID p2p.ID } // successful block execution type pcBlockProcessed struct { priorityNormal height int64 peerID p2p.ID } // processor has finished type pcFinished struct { priorityNormal blocksSynced int tmState tmState.State } func (p pcFinished) Error() string { return "finished" } type queueItem struct { block *types.Block peerID p2p.ID } type blockQueue map[int64]queueItem type pcState struct { // blocks waiting to be processed queue blockQueue // draining indicates that the next rProcessBlock event with a queue miss constitutes completion draining bool // the number of blocks successfully synced by the processor blocksSynced int // the processorContext which contains the processor dependencies context processorContext } func (state *pcState) String() string { return fmt.Sprintf("height: %d queue length: %d draining: %v blocks synced: %d", state.height(), len(state.queue), state.draining, state.blocksSynced) } // newPcState returns a pcState initialized with the last verified block enqueued func newPcState(context processorContext) *pcState { return &pcState{ queue: blockQueue{}, draining: false, blocksSynced: 0, context: context, } } // nextTwo returns the next two unverified blocks func (state *pcState) nextTwo() (queueItem, queueItem, error) { if first, ok := state.queue[state.height()+1]; ok { if second, ok := state.queue[state.height()+2]; ok { return first, second, nil } } return queueItem{}, queueItem{}, fmt.Errorf("not found") } // synced returns true when at most the last verified block remains in the queue func (state *pcState) synced() bool { return len(state.queue) <= 1 } func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) { if _, ok := state.queue[height]; ok { panic("duplicate block enqueued by processor") } state.queue[height] = queueItem{block: block, peerID: peerID} } func (state *pcState) height() int64 { return state.context.tmState().LastBlockHeight } // purgePeer moves all unprocessed blocks from the queue func (state *pcState) purgePeer(peerID p2p.ID) { // what if height is less than state.height? for height, item := range state.queue { if item.peerID == peerID { delete(state.queue, height) } } } // handle processes FSM events func (state *pcState) handle(event Event) (Event, error) { switch event := event.(type) { case bcResetState: state.context.setState(event.state) return noOp, nil case scFinishedEv: if state.synced() { return pcFinished{tmState: state.context.tmState(), blocksSynced: state.blocksSynced}, nil } state.draining = true return noOp, nil case scPeerError: state.purgePeer(event.peerID) return noOp, nil case scBlockReceived: if event.block == nil { return noOp, nil } // enqueue block if height is higher than state height, else ignore it if event.block.Height > state.height() { state.enqueue(event.peerID, event.block, event.block.Height) } return noOp, nil case rProcessBlock: tmState := state.context.tmState() firstItem, secondItem, err := state.nextTwo() if err != nil { if state.draining { return pcFinished{tmState: tmState, blocksSynced: state.blocksSynced}, nil } return noOp, nil } first, second := firstItem.block, secondItem.block firstParts := first.MakePartSet(types.BlockPartSizeBytes) firstPartSetHeader := firstParts.Header() firstID := types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader} err = state.context.verifyCommit(tmState.ChainID, firstID, first.Height, second.LastCommit) if err != nil { state.purgePeer(firstItem.peerID) state.purgePeer(secondItem.peerID) return pcBlockVerificationFailure{ height: first.Height, firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID}, nil } state.context.saveBlock(first, firstParts, second.LastCommit) if err := state.context.applyBlock(firstID, first); err != nil { panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } delete(state.queue, first.Height) state.blocksSynced++ return pcBlockProcessed{height: first.Height, peerID: firstItem.peerID}, nil } return noOp, nil }