diff --git a/clist/clist.go b/clist/clist.go index 5295dd995..a52920f8c 100644 --- a/clist/clist.go +++ b/clist/clist.go @@ -1,46 +1,68 @@ package clist /* + The purpose of CList is to provide a goroutine-safe linked-list. This list can be traversed concurrently by any number of goroutines. However, removed CElements cannot be added back. NOTE: Not all methods of container/list are (yet) implemented. NOTE: Removed elements need to DetachPrev or DetachNext consistently to ensure garbage collection of removed elements. + */ import ( "sync" - "sync/atomic" - "unsafe" ) -// CElement is an element of a linked-list -// Traversal from a CElement are goroutine-safe. +/* + +CElement is an element of a linked-list +Traversal from a CElement is goroutine-safe. + +We can't avoid using WaitGroups or for-loops given the documentation +spec without re-implementing the primitives that already exist in +golang/sync. Notice that WaitGroup allows many go-routines to be +simultaneously released, which is what we want. Mutex doesn't do +this. RWMutex does this, but it's clumsy to use in the way that a +WaitGroup would be used -- and we'd end up having two RWMutex's for +prev/next each, which is doubly confusing. + +sync.Cond would be sort-of useful, but we don't need a write-lock in +the for-loop. Use sync.Cond when you need serial access to the +"condition". In our case our condition is if `next != nil || removed`, +and there's no reason to serialize that condition for goroutines +waiting on NextWait() (since it's just a read operation). + +*/ type CElement struct { - prev unsafe.Pointer + mtx sync.RWMutex + prev *CElement prevWg *sync.WaitGroup - next unsafe.Pointer + next *CElement nextWg *sync.WaitGroup - removed uint32 - Value interface{} + removed bool + + Value interface{} // immutable } // Blocking implementation of Next(). // May return nil iff CElement was tail and got removed. func (e *CElement) NextWait() *CElement { for { - e.nextWg.Wait() - next := e.Next() - if next == nil { - if e.Removed() { - return nil - } else { - continue - } - } else { + e.mtx.RLock() + next := e.next + nextWg := e.nextWg + removed := e.removed + e.mtx.RUnlock() + + if next != nil || removed { return next } + + nextWg.Wait() + // e.next doesn't necessarily exist here. + // That's why we need to continue a for-loop. } } @@ -48,82 +70,113 @@ func (e *CElement) NextWait() *CElement { // May return nil iff CElement was head and got removed. func (e *CElement) PrevWait() *CElement { for { - e.prevWg.Wait() - prev := e.Prev() - if prev == nil { - if e.Removed() { - return nil - } else { - continue - } - } else { + e.mtx.RLock() + prev := e.prev + prevWg := e.prevWg + removed := e.removed + e.mtx.RUnlock() + + if prev != nil || removed { return prev } + + prevWg.Wait() } } // Nonblocking, may return nil if at the end. func (e *CElement) Next() *CElement { - return (*CElement)(atomic.LoadPointer(&e.next)) + e.mtx.RLock() + defer e.mtx.RUnlock() + + return e.next } // Nonblocking, may return nil if at the end. func (e *CElement) Prev() *CElement { - return (*CElement)(atomic.LoadPointer(&e.prev)) + e.mtx.RLock() + defer e.mtx.RUnlock() + + return e.prev } func (e *CElement) Removed() bool { - return atomic.LoadUint32(&(e.removed)) > 0 + e.mtx.RLock() + defer e.mtx.RUnlock() + + return e.removed } func (e *CElement) DetachNext() { if !e.Removed() { panic("DetachNext() must be called after Remove(e)") } - atomic.StorePointer(&e.next, nil) + e.mtx.Lock() + defer e.mtx.Unlock() + + e.next = nil } func (e *CElement) DetachPrev() { if !e.Removed() { panic("DetachPrev() must be called after Remove(e)") } - atomic.StorePointer(&e.prev, nil) + e.mtx.Lock() + defer e.mtx.Unlock() + + e.prev = nil } -func (e *CElement) setNextAtomic(next *CElement) { - for { - oldNext := atomic.LoadPointer(&e.next) - if !atomic.CompareAndSwapPointer(&(e.next), oldNext, unsafe.Pointer(next)) { - continue - } - if next == nil && oldNext != nil { // We for-loop in NextWait() so race is ok - e.nextWg.Add(1) - } - if next != nil && oldNext == nil { - e.nextWg.Done() - } - return +// NOTE: This function needs to be safe for +// concurrent goroutines waiting on nextWg. +func (e *CElement) SetNext(newNext *CElement) { + e.mtx.Lock() + defer e.mtx.Unlock() + + oldNext := e.next + e.next = newNext + if oldNext != nil && newNext == nil { + // See https://golang.org/pkg/sync/: + // + // If a WaitGroup is reused to wait for several independent sets of + // events, new Add calls must happen after all previous Wait calls have + // returned. + e.nextWg = waitGroup1() // WaitGroups are difficult to re-use. + } + if oldNext == nil && newNext != nil { + e.nextWg.Done() } } -func (e *CElement) setPrevAtomic(prev *CElement) { - for { - oldPrev := atomic.LoadPointer(&e.prev) - if !atomic.CompareAndSwapPointer(&(e.prev), oldPrev, unsafe.Pointer(prev)) { - continue - } - if prev == nil && oldPrev != nil { // We for-loop in PrevWait() so race is ok - e.prevWg.Add(1) - } - if prev != nil && oldPrev == nil { - e.prevWg.Done() - } - return +// NOTE: This function needs to be safe for +// concurrent goroutines waiting on prevWg +func (e *CElement) SetPrev(newPrev *CElement) { + e.mtx.Lock() + defer e.mtx.Unlock() + + oldPrev := e.prev + e.prev = newPrev + if oldPrev != nil && newPrev == nil { + e.prevWg = waitGroup1() // WaitGroups are difficult to re-use. + } + if oldPrev == nil && newPrev != nil { + e.prevWg.Done() } } -func (e *CElement) setRemovedAtomic() { - atomic.StoreUint32(&(e.removed), 1) +func (e *CElement) SetRemoved() { + e.mtx.Lock() + defer e.mtx.Unlock() + + e.removed = true + + // This wakes up anyone waiting in either direction. + if e.prev == nil { + e.prevWg.Done() + } + if e.next == nil { + e.nextWg.Done() + } } //-------------------------------------------------------------------------------- @@ -132,7 +185,7 @@ func (e *CElement) setRemovedAtomic() { // The zero value for CList is an empty list ready to use. // Operations are goroutine-safe. type CList struct { - mtx sync.Mutex + mtx sync.RWMutex wg *sync.WaitGroup head *CElement // first element tail *CElement // last element @@ -142,6 +195,7 @@ type CList struct { func (l *CList) Init() *CList { l.mtx.Lock() defer l.mtx.Unlock() + l.wg = waitGroup1() l.head = nil l.tail = nil @@ -152,48 +206,55 @@ func (l *CList) Init() *CList { func New() *CList { return new(CList).Init() } func (l *CList) Len() int { - l.mtx.Lock() - defer l.mtx.Unlock() + l.mtx.RLock() + defer l.mtx.RUnlock() + return l.len } func (l *CList) Front() *CElement { - l.mtx.Lock() - defer l.mtx.Unlock() + l.mtx.RLock() + defer l.mtx.RUnlock() + return l.head } func (l *CList) FrontWait() *CElement { + // Loop until the head is non-nil else wait and try again for { - l.mtx.Lock() + l.mtx.RLock() head := l.head wg := l.wg - l.mtx.Unlock() - if head == nil { - wg.Wait() - } else { + l.mtx.RUnlock() + + if head != nil { return head } + wg.Wait() + // NOTE: If you think l.head exists here, think harder. } } func (l *CList) Back() *CElement { - l.mtx.Lock() - defer l.mtx.Unlock() + l.mtx.RLock() + defer l.mtx.RUnlock() + return l.tail } func (l *CList) BackWait() *CElement { for { - l.mtx.Lock() + l.mtx.RLock() tail := l.tail wg := l.wg - l.mtx.Unlock() - if tail == nil { - wg.Wait() - } else { + l.mtx.RUnlock() + + if tail != nil { return tail } + wg.Wait() + // l.tail doesn't necessarily exist here. + // That's why we need to continue a for-loop. } } @@ -203,11 +264,12 @@ func (l *CList) PushBack(v interface{}) *CElement { // Construct a new element e := &CElement{ - prev: nil, - prevWg: waitGroup1(), - next: nil, - nextWg: waitGroup1(), - Value: v, + prev: nil, + prevWg: waitGroup1(), + next: nil, + nextWg: waitGroup1(), + removed: false, + Value: v, } // Release waiters on FrontWait/BackWait maybe @@ -221,9 +283,9 @@ func (l *CList) PushBack(v interface{}) *CElement { l.head = e l.tail = e } else { - l.tail.setNextAtomic(e) - e.setPrevAtomic(l.tail) - l.tail = e + e.SetPrev(l.tail) // We must init e first. + l.tail.SetNext(e) // This will make e accessible. + l.tail = e // Update the list. } return e @@ -250,30 +312,26 @@ func (l *CList) Remove(e *CElement) interface{} { // If we're removing the only item, make CList FrontWait/BackWait wait. if l.len == 1 { - l.wg.Add(1) + l.wg = waitGroup1() // WaitGroups are difficult to re-use. } + + // Update l.len l.len -= 1 // Connect next/prev and set head/tail if prev == nil { l.head = next } else { - prev.setNextAtomic(next) + prev.SetNext(next) } if next == nil { l.tail = prev } else { - next.setPrevAtomic(prev) + next.SetPrev(prev) } // Set .Done() on e, otherwise waiters will wait forever. - e.setRemovedAtomic() - if prev == nil { - e.prevWg.Done() - } - if next == nil { - e.nextWg.Done() - } + e.SetRemoved() return e.Value }