You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
6.9 KiB

  1. package clist
  2. /*
  3. The purpose of CList is to provide a goroutine-safe linked-list.
  4. This list can be traversed concurrently by any number of goroutines.
  5. However, removed CElements cannot be added back.
  6. NOTE: Not all methods of container/list are (yet) implemented.
  7. NOTE: Removed elements need to DetachPrev or DetachNext consistently
  8. to ensure garbage collection of removed elements.
  9. */
  10. import (
  11. "sync"
  12. )
  13. /*
  14. CElement is an element of a linked-list
  15. Traversal from a CElement is goroutine-safe.
  16. We can't avoid using WaitGroups or for-loops given the documentation
  17. spec without re-implementing the primitives that already exist in
  18. golang/sync. Notice that WaitGroup allows many goroutines to be
  19. simultaneously released, which is what we want. Mutex doesn't do
  20. this. RWMutex does this, but it's clumsy to use in the way that a
  21. WaitGroup would be used -- and we'd end up having two RWMutex's for
  22. prev/next each, which is doubly confusing.
  23. sync.Cond would be sort-of useful, but we don't need a write-lock in
  24. the for-loop. Use sync.Cond when you need serial access to the
  25. "condition". In our case our condition is if `next != nil || removed`,
  26. and there's no reason to serialize that condition for goroutines
  27. waiting on NextWait() (since it's just a read operation).
  28. */
  type CElement struct {
  	// mtx guards every unexported field below; the methods of CElement
  	// take it before reading or writing them.
  	mtx sync.RWMutex

  	// Backward link and the WaitGroup that PrevWait blocks on while
  	// prev is nil.
  	prev   *CElement
  	prevWg *sync.WaitGroup

  	// Forward link and the WaitGroup that NextWait blocks on while
  	// next is nil.
  	next   *CElement
  	nextWg *sync.WaitGroup

  	// removed is set to true by SetRemoved.
  	removed bool

  	// Value is the payload carried by this element; treat it as immutable.
  	Value interface{}
  }
  38. // Blocking implementation of Next().
  39. // May return nil iff CElement was tail and got removed.
  40. func (e *CElement) NextWait() *CElement {
  41. for {
  42. e.mtx.RLock()
  43. next := e.next
  44. nextWg := e.nextWg
  45. removed := e.removed
  46. e.mtx.RUnlock()
  47. if next != nil || removed {
  48. return next
  49. }
  50. nextWg.Wait()
  51. // e.next doesn't necessarily exist here.
  52. // That's why we need to continue a for-loop.
  53. }
  54. }
  55. // Blocking implementation of Prev().
  56. // May return nil iff CElement was head and got removed.
  57. func (e *CElement) PrevWait() *CElement {
  58. for {
  59. e.mtx.RLock()
  60. prev := e.prev
  61. prevWg := e.prevWg
  62. removed := e.removed
  63. e.mtx.RUnlock()
  64. if prev != nil || removed {
  65. return prev
  66. }
  67. prevWg.Wait()
  68. }
  69. }
  70. // Nonblocking, may return nil if at the end.
  71. func (e *CElement) Next() *CElement {
  72. e.mtx.RLock()
  73. defer e.mtx.RUnlock()
  74. return e.next
  75. }
  76. // Nonblocking, may return nil if at the end.
  77. func (e *CElement) Prev() *CElement {
  78. e.mtx.RLock()
  79. defer e.mtx.RUnlock()
  80. return e.prev
  81. }
  82. func (e *CElement) Removed() bool {
  83. e.mtx.RLock()
  84. defer e.mtx.RUnlock()
  85. return e.removed
  86. }
  87. func (e *CElement) DetachNext() {
  88. if !e.Removed() {
  89. panic("DetachNext() must be called after Remove(e)")
  90. }
  91. e.mtx.Lock()
  92. defer e.mtx.Unlock()
  93. e.next = nil
  94. }
  95. func (e *CElement) DetachPrev() {
  96. if !e.Removed() {
  97. panic("DetachPrev() must be called after Remove(e)")
  98. }
  99. e.mtx.Lock()
  100. defer e.mtx.Unlock()
  101. e.prev = nil
  102. }
  103. // NOTE: This function needs to be safe for
  104. // concurrent goroutines waiting on nextWg.
  105. func (e *CElement) SetNext(newNext *CElement) {
  106. e.mtx.Lock()
  107. defer e.mtx.Unlock()
  108. oldNext := e.next
  109. e.next = newNext
  110. if oldNext != nil && newNext == nil {
  111. // See https://golang.org/pkg/sync/:
  112. //
  113. // If a WaitGroup is reused to wait for several independent sets of
  114. // events, new Add calls must happen after all previous Wait calls have
  115. // returned.
  116. e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
  117. }
  118. if oldNext == nil && newNext != nil {
  119. e.nextWg.Done()
  120. }
  121. }
  122. // NOTE: This function needs to be safe for
  123. // concurrent goroutines waiting on prevWg
  124. func (e *CElement) SetPrev(newPrev *CElement) {
  125. e.mtx.Lock()
  126. defer e.mtx.Unlock()
  127. oldPrev := e.prev
  128. e.prev = newPrev
  129. if oldPrev != nil && newPrev == nil {
  130. e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
  131. }
  132. if oldPrev == nil && newPrev != nil {
  133. e.prevWg.Done()
  134. }
  135. }
  136. func (e *CElement) SetRemoved() {
  137. e.mtx.Lock()
  138. defer e.mtx.Unlock()
  139. e.removed = true
  140. // This wakes up anyone waiting in either direction.
  141. if e.prev == nil {
  142. e.prevWg.Done()
  143. }
  144. if e.next == nil {
  145. e.nextWg.Done()
  146. }
  147. }
  148. //--------------------------------------------------------------------------------
  149. // CList represents a linked list.
  150. // The zero value for CList is an empty list ready to use.
  151. // Operations are goroutine-safe.
  152. type CList struct {
  153. mtx sync.RWMutex
  154. wg *sync.WaitGroup
  155. head *CElement // first element
  156. tail *CElement // last element
  157. len int // list length
  158. }
  159. func (l *CList) Init() *CList {
  160. l.mtx.Lock()
  161. defer l.mtx.Unlock()
  162. l.wg = waitGroup1()
  163. l.head = nil
  164. l.tail = nil
  165. l.len = 0
  166. return l
  167. }
  168. func New() *CList { return new(CList).Init() }
  169. func (l *CList) Len() int {
  170. l.mtx.RLock()
  171. defer l.mtx.RUnlock()
  172. return l.len
  173. }
  174. func (l *CList) Front() *CElement {
  175. l.mtx.RLock()
  176. defer l.mtx.RUnlock()
  177. return l.head
  178. }
  179. func (l *CList) FrontWait() *CElement {
  180. // Loop until the head is non-nil else wait and try again
  181. for {
  182. l.mtx.RLock()
  183. head := l.head
  184. wg := l.wg
  185. l.mtx.RUnlock()
  186. if head != nil {
  187. return head
  188. }
  189. wg.Wait()
  190. // NOTE: If you think l.head exists here, think harder.
  191. }
  192. }
  193. func (l *CList) Back() *CElement {
  194. l.mtx.RLock()
  195. defer l.mtx.RUnlock()
  196. return l.tail
  197. }
  198. func (l *CList) BackWait() *CElement {
  199. for {
  200. l.mtx.RLock()
  201. tail := l.tail
  202. wg := l.wg
  203. l.mtx.RUnlock()
  204. if tail != nil {
  205. return tail
  206. }
  207. wg.Wait()
  208. // l.tail doesn't necessarily exist here.
  209. // That's why we need to continue a for-loop.
  210. }
  211. }
  212. func (l *CList) PushBack(v interface{}) *CElement {
  213. l.mtx.Lock()
  214. defer l.mtx.Unlock()
  215. // Construct a new element
  216. e := &CElement{
  217. prev: nil,
  218. prevWg: waitGroup1(),
  219. next: nil,
  220. nextWg: waitGroup1(),
  221. removed: false,
  222. Value: v,
  223. }
  224. // Release waiters on FrontWait/BackWait maybe
  225. if l.len == 0 {
  226. l.wg.Done()
  227. }
  228. l.len += 1
  229. // Modify the tail
  230. if l.tail == nil {
  231. l.head = e
  232. l.tail = e
  233. } else {
  234. e.SetPrev(l.tail) // We must init e first.
  235. l.tail.SetNext(e) // This will make e accessible.
  236. l.tail = e // Update the list.
  237. }
  238. return e
  239. }
  240. // CONTRACT: Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks.
  241. // NOTE: As per the contract of CList, removed elements cannot be added back.
  242. func (l *CList) Remove(e *CElement) interface{} {
  243. l.mtx.Lock()
  244. defer l.mtx.Unlock()
  245. prev := e.Prev()
  246. next := e.Next()
  247. if l.head == nil || l.tail == nil {
  248. panic("Remove(e) on empty CList")
  249. }
  250. if prev == nil && l.head != e {
  251. panic("Remove(e) with false head")
  252. }
  253. if next == nil && l.tail != e {
  254. panic("Remove(e) with false tail")
  255. }
  256. // If we're removing the only item, make CList FrontWait/BackWait wait.
  257. if l.len == 1 {
  258. l.wg = waitGroup1() // WaitGroups are difficult to re-use.
  259. }
  260. // Update l.len
  261. l.len -= 1
  262. // Connect next/prev and set head/tail
  263. if prev == nil {
  264. l.head = next
  265. } else {
  266. prev.SetNext(next)
  267. }
  268. if next == nil {
  269. l.tail = prev
  270. } else {
  271. next.SetPrev(prev)
  272. }
  273. // Set .Done() on e, otherwise waiters will wait forever.
  274. e.SetRemoved()
  275. return e.Value
  276. }
  // waitGroup1 returns a new WaitGroup whose counter is already 1, so that
  // Wait blocks until a single Done call releases it.
  func waitGroup1() (wg *sync.WaitGroup) {
  	wg = new(sync.WaitGroup)
  	wg.Add(1)
  	return wg
  }