You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

404 lines
8.7 KiB

  1. package clist
  2. /*
  3. The purpose of CList is to provide a goroutine-safe linked-list.
  4. This list can be traversed concurrently by any number of goroutines.
  5. However, removed CElements cannot be added back.
  6. NOTE: Not all methods of container/list are (yet) implemented.
  7. NOTE: Removed elements need to DetachPrev or DetachNext consistently
  8. to ensure garbage collection of removed elements.
  9. */
  10. import (
  11. "fmt"
  12. "sync"
  13. tmsync "github.com/tendermint/tendermint/libs/sync"
  14. )
  15. // MaxLength is the max allowed number of elements a linked list is
  16. // allowed to contain.
  17. // If more elements are pushed to the list it will panic.
  18. const MaxLength = int(^uint(0) >> 1)
  19. /*
  20. CElement is an element of a linked-list
  21. Traversal from a CElement is goroutine-safe.
  22. We can't avoid using WaitGroups or for-loops given the documentation
  23. spec without re-implementing the primitives that already exist in
  24. golang/sync. Notice that WaitGroup allows many go-routines to be
  25. simultaneously released, which is what we want. Mutex doesn't do
  26. this. RWMutex does this, but it's clumsy to use in the way that a
  27. WaitGroup would be used -- and we'd end up having two RWMutex's for
  28. prev/next each, which is doubly confusing.
  29. sync.Cond would be sort-of useful, but we don't need a write-lock in
  30. the for-loop. Use sync.Cond when you need serial access to the
  31. "condition". In our case our condition is if `next != nil || removed`,
  32. and there's no reason to serialize that condition for goroutines
  33. waiting on NextWait() (since it's just a read operation).
  34. */
  35. type CElement struct {
  36. mtx tmsync.RWMutex
  37. prev *CElement
  38. prevWg *sync.WaitGroup
  39. prevWaitCh chan struct{}
  40. next *CElement
  41. nextWg *sync.WaitGroup
  42. nextWaitCh chan struct{}
  43. removed bool
  44. Value interface{} // immutable
  45. }
  46. // Blocking implementation of Next().
  47. // May return nil iff CElement was tail and got removed.
  48. func (e *CElement) NextWait() *CElement {
  49. for {
  50. e.mtx.RLock()
  51. next := e.next
  52. nextWg := e.nextWg
  53. removed := e.removed
  54. e.mtx.RUnlock()
  55. if next != nil || removed {
  56. return next
  57. }
  58. nextWg.Wait()
  59. // e.next doesn't necessarily exist here.
  60. // That's why we need to continue a for-loop.
  61. }
  62. }
  63. // Blocking implementation of Prev().
  64. // May return nil iff CElement was head and got removed.
  65. func (e *CElement) PrevWait() *CElement {
  66. for {
  67. e.mtx.RLock()
  68. prev := e.prev
  69. prevWg := e.prevWg
  70. removed := e.removed
  71. e.mtx.RUnlock()
  72. if prev != nil || removed {
  73. return prev
  74. }
  75. prevWg.Wait()
  76. }
  77. }
  78. // PrevWaitChan can be used to wait until Prev becomes not nil. Once it does,
  79. // channel will be closed.
  80. func (e *CElement) PrevWaitChan() <-chan struct{} {
  81. e.mtx.RLock()
  82. defer e.mtx.RUnlock()
  83. return e.prevWaitCh
  84. }
  85. // NextWaitChan can be used to wait until Next becomes not nil. Once it does,
  86. // channel will be closed.
  87. func (e *CElement) NextWaitChan() <-chan struct{} {
  88. e.mtx.RLock()
  89. defer e.mtx.RUnlock()
  90. return e.nextWaitCh
  91. }
  92. // Nonblocking, may return nil if at the end.
  93. func (e *CElement) Next() *CElement {
  94. e.mtx.RLock()
  95. val := e.next
  96. e.mtx.RUnlock()
  97. return val
  98. }
  99. // Nonblocking, may return nil if at the end.
  100. func (e *CElement) Prev() *CElement {
  101. e.mtx.RLock()
  102. prev := e.prev
  103. e.mtx.RUnlock()
  104. return prev
  105. }
  106. func (e *CElement) Removed() bool {
  107. e.mtx.RLock()
  108. isRemoved := e.removed
  109. e.mtx.RUnlock()
  110. return isRemoved
  111. }
  112. func (e *CElement) DetachNext() {
  113. e.mtx.Lock()
  114. if !e.removed {
  115. e.mtx.Unlock()
  116. panic("DetachNext() must be called after Remove(e)")
  117. }
  118. e.next = nil
  119. e.mtx.Unlock()
  120. }
  121. func (e *CElement) DetachPrev() {
  122. e.mtx.Lock()
  123. if !e.removed {
  124. e.mtx.Unlock()
  125. panic("DetachPrev() must be called after Remove(e)")
  126. }
  127. e.prev = nil
  128. e.mtx.Unlock()
  129. }
  130. // NOTE: This function needs to be safe for
  131. // concurrent goroutines waiting on nextWg.
  132. func (e *CElement) SetNext(newNext *CElement) {
  133. e.mtx.Lock()
  134. oldNext := e.next
  135. e.next = newNext
  136. if oldNext != nil && newNext == nil {
  137. // See https://golang.org/pkg/sync/:
  138. //
  139. // If a WaitGroup is reused to wait for several independent sets of
  140. // events, new Add calls must happen after all previous Wait calls have
  141. // returned.
  142. e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
  143. e.nextWaitCh = make(chan struct{})
  144. }
  145. if oldNext == nil && newNext != nil {
  146. e.nextWg.Done()
  147. close(e.nextWaitCh)
  148. }
  149. e.mtx.Unlock()
  150. }
  151. // NOTE: This function needs to be safe for
  152. // concurrent goroutines waiting on prevWg
  153. func (e *CElement) SetPrev(newPrev *CElement) {
  154. e.mtx.Lock()
  155. oldPrev := e.prev
  156. e.prev = newPrev
  157. if oldPrev != nil && newPrev == nil {
  158. e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
  159. e.prevWaitCh = make(chan struct{})
  160. }
  161. if oldPrev == nil && newPrev != nil {
  162. e.prevWg.Done()
  163. close(e.prevWaitCh)
  164. }
  165. e.mtx.Unlock()
  166. }
  167. func (e *CElement) SetRemoved() {
  168. e.mtx.Lock()
  169. e.removed = true
  170. // This wakes up anyone waiting in either direction.
  171. if e.prev == nil {
  172. e.prevWg.Done()
  173. close(e.prevWaitCh)
  174. }
  175. if e.next == nil {
  176. e.nextWg.Done()
  177. close(e.nextWaitCh)
  178. }
  179. e.mtx.Unlock()
  180. }
  181. //--------------------------------------------------------------------------------
  182. // CList represents a linked list.
  183. // The zero value for CList is an empty list ready to use.
  184. // Operations are goroutine-safe.
  185. // Panics if length grows beyond the max.
  186. type CList struct {
  187. mtx tmsync.RWMutex
  188. wg *sync.WaitGroup
  189. waitCh chan struct{}
  190. head *CElement // first element
  191. tail *CElement // last element
  192. len int // list length
  193. maxLen int // max list length
  194. }
  195. // Return CList with MaxLength. CList will panic if it goes beyond MaxLength.
  196. func New() *CList { return newWithMax(MaxLength) }
  197. // Return CList with given maxLength.
  198. // Will panic if list exceeds given maxLength.
  199. func newWithMax(maxLength int) *CList {
  200. l := new(CList)
  201. l.maxLen = maxLength
  202. l.wg = waitGroup1()
  203. l.waitCh = make(chan struct{})
  204. l.head = nil
  205. l.tail = nil
  206. l.len = 0
  207. return l
  208. }
  209. func (l *CList) Len() int {
  210. l.mtx.RLock()
  211. len := l.len
  212. l.mtx.RUnlock()
  213. return len
  214. }
  215. func (l *CList) Front() *CElement {
  216. l.mtx.RLock()
  217. head := l.head
  218. l.mtx.RUnlock()
  219. return head
  220. }
  221. func (l *CList) FrontWait() *CElement {
  222. // Loop until the head is non-nil else wait and try again
  223. for {
  224. l.mtx.RLock()
  225. head := l.head
  226. wg := l.wg
  227. l.mtx.RUnlock()
  228. if head != nil {
  229. return head
  230. }
  231. wg.Wait()
  232. // NOTE: If you think l.head exists here, think harder.
  233. }
  234. }
  235. func (l *CList) Back() *CElement {
  236. l.mtx.RLock()
  237. back := l.tail
  238. l.mtx.RUnlock()
  239. return back
  240. }
  241. func (l *CList) BackWait() *CElement {
  242. for {
  243. l.mtx.RLock()
  244. tail := l.tail
  245. wg := l.wg
  246. l.mtx.RUnlock()
  247. if tail != nil {
  248. return tail
  249. }
  250. wg.Wait()
  251. // l.tail doesn't necessarily exist here.
  252. // That's why we need to continue a for-loop.
  253. }
  254. }
  255. // WaitChan can be used to wait until Front or Back becomes not nil. Once it
  256. // does, channel will be closed.
  257. func (l *CList) WaitChan() <-chan struct{} {
  258. l.mtx.Lock()
  259. defer l.mtx.Unlock()
  260. return l.waitCh
  261. }
  262. // Panics if list grows beyond its max length.
  263. func (l *CList) PushBack(v interface{}) *CElement {
  264. l.mtx.Lock()
  265. // Construct a new element
  266. e := &CElement{
  267. prev: nil,
  268. prevWg: waitGroup1(),
  269. prevWaitCh: make(chan struct{}),
  270. next: nil,
  271. nextWg: waitGroup1(),
  272. nextWaitCh: make(chan struct{}),
  273. removed: false,
  274. Value: v,
  275. }
  276. // Release waiters on FrontWait/BackWait maybe
  277. if l.len == 0 {
  278. l.wg.Done()
  279. close(l.waitCh)
  280. }
  281. if l.len >= l.maxLen {
  282. panic(fmt.Sprintf("clist: maximum length list reached %d", l.maxLen))
  283. }
  284. l.len++
  285. // Modify the tail
  286. if l.tail == nil {
  287. l.head = e
  288. l.tail = e
  289. } else {
  290. e.SetPrev(l.tail) // We must init e first.
  291. l.tail.SetNext(e) // This will make e accessible.
  292. l.tail = e // Update the list.
  293. }
  294. l.mtx.Unlock()
  295. return e
  296. }
  297. // CONTRACT: Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks.
  298. // NOTE: As per the contract of CList, removed elements cannot be added back.
  299. func (l *CList) Remove(e *CElement) interface{} {
  300. l.mtx.Lock()
  301. prev := e.Prev()
  302. next := e.Next()
  303. if l.head == nil || l.tail == nil {
  304. l.mtx.Unlock()
  305. panic("Remove(e) on empty CList")
  306. }
  307. if prev == nil && l.head != e {
  308. l.mtx.Unlock()
  309. panic("Remove(e) with false head")
  310. }
  311. if next == nil && l.tail != e {
  312. l.mtx.Unlock()
  313. panic("Remove(e) with false tail")
  314. }
  315. // If we're removing the only item, make CList FrontWait/BackWait wait.
  316. if l.len == 1 {
  317. l.wg = waitGroup1() // WaitGroups are difficult to re-use.
  318. l.waitCh = make(chan struct{})
  319. }
  320. // Update l.len
  321. l.len--
  322. // Connect next/prev and set head/tail
  323. if prev == nil {
  324. l.head = next
  325. } else {
  326. prev.SetNext(next)
  327. }
  328. if next == nil {
  329. l.tail = prev
  330. } else {
  331. next.SetPrev(prev)
  332. }
  333. // Set .Done() on e, otherwise waiters will wait forever.
  334. e.SetRemoved()
  335. l.mtx.Unlock()
  336. return e.Value
  337. }
  338. func waitGroup1() (wg *sync.WaitGroup) {
  339. wg = &sync.WaitGroup{}
  340. wg.Add(1)
  341. return
  342. }