You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

217 lines
7.0 KiB

  1. // Package eventlog defines a reverse time-ordered log of events over a sliding
  2. // window of time before the most recent item in the log.
  3. //
  4. // New items are added to the head of the log (the newest end), and items that
  5. // fall outside the designated window are pruned from its tail (the oldest).
  6. // Items within the log are indexed by lexicographically-ordered cursors.
  7. package eventlog
  8. import (
  9. "context"
  10. "errors"
  11. "sync"
  12. "time"
  13. "github.com/tendermint/tendermint/internal/eventlog/cursor"
  14. "github.com/tendermint/tendermint/types"
  15. )
// A Log is a reverse time-ordered log of events in a sliding window of time
// before the newest item. Use Add to add new items to the front (head) of the
// log, and Scan or WaitScan to traverse the current contents of the log.
//
// After construction, a *Log is safe for concurrent access by one writer and
// any number of readers. A Log contains a sync.Mutex and therefore must not
// be copied after first use.
type Log struct {
	// These values do not change after construction.
	windowSize    time.Duration // copied from LogSettings.WindowSize by New
	maxItems      int           // copied from LogSettings.MaxItems by New
	numItemsGauge gauge         // metrics sink; New uses discard{} unless LogSettings.Metrics is set

	// Protects access to the fields below. Lock to modify the values of these
	// fields, or to read or snapshot the values.
	mu sync.Mutex

	numItems     int           // total number of items in the log
	oldestCursor cursor.Cursor // cursor of the oldest item
	head         *logEntry     // pointer to the newest item
	ready        chan struct{} // closed when head changes, then replaced by a fresh channel
	source       cursor.Source // generator of cursors
}
  36. // New constructs a new empty log with the given settings.
  37. func New(opts LogSettings) (*Log, error) {
  38. if opts.WindowSize <= 0 {
  39. return nil, errors.New("window size must be positive")
  40. }
  41. lg := &Log{
  42. windowSize: opts.WindowSize,
  43. maxItems: opts.MaxItems,
  44. numItemsGauge: discard{},
  45. ready: make(chan struct{}),
  46. source: opts.Source,
  47. }
  48. if opts.Metrics != nil {
  49. lg.numItemsGauge = opts.Metrics.numItemsGauge
  50. }
  51. return lg, nil
  52. }
  53. // Add adds a new item to the front of the log. If necessary, the log is pruned
  54. // to fit its constraints on size and age. Add blocks until both steps are done.
  55. //
  56. // Any error reported by Add arises from pruning; the new item was added to the
  57. // log regardless whether an error occurs.
  58. func (lg *Log) Add(etype string, data types.EventData) error {
  59. lg.mu.Lock()
  60. head := &logEntry{
  61. item: newItem(lg.source.Cursor(), etype, data),
  62. next: lg.head,
  63. }
  64. lg.numItems++
  65. lg.updateHead(head)
  66. size := lg.numItems
  67. age := head.item.Cursor.Diff(lg.oldestCursor)
  68. // If the log requires pruning, do the pruning step outside the lock. This
  69. // permits readers to continue to make progress while we're working.
  70. lg.mu.Unlock()
  71. return lg.checkPrune(head, size, age)
  72. }
  73. // Scan scans the current contents of the log, calling f with each item until
  74. // all items are visited or f reports an error. If f returns ErrStopScan, Scan
  75. // returns nil, otherwise it returns the error reported by f.
  76. //
  77. // The Info value returned is valid even if Scan reports an error.
  78. func (lg *Log) Scan(f func(*Item) error) (Info, error) {
  79. return lg.scanState(lg.state(), f)
  80. }
  81. // WaitScan blocks until the cursor of the frontmost log item is different from
  82. // c, then executes a Scan on the contents of the log. If ctx ends before the
  83. // head is updated, WaitScan returns an error without calling f.
  84. //
  85. // The Info value returned is valid even if WaitScan reports an error.
  86. func (lg *Log) WaitScan(ctx context.Context, c cursor.Cursor, f func(*Item) error) (Info, error) {
  87. st := lg.state()
  88. for st.head == nil || st.head.item.Cursor == c {
  89. var err error
  90. st, err = lg.waitStateChange(ctx)
  91. if err != nil {
  92. return st.info(), err
  93. }
  94. }
  95. return lg.scanState(st, f)
  96. }
  97. // Info returns the current state of the log.
  98. func (lg *Log) Info() Info { return lg.state().info() }
  99. // ErrStopScan is returned by a Scan callback to signal that scanning should be
  100. // terminated without error.
  101. var ErrStopScan = errors.New("stop scanning")
  102. // ErrLogPruned is returned by Add to signal that at least some events within
  103. // the time window were discarded by pruning in excess of the size limit.
  104. // This error may be wrapped, use errors.Is to test for it.
  105. var ErrLogPruned = errors.New("log pruned")
