
blockchain: dismiss request channel delay (#3459)

Fixes #3457

The issue is that writing a BlockRequest into the requestsCh channel also starts a timer that stops the peer 15 seconds later if no block has been received. But popping a BlockRequest from requestsCh and sending it out may be delayed by more than 15 seconds, so the peer ends up being stopped with the "send nothing to us" error.
Extracting the requestsCh handling into its own goroutine ensures that every BlockRequest is handled promptly.
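
Roughly, the failure mode and the fix can be sketched like this (a minimal, self-contained Go sketch with made-up names and a shortened timeout, not the actual tendermint code): the pool starts its timer as soon as it schedules a request, so a busy reactor loop can miss the deadline unless a dedicated goroutine drains requestsCh.

package main

import (
    "fmt"
    "time"
)

type blockRequest struct{ Height int64 }

func main() {
    requestsCh := make(chan blockRequest, 1)
    deadline := time.After(150 * time.Millisecond) // stands in for the pool's 15s timer

    // The pool schedules a request and starts the timer at the same time.
    requestsCh <- blockRequest{Height: 42}

    // The fix: a dedicated goroutine drains requestsCh, so the request goes
    // out promptly no matter how busy the main select loop is.
    go func() {
        for req := range requestsCh {
            fmt.Println("sending block request for height", req.Height)
        }
    }()

    // Meanwhile the main loop is busy with other channel handlers (block
    // processing, status updates) for longer than the timeout. With the old
    // single select loop, the request would still be sitting in requestsCh
    // when the timer fires, and the peer would be stopped.
    time.Sleep(300 * time.Millisecond)
    <-deadline
    fmt.Println("timer fired; the request was already sent by the goroutine")
}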

Instead of the requestsCh handling, we should probably pull the didProcessCh handling into a separate goroutine, since that is the one "starving" the other channel handlers. I believe that, the way it is right now, we still have issues with high delays in errorsCh handling, which might cause us to send requests to invalid/disconnected peers.
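
For illustration only, a rough sketch of that alternative (hypothetical names and placeholder work, not a patch): the didProcessCh-driven block processing runs in its own goroutine, so the main loop stays free to serve requestsCh and errorsCh without delay.

package main

import "time"

func main() {
    requestsCh := make(chan int64, 10)
    errorsCh := make(chan error, 10)
    didProcessCh := make(chan struct{}, 1)
    quit := make(chan struct{})

    // Heavy work isolated: only block processing lives here, so it cannot
    // starve the request/error handlers below.
    go func() {
        for {
            select {
            case <-quit:
                return
            case <-didProcessCh:
                time.Sleep(50 * time.Millisecond) // stands in for verifying and applying a block
            }
        }
    }()

    // The other loop reacts to requests and peer errors promptly.
    go func() {
        for {
            select {
            case <-quit:
                return
            case height := <-requestsCh:
                _ = height // send the block request to the peer
            case err := <-errorsCh:
                _ = err // stop the faulty peer
            }
        }
    }()

    didProcessCh <- struct{}{}
    requestsCh <- 7
    time.Sleep(100 * time.Millisecond)
    close(quit)
}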
Branch: pull/3580/head
Authored by zjubfd 6 years ago, committed by Anton Kaliaev
Parent commit: 439312b9c0
3 changed files with 33 additions and 27 deletions:
  1. blockchain/reactor.go (+31 −24)
  2. libs/common/throttle_timer_test.go (+1 −0)
  3. libs/db/mem_batch.go (+1 −3)

blockchain/reactor.go (+31 −24)

@@ -228,32 +228,40 @@ func (bcR *BlockchainReactor) poolRoutine() {
 	didProcessCh := make(chan struct{}, 1)

+	go func() {
+		for {
+			select {
+			case <-bcR.Quit():
+				return
+			case <-bcR.pool.Quit():
+				return
+			case request := <-bcR.requestsCh:
+				peer := bcR.Switch.Peers().Get(request.PeerID)
+				if peer == nil {
+					continue
+				}
+				msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{request.Height})
+				queued := peer.TrySend(BlockchainChannel, msgBytes)
+				if !queued {
+					bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
+				}
+			case err := <-bcR.errorsCh:
+				peer := bcR.Switch.Peers().Get(err.peerID)
+				if peer != nil {
+					bcR.Switch.StopPeerForError(peer, err)
+				}
+			case <-statusUpdateTicker.C:
+				// ask for status updates
+				go bcR.BroadcastStatusRequest() // nolint: errcheck
+			}
+		}
+	}()

 FOR_LOOP:
 	for {
 		select {
-		case request := <-bcR.requestsCh:
-			peer := bcR.Switch.Peers().Get(request.PeerID)
-			if peer == nil {
-				continue FOR_LOOP // Peer has since been disconnected.
-			}
-			msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{request.Height})
-			queued := peer.TrySend(BlockchainChannel, msgBytes)
-			if !queued {
-				// We couldn't make the request, send-queue full.
-				// The pool handles timeouts, just let it go.
-				continue FOR_LOOP
-			}
-		case err := <-bcR.errorsCh:
-			peer := bcR.Switch.Peers().Get(err.peerID)
-			if peer != nil {
-				bcR.Switch.StopPeerForError(peer, err)
-			}
-		case <-statusUpdateTicker.C:
-			// ask for status updates
-			go bcR.BroadcastStatusRequest() // nolint: errcheck
 		case <-switchToConsensusTicker.C:
 			height, numPending, lenRequesters := bcR.pool.GetStatus()
 			outbound, inbound, _ := bcR.Switch.NumPeers()

@@ -262,7 +270,6 @@ FOR_LOOP:
 			if bcR.pool.IsCaughtUp() {
 				bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
 				bcR.pool.Stop()
 				conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
 				if ok {
 					conR.SwitchToConsensus(state, blocksSynced)


libs/common/throttle_timer_test.go (+1 −0)

@@ -6,6 +6,7 @@ import (
	"time"
	// make govet noshadow happy...
	asrt "github.com/stretchr/testify/assert"
)


libs/db/mem_batch.go (+1 −3)

@@ -1,8 +1,6 @@
 package db

-import (
-	"sync"
-)
+import "sync"

 type atomicSetDeleter interface {
 	Mutex() *sync.Mutex

