You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

451 lines
9.6 KiB

  1. package clist
  2. /*
  3. The purpose of CList is to provide a goroutine-safe linked-list.
  4. This list can be traversed concurrently by any number of goroutines.
  5. However, removed CElements cannot be added back.
  6. NOTE: Not all methods of container/list are (yet) implemented.
  7. NOTE: Removed elements need to DetachPrev or DetachNext consistently
  8. to ensure garbage collection of removed elements.
  9. */
  10. import (
  11. "fmt"
  12. "sync"
  13. tmsync "github.com/tendermint/tendermint/internal/libs/sync"
  14. )
  15. // MaxLength is the max allowed number of elements a linked list is
  16. // allowed to contain.
  17. // If more elements are pushed to the list it will panic.
  18. const MaxLength = int(^uint(0) >> 1)
  19. /*
  20. CElement is an element of a linked-list
  21. Traversal from a CElement is goroutine-safe.
  22. We can't avoid using WaitGroups or for-loops given the documentation
  23. spec without re-implementing the primitives that already exist in
  24. golang/sync. Notice that WaitGroup allows many go-routines to be
  25. simultaneously released, which is what we want. Mutex doesn't do
  26. this. RWMutex does this, but it's clumsy to use in the way that a
  27. WaitGroup would be used -- and we'd end up having two RWMutex's for
  28. prev/next each, which is doubly confusing.
  29. sync.Cond would be sort-of useful, but we don't need a write-lock in
  30. the for-loop. Use sync.Cond when you need serial access to the
  31. "condition". In our case our condition is if `next != nil || removed`,
  32. and there's no reason to serialize that condition for goroutines
  33. waiting on NextWait() (since it's just a read operation).
  34. */
  35. type CElement struct {
  36. mtx tmsync.RWMutex
  37. prev *CElement
  38. prevWg *sync.WaitGroup
  39. prevWaitCh chan struct{}
  40. next *CElement
  41. nextWg *sync.WaitGroup
  42. nextWaitCh chan struct{}
  43. removed bool
  44. Value interface{} // immutable
  45. }
  46. // Blocking implementation of Next().
  47. // May return nil iff CElement was tail and got removed.
  48. func (e *CElement) NextWait() *CElement {
  49. for {
  50. e.mtx.RLock()
  51. next := e.next
  52. nextWg := e.nextWg
  53. removed := e.removed
  54. e.mtx.RUnlock()
  55. if next != nil || removed {
  56. return next
  57. }
  58. nextWg.Wait()
  59. // e.next doesn't necessarily exist here.
  60. // That's why we need to continue a for-loop.
  61. }
  62. }
  63. // Blocking implementation of Prev().
  64. // May return nil iff CElement was head and got removed.
  65. func (e *CElement) PrevWait() *CElement {
  66. for {
  67. e.mtx.RLock()
  68. prev := e.prev
  69. prevWg := e.prevWg
  70. removed := e.removed
  71. e.mtx.RUnlock()
  72. if prev != nil || removed {
  73. return prev
  74. }
  75. prevWg.Wait()
  76. }
  77. }
  78. // PrevWaitChan can be used to wait until Prev becomes not nil. Once it does,
  79. // channel will be closed.
  80. func (e *CElement) PrevWaitChan() <-chan struct{} {
  81. e.mtx.RLock()
  82. defer e.mtx.RUnlock()
  83. return e.prevWaitCh
  84. }
  85. // NextWaitChan can be used to wait until Next becomes not nil. Once it does,
  86. // channel will be closed.
  87. func (e *CElement) NextWaitChan() <-chan struct{} {
  88. e.mtx.RLock()
  89. defer e.mtx.RUnlock()
  90. return e.nextWaitCh
  91. }
  92. // Nonblocking, may return nil if at the end.
  93. func (e *CElement) Next() *CElement {
  94. e.mtx.RLock()
  95. val := e.next
  96. e.mtx.RUnlock()
  97. return val
  98. }
  99. // Nonblocking, may return nil if at the end.
  100. func (e *CElement) Prev() *CElement {
  101. e.mtx.RLock()
  102. prev := e.prev
  103. e.mtx.RUnlock()
  104. return prev
  105. }
  106. func (e *CElement) Removed() bool {
  107. e.mtx.RLock()
  108. isRemoved := e.removed
  109. e.mtx.RUnlock()
  110. return isRemoved
  111. }
  112. func (e *CElement) DetachNext() {
  113. e.mtx.Lock()
  114. if !e.removed {
  115. e.mtx.Unlock()
  116. panic("DetachNext() must be called after Remove(e)")
  117. }
  118. e.next = nil
  119. e.mtx.Unlock()
  120. }
  121. func (e *CElement) DetachPrev() {
  122. e.mtx.Lock()
  123. if !e.removed {
  124. e.mtx.Unlock()
  125. panic("DetachPrev() must be called after Remove(e)")
  126. }
  127. e.prev = nil
  128. e.mtx.Unlock()
  129. }
  130. // NOTE: This function needs to be safe for
  131. // concurrent goroutines waiting on nextWg.
  132. func (e *CElement) SetNext(newNext *CElement) {
  133. e.mtx.Lock()
  134. oldNext := e.next
  135. e.next = newNext
  136. if oldNext != nil && newNext == nil {
  137. // See https://golang.org/pkg/sync/:
  138. //
  139. // If a WaitGroup is reused to wait for several independent sets of
  140. // events, new Add calls must happen after all previous Wait calls have
  141. // returned.
  142. e.nextWg = waitGroup1() // WaitGroups are difficult to re-use.
  143. e.nextWaitCh = make(chan struct{})
  144. }
  145. if oldNext == nil && newNext != nil {
  146. e.nextWg.Done()
  147. close(e.nextWaitCh)
  148. }
  149. e.mtx.Unlock()
  150. }
  151. // NOTE: This function needs to be safe for
  152. // concurrent goroutines waiting on prevWg
  153. func (e *CElement) SetPrev(newPrev *CElement) {
  154. e.mtx.Lock()
  155. oldPrev := e.prev
  156. e.prev = newPrev
  157. if oldPrev != nil && newPrev == nil {
  158. e.prevWg = waitGroup1() // WaitGroups are difficult to re-use.
  159. e.prevWaitCh = make(chan struct{})
  160. }
  161. if oldPrev == nil && newPrev != nil {
  162. e.prevWg.Done()
  163. close(e.prevWaitCh)
  164. }
  165. e.mtx.Unlock()
  166. }
  167. // Detach is a shortcut to mark the given element remove and detach the prev/next elements.
  168. func (e *CElement) Detach() {
  169. e.mtx.Lock()
  170. defer e.mtx.Unlock()
  171. e.removed = true
  172. if e.prev == nil {
  173. e.prevWg.Done()
  174. close(e.prevWaitCh)
  175. } else {
  176. e.prev = nil
  177. }
  178. if e.next == nil {
  179. e.nextWg.Done()
  180. close(e.nextWaitCh)
  181. } else {
  182. e.next = nil
  183. }
  184. }
  185. //--------------------------------------------------------------------------------
  186. // CList represents a linked list.
  187. // The zero value for CList is an empty list ready to use.
  188. // Operations are goroutine-safe.
  189. // Panics if length grows beyond the max.
  190. type CList struct {
  191. mtx tmsync.RWMutex
  192. wg *sync.WaitGroup
  193. waitCh chan struct{}
  194. head *CElement // first element
  195. tail *CElement // last element
  196. len int // list length
  197. maxLen int // max list length
  198. }
  199. // Return CList with MaxLength. CList will panic if it goes beyond MaxLength.
  200. func New() *CList { return newWithMax(MaxLength) }
  201. // Return CList with given maxLength.
  202. // Will panic if list exceeds given maxLength.
  203. func newWithMax(maxLength int) *CList {
  204. l := new(CList)
  205. l.maxLen = maxLength
  206. l.wg = waitGroup1()
  207. l.waitCh = make(chan struct{})
  208. l.head = nil
  209. l.tail = nil
  210. l.len = 0
  211. return l
  212. }
  213. func (l *CList) Len() int {
  214. l.mtx.RLock()
  215. len := l.len
  216. l.mtx.RUnlock()
  217. return len
  218. }
  219. func (l *CList) Front() *CElement {
  220. l.mtx.RLock()
  221. head := l.head
  222. l.mtx.RUnlock()
  223. return head
  224. }
  225. func (l *CList) FrontWait() *CElement {
  226. // Loop until the head is non-nil else wait and try again
  227. for {
  228. l.mtx.RLock()
  229. head := l.head
  230. wg := l.wg
  231. l.mtx.RUnlock()
  232. if head != nil {
  233. return head
  234. }
  235. wg.Wait()
  236. // NOTE: If you think l.head exists here, think harder.
  237. }
  238. }
  239. func (l *CList) Back() *CElement {
  240. l.mtx.RLock()
  241. back := l.tail
  242. l.mtx.RUnlock()
  243. return back
  244. }
  245. func (l *CList) BackWait() *CElement {
  246. for {
  247. l.mtx.RLock()
  248. tail := l.tail
  249. wg := l.wg
  250. l.mtx.RUnlock()
  251. if tail != nil {
  252. return tail
  253. }
  254. wg.Wait()
  255. // l.tail doesn't necessarily exist here.
  256. // That's why we need to continue a for-loop.
  257. }
  258. }
  259. // WaitChan can be used to wait until Front or Back becomes not nil. Once it
  260. // does, channel will be closed.
  261. func (l *CList) WaitChan() <-chan struct{} {
  262. l.mtx.Lock()
  263. defer l.mtx.Unlock()
  264. return l.waitCh
  265. }
  266. // Panics if list grows beyond its max length.
  267. func (l *CList) PushBack(v interface{}) *CElement {
  268. l.mtx.Lock()
  269. // Construct a new element
  270. e := &CElement{
  271. prev: nil,
  272. prevWg: waitGroup1(),
  273. prevWaitCh: make(chan struct{}),
  274. next: nil,
  275. nextWg: waitGroup1(),
  276. nextWaitCh: make(chan struct{}),
  277. removed: false,
  278. Value: v,
  279. }
  280. // Release waiters on FrontWait/BackWait maybe
  281. if l.len == 0 {
  282. l.wg.Done()
  283. close(l.waitCh)
  284. }
  285. if l.len >= l.maxLen {
  286. panic(fmt.Sprintf("clist: maximum length list reached %d", l.maxLen))
  287. }
  288. l.len++
  289. // Modify the tail
  290. if l.tail == nil {
  291. l.head = e
  292. l.tail = e
  293. } else {
  294. e.SetPrev(l.tail) // We must init e first.
  295. l.tail.SetNext(e) // This will make e accessible.
  296. l.tail = e // Update the list.
  297. }
  298. l.mtx.Unlock()
  299. return e
  300. }
  301. // Remove removes the given element in the CList
  302. // NOTE: As per the contract of CList, removed elements cannot be added back.
  303. // Because CList detachse the prev/next element when it removes the given element,
  304. // please do not use CElement.Next() in the for loop postcondition, uses
  305. // a variable ahead the for loop and then assigns the Next() element
  306. // to it in the loop as the postcondition.
  307. func (l *CList) Remove(e *CElement) interface{} {
  308. l.mtx.Lock()
  309. defer l.mtx.Unlock()
  310. prev := e.Prev()
  311. next := e.Next()
  312. if l.head == nil || l.tail == nil {
  313. panic("Remove(e) on empty CList")
  314. }
  315. if prev == nil && l.head != e {
  316. panic("Remove(e) with false head")
  317. }
  318. if next == nil && l.tail != e {
  319. panic("Remove(e) with false tail")
  320. }
  321. // If we're removing the only item, make CList FrontWait/BackWait wait.
  322. if l.len == 1 {
  323. l.wg = waitGroup1() // WaitGroups are difficult to re-use.
  324. l.waitCh = make(chan struct{})
  325. }
  326. // Update l.len
  327. l.len--
  328. // Connect next/prev and set head/tail
  329. if prev == nil {
  330. l.head = next
  331. } else {
  332. prev.SetNext(next)
  333. }
  334. if next == nil {
  335. l.tail = prev
  336. } else {
  337. next.SetPrev(prev)
  338. }
  339. e.Detach()
  340. return e.Value
  341. }
  342. // Clear removes all the elements in the CList
  343. func (l *CList) Clear() {
  344. l.mtx.Lock()
  345. defer l.mtx.Unlock()
  346. if l.head == nil || l.tail == nil {
  347. return
  348. }
  349. for e := l.head; e != nil; e = l.head {
  350. prevE := e.Prev()
  351. nextE := e.Next()
  352. if prevE == nil && e != l.head {
  353. panic("CList.Clear failed due to nil prev element")
  354. }
  355. if nextE == nil && e != l.tail {
  356. panic("CList.Clear failed due to nil next element")
  357. }
  358. if l.len == 1 {
  359. l.wg = waitGroup1()
  360. l.waitCh = make(chan struct{})
  361. }
  362. l.len--
  363. if prevE == nil {
  364. l.head = nextE
  365. } else {
  366. prevE.SetNext(nextE)
  367. }
  368. if nextE == nil {
  369. l.tail = prevE
  370. } else {
  371. nextE.SetPrev(prevE)
  372. }
  373. e.Detach()
  374. }
  375. }
  376. func waitGroup1() (wg *sync.WaitGroup) {
  377. wg = &sync.WaitGroup{}
  378. wg.Add(1)
  379. return
  380. }