You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

384 lines
8.0 KiB

  1. package clist
  2. /*
  3. The purpose of CList is to provide a goroutine-safe linked-list.
  4. This list can be traversed concurrently by any number of goroutines.
  5. However, removed CElements cannot be added back.
  6. NOTE: Not all methods of container/list are (yet) implemented.
  7. NOTE: Removed elements need to DetachPrev or DetachNext consistently
  8. to ensure garbage collection of removed elements.
  9. */
  10. import (
  11. "sync"
  12. )
/*
CElement is an element of a linked-list.
Traversal from a CElement is goroutine-safe.

We can't avoid using WaitGroups or for-loops given the documentation
spec without re-implementing the primitives that already exist in
golang/sync. Notice that WaitGroup allows many goroutines to be
simultaneously released, which is what we want. Mutex doesn't do
this. RWMutex does this, but it's clumsy to use in the way that a
WaitGroup would be used -- and we'd end up having two RWMutexes for
prev/next each, which is doubly confusing.

sync.Cond would be sort-of useful, but we don't need a write-lock in
the for-loop. Use sync.Cond when you need serial access to the
"condition". In our case the condition is `next != nil || removed`,
and there's no reason to serialize that condition for goroutines
waiting on NextWait() (since it's just a read operation).
*/
type CElement struct {
	mtx        sync.RWMutex    // taken by every method that reads or writes the fields below
	prev       *CElement       // previous element; nil when there is none or after DetachPrev
	prevWg     *sync.WaitGroup // Done() once prev becomes non-nil or e is removed; replaced when prev returns to nil
	prevWaitCh chan struct{}   // closed in the same situations as prevWg.Done(); replaced when prev returns to nil
	next       *CElement       // next element; nil when there is none or after DetachNext
	nextWg     *sync.WaitGroup // Done() once next becomes non-nil or e is removed; replaced when next returns to nil
	nextWaitCh chan struct{}   // closed in the same situations as nextWg.Done(); replaced when next returns to nil
	removed    bool            // set by SetRemoved; never reset in this file
	Value      interface{}     // immutable
}
  40. // Blocking implementation of Next().
  41. // May return nil iff CElement was tail and got removed.
  42. func (e *CElement) NextWait() *CElement {
  43. for {
  44. e.mtx.RLock()
  45. next := e.next
  46. nextWg := e.nextWg
  47. removed := e.removed
  48. e.mtx.RUnlock()
  49. if next != nil || removed {
  50. return next
  51. }
  52. nextWg.Wait()
  53. // e.next doesn't necessarily exist here.
  54. // That's why we need to continue a for-loop.
  55. }
  56. }
  57. // Blocking implementation of Prev().
  58. // May return nil iff CElement was head and got removed.
  59. func (e *CElement) PrevWait() *CElement {
  60. for {
  61. e.mtx.RLock()
  62. prev := e.prev
  63. prevWg := e.prevWg
  64. removed := e.removed
  65. e.mtx.RUnlock()
  66. if prev != nil || removed {
  67. return prev
  68. }
  69. prevWg.Wait()
  70. }
  71. }
  72. // PrevWaitChan can be used to wait until Prev becomes not nil. Once it does,
  73. // channel will be closed.
  74. func (e *CElement) PrevWaitChan() <-chan struct{} {
  75. e.mtx.RLock()
  76. defer e.mtx.RUnlock()
  77. return e.prevWaitCh
  78. }
  79. // NextWaitChan can be used to wait until Next becomes not nil. Once it does,
  80. // channel will be closed.
  81. func (e *CElement) NextWaitChan() <-chan struct{} {
  82. e.mtx.RLock()
  83. defer e.mtx.RUnlock()
  84. return e.nextWaitCh
  85. }
  86. // Nonblocking, may return nil if at the end.
  87. func (e *CElement) Next() *CElement {
  88. e.mtx.RLock()
  89. defer e.mtx.RUnlock()
  90. return e.next
  91. }
  92. // Nonblocking, may return nil if at the end.
  93. func (e *CElement) Prev() *CElement {
  94. e.mtx.RLock()
  95. defer e.mtx.RUnlock()
  96. return e.prev
  97. }
  98. func (e *CElement) Removed() bool {
  99. e.mtx.RLock()
  100. defer e.mtx.RUnlock()
  101. return e.removed
  102. }
  103. func (e *CElement) DetachNext() {
  104. if !e.Removed() {
  105. panic("DetachNext() must be called after Remove(e)")
  106. }
  107. e.mtx.Lock()
  108. defer e.mtx.Unlock()
  109. e.next = nil
  110. }
  111. func (e *CElement) DetachPrev() {
  112. if !e.Removed() {
  113. panic("DetachPrev() must be called after Remove(e)")
  114. }
  115. e.mtx.Lock()
  116. defer e.mtx.Unlock()
  117. e.prev = nil
  118. }
  119. // NOTE: This function needs to be safe for
  120. // concurrent goroutines waiting on nextWg.
  121. func (e *CElement) SetNext(newNext *CElement) {
  122. e.mtx.Lock()
  123. defer e.mtx.Unlock()
  124. oldNext := e.next
  125. e.next = newNext
  126. if oldNext != nil && newNext == nil {
  127. // See https://golang.org/pkg/sync/:
  128. //
  129. // If a WaitGroup is reused to wait for several independent sets of
  130. // events, new Add calls must happen after all previous Wait calls have
  131. // returned.
  132. e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
  133. e.nextWaitCh = make(chan struct{})
  134. }
  135. if oldNext == nil && newNext != nil {
  136. e.nextWg.Done()
  137. close(e.nextWaitCh)
  138. }
  139. }
  140. // NOTE: This function needs to be safe for
  141. // concurrent goroutines waiting on prevWg
  142. func (e *CElement) SetPrev(newPrev *CElement) {
  143. e.mtx.Lock()
  144. defer e.mtx.Unlock()
  145. oldPrev := e.prev
  146. e.prev = newPrev
  147. if oldPrev != nil && newPrev == nil {
  148. e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
  149. e.prevWaitCh = make(chan struct{})
  150. }
  151. if oldPrev == nil && newPrev != nil {
  152. e.prevWg.Done()
  153. close(e.prevWaitCh)
  154. }
  155. }
  156. func (e *CElement) SetRemoved() {
  157. e.mtx.Lock()
  158. defer e.mtx.Unlock()
  159. e.removed = true
  160. // This wakes up anyone waiting in either direction.
  161. if e.prev == nil {
  162. e.prevWg.Done()
  163. close(e.prevWaitCh)
  164. }
  165. if e.next == nil {
  166. e.nextWg.Done()
  167. close(e.nextWaitCh)
  168. }
  169. }
