You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

208 lines
5.5 KiB

package p2p
import (
"math"
"math/rand"
"testing"
"time"
gogotypes "github.com/gogo/protobuf/types"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
)
// testMessage aliases gogotypes.StringValue. The tests in this file use it as
// the Message payload of the envelopes they enqueue.
type testMessage = gogotypes.StringValue
// TestWDRRQueue_EqualWeights enqueues a random number of messages (in
// [1000, 5000)) on each of six channels that all share the same priority and
// checks that:
//
//   - every channel has some messages dropped,
//   - after the queue is closed all deficits, per-channel buffers and the
//     scheduler size are zero, and
//   - the per-channel delivery success rates have a standard deviation below
//     0.02, i.e. equally weighted channels are served roughly equally.
func TestWDRRQueue_EqualWeights(t *testing.T) {
	chDescs := []ChannelDescriptor{
		{ID: 0x01, Priority: 1, MaxSendBytes: 4},
		{ID: 0x02, Priority: 1, MaxSendBytes: 4},
		{ID: 0x03, Priority: 1, MaxSendBytes: 4},
		{ID: 0x04, Priority: 1, MaxSendBytes: 4},
		{ID: 0x05, Priority: 1, MaxSendBytes: 4},
		{ID: 0x06, Priority: 1, MaxSendBytes: 4},
	}

	peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 1000, 1000, 120)
	peerQueue.start()

	totalMsgs := make(map[ChannelID]int)
	deliveredMsgs := make(map[ChannelID]int)
	successRates := make(map[ChannelID]float64)

	closer := tmsync.NewCloser()

	// Consumer: count delivered envelopes per channel until nothing has been
	// dequeued for a full timeout period.
	go func() {
		timeout := 10 * time.Second
		ticker := time.NewTicker(timeout)
		defer ticker.Stop()

		for {
			select {
			case e := <-peerQueue.dequeue():
				deliveredMsgs[e.channelID]++
				// Restart the inactivity window after every delivery.
				ticker.Reset(timeout)

			case <-ticker.C:
				// Signal completion and stop consuming. Returning here is
				// essential: the test goroutine reads deliveredMsgs as soon as
				// closer.Done() fires, so this goroutine must not keep writing
				// to the map (and must not outlive the test).
				closer.Close()
				return
			}
		}
	}()

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	maxMsgs := 5000
	minMsgs := 1000

	// Producers: one goroutine per channel, each sending its own random total.
	for _, chDesc := range chDescs {
		total := rng.Intn(maxMsgs-minMsgs) + minMsgs // total = rand[minMsgs, maxMsgs)
		totalMsgs[ChannelID(chDesc.ID)] = total

		go func(cID ChannelID, n int) {
			for i := 0; i < n; i++ {
				peerQueue.enqueue() <- Envelope{
					channelID: cID,
					Message:   &testMessage{Value: "foo"}, // 5 bytes
				}
			}
		}(ChannelID(chDesc.ID), total)
	}

	// wait for dequeueing to complete
	<-closer.Done()

	// close queue and wait for cleanup
	peerQueue.close()
	<-peerQueue.closed()

	var (
		sum    float64
		stdDev float64
	)

	for _, chDesc := range peerQueue.chDescs {
		chID := ChannelID(chDesc.ID)
		require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero")
		require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty")

		total := totalMsgs[chID]
		delivered := deliveredMsgs[chID]
		successRate := float64(delivered) / float64(total)

		sum += successRate
		successRates[chID] = successRate

		// require some messages dropped
		require.Less(t, delivered, total, "expected some messages to be dropped")
		require.Less(t, successRate, 1.0, "expected a success rate below 100%")
	}

	require.Zero(t, peerQueue.size, "expected scheduler size to be zero")

	// Population standard deviation of the success rates across all flows.
	numFlows := float64(len(peerQueue.buffer))
	mean := sum / numFlows

	for _, successRate := range successRates {
		stdDev += math.Pow(successRate-mean, 2)
	}

	stdDev = math.Sqrt(stdDev / numFlows)
	require.Less(t, stdDev, 0.02, "expected success rate standard deviation to be less than 2%")
}
// TestWDRRQueue_DecreasingWeights enqueues 1000 messages on each of six
// channels whose priorities decrease (18, 10, 2, 1, 1, 1) and checks that:
//
//   - after the queue is closed all deficits, per-channel buffers and the
//     scheduler size are zero,
//   - the three lowest-priority channels each have some messages dropped, and
//   - the success rates of channels 0x01, 0x02 and 0x03 are each at least as
//     high as those of every channel listed after them.
func TestWDRRQueue_DecreasingWeights(t *testing.T) {
	chDescs := []ChannelDescriptor{
		{ID: 0x01, Priority: 18, MaxSendBytes: 4},
		{ID: 0x02, Priority: 10, MaxSendBytes: 4},
		{ID: 0x03, Priority: 2, MaxSendBytes: 4},
		{ID: 0x04, Priority: 1, MaxSendBytes: 4},
		{ID: 0x05, Priority: 1, MaxSendBytes: 4},
		{ID: 0x06, Priority: 1, MaxSendBytes: 4},
	}

	peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 0, 0, 500)
	peerQueue.start()

	totalMsgs := make(map[ChannelID]int)
	deliveredMsgs := make(map[ChannelID]int)
	successRates := make(map[ChannelID]float64)

	// Producers: one goroutine per channel, each sending the same total.
	for _, chDesc := range chDescs {
		total := 1000
		totalMsgs[ChannelID(chDesc.ID)] = total

		go func(cID ChannelID, n int) {
			for i := 0; i < n; i++ {
				peerQueue.enqueue() <- Envelope{
					channelID: cID,
					Message:   &testMessage{Value: "foo"}, // 5 bytes
				}
			}
		}(ChannelID(chDesc.ID), total)
	}

	closer := tmsync.NewCloser()

	// Consumer: count delivered envelopes per channel until nothing has been
	// dequeued for a full timeout period.
	go func() {
		timeout := 20 * time.Second
		ticker := time.NewTicker(timeout)
		defer ticker.Stop()

		for {
			select {
			case e := <-peerQueue.dequeue():
				deliveredMsgs[e.channelID]++
				// Restart the inactivity window after every delivery.
				ticker.Reset(timeout)

			case <-ticker.C:
				// Signal completion and stop consuming. Returning here is
				// essential: the test goroutine reads deliveredMsgs as soon as
				// closer.Done() fires, so this goroutine must not keep writing
				// to the map (and must not outlive the test).
				closer.Close()
				return
			}
		}
	}()

	// wait for dequeueing to complete
	<-closer.Done()

	// close queue and wait for cleanup
	peerQueue.close()
	<-peerQueue.closed()

	for i, chDesc := range peerQueue.chDescs {
		chID := ChannelID(chDesc.ID)
		require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero")
		require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty")

		total := totalMsgs[chID]
		delivered := deliveredMsgs[chID]
		successRate := float64(delivered) / float64(total)

		successRates[chID] = successRate

		// Require some messages dropped. Note, the top weighted flows may not have
		// any dropped if lower priority non-empty queues always exist.
		if i > 2 {
			require.Less(t, delivered, total, "expected some messages to be dropped")
			require.Less(t, successRate, 1.0, "expected a success rate below 100%")
		}
	}

	require.Zero(t, peerQueue.size, "expected scheduler size to be zero")

	// require channel 0x01 to have the highest success rate due to its weight
	ch01Rate := successRates[ChannelID(chDescs[0].ID)]
	for i := 1; i < len(chDescs); i++ {
		require.GreaterOrEqual(t, ch01Rate, successRates[ChannelID(chDescs[i].ID)])
	}

	// require channel 0x02 to have the 2nd highest success rate due to its weight
	ch02Rate := successRates[ChannelID(chDescs[1].ID)]
	for i := 2; i < len(chDescs); i++ {
		require.GreaterOrEqual(t, ch02Rate, successRates[ChannelID(chDescs[i].ID)])
	}

	// require channel 0x03 to have the 3rd highest success rate due to its weight
	ch03Rate := successRates[ChannelID(chDescs[2].ID)]
	for i := 3; i < len(chDescs); i++ {
		require.GreaterOrEqual(t, ch03Rate, successRates[ChannelID(chDescs[i].ID)])
	}
}