|
|
@ -44,10 +44,8 @@ waiting on NextWait() (since it's just a read operation). |
|
|
|
type CElement struct { |
|
|
|
mtx sync.RWMutex |
|
|
|
prev *CElement |
|
|
|
prevWg *sync.WaitGroup |
|
|
|
prevWaitCh chan struct{} |
|
|
|
next *CElement |
|
|
|
nextWg *sync.WaitGroup |
|
|
|
nextWaitCh chan struct{} |
|
|
|
removed bool |
|
|
|
|
|
|
@ -60,15 +58,15 @@ func (e *CElement) NextWait() *CElement { |
|
|
|
for { |
|
|
|
e.mtx.RLock() |
|
|
|
next := e.next |
|
|
|
nextWg := e.nextWg |
|
|
|
removed := e.removed |
|
|
|
signal := e.nextWaitCh |
|
|
|
e.mtx.RUnlock() |
|
|
|
|
|
|
|
if next != nil || removed { |
|
|
|
return next |
|
|
|
} |
|
|
|
|
|
|
|
nextWg.Wait() |
|
|
|
<-signal |
|
|
|
// e.next doesn't necessarily exist here.
|
|
|
|
// That's why we need to continue a for-loop.
|
|
|
|
} |
|
|
@ -80,15 +78,15 @@ func (e *CElement) PrevWait() *CElement { |
|
|
|
for { |
|
|
|
e.mtx.RLock() |
|
|
|
prev := e.prev |
|
|
|
prevWg := e.prevWg |
|
|
|
removed := e.removed |
|
|
|
signal := e.prevWaitCh |
|
|
|
e.mtx.RUnlock() |
|
|
|
|
|
|
|
if prev != nil || removed { |
|
|
|
return prev |
|
|
|
} |
|
|
|
|
|
|
|
prevWg.Wait() |
|
|
|
<-signal |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -166,11 +164,9 @@ func (e *CElement) SetNext(newNext *CElement) { |
|
|
|
// If a WaitGroup is reused to wait for several independent sets of
|
|
|
|
// events, new Add calls must happen after all previous Wait calls have
|
|
|
|
// returned.
|
|
|
|
e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
|
|
|
|
e.nextWaitCh = make(chan struct{}) |
|
|
|
} |
|
|
|
if oldNext == nil && newNext != nil { |
|
|
|
e.nextWg.Done() |
|
|
|
close(e.nextWaitCh) |
|
|
|
} |
|
|
|
e.mtx.Unlock() |
|
|
@ -180,35 +176,31 @@ func (e *CElement) SetNext(newNext *CElement) { |
|
|
|
// concurrent goroutines waiting on prevWg
|
|
|
|
func (e *CElement) SetPrev(newPrev *CElement) { |
|
|
|
e.mtx.Lock() |
|
|
|
defer e.mtx.Unlock() |
|
|
|
|
|
|
|
oldPrev := e.prev |
|
|
|
e.prev = newPrev |
|
|
|
if oldPrev != nil && newPrev == nil { |
|
|
|
e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
|
|
|
|
e.prevWaitCh = make(chan struct{}) |
|
|
|
} |
|
|
|
if oldPrev == nil && newPrev != nil { |
|
|
|
e.prevWg.Done() |
|
|
|
close(e.prevWaitCh) |
|
|
|
} |
|
|
|
e.mtx.Unlock() |
|
|
|
} |
|
|
|
|
|
|
|
func (e *CElement) SetRemoved() { |
|
|
|
e.mtx.Lock() |
|
|
|
defer e.mtx.Unlock() |
|
|
|
|
|
|
|
e.removed = true |
|
|
|
|
|
|
|
// This wakes up anyone waiting in either direction.
|
|
|
|
if e.prev == nil { |
|
|
|
e.prevWg.Done() |
|
|
|
close(e.prevWaitCh) |
|
|
|
} |
|
|
|
if e.next == nil { |
|
|
|
e.nextWg.Done() |
|
|
|
close(e.nextWaitCh) |
|
|
|
} |
|
|
|
e.mtx.Unlock() |
|
|
|
} |
|
|
|
|
|
|
|
//--------------------------------------------------------------------------------
|
|
|
@ -236,7 +228,6 @@ func newWithMax(maxLength int) *CList { |
|
|
|
l := new(CList) |
|
|
|
l.maxLen = maxLength |
|
|
|
|
|
|
|
l.wg = waitGroup1() |
|
|
|
l.waitCh = make(chan struct{}) |
|
|
|
l.head = nil |
|
|
|
l.tail = nil |
|
|
@ -264,13 +255,13 @@ func (l *CList) FrontWait() *CElement { |
|
|
|
for { |
|
|
|
l.mtx.RLock() |
|
|
|
head := l.head |
|
|
|
wg := l.wg |
|
|
|
signal := l.waitCh |
|
|
|
l.mtx.RUnlock() |
|
|
|
|
|
|
|
if head != nil { |
|
|
|
return head |
|
|
|
} |
|
|
|
wg.Wait() |
|
|
|
<-signal |
|
|
|
// NOTE: If you think l.head exists here, think harder.
|
|
|
|
} |
|
|
|
} |
|
|
@ -314,10 +305,8 @@ func (l *CList) PushBack(v interface{}) *CElement { |
|
|
|
// Construct a new element
|
|
|
|
e := &CElement{ |
|
|
|
prev: nil, |
|
|
|
prevWg: waitGroup1(), |
|
|
|
prevWaitCh: make(chan struct{}), |
|
|
|
next: nil, |
|
|
|
nextWg: waitGroup1(), |
|
|
|
nextWaitCh: make(chan struct{}), |
|
|
|
removed: false, |
|
|
|
Value: v, |
|
|
@ -325,7 +314,6 @@ func (l *CList) PushBack(v interface{}) *CElement { |
|
|
|
|
|
|
|
// Release waiters on FrontWait/BackWait maybe
|
|
|
|
if l.len == 0 { |
|
|
|
l.wg.Done() |
|
|
|
close(l.waitCh) |
|
|
|
} |
|
|
|
if l.len >= l.maxLen { |
|
|
@ -350,26 +338,23 @@ func (l *CList) PushBack(v interface{}) *CElement { |
|
|
|
// NOTE: As per the contract of CList, removed elements cannot be added back.
|
|
|
|
func (l *CList) Remove(e *CElement) interface{} { |
|
|
|
l.mtx.Lock() |
|
|
|
defer l.mtx.Unlock() |
|
|
|
|
|
|
|
prev := e.Prev() |
|
|
|
next := e.Next() |
|
|
|
|
|
|
|
if l.head == nil || l.tail == nil { |
|
|
|
l.mtx.Unlock() |
|
|
|
panic("Remove(e) on empty CList") |
|
|
|
} |
|
|
|
if prev == nil && l.head != e { |
|
|
|
l.mtx.Unlock() |
|
|
|
panic("Remove(e) with false head") |
|
|
|
} |
|
|
|
if next == nil && l.tail != e { |
|
|
|
l.mtx.Unlock() |
|
|
|
panic("Remove(e) with false tail") |
|
|
|
} |
|
|
|
|
|
|
|
// If we're removing the only item, make CList FrontWait/BackWait wait.
|
|
|
|
if l.len == 1 { |
|
|
|
l.wg = waitGroup1() // WaitGroups are difficult to re-use.
|
|
|
|
l.waitCh = make(chan struct{}) |
|
|
|
} |
|
|
|
|
|
|
@ -391,12 +376,5 @@ func (l *CList) Remove(e *CElement) interface{} { |
|
|
|
// Set .Done() on e, otherwise waiters will wait forever.
|
|
|
|
e.SetRemoved() |
|
|
|
|
|
|
|
l.mtx.Unlock() |
|
|
|
return e.Value |
|
|
|
} |
|
|
|
|
|
|
|
func waitGroup1() (wg *sync.WaitGroup) { |
|
|
|
wg = &sync.WaitGroup{} |
|
|
|
wg.Add(1) |
|
|
|
return |
|
|
|
} |