Browse Source

pubsub: Use a dynamic queue for buffered subscriptions (#7177)

Updates #7156, and a follow-up to #7070.

Event subscriptions in Tendermint currently use a fixed-length Go
channel as a queue. When the channel fills up, the publisher
immediately terminates the subscription. This prevents slow
subscribers from creating memory pressure on the node by not
servicing their queue fast enough.

Replace the buffered channel used to deliver events to buffered
subscribers with an explicit queue. The queue provides a soft
quota and burst credit mechanism: Clients that usually keep up
can survive occasional bursts, without allowing truly slow
clients to hog resources indefinitely.
pull/7192/head
M. J. Fromberger 3 years ago
committed by GitHub
parent
commit
d32913c889
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 485 additions and 17 deletions
  1. +232
    -0
      internal/libs/queue/queue.go
  2. +188
    -0
      internal/libs/queue/queue_test.go
  3. +6
    -10
      libs/pubsub/pubsub.go
  4. +5
    -0
      libs/pubsub/pubsub_test.go
  5. +54
    -7
      libs/pubsub/subscription.go

+ 232
- 0
internal/libs/queue/queue.go View File

@ -0,0 +1,232 @@
// Package queue implements a dynamic FIFO queue with a fixed upper bound
// and a flexible quota mechanism to handle bursty load.
package queue
import (
"context"
"errors"
"sync"
)
var (
// ErrQueueFull is returned by the Add method of a queue when the queue has
// reached its hard capacity limit.
ErrQueueFull = errors.New("queue is full")
// ErrNoCredit is returned by the Add method of a queue when the queue has
// exceeded its soft quota and there is insufficient burst credit.
ErrNoCredit = errors.New("insufficient burst credit")
// ErrQueueClosed is returned by the Add method of a closed queue, and by
// the Wait method of a closed empty queue.
ErrQueueClosed = errors.New("queue is closed")
// Sentinel errors reported by the New constructor.
errHardLimit = errors.New("hard limit must be > 0 and ≥ soft quota")
errBurstCredit = errors.New("burst credit must be non-negative")
)
// A Queue is a limited-capacity FIFO queue of arbitrary data items.
//
// A queue has a soft quota and a hard limit on the number of items that may be
// contained in the queue. Adding items in excess of the hard limit will fail
// unconditionally.
//
// For items in excess of the soft quota, a credit system applies: Each queue
// maintains a burst credit score. Adding an item in excess of the soft quota
// costs 1 unit of burst credit. If there is not enough burst credit, the add
// will fail.
//
// The initial burst credit is assigned when the queue is constructed. Removing
// items from the queue adds additional credit if the resulting queue length is
// less than the current soft quota. Burst credit is capped by the hard limit.
//
// A Queue is safe for concurrent use by multiple goroutines.
type Queue struct {
mu sync.Mutex // protects the fields below
softQuota int // adjusted dynamically (see Add, Remove)
hardLimit int // fixed for the lifespan of the queue
queueLen int // number of entries in the queue list
credit float64 // current burst credit
closed bool
nempty *sync.Cond
back *entry
front *entry
// The queue is singly-linked. Front points to the sentinel and back points
// to the newest entry. The oldest entry is front.link if it exists.
}
// New constructs a new empty queue with the specified options. It reports an
// error if any of the option values are invalid.
func New(opts Options) (*Queue, error) {
if opts.HardLimit <= 0 || opts.HardLimit < opts.SoftQuota {
return nil, errHardLimit
}
if opts.BurstCredit < 0 {
return nil, errBurstCredit
}
if opts.SoftQuota <= 0 {
opts.SoftQuota = opts.HardLimit
}
if opts.BurstCredit == 0 {
opts.BurstCredit = float64(opts.SoftQuota)
}
sentinel := new(entry)
q := &Queue{
softQuota: opts.SoftQuota,
hardLimit: opts.HardLimit,
credit: opts.BurstCredit,
back: sentinel,
front: sentinel,
}
q.nempty = sync.NewCond(&q.mu)
return q, nil
}
// Add adds item to the back of the queue. It reports an error and does not
// enqueue the item if the queue is full or closed, or if it exceeds its soft
// quota and there is not enough burst credit.
func (q *Queue) Add(item interface{}) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return ErrQueueClosed
}
if q.queueLen >= q.softQuota {
if q.queueLen == q.hardLimit {
return ErrQueueFull
} else if q.credit < 1 {
return ErrNoCredit
}
// Successfully exceeding the soft quota deducts burst credit and raises
// the soft quota. This has the effect of reducing the credit cap and the
// amount of credit given for removing items to better approximate the
// rate at which the consumer is servicing the queue.
q.credit--
q.softQuota = q.queueLen + 1
}
e := &entry{item: item}
q.back.link = e
q.back = e
q.queueLen++
if q.queueLen == 1 { // was empty
q.nempty.Signal()
}
return nil
}
// Remove removes and returns the frontmost (oldest) item in the queue and
// reports whether an item was available. If the queue is empty, Remove
// returns nil, false.
func (q *Queue) Remove() (interface{}, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if q.queueLen == 0 {
return nil, false
}
return q.popFront(), true
}
// Wait blocks until q is non-empty or closed, and then returns the frontmost
// (oldest) item from the queue. If ctx ends before an item is available, Wait
// returns a nil value and a context error. If the queue is closed while it is
// still empty, Wait returns nil, ErrQueueClosed.
func (q *Queue) Wait(ctx context.Context) (interface{}, error) {
// If the context terminates, wake the waiter.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() { <-ctx.Done(); q.nempty.Broadcast() }()
q.mu.Lock()
defer q.mu.Unlock()
for q.queueLen == 0 {
if q.closed {
return nil, ErrQueueClosed
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
q.nempty.Wait()
}
}
return q.popFront(), nil
}
// Close closes the queue. After closing, any further Add calls will report an
// error, but items that were added to the queue prior to closing will still be
// available for Remove and Wait. Wait will report an error without blocking if
// it is called on a closed, empty queue.
func (q *Queue) Close() error {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
q.nempty.Broadcast()
return nil
}
// popFront removes the frontmost item of q and returns its value after
// updating quota and credit settings.
//
// Preconditions: The caller holds q.mu and q is not empty.
func (q *Queue) popFront() interface{} {
e := q.front.link
q.front.link = e.link
if e == q.back {
q.back = q.front
}
q.queueLen--
if q.queueLen < q.softQuota {
// Successfully removing items from the queue below half the soft quota
// lowers the soft quota. This has the effect of increasing the credit cap
// and the amount of credit given for removing items to better approximate
// the rate at which the consumer is servicing the queue.
if q.softQuota > 1 && q.queueLen < q.softQuota/2 {
q.softQuota--
}
// Give credit for being below the soft quota. Note we do this after
// adjusting the quota so the credit reflects the item we just removed.
q.credit += float64(q.softQuota-q.queueLen) / float64(q.softQuota)
if cap := float64(q.hardLimit - q.softQuota); q.credit > cap {
q.credit = cap
}
}
return e.item
}
// Options are the initial settings for a Queue.
type Options struct {
// The maximum number of items the queue will ever be permitted to hold.
// This value must be positive, and greater than or equal to SoftQuota. The
// hard limit is fixed and does not change as the queue is used.
//
// The hard limit should be chosen to exceed the largest burst size expected
// under normal operating conditions.
HardLimit int
// The initial expected maximum number of items the queue should contain on
// an average workload. If this value is zero, it is initialized to the hard
// limit. The soft quota is adjusted from the initial value dynamically as
// the queue is used.
SoftQuota int
// The initial burst credit score. This value must be greater than or equal
// to zero. If it is zero, the soft quota is used.
BurstCredit float64
}
type entry struct {
item interface{}
link *entry
}