// LogSettings configure the construction of an event log. Pass a value of
// this type to New.
type LogSettings struct {
	// The size of the time window measured in time before the newest item.
	// This value must be positive; New reports an error otherwise.
	WindowSize time.Duration

	// The maximum number of items that will be retained in memory within the
	// designated time window. A value ≤ 0 imposes no limit, otherwise items in
	// excess of this number will be dropped from the log.
	MaxItems int

	// The cursor source to use for log entries. If not set, use wallclock time.
	// New copies this value into the log as given.
	Source cursor.Source

	// If non-nil, exported metrics to update. If nil, metrics are discarded.
	Metrics *Metrics
}
  120. // Info records the current state of the log at the time of a scan operation.
  121. type Info struct {
  122. Oldest cursor.Cursor // the cursor of the oldest item in the log
  123. Newest cursor.Cursor // the cursor of the newest item in the log
  124. Size int // the number of items in the log
  125. }
  126. // logState is a snapshot of the state of the log.
  127. type logState struct {
  128. oldest cursor.Cursor
  129. newest cursor.Cursor
  130. size int
  131. head *logEntry
  132. }
  133. func (st logState) info() Info {
  134. return Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
  135. }
  136. // state returns a snapshot of the current log contents. The caller may freely
  137. // traverse the internal structure of the list without locking, provided it
  138. // does not modify either the entries or their items.
  139. func (lg *Log) state() logState {
  140. lg.mu.Lock()
  141. defer lg.mu.Unlock()
  142. if lg.head == nil {
  143. return logState{} // empty
  144. }
  145. return logState{
  146. oldest: lg.oldestCursor,
  147. newest: lg.head.item.Cursor,
  148. size: lg.numItems,
  149. head: lg.head,
  150. }
  151. }
  152. // waitStateChange blocks until either ctx ends or the head of the log is
  153. // modified, then returns the state of the log. An error is reported only if
  154. // ctx terminates before head changes.
  155. func (lg *Log) waitStateChange(ctx context.Context) (logState, error) {
  156. lg.mu.Lock()
  157. ch := lg.ready // capture
  158. lg.mu.Unlock()
  159. select {
  160. case <-ctx.Done():
  161. return lg.state(), ctx.Err()
  162. case <-ch:
  163. return lg.state(), nil
  164. }
  165. }
  166. // scanState scans the contents of the log at st. See the Scan method for a
  167. // description of the callback semantics.
  168. func (lg *Log) scanState(st logState, f func(*Item) error) (Info, error) {
  169. info := Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
  170. for cur := st.head; cur != nil; cur = cur.next {
  171. if err := f(cur.item); err != nil {
  172. if errors.Is(err, ErrStopScan) {
  173. return info, nil
  174. }
  175. return info, err
  176. }
  177. }
  178. return info, nil
  179. }
  180. // updateHead replaces the current head with newHead, signals any waiters, and
  181. // resets the wait signal. The caller must hold log.mu exclusively.
  182. func (lg *Log) updateHead(newHead *logEntry) {
  183. lg.head = newHead
  184. close(lg.ready) // signal
  185. lg.ready = make(chan struct{})
  186. }
// A logEntry is the backbone of the event log queue. Entries are not mutated
// after construction, so it is safe to read item and next without locking.
type logEntry struct {
	item *Item     // the event stored in this entry
	next *logEntry // the entry that was head when this one was added; nil if there was none
}