|
|
@ -238,14 +238,13 @@ func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error { |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (sc *scheduler) removePeer(peerID p2p.ID) error { |
|
|
|
func (sc *scheduler) removePeer(peerID p2p.ID) { |
|
|
|
peer, ok := sc.peers[peerID] |
|
|
|
if !ok { |
|
|
|
return nil |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
if peer.state == peerStateRemoved { |
|
|
|
return fmt.Errorf("tried to remove peer %s in peerStateRemoved", peerID) |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
for height, pendingPeerID := range sc.pendingBlocks { |
|
|
@ -279,8 +278,6 @@ func (sc *scheduler) removePeer(peerID p2p.ID) error { |
|
|
|
delete(sc.blockStates, h) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// check if the blockPool is running low and add new blocks in New state to be requested.
|
|
|
@ -309,16 +306,12 @@ func (sc *scheduler) setPeerRange(peerID p2p.ID, base int64, height int64) error |
|
|
|
} |
|
|
|
|
|
|
|
if height < peer.height { |
|
|
|
if err := sc.removePeer(peerID); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
sc.removePeer(peerID) |
|
|
|
return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height) |
|
|
|
} |
|
|
|
|
|
|
|
if base > height { |
|
|
|
if err := sc.removePeer(peerID); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
sc.removePeer(peerID) |
|
|
|
return fmt.Errorf("cannot set peer base higher than its height") |
|
|
|
} |
|
|
|
|
|
|
@ -372,15 +365,9 @@ func (sc *scheduler) setStateAtHeight(height int64, state blockState) { |
|
|
|
sc.blockStates[height] = state |
|
|
|
} |
|
|
|
|
|
|
|
// CONTRACT: peer exists and in Ready state.
|
|
|
|
func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error { |
|
|
|
peer, ok := sc.peers[peerID] |
|
|
|
if !ok { |
|
|
|
return fmt.Errorf("received block from unknown peer %s", peerID) |
|
|
|
} |
|
|
|
|
|
|
|
if peer.state != peerStateReady { |
|
|
|
return fmt.Errorf("cannot receive blocks from not ready peer %s", peerID) |
|
|
|
} |
|
|
|
peer := sc.peers[peerID] |
|
|
|
|
|
|
|
if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID { |
|
|
|
return fmt.Errorf("received block %d from peer %s without being requested", height, peerID) |
|
|
@ -541,12 +528,13 @@ func (peers PeerByID) Swap(i, j int) { |
|
|
|
func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) { |
|
|
|
err := sc.touchPeer(event.peerID, event.time) |
|
|
|
if err != nil { |
|
|
|
return scPeerError{peerID: event.peerID, reason: err}, nil |
|
|
|
// peer does not exist OR not ready
|
|
|
|
return noOp, nil |
|
|
|
} |
|
|
|
|
|
|
|
err = sc.markReceived(event.peerID, event.block.Height, event.size, event.time) |
|
|
|
if err != nil { |
|
|
|
_ = sc.removePeer(event.peerID) |
|
|
|
sc.removePeer(event.peerID) |
|
|
|
return scPeerError{peerID: event.peerID, reason: err}, nil |
|
|
|
} |
|
|
|
|
|
|
@ -561,7 +549,7 @@ func (sc *scheduler) handleNoBlockResponse(event bcNoBlockResponse) (Event, erro |
|
|
|
} |
|
|
|
|
|
|
|
// The peer may have been just removed due to errors, low speed or timeouts.
|
|
|
|
_ = sc.removePeer(event.peerID) |
|
|
|
sc.removePeer(event.peerID) |
|
|
|
|
|
|
|
return scPeerError{peerID: event.peerID, |
|
|
|
reason: fmt.Errorf("peer %v with base %d height %d claims no block for %d", |
|
|
@ -588,13 +576,10 @@ func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error) |
|
|
|
// Handles an error from the processor. The processor had already cleaned the blocks from
|
|
|
|
// the peers included in this event. Just attempt to remove the peers.
|
|
|
|
func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) (Event, error) { |
|
|
|
if len(sc.peers) == 0 { |
|
|
|
return noOp, nil |
|
|
|
} |
|
|
|
// The peers may have been just removed due to errors, low speed or timeouts.
|
|
|
|
_ = sc.removePeer(event.firstPeerID) |
|
|
|
sc.removePeer(event.firstPeerID) |
|
|
|
if event.firstPeerID != event.secondPeerID { |
|
|
|
_ = sc.removePeer(event.secondPeerID) |
|
|
|
sc.removePeer(event.secondPeerID) |
|
|
|
} |
|
|
|
|
|
|
|
if sc.allBlocksProcessed() { |
|
|
@ -610,12 +595,8 @@ func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) { |
|
|
|
} |
|
|
|
|
|
|
|
func (sc *scheduler) handleRemovePeer(event bcRemovePeer) (Event, error) { |
|
|
|
err := sc.removePeer(event.peerID) |
|
|
|
if err != nil { |
|
|
|
// XXX - It is possible that the removePeer fails here for legitimate reasons
|
|
|
|
// for example if a peer timeout or error was handled just before this.
|
|
|
|
return scSchedulerFail{reason: err}, nil |
|
|
|
} |
|
|
|
sc.removePeer(event.peerID) |
|
|
|
|
|
|
|
if sc.allBlocksProcessed() { |
|
|
|
return scFinishedEv{reason: "removed peer"}, nil |
|
|
|
} |
|
|
@ -633,9 +614,7 @@ func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) { |
|
|
|
// from that peer within sc.peerTimeout. Remove the peer. This is to ensure that a peer
|
|
|
|
// will be timed out even if it sends blocks at higher heights but prevents progress by
|
|
|
|
// not sending the block at current height.
|
|
|
|
if err := sc.removePeer(sc.pendingBlocks[sc.height]); err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
|
sc.removePeer(sc.pendingBlocks[sc.height]) |
|
|
|
} |
|
|
|
|
|
|
|
prunablePeers := sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, event.time) |
|
|
@ -643,11 +622,7 @@ func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) { |
|
|
|
return noOp, nil |
|
|
|
} |
|
|
|
for _, peerID := range prunablePeers { |
|
|
|
err := sc.removePeer(peerID) |
|
|
|
if err != nil { |
|
|
|
// Should never happen as prunablePeers() returns only existing peers in Ready state.
|
|
|
|
panic("scheduler data corruption") |
|
|
|
} |
|
|
|
sc.removePeer(peerID) |
|
|
|
} |
|
|
|
|
|
|
|
// If all blocks are processed we should finish.
|
|
|
@ -656,7 +631,6 @@ func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) { |
|
|
|
} |
|
|
|
|
|
|
|
return scPeersPruned{peers: prunablePeers}, nil |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
func (sc *scheduler) handleResetState(event bcResetState) (Event, error) { |
|
|
|