+ 188
- 0
internal/libs/queue/queue_test.go View File

@ -0,0 +1,188 @@
package queue
import (
"context"
"testing"
"time"
)
func TestNew(t *testing.T) {
tests := []struct {
desc string
opts Options
want error
}{
{"empty options", Options{}, errHardLimit},
{"zero limit negative quota", Options{SoftQuota: -1}, errHardLimit},
{"zero limit and quota", Options{SoftQuota: 0}, errHardLimit},
{"zero limit", Options{SoftQuota: 1, HardLimit: 0}, errHardLimit},
{"limit less than quota", Options{SoftQuota: 5, HardLimit: 3}, errHardLimit},
{"negative credit", Options{SoftQuota: 1, HardLimit: 1, BurstCredit: -6}, errBurstCredit},
{"valid default credit", Options{SoftQuota: 1, HardLimit: 2, BurstCredit: 0}, nil},
{"valid explicit credit", Options{SoftQuota: 1, HardLimit: 5, BurstCredit: 10}, nil},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
got, err := New(test.opts)
if err != test.want {
t.Errorf("New(%+v): got (%+v, %v), want err=%v", test.opts, got, err, test.want)
}
})
}
}
type testQueue struct {
t *testing.T
*Queue
}
func (q testQueue) mustAdd(item string) {
q.t.Helper()
if err := q.Add(item); err != nil {
q.t.Errorf("Add(%q): unexpected error: %v", item, err)
}
}
func (q testQueue) mustRemove(want string) {
q.t.Helper()
got, ok := q.Remove()
if !ok {
q.t.Error("Remove: queue is empty")
} else if got.(string) != want {
q.t.Errorf("Remove: got %q, want %q", got, want)
}
}
func mustQueue(t *testing.T, opts Options) testQueue {
t.Helper()
q, err := New(opts)
if err != nil {
t.Fatalf("New(%+v): unexpected error: %v", opts, err)
}
return testQueue{t: t, Queue: q}
}
func TestHardLimit(t *testing.T) {
q := mustQueue(t, Options{SoftQuota: 1, HardLimit: 1})
q.mustAdd("foo")
if err := q.Add("bar"); err != ErrQueueFull {
t.Errorf("Add: got err=%v, want %v", err, ErrQueueFull)
}
}
func TestSoftQuota(t *testing.T) {
q := mustQueue(t, Options{SoftQuota: 1, HardLimit: 4})
q.mustAdd("foo")
q.mustAdd("bar")
if err := q.Add("baz"); err != ErrNoCredit {
t.Errorf("Add: got err=%v, want %v", err, ErrNoCredit)
}
}
func TestBurstCredit(t *testing.T) {
q := mustQueue(t, Options{SoftQuota: 2, HardLimit: 5})
q.mustAdd("foo")
q.mustAdd("bar")
// We should still have all our initial credit.
if q.credit < 2 {
t.Errorf("Wrong credit: got %f, want ≥ 2", q.credit)
}
// Removing an item below soft quota should increase our credit.
q.mustRemove("foo")
if q.credit <= 2 {
t.Errorf("wrong credit: got %f, want > 2", q.credit)
}
// Credit should be capped by the hard limit.
q.mustRemove("bar")
q.mustAdd("baz")
q.mustRemove("baz")
if cap := float64(q.hardLimit - q.softQuota); q.credit > cap {
t.Errorf("Wrong credit: got %f, want ≤ %f", q.credit, cap)
}
}
func TestClose(t *testing.T) {
q := mustQueue(t, Options{SoftQuota: 2, HardLimit: 10})
q.mustAdd("alpha")
q.mustAdd("bravo")
q.mustAdd("charlie")
q.Close()
// After closing the queue, subsequent writes should fail.
if err := q.Add("foxtrot"); err == nil {
t.Error("Add should have failed after Close")
}
// However, the remaining contents of the queue should still work.
q.mustRemove("alpha")
q.mustRemove("bravo")
q.mustRemove("charlie")
}
func TestWait(t *testing.T) {
q := mustQueue(t, Options{SoftQuota: 2, HardLimit: 2})
// A wait on an empty queue should time out.
t.Run("WaitTimeout", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
got, err := q.Wait(ctx)
if err == nil {
t.Errorf("Wait: got %v, want error", got)
} else {
t.Logf("Wait correctly failed: %v", err)
}
})
// A wait on a non-empty queue should report an item.
t.Run("WaitNonEmpty", func(t *testing.T) {
const input = "figgy pudding"
q.mustAdd(input)
got, err := q.Wait(context.Background())
if err != nil {
t.Errorf("Wait: unexpected error: %v", err)
} else if got != input {
t.Errorf("Wait: got %q, want %q", got, input)
}
})
// Wait should block until an item arrives.
t.Run("WaitOnEmpty", func(t *testing.T) {
const input = "fleet footed kittens"
done := make(chan struct{})
go func() {
defer close(done)
got, err := q.Wait(context.Background())
if err != nil {
t.Errorf("Wait: unexpected error: %v", err)
} else if got != input {
t.Errorf("Wait: got %q, want %q", got, input)
}
}()
q.mustAdd(input)
<-done
})
// Closing the queue unblocks a wait.
t.Run("UnblockOnClose", func(t *testing.T) {
done := make(chan struct{})
go func() {
defer close(done)
got, err := q.Wait(context.Background())
if err != ErrQueueClosed {
t.Errorf("Wait: got (%v, %v), want %v", got, err, ErrQueueClosed)
}
}()
q.Close()
<-done
})
}

