diff --git a/internal/libs/queue/queue.go b/internal/libs/queue/queue.go new file mode 100644 index 000000000..7b4199504 --- /dev/null +++ b/internal/libs/queue/queue.go @@ -0,0 +1,232 @@ +// Package queue implements a dynamic FIFO queue with a fixed upper bound +// and a flexible quota mechanism to handle bursty load. +package queue + +import ( + "context" + "errors" + "sync" +) + +var ( + // ErrQueueFull is returned by the Add method of a queue when the queue has + // reached its hard capacity limit. + ErrQueueFull = errors.New("queue is full") + + // ErrNoCredit is returned by the Add method of a queue when the queue has + // exceeded its soft quota and there is insufficient burst credit. + ErrNoCredit = errors.New("insufficient burst credit") + + // ErrQueueClosed is returned by the Add method of a closed queue, and by + // the Wait method of a closed empty queue. + ErrQueueClosed = errors.New("queue is closed") + + // Sentinel errors reported by the New constructor. + errHardLimit = errors.New("hard limit must be > 0 and ≥ soft quota") + errBurstCredit = errors.New("burst credit must be non-negative") +) + +// A Queue is a limited-capacity FIFO queue of arbitrary data items. +// +// A queue has a soft quota and a hard limit on the number of items that may be +// contained in the queue. Adding items in excess of the hard limit will fail +// unconditionally. +// +// For items in excess of the soft quota, a credit system applies: Each queue +// maintains a burst credit score. Adding an item in excess of the soft quota +// costs 1 unit of burst credit. If there is not enough burst credit, the add +// will fail. +// +// The initial burst credit is assigned when the queue is constructed. Removing +// items from the queue adds additional credit if the resulting queue length is +// less than the current soft quota. Burst credit is capped by the hard limit. +// +// A Queue is safe for concurrent use by multiple goroutines. +type Queue struct { + mu sync.Mutex // protects the fields below + + softQuota int // adjusted dynamically (see Add, Remove) + hardLimit int // fixed for the lifespan of the queue + queueLen int // number of entries in the queue list + credit float64 // current burst credit + + closed bool + nempty *sync.Cond + back *entry + front *entry + + // The queue is singly-linked. Front points to the sentinel and back points + // to the newest entry. The oldest entry is front.link if it exists. +} + +// New constructs a new empty queue with the specified options. It reports an +// error if any of the option values are invalid. +func New(opts Options) (*Queue, error) { + if opts.HardLimit <= 0 || opts.HardLimit < opts.SoftQuota { + return nil, errHardLimit + } + if opts.BurstCredit < 0 { + return nil, errBurstCredit + } + if opts.SoftQuota <= 0 { + opts.SoftQuota = opts.HardLimit + } + if opts.BurstCredit == 0 { + opts.BurstCredit = float64(opts.SoftQuota) + } + sentinel := new(entry) + q := &Queue{ + softQuota: opts.SoftQuota, + hardLimit: opts.HardLimit, + credit: opts.BurstCredit, + back: sentinel, + front: sentinel, + } + q.nempty = sync.NewCond(&q.mu) + return q, nil +} + +// Add adds item to the back of the queue. It reports an error and does not +// enqueue the item if the queue is full or closed, or if it exceeds its soft +// quota and there is not enough burst credit. +func (q *Queue) Add(item interface{}) error { + q.mu.Lock() + defer q.mu.Unlock() + + if q.closed { + return ErrQueueClosed + } + + if q.queueLen >= q.softQuota { + if q.queueLen == q.hardLimit { + return ErrQueueFull + } else if q.credit < 1 { + return ErrNoCredit + } + + // Successfully exceeding the soft quota deducts burst credit and raises + // the soft quota. This has the effect of reducing the credit cap and the + // amount of credit given for removing items to better approximate the + // rate at which the consumer is servicing the queue. + q.credit-- + q.softQuota = q.queueLen + 1 + } + e := &entry{item: item} + q.back.link = e + q.back = e + q.queueLen++ + if q.queueLen == 1 { // was empty + q.nempty.Signal() + } + return nil +} + +// Remove removes and returns the frontmost (oldest) item in the queue and +// reports whether an item was available. If the queue is empty, Remove +// returns nil, false. +func (q *Queue) Remove() (interface{}, bool) { + q.mu.Lock() + defer q.mu.Unlock() + + if q.queueLen == 0 { + return nil, false + } + return q.popFront(), true +} + +// Wait blocks until q is non-empty or closed, and then returns the frontmost +// (oldest) item from the queue. If ctx ends before an item is available, Wait +// returns a nil value and a context error. If the queue is closed while it is +// still empty, Wait returns nil, ErrQueueClosed. +func (q *Queue) Wait(ctx context.Context) (interface{}, error) { + // If the context terminates, wake the waiter. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { <-ctx.Done(); q.nempty.Broadcast() }() + + q.mu.Lock() + defer q.mu.Unlock() + + for q.queueLen == 0 { + if q.closed { + return nil, ErrQueueClosed + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + q.nempty.Wait() + } + } + return q.popFront(), nil +} + +// Close closes the queue. After closing, any further Add calls will report an +// error, but items that were added to the queue prior to closing will still be +// available for Remove and Wait. Wait will report an error without blocking if +// it is called on a closed, empty queue. +func (q *Queue) Close() error { + q.mu.Lock() + defer q.mu.Unlock() + q.closed = true + q.nempty.Broadcast() + return nil +} + +// popFront removes the frontmost item of q and returns its value after +// updating quota and credit settings. +// +// Preconditions: The caller holds q.mu and q is not empty. +func (q *Queue) popFront() interface{} { + e := q.front.link + q.front.link = e.link + if e == q.back { + q.back = q.front + } + q.queueLen-- + + if q.queueLen < q.softQuota { + // Successfully removing items from the queue below half the soft quota + // lowers the soft quota. This has the effect of increasing the credit cap + // and the amount of credit given for removing items to better approximate + // the rate at which the consumer is servicing the queue. + if q.softQuota > 1 && q.queueLen < q.softQuota/2 { + q.softQuota-- + } + + // Give credit for being below the soft quota. Note we do this after + // adjusting the quota so the credit reflects the item we just removed. + q.credit += float64(q.softQuota-q.queueLen) / float64(q.softQuota) + if cap := float64(q.hardLimit - q.softQuota); q.credit > cap { + q.credit = cap + } + } + + return e.item +} + +// Options are the initial settings for a Queue. +type Options struct { + // The maximum number of items the queue will ever be permitted to hold. + // This value must be positive, and greater than or equal to SoftQuota. The + // hard limit is fixed and does not change as the queue is used. + // + // The hard limit should be chosen to exceed the largest burst size expected + // under normal operating conditions. + HardLimit int + + // The initial expected maximum number of items the queue should contain on + // an average workload. If this value is zero, it is initialized to the hard + // limit. The soft quota is adjusted from the initial value dynamically as + // the queue is used. + SoftQuota int + + // The initial burst credit score. This value must be greater than or equal + // to zero. If it is zero, the soft quota is used. + BurstCredit float64 +} + +type entry struct { + item interface{} + link *entry +} diff --git a/internal/libs/queue/queue_test.go b/internal/libs/queue/queue_test.go new file mode 100644 index 000000000..f339e07fa --- /dev/null +++ b/internal/libs/queue/queue_test.go @@ -0,0 +1,188 @@ +package queue + +import ( + "context" + "testing" + "time" +) + +func TestNew(t *testing.T) { + tests := []struct { + desc string + opts Options + want error + }{ + {"empty options", Options{}, errHardLimit}, + {"zero limit negative quota", Options{SoftQuota: -1}, errHardLimit}, + {"zero limit and quota", Options{SoftQuota: 0}, errHardLimit}, + {"zero limit", Options{SoftQuota: 1, HardLimit: 0}, errHardLimit}, + {"limit less than quota", Options{SoftQuota: 5, HardLimit: 3}, errHardLimit}, + {"negative credit", Options{SoftQuota: 1, HardLimit: 1, BurstCredit: -6}, errBurstCredit}, + {"valid default credit", Options{SoftQuota: 1, HardLimit: 2, BurstCredit: 0}, nil}, + {"valid explicit credit", Options{SoftQuota: 1, HardLimit: 5, BurstCredit: 10}, nil}, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + got, err := New(test.opts) + if err != test.want { + t.Errorf("New(%+v): got (%+v, %v), want err=%v", test.opts, got, err, test.want) + } + }) + } +} + +type testQueue struct { + t *testing.T + *Queue +} + +func (q testQueue) mustAdd(item string) { + q.t.Helper() + if err := q.Add(item); err != nil { + q.t.Errorf("Add(%q): unexpected error: %v", item, err) + } +} + +func (q testQueue) mustRemove(want string) { + q.t.Helper() + got, ok := q.Remove() + if !ok { + q.t.Error("Remove: queue is empty") + } else if got.(string) != want { + q.t.Errorf("Remove: got %q, want %q", got, want) + } +} + +func mustQueue(t *testing.T, opts Options) testQueue { + t.Helper() + + q, err := New(opts) + if err != nil { + t.Fatalf("New(%+v): unexpected error: %v", opts, err) + } + return testQueue{t: t, Queue: q} +} + +func TestHardLimit(t *testing.T) { + q := mustQueue(t, Options{SoftQuota: 1, HardLimit: 1}) + q.mustAdd("foo") + if err := q.Add("bar"); err != ErrQueueFull { + t.Errorf("Add: got err=%v, want %v", err, ErrQueueFull) + } +} + +func TestSoftQuota(t *testing.T) { + q := mustQueue(t, Options{SoftQuota: 1, HardLimit: 4}) + q.mustAdd("foo") + q.mustAdd("bar") + if err := q.Add("baz"); err != ErrNoCredit { + t.Errorf("Add: got err=%v, want %v", err, ErrNoCredit) + } +} + +func TestBurstCredit(t *testing.T) { + q := mustQueue(t, Options{SoftQuota: 2, HardLimit: 5}) + q.mustAdd("foo") + q.mustAdd("bar") + + // We should still have all our initial credit. + if q.credit < 2 { + t.Errorf("Wrong credit: got %f, want ≥ 2", q.credit) + } + + // Removing an item below soft quota should increase our credit. + q.mustRemove("foo") + if q.credit <= 2 { + t.Errorf("wrong credit: got %f, want > 2", q.credit) + } + + // Credit should be capped by the hard limit. + q.mustRemove("bar") + q.mustAdd("baz") + q.mustRemove("baz") + if cap := float64(q.hardLimit - q.softQuota); q.credit > cap { + t.Errorf("Wrong credit: got %f, want ≤ %f", q.credit, cap) + } +} + +func TestClose(t *testing.T) { + q := mustQueue(t, Options{SoftQuota: 2, HardLimit: 10}) + q.mustAdd("alpha") + q.mustAdd("bravo") + q.mustAdd("charlie") + q.Close() + + // After closing the queue, subsequent writes should fail. + if err := q.Add("foxtrot"); err == nil { + t.Error("Add should have failed after Close") + } + + // However, the remaining contents of the queue should still work. + q.mustRemove("alpha") + q.mustRemove("bravo") + q.mustRemove("charlie") +} + +func TestWait(t *testing.T) { + q := mustQueue(t, Options{SoftQuota: 2, HardLimit: 2}) + + // A wait on an empty queue should time out. + t.Run("WaitTimeout", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + got, err := q.Wait(ctx) + if err == nil { + t.Errorf("Wait: got %v, want error", got) + } else { + t.Logf("Wait correctly failed: %v", err) + } + }) + + // A wait on a non-empty queue should report an item. + t.Run("WaitNonEmpty", func(t *testing.T) { + const input = "figgy pudding" + q.mustAdd(input) + + got, err := q.Wait(context.Background()) + if err != nil { + t.Errorf("Wait: unexpected error: %v", err) + } else if got != input { + t.Errorf("Wait: got %q, want %q", got, input) + } + }) + + // Wait should block until an item arrives. + t.Run("WaitOnEmpty", func(t *testing.T) { + const input = "fleet footed kittens" + + done := make(chan struct{}) + go func() { + defer close(done) + got, err := q.Wait(context.Background()) + if err != nil { + t.Errorf("Wait: unexpected error: %v", err) + } else if got != input { + t.Errorf("Wait: got %q, want %q", got, input) + } + }() + + q.mustAdd(input) + <-done + }) + + // Closing the queue unblocks a wait. + t.Run("UnblockOnClose", func(t *testing.T) { + done := make(chan struct{}) + go func() { + defer close(done) + got, err := q.Wait(context.Background()) + if err != ErrQueueClosed { + t.Errorf("Wait: got (%v, %v), want %v", got, err, ErrQueueClosed) + } + }() + + q.Close() + <-done + }) +} diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 1ae111f63..fdd728961 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -190,7 +190,10 @@ func (s *Server) subscribe(ctx context.Context, clientID string, query Query, ou return nil, ErrAlreadySubscribed } - sub := NewSubscription(outCapacity) + sub, err := newSubscription(outCapacity) + if err != nil { + return nil, err + } s.subs.index.add(&subInfo{ clientID: clientID, query: query, @@ -388,15 +391,8 @@ func (s *Server) send(data interface{}, events []types.Event) error { // use case doesn't require this affordance, and then remove unbuffered // subscriptions. msg := NewMessage(si.sub.id, data, events) - if cap(si.sub.out) == 0 { - si.sub.out <- msg - continue - } - select { - case si.sub.out <- msg: - // ok, delivered - default: - // slow subscriber, cancel them + if err := si.sub.putMessage(msg); err != nil { + // The subscriber was too slow, cancel them. evict.add(si) } } diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index d3855be7a..e0f1f9eb5 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -55,6 +55,9 @@ func TestSubscribe(t *testing.T) { err = s.Publish(ctx, "Ivan") require.NoError(t, err) + + err = s.Publish(ctx, "Natasha") + require.NoError(t, err) }() select { @@ -146,6 +149,8 @@ func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) { require.NoError(t, err) err = s.Publish(ctx, "Viper") require.NoError(t, err) + err = s.Publish(ctx, "Black Panther") + require.NoError(t, err) assertCanceled(t, subscription, pubsub.ErrOutOfCapacity) } diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index 4210416b6..827e5e6a6 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -1,11 +1,13 @@ package pubsub import ( + "context" "errors" "fmt" "github.com/google/uuid" "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/internal/libs/queue" tmsync "github.com/tendermint/tendermint/internal/libs/sync" ) @@ -24,21 +26,66 @@ var ( // 2) channel which is closed if a client is too slow or choose to unsubscribe // 3) err indicating the reason for (2) type Subscription struct { - id string - out chan Message + id string + out chan Message + queue *queue.Queue canceled chan struct{} + stop func() mtx tmsync.RWMutex err error } -// NewSubscription returns a new subscription with the given outCapacity. -func NewSubscription(outCapacity int) *Subscription { - return &Subscription{ +// newSubscription returns a new subscription with the given outCapacity. +func newSubscription(outCapacity int) (*Subscription, error) { + sub := &Subscription{ id: uuid.NewString(), - out: make(chan Message, outCapacity), + out: make(chan Message), canceled: make(chan struct{}), + + // N.B. The output channel is always unbuffered. For an unbuffered + // subscription that was already the case, and for a buffered one the + // queue now serves as the buffer. + } + + if outCapacity == 0 { + sub.stop = func() { close(sub.canceled) } + return sub, nil + } + q, err := queue.New(queue.Options{ + SoftQuota: outCapacity, + HardLimit: outCapacity, + }) + if err != nil { + return nil, fmt.Errorf("creating queue: %w", err) + } + sub.queue = q + sub.stop = func() { q.Close(); close(sub.canceled) } + + // Start a goroutine to bridge messages from the queue to the channel. + // TODO(creachadair): This is a temporary hack until we can change the + // interface not to expose the channel directly. + go func() { + for { + next, err := q.Wait(context.Background()) + if err != nil { + return // the subscription was terminated + } + sub.out <- next.(Message) + } + }() + return sub, nil +} + +// putMessage transmits msg to the subscriber. If s is unbuffered, this blocks +// until msg is delivered and returns nil; otherwise it reports an error if the +// queue cannot accept any further messages. +func (s *Subscription) putMessage(msg Message) error { + if s.queue != nil { + return s.queue.Add(msg) } + s.out <- msg + return nil } // Out returns a channel onto which messages and events are published. @@ -81,7 +128,7 @@ func (s *Subscription) cancel(err error) { s.err = err } - close(s.canceled) + s.stop() } // Message glues data and events together.