Browse Source

add waitCh as an alternative to waitGroup

new methods:
 - [CList] WaitChan()
 - [CElement] NextWaitChan()
 - [CElement] PrevWaitChan()

Refs https://github.com/tendermint/tendermint/pull/1173
pull/1842/head
Anton Kaliaev 7 years ago
parent
commit
91b41ddd59
No known key found for this signature in database GPG Key ID: 7B6881D965918214
3 changed files with 138 additions and 17 deletions
  1. +7
    -0
      CHANGELOG.md
  2. +58
    -17
      clist/clist.go
  3. +73
    -0
      clist/clist_test.go

+ 7
- 0
CHANGELOG.md View File

@ -6,6 +6,13 @@ BREAKING:
- [cli] WriteDemoConfig -> WriteConfigValues
## 0.6.1 (TBD)
IMPROVEMENTS:
- [clist] add WaitChan() to CList, NextWaitChan() and PrevWaitChan()
to CElement. These can be used instead of blocking *Wait() methods
if you need to be able to send quit signal and not block forever
## 0.6.0 (December 29, 2017)
BREAKING:


+ 58
- 17
clist/clist.go View File

@ -36,12 +36,14 @@ waiting on NextWait() (since it's just a read operation).
*/
type CElement struct {
mtx sync.RWMutex
prev *CElement
prevWg *sync.WaitGroup
next *CElement
nextWg *sync.WaitGroup
removed bool
mtx sync.RWMutex
prev *CElement
prevWg *sync.WaitGroup
prevWaitCh chan struct{}
next *CElement
nextWg *sync.WaitGroup
nextWaitCh chan struct{}
removed bool
Value interface{} // immutable
}
@ -84,6 +86,24 @@ func (e *CElement) PrevWait() *CElement {
}
}
// PrevWaitChan can be used to wait until Prev becomes not nil. Once it does,
// channel will be closed.
func (e *CElement) PrevWaitChan() <-chan struct{} {
e.mtx.RLock()
defer e.mtx.RUnlock()
return e.prevWaitCh
}
// NextWaitChan can be used to wait until Next becomes not nil. Once it does,
// channel will be closed.
func (e *CElement) NextWaitChan() <-chan struct{} {
e.mtx.RLock()
defer e.mtx.RUnlock()
return e.nextWaitCh
}
// Nonblocking, may return nil if at the end.
func (e *CElement) Next() *CElement {
e.mtx.RLock()
@ -142,9 +162,11 @@ func (e *CElement) SetNext(newNext *CElement) {
// events, new Add calls must happen after all previous Wait calls have
// returned.
e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
e.nextWaitCh = make(chan struct{})
}
if oldNext == nil && newNext != nil {
e.nextWg.Done()
close(e.nextWaitCh)
}
}
@ -158,9 +180,11 @@ func (e *CElement) SetPrev(newPrev *CElement) {
e.prev = newPrev
if oldPrev != nil && newPrev == nil {
e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
e.prevWaitCh = make(chan struct{})
}
if oldPrev == nil && newPrev != nil {
e.prevWg.Done()
close(e.prevWaitCh)
}
}
@ -173,9 +197,11 @@ func (e *CElement) SetRemoved() {
// This wakes up anyone waiting in either direction.
if e.prev == nil {
e.prevWg.Done()
close(e.prevWaitCh)
}
if e.next == nil {
e.nextWg.Done()
close(e.nextWaitCh)
}
}
@ -185,11 +211,12 @@ func (e *CElement) SetRemoved() {
// The zero value for CList is an empty list ready to use.
// Operations are goroutine-safe.
type CList struct {
mtx sync.RWMutex
wg *sync.WaitGroup
head *CElement // first element
tail *CElement // last element
len int // list length
mtx sync.RWMutex
wg *sync.WaitGroup
waitCh chan struct{}
head *CElement // first element
tail *CElement // last element
len int // list length
}
func (l *CList) Init() *CList {
@ -197,6 +224,7 @@ func (l *CList) Init() *CList {
defer l.mtx.Unlock()
l.wg = waitGroup1()
l.waitCh = make(chan struct{})
l.head = nil
l.tail = nil
l.len = 0
@ -258,23 +286,35 @@ func (l *CList) BackWait() *CElement {
}
}
// WaitChan can be used to wait until Front or Back becomes not nil. Once it
// does, channel will be closed.
func (l *CList) WaitChan() <-chan struct{} {
l.mtx.Lock()
defer l.mtx.Unlock()
return l.waitCh
}
func (l *CList) PushBack(v interface{}) *CElement {
l.mtx.Lock()
defer l.mtx.Unlock()
// Construct a new element
e := &CElement{
prev: nil,
prevWg: waitGroup1(),
next: nil,
nextWg: waitGroup1(),
removed: false,
Value: v,
prev: nil,
prevWg: waitGroup1(),
prevWaitCh: make(chan struct{}),
next: nil,
nextWg: waitGroup1(),
nextWaitCh: make(chan struct{}),
removed: false,
Value: v,
}
// Release waiters on FrontWait/BackWait maybe
if l.len == 0 {
l.wg.Done()
close(l.waitCh)
}
l.len += 1
@ -313,6 +353,7 @@ func (l *CList) Remove(e *CElement) interface{} {
// If we're removing the only item, make CList FrontWait/BackWait wait.
if l.len == 1 {
l.wg = waitGroup1() // WaitGroups are difficult to re-use.
l.waitCh = make(chan struct{})
}
// Update l.len


+ 73
- 0
clist/clist_test.go View File

@ -218,3 +218,76 @@ func TestScanRightDeleteRandom(t *testing.T) {
t.Fatal("Failed to remove all elements from CList")
}
}
func TestWaitChan(t *testing.T) {
l := New()
ch := l.WaitChan()
// 1) add one element to an empty list
go l.PushBack(1)
<-ch
// 2) and remove it
el := l.Front()
v := l.Remove(el)
if v != 1 {
t.Fatal("where is 1 coming from?")
}
// 3) test iterating forward and waiting for Next (NextWaitChan and Next)
el = l.PushBack(0)
done := make(chan struct{})
pushed := 0
go func() {
for i := 1; i < 100; i++ {
l.PushBack(i)
pushed++
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
close(done)
}()
next := el
seen := 0
FOR_LOOP:
for {
select {
case <-next.NextWaitChan():
next = next.Next()
seen++
if next == nil {
continue
}
case <-done:
break FOR_LOOP
case <-time.After(10 * time.Second):
t.Fatal("max execution time")
}
}
if pushed != seen {
t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen)
}
// 4) test iterating backwards (PrevWaitChan and Prev)
prev := next
seen = 0
FOR_LOOP2:
for {
select {
case <-prev.PrevWaitChan():
prev = prev.Prev()
seen++
if prev == nil {
t.Fatal("expected PrevWaitChan to block forever on nil when reached first elem")
}
case <-time.After(5 * time.Second):
break FOR_LOOP2
}
}
if pushed != seen {
t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen)
}
}

Loading…
Cancel
Save