You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

279 lines
5.3 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package common
  2. /*
  3. The purpose of CList is to provide a goroutine-safe linked-list.
  4. This list can be traversed concurrently by any number of goroutines.
  5. However, removed CElements cannot be added back.
  6. NOTE: Not all methods of container/list are (yet) implemented.
  7. NOTE: Callers must consistently call DetachPrev or DetachNext on removed
  8. elements to ensure that removed elements can be garbage collected.
  9. */
  10. import (
  11. "sync"
  12. "sync/atomic"
  13. "unsafe"
  14. )
  // CElement is an element of a linked list (CList).
  // Traversal from a CElement is goroutine-safe: Next and Prev read the links
  // with atomic loads, and NextWait/PrevWait block on the matching WaitGroup.
  type CElement struct {
  	// next is the following element (a *CElement kept as unsafe.Pointer so it
  	// can be loaded and swapped atomically); nil when there is none.
  	next unsafe.Pointer
  	// nextWg starts at 1 and is released when a successor is linked, or when
  	// the element is removed without one; it is re-armed when the successor is
  	// cleared again. NextWait blocks on it.
  	nextWg *sync.WaitGroup
  	// prev is the preceding element, stored the same way as next.
  	prev unsafe.Pointer
  	// prevWg mirrors nextWg for the prev link; PrevWait blocks on it.
  	prevWg *sync.WaitGroup
  	// removed is non-zero once the element has been marked as removed.
  	removed uint32
  	// Value is the payload stored with the element.
  	Value interface{}
  }
  25. // Blocking implementation of Next().
  26. // May return nil iff CElement was tail and got removed.
  27. func (e *CElement) NextWait() *CElement {
  28. for {
  29. e.nextWg.Wait()
  30. next := e.Next()
  31. if next == nil {
  32. if e.Removed() {
  33. return nil
  34. } else {
  35. continue
  36. }
  37. } else {
  38. return next
  39. }
  40. }
  41. }
  42. // Blocking implementation of Prev().
  43. // May return nil iff CElement was head and got removed.
  44. func (e *CElement) PrevWait() *CElement {
  45. for {
  46. e.prevWg.Wait()
  47. prev := e.Prev()
  48. if prev == nil {
  49. if e.Removed() {
  50. return nil
  51. } else {
  52. continue
  53. }
  54. } else {
  55. return prev
  56. }
  57. }
  58. }
  59. // Nonblocking, may return nil if at the end.
  60. func (e *CElement) Next() *CElement {
  61. return (*CElement)(atomic.LoadPointer(&e.next))
  62. }
  63. // Nonblocking, may return nil if at the end.
  64. func (e *CElement) Prev() *CElement {
  65. return (*CElement)(atomic.LoadPointer(&e.prev))
  66. }
  67. func (e *CElement) Removed() bool {
  68. return atomic.LoadUint32(&(e.removed)) > 0
  69. }
  70. func (e *CElement) DetachNext() {
  71. e.setNextAtomic(nil)
  72. e.nextWg.Done()
  73. }
  74. func (e *CElement) DetachPrev() {
  75. e.setPrevAtomic(nil)
  76. e.prevWg.Done()
  77. }
  78. func (e *CElement) setNextAtomic(next *CElement) {
  79. for {
  80. oldNext := atomic.LoadPointer(&e.next)
  81. if !atomic.CompareAndSwapPointer(&(e.next), oldNext, unsafe.Pointer(next)) {
  82. continue
  83. }
  84. if next == nil && oldNext != nil { // We for-loop in NextWait() so race is ok
  85. e.nextWg.Add(1)
  86. }
  87. if next != nil && oldNext == nil {
  88. e.nextWg.Done()
  89. }
  90. return
  91. }
  92. }
  93. func (e *CElement) setPrevAtomic(prev *CElement) {
  94. for {
  95. oldPrev := atomic.LoadPointer(&e.prev)
  96. if !atomic.CompareAndSwapPointer(&(e.prev), oldPrev, unsafe.Pointer(prev)) {
  97. continue
  98. }
  99. if prev == nil && oldPrev != nil { // We for-loop in PrevWait() so race is ok
  100. e.prevWg.Add(1)
  101. }
  102. if prev != nil && oldPrev == nil {
  103. e.prevWg.Done()
  104. }
  105. return
  106. }
  107. }
  108. func (e *CElement) setRemovedAtomic() {
  109. atomic.StoreUint32(&(e.removed), 1)
  110. }
  111. //--------------------------------------------------------------------------------
  // CList represents a linked list of CElements.
  // NOTE: the zero value is NOT ready to use: wg is nil until Init has run and
  // PushBack dereferences it. Create lists with NewCList, or call Init first.
  // Operations are goroutine-safe; every method takes mtx before touching the
  // fields.
  type CList struct {
  	mtx sync.Mutex // guards the fields below
  	wg *sync.WaitGroup // held at 1 while the list is empty; FrontWait/BackWait block on it
  	head *CElement // first element
  	tail *CElement // last element
  	len int // list length
  }
  122. func (l *CList) Init() *CList {
  123. l.mtx.Lock()
  124. defer l.mtx.Unlock()
  125. l.wg = waitGroup1()
  126. l.head = nil
  127. l.tail = nil
  128. l.len = 0
  129. return l
  130. }
  131. func NewCList() *CList { return new(CList).Init() }
  132. func (l *CList) Len() int {
  133. l.mtx.Lock()
  134. defer l.mtx.Unlock()
  135. return l.len
  136. }
  137. func (l *CList) Front() *CElement {
  138. l.mtx.Lock()
  139. defer l.mtx.Unlock()
  140. return l.head
  141. }
  142. func (l *CList) FrontWait() *CElement {
  143. for {
  144. l.mtx.Lock()
  145. head := l.head
  146. wg := l.wg
  147. l.mtx.Unlock()
  148. if head == nil {
  149. wg.Wait()
  150. } else {
  151. return head
  152. }
  153. }
  154. }
  155. func (l *CList) Back() *CElement {
  156. l.mtx.Lock()
  157. defer l.mtx.Unlock()
  158. return l.tail
  159. }
  160. func (l *CList) BackWait() *CElement {
  161. for {
  162. l.mtx.Lock()
  163. tail := l.tail
  164. wg := l.wg
  165. l.mtx.Unlock()
  166. if tail == nil {
  167. wg.Wait()
  168. } else {
  169. return tail
  170. }
  171. }
  172. }
  173. func (l *CList) PushBack(v interface{}) *CElement {
  174. l.mtx.Lock()
  175. defer l.mtx.Unlock()
  176. // Construct a new element
  177. e := &CElement{
  178. prev: nil,
  179. prevWg: waitGroup1(),
  180. next: nil,
  181. nextWg: waitGroup1(),
  182. Value: v,
  183. }
  184. // Release waiters on FrontWait/BackWait maybe
  185. if l.len == 0 {
  186. l.wg.Done()
  187. }
  188. l.len += 1
  189. // Modify the tail
  190. if l.tail == nil {
  191. l.head = e
  192. l.tail = e
  193. } else {
  194. l.tail.setNextAtomic(e)
  195. e.setPrevAtomic(l.tail)
  196. l.tail = e
  197. }
  198. return e
  199. }
  200. // CONTRACT: Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks.
  201. // NOTE: As per the contract of CList, removed elements cannot be added back.
  202. func (l *CList) Remove(e *CElement) interface{} {
  203. l.mtx.Lock()
  204. defer l.mtx.Unlock()
  205. prev := e.Prev()
  206. next := e.Next()
  207. if l.head == nil || l.tail == nil {
  208. PanicSanity("Remove(e) on empty CList")
  209. }
  210. if prev == nil && l.head != e {
  211. PanicSanity("Remove(e) with false head")
  212. }
  213. if next == nil && l.tail != e {
  214. PanicSanity("Remove(e) with false tail")
  215. }
  216. // If we're removing the only item, make CList FrontWait/BackWait wait.
  217. if l.len == 1 {
  218. l.wg.Add(1)
  219. }
  220. l.len -= 1
  221. // Modify e.prev and e.next and connect
  222. if prev != nil {
  223. prev.setNextAtomic(next)
  224. }
  225. if next != nil {
  226. next.setPrevAtomic(prev)
  227. }
  228. // Mark e as removed so NextWait/PrevWait can return.
  229. e.setRemovedAtomic()
  230. // Set .Done() on e, otherwise waiters will wait forever.
  231. if prev == nil {
  232. e.prevWg.Done()
  233. }
  234. if next == nil {
  235. e.nextWg.Done()
  236. }
  237. return e.Value
  238. }
  // waitGroup1 returns a new WaitGroup whose counter is already 1, so Wait
  // blocks until a single Done call.
  func waitGroup1() *sync.WaitGroup {
  	pending := new(sync.WaitGroup)
  	pending.Add(1)
  	return pending
  }