
p2p: remove wdrr queue (#7064)

This code hasn't been battle-tested, and seems to have grown
increasingly flaky in tests. Given our general direction of reducing
queue complexity over the next couple of releases, I think it makes
sense to remove it.
Sam Kleinman 3 years ago
committed by GitHub
commit 3ea81bfaa7
9 changed files with 11 additions and 522 deletions
1. +1 -0 CHANGELOG_PENDING.md
2. +1 -1 config/config.go
3. +3 -0 internal/p2p/pqueue_test.go
4. +4 -16 internal/p2p/router.go
5. +0 -8 internal/p2p/router_init_test.go
6. +0 -287 internal/p2p/wdrr_queue.go
7. +0 -208 internal/p2p/wdrr_queue_test.go
8. +1 -1 test/e2e/generator/generate.go
9. +1 -1 test/e2e/pkg/testnet.go

+1 -0 CHANGELOG_PENDING.md

@@ -24,6 +24,7 @@ Special thanks to external contributors on this release:
- [blocksync] \#7046 Remove v2 implementation of the blocksync
service and reactor, which was disabled in the previous release
(@tychoish)
- [p2p] \#7064 Remove WDRR queue implementation. (@tychoish)
- Blockchain Protocol


+1 -1 config/config.go

@@ -710,7 +710,7 @@ type P2PConfig struct { //nolint: maligned
TestDialFail bool `mapstructure:"test-dial-fail"`
// Makes it possible to configure which queue backend the p2p
// layer uses. Options are: "fifo", "priority" and "wdrr",
// layer uses. Options are: "fifo" and "priority",
// with the default being "priority".
QueueType string `mapstructure:"queue-type"`
}
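As a quick illustration of the surviving options, here is a minimal sketch of selecting the queue backend programmatically. It assumes the config package's DefaultP2PConfig constructor; only the QueueType field comes from the hunk above, the rest is standalone example code.

package main

import (
	"fmt"

	"github.com/tendermint/tendermint/config"
)

func main() {
	// DefaultP2PConfig is assumed here; QueueType is the field shown above.
	p2pCfg := config.DefaultP2PConfig()

	// After this change only "fifo" and "priority" are accepted;
	// "wdrr" is no longer a valid value.
	p2pCfg.QueueType = "priority"

	fmt.Println("p2p queue-type:", p2pCfg.QueueType)
}

In a running node the same choice is normally made in config.toml via the queue-type key that the mapstructure tag above maps to.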


+3 -0 internal/p2p/pqueue_test.go

@@ -4,9 +4,12 @@ import (
"testing"
"time"
gogotypes "github.com/gogo/protobuf/types"
"github.com/tendermint/tendermint/libs/log"
)
type testMessage = gogotypes.StringValue
func TestCloseWhileDequeueFull(t *testing.T) {
enqueueLength := 5
chDescs := []ChannelDescriptor{


+4 -16 internal/p2p/router.go

@@ -133,8 +133,8 @@ type RouterOptions struct {
// no timeout.
HandshakeTimeout time.Duration
// QueueType must be "wdrr" (Weighted Deficit Round Robin), "priority", or
// "fifo". Defaults to "fifo".
// QueueType must be "priority" or "fifo". Defaults to
// "fifo".
QueueType string
// MaxIncomingConnectionAttempts rate limits the number of incoming connection
@@ -176,7 +176,6 @@ type RouterOptions struct {
const (
queueTypeFifo = "fifo"
queueTypePriority = "priority"
queueTypeWDRR = "wdrr"
)
// Validate validates router options.
@@ -184,8 +183,8 @@ func (o *RouterOptions) Validate() error {
switch o.QueueType {
case "":
o.QueueType = queueTypeFifo
case queueTypeFifo, queueTypeWDRR, queueTypePriority:
// pass
case queueTypeFifo, queueTypePriority:
// pass
default:
return fmt.Errorf("queue type %q is not supported", o.QueueType)
}
@@ -347,17 +346,6 @@ func (r *Router) createQueueFactory() (func(int) queue, error) {
return q
}, nil
case queueTypeWDRR:
return func(size int) queue {
if size%2 != 0 {
size++
}
q := newWDRRScheduler(r.logger, r.metrics, r.chDescs, uint(size)/2, uint(size)/2, defaultCapacity)
q.start()
return q
}, nil
default:
return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType)
}
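To make the remaining control flow concrete, the following is a small standalone sketch of the validation step once the wdrr branch is gone. The constant names mirror the diff, but this is illustrative code, not the actual internal/p2p package.

package main

import "fmt"

const (
	queueTypeFifo     = "fifo"
	queueTypePriority = "priority"
)

// validateQueueType mirrors RouterOptions.Validate above: an empty value
// defaults to "fifo", the two supported types pass through, and anything
// else (including the removed "wdrr") is rejected.
func validateQueueType(qt string) (string, error) {
	switch qt {
	case "":
		return queueTypeFifo, nil
	case queueTypeFifo, queueTypePriority:
		return qt, nil
	default:
		return "", fmt.Errorf("queue type %q is not supported", qt)
	}
}

func main() {
	for _, qt := range []string{"", "priority", "wdrr"} {
		got, err := validateQueueType(qt)
		fmt.Printf("%q -> %q, err: %v\n", qt, got, err)
	}
}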


+0 -8 internal/p2p/router_init_test.go

@@ -38,14 +38,6 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
require.True(t, ok)
defer q.close()
})
t.Run("WDRR", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeWDRR}
r, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts)
require.NoError(t, err)
q, ok := r.queueFactory(1).(*wdrrScheduler)
require.True(t, ok)
defer q.close()
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, opts)


+0 -287 internal/p2p/wdrr_queue.go

@@ -1,287 +0,0 @@
package p2p
import (
"fmt"
"sort"
"strconv"
"github.com/gogo/protobuf/proto"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
)
// wrappedEnvelope wraps a p2p Envelope with its precomputed size.
type wrappedEnvelope struct {
envelope Envelope
size uint
}
// assert the WDRR scheduler implements the queue interface at compile-time
var _ queue = (*wdrrScheduler)(nil)
// wdrrQueue implements a Weighted Deficit Round Robin (WDRR) scheduling
// algorithm via the queue interface. A WDRR queue is created per peer, where
// the queue will have N number of flows. Each flow corresponds to a p2p Channel,
// so there are n input flows and a single output source, the peer's connection.
//
// The WDRR scheduler contains a shared buffer with a fixed capacity.
//
// Each flow has the following:
// - quantum: The number of bytes that is added to the deficit counter of the
// flow in each round. The flow can send at most quantum bytes at a time. Each
// flow has its own unique quantum, which gives the queue its weighted nature.
// A higher quantum corresponds to a higher weight/priority. The quantum is
// computed as MaxSendBytes * Priority.
// - deficit counter: The number of bytes that the flow is allowed to transmit
// when it is its turn.
//
// See: https://en.wikipedia.org/wiki/Deficit_round_robin
type wdrrScheduler struct {
logger log.Logger
metrics *Metrics
chDescs []ChannelDescriptor
capacity uint
size uint
chPriorities map[ChannelID]uint
buffer map[ChannelID][]wrappedEnvelope
quanta map[ChannelID]uint
deficits map[ChannelID]uint
closer *tmsync.Closer
doneCh *tmsync.Closer
enqueueCh chan Envelope
dequeueCh chan Envelope
}
func newWDRRScheduler(
logger log.Logger,
m *Metrics,
chDescs []ChannelDescriptor,
enqueueBuf, dequeueBuf, capacity uint,
) *wdrrScheduler {
// copy each ChannelDescriptor and sort them by channel priority
chDescsCopy := make([]ChannelDescriptor, len(chDescs))
copy(chDescsCopy, chDescs)
sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority > chDescsCopy[j].Priority })
var (
buffer = make(map[ChannelID][]wrappedEnvelope)
chPriorities = make(map[ChannelID]uint)
quanta = make(map[ChannelID]uint)
deficits = make(map[ChannelID]uint)
)
for _, chDesc := range chDescsCopy {
chID := ChannelID(chDesc.ID)
chPriorities[chID] = uint(chDesc.Priority)
buffer[chID] = make([]wrappedEnvelope, 0)
quanta[chID] = chDesc.MaxSendBytes * uint(chDesc.Priority)
}
return &wdrrScheduler{
logger: logger.With("queue", "wdrr"),
metrics: m,
capacity: capacity,
chPriorities: chPriorities,
chDescs: chDescsCopy,
buffer: buffer,
quanta: quanta,
deficits: deficits,
closer: tmsync.NewCloser(),
doneCh: tmsync.NewCloser(),
enqueueCh: make(chan Envelope, enqueueBuf),
dequeueCh: make(chan Envelope, dequeueBuf),
}
}
// enqueue returns an unbuffered write-only channel which a producer can send on.
func (s *wdrrScheduler) enqueue() chan<- Envelope {
return s.enqueueCh
}
// dequeue returns an unbuffered read-only channel which a consumer can read from.
func (s *wdrrScheduler) dequeue() <-chan Envelope {
return s.dequeueCh
}
func (s *wdrrScheduler) closed() <-chan struct{} {
return s.closer.Done()
}
// close closes the WDRR queue. After this call enqueue() will block, so the
// caller must select on closed() as well to avoid blocking forever. The
// enqueue() and dequeue() along with the internal channels will NOT be closed.
// Note, close() will block until all externally spawned goroutines have exited.
func (s *wdrrScheduler) close() {
s.closer.Close()
<-s.doneCh.Done()
}
// start starts the WDRR queue process in a blocking goroutine. This must be
// called before the queue can start to process and accept Envelopes.
func (s *wdrrScheduler) start() {
go s.process()
}
// process starts a blocking WDRR scheduler process, where we continuously
// evaluate if we need to attempt to enqueue an Envelope or schedule Envelopes
// to be dequeued and subsequently read and sent on the source connection.
// Internally, each p2p Channel maps to a flow, where each flow has a deficit
// and a quantum.
//
// For each Envelope requested to be enqueued, we evaluate if there is sufficient
// capacity in the shared buffer to add the Envelope. If so, it is added.
// Otherwise, we evaluate all flows of lower priority where we attempt to find an
// existing Envelope in the shared buffer of sufficient size that can be dropped
// in place of the incoming Envelope. If there is no such Envelope that can be
// dropped, then the incoming Envelope is dropped.
//
// When there is nothing to be enqueued, we perform the WDRR algorithm and
// determine which Envelopes can be dequeued. For each Envelope that can be
// dequeued, it is sent on the dequeueCh. Specifically, for each flow, if it is
// non-empty, its deficit counter is incremented by its quantum value. Then, the
// value of the deficit counter is the maximum number of bytes that can be sent
// in this round. If the deficit counter is greater than the Envelope's message
// size at the head of the queue (HoQ), this envelope can be sent and the value
// of the counter is decremented by the message's size. Then, the size of the
// next Envelope's message is compared to the counter value, etc. Once the flow
// is empty or the value of the counter is insufficient, the scheduler will skip
// to the next flow. If the flow is empty, the value of the deficit counter is
// reset to 0.
//
// XXX/TODO: Evaluate the single goroutine scheduler mechanism. In other words,
// evaluate the effectiveness and performance of having a single goroutine
// handle both the enqueueing and dequeueing logic. Specifically, there
// is potentially contention between reading off of enqueueCh and trying to
// enqueue while also attempting to perform the WDRR algorithm and find the next
// set of Envelope(s) to send on the dequeueCh. Alternatively, we could consider
// separate scheduling goroutines, but that would require the use of mutexes and
// possibly degrade performance.
func (s *wdrrScheduler) process() {
defer s.doneCh.Close()
for {
select {
case <-s.closer.Done():
return
case e := <-s.enqueueCh:
// attempt to enqueue the incoming Envelope
chIDStr := strconv.Itoa(int(e.channelID))
wEnv := wrappedEnvelope{envelope: e, size: uint(proto.Size(e.Message))}
msgSize := wEnv.size
s.metrics.PeerPendingSendBytes.With("peer_id", string(e.To)).Add(float64(msgSize))
// If we're at capacity, we need to either drop the incoming Envelope or
// an Envelope from a lower priority flow. Otherwise, we add the (wrapped)
// envelope to the flow's queue.
if s.size+wEnv.size > s.capacity {
chPriority := s.chPriorities[e.channelID]
var (
canDrop bool
dropIdx int
dropChID ChannelID
)
// Evaluate all lower priority flows and determine if there exists an
// Envelope that is of equal or greater size that we can drop in favor
// of the incoming Envelope.
for i := len(s.chDescs) - 1; i >= 0 && uint(s.chDescs[i].Priority) < chPriority && !canDrop; i-- {
currChID := ChannelID(s.chDescs[i].ID)
flow := s.buffer[currChID]
for j := 0; j < len(flow) && !canDrop; j++ {
if flow[j].size >= wEnv.size {
canDrop = true
dropIdx = j
dropChID = currChID
break
}
}
}
// If we can drop an existing Envelope, drop it and enqueue the incoming
// Envelope.
if canDrop {
chIDStr = strconv.Itoa(int(dropChID))
chPriority = s.chPriorities[dropChID]
msgSize = s.buffer[dropChID][dropIdx].size
// Drop Envelope for the lower priority flow and update the queue's
// buffer size
s.size -= msgSize
s.buffer[dropChID] = append(s.buffer[dropChID][:dropIdx], s.buffer[dropChID][dropIdx+1:]...)
// add the incoming Envelope and update queue's buffer size
s.size += wEnv.size
s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
}
// We either dropped the incoming Envelope or one from an existing
// lower priority flow.
s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
s.logger.Debug(
"dropped envelope",
"ch_id", chIDStr,
"priority", chPriority,
"capacity", s.capacity,
"msg_size", msgSize,
)
} else {
// we have sufficient capacity to enqueue the incoming Envelope
s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
s.size += wEnv.size
}
default:
// perform the WDRR algorithm
for _, chDesc := range s.chDescs {
chID := ChannelID(chDesc.ID)
// only consider non-empty flows
if len(s.buffer[chID]) > 0 {
// bump flow's quantum
s.deficits[chID] += s.quanta[chID]
// grab the flow's current deficit counter and HoQ (wrapped) Envelope
d := s.deficits[chID]
we := s.buffer[chID][0]
// While the flow is non-empty and we can send the current Envelope
// on the dequeueCh:
//
// 1. send the Envelope
// 2. update the scheduler's shared buffer's size
// 3. update the flow's deficit
// 4. remove from the flow's queue
// 5. grab the next HoQ Envelope and flow's deficit
for len(s.buffer[chID]) > 0 && d >= we.size {
s.metrics.PeerSendBytesTotal.With(
"chID", fmt.Sprint(chID),
"peer_id", string(we.envelope.To)).Add(float64(we.size))
s.dequeueCh <- we.envelope
s.size -= we.size
s.deficits[chID] -= we.size
s.buffer[chID] = s.buffer[chID][1:]
if len(s.buffer[chID]) > 0 {
d = s.deficits[chID]
we = s.buffer[chID][0]
}
}
}
// reset the flow's deficit to zero if it is empty
if len(s.buffer[chID]) == 0 {
s.deficits[chID] = 0
}
}
}
}
}
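Because the algorithm the deleted file implemented is now described only in the comments above, here is a compact standalone illustration of the weighted deficit round robin pass those comments outline. The flow names, quanta, and message sizes are invented for the example, and the real scheduler additionally handled the shared-capacity drop logic, metrics, and channel plumbing.

package main

import "fmt"

// flow is a toy stand-in for one per-channel queue in the removed scheduler.
type flow struct {
	name    string
	quantum int   // bytes credited each round; higher quantum = higher weight
	deficit int   // bytes the flow may still send in the current round
	queue   []int // pending message sizes in bytes
}

func main() {
	flows := []*flow{
		{name: "high-priority", quantum: 72, queue: []int{40, 40, 40}},
		{name: "low-priority", quantum: 8, queue: []int{40, 40, 40}},
	}

	for round := 1; round <= 15; round++ {
		for _, f := range flows {
			if len(f.queue) == 0 {
				f.deficit = 0 // empty flows do not accumulate credit
				continue
			}
			f.deficit += f.quantum
			// Send head-of-queue messages while the deficit covers them.
			for len(f.queue) > 0 && f.deficit >= f.queue[0] {
				fmt.Printf("round %2d: %s sends %d bytes\n", round, f.name, f.queue[0])
				f.deficit -= f.queue[0]
				f.queue = f.queue[1:]
			}
			if len(f.queue) == 0 {
				f.deficit = 0
			}
		}
	}
}

Running it shows the high-quantum flow draining its backlog within the first couple of rounds, while the low-quantum flow sends roughly one message every five rounds, which is the weighting behavior the doc comment describes.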

+0 -208 internal/p2p/wdrr_queue_test.go

@@ -1,208 +0,0 @@
package p2p
import (
"math"
"math/rand"
"testing"
"time"
gogotypes "github.com/gogo/protobuf/types"
"github.com/stretchr/testify/require"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
)
type testMessage = gogotypes.StringValue
func TestWDRRQueue_EqualWeights(t *testing.T) {
chDescs := []ChannelDescriptor{
{ID: 0x01, Priority: 1, MaxSendBytes: 4},
{ID: 0x02, Priority: 1, MaxSendBytes: 4},
{ID: 0x03, Priority: 1, MaxSendBytes: 4},
{ID: 0x04, Priority: 1, MaxSendBytes: 4},
{ID: 0x05, Priority: 1, MaxSendBytes: 4},
{ID: 0x06, Priority: 1, MaxSendBytes: 4},
}
peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 1000, 1000, 120)
peerQueue.start()
totalMsgs := make(map[ChannelID]int)
deliveredMsgs := make(map[ChannelID]int)
successRates := make(map[ChannelID]float64)
closer := tmsync.NewCloser()
go func() {
timout := 10 * time.Second
ticker := time.NewTicker(timout)
defer ticker.Stop()
for {
select {
case e := <-peerQueue.dequeue():
deliveredMsgs[e.channelID]++
ticker.Reset(timout)
case <-ticker.C:
closer.Close()
}
}
}()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
maxMsgs := 5000
minMsgs := 1000
for _, chDesc := range chDescs {
total := rng.Intn(maxMsgs-minMsgs) + minMsgs // total = rand[minMsgs, maxMsgs)
totalMsgs[ChannelID(chDesc.ID)] = total
go func(cID ChannelID, n int) {
for i := 0; i < n; i++ {
peerQueue.enqueue() <- Envelope{
channelID: cID,
Message: &testMessage{Value: "foo"}, // 5 bytes
}
}
}(ChannelID(chDesc.ID), total)
}
// wait for dequeueing to complete
<-closer.Done()
// close queue and wait for cleanup
peerQueue.close()
<-peerQueue.closed()
var (
sum float64
stdDev float64
)
for _, chDesc := range peerQueue.chDescs {
chID := ChannelID(chDesc.ID)
require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero")
require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty")
total := totalMsgs[chID]
delivered := deliveredMsgs[chID]
successRate := float64(delivered) / float64(total)
sum += successRate
successRates[chID] = successRate
// require some messages dropped
require.Less(t, delivered, total, "expected some messages to be dropped")
require.Less(t, successRate, 1.0, "expected a success rate below 100%")
}
require.Zero(t, peerQueue.size, "expected scheduler size to be zero")
numFlows := float64(len(peerQueue.buffer))
mean := sum / numFlows
for _, successRate := range successRates {
stdDev += math.Pow(successRate-mean, 2)
}
stdDev = math.Sqrt(stdDev / numFlows)
require.Less(t, stdDev, 0.02, "expected success rate standard deviation to be less than 2%")
}
func TestWDRRQueue_DecreasingWeights(t *testing.T) {
chDescs := []ChannelDescriptor{
{ID: 0x01, Priority: 18, MaxSendBytes: 4},
{ID: 0x02, Priority: 10, MaxSendBytes: 4},
{ID: 0x03, Priority: 2, MaxSendBytes: 4},
{ID: 0x04, Priority: 1, MaxSendBytes: 4},
{ID: 0x05, Priority: 1, MaxSendBytes: 4},
{ID: 0x06, Priority: 1, MaxSendBytes: 4},
}
peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 0, 0, 500)
peerQueue.start()
totalMsgs := make(map[ChannelID]int)
deliveredMsgs := make(map[ChannelID]int)
successRates := make(map[ChannelID]float64)
for _, chDesc := range chDescs {
total := 1000
totalMsgs[ChannelID(chDesc.ID)] = total
go func(cID ChannelID, n int) {
for i := 0; i < n; i++ {
peerQueue.enqueue() <- Envelope{
channelID: cID,
Message: &testMessage{Value: "foo"}, // 5 bytes
}
}
}(ChannelID(chDesc.ID), total)
}
closer := tmsync.NewCloser()
go func() {
timout := 20 * time.Second
ticker := time.NewTicker(timout)
defer ticker.Stop()
for {
select {
case e := <-peerQueue.dequeue():
deliveredMsgs[e.channelID]++
ticker.Reset(timout)
case <-ticker.C:
closer.Close()
}
}
}()
// wait for dequeueing to complete
<-closer.Done()
// close queue and wait for cleanup
peerQueue.close()
<-peerQueue.closed()
for i, chDesc := range peerQueue.chDescs {
chID := ChannelID(chDesc.ID)
require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero")
require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty")
total := totalMsgs[chID]
delivered := deliveredMsgs[chID]
successRate := float64(delivered) / float64(total)
successRates[chID] = successRate
// Require some messages dropped. Note, the top weighted flows may not have
// any dropped if lower priority non-empty queues always exist.
if i > 2 {
require.Less(t, delivered, total, "expected some messages to be dropped")
require.Less(t, successRate, 1.0, "expected a success rate below 100%")
}
}
require.Zero(t, peerQueue.size, "expected scheduler size to be zero")
// require channel 0x01 to have the highest success rate due to its weight
ch01Rate := successRates[ChannelID(chDescs[0].ID)]
for i := 1; i < len(chDescs); i++ {
require.GreaterOrEqual(t, ch01Rate, successRates[ChannelID(chDescs[i].ID)])
}
// require channel 0x02 to have the 2nd highest success rate due to its weight
ch02Rate := successRates[ChannelID(chDescs[1].ID)]
for i := 2; i < len(chDescs); i++ {
require.GreaterOrEqual(t, ch02Rate, successRates[ChannelID(chDescs[i].ID)])
}
// require channel 0x03 to have the 3rd highest success rate due to its weight
ch03Rate := successRates[ChannelID(chDescs[2].ID)]
for i := 3; i < len(chDescs); i++ {
require.GreaterOrEqual(t, ch03Rate, successRates[ChannelID(chDescs[i].ID)])
}
}

+1 -1 test/e2e/generator/generate.go

@@ -15,7 +15,7 @@ var (
// separate testnet for each combination (Cartesian product) of options.
testnetCombinations = map[string][]interface{}{
"topology": {"single", "quad", "large"},
"queueType": {"priority"}, // "fifo", "wdrr"
"queueType": {"priority"}, // "fifo"
"initialHeight": {0, 1000},
"initialState": {
map[string]string{},


+1 -1 test/e2e/pkg/testnet.go

@@ -347,7 +347,7 @@ func (n Node) Validate(testnet Testnet) error {
return fmt.Errorf("invalid mempool version %q", n.Mempool)
}
switch n.QueueType {
case "", "priority", "wdrr", "fifo":
case "", "priority", "fifo":
default:
return fmt.Errorf("unsupported p2p queue type: %s", n.QueueType)
}

