Browse Source

blockchain/v2: make the removal of an already removed peer a noop (#5553)

also, since multiple StopPeerForError calls may be executed in parallel,
only execute StopPeerForError once

Closes #5541
pull/5620/head
Anton Kaliaev 4 years ago
parent
commit
25fafb30b5
4 changed files with 26 additions and 50 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +17
    -43
      blockchain/v2/scheduler.go
  3. +4
    -7
      blockchain/v2/scheduler_test.go
  4. +4
    -0
      p2p/switch.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -31,6 +31,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [blockchain/v2] \#5530 Fix "processed height 4541 but expected height 4540" panic (@melekes)
- [consensus/wal] Fix WAL autorepair by opening target WAL in read/write mode (@erikgrinaker)
- [block] \#5567 Fix MaxCommitSigBytes (@cmwaters)
- [blockchain/v2] \#5553 Make the removal of an already removed peer a noop (@melekes)
- [evidence] \#5574 Fix bug where node sends committed evidence to peer (@cmwaters)
- [privval] \#5583 Make `Vote`, `Proposal` & `PubKey` non-nullable in Responses (@marbar3778)
- [evidence] \#5610 Make it possible for abci evidence to be formed from tm evidence (@cmwaters)

+ 17
- 43
blockchain/v2/scheduler.go View File

@ -238,14 +238,13 @@ func (sc *scheduler) touchPeer(peerID p2p.ID, time time.Time) error {
return nil
}
func (sc *scheduler) removePeer(peerID p2p.ID) error {
func (sc *scheduler) removePeer(peerID p2p.ID) {
peer, ok := sc.peers[peerID]
if !ok {
return nil
return
}
if peer.state == peerStateRemoved {
return fmt.Errorf("tried to remove peer %s in peerStateRemoved", peerID)
return
}
for height, pendingPeerID := range sc.pendingBlocks {
@ -279,8 +278,6 @@ func (sc *scheduler) removePeer(peerID p2p.ID) error {
delete(sc.blockStates, h)
}
}
return nil
}
// check if the blockPool is running low and add new blocks in New state to be requested.
@ -309,16 +306,12 @@ func (sc *scheduler) setPeerRange(peerID p2p.ID, base int64, height int64) error
}
if height < peer.height {
if err := sc.removePeer(peerID); err != nil {
return err
}
sc.removePeer(peerID)
return fmt.Errorf("cannot move peer height lower. from %d to %d", peer.height, height)
}
if base > height {
if err := sc.removePeer(peerID); err != nil {
return err
}
sc.removePeer(peerID)
return fmt.Errorf("cannot set peer base higher than its height")
}
@ -372,15 +365,9 @@ func (sc *scheduler) setStateAtHeight(height int64, state blockState) {
sc.blockStates[height] = state
}
// CONTRACT: peer exists and in Ready state.
func (sc *scheduler) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("received block from unknown peer %s", peerID)
}
if peer.state != peerStateReady {
return fmt.Errorf("cannot receive blocks from not ready peer %s", peerID)
}
peer := sc.peers[peerID]
if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID {
return fmt.Errorf("received block %d from peer %s without being requested", height, peerID)
@ -541,12 +528,13 @@ func (peers PeerByID) Swap(i, j int) {
func (sc *scheduler) handleBlockResponse(event bcBlockResponse) (Event, error) {
err := sc.touchPeer(event.peerID, event.time)
if err != nil {
return scPeerError{peerID: event.peerID, reason: err}, nil
// peer does not exist OR not ready
return noOp, nil
}
err = sc.markReceived(event.peerID, event.block.Height, event.size, event.time)
if err != nil {
_ = sc.removePeer(event.peerID)
sc.removePeer(event.peerID)
return scPeerError{peerID: event.peerID, reason: err}, nil
}
@ -561,7 +549,7 @@ func (sc *scheduler) handleNoBlockResponse(event bcNoBlockResponse) (Event, erro
}
// The peer may have been just removed due to errors, low speed or timeouts.
_ = sc.removePeer(event.peerID)
sc.removePeer(event.peerID)
return scPeerError{peerID: event.peerID,
reason: fmt.Errorf("peer %v with base %d height %d claims no block for %d",
@ -588,13 +576,10 @@ func (sc *scheduler) handleBlockProcessed(event pcBlockProcessed) (Event, error)
// Handles an error from the processor. The processor had already cleaned the blocks from
// the peers included in this event. Just attempt to remove the peers.
func (sc *scheduler) handleBlockProcessError(event pcBlockVerificationFailure) (Event, error) {
if len(sc.peers) == 0 {
return noOp, nil
}
// The peers may have been just removed due to errors, low speed or timeouts.
_ = sc.removePeer(event.firstPeerID)
sc.removePeer(event.firstPeerID)
if event.firstPeerID != event.secondPeerID {
_ = sc.removePeer(event.secondPeerID)
sc.removePeer(event.secondPeerID)
}
if sc.allBlocksProcessed() {
@ -610,12 +595,8 @@ func (sc *scheduler) handleAddNewPeer(event bcAddNewPeer) (Event, error) {
}
func (sc *scheduler) handleRemovePeer(event bcRemovePeer) (Event, error) {
err := sc.removePeer(event.peerID)
if err != nil {
// XXX - It is possible that the removePeer fails here for legitimate reasons
// for example if a peer timeout or error was handled just before this.
return scSchedulerFail{reason: err}, nil
}
sc.removePeer(event.peerID)
if sc.allBlocksProcessed() {
return scFinishedEv{reason: "removed peer"}, nil
}
@ -633,9 +614,7 @@ func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) {
// from that peer within sc.peerTimeout. Remove the peer. This is to ensure that a peer
// will be timed out even if it sends blocks at higher heights but prevents progress by
// not sending the block at current height.
if err := sc.removePeer(sc.pendingBlocks[sc.height]); err != nil {
return nil, err
}
sc.removePeer(sc.pendingBlocks[sc.height])
}
prunablePeers := sc.prunablePeers(sc.peerTimeout, sc.minRecvRate, event.time)
@ -643,11 +622,7 @@ func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) {
return noOp, nil
}
for _, peerID := range prunablePeers {
err := sc.removePeer(peerID)
if err != nil {
// Should never happen as prunablePeers() returns only existing peers in Ready state.
panic("scheduler data corruption")
}
sc.removePeer(peerID)
}
// If all blocks are processed we should finish.
@ -656,7 +631,6 @@ func (sc *scheduler) handleTryPrunePeer(event rTryPrunePeer) (Event, error) {
}
return scPeersPruned{peers: prunablePeers}, nil
}
func (sc *scheduler) handleResetState(event bcResetState) (Event, error) {


+ 4
- 7
blockchain/v2/scheduler_test.go View File

@ -418,7 +418,6 @@ func TestScRemovePeer(t *testing.T) {
"P1": {height: 10, state: peerStateRemoved},
"P2": {height: 11, state: peerStateReady}},
allB: []int64{8, 9, 10, 11}},
wantErr: true,
},
{
name: "remove Ready peer with blocks requested",
@ -492,9 +491,7 @@ func TestScRemovePeer(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
sc := newTestScheduler(tt.fields)
if err := sc.removePeer(tt.args.peerID); (err != nil) != tt.wantErr {
t.Errorf("removePeer() wantErr %v, error = %v", tt.wantErr, err)
}
sc.removePeer(tt.args.peerID)
wantSc := newTestScheduler(tt.wantFields)
assert.Equal(t, wantSc, sc, "wanted peers %v, got %v", wantSc.peers, sc.peers)
})
@ -1413,13 +1410,13 @@ func TestScHandleBlockResponse(t *testing.T) {
name: "empty scheduler",
fields: scTestParams{},
args: args{event: block6FromP1},
wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")},
wantEvent: noOpEvent{},
},
{
name: "block from removed peer",
fields: scTestParams{peers: map[string]*scPeer{"P1": {height: 8, state: peerStateRemoved}}},
args: args{event: block6FromP1},
wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")},
wantEvent: noOpEvent{},
},
{
name: "block we haven't asked for",
@ -1438,7 +1435,7 @@ func TestScHandleBlockResponse(t *testing.T) {
pendingTime: map[int64]time.Time{6: now},
},
args: args{event: block6FromP1},
wantEvent: scPeerError{peerID: "P1", reason: fmt.Errorf("some error")},
wantEvent: noOpEvent{},
},
{
name: "block with bad timestamp",


+ 4
- 0
p2p/switch.go View File

@ -322,6 +322,10 @@ func (sw *Switch) Peers() IPeerSet {
// If the peer is persistent, it will attempt to reconnect.
// TODO: make record depending on reason.
func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
if !peer.IsRunning() {
return
}
sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
sw.stopAndRemovePeer(peer, reason)


Loading…
Cancel
Save