You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

386 lines
8.0 KiB

  1. package clist
  2. /*
  3. The purpose of CList is to provide a goroutine-safe linked-list.
  4. This list can be traversed concurrently by any number of goroutines.
  5. However, removed CElements cannot be added back.
  6. NOTE: Not all methods of container/list are (yet) implemented.
  7. NOTE: Removed elements need to DetachPrev or DetachNext consistently
  8. to ensure garbage collection of removed elements.
  9. */
  10. import (
  11. "sync"
  12. )
  /*
  CElement is one node of a CList. Walking the list from a CElement is
  goroutine-safe.

  Why WaitGroups (plus a retry loop in the *Wait methods)?

    - A WaitGroup releases every waiting goroutine at once, which is the
      behaviour the blocking traversal needs. A Mutex cannot do that.
    - An RWMutex can, but it is awkward to use as a one-shot latch, and one
      would be needed per direction (prev and next), which is doubly
      confusing.
    - sync.Cond is meant for serialised access to the condition. Here the
      condition is just `next != nil || removed`, a plain read, so there is
      no reason to serialise the goroutines blocked in NextWait().

  Avoiding WaitGroups and for-loops altogether would mean re-implementing
  primitives that golang/sync already provides.
  */
  type CElement struct {
  	// mtx guards the unexported fields below.
  	mtx sync.RWMutex

  	// Backward link, the wait group PrevWait blocks on, and the channel
  	// handed out by PrevWaitChan.
  	prev       *CElement
  	prevWg     *sync.WaitGroup
  	prevWaitCh chan struct{}

  	// Forward link, the wait group NextWait blocks on, and the channel
  	// handed out by NextWaitChan.
  	next       *CElement
  	nextWg     *sync.WaitGroup
  	nextWaitCh chan struct{}

  	// removed is set by SetRemoved and is never cleared by the code in
  	// this file.
  	removed bool

  	Value interface{} // immutable
  }
  40. // Blocking implementation of Next().
  41. // May return nil iff CElement was tail and got removed.
  42. func (e *CElement) NextWait() *CElement {
  43. for {
  44. e.mtx.RLock()
  45. next := e.next
  46. nextWg := e.nextWg
  47. removed := e.removed
  48. e.mtx.RUnlock()
  49. if next != nil || removed {
  50. return next
  51. }
  52. nextWg.Wait()
  53. // e.next doesn't necessarily exist here.
  54. // That's why we need to continue a for-loop.
  55. }
  56. }
  57. // Blocking implementation of Prev().
  58. // May return nil iff CElement was head and got removed.
  59. func (e *CElement) PrevWait() *CElement {
  60. for {
  61. e.mtx.RLock()
  62. prev := e.prev
  63. prevWg := e.prevWg
  64. removed := e.removed
  65. e.mtx.RUnlock()
  66. if prev != nil || removed {
  67. return prev
  68. }
  69. prevWg.Wait()
  70. }
  71. }
  72. // PrevWaitChan can be used to wait until Prev becomes not nil. Once it does,
  73. // channel will be closed.
  74. func (e *CElement) PrevWaitChan() <-chan struct{} {
  75. e.mtx.RLock()
  76. defer e.mtx.RUnlock()
  77. return e.prevWaitCh
  78. }
  79. // NextWaitChan can be used to wait until Next becomes not nil. Once it does,
  80. // channel will be closed.
  81. func (e *CElement) NextWaitChan() <-chan struct{} {
  82. e.mtx.RLock()
  83. defer e.mtx.RUnlock()
  84. return e.nextWaitCh
  85. }
  86. // Nonblocking, may return nil if at the end.
  87. func (e *CElement) Next() *CElement {
  88. e.mtx.RLock()
  89. defer e.mtx.RUnlock()
  90. return e.next
  91. }
  92. // Nonblocking, may return nil if at the end.
  93. func (e *CElement) Prev() *CElement {
  94. e.mtx.RLock()
  95. prev := e.prev
  96. e.mtx.RUnlock()
  97. return prev
  98. }
  99. func (e *CElement) Removed() bool {
  100. e.mtx.RLock()
  101. isRemoved := e.removed
  102. e.mtx.RUnlock()
  103. return isRemoved
  104. }
  105. func (e *CElement) DetachNext() {
  106. e.mtx.Lock()
  107. if !e.removed {
  108. e.mtx.Unlock()
  109. panic("DetachNext() must be called after Remove(e)")
  110. }
  111. e.next = nil
  112. e.mtx.Unlock()
  113. }
  114. func (e *CElement) DetachPrev() {
  115. e.mtx.Lock()
  116. if !e.removed {
  117. e.mtx.Unlock()
  118. panic("DetachPrev() must be called after Remove(e)")
  119. }
  120. e.prev = nil
  121. e.mtx.Unlock()
  122. }
  123. // NOTE: This function needs to be safe for
  124. // concurrent goroutines waiting on nextWg.
  125. func (e *CElement) SetNext(newNext *CElement) {
  126. e.mtx.Lock()
  127. oldNext := e.next
  128. e.next = newNext
  129. if oldNext != nil && newNext == nil {
  130. // See https://golang.org/pkg/sync/:
  131. //
  132. // If a WaitGroup is reused to wait for several independent sets of
  133. // events, new Add calls must happen after all previous Wait calls have
  134. // returned.
  135. e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
  136. e.nextWaitCh = make(chan struct{})
  137. }
  138. if oldNext == nil && newNext != nil {
  139. e.nextWg.Done()
  140. close(e.nextWaitCh)
  141. }
  142. e.mtx.Unlock()
  143. }
  144. // NOTE: This function needs to be safe for
  145. // concurrent goroutines waiting on prevWg
  146. func (e *CElement) SetPrev(newPrev *CElement) {
  147. e.mtx.Lock()
  148. oldPrev := e.prev
  149. e.prev = newPrev
  150. if oldPrev != nil && newPrev == nil {
  151. e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
  152. e.prevWaitCh = make(chan struct{})
  153. }
  154. if oldPrev == nil && newPrev != nil {
  155. e.prevWg.Done()
  156. close(e.prevWaitCh)
  157. }
  158. e.mtx.Unlock()
  159. }
  160. func (e *CElement) SetRemoved() {
  161. e.mtx.Lock()
  162. e.removed = true
  163. // This wakes up anyone waiting in either direction.
  164. if e.prev == nil {
  165. e.prevWg.Done()
  166. close(e.prevWaitCh)
  167. }
  168. if e.next == nil {
  169. e.nextWg.Done()
  170. close(e.nextWaitCh)
  171. }
  172. e.mtx.Unlock()
  173. }
  174. //--------------------------------------------------------------------------------
  175. // CList represents a linked list.
  176. // The zero value for CList is an empty list ready to use.
  177. // Operations are goroutine-safe.
  178. type CList struct {
  179. mtx sync.RWMutex
  180. wg *sync.WaitGroup
  181. waitCh chan struct{}
  182. head *CElement // first element
  183. tail *CElement // last element
  184. len int // list length
  185. }
  186. func (l *CList) Init() *CList {
  187. l.mtx.Lock()
  188. l.wg = waitGroup1()
  189. l.waitCh = make(chan struct{})
  190. l.head = nil
  191. l.tail = nil
  192. l.len = 0
  193. l.mtx.Unlock()
  194. return l
  195. }
  196. func New() *CList { return new(CList).Init() }
  197. func (l *CList) Len() int {
  198. l.mtx.RLock()
  199. len := l.len
  200. l.mtx.RUnlock()
  201. return len
  202. }
  203. func (l *CList) Front() *CElement {
  204. l.mtx.RLock()
  205. head := l.head
  206. l.mtx.RUnlock()
  207. return head
  208. }
  209. func (l *CList) FrontWait() *CElement {
  210. // Loop until the head is non-nil else wait and try again
  211. for {
  212. l.mtx.RLock()
  213. head := l.head
  214. wg := l.wg
  215. l.mtx.RUnlock()
  216. if head != nil {
  217. return head
  218. }
  219. wg.Wait()
  220. // NOTE: If you think l.head exists here, think harder.
  221. }
  222. }
  223. func (l *CList) Back() *CElement {
  224. l.mtx.RLock()
  225. back := l.tail
  226. l.mtx.RUnlock()
  227. return back
  228. }
  229. func (l *CList) BackWait() *CElement {
  230. for {
  231. l.mtx.RLock()
  232. tail := l.tail
  233. wg := l.wg
  234. l.mtx.RUnlock()
  235. if tail != nil {
  236. return tail
  237. }
  238. wg.Wait()
  239. // l.tail doesn't necessarily exist here.
  240. // That's why we need to continue a for-loop.
  241. }
  242. }
  243. // WaitChan can be used to wait until Front or Back becomes not nil. Once it
  244. // does, channel will be closed.
  245. func (l *CList) WaitChan() <-chan struct{} {
  246. l.mtx.Lock()
  247. defer l.mtx.Unlock()
  248. return l.waitCh
  249. }
  250. func (l *CList) PushBack(v interface{}) *CElement {
  251. l.mtx.Lock()
  252. // Construct a new element
  253. e := &CElement{
  254. prev: nil,
  255. prevWg: waitGroup1(),
  256. prevWaitCh: make(chan struct{}),
  257. next: nil,
  258. nextWg: waitGroup1(),
  259. nextWaitCh: make(chan struct{}),
  260. removed: false,
  261. Value: v,
  262. }
  263. // Release waiters on FrontWait/BackWait maybe
  264. if l.len == 0 {
  265. l.wg.Done()
  266. close(l.waitCh)
  267. }
  268. l.len++
  269. // Modify the tail
  270. if l.tail == nil {
  271. l.head = e
  272. l.tail = e
  273. } else {
  274. e.SetPrev(l.tail) // We must init e first.
  275. l.tail.SetNext(e) // This will make e accessible.
  276. l.tail = e // Update the list.
  277. }
  278. l.mtx.Unlock()
  279. return e
  280. }
  281. // CONTRACT: Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks.
  282. // NOTE: As per the contract of CList, removed elements cannot be added back.
  283. func (l *CList) Remove(e *CElement) interface{} {
  284. l.mtx.Lock()
  285. prev := e.Prev()
  286. next := e.Next()
  287. if l.head == nil || l.tail == nil {
  288. l.mtx.Unlock()
  289. panic("Remove(e) on empty CList")
  290. }
  291. if prev == nil && l.head != e {
  292. l.mtx.Unlock()
  293. panic("Remove(e) with false head")
  294. }
  295. if next == nil && l.tail != e {
  296. l.mtx.Unlock()
  297. panic("Remove(e) with false tail")
  298. }
  299. // If we're removing the only item, make CList FrontWait/BackWait wait.
  300. if l.len == 1 {
  301. l.wg = waitGroup1() // WaitGroups are difficult to re-use.
  302. l.waitCh = make(chan struct{})
  303. }
  304. // Update l.len
  305. l.len--
  306. // Connect next/prev and set head/tail
  307. if prev == nil {
  308. l.head = next
  309. } else {
  310. prev.SetNext(next)
  311. }
  312. if next == nil {
  313. l.tail = prev
  314. } else {
  315. next.SetPrev(prev)
  316. }
  317. // Set .Done() on e, otherwise waiters will wait forever.
  318. e.SetRemoved()
  319. l.mtx.Unlock()
  320. return e.Value
  321. }
  // waitGroup1 returns a new WaitGroup whose counter is already one, so that
  // Wait blocks until a single Done call.
  func waitGroup1() (wg *sync.WaitGroup) {
  	wg = new(sync.WaitGroup)
  	wg.Add(1)
  	return wg
  }