
p2p: revised router message scheduling (#6126)

pull/6281/head
Aleksandr Bezobchuk, 4 years ago, committed by GitHub
parent commit a554005136
22 changed files with 1277 additions and 88 deletions
  1. +2 -0 blockchain/v0/reactor.go
  2. +8 -0 consensus/reactor.go
  3. +2 -0 evidence/reactor.go
  4. +2 -0 mempool/reactor.go
  5. +18 -3 mempool/reactor_test.go
  6. +37 -9 node/node.go
  7. +12 -3 p2p/conn/connection.go
  8. +66 -5 p2p/metrics.go
  9. +9 -2 p2p/p2ptest/network.go
  10. +2 -0 p2p/pex/pex_reactor.go
  11. +277 -0 p2p/pqueue.go
  12. +9 -12 p2p/queue.go
  13. +163 -28 p2p/router.go
  14. +62 -0 p2p/router_init_test.go
  15. +72 -15 p2p/router_test.go
  16. +283 -0 p2p/wdrr_queue.go
  17. +208 -0 p2p/wdrr_queue_test.go
  18. +4 -0 statesync/reactor.go
  19. +2 -0 test/e2e/docker/Dockerfile
  20. +11 -6 test/e2e/runner/load.go
  21. +16 -5 test/e2e/runner/main.go
  22. +12 -0 test/e2e/runner/setup.go

+2 -0 blockchain/v0/reactor.go

@ -34,6 +34,8 @@ var (
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: bc.MaxMsgSize,
MaxSendBytes: 100,
},
},
}


+8 -0 consensus/reactor.go

@ -36,6 +36,8 @@ var (
Priority: 6,
SendQueueCapacity: 100,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 12000,
},
},
DataChannel: {
@ -49,6 +51,8 @@ var (
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 40000,
},
},
VoteChannel: {
@ -59,6 +63,8 @@ var (
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 150,
},
},
VoteSetBitsChannel: {
@ -69,6 +75,8 @@ var (
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 50,
},
},
}


+2 -0 evidence/reactor.go

@ -31,6 +31,8 @@ var (
ID: byte(EvidenceChannel),
Priority: 6,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 400,
},
},
}


+2 -0 mempool/reactor.go

@ -116,6 +116,8 @@ func GetChannelShims(config *cfg.MempoolConfig) map[p2p.ChannelID]*p2p.ChannelDe
ID: byte(MempoolChannel),
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
MaxSendBytes: 5000,
},
},
}


+18 -3 mempool/reactor_test.go

@ -110,8 +110,8 @@ func (rts *reactorTestSuite) assertMempoolChannelsDrained(t *testing.T) {
require.False(t, r.IsRunning(), "reactor %s did not stop", id)
}
for id, mch := range rts.mempoolChnnels {
require.Empty(t, mch.Out, "checking channel %q", id)
for _, mch := range rts.mempoolChnnels {
require.Empty(t, mch.Out, "checking channel %q (len=%d)", mch.ID, len(mch.Out))
}
}
@ -314,7 +314,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
// we're creating a single node network, but not starting the
// network.
rts := setup(t, config.Mempool, 1, 0)
rts := setup(t, config.Mempool, 1, maxActiveIDs+1)
nodeID := rts.nodes[0]
@ -336,6 +336,21 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
}
}
require.Eventually(
t,
func() bool {
for _, mch := range rts.mempoolChnnels {
if len(mch.Out) > 0 {
return false
}
}
return true
},
time.Minute,
10*time.Millisecond,
)
rts.assertMempoolChannelsDrained(t)
}


+37 -9 node/node.go

@ -54,12 +54,19 @@ import (
"github.com/tendermint/tendermint/version"
)
var useLegacyP2P = true
var (
useLegacyP2P = true
p2pRouterQueueType string
)
func init() {
if v := os.Getenv("TM_LEGACY_P2P"); len(v) > 0 {
useLegacyP2P, _ = strconv.ParseBool(v)
}
if v := os.Getenv("TM_P2P_QUEUE"); len(v) > 0 {
p2pRouterQueueType = v
}
}
// DBContext specifies config information for loading a new DB.
@ -628,15 +635,26 @@ func createPeerManager(config *cfg.Config, p2pLogger log.Logger, nodeID p2p.Node
func createRouter(
p2pLogger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfo p2p.NodeInfo,
privKey crypto.PrivKey,
peerManager *p2p.PeerManager,
transport p2p.Transport,
) (*p2p.Router, error) {
return p2p.NewRouter(p2pLogger, nodeInfo, privKey, peerManager, []p2p.Transport{transport}, p2p.RouterOptions{})
return p2p.NewRouter(
p2pLogger,
p2pMetrics,
nodeInfo,
privKey,
peerManager,
[]p2p.Transport{transport},
p2p.RouterOptions{QueueType: p2pRouterQueueType},
)
}
func createSwitch(config *cfg.Config,
func createSwitch(
config *cfg.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
mempoolReactor *p2p.ReactorShim,
@ -781,7 +799,7 @@ func createPEXReactorV2(
router *p2p.Router,
) (*pex.ReactorV2, error) {
channel, err := router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 0)
channel, err := router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 4096)
if err != nil {
return nil, err
}
@ -897,7 +915,7 @@ func NewSeedNode(config *cfg.Config,
return nil, fmt.Errorf("failed to create peer manager: %w", err)
}
router, err := createRouter(p2pLogger, nodeInfo, nodeKey.PrivKey, peerManager, transport)
router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, peerManager, transport)
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
}
@ -1057,12 +1075,13 @@ func NewNode(config *cfg.Config,
return nil, fmt.Errorf("failed to create peer manager: %w", err)
}
router, err := createRouter(p2pLogger, nodeInfo, nodeKey.PrivKey, peerManager, transport)
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey, peerManager, transport)
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
}
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
mpReactorShim, mpReactor, mempool := createMempoolReactor(
config, proxyApp, state, memplMetrics, peerManager, router, logger,
)
@ -1148,11 +1167,18 @@ func NewNode(config *cfg.Config,
config.StateSync.TempDir,
)
// Setup Transport and Switch.
router.AddChannelDescriptors(mpReactorShim.GetChannels())
router.AddChannelDescriptors(bcReactorForSwitch.GetChannels())
router.AddChannelDescriptors(csReactorShim.GetChannels())
router.AddChannelDescriptors(evReactorShim.GetChannels())
router.AddChannelDescriptors(stateSyncReactorShim.GetChannels())
// setup Transport and Switch
sw := createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
@ -1191,6 +1217,8 @@ func NewNode(config *cfg.Config,
if err != nil {
return nil, err
}
router.AddChannelDescriptors(pexReactor.GetChannels())
}
if config.RPC.PprofListenAddress != "" {
@ -1940,7 +1968,7 @@ func makeChannelsFromShims(
channels := map[p2p.ChannelID]*p2p.Channel{}
for chID, chShim := range chShims {
ch, err := router.OpenChannel(chID, chShim.MsgType, 0)
ch, err := router.OpenChannel(chID, chShim.MsgType, chShim.Descriptor.RecvBufferCapacity)
if err != nil {
panic(fmt.Sprintf("failed to open channel %v: %v", chID, err))
}


+12 -3 p2p/conn/connection.go

@ -718,11 +718,20 @@ func (c *MConnection) Status() ConnectionStatus {
//-----------------------------------------------------------------------------
type ChannelDescriptor struct {
ID byte
Priority int
ID byte
Priority int
// TODO: Remove once p2p refactor is complete.
SendQueueCapacity int
RecvBufferCapacity int
RecvMessageCapacity int
// RecvBufferCapacity defines the max buffer size of inbound messages for a
// given p2p Channel queue.
RecvBufferCapacity int
// MaxSendBytes defines the maximum number of bytes that can be sent at any
// given moment from a Channel to a peer.
MaxSendBytes uint
}
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
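
The new MaxSendBytes field is what the WDRR scheduler below turns into each flow's quantum, via quantum = MaxSendBytes * Priority. A minimal, self-contained sketch of that arithmetic; the channel IDs and values are illustrative only, not taken from this commit:

package main

import "fmt"

// Hypothetical descriptors; only Priority and MaxSendBytes matter for the quantum.
type chDesc struct {
	ID           byte
	Priority     int
	MaxSendBytes uint
}

func main() {
	descs := []chDesc{
		{ID: 0x20, Priority: 6, MaxSendBytes: 12000}, // a consensus-style channel
		{ID: 0x30, Priority: 5, MaxSendBytes: 5000},  // a mempool-style channel
	}
	for _, d := range descs {
		// quantum = MaxSendBytes * Priority, mirroring newWDRRScheduler in p2p/wdrr_queue.go
		fmt.Printf("channel %#x: quantum = %d bytes per round\n", d.ID, d.MaxSendBytes*uint(d.Priority))
	}
}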


+66 -5 p2p/metrics.go

@ -25,11 +25,33 @@ type Metrics struct {
PeerPendingSendBytes metrics.Gauge
// Number of transactions submitted by each peer.
NumTxs metrics.Gauge
// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
// before sending on the connection.
RouterPeerQueueRecv metrics.Histogram
// RouterPeerQueueSend defines the time taken to send on a peer's queue which
// will later be read and sent on the connection (see RouterPeerQueueRecv).
RouterPeerQueueSend metrics.Histogram
// RouterChannelQueueSend defines the time taken to send on a p2p channel's
// queue which will later be consumed by the corresponding reactor/service.
RouterChannelQueueSend metrics.Histogram
// PeerQueueDroppedMsgs defines the number of messages dropped from a peer's
// queue for a specific flow (i.e. Channel).
PeerQueueDroppedMsgs metrics.Counter
// PeerQueueMsgSize defines the average size of messages sent over a peer's
// queue for a specific flow (i.e. Channel).
PeerQueueMsgSize metrics.Gauge
}
// PrometheusMetrics returns Metrics built using the Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
//
// nolint: lll
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
@ -66,16 +88,55 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "num_txs",
Help: "Number of transactions submitted by each peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
RouterPeerQueueRecv: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "router_peer_queue_recv",
Help: "The time taken to read off of a peer's queue before sending on the connection.",
}, labels).With(labelsAndValues...),
RouterPeerQueueSend: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "router_peer_queue_send",
Help: "The time taken to send on a peer's queue which will later be read and sent on the connection (see RouterPeerQueueRecv).",
}, labels).With(labelsAndValues...),
RouterChannelQueueSend: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "router_channel_queue_send",
Help: "The time taken to send on a p2p channel's queue which will later be consued by the corresponding reactor/service.",
}, labels).With(labelsAndValues...),
PeerQueueDroppedMsgs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "router_channel_queue_dropped_msgs",
Help: "The number of messages dropped from a peer's queue for a specific p2p Channel.",
}, append(labels, "ch_id")).With(labelsAndValues...),
PeerQueueMsgSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "router_channel_queue_msg_size",
Help: "The size of messages sent over a peer's queue for a specific p2p Channel.",
}, append(labels, "ch_id")).With(labelsAndValues...),
}
}
// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
Peers: discard.NewGauge(),
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
NumTxs: discard.NewGauge(),
Peers: discard.NewGauge(),
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
NumTxs: discard.NewGauge(),
RouterPeerQueueRecv: discard.NewHistogram(),
RouterPeerQueueSend: discard.NewHistogram(),
RouterChannelQueueSend: discard.NewHistogram(),
PeerQueueDroppedMsgs: discard.NewCounter(),
PeerQueueMsgSize: discard.NewGauge(),
}
}
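
For reference, the new histograms and counters only become active when a *Metrics value is passed into p2p.NewRouter, as node.go now does. A rough sketch of that wiring, assuming the caller already has the node's identity, peer manager, and transport (the namespace and chain_id label below are illustrative, not mandated by this commit):

package node

import (
	"github.com/tendermint/tendermint/crypto"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
)

// newInstrumentedRouter is a sketch showing how the metrics parameter threads
// into NewRouter; tests can pass p2p.NopMetrics() instead.
func newInstrumentedRouter(
	logger log.Logger,
	chainID string,
	nodeInfo p2p.NodeInfo,
	privKey crypto.PrivKey,
	peerManager *p2p.PeerManager,
	transport p2p.Transport,
) (*p2p.Router, error) {
	// Prometheus-backed metrics, labeled by chain ID.
	p2pMetrics := p2p.PrometheusMetrics("tendermint", "chain_id", chainID)

	return p2p.NewRouter(
		logger,
		p2pMetrics,
		nodeInfo,
		privKey,
		peerManager,
		[]p2p.Transport{transport},
		p2p.RouterOptions{},
	)
}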

+9 -2 p2p/p2ptest/network.go

@ -233,8 +233,15 @@ func (n *Network) MakeNode(t *testing.T) *Node {
})
require.NoError(t, err)
router, err := p2p.NewRouter(n.logger, nodeInfo, privKey, peerManager,
[]p2p.Transport{transport}, p2p.RouterOptions{})
router, err := p2p.NewRouter(
n.logger,
p2p.NopMetrics(),
nodeInfo,
privKey,
peerManager,
[]p2p.Transport{transport},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())


+2 -0 p2p/pex/pex_reactor.go

@ -184,6 +184,8 @@ func (r *Reactor) GetChannels() []*conn.ChannelDescriptor {
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
MaxSendBytes: 200,
},
}
}


+277 -0 p2p/pqueue.go

@ -0,0 +1,277 @@
package p2p
import (
"container/heap"
"sort"
"strconv"
"time"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
)
// pqEnvelope defines a wrapper around an Envelope with priority to be inserted
// into a priority queue used for Envelope scheduling.
type pqEnvelope struct {
envelope Envelope
priority uint
size uint
timestamp time.Time
index int
}
// priorityQueue defines a type alias for a priority queue implementation.
type priorityQueue []*pqEnvelope
func (pq priorityQueue) get(i int) *pqEnvelope { return pq[i] }
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
// if both elements have the same priority, prioritize based on most recent
if pq[i].priority == pq[j].priority {
return pq[i].timestamp.After(pq[j].timestamp)
}
// otherwise, pick the pqEnvelope with the higher priority
return pq[i].priority > pq[j].priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
pqEnv := x.(*pqEnvelope)
pqEnv.index = n
*pq = append(*pq, pqEnv)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
pqEnv := old[n-1]
old[n-1] = nil
pqEnv.index = -1
*pq = old[:n-1]
return pqEnv
}
// Assert the priority queue scheduler implements the queue interface at
// compile-time.
var _ queue = (*pqScheduler)(nil)
type pqScheduler struct {
logger log.Logger
metrics *Metrics
size uint
sizes map[uint]uint // cumulative priority sizes
pq *priorityQueue
chDescs []ChannelDescriptor
capacity uint
chPriorities map[ChannelID]uint
enqueueCh chan Envelope
dequeueCh chan Envelope
closer *tmsync.Closer
done *tmsync.Closer
}
func newPQScheduler(
logger log.Logger,
m *Metrics,
chDescs []ChannelDescriptor,
enqueueBuf, dequeueBuf, capacity uint,
) *pqScheduler {
// copy each ChannelDescriptor and sort them by ascending channel priority
chDescsCopy := make([]ChannelDescriptor, len(chDescs))
copy(chDescsCopy, chDescs)
sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })
var (
chPriorities = make(map[ChannelID]uint)
sizes = make(map[uint]uint)
)
for _, chDesc := range chDescsCopy {
chID := ChannelID(chDesc.ID)
chPriorities[chID] = uint(chDesc.Priority)
sizes[uint(chDesc.Priority)] = 0
}
pq := make(priorityQueue, 0)
heap.Init(&pq)
return &pqScheduler{
logger: logger.With("router", "scheduler"),
metrics: m,
chDescs: chDescsCopy,
capacity: capacity,
chPriorities: chPriorities,
pq: &pq,
sizes: sizes,
enqueueCh: make(chan Envelope, enqueueBuf),
dequeueCh: make(chan Envelope, dequeueBuf),
closer: tmsync.NewCloser(),
done: tmsync.NewCloser(),
}
}
func (s *pqScheduler) enqueue() chan<- Envelope {
return s.enqueueCh
}
func (s *pqScheduler) dequeue() <-chan Envelope {
return s.dequeueCh
}
func (s *pqScheduler) close() {
s.closer.Close()
<-s.done.Done()
}
func (s *pqScheduler) closed() <-chan struct{} {
return s.closer.Done()
}
// start starts a non-blocking process that runs the priority queue scheduler.
func (s *pqScheduler) start() {
go s.process()
}
// process starts a blocking process where we listen for Envelopes to enqueue. If
// there is sufficient capacity, it will be enqueued into the priority queue,
// otherwise, we attempt to dequeue enough elements from the priority queue to
// make room for the incoming Envelope by dropping lower priority elements. If
// there isn't sufficient capacity at lower priorities for the incoming Envelope,
// it is dropped.
//
// After we attempt to enqueue the incoming Envelope, if the priority queue is
// non-empty, we pop the top Envelope and send it on the dequeueCh.
func (s *pqScheduler) process() {
defer s.done.Close()
for {
select {
case e := <-s.enqueueCh:
chIDStr := strconv.Itoa(int(e.channelID))
pqEnv := &pqEnvelope{
envelope: e,
size: uint(proto.Size(e.Message)),
priority: s.chPriorities[e.channelID],
timestamp: time.Now().UTC(),
}
// enqueue
// Check if we have sufficient capacity to simply enqueue the incoming
// Envelope.
if s.size+pqEnv.size <= s.capacity {
// enqueue the incoming Envelope
s.push(pqEnv)
} else {
// There is not sufficient capacity to simply enqueue the incoming
// Envelope. So we have to attempt to make room for it by dropping lower
// priority Envelopes or drop the incoming Envelope otherwise.
// The cumulative size of all enqueued envelopes at the incoming envelope's
// priority or lower.
total := s.sizes[pqEnv.priority]
if total >= pqEnv.size {
// There is room for the incoming Envelope, so we drop as many lower
// priority Envelopes as we need to.
var (
canEnqueue bool
tmpSize = s.size
i = s.pq.Len() - 1
)
// Drop lower priority Envelopes until sufficient capacity exists for
// the incoming Envelope
for i >= 0 && !canEnqueue {
pqEnvTmp := s.pq.get(i)
if pqEnvTmp.priority < pqEnv.priority {
if tmpSize+pqEnv.size <= s.capacity {
canEnqueue = true
} else {
pqEnvTmpChIDStr := strconv.Itoa(int(pqEnvTmp.envelope.channelID))
s.metrics.PeerQueueDroppedMsgs.With("ch_id", pqEnvTmpChIDStr).Add(1)
s.logger.Debug(
"dropped envelope",
"ch_id", pqEnvTmpChIDStr,
"priority", pqEnvTmp.priority,
"msg_size", pqEnvTmp.size,
"capacity", s.capacity,
)
// dequeue/drop from the priority queue
heap.Remove(s.pq, pqEnvTmp.index)
// update the size tracker
tmpSize -= pqEnvTmp.size
// start from the end again
i = s.pq.Len() - 1
}
} else {
i--
}
}
// enqueue the incoming Envelope
s.push(pqEnv)
} else {
// There is not sufficient capacity to drop lower priority Envelopes,
// so we drop the incoming Envelope.
s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
s.logger.Debug(
"dropped envelope",
"ch_id", chIDStr,
"priority", pqEnv.priority,
"msg_size", pqEnv.size,
"capacity", s.capacity,
)
}
}
// dequeue
for s.pq.Len() > 0 {
pqEnv = heap.Pop(s.pq).(*pqEnvelope)
s.size -= pqEnv.size
// deduct the Envelope size from all the relevant cumulative sizes
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] -= pqEnv.size
}
s.dequeueCh <- pqEnv.envelope
}
case <-s.closer.Done():
return
}
}
}
func (s *pqScheduler) push(pqEnv *pqEnvelope) {
chIDStr := strconv.Itoa(int(pqEnv.envelope.channelID))
// enqueue the incoming Envelope
heap.Push(s.pq, pqEnv)
s.size += pqEnv.size
s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(pqEnv.size))
// Update the cumulative sizes by adding the Envelope's size to every
// priority less than or equal to it.
for i := 0; i < len(s.chDescs) && pqEnv.priority <= uint(s.chDescs[i].Priority); i++ {
s.sizes[uint(s.chDescs[i].Priority)] += pqEnv.size
}
}
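
Because pqScheduler is unexported, it can only be exercised from inside package p2p, the same way the WDRR tests below drive their scheduler. A minimal in-package sketch of the enqueue/dequeue/close mechanics (channel IDs, priorities, and buffer sizes are made up); note that priority ordering and dropping only come into play once the shared buffer backs up:

package p2p

import (
	"fmt"

	gogotypes "github.com/gogo/protobuf/types"

	"github.com/tendermint/tendermint/libs/log"
)

// examplePQScheduler is an illustrative sketch, not part of this commit.
func examplePQScheduler() {
	chDescs := []ChannelDescriptor{
		{ID: 0x01, Priority: 1, MaxSendBytes: 100},
		{ID: 0x02, Priority: 9, MaxSendBytes: 100},
	}

	s := newPQScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 16, 16, defaultCapacity)
	s.start()
	defer s.close()

	s.enqueue() <- Envelope{channelID: 0x02, Message: &gogotypes.StringValue{Value: "high"}}
	s.enqueue() <- Envelope{channelID: 0x01, Message: &gogotypes.StringValue{Value: "low"}}

	for i := 0; i < 2; i++ {
		e := <-s.dequeue()
		fmt.Println("dequeued envelope from channel", e.channelID)
	}
}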

+9 -12 p2p/queue.go

@ -1,6 +1,8 @@
package p2p
import "sync"
import (
tmsync "github.com/tendermint/tendermint/libs/sync"
)
// queue does QoS scheduling for Envelopes, enqueueing and dequeueing according
// to some policy. Queues are used at contention points, i.e.:
@ -26,17 +28,14 @@ type queue interface {
// fifoQueue is a simple unbuffered lossless queue that passes messages through
// in the order they were received, and blocks until message is received.
type fifoQueue struct {
queueCh chan Envelope
closeCh chan struct{}
closeOnce sync.Once
queueCh chan Envelope
closer *tmsync.Closer
}
var _ queue = (*fifoQueue)(nil)
func newFIFOQueue(size int) *fifoQueue {
func newFIFOQueue(size int) queue {
return &fifoQueue{
queueCh: make(chan Envelope, size),
closeCh: make(chan struct{}),
closer: tmsync.NewCloser(),
}
}
@ -49,11 +48,9 @@ func (q *fifoQueue) dequeue() <-chan Envelope {
}
func (q *fifoQueue) close() {
q.closeOnce.Do(func() {
close(q.closeCh)
})
q.closer.Close()
}
func (q *fifoQueue) closed() <-chan struct{} {
return q.closeCh
return q.closer.Done()
}

+163 -28 p2p/router.go

@ -15,6 +15,8 @@ import (
"github.com/tendermint/tendermint/libs/service"
)
const queueBufferDefault = 4096
// ChannelID is an arbitrary channel ID.
type ChannelID uint16
@ -124,10 +126,29 @@ type RouterOptions struct {
// HandshakeTimeout is the timeout for handshaking with a peer. 0 means
// no timeout.
HandshakeTimeout time.Duration
// QueueType must be "wdrr" (Weighed Deficit Round Robin),
// "priority", or FIFO. Defaults to FIFO.
QueueType string
}
const (
queueTypeFifo = "fifo"
queueTypePriority = "priority"
queueTypeWDRR = "wdrr"
)
// Validate validates router options.
func (o *RouterOptions) Validate() error {
switch o.QueueType {
case "":
o.QueueType = queueTypeFifo
case queueTypeFifo, queueTypeWDRR, queueTypePriority:
// pass
default:
return fmt.Errorf("queue type %q is not supported", o.QueueType)
}
return nil
}
@ -174,22 +195,25 @@ type Router struct {
*service.BaseService
logger log.Logger
metrics *Metrics
options RouterOptions
nodeInfo NodeInfo
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []ChannelDescriptor
transports []Transport
protocolTransports map[Protocol]Transport
stopCh chan struct{} // signals Router shutdown
peerMtx sync.RWMutex
peerQueues map[NodeID]queue
peerMtx sync.RWMutex
peerQueues map[NodeID]queue // outbound messages per peer for all channels
queueFactory func(int) queue
// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
// dynamic channels in the future.
channelMtx sync.RWMutex
channelQueues map[ChannelID]queue
channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel
channelMessages map[ChannelID]proto.Message
}
@ -198,20 +222,24 @@ type Router struct {
// stops.
func NewRouter(
logger log.Logger,
metrics *Metrics,
nodeInfo NodeInfo,
privKey crypto.PrivKey,
peerManager *PeerManager,
transports []Transport,
options RouterOptions,
) (*Router, error) {
if err := options.Validate(); err != nil {
return nil, err
}
router := &Router{
logger: logger,
metrics: metrics,
nodeInfo: nodeInfo,
privKey: privKey,
chDescs: make([]ChannelDescriptor, 0),
transports: transports,
protocolTransports: map[Protocol]Transport{},
peerManager: peerManager,
@ -221,8 +249,16 @@ func NewRouter(
channelMessages: map[ChannelID]proto.Message{},
peerQueues: map[NodeID]queue{},
}
router.BaseService = service.NewBaseService(logger, "router", router)
qf, err := router.createQueueFactory()
if err != nil {
return nil, err
}
router.queueFactory = qf
for _, transport := range transports {
for _, protocol := range transport.Protocols() {
if _, ok := router.protocolTransports[protocol]; !ok {
@ -234,6 +270,46 @@ func NewRouter(
return router, nil
}
func (r *Router) createQueueFactory() (func(int) queue, error) {
switch r.options.QueueType {
case queueTypeFifo:
return newFIFOQueue, nil
case queueTypePriority:
return func(size int) queue {
if size%2 != 0 {
size++
}
q := newPQScheduler(r.logger, r.metrics, r.chDescs, uint(size)/2, uint(size)/2, defaultCapacity)
q.start()
return q
}, nil
case queueTypeWDRR:
return func(size int) queue {
if size%2 != 0 {
size++
}
q := newWDRRScheduler(r.logger, r.metrics, r.chDescs, uint(size)/2, uint(size)/2, defaultCapacity)
q.start()
return q
}, nil
default:
return nil, fmt.Errorf("cannot construct queue of type %q", r.options.QueueType)
}
}
// AddChannelDescriptors adds a set of ChannelDescriptors to the router. Note,
// this should be called before the router is started and any connections are made.
func (r *Router) AddChannelDescriptors(chDescs []*ChannelDescriptor) {
for _, chDesc := range chDescs {
r.chDescs = append(r.chDescs, *chDesc)
}
}
// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, before stopping the Router. messageType is the
// type of message passed through the channel (used for unmarshaling), which can
@ -241,7 +317,18 @@ func NewRouter(
// wrapper message. The caller may provide a size to make the channel buffered,
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int) (*Channel, error) {
queue := newFIFOQueue(size)
if size == 0 {
size = queueBufferDefault
}
r.channelMtx.Lock()
defer r.channelMtx.Unlock()
if _, ok := r.channelQueues[id]; ok {
return nil, fmt.Errorf("channel %v already exists", id)
}
queue := r.queueFactory(size)
outCh := make(chan Envelope, size)
errCh := make(chan PeerError, size)
channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)
@ -251,12 +338,6 @@ func (r *Router) OpenChannel(id ChannelID, messageType proto.Message, size int)
wrapper = w
}
r.channelMtx.Lock()
defer r.channelMtx.Unlock()
if _, ok := r.channelQueues[id]; ok {
return nil, fmt.Errorf("channel %v already exists", id)
}
r.channelQueues[id] = queue
r.channelMessages[id] = messageType
@ -297,44 +378,52 @@ func (r *Router) routeChannel(
// it on to Transport.SendMessage().
envelope.channelID = chID
// Wrap the message in a wrapper message, if requested.
// wrap the message in a wrapper message, if requested
if wrapper != nil {
msg := proto.Clone(wrapper)
if err := msg.(Wrapper).Wrap(envelope.Message); err != nil {
r.Logger.Error("failed to wrap message", "channel", chID, "err", err)
continue
}
envelope.Message = msg
}
// Collect peer queues to pass the message via.
// collect peer queues to pass the message via
var queues []queue
if envelope.Broadcast {
r.peerMtx.RLock()
queues = make([]queue, 0, len(r.peerQueues))
for _, q := range r.peerQueues {
queues = append(queues, q)
}
r.peerMtx.RUnlock()
} else {
r.peerMtx.RLock()
q, ok := r.peerQueues[envelope.To]
r.peerMtx.RUnlock()
if !ok {
r.logger.Debug("dropping message for unconnected peer",
"peer", envelope.To, "channel", chID)
r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
continue
}
queues = []queue{q}
}
// Send message to peers.
// send message to peers
for _, q := range queues {
start := time.Now().UTC()
select {
case q.enqueue() <- envelope:
r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds())
case <-q.closed():
r.logger.Debug("dropping message for unconnected peer",
"peer", envelope.To, "channel", chID)
r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
case <-r.stopCh:
return
}
@ -344,7 +433,9 @@ func (r *Router) routeChannel(
if !ok {
return
}
r.logger.Error("peer error, evicting", "peer", peerError.NodeID, "err", peerError.Err)
if err := r.peerManager.Errored(peerError.NodeID, peerError.Err); err != nil {
r.logger.Error("failed to report peer error", "peer", peerError.NodeID, "err", err)
}
@ -422,7 +513,8 @@ func (r *Router) acceptPeers(transport Transport) {
return
}
queue := newFIFOQueue(0)
queue := r.queueFactory(queueBufferDefault)
r.peerMtx.Lock()
r.peerQueues[peerInfo.NodeID] = queue
r.peerMtx.Unlock()
@ -431,7 +523,9 @@ func (r *Router) acceptPeers(transport Transport) {
r.peerMtx.Lock()
delete(r.peerQueues, peerInfo.NodeID)
r.peerMtx.Unlock()
queue.close()
if err := r.peerManager.Disconnected(peerInfo.NodeID); err != nil {
r.logger.Error("failed to disconnect peer", "peer", peerInfo.NodeID, "err", err)
}
@ -496,16 +590,14 @@ func (r *Router) dialPeers() {
return
}
queue := newFIFOQueue(0)
r.peerMtx.Lock()
r.peerQueues[peerID] = queue
r.peerMtx.Unlock()
peerQueue := r.getOrMakeQueue(peerID)
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerID)
r.peerMtx.Unlock()
queue.close()
peerQueue.close()
if err := r.peerManager.Disconnected(peerID); err != nil {
r.logger.Error("failed to disconnect peer", "peer", address, "err", err)
}
@ -516,11 +608,24 @@ func (r *Router) dialPeers() {
return
}
r.routePeer(peerID, conn, queue)
r.routePeer(peerID, conn, peerQueue)
}()
}
}
func (r *Router) getOrMakeQueue(peerID NodeID) queue {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
if peerQueue, ok := r.peerQueues[peerID]; ok {
return peerQueue
}
peerQueue := r.queueFactory(queueBufferDefault)
r.peerQueues[peerID] = peerQueue
return peerQueue
}
// dialPeer connects to a peer by dialing it.
func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection, error) {
resolveCtx := ctx
@ -603,10 +708,13 @@ func (r *Router) handshakePeer(ctx context.Context, conn Connection, expectID No
// they are closed elsewhere it will cause this method to shut down and return.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
r.logger.Info("peer connected", "peer", peerID, "endpoint", conn)
errCh := make(chan error, 2)
go func() {
errCh <- r.receivePeer(peerID, conn)
}()
go func() {
errCh <- r.sendPeer(peerID, conn, sendQueue)
}()
@ -614,14 +722,17 @@ func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
err := <-errCh
_ = conn.Close()
sendQueue.close()
if e := <-errCh; err == nil {
// The first err was nil, so we update it with the second err, which may
// or may not be nil.
err = e
}
switch err {
case nil, io.EOF:
r.logger.Info("peer disconnected", "peer", peerID, "endpoint", conn)
default:
r.logger.Error("peer failure", "peer", peerID, "endpoint", conn, "err", err)
}
@ -640,6 +751,7 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
queue, ok := r.channelQueues[chID]
messageType := r.channelMessages[chID]
r.channelMtx.RUnlock()
if !ok {
r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
continue
@ -650,6 +762,7 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
continue
}
if wrapper, ok := msg.(Wrapper); ok {
msg, err = wrapper.Unwrap()
if err != nil {
@ -658,11 +771,16 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
}
}
start := time.Now().UTC()
select {
case queue.enqueue() <- Envelope{From: peerID, Message: msg}:
r.metrics.RouterChannelQueueSend.Observe(time.Since(start).Seconds())
r.logger.Debug("received message", "peer", peerID, "message", msg)
case <-queue.closed():
r.logger.Debug("channel closed, dropping message", "peer", peerID, "channel", chID)
case <-r.stopCh:
return nil
}
@ -670,14 +788,18 @@ func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
}
// sendPeer sends queued messages to a peer.
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
func (r *Router) sendPeer(peerID NodeID, conn Connection, peerQueue queue) error {
for {
start := time.Now().UTC()
select {
case envelope := <-queue.dequeue():
case envelope := <-peerQueue.dequeue():
r.metrics.RouterPeerQueueRecv.Observe(time.Since(start).Seconds())
if envelope.Message == nil {
r.logger.Error("dropping nil message", "peer", peerID)
continue
}
bz, err := proto.Marshal(envelope.Message)
if err != nil {
r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
@ -688,9 +810,10 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
if err != nil {
return err
}
r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)
case <-queue.closed():
case <-peerQueue.closed():
return nil
case <-r.stopCh:
@ -703,21 +826,26 @@ func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
func (r *Router) evictPeers() {
r.logger.Debug("starting evict routine")
ctx := r.stopCtx()
for {
peerID, err := r.peerManager.EvictNext(ctx)
switch {
case errors.Is(err, context.Canceled):
r.logger.Debug("stopping evict routine")
return
case err != nil:
r.logger.Error("failed to find next peer to evict", "err", err)
return
}
r.logger.Info("evicting peer", "peer", peerID)
r.peerMtx.RLock()
queue, ok := r.peerQueues[peerID]
r.peerMtx.RUnlock()
if ok {
queue.close()
}
@ -728,9 +856,11 @@ func (r *Router) evictPeers() {
func (r *Router) OnStart() error {
go r.dialPeers()
go r.evictPeers()
for _, transport := range r.transports {
go r.acceptPeers(transport)
}
return nil
}
@ -753,16 +883,19 @@ func (r *Router) OnStop() {
// Collect all remaining queues, and wait for them to close.
queues := []queue{}
r.channelMtx.RLock()
for _, q := range r.channelQueues {
queues = append(queues, q)
}
r.channelMtx.RUnlock()
r.peerMtx.RLock()
for _, q := range r.peerQueues {
queues = append(queues, q)
}
r.peerMtx.RUnlock()
for _, q := range queues {
<-q.closed()
}
@ -771,9 +904,11 @@ func (r *Router) OnStop() {
// stopCtx returns a new context that is canceled when the router stops.
func (r *Router) stopCtx() context.Context {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-r.stopCh
cancel()
}()
return ctx
}
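
Putting the new options together: QueueType selects the scheduler used for both peer and channel queues, and OpenChannel now treats a size of 0 as the 4096-envelope default. A sketch of the call shape, modeled on createPEXReactorV2 above and driven by the same TM_P2P_QUEUE environment variable that node.go reads (all surrounding wiring is assumed to exist already):

package node

import (
	"os"

	"github.com/tendermint/tendermint/crypto"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/pex"
	protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

// openPexChannel is an illustrative sketch, not part of this commit.
func openPexChannel(
	logger log.Logger,
	nodeInfo p2p.NodeInfo,
	privKey crypto.PrivKey,
	peerManager *p2p.PeerManager,
	transport p2p.Transport,
) (*p2p.Channel, error) {
	router, err := p2p.NewRouter(
		logger,
		p2p.NopMetrics(),
		nodeInfo,
		privKey,
		peerManager,
		[]p2p.Transport{transport},
		// Empty defaults to "fifo" via Validate; "priority" and "wdrr" are the alternatives.
		p2p.RouterOptions{QueueType: os.Getenv("TM_P2P_QUEUE")},
	)
	if err != nil {
		return nil, err
	}

	// Passing 0 here would fall back to the router's 4096 default buffer size.
	return router.OpenChannel(p2p.ChannelID(pex.PexChannel), &protop2p.PexMessage{}, 4096)
}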

+62 -0 p2p/router_init_test.go

@ -0,0 +1,62 @@
package p2p
import (
"os"
"testing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
)
func TestRouter_ConstructQueueFactory(t *testing.T) {
t.Run("ValidateOptionsPopulatesDefaultQueue", func(t *testing.T) {
opts := RouterOptions{}
require.NoError(t, opts.Validate())
require.Equal(t, "fifo", opts.QueueType)
})
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
r, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts)
require.NoError(t, err)
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
r, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts)
require.NoError(t, err)
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
r, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts)
require.NoError(t, err)
q, ok := r.queueFactory(1).(*pqScheduler)
require.True(t, ok)
defer q.close()
})
t.Run("WDRR", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeWDRR}
r, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts)
require.NoError(t, err)
q, ok := r.queueFactory(1).(*wdrrScheduler)
require.True(t, ok)
defer q.close()
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(log.NewNopLogger(), nil, NodeInfo{}, nil, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})
t.Run("InternalsSafeWhenUnspecified", func(t *testing.T) {
r := &Router{}
require.Zero(t, r.options.QueueType)
fn, err := r.createQueueFactory()
require.Error(t, err)
require.Nil(t, fn)
})
}

+72 -15 p2p/router_test.go

@ -97,7 +97,15 @@ func TestRouter_Channel(t *testing.T) {
// Set up a router with no transports (so no peers).
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager, nil, p2p.RouterOptions{})
router, err := p2p.NewRouter(
log.TestingLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
nil,
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())
@ -343,8 +351,15 @@ func TestRouter_AcceptPeers(t *testing.T) {
sub := peerManager.Subscribe()
defer sub.Close()
router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager,
[]p2p.Transport{mockTransport}, p2p.RouterOptions{})
router, err := p2p.NewRouter(
log.TestingLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())
@ -383,8 +398,15 @@ func TestRouter_AcceptPeers_Error(t *testing.T) {
// Set up and start the router.
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager,
[]p2p.Transport{mockTransport}, p2p.RouterOptions{})
router, err := p2p.NewRouter(
log.TestingLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())
@ -408,8 +430,15 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
// Set up and start the router.
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager,
[]p2p.Transport{mockTransport}, p2p.RouterOptions{})
router, err := p2p.NewRouter(
log.TestingLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())
@ -446,8 +475,15 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
// Set up and start the router.
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)
router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager,
[]p2p.Transport{mockTransport}, p2p.RouterOptions{})
router, err := p2p.NewRouter(
log.TestingLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())
@ -520,8 +556,15 @@ func TestRouter_DialPeers(t *testing.T) {
sub := peerManager.Subscribe()
defer sub.Close()
router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager,
[]p2p.Transport{mockTransport}, p2p.RouterOptions{})
router, err := p2p.NewRouter(
log.TestingLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())
@ -583,8 +626,15 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
require.NoError(t, peerManager.Add(b))
require.NoError(t, peerManager.Add(c))
router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager,
[]p2p.Transport{mockTransport}, p2p.RouterOptions{})
router, err := p2p.NewRouter(
log.TestingLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())
@ -630,8 +680,15 @@ func TestRouter_EvictPeers(t *testing.T) {
sub := peerManager.Subscribe()
defer sub.Close()
router, err := p2p.NewRouter(log.TestingLogger(), selfInfo, selfKey, peerManager,
[]p2p.Transport{mockTransport}, p2p.RouterOptions{})
router, err := p2p.NewRouter(
log.TestingLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
[]p2p.Transport{mockTransport},
p2p.RouterOptions{},
)
require.NoError(t, err)
require.NoError(t, router.Start())


+283 -0 p2p/wdrr_queue.go

@ -0,0 +1,283 @@
package p2p
import (
"sort"
"strconv"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
)
const defaultCapacity uint = 1048576 // 1MB
// wrappedEnvelope wraps a p2p Envelope with its precomputed size.
type wrappedEnvelope struct {
envelope Envelope
size uint
}
// assert the WDRR scheduler implements the queue interface at compile-time
var _ queue = (*wdrrScheduler)(nil)
// wdrrScheduler implements a Weighted Deficit Round Robin (WDRR) scheduling
// algorithm via the queue interface. A WDRR queue is created per peer, where
// the queue will have N flows, one per p2p Channel, so there are N input
// flows and a single output source, the peer's connection.
//
// The WDRR scheduler contains a shared buffer with a fixed capacity.
//
// Each flow has the following:
// - quantum: The number of bytes that is added to the deficit counter of the
// flow in each round. The flow can send at most quantum bytes at a time. Each
// flow has its own unique quantum, which gives the queue its weighted nature.
// A higher quantum corresponds to a higher weight/priority. The quantum is
// computed as MaxSendBytes * Priority.
// - deficit counter: The number of bytes that the flow is allowed to transmit
// when it is its turn.
//
// See: https://en.wikipedia.org/wiki/Deficit_round_robin
type wdrrScheduler struct {
logger log.Logger
metrics *Metrics
chDescs []ChannelDescriptor
capacity uint
size uint
chPriorities map[ChannelID]uint
buffer map[ChannelID][]wrappedEnvelope
quanta map[ChannelID]uint
deficits map[ChannelID]uint
closer *tmsync.Closer
doneCh *tmsync.Closer
enqueueCh chan Envelope
dequeueCh chan Envelope
}
func newWDRRScheduler(
logger log.Logger,
m *Metrics,
chDescs []ChannelDescriptor,
enqueueBuf, dequeueBuf, capacity uint,
) *wdrrScheduler {
// copy each ChannelDescriptor and sort them by channel priority
chDescsCopy := make([]ChannelDescriptor, len(chDescs))
copy(chDescsCopy, chDescs)
sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority > chDescsCopy[j].Priority })
var (
buffer = make(map[ChannelID][]wrappedEnvelope)
chPriorities = make(map[ChannelID]uint)
quanta = make(map[ChannelID]uint)
deficits = make(map[ChannelID]uint)
)
for _, chDesc := range chDescsCopy {
chID := ChannelID(chDesc.ID)
chPriorities[chID] = uint(chDesc.Priority)
buffer[chID] = make([]wrappedEnvelope, 0)
quanta[chID] = chDesc.MaxSendBytes * uint(chDesc.Priority)
}
return &wdrrScheduler{
logger: logger.With("queue", "wdrr"),
metrics: m,
capacity: capacity,
chPriorities: chPriorities,
chDescs: chDescsCopy,
buffer: buffer,
quanta: quanta,
deficits: deficits,
closer: tmsync.NewCloser(),
doneCh: tmsync.NewCloser(),
enqueueCh: make(chan Envelope, enqueueBuf),
dequeueCh: make(chan Envelope, dequeueBuf),
}
}
// enqueue returns a write-only channel which a producer can send on.
func (s *wdrrScheduler) enqueue() chan<- Envelope {
return s.enqueueCh
}
// dequeue returns a read-only channel which a consumer can read from.
func (s *wdrrScheduler) dequeue() <-chan Envelope {
return s.dequeueCh
}
func (s *wdrrScheduler) closed() <-chan struct{} {
return s.closer.Done()
}
// close closes the WDRR queue. After this call enqueue() will block, so the
// caller must select on closed() as well to avoid blocking forever. The
// enqueue() and dequeue() along with the internal channels will NOT be closed.
// Note, close() will block until all externally spawned goroutines have exited.
func (s *wdrrScheduler) close() {
s.closer.Close()
<-s.doneCh.Done()
}
// start starts the WDRR queue process in a separate goroutine. This must be
// called before the queue can start to process and accept Envelopes.
func (s *wdrrScheduler) start() {
go s.process()
}
// process starts a blocking WDRR scheduler process, where we continuously
// evaluate if we need to attempt to enqueue an Envelope or schedule Envelopes
// to be dequeued and subsequently read and sent on the source connection.
// Internally, each p2p Channel maps to a flow, where each flow has a deficit
// and a quantum.
//
// For each Envelope requested to be enqueued, we evaluate if there is sufficient
// capacity in the shared buffer to add the Envelope. If so, it is added.
// Otherwise, we evaluate all flows of lower priority where we attempt to find an
// existing Envelope in the shared buffer of sufficient size that can be dropped
// in place of the incoming Envelope. If there is no such Envelope that can be
// dropped, then the incoming Envelope is dropped.
//
// When there is nothing to be enqueued, we perform the WDRR algorithm and
// determine which Envelopes can be dequeued. For each Envelope that can be
// dequeued, it is sent on the dequeueCh. Specifically, for each flow, if it is
// non-empty, its deficit counter is incremented by its quantum value. Then, the
// value of the deficit counter is a maximal amount of bytes that can be sent at
// this round. If the deficit counter is greater than the Envelope's message
// size at the head of the queue (HoQ), this envelope can be sent and the value
// of the counter is decremented by the message's size. Then, the size of the
// next Envelope's message is compared to the counter value, etc. Once the flow
// is empty or the value of the counter is insufficient, the scheduler will skip
// to the next flow. If the flow is empty, the value of the deficit counter is
// reset to 0.
//
// XXX/TODO: Evaluate the single goroutine scheduler mechanism. In other words,
// evaluate the effectiveness and performance of having a single goroutine
// handle both enqueueing and dequeueing logic. Specifically, there
// is potentially contention between reading off of enqueueCh and trying to
// enqueue while also attempting to perform the WDRR algorithm and find the next
// set of Envelope(s) to send on the dequeueCh. Alternatively, we could consider
// separate scheduling goroutines, but then that requires the use of mutexes and
// possibly degraded performance.
func (s *wdrrScheduler) process() {
defer s.doneCh.Close()
for {
select {
case <-s.closer.Done():
return
case e := <-s.enqueueCh:
// attempt to enqueue the incoming Envelope
chIDStr := strconv.Itoa(int(e.channelID))
wEnv := wrappedEnvelope{envelope: e, size: uint(proto.Size(e.Message))}
msgSize := wEnv.size
// If we're at capacity, we need to either drop the incoming Envelope or
// an Envelope from a lower priority flow. Otherwise, we add the (wrapped)
// envelope to the flow's queue.
if s.size+wEnv.size > s.capacity {
chPriority := s.chPriorities[e.channelID]
var (
canDrop bool
dropIdx int
dropChID ChannelID
)
// Evaluate all lower priority flows and determine if there exists an
// Envelope that is of equal or greater size that we can drop in favor
// of the incoming Envelope.
for i := len(s.chDescs) - 1; i >= 0 && uint(s.chDescs[i].Priority) < chPriority && !canDrop; i-- {
currChID := ChannelID(s.chDescs[i].ID)
flow := s.buffer[currChID]
for j := 0; j < len(flow) && !canDrop; j++ {
if flow[j].size >= wEnv.size {
canDrop = true
dropIdx = j
dropChID = currChID
break
}
}
}
// If we can drop an existing Envelope, drop it and enqueue the incoming
// Envelope.
if canDrop {
chIDStr = strconv.Itoa(int(dropChID))
chPriority = s.chPriorities[dropChID]
msgSize = s.buffer[dropChID][dropIdx].size
// Drop Envelope for the lower priority flow and update the queue's
// buffer size
s.size -= msgSize
s.buffer[dropChID] = append(s.buffer[dropChID][:dropIdx], s.buffer[dropChID][dropIdx+1:]...)
// add the incoming Envelope and update queue's buffer size
s.size += wEnv.size
s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
}
// We either dropped the incoming Envelope or one from an existing
// lower priority flow.
s.metrics.PeerQueueDroppedMsgs.With("ch_id", chIDStr).Add(1)
s.logger.Debug(
"dropped envelope",
"ch_id", chIDStr,
"priority", chPriority,
"capacity", s.capacity,
"msg_size", msgSize,
)
} else {
// we have sufficient capacity to enqueue the incoming Envelope
s.metrics.PeerQueueMsgSize.With("ch_id", chIDStr).Set(float64(wEnv.size))
s.buffer[e.channelID] = append(s.buffer[e.channelID], wEnv)
s.size += wEnv.size
}
default:
// perform the WDRR algorithm
for _, chDesc := range s.chDescs {
chID := ChannelID(chDesc.ID)
// only consider non-empty flows
if len(s.buffer[chID]) > 0 {
// bump flow's quantum
s.deficits[chID] += s.quanta[chID]
// grab the flow's current deficit counter and HoQ (wrapped) Envelope
d := s.deficits[chID]
we := s.buffer[chID][0]
// While the flow is non-empty and we can send the current Envelope
// on the dequeueCh:
//
// 1. send the Envelope
// 2. update the scheduler's shared buffer's size
// 3. update the flow's deficit
// 4. remove from the flow's queue
// 5. grab the next HoQ Envelope and flow's deficit
for len(s.buffer[chID]) > 0 && d >= we.size {
s.dequeueCh <- we.envelope
s.size -= we.size
s.deficits[chID] -= we.size
s.buffer[chID] = s.buffer[chID][1:]
if len(s.buffer[chID]) > 0 {
d = s.deficits[chID]
we = s.buffer[chID][0]
}
}
}
// reset the flow's deficit to zero if it is empty
if len(s.buffer[chID]) == 0 {
s.deficits[chID] = 0
}
}
}
}
}
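
The deficit/quantum mechanics described above can be illustrated with a few lines of plain Go. This is a toy walk-through of the round-robin arithmetic only; the flow names, quanta, and message sizes are invented and none of the p2p types are used:

package main

import "fmt"

func main() {
	quanta := map[string]uint{"consensus": 600, "mempool": 250}
	deficits := map[string]uint{}

	// Pending message sizes (bytes) per flow, head of queue first.
	pending := map[string][]uint{
		"consensus": {500, 500, 500},
		"mempool":   {300, 300, 300},
	}

	for round := 1; round <= 3; round++ {
		for _, flow := range []string{"consensus", "mempool"} {
			if len(pending[flow]) == 0 {
				deficits[flow] = 0 // reset the deficit once the flow is empty
				continue
			}

			// Non-empty flow: bump the deficit by the quantum, then send
			// head-of-queue messages while the deficit covers them.
			deficits[flow] += quanta[flow]
			for len(pending[flow]) > 0 && deficits[flow] >= pending[flow][0] {
				deficits[flow] -= pending[flow][0]
				fmt.Printf("round %d: %s sends %d bytes\n", round, flow, pending[flow][0])
				pending[flow] = pending[flow][1:]
			}
		}
	}
}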

+208 -0 p2p/wdrr_queue_test.go

@ -0,0 +1,208 @@
package p2p
import (
"math"
"math/rand"
"testing"
"time"
gogotypes "github.com/gogo/protobuf/types"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
)
type testMessage = gogotypes.StringValue
func TestWDRRQueue_EqualWeights(t *testing.T) {
chDescs := []ChannelDescriptor{
{ID: 0x01, Priority: 1, MaxSendBytes: 4},
{ID: 0x02, Priority: 1, MaxSendBytes: 4},
{ID: 0x03, Priority: 1, MaxSendBytes: 4},
{ID: 0x04, Priority: 1, MaxSendBytes: 4},
{ID: 0x05, Priority: 1, MaxSendBytes: 4},
{ID: 0x06, Priority: 1, MaxSendBytes: 4},
}
peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 1000, 1000, 120)
peerQueue.start()
totalMsgs := make(map[ChannelID]int)
deliveredMsgs := make(map[ChannelID]int)
successRates := make(map[ChannelID]float64)
closer := tmsync.NewCloser()
go func() {
timeout := 10 * time.Second
ticker := time.NewTicker(timeout)
defer ticker.Stop()
for {
select {
case e := <-peerQueue.dequeue():
deliveredMsgs[e.channelID]++
ticker.Reset(timeout)
case <-ticker.C:
closer.Close()
}
}
}()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
maxMsgs := 5000
minMsgs := 1000
for _, chDesc := range chDescs {
total := rng.Intn(maxMsgs-minMsgs) + minMsgs // total = rand[minMsgs, maxMsgs)
totalMsgs[ChannelID(chDesc.ID)] = total
go func(cID ChannelID, n int) {
for i := 0; i < n; i++ {
peerQueue.enqueue() <- Envelope{
channelID: cID,
Message: &testMessage{Value: "foo"}, // 5 bytes
}
}
}(ChannelID(chDesc.ID), total)
}
// wait for dequeueing to complete
<-closer.Done()
// close queue and wait for cleanup
peerQueue.close()
<-peerQueue.closed()
var (
sum float64
stdDev float64
)
for _, chDesc := range peerQueue.chDescs {
chID := ChannelID(chDesc.ID)
require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero")
require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty")
total := totalMsgs[chID]
delivered := deliveredMsgs[chID]
successRate := float64(delivered) / float64(total)
sum += successRate
successRates[chID] = successRate
// require some messages dropped
require.Less(t, delivered, total, "expected some messages to be dropped")
require.Less(t, successRate, 1.0, "expected a success rate below 100%")
}
require.Zero(t, peerQueue.size, "expected scheduler size to be zero")
numFlows := float64(len(peerQueue.buffer))
mean := sum / numFlows
for _, successRate := range successRates {
stdDev += math.Pow(successRate-mean, 2)
}
stdDev = math.Sqrt(stdDev / numFlows)
require.Less(t, stdDev, 0.02, "expected success rate standard deviation to be less than 2%")
}
func TestWDRRQueue_DecreasingWeights(t *testing.T) {
chDescs := []ChannelDescriptor{
{ID: 0x01, Priority: 18, MaxSendBytes: 4},
{ID: 0x02, Priority: 10, MaxSendBytes: 4},
{ID: 0x03, Priority: 2, MaxSendBytes: 4},
{ID: 0x04, Priority: 1, MaxSendBytes: 4},
{ID: 0x05, Priority: 1, MaxSendBytes: 4},
{ID: 0x06, Priority: 1, MaxSendBytes: 4},
}
peerQueue := newWDRRScheduler(log.NewNopLogger(), NopMetrics(), chDescs, 0, 0, 500)
peerQueue.start()
totalMsgs := make(map[ChannelID]int)
deliveredMsgs := make(map[ChannelID]int)
successRates := make(map[ChannelID]float64)
for _, chDesc := range chDescs {
total := 1000
totalMsgs[ChannelID(chDesc.ID)] = total
go func(cID ChannelID, n int) {
for i := 0; i < n; i++ {
peerQueue.enqueue() <- Envelope{
channelID: cID,
Message: &testMessage{Value: "foo"}, // 5 bytes
}
}
}(ChannelID(chDesc.ID), total)
}
closer := tmsync.NewCloser()
go func() {
timeout := 20 * time.Second
ticker := time.NewTicker(timeout)
defer ticker.Stop()
for {
select {
case e := <-peerQueue.dequeue():
deliveredMsgs[e.channelID]++
ticker.Reset(timeout)
case <-ticker.C:
closer.Close()
}
}
}()
// wait for dequeueing to complete
<-closer.Done()
// close queue and wait for cleanup
peerQueue.close()
<-peerQueue.closed()
for i, chDesc := range peerQueue.chDescs {
chID := ChannelID(chDesc.ID)
require.Zero(t, peerQueue.deficits[chID], "expected flow deficit to be zero")
require.Len(t, peerQueue.buffer[chID], 0, "expected flow queue to be empty")
total := totalMsgs[chID]
delivered := deliveredMsgs[chID]
successRate := float64(delivered) / float64(total)
successRates[chID] = successRate
// Require some messages dropped. Note, the top weighted flows may not have
// any dropped if lower priority non-empty queues always exist.
if i > 2 {
require.Less(t, delivered, total, "expected some messages to be dropped")
require.Less(t, successRate, 1.0, "expected a success rate below 100%")
}
}
require.Zero(t, peerQueue.size, "expected scheduler size to be zero")
// require channel 0x01 to have the highest success rate due to its weight
ch01Rate := successRates[ChannelID(chDescs[0].ID)]
for i := 1; i < len(chDescs); i++ {
require.GreaterOrEqual(t, ch01Rate, successRates[ChannelID(chDescs[i].ID)])
}
// require channel 0x02 to have the 2nd highest success rate due to its weight
ch02Rate := successRates[ChannelID(chDescs[1].ID)]
for i := 2; i < len(chDescs); i++ {
require.GreaterOrEqual(t, ch02Rate, successRates[ChannelID(chDescs[i].ID)])
}
// require channel 0x03 to have the 3rd highest success rate due to its weight
ch03Rate := successRates[ChannelID(chDescs[2].ID)]
for i := 3; i < len(chDescs); i++ {
require.GreaterOrEqual(t, ch03Rate, successRates[ChannelID(chDescs[i].ID)])
}
}

+4 -0 statesync/reactor.go

@ -37,6 +37,8 @@ var (
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize,
MaxSendBytes: 400,
},
},
ChunkChannel: {
@ -46,6 +48,8 @@ var (
Priority: 1,
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
MaxSendBytes: 400,
},
},
}


+2 -0 test/e2e/docker/Dockerfile

@ -18,6 +18,7 @@ RUN go mod download
COPY . .
RUN make build && cp build/tendermint /usr/bin/tendermint
COPY test/e2e/docker/entrypoint* /usr/bin/
# FIXME: Temporarily disconnect maverick node until it is redesigned
# RUN cd test/e2e && make maverick && cp build/maverick /usr/bin/maverick
RUN cd test/e2e && make app && cp build/app /usr/bin/app
@ -28,6 +29,7 @@ WORKDIR /tendermint
VOLUME /tendermint
ENV TMHOME=/tendermint
ENV TM_LEGACY_P2P=true
ENV TM_P2P_QUEUE="priority"
EXPOSE 26656 26657 26660 6060
ENTRYPOINT ["/usr/bin/entrypoint"]


+11 -6 test/e2e/runner/load.go

@ -13,9 +13,10 @@ import (
"github.com/tendermint/tendermint/types"
)
// Load generates transactions against the network until the given
// context is canceled.
func Load(ctx context.Context, testnet *e2e.Testnet) error {
// Load generates transactions against the network until the given context is
// canceled. A multiplier greater than one can be supplied if load needs to
// be generated beyond a minimum amount.
func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
// Since transactions are executed across all nodes in the network, we need
// to reduce transaction load for larger networks to avoid using too much
// CPU. This gives high-throughput small networks and low-throughput large ones.
@ -39,7 +40,7 @@ func Load(ctx context.Context, testnet *e2e.Testnet) error {
go loadGenerate(ctx, chTx)
for w := 0; w < concurrency; w++ {
for w := 0; w < concurrency*multiplier; w++ {
go loadProcess(ctx, testnet, chTx, chSuccess)
}
@ -81,6 +82,7 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx) {
select {
case chTx <- tx:
time.Sleep(10 * time.Millisecond)
case <-ctx.Done():
close(chTx)
return
@ -97,18 +99,21 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
var err error
for tx := range chTx {
node := testnet.RandomNode()
client, ok := clients[node.Name]
if !ok {
client, err = node.Client()
if err != nil {
continue
}
clients[node.Name] = client
}
_, err = client.BroadcastTxCommit(ctx, tx)
if err != nil {
if _, err = client.BroadcastTxCommit(ctx, tx); err != nil {
continue
}
chSuccess <- tx
}
}
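
A short sketch of how the new multiplier argument can be driven from within the runner package; the thirty-second cutoff and the multiplier value are arbitrary, and the import path is the one the runner already uses:

package main

import (
	"context"
	"time"

	e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)

// loadForThirtySeconds is an illustrative sketch: it runs the load generator
// with twice the default number of workers (concurrency*2 loadProcess
// goroutines) until the context times out.
func loadForThirtySeconds(testnet *e2e.Testnet) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	return Load(ctx, testnet, 2)
}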

+16 -5 test/e2e/runner/main.go

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strconv"
"github.com/spf13/cobra"
@ -59,7 +60,7 @@ func NewCLI() *CLI {
ctx, loadCancel := context.WithCancel(context.Background())
defer loadCancel()
go func() {
err := Load(ctx, cli.testnet)
err := Load(ctx, cli.testnet, 1)
if err != nil {
logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error()))
}
@ -170,10 +171,20 @@ func NewCLI() *CLI {
})
cli.root.AddCommand(&cobra.Command{
Use: "load",
Use: "load [multiplier]",
Args: cobra.MaximumNArgs(1),
Short: "Generates transaction load until the command is canceled",
RunE: func(cmd *cobra.Command, args []string) error {
return Load(context.Background(), cli.testnet)
RunE: func(cmd *cobra.Command, args []string) (err error) {
m := 1
if len(args) == 1 {
m, err = strconv.Atoi(args[0])
if err != nil {
return err
}
}
return Load(context.Background(), cli.testnet, m)
},
})
@ -242,7 +253,7 @@
Does not run any perturbations.
ctx, loadCancel := context.WithCancel(context.Background())
defer loadCancel()
go func() {
err := Load(ctx, cli.testnet)
err := Load(ctx, cli.testnet, 1)
if err != nil {
logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error()))
}


+12 -0 test/e2e/runner/setup.go

@ -154,6 +154,9 @@ func MakeDockerCompose(testnet *e2e.Testnet) ([]byte, error) {
}
return command
},
"addUint32": func(x, y uint32) uint32 {
return x + y
},
}).Parse(`version: '2.4'
networks:
@ -184,6 +187,7 @@ services:
init: true
ports:
- 26656
- {{ if .ProxyPort }}{{ addUint32 .ProxyPort 1000 }}:{{ end }}26660
- {{ if .ProxyPort }}{{ .ProxyPort }}:{{ end }}26657
- 6060
volumes:
@ -247,9 +251,11 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
cfg := config.DefaultConfig()
cfg.Moniker = node.Name
cfg.ProxyApp = AppAddressTCP
if node.LogLevel != "" {
cfg.LogLevel = node.LogLevel
}
cfg.RPC.ListenAddress = "tcp://0.0.0.0:26657"
cfg.RPC.PprofListenAddress = ":6060"
cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false))
@ -315,12 +321,14 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
if node.StateSync {
cfg.StateSync.Enable = true
cfg.StateSync.RPCServers = []string{}
for _, peer := range node.Testnet.ArchiveNodes() {
if peer.Name == node.Name {
continue
}
cfg.StateSync.RPCServers = append(cfg.StateSync.RPCServers, peer.AddressRPC())
}
if len(cfg.StateSync.RPCServers) < 2 {
return nil, errors.New("unable to find 2 suitable state sync RPC servers")
}
@ -333,6 +341,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
}
cfg.P2P.Seeds += seed.AddressP2P(true)
}
cfg.P2P.PersistentPeers = ""
for _, peer := range node.PersistentPeers {
if len(cfg.P2P.PersistentPeers) > 0 {
@ -340,6 +349,9 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
}
cfg.P2P.PersistentPeers += peer.AddressP2P(true)
}
cfg.Instrumentation.Prometheus = true
return cfg, nil
}

