From a554005136e73d113e1040054b0ba7d0d9bee639 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Thu, 25 Mar 2021 16:58:46 -0400 Subject: [PATCH] p2p: revised router message scheduling (#6126) --- blockchain/v0/reactor.go | 2 + consensus/reactor.go | 8 ++ evidence/reactor.go | 2 + mempool/reactor.go | 2 + mempool/reactor_test.go | 21 ++- node/node.go | 46 ++++-- p2p/conn/connection.go | 15 +- p2p/metrics.go | 71 +++++++++- p2p/p2ptest/network.go | 11 +- p2p/pex/pex_reactor.go | 2 + p2p/pqueue.go | 277 ++++++++++++++++++++++++++++++++++++ p2p/queue.go | 21 ++- p2p/router.go | 191 +++++++++++++++++++++---- p2p/router_init_test.go | 62 ++++++++ p2p/router_test.go | 87 ++++++++++-- p2p/wdrr_queue.go | 283 +++++++++++++++++++++++++++++++++++++ p2p/wdrr_queue_test.go | 208 +++++++++++++++++++++++++++ statesync/reactor.go | 4 + test/e2e/docker/Dockerfile | 2 + test/e2e/runner/load.go | 17 ++- test/e2e/runner/main.go | 21 ++- test/e2e/runner/setup.go | 12 ++ 22 files changed, 1277 insertions(+), 88 deletions(-) create mode 100644 p2p/pqueue.go create mode 100644 p2p/router_init_test.go create mode 100644 p2p/wdrr_queue.go create mode 100644 p2p/wdrr_queue_test.go diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index af79fe563..57d8272ab 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -34,6 +34,8 @@ var ( SendQueueCapacity: 1000, RecvBufferCapacity: 50 * 4096, RecvMessageCapacity: bc.MaxMsgSize, + + MaxSendBytes: 100, }, }, } diff --git a/consensus/reactor.go b/consensus/reactor.go index 9e3551d03..8185f4202 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -36,6 +36,8 @@ var ( Priority: 6, SendQueueCapacity: 100, RecvMessageCapacity: maxMsgSize, + + MaxSendBytes: 12000, }, }, DataChannel: { @@ -49,6 +51,8 @@ var ( SendQueueCapacity: 100, RecvBufferCapacity: 50 * 4096, RecvMessageCapacity: maxMsgSize, + + MaxSendBytes: 40000, }, }, VoteChannel: { @@ -59,6 +63,8 @@ var ( SendQueueCapacity: 100, RecvBufferCapacity: 100 * 100, RecvMessageCapacity: maxMsgSize, + + MaxSendBytes: 150, }, }, VoteSetBitsChannel: { @@ -69,6 +75,8 @@ var ( SendQueueCapacity: 2, RecvBufferCapacity: 1024, RecvMessageCapacity: maxMsgSize, + + MaxSendBytes: 50, }, }, } diff --git a/evidence/reactor.go b/evidence/reactor.go index aa3db318e..bc83f5127 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -31,6 +31,8 @@ var ( ID: byte(EvidenceChannel), Priority: 6, RecvMessageCapacity: maxMsgSize, + + MaxSendBytes: 400, }, }, } diff --git a/mempool/reactor.go b/mempool/reactor.go index c725a3027..dc35ac927 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -116,6 +116,8 @@ func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe ID: byte(MempoolChannel), Priority: 5, RecvMessageCapacity: batchMsg.Size(), + + MaxSendBytes: 5000, }, }, } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 51ac2d79d..d1d41209e 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -110,8 +110,8 @@ func (rts *reactorTestSuite) assertMempoolChannelsDrained(t *testing.T) { require.False(t, r.IsRunning(), "reactor %s did not stop", id) } - for id, mch := range rts.mempoolChnnels { - require.Empty(t, mch.Out, "checking channel %q", id) + for _, mch := range rts.mempoolChnnels { + require.Empty(t, mch.Out, "checking channel %q (len=%d)", mch.ID, len(mch.Out)) } } @@ -314,7 +314,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) { // we're creating a single node network, but not starting the // network. 
- rts := setup(t, config.Mempool, 1, 0) + rts := setup(t, config.Mempool, 1, maxActiveIDs+1) nodeID := rts.nodes[0] @@ -336,6 +336,21 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) { } } + require.Eventually( + t, + func() bool { + for _, mch := range rts.mempoolChnnels { + if len(mch.Out) > 0 { + return false + } + } + + return true + }, + time.Minute, + 10*time.Millisecond, + ) + rts.assertMempoolChannelsDrained(t) } diff --git a/node/node.go b/node/node.go index 32374ac8c..2c3a713f0 100644 --- a/node/node.go +++ b/node/node.go @@ -54,12 +54,19 @@ import ( "github.com/tendermint/tendermint/version" ) -var useLegacyP2P = true +var ( + useLegacyP2P = true + p2pRouterQueueType string +) func init() { if v := os.Getenv("TM_LEGACY_P2P"); len(v) > 0 { useLegacyP2P, _ = strconv.ParseBool(v) } + + if v := os.Getenv("TM_P2P_QUEUE"); len(v) > 0 { + p2pRouterQueueType = v + } } // DBContext specifies config information for loading a new DB. @@ -628,15 +635,26 @@ func createPeerManager(config *cfg.Config, p2pLogger log.Logger, nodeID p2p.Node func createRouter( p2pLogger log.Logger, + p2pMetrics *p2p.Metrics, nodeInfo p2p.NodeInfo, privKey crypto.PrivKey, peerManager *p2p.PeerManager, transport p2p.Transport, ) (*p2p.Router, error) { - return p2p.NewRouter(p2pLogger, nodeInfo, privKey, peerManager, []p2p.Transport{transport}, p2p.RouterOptions{}) + + return p2p.NewRouter( + p2pLogger, + p2pMetrics, + nodeInfo, + privKey, + peerManager, + []p2p.Transport{transport}, + p2p.RouterOptions{QueueType: p2pRouterQueueType}, + ) } -func createSwitch(config *cfg.Config, +func createSwitch( + config *cfg.Config, transport p2p.Transport, p2pMetrics *p2p.Metrics, mempoolReactor *p2p.ReactorShim, @@ -781,7 +799,7 @@ func createPEXReactorV2( router *p2p.Router, ) (*pex.ReactorV2, error) { - channel, err := router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 0) + channel, err := router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 4096) if err != nil { return nil, err } @@ -897,7 +915,7 @@ func NewSeedNode(config *cfg.Config, return nil, fmt.Errorf("failed to create peer manager: %w", err) } - router, err := createRouter(p2pLogger, nodeInfo, nodeKey.PrivKey, peerManager, transport) + router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, peerManager, transport) if err != nil { return nil, fmt.Errorf("failed to create router: %w", err) } @@ -1057,12 +1075,13 @@ func NewNode(config *cfg.Config, return nil, fmt.Errorf("failed to create peer manager: %w", err) } - router, err := createRouter(p2pLogger, nodeInfo, nodeKey.PrivKey, peerManager, transport) + csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) + + router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, peerManager, transport) if err != nil { return nil, fmt.Errorf("failed to create router: %w", err) } - csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) mpReactorShim, mpReactor, mempool := createMempoolReactor( config, proxyApp, state, memplMetrics, peerManager, router, logger, ) @@ -1148,11 +1167,18 @@ func NewNode(config *cfg.Config, config.StateSync.TempDir, ) - // Setup Transport and Switch. 
+ router.AddChannelDescriptors(mpReactorShim.GetChannels()) + router.AddChannelDescriptors(bcReactorForSwitch.GetChannels()) + router.AddChannelDescriptors(csReactorShim.GetChannels()) + router.AddChannelDescriptors(evReactorShim.GetChannels()) + router.AddChannelDescriptors(stateSyncReactorShim.GetChannels()) + + // setup Transport and Switch sw := createSwitch( config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch, stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger, ) + err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) if err != nil { return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err) @@ -1191,6 +1217,8 @@ func NewNode(config *cfg.Config, if err != nil { return nil, err } + + router.AddChannelDescriptors(pexReactor.GetChannels()) } if config.RPC.PprofListenAddress != "" { @@ -1940,7 +1968,7 @@ func makeChannelsFromShims( channels := map[p2p.ChannelID]*p2p.Channel{} for chID, chShim := range chShims { - ch, err := router.OpenChannel(chID, chShim.MsgType, 0) + ch, err := router.OpenChannel(chID, chShim.MsgType, chShim.Descriptor.RecvBufferCapacity) if err != nil { panic(fmt.Sprintf("failed to open channel %v: %v", chID, err)) } diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 10892564a..8062805dc 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -718,11 +718,20 @@ func (c *MConnection) Status() ConnectionStatus { //----------------------------------------------------------------------------- type ChannelDescriptor struct { - ID byte - Priority int + ID byte + Priority int + + // TODO: Remove once p2p refactor is complete. SendQueueCapacity int - RecvBufferCapacity int RecvMessageCapacity int + + // RecvBufferCapacity defines the max buffer size of inbound messages for a + // given p2p Channel queue. + RecvBufferCapacity int + + // MaxSendBytes defines the maximum number of bytes that can be sent at any + // given moment from a Channel to a peer. + MaxSendBytes uint } func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) { diff --git a/p2p/metrics.go b/p2p/metrics.go index 675dd9c7c..c7d199cd7 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -25,11 +25,33 @@ type Metrics struct { PeerPendingSendBytes metrics.Gauge // Number of transactions submitted by each peer. NumTxs metrics.Gauge + + // RouterPeerQueueRecv defines the time taken to read off of a peer's queue + // before sending on the connection. + RouterPeerQueueRecv metrics.Histogram + + // RouterPeerQueueSend defines the time taken to send on a peer's queue which + // will later be read and sent on the connection (see RouterPeerQueueRecv). + RouterPeerQueueSend metrics.Histogram + + // RouterChannelQueueSend defines the time taken to send on a p2p channel's + // queue which will later be consued by the corresponding reactor/service. + RouterChannelQueueSend metrics.Histogram + + // PeerQueueDroppedMsgs defines the number of messages dropped from a peer's + // queue for a specific flow (i.e. Channel). + PeerQueueDroppedMsgs metrics.Counter + + // PeerQueueMsgSize defines the average size of messages sent over a peer's + // queue for a specific flow (i.e. Channel). + PeerQueueMsgSize metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. // Optionally, labels can be provided along with their values ("foo", // "fooValue"). 
+// +// nolint: lll func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { labels := []string{} for i := 0; i < len(labelsAndValues); i += 2 { @@ -66,16 +88,55 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "num_txs", Help: "Number of transactions submitted by each peer.", }, append(labels, "peer_id")).With(labelsAndValues...), + RouterPeerQueueRecv: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "router_peer_queue_recv", + Help: "The time taken to read off of a peer's queue before sending on the connection.", + }, labels).With(labelsAndValues...), + + RouterPeerQueueSend: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "router_peer_queue_send", + Help: "The time taken to send on a peer's queue which will later be read and sent on the connection (see RouterPeerQueueRecv).", + }, labels).With(labelsAndValues...), + + RouterChannelQueueSend: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "router_channel_queue_send", + Help: "The time taken to send on a p2p channel's queue which will later be consued by the corresponding reactor/service.", + }, labels).With(labelsAndValues...), + + PeerQueueDroppedMsgs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "router_channel_queue_dropped_msgs", + Help: "The number of messages dropped from a peer's queue for a specific p2p Channel.", + }, append(labels, "ch_id")).With(labelsAndValues...), + + PeerQueueMsgSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "router_channel_queue_msg_size", + Help: "The size of messages sent over a peer's queue for a specific p2p Channel.", + }, append(labels, "ch_id")).With(labelsAndValues...), } } // NopMetrics returns no-op Metrics. 
func NopMetrics() *Metrics { return &Metrics{ - Peers: discard.NewGauge(), - PeerReceiveBytesTotal: discard.NewCounter(), - PeerSendBytesTotal: discard.NewCounter(), - PeerPendingSendBytes: discard.NewGauge(), - NumTxs: discard.NewGauge(), + Peers: discard.NewGauge(), + PeerReceiveBytesTotal: discard.NewCounter(), + PeerSendBytesTotal: discard.NewCounter(), + PeerPendingSendBytes: discard.NewGauge(), + NumTxs: discard.NewGauge(), + RouterPeerQueueRecv: discard.NewHistogram(), + RouterPeerQueueSend: discard.NewHistogram(), + RouterChannelQueueSend: discard.NewHistogram(), + PeerQueueDroppedMsgs: discard.NewCounter(), + PeerQueueMsgSize: discard.NewGauge(), } } diff --git a/p2p/p2ptest/network.go b/p2p/p2ptest/network.go index 6493933ca..74faadb36 100644 --- a/p2p/p2ptest/network.go +++ b/p2p/p2ptest/network.go @@ -233,8 +233,15 @@ func (n *Network) MakeNode(t *testing.T) *Node { }) require.NoError(t, err) - router, err := p2p.NewRouter(n.logger, nodeInfo, privKey, peerManager, - []p2p.Transport{transport}, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + n.logger, + p2p.NopMetrics(), + nodeInfo, + privKey, + peerManager, + []p2p.Transport{transport}, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 092bcb531..5ea75c96c 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -184,6 +184,8 @@ func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { Priority: 1, SendQueueCapacity: 10, RecvMessageCapacity: maxMsgSize, + + MaxSendBytes: 200, }, } } diff --git a/p2p/pqueue.go b/p2p/pqueue.go new file mode 100644 index 000000000..d97d98375 --- /dev/null +++ b/p2p/pqueue.go @@ -0,0 +1,277 @@ +package p2p + +import ( + "container/heap" + "sort" + "strconv" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/libs/log" + tmsync "github.com/tendermint/tendermint/libs/sync" +) + +// pqEnvelope defines a wrapper around an Envelope with priority to be inserted +// into a priority queue used for Envelope scheduling. +type pqEnvelope struct { + envelope Envelope + priority uint + size uint + timestamp time.Time + + index int +} + +// priorityQueue defines a type alias for a priority queue implementation. +type priorityQueue []*pqEnvelope + +func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] } +func (pq priorityQueue) Len() int { return len(pq) } + +func (pq priorityQueue) Less(i, j int) bool { + // if both elements have the same priority, prioritize based on most recent + if pq[i].priority == pq[j].priority { + return pq[i].timestamp.After(pq[j].timestamp) + } + + // otherwise, pick the pqEnvelope with the higher priority + return pq[i].priority > pq[j].priority +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + pqEnv := x.(*pqEnvelope) + pqEnv.index = n + *pq = append(*pq, pqEnv) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + pqEnv := old[n-1] + old[n-1] = nil + pqEnv.index = -1 + *pq = old[:n-1] + return pqEnv +} + +// Assert the priority queue scheduler implements the queue interface at +// compile-time. 
+var _ queue = (*pqScheduler)(nil) + +type pqScheduler struct { + logger log.Logger + metrics *Metrics + size uint + sizes map[uint]uint // cumulative priority sizes + pq *priorityQueue + chDescs []ChannelDescriptor + capacity uint + chPriorities map[ChannelID]uint + + enqueueCh chan Envelope + dequeueCh chan Envelope + closer *tmsync.Closer + done *tmsync.Closer +} + +func newPQScheduler( + logger log.Logger, + m *Metrics, + chDescs []ChannelDescriptor, + enqueueBuf, dequeueBuf, capacity uint, +) *pqScheduler { + + // copy each ChannelDescriptor and sort them by ascending channel priority + chDescsCopy := make([]ChannelDescriptor, len(chDescs)) + copy(chDescsCopy, chDescs) + sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority }) + + var ( + chPriorities = make(map[ChannelID]uint) + sizes = make(map[uint]uint) + ) + + for _, chDesc := range chDescsCopy { + chID := ChannelID(chDesc.ID) + chPriorities[chID] = uint(chDesc.Priority) + sizes[uint(chDesc.Priority)] = 0 + } + + pq := make(priorityQueue, 0) + heap.Init(&pq) + + return &pqScheduler{ + logger: logger.With("router", "scheduler"), + metrics: m, + chDescs: chDescsCopy, + capacity: capacity, + chPriorities: chPriorities, + pq: &pq, + sizes: sizes, + enqueueCh: make(chan Envelope, enqueueBuf), + dequeueCh: make(chan Envelope, dequeueBuf), + closer: tmsync.NewCloser(), + done: tmsync.NewCloser(), + } +} + +func (s *pqScheduler) enqueue() chan<- Envelope { + return s.enqueueCh +} + +func (s *pqScheduler) dequeue() <-chan Envelope { + return s.dequeueCh +} + +func (s *pqScheduler) close() { + s.closer.Close() + <-s.done.Done() +} + +func (s *pqScheduler) closed() <-chan struct{} { + return s.closer.Done() +} + +// start starts non-blocking process that starts the priority queue scheduler. +func (s *pqScheduler) start() { + go s.process() +} + +// process starts a block process where we listen for Envelopes to enqueue. If +// there is sufficient capacity, it will be enqueued into the priority queue, +// otherwise, we attempt to dequeue enough elements from the priority queue to +// make room for the incoming Envelope by dropping lower priority elements. If +// there isn't sufficient capacity at lower priorities for the incoming Envelope, +// it is dropped. +// +// After we attempt to enqueue the incoming Envelope, if the priority queue is +// non-empty, we pop the top Envelope and send it on the dequeueCh. +func (s *pqScheduler) process() { + defer s.done.Close() + + for { + select { + case e := <-s.enqueueCh: + chIDStr := strconv.Itoa(int(e.channelID)) + pqEnv := &pqEnvelope{ + envelope: e, + size: uint(proto.Size(e.Message)), + priority: s.chPriorities[e.channelID], + timestamp: time.Now().UTC(), + } + + // enqueue + + // Check if we have sufficient capacity to simply enqueue the incoming + // Envelope. + if s.size+pqEnv.size <= s.capacity { + // enqueue the incoming Envelope + s.push(pqEnv) + } else { + // There is not sufficient capacity to simply enqueue the incoming + // Envelope. So we have to attempt to make room for it by dropping lower + // priority Envelopes or drop the incoming Envelope otherwise. + + // The cumulative size of all enqueue envelopes at the incoming envelope's + // priority or lower. + total := s.sizes[pqEnv.priority] + + if total >= pqEnv.size { + // There is room for the incoming Envelope, so we drop as many lower + // priority Envelopes as we need to. 
+ var ( + canEnqueue bool + tmpSize = s.size + i = s.pq.Len() - 1 + ) + + // Drop lower priority Envelopes until sufficient capacity exists for + // the incoming Envelope + for i >= 0 && !canEnqueue { + pqEnvTmp := s.pq.get(i) + + if pqEnvTmp.priority < pqEnv.priority { + if tmpSize+pqEnv.size <= s.capacity { + canEnqueue = true + } else { + pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.channelID)) + s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1) + s.logger.Debug( + "dropped envelope", + "ch_id", pqEnvTmpChIDStr, + "priority", pqEnvTmp.priority, + "msg_size", pqEnvTmp.size, + "capacity", s.capacity, + ) + + // dequeue/drop from the priority queue + heap.Remove(s.pq, pqEnvTmp.index) + + // update the size tracker + tmpSize -= pqEnvTmp.size + + // start from the end again + i = s.pq.Len() - 1 + } + } else { + i-- + } + } + + // enqueue the incoming Envelope + s.push(pqEnv) + } else { + // There is not sufficient capacity to drop lower priority Envelopes, + // so we drop the incoming Envelope. + s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1) + s.logger.Debug( + "dropped envelope", + "ch_id", chIDStr, + "priority", pqEnv.priority, + "msg_size", pqEnv.size, + "capacity", s.capacity, + ) + } + } + + // dequeue + + for s.pq.Len() > 0 { + pqEnv = heap.Pop(s.pq).(*pqEnvelope) + s.size -= pqEnv.size + + // deduct the Envelope size from all the relevant cumulative sizes + for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ { + s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size + } + + s.dequeueCh <- pqEnv.envelope + } + + case <-s.closer.Done(): + return + } + } +} + +func (s *pqScheduler) push(pqEnv *pqEnvelope) { + chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID)) + + // enqueue the incoming Envelope + heap.Push(s.pq, pqEnv) + s.size += pqEnv.size + s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(pqEnv.size)) + + // Update the cumulative sizes by adding the Envelope's size to every + // priority less than or equal to it. + for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ { + s.sizes[uint(s.chDescs[i].Priority)] += pqEnv.size + } +} diff --git a/p2p/queue.go b/p2p/queue.go index c225f2889..38e6b47b2 100644 --- a/p2p/queue.go +++ b/p2p/queue.go @@ -1,6 +1,8 @@ package p2p -import "sync" +import ( + tmsync "github.com/tendermint/tendermint/libs/sync" +) // queue does QoS scheduling for Envelopes, enqueueing and dequeueing according // to some policy. Queues are used at contention points, i.e.: @@ -26,17 +28,14 @@ type queue interface { // fifoQueue is a simple unbuffered lossless queue that passes messages through // in the order they were received, and blocks until message is received. 
type fifoQueue struct { - queueCh chan Envelope - closeCh chan struct{} - closeOnce sync.Once + queueCh chan Envelope + closer *tmsync.Closer } -var _ queue = (*fifoQueue)(nil) - -func newFIFOQueue(size int) *fifoQueue { +func newFIFOQueue(size int) queue { return &fifoQueue{ queueCh: make(chan Envelope, size), - closeCh: make(chan struct{}), + closer: tmsync.NewCloser(), } } @@ -49,11 +48,9 @@ func (q *fifoQueue) dequeue() <-chan Envelope { } func (q *fifoQueue) close() { - q.closeOnce.Do(func() { - close(q.closeCh) - }) + q.closer.Close() } func (q *fifoQueue) closed() <-chan struct{} { - return q.closeCh + return q.closer.Done() } diff --git a/p2p/router.go b/p2p/router.go index ce8e9ede6..b7ebf7570 100644 --- a/p2p/router.go +++ b/p2p/router.go @@ -15,6 +15,8 @@ import ( "github.com/tendermint/tendermint/libs/service" ) +const queueBufferDefault = 4096 + // ChannelID is an arbitrary channel ID. type ChannelID uint16 @@ -124,10 +126,29 @@ type RouterOptions struct { // HandshakeTimeout is the timeout for handshaking with a peer. 0 means // no timeout. HandshakeTimeout time.Duration + + // QueueType must be "wdrr" (Weighed Deficit Round Robin), + // "priority", or FIFO. Defaults to FIFO. + QueueType string } +const ( + queueTypeFifo = "fifo" + queueTypePriority = "priority" + queueTypeWDRR = "wdrr" +) + // Validate validates router options. func (o *RouterOptions) Validate() error { + switch o.QueueType { + case "": + o.QueueType = queueTypeFifo + case queueTypeFifo, queueTypeWDRR, queueTypePriority: + // pass + default: + return fmt.Errorf("queue type %q is not supported", o.QueueType) + } + return nil } @@ -174,22 +195,25 @@ type Router struct { *service.BaseService logger log.Logger + metrics *Metrics options RouterOptions nodeInfo NodeInfo privKey crypto.PrivKey peerManager *PeerManager + chDescs []ChannelDescriptor transports []Transport protocolTransports map[Protocol]Transport stopCh chan struct{} // signals Router shutdown - peerMtx sync.RWMutex - peerQueues map[NodeID]queue + peerMtx sync.RWMutex + peerQueues map[NodeID]queue // outbound messages per peer for all channels + queueFactory func(int) queue // FIXME: We don't strictly need to use a mutex for this if we seal the // channels on router start. This depends on whether we want to allow // dynamic channels in the future. channelMtx sync.RWMutex - channelQueues map[ChannelID]queue + channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel channelMessages map[ChannelID]proto.Message } @@ -198,20 +222,24 @@ type Router struct { // stops. 
func NewRouter( logger log.Logger, + metrics *Metrics, nodeInfo NodeInfo, privKey crypto.PrivKey, peerManager *PeerManager, transports []Transport, options RouterOptions, ) (*Router, error) { + if err := options.Validate(); err != nil { return nil, err } router := &Router{ logger: logger, + metrics: metrics, nodeInfo: nodeInfo, privKey: privKey, + chDescs: make([]ChannelDescriptor, 0), transports: transports, protocolTransports: map[Protocol]Transport{}, peerManager: peerManager, @@ -221,8 +249,16 @@ func NewRouter( channelMessages: map[ChannelID]proto.Message{}, peerQueues: map[NodeID]queue{}, } + router.BaseService = service.NewBaseService(logger, "router", router) + qf, err := router.createQueueFactory() + if err != nil { + return nil, err + } + + router.queueFactory = qf + for _, transport := range transports { for _, protocol := range transport.Protocols() { if _, ok := router.protocolTransports[protocol]; !ok { @@ -234,6 +270,46 @@ func NewRouter( return router, nil } +func (r *Router) createQueueFactory() (func(int) queue, error) { + switch r.options.QueueType { + case queueTypeFifo: + return newFIFOQueue, nil + + case queueTypePriority: + return func(size int) queue { + if size%2 != 0 { + size++ + } + + q := newPQScheduler(r.logger, r.metrics, r.chDescs, uint(size)/2, uint(size)/2, defaultCapacity) + q.start() + return q + }, nil + + case queueTypeWDRR: + return func(size int) queue { + if size%2 != 0 { + size++ + } + + q := newWDRRScheduler(r.logger, r.metrics, r.chDescs, uint(size)/2, uint(size)/2, defaultCapacity) + q.start() + return q + }, nil + + default: + return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType) + } +} + +// AddChannelDescriptors adds a set of ChannelDescriptors to the reactor. Note, +// this should be called before the router is started and any connections are made. +func (r *Router) AddChannelDescriptors(chDescs []*ChannelDescriptor) { + for _, chDesc := range chDescs { + r.chDescs = append(r.chDescs, *chDesc) + } +} + // OpenChannel opens a new channel for the given message type. The caller must // close the channel when done, before stopping the Router. messageType is the // type of message passed through the channel (used for unmarshaling), which can @@ -241,7 +317,18 @@ func NewRouter( // wrapper message. The caller may provide a size to make the channel buffered, // which internally makes the inbound, outbound, and error channel buffered. func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int) (*Channel, error) { - queue := newFIFOQueue(size) + if size == 0 { + size = queueBufferDefault + } + + r.channelMtx.Lock() + defer r.channelMtx.Unlock() + + if _, ok := r.channelQueues[id]; ok { + return nil, fmt.Errorf("channel %v already exists", id) + } + + queue := r.queueFactory(size) outCh := make(chan Envelope, size) errCh := make(chan PeerError, size) channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh) @@ -251,12 +338,6 @@ func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int) wrapper = w } - r.channelMtx.Lock() - defer r.channelMtx.Unlock() - - if _, ok := r.channelQueues[id]; ok { - return nil, fmt.Errorf("channel %v already exists", id) - } r.channelQueues[id] = queue r.channelMessages[id] = messageType @@ -297,44 +378,52 @@ func (r *Router) routeChannel( // it on to Transport.SendMessage(). envelope.channelID = chID - // Wrap the message in a wrapper message, if requested. 
+ // wrap the message in a wrapper message, if requested if wrapper != nil { msg := proto.Clone(wrapper) if err := msg.(Wrapper).Wrap(envelope.Message); err != nil { r.Logger.Error("failed to wrap message", "channel", chID, "err", err) continue } + envelope.Message = msg } - // Collect peer queues to pass the message via. + // collect peer queues to pass the message via var queues []queue if envelope.Broadcast { r.peerMtx.RLock() + queues = make([]queue, 0, len(r.peerQueues)) for _, q := range r.peerQueues { queues = append(queues, q) } + r.peerMtx.RUnlock() } else { r.peerMtx.RLock() q, ok := r.peerQueues[envelope.To] r.peerMtx.RUnlock() + if !ok { - r.logger.Debug("dropping message for unconnected peer", - "peer", envelope.To, "channel", chID) + r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) continue } + queues = []queue{q} } - // Send message to peers. + // send message to peers for _, q := range queues { + start := time.Now().UTC() + select { case q.enqueue() <- envelope: + r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds()) + case <-q.closed(): - r.logger.Debug("dropping message for unconnected peer", - "peer", envelope.To, "channel", chID) + r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) + case <-r.stopCh: return } @@ -344,7 +433,9 @@ func (r *Router) routeChannel( if !ok { return } + r.logger.Error("peer error, evicting", "peer", peerError.NodeID, "err", peerError.Err) + if err := r.peerManager.Errored(peerError.NodeID, peerError.Err); err != nil { r.logger.Error("failed to report peer error", "peer", peerError.NodeID, "err", err) } @@ -422,7 +513,8 @@ func (r *Router) acceptPeers(transport Transport) { return } - queue := newFIFOQueue(0) + queue := r.queueFactory(queueBufferDefault) + r.peerMtx.Lock() r.peerQueues[peerInfo.NodeID] = queue r.peerMtx.Unlock() @@ -431,7 +523,9 @@ func (r *Router) acceptPeers(transport Transport) { r.peerMtx.Lock() delete(r.peerQueues, peerInfo.NodeID) r.peerMtx.Unlock() + queue.close() + if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil { r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err) } @@ -496,16 +590,14 @@ func (r *Router) dialPeers() { return } - queue := newFIFOQueue(0) - r.peerMtx.Lock() - r.peerQueues[peerID] = queue - r.peerMtx.Unlock() - + peerQueue := r.getOrMakeQueue(peerID) defer func() { r.peerMtx.Lock() delete(r.peerQueues, peerID) r.peerMtx.Unlock() - queue.close() + + peerQueue.close() + if err := r.peerManager.Disconnected(peerID); err != nil { r.logger.Error("failed to disconnect peer", "peer", address, "err", err) } @@ -516,11 +608,24 @@ func (r *Router) dialPeers() { return } - r.routePeer(peerID, conn, queue) + r.routePeer(peerID, conn, peerQueue) }() } } +func (r *Router) getOrMakeQueue(peerID NodeID) queue { + r.peerMtx.Lock() + defer r.peerMtx.Unlock() + + if peerQueue, ok := r.peerQueues[peerID]; ok { + return peerQueue + } + + peerQueue := r.queueFactory(queueBufferDefault) + r.peerQueues[peerID] = peerQueue + return peerQueue +} + // dialPeer connects to a peer by dialing it. func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection, error) { resolveCtx := ctx @@ -603,10 +708,13 @@ func (r *Router) handshakePeer(ctx context.Context, conn Connection, expectID No // they are closed elsewhere it will cause this method to shut down and return. 
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) { r.logger.Info("peer connected", "peer", peerID, "endpoint", conn) + errCh := make(chan error, 2) + go func() { errCh <- r.receivePeer(peerID, conn) }() + go func() { errCh <- r.sendPeer(peerID, conn, sendQueue) }() @@ -614,14 +722,17 @@ func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) { err := <-errCh _ = conn.Close() sendQueue.close() + if e := <-errCh; err == nil { // The first err was nil, so we update it with the second err, which may // or may not be nil. err = e } + switch err { case nil, io.EOF: r.logger.Info("peer disconnected", "peer", peerID, "endpoint", conn) + default: r.logger.Error("peer failure", "peer", peerID, "endpoint", conn, "err", err) } @@ -640,6 +751,7 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error { queue, ok := r.channelQueues[chID] messageType := r.channelMessages[chID] r.channelMtx.RUnlock() + if !ok { r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID) continue @@ -650,6 +762,7 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error { r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err) continue } + if wrapper, ok := msg.(Wrapper); ok { msg, err = wrapper.Unwrap() if err != nil { @@ -658,11 +771,16 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error { } } + start := time.Now().UTC() + select { case queue.enqueue() <- Envelope{From: peerID, Message: msg}: + r.metrics.RouterChannelQueueSend.Observe(time.Since(start).Seconds()) r.logger.Debug("received message", "peer", peerID, "message", msg) + case <-queue.closed(): r.logger.Debug("channel closed, dropping message", "peer", peerID, "channel", chID) + case <-r.stopCh: return nil } @@ -670,14 +788,18 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error { } // sendPeer sends queued messages to a peer. 
-func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error { +func (r *Router) sendPeer(peerID NodeID, conn Connection, peerQueue queue) error { for { + start := time.Now().UTC() + select { - case envelope := <-queue.dequeue(): + case envelope := <-peerQueue.dequeue(): + r.metrics.RouterPeerQueueRecv.Observe(time.Since(start).Seconds()) if envelope.Message == nil { r.logger.Error("dropping nil message", "peer", peerID) continue } + bz, err := proto.Marshal(envelope.Message) if err != nil { r.logger.Error("failed to marshal message", "peer", peerID, "err", err) @@ -688,9 +810,10 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error { if err != nil { return err } + r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message) - case <-queue.closed(): + case <-peerQueue.closed(): return nil case <-r.stopCh: @@ -703,21 +826,26 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error { func (r *Router) evictPeers() { r.logger.Debug("starting evict routine") ctx := r.stopCtx() + for { peerID, err := r.peerManager.EvictNext(ctx) + switch { case errors.Is(err, context.Canceled): r.logger.Debug("stopping evict routine") return + case err != nil: r.logger.Error("failed to find next peer to evict", "err", err) return } r.logger.Info("evicting peer", "peer", peerID) + r.peerMtx.RLock() queue, ok := r.peerQueues[peerID] r.peerMtx.RUnlock() + if ok { queue.close() } @@ -728,9 +856,11 @@ func (r *Router) evictPeers() { func (r *Router) OnStart() error { go r.dialPeers() go r.evictPeers() + for _, transport := range r.transports { go r.acceptPeers(transport) } + return nil } @@ -753,16 +883,19 @@ func (r *Router) OnStop() { // Collect all remaining queues, and wait for them to close. queues := []queue{} + r.channelMtx.RLock() for _, q := range r.channelQueues { queues = append(queues, q) } r.channelMtx.RUnlock() + r.peerMtx.RLock() for _, q := range r.peerQueues { queues = append(queues, q) } r.peerMtx.RUnlock() + for _, q := range queues { <-q.closed() } @@ -771,9 +904,11 @@ func (r *Router) OnStop() { // stopCtx returns a new context that is canceled when the router stops. 
func (r *Router) stopCtx() context.Context { ctx, cancel := context.WithCancel(context.Background()) + go func() { <-r.stopCh cancel() }() + return ctx } diff --git a/p2p/router_init_test.go b/p2p/router_init_test.go new file mode 100644 index 000000000..6a8a88f38 --- /dev/null +++ b/p2p/router_init_test.go @@ -0,0 +1,62 @@ +package p2p + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" +) + +func TestRouter_ConstructQueueFactory(t *testing.T) { + t.Run("ValidateOptionsPopulatesDefaultQueue", func(t *testing.T) { + opts := RouterOptions{} + require.NoError(t, opts.Validate()) + require.Equal(t, "fifo", opts.QueueType) + }) + t.Run("Default", func(t *testing.T) { + require.Zero(t, os.Getenv("TM_P2P_QUEUE")) + opts := RouterOptions{} + r, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts) + require.NoError(t, err) + _, ok := r.queueFactory(1).(*fifoQueue) + require.True(t, ok) + }) + t.Run("Fifo", func(t *testing.T) { + opts := RouterOptions{QueueType: queueTypeFifo} + r, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts) + require.NoError(t, err) + _, ok := r.queueFactory(1).(*fifoQueue) + require.True(t, ok) + }) + t.Run("Priority", func(t *testing.T) { + opts := RouterOptions{QueueType: queueTypePriority} + r, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts) + require.NoError(t, err) + q, ok := r.queueFactory(1).(*pqScheduler) + require.True(t, ok) + defer q.close() + }) + t.Run("WDRR", func(t *testing.T) { + opts := RouterOptions{QueueType: queueTypeWDRR} + r, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts) + require.NoError(t, err) + q, ok := r.queueFactory(1).(*wdrrScheduler) + require.True(t, ok) + defer q.close() + }) + t.Run("NonExistant", func(t *testing.T) { + opts := RouterOptions{QueueType: "fast"} + _, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts) + require.Error(t, err) + require.Contains(t, err.Error(), "fast") + }) + t.Run("InternalsSafeWhenUnspecified", func(t *testing.T) { + r := &Router{} + require.Zero(t, r.options.QueueType) + + fn, err := r.createQueueFactory() + require.Error(t, err) + require.Nil(t, fn) + }) +} diff --git a/p2p/router_test.go b/p2p/router_test.go index b4db2ca08..ac020a7d7 100644 --- a/p2p/router_test.go +++ b/p2p/router_test.go @@ -97,7 +97,15 @@ func TestRouter_Channel(t *testing.T) { // Set up a router with no transports (so no peers). peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) - router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, nil, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + log.TestingLogger(), + p2p.NopMetrics(), + selfInfo, + selfKey, + peerManager, + nil, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) @@ -343,8 +351,15 @@ func TestRouter_AcceptPeers(t *testing.T) { sub := peerManager.Subscribe() defer sub.Close() - router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, - []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + log.TestingLogger(), + p2p.NopMetrics(), + selfInfo, + selfKey, + peerManager, + []p2p.Transport{mockTransport}, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) @@ -383,8 +398,15 @@ func TestRouter_AcceptPeers_Error(t *testing.T) { // Set up and start the router. 
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) - router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, - []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + log.TestingLogger(), + p2p.NopMetrics(), + selfInfo, + selfKey, + peerManager, + []p2p.Transport{mockTransport}, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) @@ -408,8 +430,15 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) { // Set up and start the router. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) - router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, - []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + log.TestingLogger(), + p2p.NopMetrics(), + selfInfo, + selfKey, + peerManager, + []p2p.Transport{mockTransport}, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) @@ -446,8 +475,15 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { // Set up and start the router. peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) require.NoError(t, err) - router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, - []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + log.TestingLogger(), + p2p.NopMetrics(), + selfInfo, + selfKey, + peerManager, + []p2p.Transport{mockTransport}, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) @@ -520,8 +556,15 @@ func TestRouter_DialPeers(t *testing.T) { sub := peerManager.Subscribe() defer sub.Close() - router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, - []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + log.TestingLogger(), + p2p.NopMetrics(), + selfInfo, + selfKey, + peerManager, + []p2p.Transport{mockTransport}, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) @@ -583,8 +626,15 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { require.NoError(t, peerManager.Add(b)) require.NoError(t, peerManager.Add(c)) - router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, - []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + log.TestingLogger(), + p2p.NopMetrics(), + selfInfo, + selfKey, + peerManager, + []p2p.Transport{mockTransport}, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) @@ -630,8 +680,15 @@ func TestRouter_EvictPeers(t *testing.T) { sub := peerManager.Subscribe() defer sub.Close() - router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, - []p2p.Transport{mockTransport}, p2p.RouterOptions{}) + router, err := p2p.NewRouter( + log.TestingLogger(), + p2p.NopMetrics(), + selfInfo, + selfKey, + peerManager, + []p2p.Transport{mockTransport}, + p2p.RouterOptions{}, + ) require.NoError(t, err) require.NoError(t, router.Start()) diff --git a/p2p/wdrr_queue.go b/p2p/wdrr_queue.go new file mode 100644 index 000000000..57815c8dd --- /dev/null +++ b/p2p/wdrr_queue.go @@ -0,0 +1,283 @@ +package p2p + +import ( + "sort" + "strconv" + + "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/libs/log" + tmsync "github.com/tendermint/tendermint/libs/sync" +) + +const defaultCapacity uint = 1048576 // 
1MB + +// wrappedEnvelope wraps a p2p Envelope with its precomputed size. +type wrappedEnvelope struct { + envelope Envelope + size uint +} + +// assert the WDDR scheduler implements the queue interface at compile-time +var _ queue = (*wdrrScheduler)(nil) + +// wdrrQueue implements a Weighted Deficit Round Robin (WDRR) scheduling +// algorithm via the queue interface. A WDRR queue is created per peer, where +// the queue will have N number of flows. Each flow corresponds to a p2p Channel, +// so there are n input flows and a single output source, the peer's connection. +// +// The WDRR scheduler contains a shared buffer with a fixed capacity. +// +// Each flow has the following: +// - quantum: The number of bytes that is added to the deficit counter of the +// flow in each round. The flow can send at most quantum bytes at a time. Each +// flow has its own unique quantum, which gives the queue its weighted nature. +// A higher quantum corresponds to a higher weight/priority. The quantum is +// computed as MaxSendBytes * Priority. +// - deficit counter: The number of bytes that the flow is allowed to transmit +// when it is its turn. +// +// See: https://en.wikipedia.org/wiki/Deficit_round_robin +type wdrrScheduler struct { + logger log.Logger + metrics *Metrics + chDescs []ChannelDescriptor + capacity uint + size uint + chPriorities map[ChannelID]uint + buffer map[ChannelID][]wrappedEnvelope + quanta map[ChannelID]uint + deficits map[ChannelID]uint + + closer *tmsync.Closer + doneCh *tmsync.Closer + + enqueueCh chan Envelope + dequeueCh chan Envelope +} + +func newWDRRScheduler( + logger log.Logger, + m *Metrics, + chDescs []ChannelDescriptor, + enqueueBuf, dequeueBuf, capacity uint, +) *wdrrScheduler { + + // copy each ChannelDescriptor and sort them by channel priority + chDescsCopy := make([]ChannelDescriptor, len(chDescs)) + copy(chDescsCopy, chDescs) + sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority > chDescsCopy[j].Priority }) + + var ( + buffer = make(map[ChannelID][]wrappedEnvelope) + chPriorities = make(map[ChannelID]uint) + quanta = make(map[ChannelID]uint) + deficits = make(map[ChannelID]uint) + ) + + for _, chDesc := range chDescsCopy { + chID := ChannelID(chDesc.ID) + chPriorities[chID] = uint(chDesc.Priority) + buffer[chID] = make([]wrappedEnvelope, 0) + quanta[chID] = chDesc.MaxSendBytes * uint(chDesc.Priority) + } + + return &wdrrScheduler{ + logger: logger.With("queue", "wdrr"), + metrics: m, + capacity: capacity, + chPriorities: chPriorities, + chDescs: chDescsCopy, + buffer: buffer, + quanta: quanta, + deficits: deficits, + closer: tmsync.NewCloser(), + doneCh: tmsync.NewCloser(), + enqueueCh: make(chan Envelope, enqueueBuf), + dequeueCh: make(chan Envelope, dequeueBuf), + } +} + +// enqueue returns an unbuffered write-only channel which a producer can send on. +func (s *wdrrScheduler) enqueue() chan<- Envelope { + return s.enqueueCh +} + +// dequeue returns an unbuffered read-only channel which a consumer can read from. +func (s *wdrrScheduler) dequeue() <-chan Envelope { + return s.dequeueCh +} + +func (s *wdrrScheduler) closed() <-chan struct{} { + return s.closer.Done() +} + +// close closes the WDRR queue. After this call enqueue() will block, so the +// caller must select on closed() as well to avoid blocking forever. The +// enqueue() and dequeue() along with the internal channels will NOT be closed. +// Note, close() will block until all externally spawned goroutines have exited. 
+func (s *wdrrScheduler) close() { + s.closer.Close() + <-s.doneCh.Done() +} + +// start starts the WDRR queue process in a blocking goroutine. This must be +// called before the queue can start to process and accept Envelopes. +func (s *wdrrScheduler) start() { + go s.process() +} + +// process starts a blocking WDRR scheduler process, where we continuously +// evaluate if we need to attempt to enqueue an Envelope or schedule Envelopes +// to be dequeued and subsequently read and sent on the source connection. +// Internally, each p2p Channel maps to a flow, where each flow has a deficit +// and a quantum. +// +// For each Envelope requested to be enqueued, we evaluate if there is sufficient +// capacity in the shared buffer to add the Envelope. If so, it is added. +// Otherwise, we evaluate all flows of lower priority where we attempt find an +// existing Envelope in the shared buffer of sufficient size that can be dropped +// in place of the incoming Envelope. If there is no such Envelope that can be +// dropped, then the incoming Envelope is dropped. +// +// When there is nothing to be enqueued, we perform the WDRR algorithm and +// determine which Envelopes can be dequeued. For each Envelope that can be +// dequeued, it is sent on the dequeueCh. Specifically, for each flow, if it is +// non-empty, its deficit counter is incremented by its quantum value. Then, the +// value of the deficit counter is a maximal amount of bytes that can be sent at +// this round. If the deficit counter is greater than the Envelopes's message +// size at the head of the queue (HoQ), this envelope can be sent and the value +// of the counter is decremented by the message's size. Then, the size of the +// next Envelopes's message is compared to the counter value, etc. Once the flow +// is empty or the value of the counter is insufficient, the scheduler will skip +// to the next flow. If the flow is empty, the value of the deficit counter is +// reset to 0. +// +// XXX/TODO: Evaluate the single goroutine scheduler mechanism. In other words, +// evaluate the effectiveness and performance of having a single goroutine +// perform handling both enqueueing and dequeueing logic. Specifically, there +// is potentially contention between reading off of enqueueCh and trying to +// enqueue while also attempting to perform the WDRR algorithm and find the next +// set of Envelope(s) to send on the dequeueCh. Alternatively, we could consider +// separate scheduling goroutines, but then that requires the use of mutexes and +// possibly a degrading performance. +func (s *wdrrScheduler) process() { + defer s.doneCh.Close() + + for { + select { + case <-s.closer.Done(): + return + + case e := <-s.enqueueCh: + // attempt to enqueue the incoming Envelope + chIDStr := strconv.Itoa(int(e.channelID)) + wEnv := wrappedEnvelope{envelope: e, size: uint(proto.Size(e.Message))} + msgSize := wEnv.size + + // If we're at capacity, we need to either drop the incoming Envelope or + // an Envelope from a lower priority flow. Otherwise, we add the (wrapped) + // envelope to the flow's queue. + if s.size+wEnv.size > s.capacity { + chPriority := s.chPriorities[e.channelID] + + var ( + canDrop bool + dropIdx int + dropChID ChannelID + ) + + // Evaluate all lower priority flows and determine if there exists an + // Envelope that is of equal or greater size that we can drop in favor + // of the incoming Envelope. 
+ for i := len(s.chDescs) - 1; i >= 0 && uint(s.chDescs[i].Priority) < chPriority && !canDrop; i-- { + currChID := ChannelID(s.chDescs[i].ID) + flow := s.buffer[currChID] + + for j := 0; j < len(flow) && !canDrop; j++ { + if flow[j].size >= wEnv.size { + canDrop = true + dropIdx = j + dropChID = currChID + break + } + } + } + + // If we can drop an existing Envelope, drop it and enqueue the incoming + // Envelope. + if canDrop { + chIDStr = strconv.Itoa(int(dropChID)) + chPriority = s.chPriorities[dropChID] + msgSize = s.buffer[dropChID][dropIdx].size + + // Drop Envelope for the lower priority flow and update the queue's + // buffer size + s.size -= msgSize + s.buffer[dropChID] = append(s.buffer[dropChID][:dropIdx], s.buffer[dropChID][dropIdx+1:]...) + + // add the incoming Envelope and update queue's buffer size + s.size += wEnv.size + s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv) + s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size)) + } + + // We either dropped the incoming Enevelope or one from an existing + // lower priority flow. + s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1) + s.logger.Debug( + "dropped envelope", + "ch_id", chIDStr, + "priority", chPriority, + "capacity", s.capacity, + "msg_size", msgSize, + ) + } else { + // we have sufficient capacity to enqueue the incoming Envelope + s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size)) + s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv) + s.size += wEnv.size + } + + default: + // perform the WDRR algorithm + for _, chDesc := range s.chDescs { + chID := ChannelID(chDesc.ID) + + // only consider non-empty flows + if len(s.buffer[chID]) > 0 { + // bump flow's quantum + s.deficits[chID] += s.quanta[chID] + + // grab the flow's current deficit counter and HoQ (wrapped) Envelope + d := s.deficits[chID] + we := s.buffer[chID][0] + + // While the flow is non-empty and we can send the current Envelope + // on the dequeueCh: + // + // 1. send the Envelope + // 2. update the scheduler's shared buffer's size + // 3. update the flow's deficit + // 4. remove from the flow's queue + // 5. 
grab the next HoQ Envelope and flow's deficit + for len(s.buffer[chID]) > 0 && d >= we.size { + s.dequeueCh <- we.envelope + s.size -= we.size + s.deficits[chID] -= we.size + s.buffer[chID] = s.buffer[chID][1:] + + if len(s.buffer[chID]) > 0 { + d = s.deficits[chID] + we = s.buffer[chID][0] + } + } + } + + // reset the flow's deficit to zero if it is empty + if len(s.buffer[chID]) == 0 { + s.deficits[chID] = 0 + } + } + } + } +} diff --git a/p2p/wdrr_queue_test.go b/p2p/wdrr_queue_test.go new file mode 100644 index 000000000..0fe67a94b --- /dev/null +++ b/p2p/wdrr_queue_test.go @@ -0,0 +1,208 @@ +package p2p + +import ( + "math" + "math/rand" + "testing" + "time" + + gogotypes "github.com/gogo/protobuf/types" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" + tmsync "github.com/tendermint/tendermint/libs/sync" +) + +type testMessage = gogotypes.StringValue + +func TestWDRRQueue_EqualWeights(t *testing.T) { + chDescs := []ChannelDescriptor{ + {ID: 0x01, Priority: 1, MaxSendBytes: 4}, + {ID: 0x02, Priority: 1, MaxSendBytes: 4}, + {ID: 0x03, Priority: 1, MaxSendBytes: 4}, + {ID: 0x04, Priority: 1, MaxSendBytes: 4}, + {ID: 0x05, Priority: 1, MaxSendBytes: 4}, + {ID: 0x06, Priority: 1, MaxSendBytes: 4}, + } + + peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 1000, 1000, 120) + peerQueue.start() + + totalMsgs := make(map[ChannelID]int) + deliveredMsgs := make(map[ChannelID]int) + successRates := make(map[ChannelID]float64) + + closer := tmsync.NewCloser() + + go func() { + timout := 10 * time.Second + ticker := time.NewTicker(timout) + defer ticker.Stop() + + for { + select { + case e := <-peerQueue.dequeue(): + deliveredMsgs[e.channelID]++ + ticker.Reset(timout) + + case <-ticker.C: + closer.Close() + } + } + }() + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + maxMsgs := 5000 + minMsgs := 1000 + + for _, chDesc := range chDescs { + total := rng.Intn(maxMsgs-minMsgs) + minMsgs // total = rand[minMsgs, maxMsgs) + totalMsgs[ChannelID(chDesc.ID)] = total + + go func(cID ChannelID, n int) { + for i := 0; i < n; i++ { + peerQueue.enqueue() <- Envelope{ + channelID: cID, + Message: &testMessage{Value: "foo"}, // 5 bytes + } + } + }(ChannelID(chDesc.ID), total) + } + + // wait for dequeueing to complete + <-closer.Done() + + // close queue and wait for cleanup + peerQueue.close() + <-peerQueue.closed() + + var ( + sum float64 + stdDev float64 + ) + + for _, chDesc := range peerQueue.chDescs { + chID := ChannelID(chDesc.ID) + require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero") + require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty") + + total := totalMsgs[chID] + delivered := deliveredMsgs[chID] + successRate := float64(delivered) / float64(total) + + sum += successRate + successRates[chID] = successRate + + // require some messages dropped + require.Less(t, delivered, total, "expected some messages to be dropped") + require.Less(t, successRate, 1.0, "expected a success rate below 100%") + } + + require.Zero(t, peerQueue.size, "expected scheduler size to be zero") + + numFlows := float64(len(peerQueue.buffer)) + mean := sum / numFlows + + for _, successRate := range successRates { + stdDev += math.Pow(successRate-mean, 2) + } + + stdDev = math.Sqrt(stdDev / numFlows) + require.Less(t, stdDev, 0.02, "expected success rate standard deviation to be less than 2%") +} + +func TestWDRRQueue_DecreasingWeights(t *testing.T) { + chDescs := []ChannelDescriptor{ + {ID: 0x01, Priority: 18, 
MaxSendBytes: 4}, + {ID: 0x02, Priority: 10, MaxSendBytes: 4}, + {ID: 0x03, Priority: 2, MaxSendBytes: 4}, + {ID: 0x04, Priority: 1, MaxSendBytes: 4}, + {ID: 0x05, Priority: 1, MaxSendBytes: 4}, + {ID: 0x06, Priority: 1, MaxSendBytes: 4}, + } + + peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 0, 0, 500) + peerQueue.start() + + totalMsgs := make(map[ChannelID]int) + deliveredMsgs := make(map[ChannelID]int) + successRates := make(map[ChannelID]float64) + + for _, chDesc := range chDescs { + total := 1000 + totalMsgs[ChannelID(chDesc.ID)] = total + + go func(cID ChannelID, n int) { + for i := 0; i < n; i++ { + peerQueue.enqueue() <- Envelope{ + channelID: cID, + Message: &testMessage{Value: "foo"}, // 5 bytes + } + } + }(ChannelID(chDesc.ID), total) + } + + closer := tmsync.NewCloser() + + go func() { + timout := 20 * time.Second + ticker := time.NewTicker(timout) + defer ticker.Stop() + + for { + select { + case e := <-peerQueue.dequeue(): + deliveredMsgs[e.channelID]++ + ticker.Reset(timout) + + case <-ticker.C: + closer.Close() + } + } + }() + + // wait for dequeueing to complete + <-closer.Done() + + // close queue and wait for cleanup + peerQueue.close() + <-peerQueue.closed() + + for i, chDesc := range peerQueue.chDescs { + chID := ChannelID(chDesc.ID) + require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero") + require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty") + + total := totalMsgs[chID] + delivered := deliveredMsgs[chID] + successRate := float64(delivered) / float64(total) + + successRates[chID] = successRate + + // Require some messages dropped. Note, the top weighted flows may not have + // any dropped if lower priority non-empty queues always exist. + if i > 2 { + require.Less(t, delivered, total, "expected some messages to be dropped") + require.Less(t, successRate, 1.0, "expected a success rate below 100%") + } + } + + require.Zero(t, peerQueue.size, "expected scheduler size to be zero") + + // require channel 0x01 to have the highest success rate due to its weight + ch01Rate := successRates[ChannelID(chDescs[0].ID)] + for i := 1; i < len(chDescs); i++ { + require.GreaterOrEqual(t, ch01Rate, successRates[ChannelID(chDescs[i].ID)]) + } + + // require channel 0x02 to have the 2nd highest success rate due to its weight + ch02Rate := successRates[ChannelID(chDescs[1].ID)] + for i := 2; i < len(chDescs); i++ { + require.GreaterOrEqual(t, ch02Rate, successRates[ChannelID(chDescs[i].ID)]) + } + + // require channel 0x03 to have the 3rd highest success rate due to its weight + ch03Rate := successRates[ChannelID(chDescs[2].ID)] + for i := 3; i < len(chDescs); i++ { + require.GreaterOrEqual(t, ch03Rate, successRates[ChannelID(chDescs[i].ID)]) + } +} diff --git a/statesync/reactor.go b/statesync/reactor.go index 5b4d1d95d..7c48fe3d7 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -37,6 +37,8 @@ var ( Priority: 5, SendQueueCapacity: 10, RecvMessageCapacity: snapshotMsgSize, + + MaxSendBytes: 400, }, }, ChunkChannel: { @@ -46,6 +48,8 @@ var ( Priority: 1, SendQueueCapacity: 4, RecvMessageCapacity: chunkMsgSize, + + MaxSendBytes: 400, }, }, } diff --git a/test/e2e/docker/Dockerfile b/test/e2e/docker/Dockerfile index cd28056b6..d0f9cd7a2 100644 --- a/test/e2e/docker/Dockerfile +++ b/test/e2e/docker/Dockerfile @@ -18,6 +18,7 @@ RUN go mod download COPY . . 
RUN make build && cp build/tendermint /usr/bin/tendermint COPY test/e2e/docker/entrypoint* /usr/bin/ + # FIXME: Temporarily disconnect maverick node until it is redesigned # RUN cd test/e2e && make maverick && cp build/maverick /usr/bin/maverick RUN cd test/e2e && make app && cp build/app /usr/bin/app @@ -28,6 +29,7 @@ WORKDIR /tendermint VOLUME /tendermint ENV TMHOME=/tendermint ENV TM_LEGACY_P2P=true +ENV TM_P2P_QUEUE="priority" EXPOSE 26656 26657 26660 6060 ENTRYPOINT ["/usr/bin/entrypoint"] diff --git a/test/e2e/runner/load.go b/test/e2e/runner/load.go index 4b58c2569..151f4511d 100644 --- a/test/e2e/runner/load.go +++ b/test/e2e/runner/load.go @@ -13,9 +13,10 @@ import ( "github.com/tendermint/tendermint/types" ) -// Load generates transactions against the network until the given -// context is canceled. -func Load(ctx context.Context, testnet *e2e.Testnet) error { +// Load generates transactions against the network until the given context is +// canceled. A multiplier of great than one can be supplied if load needs to +// be generated beyond a minimum amount. +func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error { // Since transactions are executed across all nodes in the network, we need // to reduce transaction load for larger networks to avoid using too much // CPU. This gives high-throughput small networks and low-throughput large ones. @@ -39,7 +40,7 @@ func Load(ctx context.Context, testnet *e2e.Testnet) error { go loadGenerate(ctx, chTx) - for w := 0; w < concurrency; w++ { + for w := 0; w < concurrency*multiplier; w++ { go loadProcess(ctx, testnet, chTx, chSuccess) } @@ -81,6 +82,7 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx) { select { case chTx <- tx: time.Sleep(10 * time.Millisecond) + case <-ctx.Done(): close(chTx) return @@ -97,18 +99,21 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx var err error for tx := range chTx { node := testnet.RandomNode() + client, ok := clients[node.Name] if !ok { client, err = node.Client() if err != nil { continue } + clients[node.Name] = client } - _, err = client.BroadcastTxCommit(ctx, tx) - if err != nil { + + if _, err = client.BroadcastTxCommit(ctx, tx); err != nil { continue } + chSuccess <- tx } } diff --git a/test/e2e/runner/main.go b/test/e2e/runner/main.go index 72eb52c58..993df98ef 100644 --- a/test/e2e/runner/main.go +++ b/test/e2e/runner/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strconv" "github.com/spf13/cobra" @@ -59,7 +60,7 @@ func NewCLI() *CLI { ctx, loadCancel := context.WithCancel(context.Background()) defer loadCancel() go func() { - err := Load(ctx, cli.testnet) + err := Load(ctx, cli.testnet, 1) if err != nil { logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error())) } @@ -170,10 +171,20 @@ func NewCLI() *CLI { }) cli.root.AddCommand(&cobra.Command{ - Use: "load", + Use: "load [multiplier]", + Args: cobra.MaximumNArgs(1), Short: "Generates transaction load until the command is canceled", - RunE: func(cmd *cobra.Command, args []string) error { - return Load(context.Background(), cli.testnet) + RunE: func(cmd *cobra.Command, args []string) (err error) { + m := 1 + + if len(args) == 1 { + m, err = strconv.Atoi(args[0]) + if err != nil { + return err + } + } + + return Load(context.Background(), cli.testnet, m) }, }) @@ -242,7 +253,7 @@ Does not run any perbutations. 
ctx, loadCancel := context.WithCancel(context.Background()) defer loadCancel() go func() { - err := Load(ctx, cli.testnet) + err := Load(ctx, cli.testnet, 1) if err != nil { logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error())) } diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index 69c9067ad..6045afbe4 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -154,6 +154,9 @@ func MakeDockerCompose(testnet *e2e.Testnet) ([]byte, error) { } return command }, + "addUint32": func(x, y uint32) uint32 { + return x + y + }, }).Parse(`version: '2.4' networks: @@ -184,6 +187,7 @@ services: init: true ports: - 26656 + - {{ if .ProxyPort }}{{ addUint32 .ProxyPort 1000 }}:{{ end }}26660 - {{ if .ProxyPort }}{{ .ProxyPort }}:{{ end }}26657 - 6060 volumes: @@ -247,9 +251,11 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { cfg := config.DefaultConfig() cfg.Moniker = node.Name cfg.ProxyApp = AppAddressTCP + if node.LogLevel != "" { cfg.LogLevel = node.LogLevel } + cfg.RPC.ListenAddress = "tcp://0.0.0.0:26657" cfg.RPC.PprofListenAddress = ":6060" cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false)) @@ -315,12 +321,14 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { if node.StateSync { cfg.StateSync.Enable = true cfg.StateSync.RPCServers = []string{} + for _, peer := range node.Testnet.ArchiveNodes() { if peer.Name == node.Name { continue } cfg.StateSync.RPCServers = append(cfg.StateSync.RPCServers, peer.AddressRPC()) } + if len(cfg.StateSync.RPCServers) < 2 { return nil, errors.New("unable to find 2 suitable state sync RPC servers") } @@ -333,6 +341,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { } cfg.P2P.Seeds += seed.AddressP2P(true) } + cfg.P2P.PersistentPeers = "" for _, peer := range node.PersistentPeers { if len(cfg.P2P.PersistentPeers) > 0 { @@ -340,6 +349,9 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { } cfg.P2P.PersistentPeers += peer.AddressP2P(true) } + + cfg.Instrumentation.Prometheus = true + return cfg, nil }
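
The queue implementation is selected at runtime through RouterOptions.QueueType: node.go reads the TM_P2P_QUEUE environment variable (the e2e Dockerfile sets it to "priority"), and RouterOptions.Validate falls back to "fifo" when the value is empty. Below is a minimal, self-contained sketch of that selection logic; the helper name queueTypeFromEnv is illustrative and not part of the patch.

package main

import (
	"fmt"
	"os"
)

// queueTypeFromEnv mirrors the pattern used in node.go and
// RouterOptions.Validate: read TM_P2P_QUEUE, default to "fifo" when unset,
// and reject anything other than the three supported schedulers.
func queueTypeFromEnv() (string, error) {
	switch qt := os.Getenv("TM_P2P_QUEUE"); qt {
	case "":
		return "fifo", nil
	case "fifo", "priority", "wdrr":
		return qt, nil
	default:
		return "", fmt.Errorf("queue type %q is not supported", qt)
	}
}

func main() {
	qt, err := queueTypeFromEnv()
	if err != nil {
		panic(err)
	}
	fmt.Println("router queue type:", qt)
}

Running this with TM_P2P_QUEUE=wdrr prints "router queue type: wdrr"; the resulting string is what would be passed as p2p.RouterOptions{QueueType: qt} to p2p.NewRouter.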
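For the priority scheduler, pqueue.go orders envelopes so that higher channel priority wins and, when priorities are equal, the more recently enqueued envelope is preferred (Less compares timestamps with After). The following is a small container/heap example of that ordering rule, using illustrative types rather than the patch's pqEnvelope.

package main

import (
	"container/heap"
	"fmt"
	"time"
)

// item carries the two fields the ordering rule looks at: channel priority
// and enqueue timestamp.
type item struct {
	label    string
	priority uint
	ts       time.Time
}

type pq []*item

func (q pq) Len() int      { return len(q) }
func (q pq) Swap(i, j int) { q[i], q[j] = q[j], q[i] }

// Higher priority first; on a tie, the newer timestamp first.
func (q pq) Less(i, j int) bool {
	if q[i].priority == q[j].priority {
		return q[i].ts.After(q[j].ts)
	}
	return q[i].priority > q[j].priority
}

func (q *pq) Push(x interface{}) { *q = append(*q, x.(*item)) }

func (q *pq) Pop() interface{} {
	old := *q
	n := len(old)
	it := old[n-1]
	old[n-1] = nil
	*q = old[:n-1]
	return it
}

func main() {
	now := time.Now()
	q := &pq{
		{label: "old vote", priority: 5, ts: now},
		{label: "new vote", priority: 5, ts: now.Add(time.Second)},
		{label: "block part", priority: 10, ts: now},
	}
	heap.Init(q)

	for q.Len() > 0 {
		fmt.Println(heap.Pop(q).(*item).label)
	}
}

The pop order is "block part", "new vote", "old vote": envelopes on higher-priority channels are dequeued first, and newer envelopes beat older ones at the same priority.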
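For the WDRR scheduler, each channel's quantum is MaxSendBytes * Priority, so with the descriptors in this patch the consensus StateChannel gets 12000 * 6 = 72000 bytes of credit per round while the PEX channel gets 200 * 1 = 200. The sketch below is a simplified, self-contained version of one deficit-round-robin round; the flow type, the drainRound helper, and the queued message sizes are illustrative and not taken from wdrr_queue.go.

package main

import "fmt"

// flow is a stand-in for one per-channel buffer in the WDRR scheduler: a FIFO
// of message sizes plus the flow's quantum and running deficit counter.
type flow struct {
	name    string
	quantum uint // MaxSendBytes * Priority in the real scheduler
	deficit uint
	msgs    []uint // queued message sizes, in bytes
}

// drainRound runs one WDRR round: every non-empty flow has its deficit bumped
// by its quantum and may send head-of-queue messages for as long as the
// deficit covers their size; a flow that empties has its deficit reset.
func drainRound(flows []*flow) {
	for _, f := range flows {
		if len(f.msgs) > 0 {
			f.deficit += f.quantum
			for len(f.msgs) > 0 && f.deficit >= f.msgs[0] {
				fmt.Printf("send %d bytes on %s\n", f.msgs[0], f.name)
				f.deficit -= f.msgs[0]
				f.msgs = f.msgs[1:]
			}
		}
		if len(f.msgs) == 0 {
			f.deficit = 0
		}
	}
}

func main() {
	flows := []*flow{
		{name: "consensus-state", quantum: 72000, msgs: []uint{12000, 12000, 12000}},
		{name: "pex", quantum: 200, msgs: []uint{150, 150, 150}},
	}
	drainRound(flows)
}

In this round the consensus-state flow drains all three 12000-byte messages and, being empty, has its deficit reset to zero, while the pex flow sends a single 150-byte message and carries 50 bytes of unused credit into the next round. This per-round weighting is what the relative success rates in wdrr_queue_test.go are probing.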