From 55278211edad882297b78e2b15480da8ec4f3a9c Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Mon, 2 Dec 2019 11:01:24 +0100 Subject: [PATCH] Blockchain v2 Scheduler (#4043) * Add processor prototype * Change processor API + expose a simple `handle` function which mutates internal state * schedule event handling * rename schedule -> scheduler * fill in handle function * processor tests * fix gofmt and other golangci issues * scopelint var on range scope * add check for short block received * small test reorg * ci fix changes * go.mod revert * some cleanup and review comments * scheduler fixes and unit tests, also small processor changes. changed scPeerPruned to include a list of pruned peers touchPeer to check peer state and remove the blocks from blockStates if the peer removal causes the max peer height to be lower. remove the block at sc.initHeight changed peersInactiveSince, peersSlowerThan, getPeersAtHeight check peer state prunablePeers to return a sorted list of peers lastRate in markReceived() attempted to divide by 0, temp fix. fixed allBlocksProcessed conditions maxHeight() and minHeight() to return sc.initHeight if no ready peers present make selectPeer() deterministic. added handleBlockProcessError() added termination cond. (sc.allBlocksProcessed()) to handleTryPrunePeer() and others. changed pcBlockVerificationFailure to include peer of H+2 block along with the one for H+1 changed the processor to call purgePeer on block verification failure. fixed processor tests added scheduler tests. * typo and ci fixes * remove height from scBlockRequest, golangci fixes * limit on blockState map, updated tests * remove unused * separate test for maxHeight(), used for sched. validation * use Math.Min * fix golangci * Document the semantics of blockStates in the scheduler * better docs * distinguish between unknown and invalid blockstate * Standardize peer filtering methods * feedback * s/getPeersAtHeight/getPeersAtHeightOrAbove * small notes * Update blockchain/v2/scheduler.go Co-Authored-By: Anton Kaliaev * Update comments based on feedback * Add enum offset * panic on nil block in processor * remove unused max height calculation * format shorter line --- blockchain/v2/processor.go | 27 +- blockchain/v2/processor_test.go | 36 +- blockchain/v2/routine.go | 4 +- blockchain/v2/schedule.go | 386 ------ blockchain/v2/schedule_test.go | 272 ---- blockchain/v2/scheduler.go | 718 ++++++++++ blockchain/v2/scheduler_test.go | 2293 +++++++++++++++++++++++++++++++ 7 files changed, 3055 insertions(+), 681 deletions(-) delete mode 100644 blockchain/v2/schedule.go delete mode 100644 blockchain/v2/schedule_test.go create mode 100644 blockchain/v2/scheduler.go create mode 100644 blockchain/v2/scheduler_test.go diff --git a/blockchain/v2/processor.go b/blockchain/v2/processor.go index 22b46ede3..e33b36058 100644 --- a/blockchain/v2/processor.go +++ b/blockchain/v2/processor.go @@ -21,17 +21,11 @@ type pcShortBlock struct { priorityNormal } -type bcBlockResponse struct { - priorityNormal - peerID p2p.ID - block *types.Block - height int64 -} - type pcBlockVerificationFailure struct { priorityNormal - peerID p2p.ID - height int64 + height int64 + firstPeerID p2p.ID + secondPeerID p2p.ID } type pcBlockProcessed struct { @@ -135,11 +129,14 @@ func (state *pcState) purgePeer(peerID p2p.ID) { // handle processes FSM events func (state *pcState) handle(event Event) (Event, error) { switch event := event.(type) { - case *bcBlockResponse: - if event.height <= state.height { + case *scBlockReceived: + if 
event.block == nil { + panic("processor received an event with a nil block") + } + if event.block.Height <= state.height { return pcShortBlock{}, nil } - err := state.enqueue(event.peerID, event.block, event.height) + err := state.enqueue(event.peerID, event.block, event.block.Height) if err != nil { return pcDuplicateBlock{}, nil } @@ -160,7 +157,11 @@ func (state *pcState) handle(event Event) (Event, error) { err = state.context.verifyCommit(state.chainID, firstID, first.Height, second.LastCommit) if err != nil { - return pcBlockVerificationFailure{peerID: firstItem.peerID, height: first.Height}, nil + state.purgePeer(firstItem.peerID) + state.purgePeer(secondItem.peerID) + return pcBlockVerificationFailure{ + height: first.Height, firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID}, + nil } state.context.saveBlock(first, firstParts, second.LastCommit) diff --git a/blockchain/v2/processor_test.go b/blockchain/v2/processor_test.go index 195df3618..61be23663 100644 --- a/blockchain/v2/processor_test.go +++ b/blockchain/v2/processor_test.go @@ -47,11 +47,10 @@ func makeState(p *params) *pcState { return state } -func mBlockResponse(peerID p2p.ID, height int64) *bcBlockResponse { - return &bcBlockResponse{ +func mBlockResponse(peerID p2p.ID, height int64) *scBlockReceived { + return &scBlockReceived{ peerID: peerID, block: makePcBlock(height), - height: height, } } @@ -235,21 +234,42 @@ func TestPcProcessBlockSuccess(t *testing.T) { func TestPcProcessBlockFailures(t *testing.T) { tests := []testFields{ { - name: "blocks H+1 and H+2 present - H+1 verification fails ", + name: "blocks H+1 and H+2 present from different peers - H+1 verification fails ", steps: []pcFsmMakeStateValues{ { currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, event: pcProcessBlock{}, - wantState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, - wantNextEvent: pcBlockVerificationFailure{peerID: "P1", height: 1}, + wantState: &params{items: []pcBlock{}, verBL: []int64{1}}, + wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P2"}, }, }, }, { - name: "blocks H+1 and H+2 present - H+1 applyBlock fails ", + name: "blocks H+1 and H+2 present from different peers - H+1 applyBlock fails ", steps: []pcFsmMakeStateValues{ { currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, event: pcProcessBlock{}, - wantState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, wantPanic: true, + wantState: &params{items: []pcBlock{}, appBL: []int64{1}}, wantPanic: true, + }, + }, + }, + { + name: "blocks H+1 and H+2 present from the same peer - H+1 verification fails ", + steps: []pcFsmMakeStateValues{ + { + currentState: &params{items: []pcBlock{{"P1", 1}, {"P1", 2}, {"P2", 3}}, verBL: []int64{1}}, + event: pcProcessBlock{}, + wantState: &params{items: []pcBlock{{"P2", 3}}, verBL: []int64{1}}, + wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"}, + }, + }, + }, + { + name: "blocks H+1 and H+2 present from different peers - H+1 applyBlock fails ", + steps: []pcFsmMakeStateValues{ + { + currentState: &params{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P2", 3}}, appBL: []int64{1}}, + event: pcProcessBlock{}, + wantState: &params{items: []pcBlock{{"P2", 3}}, appBL: []int64{1}}, wantPanic: true, }, }, }, diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index a24a16f09..897dd738c 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -14,7 +14,7 @@ type handleFunc = func(event 
Event) (Event, error) // stream of events processed by a handle function. This Routine structure // handles the concurrency and messaging guarantees. Events sent via // `send` are handled by the `handle` function to produce an iterator -// `next()`. Calling `close()` on a routine will conclude processing of all +// `next()`. Calling `stop()` on a routine will conclude processing of all sent events and produce `final()` event representing the terminal state. type Routine struct { name string @@ -115,7 +115,7 @@ func (rt *Routine) ready() chan struct{} { } func (rt *Routine) stop() { - if !rt.isRunning() { + if !rt.isRunning() { // XXX: this should check rt.queue.Disposed() return } diff --git a/blockchain/v2/schedule.go b/blockchain/v2/schedule.go deleted file mode 100644 index 236d12aa9..000000000 --- a/blockchain/v2/schedule.go +++ /dev/null @@ -1,386 +0,0 @@ -// nolint:unused -package v2 - -import ( - "fmt" - "math" - "math/rand" - "time" - - "github.com/tendermint/tendermint/p2p" -) - -type blockState int - -const ( - blockStateUnknown blockState = iota - blockStateNew - blockStatePending - blockStateReceived - blockStateProcessed -) - -func (e blockState) String() string { - switch e { - case blockStateUnknown: - return "Unknown" - case blockStateNew: - return "New" - case blockStatePending: - return "Pending" - case blockStateReceived: - return "Received" - case blockStateProcessed: - return "Processed" - default: - return fmt.Sprintf("unknown blockState: %d", e) - } -} - -type peerState int - -const ( - peerStateNew = iota - peerStateReady - peerStateRemoved -) - -func (e peerState) String() string { - switch e { - case peerStateNew: - return "New" - case peerStateReady: - return "Ready" - case peerStateRemoved: - return "Removed" - default: - return fmt.Sprintf("unknown peerState: %d", e) - } -} - -type scPeer struct { - peerID p2p.ID - state peerState - height int64 - lastTouched time.Time - lastRate int64 -} - -func newScPeer(peerID p2p.ID) *scPeer { - return &scPeer{ - peerID: peerID, - state: peerStateNew, - height: -1, - lastTouched: time.Time{}, - } -} - -// The schedule is a composite data structure which allows a scheduler to keep -// track of which blocks have been scheduled into which state. 
-type schedule struct { - initHeight int64 - // a list of blocks in which blockState - blockStates map[int64]blockState - - // a map of peerID to schedule specific peer struct `scPeer` used to keep - // track of peer specific state - peers map[p2p.ID]*scPeer - - // a map of heights to the peer we are waiting for a response from - pendingBlocks map[int64]p2p.ID - - // the time at which a block was put in blockStatePending - pendingTime map[int64]time.Time - - // the peerID of the peer which put the block in blockStateReceived - receivedBlocks map[int64]p2p.ID -} - -func newSchedule(initHeight int64) *schedule { - sc := schedule{ - initHeight: initHeight, - blockStates: make(map[int64]blockState), - peers: make(map[p2p.ID]*scPeer), - pendingBlocks: make(map[int64]p2p.ID), - pendingTime: make(map[int64]time.Time), - receivedBlocks: make(map[int64]p2p.ID), - } - - sc.setStateAtHeight(initHeight, blockStateNew) - - return &sc -} - -func (sc *schedule) addPeer(peerID p2p.ID) error { - if _, ok := sc.peers[peerID]; ok { - return fmt.Errorf("cannot add duplicate peer %s", peerID) - } - sc.peers[peerID] = newScPeer(peerID) - return nil -} - -func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("couldn't find peer %s", peerID) - } - - if peer.state == peerStateRemoved { - return fmt.Errorf("tried to touch peer in peerStateRemoved") - } - - peer.lastTouched = time - - return nil -} - -func (sc *schedule) removePeer(peerID p2p.ID) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("couldn't find peer %s", peerID) - } - - if peer.state == peerStateRemoved { - return fmt.Errorf("tried to remove peer %s in peerStateRemoved", peerID) - } - - for height, pendingPeerID := range sc.pendingBlocks { - if pendingPeerID == peerID { - sc.setStateAtHeight(height, blockStateNew) - delete(sc.pendingTime, height) - delete(sc.pendingBlocks, height) - } - } - - for height, rcvPeerID := range sc.receivedBlocks { - if rcvPeerID == peerID { - sc.setStateAtHeight(height, blockStateNew) - delete(sc.receivedBlocks, height) - } - } - - peer.state = peerStateRemoved - - return nil -} - -// TODO - keep track of highest height -func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("can't find peer %s", peerID) - } - - if peer.state == peerStateRemoved { - return fmt.Errorf("cannot set peer height for a peer in peerStateRemoved") - } - - if height < peer.height { - return fmt.Errorf("cannot move peer height lower. 
from %d to %d", peer.height, height) - } - - peer.height = height - peer.state = peerStateReady - for i := sc.minHeight(); i <= height; i++ { - if sc.getStateAtHeight(i) == blockStateUnknown { - sc.setStateAtHeight(i, blockStateNew) - } - } - - return nil -} - -func (sc *schedule) getStateAtHeight(height int64) blockState { - if height < sc.initHeight { - return blockStateProcessed - } else if state, ok := sc.blockStates[height]; ok { - return state - } else { - return blockStateUnknown - } -} - -func (sc *schedule) getPeersAtHeight(height int64) []*scPeer { - peers := []*scPeer{} - for _, peer := range sc.peers { - if peer.height >= height { - peers = append(peers, peer) - } - } - - return peers -} - -func (sc *schedule) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID { - peers := []p2p.ID{} - for _, peer := range sc.peers { - if now.Sub(peer.lastTouched) > duration { - peers = append(peers, peer.peerID) - } - } - - return peers -} - -func (sc *schedule) peersSlowerThan(minSpeed int64) []p2p.ID { - peers := []p2p.ID{} - for _, peer := range sc.peers { - if peer.lastRate < minSpeed { - peers = append(peers, peer.peerID) - } - } - - return peers -} - -func (sc *schedule) setStateAtHeight(height int64, state blockState) { - sc.blockStates[height] = state -} - -func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("can't find peer %s", peerID) - } - - if peer.state == peerStateRemoved { - return fmt.Errorf("cannot receive blocks from removed peer %s", peerID) - } - - if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID { - return fmt.Errorf("received block %d from peer %s without being requested", height, peerID) - } - - pendingTime, ok := sc.pendingTime[height] - if !ok || now.Sub(pendingTime) <= 0 { - return fmt.Errorf("clock error. 
Block %d received at %s but requested at %s", - height, pendingTime, now) - } - - peer.lastRate = size / int64(now.Sub(pendingTime).Seconds()) - - sc.setStateAtHeight(height, blockStateReceived) - delete(sc.pendingBlocks, height) - delete(sc.pendingTime, height) - - sc.receivedBlocks[height] = peerID - - return nil -} - -func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("can't find peer %s", peerID) - } - - state := sc.getStateAtHeight(height) - if state != blockStateNew { - return fmt.Errorf("block %d should be in blockStateNew but was %s", height, state) - } - - if peer.state != peerStateReady { - return fmt.Errorf("cannot schedule %d from %s in %s", height, peerID, peer.state) - } - - if height > peer.height { - return fmt.Errorf("cannot request height %d from peer %s who is at height %d", - height, peerID, peer.height) - } - - sc.setStateAtHeight(height, blockStatePending) - sc.pendingBlocks[height] = peerID - // XXX: to make this more accurate we can introduce a message from - // the IO routine which indicates the time the request was put on the wire - sc.pendingTime[height] = time - - return nil -} - -func (sc *schedule) markProcessed(height int64) error { - state := sc.getStateAtHeight(height) - if state != blockStateReceived { - return fmt.Errorf("can't mark height %d received from block state %s", height, state) - } - - delete(sc.receivedBlocks, height) - - sc.setStateAtHeight(height, blockStateProcessed) - - return nil -} - -// allBlockProcessed returns true if all blocks are in blockStateProcessed and -// determines if the schedule has been completed -func (sc *schedule) allBlocksProcessed() bool { - for _, state := range sc.blockStates { - if state != blockStateProcessed { - return false - } - } - return true -} - -// highest block | state == blockStateNew -func (sc *schedule) maxHeight() int64 { - var max int64 = 0 - for height, state := range sc.blockStates { - if state == blockStateNew && height > max { - max = height - } - } - - return max -} - -// lowest block | state == blockStateNew -func (sc *schedule) minHeight() int64 { - var min int64 = math.MaxInt64 - for height, state := range sc.blockStates { - if state == blockStateNew && height < min { - min = height - } - } - - return min -} - -func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 { - heights := []int64{} - for height, pendingPeerID := range sc.pendingBlocks { - if pendingPeerID == peerID { - heights = append(heights, height) - } - } - return heights -} - -func (sc *schedule) selectPeer(peers []*scPeer) *scPeer { - // FIXME: properPeerSelector - s := rand.NewSource(time.Now().Unix()) - r := rand.New(s) - - return peers[r.Intn(len(peers))] -} - -// XXX: this duplicates the logic of peersInactiveSince and peersSlowerThan -func (sc *schedule) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID { - prunable := []p2p.ID{} - for peerID, peer := range sc.peers { - if now.Sub(peer.lastTouched) > peerTimout || peer.lastRate < minRecvRate { - prunable = append(prunable, peerID) - } - } - - return prunable -} - -func (sc *schedule) numBlockInState(targetState blockState) uint32 { - var num uint32 = 0 - for _, state := range sc.blockStates { - if state == targetState { - num++ - } - } - return num -} diff --git a/blockchain/v2/schedule_test.go b/blockchain/v2/schedule_test.go deleted file mode 100644 index a1448c528..000000000 --- a/blockchain/v2/schedule_test.go +++ /dev/null @@ -1,272 +0,0 @@ 
-package v2 - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/tendermint/tendermint/p2p" -) - -func TestScheduleInit(t *testing.T) { - var ( - initHeight int64 = 5 - sc = newSchedule(initHeight) - ) - - assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight)) - assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight-1)) - assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1)) -} - -func TestAddPeer(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerIDTwo p2p.ID = "2" - sc = newSchedule(initHeight) - ) - - assert.Nil(t, sc.addPeer(peerID)) - assert.Nil(t, sc.addPeer(peerIDTwo)) - assert.Error(t, sc.addPeer(peerID)) -} - -func TestTouchPeer(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - sc = newSchedule(initHeight) - now = time.Now() - ) - - assert.Error(t, sc.touchPeer(peerID, now), - "Touching an unknown peer should return errPeerNotFound") - - assert.Nil(t, sc.addPeer(peerID), - "Adding a peer should return no error") - assert.Nil(t, sc.touchPeer(peerID, now), - "Touching a peer should return no error") - - threshold := 10 * time.Second - assert.Empty(t, sc.peersInactiveSince(threshold, now.Add(9*time.Second)), - "Expected no peers to have been touched over 9 seconds") - assert.Containsf(t, sc.peersInactiveSince(threshold, now.Add(11*time.Second)), peerID, - "Expected one %s to have been touched over 10 seconds ago", peerID) -} - -func TestPeerHeight(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerHeight int64 = 20 - sc = newSchedule(initHeight) - ) - - assert.NoError(t, sc.addPeer(peerID), - "Adding a peer should return no error") - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight)) - for i := initHeight; i <= peerHeight; i++ { - assert.Equal(t, sc.getStateAtHeight(i), blockStateNew, - "Expected all blocks to be in blockStateNew") - peerIDs := []p2p.ID{} - for _, peer := range sc.getPeersAtHeight(i) { - peerIDs = append(peerIDs, peer.peerID) - } - - assert.Containsf(t, peerIDs, peerID, - "Expected %s to have block %d", peerID, i) - } -} - -func TestTransitionPending(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerIDTwo p2p.ID = "2" - peerHeight int64 = 20 - sc = newSchedule(initHeight) - now = time.Now() - ) - - assert.NoError(t, sc.addPeer(peerID), - "Adding a peer should return no error") - assert.Nil(t, sc.addPeer(peerIDTwo), - "Adding a peer should return no error") - - assert.Error(t, sc.markPending(peerID, peerHeight, now), - "Expected scheduling a block from a peer in peerStateNew to fail") - - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight to return no error") - assert.NoError(t, sc.setPeerHeight(peerIDTwo, peerHeight), - "Expected setPeerHeight to return no error") - - assert.NoError(t, sc.markPending(peerID, peerHeight, now), - "Expected markingPending new block to succeed") - assert.Error(t, sc.markPending(peerIDTwo, peerHeight, now), - "Expected markingPending by a second peer to fail") - - assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight), - "Expected the block to to be in blockStatePending") - - assert.NoError(t, sc.removePeer(peerID), - "Expected removePeer to return no error") - - assert.Equal(t, blockStateNew, sc.getStateAtHeight(peerHeight), - "Expected the block to to be in blockStateNew") - - assert.Error(t, sc.markPending(peerID, peerHeight, now), - "Expected markingPending removed peer to fail") - - assert.NoError(t, 
sc.markPending(peerIDTwo, peerHeight, now), - "Expected markingPending on a ready peer to succeed") - - assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight), - "Expected the block to to be in blockStatePending") -} - -func TestTransitionReceived(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerIDTwo p2p.ID = "2" - peerHeight int64 = 20 - blockSize int64 = 1024 - sc = newSchedule(initHeight) - now = time.Now() - receivedAt = now.Add(1 * time.Second) - ) - - assert.NoError(t, sc.addPeer(peerID), - "Expected adding peer %s to succeed", peerID) - assert.NoError(t, sc.addPeer(peerIDTwo), - "Expected adding peer %s to succeed", peerIDTwo) - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight to return no error") - assert.NoErrorf(t, sc.setPeerHeight(peerIDTwo, peerHeight), - "Expected setPeerHeight on %s to %d to succeed", peerIDTwo, peerHeight) - assert.NoError(t, sc.markPending(peerID, initHeight, now), - "Expected markingPending new block to succeed") - - assert.Error(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt), - "Expected marking markReceived from a non requesting peer to fail") - - assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt), - "Expected marking markReceived on a pending block to succeed") - - assert.Error(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt), - "Expected marking markReceived on received block to fail") - - assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockHeightReceived", initHeight) - - assert.NoErrorf(t, sc.removePeer(peerID), - "Expected removePeer removing %s to succeed", peerID) - - assert.Equalf(t, blockStateNew, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockStateNew", initHeight) - - assert.NoErrorf(t, sc.markPending(peerIDTwo, initHeight, now), - "Expected markingPending %d from %s to succeed", initHeight, peerIDTwo) - assert.NoErrorf(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt), - "Expected marking markReceived %d from %s to succeed", initHeight, peerIDTwo) - assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockStateReceived", initHeight) -} - -func TestTransitionProcessed(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerHeight int64 = 20 - blockSize int64 = 1024 - sc = newSchedule(initHeight) - now = time.Now() - receivedAt = now.Add(1 * time.Second) - ) - - assert.NoError(t, sc.addPeer(peerID), - "Expected adding peer %s to succeed", peerID) - assert.NoErrorf(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight on %s to %d to succeed", peerID, peerHeight) - assert.NoError(t, sc.markPending(peerID, initHeight, now), - "Expected markingPending new block to succeed") - assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt), - "Expected marking markReceived on a pending block to succeed") - - assert.Error(t, sc.markProcessed(initHeight+1), - "Expected marking %d as processed to fail", initHeight+1) - assert.NoError(t, sc.markProcessed(initHeight), - "Expected marking %d as processed to succeed", initHeight) - - assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockStateProcessed", initHeight) - - assert.NoError(t, sc.removePeer(peerID), - "Expected removing peer %s to succeed", peerID) - - assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight), - "Expected block %d to be 
blockStateProcessed", initHeight) -} - -func TestMinMaxHeight(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerHeight int64 = 20 - sc = newSchedule(initHeight) - now = time.Now() - ) - - assert.Equal(t, initHeight, sc.minHeight(), - "Expected min height to be the initialized height") - - assert.Equal(t, initHeight, sc.maxHeight(), - "Expected max height to be the initialized height") - - assert.NoError(t, sc.addPeer(peerID), - "Adding a peer should return no error") - - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight to return no error") - - assert.Equal(t, peerHeight, sc.maxHeight(), - "Expected max height to increase to peerHeight") - - assert.Nil(t, sc.markPending(peerID, initHeight, now.Add(1*time.Second)), - "Expected marking initHeight as pending to return no error") - - assert.Equal(t, initHeight+1, sc.minHeight(), - "Expected marking initHeight as pending to move minHeight forward") -} - -func TestPeersSlowerThan(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerHeight int64 = 20 - blockSize int64 = 1024 - sc = newSchedule(initHeight) - now = time.Now() - receivedAt = now.Add(1 * time.Second) - ) - - assert.NoError(t, sc.addPeer(peerID), - "Adding a peer should return no error") - - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight to return no error") - - assert.NoError(t, sc.markPending(peerID, peerHeight, now), - "Expected markingPending on to return no error") - - assert.NoError(t, sc.markReceived(peerID, peerHeight, blockSize, receivedAt), - "Expected markingPending on to return no error") - - assert.Empty(t, sc.peersSlowerThan(blockSize-1), - "expected no peers to be slower than blockSize-1 bytes/sec") - - assert.Containsf(t, sc.peersSlowerThan(blockSize+1), peerID, - "expected %s to be slower than blockSize+1 bytes/sec", peerID) -} diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go new file mode 100644 index 000000000..ab3892dc5 --- /dev/null +++ b/blockchain/v2/scheduler.go @@ -0,0 +1,718 @@ +package v2 + +import ( + "bytes" + "fmt" + "math" + "sort" + "time" + + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" +) + +// Events + +// XXX: The handle API would be much simpler if it return a single event, an +// Event, which embeds a terminationEvent if it wants to terminate the routine. 
+ +// Input events into the scheduler: +// ticker event for cleaning peers +type tryPrunePeer struct { + priorityHigh + time time.Time +} + +// ticker event for scheduling block requests +type trySchedule struct { + priorityHigh + time time.Time +} + +// blockResponse message received from a peer +type bcBlockResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + height int64 + size int64 + block *types.Block +} + +// statusResponse message received from a peer +type bcStatusResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + height int64 +} + +// new peer is connected +type addNewPeer struct { + priorityNormal + peerID p2p.ID +} + +// Output events issued by the scheduler: +// all blocks have been processed +type scFinishedEv struct { + priorityNormal +} + +// send a blockRequest message +type scBlockRequest struct { + priorityNormal + peerID p2p.ID + height int64 +} + +// a block has been received and validated by the scheduler +type scBlockReceived struct { + priorityNormal + peerID p2p.ID + block *types.Block +} + +// scheduler detected a peer error +type scPeerError struct { + priorityHigh + peerID p2p.ID + reason error +} + +// scheduler removed a set of peers (timed out or slow peer) +type scPeersPruned struct { + priorityHigh + peers []p2p.ID +} + +// XXX: make this fatal? +// scheduler encountered a fatal error +type scSchedulerFail struct { + priorityHigh + reason error +} + +type blockState int + +const ( + blockStateUnknown blockState = iota + 1 // no known peer has this block + blockStateNew // indicates that a peer has reported having this block + blockStatePending // indicates that this block has been requested from a peer + blockStateReceived // indicates that this block has been received from a peer + blockStateProcessed // indicates that this block has been applied +) + +func (e blockState) String() string { + switch e { + case blockStateUnknown: + return "Unknown" + case blockStateNew: + return "New" + case blockStatePending: + return "Pending" + case blockStateReceived: + return "Received" + case blockStateProcessed: + return "Processed" + default: + return fmt.Sprintf("invalid blockState: %d", e) + } +} + +type peerState int + +const ( + peerStateNew = iota + 1 + peerStateReady + peerStateRemoved +) + +func (e peerState) String() string { + switch e { + case peerStateNew: + return "New" + case peerStateReady: + return "Ready" + case peerStateRemoved: + return "Removed" + default: + panic(fmt.Sprintf("unknown peerState: %d", e)) + } +} + +type scPeer struct { + peerID p2p.ID + + // initialized as New when peer is added, updated to Ready when statusUpdate is received, + // updated to Removed when peer is removed + state peerState + + height int64 // updated when statusResponse is received + lastTouched time.Time + lastRate int64 // last receive rate in bytes +} + +func (p scPeer) String() string { + return fmt.Sprintf("{state %v, height %d, lastTouched %v, lastRate %d, id %v}", + p.state, p.height, p.lastTouched, p.lastRate, p.peerID) +} + +func newScPeer(peerID p2p.ID) *scPeer { + return &scPeer{ + peerID: peerID, + state: peerStateNew, + height: -1, + } +} + +// The scheduler keeps track of the state of each block and each peer. The +// scheduler will attempt to schedule new block requests with `trySchedule` +// events and remove slow peers with `tryPrunePeer` events. +type scheduler struct { + initHeight int64 + + // next block that needs to be processed. All blocks with smaller height are + // in Processed state. 
+ height int64 + + // a map of peerID to scheduler specific peer struct `scPeer` used to keep + // track of peer specific state + peers map[p2p.ID]*scPeer + peerTimeout time.Duration + minRecvRate int64 // minimum receive rate from peer otherwise prune + + // the maximum number of blocks that should be New, Received or Pending at any point + // in time. This is used to enforce a limit on the blockStates map. + targetPending int + // a list of blocks to be scheduled (New), Pending or Received. Its length should be + // smaller than targetPending. + blockStates map[int64]blockState + + // a map of heights to the peer we are waiting for a response from + pendingBlocks map[int64]p2p.ID + + // the time at which a block was put in blockStatePending + pendingTime map[int64]time.Time + + // a map of heights to the peer that put the block in blockStateReceived + receivedBlocks map[int64]p2p.ID +} + +func (sc scheduler) String() string { + return fmt.Sprintf("ih: %d, bst: %v, peers: %v, pblks: %v, ptm %v, rblks: %v", + sc.initHeight, sc.blockStates, sc.peers, sc.pendingBlocks, sc.pendingTime, sc.receivedBlocks) +} + +func newScheduler(initHeight int64) *scheduler { + sc := scheduler{ + initHeight: initHeight, + height: initHeight + 1, + blockStates: make(map[int64]blockState), + peers: make(map[p2p.ID]*scPeer), + pendingBlocks: make(map[int64]p2p.ID), + pendingTime: make(map[int64]time.Time), + receivedBlocks: make(map[int64]p2p.ID), + } + + return &sc +} + +func (sc *scheduler) addPeer(peerID p2p.ID) error { + if _, ok := sc.peers[peerID]; ok { + // In the future we should be able to add a previously removed peer + return fmt.Errorf("cannot add duplicate peer %s", peerID) + } + sc.peers[peerID] = newScPeer(peerID) + return nil +} + +func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error { + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("couldn't find peer %s", peerID) + } + + if peer.state != peerStateReady { + return fmt.Errorf("tried to touch peer in state %s, must be Ready", peer.state) + } + + peer.lastTouched = time + + return nil +} + +func (sc *scheduler) removePeer(peerID p2p.ID) error { + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("couldn't find peer %s", peerID) + } + + if peer.state == peerStateRemoved { + return fmt.Errorf("tried to remove peer %s in peerStateRemoved", peerID) + } + + for height, pendingPeerID := range sc.pendingBlocks { + if pendingPeerID == peerID { + sc.setStateAtHeight(height, blockStateNew) + delete(sc.pendingTime, height) + delete(sc.pendingBlocks, height) + } + } + + for height, rcvPeerID := range sc.receivedBlocks { + if rcvPeerID == peerID { + sc.setStateAtHeight(height, blockStateNew) + delete(sc.receivedBlocks, height) + } + } + + // remove the blocks from blockStates if the peer removal causes the max peer height to be lower. + peer.state = peerStateRemoved + maxPeerHeight := int64(0) + for _, otherPeer := range sc.peers { + if otherPeer.state != peerStateReady { + continue + } + if otherPeer.peerID != peer.peerID && otherPeer.height > maxPeerHeight { + maxPeerHeight = otherPeer.height + } + } + for h := range sc.blockStates { + if h > maxPeerHeight { + delete(sc.blockStates, h) + } + } + + return nil +} + +// check if the blockPool is running low and add new blocks in New state to be requested. +// This function is called when there is an increase in the maximum peer height or when +// blocks are processed. 
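+// For example: with height=1, targetPending=4 and a single Ready peer at
+// height 3, heights 1..3 move from Unknown to New; heights above maxHeight()
+// are never added.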
+func (sc *scheduler) addNewBlocks() { + if len(sc.blockStates) >= sc.targetPending { + return + } + + for i := sc.height; i < int64(sc.targetPending)+sc.height; i++ { + if i > sc.maxHeight() { + break + } + if sc.getStateAtHeight(i) == blockStateUnknown { + sc.setStateAtHeight(i, blockStateNew) + } + } +} + +func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error { + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("cannot find peer %s", peerID) + } + + if peer.state == peerStateRemoved { + return fmt.Errorf("cannot set peer height for a peer in peerStateRemoved") + } + + if height < peer.height { + return fmt.Errorf("cannot move peer height lower, from %d to %d", peer.height, height) + } + + peer.height = height + peer.state = peerStateReady + + sc.addNewBlocks() + return nil +} + +func (sc *scheduler) getStateAtHeight(height int64) blockState { + if height <= sc.initHeight { + return blockStateProcessed + } else if state, ok := sc.blockStates[height]; ok { + return state + } else { + return blockStateUnknown + } +} + +func (sc *scheduler) getPeersAtHeightOrAbove(height int64) []p2p.ID { + peers := make([]p2p.ID, 0) + for _, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if peer.height >= height { + peers = append(peers, peer.peerID) + } + } + return peers +} + +func (sc *scheduler) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID { + peers := []p2p.ID{} + for _, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if now.Sub(peer.lastTouched) > duration { + peers = append(peers, peer.peerID) + } + } + + // Ensure the order is deterministic for testing + sort.Sort(PeerByID(peers)) + return peers +} + +// returns peers whose lastRate is slower than minSpeed, denominated in bytes +func (sc *scheduler) peersSlowerThan(minSpeed int64) []p2p.ID { + peers := []p2p.ID{} + for peerID, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if peer.lastRate < minSpeed { + peers = append(peers, peerID) + } + } + + // Ensure the order is deterministic for testing + sort.Sort(PeerByID(peers)) + return peers +} + +func (sc *scheduler) prunablePeers(peerTimeout time.Duration, minRecvRate int64, now time.Time) []p2p.ID { + prunable := []p2p.ID{} + for peerID, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if now.Sub(peer.lastTouched) > peerTimeout || peer.lastRate < minRecvRate { + prunable = append(prunable, peerID) + } + } + // Tests for handleTryPrunePeer() may fail without sort due to range non-determinism + sort.Sort(PeerByID(prunable)) + return prunable +} + +func (sc *scheduler) setStateAtHeight(height int64, state blockState) { + sc.blockStates[height] = state +} + +func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error { + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("couldn't find peer %s", peerID) + } + + if peer.state == peerStateRemoved { + return fmt.Errorf("cannot receive blocks from removed peer %s", peerID) + } + + if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID { + return fmt.Errorf("received block %d from peer %s without being requested", height, peerID) + } + + pendingTime, ok := sc.pendingTime[height] + if !ok || now.Sub(pendingTime) <= 0 { + return fmt.Errorf("clock error: block %d requested at %s but received at %s", + height, pendingTime, now) + } + + peer.lastRate = size / now.Sub(pendingTime).Nanoseconds() + + 
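+	// past this point the response is accepted: mark the block Received and
+	// drop the pending bookkeeping for its height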
sc.setStateAtHeight(height, blockStateReceived) + delete(sc.pendingBlocks, height) + delete(sc.pendingTime, height) + + sc.receivedBlocks[height] = peerID + + return nil +} + +func (sc *scheduler) markPending(peerID p2p.ID, height int64, time time.Time) error { + state := sc.getStateAtHeight(height) + if state != blockStateNew { + return fmt.Errorf("block %d should be in blockStateNew but is %s", height, state) + } + + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("cannot find peer %s", peerID) + } + + if peer.state != peerStateReady { + return fmt.Errorf("cannot schedule %d from %s in %s", height, peerID, peer.state) + } + + if height > peer.height { + return fmt.Errorf("cannot request height %d from peer %s that is at height %d", + height, peerID, peer.height) + } + + sc.setStateAtHeight(height, blockStatePending) + sc.pendingBlocks[height] = peerID + // XXX: to make this more accurate we can introduce a message from + // the IO routine which indicates the time the request was put on the wire + sc.pendingTime[height] = time + + return nil +} + +func (sc *scheduler) markProcessed(height int64) error { + state := sc.getStateAtHeight(height) + if state != blockStateReceived { + return fmt.Errorf("cannot mark height %d as processed from block state %s", height, state) + } + + sc.height++ + delete(sc.receivedBlocks, height) + delete(sc.blockStates, height) + sc.addNewBlocks() + + return nil +} + +func (sc *scheduler) allBlocksProcessed() bool { + return sc.height >= sc.maxHeight() +} + +// returns the max height of any Ready peer, or the height of the last +// processed block (sc.height - 1) if there are no Ready peers +func (sc *scheduler) maxHeight() int64 { + max := sc.height - 1 + for _, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if peer.height > max { + max = peer.height + } + } + return max +} + +// lowest block in sc.blockStates with state == blockStateNew or -1 if no new blocks +func (sc *scheduler) nextHeightToSchedule() int64 { + var min int64 = math.MaxInt64 + for height, state := range sc.blockStates { + if state == blockStateNew && height < min { + min = height + } + } + if min == math.MaxInt64 { + min = -1 + } + return min +} + +func (sc *scheduler) pendingFrom(peerID p2p.ID) []int64 { + var heights []int64 + for height, pendingPeerID := range sc.pendingBlocks { + if pendingPeerID == peerID { + heights = append(heights, height) + } + } + return heights +} + +func (sc *scheduler) selectPeer(height int64) (p2p.ID, error) { + peers := sc.getPeersAtHeightOrAbove(height) + if len(peers) == 0 { + return "", fmt.Errorf("cannot find peer for height %d", height) + } + + // create a map from number of pending requests to a list + // of peers having that number of pending requests. + pendingFrom := make(map[int][]p2p.ID) + for _, peerID := range peers { + numPending := len(sc.pendingFrom(peerID)) + pendingFrom[numPending] = append(pendingFrom[numPending], peerID) + } + + // find the set of peers with minimum number of pending requests. + minPending := math.MaxInt64 + for mp := range pendingFrom { + if mp < minPending { + minPending = mp + } + } + + sort.Sort(PeerByID(pendingFrom[minPending])) + return pendingFrom[minPending][0], nil +} + +// PeerByID is a list of peers sortable by peerID. 
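+// It implements sort.Interface and is used, e.g. sort.Sort(PeerByID(peers)),
+// to make peer iteration order deterministic.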
+type PeerByID []p2p.ID + +func (peers PeerByID) Len() int { + return len(peers) +} +func (peers PeerByID) Less(i, j int) bool { + return bytes.Compare([]byte(peers[i]), []byte(peers[j])) == -1 +} + +func (peers PeerByID) Swap(i, j int) { + it := peers[i] + peers[i] = peers[j] + peers[j] = it +} + +// Handlers + +// This handler gets the block, performs some validation and then passes it on to the processor. +func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) { + err := sc.touchPeer(event.peerID, event.time) + if err != nil { + return scPeerError{peerID: event.peerID, reason: err}, nil + } + + err = sc.markReceived(event.peerID, event.block.Height, event.size, event.time) + if err != nil { + return scPeerError{peerID: event.peerID, reason: err}, nil + } + + return scBlockReceived{peerID: event.peerID, block: event.block}, nil +} + +func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) { + if event.height != sc.height { + panic(fmt.Sprintf("processed height %d but expected height %d", event.height, sc.height)) + } + err := sc.markProcessed(event.height) + if err != nil { + // It is possible that a peer error or timeout is handled after the processor + // has processed the block but before the scheduler received this event, + // so when the pcBlockProcessed event is received the block may have been requested again + return scSchedulerFail{reason: err}, nil + } + + if sc.allBlocksProcessed() { + return scFinishedEv{}, nil + } + + return noOp, nil +} + +// Handles an error from the processor. The processor has already cleaned the blocks from +// the peers included in this event. Just attempt to remove the peers. +func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) (Event, error) { + if len(sc.peers) == 0 { + return noOp, nil + } + // The peers may have been just removed due to errors, low speed or timeouts. + _ = sc.removePeer(event.firstPeerID) + if event.firstPeerID != event.secondPeerID { + _ = sc.removePeer(event.secondPeerID) + } + + if sc.allBlocksProcessed() { + return scFinishedEv{}, nil + } + + return noOp, nil +} + +func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) { + err := sc.addPeer(event.peerID) + if err != nil { + return scSchedulerFail{reason: err}, nil + } + return noOp, nil +} + +// XXX: unify types peerError +func (sc *scheduler) handlePeerError(event peerError) (Event, error) { + err := sc.removePeer(event.peerID) + if err != nil { + // XXX - It is possible that the removePeer fails here for legitimate reasons, + // for example if a peer timeout or error was handled just before this. + return scSchedulerFail{reason: err}, nil + } + if sc.allBlocksProcessed() { + return scFinishedEv{}, nil + } + return noOp, nil +} + +func (sc *scheduler) handleTryPrunePeer(event tryPrunePeer) (Event, error) { + prunablePeers := sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, event.time) + if len(prunablePeers) == 0 { + return noOp, nil + } + for _, peerID := range prunablePeers { + err := sc.removePeer(peerID) + if err != nil { + // Should never happen as prunablePeers() returns only existing peers in Ready state. + panic("scheduler data corruption") + } + } + + // If all blocks are processed we should finish even if some peers were pruned. 
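+	// (that is, scFinishedEv takes precedence over scPeersPruned below)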
+ if sc.allBlocksProcessed() { + return scFinishedEv{}, nil + } + + return scPeersPruned{peers: prunablePeers}, nil + +} + +// TODO - Schedule multiple block requests +func (sc *scheduler) handleTrySchedule(event trySchedule) (Event, error) { + + nextHeight := sc.nextHeightToSchedule() + if nextHeight == -1 { + return noOp, nil + } + + bestPeerID, err := sc.selectPeer(nextHeight) + if err != nil { + return scSchedulerFail{reason: err}, nil + } + if err := sc.markPending(bestPeerID, nextHeight, event.time); err != nil { + return scSchedulerFail{reason: err}, nil // XXX: peerError might be more appropriate + } + return scBlockRequest{peerID: bestPeerID, height: nextHeight}, nil + +} + +func (sc *scheduler) handleStatusResponse(event bcStatusResponse) (Event, error) { + err := sc.setPeerHeight(event.peerID, event.height) + if err != nil { + return scPeerError{peerID: event.peerID, reason: err}, nil + } + return noOp, nil +} + +func (sc *scheduler) handle(event Event) (Event, error) { + switch event := event.(type) { + case bcStatusResponse: + nextEvent, err := sc.handleStatusResponse(event) + return nextEvent, err + case bcBlockResponse: + nextEvent, err := sc.handleBlockResponse(event) + return nextEvent, err + case trySchedule: + nextEvent, err := sc.handleTrySchedule(event) + return nextEvent, err + case addNewPeer: + nextEvent, err := sc.handleAddNewPeer(event) + return nextEvent, err + case tryPrunePeer: + nextEvent, err := sc.handleTryPrunePeer(event) + return nextEvent, err + case peerError: + nextEvent, err := sc.handlePeerError(event) + return nextEvent, err + case pcBlockProcessed: + nextEvent, err := sc.handleBlockProcessed(event) + return nextEvent, err + case pcBlockVerificationFailure: + nextEvent, err := sc.handleBlockProcessError(event) + return nextEvent, err + default: + return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil + } + //return noOp, nil +} diff --git a/blockchain/v2/scheduler_test.go b/blockchain/v2/scheduler_test.go new file mode 100644 index 000000000..a3a6db672 --- /dev/null +++ b/blockchain/v2/scheduler_test.go @@ -0,0 +1,2293 @@ +package v2 + +import ( + "fmt" + "math" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" +) + +type scTestParams struct { + peers map[string]*scPeer + initHeight int64 + height int64 + allB []int64 + pending map[int64]p2p.ID + pendingTime map[int64]time.Time + received map[int64]p2p.ID + peerTimeout time.Duration + minRecvRate int64 + targetPending int +} + +func verifyScheduler(sc *scheduler) { + missing := 0 + if sc.maxHeight() >= sc.height { + missing = int(math.Min(float64(sc.targetPending), float64(sc.maxHeight()-sc.height+1))) + } + if len(sc.blockStates) != missing { + panic(fmt.Sprintf("scheduler block length %d different than target %d", len(sc.blockStates), missing)) + } +} + +func newTestScheduler(params scTestParams) *scheduler { + peers := make(map[p2p.ID]*scPeer) + + sc := newScheduler(params.initHeight) + if params.height != 0 { + sc.height = params.height + } + + for id, peer := range params.peers { + peer.peerID = p2p.ID(id) + peers[p2p.ID(id)] = peer + } + for _, h := range params.allB { + sc.blockStates[h] = blockStateNew + } + for h, pid := range params.pending { + sc.blockStates[h] = blockStatePending + sc.pendingBlocks[h] = pid + } + for h, tm := range params.pendingTime { + sc.pendingTime[h] = tm + } + for h, pid := range params.received { + sc.blockStates[h] = blockStateReceived + 
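+		// mirror the Received state in receivedBlocks, matching what
+		// markReceived would have recorded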
sc.receivedBlocks[h] = pid + } + + sc.peers = peers + sc.peerTimeout = params.peerTimeout + if params.targetPending == 0 { + sc.targetPending = 10 + } else { + sc.targetPending = params.targetPending + } + + sc.minRecvRate = params.minRecvRate + + verifyScheduler(sc) + + return sc +} + +func TestScInit(t *testing.T) { + var ( + initHeight int64 = 5 + sc = newScheduler(initHeight) + ) + assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight)) + assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1)) +} + +func TestScMaxHeights(t *testing.T) { + + tests := []struct { + name string + sc scheduler + wantMax int64 + }{ + { + name: "no peers", + sc: scheduler{height: 11}, + wantMax: 10, + }, + { + name: "one ready peer", + sc: scheduler{ + initHeight: 2, + height: 3, + peers: map[p2p.ID]*scPeer{"P1": {height: 6, state: peerStateReady}}, + }, + wantMax: 6, + }, + { + name: "ready and removed peers", + sc: scheduler{ + height: 1, + peers: map[p2p.ID]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 10, state: peerStateRemoved}}, + }, + wantMax: 4, + }, + { + name: "removed peers", + sc: scheduler{ + height: 1, + peers: map[p2p.ID]*scPeer{ + "P1": {height: 4, state: peerStateRemoved}, + "P2": {height: 10, state: peerStateRemoved}}, + }, + wantMax: 0, + }, + { + name: "new peers", + sc: scheduler{ + height: 1, + peers: map[p2p.ID]*scPeer{ + "P1": {height: -1, state: peerStateNew}, + "P2": {height: -1, state: peerStateNew}}, + }, + wantMax: 0, + }, + { + name: "mixed peers", + sc: scheduler{ + height: 1, + peers: map[p2p.ID]*scPeer{ + "P1": {height: -1, state: peerStateNew}, + "P2": {height: 10, state: peerStateReady}, + "P3": {height: 20, state: peerStateRemoved}, + "P4": {height: 22, state: peerStateReady}, + }, + }, + wantMax: 22, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + // maxHeight() should not mutate the scheduler + wantSc := tt.sc + + resMax := tt.sc.maxHeight() + assert.Equal(t, tt.wantMax, resMax) + assert.Equal(t, wantSc, tt.sc) + }) + } +} + +func TestScAddPeer(t *testing.T) { + + type args struct { + peerID p2p.ID + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "add first peer", + fields: scTestParams{}, + args: args{peerID: "P1"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + }, + { + name: "add second peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + args: args{peerID: "P2"}, + wantFields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateNew, height: -1}, + "P2": {state: peerStateNew, height: -1}}}, + }, + { + name: "attempt to add duplicate peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + args: args{peerID: "P1"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + wantErr: true, + }, + { + name: "attempt to add duplicate peer with existing peer in Ready state", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 3}}, + allB: []int64{1, 2, 3}, + }, + args: args{peerID: "P1"}, + wantErr: true, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 3}}, + allB: []int64{1, 2, 3}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := 
sc.addPeer(tt.args.peerID); (err != nil) != tt.wantErr { + t.Errorf("scAddPeer() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) + }) + } +} + +func TestScTouchPeer(t *testing.T) { + now := time.Now() + + type args struct { + peerID p2p.ID + time time.Time + } + + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "attempt to touch non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 5}}, + allB: []int64{1, 2, 3, 4, 5}, + }, + args: args{peerID: "P2", time: now}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 5}}, + allB: []int64{1, 2, 3, 4, 5}, + }, + wantErr: true, + }, + { + name: "attempt to touch peer in state New", + fields: scTestParams{peers: map[string]*scPeer{"P1": {}}}, + args: args{peerID: "P1", time: now}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {}}}, + wantErr: true, + }, + { + name: "attempt to touch peer in state Removed", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved}, "P2": {state: peerStateReady}}}, + args: args{peerID: "P1", time: now}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved}, "P2": {state: peerStateReady}}}, + wantErr: true, + }, + { + name: "touch peer in state Ready", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, + args: args{peerID: "P1", time: now.Add(3 * time.Second)}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, + lastTouched: now.Add(3 * time.Second)}}}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.touchPeer(tt.args.peerID, tt.args.time); (err != nil) != tt.wantErr { + t.Errorf("touchPeer() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) + }) + } +} + +func TestScPeersInactiveSince(t *testing.T) { + now := time.Now() + + type args struct { + threshold time.Duration + time time.Time + } + + tests := []struct { + name string + fields scTestParams + args args + wantResult []p2p.ID + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{threshold: time.Second, time: now}, + wantResult: []p2p.ID{}, + }, + { + name: "one active peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, + args: args{threshold: time.Second, time: now}, + wantResult: []p2p.ID{}, + }, + { + name: "one inactive peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, + args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond)}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "one active and one inactive peer", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateReady, lastTouched: now}, + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second)}}}, + args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond)}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "one New peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}}, + args: args{threshold: time.Second, 
time: now.Add(time.Millisecond)}, + wantResult: []p2p.ID{}, + }, + { + name: "one Removed peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastTouched: now}}}, + args: args{threshold: time.Second, time: now.Add(time.Millisecond)}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready active peer and one Removed", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateRemoved, lastTouched: now}, + "P2": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}}}, + args: args{threshold: time.Second, time: now.Add(2 * time.Millisecond)}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready inactive peer and one Removed", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateRemoved, lastTouched: now}, + "P2": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}}}, + args: args{threshold: time.Second, time: now.Add(time.Second + 2*time.Millisecond)}, + wantResult: []p2p.ID{"P2"}, + }, + { + name: "combination of New, Removed, and active and inactive Ready peers", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateNew}, + "P2": {state: peerStateRemoved, lastTouched: now}, + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second)}, + "P4": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}, + "P5": {state: peerStateReady, lastTouched: now.Add(3 * time.Millisecond)}}}, + args: args{threshold: time.Second, time: now.Add(time.Second + 2*time.Millisecond)}, + wantResult: []p2p.ID{"P4"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // peersInactiveSince should not mutate the scheduler + wantSc := sc + res := sc.peersInactiveSince(tt.args.threshold, tt.args.time) + sort.Sort(PeerByID(res)) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScPeersSlowerThan(t *testing.T) { + type args struct { + minSpeed int64 + } + + tests := []struct { + name string + fields scTestParams + args args + wantResult []p2p.ID + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready faster peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 101}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready equal peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 100}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready slow peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 99}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "one Removed faster peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 101}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, { + name: "one Removed equal peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 100}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one Removed slow peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 99}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one New peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}}, + args: args{minSpeed: 100}, + wantResult: 
[]p2p.ID{}, + }, + { + name: "mixed peers", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateRemoved, lastRate: 101}, + "P2": {state: peerStateReady, lastRate: 101}, + "P3": {state: peerStateRemoved, lastRate: 100}, + "P4": {state: peerStateReady, lastRate: 100}, + "P5": {state: peerStateReady, lastRate: 99}, + "P6": {state: peerStateNew}, + "P7": {state: peerStateRemoved, lastRate: 99}, + "P8": {state: peerStateReady, lastRate: 99}, + }}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{"P5", "P8"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // peersSlowerThan should not mutate the scheduler + wantSc := sc + res := sc.peersSlowerThan(tt.args.minSpeed) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScPrunablePeers(t *testing.T) { + now := time.Now() + + type args struct { + threshold time.Duration + time time.Time + minSpeed int64 + } + + tests := []struct { + name string + fields scTestParams + args args + wantResult []p2p.ID + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond), minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "mixed peers", + fields: scTestParams{peers: map[string]*scPeer{ + // X - removed, active, fast + "P1": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 101}, + // X - ready, active, fast + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 101}, + // X - removed, active, equal + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 100}, + // V - ready, inactive, equal + "P4": {state: peerStateReady, lastTouched: now, lastRate: 100}, + // V - ready, inactive, slow + "P5": {state: peerStateReady, lastTouched: now, lastRate: 99}, + // V - ready, active, slow + "P6": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 90}, + }}, + args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond), minSpeed: 100}, + wantResult: []p2p.ID{"P4", "P5", "P6"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // prunablePeers should not mutate the scheduler + wantSc := sc + res := sc.prunablePeers(tt.args.threshold, tt.args.minSpeed, tt.args.time) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScRemovePeer(t *testing.T) { + + type args struct { + peerID p2p.ID + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "remove non existing peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, + args: args{peerID: "P2"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, + wantErr: true, + }, + { + name: "remove single New peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, + args: args{peerID: "P1"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateRemoved}}}, + }, + { + name: "remove one of two New peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}, "P2": {height: -1}}}, + args: args{peerID: "P1"}, + 
wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateRemoved}, "P2": {height: -1}}}, + }, + { + name: "remove one Ready peer, all peers removed", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 10, state: peerStateRemoved}, + "P2": {height: 5, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5}, + }, + args: args{peerID: "P2"}, + wantFields: scTestParams{peers: map[string]*scPeer{ + "P1": {height: 10, state: peerStateRemoved}, + "P2": {height: 5, state: peerStateRemoved}}, + }, + }, + { + name: "attempt to remove already removed peer", + fields: scTestParams{ + height: 8, + peers: map[string]*scPeer{ + "P1": {height: 10, state: peerStateRemoved}, + "P2": {height: 11, state: peerStateReady}}, + allB: []int64{8, 9, 10, 11}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + height: 8, + peers: map[string]*scPeer{ + "P1": {height: 10, state: peerStateRemoved}, + "P2": {height: 11, state: peerStateReady}}, + allB: []int64{8, 9, 10, 11}}, + wantErr: true, + }, + { + name: "remove Ready peer with blocks requested", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{1: "P1"}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateRemoved}}, + allB: []int64{}, + pending: map[int64]p2p.ID{}, + }, + }, + { + name: "remove Ready peer with blocks received", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + received: map[int64]p2p.ID{1: "P1"}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateRemoved}}, + allB: []int64{}, + received: map[int64]p2p.ID{}, + }, + }, + { + name: "remove Ready peer with blocks received and requested (not yet received)", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 3: "P1"}, + received: map[int64]p2p.ID{2: "P1", 4: "P1"}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}, + allB: []int64{}, + pending: map[int64]p2p.ID{}, + received: map[int64]p2p.ID{}, + }, + }, + { + name: "remove Ready peer from multiple peers set, with blocks received and requested (not yet received)", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 6, state: peerStateReady}, + "P2": {height: 6, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4, 5, 6}, + pending: map[int64]p2p.ID{1: "P1", 3: "P2", 6: "P1"}, + received: map[int64]p2p.ID{2: "P1", 4: "P2", 5: "P2"}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 6, state: peerStateRemoved}, + "P2": {height: 6, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4, 5, 6}, + pending: map[int64]p2p.ID{3: "P2"}, + received: map[int64]p2p.ID{4: "P2", 5: "P2"}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.removePeer(tt.args.peerID); (err != nil) != tt.wantErr { + t.Errorf("removePeer() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) + }) + } +} + +func TestScSetPeerHeight(t *testing.T) 
{ + + type args struct { + peerID p2p.ID + height int64 + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "change height of non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P2", height: 4}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + wantErr: true, + }, + { + name: "increase height of removed peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + args: args{peerID: "P1", height: 4}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + wantErr: true, + }, + { + name: "decrease height of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + args: args{peerID: "P1", height: 2}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + wantErr: true, + }, + { + name: "increase height of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P1", height: 4}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + }, + { + name: "noop height change of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + args: args{peerID: "P1", height: 4}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + }, + { + name: "add peer with huge height 10**10 ", + fields: scTestParams{ + peers: map[string]*scPeer{"P2": {height: -1, state: peerStateNew}}, + targetPending: 4, + }, + args: args{peerID: "P2", height: 10000000000}, + wantFields: scTestParams{ + targetPending: 4, + peers: map[string]*scPeer{"P2": {height: 10000000000, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.setPeerHeight(tt.args.peerID, tt.args.height); (err != nil) != tt.wantErr { + t.Errorf("setPeerHeight() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) + }) + } +} + +func TestScGetPeersAtHeight(t *testing.T) { + + type args struct { + height int64 + } + tests := []struct { + name string + fields scTestParams + args args + wantResult []p2p.ID + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{height: 10}, + wantResult: []p2p.ID{}, + }, + { + name: "only new peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, + args: args{height: 10}, + wantResult: []p2p.ID{}, + }, + { + name: "only Removed peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}}, + args: args{height: 2}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready shorter peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 
5}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready equal peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 4}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "one Ready higher peer", + fields: scTestParams{ + targetPending: 4, + peers: map[string]*scPeer{"P1": {height: 20, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 4}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "multiple mixed peers", + fields: scTestParams{ + height: 8, + peers: map[string]*scPeer{ + "P1": {height: -1, state: peerStateNew}, + "P2": {height: 10, state: peerStateReady}, + "P3": {height: 5, state: peerStateReady}, + "P4": {height: 20, state: peerStateRemoved}, + "P5": {height: 11, state: peerStateReady}}, + allB: []int64{8, 9, 10, 11}, + }, + args: args{height: 8}, + wantResult: []p2p.ID{"P2", "P5"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // getPeersAtHeightOrAbove should not mutate the scheduler + wantSc := sc + res := sc.getPeersAtHeightOrAbove(tt.args.height) + sort.Sort(PeerByID(res)) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScMarkPending(t *testing.T) { + now := time.Now() + + type args struct { + peerID p2p.ID + height int64 + tm time.Time + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "attempt mark pending an unknown block", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P1", height: 3, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + wantErr: true, + }, + { + name: "attempt mark pending from non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P2", height: 1, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + wantErr: true, + }, + { + name: "mark pending from Removed peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + args: args{peerID: "P1", height: 1, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + wantErr: true, + }, + { + name: "mark pending from New peer", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 4, state: peerStateNew}, + }, + allB: []int64{1, 2, 3, 4}, + }, + args: args{peerID: "P2", height: 2, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 4, state: peerStateNew}, + }, + allB: []int64{1, 2, 3, 4}, + }, + wantErr: true, + }, + { + name: "mark pending from short peer", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 2, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}, + }, + args: args{peerID: "P2", height: 3, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 2, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}, + }, + wantErr: true, + }, + { + 
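// happy path: block 2 moves to Pending under P1 and pendingTime records the request time +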
name: "mark pending all good", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1"}, + pendingTime: map[int64]time.Time{1: now}, + }, + args: args{peerID: "P1", height: 2, tm: now.Add(time.Millisecond)}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now.Add(time.Millisecond)}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.markPending(tt.args.peerID, tt.args.height, tt.args.tm); (err != nil) != tt.wantErr { + t.Errorf("markPending() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScMarkReceived(t *testing.T) { + now := time.Now() + + type args struct { + peerID p2p.ID + height int64 + size int64 + tm time.Time + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "received from non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P2", height: 1, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + wantErr: true, + }, + { + name: "received from removed peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + args: args{peerID: "P1", height: 1, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + wantErr: true, + }, + { + name: "received from unsolicited peer", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 4, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 2: "P2", 3: "P2", 4: "P1"}, + }, + args: args{peerID: "P1", height: 2, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 4, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 2: "P2", 3: "P2", 4: "P1"}, + }, + wantErr: true, + }, + { + name: "received but blockRequest not sent", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{}, + }, + args: args{peerID: "P1", height: 2, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{}, + }, + wantErr: true, + }, + { + name: "received with bad timestamp", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now.Add(time.Second)}, + }, + args: args{peerID: "P1", height: 2, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: 
now.Add(time.Second)}, + }, + wantErr: true, + }, + { + name: "received all good", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now}, + }, + args: args{peerID: "P1", height: 2, size: 1000, tm: now.Add(time.Millisecond)}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1"}, + pendingTime: map[int64]time.Time{1: now}, + received: map[int64]p2p.ID{2: "P1"}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.markReceived(tt.args.peerID, + tt.args.height, tt.args.size, tt.args.tm); (err != nil) != tt.wantErr { + t.Errorf("markReceived() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScMarkProcessed(t *testing.T) { + now := time.Now() + + type args struct { + height int64 + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "processed an unreceived block", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}, + received: map[int64]p2p.ID{1: "P1"}}, + args: args{height: 2}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}, + received: map[int64]p2p.ID{1: "P1"}}, + wantErr: true, + }, + { + name: "mark processed success", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}, + received: map[int64]p2p.ID{1: "P1"}}, + args: args{height: 1}, + wantFields: scTestParams{ + height: 2, + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{2}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.markProcessed(tt.args.height); (err != nil) != tt.wantErr { + t.Errorf("markProcessed() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScAllBlocksProcessed(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + fields scTestParams + wantResult bool + }{ + { + name: "no blocks", + fields: scTestParams{}, + wantResult: true, + }, + { + name: "only New blocks", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + wantResult: false, + }, + { + name: "only Pending blocks", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1", 4: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now, 3: now, 4: now}, + }, + wantResult: false, + }, + { + name: "only Received blocks", + fields: scTestParams{ + peers: 
map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1", 4: "P1"}, + }, + wantResult: false, + }, + { + name: "only Processed blocks plus highest is received", + fields: scTestParams{ + height: 4, + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}}, + allB: []int64{4}, + received: map[int64]p2p.ID{4: "P1"}, + }, + wantResult: true, + }, + { + name: "mixed block states", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{2: "P1", 4: "P1"}, + pendingTime: map[int64]time.Time{2: now, 4: now}, + }, + wantResult: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // allBlocksProcessed() should not mutate the scheduler + wantSc := sc + res := sc.allBlocksProcessed() + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScNextHeightToSchedule(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + fields scTestParams + wantHeight int64 + }{ + { + name: "no blocks", + fields: scTestParams{initHeight: 10, height: 11}, + wantHeight: -1, + }, + { + name: "only New blocks", + fields: scTestParams{ + initHeight: 2, + height: 3, + peers: map[string]*scPeer{"P1": {height: 6, state: peerStateReady}}, + allB: []int64{3, 4, 5, 6}, + }, + wantHeight: 3, + }, + { + name: "only Pending blocks", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1", 4: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now, 3: now, 4: now}, + }, + wantHeight: -1, + }, + { + name: "only Received blocks", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1", 4: "P1"}, + }, + wantHeight: -1, + }, + { + name: "only Processed blocks", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + wantHeight: 1, + }, + { + name: "mixed block states", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}, + }, + wantHeight: 1, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // nextHeightToSchedule() should not mutate the scheduler + wantSc := sc + + resMin := sc.nextHeightToSchedule() + assert.Equal(t, tt.wantHeight, resMin) + assert.Equal(t, wantSc, sc) + + }) + } +} + +func TestScSelectPeer(t *testing.T) { + + type args struct { + height int64 + } + tests := []struct { + name string + fields scTestParams + args args + wantResult p2p.ID + wantError bool + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{height: 10}, + wantResult: "", + wantError: true, + }, + { + name: "only new peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, + args: args{height: 10}, + wantResult: "", + wantError: true, + }, + { + name: "only Removed peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: 
peerStateRemoved}}}, + args: args{height: 2}, + wantResult: "", + wantError: true, + }, + { + name: "one Ready shorter peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 5}, + wantResult: "", + wantError: true, + }, + { + name: "one Ready equal peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 4}, + wantResult: "P1", + }, + { + name: "one Ready higher peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 6, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6}, + }, + args: args{height: 4}, + wantResult: "P1", + }, + { + name: "many Ready higher peers with different number of pending requests", + fields: scTestParams{ + height: 4, + peers: map[string]*scPeer{ + "P1": {height: 8, state: peerStateReady}, + "P2": {height: 9, state: peerStateReady}}, + allB: []int64{4, 5, 6, 7, 8, 9}, + pending: map[int64]p2p.ID{ + 4: "P1", 6: "P1", + 5: "P2", + }, + }, + args: args{height: 4}, + wantResult: "P2", + }, + { + name: "many Ready higher peers with same number of pending requests", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P2": {height: 20, state: peerStateReady}, + "P1": {height: 15, state: peerStateReady}, + "P3": {height: 15, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + pending: map[int64]p2p.ID{ + 1: "P1", 2: "P1", + 3: "P3", 4: "P3", + 5: "P2", 6: "P2", + }, + }, + args: args{height: 7}, + wantResult: "P1", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // selectPeer should not mutate the scheduler + wantSc := sc + res, err := sc.selectPeer(tt.args.height) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, tt.wantError, err != nil) + assert.Equal(t, wantSc, sc) + }) + } +} + +// makeScBlock makes an empty block. 
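+// Only the header height is set, which is all these scheduler tests inspect.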
+func makeScBlock(height int64) *types.Block { + return &types.Block{Header: types.Header{Height: height}} +} + +// checkScResults checks scheduler handler test results +func checkScResults(t *testing.T, wantErr bool, err error, wantEvent Event, event Event) { + if (err != nil) != wantErr { + t.Errorf("error = %v, wantErr %v", err, wantErr) + return + } + switch wantEvent := wantEvent.(type) { + case scPeerError: + assert.Equal(t, wantEvent.peerID, event.(scPeerError).peerID) + assert.Equal(t, wantEvent.reason != nil, event.(scPeerError).reason != nil) + case scBlockReceived: + assert.Equal(t, wantEvent.peerID, event.(scBlockReceived).peerID) + assert.Equal(t, wantEvent.block, event.(scBlockReceived).block) + case scSchedulerFail: + assert.Equal(t, wantEvent.reason != nil, event.(scSchedulerFail).reason != nil) + default: + assert.Equal(t, wantEvent, event) + } +} + +func TestScHandleBlockResponse(t *testing.T) { + now := time.Now() + block6FromP1 := bcBlockResponse{ + time: now.Add(time.Millisecond), + peerID: p2p.ID("P1"), + height: 6, + size: 100, + block: makeScBlock(6), + } + + type args struct { + event bcBlockResponse + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "empty scheduler", + fields: scTestParams{}, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "block from removed peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}}, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "block we haven't asked for", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}}, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "block from wrong peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P2"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "block with bad timestamp", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now.Add(time.Second)}, + }, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "good block, accept", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: block6FromP1}, + wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(6)}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleBlockResponse(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleBlockProcessed(t *testing.T) { + now := time.Now() + processed6FromP1 := pcBlockProcessed{ + peerID: p2p.ID("P1"), + height: 6, + } + + type args struct { + event pcBlockProcessed + } + + 
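// each case feeds one pcBlockProcessed event into handleBlockProcessed and checks the returned event +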
tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "empty scheduler", + fields: scTestParams{height: 6}, + args: args{event: processed6FromP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "processed block we don't have", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: processed6FromP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "processed block ok, we processed all blocks", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 7, state: peerStateReady}}, + allB: []int64{6, 7}, + received: map[int64]p2p.ID{6: "P1", 7: "P1"}, + }, + args: args{event: processed6FromP1}, + wantEvent: scFinishedEv{}, + }, + { + name: "processed block ok, we still have blocks to process", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + pending: map[int64]p2p.ID{7: "P1", 8: "P1"}, + received: map[int64]p2p.ID{6: "P1"}, + }, + args: args{event: processed6FromP1}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleBlockProcessed(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleBlockVerificationFailure(t *testing.T) { + now := time.Now() + + type args struct { + event pcBlockVerificationFailure + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "empty scheduler", + fields: scTestParams{}, + args: args{event: pcBlockVerificationFailure{height: 10, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: noOpEvent{}, + }, + { + name: "failed block we don't have, single peer is still removed", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: pcBlockVerificationFailure{height: 10, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: scFinishedEv{}, + }, + { + name: "failed block we don't have, one of two peers is removed", + fields: scTestParams{ + initHeight: 5, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}, "P2": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: pcBlockVerificationFailure{height: 10, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: noOpEvent{}, + }, + { + name: "failed block, all blocks are processed after removal", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 7, state: peerStateReady}}, + allB: []int64{6, 7}, + received: map[int64]p2p.ID{6: "P1", 7: "P1"}, + }, + args: args{event: pcBlockVerificationFailure{height: 7, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: scFinishedEv{}, + }, + { + name: "failed block, we still have blocks to process", + fields: scTestParams{ + initHeight: 4, + peers: map[string]*scPeer{"P1": {height: 8, state: 
peerStateReady}, "P2": {height: 8, state: peerStateReady}}, + allB: []int64{5, 6, 7, 8}, + pending: map[int64]p2p.ID{7: "P1", 8: "P1"}, + received: map[int64]p2p.ID{5: "P1", 6: "P1"}, + }, + args: args{event: pcBlockVerificationFailure{height: 5, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: noOpEvent{}, + }, + { + name: "failed block, H+1 and H+2 delivered by different peers, we still have blocks to process", + fields: scTestParams{ + initHeight: 4, + peers: map[string]*scPeer{ + "P1": {height: 8, state: peerStateReady}, + "P2": {height: 8, state: peerStateReady}, + "P3": {height: 8, state: peerStateReady}, + }, + allB: []int64{5, 6, 7, 8}, + pending: map[int64]p2p.ID{7: "P1", 8: "P1"}, + received: map[int64]p2p.ID{5: "P1", 6: "P1"}, + }, + args: args{event: pcBlockVerificationFailure{height: 5, firstPeerID: "P1", secondPeerID: "P2"}}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleBlockProcessError(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleAddNewPeer(t *testing.T) { + addP1 := addNewPeer{ + peerID: p2p.ID("P1"), + } + type args struct { + event addNewPeer + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "add P1 to empty scheduler", + fields: scTestParams{}, + args: args{event: addP1}, + wantEvent: noOpEvent{}, + }, + { + name: "add duplicate peer", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + }, + args: args{event: addP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "add P1 to non empty scheduler", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + }, + args: args{event: addP1}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleAddNewPeer(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandlePeerError(t *testing.T) { + errP1 := peerError{ + peerID: p2p.ID("P1"), + } + type args struct { + event peerError + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "no peers", + fields: scTestParams{}, + args: args{event: errP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "error finds no peer", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + }, + args: args{event: errP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "error finds peer, only peer is removed", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + }, + args: args{event: errP1}, + wantEvent: scFinishedEv{}, + }, + { + name: "error finds peer, one of two peers is removed", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}, "P2": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + }, + args: args{event: errP1}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + 
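// capture range variable for the subtest closure (scopelint) +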
t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handlePeerError(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleTryPrunePeer(t *testing.T) { + now := time.Now() + + pruneEv := tryPrunePeer{ + time: now.Add(time.Second + time.Millisecond), + } + type args struct { + event tryPrunePeer + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "no peers", + fields: scTestParams{}, + args: args{event: pruneEv}, + wantEvent: noOpEvent{}, + }, + { + name: "no prunable peers", + fields: scTestParams{ + minRecvRate: 100, + peers: map[string]*scPeer{ + // X - removed, active, fast + "P1": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 101}, + // X - ready, active, fast + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 101}, + // X - removed, active, equal + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 100}}, + peerTimeout: time.Second, + }, + args: args{event: pruneEv}, + wantEvent: noOpEvent{}, + }, + { + name: "mixed peers", + fields: scTestParams{ + minRecvRate: 100, + peers: map[string]*scPeer{ + // X - removed, active, fast + "P1": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 101, height: 5}, + // X - ready, active, fast + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 101, height: 5}, + // X - removed, active, equal + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 100, height: 5}, + // V - ready, inactive, equal + "P4": {state: peerStateReady, lastTouched: now, lastRate: 100, height: 7}, + // V - ready, inactive, slow + "P5": {state: peerStateReady, lastTouched: now, lastRate: 99, height: 7}, + // V - ready, active, slow + "P6": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 90, height: 7}, + }, + allB: []int64{1, 2, 3, 4, 5, 6, 7}, + peerTimeout: time.Second}, + args: args{event: pruneEv}, + wantEvent: scPeersPruned{peers: []p2p.ID{"P4", "P5", "P6"}}, + }, + { + name: "mixed peers, finish after pruning", + fields: scTestParams{ + minRecvRate: 100, + height: 6, + peers: map[string]*scPeer{ + // X - removed, active, fast + "P1": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 101, height: 5}, + // X - ready, active, fast + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 101, height: 5}, + // X - removed, active, equal + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 100, height: 5}, + // V - ready, inactive, equal + "P4": {state: peerStateReady, lastTouched: now, lastRate: 100, height: 7}, + // V - ready, inactive, slow + "P5": {state: peerStateReady, lastTouched: now, lastRate: 99, height: 7}, + // V - ready, active, slow + "P6": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 90, height: 7}, + }, + allB: []int64{6, 7}, + peerTimeout: time.Second}, + args: args{event: pruneEv}, + wantEvent: scFinishedEv{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleTryPrunePeer(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleTrySchedule(t *testing.T) { + now := time.Now() + tryEv := trySchedule{ + time: now.Add(time.Second + time.Millisecond), + } + + type args struct { + event trySchedule + } + tests 
:= []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{event: tryEv}, + wantEvent: noOpEvent{}, + }, + { + name: "only new peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, + args: args{event: tryEv}, + wantEvent: noOpEvent{}, + }, + { + name: "only Removed peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}}, + args: args{event: tryEv}, + wantEvent: noOpEvent{}, + }, + { + name: "one Ready shorter peer", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}}, + args: args{event: tryEv}, + wantEvent: noOpEvent{}, + }, + { + name: "one Ready equal peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + args: args{event: tryEv}, + wantEvent: scBlockRequest{peerID: "P1", height: 1}, + }, + { + name: "many Ready higher peers with different number of pending requests", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 5, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5}, + pending: map[int64]p2p.ID{ + 1: "P1", 2: "P1", + 3: "P2", + }, + }, + args: args{event: tryEv}, + wantEvent: scBlockRequest{peerID: "P2", height: 4}, + }, + + { + name: "many Ready higher peers with same number of pending requests", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P2": {height: 8, state: peerStateReady}, + "P1": {height: 8, state: peerStateReady}, + "P3": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{ + 1: "P1", 2: "P1", + 3: "P3", 4: "P3", + 5: "P2", 6: "P2", + }, + }, + args: args{event: tryEv}, + wantEvent: scBlockRequest{peerID: "P1", height: 7}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleTrySchedule(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleStatusResponse(t *testing.T) { + now := time.Now() + statusRespP1Ev := bcStatusResponse{ + time: now.Add(time.Second + time.Millisecond), + peerID: "P1", + height: 6, + } + + type args struct { + event bcStatusResponse + } + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "change height of non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P2": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + }, + args: args{event: statusRespP1Ev}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + + { + name: "increase height of removed peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + args: args{event: statusRespP1Ev}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + + { + name: "decrease height of single peer", + fields: scTestParams{ + height: 5, + peers: map[string]*scPeer{"P1": {height: 10, state: peerStateReady}}, + allB: []int64{5, 6, 7, 8, 9, 10}, + }, + args: args{event: statusRespP1Ev}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + + { + name: "increase height of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: 
peerStateReady}}, + allB: []int64{1, 2}}, + args: args{event: statusRespP1Ev}, + wantEvent: noOpEvent{}, + }, + { + name: "noop height change of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 6, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6}}, + args: args{event: statusRespP1Ev}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleStatusResponse(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandle(t *testing.T) { + type unknownEv struct { + priorityNormal + } + + t0 := time.Now() + tick := make([]time.Time, 100) + for i := range tick { + tick[i] = t0.Add(time.Duration(i) * time.Millisecond) + } + + type args struct { + event Event + } + type scStep struct { + currentSc *scTestParams + args args + wantEvent Event + wantErr bool + wantSc *scTestParams + } + tests := []struct { + name string + steps []scStep + }{ + { + name: "unknown event", + steps: []scStep{ + { // send unknown event + currentSc: &scTestParams{}, + args: args{event: unknownEv{}}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + wantSc: &scTestParams{}, + }, + }, + }, + { + name: "single peer, sync 3 blocks", + steps: []scStep{ + { // add P1 + currentSc: &scTestParams{peers: map[string]*scPeer{}, height: 1}, + args: args{event: addNewPeer{peerID: "P1"}}, + wantEvent: noOpEvent{}, + wantSc: &scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}, height: 1}, + }, + { // set height of P1 + args: args{event: bcStatusResponse{peerID: "P1", time: tick[0], height: 3}}, + wantEvent: noOpEvent{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + height: 1, + }, + }, + { // schedule block 1 + args: args{event: trySchedule{time: tick[1]}}, + wantEvent: scBlockRequest{peerID: "P1", height: 1}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{1: "P1"}, + pendingTime: map[int64]time.Time{1: tick[1]}, + height: 1, + }, + }, + { // schedule block 2 + args: args{event: trySchedule{time: tick[2]}}, + wantEvent: scBlockRequest{peerID: "P1", height: 2}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: tick[1], 2: tick[2]}, + height: 1, + }, + }, + { // schedule block 3 + args: args{event: trySchedule{time: tick[3]}}, + wantEvent: scBlockRequest{peerID: "P1", height: 3}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, + pendingTime: map[int64]time.Time{1: tick[1], 2: tick[2], 3: tick[3]}, + height: 1, + }, + }, + { // block response 1 + args: args{event: bcBlockResponse{peerID: "P1", height: 1, time: tick[4], size: 100, block: makeScBlock(1)}}, + wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(1)}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[4]}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{2: "P1", 3: "P1"}, + pendingTime: map[int64]time.Time{2: tick[2], 3: tick[3]}, + received: map[int64]p2p.ID{1: "P1"}, + height: 1, + }, + }, + { // block response 2 + 
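// note the peer's lastTouched advances to each response time +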
args: args{event: bcBlockResponse{peerID: "P1", height: 2, time: tick[5], size: 100, block: makeScBlock(2)}}, + wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(2)}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[5]}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{3: "P1"}, + pendingTime: map[int64]time.Time{3: tick[3]}, + received: map[int64]p2p.ID{1: "P1", 2: "P1"}, + height: 1, + }, + }, + { // block response 3 + args: args{event: bcBlockResponse{peerID: "P1", height: 3, time: tick[6], size: 100, block: makeScBlock(3)}}, + wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(3)}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{1, 2, 3}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, + height: 1, + }, + }, + { // processed block 1 + args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 1}}, + wantEvent: noOpEvent{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{2, 3}, + received: map[int64]p2p.ID{2: "P1", 3: "P1"}, + height: 2, + }, + }, + { // processed block 2 + args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 2}}, + wantEvent: scFinishedEv{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{3}, + received: map[int64]p2p.ID{3: "P1"}, + height: 3, + }, + }, + }, + }, + { + name: "block verification failure", + steps: []scStep{ + { // failure processing block 1 + currentSc: &scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady, lastTouched: tick[6]}, + "P2": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{1, 2, 3, 4}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, + height: 1, + }, + args: args{event: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: noOpEvent{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateRemoved, lastTouched: tick[6]}, + "P2": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{1, 2, 3}, + received: map[int64]p2p.ID{}, + height: 1, + }, + }, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + var sc *scheduler + for i, step := range tt.steps { + // The first step must always initialize the scheduler from its currentSc. + if step.currentSc != nil { + sc = newTestScheduler(*step.currentSc) + } + if sc == nil { + panic("first step must set currentSc to initialize the scheduler") + } + + nextEvent, err := sc.handle(step.args.event) + assert.Equal(t, newTestScheduler(*step.wantSc), sc) + t.Logf("step %d(%v): %s", i, step.args.event, sc) + + checkScResults(t, step.wantErr, err, step.wantEvent, nextEvent) + + // Subsequent steps may use the previous step's wantSc as their current state. + sc = newTestScheduler(*step.wantSc) + } + }) + } +}