diff --git a/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol/flowcontrol.go b/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol/flowcontrol.go index 40db5d89e..5c3bd3509 100644 --- a/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol/flowcontrol.go +++ b/Godeps/_workspace/src/code.google.com/p/mxk/go1/flowcontrol/flowcontrol.go @@ -72,6 +72,13 @@ func (m *Monitor) Update(n int) int { return n } +// Hack to set the current rEMA. +func (m *Monitor) SetREMA(rEMA float64) { + m.mu.Lock() + m.rEMA = rEMA + m.mu.Unlock() +} + // IO is a convenience method intended to wrap io.Reader and io.Writer method // execution. It calls m.Update(n) and then returns (n, err) unmodified. func (m *Monitor) IO(n int, err error) (int, error) { diff --git a/blockchain/pool.go b/blockchain/pool.go index 4ddba39f2..66d1934c5 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -82,9 +82,13 @@ func (pool *BlockPool) makeRequestsRoutine() { if numPending >= maxPendingRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) + // check for timed out peers + pool.removeTimedoutPeers() } else if len(pool.requests) >= maxTotalRequests { // sleep for a bit. time.Sleep(requestIntervalMS * time.Millisecond) + // check for timed out peers + pool.removeTimedoutPeers() } else { // request for more blocks. pool.makeNextRequest() @@ -92,6 +96,22 @@ func (pool *BlockPool) makeRequestsRoutine() { } } +func (pool *BlockPool) removeTimedoutPeers() { + for _, peer := range pool.peers { + if !peer.didTimeout && peer.numPending > 0 { + curRate := peer.recvMonitor.Status().CurRate + // XXX remove curRate != 0 + if curRate != 0 && curRate < minRecvRate { + pool.sendTimeout(peer.id) + peer.didTimeout = true + } + } + if peer.didTimeout { + pool.removePeer(peer.id) + } + } +} + func (pool *BlockPool) GetStatus() (height int, numPending int32) { pool.mtx.Lock() // Lock defer pool.mtx.Unlock() @@ -228,6 +248,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { if peer.isBad() { pool.removePeer(peer.id) continue + } else { } if peer.numPending >= maxPendingRequestsPerPeer { continue @@ -309,7 +330,7 @@ func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer { func (bpp *bpPeer) resetMonitor() { bpp.recvMonitor = flow.New(time.Second, time.Second*40) var initialValue = float64(minRecvRate) * math.E - bpp.recvMonitor.Update(int(initialValue)) + bpp.recvMonitor.SetREMA(initialValue) } func (bpp *bpPeer) resetTimeout() { @@ -339,20 +360,12 @@ func (bpp *bpPeer) decrPending(recvSize int) { } func (bpp *bpPeer) onTimeout() { + bpp.pool.sendTimeout(bpp.id) bpp.didTimeout = true } func (bpp *bpPeer) isBad() bool { - if bpp.didTimeout { - bpp.pool.sendTimeout(bpp.id) - return true - } - if bpp.numPending == 0 { - return false - } else { - bpp.pool.sendTimeout(bpp.id) - return bpp.recvMonitor.Status().CurRate < minRecvRate - } + return bpp.didTimeout } //------------------------------------- @@ -426,7 +439,6 @@ func (bpr *bpRequester) redo() { func (bpr *bpRequester) requestRoutine() { OUTER_LOOP: for { - // Pick a peer to send request to. var peer *bpPeer = nil PICK_PEER_LOOP: diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index a5b30e4db..e22c127f5 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -25,8 +25,8 @@ func makePeers(numPeers int, minHeight, maxHeight int) map[string]testPeer { } func TestBasic(t *testing.T) { - peers := makePeers(10, 0, 1000) start := 42 + peers := makePeers(10, start+1, 1000) timeoutsCh := make(chan string, 100) requestsCh := make(chan BlockRequest, 100) pool := NewBlockPool(start, requestsCh, timeoutsCh) @@ -77,13 +77,17 @@ func TestBasic(t *testing.T) { } func TestTimeout(t *testing.T) { - peers := makePeers(10, 0, 1000) start := 42 + peers := makePeers(10, start+1, 1000) timeoutsCh := make(chan string, 100) requestsCh := make(chan BlockRequest, 100) pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() + for _, peer := range peers { + log.Info("Peer", peer.id) + } + // Introduce each peer. go func() { for _, peer := range peers {