package v2

import (
	"fmt"

	"github.com/tendermint/tendermint/p2p"
	tmState "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

// Events generated by the processor.

// pcBlockVerificationFailure is emitted when the commit check for a block
// fails. It names the height of the block being verified and the peers that
// supplied that block and its successor.
type pcBlockVerificationFailure struct {
	priorityNormal
	height       int64
	firstPeerID  p2p.NodeID
	secondPeerID p2p.NodeID
}

// String renders the failure event with its height and both peer IDs.
func (e pcBlockVerificationFailure) String() string {
	return fmt.Sprintf("pcBlockVerificationFailure{%d 1st peer: %v, 2nd peer: %v}",
		e.height, e.firstPeerID, e.secondPeerID)
}

// pcBlockProcessed is emitted after a block has been verified, saved and
// applied. It carries the block height and the peer that supplied the block.
type pcBlockProcessed struct {
	priorityNormal
	height int64
	peerID p2p.NodeID
}

// String renders the success event with its height and peer ID.
func (e pcBlockProcessed) String() string {
	return fmt.Sprintf("pcBlockProcessed{%d peer: %v}", e.height, e.peerID)
}

// pcFinished is emitted when the processor is done. It carries the number of
// blocks synced and the state read from the processor context at that moment.
type pcFinished struct {
	priorityNormal
	blocksSynced int
	tmState      tmState.State
}

// Error lets pcFinished satisfy the error interface; the message is always
// "finished".
func (e pcFinished) Error() string {
	return "finished"
}

// queueItem pairs a received block with the peer that delivered it.
type queueItem struct {
	block  *types.Block
	peerID p2p.NodeID
}

// blockQueue indexes queued items by block height.
type blockQueue map[int64]queueItem

// pcState is the processor's state: the pending blocks plus the bookkeeping
// consulted and updated by handle.
type pcState struct {
	// queue holds the blocks waiting to be processed, keyed by height.
	queue blockQueue

	// draining marks that the next rProcessBlock event that cannot find two
	// consecutive blocks in the queue constitutes completion.
	draining bool

	// blocksSynced counts the blocks successfully processed so far.
	blocksSynced int

	// context provides the processor's dependencies.
	context processorContext
}

// String summarises the current height, queue length, draining flag and the
// number of blocks synced.
func (s *pcState) String() string {
	return fmt.Sprintf("height: %d queue length: %d draining: %v blocks synced: %d",
		s.height(), len(s.queue), s.draining, s.blocksSynced)
}

// newPcState returns a pcState bound to the given context. The queue starts
// out empty, the state is not draining, and no blocks are counted as synced.
func newPcState(context processorContext) *pcState {
	return &pcState{
		queue:   make(blockQueue),
		context: context,
	}
}

// nextTwo returns the queued items at height()+1 and height()+2. If either one
// is missing it returns two zero items and a "not found" error.
func (s *pcState) nextTwo() (queueItem, queueItem, error) {
	first, ok := s.queue[s.height()+1]
	if !ok {
		return queueItem{}, queueItem{}, fmt.Errorf("not found")
	}

	second, ok := s.queue[s.height()+2]
	if !ok {
		return queueItem{}, queueItem{}, fmt.Errorf("not found")
	}

	return first, second, nil
}

// synced reports whether fewer than two blocks remain queued. Processing a
// block needs its successor as well (see nextTwo), so a single leftover block
// cannot be processed.
func (s *pcState) synced() bool {
	return len(s.queue) < 2
}

// enqueue stores block under height, remembering which peer sent it. It panics
// if an item is already queued at that height.
func (s *pcState) enqueue(peerID p2p.NodeID, block *types.Block, height int64) {
	existing, dup := s.queue[height]
	if dup {
		panic(fmt.Sprintf(
			"duplicate block %d (%X) enqueued by processor (sent by %v; existing block %X from %v)",
			height, block.Hash(), peerID, existing.block.Hash(), existing.peerID))
	}

	s.queue[height] = queueItem{block: block, peerID: peerID}
}

// height returns LastBlockHeight of the state currently held by the context.
func (s *pcState) height() int64 {
	return s.context.tmState().LastBlockHeight
}

// purgePeer removes every queued block that was delivered by peerID.
func (s *pcState) purgePeer(peerID p2p.NodeID) {
	// Open question carried over from the original author:
	// what if height is less than state.height?
	for h, queued := range s.queue {
		if queued.peerID != peerID {
			continue
		}
		delete(s.queue, h)
	}
}

// handle processes FSM events:
//
//   - bcResetState replaces the state held by the context.
//   - scFinishedEv returns pcFinished when synced() holds, otherwise it turns
//     on draining.
//   - scPeerError drops every block queued from the offending peer.
//   - scBlockReceived enqueues a non-nil block whose height is above the
//     current height and ignores anything else.
//   - rProcessBlock tries to verify and apply the next block.
//
// Every path that produces no dedicated event returns noOp. The returned error
// is always nil.
func (s *pcState) handle(event Event) (Event, error) {
	switch ev := event.(type) {
	case bcResetState:
		s.context.setState(ev.state)

	case scFinishedEv:
		if s.synced() {
			return pcFinished{tmState: s.context.tmState(), blocksSynced: s.blocksSynced}, nil
		}
		s.draining = true

	case scPeerError:
		s.purgePeer(ev.peerID)

	case scBlockReceived:
		// Only blocks above the current height are queued; nil blocks and
		// blocks at or below the current height are ignored.
		if ev.block != nil && ev.block.Height > s.height() {
			s.enqueue(ev.peerID, ev.block, ev.block.Height)
		}

	case rProcessBlock:
		return s.processNextBlock()
	}

	return noOp, nil
}

// processNextBlock verifies the block at height()+1 against the LastCommit of
// the block at height()+2, then saves and applies it. It is the rProcessBlock
// branch of handle.
func (s *pcState) processNextBlock() (Event, error) {
	// The state is read before the queue is consulted.
	current := s.context.tmState()

	headItem, nextItem, err := s.nextTwo()
	if err != nil {
		if !s.draining {
			return noOp, nil
		}
		// A queue miss while draining means the processor is done.
		return pcFinished{tmState: current, blocksSynced: s.blocksSynced}, nil
	}

	head, next := headItem.block, nextItem.block
	headParts := head.MakePartSet(types.BlockPartSizeBytes)
	headID := types.BlockID{Hash: head.Hash(), PartSetHeader: headParts.Header()}

	// Check that the LastCommit of the following block "confirms" the head block.
	if err := s.context.verifyCommit(current.ChainID, headID, head.Height, next.LastCommit); err != nil {
		// Blocks from both peers involved are discarded.
		s.purgePeer(headItem.peerID)
		if headItem.peerID != nextItem.peerID {
			s.purgePeer(nextItem.peerID)
		}

		failure := pcBlockVerificationFailure{
			height:       head.Height,
			firstPeerID:  headItem.peerID,
			secondPeerID: nextItem.peerID,
		}
		return failure, nil
	}

	s.context.saveBlock(head, headParts, next.LastCommit)

	// The block has already been saved at this point, so a failure to apply it
	// is treated as fatal.
	if err := s.context.applyBlock(headID, head); err != nil {
		panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", head.Height, head.Hash(), err))
	}

	delete(s.queue, head.Height)
	s.blocksSynced++

	return pcBlockProcessed{height: head.Height, peerID: headItem.peerID}, nil
}