diff --git a/blockchain/v2/processor.go b/blockchain/v2/processor.go index 22b46ede3..e33b36058 100644 --- a/blockchain/v2/processor.go +++ b/blockchain/v2/processor.go @@ -21,17 +21,11 @@ type pcShortBlock struct { priorityNormal } -type bcBlockResponse struct { - priorityNormal - peerID p2p.ID - block *types.Block - height int64 -} - type pcBlockVerificationFailure struct { priorityNormal - peerID p2p.ID - height int64 + height int64 + firstPeerID p2p.ID + secondPeerID p2p.ID } type pcBlockProcessed struct { @@ -135,11 +129,14 @@ func (state *pcState) purgePeer(peerID p2p.ID) { // handle processes FSM events func (state *pcState) handle(event Event) (Event, error) { switch event := event.(type) { - case *bcBlockResponse: - if event.height <= state.height { + case *scBlockReceived: + if event.block == nil { + panic("processor received an event with a nil block") + } + if event.block.Height <= state.height { return pcShortBlock{}, nil } - err := state.enqueue(event.peerID, event.block, event.height) + err := state.enqueue(event.peerID, event.block, event.block.Height) if err != nil { return pcDuplicateBlock{}, nil } @@ -160,7 +157,11 @@ func (state *pcState) handle(event Event) (Event, error) { err = state.context.verifyCommit(state.chainID, firstID, first.Height, second.LastCommit) if err != nil { - return pcBlockVerificationFailure{peerID: firstItem.peerID, height: first.Height}, nil + state.purgePeer(firstItem.peerID) + state.purgePeer(secondItem.peerID) + return pcBlockVerificationFailure{ + height: first.Height, firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID}, + nil } state.context.saveBlock(first, firstParts, second.LastCommit) diff --git a/blockchain/v2/processor_test.go b/blockchain/v2/processor_test.go index 195df3618..61be23663 100644 --- a/blockchain/v2/processor_test.go +++ b/blockchain/v2/processor_test.go @@ -47,11 +47,10 @@ func makeState(p *params) *pcState { return state } -func mBlockResponse(peerID p2p.ID, height int64) *bcBlockResponse { - return &bcBlockResponse{ +func mBlockResponse(peerID p2p.ID, height int64) *scBlockReceived { + return &scBlockReceived{ peerID: peerID, block: makePcBlock(height), - height: height, } } @@ -235,21 +234,42 @@ func TestPcProcessBlockSuccess(t *testing.T) { func TestPcProcessBlockFailures(t *testing.T) { tests := []testFields{ { - name: "blocks H+1 and H+2 present - H+1 verification fails ", + name: "blocks H+1 and H+2 present from different peers - H+1 verification fails ", steps: []pcFsmMakeStateValues{ { currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, event: pcProcessBlock{}, - wantState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, verBL: []int64{1}}, - wantNextEvent: pcBlockVerificationFailure{peerID: "P1", height: 1}, + wantState: ¶ms{items: []pcBlock{}, verBL: []int64{1}}, + wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P2"}, }, }, }, { - name: "blocks H+1 and H+2 present - H+1 applyBlock fails ", + name: "blocks H+1 and H+2 present from same peer - H+1 applyBlock fails ", steps: []pcFsmMakeStateValues{ { currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, event: pcProcessBlock{}, - wantState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}}, appBL: []int64{1}}, wantPanic: true, + wantState: ¶ms{items: []pcBlock{}, appBL: []int64{1}}, wantPanic: true, + }, + }, + }, + { + name: "blocks H+1 and H+2 present from same peers - H+1 verification fails ", + steps: []pcFsmMakeStateValues{ + { + currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P1", 2}, {"P2", 3}}, verBL: []int64{1}}, + event: pcProcessBlock{}, + wantState: ¶ms{items: []pcBlock{{"P2", 3}}, verBL: []int64{1}}, + wantNextEvent: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"}, + }, + }, + }, + { + name: "blocks H+1 and H+2 present from different peers - H+1 applyBlock fails ", + steps: []pcFsmMakeStateValues{ + { + currentState: ¶ms{items: []pcBlock{{"P1", 1}, {"P2", 2}, {"P2", 3}}, appBL: []int64{1}}, + event: pcProcessBlock{}, + wantState: ¶ms{items: []pcBlock{{"P2", 3}}, appBL: []int64{1}}, wantPanic: true, }, }, }, diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index a24a16f09..897dd738c 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -14,7 +14,7 @@ type handleFunc = func(event Event) (Event, error) // stream of events processed by a handle function. This Routine structure // handles the concurrency and messaging guarantees. Events are sent via // `send` are handled by the `handle` function to produce an iterator -// `next()`. Calling `close()` on a routine will conclude processing of all +// `next()`. Calling `stop()` on a routine will conclude processing of all // sent events and produce `final()` event representing the terminal state. type Routine struct { name string @@ -115,7 +115,7 @@ func (rt *Routine) ready() chan struct{} { } func (rt *Routine) stop() { - if !rt.isRunning() { + if !rt.isRunning() { // XXX: this should check rt.queue.Disposed() return } diff --git a/blockchain/v2/schedule.go b/blockchain/v2/schedule.go deleted file mode 100644 index 236d12aa9..000000000 --- a/blockchain/v2/schedule.go +++ /dev/null @@ -1,386 +0,0 @@ -// nolint:unused -package v2 - -import ( - "fmt" - "math" - "math/rand" - "time" - - "github.com/tendermint/tendermint/p2p" -) - -type blockState int - -const ( - blockStateUnknown blockState = iota - blockStateNew - blockStatePending - blockStateReceived - blockStateProcessed -) - -func (e blockState) String() string { - switch e { - case blockStateUnknown: - return "Unknown" - case blockStateNew: - return "New" - case blockStatePending: - return "Pending" - case blockStateReceived: - return "Received" - case blockStateProcessed: - return "Processed" - default: - return fmt.Sprintf("unknown blockState: %d", e) - } -} - -type peerState int - -const ( - peerStateNew = iota - peerStateReady - peerStateRemoved -) - -func (e peerState) String() string { - switch e { - case peerStateNew: - return "New" - case peerStateReady: - return "Ready" - case peerStateRemoved: - return "Removed" - default: - return fmt.Sprintf("unknown peerState: %d", e) - } -} - -type scPeer struct { - peerID p2p.ID - state peerState - height int64 - lastTouched time.Time - lastRate int64 -} - -func newScPeer(peerID p2p.ID) *scPeer { - return &scPeer{ - peerID: peerID, - state: peerStateNew, - height: -1, - lastTouched: time.Time{}, - } -} - -// The schedule is a composite data structure which allows a scheduler to keep -// track of which blocks have been scheduled into which state. -type schedule struct { - initHeight int64 - // a list of blocks in which blockState - blockStates map[int64]blockState - - // a map of peerID to schedule specific peer struct `scPeer` used to keep - // track of peer specific state - peers map[p2p.ID]*scPeer - - // a map of heights to the peer we are waiting for a response from - pendingBlocks map[int64]p2p.ID - - // the time at which a block was put in blockStatePending - pendingTime map[int64]time.Time - - // the peerID of the peer which put the block in blockStateReceived - receivedBlocks map[int64]p2p.ID -} - -func newSchedule(initHeight int64) *schedule { - sc := schedule{ - initHeight: initHeight, - blockStates: make(map[int64]blockState), - peers: make(map[p2p.ID]*scPeer), - pendingBlocks: make(map[int64]p2p.ID), - pendingTime: make(map[int64]time.Time), - receivedBlocks: make(map[int64]p2p.ID), - } - - sc.setStateAtHeight(initHeight, blockStateNew) - - return &sc -} - -func (sc *schedule) addPeer(peerID p2p.ID) error { - if _, ok := sc.peers[peerID]; ok { - return fmt.Errorf("cannot add duplicate peer %s", peerID) - } - sc.peers[peerID] = newScPeer(peerID) - return nil -} - -func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("couldn't find peer %s", peerID) - } - - if peer.state == peerStateRemoved { - return fmt.Errorf("tried to touch peer in peerStateRemoved") - } - - peer.lastTouched = time - - return nil -} - -func (sc *schedule) removePeer(peerID p2p.ID) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("couldn't find peer %s", peerID) - } - - if peer.state == peerStateRemoved { - return fmt.Errorf("tried to remove peer %s in peerStateRemoved", peerID) - } - - for height, pendingPeerID := range sc.pendingBlocks { - if pendingPeerID == peerID { - sc.setStateAtHeight(height, blockStateNew) - delete(sc.pendingTime, height) - delete(sc.pendingBlocks, height) - } - } - - for height, rcvPeerID := range sc.receivedBlocks { - if rcvPeerID == peerID { - sc.setStateAtHeight(height, blockStateNew) - delete(sc.receivedBlocks, height) - } - } - - peer.state = peerStateRemoved - - return nil -} - -// TODO - keep track of highest height -func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("can't find peer %s", peerID) - } - - if peer.state == peerStateRemoved { - return fmt.Errorf("cannot set peer height for a peer in peerStateRemoved") - } - - if height < peer.height { - return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height) - } - - peer.height = height - peer.state = peerStateReady - for i := sc.minHeight(); i <= height; i++ { - if sc.getStateAtHeight(i) == blockStateUnknown { - sc.setStateAtHeight(i, blockStateNew) - } - } - - return nil -} - -func (sc *schedule) getStateAtHeight(height int64) blockState { - if height < sc.initHeight { - return blockStateProcessed - } else if state, ok := sc.blockStates[height]; ok { - return state - } else { - return blockStateUnknown - } -} - -func (sc *schedule) getPeersAtHeight(height int64) []*scPeer { - peers := []*scPeer{} - for _, peer := range sc.peers { - if peer.height >= height { - peers = append(peers, peer) - } - } - - return peers -} - -func (sc *schedule) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID { - peers := []p2p.ID{} - for _, peer := range sc.peers { - if now.Sub(peer.lastTouched) > duration { - peers = append(peers, peer.peerID) - } - } - - return peers -} - -func (sc *schedule) peersSlowerThan(minSpeed int64) []p2p.ID { - peers := []p2p.ID{} - for _, peer := range sc.peers { - if peer.lastRate < minSpeed { - peers = append(peers, peer.peerID) - } - } - - return peers -} - -func (sc *schedule) setStateAtHeight(height int64, state blockState) { - sc.blockStates[height] = state -} - -func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("can't find peer %s", peerID) - } - - if peer.state == peerStateRemoved { - return fmt.Errorf("cannot receive blocks from removed peer %s", peerID) - } - - if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID { - return fmt.Errorf("received block %d from peer %s without being requested", height, peerID) - } - - pendingTime, ok := sc.pendingTime[height] - if !ok || now.Sub(pendingTime) <= 0 { - return fmt.Errorf("clock error. Block %d received at %s but requested at %s", - height, pendingTime, now) - } - - peer.lastRate = size / int64(now.Sub(pendingTime).Seconds()) - - sc.setStateAtHeight(height, blockStateReceived) - delete(sc.pendingBlocks, height) - delete(sc.pendingTime, height) - - sc.receivedBlocks[height] = peerID - - return nil -} - -func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error { - peer, ok := sc.peers[peerID] - if !ok { - return fmt.Errorf("can't find peer %s", peerID) - } - - state := sc.getStateAtHeight(height) - if state != blockStateNew { - return fmt.Errorf("block %d should be in blockStateNew but was %s", height, state) - } - - if peer.state != peerStateReady { - return fmt.Errorf("cannot schedule %d from %s in %s", height, peerID, peer.state) - } - - if height > peer.height { - return fmt.Errorf("cannot request height %d from peer %s who is at height %d", - height, peerID, peer.height) - } - - sc.setStateAtHeight(height, blockStatePending) - sc.pendingBlocks[height] = peerID - // XXX: to make this more accurate we can introduce a message from - // the IO routine which indicates the time the request was put on the wire - sc.pendingTime[height] = time - - return nil -} - -func (sc *schedule) markProcessed(height int64) error { - state := sc.getStateAtHeight(height) - if state != blockStateReceived { - return fmt.Errorf("can't mark height %d received from block state %s", height, state) - } - - delete(sc.receivedBlocks, height) - - sc.setStateAtHeight(height, blockStateProcessed) - - return nil -} - -// allBlockProcessed returns true if all blocks are in blockStateProcessed and -// determines if the schedule has been completed -func (sc *schedule) allBlocksProcessed() bool { - for _, state := range sc.blockStates { - if state != blockStateProcessed { - return false - } - } - return true -} - -// highest block | state == blockStateNew -func (sc *schedule) maxHeight() int64 { - var max int64 = 0 - for height, state := range sc.blockStates { - if state == blockStateNew && height > max { - max = height - } - } - - return max -} - -// lowest block | state == blockStateNew -func (sc *schedule) minHeight() int64 { - var min int64 = math.MaxInt64 - for height, state := range sc.blockStates { - if state == blockStateNew && height < min { - min = height - } - } - - return min -} - -func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 { - heights := []int64{} - for height, pendingPeerID := range sc.pendingBlocks { - if pendingPeerID == peerID { - heights = append(heights, height) - } - } - return heights -} - -func (sc *schedule) selectPeer(peers []*scPeer) *scPeer { - // FIXME: properPeerSelector - s := rand.NewSource(time.Now().Unix()) - r := rand.New(s) - - return peers[r.Intn(len(peers))] -} - -// XXX: this duplicates the logic of peersInactiveSince and peersSlowerThan -func (sc *schedule) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID { - prunable := []p2p.ID{} - for peerID, peer := range sc.peers { - if now.Sub(peer.lastTouched) > peerTimout || peer.lastRate < minRecvRate { - prunable = append(prunable, peerID) - } - } - - return prunable -} - -func (sc *schedule) numBlockInState(targetState blockState) uint32 { - var num uint32 = 0 - for _, state := range sc.blockStates { - if state == targetState { - num++ - } - } - return num -} diff --git a/blockchain/v2/schedule_test.go b/blockchain/v2/schedule_test.go deleted file mode 100644 index a1448c528..000000000 --- a/blockchain/v2/schedule_test.go +++ /dev/null @@ -1,272 +0,0 @@ -package v2 - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/tendermint/tendermint/p2p" -) - -func TestScheduleInit(t *testing.T) { - var ( - initHeight int64 = 5 - sc = newSchedule(initHeight) - ) - - assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight)) - assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight-1)) - assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1)) -} - -func TestAddPeer(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerIDTwo p2p.ID = "2" - sc = newSchedule(initHeight) - ) - - assert.Nil(t, sc.addPeer(peerID)) - assert.Nil(t, sc.addPeer(peerIDTwo)) - assert.Error(t, sc.addPeer(peerID)) -} - -func TestTouchPeer(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - sc = newSchedule(initHeight) - now = time.Now() - ) - - assert.Error(t, sc.touchPeer(peerID, now), - "Touching an unknown peer should return errPeerNotFound") - - assert.Nil(t, sc.addPeer(peerID), - "Adding a peer should return no error") - assert.Nil(t, sc.touchPeer(peerID, now), - "Touching a peer should return no error") - - threshold := 10 * time.Second - assert.Empty(t, sc.peersInactiveSince(threshold, now.Add(9*time.Second)), - "Expected no peers to have been touched over 9 seconds") - assert.Containsf(t, sc.peersInactiveSince(threshold, now.Add(11*time.Second)), peerID, - "Expected one %s to have been touched over 10 seconds ago", peerID) -} - -func TestPeerHeight(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerHeight int64 = 20 - sc = newSchedule(initHeight) - ) - - assert.NoError(t, sc.addPeer(peerID), - "Adding a peer should return no error") - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight)) - for i := initHeight; i <= peerHeight; i++ { - assert.Equal(t, sc.getStateAtHeight(i), blockStateNew, - "Expected all blocks to be in blockStateNew") - peerIDs := []p2p.ID{} - for _, peer := range sc.getPeersAtHeight(i) { - peerIDs = append(peerIDs, peer.peerID) - } - - assert.Containsf(t, peerIDs, peerID, - "Expected %s to have block %d", peerID, i) - } -} - -func TestTransitionPending(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerIDTwo p2p.ID = "2" - peerHeight int64 = 20 - sc = newSchedule(initHeight) - now = time.Now() - ) - - assert.NoError(t, sc.addPeer(peerID), - "Adding a peer should return no error") - assert.Nil(t, sc.addPeer(peerIDTwo), - "Adding a peer should return no error") - - assert.Error(t, sc.markPending(peerID, peerHeight, now), - "Expected scheduling a block from a peer in peerStateNew to fail") - - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight to return no error") - assert.NoError(t, sc.setPeerHeight(peerIDTwo, peerHeight), - "Expected setPeerHeight to return no error") - - assert.NoError(t, sc.markPending(peerID, peerHeight, now), - "Expected markingPending new block to succeed") - assert.Error(t, sc.markPending(peerIDTwo, peerHeight, now), - "Expected markingPending by a second peer to fail") - - assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight), - "Expected the block to to be in blockStatePending") - - assert.NoError(t, sc.removePeer(peerID), - "Expected removePeer to return no error") - - assert.Equal(t, blockStateNew, sc.getStateAtHeight(peerHeight), - "Expected the block to to be in blockStateNew") - - assert.Error(t, sc.markPending(peerID, peerHeight, now), - "Expected markingPending removed peer to fail") - - assert.NoError(t, sc.markPending(peerIDTwo, peerHeight, now), - "Expected markingPending on a ready peer to succeed") - - assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight), - "Expected the block to to be in blockStatePending") -} - -func TestTransitionReceived(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerIDTwo p2p.ID = "2" - peerHeight int64 = 20 - blockSize int64 = 1024 - sc = newSchedule(initHeight) - now = time.Now() - receivedAt = now.Add(1 * time.Second) - ) - - assert.NoError(t, sc.addPeer(peerID), - "Expected adding peer %s to succeed", peerID) - assert.NoError(t, sc.addPeer(peerIDTwo), - "Expected adding peer %s to succeed", peerIDTwo) - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight to return no error") - assert.NoErrorf(t, sc.setPeerHeight(peerIDTwo, peerHeight), - "Expected setPeerHeight on %s to %d to succeed", peerIDTwo, peerHeight) - assert.NoError(t, sc.markPending(peerID, initHeight, now), - "Expected markingPending new block to succeed") - - assert.Error(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt), - "Expected marking markReceived from a non requesting peer to fail") - - assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt), - "Expected marking markReceived on a pending block to succeed") - - assert.Error(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt), - "Expected marking markReceived on received block to fail") - - assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockHeightReceived", initHeight) - - assert.NoErrorf(t, sc.removePeer(peerID), - "Expected removePeer removing %s to succeed", peerID) - - assert.Equalf(t, blockStateNew, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockStateNew", initHeight) - - assert.NoErrorf(t, sc.markPending(peerIDTwo, initHeight, now), - "Expected markingPending %d from %s to succeed", initHeight, peerIDTwo) - assert.NoErrorf(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt), - "Expected marking markReceived %d from %s to succeed", initHeight, peerIDTwo) - assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockStateReceived", initHeight) -} - -func TestTransitionProcessed(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerHeight int64 = 20 - blockSize int64 = 1024 - sc = newSchedule(initHeight) - now = time.Now() - receivedAt = now.Add(1 * time.Second) - ) - - assert.NoError(t, sc.addPeer(peerID), - "Expected adding peer %s to succeed", peerID) - assert.NoErrorf(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight on %s to %d to succeed", peerID, peerHeight) - assert.NoError(t, sc.markPending(peerID, initHeight, now), - "Expected markingPending new block to succeed") - assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt), - "Expected marking markReceived on a pending block to succeed") - - assert.Error(t, sc.markProcessed(initHeight+1), - "Expected marking %d as processed to fail", initHeight+1) - assert.NoError(t, sc.markProcessed(initHeight), - "Expected marking %d as processed to succeed", initHeight) - - assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockStateProcessed", initHeight) - - assert.NoError(t, sc.removePeer(peerID), - "Expected removing peer %s to succeed", peerID) - - assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight), - "Expected block %d to be blockStateProcessed", initHeight) -} - -func TestMinMaxHeight(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerHeight int64 = 20 - sc = newSchedule(initHeight) - now = time.Now() - ) - - assert.Equal(t, initHeight, sc.minHeight(), - "Expected min height to be the initialized height") - - assert.Equal(t, initHeight, sc.maxHeight(), - "Expected max height to be the initialized height") - - assert.NoError(t, sc.addPeer(peerID), - "Adding a peer should return no error") - - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight to return no error") - - assert.Equal(t, peerHeight, sc.maxHeight(), - "Expected max height to increase to peerHeight") - - assert.Nil(t, sc.markPending(peerID, initHeight, now.Add(1*time.Second)), - "Expected marking initHeight as pending to return no error") - - assert.Equal(t, initHeight+1, sc.minHeight(), - "Expected marking initHeight as pending to move minHeight forward") -} - -func TestPeersSlowerThan(t *testing.T) { - var ( - initHeight int64 = 5 - peerID p2p.ID = "1" - peerHeight int64 = 20 - blockSize int64 = 1024 - sc = newSchedule(initHeight) - now = time.Now() - receivedAt = now.Add(1 * time.Second) - ) - - assert.NoError(t, sc.addPeer(peerID), - "Adding a peer should return no error") - - assert.NoError(t, sc.setPeerHeight(peerID, peerHeight), - "Expected setPeerHeight to return no error") - - assert.NoError(t, sc.markPending(peerID, peerHeight, now), - "Expected markingPending on to return no error") - - assert.NoError(t, sc.markReceived(peerID, peerHeight, blockSize, receivedAt), - "Expected markingPending on to return no error") - - assert.Empty(t, sc.peersSlowerThan(blockSize-1), - "expected no peers to be slower than blockSize-1 bytes/sec") - - assert.Containsf(t, sc.peersSlowerThan(blockSize+1), peerID, - "expected %s to be slower than blockSize+1 bytes/sec", peerID) -} diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go new file mode 100644 index 000000000..ab3892dc5 --- /dev/null +++ b/blockchain/v2/scheduler.go @@ -0,0 +1,718 @@ +package v2 + +import ( + "bytes" + "fmt" + "math" + "sort" + "time" + + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" +) + +// Events + +// XXX: The handle API would be much simpler if it return a single event, an +// Event, which embeds a terminationEvent if it wants to terminate the routine. + +// Input events into the scheduler: +// ticker event for cleaning peers +type tryPrunePeer struct { + priorityHigh + time time.Time +} + +// ticker event for scheduling block requests +type trySchedule struct { + priorityHigh + time time.Time +} + +// blockResponse message received from a peer +type bcBlockResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + height int64 + size int64 + block *types.Block +} + +// statusResponse message received from a peer +type bcStatusResponse struct { + priorityNormal + time time.Time + peerID p2p.ID + height int64 +} + +// new peer is connected +type addNewPeer struct { + priorityNormal + peerID p2p.ID +} + +// Output events issued by the scheduler: +// all blocks have been processed +type scFinishedEv struct { + priorityNormal +} + +// send a blockRequest message +type scBlockRequest struct { + priorityNormal + peerID p2p.ID + height int64 +} + +// a block has been received and validated by the scheduler +type scBlockReceived struct { + priorityNormal + peerID p2p.ID + block *types.Block +} + +// scheduler detected a peer error +type scPeerError struct { + priorityHigh + peerID p2p.ID + reason error +} + +// scheduler removed a set of peers (timed out or slow peer) +type scPeersPruned struct { + priorityHigh + peers []p2p.ID +} + +// XXX: make this fatal? +// scheduler encountered a fatal error +type scSchedulerFail struct { + priorityHigh + reason error +} + +type blockState int + +const ( + blockStateUnknown blockState = iota + 1 // no known peer has this block + blockStateNew // indicates that a peer has reported having this block + blockStatePending // indicates that this block has been requested from a peer + blockStateReceived // indicates that this block has been received by a peer + blockStateProcessed // indicates that this block has been applied +) + +func (e blockState) String() string { + switch e { + case blockStateUnknown: + return "Unknown" + case blockStateNew: + return "New" + case blockStatePending: + return "Pending" + case blockStateReceived: + return "Received" + case blockStateProcessed: + return "Processed" + default: + return fmt.Sprintf("invalid blockState: %d", e) + } +} + +type peerState int + +const ( + peerStateNew = iota + 1 + peerStateReady + peerStateRemoved +) + +func (e peerState) String() string { + switch e { + case peerStateNew: + return "New" + case peerStateReady: + return "Ready" + case peerStateRemoved: + return "Removed" + default: + panic(fmt.Sprintf("unknown peerState: %d", e)) + } +} + +type scPeer struct { + peerID p2p.ID + + // initialized as New when peer is added, updated to Ready when statusUpdate is received, + // updated to Removed when peer is removed + state peerState + + height int64 // updated when statusResponse is received + lastTouched time.Time + lastRate int64 // last receive rate in bytes +} + +func (p scPeer) String() string { + return fmt.Sprintf("{state %v, height %d, lastTouched %v, lastRate %d, id %v}", + p.state, p.height, p.lastTouched, p.lastRate, p.peerID) +} + +func newScPeer(peerID p2p.ID) *scPeer { + return &scPeer{ + peerID: peerID, + state: peerStateNew, + height: -1, + } +} + +// The scheduler keep track of the state of each block and each peer. The +// scheduler will attempt to schedule new block requests with `trySchedule` +// events and remove slow peers with `tryPrune` events. +type scheduler struct { + initHeight int64 + + // next block that needs to be processed. All blocks with smaller height are + // in Processed state. + height int64 + + // a map of peerID to scheduler specific peer struct `scPeer` used to keep + // track of peer specific state + peers map[p2p.ID]*scPeer + peerTimeout time.Duration + minRecvRate int64 // minimum receive rate from peer otherwise prune + + // the maximum number of blocks that should be New, Received or Pending at any point + // in time. This is used to enforce a limit on the blockStates map. + targetPending int + // a list of blocks to be scheduled (New), Pending or Received. Its length should be + // smaller than targetPending. + blockStates map[int64]blockState + + // a map of heights to the peer we are waiting a response from + pendingBlocks map[int64]p2p.ID + + // the time at which a block was put in blockStatePending + pendingTime map[int64]time.Time + + // a map of heights to the peers that put the block in blockStateReceived + receivedBlocks map[int64]p2p.ID +} + +func (sc scheduler) String() string { + return fmt.Sprintf("ih: %d, bst: %v, peers: %v, pblks: %v, ptm %v, rblks: %v", + sc.initHeight, sc.blockStates, sc.peers, sc.pendingBlocks, sc.pendingTime, sc.receivedBlocks) +} + +func newScheduler(initHeight int64) *scheduler { + sc := scheduler{ + initHeight: initHeight, + height: initHeight + 1, + blockStates: make(map[int64]blockState), + peers: make(map[p2p.ID]*scPeer), + pendingBlocks: make(map[int64]p2p.ID), + pendingTime: make(map[int64]time.Time), + receivedBlocks: make(map[int64]p2p.ID), + } + + return &sc +} + +func (sc *scheduler) addPeer(peerID p2p.ID) error { + if _, ok := sc.peers[peerID]; ok { + // In the future we should be able to add a previously removed peer + return fmt.Errorf("cannot add duplicate peer %s", peerID) + } + sc.peers[peerID] = newScPeer(peerID) + return nil +} + +func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error { + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("couldn't find peer %s", peerID) + } + + if peer.state != peerStateReady { + return fmt.Errorf("tried to touch peer in state %s, must be Ready", peer.state) + } + + peer.lastTouched = time + + return nil +} + +func (sc *scheduler) removePeer(peerID p2p.ID) error { + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("couldn't find peer %s", peerID) + } + + if peer.state == peerStateRemoved { + return fmt.Errorf("tried to remove peer %s in peerStateRemoved", peerID) + } + + for height, pendingPeerID := range sc.pendingBlocks { + if pendingPeerID == peerID { + sc.setStateAtHeight(height, blockStateNew) + delete(sc.pendingTime, height) + delete(sc.pendingBlocks, height) + } + } + + for height, rcvPeerID := range sc.receivedBlocks { + if rcvPeerID == peerID { + sc.setStateAtHeight(height, blockStateNew) + delete(sc.receivedBlocks, height) + } + } + + // remove the blocks from blockStates if the peer removal causes the max peer height to be lower. + peer.state = peerStateRemoved + maxPeerHeight := int64(0) + for _, otherPeer := range sc.peers { + if otherPeer.state != peerStateReady { + continue + } + if otherPeer.peerID != peer.peerID && otherPeer.height > maxPeerHeight { + maxPeerHeight = otherPeer.height + } + } + for h := range sc.blockStates { + if h > maxPeerHeight { + delete(sc.blockStates, h) + } + } + + return nil +} + +// check if the blockPool is running low and add new blocks in New state to be requested. +// This function is called when there is an increase in the maximum peer height or when +// blocks are processed. +func (sc *scheduler) addNewBlocks() { + if len(sc.blockStates) >= sc.targetPending { + return + } + + for i := sc.height; i < int64(sc.targetPending)+sc.height; i++ { + if i > sc.maxHeight() { + break + } + if sc.getStateAtHeight(i) == blockStateUnknown { + sc.setStateAtHeight(i, blockStateNew) + } + } +} + +func (sc *scheduler) setPeerHeight(peerID p2p.ID, height int64) error { + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("cannot find peer %s", peerID) + } + + if peer.state == peerStateRemoved { + return fmt.Errorf("cannot set peer height for a peer in peerStateRemoved") + } + + if height < peer.height { + return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height) + } + + peer.height = height + peer.state = peerStateReady + + sc.addNewBlocks() + return nil +} + +func (sc *scheduler) getStateAtHeight(height int64) blockState { + if height <= sc.initHeight { + return blockStateProcessed + } else if state, ok := sc.blockStates[height]; ok { + return state + } else { + return blockStateUnknown + } +} + +func (sc *scheduler) getPeersAtHeightOrAbove(height int64) []p2p.ID { + peers := make([]p2p.ID, 0) + for _, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if peer.height >= height { + peers = append(peers, peer.peerID) + } + } + return peers +} + +func (sc *scheduler) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID { + peers := []p2p.ID{} + for _, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if now.Sub(peer.lastTouched) > duration { + peers = append(peers, peer.peerID) + } + } + + // Ensure the order is deterministic for testing + sort.Sort(PeerByID(peers)) + return peers +} + +// will return peers who's lastRate i slower than minSpeed denominated in bytes +func (sc *scheduler) peersSlowerThan(minSpeed int64) []p2p.ID { + peers := []p2p.ID{} + for peerID, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if peer.lastRate < minSpeed { + peers = append(peers, peerID) + } + } + + // Ensure the order is deterministic for testing + sort.Sort(PeerByID(peers)) + return peers +} + +func (sc *scheduler) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID { + prunable := []p2p.ID{} + for peerID, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if now.Sub(peer.lastTouched) > peerTimout || peer.lastRate < minRecvRate { + prunable = append(prunable, peerID) + } + } + // Tests for handleTryPrunePeer() may fail without sort due to range non-determinism + sort.Sort(PeerByID(prunable)) + return prunable +} + +func (sc *scheduler) setStateAtHeight(height int64, state blockState) { + sc.blockStates[height] = state +} + +func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error { + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("couldn't find peer %s", peerID) + } + + if peer.state == peerStateRemoved { + return fmt.Errorf("cannot receive blocks from removed peer %s", peerID) + } + + if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID { + return fmt.Errorf("received block %d from peer %s without being requested", height, peerID) + } + + pendingTime, ok := sc.pendingTime[height] + if !ok || now.Sub(pendingTime) <= 0 { + return fmt.Errorf("clock error: block %d received at %s but requested at %s", + height, pendingTime, now) + } + + peer.lastRate = size / now.Sub(pendingTime).Nanoseconds() + + sc.setStateAtHeight(height, blockStateReceived) + delete(sc.pendingBlocks, height) + delete(sc.pendingTime, height) + + sc.receivedBlocks[height] = peerID + + return nil +} + +func (sc *scheduler) markPending(peerID p2p.ID, height int64, time time.Time) error { + state := sc.getStateAtHeight(height) + if state != blockStateNew { + return fmt.Errorf("block %d should be in blockStateNew but is %s", height, state) + } + + peer, ok := sc.peers[peerID] + if !ok { + return fmt.Errorf("cannot find peer %s", peerID) + } + + if peer.state != peerStateReady { + return fmt.Errorf("cannot schedule %d from %s in %s", height, peerID, peer.state) + } + + if height > peer.height { + return fmt.Errorf("cannot request height %d from peer %s that is at height %d", + height, peerID, peer.height) + } + + sc.setStateAtHeight(height, blockStatePending) + sc.pendingBlocks[height] = peerID + // XXX: to make this more accurate we can introduce a message from + // the IO routine which indicates the time the request was put on the wire + sc.pendingTime[height] = time + + return nil +} + +func (sc *scheduler) markProcessed(height int64) error { + state := sc.getStateAtHeight(height) + if state != blockStateReceived { + return fmt.Errorf("cannot mark height %d received from block state %s", height, state) + } + + sc.height++ + delete(sc.receivedBlocks, height) + delete(sc.blockStates, height) + sc.addNewBlocks() + + return nil +} + +func (sc *scheduler) allBlocksProcessed() bool { + return sc.height >= sc.maxHeight() +} + +// returns max peer height or the last processed block, i.e. sc.height +func (sc *scheduler) maxHeight() int64 { + max := sc.height - 1 + for _, peer := range sc.peers { + if peer.state != peerStateReady { + continue + } + if peer.height > max { + max = peer.height + } + } + return max +} + +// lowest block in sc.blockStates with state == blockStateNew or -1 if no new blocks +func (sc *scheduler) nextHeightToSchedule() int64 { + var min int64 = math.MaxInt64 + for height, state := range sc.blockStates { + if state == blockStateNew && height < min { + min = height + } + } + if min == math.MaxInt64 { + min = -1 + } + return min +} + +func (sc *scheduler) pendingFrom(peerID p2p.ID) []int64 { + var heights []int64 + for height, pendingPeerID := range sc.pendingBlocks { + if pendingPeerID == peerID { + heights = append(heights, height) + } + } + return heights +} + +func (sc *scheduler) selectPeer(height int64) (p2p.ID, error) { + peers := sc.getPeersAtHeightOrAbove(height) + if len(peers) == 0 { + return "", fmt.Errorf("cannot find peer for height %d", height) + } + + // create a map from number of pending requests to a list + // of peers having that number of pending requests. + pendingFrom := make(map[int][]p2p.ID) + for _, peerID := range peers { + numPending := len(sc.pendingFrom(peerID)) + pendingFrom[numPending] = append(pendingFrom[numPending], peerID) + } + + // find the set of peers with minimum number of pending requests. + minPending := math.MaxInt64 + for mp := range pendingFrom { + if mp < minPending { + minPending = mp + } + } + + sort.Sort(PeerByID(pendingFrom[minPending])) + return pendingFrom[minPending][0], nil +} + +// PeerByID is a list of peers sorted by peerID. +type PeerByID []p2p.ID + +func (peers PeerByID) Len() int { + return len(peers) +} +func (peers PeerByID) Less(i, j int) bool { + return bytes.Compare([]byte(peers[i]), []byte(peers[j])) == -1 +} + +func (peers PeerByID) Swap(i, j int) { + it := peers[i] + peers[i] = peers[j] + peers[j] = it +} + +// Handlers + +// This handler gets the block, performs some validation and then passes it on to the processor. +func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) { + err := sc.touchPeer(event.peerID, event.time) + if err != nil { + return scPeerError{peerID: event.peerID, reason: err}, nil + } + + err = sc.markReceived(event.peerID, event.block.Height, event.size, event.time) + if err != nil { + return scPeerError{peerID: event.peerID, reason: err}, nil + } + + return scBlockReceived{peerID: event.peerID, block: event.block}, nil +} + +func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) { + if event.height != sc.height { + panic(fmt.Sprintf("processed height %d but expected height %d", event.height, sc.height)) + } + err := sc.markProcessed(event.height) + if err != nil { + // It is possible that a peer error or timeout is handled after the processor + // has processed the block but before the scheduler received this event, + // so when pcBlockProcessed event is received the block had been requested again + return scSchedulerFail{reason: err}, nil + } + + if sc.allBlocksProcessed() { + return scFinishedEv{}, nil + } + + return noOp, nil +} + +// Handles an error from the processor. The processor had already cleaned the blocks from +// the peers included in this event. Just attempt to remove the peers. +func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) (Event, error) { + if len(sc.peers) == 0 { + return noOp, nil + } + // The peers may have been just removed due to errors, low speed or timeouts. + _ = sc.removePeer(event.firstPeerID) + if event.firstPeerID != event.secondPeerID { + _ = sc.removePeer(event.secondPeerID) + } + + if sc.allBlocksProcessed() { + return scFinishedEv{}, nil + } + + return noOp, nil +} + +func (sc *scheduler) handleAddNewPeer(event addNewPeer) (Event, error) { + err := sc.addPeer(event.peerID) + if err != nil { + return scSchedulerFail{reason: err}, nil + } + return noOp, nil +} + +// XXX: unify types peerError +func (sc *scheduler) handlePeerError(event peerError) (Event, error) { + err := sc.removePeer(event.peerID) + if err != nil { + // XXX - It is possible that the removePeer fails here for legitimate reasons + // for example if a peer timeout or error was handled just before this. + return scSchedulerFail{reason: err}, nil + } + if sc.allBlocksProcessed() { + return scFinishedEv{}, nil + } + return noOp, nil +} + +func (sc *scheduler) handleTryPrunePeer(event tryPrunePeer) (Event, error) { + prunablePeers := sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, event.time) + if len(prunablePeers) == 0 { + return noOp, nil + } + for _, peerID := range prunablePeers { + err := sc.removePeer(peerID) + if err != nil { + // Should never happen as prunablePeers() returns only existing peers in Ready state. + panic("scheduler data corruption") + } + } + + // If all blocks are processed we should finish even some peers were pruned. + if sc.allBlocksProcessed() { + return scFinishedEv{}, nil + } + + return scPeersPruned{peers: prunablePeers}, nil + +} + +// TODO - Schedule multiple block requests +func (sc *scheduler) handleTrySchedule(event trySchedule) (Event, error) { + + nextHeight := sc.nextHeightToSchedule() + if nextHeight == -1 { + return noOp, nil + } + + bestPeerID, err := sc.selectPeer(nextHeight) + if err != nil { + return scSchedulerFail{reason: err}, nil + } + if err := sc.markPending(bestPeerID, nextHeight, event.time); err != nil { + return scSchedulerFail{reason: err}, nil // XXX: peerError might be more appropriate + } + return scBlockRequest{peerID: bestPeerID, height: nextHeight}, nil + +} + +func (sc *scheduler) handleStatusResponse(event bcStatusResponse) (Event, error) { + err := sc.setPeerHeight(event.peerID, event.height) + if err != nil { + return scPeerError{peerID: event.peerID, reason: err}, nil + } + return noOp, nil +} + +func (sc *scheduler) handle(event Event) (Event, error) { + switch event := event.(type) { + case bcStatusResponse: + nextEvent, err := sc.handleStatusResponse(event) + return nextEvent, err + case bcBlockResponse: + nextEvent, err := sc.handleBlockResponse(event) + return nextEvent, err + case trySchedule: + nextEvent, err := sc.handleTrySchedule(event) + return nextEvent, err + case addNewPeer: + nextEvent, err := sc.handleAddNewPeer(event) + return nextEvent, err + case tryPrunePeer: + nextEvent, err := sc.handleTryPrunePeer(event) + return nextEvent, err + case peerError: + nextEvent, err := sc.handlePeerError(event) + return nextEvent, err + case pcBlockProcessed: + nextEvent, err := sc.handleBlockProcessed(event) + return nextEvent, err + case pcBlockVerificationFailure: + nextEvent, err := sc.handleBlockProcessError(event) + return nextEvent, err + default: + return scSchedulerFail{reason: fmt.Errorf("unknown event %v", event)}, nil + } + //return noOp, nil +} diff --git a/blockchain/v2/scheduler_test.go b/blockchain/v2/scheduler_test.go new file mode 100644 index 000000000..a3a6db672 --- /dev/null +++ b/blockchain/v2/scheduler_test.go @@ -0,0 +1,2293 @@ +package v2 + +import ( + "fmt" + "math" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" +) + +type scTestParams struct { + peers map[string]*scPeer + initHeight int64 + height int64 + allB []int64 + pending map[int64]p2p.ID + pendingTime map[int64]time.Time + received map[int64]p2p.ID + peerTimeout time.Duration + minRecvRate int64 + targetPending int +} + +func verifyScheduler(sc *scheduler) { + missing := 0 + if sc.maxHeight() >= sc.height { + missing = int(math.Min(float64(sc.targetPending), float64(sc.maxHeight()-sc.height+1))) + } + if len(sc.blockStates) != missing { + panic(fmt.Sprintf("scheduler block length %d different than target %d", len(sc.blockStates), missing)) + } +} + +func newTestScheduler(params scTestParams) *scheduler { + peers := make(map[p2p.ID]*scPeer) + + sc := newScheduler(params.initHeight) + if params.height != 0 { + sc.height = params.height + } + + for id, peer := range params.peers { + peer.peerID = p2p.ID(id) + peers[p2p.ID(id)] = peer + } + for _, h := range params.allB { + sc.blockStates[h] = blockStateNew + } + for h, pid := range params.pending { + sc.blockStates[h] = blockStatePending + sc.pendingBlocks[h] = pid + } + for h, tm := range params.pendingTime { + sc.pendingTime[h] = tm + } + for h, pid := range params.received { + sc.blockStates[h] = blockStateReceived + sc.receivedBlocks[h] = pid + } + + sc.peers = peers + sc.peerTimeout = params.peerTimeout + if params.targetPending == 0 { + sc.targetPending = 10 + } else { + sc.targetPending = params.targetPending + } + + sc.minRecvRate = params.minRecvRate + + verifyScheduler(sc) + + return sc +} + +func TestScInit(t *testing.T) { + var ( + initHeight int64 = 5 + sc = newScheduler(initHeight) + ) + assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight)) + assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1)) +} + +func TestScMaxHeights(t *testing.T) { + + tests := []struct { + name string + sc scheduler + wantMax int64 + }{ + { + name: "no peers", + sc: scheduler{height: 11}, + wantMax: 10, + }, + { + name: "one ready peer", + sc: scheduler{ + initHeight: 2, + height: 3, + peers: map[p2p.ID]*scPeer{"P1": {height: 6, state: peerStateReady}}, + }, + wantMax: 6, + }, + { + name: "ready and removed peers", + sc: scheduler{ + height: 1, + peers: map[p2p.ID]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 10, state: peerStateRemoved}}, + }, + wantMax: 4, + }, + { + name: "removed peers", + sc: scheduler{ + height: 1, + peers: map[p2p.ID]*scPeer{ + "P1": {height: 4, state: peerStateRemoved}, + "P2": {height: 10, state: peerStateRemoved}}, + }, + wantMax: 0, + }, + { + name: "new peers", + sc: scheduler{ + height: 1, + peers: map[p2p.ID]*scPeer{ + "P1": {height: -1, state: peerStateNew}, + "P2": {height: -1, state: peerStateNew}}, + }, + wantMax: 0, + }, + { + name: "mixed peers", + sc: scheduler{ + height: 1, + peers: map[p2p.ID]*scPeer{ + "P1": {height: -1, state: peerStateNew}, + "P2": {height: 10, state: peerStateReady}, + "P3": {height: 20, state: peerStateRemoved}, + "P4": {height: 22, state: peerStateReady}, + }, + }, + wantMax: 22, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + // maxHeight() should not mutate the scheduler + wantSc := tt.sc + + resMax := tt.sc.maxHeight() + assert.Equal(t, tt.wantMax, resMax) + assert.Equal(t, wantSc, tt.sc) + }) + } +} + +func TestScAddPeer(t *testing.T) { + + type args struct { + peerID p2p.ID + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "add first peer", + fields: scTestParams{}, + args: args{peerID: "P1"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + }, + { + name: "add second peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + args: args{peerID: "P2"}, + wantFields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateNew, height: -1}, + "P2": {state: peerStateNew, height: -1}}}, + }, + { + name: "attempt to add duplicate peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + args: args{peerID: "P1"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew, height: -1}}}, + wantErr: true, + }, + { + name: "attempt to add duplicate peer with existing peer in Ready state", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 3}}, + allB: []int64{1, 2, 3}, + }, + args: args{peerID: "P1"}, + wantErr: true, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 3}}, + allB: []int64{1, 2, 3}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.addPeer(tt.args.peerID); (err != nil) != tt.wantErr { + t.Errorf("scAddPeer() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) + }) + } +} + +func TestScTouchPeer(t *testing.T) { + now := time.Now() + + type args struct { + peerID p2p.ID + time time.Time + } + + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "attempt to touch non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 5}}, + allB: []int64{1, 2, 3, 4, 5}, + }, + args: args{peerID: "P2", time: now}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, height: 5}}, + allB: []int64{1, 2, 3, 4, 5}, + }, + wantErr: true, + }, + { + name: "attempt to touch peer in state New", + fields: scTestParams{peers: map[string]*scPeer{"P1": {}}}, + args: args{peerID: "P1", time: now}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {}}}, + wantErr: true, + }, + { + name: "attempt to touch peer in state Removed", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved}, "P2": {state: peerStateReady}}}, + args: args{peerID: "P1", time: now}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved}, "P2": {state: peerStateReady}}}, + wantErr: true, + }, + { + name: "touch peer in state Ready", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, + args: args{peerID: "P1", time: now.Add(3 * time.Second)}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, + lastTouched: now.Add(3 * time.Second)}}}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.touchPeer(tt.args.peerID, tt.args.time); (err != nil) != tt.wantErr { + t.Errorf("touchPeer() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) + }) + } +} + +func TestScPeersInactiveSince(t *testing.T) { + now := time.Now() + + type args struct { + threshold time.Duration + time time.Time + } + + tests := []struct { + name string + fields scTestParams + args args + wantResult []p2p.ID + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{threshold: time.Second, time: now}, + wantResult: []p2p.ID{}, + }, + { + name: "one active peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, + args: args{threshold: time.Second, time: now}, + wantResult: []p2p.ID{}, + }, + { + name: "one inactive peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastTouched: now}}}, + args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond)}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "one active and one inactive peer", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateReady, lastTouched: now}, + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second)}}}, + args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond)}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "one New peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}}, + args: args{threshold: time.Second, time: now.Add(time.Millisecond)}, + wantResult: []p2p.ID{}, + }, + { + name: "one Removed peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastTouched: now}}}, + args: args{threshold: time.Second, time: now.Add(time.Millisecond)}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready active peer and one New", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateRemoved, lastTouched: now}, + "P2": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}}}, + args: args{threshold: time.Second, time: now.Add(2 * time.Millisecond)}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready inactive peer and one New", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateRemoved, lastTouched: now}, + "P2": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}}}, + args: args{threshold: time.Second, time: now.Add(time.Second + 2*time.Millisecond)}, + wantResult: []p2p.ID{"P2"}, + }, + { + name: "combination of New, Removed and, active and non active Ready peers", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateNew}, + "P2": {state: peerStateRemoved, lastTouched: now}, + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second)}, + "P4": {state: peerStateReady, lastTouched: now.Add(time.Millisecond)}, + "P5": {state: peerStateReady, lastTouched: now.Add(3 * time.Millisecond)}}}, + args: args{threshold: time.Second, time: now.Add(time.Second + 2*time.Millisecond)}, + wantResult: []p2p.ID{"P4"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // peersInactiveSince should not mutate the scheduler + wantSc := sc + res := sc.peersInactiveSince(tt.args.threshold, tt.args.time) + sort.Sort(PeerByID(res)) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScPeersSlowerThan(t *testing.T) { + type args struct { + minSpeed int64 + } + + tests := []struct { + name string + fields scTestParams + args args + wantResult []p2p.ID + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready faster peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 101}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready equal peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 100}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready slow peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateReady, lastRate: 99}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "one Removed faster peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 101}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, { + name: "one Removed equal peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 100}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one Removed slow peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateRemoved, lastRate: 99}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one New peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "one New peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {state: peerStateNew}}}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "mixed peers", + fields: scTestParams{peers: map[string]*scPeer{ + "P1": {state: peerStateRemoved, lastRate: 101}, + "P2": {state: peerStateReady, lastRate: 101}, + "P3": {state: peerStateRemoved, lastRate: 100}, + "P4": {state: peerStateReady, lastRate: 100}, + "P5": {state: peerStateReady, lastRate: 99}, + "P6": {state: peerStateNew}, + "P7": {state: peerStateRemoved, lastRate: 99}, + "P8": {state: peerStateReady, lastRate: 99}, + }}, + args: args{minSpeed: 100}, + wantResult: []p2p.ID{"P5", "P8"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // peersSlowerThan should not mutate the scheduler + wantSc := sc + res := sc.peersSlowerThan(tt.args.minSpeed) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScPrunablePeers(t *testing.T) { + now := time.Now() + + type args struct { + threshold time.Duration + time time.Time + minSpeed int64 + } + + tests := []struct { + name string + fields scTestParams + args args + wantResult []p2p.ID + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond), minSpeed: 100}, + wantResult: []p2p.ID{}, + }, + { + name: "mixed peers", + fields: scTestParams{peers: map[string]*scPeer{ + // X - removed, active, fast + "P1": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 101}, + // X - ready, active, fast + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 101}, + // X - removed, active, equal + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 100}, + // V - ready, inactive, equal + "P4": {state: peerStateReady, lastTouched: now, lastRate: 100}, + // V - ready, inactive, slow + "P5": {state: peerStateReady, lastTouched: now, lastRate: 99}, + // V - ready, active, slow + "P6": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 90}, + }}, + args: args{threshold: time.Second, time: now.Add(time.Second + time.Millisecond), minSpeed: 100}, + wantResult: []p2p.ID{"P4", "P5", "P6"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // peersSlowerThan should not mutate the scheduler + wantSc := sc + res := sc.prunablePeers(tt.args.threshold, tt.args.minSpeed, tt.args.time) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScRemovePeer(t *testing.T) { + + type args struct { + peerID p2p.ID + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "remove non existing peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, + args: args{peerID: "P2"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, + wantErr: true, + }, + { + name: "remove single New peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}}}, + args: args{peerID: "P1"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateRemoved}}}, + }, + { + name: "remove one of two New peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1}, "P2": {height: -1}}}, + args: args{peerID: "P1"}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateRemoved}, "P2": {height: -1}}}, + }, + { + name: "remove one Ready peer, all peers removed", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 10, state: peerStateRemoved}, + "P2": {height: 5, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5}, + }, + args: args{peerID: "P2"}, + wantFields: scTestParams{peers: map[string]*scPeer{ + "P1": {height: 10, state: peerStateRemoved}, + "P2": {height: 5, state: peerStateRemoved}}, + }, + }, + { + name: "attempt to remove already removed peer", + fields: scTestParams{ + height: 8, + peers: map[string]*scPeer{ + "P1": {height: 10, state: peerStateRemoved}, + "P2": {height: 11, state: peerStateReady}}, + allB: []int64{8, 9, 10, 11}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + height: 8, + peers: map[string]*scPeer{ + "P1": {height: 10, state: peerStateRemoved}, + "P2": {height: 11, state: peerStateReady}}, + allB: []int64{8, 9, 10, 11}}, + wantErr: true, + }, + { + name: "remove Ready peer with blocks requested", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{1: "P1"}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateRemoved}}, + allB: []int64{}, + pending: map[int64]p2p.ID{}, + }, + }, + { + name: "remove Ready peer with blocks received", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + received: map[int64]p2p.ID{1: "P1"}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateRemoved}}, + allB: []int64{}, + received: map[int64]p2p.ID{}, + }, + }, + { + name: "remove Ready peer with blocks received and requested (not yet received)", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 3: "P1"}, + received: map[int64]p2p.ID{2: "P1", 4: "P1"}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}, + allB: []int64{}, + pending: map[int64]p2p.ID{}, + received: map[int64]p2p.ID{}, + }, + }, + { + name: "remove Ready peer from multiple peers set, with blocks received and requested (not yet received)", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 6, state: peerStateReady}, + "P2": {height: 6, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4, 5, 6}, + pending: map[int64]p2p.ID{1: "P1", 3: "P2", 6: "P1"}, + received: map[int64]p2p.ID{2: "P1", 4: "P2", 5: "P2"}, + }, + args: args{peerID: "P1"}, + wantFields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 6, state: peerStateRemoved}, + "P2": {height: 6, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4, 5, 6}, + pending: map[int64]p2p.ID{3: "P2"}, + received: map[int64]p2p.ID{4: "P2", 5: "P2"}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.removePeer(tt.args.peerID); (err != nil) != tt.wantErr { + t.Errorf("removePeer() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) + }) + } +} + +func TestScSetPeerHeight(t *testing.T) { + + type args struct { + peerID p2p.ID + height int64 + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "change height of non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P2", height: 4}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + wantErr: true, + }, + { + name: "increase height of removed peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + args: args{peerID: "P1", height: 4}, + wantFields: scTestParams{peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + wantErr: true, + }, + { + name: "decrease height of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + args: args{peerID: "P1", height: 2}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + wantErr: true, + }, + { + name: "increase height of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P1", height: 4}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + }, + { + name: "noop height change of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + args: args{peerID: "P1", height: 4}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + }, + { + name: "add peer with huge height 10**10 ", + fields: scTestParams{ + peers: map[string]*scPeer{"P2": {height: -1, state: peerStateNew}}, + targetPending: 4, + }, + args: args{peerID: "P2", height: 10000000000}, + wantFields: scTestParams{ + targetPending: 4, + peers: map[string]*scPeer{"P2": {height: 10000000000, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.setPeerHeight(tt.args.peerID, tt.args.height); (err != nil) != tt.wantErr { + t.Errorf("setPeerHeight() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers) + }) + } +} + +func TestScGetPeersAtHeight(t *testing.T) { + + type args struct { + height int64 + } + tests := []struct { + name string + fields scTestParams + args args + wantResult []p2p.ID + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{height: 10}, + wantResult: []p2p.ID{}, + }, + { + name: "only new peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, + args: args{height: 10}, + wantResult: []p2p.ID{}, + }, + { + name: "only Removed peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}}, + args: args{height: 2}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready shorter peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 5}, + wantResult: []p2p.ID{}, + }, + { + name: "one Ready equal peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 4}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "one Ready higher peer", + fields: scTestParams{ + targetPending: 4, + peers: map[string]*scPeer{"P1": {height: 20, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 4}, + wantResult: []p2p.ID{"P1"}, + }, + { + name: "multiple mixed peers", + fields: scTestParams{ + height: 8, + peers: map[string]*scPeer{ + "P1": {height: -1, state: peerStateNew}, + "P2": {height: 10, state: peerStateReady}, + "P3": {height: 5, state: peerStateReady}, + "P4": {height: 20, state: peerStateRemoved}, + "P5": {height: 11, state: peerStateReady}}, + allB: []int64{8, 9, 10, 11}, + }, + args: args{height: 8}, + wantResult: []p2p.ID{"P2", "P5"}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // getPeersAtHeightOrAbove should not mutate the scheduler + wantSc := sc + res := sc.getPeersAtHeightOrAbove(tt.args.height) + sort.Sort(PeerByID(res)) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScMarkPending(t *testing.T) { + now := time.Now() + + type args struct { + peerID p2p.ID + height int64 + tm time.Time + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "attempt mark pending an unknown block", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P1", height: 3, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + wantErr: true, + }, + { + name: "attempt mark pending from non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P2", height: 1, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + wantErr: true, + }, + { + name: "mark pending from Removed peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + args: args{peerID: "P1", height: 1, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + wantErr: true, + }, + { + name: "mark pending from New peer", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 4, state: peerStateNew}, + }, + allB: []int64{1, 2, 3, 4}, + }, + args: args{peerID: "P2", height: 2, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 4, state: peerStateNew}, + }, + allB: []int64{1, 2, 3, 4}, + }, + wantErr: true, + }, + { + name: "mark pending from short peer", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 2, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}, + }, + args: args{peerID: "P2", height: 3, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 2, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}, + }, + wantErr: true, + }, + { + name: "mark pending all good", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1"}, + pendingTime: map[int64]time.Time{1: now}, + }, + args: args{peerID: "P1", height: 2, tm: now.Add(time.Millisecond)}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now.Add(time.Millisecond)}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.markPending(tt.args.peerID, tt.args.height, tt.args.tm); (err != nil) != tt.wantErr { + t.Errorf("markPending() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScMarkReceived(t *testing.T) { + now := time.Now() + + type args struct { + peerID p2p.ID + height int64 + size int64 + tm time.Time + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "received from non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{peerID: "P2", height: 1, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + wantErr: true, + }, + { + name: "received from removed peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + args: args{peerID: "P1", height: 1, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + wantErr: true, + }, + { + name: "received from unsolicited peer", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 4, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 2: "P2", 3: "P2", 4: "P1"}, + }, + args: args{peerID: "P1", height: 2, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 4, state: peerStateReady}, + }, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 2: "P2", 3: "P2", 4: "P1"}, + }, + wantErr: true, + }, + { + name: "received but blockRequest not sent", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{}, + }, + args: args{peerID: "P1", height: 2, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{}, + }, + wantErr: true, + }, + { + name: "received with bad timestamp", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now.Add(time.Second)}, + }, + args: args{peerID: "P1", height: 2, size: 1000, tm: now}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now.Add(time.Second)}, + }, + wantErr: true, + }, + { + name: "received all good", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now}, + }, + args: args{peerID: "P1", height: 2, size: 1000, tm: now.Add(time.Millisecond)}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{1: "P1"}, + pendingTime: map[int64]time.Time{1: now}, + received: map[int64]p2p.ID{2: "P1"}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.markReceived(tt.args.peerID, + tt.args.height, tt.args.size, now.Add(time.Second)); (err != nil) != tt.wantErr { + t.Errorf("markReceived() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScMarkProcessed(t *testing.T) { + now := time.Now() + + type args struct { + height int64 + } + tests := []struct { + name string + fields scTestParams + args args + wantFields scTestParams + wantErr bool + }{ + { + name: "processed an unreceived block", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}, + received: map[int64]p2p.ID{1: "P1"}}, + args: args{height: 2}, + wantFields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}, + received: map[int64]p2p.ID{1: "P1"}}, + wantErr: true, + }, + { + name: "mark processed success", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}, + received: map[int64]p2p.ID{1: "P1"}}, + args: args{height: 1}, + wantFields: scTestParams{ + height: 2, + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{2}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + if err := sc.markProcessed(tt.args.height); (err != nil) != tt.wantErr { + t.Errorf("markProcessed() wantErr %v, error = %v", tt.wantErr, err) + } + wantSc := newTestScheduler(tt.wantFields) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScAllBlocksProcessed(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + fields scTestParams + wantResult bool + }{ + { + name: "no blocks", + fields: scTestParams{}, + wantResult: true, + }, + { + name: "only New blocks", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + wantResult: false, + }, + { + name: "only Pending blocks", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1", 4: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now, 3: now, 4: now}, + }, + wantResult: false, + }, + { + name: "only Received blocks", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1", 4: "P1"}, + }, + wantResult: false, + }, + { + name: "only Processed blocks plus highest is received", + fields: scTestParams{ + height: 4, + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}}, + allB: []int64{4}, + received: map[int64]p2p.ID{4: "P1"}, + }, + wantResult: true, + }, + { + name: "mixed block states", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{2: "P1", 4: "P1"}, + pendingTime: map[int64]time.Time{2: now, 4: now}, + }, + wantResult: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // allBlocksProcessed() should not mutate the scheduler + wantSc := sc + res := sc.allBlocksProcessed() + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, wantSc, sc) + }) + } +} + +func TestScNextHeightToSchedule(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + fields scTestParams + wantHeight int64 + }{ + { + name: "no blocks", + fields: scTestParams{initHeight: 10, height: 11}, + wantHeight: -1, + }, + { + name: "only New blocks", + fields: scTestParams{ + initHeight: 2, + height: 3, + peers: map[string]*scPeer{"P1": {height: 6, state: peerStateReady}}, + allB: []int64{3, 4, 5, 6}, + }, + wantHeight: 3, + }, + { + name: "only Pending blocks", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1", 4: "P1"}, + pendingTime: map[int64]time.Time{1: now, 2: now, 3: now, 4: now}, + }, + wantHeight: -1, + }, + { + name: "only Received blocks", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1", 4: "P1"}, + }, + wantHeight: -1, + }, + { + name: "only Processed blocks", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + wantHeight: 1, + }, + { + name: "mixed block states", + fields: scTestParams{ + height: 1, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + pending: map[int64]p2p.ID{2: "P1"}, + pendingTime: map[int64]time.Time{2: now}, + }, + wantHeight: 1, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // nextHeightToSchedule() should not mutate the scheduler + wantSc := sc + + resMin := sc.nextHeightToSchedule() + assert.Equal(t, tt.wantHeight, resMin) + assert.Equal(t, wantSc, sc) + + }) + } +} + +func TestScSelectPeer(t *testing.T) { + + type args struct { + height int64 + } + tests := []struct { + name string + fields scTestParams + args args + wantResult p2p.ID + wantError bool + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{height: 10}, + wantResult: "", + wantError: true, + }, + { + name: "only new peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, + args: args{height: 10}, + wantResult: "", + wantError: true, + }, + { + name: "only Removed peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}}, + args: args{height: 2}, + wantResult: "", + wantError: true, + }, + { + name: "one Ready shorter peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 5}, + wantResult: "", + wantError: true, + }, + { + name: "one Ready equal peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}, + }, + args: args{height: 4}, + wantResult: "P1", + }, + { + name: "one Ready higher peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 6, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6}, + }, + args: args{height: 4}, + wantResult: "P1", + }, + { + name: "many Ready higher peers with different number of pending requests", + fields: scTestParams{ + height: 4, + peers: map[string]*scPeer{ + "P1": {height: 8, state: peerStateReady}, + "P2": {height: 9, state: peerStateReady}}, + allB: []int64{4, 5, 6, 7, 8, 9}, + pending: map[int64]p2p.ID{ + 4: "P1", 6: "P1", + 5: "P2", + }, + }, + args: args{height: 4}, + wantResult: "P2", + }, + { + name: "many Ready higher peers with same number of pending requests", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P2": {height: 20, state: peerStateReady}, + "P1": {height: 15, state: peerStateReady}, + "P3": {height: 15, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + pending: map[int64]p2p.ID{ + 1: "P1", 2: "P1", + 3: "P3", 4: "P3", + 5: "P2", 6: "P2", + }, + }, + args: args{height: 7}, + wantResult: "P1", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + // selectPeer should not mutate the scheduler + wantSc := sc + res, err := sc.selectPeer(tt.args.height) + assert.Equal(t, tt.wantResult, res) + assert.Equal(t, tt.wantError, err != nil) + assert.Equal(t, wantSc, sc) + }) + } +} + +// makeScBlock makes an empty block. +func makeScBlock(height int64) *types.Block { + return &types.Block{Header: types.Header{Height: height}} +} + +// checkScResults checks scheduler handler test results +func checkScResults(t *testing.T, wantErr bool, err error, wantEvent Event, event Event) { + if (err != nil) != wantErr { + t.Errorf("error = %v, wantErr %v", err, wantErr) + return + } + switch wantEvent := wantEvent.(type) { + case scPeerError: + assert.Equal(t, wantEvent.peerID, event.(scPeerError).peerID) + assert.Equal(t, wantEvent.reason != nil, event.(scPeerError).reason != nil) + case scBlockReceived: + assert.Equal(t, wantEvent.peerID, event.(scBlockReceived).peerID) + assert.Equal(t, wantEvent.block, event.(scBlockReceived).block) + case scSchedulerFail: + assert.Equal(t, wantEvent.reason != nil, event.(scSchedulerFail).reason != nil) + default: + assert.Equal(t, wantEvent, event) + } +} + +func TestScHandleBlockResponse(t *testing.T) { + now := time.Now() + block6FromP1 := bcBlockResponse{ + time: now.Add(time.Millisecond), + peerID: p2p.ID("P1"), + height: 6, + size: 100, + block: makeScBlock(6), + } + + type args struct { + event bcBlockResponse + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "empty scheduler", + fields: scTestParams{}, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "block from removed peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}}, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "block we haven't asked for", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}}, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "block from wrong peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P2"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "block with bad timestamp", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now.Add(time.Second)}, + }, + args: args{event: block6FromP1}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + { + name: "good block, accept", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: block6FromP1}, + wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(6)}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleBlockResponse(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleBlockProcessed(t *testing.T) { + now := time.Now() + processed6FromP1 := pcBlockProcessed{ + peerID: p2p.ID("P1"), + height: 6, + } + + type args struct { + event pcBlockProcessed + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "empty scheduler", + fields: scTestParams{height: 6}, + args: args{event: processed6FromP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "processed block we don't have", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: processed6FromP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "processed block ok, we processed all blocks", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 7, state: peerStateReady}}, + allB: []int64{6, 7}, + received: map[int64]p2p.ID{6: "P1", 7: "P1"}, + }, + args: args{event: processed6FromP1}, + wantEvent: scFinishedEv{}, + }, + { + name: "processed block ok, we still have blocks to process", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + pending: map[int64]p2p.ID{7: "P1", 8: "P1"}, + received: map[int64]p2p.ID{6: "P1"}, + }, + args: args{event: processed6FromP1}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleBlockProcessed(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleBlockVerificationFailure(t *testing.T) { + now := time.Now() + + type args struct { + event pcBlockVerificationFailure + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "empty scheduler", + fields: scTestParams{}, + args: args{event: pcBlockVerificationFailure{height: 10, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: noOpEvent{}, + }, + { + name: "failed block we don't have, single peer is still removed", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: pcBlockVerificationFailure{height: 10, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: scFinishedEv{}, + }, + { + name: "failed block we don't have, one of two peers are removed", + fields: scTestParams{ + initHeight: 5, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}, "P2": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + pending: map[int64]p2p.ID{6: "P1"}, + pendingTime: map[int64]time.Time{6: now}, + }, + args: args{event: pcBlockVerificationFailure{height: 10, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: noOpEvent{}, + }, + { + name: "failed block, all blocks are processed after removal", + fields: scTestParams{ + initHeight: 5, + height: 6, + peers: map[string]*scPeer{"P1": {height: 7, state: peerStateReady}}, + allB: []int64{6, 7}, + received: map[int64]p2p.ID{6: "P1", 7: "P1"}, + }, + args: args{event: pcBlockVerificationFailure{height: 7, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: scFinishedEv{}, + }, + { + name: "failed block, we still have blocks to process", + fields: scTestParams{ + initHeight: 4, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}, "P2": {height: 8, state: peerStateReady}}, + allB: []int64{5, 6, 7, 8}, + pending: map[int64]p2p.ID{7: "P1", 8: "P1"}, + received: map[int64]p2p.ID{5: "P1", 6: "P1"}, + }, + args: args{event: pcBlockVerificationFailure{height: 5, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: noOpEvent{}, + }, + { + name: "failed block, H+1 and H+2 delivered by different peers, we still have blocks to process", + fields: scTestParams{ + initHeight: 4, + peers: map[string]*scPeer{ + "P1": {height: 8, state: peerStateReady}, + "P2": {height: 8, state: peerStateReady}, + "P3": {height: 8, state: peerStateReady}, + }, + allB: []int64{5, 6, 7, 8}, + pending: map[int64]p2p.ID{7: "P1", 8: "P1"}, + received: map[int64]p2p.ID{5: "P1", 6: "P1"}, + }, + args: args{event: pcBlockVerificationFailure{height: 5, firstPeerID: "P1", secondPeerID: "P2"}}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleBlockProcessError(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleAddNewPeer(t *testing.T) { + addP1 := addNewPeer{ + peerID: p2p.ID("P1"), + } + type args struct { + event addNewPeer + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "add P1 to empty scheduler", + fields: scTestParams{}, + args: args{event: addP1}, + wantEvent: noOpEvent{}, + }, + { + name: "add duplicate peer", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + }, + args: args{event: addP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "add P1 to non empty scheduler", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + }, + args: args{event: addP1}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleAddNewPeer(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandlePeerError(t *testing.T) { + errP1 := peerError{ + peerID: p2p.ID("P1"), + } + type args struct { + event peerError + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "no peers", + fields: scTestParams{}, + args: args{event: errP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "error finds no peer", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P2": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + }, + args: args{event: errP1}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + }, + { + name: "error finds peer, only peer is removed", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}}, + allB: []int64{6, 7, 8}, + }, + args: args{event: errP1}, + wantEvent: scFinishedEv{}, + }, + { + name: "error finds peer, one of two peers are removed", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 8, state: peerStateReady}, "P2": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + }, + args: args{event: errP1}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handlePeerError(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleTryPrunePeer(t *testing.T) { + now := time.Now() + + pruneEv := tryPrunePeer{ + time: now.Add(time.Second + time.Millisecond), + } + type args struct { + event tryPrunePeer + } + + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "no peers", + fields: scTestParams{}, + args: args{event: pruneEv}, + wantEvent: noOpEvent{}, + }, + { + name: "no prunable peers", + fields: scTestParams{ + minRecvRate: 100, + peers: map[string]*scPeer{ + // X - removed, active, fast + "P1": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 101}, + // X - ready, active, fast + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 101}, + // X - removed, active, equal + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 100}}, + peerTimeout: time.Second, + }, + args: args{event: pruneEv}, + wantEvent: noOpEvent{}, + }, + { + name: "mixed peers", + fields: scTestParams{ + minRecvRate: 100, + peers: map[string]*scPeer{ + // X - removed, active, fast + "P1": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 101, height: 5}, + // X - ready, active, fast + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 101, height: 5}, + // X - removed, active, equal + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 100, height: 5}, + // V - ready, inactive, equal + "P4": {state: peerStateReady, lastTouched: now, lastRate: 100, height: 7}, + // V - ready, inactive, slow + "P5": {state: peerStateReady, lastTouched: now, lastRate: 99, height: 7}, + // V - ready, active, slow + "P6": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 90, height: 7}, + }, + allB: []int64{1, 2, 3, 4, 5, 6, 7}, + peerTimeout: time.Second}, + args: args{event: pruneEv}, + wantEvent: scPeersPruned{peers: []p2p.ID{"P4", "P5", "P6"}}, + }, + { + name: "mixed peers, finish after pruning", + fields: scTestParams{ + minRecvRate: 100, + height: 6, + peers: map[string]*scPeer{ + // X - removed, active, fast + "P1": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 101, height: 5}, + // X - ready, active, fast + "P2": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 101, height: 5}, + // X - removed, active, equal + "P3": {state: peerStateRemoved, lastTouched: now.Add(time.Second), lastRate: 100, height: 5}, + // V - ready, inactive, equal + "P4": {state: peerStateReady, lastTouched: now, lastRate: 100, height: 7}, + // V - ready, inactive, slow + "P5": {state: peerStateReady, lastTouched: now, lastRate: 99, height: 7}, + // V - ready, active, slow + "P6": {state: peerStateReady, lastTouched: now.Add(time.Second), lastRate: 90, height: 7}, + }, + allB: []int64{6, 7}, + peerTimeout: time.Second}, + args: args{event: pruneEv}, + wantEvent: scFinishedEv{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleTryPrunePeer(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestHandleTrySchedule(t *testing.T) { + now := time.Now() + tryEv := trySchedule{ + time: now.Add(time.Second + time.Millisecond), + } + + type args struct { + event trySchedule + } + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "no peers", + fields: scTestParams{peers: map[string]*scPeer{}}, + args: args{event: tryEv}, + wantEvent: noOpEvent{}, + }, + { + name: "only new peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}}, + args: args{event: tryEv}, + wantEvent: noOpEvent{}, + }, + { + name: "only Removed peers", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 4, state: peerStateRemoved}}}, + args: args{event: tryEv}, + wantEvent: noOpEvent{}, + }, + { + name: "one Ready shorter peer", + fields: scTestParams{ + height: 6, + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}}, + args: args{event: tryEv}, + wantEvent: noOpEvent{}, + }, + { + name: "one Ready equal peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 4, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4}}, + args: args{event: tryEv}, + wantEvent: scBlockRequest{peerID: "P1", height: 1}, + }, + { + name: "many Ready higher peers with different number of pending requests", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady}, + "P2": {height: 5, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5}, + pending: map[int64]p2p.ID{ + 1: "P1", 2: "P1", + 3: "P2", + }, + }, + args: args{event: tryEv}, + wantEvent: scBlockRequest{peerID: "P2", height: 4}, + }, + + { + name: "many Ready higher peers with same number of pending requests", + fields: scTestParams{ + peers: map[string]*scPeer{ + "P2": {height: 8, state: peerStateReady}, + "P1": {height: 8, state: peerStateReady}, + "P3": {height: 8, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6, 7, 8}, + pending: map[int64]p2p.ID{ + 1: "P1", 2: "P1", + 3: "P3", 4: "P3", + 5: "P2", 6: "P2", + }, + }, + args: args{event: tryEv}, + wantEvent: scBlockRequest{peerID: "P1", height: 7}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleTrySchedule(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandleStatusResponse(t *testing.T) { + now := time.Now() + statusRespP1Ev := bcStatusResponse{ + time: now.Add(time.Second + time.Millisecond), + peerID: "P1", + height: 6, + } + + type args struct { + event bcStatusResponse + } + tests := []struct { + name string + fields scTestParams + args args + wantEvent Event + wantErr bool + }{ + { + name: "change height of non existing peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P2": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}, + }, + args: args{event: statusRespP1Ev}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + + { + name: "increase height of removed peer", + fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 2, state: peerStateRemoved}}}, + args: args{event: statusRespP1Ev}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + + { + name: "decrease height of single peer", + fields: scTestParams{ + height: 5, + peers: map[string]*scPeer{"P1": {height: 10, state: peerStateReady}}, + allB: []int64{5, 6, 7, 8, 9, 10}, + }, + args: args{event: statusRespP1Ev}, + wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")}, + }, + + { + name: "increase height of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 2, state: peerStateReady}}, + allB: []int64{1, 2}}, + args: args{event: statusRespP1Ev}, + wantEvent: noOpEvent{}, + }, + { + name: "noop height change of single peer", + fields: scTestParams{ + peers: map[string]*scPeer{"P1": {height: 6, state: peerStateReady}}, + allB: []int64{1, 2, 3, 4, 5, 6}}, + args: args{event: statusRespP1Ev}, + wantEvent: noOpEvent{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + sc := newTestScheduler(tt.fields) + event, err := sc.handleStatusResponse(tt.args.event) + checkScResults(t, tt.wantErr, err, tt.wantEvent, event) + }) + } +} + +func TestScHandle(t *testing.T) { + type unknownEv struct { + priorityNormal + } + + t0 := time.Now() + tick := make([]time.Time, 100) + for i := range tick { + tick[i] = t0.Add(time.Duration(i) * time.Millisecond) + } + + type args struct { + event Event + } + type scStep struct { + currentSc *scTestParams + args args + wantEvent Event + wantErr bool + wantSc *scTestParams + } + tests := []struct { + name string + steps []scStep + }{ + { + name: "unknown event", + steps: []scStep{ + { // add P1 + currentSc: &scTestParams{}, + args: args{event: unknownEv{}}, + wantEvent: scSchedulerFail{reason: fmt.Errorf("some error")}, + wantSc: &scTestParams{}, + }, + }, + }, + { + name: "single peer, sync 3 blocks", + steps: []scStep{ + { // add P1 + currentSc: &scTestParams{peers: map[string]*scPeer{}, height: 1}, + args: args{event: addNewPeer{peerID: "P1"}}, + wantEvent: noOpEvent{}, + wantSc: &scTestParams{peers: map[string]*scPeer{"P1": {height: -1, state: peerStateNew}}, height: 1}, + }, + { // set height of P1 + args: args{event: bcStatusResponse{peerID: "P1", time: tick[0], height: 3}}, + wantEvent: noOpEvent{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + height: 1, + }, + }, + { // schedule block 1 + args: args{event: trySchedule{time: tick[1]}}, + wantEvent: scBlockRequest{peerID: "P1", height: 1}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{1: "P1"}, + pendingTime: map[int64]time.Time{1: tick[1]}, + height: 1, + }, + }, + { // schedule block 2 + args: args{event: trySchedule{time: tick[2]}}, + wantEvent: scBlockRequest{peerID: "P1", height: 2}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1"}, + pendingTime: map[int64]time.Time{1: tick[1], 2: tick[2]}, + height: 1, + }, + }, + { // schedule block 3 + args: args{event: trySchedule{time: tick[3]}}, + wantEvent: scBlockRequest{peerID: "P1", height: 3}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, + pendingTime: map[int64]time.Time{1: tick[1], 2: tick[2], 3: tick[3]}, + height: 1, + }, + }, + { // block response 1 + args: args{event: bcBlockResponse{peerID: "P1", height: 1, time: tick[4], size: 100, block: makeScBlock(1)}}, + wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(1)}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[4]}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{2: "P1", 3: "P1"}, + pendingTime: map[int64]time.Time{2: tick[2], 3: tick[3]}, + received: map[int64]p2p.ID{1: "P1"}, + height: 1, + }, + }, + { // block response 2 + args: args{event: bcBlockResponse{peerID: "P1", height: 2, time: tick[5], size: 100, block: makeScBlock(2)}}, + wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(2)}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[5]}}, + allB: []int64{1, 2, 3}, + pending: map[int64]p2p.ID{3: "P1"}, + pendingTime: map[int64]time.Time{3: tick[3]}, + received: map[int64]p2p.ID{1: "P1", 2: "P1"}, + height: 1, + }, + }, + { // block response 3 + args: args{event: bcBlockResponse{peerID: "P1", height: 3, time: tick[6], size: 100, block: makeScBlock(3)}}, + wantEvent: scBlockReceived{peerID: "P1", block: makeScBlock(3)}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{1, 2, 3}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, + height: 1, + }, + }, + { // processed block 1 + args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 1}}, + wantEvent: noOpEvent{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{2, 3}, + received: map[int64]p2p.ID{2: "P1", 3: "P1"}, + height: 2, + }, + }, + { // processed block 2 + args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 2}}, + wantEvent: scFinishedEv{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{3}, + received: map[int64]p2p.ID{3: "P1"}, + height: 3, + }, + }, + }, + }, + { + name: "block verification failure", + steps: []scStep{ + { // failure processing block 1 + currentSc: &scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateReady, lastTouched: tick[6]}, + "P2": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{1, 2, 3, 4}, + received: map[int64]p2p.ID{1: "P1", 2: "P1", 3: "P1"}, + height: 1, + }, + args: args{event: pcBlockVerificationFailure{height: 1, firstPeerID: "P1", secondPeerID: "P1"}}, + wantEvent: noOpEvent{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{ + "P1": {height: 4, state: peerStateRemoved, lastTouched: tick[6]}, + "P2": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{1, 2, 3}, + received: map[int64]p2p.ID{}, + height: 1, + }, + }, + /* + { // processed block 2 + args: args{event: pcBlockProcessed{peerID: p2p.ID("P1"), height: 2}}, + wantEvent: scFinishedEv{}, + wantSc: &scTestParams{ + peers: map[string]*scPeer{"P1": {height: 3, state: peerStateReady, lastTouched: tick[6]}}, + allB: []int64{3}, + received: map[int64]p2p.ID{3: "P1"}, + height: 3, + }, + }, + + */ + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + var sc *scheduler + for i, step := range tt.steps { + // First step must always initialise the currentState as state. + if step.currentSc != nil { + sc = newTestScheduler(*step.currentSc) + } + if sc == nil { + panic("Bad (initial?) step") + } + + nextEvent, err := sc.handle(step.args.event) + assert.Equal(t, newTestScheduler(*step.wantSc), sc) + t.Logf("step %d(%v): %s", i, step.args.event, sc) + + checkScResults(t, step.wantErr, err, step.wantEvent, nextEvent) + + // Next step may use the wantedState as their currentState. + sc = newTestScheduler(*step.wantSc) + } + }) + } +}