//--------------------------------------------------------------------------------

// CList represents a linked list.
// Operations are goroutine-safe.
//
// NOTE: A CList must be set up with Init (New does this) before use. The
// zero value has a nil wg and a nil waitCh, and PushBack, FrontWait and
// BackWait use those fields without checking for nil.
type CList struct {
	mtx    sync.RWMutex    // taken by every CList method before touching the fields below
	wg     *sync.WaitGroup // Done() by PushBack when the list goes from empty to non-empty; replaced when the last element is removed
	waitCh chan struct{}   // closed alongside wg.Done(); replaced alongside wg
	head   *CElement       // first element
	tail   *CElement       // last element
	len    int             // list length
}
  182. func (l *CList) Init() *CList {
  183. l.mtx.Lock()
  184. defer l.mtx.Unlock()
  185. l.wg = waitGroup1()
  186. l.waitCh = make(chan struct{})
  187. l.head = nil
  188. l.tail = nil
  189. l.len = 0
  190. return l
  191. }
  192. func New() *CList { return new(CList).Init() }
  193. func (l *CList) Len() int {
  194. l.mtx.RLock()
  195. defer l.mtx.RUnlock()
  196. return l.len
  197. }
  198. func (l *CList) Front() *CElement {
  199. l.mtx.RLock()
  200. defer l.mtx.RUnlock()
  201. return l.head
  202. }
  203. func (l *CList) FrontWait() *CElement {
  204. // Loop until the head is non-nil else wait and try again
  205. for {
  206. l.mtx.RLock()
  207. head := l.head
  208. wg := l.wg
  209. l.mtx.RUnlock()
  210. if head != nil {
  211. return head
  212. }
  213. wg.Wait()
  214. // NOTE: If you think l.head exists here, think harder.
  215. }
  216. }
  217. func (l *CList) Back() *CElement {
  218. l.mtx.RLock()
  219. defer l.mtx.RUnlock()
  220. return l.tail
  221. }
  222. func (l *CList) BackWait() *CElement {
  223. for {
  224. l.mtx.RLock()
  225. tail := l.tail
  226. wg := l.wg
  227. l.mtx.RUnlock()
  228. if tail != nil {
  229. return tail
  230. }
  231. wg.Wait()
  232. // l.tail doesn't necessarily exist here.
  233. // That's why we need to continue a for-loop.
  234. }
  235. }
  236. // WaitChan can be used to wait until Front or Back becomes not nil. Once it
  237. // does, channel will be closed.
  238. func (l *CList) WaitChan() <-chan struct{} {
  239. l.mtx.Lock()
  240. defer l.mtx.Unlock()
  241. return l.waitCh
  242. }
  243. func (l *CList) PushBack(v interface{}) *CElement {
  244. l.mtx.Lock()
  245. defer l.mtx.Unlock()
  246. // Construct a new element
  247. e := &CElement{
  248. prev: nil,
  249. prevWg: waitGroup1(),
  250. prevWaitCh: make(chan struct{}),
  251. next: nil,
  252. nextWg: waitGroup1(),
  253. nextWaitCh: make(chan struct{}),
  254. removed: false,
  255. Value: v,
  256. }
  257. // Release waiters on FrontWait/BackWait maybe
  258. if l.len == 0 {
  259. l.wg.Done()
  260. close(l.waitCh)
  261. }
  262. l.len += 1
  263. // Modify the tail
  264. if l.tail == nil {
  265. l.head = e
  266. l.tail = e
  267. } else {
  268. e.SetPrev(l.tail) // We must init e first.
  269. l.tail.SetNext(e) // This will make e accessible.
  270. l.tail = e // Update the list.
  271. }
  272. return e
  273. }
  274. // CONTRACT: Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks.
  275. // NOTE: As per the contract of CList, removed elements cannot be added back.
  276. func (l *CList) Remove(e *CElement) interface{} {
  277. l.mtx.Lock()
  278. defer l.mtx.Unlock()
  279. prev := e.Prev()
  280. next := e.Next()
  281. if l.head == nil || l.tail == nil {
  282. panic("Remove(e) on empty CList")
  283. }
  284. if prev == nil && l.head != e {
  285. panic("Remove(e) with false head")
  286. }
  287. if next == nil && l.tail != e {
  288. panic("Remove(e) with false tail")
  289. }
  290. // If we're removing the only item, make CList FrontWait/BackWait wait.
  291. if l.len == 1 {
  292. l.wg = waitGroup1() // WaitGroups are difficult to re-use.
  293. l.waitCh = make(chan struct{})
  294. }
  295. // Update l.len
  296. l.len -= 1
  297. // Connect next/prev and set head/tail
  298. if prev == nil {
  299. l.head = next
  300. } else {
  301. prev.SetNext(next)
  302. }
  303. if next == nil {
  304. l.tail = prev
  305. } else {
  306. next.SetPrev(prev)
  307. }
  308. // Set .Done() on e, otherwise waiters will wait forever.
  309. e.SetRemoved()
  310. return e.Value
  311. }
  312. func waitGroup1() (wg *sync.WaitGroup) {
  313. wg = &sync.WaitGroup{}
  314. wg.Add(1)
  315. return
  316. }