diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md
index 0a4c90f40..c44c58ef0 100644
--- a/CHANGELOG_PENDING.md
+++ b/CHANGELOG_PENDING.md
@@ -24,6 +24,7 @@ Special thanks to external contributors on this release:
 - [blocksync] \#7046 Remove v2 implementation of the blocksync service and reactor, which was disabled in the previous release (@tychoish)
+- [p2p] \#7064 Remove WDRR queue implementation. (@tychoish)
 
 - Blockchain Protocol
diff --git a/config/config.go b/config/config.go
index a393e6edc..349e5a6f7 100644
--- a/config/config.go
+++ b/config/config.go
@@ -710,7 +710,7 @@ type P2PConfig struct { //nolint: maligned
 	TestDialFail bool `mapstructure:"test-dial-fail"`
 
 	// Makes it possible to configure which queue backend the p2p
-	// layer uses. Options are: "fifo", "priority" and "wdrr",
+	// layer uses. Options are: "fifo" and "priority",
 	// with the default being "priority".
 	QueueType string `mapstructure:"queue-type"`
 }
diff --git a/internal/p2p/pqueue_test.go b/internal/p2p/pqueue_test.go
index ddb7addbe..c038408ef 100644
--- a/internal/p2p/pqueue_test.go
+++ b/internal/p2p/pqueue_test.go
@@ -4,9 +4,12 @@ import (
 	"testing"
 	"time"
 
+	gogotypes "github.com/gogo/protobuf/types"
 	"github.com/tendermint/tendermint/libs/log"
 )
 
+type testMessage = gogotypes.StringValue
+
 func TestCloseWhileDequeueFull(t *testing.T) {
 	enqueueLength := 5
 	chDescs := []ChannelDescriptor{
diff --git a/internal/p2p/router.go b/internal/p2p/router.go
index d68f16c4f..ef1eef564 100644
--- a/internal/p2p/router.go
+++ b/internal/p2p/router.go
@@ -133,8 +133,8 @@ type RouterOptions struct {
 	// no timeout.
 	HandshakeTimeout time.Duration
 
-	// QueueType must be "wdrr" (Weighted Deficit Round Robin), "priority", or
-	// "fifo". Defaults to "fifo".
+	// QueueType must be "priority" or "fifo". Defaults to
+	// "fifo".
 	QueueType string
 
 	// MaxIncomingConnectionAttempts rate limits the number of incoming connection
@@ -176,7 +176,6 @@ type RouterOptions struct {
 const (
 	queueTypeFifo     = "fifo"
 	queueTypePriority = "priority"
-	queueTypeWDRR     = "wdrr"
 )
 
 // Validate validates router options.
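With WDRR removed, the router recognizes only two queue backends. A minimal sketch of what the `Validate` hunk below now accepts and rejects (illustrative only: `internal/p2p` cannot be imported from outside the tendermint module, so this snippet is not buildable as an external program):

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/internal/p2p" // internal package; shown for illustration only
)

func main() {
	for _, qt := range []string{"", "fifo", "priority", "wdrr"} {
		opts := p2p.RouterOptions{QueueType: qt}
		// "" is normalized to "fifo"; "fifo" and "priority" pass;
		// "wdrr" now falls through to the error case.
		fmt.Printf("%q: %v\n", qt, opts.Validate())
	}
}
```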
@@ -184,8 +183,8 @@ func (o *RouterOptions) Validate() error {
 	switch o.QueueType {
 	case "":
 		o.QueueType = queueTypeFifo
-	case queueTypeFifo, queueTypeWDRR, queueTypePriority:
-		// pass
+	case queueTypeFifo, queueTypePriority:
+		// pass
 	default:
 		return fmt.Errorf("queue type %q is not supported", o.QueueType)
 	}
@@ -347,17 +346,6 @@ func (r *Router) createQueueFactory() (func(int) queue, error) {
 			return q
 		}, nil
 
-	case queueTypeWDRR:
-		return func(size int) queue {
-			if size%2 != 0 {
-				size++
-			}
-
-			q := newWDRRScheduler(r.logger, r.metrics, r.chDescs, uint(size)/2, uint(size)/2, defaultCapacity)
-			q.start()
-			return q
-		}, nil
-
 	default:
 		return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType)
 	}
diff --git a/internal/p2p/router_init_test.go b/internal/p2p/router_init_test.go
index 3622c0cc1..b90d2a3dd 100644
--- a/internal/p2p/router_init_test.go
+++ b/internal/p2p/router_init_test.go
@@ -38,14 +38,6 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
 		require.True(t, ok)
 		defer q.close()
 	})
-	t.Run("WDRR", func(t *testing.T) {
-		opts := RouterOptions{QueueType: queueTypeWDRR}
-		r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts)
-		require.NoError(t, err)
-		q, ok := r.queueFactory(1).(*wdrrScheduler)
-		require.True(t, ok)
-		defer q.close()
-	})
 	t.Run("NonExistant", func(t *testing.T) {
 		opts := RouterOptions{QueueType: "fast"}
 		_, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts)
diff --git a/internal/p2p/wdrr_queue.go b/internal/p2p/wdrr_queue.go
deleted file mode 100644
index 1b75ffce8..000000000
--- a/internal/p2p/wdrr_queue.go
+++ /dev/null
@@ -1,287 +0,0 @@
-package p2p
-
-import (
-	"fmt"
-	"sort"
-	"strconv"
-
-	"github.com/gogo/protobuf/proto"
-	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
-	"github.com/tendermint/tendermint/libs/log"
-)
-
-// wrappedEnvelope wraps a p2p Envelope with its precomputed size.
-type wrappedEnvelope struct {
-	envelope Envelope
-	size     uint
-}
-
-// assert the WDRR scheduler implements the queue interface at compile-time
-var _ queue = (*wdrrScheduler)(nil)
-
-// wdrrQueue implements a Weighted Deficit Round Robin (WDRR) scheduling
-// algorithm via the queue interface. A WDRR queue is created per peer, where
-// the queue will have N number of flows. Each flow corresponds to a p2p Channel,
-// so there are n input flows and a single output source, the peer's connection.
-//
-// The WDRR scheduler contains a shared buffer with a fixed capacity.
-//
-// Each flow has the following:
-// - quantum: The number of bytes that is added to the deficit counter of the
-//   flow in each round. The flow can send at most quantum bytes at a time. Each
-//   flow has its own unique quantum, which gives the queue its weighted nature.
-//   A higher quantum corresponds to a higher weight/priority. The quantum is
-//   computed as MaxSendBytes * Priority.
-// - deficit counter: The number of bytes that the flow is allowed to transmit
-//   when it is its turn.
-//
-// See: https://en.wikipedia.org/wiki/Deficit_round_robin
-type wdrrScheduler struct {
-	logger       log.Logger
-	metrics      *Metrics
-	chDescs      []ChannelDescriptor
-	capacity     uint
-	size         uint
-	chPriorities map[ChannelID]uint
-	buffer       map[ChannelID][]wrappedEnvelope
-	quanta       map[ChannelID]uint
-	deficits     map[ChannelID]uint
-
-	closer *tmsync.Closer
-	doneCh *tmsync.Closer
-
-	enqueueCh chan Envelope
-	dequeueCh chan Envelope
-}
-
-func newWDRRScheduler(
-	logger log.Logger,
-	m *Metrics,
-	chDescs []ChannelDescriptor,
-	enqueueBuf, dequeueBuf, capacity uint,
-) *wdrrScheduler {
-
-	// copy each ChannelDescriptor and sort them by channel priority
-	chDescsCopy := make([]ChannelDescriptor, len(chDescs))
-	copy(chDescsCopy, chDescs)
-	sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority > chDescsCopy[j].Priority })
-
-	var (
-		buffer       = make(map[ChannelID][]wrappedEnvelope)
-		chPriorities = make(map[ChannelID]uint)
-		quanta       = make(map[ChannelID]uint)
-		deficits     = make(map[ChannelID]uint)
-	)
-
-	for _, chDesc := range chDescsCopy {
-		chID := ChannelID(chDesc.ID)
-		chPriorities[chID] = uint(chDesc.Priority)
-		buffer[chID] = make([]wrappedEnvelope, 0)
-		quanta[chID] = chDesc.MaxSendBytes * uint(chDesc.Priority)
-	}
-
-	return &wdrrScheduler{
-		logger:       logger.With("queue", "wdrr"),
-		metrics:      m,
-		capacity:     capacity,
-		chPriorities: chPriorities,
-		chDescs:      chDescsCopy,
-		buffer:       buffer,
-		quanta:       quanta,
-		deficits:     deficits,
-		closer:       tmsync.NewCloser(),
-		doneCh:       tmsync.NewCloser(),
-		enqueueCh:    make(chan Envelope, enqueueBuf),
-		dequeueCh:    make(chan Envelope, dequeueBuf),
-	}
-}
-
-// enqueue returns an unbuffered write-only channel which a producer can send on.
-func (s *wdrrScheduler) enqueue() chan<- Envelope {
-	return s.enqueueCh
-}
-
-// dequeue returns an unbuffered read-only channel which a consumer can read from.
-func (s *wdrrScheduler) dequeue() <-chan Envelope {
-	return s.dequeueCh
-}
-
-func (s *wdrrScheduler) closed() <-chan struct{} {
-	return s.closer.Done()
-}
-
-// close closes the WDRR queue. After this call enqueue() will block, so the
-// caller must select on closed() as well to avoid blocking forever. The
-// enqueue() and dequeue() along with the internal channels will NOT be closed.
-// Note, close() will block until all externally spawned goroutines have exited.
-func (s *wdrrScheduler) close() {
-	s.closer.Close()
-	<-s.doneCh.Done()
-}
-
-// start starts the WDRR queue process in a blocking goroutine. This must be
-// called before the queue can start to process and accept Envelopes.
-func (s *wdrrScheduler) start() {
-	go s.process()
-}
-
-// process starts a blocking WDRR scheduler process, where we continuously
-// evaluate if we need to attempt to enqueue an Envelope or schedule Envelopes
-// to be dequeued and subsequently read and sent on the source connection.
-// Internally, each p2p Channel maps to a flow, where each flow has a deficit
-// and a quantum.
-//
-// For each Envelope requested to be enqueued, we evaluate if there is sufficient
-// capacity in the shared buffer to add the Envelope. If so, it is added.
-// Otherwise, we evaluate all flows of lower priority, where we attempt to find
-// an existing Envelope in the shared buffer of sufficient size that can be
-// dropped in place of the incoming Envelope. If there is no such Envelope that
-// can be dropped, then the incoming Envelope is dropped.
-//
-// When there is nothing to be enqueued, we perform the WDRR algorithm and
-// determine which Envelopes can be dequeued.
-// For each Envelope that can be dequeued, it is sent on the dequeueCh.
-// Specifically, for each flow, if it is non-empty, its deficit counter is
-// incremented by its quantum value. Then, the value of the deficit counter is
-// the maximal number of bytes that can be sent in this round. If the deficit
-// counter is greater than the Envelope's message size at the head of the queue
-// (HoQ), this envelope can be sent and the value of the counter is decremented
-// by the message's size. Then, the size of the next Envelope's message is
-// compared to the counter value, etc. Once the flow is empty or the value of
-// the counter is insufficient, the scheduler will skip to the next flow. If
-// the flow is empty, the value of the deficit counter is reset to 0.
-//
-// XXX/TODO: Evaluate the single goroutine scheduler mechanism. In other words,
-// evaluate the effectiveness and performance of having a single goroutine
-// handle both enqueueing and dequeueing logic. Specifically, there is
-// potentially contention between reading off of enqueueCh and trying to
-// enqueue while also attempting to perform the WDRR algorithm and find the
-// next set of Envelope(s) to send on the dequeueCh. Alternatively, we could
-// consider separate scheduling goroutines, but then that requires the use of
-// mutexes and possibly degraded performance.
-func (s *wdrrScheduler) process() {
-	defer s.doneCh.Close()
-
-	for {
-		select {
-		case <-s.closer.Done():
-			return
-
-		case e := <-s.enqueueCh:
-			// attempt to enqueue the incoming Envelope
-			chIDStr := strconv.Itoa(int(e.channelID))
-			wEnv := wrappedEnvelope{envelope: e, size: uint(proto.Size(e.Message))}
-			msgSize := wEnv.size
-
-			s.metrics.PeerPendingSendBytes.With("peer_id", string(e.To)).Add(float64(msgSize))
-
-			// If we're at capacity, we need to either drop the incoming Envelope or
-			// an Envelope from a lower priority flow. Otherwise, we add the (wrapped)
-			// envelope to the flow's queue.
-			if s.size+wEnv.size > s.capacity {
-				chPriority := s.chPriorities[e.channelID]
-
-				var (
-					canDrop  bool
-					dropIdx  int
-					dropChID ChannelID
-				)
-
-				// Evaluate all lower priority flows and determine if there exists an
-				// Envelope that is of equal or greater size that we can drop in favor
-				// of the incoming Envelope.
-				for i := len(s.chDescs) - 1; i >= 0 && uint(s.chDescs[i].Priority) < chPriority && !canDrop; i-- {
-					currChID := ChannelID(s.chDescs[i].ID)
-					flow := s.buffer[currChID]
-
-					for j := 0; j < len(flow) && !canDrop; j++ {
-						if flow[j].size >= wEnv.size {
-							canDrop = true
-							dropIdx = j
-							dropChID = currChID
-							break
-						}
-					}
-				}
-
-				// If we can drop an existing Envelope, drop it and enqueue the incoming
-				// Envelope.
-				if canDrop {
-					chIDStr = strconv.Itoa(int(dropChID))
-					chPriority = s.chPriorities[dropChID]
-					msgSize = s.buffer[dropChID][dropIdx].size
-
-					// Drop Envelope for the lower priority flow and update the queue's
-					// buffer size
-					s.size -= msgSize
-					s.buffer[dropChID] = append(s.buffer[dropChID][:dropIdx], s.buffer[dropChID][dropIdx+1:]...)
-
-					// add the incoming Envelope and update queue's buffer size
-					s.size += wEnv.size
-					s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
-					s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
-				}
-
-				// We either dropped the incoming Envelope or one from an existing
-				// lower priority flow.
-				s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
-				s.logger.Debug(
-					"dropped envelope",
-					"ch_id", chIDStr,
-					"priority", chPriority,
-					"capacity", s.capacity,
-					"msg_size", msgSize,
-				)
-			} else {
-				// we have sufficient capacity to enqueue the incoming Envelope
-				s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
-				s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
-				s.size += wEnv.size
-			}
-
-		default:
-			// perform the WDRR algorithm
-			for _, chDesc := range s.chDescs {
-				chID := ChannelID(chDesc.ID)
-
-				// only consider non-empty flows
-				if len(s.buffer[chID]) > 0 {
-					// bump flow's quantum
-					s.deficits[chID] += s.quanta[chID]
-
-					// grab the flow's current deficit counter and HoQ (wrapped) Envelope
-					d := s.deficits[chID]
-					we := s.buffer[chID][0]
-
-					// While the flow is non-empty and we can send the current Envelope
-					// on the dequeueCh:
-					//
-					// 1. send the Envelope
-					// 2. update the scheduler's shared buffer's size
-					// 3. update the flow's deficit
-					// 4. remove from the flow's queue
-					// 5. grab the next HoQ Envelope and flow's deficit
-					for len(s.buffer[chID]) > 0 && d >= we.size {
-						s.metrics.PeerSendBytesTotal.With(
-							"chID", fmt.Sprint(chID),
-							"peer_id", string(we.envelope.To)).Add(float64(we.size))
-						s.dequeueCh <- we.envelope
-						s.size -= we.size
-						s.deficits[chID] -= we.size
-						s.buffer[chID] = s.buffer[chID][1:]
-
-						if len(s.buffer[chID]) > 0 {
-							d = s.deficits[chID]
-							we = s.buffer[chID][0]
-						}
-					}
-				}
-
-				// reset the flow's deficit to zero if it is empty
-				if len(s.buffer[chID]) == 0 {
-					s.deficits[chID] = 0
-				}
-			}
-		}
-	}
-}
diff --git a/internal/p2p/wdrr_queue_test.go b/internal/p2p/wdrr_queue_test.go
deleted file mode 100644
index d49c77e76..000000000
--- a/internal/p2p/wdrr_queue_test.go
+++ /dev/null
@@ -1,208 +0,0 @@
-package p2p
-
-import (
-	"math"
-	"math/rand"
-	"testing"
-	"time"
-
-	gogotypes "github.com/gogo/protobuf/types"
-	"github.com/stretchr/testify/require"
-	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
-	"github.com/tendermint/tendermint/libs/log"
-)
-
-type testMessage = gogotypes.StringValue
-
-func TestWDRRQueue_EqualWeights(t *testing.T) {
-	chDescs := []ChannelDescriptor{
-		{ID: 0x01, Priority: 1, MaxSendBytes: 4},
-		{ID: 0x02, Priority: 1, MaxSendBytes: 4},
-		{ID: 0x03, Priority: 1, MaxSendBytes: 4},
-		{ID: 0x04, Priority: 1, MaxSendBytes: 4},
-		{ID: 0x05, Priority: 1, MaxSendBytes: 4},
-		{ID: 0x06, Priority: 1, MaxSendBytes: 4},
-	}
-
-	peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 1000, 1000, 120)
-	peerQueue.start()
-
-	totalMsgs := make(map[ChannelID]int)
-	deliveredMsgs := make(map[ChannelID]int)
-	successRates := make(map[ChannelID]float64)
-
-	closer := tmsync.NewCloser()
-
-	go func() {
-		timeout := 10 * time.Second
-		ticker := time.NewTicker(timeout)
-		defer ticker.Stop()
-
-		for {
-			select {
-			case e := <-peerQueue.dequeue():
-				deliveredMsgs[e.channelID]++
-				ticker.Reset(timeout)
-
-			case <-ticker.C:
-				closer.Close()
-			}
-		}
-	}()
-
-	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
-	maxMsgs := 5000
-	minMsgs := 1000
-
-	for _, chDesc := range chDescs {
-		total := rng.Intn(maxMsgs-minMsgs) + minMsgs // total = rand[minMsgs, maxMsgs)
-		totalMsgs[ChannelID(chDesc.ID)] = total
-
-		go func(cID ChannelID, n int) {
-			for i := 0; i < n; i++ {
-				peerQueue.enqueue() <- Envelope{
-					channelID: cID,
-					Message:   &testMessage{Value: "foo"}, // 5 bytes
-				}
-			}
-		}(ChannelID(chDesc.ID), total)
-	}
-
-	// wait for dequeueing to complete
-	<-closer.Done()
-
-	// close queue and wait for cleanup
-	peerQueue.close()
-	<-peerQueue.closed()
-
-	var (
-		sum    float64
-		stdDev float64
-	)
-
-	for _, chDesc := range peerQueue.chDescs {
-		chID := ChannelID(chDesc.ID)
-		require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero")
-		require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty")
-
-		total := totalMsgs[chID]
-		delivered := deliveredMsgs[chID]
-		successRate := float64(delivered) / float64(total)
-
-		sum += successRate
-		successRates[chID] = successRate
-
-		// require some messages dropped
-		require.Less(t, delivered, total, "expected some messages to be dropped")
-		require.Less(t, successRate, 1.0, "expected a success rate below 100%")
-	}
-
-	require.Zero(t, peerQueue.size, "expected scheduler size to be zero")
-
-	numFlows := float64(len(peerQueue.buffer))
-	mean := sum / numFlows
-
-	for _, successRate := range successRates {
-		stdDev += math.Pow(successRate-mean, 2)
-	}
-
-	stdDev = math.Sqrt(stdDev / numFlows)
-	require.Less(t, stdDev, 0.02, "expected success rate standard deviation to be less than 2%")
-}
-
-func TestWDRRQueue_DecreasingWeights(t *testing.T) {
-	chDescs := []ChannelDescriptor{
-		{ID: 0x01, Priority: 18, MaxSendBytes: 4},
-		{ID: 0x02, Priority: 10, MaxSendBytes: 4},
-		{ID: 0x03, Priority: 2, MaxSendBytes: 4},
-		{ID: 0x04, Priority: 1, MaxSendBytes: 4},
-		{ID: 0x05, Priority: 1, MaxSendBytes: 4},
-		{ID: 0x06, Priority: 1, MaxSendBytes: 4},
-	}
-
-	peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 0, 0, 500)
-	peerQueue.start()
-
-	totalMsgs := make(map[ChannelID]int)
-	deliveredMsgs := make(map[ChannelID]int)
-	successRates := make(map[ChannelID]float64)
-
-	for _, chDesc := range chDescs {
-		total := 1000
-		totalMsgs[ChannelID(chDesc.ID)] = total
-
-		go func(cID ChannelID, n int) {
-			for i := 0; i < n; i++ {
-				peerQueue.enqueue() <- Envelope{
-					channelID: cID,
-					Message:   &testMessage{Value: "foo"}, // 5 bytes
-				}
-			}
-		}(ChannelID(chDesc.ID), total)
-	}
-
-	closer := tmsync.NewCloser()
-
-	go func() {
-		timeout := 20 * time.Second
-		ticker := time.NewTicker(timeout)
-		defer ticker.Stop()
-
-		for {
-			select {
-			case e := <-peerQueue.dequeue():
-				deliveredMsgs[e.channelID]++
-				ticker.Reset(timeout)
-
-			case <-ticker.C:
-				closer.Close()
-			}
-		}
-	}()
-
-	// wait for dequeueing to complete
-	<-closer.Done()
-
-	// close queue and wait for cleanup
-	peerQueue.close()
-	<-peerQueue.closed()
-
-	for i, chDesc := range peerQueue.chDescs {
-		chID := ChannelID(chDesc.ID)
-		require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero")
-		require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty")
-
-		total := totalMsgs[chID]
-		delivered := deliveredMsgs[chID]
-		successRate := float64(delivered) / float64(total)
-
-		successRates[chID] = successRate
-
-		// Require some messages dropped. Note, the top weighted flows may not have
-		// any dropped if lower priority non-empty queues always exist.
-		if i > 2 {
-			require.Less(t, delivered, total, "expected some messages to be dropped")
-			require.Less(t, successRate, 1.0, "expected a success rate below 100%")
-		}
-	}
-
-	require.Zero(t, peerQueue.size, "expected scheduler size to be zero")
-
-	// require channel 0x01 to have the highest success rate due to its weight
-	ch01Rate := successRates[ChannelID(chDescs[0].ID)]
-	for i := 1; i < len(chDescs); i++ {
-		require.GreaterOrEqual(t, ch01Rate, successRates[ChannelID(chDescs[i].ID)])
-	}
-
-	// require channel 0x02 to have the 2nd highest success rate due to its weight
-	ch02Rate := successRates[ChannelID(chDescs[1].ID)]
-	for i := 2; i < len(chDescs); i++ {
-		require.GreaterOrEqual(t, ch02Rate, successRates[ChannelID(chDescs[i].ID)])
-	}
-
-	// require channel 0x03 to have the 3rd highest success rate due to its weight
-	ch03Rate := successRates[ChannelID(chDescs[2].ID)]
-	for i := 3; i < len(chDescs); i++ {
-		require.GreaterOrEqual(t, ch03Rate, successRates[ChannelID(chDescs[i].ID)])
-	}
-}
diff --git a/test/e2e/generator/generate.go b/test/e2e/generator/generate.go
index 0a2d39665..21b3ebe0a 100644
--- a/test/e2e/generator/generate.go
+++ b/test/e2e/generator/generate.go
@@ -15,7 +15,7 @@ var (
 	// separate testnet for each combination (Cartesian product) of options.
 	testnetCombinations = map[string][]interface{}{
 		"topology":      {"single", "quad", "large"},
-		"queueType":     {"priority"}, // "fifo", "wdrr"
+		"queueType":     {"priority"}, // "fifo"
 		"initialHeight": {0, 1000},
 		"initialState": {
 			map[string]string{},
diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go
index 3a75b169e..1daa8b87c 100644
--- a/test/e2e/pkg/testnet.go
+++ b/test/e2e/pkg/testnet.go
@@ -347,7 +347,7 @@ func (n Node) Validate(testnet Testnet) error {
 		return fmt.Errorf("invalid mempool version %q", n.Mempool)
 	}
 	switch n.QueueType {
-	case "", "priority", "fifo":
+	case "", "priority", "fifo":
 	default:
 		return fmt.Errorf("unsupported p2p queue type: %s", n.QueueType)
 	}
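For readers unfamiliar with the discipline being deleted: the removed scheduler was an ordinary weighted deficit round robin. A minimal, self-contained sketch of the core loop described by the deleted doc comment (the `flow` and `round` names are illustrative inventions, not the removed implementation; a flow's quantum corresponds to `MaxSendBytes * Priority` above):

```go
package main

import "fmt"

// flow models one channel's queue in a DRR scheduler: a quantum (bytes
// credited per round) and a deficit counter over pending message sizes.
type flow struct {
	quantum uint
	deficit uint
	msgs    []uint // pending message sizes, head of queue first
}

// round performs one DRR pass: each backlogged flow is credited its
// quantum, then sends head-of-queue messages while its deficit covers
// them; an emptied flow has its deficit reset to zero.
func round(flows []*flow) {
	for i, f := range flows {
		if len(f.msgs) > 0 {
			f.deficit += f.quantum
			for len(f.msgs) > 0 && f.deficit >= f.msgs[0] {
				f.deficit -= f.msgs[0]
				fmt.Printf("flow %d: sent %d bytes\n", i, f.msgs[0])
				f.msgs = f.msgs[1:]
			}
		}
		if len(f.msgs) == 0 {
			f.deficit = 0
		}
	}
}

func main() {
	// Two flows weighted 2:1 over equal-sized messages: per round,
	// flow 0 sends two messages while flow 1 sends one.
	flows := []*flow{
		{quantum: 10, msgs: []uint{5, 5, 5, 5}},
		{quantum: 5, msgs: []uint{5, 5, 5, 5}},
	}
	round(flows)
	round(flows)
}
```

Over repeated rounds each backlogged flow's throughput is proportional to its quantum, which is the weighted behavior the deleted tests assert.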