+ 6
- 10
libs/pubsub/pubsub.go View File

@ -190,7 +190,10 @@ func (s *Server) subscribe(ctx context.Context, clientID string, query Query, ou
return nil, ErrAlreadySubscribed return nil, ErrAlreadySubscribed
} }
sub := NewSubscription(outCapacity)
sub, err := newSubscription(outCapacity)
if err != nil {
return nil, err
}
s.subs.index.add(&subInfo{ s.subs.index.add(&subInfo{
clientID: clientID, clientID: clientID,
query: query, query: query,
@ -388,15 +391,8 @@ func (s *Server) send(data interface{}, events []types.Event) error {
// use case doesn't require this affordance, and then remove unbuffered // use case doesn't require this affordance, and then remove unbuffered
// subscriptions. // subscriptions.
msg := NewMessage(si.sub.id, data, events) msg := NewMessage(si.sub.id, data, events)
if cap(si.sub.out) == 0 {
si.sub.out <- msg
continue
}
select {
case si.sub.out <- msg:
// ok, delivered
default:
// slow subscriber, cancel them
if err := si.sub.putMessage(msg); err != nil {
// The subscriber was too slow, cancel them.
evict.add(si) evict.add(si)
} }
} }


+ 5
- 0
libs/pubsub/pubsub_test.go View File

@ -55,6 +55,9 @@ func TestSubscribe(t *testing.T) {
err = s.Publish(ctx, "Ivan") err = s.Publish(ctx, "Ivan")
require.NoError(t, err) require.NoError(t, err)
err = s.Publish(ctx, "Natasha")
require.NoError(t, err)
}() }()
select { select {
@ -146,6 +149,8 @@ func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
err = s.Publish(ctx, "Viper") err = s.Publish(ctx, "Viper")
require.NoError(t, err) require.NoError(t, err)
err = s.Publish(ctx, "Black Panther")
require.NoError(t, err)
assertCanceled(t, subscription, pubsub.ErrOutOfCapacity) assertCanceled(t, subscription, pubsub.ErrOutOfCapacity)
} }


+ 54
- 7
libs/pubsub/subscription.go View File

@ -1,11 +1,13 @@
package pubsub package pubsub
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/libs/queue"
tmsync "github.com/tendermint/tendermint/internal/libs/sync" tmsync "github.com/tendermint/tendermint/internal/libs/sync"
) )
@ -24,21 +26,66 @@ var (
// 2) channel which is closed if a client is too slow or choose to unsubscribe // 2) channel which is closed if a client is too slow or choose to unsubscribe
// 3) err indicating the reason for (2) // 3) err indicating the reason for (2)
type Subscription struct { type Subscription struct {
id string
out chan Message
id string
out chan Message
queue *queue.Queue
canceled chan struct{} canceled chan struct{}
stop func()
mtx tmsync.RWMutex mtx tmsync.RWMutex
err error err error
} }
// NewSubscription returns a new subscription with the given outCapacity.
func NewSubscription(outCapacity int) *Subscription {
return &Subscription{
// newSubscription returns a new subscription with the given outCapacity.
func newSubscription(outCapacity int) (*Subscription, error) {
sub := &Subscription{
id: uuid.NewString(), id: uuid.NewString(),
out: make(chan Message, outCapacity),
out: make(chan Message),
canceled: make(chan struct{}), canceled: make(chan struct{}),
// N.B. The output channel is always unbuffered. For an unbuffered
// subscription that was already the case, and for a buffered one the
// queue now serves as the buffer.
}
if outCapacity == 0 {
sub.stop = func() { close(sub.canceled) }
return sub, nil
}
q, err := queue.New(queue.Options{
SoftQuota: outCapacity,
HardLimit: outCapacity,
})
if err != nil {
return nil, fmt.Errorf("creating queue: %w", err)
}
sub.queue = q
sub.stop = func() { q.Close(); close(sub.canceled) }
// Start a goroutine to bridge messages from the queue to the channel.
// TODO(creachadair): This is a temporary hack until we can change the
// interface not to expose the channel directly.
go func() {
for {
next, err := q.Wait(context.Background())
if err != nil {
return // the subscription was terminated
}
sub.out <- next.(Message)
}
}()
return sub, nil
}
// putMessage transmits msg to the subscriber. If s is unbuffered, this blocks
// until msg is delivered and returns nil; otherwise it reports an error if the
// queue cannot accept any further messages.
func (s *Subscription) putMessage(msg Message) error {
if s.queue != nil {
return s.queue.Add(msg)
} }
s.out <- msg
return nil
} }
// Out returns a channel onto which messages and events are published. // Out returns a channel onto which messages and events are published.
@ -81,7 +128,7 @@ func (s *Subscription) cancel(err error) {
s.err = err s.err = err
} }
close(s.canceled)
s.stop()
} }
// Message glues data and events together. // Message glues data and events together.


Loading…
Cancel
Save