
blockchain: add v2 reactor (#4361)

The work includes the reactor, which ties together all the separate routines involved in the design of the blockchain v2 refactor. This PR replaces #4067, which grew far too large and messy after a failed attempt to rebase.
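For orientation, here is a minimal, self-contained sketch of the shape this reactor implements: peer events feed a single demux loop, which routes them through the scheduler and processor routines and forwards their output to the IO layer. Types and wiring below are simplified stand-ins, not the PR's actual API:

```go
package main

import "fmt"

// Event stands in for the reactor's Event interface (simplified).
type Event interface{}

// routine models the serialized event loops ("scheduler", "processor"):
// each consumes events from in and emits one follow-up event on out.
func routine(handle func(Event) Event, in <-chan Event, out chan<- Event) {
	for ev := range in {
		out <- handle(ev)
	}
}

func main() {
	events := make(chan Event, 8) // events arriving from peers
	schedIn, schedOut := make(chan Event, 8), make(chan Event, 8)
	procIn, procOut := make(chan Event, 8), make(chan Event, 8)

	go routine(func(ev Event) Event { return fmt.Sprintf("sc(%v)", ev) }, schedIn, schedOut)
	go routine(func(ev Event) Event { return fmt.Sprintf("pc(%v)", ev) }, procIn, procOut)

	events <- "bcBlockResponse"

	// demux: one loop routes events between peers, scheduler, processor
	// and (in the real reactor) the network IO layer.
	for i := 0; i < 3; i++ {
		select {
		case ev := <-events:
			schedIn <- ev
		case ev := <-schedOut:
			procIn <- ev
		case ev := <-procOut:
			fmt.Println("to IO:", ev) // to IO: pc(sc(bcBlockResponse))
		}
	}
}
```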

## Commits:

* Blockchain v2 reactor:

	+ A cleaner copy of the work done in #4067, which fell too far behind and was a nightmare to rebase.
	+ The work includes the reactor, which ties together all the separate routines involved in the design of the blockchain v2 refactor.

* fixes after merge

* reorder iIO interface method set

* change iO -> IO

* panic before sending nil block

* rename switchToConsensus -> trySwitchToConsensus

* rename tdState -> tmState

* Update blockchain/v2/reactor.go

Co-Authored-By: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com>

* remove peer when it sends a block unsolicited

* check for not ready in markReceived

* fix error

* fix the pcFinished event

* typo fix

* add documentation for processor fields

* simplify time.Since

* try and make the linter happy

* some doc updates

* fix channel diagram

* Update adr-043-blockchain-riri-org.md

* panic on nil switch

* linting fixes

* account for nil block in bcBlockResponseMessage

* panic on duplicate block enqueued by processor

* linting

* goimport reactor_test.go

Co-authored-by: Bot from GolangCI <42910462+golangcibot@users.noreply.github.com>
Co-authored-by: Anca Zamfir <ancazamfir@users.noreply.github.com>
Co-authored-by: Marko <marbar3778@yahoo.com>
Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
Sean Braithwaite, 5 years ago, committed by GitHub
commit ee993ba8ff
17 changed files with 1536 additions and 704 deletions
1. behaviour/reporter.go (+1, -1)
2. blockchain/v1/reactor.go (+2, -2)
3. blockchain/v2/codec.go (+13, -0)
4. blockchain/v2/io.go (+111, -0)
5. blockchain/v2/metrics.go (+1, -0)
6. blockchain/v2/processor.go (+55, -69)
7. blockchain/v2/processor_context.go (+33, -18)
8. blockchain/v2/processor_test.go (+59, -110)
9. blockchain/v2/reactor.go (+462, -51)
10. blockchain/v2/reactor_test.go (+504, -9)
11. blockchain/v2/routine.go (+3, -2)
12. blockchain/v2/scheduler.go (+90, -113)
13. blockchain/v2/scheduler_test.go (+191, -328)
14. blockchain/v2/types.go (+1, -0)
15. docs/architecture/adr-043-blockchain-riri-org.md (+10, -1)
16. docs/architecture/img/blockchain-reactor-v2.png (BIN)
17. docs/architecture/img/blockchain-v2-channels.png (BIN)

behaviour/reporter.go (+1, -1)

@@ -19,7 +19,7 @@ type SwitchReporter struct {
}
// NewSwitchReporter return a new SwitchReporter instance which wraps the Switch.
-func NewSwitcReporter(sw *p2p.Switch) *SwitchReporter {
+func NewSwitchReporter(sw *p2p.Switch) *SwitchReporter {
return &SwitchReporter{
sw: sw,
}


blockchain/v1/reactor.go (+2, -2)

@@ -103,7 +103,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
fsm := NewFSM(startHeight, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
-//bcR.swReporter = behaviour.NewSwitcReporter(bcR.BaseReactor.Switch)
+//bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
return bcR
}
@@ -141,7 +141,7 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
// OnStart implements service.Service.
func (bcR *BlockchainReactor) OnStart() error {
-bcR.swReporter = behaviour.NewSwitcReporter(bcR.BaseReactor.Switch)
+bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)
if bcR.fastSync {
go bcR.poolRoutine()
}


blockchain/v2/codec.go (+13, -0)

@@ -0,0 +1,13 @@
package v2
import (
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/types"
)
var cdc = amino.NewCodec()
func init() {
RegisterBlockchainMessages(cdc)
types.RegisterBlockAmino(cdc)
}
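A hypothetical round-trip sketch for this codec; `cdc`, `BlockchainMessage`, and `bcStatusRequestMessage` are the PR's own identifiers, but this test itself is not part of the diff:

```go
package v2

import "testing"

// TestCodecRoundTrip (illustrative, not in the PR): round-trips a status
// request through the amino codec registered in codec.go.
func TestCodecRoundTrip(t *testing.T) {
	bz := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{Height: 42})

	var msg BlockchainMessage
	if err := cdc.UnmarshalBinaryBare(bz, &msg); err != nil {
		t.Fatal(err)
	}
	req, ok := msg.(*bcStatusRequestMessage)
	if !ok || req.Height != 42 {
		t.Fatalf("unexpected message: %v", msg)
	}
}
```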

blockchain/v2/io.go (+111, -0)

@@ -0,0 +1,111 @@
package v2
import (
"fmt"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
type iIO interface {
sendBlockRequest(peerID p2p.ID, height int64) error
sendBlockToPeer(block *types.Block, peerID p2p.ID) error
sendBlockNotFound(height int64, peerID p2p.ID) error
sendStatusResponse(height int64, peerID p2p.ID) error
broadcastStatusRequest(height int64)
trySwitchToConsensus(state state.State, blocksSynced int)
}
type switchIO struct {
sw *p2p.Switch
}
func newSwitchIo(sw *p2p.Switch) *switchIO {
return &switchIO{
sw: sw,
}
}
const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
)
type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to
// the consensus machine
SwitchToConsensus(state.State, int)
}
func (sio *switchIO) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return fmt.Errorf("peer not found")
}
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{Height: height})
queued := peer.TrySend(BlockchainChannel, msgBytes)
if !queued {
return fmt.Errorf("send queue full")
}
return nil
}
func (sio *switchIO) sendStatusResponse(height int64, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return fmt.Errorf("peer not found")
}
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{Height: height})
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return fmt.Errorf("peer queue full")
}
return nil
}
func (sio *switchIO) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return fmt.Errorf("peer not found")
}
if block == nil {
panic("trying to send nil block")
}
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block})
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return fmt.Errorf("peer queue full")
}
return nil
}
func (sio *switchIO) sendBlockNotFound(height int64, peerID p2p.ID) error {
peer := sio.sw.Peers().Get(peerID)
if peer == nil {
return fmt.Errorf("peer not found")
}
msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: height})
if queued := peer.TrySend(BlockchainChannel, msgBytes); !queued {
return fmt.Errorf("peer queue full")
}
return nil
}
func (sio *switchIO) trySwitchToConsensus(state state.State, blocksSynced int) {
conR, ok := sio.sw.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced)
}
}
func (sio *switchIO) broadcastStatusRequest(height int64) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{height})
// XXX: maybe we should use an io specific peer list here
sio.sw.Broadcast(BlockchainChannel, msgBytes)
}
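The `iIO` seam above is what the tests later swap out (see `mockSwitchIo` in reactor_test.go). A conventional compile-time assertion, illustrative only and not in this diff, pins the real implementation to the interface:

```go
// Hypothetical compile-time check (not in the PR): the build breaks
// if *switchIO ever stops satisfying iIO.
var _ iIO = (*switchIO)(nil)
```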

blockchain/v2/metrics.go (+1, -0)

@@ -37,6 +37,7 @@ type Metrics struct {
ErrorsShed metrics.Counter
}
// PrometheusMetrics returns metrics for in and out events, errors, etc. handled by routines.
// Can we burn in the routine name here?
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}


blockchain/v2/processor.go (+55, -69)

@@ -4,23 +4,12 @@ import (
"fmt"
"github.com/tendermint/tendermint/p2p"
tdState "github.com/tendermint/tendermint/state"
tmState "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
type peerError struct {
priorityHigh
peerID p2p.ID
}
type pcDuplicateBlock struct {
priorityNormal
}
type pcShortBlock struct {
priorityNormal
}
// Events generated by the processor:
// block execution failure, event will indicate the peer(s) that caused the error
type pcBlockVerificationFailure struct {
priorityNormal
height int64
@@ -28,24 +17,18 @@ type pcBlockVerificationFailure struct {
secondPeerID p2p.ID
}
// successful block execution
type pcBlockProcessed struct {
priorityNormal
height int64
peerID p2p.ID
}
type pcProcessBlock struct {
priorityNormal
}
type pcStop struct {
priorityNormal
}
// processor has finished
type pcFinished struct {
priorityNormal
height int64
-blocksSynced int64
+blocksSynced int
tmState tmState.State
}
func (p pcFinished) Error() string {
@@ -60,37 +43,38 @@ type queueItem struct {
type blockQueue map[int64]queueItem
type pcState struct {
height int64 // height of the last synced block
queue blockQueue // blocks waiting to be processed
chainID string
blocksSynced int64
draining bool
tdState tdState.State
context processorContext
// blocks waiting to be processed
queue blockQueue
// draining indicates that the next rProcessBlock event with a queue miss constitutes completion
draining bool
// the number of blocks successfully synced by the processor
blocksSynced int
// the processorContext which contains the processor dependencies
context processorContext
}
func (state *pcState) String() string {
return fmt.Sprintf("height: %d queue length: %d draining: %v blocks synced: %d",
-state.height, len(state.queue), state.draining, state.blocksSynced)
+state.height(), len(state.queue), state.draining, state.blocksSynced)
}
// newPcState returns a pcState initialized with the last verified block enqueued
-func newPcState(initHeight int64, tdState tdState.State, chainID string, context processorContext) *pcState {
+func newPcState(context processorContext) *pcState {
return &pcState{
height: initHeight,
queue: blockQueue{},
chainID: chainID,
draining: false,
blocksSynced: 0,
context: context,
tdState: tdState,
}
}
// nextTwo returns the next two unverified blocks
func (state *pcState) nextTwo() (queueItem, queueItem, error) {
-if first, ok := state.queue[state.height+1]; ok {
-if second, ok := state.queue[state.height+2]; ok {
+if first, ok := state.queue[state.height()+1]; ok {
+if second, ok := state.queue[state.height()+2]; ok {
return first, second, nil
}
}
@@ -102,18 +86,15 @@ func (state *pcState) synced() bool {
return len(state.queue) <= 1
}
func (state *pcState) advance() {
state.height++
delete(state.queue, state.height)
state.blocksSynced++
}
-func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) error {
+func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) {
if _, ok := state.queue[height]; ok {
return fmt.Errorf("duplicate queue item")
panic("duplicate block enqueued by processor")
}
state.queue[height] = queueItem{block: block, peerID: peerID}
return nil
}
func (state *pcState) height() int64 {
return state.context.tmState().LastBlockHeight
}
// purgePeer moves all unprocessed blocks from the queue
@@ -129,23 +110,34 @@ func (state *pcState) purgePeer(peerID p2p.ID) {
// handle processes FSM events
func (state *pcState) handle(event Event) (Event, error) {
switch event := event.(type) {
case *scBlockReceived:
if event.block == nil {
panic("processor received an event with a nil block")
case scFinishedEv:
if state.synced() {
return pcFinished{tmState: state.context.tmState(), blocksSynced: state.blocksSynced}, nil
}
if event.block.Height <= state.height {
return pcShortBlock{}, nil
state.draining = true
return noOp, nil
case scPeerError:
state.purgePeer(event.peerID)
return noOp, nil
case scBlockReceived:
if event.block == nil {
return noOp, nil
}
err := state.enqueue(event.peerID, event.block, event.block.Height)
if err != nil {
return pcDuplicateBlock{}, nil
// enqueue block if height is higher than state height, else ignore it
if event.block.Height > state.height() {
state.enqueue(event.peerID, event.block, event.block.Height)
}
return noOp, nil
-case pcProcessBlock:
+case rProcessBlock:
tmState := state.context.tmState()
firstItem, secondItem, err := state.nextTwo()
if err != nil {
if state.draining {
-return noOp, pcFinished{height: state.height}
+return pcFinished{tmState: tmState, blocksSynced: state.blocksSynced}, nil
}
return noOp, nil
}
@@ -155,7 +147,7 @@ func (state *pcState) handle(event Event) (Event, error) {
firstPartsHeader := firstParts.Header()
firstID := types.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader}
-err = state.context.verifyCommit(state.chainID, firstID, first.Height, second.LastCommit)
+err = state.context.verifyCommit(tmState.ChainID, firstID, first.Height, second.LastCommit)
if err != nil {
state.purgePeer(firstItem.peerID)
state.purgePeer(secondItem.peerID)
@@ -166,21 +158,15 @@ func (state *pcState) handle(event Event) (Event, error) {
state.context.saveBlock(first, firstParts, second.LastCommit)
-state.tdState, err = state.context.applyBlock(state.tdState, firstID, first)
-if err != nil {
+if err := state.context.applyBlock(firstID, first); err != nil {
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
-state.advance()
-return pcBlockProcessed{height: first.Height, peerID: firstItem.peerID}, nil
-case *peerError:
-state.purgePeer(event.peerID)
+delete(state.queue, first.Height)
+state.blocksSynced++
+return pcBlockProcessed{height: first.Height, peerID: firstItem.peerID}, nil
-case pcStop:
-if state.synced() {
-return noOp, pcFinished{height: state.height, blocksSynced: state.blocksSynced}
-}
-state.draining = true
}
return noOp, nil
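On the happy path above, `rProcessBlock` verifies block H+1 against H+2's `LastCommit`, applies it, and advances. A self-contained toy model of just that control flow, with simplified types standing in for `pcState`, `nextTwo`, and `pcBlockProcessed`:

```go
package main

import "fmt"

// toyState mirrors the processor's height/queue bookkeeping only.
type toyState struct {
	height int64
	queue  map[int64]string // height -> peerID that sent the block
}

// processBlock needs two contiguous blocks: H+2's commit verifies H+1.
func (s *toyState) processBlock() (string, bool) {
	first, ok1 := s.queue[s.height+1]
	_, ok2 := s.queue[s.height+2]
	if !ok1 || !ok2 {
		return "", false // noOp: wait for more blocks
	}
	// verifyCommit + applyBlock would happen here.
	s.height++
	delete(s.queue, s.height)
	return fmt.Sprintf("pcBlockProcessed{height: %d, peerID: %s}", s.height, first), true
}

func main() {
	s := &toyState{height: 0, queue: map[int64]string{1: "P1", 2: "P2"}}
	if ev, ok := s.processBlock(); ok {
		fmt.Println(ev) // pcBlockProcessed{height: 1, peerID: P1}
	}
	if _, ok := s.processBlock(); !ok {
		fmt.Println("noOp: block at H+2 not yet received")
	}
}
```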


blockchain/v2/processor_context.go (+33, -18)

@@ -4,37 +4,41 @@ import (
"fmt"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
type processorContext interface {
-applyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error)
+applyBlock(blockID types.BlockID, block *types.Block) error
verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error
saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
tmState() state.State
}
// nolint:unused
type pContext struct {
store *store.BlockStore
executor *state.BlockExecutor
state *state.State
store blockStore
applier blockApplier
state state.State
}
// nolint:unused,deadcode
-func newProcessorContext(st *store.BlockStore, ex *state.BlockExecutor, s *state.State) *pContext {
+func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContext {
return &pContext{
store: st,
executor: ex,
state: s,
store: st,
applier: ex,
state: s,
}
}
-func (pc *pContext) applyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error) {
-return pc.executor.ApplyBlock(state, blockID, block)
+func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error {
newState, err := pc.applier.ApplyBlock(pc.state, blockID, block)
pc.state = newState
return err
}
-func (pc *pContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error {
+func (pc pContext) tmState() state.State {
+return pc.state
+}
+func (pc pContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error {
return pc.state.Validators.VerifyCommit(chainID, blockID, height, commit)
}
@@ -45,22 +49,28 @@ func (pc *pContext) saveBlock(block *types.Block, blockParts *types.PartSet, see
type mockPContext struct {
applicationBL []int64
verificationBL []int64
state state.State
}
-func newMockProcessorContext(verificationBlackList []int64, applicationBlackList []int64) *mockPContext {
+func newMockProcessorContext(
+state state.State,
+verificationBlackList []int64,
+applicationBlackList []int64) *mockPContext {
return &mockPContext{
applicationBL: applicationBlackList,
verificationBL: verificationBlackList,
state: state,
}
}
-func (mpc *mockPContext) applyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error) {
+func (mpc *mockPContext) applyBlock(blockID types.BlockID, block *types.Block) error {
for _, h := range mpc.applicationBL {
if h == block.Height {
-return state, fmt.Errorf("generic application error")
+return fmt.Errorf("generic application error")
}
}
-return state, nil
+mpc.state.LastBlockHeight = block.Height
+return nil
}
func (mpc *mockPContext) verifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error {
@@ -73,4 +83,9 @@ func (mpc *mockPContext) verifyCommit(chainID string, blockID types.BlockID, hei
}
func (mpc *mockPContext) saveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
}
func (mpc *mockPContext) tmState() state.State {
return mpc.state
}
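Because `pcState` reads its height through `processorContext.tmState()`, a test can seed the processor entirely through the mock. An illustrative sketch, not part of the diff, using only identifiers from this file and processor.go:

```go
package v2

// exampleProcessorContext (hypothetical) shows the seam in use: no real
// store or executor is needed to drive pcState.
func exampleProcessorContext() int64 {
	st := state.State{LastBlockHeight: 5}
	ctx := newMockProcessorContext(st, nil, nil) // no blacklisted heights
	pc := newPcState(ctx)
	return pc.height() // 5, read through context.tmState()
}
```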

blockchain/v2/processor_test.go (+59, -110)

@@ -5,7 +5,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/p2p"
tdState "github.com/tendermint/tendermint/state"
tmState "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@@ -19,7 +19,7 @@ type pcBlock struct {
type params struct {
height int64
items []pcBlock
-blocksSynced int64
+blocksSynced int
verBL []int64
appBL []int64
draining bool
@@ -33,13 +33,13 @@ func makePcBlock(height int64) *types.Block {
// makeState takes test parameters and creates a specific processor state.
func makeState(p *params) *pcState {
var (
-tdState = tdState.State{}
-context = newMockProcessorContext(p.verBL, p.appBL)
+tmState = tmState.State{LastBlockHeight: p.height}
+context = newMockProcessorContext(tmState, p.verBL, p.appBL)
)
-state := newPcState(p.height, tdState, "test", context)
+state := newPcState(context)
for _, item := range p.items {
-_ = state.enqueue(p2p.ID(item.pid), makePcBlock(item.height), item.height)
+state.enqueue(p2p.ID(item.pid), makePcBlock(item.height), item.height)
}
state.blocksSynced = p.blocksSynced
@@ -47,8 +47,8 @@ func makeState(p *params) *pcState {
return state
}
-func mBlockResponse(peerID p2p.ID, height int64) *scBlockReceived {
-return &scBlockReceived{
+func mBlockResponse(peerID p2p.ID, height int64) scBlockReceived {
+return scBlockReceived{
peerID: peerID,
block: makePcBlock(height),
}
@@ -101,72 +101,57 @@ func executeProcessorTests(t *testing.T, tests []testFields) {
}
}
-func TestPcBlockResponse(t *testing.T) {
+func TestRProcessPeerError(t *testing.T) {
tests := []testFields{
{
name: "add one block",
name: "error for existing peer",
steps: []pcFsmMakeStateValues{
{
currentState: &params{}, event: mBlockResponse("P1", 1),
wantState: &params{items: []pcBlock{{"P1", 1}}}, wantNextEvent: noOp,
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}},
event: scPeerError{peerID: "P2"},
wantState: &params{items: []pcBlock{{"P1", 1}}},
wantNextEvent: noOp,
},
},
},
{
name: "add two blocks",
name: "error for unknown peer",
steps: []pcFsmMakeStateValues{
{
currentState: &params{}, event: mBlockResponse("P1", 3),
wantState: &params{items: []pcBlock{{"P1", 3}}}, wantNextEvent: noOp,
},
{ // use previous wantState as currentState,
event: mBlockResponse("P1", 4),
wantState: &params{items: []pcBlock{{"P1", 3}, {"P1", 4}}}, wantNextEvent: noOp,
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}},
event: scPeerError{peerID: "P3"},
wantState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}},
wantNextEvent: noOp,
},
},
},
}
executeProcessorTests(t, tests)
}
func TestPcBlockResponse(t *testing.T) {
tests := []testFields{
{
name: "add duplicate block from same peer",
name: "add one block",
steps: []pcFsmMakeStateValues{
{
currentState: &params{}, event: mBlockResponse("P1", 3),
wantState: &params{items: []pcBlock{{"P1", 3}}}, wantNextEvent: noOp,
},
{ // use previous wantState as currentState,
event: mBlockResponse("P1", 3),
wantState: &params{items: []pcBlock{{"P1", 3}}}, wantNextEvent: pcDuplicateBlock{},
currentState: &params{}, event: mBlockResponse("P1", 1),
wantState: &params{items: []pcBlock{{"P1", 1}}}, wantNextEvent: noOp,
},
},
},
{
name: "add duplicate block from different peer",
name: "add two blocks",
steps: []pcFsmMakeStateValues{
{
currentState: &params{}, event: mBlockResponse("P1", 3),
wantState: &params{items: []pcBlock{{"P1", 3}}}, wantNextEvent: noOp,
},
{ // use previous wantState as currentState,
event: mBlockResponse("P2", 3),
wantState: &params{items: []pcBlock{{"P1", 3}}}, wantNextEvent: pcDuplicateBlock{},
},
},
},
{
name: "attempt to add block with height equal to state.height",
steps: []pcFsmMakeStateValues{
{
currentState: &params{height: 2, items: []pcBlock{{"P1", 3}}}, event: mBlockResponse("P1", 2),
wantState: &params{height: 2, items: []pcBlock{{"P1", 3}}}, wantNextEvent: pcShortBlock{},
},
},
},
{
name: "attempt to add block with height smaller than state.height",
steps: []pcFsmMakeStateValues{
{
currentState: &params{height: 2, items: []pcBlock{{"P1", 3}}}, event: mBlockResponse("P1", 1),
wantState: &params{height: 2, items: []pcBlock{{"P1", 3}}}, wantNextEvent: pcShortBlock{},
event: mBlockResponse("P1", 4),
wantState: &params{items: []pcBlock{{"P1", 3}, {"P1", 4}}}, wantNextEvent: noOp,
},
},
},
@@ -175,13 +160,13 @@ func TestPcBlockResponse(t *testing.T) {
executeProcessorTests(t, tests)
}
-func TestPcProcessBlockSuccess(t *testing.T) {
+func TestRProcessBlockSuccess(t *testing.T) {
tests := []testFields{
{
name: "noop - no blocks over current height",
steps: []pcFsmMakeStateValues{
{
currentState: &params{}, event: pcProcessBlock{},
currentState: &params{}, event: rProcessBlock{},
wantState: &params{}, wantNextEvent: noOp,
},
},
@@ -190,7 +175,7 @@ func TestPcProcessBlockSuccess(t *testing.T) {
name: "noop - high new blocks",
steps: []pcFsmMakeStateValues{
{
currentState: &params{height: 5, items: []pcBlock{{"P1", 30}, {"P2", 31}}}, event: pcProcessBlock{},
currentState: &params{height: 5, items: []pcBlock{{"P1", 30}, {"P2", 31}}}, event: rProcessBlock{},
wantState: &params{height: 5, items: []pcBlock{{"P1", 30}, {"P2", 31}}}, wantNextEvent: noOp,
},
},
@@ -199,7 +184,7 @@ func TestPcProcessBlockSuccess(t *testing.T) {
name: "blocks H+1 and H+2 present",
steps: []pcFsmMakeStateValues{
{
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, event: pcProcessBlock{},
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, event: rProcessBlock{},
wantState: &params{height: 1, items: []pcBlock{{"P2", 2}}, blocksSynced: 1},
wantNextEvent: pcBlockProcessed{height: 1, peerID: "P1"},
},
@@ -209,20 +194,20 @@ func TestPcProcessBlockSuccess(t *testing.T) {
name: "blocks H+1 and H+2 present after draining",
steps: []pcFsmMakeStateValues{
{ // some contiguous blocks - on stop check draining is set
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P1", 4}}}, event: pcStop{},
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P1", 4}}},
event: scFinishedEv{},
wantState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P1", 4}}, draining: true},
wantNextEvent: noOp,
},
{
-event: pcProcessBlock{},
+event: rProcessBlock{},
wantState: &params{height: 1, items: []pcBlock{{"P2", 2}, {"P1", 4}}, blocksSynced: 1, draining: true},
wantNextEvent: pcBlockProcessed{height: 1, peerID: "P1"},
},
{ // finish when H+1 or/and H+2 are missing
-event: pcProcessBlock{},
+event: rProcessBlock{},
wantState: &params{height: 1, items: []pcBlock{{"P2", 2}, {"P1", 4}}, blocksSynced: 1, draining: true},
-wantNextEvent: noOp,
-wantErr: pcFinished{height: 1},
+wantNextEvent: pcFinished{tmState: tmState.State{LastBlockHeight: 1}, blocksSynced: 1},
},
},
},
@@ -231,13 +216,13 @@ func TestPcProcessBlockSuccess(t *testing.T) {
executeProcessorTests(t, tests)
}
-func TestPcProcessBlockFailures(t *testing.T) {
+func TestRProcessBlockFailures(t *testing.T) {
tests := []testFields{
{
name: "blocks H+1 and H+2 present from different peers - H+1 verification fails ",
steps: []pcFsmMakeStateValues{
{
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, event: pcProcessBlock{},
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, event: rProcessBlock{},
wantState: &params{items: []pcBlock{}, verBL: []int64{1}},
wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P2"},
},
@@ -247,7 +232,7 @@ func TestPcProcessBlockFailures(t *testing.T) {
name: "blocks H+1 and H+2 present from same peer - H+1 applyBlock fails ",
steps: []pcFsmMakeStateValues{
{
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, event: pcProcessBlock{},
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, event: rProcessBlock{},
wantState: &params{items: []pcBlock{}, appBL: []int64{1}}, wantPanic: true,
},
},
@@ -256,9 +241,9 @@ func TestPcProcessBlockFailures(t *testing.T) {
name: "blocks H+1 and H+2 present from same peers - H+1 verification fails ",
steps: []pcFsmMakeStateValues{
{
currentState: &params{items: []pcBlock{{"P1", 1}, {"P1", 2}, {"P2", 3}}, verBL: []int64{1}},
event: pcProcessBlock{},
wantState: &params{items: []pcBlock{{"P2", 3}}, verBL: []int64{1}},
currentState: &params{height: 0, items: []pcBlock{{"P1", 1}, {"P1", 2}, {"P2", 3}},
verBL: []int64{1}}, event: rProcessBlock{},
wantState: &params{height: 0, items: []pcBlock{{"P2", 3}}, verBL: []int64{1}},
wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"},
},
},
@@ -268,7 +253,7 @@ func TestPcProcessBlockFailures(t *testing.T) {
steps: []pcFsmMakeStateValues{
{
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P2", 3}}, appBL: []int64{1}},
-event: pcProcessBlock{},
+event: rProcessBlock{},
wantState: &params{items: []pcBlock{{"P2", 3}}, appBL: []int64{1}}, wantPanic: true,
},
},
@@ -278,53 +263,15 @@ func TestPcProcessBlockFailures(t *testing.T) {
executeProcessorTests(t, tests)
}
func TestPcPeerError(t *testing.T) {
tests := []testFields{
{
name: "peer not present",
steps: []pcFsmMakeStateValues{
{
currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}}, event: &peerError{peerID: "P3"},
wantState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}},
wantNextEvent: noOp,
},
},
},
{
name: "some blocks are from errored peer",
steps: []pcFsmMakeStateValues{
{
currentState: &params{items: []pcBlock{{"P1", 100}, {"P1", 99}, {"P2", 101}}}, event: &peerError{peerID: "P1"},
wantState: &params{items: []pcBlock{{"P2", 101}}},
wantNextEvent: noOp,
},
},
},
{
name: "all blocks are from errored peer",
steps: []pcFsmMakeStateValues{
{
currentState: &params{items: []pcBlock{{"P1", 100}, {"P1", 99}}}, event: &peerError{peerID: "P1"},
wantState: &params{},
wantNextEvent: noOp,
},
},
},
}
executeProcessorTests(t, tests)
}
-func TestStop(t *testing.T) {
+func TestScFinishedEv(t *testing.T) {
tests := []testFields{
{
name: "no blocks",
steps: []pcFsmMakeStateValues{
{
currentState: &params{height: 100, items: []pcBlock{}, blocksSynced: 100}, event: pcStop{},
currentState: &params{height: 100, items: []pcBlock{}, blocksSynced: 100}, event: scFinishedEv{},
wantState: &params{height: 100, items: []pcBlock{}, blocksSynced: 100},
-wantNextEvent: noOp,
-wantErr: pcFinished{height: 100, blocksSynced: 100},
+wantNextEvent: pcFinished{tmState: tmState.State{LastBlockHeight: 100}, blocksSynced: 100},
},
},
},
@@ -332,10 +279,10 @@ func TestStop(t *testing.T) {
name: "maxHeight+1 block present",
steps: []pcFsmMakeStateValues{
{
currentState: &params{height: 100, items: []pcBlock{{"P1", 101}}, blocksSynced: 100}, event: pcStop{},
currentState: &params{height: 100, items: []pcBlock{
{"P1", 101}}, blocksSynced: 100}, event: scFinishedEv{},
wantState: &params{height: 100, items: []pcBlock{{"P1", 101}}, blocksSynced: 100},
-wantNextEvent: noOp,
-wantErr: pcFinished{height: 100, blocksSynced: 100},
+wantNextEvent: pcFinished{tmState: tmState.State{LastBlockHeight: 100}, blocksSynced: 100},
},
},
},
@@ -343,8 +290,10 @@ func TestStop(t *testing.T) {
name: "more blocks present",
steps: []pcFsmMakeStateValues{
{
currentState: &params{height: 100, items: []pcBlock{{"P1", 101}, {"P1", 102}}, blocksSynced: 100}, event: pcStop{},
wantState: &params{height: 100, items: []pcBlock{{"P1", 101}, {"P1", 102}}, blocksSynced: 100, draining: true},
currentState: &params{height: 100, items: []pcBlock{
{"P1", 101}, {"P1", 102}}, blocksSynced: 100}, event: scFinishedEv{},
wantState: &params{height: 100, items: []pcBlock{
{"P1", 101}, {"P1", 102}}, blocksSynced: 100, draining: true},
wantNextEvent: noOp,
wantErr: nil,
},


blockchain/v2/reactor.go (+462, -51)

@@ -1,118 +1,529 @@
package v2
import (
"errors"
"fmt"
"sync"
"time"
"github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/behaviour"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
type timeCheck struct {
priorityHigh
time time.Time
//-------------------------------------
type bcBlockRequestMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcBlockRequestMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
return nil
}
func (m *bcBlockRequestMessage) String() string {
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
}
type bcNoBlockResponseMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcNoBlockResponseMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
return nil
}
func (m *bcNoBlockResponseMessage) String() string {
return fmt.Sprintf("[bcNoBlockResponseMessage %d]", m.Height)
}
//-------------------------------------
type bcBlockResponseMessage struct {
Block *types.Block
}
// ValidateBasic performs basic validation.
func (m *bcBlockResponseMessage) ValidateBasic() error {
if m.Block == nil {
return errors.New("block response message has nil block")
}
return m.Block.ValidateBasic()
}
func (m *bcBlockResponseMessage) String() string {
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
}
func schedulerHandle(event Event) (Event, error) {
if _, ok := event.(timeCheck); ok {
fmt.Println("scheduler handle timeCheck")
//-------------------------------------
type bcStatusRequestMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusRequestMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
-return noOp, nil
+return nil
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
}
//-------------------------------------
type bcStatusResponseMessage struct {
Height int64
}
func processorHandle(event Event) (Event, error) {
if _, ok := event.(timeCheck); ok {
fmt.Println("processor handle timeCheck")
// ValidateBasic performs basic validation.
func (m *bcStatusResponseMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
-return noOp, nil
+return nil
}
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
}
type blockStore interface {
LoadBlock(height int64) *types.Block
SaveBlock(*types.Block, *types.PartSet, *types.Commit)
Height() int64
}
type Reactor struct {
events chan Event
// BlockchainReactor handles fast sync protocol.
type BlockchainReactor struct {
p2p.BaseReactor
events chan Event // XXX: Rename eventsFromPeers
stopDemux chan struct{}
scheduler *Routine
processor *Routine
ticker *time.Ticker
logger log.Logger
mtx sync.RWMutex
maxPeerHeight int64
syncHeight int64
reporter behaviour.Reporter
io iIO
store blockStore
}
//nolint:unused,deadcode
type blockVerifier interface {
VerifyCommit(chainID string, blockID types.BlockID, height int64, commit *types.Commit) error
}
func NewReactor(bufferSize int) *Reactor {
return &Reactor{
//nolint:deadcode
type blockApplier interface {
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, error)
}
// XXX: unify naming in this package around tmState
// XXX: V1 stores a copy of state as initialState, which is never mutated. Is that necessary?
func newReactor(state state.State, store blockStore, reporter behaviour.Reporter,
blockApplier blockApplier, bufferSize int) *BlockchainReactor {
scheduler := newScheduler(state.LastBlockHeight, time.Now())
pContext := newProcessorContext(store, blockApplier, state)
// TODO: Fix naming to just newProcessor
// newPcState requires a processorContext
processor := newPcState(pContext)
return &BlockchainReactor{
events: make(chan Event, bufferSize),
stopDemux: make(chan struct{}),
scheduler: newRoutine("scheduler", schedulerHandle, bufferSize),
processor: newRoutine("processor", processorHandle, bufferSize),
ticker: time.NewTicker(1 * time.Second),
scheduler: newRoutine("scheduler", scheduler.handle, bufferSize),
processor: newRoutine("processor", processor.handle, bufferSize),
store: store,
reporter: reporter,
logger: log.NewNopLogger(),
}
}
// nolint:unused
func (r *Reactor) setLogger(logger log.Logger) {
// NewBlockchainReactor creates a new reactor instance.
func NewBlockchainReactor(
state state.State,
blockApplier blockApplier,
store blockStore,
fastSync bool) *BlockchainReactor {
reporter := behaviour.NewMockReporter()
return newReactor(state, store, reporter, blockApplier, 1000)
}
// SetSwitch implements Reactor interface.
func (r *BlockchainReactor) SetSwitch(sw *p2p.Switch) {
if sw == nil {
panic("set nil switch")
}
r.Switch = sw
r.io = newSwitchIo(sw)
}
func (r *BlockchainReactor) setMaxPeerHeight(height int64) {
r.mtx.Lock()
defer r.mtx.Unlock()
if height > r.maxPeerHeight {
r.maxPeerHeight = height
}
}
func (r *BlockchainReactor) setSyncHeight(height int64) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.syncHeight = height
}
// SyncHeight returns the height to which the BlockchainReactor has synced.
func (r *BlockchainReactor) SyncHeight() int64 {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.syncHeight
}
// SetLogger sets the logger of the reactor.
func (r *BlockchainReactor) SetLogger(logger log.Logger) {
r.logger = logger
r.scheduler.setLogger(logger)
r.processor.setLogger(logger)
}
func (r *Reactor) Start() {
// Start implements cmn.Service interface
func (r *BlockchainReactor) Start() error {
r.reporter = behaviour.NewSwitchReporter(r.BaseReactor.Switch)
go r.scheduler.start()
go r.processor.start()
go r.demux()
return nil
}
<-r.scheduler.ready()
<-r.processor.ready()
// reactor generated ticker events:
// ticker for cleaning peers
type rTryPrunePeer struct {
priorityHigh
time time.Time
}
go func() {
for t := range r.ticker.C {
r.events <- timeCheck{time: t}
}
}()
func (e rTryPrunePeer) String() string {
return fmt.Sprintf(": %v", e.time)
}
// XXX: How to make this deterministic?
// XXX: Would it be possible here to provide some kind of type safety for the types
// of events that each routine can produce and consume?
func (r *Reactor) demux() {
// ticker event for scheduling block requests
type rTrySchedule struct {
priorityHigh
time time.Time
}
func (e rTrySchedule) String() string {
return fmt.Sprintf(": %v", e.time)
}
// ticker for block processing
type rProcessBlock struct {
priorityNormal
}
// reactor generated events based on blockchain related messages from peers:
// blockResponse message received from a peer
type bcBlockResponse struct {
priorityNormal
time time.Time
peerID p2p.ID
size int64
block *types.Block
}
// blockNoResponse message received from a peer
type bcNoBlockResponse struct {
priorityNormal
time time.Time
peerID p2p.ID
height int64
}
// statusResponse message received from a peer
type bcStatusResponse struct {
priorityNormal
time time.Time
peerID p2p.ID
height int64
}
// new peer is connected
type bcAddNewPeer struct {
priorityNormal
peerID p2p.ID
}
// existing peer is removed
type bcRemovePeer struct {
priorityHigh
peerID p2p.ID
reason interface{}
}
func (r *BlockchainReactor) demux() {
var lastRate = 0.0
var lastHundred = time.Now()
var (
processBlockFreq = 20 * time.Millisecond
doProcessBlockCh = make(chan struct{}, 1)
doProcessBlockTk = time.NewTicker(processBlockFreq)
prunePeerFreq = 1 * time.Second
doPrunePeerCh = make(chan struct{}, 1)
doPrunePeerTk = time.NewTicker(prunePeerFreq)
scheduleFreq = 20 * time.Millisecond
doScheduleCh = make(chan struct{}, 1)
doScheduleTk = time.NewTicker(scheduleFreq)
statusFreq = 10 * time.Second
doStatusCh = make(chan struct{}, 1)
doStatusTk = time.NewTicker(statusFreq)
)
// XXX: Extract timers to make testing atemporal
for {
select {
// Pacers: send at most per frequency but don't saturate
case <-doProcessBlockTk.C:
select {
case doProcessBlockCh <- struct{}{}:
default:
}
case <-doPrunePeerTk.C:
select {
case doPrunePeerCh <- struct{}{}:
default:
}
case <-doScheduleTk.C:
select {
case doScheduleCh <- struct{}{}:
default:
}
case <-doStatusTk.C:
select {
case doStatusCh <- struct{}{}:
default:
}
// Tickers: perform tasks periodically
case <-doScheduleCh:
r.scheduler.send(rTrySchedule{time: time.Now()})
case <-doPrunePeerCh:
r.scheduler.send(rTryPrunePeer{time: time.Now()})
case <-doProcessBlockCh:
r.processor.send(rProcessBlock{})
case <-doStatusCh:
r.io.broadcastStatusRequest(r.SyncHeight())
// Events from peers
case event := <-r.events:
// XXX: check for backpressure
r.scheduler.send(event)
r.processor.send(event)
case <-r.stopDemux:
r.logger.Info("demuxing stopped")
return
switch event := event.(type) {
case bcStatusResponse:
r.setMaxPeerHeight(event.height)
r.scheduler.send(event)
case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse:
r.scheduler.send(event)
}
// Incremental events from scheduler
case event := <-r.scheduler.next():
r.processor.send(event)
switch event := event.(type) {
case scBlockReceived:
r.processor.send(event)
case scPeerError:
r.processor.send(event)
r.reporter.Report(behaviour.BadMessage(event.peerID, "scPeerError"))
case scBlockRequest:
r.io.sendBlockRequest(event.peerID, event.height)
case scFinishedEv:
r.processor.send(event)
r.scheduler.stop()
}
// Incremental events from processor
case event := <-r.processor.next():
r.scheduler.send(event)
switch event := event.(type) {
case pcBlockProcessed:
r.setSyncHeight(event.height)
if r.syncHeight%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
r.logger.Info("Fast Syncc Rate", "height", r.syncHeight,
"max_peer_height", r.maxPeerHeight, "blocks/s", lastRate)
lastHundred = time.Now()
}
r.scheduler.send(event)
case pcBlockVerificationFailure:
r.scheduler.send(event)
case pcFinished:
r.io.trySwitchToConsensus(event.tmState, event.blocksSynced)
r.processor.stop()
}
// Terminal events from scheduler
case err := <-r.scheduler.final():
r.logger.Info(fmt.Sprintf("scheduler final %s", err))
case err := <-r.processor.final():
r.logger.Info(fmt.Sprintf("processor final %s", err))
// XXX: switch to consensus
// send the processor stop?
// Terminal event from processor
case event := <-r.processor.final():
r.logger.Info(fmt.Sprintf("processor final %s", event))
case <-r.stopDemux:
r.logger.Info("demuxing stopped")
return
}
}
}
func (r *Reactor) Stop() {
// Stop implements cmn.Service interface.
func (r *BlockchainReactor) Stop() error {
r.logger.Info("reactor stopping")
r.ticker.Stop()
r.scheduler.stop()
r.processor.stop()
close(r.stopDemux)
close(r.events)
r.logger.Info("reactor stopped")
return nil
}
const (
// NOTE: keep up to date with bcBlockResponseMessage
bcBlockResponseMessagePrefixSize = 4
bcBlockResponseMessageFieldKeySize = 1
maxMsgSize = types.MaxBlockSizeBytes +
bcBlockResponseMessagePrefixSize +
bcBlockResponseMessageFieldKeySize
)
// BlockchainMessage is a generic message for this reactor.
type BlockchainMessage interface {
ValidateBasic() error
}
// RegisterBlockchainMessages registers the fast sync messages for amino encoding.
func RegisterBlockchainMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*BlockchainMessage)(nil), nil)
cdc.RegisterConcrete(&bcBlockRequestMessage{}, "tendermint/blockchain/BlockRequest", nil)
cdc.RegisterConcrete(&bcBlockResponseMessage{}, "tendermint/blockchain/BlockResponse", nil)
cdc.RegisterConcrete(&bcNoBlockResponseMessage{}, "tendermint/blockchain/NoBlockResponse", nil)
cdc.RegisterConcrete(&bcStatusResponseMessage{}, "tendermint/blockchain/StatusResponse", nil)
cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/blockchain/StatusRequest", nil)
}
func (r *Reactor) Receive(event Event) {
// XXX: decode and serialize write events
// TODO: backpressure
func decodeMsg(bz []byte) (msg BlockchainMessage, err error) {
if len(bz) > maxMsgSize {
return msg, fmt.Errorf("msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
}
err = cdc.UnmarshalBinaryBare(bz, &msg)
return
}
// Receive implements Reactor by handling different message types.
func (r *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
r.logger.Error("error decoding message",
"src", src.ID(), "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
if err = msg.ValidateBasic(); err != nil {
r.logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
_ = r.reporter.Report(behaviour.BadMessage(src.ID(), err.Error()))
return
}
r.logger.Debug("Receive", "src", src.ID(), "chID", chID, "msg", msg)
switch msg := msg.(type) {
case *bcStatusRequestMessage:
if err := r.io.sendStatusResponse(r.store.Height(), src.ID()); err != nil {
r.logger.Error("Could not send status message to peer", "src", src)
}
case *bcBlockRequestMessage:
block := r.store.LoadBlock(msg.Height)
if block != nil {
if err = r.io.sendBlockToPeer(block, src.ID()); err != nil {
r.logger.Error("Could not send block message to peer: ", err)
}
} else {
r.logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height)
peerID := src.ID()
if err = r.io.sendBlockNotFound(msg.Height, peerID); err != nil {
r.logger.Error("Couldn't send block not found: ", err)
}
}
case *bcStatusResponseMessage:
r.events <- bcStatusResponse{peerID: src.ID(), height: msg.Height}
case *bcBlockResponseMessage:
r.events <- bcBlockResponse{
peerID: src.ID(),
block: msg.Block,
size: int64(len(msgBytes)),
time: time.Now(),
}
case *bcNoBlockResponseMessage:
r.events <- bcNoBlockResponse{peerID: src.ID(), height: msg.Height, time: time.Now()}
}
}
// AddPeer implements Reactor interface
func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
err := r.io.sendStatusResponse(r.store.Height(), peer.ID())
if err != nil {
r.logger.Error("Could not send status message to peer new", "src", peer.ID, "height", r.SyncHeight())
}
r.events <- bcAddNewPeer{peerID: peer.ID()}
}
// RemovePeer implements Reactor interface.
func (r *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
event := bcRemovePeer{
peerID: peer.ID(),
reason: reason,
}
r.events <- event
}
func (r *Reactor) AddPeer() {
// TODO: add peer event and send to demuxer
// GetChannels implements Reactor
func (r *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlockchainChannel,
Priority: 10,
SendQueueCapacity: 2000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMsgSize,
},
}
}
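The demux loop's pacer construction above (a ticker feeding a size-1 buffered channel, with a `default` branch dropping ticks) bounds how often work is requested without letting ticks queue up behind a slow consumer. A self-contained sketch of just that pattern:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	freq := 20 * time.Millisecond
	doCh := make(chan struct{}, 1) // size-1 buffer: pending ticks coalesce
	tk := time.NewTicker(freq)
	defer tk.Stop()

	go func() {
		for range tk.C {
			select {
			case doCh <- struct{}{}: // request work
			default: // a request is already pending; drop this tick
			}
		}
	}()

	for i := 0; i < 3; i++ {
		<-doCh
		fmt.Println("work", i)
		time.Sleep(50 * time.Millisecond) // slow consumer; intervening ticks coalesce
	}
}
```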

blockchain/v2/reactor_test.go (+504, -9)

@@ -1,22 +1,517 @@
package v2
import (
"net"
"os"
"sort"
"sync"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/behaviour"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/mock"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
dbm "github.com/tendermint/tm-db"
)
func TestReactor(t *testing.T) {
type mockPeer struct {
service.Service
id p2p.ID
}
func (mp mockPeer) FlushStop() {}
func (mp mockPeer) ID() p2p.ID { return mp.id }
func (mp mockPeer) RemoteIP() net.IP { return net.IP{} }
func (mp mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.RemoteIP(), Port: 8800} }
func (mp mockPeer) IsOutbound() bool { return true }
func (mp mockPeer) IsPersistent() bool { return true }
func (mp mockPeer) CloseConn() error { return nil }
func (mp mockPeer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
DefaultNodeID: "",
ListenAddr: "",
}
}
func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
func (mp mockPeer) SocketAddr() *p2p.NetAddress { return &p2p.NetAddress{} }
func (mp mockPeer) Send(byte, []byte) bool { return true }
func (mp mockPeer) TrySend(byte, []byte) bool { return true }
func (mp mockPeer) Set(string, interface{}) {}
func (mp mockPeer) Get(string) interface{} { return struct{}{} }
//nolint:unused
type mockBlockStore struct {
blocks map[int64]*types.Block
}
func (ml *mockBlockStore) Height() int64 {
return int64(len(ml.blocks))
}
func (ml *mockBlockStore) LoadBlock(height int64) *types.Block {
return ml.blocks[height]
}
func (ml *mockBlockStore) SaveBlock(block *types.Block, part *types.PartSet, commit *types.Commit) {
ml.blocks[block.Height] = block
}
type mockBlockApplier struct {
}
// XXX: Add whitelist/blacklist?
func (mba *mockBlockApplier) ApplyBlock(state sm.State, blockID types.BlockID, block *types.Block) (sm.State, error) {
state.LastBlockHeight++
return state, nil
}
type mockSwitchIo struct {
mtx sync.Mutex
switchedToConsensus bool
numStatusResponse int
numBlockResponse int
numNoBlockResponse int
}
func (sio *mockSwitchIo) sendBlockRequest(peerID p2p.ID, height int64) error {
return nil
}
func (sio *mockSwitchIo) sendStatusResponse(height int64, peerID p2p.ID) error {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.numStatusResponse++
return nil
}
func (sio *mockSwitchIo) sendBlockToPeer(block *types.Block, peerID p2p.ID) error {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.numBlockResponse++
return nil
}
func (sio *mockSwitchIo) sendBlockNotFound(height int64, peerID p2p.ID) error {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.numNoBlockResponse++
return nil
}
func (sio *mockSwitchIo) trySwitchToConsensus(state sm.State, blocksSynced int) {
sio.mtx.Lock()
defer sio.mtx.Unlock()
sio.switchedToConsensus = true
}
func (sio *mockSwitchIo) hasSwitchedToConsensus() bool {
sio.mtx.Lock()
defer sio.mtx.Unlock()
return sio.switchedToConsensus
}
func (sio *mockSwitchIo) broadcastStatusRequest(height int64) {
}
type testReactorParams struct {
logger log.Logger
genDoc *types.GenesisDoc
privVals []types.PrivValidator
startHeight int64
bufferSize int
mockA bool
}
func newTestReactor(p testReactorParams) *BlockchainReactor {
store, state, _ := newReactorStore(p.genDoc, p.privVals, p.startHeight)
reporter := behaviour.NewMockReporter()
var appl blockApplier
if p.mockA {
appl = &mockBlockApplier{}
} else {
app := &testApp{}
cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
if err != nil {
panic(errors.Wrap(err, "error start app"))
}
db := dbm.NewMemDB()
appl = sm.NewBlockExecutor(db, p.logger, proxyApp.Consensus(), mock.Mempool{}, sm.MockEvidencePool{})
sm.SaveState(db, state)
}
r := newReactor(state, store, reporter, appl, p.bufferSize)
logger := log.TestingLogger()
r.SetLogger(logger.With("module", "blockchain"))
return r
}
func TestReactorTerminationScenarios(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_v2_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(config.ChainID(), 1, false, 30)
refStore, _, _ := newReactorStore(genDoc, privVals, 20)
params := testReactorParams{
logger: log.TestingLogger(),
genDoc: genDoc,
privVals: privVals,
startHeight: 10,
bufferSize: 100,
mockA: true,
}
type testEvent struct {
evType string
peer string
height int64
}
tests := []struct {
name string
params testReactorParams
msgs []testEvent
}{
{
name: "simple termination on max peer height - one peer",
params: params,
msgs: []testEvent{
{evType: "AddPeer", peer: "P1"},
{evType: "ReceiveS", peer: "P1", height: 13},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P1", height: 11},
{evType: "BlockReq"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P1", height: 12},
{evType: "Process"},
{evType: "ReceiveB", peer: "P1", height: 13},
{evType: "Process"},
},
},
{
name: "simple termination on max peer height - two peers",
params: params,
msgs: []testEvent{
{evType: "AddPeer", peer: "P1"},
{evType: "AddPeer", peer: "P2"},
{evType: "ReceiveS", peer: "P1", height: 13},
{evType: "ReceiveS", peer: "P2", height: 15},
{evType: "BlockReq"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P1", height: 11},
{evType: "ReceiveB", peer: "P2", height: 12},
{evType: "Process"},
{evType: "BlockReq"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P1", height: 13},
{evType: "Process"},
{evType: "ReceiveB", peer: "P2", height: 14},
{evType: "Process"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P2", height: 15},
{evType: "Process"},
},
},
{
name: "termination on max peer height - two peers, noBlock error",
params: params,
msgs: []testEvent{
{evType: "AddPeer", peer: "P1"},
{evType: "AddPeer", peer: "P2"},
{evType: "ReceiveS", peer: "P1", height: 13},
{evType: "ReceiveS", peer: "P2", height: 15},
{evType: "BlockReq"},
{evType: "BlockReq"},
{evType: "ReceiveNB", peer: "P1", height: 11},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P2", height: 12},
{evType: "ReceiveB", peer: "P2", height: 11},
{evType: "Process"},
{evType: "BlockReq"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P2", height: 13},
{evType: "Process"},
{evType: "ReceiveB", peer: "P2", height: 14},
{evType: "Process"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P2", height: 15},
{evType: "Process"},
},
},
{
name: "termination on max peer height - two peers, remove one peer",
params: params,
msgs: []testEvent{
{evType: "AddPeer", peer: "P1"},
{evType: "AddPeer", peer: "P2"},
{evType: "ReceiveS", peer: "P1", height: 13},
{evType: "ReceiveS", peer: "P2", height: 15},
{evType: "BlockReq"},
{evType: "BlockReq"},
{evType: "RemovePeer", peer: "P1"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P2", height: 12},
{evType: "ReceiveB", peer: "P2", height: 11},
{evType: "Process"},
{evType: "BlockReq"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P2", height: 13},
{evType: "Process"},
{evType: "ReceiveB", peer: "P2", height: 14},
{evType: "Process"},
{evType: "BlockReq"},
{evType: "ReceiveB", peer: "P2", height: 15},
{evType: "Process"},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
reactor := newTestReactor(params)
reactor.Start()
reactor.reporter = behaviour.NewMockReporter()
mockSwitch := &mockSwitchIo{switchedToConsensus: false}
reactor.io = mockSwitch
// time for go routines to start
time.Sleep(time.Millisecond)
for _, step := range tt.msgs {
switch step.evType {
case "AddPeer":
reactor.scheduler.send(bcAddNewPeer{peerID: p2p.ID(step.peer)})
case "RemovePeer":
reactor.scheduler.send(bcRemovePeer{peerID: p2p.ID(step.peer)})
case "ReceiveS":
reactor.scheduler.send(bcStatusResponse{
peerID: p2p.ID(step.peer),
height: step.height,
time: time.Now(),
})
case "ReceiveB":
reactor.scheduler.send(bcBlockResponse{
peerID: p2p.ID(step.peer),
block: refStore.LoadBlock(step.height),
size: 10,
time: time.Now(),
})
case "ReceiveNB":
reactor.scheduler.send(bcNoBlockResponse{
peerID: p2p.ID(step.peer),
height: step.height,
time: time.Now(),
})
case "BlockReq":
reactor.scheduler.send(rTrySchedule{time: time.Now()})
case "Process":
reactor.processor.send(rProcessBlock{})
}
// give time for messages to propagate between routines
time.Sleep(time.Millisecond)
}
// time for processor to finish and reactor to switch to consensus
time.Sleep(20 * time.Millisecond)
assert.True(t, mockSwitch.hasSwitchedToConsensus())
reactor.Stop()
})
}
}
func TestReactorHelperMode(t *testing.T) {
var (
bufferSize = 10
reactor = NewReactor(bufferSize)
channelID = byte(0x40)
)
reactor.Start()
script := []Event{
// TODO
config := cfg.ResetTestRoot("blockchain_reactor_v2_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(config.ChainID(), 1, false, 30)
params := testReactorParams{
logger: log.TestingLogger(),
genDoc: genDoc,
privVals: privVals,
startHeight: 20,
bufferSize: 100,
mockA: true,
}
type testEvent struct {
peer string
event interface{}
}
tests := []struct {
name string
params testReactorParams
msgs []testEvent
}{
{
name: "status request",
params: params,
msgs: []testEvent{
{"P1", bcStatusRequestMessage{}},
{"P1", bcBlockRequestMessage{Height: 13}},
{"P1", bcBlockRequestMessage{Height: 20}},
{"P1", bcBlockRequestMessage{Height: 22}},
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
reactor := newTestReactor(params)
reactor.Start()
mockSwitch := &mockSwitchIo{switchedToConsensus: false}
reactor.io = mockSwitch
for i := 0; i < len(tt.msgs); i++ {
step := tt.msgs[i]
switch ev := step.event.(type) {
case bcStatusRequestMessage:
old := mockSwitch.numStatusResponse
reactor.Receive(channelID, mockPeer{id: p2p.ID(step.peer)}, cdc.MustMarshalBinaryBare(ev))
assert.Equal(t, old+1, mockSwitch.numStatusResponse)
case bcBlockRequestMessage:
if ev.Height > params.startHeight {
old := mockSwitch.numNoBlockResponse
reactor.Receive(channelID, mockPeer{id: p2p.ID(step.peer)}, cdc.MustMarshalBinaryBare(ev))
assert.Equal(t, old+1, mockSwitch.numNoBlockResponse)
} else {
old := mockSwitch.numBlockResponse
reactor.Receive(channelID, mockPeer{id: p2p.ID(step.peer)}, cdc.MustMarshalBinaryBare(ev))
assert.Equal(t, old+1, mockSwitch.numBlockResponse)
}
}
}
reactor.Stop()
})
}
}
//----------------------------------------------
// utility funcs
func makeTxs(height int64) (txs []types.Tx) {
for i := 0; i < 10; i++ {
txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
}
return txs
}
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
return block
}
type testApp struct {
abci.BaseApplication
}
func randGenesisDoc(chainID string, numValidators int, randPower bool, minPower int64) (
*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
for i := 0; i < numValidators; i++ {
val, privVal := types.RandValidator(randPower, minPower)
validators[i] = types.GenesisValidator{
PubKey: val.PubKey,
Power: val.VotingPower,
}
privValidators[i] = privVal
}
sort.Sort(types.PrivValidatorsByAddress(privValidators))
return &types.GenesisDoc{
GenesisTime: tmtime.Now(),
ChainID: chainID,
Validators: validators,
}, privValidators
}
// Why are we importing the entire blockExecutor dependency graph here
// when we have the facilities to
func newReactorStore(
genDoc *types.GenesisDoc,
privVals []types.PrivValidator,
maxBlockHeight int64) (*store.BlockStore, sm.State, *sm.BlockExecutor) {
if len(privVals) != 1 {
panic("only support one validator")
}
app := &testApp{}
cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
if err != nil {
panic(errors.Wrap(err, "error start app"))
}
stateDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(dbm.NewMemDB())
state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
if err != nil {
panic(errors.Wrap(err, "error constructing state from genesis file"))
}
db := dbm.NewMemDB()
blockExec := sm.NewBlockExecutor(db, log.TestingLogger(), proxyApp.Consensus(),
mock.Mempool{}, sm.MockEvidencePool{})
sm.SaveState(db, state)
// add blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
vote, err := types.MakeVote(
lastBlock.Header.Height,
lastBlockMeta.BlockID,
state.Validators,
privVals[0],
lastBlock.Header.ChainID)
if err != nil {
panic(err)
}
lastCommit = types.NewCommit(vote.Height, vote.Round,
lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()})
}
thisBlock := makeBlock(blockHeight, state, lastCommit)
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(errors.Wrap(err, "error apply block"))
}
for _, event := range script {
reactor.Receive(event)
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}
reactor.Stop()
return blockStore, state, blockExec
}

blockchain/v2/routine.go (+3, -2)

@@ -10,7 +10,7 @@ import (
type handleFunc = func(event Event) (Event, error)
-// Routines are a structure which model a finite state machine as serialized
+// Routine is a structure that models a finite state machine as serialized
// stream of events processed by a handle function. This Routine structure
// handles the concurrency and messaging guarantees. Events sent via
// `send` are handled by the `handle` function to produce an iterator
@ -80,7 +80,7 @@ func (rt *Routine) start() {
return
}
rt.metrics.EventsOut.With("routine", rt.name).Add(1)
rt.logger.Debug(fmt.Sprintf("%s produced %T %+v\n", rt.name, oEvent, oEvent))
rt.logger.Debug(fmt.Sprintf("%s: produced %T %+v\n", rt.name, oEvent, oEvent))
rt.out <- oEvent
}
@ -98,6 +98,7 @@ func (rt *Routine) send(event Event) bool {
rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name))
return false
}
rt.metrics.EventsSent.With("routine", rt.name).Add(1)
return true
}
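For illustration, a minimal `handleFunc` of the shape `Routine` consumes (hypothetical `pingEv`/`pongEv` event types; `Event`, `noOp` and `priorityNormal` come from this package):

type pingEv struct{ priorityNormal }
type pongEv struct{ priorityNormal }

// examplePingPong is a sketch of a handleFunc: each input event maps to
// exactly one output event, and unknown events are surfaced as errors.
func examplePingPong(event Event) (Event, error) {
	switch event.(type) {
	case pingEv:
		return pongEv{}, nil
	case pongEv:
		return noOp, nil // nothing further to emit
	default:
		return nil, fmt.Errorf("unknown event %v", event)
	}
}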


+ 90
- 113
blockchain/v2/scheduler.go View File

@ -11,52 +11,11 @@ import (
"github.com/tendermint/tendermint/types"
)
// Events
// XXX: The handle API would be much simpler if it returned a single event, an
// Event, which embeds a terminationEvent if it wants to terminate the routine.
// Input events into the scheduler:
// ticker event for cleaning peers
type tryPrunePeer struct {
priorityHigh
time time.Time
}
// ticker event for scheduling block requests
type trySchedule struct {
priorityHigh
time time.Time
}
// blockResponse message received from a peer
type bcBlockResponse struct {
priorityNormal
time time.Time
peerID p2p.ID
height int64
size int64
block *types.Block
}
// statusResponse message received from a peer
type bcStatusResponse struct {
priorityNormal
time time.Time
peerID p2p.ID
height int64
}
// new peer is connected
type addNewPeer struct {
priorityNormal
peerID p2p.ID
}
// Output events issued by the scheduler:
// Events generated by the scheduler:
// all blocks have been processed
type scFinishedEv struct {
priorityNormal
reason string
}
// send a blockRequest message
@ -80,6 +39,10 @@ type scPeerError struct {
reason error
}
func (e scPeerError) String() string {
return fmt.Sprintf("scPeerError - peerID %s, err %s", e.peerID, e.reason)
}
// scheduler removed a set of peers (timed out or slow peer)
type scPeersPruned struct {
priorityHigh
@ -160,9 +123,10 @@ func (p scPeer) String() string {
func newScPeer(peerID p2p.ID) *scPeer {
return &scPeer{
peerID: peerID,
state: peerStateNew,
height: -1,
peerID: peerID,
state: peerStateNew,
height: -1,
lastTouched: time.Time{},
}
}
@ -176,11 +140,17 @@ type scheduler struct {
// in Processed state.
height int64
// lastAdvance tracks the last time a block execution happened.
// syncTimeout is the maximum time the scheduler waits to advance in the fast sync process before finishing.
// This covers the cases where there are no peers or all peers have a lower height.
lastAdvance time.Time
syncTimeout time.Duration
// a map of peerID to scheduler specific peer struct `scPeer` used to keep
// track of peer specific state
peers map[p2p.ID]*scPeer
peerTimeout time.Duration
minRecvRate int64 // minimum receive rate from peer otherwise prune
peerTimeout time.Duration // maximum response time from a peer otherwise prune
minRecvRate int64 // minimum receive rate from peer otherwise prune
// the maximum number of blocks that should be New, Received or Pending at any point
// in time. This is used to enforce a limit on the blockStates map.
@ -204,15 +174,20 @@ func (sc scheduler) String() string {
sc.initHeight, sc.blockStates, sc.peers, sc.pendingBlocks, sc.pendingTime, sc.receivedBlocks)
}
func newScheduler(initHeight int64) *scheduler {
func newScheduler(initHeight int64, startTime time.Time) *scheduler {
sc := scheduler{
initHeight: initHeight,
lastAdvance: startTime,
syncTimeout: 60 * time.Second,
height: initHeight + 1,
blockStates: make(map[int64]blockState),
peers: make(map[p2p.ID]*scPeer),
pendingBlocks: make(map[int64]p2p.ID),
pendingTime: make(map[int64]time.Time),
receivedBlocks: make(map[int64]p2p.ID),
targetPending: 10, // TODO - pass as param
peerTimeout: 15 * time.Second, // TODO - pass as param
minRecvRate: 0, //int64(7680), TODO - pass as param
}
return &sc
@ -316,6 +291,7 @@ func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error {
}
if height < peer.height {
sc.removePeer(peerID)
return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height)
}
@ -327,7 +303,7 @@ func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error {
}
func (sc *scheduler) getStateAtHeight(height int64) blockState {
if height <= sc.initHeight {
if height < sc.height {
return blockStateProcessed
} else if state, ok := sc.blockStates[height]; ok {
return state
@ -349,41 +325,8 @@ func (sc *scheduler) getPeersAtHeightOrAbove(height int64) []p2p.ID {
return peers
}
func (sc *scheduler) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID {
peers := []p2p.ID{}
for _, peer := range sc.peers {
if peer.state != peerStateReady {
continue
}
if now.Sub(peer.lastTouched) > duration {
peers = append(peers, peer.peerID)
}
}
// Ensure the order is deterministic for testing
sort.Sort(PeerByID(peers))
return peers
}
// will return peers whose lastRate is slower than minSpeed, denominated in bytes
func (sc *scheduler) peersSlowerThan(minSpeed int64) []p2p.ID {
peers := []p2p.ID{}
for peerID, peer := range sc.peers {
if peer.state != peerStateReady {
continue
}
if peer.lastRate < minSpeed {
peers = append(peers, peerID)
}
}
// Ensure the order is deterministic for testing
sort.Sort(PeerByID(peers))
return peers
}
func (sc *scheduler) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID {
prunable := []p2p.ID{}
prunable := make([]p2p.ID, 0)
for peerID, peer := range sc.peers {
if peer.state != peerStateReady {
continue
@ -407,8 +350,8 @@ func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now t
return fmt.Errorf("couldn't find peer %s", peerID)
}
if peer.state == peerStateRemoved {
return fmt.Errorf("cannot receive blocks from removed peer %s", peerID)
if peer.state != peerStateReady {
return fmt.Errorf("cannot receive blocks from not ready peer %s", peerID)
}
if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID {
@ -454,14 +397,13 @@ func (sc *scheduler) markPending(peerID p2p.ID, height int64, time time.Time) er
sc.setStateAtHeight(height, blockStatePending)
sc.pendingBlocks[height] = peerID
// XXX: to make this more accurate we can introduce a message from
// the IO routine which indicates the time the request was put on the wire
sc.pendingTime[height] = time
return nil
}
func (sc *scheduler) markProcessed(height int64) error {
sc.lastAdvance = time.Now()
state := sc.getStateAtHeight(height)
if state != blockStateReceived {
return fmt.Errorf("cannot mark height %d received from block state %s", height, state)
@ -476,6 +418,9 @@ func (sc *scheduler) markProcessed(height int64) error {
}
func (sc *scheduler) allBlocksProcessed() bool {
if len(sc.peers) == 0 {
return false
}
return sc.height >= sc.maxHeight()
}
@ -486,7 +431,7 @@ func (sc *scheduler) maxHeight() int64 {
if peer.state != peerStateReady {
continue
}
if peer.height > max {
if max < peer.height {
max = peer.height
}
}
@ -532,15 +477,15 @@ func (sc *scheduler) selectPeer(height int64) (p2p.ID, error) {
}
// find the set of peers with minimum number of pending requests.
minPending := math.MaxInt64
var minPending int64 = math.MaxInt64
for mp := range pendingFrom {
if mp < minPending {
minPending = mp
if int64(mp) < minPending {
minPending = int64(mp)
}
}
sort.Sort(PeerByID(pendingFrom[minPending]))
return pendingFrom[minPending][0], nil
sort.Sort(PeerByID(pendingFrom[int(minPending)]))
return pendingFrom[int(minPending)][0], nil
}
// PeerByID is a list of peers sorted by peerID.
@ -570,12 +515,30 @@ func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) {
err = sc.markReceived(event.peerID, event.block.Height, event.size, event.time)
if err != nil {
_ = sc.removePeer(event.peerID)
return scPeerError{peerID: event.peerID, reason: err}, nil
}
return scBlockReceived{peerID: event.peerID, block: event.block}, nil
}
func (sc *scheduler) handleNoBlockResponse(event bcNoBlockResponse) (Event, error) {
if len(sc.peers) == 0 {
return noOp, nil
}
peer, ok := sc.peers[event.peerID]
if !ok || peer.state == peerStateRemoved {
return noOp, nil
}
// The peer may have been just removed due to errors, low speed or timeouts.
_ = sc.removePeer(event.peerID)
return scPeerError{peerID: event.peerID,
reason: fmt.Errorf("peer %v with height %d claims no block for %d",
event.peerID, peer.height, event.height)}, nil
}
func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) {
if event.height != sc.height {
panic(fmt.Sprintf("processed height %d but expected height %d", event.height, sc.height))
@ -584,12 +547,12 @@ func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error)
if err != nil {
// It is possible that a peer error or timeout is handled after the processor
// has processed the block but before the scheduler received this event,
// so when pcBlockProcessed event is received the block had been requested again
// so when pcBlockProcessed event is received the block had been requested again.
return scSchedulerFail{reason: err}, nil
}
if sc.allBlocksProcessed() {
return scFinishedEv{}, nil
return scFinishedEv{reason: "processed all blocks"}, nil
}
return noOp, nil
@ -608,13 +571,13 @@ func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) (
}
if sc.allBlocksProcessed() {
return scFinishedEv{}, nil
return scFinishedEv{reason: "error on last block"}, nil
}
return noOp, nil
}
func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) {
func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) {
err := sc.addPeer(event.peerID)
if err != nil {
return scSchedulerFail{reason: err}, nil
@ -622,8 +585,7 @@ func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) {
return noOp, nil
}
// XXX: unify types peerError
func (sc *scheduler) handlePeerError(event peerError) (Event, error) {
func (sc *scheduler) handleRemovePeer(event bcRemovePeer) (Event, error) {
err := sc.removePeer(event.peerID)
if err != nil {
// XXX - It is possible that the removePeer fails here for legitimate reasons
@ -631,12 +593,23 @@ func (sc *scheduler) handlePeerError(event peerError) (Event, error) {
return scSchedulerFail{reason: err}, nil
}
if sc.allBlocksProcessed() {
return scFinishedEv{}, nil
return scFinishedEv{reason: "removed peer"}, nil
}
return noOp, nil
}
func (sc *scheduler) handleTryPrunePeer(event tryPrunePeer) (Event, error) {
func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) {
// Check the behavior of the peer responsible for delivering the block at sc.height.
timeHeightAsked, ok := sc.pendingTime[sc.height]
if ok && time.Since(timeHeightAsked) > sc.peerTimeout {
// A request was sent to a peer for block at sc.height but a response was not received
// from that peer within sc.peerTimeout. Remove the peer. This is to ensure that a peer
// will be timed out even if it sends blocks at higher heights but prevents progress by
// not sending the block at current height.
sc.removePeer(sc.pendingBlocks[sc.height])
}
prunablePeers := sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, event.time)
if len(prunablePeers) == 0 {
return noOp, nil
@ -649,17 +622,19 @@ func (sc *scheduler) handleTryPrunePeer(event tryPrunePeer) (Event, error) {
}
}
// If all blocks are processed we should finish even if some peers were pruned.
// If all blocks are processed we should finish.
if sc.allBlocksProcessed() {
return scFinishedEv{}, nil
return scFinishedEv{reason: "after try prune"}, nil
}
return scPeersPruned{peers: prunablePeers}, nil
}
// TODO - Schedule multiple block requests
func (sc *scheduler) handleTrySchedule(event trySchedule) (Event, error) {
func (sc *scheduler) handleTrySchedule(event rTrySchedule) (Event, error) {
if time.Since(sc.lastAdvance) > sc.syncTimeout {
return scFinishedEv{reason: "timeout, no advance"}, nil
}
nextHeight := sc.nextHeightToSchedule()
if nextHeight == -1 {
@ -693,17 +668,20 @@ func (sc *scheduler) handle(event Event) (Event, error) {
case bcBlockResponse:
nextEvent, err := sc.handleBlockResponse(event)
return nextEvent, err
case trySchedule:
case bcNoBlockResponse:
nextEvent, err := sc.handleNoBlockResponse(event)
return nextEvent, err
case rTrySchedule:
nextEvent, err := sc.handleTrySchedule(event)
return nextEvent, err
case addNewPeer:
case bcAddNewPeer:
nextEvent, err := sc.handleAddNewPeer(event)
return nextEvent, err
case tryPrunePeer:
nextEvent, err := sc.handleTryPrunePeer(event)
case bcRemovePeer:
nextEvent, err := sc.handleRemovePeer(event)
return nextEvent, err
case peerError:
nextEvent, err := sc.handlePeerError(event)
case rTryPrunePeer:
nextEvent, err := sc.handleTryPrunePeer(event)
return nextEvent, err
case pcBlockProcessed:
nextEvent, err := sc.handleBlockProcessed(event)
@ -714,5 +692,4 @@ func (sc *scheduler) handle(event Event) (Event, error) {
default:
return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil
}
//return noOp, nil
}

+ 191
- 328
blockchain/v2/scheduler_test.go View File

@ -23,6 +23,8 @@ type scTestParams struct {
peerTimeout time.Duration
minRecvRate int64
targetPending int
startTime time.Time
syncTimeout time.Duration
}
func verifyScheduler(sc *scheduler) {
@ -37,8 +39,9 @@ func verifyScheduler(sc *scheduler) {
func newTestScheduler(params scTestParams) *scheduler {
peers := make(map[p2p.ID]*scPeer)
var maxHeight int64
sc := newScheduler(params.initHeight)
sc := newScheduler(params.initHeight, params.startTime)
if params.height != 0 {
sc.height = params.height
}
@ -46,6 +49,9 @@ func newTestScheduler(params scTestParams) *scheduler {
for id, peer := range params.peers {
peer.peerID = p2p.ID(id)
peers[p2p.ID(id)] = peer
if maxHeight < peer.height {
maxHeight = peer.height
}
}
for _, h := range params.allB {
sc.blockStates[h] = blockStateNew
@ -64,6 +70,12 @@ func newTestScheduler(params scTestParams) *scheduler {
sc.peers = peers
sc.peerTimeout = params.peerTimeout
if params.syncTimeout == 0 {
sc.syncTimeout = 10 * time.Second
} else {
sc.syncTimeout = params.syncTimeout
}
if params.targetPending == 0 {
sc.targetPending = 10
} else {
@ -80,7 +92,7 @@ func newTestScheduler(params scTestParams) *scheduler {
func TestScInit(t *testing.T) {
var (
initHeight int64 = 5
sc = newScheduler(initHeight)
sc = newScheduler(initHeight, time.Now())
)
assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight))
assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1))
@ -181,21 +193,21 @@ func TestScAddPeer(t *testing.T) {
name: "add first peer",
fields: scTestParams{},
args: args{peerID: "P1"},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}},
},
{
name: "add second peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}},
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}},
args: args{peerID: "P2"},
wantFields: scTestParams{peers: map[string]*scPeer{
"P1": {state: peerStateNew, height: -1},
"P2": {state: peerStateNew, height: -1}}},
"P1": {height: -1, state: peerStateNew},
"P2": {height: -1, state: peerStateNew}}},
},
{
name: "attempt to add duplicate peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}},
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}},
args: args{peerID: "P1"},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}},
wantErr: true,
},
{
@ -271,8 +283,8 @@ func TestScTouchPeer(t *testing.T) {
name: "touch peer in state Ready",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}},
args: args{peerID: "P1", time: now.Add(3 * time.Second)},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady,
lastTouched: now.Add(3 * time.Second)}}},
wantFields: scTestParams{peers: map[string]*scPeer{
"P1": {state: peerStateReady, lastTouched: now.Add(3 * time.Second)}}},
},
}
@ -289,195 +301,6 @@ func TestScTouchPeer(t *testing.T) {
}
}
func TestScPeersInactiveSince(t *testing.T) {
now := time.Now()
type args struct {
threshold time.Duration
time time.Time
}
tests := []struct {
name string
fields scTestParams
args args
wantResult []p2p.ID
}{
{
name: "no peers",
fields: scTestParams{peers: map[string]*scPeer{}},
args: args{threshold: time.Second, time: now},
wantResult: []p2p.ID{},
},
{
name: "one active peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}},
args: args{threshold: time.Second, time: now},
wantResult: []p2p.ID{},
},
{
name: "one inactive peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}},
args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond)},
wantResult: []p2p.ID{"P1"},
},
{
name: "one active and one inactive peer",
fields: scTestParams{peers: map[string]*scPeer{
"P1": {state: peerStateReady, lastTouched: now},
"P2": {state: peerStateReady, lastTouched: now.Add(time.Second)}}},
args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond)},
wantResult: []p2p.ID{"P1"},
},
{
name: "one New peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}},
args: args{threshold: time.Second, time: now.Add(time.Millisecond)},
wantResult: []p2p.ID{},
},
{
name: "one Removed peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastTouched: now}}},
args: args{threshold: time.Second, time: now.Add(time.Millisecond)},
wantResult: []p2p.ID{},
},
{
name: "one Ready active peer and one New",
fields: scTestParams{peers: map[string]*scPeer{
"P1": {state: peerStateRemoved, lastTouched: now},
"P2": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}}},
args: args{threshold: time.Second, time: now.Add(2 * time.Millisecond)},
wantResult: []p2p.ID{},
},
{
name: "one Ready inactive peer and one New",
fields: scTestParams{peers: map[string]*scPeer{
"P1": {state: peerStateRemoved, lastTouched: now},
"P2": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}}},
args: args{threshold: time.Second, time: now.Add(time.Second + 2*time.Millisecond)},
wantResult: []p2p.ID{"P2"},
},
{
name: "combination of New, Removed and, active and non active Ready peers",
fields: scTestParams{peers: map[string]*scPeer{
"P1": {state: peerStateNew},
"P2": {state: peerStateRemoved, lastTouched: now},
"P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second)},
"P4": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)},
"P5": {state: peerStateReady, lastTouched: now.Add(3 * time.Millisecond)}}},
args: args{threshold: time.Second, time: now.Add(time.Second + 2*time.Millisecond)},
wantResult: []p2p.ID{"P4"},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
// peersInactiveSince should not mutate the scheduler
wantSc := sc
res := sc.peersInactiveSince(tt.args.threshold, tt.args.time)
sort.Sort(PeerByID(res))
assert.Equal(t, tt.wantResult, res)
assert.Equal(t, wantSc, sc)
})
}
}
func TestScPeersSlowerThan(t *testing.T) {
type args struct {
minSpeed int64
}
tests := []struct {
name string
fields scTestParams
args args
wantResult []p2p.ID
}{
{
name: "no peers",
fields: scTestParams{peers: map[string]*scPeer{}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{},
},
{
name: "one Ready faster peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 101}}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{},
},
{
name: "one Ready equal peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 100}}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{},
},
{
name: "one Ready slow peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 99}}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{"P1"},
},
{
name: "one Removed faster peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 101}}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{},
}, {
name: "one Removed equal peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 100}}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{},
},
{
name: "one Removed slow peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 99}}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{},
},
{
name: "one New peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{},
},
{
name: "one New peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{},
},
{
name: "mixed peers",
fields: scTestParams{peers: map[string]*scPeer{
"P1": {state: peerStateRemoved, lastRate: 101},
"P2": {state: peerStateReady, lastRate: 101},
"P3": {state: peerStateRemoved, lastRate: 100},
"P4": {state: peerStateReady, lastRate: 100},
"P5": {state: peerStateReady, lastRate: 99},
"P6": {state: peerStateNew},
"P7": {state: peerStateRemoved, lastRate: 99},
"P8": {state: peerStateReady, lastRate: 99},
}},
args: args{minSpeed: 100},
wantResult: []p2p.ID{"P5", "P8"},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
// peersSlowerThan should not mutate the scheduler
wantSc := sc
res := sc.peersSlowerThan(tt.args.minSpeed)
assert.Equal(t, tt.wantResult, res)
assert.Equal(t, wantSc, sc)
})
}
}
func TestScPrunablePeers(t *testing.T) {
now := time.Now()
@ -716,8 +539,8 @@ func TestScSetPeerHeight(t *testing.T) {
allB: []int64{1, 2, 3, 4}},
args: args{peerID: "P1", height: 2},
wantFields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}},
allB: []int64{1, 2, 3, 4}},
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}},
allB: []int64{}},
wantErr: true,
},
{
@ -845,7 +668,7 @@ func TestScGetPeersAtHeight(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
// getPeersAtHeightOrAbove should not mutate the scheduler
// getPeersAtHeight should not mutate the scheduler
wantSc := sc
res := sc.getPeersAtHeightOrAbove(tt.args.height)
sort.Sort(PeerByID(res))
@ -1082,8 +905,11 @@ func TestScMarkReceived(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
if err := sc.markReceived(tt.args.peerID,
tt.args.height, tt.args.size, now.Add(time.Second)); (err != nil) != tt.wantErr {
if err := sc.markReceived(
tt.args.peerID,
tt.args.height,
tt.args.size,
now.Add(time.Second)); (err != nil) != tt.wantErr {
t.Errorf("markReceived() wantErr %v, error = %v", tt.wantErr, err)
}
wantSc := newTestScheduler(tt.wantFields)
@ -1145,11 +971,17 @@ func TestScMarkProcessed(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
oldBlockState := sc.getStateAtHeight(tt.args.height)
if err := sc.markProcessed(tt.args.height); (err != nil) != tt.wantErr {
t.Errorf("markProcessed() wantErr %v, error = %v", tt.wantErr, err)
}
if tt.wantErr {
assert.Equal(t, oldBlockState, sc.getStateAtHeight(tt.args.height))
} else {
assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(tt.args.height))
}
wantSc := newTestScheduler(tt.wantFields)
assert.Equal(t, wantSc, sc)
checkSameScheduler(t, wantSc, sc)
})
}
}
@ -1163,9 +995,9 @@ func TestScAllBlocksProcessed(t *testing.T) {
wantResult bool
}{
{
name: "no blocks",
name: "no blocks, no peers",
fields: scTestParams{},
wantResult: true,
wantResult: false,
},
{
name: "only New blocks",
@ -1225,7 +1057,7 @@ func TestScAllBlocksProcessed(t *testing.T) {
wantSc := sc
res := sc.allBlocksProcessed()
assert.Equal(t, tt.wantResult, res)
assert.Equal(t, wantSc, sc)
checkSameScheduler(t, wantSc, sc)
})
}
}
@ -1305,8 +1137,7 @@ func TestScNextHeightToSchedule(t *testing.T) {
resMin := sc.nextHeightToSchedule()
assert.Equal(t, tt.wantHeight, resMin)
assert.Equal(t, wantSc, sc)
checkSameScheduler(t, wantSc, sc)
})
}
}
@ -1414,7 +1245,7 @@ func TestScSelectPeer(t *testing.T) {
res, err := sc.selectPeer(tt.args.height)
assert.Equal(t, tt.wantResult, res)
assert.Equal(t, tt.wantError, err != nil)
assert.Equal(t, wantSc, sc)
checkSameScheduler(t, wantSc, sc)
})
}
}
@ -1424,6 +1255,20 @@ func makeScBlock(height int64) *types.Block {
return &types.Block{Header: types.Header{Height: height}}
}
// used in place of assert.Equal(t, want, actual) to avoid failures due to
// scheduler.lastAdvance timestamp inequalities.
func checkSameScheduler(t *testing.T, want *scheduler, actual *scheduler) {
assert.Equal(t, want.initHeight, actual.initHeight)
assert.Equal(t, want.height, actual.height)
assert.Equal(t, want.peers, actual.peers)
assert.Equal(t, want.blockStates, actual.blockStates)
assert.Equal(t, want.pendingBlocks, actual.pendingBlocks)
assert.Equal(t, want.pendingTime, actual.pendingTime)
assert.Equal(t, want.receivedBlocks, actual.receivedBlocks)
}
// checkScResults checks scheduler handler test results
func checkScResults(t *testing.T, wantErr bool, err error, wantEvent Event, event Event) {
if (err != nil) != wantErr {
@ -1439,8 +1284,6 @@ func checkScResults(t *testing.T, wantErr bool, err error, wantEvent Event, even
assert.Equal(t, wantEvent.block, event.(scBlockReceived).block)
case scSchedulerFail:
assert.Equal(t, wantEvent.reason != nil, event.(scSchedulerFail).reason != nil)
default:
assert.Equal(t, wantEvent, event)
}
}
@ -1449,7 +1292,6 @@ func TestScHandleBlockResponse(t *testing.T) {
block6FromP1 := bcBlockResponse{
time: now.Add(time.Millisecond),
peerID: p2p.ID("P1"),
height: 6,
size: 100,
block: makeScBlock(6),
}
@ -1530,6 +1372,82 @@ func TestScHandleBlockResponse(t *testing.T) {
}
}
func TestScHandleNoBlockResponse(t *testing.T) {
now := time.Now()
noBlock6FromP1 := bcNoBlockResponse{
time: now.Add(time.Millisecond),
peerID: p2p.ID("P1"),
height: 6,
}
tests := []struct {
name string
fields scTestParams
wantEvent Event
wantFields scTestParams
wantErr bool
}{
{
name: "empty scheduler",
fields: scTestParams{},
wantEvent: noOpEvent{},
wantFields: scTestParams{},
},
{
name: "noBlock from removed peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}},
wantEvent: noOpEvent{},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}},
},
{
name: "for block we haven't asked for",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}},
allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}},
wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}},
},
{
name: "noBlock from peer we don't have",
fields: scTestParams{
peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}},
allB: []int64{1, 2, 3, 4, 5, 6, 7, 8},
pending: map[int64]p2p.ID{6: "P2"},
pendingTime: map[int64]time.Time{6: now},
},
wantEvent: noOpEvent{},
wantFields: scTestParams{
peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}},
allB: []int64{1, 2, 3, 4, 5, 6, 7, 8},
pending: map[int64]p2p.ID{6: "P2"},
pendingTime: map[int64]time.Time{6: now},
},
},
{
name: "noBlock from existing peer",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}},
allB: []int64{1, 2, 3, 4, 5, 6, 7, 8},
pending: map[int64]p2p.ID{6: "P1"},
pendingTime: map[int64]time.Time{6: now},
},
wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")},
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
event, err := sc.handleNoBlockResponse(noBlock6FromP1)
checkScResults(t, tt.wantErr, err, tt.wantEvent, event)
wantSc := newTestScheduler(tt.wantFields)
assert.Equal(t, wantSc, sc)
})
}
}
func TestScHandleBlockProcessed(t *testing.T) {
now := time.Now()
processed6FromP1 := pcBlockProcessed{
@ -1702,11 +1620,11 @@ func TestScHandleBlockVerificationFailure(t *testing.T) {
}
func TestScHandleAddNewPeer(t *testing.T) {
addP1 := addNewPeer{
addP1 := bcAddNewPeer{
peerID: p2p.ID("P1"),
}
type args struct {
event addNewPeer
event bcAddNewPeer
}
tests := []struct {
@ -1754,76 +1672,14 @@ func TestScHandleAddNewPeer(t *testing.T) {
}
}
func TestScHandlePeerError(t *testing.T) {
errP1 := peerError{
peerID: p2p.ID("P1"),
}
type args struct {
event peerError
}
tests := []struct {
name string
fields scTestParams
args args
wantEvent Event
wantErr bool
}{
{
name: "no peers",
fields: scTestParams{},
args: args{event: errP1},
wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")},
},
{
name: "error finds no peer",
fields: scTestParams{
height: 6,
peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}},
allB: []int64{6, 7, 8},
},
args: args{event: errP1},
wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")},
},
{
name: "error finds peer, only peer is removed",
fields: scTestParams{
height: 6,
peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}},
allB: []int64{6, 7, 8},
},
args: args{event: errP1},
wantEvent: scFinishedEv{},
},
{
name: "error finds peer, one of two peers are removed",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}, "P2": {height: 8, state: peerStateReady}},
allB: []int64{1, 2, 3, 4, 5, 6, 7, 8},
},
args: args{event: errP1},
wantEvent: noOpEvent{},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
event, err := sc.handlePeerError(tt.args.event)
checkScResults(t, tt.wantErr, err, tt.wantEvent, event)
})
}
}
func TestScHandleTryPrunePeer(t *testing.T) {
now := time.Now()
pruneEv := tryPrunePeer{
pruneEv := rTryPrunePeer{
time: now.Add(time.Second + time.Millisecond),
}
type args struct {
event tryPrunePeer
event rTryPrunePeer
}
tests := []struct {
@ -1914,14 +1770,14 @@ func TestScHandleTryPrunePeer(t *testing.T) {
}
}
func TestHandleTrySchedule(t *testing.T) {
func TestScHandleTrySchedule(t *testing.T) {
now := time.Now()
tryEv := trySchedule{
tryEv := rTrySchedule{
time: now.Add(time.Second + time.Millisecond),
}
type args struct {
event trySchedule
event rTrySchedule
}
tests := []struct {
name string
@ -1932,41 +1788,44 @@ func TestHandleTrySchedule(t *testing.T) {
}{
{
name: "no peers",
fields: scTestParams{peers: map[string]*scPeer{}},
fields: scTestParams{startTime: now, peers: map[string]*scPeer{}},
args: args{event: tryEv},
wantEvent: noOpEvent{},
},
{
name: "only new peers",
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}},
fields: scTestParams{startTime: now, peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}},
args: args{event: tryEv},
wantEvent: noOpEvent{},
},
{
name: "only Removed peers",
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}},
fields: scTestParams{startTime: now, peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}},
args: args{event: tryEv},
wantEvent: noOpEvent{},
},
{
name: "one Ready shorter peer",
fields: scTestParams{
height: 6,
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}},
startTime: now,
height: 6,
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}},
args: args{event: tryEv},
wantEvent: noOpEvent{},
},
{
name: "one Ready equal peer",
fields: scTestParams{
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}},
allB: []int64{1, 2, 3, 4}},
startTime: now,
peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}},
allB: []int64{1, 2, 3, 4}},
args: args{event: tryEv},
wantEvent: scBlockRequest{peerID: "P1", height: 1},
},
{
name: "many Ready higher peers with different number of pending requests",
fields: scTestParams{
startTime: now,
peers: map[string]*scPeer{
"P1": {height: 4, state: peerStateReady},
"P2": {height: 5, state: peerStateReady}},
@ -1983,6 +1842,7 @@ func TestHandleTrySchedule(t *testing.T) {
{
name: "many Ready higher peers with same number of pending requests",
fields: scTestParams{
startTime: now,
peers: map[string]*scPeer{
"P2": {height: 8, state: peerStateReady},
"P1": {height: 8, state: peerStateReady},
@ -2084,6 +1944,8 @@ func TestScHandleStatusResponse(t *testing.T) {
}
func TestScHandle(t *testing.T) {
now := time.Now()
type unknownEv struct {
priorityNormal
}
@ -2123,24 +1985,27 @@ func TestScHandle(t *testing.T) {
name: "single peer, sync 3 blocks",
steps: []scStep{
{ // add P1
currentSc: &scTestParams{peers: map[string]*scPeer{}, height: 1},
args: args{event: addNewPeer{peerID: "P1"}},
currentSc: &scTestParams{startTime: now, peers: map[string]*scPeer{}, height: 1},
args: args{event: bcAddNewPeer{peerID: "P1"}},
wantEvent: noOpEvent{},
wantSc: &scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}, height: 1},
wantSc: &scTestParams{startTime: now, peers: map[string]*scPeer{
"P1": {height: -1, state: peerStateNew}}, height: 1},
},
{ // set height of P1
args: args{event: bcStatusResponse{peerID: "P1", time: tick[0], height: 3}},
wantEvent: noOpEvent{},
wantSc: &scTestParams{
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}},
allB: []int64{1, 2, 3},
height: 1,
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}},
allB: []int64{1, 2, 3},
height: 1,
},
},
{ // schedule block 1
args: args{event: trySchedule{time: tick[1]}},
args: args{event: rTrySchedule{time: tick[1]}},
wantEvent: scBlockRequest{peerID: "P1", height: 1},
wantSc: &scTestParams{
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}},
allB: []int64{1, 2, 3},
pending: map[int64]p2p.ID{1: "P1"},
@ -2149,9 +2014,10 @@ func TestScHandle(t *testing.T) {
},
},
{ // schedule block 2
args: args{event: trySchedule{time: tick[2]}},
args: args{event: rTrySchedule{time: tick[2]}},
wantEvent: scBlockRequest{peerID: "P1", height: 2},
wantSc: &scTestParams{
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}},
allB: []int64{1, 2, 3},
pending: map[int64]p2p.ID{1: "P1", 2: "P1"},
@ -2160,9 +2026,10 @@ func TestScHandle(t *testing.T) {
},
},
{ // schedule block 3
args: args{event: trySchedule{time: tick[3]}},
args: args{event: rTrySchedule{time: tick[3]}},
wantEvent: scBlockRequest{peerID: "P1", height: 3},
wantSc: &scTestParams{
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}},
allB: []int64{1, 2, 3},
pending: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"},
@ -2171,9 +2038,10 @@ func TestScHandle(t *testing.T) {
},
},
{ // block response 1
args: args{event: bcBlockResponse{peerID: "P1", height: 1, time: tick[4], size: 100, block: makeScBlock(1)}},
args: args{event: bcBlockResponse{peerID: "P1", time: tick[4], size: 100, block: makeScBlock(1)}},
wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(1)},
wantSc: &scTestParams{
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[4]}},
allB: []int64{1, 2, 3},
pending: map[int64]p2p.ID{2: "P1", 3: "P1"},
@ -2183,9 +2051,10 @@ func TestScHandle(t *testing.T) {
},
},
{ // block response 2
args: args{event: bcBlockResponse{peerID: "P1", height: 2, time: tick[5], size: 100, block: makeScBlock(2)}},
args: args{event: bcBlockResponse{peerID: "P1", time: tick[5], size: 100, block: makeScBlock(2)}},
wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(2)},
wantSc: &scTestParams{
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[5]}},
allB: []int64{1, 2, 3},
pending: map[int64]p2p.ID{3: "P1"},
@ -2195,33 +2064,36 @@ func TestScHandle(t *testing.T) {
},
},
{ // block response 3
args: args{event: bcBlockResponse{peerID: "P1", height: 3, time: tick[6], size: 100, block: makeScBlock(3)}},
args: args{event: bcBlockResponse{peerID: "P1", time: tick[6], size: 100, block: makeScBlock(3)}},
wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(3)},
wantSc: &scTestParams{
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
allB: []int64{1, 2, 3},
received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"},
height: 1,
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
allB: []int64{1, 2, 3},
received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"},
height: 1,
},
},
{ // processed block 1
args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 1}},
wantEvent: noOpEvent{},
wantSc: &scTestParams{
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
allB: []int64{2, 3},
received: map[int64]p2p.ID{2: "P1", 3: "P1"},
height: 2,
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
allB: []int64{2, 3},
received: map[int64]p2p.ID{2: "P1", 3: "P1"},
height: 2,
},
},
{ // processed block 2
args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 2}},
wantEvent: scFinishedEv{},
wantSc: &scTestParams{
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
allB: []int64{3},
received: map[int64]p2p.ID{3: "P1"},
height: 3,
startTime: now,
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
allB: []int64{3},
received: map[int64]p2p.ID{3: "P1"},
height: 3,
},
},
},
@ -2231,6 +2103,7 @@ func TestScHandle(t *testing.T) {
steps: []scStep{
{ // failure processing block 1
currentSc: &scTestParams{
startTime: now,
peers: map[string]*scPeer{
"P1": {height: 4, state: peerStateReady, lastTouched: tick[6]},
"P2": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
@ -2241,6 +2114,7 @@ func TestScHandle(t *testing.T) {
args: args{event: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"}},
wantEvent: noOpEvent{},
wantSc: &scTestParams{
startTime: now,
peers: map[string]*scPeer{
"P1": {height: 4, state: peerStateRemoved, lastTouched: tick[6]},
"P2": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
@ -2249,19 +2123,6 @@ func TestScHandle(t *testing.T) {
height: 1,
},
},
/*
{ // processed block 2
args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 2}},
wantEvent: scFinishedEv{},
wantSc: &scTestParams{
peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}},
allB: []int64{3},
received: map[int64]p2p.ID{3: "P1"},
height: 3,
},
},
*/
},
},
}
@ -2280,8 +2141,10 @@ func TestScHandle(t *testing.T) {
}
nextEvent, err := sc.handle(step.args.event)
assert.Equal(t, newTestScheduler(*step.wantSc), sc)
wantSc := newTestScheduler(*step.wantSc)
t.Logf("step %d(%v): %s", i, step.args.event, sc)
checkSameScheduler(t, wantSc, sc)
checkScResults(t, step.wantErr, err, step.wantEvent, nextEvent)


+ 1
- 0
blockchain/v2/types.go View File

@ -4,6 +4,7 @@ import (
"github.com/Workiva/go-datastructures/queue"
)
// Event is the type that can be added to the priority queue.
type Event queue.Item
type priority interface {


+ 10
- 1
docs/architecture/adr-043-blockchain-riri-org.md View File

@ -4,6 +4,7 @@
* 18-06-2019: Initial draft
* 08-07-2019: Reviewed
* 29-11-2019: Implemented
* 14-02-2020: Updated with the implementation details
## Context
@ -27,7 +28,15 @@ This ADR is meant to specify the missing components and control necessary to ach
Partition the responsibilities of the blockchain reactor into a set of components which communicate exclusively with events. Events will contain timestamps, allowing each component to track time as internal state. The internal state will be mutated by a set of `handle*` functions which will produce event(s). The integration between components will happen in the reactor, and reactor tests will then become integration tests between components. This design will be known as `v2`.
![v2 Blockchain Reactor Architecture
Diagram](https://github.com/tendermint/tendermint/blob/f9e556481654a24aeb689bdadaf5eab3ccd66829/docs/architecture/img/blockchain-reactor-v2.png)
Diagram](https://github.com/tendermint/tendermint/blob/584e67ac3fac220c5c3e0652e3582eca8231e814/docs/architecture/img/blockchain-reactor-v2.png)
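To make the component shape concrete, here is a schematic sketch (hypothetical `demo*` names; it assumes the package's `Event`, `noOp` and priority types): state lives inside the component and is mutated only by its handler, which consumes one timestamped event and emits the next.

```go
type demoAdvance struct {
	priorityNormal
	time time.Time
}

type demoProcessed struct {
	priorityNormal
	height int64
	time   time.Time
}

// demoComponent keeps height as internal state, mutated only by handle.
type demoComponent struct {
	height int64
}

func (c *demoComponent) handle(event Event) (Event, error) {
	switch ev := event.(type) {
	case demoAdvance:
		c.height++ // time and height derive purely from event payloads
		return demoProcessed{height: c.height, time: ev.time}, nil
	default:
		return noOp, nil
	}
}
```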
### Fast Sync Related Communication Channels
The diagram below shows the fast sync routines and the types of channels and queues they use to communicate with each other.
In addition, the per-reactor channels used by the sendRoutine to send messages over the peer MConnection are shown.
![v2 Blockchain Channels and Queues
Diagram](https://github.com/tendermint/tendermint/blob/5cf570690f989646fb3b615b734da503f038891f/docs/architecture/img/blockchain-v2-channels.png)
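A rough sketch of the wiring the diagram implies (hypothetical `demux` shape, assuming `next()` exposes a routine's output channel as described in `routine.go`): the reactor shuttles events between the scheduler and processor routines and feeds network input to the scheduler.

```go
func demux(networkEvents chan Event, scheduler, processor *Routine) {
	for {
		select {
		case ev := <-networkEvents:
			scheduler.send(ev) // e.g. bcBlockResponse, bcStatusResponse
		case ev := <-scheduler.next():
			processor.send(ev) // e.g. scBlockReceived
		case ev := <-processor.next():
			scheduler.send(ev) // e.g. pcBlockProcessed
		}
	}
}
```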
### Reactor changes in detail


BIN
docs/architecture/img/blockchain-reactor-v2.png View File

Before: 1104×770, 118 KiB → After: 1315×809, 167 KiB

BIN
docs/architecture/img/blockchain-v2-channels.png View File

After: 1098×692, 107 KiB (new file)
