You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

232 lines
7.1 KiB

// Package queue implements a dynamic FIFO queue with a fixed upper bound
// and a flexible quota mechanism to handle bursty load.
package queue
import (
"context"
"errors"
"sync"
)
var (
// ErrQueueFull is returned by the Add method of a queue when the queue has
// reached its hard capacity limit.
ErrQueueFull = errors.New("queue is full")
// ErrNoCredit is returned by the Add method of a queue when the queue has
// exceeded its soft quota and there is insufficient burst credit.
ErrNoCredit = errors.New("insufficient burst credit")
// ErrQueueClosed is returned by the Add method of a closed queue, and by
// the Wait method of a closed empty queue.
ErrQueueClosed = errors.New("queue is closed")
// Sentinel errors reported by the New constructor.
errHardLimit = errors.New("hard limit must be > 0 and ≥ soft quota")
errBurstCredit = errors.New("burst credit must be non-negative")
)
// A Queue is a limited-capacity FIFO queue of arbitrary data items.
//
// A queue has a soft quota and a hard limit on the number of items that may be
// contained in the queue. Adding items in excess of the hard limit will fail
// unconditionally.
//
// For items in excess of the soft quota, a credit system applies: Each queue
// maintains a burst credit score. Adding an item in excess of the soft quota
// costs 1 unit of burst credit. If there is not enough burst credit, the add
// will fail.
//
// The initial burst credit is assigned when the queue is constructed. Removing
// items from the queue adds additional credit if the resulting queue length is
// less than the current soft quota. Burst credit is capped by the hard limit.
//
// A Queue is safe for concurrent use by multiple goroutines.
type Queue struct {
mu sync.Mutex // protects the fields below
softQuota int // adjusted dynamically (see Add, Remove)
hardLimit int // fixed for the lifespan of the queue
queueLen int // number of entries in the queue list
credit float64 // current burst credit
closed bool
nempty *sync.Cond
back *entry
front *entry
// The queue is singly-linked. Front points to the sentinel and back points
// to the newest entry. The oldest entry is front.link if it exists.
}
// New constructs a new empty queue with the specified options. It reports an
// error if any of the option values are invalid.
func New(opts Options) (*Queue, error) {
if opts.HardLimit <= 0 || opts.HardLimit < opts.SoftQuota {
return nil, errHardLimit
}
if opts.BurstCredit < 0 {
return nil, errBurstCredit
}
if opts.SoftQuota <= 0 {
opts.SoftQuota = opts.HardLimit
}
if opts.BurstCredit == 0 {
opts.BurstCredit = float64(opts.SoftQuota)
}
sentinel := new(entry)
q := &Queue{
softQuota: opts.SoftQuota,
hardLimit: opts.HardLimit,
credit: opts.BurstCredit,
back: sentinel,
front: sentinel,
}
q.nempty = sync.NewCond(&q.mu)
return q, nil
}
// Add adds item to the back of the queue. It reports an error and does not
// enqueue the item if the queue is full or closed, or if it exceeds its soft
// quota and there is not enough burst credit.
func (q *Queue) Add(item interface{}) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return ErrQueueClosed
}
if q.queueLen >= q.softQuota {
if q.queueLen == q.hardLimit {
return ErrQueueFull
} else if q.credit < 1 {
return ErrNoCredit
}
// Successfully exceeding the soft quota deducts burst credit and raises
// the soft quota. This has the effect of reducing the credit cap and the
// amount of credit given for removing items to better approximate the
// rate at which the consumer is servicing the queue.
q.credit--
q.softQuota = q.queueLen + 1
}
e := &entry{item: item}
q.back.link = e
q.back = e
q.queueLen++
if q.queueLen == 1 { // was empty
q.nempty.Signal()
}
return nil
}
// Remove removes and returns the frontmost (oldest) item in the queue and
// reports whether an item was available. If the queue is empty, Remove
// returns nil, false.
func (q *Queue) Remove() (interface{}, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if q.queueLen == 0 {
return nil, false
}
return q.popFront(), true
}
// Wait blocks until q is non-empty or closed, and then returns the frontmost
// (oldest) item from the queue. If ctx ends before an item is available, Wait
// returns a nil value and a context error. If the queue is closed while it is
// still empty, Wait returns nil, ErrQueueClosed.
func (q *Queue) Wait(ctx context.Context) (interface{}, error) {
// If the context terminates, wake the waiter.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() { <-ctx.Done(); q.nempty.Broadcast() }()
q.mu.Lock()
defer q.mu.Unlock()
for q.queueLen == 0 {
if q.closed {
return nil, ErrQueueClosed
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
q.nempty.Wait()
}
}
return q.popFront(), nil
}
// Close closes the queue. After closing, any further Add calls will report an
// error, but items that were added to the queue prior to closing will still be
// available for Remove and Wait. Wait will report an error without blocking if
// it is called on a closed, empty queue.
func (q *Queue) Close() error {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
q.nempty.Broadcast()
return nil
}
// popFront removes the frontmost item of q and returns its value after
// updating quota and credit settings.
//
// Preconditions: The caller holds q.mu and q is not empty.
func (q *Queue) popFront() interface{} {
e := q.front.link
q.front.link = e.link
if e == q.back {
q.back = q.front
}
q.queueLen--
if q.queueLen < q.softQuota {
// Successfully removing items from the queue below half the soft quota
// lowers the soft quota. This has the effect of increasing the credit cap
// and the amount of credit given for removing items to better approximate
// the rate at which the consumer is servicing the queue.
if q.softQuota > 1 && q.queueLen < q.softQuota/2 {
q.softQuota--
}
// Give credit for being below the soft quota. Note we do this after
// adjusting the quota so the credit reflects the item we just removed.
q.credit += float64(q.softQuota-q.queueLen) / float64(q.softQuota)
if cap := float64(q.hardLimit - q.softQuota); q.credit > cap {
q.credit = cap
}
}
return e.item
}
// Options are the initial settings for a Queue.
type Options struct {
// The maximum number of items the queue will ever be permitted to hold.
// This value must be positive, and greater than or equal to SoftQuota. The
// hard limit is fixed and does not change as the queue is used.
//
// The hard limit should be chosen to exceed the largest burst size expected
// under normal operating conditions.
HardLimit int
// The initial expected maximum number of items the queue should contain on
// an average workload. If this value is zero, it is initialized to the hard
// limit. The soft quota is adjusted from the initial value dynamically as
// the queue is used.
SoftQuota int
// The initial burst credit score. This value must be greater than or equal
// to zero. If it is zero, the soft quota is used.
BurstCredit float64
}
type entry struct {
item interface{}
link *entry
}