You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

407 lines
8.7 KiB

  1. package clist
  2. /*
  3. The purpose of CList is to provide a goroutine-safe linked-list.
  4. This list can be traversed concurrently by any number of goroutines.
  5. However, removed CElements cannot be added back.
  6. NOTE: Not all methods of container/list are (yet) implemented.
  7. NOTE: Removed elements need to DetachPrev or DetachNext consistently
  8. to ensure garbage collection of removed elements.
  9. */
  10. import (
  11. "fmt"
  12. "sync"
  13. )
  14. // MaxLength is the max allowed number of elements a linked list is
  15. // allowed to contain.
  16. // If more elements are pushed to the list it will panic.
  17. const MaxLength = int(^uint(0) >> 1)
  18. /*
  19. CElement is an element of a linked-list
  20. Traversal from a CElement is goroutine-safe.
  21. We can't avoid using WaitGroups or for-loops given the documentation
  22. spec without re-implementing the primitives that already exist in
  23. golang/sync. Notice that WaitGroup allows many go-routines to be
  24. simultaneously released, which is what we want. Mutex doesn't do
  25. this. RWMutex does this, but it's clumsy to use in the way that a
  26. WaitGroup would be used -- and we'd end up having two RWMutex's for
  27. prev/next each, which is doubly confusing.
  28. sync.Cond would be sort-of useful, but we don't need a write-lock in
  29. the for-loop. Use sync.Cond when you need serial access to the
  30. "condition". In our case our condition is if `next != nil || removed`,
  31. and there's no reason to serialize that condition for goroutines
  32. waiting on NextWait() (since it's just a read operation).
  33. */
  34. type CElement struct {
  35. mtx sync.RWMutex
  36. prev *CElement
  37. prevWg *sync.WaitGroup
  38. prevWaitCh chan struct{}
  39. next *CElement
  40. nextWg *sync.WaitGroup
  41. nextWaitCh chan struct{}
  42. removed bool
  43. Value interface{} // immutable
  44. }
  45. // Blocking implementation of Next().
  46. // May return nil iff CElement was tail and got removed.
  47. func (e *CElement) NextWait() *CElement {
  48. for {
  49. e.mtx.RLock()
  50. next := e.next
  51. nextWg := e.nextWg
  52. removed := e.removed
  53. e.mtx.RUnlock()
  54. if next != nil || removed {
  55. return next
  56. }
  57. nextWg.Wait()
  58. // e.next doesn't necessarily exist here.
  59. // That's why we need to continue a for-loop.
  60. }
  61. }
  62. // Blocking implementation of Prev().
  63. // May return nil iff CElement was head and got removed.
  64. func (e *CElement) PrevWait() *CElement {
  65. for {
  66. e.mtx.RLock()
  67. prev := e.prev
  68. prevWg := e.prevWg
  69. removed := e.removed
  70. e.mtx.RUnlock()
  71. if prev != nil || removed {
  72. return prev
  73. }
  74. prevWg.Wait()
  75. }
  76. }
  77. // PrevWaitChan can be used to wait until Prev becomes not nil. Once it does,
  78. // channel will be closed.
  79. func (e *CElement) PrevWaitChan() <-chan struct{} {
  80. e.mtx.RLock()
  81. defer e.mtx.RUnlock()
  82. return e.prevWaitCh
  83. }
  84. // NextWaitChan can be used to wait until Next becomes not nil. Once it does,
  85. // channel will be closed.
  86. func (e *CElement) NextWaitChan() <-chan struct{} {
  87. e.mtx.RLock()
  88. defer e.mtx.RUnlock()
  89. return e.nextWaitCh
  90. }
  91. // Nonblocking, may return nil if at the end.
  92. func (e *CElement) Next() *CElement {
  93. e.mtx.RLock()
  94. defer e.mtx.RUnlock()
  95. return e.next
  96. }
  97. // Nonblocking, may return nil if at the end.
  98. func (e *CElement) Prev() *CElement {
  99. e.mtx.RLock()
  100. prev := e.prev
  101. e.mtx.RUnlock()
  102. return prev
  103. }
  104. func (e *CElement) Removed() bool {
  105. e.mtx.RLock()
  106. isRemoved := e.removed
  107. e.mtx.RUnlock()
  108. return isRemoved
  109. }
  110. func (e *CElement) DetachNext() {
  111. e.mtx.Lock()
  112. if !e.removed {
  113. e.mtx.Unlock()
  114. panic("DetachNext() must be called after Remove(e)")
  115. }
  116. e.next = nil
  117. e.mtx.Unlock()
  118. }
  119. func (e *CElement) DetachPrev() {
  120. e.mtx.Lock()
  121. if !e.removed {
  122. e.mtx.Unlock()
  123. panic("DetachPrev() must be called after Remove(e)")
  124. }
  125. e.prev = nil
  126. e.mtx.Unlock()
  127. }
  128. // NOTE: This function needs to be safe for
  129. // concurrent goroutines waiting on nextWg.
  130. func (e *CElement) SetNext(newNext *CElement) {
  131. e.mtx.Lock()
  132. oldNext := e.next
  133. e.next = newNext
  134. if oldNext != nil && newNext == nil {
  135. // See https://golang.org/pkg/sync/:
  136. //
  137. // If a WaitGroup is reused to wait for several independent sets of
  138. // events, new Add calls must happen after all previous Wait calls have
  139. // returned.
  140. e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
  141. e.nextWaitCh = make(chan struct{})
  142. }
  143. if oldNext == nil && newNext != nil {
  144. e.nextWg.Done()
  145. close(e.nextWaitCh)
  146. }
  147. e.mtx.Unlock()
  148. }
  149. // NOTE: This function needs to be safe for
  150. // concurrent goroutines waiting on prevWg
  151. func (e *CElement) SetPrev(newPrev *CElement) {
  152. e.mtx.Lock()
  153. oldPrev := e.prev
  154. e.prev = newPrev
  155. if oldPrev != nil && newPrev == nil {
  156. e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
  157. e.prevWaitCh = make(chan struct{})
  158. }
  159. if oldPrev == nil && newPrev != nil {
  160. e.prevWg.Done()
  161. close(e.prevWaitCh)
  162. }
  163. e.mtx.Unlock()
  164. }
  165. func (e *CElement) SetRemoved() {
  166. e.mtx.Lock()
  167. e.removed = true
  168. // This wakes up anyone waiting in either direction.
  169. if e.prev == nil {
  170. e.prevWg.Done()
  171. close(e.prevWaitCh)
  172. }
  173. if e.next == nil {
  174. e.nextWg.Done()
  175. close(e.nextWaitCh)
  176. }
  177. e.mtx.Unlock()
  178. }
  179. //--------------------------------------------------------------------------------
  180. // CList represents a linked list.
  181. // The zero value for CList is an empty list ready to use.
  182. // Operations are goroutine-safe.
  183. // Panics if length grows beyond the max.
  184. type CList struct {
  185. mtx sync.RWMutex
  186. wg *sync.WaitGroup
  187. waitCh chan struct{}
  188. head *CElement // first element
  189. tail *CElement // last element
  190. len int // list length
  191. maxLen int // max list length
  192. }
  193. func (l *CList) Init() *CList {
  194. l.mtx.Lock()
  195. l.wg = waitGroup1()
  196. l.waitCh = make(chan struct{})
  197. l.head = nil
  198. l.tail = nil
  199. l.len = 0
  200. l.mtx.Unlock()
  201. return l
  202. }
  203. // Return CList with MaxLength. CList will panic if it goes beyond MaxLength.
  204. func New() *CList { return newWithMax(MaxLength) }
  205. // Return CList with given maxLength.
  206. // Will panic if list exceeds given maxLength.
  207. func newWithMax(maxLength int) *CList {
  208. l := new(CList)
  209. l.maxLen = maxLength
  210. return l.Init()
  211. }
  212. func (l *CList) Len() int {
  213. l.mtx.RLock()
  214. len := l.len
  215. l.mtx.RUnlock()
  216. return len
  217. }
  218. func (l *CList) Front() *CElement {
  219. l.mtx.RLock()
  220. head := l.head
  221. l.mtx.RUnlock()
  222. return head
  223. }
  224. func (l *CList) FrontWait() *CElement {
  225. // Loop until the head is non-nil else wait and try again
  226. for {
  227. l.mtx.RLock()
  228. head := l.head
  229. wg := l.wg
  230. l.mtx.RUnlock()
  231. if head != nil {
  232. return head
  233. }
  234. wg.Wait()
  235. // NOTE: If you think l.head exists here, think harder.
  236. }
  237. }
  238. func (l *CList) Back() *CElement {
  239. l.mtx.RLock()
  240. back := l.tail
  241. l.mtx.RUnlock()
  242. return back
  243. }
  244. func (l *CList) BackWait() *CElement {
  245. for {
  246. l.mtx.RLock()
  247. tail := l.tail
  248. wg := l.wg
  249. l.mtx.RUnlock()
  250. if tail != nil {
  251. return tail
  252. }
  253. wg.Wait()
  254. // l.tail doesn't necessarily exist here.
  255. // That's why we need to continue a for-loop.
  256. }
  257. }
  258. // WaitChan can be used to wait until Front or Back becomes not nil. Once it
  259. // does, channel will be closed.
  260. func (l *CList) WaitChan() <-chan struct{} {
  261. l.mtx.Lock()
  262. defer l.mtx.Unlock()
  263. return l.waitCh
  264. }
  265. // Panics if list grows beyond its max length.
  266. func (l *CList) PushBack(v interface{}) *CElement {
  267. l.mtx.Lock()
  268. // Construct a new element
  269. e := &CElement{
  270. prev: nil,
  271. prevWg: waitGroup1(),
  272. prevWaitCh: make(chan struct{}),
  273. next: nil,
  274. nextWg: waitGroup1(),
  275. nextWaitCh: make(chan struct{}),
  276. removed: false,
  277. Value: v,
  278. }
  279. // Release waiters on FrontWait/BackWait maybe
  280. if l.len == 0 {
  281. l.wg.Done()
  282. close(l.waitCh)
  283. }
  284. if l.len >= l.maxLen {
  285. panic(fmt.Sprintf("clist: maximum length list reached %d", l.maxLen))
  286. }
  287. l.len++
  288. // Modify the tail
  289. if l.tail == nil {
  290. l.head = e
  291. l.tail = e
  292. } else {
  293. e.SetPrev(l.tail) // We must init e first.
  294. l.tail.SetNext(e) // This will make e accessible.
  295. l.tail = e // Update the list.
  296. }
  297. l.mtx.Unlock()
  298. return e
  299. }
  300. // CONTRACT: Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks.
  301. // NOTE: As per the contract of CList, removed elements cannot be added back.
  302. func (l *CList) Remove(e *CElement) interface{} {
  303. l.mtx.Lock()
  304. prev := e.Prev()
  305. next := e.Next()
  306. if l.head == nil || l.tail == nil {
  307. l.mtx.Unlock()
  308. panic("Remove(e) on empty CList")
  309. }
  310. if prev == nil && l.head != e {
  311. l.mtx.Unlock()
  312. panic("Remove(e) with false head")
  313. }
  314. if next == nil && l.tail != e {
  315. l.mtx.Unlock()
  316. panic("Remove(e) with false tail")
  317. }
  318. // If we're removing the only item, make CList FrontWait/BackWait wait.
  319. if l.len == 1 {
  320. l.wg = waitGroup1() // WaitGroups are difficult to re-use.
  321. l.waitCh = make(chan struct{})
  322. }
  323. // Update l.len
  324. l.len--
  325. // Connect next/prev and set head/tail
  326. if prev == nil {
  327. l.head = next
  328. } else {
  329. prev.SetNext(next)
  330. }
  331. if next == nil {
  332. l.tail = prev
  333. } else {
  334. next.SetPrev(prev)
  335. }
  336. // Set .Done() on e, otherwise waiters will wait forever.
  337. e.SetRemoved()
  338. l.mtx.Unlock()
  339. return e.Value
  340. }
  341. func waitGroup1() (wg *sync.WaitGroup) {
  342. wg = &sync.WaitGroup{}
  343. wg.Add(1)
  344. return
  345. }