diff --git a/go.mod b/go.mod index c795f1319..ec8a27270 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce golang.org/x/net v0.0.0-20211208012354-db4efeb81f4b golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - google.golang.org/grpc v1.44.0 + google.golang.org/grpc v1.45.0 gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect pgregory.net/rapid v0.4.7 ) diff --git a/go.sum b/go.sum index cd3e0e4f5..64f63a14b 100644 --- a/go.sum +++ b/go.sum @@ -1625,8 +1625,8 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= -google.golang.org/grpc v1.44.0 h1:weqSxi/TMs1SqFRMHCtBgXRs8k3X39QIDEZ0pRcttUg= -google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M= +google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index f3d4a9e0a..4c905c660 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -168,7 +168,7 @@ func (pool *BlockPool) removeTimedoutPeers() { for _, peer := range pool.peers { // check if peer timed out if !peer.didTimeout && peer.numPending > 0 { - curRate := peer.recvMonitor.Status().CurRate + curRate := peer.recvMonitor.CurrentTransferRate() // curRate can be 0 on start if curRate != 0 && curRate < minRecvRate { err := errors.New("peer is not sending us data fast enough") diff --git a/internal/libs/flowrate/flowrate.go b/internal/libs/flowrate/flowrate.go index c2234669b..aaa54a22c 100644 --- a/internal/libs/flowrate/flowrate.go +++ b/internal/libs/flowrate/flowrate.go @@ -275,3 +275,15 @@ func (m *Monitor) waitNextSample(now time.Duration) time.Duration { } return now } + +// CurrentTransferRate returns the current transfer rate +func (m *Monitor) CurrentTransferRate() int64 { + m.mu.Lock() + defer m.mu.Unlock() + + if m.sLast > m.start && m.active { + return round(m.rEMA) + } + + return 0 +} diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go index 693a7ce58..4cbca7f19 100644 --- a/internal/p2p/conn/connection.go +++ b/internal/p2p/conn/connection.go @@ -413,7 +413,7 @@ func (c *MConnection) sendSomePacketMsgs(ctx context.Context) bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true) + c.sendMonitor.Limit(c._maxPacketMsgSize, c.config.SendRate, true) // Now send some PacketMsgs. for i := 0; i < numBatchPacketMsgs; i++ { @@ -481,7 +481,7 @@ FOR_LOOP: } // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) + c.recvMonitor.Limit(c._maxPacketMsgSize, c.config.RecvRate, true) // Peek into bufConnReader for debugging /* diff --git a/libs/service/service.go b/libs/service/service.go index f7701633a..6221c7d92 100644 --- a/libs/service/service.go +++ b/libs/service/service.go @@ -14,6 +14,11 @@ var ( errAlreadyStopped = errors.New("already stopped") ) +var ( + _ Service = (*BaseService)(nil) + _ Service = (*NopService)(nil) +) + // Service defines a service that can be started, stopped, and reset. type Service interface { // Start is called to start the service, which should run until @@ -85,6 +90,12 @@ type BaseService struct { impl Implementation } +type NopService struct{} + +func (NopService) Start(_ context.Context) error { return nil } +func (NopService) IsRunning() bool { return true } +func (NopService) Wait() {} + // NewBaseService creates a new BaseService. func NewBaseService(logger log.Logger, name string, impl Implementation) *BaseService { return &BaseService{ diff --git a/node/node.go b/node/node.go index 4bb72f4f6..3329d9f9a 100644 --- a/node/node.go +++ b/node/node.go @@ -357,7 +357,7 @@ func makeNode( return nil, combineCloseError(err, makeCloser(closers)) } - var pexReactor service.Service + var pexReactor service.Service = service.NopService{} if cfg.P2P.PexReactor { pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx)) if err != nil {