diff --git a/behaviour/reporter.go b/behaviour/reporter.go index 96ce32994..1f16b9bb3 100644 --- a/behaviour/reporter.go +++ b/behaviour/reporter.go @@ -19,7 +19,7 @@ type SwitchReporter struct { } // NewSwitchReporter return a new SwitchReporter instance which wraps the Switch. -func NewSwitcReporter(sw *p2p.Switch) *SwitchReporter { +func NewSwitchReporter(sw *p2p.Switch) *SwitchReporter { return &SwitchReporter{ sw: sw, } diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go index b57901789..1aba26b35 100644 --- a/blockchain/v1/reactor.go +++ b/blockchain/v1/reactor.go @@ -103,7 +103,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st fsm := NewFSM(startHeight, bcR) bcR.fsm = fsm bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) - //bcR.swReporter = behaviour.NewSwitcReporter(bcR.BaseReactor.Switch) + //bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) return bcR } @@ -141,7 +141,7 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) { // OnStart implements service.Service. func (bcR *BlockchainReactor) OnStart() error { - bcR.swReporter = behaviour.NewSwitcReporter(bcR.BaseReactor.Switch) + bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch) if bcR.fastSync { go bcR.poolRoutine() } diff --git a/blockchain/v2/codec.go b/blockchain/v2/codec.go new file mode 100644 index 000000000..f970d115f --- /dev/null +++ b/blockchain/v2/codec.go @@ -0,0 +1,13 @@ +package v2 + +import ( + amino "github.com/tendermint/go-amino" + "github.com/tendermint/tendermint/types" +) + +var cdc = amino.NewCodec() + +func init() { + RegisterBlockchainMessages(cdc) + types.RegisterBlockAmino(cdc) +} diff --git a/blockchain/v2/io.go b/blockchain/v2/io.go new file mode 100644 index 000000000..3db48c8c0 --- /dev/null +++ b/blockchain/v2/io.go @@ -0,0 +1,111 @@ +package v2 + +import ( + "fmt" + + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +type iIO interface { + sendBlockRequest(peerID p2p.ID, height int64) error + sendBlockToPeer(block *types.Block, peerID p2p.ID) error + sendBlockNotFound(height int64, peerID p2p.ID) error + sendStatusResponse(height int64, peerID p2p.ID) error + + broadcastStatusRequest(height int64) + + trySwitchToConsensus(state state.State, blocksSynced int) +} + +type switchIO struct { + sw *p2p.Switch +} + +func newSwitchIo(sw *p2p.Switch) *switchIO { + return &switchIO{ + sw: sw, + } +} + +const ( + // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) + BlockchainChannel = byte(0x40) +) + +type consensusReactor interface { + // for when we switch from blockchain reactor and fast sync to + // the consensus machine + SwitchToConsensus(state.State, int) +} + +func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error { + peer := sio.sw.Peers().Get(peerID) + if peer == nil { + return fmt.Errorf("peer not found") + } + + msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{Height: height}) + queued := peer.TrySend(BlockchainChannel, msgBytes) + if !queued { + return fmt.Errorf("send queue full") + } + return nil +} + +func (sio *switchIO) sendStatusResponse(height int64, peerID p2p.ID) error { + peer := sio.sw.Peers().Get(peerID) + if peer == nil { + return fmt.Errorf("peer not found") + } + msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{Height: height}) + + if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { + return fmt.Errorf("peer queue full") + } + + return nil +} + +func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error { + peer := sio.sw.Peers().Get(peerID) + if peer == nil { + return fmt.Errorf("peer not found") + } + if block == nil { + panic("trying to send nil block") + } + msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block}) + if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { + return fmt.Errorf("peer queue full") + } + + return nil +} + +func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error { + peer := sio.sw.Peers().Get(peerID) + if peer == nil { + return fmt.Errorf("peer not found") + } + msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: height}) + if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued { + return fmt.Errorf("peer queue full") + } + + return nil +} + +func (sio *switchIO) trySwitchToConsensus(state state.State, blocksSynced int) { + conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor) + if ok { + conR.SwitchToConsensus(state, blocksSynced) + } +} + +func (sio *switchIO) broadcastStatusRequest(height int64) { + msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{height}) + // XXX: maybe we should use an io specific peer list here + sio.sw.Broadcast(BlockchainChannel, msgBytes) +} diff --git a/blockchain/v2/metrics.go b/blockchain/v2/metrics.go index d865e7360..c68ec6447 100644 --- a/blockchain/v2/metrics.go +++ b/blockchain/v2/metrics.go @@ -37,6 +37,7 @@ type Metrics struct { ErrorsShed metrics.Counter } +// PrometheusMetrics returns metrics for in and out events, errors, etc. handled by routines. // Can we burn in the routine name here? func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { labels := []string{} diff --git a/blockchain/v2/processor.go b/blockchain/v2/processor.go index e33b36058..d6a2fe1e8 100644 --- a/blockchain/v2/processor.go +++ b/blockchain/v2/processor.go @@ -4,23 +4,12 @@ import ( "fmt" "github.com/tendermint/tendermint/p2p" - tdState "github.com/tendermint/tendermint/state" + tmState "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) -type peerError struct { - priorityHigh - peerID p2p.ID -} - -type pcDuplicateBlock struct { - priorityNormal -} - -type pcShortBlock struct { - priorityNormal -} - +// Events generated by the processor: +// block execution failure, event will indicate the peer(s) that caused the error type pcBlockVerificationFailure struct { priorityNormal height int64 @@ -28,24 +17,18 @@ type pcBlockVerificationFailure struct { secondPeerID p2p.ID } +// successful block execution type pcBlockProcessed struct { priorityNormal height int64 peerID p2p.ID } -type pcProcessBlock struct { - priorityNormal -} - -type pcStop struct { - priorityNormal -} - +// processor has finished type pcFinished struct { priorityNormal - height int64 - blocksSynced int64 + blocksSynced int + tmState tmState.State } func (p pcFinished) Error() string { @@ -60,37 +43,38 @@ type queueItem struct { type blockQueue map[int64]queueItem type pcState struct { - height int64 // height of the last synced block - queue blockQueue // blocks waiting to be processed - chainID string - blocksSynced int64 - draining bool - tdState tdState.State - context processorContext + // blocks waiting to be processed + queue blockQueue + + // draining indicates that the next rProcessBlock event with a queue miss constitutes completion + draining bool + + // the number of blocks successfully synced by the processor + blocksSynced int + + // the processorContext which contains the processor dependencies + context processorContext } func (state *pcState) String() string { return fmt.Sprintf("height: %d queue length: %d draining: %v blocks synced: %d", - state.height, len(state.queue), state.draining, state.blocksSynced) + state.height(), len(state.queue), state.draining, state.blocksSynced) } // newPcState returns a pcState initialized with the last verified block enqueued -func newPcState(initHeight int64, tdState tdState.State, chainID string, context processorContext) *pcState { +func newPcState(context processorContext) *pcState { return &pcState{ - height: initHeight, queue: blockQueue{}, - chainID: chainID, draining: false, blocksSynced: 0, context: context, - tdState: tdState, } } // nextTwo returns the next two unverified blocks func (state *pcState) nextTwo() (queueItem, queueItem, error) { - if first, ok := state.queue[state.height+1]; ok { - if second, ok := state.queue[state.height+2]; ok { + if first, ok := state.queue[state.height()+1]; ok { + if second, ok := state.queue[state.height()+2]; ok { return first, second, nil } } @@ -102,18 +86,15 @@ func (state *pcState) synced() bool { return len(state.queue) <= 1 } -func (state *pcState) advance() { - state.height++ - delete(state.queue, state.height) - state.blocksSynced++ -} - -func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) error { +func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) { if _, ok := state.queue[height]; ok { - return fmt.Errorf("duplicate queue item") + panic("duplicate block enqueued by processor") } state.queue[height] = queueItem{block: block, peerID: peerID} - return nil +} + +func (state *pcState) height() int64 { + return state.context.tmState().LastBlockHeight } // purgePeer moves all unprocessed blocks from the queue @@ -129,23 +110,34 @@ func (state *pcState) purgePeer(peerID p2p.ID) { // handle processes FSM events func (state *pcState) handle(event Event) (Event, error) { switch event := event.(type) { - case *scBlockReceived: - if event.block == nil { - panic("processor received an event with a nil block") + case scFinishedEv: + if state.synced() { + return pcFinished{tmState: state.context.tmState(), blocksSynced: state.blocksSynced}, nil } - if event.block.Height <= state.height { - return pcShortBlock{}, nil + state.draining = true + return noOp, nil + + case scPeerError: + state.purgePeer(event.peerID) + return noOp, nil + + case scBlockReceived: + if event.block == nil { + return noOp, nil } - err := state.enqueue(event.peerID, event.block, event.block.Height) - if err != nil { - return pcDuplicateBlock{}, nil + + // enqueue block if height is higher than state height, else ignore it + if event.block.Height > state.height() { + state.enqueue(event.peerID, event.block, event.block.Height) } + return noOp, nil - case pcProcessBlock: + case rProcessBlock: + tmState := state.context.tmState() firstItem, secondItem, err := state.nextTwo() if err != nil { if state.draining { - return noOp, pcFinished{height: state.height} + return pcFinished{tmState: tmState, blocksSynced: state.blocksSynced}, nil } return noOp, nil } @@ -155,7 +147,7 @@ func (state *pcState) handle(event Event) (Event, error) { firstPartsHeader := firstParts.Header() firstID := types.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader} - err = state.context.verifyCommit(state.chainID, firstID, first.Height, second.LastCommit) + err = state.context.verifyCommit(tmState.ChainID, firstID, first.Height, second.LastCommit) if err != nil { state.purgePeer(firstItem.peerID) state.purgePeer(secondItem.peerID) @@ -166,21 +158,15 @@ func (state *pcState) handle(event Event) (Event, error) { state.context.saveBlock(first, firstParts, second.LastCommit) - state.tdState, err = state.context.applyBlock(state.tdState, firstID, first) - if err != nil { + if err := state.context.applyBlock(firstID, first); err != nil { panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } - state.advance() - return pcBlockProcessed{height: first.Height, peerID: firstItem.peerID}, nil - case *peerError: - state.purgePeer(event.peerID) + delete(state.queue, first.Height) + state.blocksSynced++ + + return pcBlockProcessed{height: first.Height, peerID: firstItem.peerID}, nil - case pcStop: - if state.synced() { - return noOp, pcFinished{height: state.height, blocksSynced: state.blocksSynced} - } - state.draining = true } return noOp, nil diff --git a/blockchain/v2/processor_context.go b/blockchain/v2/processor_context.go index c4c8770cd..7e96a3a69 100644 --- a/blockchain/v2/processor_context.go +++ b/blockchain/v2/processor_context.go @@ -4,37 +4,41 @@ import ( "fmt" "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" ) type processorContext interface { - applyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error) + applyBlock(blockID types.BlockID, block *types.Block) error verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) + tmState() state.State } -// nolint:unused type pContext struct { - store *store.BlockStore - executor *state.BlockExecutor - state *state.State + store blockStore + applier blockApplier + state state.State } -// nolint:unused,deadcode -func newProcessorContext(st *store.BlockStore, ex *state.BlockExecutor, s *state.State) *pContext { +func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContext { return &pContext{ - store: st, - executor: ex, - state: s, + store: st, + applier: ex, + state: s, } } -func (pc *pContext) applyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error) { - return pc.executor.ApplyBlock(state, blockID, block) +func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error { + newState, err := pc.applier.ApplyBlock(pc.state, blockID, block) + pc.state = newState + return err } -func (pc *pContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error { +func (pc pContext) tmState() state.State { + return pc.state +} + +func (pc pContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error { return pc.state.Validators.VerifyCommit(chainID, blockID, height, commit) } @@ -45,22 +49,28 @@ func (pc *pContext) saveBlock(block *types.Block, blockParts *types.PartSet, see type mockPContext struct { applicationBL []int64 verificationBL []int64 + state state.State } -func newMockProcessorContext(verificationBlackList []int64, applicationBlackList []int64) *mockPContext { +func newMockProcessorContext( + state state.State, + verificationBlackList []int64, + applicationBlackList []int64) *mockPContext { return &mockPContext{ applicationBL: applicationBlackList, verificationBL: verificationBlackList, + state: state, } } -func (mpc *mockPContext) applyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error) { +func (mpc *mockPContext) applyBlock(blockID types.BlockID, block *types.Block) error { for _, h := range mpc.applicationBL { if h == block.Height { - return state, fmt.Errorf("generic application error") + return fmt.Errorf("generic application error") } } - return state, nil + mpc.state.LastBlockHeight = block.Height + return nil } func (mpc *mockPContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error { @@ -73,4 +83,9 @@ func (mpc *mockPContext) verifyCommit(chainID string, blockID types.BlockID, hei } func (mpc *mockPContext) saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { + +} + +func (mpc *mockPContext) tmState() state.State { + return mpc.state } diff --git a/blockchain/v2/processor_test.go b/blockchain/v2/processor_test.go index 61be23663..fc35c4c72 100644 --- a/blockchain/v2/processor_test.go +++ b/blockchain/v2/processor_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/tendermint/tendermint/p2p" - tdState "github.com/tendermint/tendermint/state" + tmState "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -19,7 +19,7 @@ type pcBlock struct { type params struct { height int64 items []pcBlock - blocksSynced int64 + blocksSynced int verBL []int64 appBL []int64 draining bool @@ -33,13 +33,13 @@ func makePcBlock(height int64) *types.Block { // makeState takes test parameters and creates a specific processor state. func makeState(p *params) *pcState { var ( - tdState = tdState.State{} - context = newMockProcessorContext(p.verBL, p.appBL) + tmState = tmState.State{LastBlockHeight: p.height} + context = newMockProcessorContext(tmState, p.verBL, p.appBL) ) - state := newPcState(p.height, tdState, "test", context) + state := newPcState(context) for _, item := range p.items { - _ = state.enqueue(p2p.ID(item.pid), makePcBlock(item.height), item.height) + state.enqueue(p2p.ID(item.pid), makePcBlock(item.height), item.height) } state.blocksSynced = p.blocksSynced @@ -47,8 +47,8 @@ func makeState(p *params) *pcState { return state } -func mBlockResponse(peerID p2p.ID, height int64) *scBlockReceived { - return &scBlockReceived{ +func mBlockResponse(peerID p2p.ID, height int64) scBlockReceived { + return scBlockReceived{ peerID: peerID, block: makePcBlock(height), } @@ -101,72 +101,57 @@ func executeProcessorTests(t *testing.T, tests []testFields) { } } -func TestPcBlockResponse(t *testing.T) { +func TestRProcessPeerError(t *testing.T) { tests := []testFields{ - { - name: "add one block", + name: "error for existing peer", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{}, event: mBlockResponse("P1", 1), - wantState: ¶ms{items: []pcBlock{{"P1", 1}}}, wantNextEvent: noOp, + currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, + event: scPeerError{peerID: "P2"}, + wantState: ¶ms{items: []pcBlock{{"P1", 1}}}, + wantNextEvent: noOp, }, }, }, { - name: "add two blocks", + name: "error for unknown peer", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{}, event: mBlockResponse("P1", 3), - wantState: ¶ms{items: []pcBlock{{"P1", 3}}}, wantNextEvent: noOp, - }, - { // use previous wantState as currentState, - event: mBlockResponse("P1", 4), - wantState: ¶ms{items: []pcBlock{{"P1", 3}, {"P1", 4}}}, wantNextEvent: noOp, + currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, + event: scPeerError{peerID: "P3"}, + wantState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, + wantNextEvent: noOp, }, }, }, + } + + executeProcessorTests(t, tests) +} + +func TestPcBlockResponse(t *testing.T) { + tests := []testFields{ { - name: "add duplicate block from same peer", + name: "add one block", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{}, event: mBlockResponse("P1", 3), - wantState: ¶ms{items: []pcBlock{{"P1", 3}}}, wantNextEvent: noOp, - }, - { // use previous wantState as currentState, - event: mBlockResponse("P1", 3), - wantState: ¶ms{items: []pcBlock{{"P1", 3}}}, wantNextEvent: pcDuplicateBlock{}, + currentState: ¶ms{}, event: mBlockResponse("P1", 1), + wantState: ¶ms{items: []pcBlock{{"P1", 1}}}, wantNextEvent: noOp, }, }, }, + { - name: "add duplicate block from different peer", + name: "add two blocks", steps: []pcFsmMakeStateValues{ { currentState: ¶ms{}, event: mBlockResponse("P1", 3), wantState: ¶ms{items: []pcBlock{{"P1", 3}}}, wantNextEvent: noOp, }, { // use previous wantState as currentState, - event: mBlockResponse("P2", 3), - wantState: ¶ms{items: []pcBlock{{"P1", 3}}}, wantNextEvent: pcDuplicateBlock{}, - }, - }, - }, - { - name: "attempt to add block with height equal to state.height", - steps: []pcFsmMakeStateValues{ - { - currentState: ¶ms{height: 2, items: []pcBlock{{"P1", 3}}}, event: mBlockResponse("P1", 2), - wantState: ¶ms{height: 2, items: []pcBlock{{"P1", 3}}}, wantNextEvent: pcShortBlock{}, - }, - }, - }, - { - name: "attempt to add block with height smaller than state.height", - steps: []pcFsmMakeStateValues{ - { - currentState: ¶ms{height: 2, items: []pcBlock{{"P1", 3}}}, event: mBlockResponse("P1", 1), - wantState: ¶ms{height: 2, items: []pcBlock{{"P1", 3}}}, wantNextEvent: pcShortBlock{}, + event: mBlockResponse("P1", 4), + wantState: ¶ms{items: []pcBlock{{"P1", 3}, {"P1", 4}}}, wantNextEvent: noOp, }, }, }, @@ -175,13 +160,13 @@ func TestPcBlockResponse(t *testing.T) { executeProcessorTests(t, tests) } -func TestPcProcessBlockSuccess(t *testing.T) { +func TestRProcessBlockSuccess(t *testing.T) { tests := []testFields{ { name: "noop - no blocks over current height", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{}, event: pcProcessBlock{}, + currentState: ¶ms{}, event: rProcessBlock{}, wantState: ¶ms{}, wantNextEvent: noOp, }, }, @@ -190,7 +175,7 @@ func TestPcProcessBlockSuccess(t *testing.T) { name: "noop - high new blocks", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{height: 5, items: []pcBlock{{"P1", 30}, {"P2", 31}}}, event: pcProcessBlock{}, + currentState: ¶ms{height: 5, items: []pcBlock{{"P1", 30}, {"P2", 31}}}, event: rProcessBlock{}, wantState: ¶ms{height: 5, items: []pcBlock{{"P1", 30}, {"P2", 31}}}, wantNextEvent: noOp, }, }, @@ -199,7 +184,7 @@ func TestPcProcessBlockSuccess(t *testing.T) { name: "blocks H+1 and H+2 present", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, event: pcProcessBlock{}, + currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, event: rProcessBlock{}, wantState: ¶ms{height: 1, items: []pcBlock{{"P2", 2}}, blocksSynced: 1}, wantNextEvent: pcBlockProcessed{height: 1, peerID: "P1"}, }, @@ -209,20 +194,20 @@ func TestPcProcessBlockSuccess(t *testing.T) { name: "blocks H+1 and H+2 present after draining", steps: []pcFsmMakeStateValues{ { // some contiguous blocks - on stop check draining is set - currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P1", 4}}}, event: pcStop{}, + currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P1", 4}}}, + event: scFinishedEv{}, wantState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P1", 4}}, draining: true}, wantNextEvent: noOp, }, { - event: pcProcessBlock{}, + event: rProcessBlock{}, wantState: ¶ms{height: 1, items: []pcBlock{{"P2", 2}, {"P1", 4}}, blocksSynced: 1, draining: true}, wantNextEvent: pcBlockProcessed{height: 1, peerID: "P1"}, }, { // finish when H+1 or/and H+2 are missing - event: pcProcessBlock{}, + event: rProcessBlock{}, wantState: ¶ms{height: 1, items: []pcBlock{{"P2", 2}, {"P1", 4}}, blocksSynced: 1, draining: true}, - wantNextEvent: noOp, - wantErr: pcFinished{height: 1}, + wantNextEvent: pcFinished{tmState: tmState.State{LastBlockHeight: 1}, blocksSynced: 1}, }, }, }, @@ -231,13 +216,13 @@ func TestPcProcessBlockSuccess(t *testing.T) { executeProcessorTests(t, tests) } -func TestPcProcessBlockFailures(t *testing.T) { +func TestRProcessBlockFailures(t *testing.T) { tests := []testFields{ { name: "blocks H+1 and H+2 present from different peers - H+1 verification fails ", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, event: pcProcessBlock{}, + currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, event: rProcessBlock{}, wantState: ¶ms{items: []pcBlock{}, verBL: []int64{1}}, wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P2"}, }, @@ -247,7 +232,7 @@ func TestPcProcessBlockFailures(t *testing.T) { name: "blocks H+1 and H+2 present from same peer - H+1 applyBlock fails ", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, event: pcProcessBlock{}, + currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, event: rProcessBlock{}, wantState: ¶ms{items: []pcBlock{}, appBL: []int64{1}}, wantPanic: true, }, }, @@ -256,9 +241,9 @@ func TestPcProcessBlockFailures(t *testing.T) { name: "blocks H+1 and H+2 present from same peers - H+1 verification fails ", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P1", 2}, {"P2", 3}}, verBL: []int64{1}}, - event: pcProcessBlock{}, - wantState: ¶ms{items: []pcBlock{{"P2", 3}}, verBL: []int64{1}}, + currentState: ¶ms{height: 0, items: []pcBlock{{"P1", 1}, {"P1", 2}, {"P2", 3}}, + verBL: []int64{1}}, event: rProcessBlock{}, + wantState: ¶ms{height: 0, items: []pcBlock{{"P2", 3}}, verBL: []int64{1}}, wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"}, }, }, @@ -268,7 +253,7 @@ func TestPcProcessBlockFailures(t *testing.T) { steps: []pcFsmMakeStateValues{ { currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P2", 3}}, appBL: []int64{1}}, - event: pcProcessBlock{}, + event: rProcessBlock{}, wantState: ¶ms{items: []pcBlock{{"P2", 3}}, appBL: []int64{1}}, wantPanic: true, }, }, @@ -278,53 +263,15 @@ func TestPcProcessBlockFailures(t *testing.T) { executeProcessorTests(t, tests) } -func TestPcPeerError(t *testing.T) { - tests := []testFields{ - { - name: "peer not present", - steps: []pcFsmMakeStateValues{ - { - currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, event: &peerError{peerID: "P3"}, - wantState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, - wantNextEvent: noOp, - }, - }, - }, - { - name: "some blocks are from errored peer", - steps: []pcFsmMakeStateValues{ - { - currentState: ¶ms{items: []pcBlock{{"P1", 100}, {"P1", 99}, {"P2", 101}}}, event: &peerError{peerID: "P1"}, - wantState: ¶ms{items: []pcBlock{{"P2", 101}}}, - wantNextEvent: noOp, - }, - }, - }, - { - name: "all blocks are from errored peer", - steps: []pcFsmMakeStateValues{ - { - currentState: ¶ms{items: []pcBlock{{"P1", 100}, {"P1", 99}}}, event: &peerError{peerID: "P1"}, - wantState: ¶ms{}, - wantNextEvent: noOp, - }, - }, - }, - } - - executeProcessorTests(t, tests) -} - -func TestStop(t *testing.T) { +func TestScFinishedEv(t *testing.T) { tests := []testFields{ { name: "no blocks", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{height: 100, items: []pcBlock{}, blocksSynced: 100}, event: pcStop{}, + currentState: ¶ms{height: 100, items: []pcBlock{}, blocksSynced: 100}, event: scFinishedEv{}, wantState: ¶ms{height: 100, items: []pcBlock{}, blocksSynced: 100}, - wantNextEvent: noOp, - wantErr: pcFinished{height: 100, blocksSynced: 100}, + wantNextEvent: pcFinished{tmState: tmState.State{LastBlockHeight: 100}, blocksSynced: 100}, }, }, }, @@ -332,10 +279,10 @@ func TestStop(t *testing.T) { name: "maxHeight+1 block present", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{height: 100, items: []pcBlock{{"P1", 101}}, blocksSynced: 100}, event: pcStop{}, + currentState: ¶ms{height: 100, items: []pcBlock{ + {"P1", 101}}, blocksSynced: 100}, event: scFinishedEv{}, wantState: ¶ms{height: 100, items: []pcBlock{{"P1", 101}}, blocksSynced: 100}, - wantNextEvent: noOp, - wantErr: pcFinished{height: 100, blocksSynced: 100}, + wantNextEvent: pcFinished{tmState: tmState.State{LastBlockHeight: 100}, blocksSynced: 100}, }, }, }, @@ -343,8 +290,10 @@ func TestStop(t *testing.T) { name: "more blocks present", steps: []pcFsmMakeStateValues{ { - currentState: ¶ms{height: 100, items: []pcBlock{{"P1", 101}, {"P1", 102}}, blocksSynced: 100}, event: pcStop{}, - wantState: ¶ms{height: 100, items: []pcBlock{{"P1", 101}, {"P1", 102}}, blocksSynced: 100, draining: true}, + currentState: ¶ms{height: 100, items: []pcBlock{ + {"P1", 101}, {"P1", 102}}, blocksSynced: 100}, event: scFinishedEv{}, + wantState: ¶ms{height: 100, items: []pcBlock{ + {"P1", 101}, {"P1", 102}}, blocksSynced: 100, draining: true}, wantNextEvent: noOp, wantErr: nil, }, diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 8f7143083..767e59819 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -1,118 +1,529 @@ package v2 import ( + "errors" "fmt" + "sync" "time" + "github.com/tendermint/go-amino" + "github.com/tendermint/tendermint/behaviour" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" ) -type timeCheck struct { - priorityHigh - time time.Time +//------------------------------------- + +type bcBlockRequestMessage struct { + Height int64 +} + +// ValidateBasic performs basic validation. +func (m *bcBlockRequestMessage) ValidateBasic() error { + if m.Height < 0 { + return errors.New("negative Height") + } + return nil +} + +func (m *bcBlockRequestMessage) String() string { + return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height) +} + +type bcNoBlockResponseMessage struct { + Height int64 +} + +// ValidateBasic performs basic validation. +func (m *bcNoBlockResponseMessage) ValidateBasic() error { + if m.Height < 0 { + return errors.New("negative Height") + } + return nil +} + +func (m *bcNoBlockResponseMessage) String() string { + return fmt.Sprintf("[bcNoBlockResponseMessage %d]", m.Height) +} + +//------------------------------------- + +type bcBlockResponseMessage struct { + Block *types.Block +} + +// ValidateBasic performs basic validation. +func (m *bcBlockResponseMessage) ValidateBasic() error { + if m.Block == nil { + return errors.New("block response message has nil block") + } + + return m.Block.ValidateBasic() +} + +func (m *bcBlockResponseMessage) String() string { + return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height) } -func schedulerHandle(event Event) (Event, error) { - if _, ok := event.(timeCheck); ok { - fmt.Println("scheduler handle timeCheck") +//------------------------------------- + +type bcStatusRequestMessage struct { + Height int64 +} + +// ValidateBasic performs basic validation. +func (m *bcStatusRequestMessage) ValidateBasic() error { + if m.Height < 0 { + return errors.New("negative Height") } - return noOp, nil + return nil +} + +func (m *bcStatusRequestMessage) String() string { + return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type bcStatusResponseMessage struct { + Height int64 } -func processorHandle(event Event) (Event, error) { - if _, ok := event.(timeCheck); ok { - fmt.Println("processor handle timeCheck") +// ValidateBasic performs basic validation. +func (m *bcStatusResponseMessage) ValidateBasic() error { + if m.Height < 0 { + return errors.New("negative Height") } - return noOp, nil + return nil +} + +func (m *bcStatusResponseMessage) String() string { + return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height) +} +type blockStore interface { + LoadBlock(height int64) *types.Block + SaveBlock(*types.Block, *types.PartSet, *types.Commit) + Height() int64 } -type Reactor struct { - events chan Event +// BlockchainReactor handles fast sync protocol. +type BlockchainReactor struct { + p2p.BaseReactor + + events chan Event // XXX: Rename eventsFromPeers stopDemux chan struct{} scheduler *Routine processor *Routine - ticker *time.Ticker logger log.Logger + + mtx sync.RWMutex + maxPeerHeight int64 + syncHeight int64 + + reporter behaviour.Reporter + io iIO + store blockStore +} + +//nolint:unused,deadcode +type blockVerifier interface { + VerifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error } -func NewReactor(bufferSize int) *Reactor { - return &Reactor{ +//nolint:deadcode +type blockApplier interface { + ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error) +} + +// XXX: unify naming in this package around tmState +// XXX: V1 stores a copy of state as initialState, which is never mutated. Is that nessesary? +func newReactor(state state.State, store blockStore, reporter behaviour.Reporter, + blockApplier blockApplier, bufferSize int) *BlockchainReactor { + scheduler := newScheduler(state.LastBlockHeight, time.Now()) + pContext := newProcessorContext(store, blockApplier, state) + // TODO: Fix naming to just newProcesssor + // newPcState requires a processorContext + processor := newPcState(pContext) + + return &BlockchainReactor{ events: make(chan Event, bufferSize), stopDemux: make(chan struct{}), - scheduler: newRoutine("scheduler", schedulerHandle, bufferSize), - processor: newRoutine("processor", processorHandle, bufferSize), - ticker: time.NewTicker(1 * time.Second), + scheduler: newRoutine("scheduler", scheduler.handle, bufferSize), + processor: newRoutine("processor", processor.handle, bufferSize), + store: store, + reporter: reporter, logger: log.NewNopLogger(), } } -// nolint:unused -func (r *Reactor) setLogger(logger log.Logger) { +// NewBlockchainReactor creates a new reactor instance. +func NewBlockchainReactor( + state state.State, + blockApplier blockApplier, + store blockStore, + fastSync bool) *BlockchainReactor { + reporter := behaviour.NewMockReporter() + return newReactor(state, store, reporter, blockApplier, 1000) +} + +// SetSwitch implements Reactor interface. +func (r *BlockchainReactor) SetSwitch(sw *p2p.Switch) { + if sw == nil { + panic("set nil switch") + } + + r.Switch = sw + r.io = newSwitchIo(sw) +} + +func (r *BlockchainReactor) setMaxPeerHeight(height int64) { + r.mtx.Lock() + defer r.mtx.Unlock() + if height > r.maxPeerHeight { + r.maxPeerHeight = height + } +} + +func (r *BlockchainReactor) setSyncHeight(height int64) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.syncHeight = height +} + +// SyncHeight returns the height to which the BlockchainReactor has synced. +func (r *BlockchainReactor) SyncHeight() int64 { + r.mtx.RLock() + defer r.mtx.RUnlock() + return r.syncHeight +} + +// SetLogger sets the logger of the reactor. +func (r *BlockchainReactor) SetLogger(logger log.Logger) { r.logger = logger r.scheduler.setLogger(logger) r.processor.setLogger(logger) } -func (r *Reactor) Start() { +// Start implements cmn.Service interface +func (r *BlockchainReactor) Start() error { + r.reporter = behaviour.NewSwitchReporter(r.BaseReactor.Switch) go r.scheduler.start() go r.processor.start() go r.demux() + return nil +} - <-r.scheduler.ready() - <-r.processor.ready() +// reactor generated ticker events: +// ticker for cleaning peers +type rTryPrunePeer struct { + priorityHigh + time time.Time +} - go func() { - for t := range r.ticker.C { - r.events <- timeCheck{time: t} - } - }() +func (e rTryPrunePeer) String() string { + return fmt.Sprintf(": %v", e.time) } -// XXX: How to make this deterministic? -// XXX: Would it be possible here to provide some kind of type safety for the types -// of events that each routine can produce and consume? -func (r *Reactor) demux() { +// ticker event for scheduling block requests +type rTrySchedule struct { + priorityHigh + time time.Time +} + +func (e rTrySchedule) String() string { + return fmt.Sprintf(": %v", e.time) +} + +// ticker for block processing +type rProcessBlock struct { + priorityNormal +} + +// reactor generated events based on blockchain related messages from peers: +// blockResponse message received from a peer +type bcBlockResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + size int64 + block *types.Block +} + +// blockNoResponse message received from a peer +type bcNoBlockResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + height int64 +} + +// statusResponse message received from a peer +type bcStatusResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + height int64 +} + +// new peer is connected +type bcAddNewPeer struct { + priorityNormal + peerID p2p.ID +} + +// existing peer is removed +type bcRemovePeer struct { + priorityHigh + peerID p2p.ID + reason interface{} +} + +func (r *BlockchainReactor) demux() { + var lastRate = 0.0 + var lastHundred = time.Now() + + var ( + processBlockFreq = 20 * time.Millisecond + doProcessBlockCh = make(chan struct{}, 1) + doProcessBlockTk = time.NewTicker(processBlockFreq) + + prunePeerFreq = 1 * time.Second + doPrunePeerCh = make(chan struct{}, 1) + doPrunePeerTk = time.NewTicker(prunePeerFreq) + + scheduleFreq = 20 * time.Millisecond + doScheduleCh = make(chan struct{}, 1) + doScheduleTk = time.NewTicker(scheduleFreq) + + statusFreq = 10 * time.Second + doStatusCh = make(chan struct{}, 1) + doStatusTk = time.NewTicker(statusFreq) + ) + + // XXX: Extract timers to make testing atemporal for { select { + // Pacers: send at most per frequency but don't saturate + case <-doProcessBlockTk.C: + select { + case doProcessBlockCh <- struct{}{}: + default: + } + case <-doPrunePeerTk.C: + select { + case doPrunePeerCh <- struct{}{}: + default: + } + case <-doScheduleTk.C: + select { + case doScheduleCh <- struct{}{}: + default: + } + case <-doStatusTk.C: + select { + case doStatusCh <- struct{}{}: + default: + } + + // Tickers: perform tasks periodically + case <-doScheduleCh: + r.scheduler.send(rTrySchedule{time: time.Now()}) + case <-doPrunePeerCh: + r.scheduler.send(rTryPrunePeer{time: time.Now()}) + case <-doProcessBlockCh: + r.processor.send(rProcessBlock{}) + case <-doStatusCh: + r.io.broadcastStatusRequest(r.SyncHeight()) + + // Events from peers case event := <-r.events: - // XXX: check for backpressure - r.scheduler.send(event) - r.processor.send(event) - case <-r.stopDemux: - r.logger.Info("demuxing stopped") - return + switch event := event.(type) { + case bcStatusResponse: + r.setMaxPeerHeight(event.height) + r.scheduler.send(event) + case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse: + r.scheduler.send(event) + } + + // Incremental events form scheduler case event := <-r.scheduler.next(): - r.processor.send(event) + switch event := event.(type) { + case scBlockReceived: + r.processor.send(event) + case scPeerError: + r.processor.send(event) + r.reporter.Report(behaviour.BadMessage(event.peerID, "scPeerError")) + case scBlockRequest: + r.io.sendBlockRequest(event.peerID, event.height) + case scFinishedEv: + r.processor.send(event) + r.scheduler.stop() + } + + // Incremental events from processor case event := <-r.processor.next(): - r.scheduler.send(event) + switch event := event.(type) { + case pcBlockProcessed: + r.setSyncHeight(event.height) + if r.syncHeight%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + r.logger.Info("Fast Syncc Rate", "height", r.syncHeight, + "max_peer_height", r.maxPeerHeight, "blocks/s", lastRate) + lastHundred = time.Now() + } + r.scheduler.send(event) + case pcBlockVerificationFailure: + r.scheduler.send(event) + case pcFinished: + r.io.trySwitchToConsensus(event.tmState, event.blocksSynced) + r.processor.stop() + } + + // Terminal events from scheduler case err := <-r.scheduler.final(): r.logger.Info(fmt.Sprintf("scheduler final %s", err)) - case err := <-r.processor.final(): - r.logger.Info(fmt.Sprintf("processor final %s", err)) - // XXX: switch to consensus + // send the processor stop? + + // Terminal event from processor + case event := <-r.processor.final(): + r.logger.Info(fmt.Sprintf("processor final %s", event)) + + case <-r.stopDemux: + r.logger.Info("demuxing stopped") + return } } } -func (r *Reactor) Stop() { +// Stop implements cmn.Service interface. +func (r *BlockchainReactor) Stop() error { r.logger.Info("reactor stopping") - r.ticker.Stop() r.scheduler.stop() r.processor.stop() close(r.stopDemux) close(r.events) r.logger.Info("reactor stopped") + return nil +} + +const ( + // NOTE: keep up to date with bcBlockResponseMessage + bcBlockResponseMessagePrefixSize = 4 + bcBlockResponseMessageFieldKeySize = 1 + maxMsgSize = types.MaxBlockSizeBytes + + bcBlockResponseMessagePrefixSize + + bcBlockResponseMessageFieldKeySize +) + +// BlockchainMessage is a generic message for this reactor. +type BlockchainMessage interface { + ValidateBasic() error +} + +// RegisterBlockchainMessages registers the fast sync messages for amino encoding. +func RegisterBlockchainMessages(cdc *amino.Codec) { + cdc.RegisterInterface((*BlockchainMessage)(nil), nil) + cdc.RegisterConcrete(&bcBlockRequestMessage{}, "tendermint/blockchain/BlockRequest", nil) + cdc.RegisterConcrete(&bcBlockResponseMessage{}, "tendermint/blockchain/BlockResponse", nil) + cdc.RegisterConcrete(&bcNoBlockResponseMessage{}, "tendermint/blockchain/NoBlockResponse", nil) + cdc.RegisterConcrete(&bcStatusResponseMessage{}, "tendermint/blockchain/StatusResponse", nil) + cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/blockchain/StatusRequest", nil) } -func (r *Reactor) Receive(event Event) { - // XXX: decode and serialize write events - // TODO: backpressure +func decodeMsg(bz []byte) (msg BlockchainMessage, err error) { + if len(bz) > maxMsgSize { + return msg, fmt.Errorf("msg exceeds max size (%d > %d)", len(bz), maxMsgSize) + } + err = cdc.UnmarshalBinaryBare(bz, &msg) + return +} + +// Receive implements Reactor by handling different message types. +func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + msg, err := decodeMsg(msgBytes) + if err != nil { + r.logger.Error("error decoding message", + "src", src.ID(), "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) + return + } + + if err = msg.ValidateBasic(); err != nil { + r.logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err) + _ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error())) + return + } + + r.logger.Debug("Receive", "src", src.ID(), "chID", chID, "msg", msg) + + switch msg := msg.(type) { + case *bcStatusRequestMessage: + if err := r.io.sendStatusResponse(r.store.Height(), src.ID()); err != nil { + r.logger.Error("Could not send status message to peer", "src", src) + } + + case *bcBlockRequestMessage: + block := r.store.LoadBlock(msg.Height) + if block != nil { + if err = r.io.sendBlockToPeer(block, src.ID()); err != nil { + r.logger.Error("Could not send block message to peer: ", err) + } + } else { + r.logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height) + peerID := src.ID() + if err = r.io.sendBlockNotFound(msg.Height, peerID); err != nil { + r.logger.Error("Couldn't send block not found: ", err) + } + } + + case *bcStatusResponseMessage: + r.events <- bcStatusResponse{peerID: src.ID(), height: msg.Height} + + case *bcBlockResponseMessage: + r.events <- bcBlockResponse{ + peerID: src.ID(), + block: msg.Block, + size: int64(len(msgBytes)), + time: time.Now(), + } + + case *bcNoBlockResponseMessage: + r.events <- bcNoBlockResponse{peerID: src.ID(), height: msg.Height, time: time.Now()} + } +} + +// AddPeer implements Reactor interface +func (r *BlockchainReactor) AddPeer(peer p2p.Peer) { + err := r.io.sendStatusResponse(r.store.Height(), peer.ID()) + if err != nil { + r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight()) + } + r.events <- bcAddNewPeer{peerID: peer.ID()} +} + +// RemovePeer implements Reactor interface. +func (r *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + event := bcRemovePeer{ + peerID: peer.ID(), + reason: reason, + } r.events <- event } -func (r *Reactor) AddPeer() { - // TODO: add peer event and send to demuxer +// GetChannels implements Reactor +func (r *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + { + ID: BlockchainChannel, + Priority: 10, + SendQueueCapacity: 2000, + RecvBufferCapacity: 50 * 4096, + RecvMessageCapacity: maxMsgSize, + }, + } } diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index 46a2e60c6..10457cdb0 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -1,22 +1,517 @@ package v2 import ( + "net" + "os" + "sort" + "sync" "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/behaviour" + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/mock" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/conn" + "github.com/tendermint/tendermint/proxy" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" + tmtime "github.com/tendermint/tendermint/types/time" + dbm "github.com/tendermint/tm-db" ) -func TestReactor(t *testing.T) { +type mockPeer struct { + service.Service + id p2p.ID +} + +func (mp mockPeer) FlushStop() {} +func (mp mockPeer) ID() p2p.ID { return mp.id } +func (mp mockPeer) RemoteIP() net.IP { return net.IP{} } +func (mp mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.RemoteIP(), Port: 8800} } + +func (mp mockPeer) IsOutbound() bool { return true } +func (mp mockPeer) IsPersistent() bool { return true } +func (mp mockPeer) CloseConn() error { return nil } + +func (mp mockPeer) NodeInfo() p2p.NodeInfo { + return p2p.DefaultNodeInfo{ + DefaultNodeID: "", + ListenAddr: "", + } +} +func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } +func (mp mockPeer) SocketAddr() *p2p.NetAddress { return &p2p.NetAddress{} } + +func (mp mockPeer) Send(byte, []byte) bool { return true } +func (mp mockPeer) TrySend(byte, []byte) bool { return true } + +func (mp mockPeer) Set(string, interface{}) {} +func (mp mockPeer) Get(string) interface{} { return struct{}{} } + +//nolint:unused +type mockBlockStore struct { + blocks map[int64]*types.Block +} + +func (ml *mockBlockStore) Height() int64 { + return int64(len(ml.blocks)) +} + +func (ml *mockBlockStore) LoadBlock(height int64) *types.Block { + return ml.blocks[height] +} + +func (ml *mockBlockStore) SaveBlock(block *types.Block, part *types.PartSet, commit *types.Commit) { + ml.blocks[block.Height] = block +} + +type mockBlockApplier struct { +} + +// XXX: Add whitelist/blacklist? +func (mba *mockBlockApplier) ApplyBlock(state sm.State, blockID types.BlockID, block *types.Block) (sm.State, error) { + state.LastBlockHeight++ + return state, nil +} + +type mockSwitchIo struct { + mtx sync.Mutex + switchedToConsensus bool + numStatusResponse int + numBlockResponse int + numNoBlockResponse int +} + +func (sio *mockSwitchIo) sendBlockRequest(peerID p2p.ID, height int64) error { + return nil +} + +func (sio *mockSwitchIo) sendStatusResponse(height int64, peerID p2p.ID) error { + sio.mtx.Lock() + defer sio.mtx.Unlock() + sio.numStatusResponse++ + return nil +} + +func (sio *mockSwitchIo) sendBlockToPeer(block *types.Block, peerID p2p.ID) error { + sio.mtx.Lock() + defer sio.mtx.Unlock() + sio.numBlockResponse++ + return nil +} + +func (sio *mockSwitchIo) sendBlockNotFound(height int64, peerID p2p.ID) error { + sio.mtx.Lock() + defer sio.mtx.Unlock() + sio.numNoBlockResponse++ + return nil +} + +func (sio *mockSwitchIo) trySwitchToConsensus(state sm.State, blocksSynced int) { + sio.mtx.Lock() + defer sio.mtx.Unlock() + sio.switchedToConsensus = true +} + +func (sio *mockSwitchIo) hasSwitchedToConsensus() bool { + sio.mtx.Lock() + defer sio.mtx.Unlock() + return sio.switchedToConsensus +} + +func (sio *mockSwitchIo) broadcastStatusRequest(height int64) { +} + +type testReactorParams struct { + logger log.Logger + genDoc *types.GenesisDoc + privVals []types.PrivValidator + startHeight int64 + bufferSize int + mockA bool +} + +func newTestReactor(p testReactorParams) *BlockchainReactor { + store, state, _ := newReactorStore(p.genDoc, p.privVals, p.startHeight) + reporter := behaviour.NewMockReporter() + + var appl blockApplier + + if p.mockA { + appl = &mockBlockApplier{} + } else { + app := &testApp{} + cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc) + err := proxyApp.Start() + if err != nil { + panic(errors.Wrap(err, "error start app")) + } + db := dbm.NewMemDB() + appl = sm.NewBlockExecutor(db, p.logger, proxyApp.Consensus(), mock.Mempool{}, sm.MockEvidencePool{}) + sm.SaveState(db, state) + } + + r := newReactor(state, store, reporter, appl, p.bufferSize) + logger := log.TestingLogger() + r.SetLogger(logger.With("module", "blockchain")) + + return r +} + +func TestReactorTerminationScenarios(t *testing.T) { + + config := cfg.ResetTestRoot("blockchain_reactor_v2_test") + defer os.RemoveAll(config.RootDir) + genDoc, privVals := randGenesisDoc(config.ChainID(), 1, false, 30) + refStore, _, _ := newReactorStore(genDoc, privVals, 20) + + params := testReactorParams{ + logger: log.TestingLogger(), + genDoc: genDoc, + privVals: privVals, + startHeight: 10, + bufferSize: 100, + mockA: true, + } + + type testEvent struct { + evType string + peer string + height int64 + } + + tests := []struct { + name string + params testReactorParams + msgs []testEvent + }{ + { + name: "simple termination on max peer height - one peer", + params: params, + msgs: []testEvent{ + {evType: "AddPeer", peer: "P1"}, + {evType: "ReceiveS", peer: "P1", height: 13}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P1", height: 11}, + {evType: "BlockReq"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P1", height: 12}, + {evType: "Process"}, + {evType: "ReceiveB", peer: "P1", height: 13}, + {evType: "Process"}, + }, + }, + { + name: "simple termination on max peer height - two peers", + params: params, + msgs: []testEvent{ + {evType: "AddPeer", peer: "P1"}, + {evType: "AddPeer", peer: "P2"}, + {evType: "ReceiveS", peer: "P1", height: 13}, + {evType: "ReceiveS", peer: "P2", height: 15}, + {evType: "BlockReq"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P1", height: 11}, + {evType: "ReceiveB", peer: "P2", height: 12}, + {evType: "Process"}, + {evType: "BlockReq"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P1", height: 13}, + {evType: "Process"}, + {evType: "ReceiveB", peer: "P2", height: 14}, + {evType: "Process"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P2", height: 15}, + {evType: "Process"}, + }, + }, + { + name: "termination on max peer height - two peers, noBlock error", + params: params, + msgs: []testEvent{ + {evType: "AddPeer", peer: "P1"}, + {evType: "AddPeer", peer: "P2"}, + {evType: "ReceiveS", peer: "P1", height: 13}, + {evType: "ReceiveS", peer: "P2", height: 15}, + {evType: "BlockReq"}, + {evType: "BlockReq"}, + {evType: "ReceiveNB", peer: "P1", height: 11}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P2", height: 12}, + {evType: "ReceiveB", peer: "P2", height: 11}, + {evType: "Process"}, + {evType: "BlockReq"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P2", height: 13}, + {evType: "Process"}, + {evType: "ReceiveB", peer: "P2", height: 14}, + {evType: "Process"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P2", height: 15}, + {evType: "Process"}, + }, + }, + { + name: "termination on max peer height - two peers, remove one peer", + params: params, + msgs: []testEvent{ + {evType: "AddPeer", peer: "P1"}, + {evType: "AddPeer", peer: "P2"}, + {evType: "ReceiveS", peer: "P1", height: 13}, + {evType: "ReceiveS", peer: "P2", height: 15}, + {evType: "BlockReq"}, + {evType: "BlockReq"}, + {evType: "RemovePeer", peer: "P1"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P2", height: 12}, + {evType: "ReceiveB", peer: "P2", height: 11}, + {evType: "Process"}, + {evType: "BlockReq"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P2", height: 13}, + {evType: "Process"}, + {evType: "ReceiveB", peer: "P2", height: 14}, + {evType: "Process"}, + {evType: "BlockReq"}, + {evType: "ReceiveB", peer: "P2", height: 15}, + {evType: "Process"}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + reactor := newTestReactor(params) + reactor.Start() + reactor.reporter = behaviour.NewMockReporter() + mockSwitch := &mockSwitchIo{switchedToConsensus: false} + reactor.io = mockSwitch + // time for go routines to start + time.Sleep(time.Millisecond) + + for _, step := range tt.msgs { + switch step.evType { + case "AddPeer": + reactor.scheduler.send(bcAddNewPeer{peerID: p2p.ID(step.peer)}) + case "RemovePeer": + reactor.scheduler.send(bcRemovePeer{peerID: p2p.ID(step.peer)}) + case "ReceiveS": + reactor.scheduler.send(bcStatusResponse{ + peerID: p2p.ID(step.peer), + height: step.height, + time: time.Now(), + }) + case "ReceiveB": + reactor.scheduler.send(bcBlockResponse{ + peerID: p2p.ID(step.peer), + block: refStore.LoadBlock(step.height), + size: 10, + time: time.Now(), + }) + case "ReceiveNB": + reactor.scheduler.send(bcNoBlockResponse{ + peerID: p2p.ID(step.peer), + height: step.height, + time: time.Now(), + }) + case "BlockReq": + reactor.scheduler.send(rTrySchedule{time: time.Now()}) + case "Process": + reactor.processor.send(rProcessBlock{}) + } + // give time for messages to propagate between routines + time.Sleep(time.Millisecond) + } + + // time for processor to finish and reactor to switch to consensus + time.Sleep(20 * time.Millisecond) + assert.True(t, mockSwitch.hasSwitchedToConsensus()) + reactor.Stop() + }) + } +} + +func TestReactorHelperMode(t *testing.T) { var ( - bufferSize = 10 - reactor = NewReactor(bufferSize) + channelID = byte(0x40) ) - reactor.Start() - script := []Event{ - // TODO + config := cfg.ResetTestRoot("blockchain_reactor_v2_test") + defer os.RemoveAll(config.RootDir) + genDoc, privVals := randGenesisDoc(config.ChainID(), 1, false, 30) + + params := testReactorParams{ + logger: log.TestingLogger(), + genDoc: genDoc, + privVals: privVals, + startHeight: 20, + bufferSize: 100, + mockA: true, + } + + type testEvent struct { + peer string + event interface{} + } + + tests := []struct { + name string + params testReactorParams + msgs []testEvent + }{ + { + name: "status request", + params: params, + msgs: []testEvent{ + {"P1", bcStatusRequestMessage{}}, + {"P1", bcBlockRequestMessage{Height: 13}}, + {"P1", bcBlockRequestMessage{Height: 20}}, + {"P1", bcBlockRequestMessage{Height: 22}}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + reactor := newTestReactor(params) + reactor.Start() + mockSwitch := &mockSwitchIo{switchedToConsensus: false} + reactor.io = mockSwitch + + for i := 0; i < len(tt.msgs); i++ { + step := tt.msgs[i] + switch ev := step.event.(type) { + case bcStatusRequestMessage: + old := mockSwitch.numStatusResponse + reactor.Receive(channelID, mockPeer{id: p2p.ID(step.peer)}, cdc.MustMarshalBinaryBare(ev)) + assert.Equal(t, old+1, mockSwitch.numStatusResponse) + case bcBlockRequestMessage: + if ev.Height > params.startHeight { + old := mockSwitch.numNoBlockResponse + reactor.Receive(channelID, mockPeer{id: p2p.ID(step.peer)}, cdc.MustMarshalBinaryBare(ev)) + assert.Equal(t, old+1, mockSwitch.numNoBlockResponse) + } else { + old := mockSwitch.numBlockResponse + reactor.Receive(channelID, mockPeer{id: p2p.ID(step.peer)}, cdc.MustMarshalBinaryBare(ev)) + assert.Equal(t, old+1, mockSwitch.numBlockResponse) + } + } + } + reactor.Stop() + }) + } +} + +//---------------------------------------------- +// utility funcs + +func makeTxs(height int64) (txs []types.Tx) { + for i := 0; i < 10; i++ { + txs = append(txs, types.Tx([]byte{byte(height), byte(i)})) + } + return txs +} + +func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block { + block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address) + return block +} + +type testApp struct { + abci.BaseApplication +} + +func randGenesisDoc(chainID string, numValidators int, randPower bool, minPower int64) ( + *types.GenesisDoc, []types.PrivValidator) { + validators := make([]types.GenesisValidator, numValidators) + privValidators := make([]types.PrivValidator, numValidators) + for i := 0; i < numValidators; i++ { + val, privVal := types.RandValidator(randPower, minPower) + validators[i] = types.GenesisValidator{ + PubKey: val.PubKey, + Power: val.VotingPower, + } + privValidators[i] = privVal } + sort.Sort(types.PrivValidatorsByAddress(privValidators)) + + return &types.GenesisDoc{ + GenesisTime: tmtime.Now(), + ChainID: chainID, + Validators: validators, + }, privValidators +} + +// Why are we importing the entire blockExecutor dependency graph here +// when we have the facilities to +func newReactorStore( + genDoc *types.GenesisDoc, + privVals []types.PrivValidator, + maxBlockHeight int64) (*store.BlockStore, sm.State, *sm.BlockExecutor) { + if len(privVals) != 1 { + panic("only support one validator") + } + app := &testApp{} + cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc) + err := proxyApp.Start() + if err != nil { + panic(errors.Wrap(err, "error start app")) + } + + stateDB := dbm.NewMemDB() + blockStore := store.NewBlockStore(dbm.NewMemDB()) + + state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) + if err != nil { + panic(errors.Wrap(err, "error constructing state from genesis file")) + } + + db := dbm.NewMemDB() + blockExec := sm.NewBlockExecutor(db, log.TestingLogger(), proxyApp.Consensus(), + mock.Mempool{}, sm.MockEvidencePool{}) + sm.SaveState(db, state) + + // add blocks in + for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { + lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil) + if blockHeight > 1 { + lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) + lastBlock := blockStore.LoadBlock(blockHeight - 1) + vote, err := types.MakeVote( + lastBlock.Header.Height, + lastBlockMeta.BlockID, + state.Validators, + privVals[0], + lastBlock.Header.ChainID) + if err != nil { + panic(err) + } + lastCommit = types.NewCommit(vote.Height, vote.Round, + lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()}) + } + + thisBlock := makeBlock(blockHeight, state, lastCommit) + + thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) + blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()} + + state, err = blockExec.ApplyBlock(state, blockID, thisBlock) + if err != nil { + panic(errors.Wrap(err, "error apply block")) + } - for _, event := range script { - reactor.Receive(event) + blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - reactor.Stop() + return blockStore, state, blockExec } diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 897dd738c..1a883c3c4 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -10,7 +10,7 @@ import ( type handleFunc = func(event Event) (Event, error) -// Routines are a structure which model a finite state machine as serialized +// Routine is a structure that models a finite state machine as serialized // stream of events processed by a handle function. This Routine structure // handles the concurrency and messaging guarantees. Events are sent via // `send` are handled by the `handle` function to produce an iterator @@ -80,7 +80,7 @@ func (rt *Routine) start() { return } rt.metrics.EventsOut.With("routine", rt.name).Add(1) - rt.logger.Debug(fmt.Sprintf("%s produced %T %+v\n", rt.name, oEvent, oEvent)) + rt.logger.Debug(fmt.Sprintf("%s: produced %T %+v\n", rt.name, oEvent, oEvent)) rt.out <- oEvent } @@ -98,6 +98,7 @@ func (rt *Routine) send(event Event) bool { rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name)) return false } + rt.metrics.EventsSent.With("routine", rt.name).Add(1) return true } diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go index ab3892dc5..3cf0b2468 100644 --- a/blockchain/v2/scheduler.go +++ b/blockchain/v2/scheduler.go @@ -11,52 +11,11 @@ import ( "github.com/tendermint/tendermint/types" ) -// Events - -// XXX: The handle API would be much simpler if it return a single event, an -// Event, which embeds a terminationEvent if it wants to terminate the routine. - -// Input events into the scheduler: -// ticker event for cleaning peers -type tryPrunePeer struct { - priorityHigh - time time.Time -} - -// ticker event for scheduling block requests -type trySchedule struct { - priorityHigh - time time.Time -} - -// blockResponse message received from a peer -type bcBlockResponse struct { - priorityNormal - time time.Time - peerID p2p.ID - height int64 - size int64 - block *types.Block -} - -// statusResponse message received from a peer -type bcStatusResponse struct { - priorityNormal - time time.Time - peerID p2p.ID - height int64 -} - -// new peer is connected -type addNewPeer struct { - priorityNormal - peerID p2p.ID -} - -// Output events issued by the scheduler: +// Events generated by the scheduler: // all blocks have been processed type scFinishedEv struct { priorityNormal + reason string } // send a blockRequest message @@ -80,6 +39,10 @@ type scPeerError struct { reason error } +func (e scPeerError) String() string { + return fmt.Sprintf("scPeerError - peerID %s, err %s", e.peerID, e.reason) +} + // scheduler removed a set of peers (timed out or slow peer) type scPeersPruned struct { priorityHigh @@ -160,9 +123,10 @@ func (p scPeer) String() string { func newScPeer(peerID p2p.ID) *scPeer { return &scPeer{ - peerID: peerID, - state: peerStateNew, - height: -1, + peerID: peerID, + state: peerStateNew, + height: -1, + lastTouched: time.Time{}, } } @@ -176,11 +140,17 @@ type scheduler struct { // in Processed state. height int64 + // lastAdvance tracks the last time a block execution happened. + // syncTimeout is the maximum time the scheduler waits to advance in the fast sync process before finishing. + // This covers the cases where there are no peers or all peers have a lower height. + lastAdvance time.Time + syncTimeout time.Duration + // a map of peerID to scheduler specific peer struct `scPeer` used to keep // track of peer specific state peers map[p2p.ID]*scPeer - peerTimeout time.Duration - minRecvRate int64 // minimum receive rate from peer otherwise prune + peerTimeout time.Duration // maximum response time from a peer otherwise prune + minRecvRate int64 // minimum receive rate from peer otherwise prune // the maximum number of blocks that should be New, Received or Pending at any point // in time. This is used to enforce a limit on the blockStates map. @@ -204,15 +174,20 @@ func (sc scheduler) String() string { sc.initHeight, sc.blockStates, sc.peers, sc.pendingBlocks, sc.pendingTime, sc.receivedBlocks) } -func newScheduler(initHeight int64) *scheduler { +func newScheduler(initHeight int64, startTime time.Time) *scheduler { sc := scheduler{ initHeight: initHeight, + lastAdvance: startTime, + syncTimeout: 60 * time.Second, height: initHeight + 1, blockStates: make(map[int64]blockState), peers: make(map[p2p.ID]*scPeer), pendingBlocks: make(map[int64]p2p.ID), pendingTime: make(map[int64]time.Time), receivedBlocks: make(map[int64]p2p.ID), + targetPending: 10, // TODO - pass as param + peerTimeout: 15 * time.Second, // TODO - pass as param + minRecvRate: 0, //int64(7680), TODO - pass as param } return &sc @@ -316,6 +291,7 @@ func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error { } if height < peer.height { + sc.removePeer(peerID) return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height) } @@ -327,7 +303,7 @@ func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error { } func (sc *scheduler) getStateAtHeight(height int64) blockState { - if height <= sc.initHeight { + if height < sc.height { return blockStateProcessed } else if state, ok := sc.blockStates[height]; ok { return state @@ -349,41 +325,8 @@ func (sc *scheduler) getPeersAtHeightOrAbove(height int64) []p2p.ID { return peers } -func (sc *scheduler) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID { - peers := []p2p.ID{} - for _, peer := range sc.peers { - if peer.state != peerStateReady { - continue - } - if now.Sub(peer.lastTouched) > duration { - peers = append(peers, peer.peerID) - } - } - - // Ensure the order is deterministic for testing - sort.Sort(PeerByID(peers)) - return peers -} - -// will return peers who's lastRate i slower than minSpeed denominated in bytes -func (sc *scheduler) peersSlowerThan(minSpeed int64) []p2p.ID { - peers := []p2p.ID{} - for peerID, peer := range sc.peers { - if peer.state != peerStateReady { - continue - } - if peer.lastRate < minSpeed { - peers = append(peers, peerID) - } - } - - // Ensure the order is deterministic for testing - sort.Sort(PeerByID(peers)) - return peers -} - func (sc *scheduler) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID { - prunable := []p2p.ID{} + prunable := make([]p2p.ID, 0) for peerID, peer := range sc.peers { if peer.state != peerStateReady { continue @@ -407,8 +350,8 @@ func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now t return fmt.Errorf("couldn't find peer %s", peerID) } - if peer.state == peerStateRemoved { - return fmt.Errorf("cannot receive blocks from removed peer %s", peerID) + if peer.state != peerStateReady { + return fmt.Errorf("cannot receive blocks from not ready peer %s", peerID) } if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID { @@ -454,14 +397,13 @@ func (sc *scheduler) markPending(peerID p2p.ID, height int64, time time.Time) er sc.setStateAtHeight(height, blockStatePending) sc.pendingBlocks[height] = peerID - // XXX: to make this more accurate we can introduce a message from - // the IO routine which indicates the time the request was put on the wire sc.pendingTime[height] = time return nil } func (sc *scheduler) markProcessed(height int64) error { + sc.lastAdvance = time.Now() state := sc.getStateAtHeight(height) if state != blockStateReceived { return fmt.Errorf("cannot mark height %d received from block state %s", height, state) @@ -476,6 +418,9 @@ func (sc *scheduler) markProcessed(height int64) error { } func (sc *scheduler) allBlocksProcessed() bool { + if len(sc.peers) == 0 { + return false + } return sc.height >= sc.maxHeight() } @@ -486,7 +431,7 @@ func (sc *scheduler) maxHeight() int64 { if peer.state != peerStateReady { continue } - if peer.height > max { + if max < peer.height { max = peer.height } } @@ -532,15 +477,15 @@ func (sc *scheduler) selectPeer(height int64) (p2p.ID, error) { } // find the set of peers with minimum number of pending requests. - minPending := math.MaxInt64 + var minPending int64 = math.MaxInt64 for mp := range pendingFrom { - if mp < minPending { - minPending = mp + if int64(mp) < minPending { + minPending = int64(mp) } } - sort.Sort(PeerByID(pendingFrom[minPending])) - return pendingFrom[minPending][0], nil + sort.Sort(PeerByID(pendingFrom[int(minPending)])) + return pendingFrom[int(minPending)][0], nil } // PeerByID is a list of peers sorted by peerID. @@ -570,12 +515,30 @@ func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) { err = sc.markReceived(event.peerID, event.block.Height, event.size, event.time) if err != nil { + _ = sc.removePeer(event.peerID) return scPeerError{peerID: event.peerID, reason: err}, nil } return scBlockReceived{peerID: event.peerID, block: event.block}, nil } +func (sc *scheduler) handleNoBlockResponse(event bcNoBlockResponse) (Event, error) { + if len(sc.peers) == 0 { + return noOp, nil + } + + peer, ok := sc.peers[event.peerID] + if !ok || peer.state == peerStateRemoved { + return noOp, nil + } + // The peer may have been just removed due to errors, low speed or timeouts. + _ = sc.removePeer(event.peerID) + + return scPeerError{peerID: event.peerID, + reason: fmt.Errorf("peer %v with height %d claims no block for %d", + event.peerID, peer.height, event.height)}, nil +} + func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) { if event.height != sc.height { panic(fmt.Sprintf("processed height %d but expected height %d", event.height, sc.height)) @@ -584,12 +547,12 @@ func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) if err != nil { // It is possible that a peer error or timeout is handled after the processor // has processed the block but before the scheduler received this event, - // so when pcBlockProcessed event is received the block had been requested again + // so when pcBlockProcessed event is received the block had been requested again. return scSchedulerFail{reason: err}, nil } if sc.allBlocksProcessed() { - return scFinishedEv{}, nil + return scFinishedEv{reason: "processed all blocks"}, nil } return noOp, nil @@ -608,13 +571,13 @@ func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) ( } if sc.allBlocksProcessed() { - return scFinishedEv{}, nil + return scFinishedEv{reason: "error on last block"}, nil } return noOp, nil } -func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) { +func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) { err := sc.addPeer(event.peerID) if err != nil { return scSchedulerFail{reason: err}, nil @@ -622,8 +585,7 @@ func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) { return noOp, nil } -// XXX: unify types peerError -func (sc *scheduler) handlePeerError(event peerError) (Event, error) { +func (sc *scheduler) handleRemovePeer(event bcRemovePeer) (Event, error) { err := sc.removePeer(event.peerID) if err != nil { // XXX - It is possible that the removePeer fails here for legitimate reasons @@ -631,12 +593,23 @@ func (sc *scheduler) handlePeerError(event peerError) (Event, error) { return scSchedulerFail{reason: err}, nil } if sc.allBlocksProcessed() { - return scFinishedEv{}, nil + return scFinishedEv{reason: "removed peer"}, nil } return noOp, nil } -func (sc *scheduler) handleTryPrunePeer(event tryPrunePeer) (Event, error) { +func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) { + + // Check behavior of peer responsible to deliver block at sc.height. + timeHeightAsked, ok := sc.pendingTime[sc.height] + if ok && time.Since(timeHeightAsked) > sc.peerTimeout { + // A request was sent to a peer for block at sc.height but a response was not received + // from that peer within sc.peerTimeout. Remove the peer. This is to ensure that a peer + // will be timed out even if it sends blocks at higher heights but prevents progress by + // not sending the block at current height. + sc.removePeer(sc.pendingBlocks[sc.height]) + } + prunablePeers := sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, event.time) if len(prunablePeers) == 0 { return noOp, nil @@ -649,17 +622,19 @@ func (sc *scheduler) handleTryPrunePeer(event tryPrunePeer) (Event, error) { } } - // If all blocks are processed we should finish even some peers were pruned. + // If all blocks are processed we should finish. if sc.allBlocksProcessed() { - return scFinishedEv{}, nil + return scFinishedEv{reason: "after try prune"}, nil } return scPeersPruned{peers: prunablePeers}, nil } -// TODO - Schedule multiple block requests -func (sc *scheduler) handleTrySchedule(event trySchedule) (Event, error) { +func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) { + if time.Since(sc.lastAdvance) > sc.syncTimeout { + return scFinishedEv{reason: "timeout, no advance"}, nil + } nextHeight := sc.nextHeightToSchedule() if nextHeight == -1 { @@ -693,17 +668,20 @@ func (sc *scheduler) handle(event Event) (Event, error) { case bcBlockResponse: nextEvent, err := sc.handleBlockResponse(event) return nextEvent, err - case trySchedule: + case bcNoBlockResponse: + nextEvent, err := sc.handleNoBlockResponse(event) + return nextEvent, err + case rTrySchedule: nextEvent, err := sc.handleTrySchedule(event) return nextEvent, err - case addNewPeer: + case bcAddNewPeer: nextEvent, err := sc.handleAddNewPeer(event) return nextEvent, err - case tryPrunePeer: - nextEvent, err := sc.handleTryPrunePeer(event) + case bcRemovePeer: + nextEvent, err := sc.handleRemovePeer(event) return nextEvent, err - case peerError: - nextEvent, err := sc.handlePeerError(event) + case rTryPrunePeer: + nextEvent, err := sc.handleTryPrunePeer(event) return nextEvent, err case pcBlockProcessed: nextEvent, err := sc.handleBlockProcessed(event) @@ -714,5 +692,4 @@ func (sc *scheduler) handle(event Event) (Event, error) { default: return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil } - //return noOp, nil } diff --git a/blockchain/v2/scheduler_test.go b/blockchain/v2/scheduler_test.go index a3a6db672..445ba51a7 100644 --- a/blockchain/v2/scheduler_test.go +++ b/blockchain/v2/scheduler_test.go @@ -23,6 +23,8 @@ type scTestParams struct { peerTimeout time.Duration minRecvRate int64 targetPending int + startTime time.Time + syncTimeout time.Duration } func verifyScheduler(sc *scheduler) { @@ -37,8 +39,9 @@ func verifyScheduler(sc *scheduler) { func newTestScheduler(params scTestParams) *scheduler { peers := make(map[p2p.ID]*scPeer) + var maxHeight int64 - sc := newScheduler(params.initHeight) + sc := newScheduler(params.initHeight, params.startTime) if params.height != 0 { sc.height = params.height } @@ -46,6 +49,9 @@ func newTestScheduler(params scTestParams) *scheduler { for id, peer := range params.peers { peer.peerID = p2p.ID(id) peers[p2p.ID(id)] = peer + if maxHeight < peer.height { + maxHeight = peer.height + } } for _, h := range params.allB { sc.blockStates[h] = blockStateNew @@ -64,6 +70,12 @@ func newTestScheduler(params scTestParams) *scheduler { sc.peers = peers sc.peerTimeout = params.peerTimeout + if params.syncTimeout == 0 { + sc.syncTimeout = 10 * time.Second + } else { + sc.syncTimeout = params.syncTimeout + } + if params.targetPending == 0 { sc.targetPending = 10 } else { @@ -80,7 +92,7 @@ func newTestScheduler(params scTestParams) *scheduler { func TestScInit(t *testing.T) { var ( initHeight int64 = 5 - sc = newScheduler(initHeight) + sc = newScheduler(initHeight, time.Now()) ) assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight)) assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1)) @@ -181,21 +193,21 @@ func TestScAddPeer(t *testing.T) { name: "add first peer", fields: scTestParams{}, args: args{peerID: "P1"}, - wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, }, { name: "add second peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, args: args{peerID: "P2"}, wantFields: scTestParams{peers: map[string]*scPeer{ - "P1": {state: peerStateNew, height: -1}, - "P2": {state: peerStateNew, height: -1}}}, + "P1": {height: -1, state: peerStateNew}, + "P2": {height: -1, state: peerStateNew}}}, }, { name: "attempt to add duplicate peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, args: args{peerID: "P1"}, - wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, wantErr: true, }, { @@ -271,8 +283,8 @@ func TestScTouchPeer(t *testing.T) { name: "touch peer in state Ready", fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, args: args{peerID: "P1", time: now.Add(3 * time.Second)}, - wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, - lastTouched: now.Add(3 * time.Second)}}}, + wantFields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateReady, lastTouched: now.Add(3 * time.Second)}}}, }, } @@ -289,195 +301,6 @@ func TestScTouchPeer(t *testing.T) { } } -func TestScPeersInactiveSince(t *testing.T) { - now := time.Now() - - type args struct { - threshold time.Duration - time time.Time - } - - tests := []struct { - name string - fields scTestParams - args args - wantResult []p2p.ID - }{ - { - name: "no peers", - fields: scTestParams{peers: map[string]*scPeer{}}, - args: args{threshold: time.Second, time: now}, - wantResult: []p2p.ID{}, - }, - { - name: "one active peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, - args: args{threshold: time.Second, time: now}, - wantResult: []p2p.ID{}, - }, - { - name: "one inactive peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, - args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond)}, - wantResult: []p2p.ID{"P1"}, - }, - { - name: "one active and one inactive peer", - fields: scTestParams{peers: map[string]*scPeer{ - "P1": {state: peerStateReady, lastTouched: now}, - "P2": {state: peerStateReady, lastTouched: now.Add(time.Second)}}}, - args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond)}, - wantResult: []p2p.ID{"P1"}, - }, - { - name: "one New peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}}, - args: args{threshold: time.Second, time: now.Add(time.Millisecond)}, - wantResult: []p2p.ID{}, - }, - { - name: "one Removed peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastTouched: now}}}, - args: args{threshold: time.Second, time: now.Add(time.Millisecond)}, - wantResult: []p2p.ID{}, - }, - { - name: "one Ready active peer and one New", - fields: scTestParams{peers: map[string]*scPeer{ - "P1": {state: peerStateRemoved, lastTouched: now}, - "P2": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}}}, - args: args{threshold: time.Second, time: now.Add(2 * time.Millisecond)}, - wantResult: []p2p.ID{}, - }, - { - name: "one Ready inactive peer and one New", - fields: scTestParams{peers: map[string]*scPeer{ - "P1": {state: peerStateRemoved, lastTouched: now}, - "P2": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}}}, - args: args{threshold: time.Second, time: now.Add(time.Second + 2*time.Millisecond)}, - wantResult: []p2p.ID{"P2"}, - }, - { - name: "combination of New, Removed and, active and non active Ready peers", - fields: scTestParams{peers: map[string]*scPeer{ - "P1": {state: peerStateNew}, - "P2": {state: peerStateRemoved, lastTouched: now}, - "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second)}, - "P4": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}, - "P5": {state: peerStateReady, lastTouched: now.Add(3 * time.Millisecond)}}}, - args: args{threshold: time.Second, time: now.Add(time.Second + 2*time.Millisecond)}, - wantResult: []p2p.ID{"P4"}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - sc := newTestScheduler(tt.fields) - // peersInactiveSince should not mutate the scheduler - wantSc := sc - res := sc.peersInactiveSince(tt.args.threshold, tt.args.time) - sort.Sort(PeerByID(res)) - assert.Equal(t, tt.wantResult, res) - assert.Equal(t, wantSc, sc) - }) - } -} - -func TestScPeersSlowerThan(t *testing.T) { - type args struct { - minSpeed int64 - } - - tests := []struct { - name string - fields scTestParams - args args - wantResult []p2p.ID - }{ - { - name: "no peers", - fields: scTestParams{peers: map[string]*scPeer{}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{}, - }, - { - name: "one Ready faster peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 101}}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{}, - }, - { - name: "one Ready equal peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 100}}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{}, - }, - { - name: "one Ready slow peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 99}}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{"P1"}, - }, - { - name: "one Removed faster peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 101}}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{}, - }, { - name: "one Removed equal peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 100}}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{}, - }, - { - name: "one Removed slow peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 99}}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{}, - }, - { - name: "one New peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{}, - }, - { - name: "one New peer", - fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{}, - }, - { - name: "mixed peers", - fields: scTestParams{peers: map[string]*scPeer{ - "P1": {state: peerStateRemoved, lastRate: 101}, - "P2": {state: peerStateReady, lastRate: 101}, - "P3": {state: peerStateRemoved, lastRate: 100}, - "P4": {state: peerStateReady, lastRate: 100}, - "P5": {state: peerStateReady, lastRate: 99}, - "P6": {state: peerStateNew}, - "P7": {state: peerStateRemoved, lastRate: 99}, - "P8": {state: peerStateReady, lastRate: 99}, - }}, - args: args{minSpeed: 100}, - wantResult: []p2p.ID{"P5", "P8"}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - sc := newTestScheduler(tt.fields) - // peersSlowerThan should not mutate the scheduler - wantSc := sc - res := sc.peersSlowerThan(tt.args.minSpeed) - assert.Equal(t, tt.wantResult, res) - assert.Equal(t, wantSc, sc) - }) - } -} - func TestScPrunablePeers(t *testing.T) { now := time.Now() @@ -716,8 +539,8 @@ func TestScSetPeerHeight(t *testing.T) { allB: []int64{1, 2, 3, 4}}, args: args{peerID: "P1", height: 2}, wantFields: scTestParams{ - peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, - allB: []int64{1, 2, 3, 4}}, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}, + allB: []int64{}}, wantErr: true, }, { @@ -845,7 +668,7 @@ func TestScGetPeersAtHeight(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { sc := newTestScheduler(tt.fields) - // getPeersAtHeightOrAbove should not mutate the scheduler + // getPeersAtHeight should not mutate the scheduler wantSc := sc res := sc.getPeersAtHeightOrAbove(tt.args.height) sort.Sort(PeerByID(res)) @@ -1082,8 +905,11 @@ func TestScMarkReceived(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { sc := newTestScheduler(tt.fields) - if err := sc.markReceived(tt.args.peerID, - tt.args.height, tt.args.size, now.Add(time.Second)); (err != nil) != tt.wantErr { + if err := sc.markReceived( + tt.args.peerID, + tt.args.height, + tt.args.size, + now.Add(time.Second)); (err != nil) != tt.wantErr { t.Errorf("markReceived() wantErr %v, error = %v", tt.wantErr, err) } wantSc := newTestScheduler(tt.wantFields) @@ -1145,11 +971,17 @@ func TestScMarkProcessed(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { sc := newTestScheduler(tt.fields) + oldBlockState := sc.getStateAtHeight(tt.args.height) if err := sc.markProcessed(tt.args.height); (err != nil) != tt.wantErr { t.Errorf("markProcessed() wantErr %v, error = %v", tt.wantErr, err) } + if tt.wantErr { + assert.Equal(t, oldBlockState, sc.getStateAtHeight(tt.args.height)) + } else { + assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(tt.args.height)) + } wantSc := newTestScheduler(tt.wantFields) - assert.Equal(t, wantSc, sc) + checkSameScheduler(t, wantSc, sc) }) } } @@ -1163,9 +995,9 @@ func TestScAllBlocksProcessed(t *testing.T) { wantResult bool }{ { - name: "no blocks", + name: "no blocks, no peers", fields: scTestParams{}, - wantResult: true, + wantResult: false, }, { name: "only New blocks", @@ -1225,7 +1057,7 @@ func TestScAllBlocksProcessed(t *testing.T) { wantSc := sc res := sc.allBlocksProcessed() assert.Equal(t, tt.wantResult, res) - assert.Equal(t, wantSc, sc) + checkSameScheduler(t, wantSc, sc) }) } } @@ -1305,8 +1137,7 @@ func TestScNextHeightToSchedule(t *testing.T) { resMin := sc.nextHeightToSchedule() assert.Equal(t, tt.wantHeight, resMin) - assert.Equal(t, wantSc, sc) - + checkSameScheduler(t, wantSc, sc) }) } } @@ -1414,7 +1245,7 @@ func TestScSelectPeer(t *testing.T) { res, err := sc.selectPeer(tt.args.height) assert.Equal(t, tt.wantResult, res) assert.Equal(t, tt.wantError, err != nil) - assert.Equal(t, wantSc, sc) + checkSameScheduler(t, wantSc, sc) }) } } @@ -1424,6 +1255,20 @@ func makeScBlock(height int64) *types.Block { return &types.Block{Header: types.Header{Height: height}} } +// used in place of assert.Equal(t, want, actual) to avoid failures due to +// scheduler.lastAdvanced timestamp inequalities. +func checkSameScheduler(t *testing.T, want *scheduler, actual *scheduler) { + assert.Equal(t, want.initHeight, actual.initHeight) + assert.Equal(t, want.height, actual.height) + assert.Equal(t, want.peers, actual.peers) + assert.Equal(t, want.blockStates, actual.blockStates) + assert.Equal(t, want.pendingBlocks, actual.pendingBlocks) + assert.Equal(t, want.pendingTime, actual.pendingTime) + assert.Equal(t, want.blockStates, actual.blockStates) + assert.Equal(t, want.receivedBlocks, actual.receivedBlocks) + assert.Equal(t, want.blockStates, actual.blockStates) +} + // checkScResults checks scheduler handler test results func checkScResults(t *testing.T, wantErr bool, err error, wantEvent Event, event Event) { if (err != nil) != wantErr { @@ -1439,8 +1284,6 @@ func checkScResults(t *testing.T, wantErr bool, err error, wantEvent Event, even assert.Equal(t, wantEvent.block, event.(scBlockReceived).block) case scSchedulerFail: assert.Equal(t, wantEvent.reason != nil, event.(scSchedulerFail).reason != nil) - default: - assert.Equal(t, wantEvent, event) } } @@ -1449,7 +1292,6 @@ func TestScHandleBlockResponse(t *testing.T) { block6FromP1 := bcBlockResponse{ time: now.Add(time.Millisecond), peerID: p2p.ID("P1"), - height: 6, size: 100, block: makeScBlock(6), } @@ -1530,6 +1372,82 @@ func TestScHandleBlockResponse(t *testing.T) { } } +func TestScHandleNoBlockResponse(t *testing.T) { + now := time.Now() + noBlock6FromP1 := bcNoBlockResponse{ + time: now.Add(time.Millisecond), + peerID: p2p.ID("P1"), + height: 6, + } + + tests := []struct { + name string + fields scTestParams + wantEvent Event + wantFields scTestParams + wantErr bool + }{ + { + name: "empty scheduler", + fields: scTestParams{}, + wantEvent: noOpEvent{}, + wantFields: scTestParams{}, + }, + { + name: "noBlock from removed peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}}, + wantEvent: noOpEvent{}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}}, + }, + { + name: "for block we haven't asked for", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}}, + }, + { + name: "noBlock from peer we don't have", + fields: scTestParams{ + peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P2"}, + pendingTime: map[int64]time.Time{6: now}, + }, + wantEvent: noOpEvent{}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P2"}, + pendingTime: map[int64]time.Time{6: now}, + }, + }, + { + name: "noBlock from existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleNoBlockResponse(noBlock6FromP1) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc) + }) + } +} + func TestScHandleBlockProcessed(t *testing.T) { now := time.Now() processed6FromP1 := pcBlockProcessed{ @@ -1702,11 +1620,11 @@ func TestScHandleBlockVerificationFailure(t *testing.T) { } func TestScHandleAddNewPeer(t *testing.T) { - addP1 := addNewPeer{ + addP1 := bcAddNewPeer{ peerID: p2p.ID("P1"), } type args struct { - event addNewPeer + event bcAddNewPeer } tests := []struct { @@ -1754,76 +1672,14 @@ func TestScHandleAddNewPeer(t *testing.T) { } } -func TestScHandlePeerError(t *testing.T) { - errP1 := peerError{ - peerID: p2p.ID("P1"), - } - type args struct { - event peerError - } - - tests := []struct { - name string - fields scTestParams - args args - wantEvent Event - wantErr bool - }{ - { - name: "no peers", - fields: scTestParams{}, - args: args{event: errP1}, - wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, - }, - { - name: "error finds no peer", - fields: scTestParams{ - height: 6, - peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, - allB: []int64{6, 7, 8}, - }, - args: args{event: errP1}, - wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, - }, - { - name: "error finds peer, only peer is removed", - fields: scTestParams{ - height: 6, - peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, - allB: []int64{6, 7, 8}, - }, - args: args{event: errP1}, - wantEvent: scFinishedEv{}, - }, - { - name: "error finds peer, one of two peers are removed", - fields: scTestParams{ - peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}, "P2": {height: 8, state: peerStateReady}}, - allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, - }, - args: args{event: errP1}, - wantEvent: noOpEvent{}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - sc := newTestScheduler(tt.fields) - event, err := sc.handlePeerError(tt.args.event) - checkScResults(t, tt.wantErr, err, tt.wantEvent, event) - }) - } -} - func TestScHandleTryPrunePeer(t *testing.T) { now := time.Now() - pruneEv := tryPrunePeer{ + pruneEv := rTryPrunePeer{ time: now.Add(time.Second + time.Millisecond), } type args struct { - event tryPrunePeer + event rTryPrunePeer } tests := []struct { @@ -1914,14 +1770,14 @@ func TestScHandleTryPrunePeer(t *testing.T) { } } -func TestHandleTrySchedule(t *testing.T) { +func TestScHandleTrySchedule(t *testing.T) { now := time.Now() - tryEv := trySchedule{ + tryEv := rTrySchedule{ time: now.Add(time.Second + time.Millisecond), } type args struct { - event trySchedule + event rTrySchedule } tests := []struct { name string @@ -1932,41 +1788,44 @@ func TestHandleTrySchedule(t *testing.T) { }{ { name: "no peers", - fields: scTestParams{peers: map[string]*scPeer{}}, + fields: scTestParams{startTime: now, peers: map[string]*scPeer{}}, args: args{event: tryEv}, wantEvent: noOpEvent{}, }, { name: "only new peers", - fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, + fields: scTestParams{startTime: now, peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, args: args{event: tryEv}, wantEvent: noOpEvent{}, }, { name: "only Removed peers", - fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}}, + fields: scTestParams{startTime: now, peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}}, args: args{event: tryEv}, wantEvent: noOpEvent{}, }, { name: "one Ready shorter peer", fields: scTestParams{ - height: 6, - peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}}, + startTime: now, + height: 6, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}}, args: args{event: tryEv}, wantEvent: noOpEvent{}, }, { name: "one Ready equal peer", fields: scTestParams{ - peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, - allB: []int64{1, 2, 3, 4}}, + startTime: now, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, args: args{event: tryEv}, wantEvent: scBlockRequest{peerID: "P1", height: 1}, }, { name: "many Ready higher peers with different number of pending requests", fields: scTestParams{ + startTime: now, peers: map[string]*scPeer{ "P1": {height: 4, state: peerStateReady}, "P2": {height: 5, state: peerStateReady}}, @@ -1983,6 +1842,7 @@ func TestHandleTrySchedule(t *testing.T) { { name: "many Ready higher peers with same number of pending requests", fields: scTestParams{ + startTime: now, peers: map[string]*scPeer{ "P2": {height: 8, state: peerStateReady}, "P1": {height: 8, state: peerStateReady}, @@ -2084,6 +1944,8 @@ func TestScHandleStatusResponse(t *testing.T) { } func TestScHandle(t *testing.T) { + now := time.Now() + type unknownEv struct { priorityNormal } @@ -2123,24 +1985,27 @@ func TestScHandle(t *testing.T) { name: "single peer, sync 3 blocks", steps: []scStep{ { // add P1 - currentSc: &scTestParams{peers: map[string]*scPeer{}, height: 1}, - args: args{event: addNewPeer{peerID: "P1"}}, + currentSc: &scTestParams{startTime: now, peers: map[string]*scPeer{}, height: 1}, + args: args{event: bcAddNewPeer{peerID: "P1"}}, wantEvent: noOpEvent{}, - wantSc: &scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}, height: 1}, + wantSc: &scTestParams{startTime: now, peers: map[string]*scPeer{ + "P1": {height: -1, state: peerStateNew}}, height: 1}, }, { // set height of P1 args: args{event: bcStatusResponse{peerID: "P1", time: tick[0], height: 3}}, wantEvent: noOpEvent{}, wantSc: &scTestParams{ - peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, - allB: []int64{1, 2, 3}, - height: 1, + startTime: now, + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + height: 1, }, }, { // schedule block 1 - args: args{event: trySchedule{time: tick[1]}}, + args: args{event: rTrySchedule{time: tick[1]}}, wantEvent: scBlockRequest{peerID: "P1", height: 1}, wantSc: &scTestParams{ + startTime: now, peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, allB: []int64{1, 2, 3}, pending: map[int64]p2p.ID{1: "P1"}, @@ -2149,9 +2014,10 @@ func TestScHandle(t *testing.T) { }, }, { // schedule block 2 - args: args{event: trySchedule{time: tick[2]}}, + args: args{event: rTrySchedule{time: tick[2]}}, wantEvent: scBlockRequest{peerID: "P1", height: 2}, wantSc: &scTestParams{ + startTime: now, peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, allB: []int64{1, 2, 3}, pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, @@ -2160,9 +2026,10 @@ func TestScHandle(t *testing.T) { }, }, { // schedule block 3 - args: args{event: trySchedule{time: tick[3]}}, + args: args{event: rTrySchedule{time: tick[3]}}, wantEvent: scBlockRequest{peerID: "P1", height: 3}, wantSc: &scTestParams{ + startTime: now, peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, allB: []int64{1, 2, 3}, pending: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, @@ -2171,9 +2038,10 @@ func TestScHandle(t *testing.T) { }, }, { // block response 1 - args: args{event: bcBlockResponse{peerID: "P1", height: 1, time: tick[4], size: 100, block: makeScBlock(1)}}, + args: args{event: bcBlockResponse{peerID: "P1", time: tick[4], size: 100, block: makeScBlock(1)}}, wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(1)}, wantSc: &scTestParams{ + startTime: now, peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[4]}}, allB: []int64{1, 2, 3}, pending: map[int64]p2p.ID{2: "P1", 3: "P1"}, @@ -2183,9 +2051,10 @@ func TestScHandle(t *testing.T) { }, }, { // block response 2 - args: args{event: bcBlockResponse{peerID: "P1", height: 2, time: tick[5], size: 100, block: makeScBlock(2)}}, + args: args{event: bcBlockResponse{peerID: "P1", time: tick[5], size: 100, block: makeScBlock(2)}}, wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(2)}, wantSc: &scTestParams{ + startTime: now, peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[5]}}, allB: []int64{1, 2, 3}, pending: map[int64]p2p.ID{3: "P1"}, @@ -2195,33 +2064,36 @@ func TestScHandle(t *testing.T) { }, }, { // block response 3 - args: args{event: bcBlockResponse{peerID: "P1", height: 3, time: tick[6], size: 100, block: makeScBlock(3)}}, + args: args{event: bcBlockResponse{peerID: "P1", time: tick[6], size: 100, block: makeScBlock(3)}}, wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(3)}, wantSc: &scTestParams{ - peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, - allB: []int64{1, 2, 3}, - received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, - height: 1, + startTime: now, + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{1, 2, 3}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, + height: 1, }, }, { // processed block 1 args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 1}}, wantEvent: noOpEvent{}, wantSc: &scTestParams{ - peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, - allB: []int64{2, 3}, - received: map[int64]p2p.ID{2: "P1", 3: "P1"}, - height: 2, + startTime: now, + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{2, 3}, + received: map[int64]p2p.ID{2: "P1", 3: "P1"}, + height: 2, }, }, { // processed block 2 args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 2}}, wantEvent: scFinishedEv{}, wantSc: &scTestParams{ - peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, - allB: []int64{3}, - received: map[int64]p2p.ID{3: "P1"}, - height: 3, + startTime: now, + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{3}, + received: map[int64]p2p.ID{3: "P1"}, + height: 3, }, }, }, @@ -2231,6 +2103,7 @@ func TestScHandle(t *testing.T) { steps: []scStep{ { // failure processing block 1 currentSc: &scTestParams{ + startTime: now, peers: map[string]*scPeer{ "P1": {height: 4, state: peerStateReady, lastTouched: tick[6]}, "P2": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, @@ -2241,6 +2114,7 @@ func TestScHandle(t *testing.T) { args: args{event: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"}}, wantEvent: noOpEvent{}, wantSc: &scTestParams{ + startTime: now, peers: map[string]*scPeer{ "P1": {height: 4, state: peerStateRemoved, lastTouched: tick[6]}, "P2": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, @@ -2249,19 +2123,6 @@ func TestScHandle(t *testing.T) { height: 1, }, }, - /* - { // processed block 2 - args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 2}}, - wantEvent: scFinishedEv{}, - wantSc: &scTestParams{ - peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, - allB: []int64{3}, - received: map[int64]p2p.ID{3: "P1"}, - height: 3, - }, - }, - - */ }, }, } @@ -2280,8 +2141,10 @@ func TestScHandle(t *testing.T) { } nextEvent, err := sc.handle(step.args.event) - assert.Equal(t, newTestScheduler(*step.wantSc), sc) + wantSc := newTestScheduler(*step.wantSc) + t.Logf("step %d(%v): %s", i, step.args.event, sc) + checkSameScheduler(t, wantSc, sc) checkScResults(t, step.wantErr, err, step.wantEvent, nextEvent) diff --git a/blockchain/v2/types.go b/blockchain/v2/types.go index 836e87fd8..7a73728e4 100644 --- a/blockchain/v2/types.go +++ b/blockchain/v2/types.go @@ -4,6 +4,7 @@ import ( "github.com/Workiva/go-datastructures/queue" ) +// Event is the type that can be added to the priority queue. type Event queue.Item type priority interface { diff --git a/docs/architecture/adr-043-blockchain-riri-org.md b/docs/architecture/adr-043-blockchain-riri-org.md index 8daef1817..6bb018f51 100644 --- a/docs/architecture/adr-043-blockchain-riri-org.md +++ b/docs/architecture/adr-043-blockchain-riri-org.md @@ -4,6 +4,7 @@ * 18-06-2019: Initial draft * 08-07-2019: Reviewed * 29-11-2019: Implemented +* 14-02-2020: Updated with the implementation details ## Context @@ -27,7 +28,15 @@ This ADR is meant to specify the missing components and control necessary to ach Partition the responsibilities of the blockchain reactor into a set of components which communicate exclusively with events. Events will contain timestamps allowing each component to track time as internal state. The internal state will be mutated by a set of `handle*` which will produce event(s). The integration between components will happen in the reactor and reactor tests will then become integration tests between components. This design will be known as `v2`. ![v2 Blockchain Reactor Architecture -Diagram](https://github.com/tendermint/tendermint/blob/f9e556481654a24aeb689bdadaf5eab3ccd66829/docs/architecture/img/blockchain-reactor-v2.png) +Diagram](https://github.com/tendermint/tendermint/blob/584e67ac3fac220c5c3e0652e3582eca8231e814/docs/architecture/img/blockchain-reactor-v2.png) + +### Fast Sync Related Communication Channels + +The diagram below shows the fast sync routines and the types of channels and queues used to communicate with each other. +In addition the per reactor channels used by the sendRoutine to send messages over the Peer MConnection are shown. + +![v2 Blockchain Channels and Queues +Diagram](https://github.com/tendermint/tendermint/blob/5cf570690f989646fb3b615b734da503f038891f/docs/architecture/img/blockchain-v2-channels.png) ### Reactor changes in detail diff --git a/docs/architecture/img/blockchain-reactor-v2.png b/docs/architecture/img/blockchain-reactor-v2.png index 086bf71bd..5f15333a0 100644 Binary files a/docs/architecture/img/blockchain-reactor-v2.png and b/docs/architecture/img/blockchain-reactor-v2.png differ diff --git a/docs/architecture/img/blockchain-v2-channels.png b/docs/architecture/img/blockchain-v2-channels.png new file mode 100644 index 000000000..69886da95 Binary files /dev/null and b/docs/architecture/img/blockchain-v2-channels.png differ