* Add Schedule:

  The schedule is a data structure used to schedule block requests across the best available set of peers so that fast-sync completes as quickly and efficiently as possible.

* Add some doc strings
* Fix golangci warnings
* Remove globals from tests
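At a high level, a scheduler is expected to drive the schedule through a simple lifecycle: learn about peers, request blocks from them, record the responses, and mark each block processed once it has been handled. The sketch below is illustrative only; the helper function, its arguments, and the fixed one-second delay are assumptions for the example and are not part of this change.

// runOnce is a hypothetical helper showing one pass of the scheduling loop.
// peerID, peerHeight, and blockSize are assumed inputs supplied by the caller.
func runOnce(sc *schedule, peerID p2p.ID, peerHeight int64, blockSize int64) error {
	if err := sc.addPeer(peerID); err != nil {
		return err // duplicate peer
	}
	if err := sc.setPeerHeight(peerID, peerHeight); err != nil {
		return err // removed peer or a height regression
	}

	// request the lowest block still in blockStateNew from a peer that has it
	height := sc.minHeight()
	candidates := sc.getPeersAtHeight(height)
	if len(candidates) == 0 {
		return fmt.Errorf("no peer has height %d", height)
	}
	peer := sc.selectPeer(candidates)
	if err := sc.markPending(peer.peerID, height, time.Now()); err != nil {
		return err
	}

	// pretend the block arrives one second later, then record and process it
	if err := sc.markReceived(peer.peerID, height, blockSize, time.Now().Add(time.Second)); err != nil {
		return err
	}
	return sc.markProcessed(height)
}

// fast-sync for this schedule is complete once sc.allBlocksProcessed() returns true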
@@ -0,0 +1,387 @@
// nolint:unused
package v2

import (
	"fmt"
	"math"
	"math/rand"
	"time"

	"github.com/tendermint/tendermint/p2p"
)

type Event interface{}

type blockState int

const (
	blockStateUnknown blockState = iota
	blockStateNew
	blockStatePending
	blockStateReceived
	blockStateProcessed
)
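// A block normally advances through these states in order:
//
//	blockStateNew      -> blockStatePending   via markPending (a request was sent to a peer)
//	blockStatePending  -> blockStateReceived  via markReceived (the requested peer delivered the block)
//	blockStateReceived -> blockStateProcessed via markProcessed (the block has been processed)
//
// removePeer returns any blocks that were pending from, or received from, the
// removed peer back to blockStateNew so they can be rescheduled.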
func (e blockState) String() string {
	switch e {
	case blockStateUnknown:
		return "Unknown"
	case blockStateNew:
		return "New"
	case blockStatePending:
		return "Pending"
	case blockStateReceived:
		return "Received"
	case blockStateProcessed:
		return "Processed"
	default:
		return fmt.Sprintf("unknown blockState: %d", e)
	}
}
type peerState int

const (
	peerStateNew peerState = iota
	peerStateReady
	peerStateRemoved
)
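// A peer starts in peerStateNew, moves to peerStateReady once it reports a
// height via setPeerHeight, and ends in peerStateRemoved after removePeer.
// Removed peers stay in the peers map so that the same peerID cannot be
// re-added by addPeer.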
func (e peerState) String() string {
	switch e {
	case peerStateNew:
		return "New"
	case peerStateReady:
		return "Ready"
	case peerStateRemoved:
		return "Removed"
	default:
		return fmt.Sprintf("unknown peerState: %d", e)
	}
}

// scPeer tracks the schedule-specific view of a single peer.
type scPeer struct {
	peerID      p2p.ID
	state       peerState
	height      int64     // highest height the peer has reported; -1 until known
	lastTouched time.Time // last time the peer was touched (see touchPeer)
	lastRate    int64     // bytes/second measured for the peer's last received block (see markReceived)
}

func newScPeer(peerID p2p.ID) *scPeer {
	return &scPeer{
		peerID:      peerID,
		state:       peerStateNew,
		height:      -1,
		lastTouched: time.Time{},
	}
}
// The schedule is a composite data structure which allows a scheduler to keep
// track of which blocks have been scheduled into which state.
type schedule struct {
	initHeight int64

	// blockStates maps a height to the current blockState of the block at that height
	blockStates map[int64]blockState

	// peers maps a peerID to the schedule-specific peer struct `scPeer`, used to
	// keep track of peer-specific state
	peers map[p2p.ID]*scPeer

	// pendingBlocks maps a height to the peer we are waiting on for a response
	pendingBlocks map[int64]p2p.ID

	// pendingTime records the time at which a block was put into blockStatePending
	pendingTime map[int64]time.Time

	// receivedBlocks maps a height to the peerID of the peer which put the block
	// into blockStateReceived
	receivedBlocks map[int64]p2p.ID
}

func newSchedule(initHeight int64) *schedule {
	sc := schedule{
		initHeight:     initHeight,
		blockStates:    make(map[int64]blockState),
		peers:          make(map[p2p.ID]*scPeer),
		pendingBlocks:  make(map[int64]p2p.ID),
		pendingTime:    make(map[int64]time.Time),
		receivedBlocks: make(map[int64]p2p.ID),
	}

	sc.setStateAtHeight(initHeight, blockStateNew)

	return &sc
}
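// For example, newSchedule(5) starts with exactly one height, 5, in blockStateNew:
//
//	sc.getStateAtHeight(4) // blockStateProcessed: heights below initHeight count as done
//	sc.getStateAtHeight(5) // blockStateNew
//	sc.getStateAtHeight(6) // blockStateUnknown, until a peer reports a height >= 6
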
func (sc *schedule) addPeer(peerID p2p.ID) error {
	if _, ok := sc.peers[peerID]; ok {
		return fmt.Errorf("Cannot add duplicate peer %s", peerID)
	}
	sc.peers[peerID] = newScPeer(peerID)
	return nil
}

func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error {
	peer, ok := sc.peers[peerID]
	if !ok {
		return fmt.Errorf("Couldn't find peer %s", peerID)
	}

	if peer.state == peerStateRemoved {
		return fmt.Errorf("Tried to touch peer in peerStateRemoved")
	}

	peer.lastTouched = time

	return nil
}

func (sc *schedule) removePeer(peerID p2p.ID) error {
	peer, ok := sc.peers[peerID]
	if !ok {
		return fmt.Errorf("Couldn't find peer %s", peerID)
	}

	if peer.state == peerStateRemoved {
		return fmt.Errorf("Tried to remove peer %s in peerStateRemoved", peerID)
	}

	for height, pendingPeerID := range sc.pendingBlocks {
		if pendingPeerID == peerID {
			sc.setStateAtHeight(height, blockStateNew)
			delete(sc.pendingTime, height)
			delete(sc.pendingBlocks, height)
		}
	}

	for height, rcvPeerID := range sc.receivedBlocks {
		if rcvPeerID == peerID {
			sc.setStateAtHeight(height, blockStateNew)
			delete(sc.receivedBlocks, height)
		}
	}

	peer.state = peerStateRemoved

	return nil
}
func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
	peer, ok := sc.peers[peerID]
	if !ok {
		return fmt.Errorf("Can't find peer %s", peerID)
	}

	if peer.state == peerStateRemoved {
		return fmt.Errorf("Cannot set peer height for a peer in peerStateRemoved")
	}

	if height < peer.height {
		return fmt.Errorf("Cannot move peer height lower, from %d to %d", peer.height, height)
	}

	peer.height = height
	peer.state = peerStateReady
	for i := sc.minHeight(); i <= height; i++ {
		if sc.getStateAtHeight(i) == blockStateUnknown {
			sc.setStateAtHeight(i, blockStateNew)
		}
	}

	return nil
}
func (sc *schedule) getStateAtHeight(height int64) blockState {
	if height < sc.initHeight {
		return blockStateProcessed
	} else if state, ok := sc.blockStates[height]; ok {
		return state
	} else {
		return blockStateUnknown
	}
}

func (sc *schedule) getPeersAtHeight(height int64) []*scPeer {
	peers := []*scPeer{}
	for _, peer := range sc.peers {
		if peer.height >= height {
			peers = append(peers, peer)
		}
	}
	return peers
}

func (sc *schedule) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID {
	peers := []p2p.ID{}
	for _, peer := range sc.peers {
		if now.Sub(peer.lastTouched) > duration {
			peers = append(peers, peer.peerID)
		}
	}
	return peers
}

func (sc *schedule) peersSlowerThan(minSpeed int64) []p2p.ID {
	peers := []p2p.ID{}
	for _, peer := range sc.peers {
		if peer.lastRate < minSpeed {
			peers = append(peers, peer.peerID)
		}
	}
	return peers
}

func (sc *schedule) setStateAtHeight(height int64, state blockState) {
	sc.blockStates[height] = state
}
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
	peer, ok := sc.peers[peerID]
	if !ok {
		return fmt.Errorf("Can't find peer %s", peerID)
	}

	if peer.state == peerStateRemoved {
		return fmt.Errorf("Cannot receive blocks from removed peer %s", peerID)
	}

	if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID {
		return fmt.Errorf("Received block %d from peer %s without being requested", height, peerID)
	}

	pendingTime, ok := sc.pendingTime[height]
	if !ok || now.Sub(pendingTime) <= 0 {
		return fmt.Errorf("Clock error. Block %d received at %s but requested at %s",
			height, now, pendingTime)
	}

	// Record the receive rate in bytes/second. Use floating-point seconds so a
	// sub-second response does not truncate to zero and divide by zero.
	peer.lastRate = int64(float64(size) / now.Sub(pendingTime).Seconds())

	sc.setStateAtHeight(height, blockStateReceived)
	delete(sc.pendingBlocks, height)
	delete(sc.pendingTime, height)

	sc.receivedBlocks[height] = peerID

	return nil
}
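// markReceived example: a 1024-byte block received two seconds after it was
// requested gives the delivering peer a lastRate of 512 bytes/second, which is
// the value peersSlowerThan and prunablePeers compare against their
// minimum-rate thresholds.
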
func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error {
	peer, ok := sc.peers[peerID]
	if !ok {
		return fmt.Errorf("Can't find peer %s", peerID)
	}

	state := sc.getStateAtHeight(height)
	if state != blockStateNew {
		return fmt.Errorf("Block %d should be in blockStateNew but was %s", height, state)
	}

	if peer.state != peerStateReady {
		return fmt.Errorf("Cannot schedule %d from %s in %s", height, peerID, peer.state)
	}

	if height > peer.height {
		return fmt.Errorf("Cannot request height %d from peer %s who is at height %d",
			height, peerID, peer.height)
	}

	sc.setStateAtHeight(height, blockStatePending)
	sc.pendingBlocks[height] = peerID
	// XXX: to make this more accurate we can introduce a message from
	// the IO routine which indicates the time the request was put on the wire
	sc.pendingTime[height] = time

	return nil
}
func (sc *schedule) markProcessed(height int64) error {
	state := sc.getStateAtHeight(height)
	if state != blockStateReceived {
		return fmt.Errorf("Can't mark height %d processed from block state %s", height, state)
	}

	delete(sc.receivedBlocks, height)

	sc.setStateAtHeight(height, blockStateProcessed)

	return nil
}

// allBlocksProcessed returns true if all blocks are in blockStateProcessed and
// hence determines whether the schedule has been completed.
func (sc *schedule) allBlocksProcessed() bool {
	for _, state := range sc.blockStates {
		if state != blockStateProcessed {
			return false
		}
	}
	return true
}
// maxHeight returns the highest height in blockStateNew, or 0 if there is none.
func (sc *schedule) maxHeight() int64 {
	var max int64 = 0
	for height, state := range sc.blockStates {
		if state == blockStateNew && height > max {
			max = height
		}
	}
	return max
}

// minHeight returns the lowest height in blockStateNew, or math.MaxInt64 if there is none.
func (sc *schedule) minHeight() int64 {
	var min int64 = math.MaxInt64
	for height, state := range sc.blockStates {
		if state == blockStateNew && height < min {
			min = height
		}
	}
	return min
}
// pendingFrom returns the heights currently pending from peerID.
func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 {
	heights := []int64{}
	for height, pendingPeerID := range sc.pendingBlocks {
		if pendingPeerID == peerID {
			heights = append(heights, height)
		}
	}
	return heights
}

// selectPeer picks a peer at random from the given set; it panics if the set is empty.
// FIXME: properPeerSelector
func (sc *schedule) selectPeer(peers []*scPeer) *scPeer {
	s := rand.NewSource(time.Now().Unix())
	r := rand.New(s)
	return peers[r.Intn(len(peers))]
}

// XXX: this duplicates the logic of peersInactiveSince and peersSlowerThan
func (sc *schedule) prunablePeers(peerTimeout time.Duration, minRecvRate int64, now time.Time) []p2p.ID {
	prunable := []p2p.ID{}
	for peerID, peer := range sc.peers {
		if now.Sub(peer.lastTouched) > peerTimeout || peer.lastRate < minRecvRate {
			prunable = append(prunable, peerID)
		}
	}
	return prunable
}

// numBlockInState returns the number of blocks currently in targetState.
func (sc *schedule) numBlockInState(targetState blockState) uint32 {
	var num uint32 = 0
	for _, state := range sc.blockStates {
		if state == targetState {
			num++
		}
	}
	return num
}
@@ -0,0 +1,272 @@
package v2

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	"github.com/tendermint/tendermint/p2p"
)
func TestScheduleInit(t *testing.T) {
	var (
		initHeight int64 = 5
		sc               = newSchedule(initHeight)
	)

	assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight))
	assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight-1))
	assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1))
}

func TestAddPeer(t *testing.T) {
	var (
		initHeight int64  = 5
		peerID     p2p.ID = "1"
		peerIDTwo  p2p.ID = "2"
		sc                = newSchedule(initHeight)
	)

	assert.NoError(t, sc.addPeer(peerID))
	assert.NoError(t, sc.addPeer(peerIDTwo))
	assert.Error(t, sc.addPeer(peerID))
}
func TestTouchPeer(t *testing.T) {
	var (
		initHeight int64  = 5
		peerID     p2p.ID = "1"
		sc                = newSchedule(initHeight)
		now               = time.Now()
	)

	assert.Error(t, sc.touchPeer(peerID, now),
		"Touching an unknown peer should return an error")

	assert.NoError(t, sc.addPeer(peerID),
		"Adding a peer should return no error")

	assert.NoError(t, sc.touchPeer(peerID, now),
		"Touching a peer should return no error")

	threshold := 10 * time.Second
	assert.Empty(t, sc.peersInactiveSince(threshold, now.Add(9*time.Second)),
		"Expected no peers to have been inactive for more than the 10 second threshold")
	assert.Containsf(t, sc.peersInactiveSince(threshold, now.Add(11*time.Second)), peerID,
		"Expected peer %s to have been inactive for more than the 10 second threshold", peerID)
}
func TestPeerHeight(t *testing.T) {
	var (
		initHeight int64  = 5
		peerID     p2p.ID = "1"
		peerHeight int64  = 20
		sc                = newSchedule(initHeight)
	)

	assert.NoError(t, sc.addPeer(peerID),
		"Adding a peer should return no error")
	assert.NoError(t, sc.setPeerHeight(peerID, peerHeight))
	for i := initHeight; i <= peerHeight; i++ {
		assert.Equal(t, blockStateNew, sc.getStateAtHeight(i),
			"Expected all blocks to be in blockStateNew")
		peerIDs := []p2p.ID{}
		for _, peer := range sc.getPeersAtHeight(i) {
			peerIDs = append(peerIDs, peer.peerID)
		}

		assert.Containsf(t, peerIDs, peerID,
			"Expected %s to have block %d", peerID, i)
	}
}
func TestTransitionPending(t *testing.T) {
	var (
		initHeight int64  = 5
		peerID     p2p.ID = "1"
		peerIDTwo  p2p.ID = "2"
		peerHeight int64  = 20
		sc                = newSchedule(initHeight)
		now               = time.Now()
	)

	assert.NoError(t, sc.addPeer(peerID),
		"Adding a peer should return no error")
	assert.NoError(t, sc.addPeer(peerIDTwo),
		"Adding a peer should return no error")

	assert.Error(t, sc.markPending(peerID, peerHeight, now),
		"Expected scheduling a block from a peer in peerStateNew to fail")

	assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
		"Expected setPeerHeight to return no error")
	assert.NoError(t, sc.setPeerHeight(peerIDTwo, peerHeight),
		"Expected setPeerHeight to return no error")

	assert.NoError(t, sc.markPending(peerID, peerHeight, now),
		"Expected markPending on a new block to succeed")
	assert.Error(t, sc.markPending(peerIDTwo, peerHeight, now),
		"Expected markPending by a second peer to fail")
	assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight),
		"Expected the block to be in blockStatePending")

	assert.NoError(t, sc.removePeer(peerID),
		"Expected removePeer to return no error")
	assert.Equal(t, blockStateNew, sc.getStateAtHeight(peerHeight),
		"Expected the block to be in blockStateNew")

	assert.Error(t, sc.markPending(peerID, peerHeight, now),
		"Expected markPending from a removed peer to fail")

	assert.NoError(t, sc.markPending(peerIDTwo, peerHeight, now),
		"Expected markPending from a ready peer to succeed")
	assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight),
		"Expected the block to be in blockStatePending")
}
func TestTransitionReceived(t *testing.T) {
	var (
		initHeight int64  = 5
		peerID     p2p.ID = "1"
		peerIDTwo  p2p.ID = "2"
		peerHeight int64  = 20
		blockSize  int64  = 1024
		sc                = newSchedule(initHeight)
		now               = time.Now()
		receivedAt        = now.Add(1 * time.Second)
	)

	assert.NoError(t, sc.addPeer(peerID),
		"Expected adding peer %s to succeed", peerID)
	assert.NoError(t, sc.addPeer(peerIDTwo),
		"Expected adding peer %s to succeed", peerIDTwo)
	assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
		"Expected setPeerHeight to return no error")
	assert.NoErrorf(t, sc.setPeerHeight(peerIDTwo, peerHeight),
		"Expected setPeerHeight on %s to %d to succeed", peerIDTwo, peerHeight)
	assert.NoError(t, sc.markPending(peerID, initHeight, now),
		"Expected markPending on a new block to succeed")

	assert.Error(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt),
		"Expected markReceived from a non-requesting peer to fail")

	assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
		"Expected markReceived on a pending block to succeed")

	assert.Error(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
		"Expected markReceived on an already received block to fail")

	assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight),
		"Expected block %d to be in blockStateReceived", initHeight)

	assert.NoErrorf(t, sc.removePeer(peerID),
		"Expected removePeer removing %s to succeed", peerID)

	assert.Equalf(t, blockStateNew, sc.getStateAtHeight(initHeight),
		"Expected block %d to be in blockStateNew", initHeight)

	assert.NoErrorf(t, sc.markPending(peerIDTwo, initHeight, now),
		"Expected markPending %d from %s to succeed", initHeight, peerIDTwo)
	assert.NoErrorf(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt),
		"Expected markReceived %d from %s to succeed", initHeight, peerIDTwo)
	assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight),
		"Expected block %d to be in blockStateReceived", initHeight)
}
func TestTransitionProcessed(t *testing.T) {
	var (
		initHeight int64  = 5
		peerID     p2p.ID = "1"
		peerHeight int64  = 20
		blockSize  int64  = 1024
		sc                = newSchedule(initHeight)
		now               = time.Now()
		receivedAt        = now.Add(1 * time.Second)
	)

	assert.NoError(t, sc.addPeer(peerID),
		"Expected adding peer %s to succeed", peerID)
	assert.NoErrorf(t, sc.setPeerHeight(peerID, peerHeight),
		"Expected setPeerHeight on %s to %d to succeed", peerID, peerHeight)
	assert.NoError(t, sc.markPending(peerID, initHeight, now),
		"Expected markPending on a new block to succeed")
	assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
		"Expected markReceived on a pending block to succeed")

	assert.Error(t, sc.markProcessed(initHeight+1),
		"Expected marking %d as processed to fail", initHeight+1)
	assert.NoError(t, sc.markProcessed(initHeight),
		"Expected marking %d as processed to succeed", initHeight)

	assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight),
		"Expected block %d to be in blockStateProcessed", initHeight)

	assert.NoError(t, sc.removePeer(peerID),
		"Expected removing peer %s to succeed", peerID)

	assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight),
		"Expected block %d to remain in blockStateProcessed", initHeight)
}
func TestMinMaxHeight(t *testing.T) {
	var (
		initHeight int64  = 5
		peerID     p2p.ID = "1"
		peerHeight int64  = 20
		sc                = newSchedule(initHeight)
		now               = time.Now()
	)

	assert.Equal(t, initHeight, sc.minHeight(),
		"Expected min height to be the initialized height")

	assert.Equal(t, initHeight, sc.maxHeight(),
		"Expected max height to be the initialized height")

	assert.NoError(t, sc.addPeer(peerID),
		"Adding a peer should return no error")

	assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
		"Expected setPeerHeight to return no error")

	assert.Equal(t, peerHeight, sc.maxHeight(),
		"Expected max height to increase to peerHeight")

	assert.NoError(t, sc.markPending(peerID, initHeight, now.Add(1*time.Second)),
		"Expected marking initHeight as pending to return no error")

	assert.Equal(t, initHeight+1, sc.minHeight(),
		"Expected marking initHeight as pending to move minHeight forward")
}
func TestPeersSlowerThan(t *testing.T) {
	var (
		initHeight int64  = 5
		peerID     p2p.ID = "1"
		peerHeight int64  = 20
		blockSize  int64  = 1024
		sc                = newSchedule(initHeight)
		now               = time.Now()
		receivedAt        = now.Add(1 * time.Second)
	)

	assert.NoError(t, sc.addPeer(peerID),
		"Adding a peer should return no error")

	assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
		"Expected setPeerHeight to return no error")

	assert.NoError(t, sc.markPending(peerID, peerHeight, now),
		"Expected markPending to return no error")

	assert.NoError(t, sc.markReceived(peerID, peerHeight, blockSize, receivedAt),
		"Expected markReceived to return no error")

	assert.Empty(t, sc.peersSlowerThan(blockSize-1),
		"Expected no peers to be slower than blockSize-1 bytes/sec")

	assert.Containsf(t, sc.peersSlowerThan(blockSize+1), peerID,
		"Expected %s to be slower than blockSize+1 bytes/sec", peerID)
}