
Merge pull request #113 from tendermint/hotfix/clist

Fix #112 by using RWMutex per element
Ethan Buchman committed 7 years ago (via GitHub)
commit 6372c415a5
1 changed file with 152 additions and 94 deletions

clist/clist.go  (+152, -94)

@@ -1,46 +1,68 @@
 package clist

 /*
 The purpose of CList is to provide a goroutine-safe linked-list.
 This list can be traversed concurrently by any number of goroutines.
 However, removed CElements cannot be added back.
 NOTE: Not all methods of container/list are (yet) implemented.
 NOTE: Removed elements need to DetachPrev or DetachNext consistently
 to ensure garbage collection of removed elements.
 */

 import (
     "sync"
-    "sync/atomic"
-    "unsafe"
 )

-// CElement is an element of a linked-list
-// Traversal from a CElement are goroutine-safe.
+/*
+CElement is an element of a linked-list
+Traversal from a CElement is goroutine-safe.
+We can't avoid using WaitGroups or for-loops given the documentation
+spec without re-implementing the primitives that already exist in
+golang/sync. Notice that WaitGroup allows many go-routines to be
+simultaneously released, which is what we want. Mutex doesn't do
+this. RWMutex does this, but it's clumsy to use in the way that a
+WaitGroup would be used -- and we'd end up having two RWMutex's for
+prev/next each, which is doubly confusing.
+sync.Cond would be sort-of useful, but we don't need a write-lock in
+the for-loop. Use sync.Cond when you need serial access to the
+"condition". In our case our condition is if `next != nil || removed`,
+and there's no reason to serialize that condition for goroutines
+waiting on NextWait() (since it's just a read operation).
+*/
 type CElement struct {
-    prev    unsafe.Pointer
+    mtx     sync.RWMutex
+    prev    *CElement
     prevWg  *sync.WaitGroup
-    next    unsafe.Pointer
+    next    *CElement
     nextWg  *sync.WaitGroup
-    removed uint32
-    Value   interface{}
+    removed bool
+    Value   interface{} // immutable
 }

 // Blocking implementation of Next().
 // May return nil iff CElement was tail and got removed.
 func (e *CElement) NextWait() *CElement {
     for {
-        e.nextWg.Wait()
-        next := e.Next()
-        if next == nil {
-            if e.Removed() {
-                return nil
-            } else {
-                continue
-            }
-        } else {
+        e.mtx.RLock()
+        next := e.next
+        nextWg := e.nextWg
+        removed := e.removed
+        e.mtx.RUnlock()
+        if next != nil || removed {
             return next
         }
+        nextWg.Wait()
+        // e.next doesn't necessarily exist here.
+        // That's why we need to continue a for-loop.
     }
 }
@@ -48,82 +70,113 @@ func (e *CElement) NextWait() *CElement {
 // May return nil iff CElement was head and got removed.
 func (e *CElement) PrevWait() *CElement {
     for {
-        e.prevWg.Wait()
-        prev := e.Prev()
-        if prev == nil {
-            if e.Removed() {
-                return nil
-            } else {
-                continue
-            }
-        } else {
+        e.mtx.RLock()
+        prev := e.prev
+        prevWg := e.prevWg
+        removed := e.removed
+        e.mtx.RUnlock()
+        if prev != nil || removed {
             return prev
         }
+        prevWg.Wait()
     }
 }

 // Nonblocking, may return nil if at the end.
 func (e *CElement) Next() *CElement {
-    return (*CElement)(atomic.LoadPointer(&e.next))
+    e.mtx.RLock()
+    defer e.mtx.RUnlock()
+    return e.next
 }

 // Nonblocking, may return nil if at the end.
 func (e *CElement) Prev() *CElement {
-    return (*CElement)(atomic.LoadPointer(&e.prev))
+    e.mtx.RLock()
+    defer e.mtx.RUnlock()
+    return e.prev
 }

 func (e *CElement) Removed() bool {
-    return atomic.LoadUint32(&(e.removed)) > 0
+    e.mtx.RLock()
+    defer e.mtx.RUnlock()
+    return e.removed
 }

 func (e *CElement) DetachNext() {
     if !e.Removed() {
         panic("DetachNext() must be called after Remove(e)")
     }
-    atomic.StorePointer(&e.next, nil)
+    e.mtx.Lock()
+    defer e.mtx.Unlock()
+    e.next = nil
 }

 func (e *CElement) DetachPrev() {
     if !e.Removed() {
         panic("DetachPrev() must be called after Remove(e)")
     }
-    atomic.StorePointer(&e.prev, nil)
+    e.mtx.Lock()
+    defer e.mtx.Unlock()
+    e.prev = nil
 }

-func (e *CElement) setNextAtomic(next *CElement) {
-    for {
-        oldNext := atomic.LoadPointer(&e.next)
-        if !atomic.CompareAndSwapPointer(&(e.next), oldNext, unsafe.Pointer(next)) {
-            continue
-        }
-        if next == nil && oldNext != nil { // We for-loop in NextWait() so race is ok
-            e.nextWg.Add(1)
-        }
-        if next != nil && oldNext == nil {
-            e.nextWg.Done()
-        }
-        return
+// NOTE: This function needs to be safe for
+// concurrent goroutines waiting on nextWg.
+func (e *CElement) SetNext(newNext *CElement) {
+    e.mtx.Lock()
+    defer e.mtx.Unlock()
+    oldNext := e.next
+    e.next = newNext
+    if oldNext != nil && newNext == nil {
+        // See https://golang.org/pkg/sync/:
+        //
+        // If a WaitGroup is reused to wait for several independent sets of
+        // events, new Add calls must happen after all previous Wait calls have
+        // returned.
+        e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
+    }
+    if oldNext == nil && newNext != nil {
+        e.nextWg.Done()
     }
 }

-func (e *CElement) setPrevAtomic(prev *CElement) {
-    for {
-        oldPrev := atomic.LoadPointer(&e.prev)
-        if !atomic.CompareAndSwapPointer(&(e.prev), oldPrev, unsafe.Pointer(prev)) {
-            continue
-        }
-        if prev == nil && oldPrev != nil { // We for-loop in PrevWait() so race is ok
-            e.prevWg.Add(1)
-        }
-        if prev != nil && oldPrev == nil {
-            e.prevWg.Done()
-        }
-        return
+// NOTE: This function needs to be safe for
+// concurrent goroutines waiting on prevWg
+func (e *CElement) SetPrev(newPrev *CElement) {
+    e.mtx.Lock()
+    defer e.mtx.Unlock()
+    oldPrev := e.prev
+    e.prev = newPrev
+    if oldPrev != nil && newPrev == nil {
+        e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
+    }
+    if oldPrev == nil && newPrev != nil {
+        e.prevWg.Done()
     }
 }

-func (e *CElement) setRemovedAtomic() {
-    atomic.StoreUint32(&(e.removed), 1)
+func (e *CElement) SetRemoved() {
+    e.mtx.Lock()
+    defer e.mtx.Unlock()
+    e.removed = true
+    // This wakes up anyone waiting in either direction.
+    if e.prev == nil {
+        e.prevWg.Done()
+    }
+    if e.next == nil {
+        e.nextWg.Done()
+    }
 }

 //--------------------------------------------------------------------------------
@@ -132,7 +185,7 @@ func (e *CElement) setRemovedAtomic() {
 // The zero value for CList is an empty list ready to use.
 // Operations are goroutine-safe.
 type CList struct {
-    mtx  sync.Mutex
+    mtx  sync.RWMutex
     wg   *sync.WaitGroup
     head *CElement // first element
     tail *CElement // last element
@@ -142,6 +195,7 @@ type CList struct {
 func (l *CList) Init() *CList {
     l.mtx.Lock()
     defer l.mtx.Unlock()
+
     l.wg = waitGroup1()
     l.head = nil
     l.tail = nil
@@ -152,48 +206,55 @@ func (l *CList) Init() *CList {
 func New() *CList { return new(CList).Init() }

 func (l *CList) Len() int {
-    l.mtx.Lock()
-    defer l.mtx.Unlock()
+    l.mtx.RLock()
+    defer l.mtx.RUnlock()
     return l.len
 }

 func (l *CList) Front() *CElement {
-    l.mtx.Lock()
-    defer l.mtx.Unlock()
+    l.mtx.RLock()
+    defer l.mtx.RUnlock()
     return l.head
 }

 func (l *CList) FrontWait() *CElement {
+    // Loop until the head is non-nil else wait and try again
     for {
-        l.mtx.Lock()
+        l.mtx.RLock()
         head := l.head
         wg := l.wg
-        l.mtx.Unlock()
-        if head == nil {
-            wg.Wait()
-        } else {
+        l.mtx.RUnlock()
+        if head != nil {
             return head
         }
+        wg.Wait()
+        // NOTE: If you think l.head exists here, think harder.
     }
 }

 func (l *CList) Back() *CElement {
-    l.mtx.Lock()
-    defer l.mtx.Unlock()
+    l.mtx.RLock()
+    defer l.mtx.RUnlock()
     return l.tail
 }

 func (l *CList) BackWait() *CElement {
     for {
-        l.mtx.Lock()
+        l.mtx.RLock()
         tail := l.tail
         wg := l.wg
-        l.mtx.Unlock()
-        if tail == nil {
-            wg.Wait()
-        } else {
+        l.mtx.RUnlock()
+        if tail != nil {
             return tail
         }
+        wg.Wait()
+        // l.tail doesn't necessarily exist here.
+        // That's why we need to continue a for-loop.
     }
 }
@@ -203,11 +264,12 @@ func (l *CList) PushBack(v interface{}) *CElement {
     // Construct a new element
     e := &CElement{
-        prev:   nil,
-        prevWg: waitGroup1(),
-        next:   nil,
-        nextWg: waitGroup1(),
-        Value:  v,
+        prev:    nil,
+        prevWg:  waitGroup1(),
+        next:    nil,
+        nextWg:  waitGroup1(),
+        removed: false,
+        Value:   v,
     }

     // Release waiters on FrontWait/BackWait maybe
@@ -221,9 +283,9 @@ func (l *CList) PushBack(v interface{}) *CElement {
         l.head = e
         l.tail = e
     } else {
-        l.tail.setNextAtomic(e)
-        e.setPrevAtomic(l.tail)
-        l.tail = e
+        e.SetPrev(l.tail)  // We must init e first.
+        l.tail.SetNext(e)  // This will make e accessible.
+        l.tail = e         // Update the list.
     }

     return e
@@ -250,30 +312,26 @@ func (l *CList) Remove(e *CElement) interface{} {
     // If we're removing the only item, make CList FrontWait/BackWait wait.
     if l.len == 1 {
-        l.wg.Add(1)
+        l.wg = waitGroup1() // WaitGroups are difficult to re-use.
     }

+    // Update l.len
     l.len -= 1

     // Connect next/prev and set head/tail
     if prev == nil {
         l.head = next
     } else {
-        prev.setNextAtomic(next)
+        prev.SetNext(next)
     }
     if next == nil {
         l.tail = prev
     } else {
-        next.setPrevAtomic(prev)
+        next.SetPrev(prev)
     }

     // Set .Done() on e, otherwise waiters will wait forever.
-    e.setRemovedAtomic()
-    if prev == nil {
-        e.prevWg.Done()
-    }
-    if next == nil {
-        e.nextWg.Done()
-    }
+    e.SetRemoved()

     return e.Value
 }

