You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

407 lines
8.7 KiB

  1. package clist
  2. /*
  3. The purpose of CList is to provide a goroutine-safe linked-list.
  4. This list can be traversed concurrently by any number of goroutines.
  5. However, removed CElements cannot be added back.
  6. NOTE: Not all methods of container/list are (yet) implemented.
  7. NOTE: Removed elements need to DetachPrev or DetachNext consistently
  8. to ensure garbage collection of removed elements.
  9. */
  10. import (
  11. "fmt"
  12. "sync"
  13. )
  14. // MaxLength is the max allowed number of elements a linked list is
  15. // allowed to contain.
  16. // If more elements are pushed to the list it will panic.
  17. const MaxLength = int(^uint(0) >> 1)
  18. /*
  19. CElement is an element of a linked-list
  20. Traversal from a CElement is goroutine-safe.
  21. We can't avoid using WaitGroups or for-loops given the documentation
  22. spec without re-implementing the primitives that already exist in
  23. golang/sync. Notice that WaitGroup allows many go-routines to be
  24. simultaneously released, which is what we want. Mutex doesn't do
  25. this. RWMutex does this, but it's clumsy to use in the way that a
  26. WaitGroup would be used -- and we'd end up having two RWMutex's for
  27. prev/next each, which is doubly confusing.
  28. sync.Cond would be sort-of useful, but we don't need a write-lock in
  29. the for-loop. Use sync.Cond when you need serial access to the
  30. "condition". In our case our condition is if `next != nil || removed`,
  31. and there's no reason to serialize that condition for goroutines
  32. waiting on NextWait() (since it's just a read operation).
  33. */
  34. type CElement struct {
  35. mtx sync.RWMutex
  36. prev *CElement
  37. prevWg *sync.WaitGroup
  38. prevWaitCh chan struct{}
  39. next *CElement
  40. nextWg *sync.WaitGroup
  41. nextWaitCh chan struct{}
  42. removed bool
  43. Value interface{} // immutable
  44. }
  45. // Blocking implementation of Next().
  46. // May return nil iff CElement was tail and got removed.
  47. func (e *CElement) NextWait() *CElement {
  48. for {
  49. e.mtx.RLock()
  50. next := e.next
  51. nextWg := e.nextWg
  52. removed := e.removed
  53. e.mtx.RUnlock()
  54. if next != nil || removed {
  55. return next
  56. }
  57. nextWg.Wait()
  58. // e.next doesn't necessarily exist here.
  59. // That's why we need to continue a for-loop.
  60. }
  61. }
  62. // Blocking implementation of Prev().
  63. // May return nil iff CElement was head and got removed.
  64. func (e *CElement) PrevWait() *CElement {
  65. for {
  66. e.mtx.RLock()
  67. prev := e.prev
  68. prevWg := e.prevWg
  69. removed := e.removed
  70. e.mtx.RUnlock()
  71. if prev != nil || removed {
  72. return prev
  73. }
  74. prevWg.Wait()
  75. }
  76. }
  77. // PrevWaitChan can be used to wait until Prev becomes not nil. Once it does,
  78. // channel will be closed.
  79. func (e *CElement) PrevWaitChan() <-chan struct{} {
  80. e.mtx.RLock()
  81. defer e.mtx.RUnlock()
  82. return e.prevWaitCh
  83. }
  84. // NextWaitChan can be used to wait until Next becomes not nil. Once it does,
  85. // channel will be closed.
  86. func (e *CElement) NextWaitChan() <-chan struct{} {
  87. e.mtx.RLock()
  88. defer e.mtx.RUnlock()
  89. return e.nextWaitCh
  90. }
  91. // Nonblocking, may return nil if at the end.
  92. func (e *CElement) Next() *CElement {
  93. e.mtx.RLock()
  94. val := e.next
  95. e.mtx.RUnlock()
  96. return val
  97. }
  98. // Nonblocking, may return nil if at the end.
  99. func (e *CElement) Prev() *CElement {
  100. e.mtx.RLock()
  101. prev := e.prev
  102. e.mtx.RUnlock()
  103. return prev
  104. }
  105. func (e *CElement) Removed() bool {
  106. e.mtx.RLock()
  107. isRemoved := e.removed
  108. e.mtx.RUnlock()
  109. return isRemoved
  110. }
  111. func (e *CElement) DetachNext() {
  112. e.mtx.Lock()
  113. if !e.removed {
  114. e.mtx.Unlock()
  115. panic("DetachNext() must be called after Remove(e)")
  116. }
  117. e.next = nil
  118. e.mtx.Unlock()
  119. }
  120. func (e *CElement) DetachPrev() {
  121. e.mtx.Lock()
  122. if !e.removed {
  123. e.mtx.Unlock()
  124. panic("DetachPrev() must be called after Remove(e)")
  125. }
  126. e.prev = nil
  127. e.mtx.Unlock()
  128. }
  129. // NOTE: This function needs to be safe for
  130. // concurrent goroutines waiting on nextWg.
  131. func (e *CElement) SetNext(newNext *CElement) {
  132. e.mtx.Lock()
  133. oldNext := e.next
  134. e.next = newNext
  135. if oldNext != nil && newNext == nil {
  136. // See https://golang.org/pkg/sync/:
  137. //
  138. // If a WaitGroup is reused to wait for several independent sets of
  139. // events, new Add calls must happen after all previous Wait calls have
  140. // returned.
  141. e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
  142. e.nextWaitCh = make(chan struct{})
  143. }
  144. if oldNext == nil && newNext != nil {
  145. e.nextWg.Done()
  146. close(e.nextWaitCh)
  147. }
  148. e.mtx.Unlock()
  149. }
  150. // NOTE: This function needs to be safe for
  151. // concurrent goroutines waiting on prevWg
  152. func (e *CElement) SetPrev(newPrev *CElement) {
  153. e.mtx.Lock()
  154. oldPrev := e.prev
  155. e.prev = newPrev
  156. if oldPrev != nil && newPrev == nil {
  157. e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
  158. e.prevWaitCh = make(chan struct{})
  159. }
  160. if oldPrev == nil && newPrev != nil {
  161. e.prevWg.Done()
  162. close(e.prevWaitCh)
  163. }
  164. e.mtx.Unlock()
  165. }
  166. func (e *CElement) SetRemoved() {
  167. e.mtx.Lock()
  168. e.removed = true
  169. // This wakes up anyone waiting in either direction.
  170. if e.prev == nil {
  171. e.prevWg.Done()
  172. close(e.prevWaitCh)
  173. }
  174. if e.next == nil {
  175. e.nextWg.Done()
  176. close(e.nextWaitCh)
  177. }
  178. e.mtx.Unlock()
  179. }
  180. //--------------------------------------------------------------------------------
  181. // CList represents a linked list.
  182. // The zero value for CList is an empty list ready to use.
  183. // Operations are goroutine-safe.
  184. // Panics if length grows beyond the max.
  185. type CList struct {
  186. mtx sync.RWMutex
  187. wg *sync.WaitGroup
  188. waitCh chan struct{}
  189. head *CElement // first element
  190. tail *CElement // last element
  191. len int // list length
  192. maxLen int // max list length
  193. }
  194. func (l *CList) Init() *CList {
  195. l.mtx.Lock()
  196. l.wg = waitGroup1()
  197. l.waitCh = make(chan struct{})
  198. l.head = nil
  199. l.tail = nil
  200. l.len = 0
  201. l.mtx.Unlock()
  202. return l
  203. }
  204. // Return CList with MaxLength. CList will panic if it goes beyond MaxLength.
  205. func New() *CList { return newWithMax(MaxLength) }
  206. // Return CList with given maxLength.
  207. // Will panic if list exceeds given maxLength.
  208. func newWithMax(maxLength int) *CList {
  209. l := new(CList)
  210. l.maxLen = maxLength
  211. return l.Init()
  212. }
  213. func (l *CList) Len() int {
  214. l.mtx.RLock()
  215. len := l.len
  216. l.mtx.RUnlock()
  217. return len
  218. }
  219. func (l *CList) Front() *CElement {
  220. l.mtx.RLock()
  221. head := l.head
  222. l.mtx.RUnlock()
  223. return head
  224. }
  225. func (l *CList) FrontWait() *CElement {
  226. // Loop until the head is non-nil else wait and try again
  227. for {
  228. l.mtx.RLock()
  229. head := l.head
  230. wg := l.wg
  231. l.mtx.RUnlock()
  232. if head != nil {
  233. return head
  234. }
  235. wg.Wait()
  236. // NOTE: If you think l.head exists here, think harder.
  237. }
  238. }
  239. func (l *CList) Back() *CElement {
  240. l.mtx.RLock()
  241. back := l.tail
  242. l.mtx.RUnlock()
  243. return back
  244. }
  245. func (l *CList) BackWait() *CElement {
  246. for {
  247. l.mtx.RLock()
  248. tail := l.tail
  249. wg := l.wg
  250. l.mtx.RUnlock()
  251. if tail != nil {
  252. return tail
  253. }
  254. wg.Wait()
  255. // l.tail doesn't necessarily exist here.
  256. // That's why we need to continue a for-loop.
  257. }
  258. }
  259. // WaitChan can be used to wait until Front or Back becomes not nil. Once it
  260. // does, channel will be closed.
  261. func (l *CList) WaitChan() <-chan struct{} {
  262. l.mtx.Lock()
  263. defer l.mtx.Unlock()
  264. return l.waitCh
  265. }
  266. // Panics if list grows beyond its max length.
  267. func (l *CList) PushBack(v interface{}) *CElement {
  268. l.mtx.Lock()
  269. // Construct a new element
  270. e := &CElement{
  271. prev: nil,
  272. prevWg: waitGroup1(),
  273. prevWaitCh: make(chan struct{}),
  274. next: nil,
  275. nextWg: waitGroup1(),
  276. nextWaitCh: make(chan struct{}),
  277. removed: false,
  278. Value: v,
  279. }
  280. // Release waiters on FrontWait/BackWait maybe
  281. if l.len == 0 {
  282. l.wg.Done()
  283. close(l.waitCh)
  284. }
  285. if l.len >= l.maxLen {
  286. panic(fmt.Sprintf("clist: maximum length list reached %d", l.maxLen))
  287. }
  288. l.len++
  289. // Modify the tail
  290. if l.tail == nil {
  291. l.head = e
  292. l.tail = e
  293. } else {
  294. e.SetPrev(l.tail) // We must init e first.
  295. l.tail.SetNext(e) // This will make e accessible.
  296. l.tail = e // Update the list.
  297. }
  298. l.mtx.Unlock()
  299. return e
  300. }
  301. // CONTRACT: Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks.
  302. // NOTE: As per the contract of CList, removed elements cannot be added back.
  303. func (l *CList) Remove(e *CElement) interface{} {
  304. l.mtx.Lock()
  305. prev := e.Prev()
  306. next := e.Next()
  307. if l.head == nil || l.tail == nil {
  308. l.mtx.Unlock()
  309. panic("Remove(e) on empty CList")
  310. }
  311. if prev == nil && l.head != e {
  312. l.mtx.Unlock()
  313. panic("Remove(e) with false head")
  314. }
  315. if next == nil && l.tail != e {
  316. l.mtx.Unlock()
  317. panic("Remove(e) with false tail")
  318. }
  319. // If we're removing the only item, make CList FrontWait/BackWait wait.
  320. if l.len == 1 {
  321. l.wg = waitGroup1() // WaitGroups are difficult to re-use.
  322. l.waitCh = make(chan struct{})
  323. }
  324. // Update l.len
  325. l.len--
  326. // Connect next/prev and set head/tail
  327. if prev == nil {
  328. l.head = next
  329. } else {
  330. prev.SetNext(next)
  331. }
  332. if next == nil {
  333. l.tail = prev
  334. } else {
  335. next.SetPrev(prev)
  336. }
  337. // Set .Done() on e, otherwise waiters will wait forever.
  338. e.SetRemoved()
  339. l.mtx.Unlock()
  340. return e.Value
  341. }
  342. func waitGroup1() (wg *sync.WaitGroup) {
  343. wg = &sync.WaitGroup{}
  344. wg.Add(1)
  345. return
  346. }