Browse Source

rpc: implement the eventlog defined by ADR 075 (#7825)

Implement the basic cursor and eventlog types described in ADR 075.  Handle
encoding and decoding as strings for compatibility with JSON.

- Add unit tests for the required order and synchronization properties.
- Add hooks for metrics, with one value to be expanded later.
- Update ADR 075 to match the specifics of the implementation so far.
pull/7939/head
M. J. Fromberger 2 years ago
committed by GitHub
parent
commit
705f365bcd
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 942 additions and 33 deletions
  1. +34
    -33
      docs/architecture/adr-075-rpc-subscription.md
  2. +100
    -0
      internal/eventlog/cursor/cursor.go
  3. +141
    -0
      internal/eventlog/cursor/cursor_test.go
  4. +217
    -0
      internal/eventlog/eventlog.go
  5. +222
    -0
      internal/eventlog/eventlog_test.go
  6. +78
    -0
      internal/eventlog/item.go
  7. +39
    -0
      internal/eventlog/metrics.go
  8. +111
    -0
      internal/eventlog/prune.go

+ 34
- 33
docs/architecture/adr-075-rpc-subscription.md View File

@ -2,6 +2,7 @@
## Changelog
- 10-Feb-2022: Updates to reflect implementation.
- 26-Jan-2022: Marked accepted.
- 22-Jan-2022: Updated and expanded (@creachadair).
- 20-Nov-2021: Initial draft (@creachadair).
@ -267,12 +268,12 @@ initial implementation will store the event log in-memory, and the operator
will be given two per-node configuration settings. Note, these names are
provisional:
- `[event-subscription] time-window`: A duration before present during which the
node will retain event items published. Setting this value to zero disables
event subscription.
- `[rpc] event-log-window-size`: A duration before the latest published event,
during which the node will retain event items published. Setting this value
to zero disables event subscription.
- `[event-subscription] max-items`: A maximum number of event items that the
node will retain within the time window. If the number of items exceeds this
- `[rpc] event-log-max-items`: A maximum number of event items that the node
will retain within the time window. If the number of items exceeds this
value, the node discardes the oldest items in the window. Setting this value
to zero means that no limit is imposed on the number of items.
@ -307,11 +308,11 @@ type EventParams struct {
// Return only items after this cursor. If empty, the limit is just
// before the the beginning of the event log.
After string `json:"after_item"`
After string `json:"after"`
// Return only items before this cursor. If empty, the limit is just
// after the head of the event log.
Before string `json:"before_item"`
Before string `json:"before"`
// Wait for up to this long for events to be available.
WaitTime time.Duration `json:"wait_time"`
@ -335,8 +336,8 @@ type Filter struct {
The semantics of the request are as follows: An item in the event log is
**eligible** for a query if:
- It is newer than the `after_item` cursor (if set).
- It is older than the `before_item` cursor (if set).
- It is newer than the `after` cursor (if set).
- It is older than the `before` cursor (if set).
- It matches the filter (if set).
Among the eligible items in the log, the server returns up to `max_results` of
@ -344,13 +345,13 @@ the newest items, in reverse order of cursor. If `max_results` is unset the
server chooses a number to return, and will cap `max_results` at a sensible
limit.
The `wait_time` parameter is used to effect polling. If `before_item` is empty,
the server will wait for up to `wait_time` for additional items, if there are
fewer than `max_results` eligible results in the log. If `wait_time` is zero,
the server will return whatever eligible items are available immediately.
The `wait_time` parameter is used to effect polling. If `before` is empty and
no items are available, the server will wait for up to `wait_time` for matching
items to arrive at the head of the log. If `wait_time` is zero, the server will
return whatever eligible items are available immediately.
If `before_item` non-empty, `wait_time` is ignored: new results are only added
to the head of the log, so there is no need to wait. This allows the client to
If `before` non-empty, `wait_time` is ignored: new results are only added to
the head of the log, so there is no need to wait. This allows the client to
poll for new data, and "page" backward through matching event items. This is
discussed in more detail below.
@ -372,11 +373,11 @@ type EventReply struct {
// The cursor of the oldest item in the log at the time of this reply,
// or "" if the log is empty.
Oldest string `json:"oldest_item"`
Oldest string `json:"oldest"`
// The cursor of the newest item in the log at the time of this reply,
// or "" if the log is empty.
Newest string `json:"newest_item"`
Newest string `json:"newest"`
}
type EventItem struct {
@ -392,9 +393,9 @@ type EventItem struct {
}
```
The `oldest_item` and `newest_item` fields of the reply report the cursors of
the oldest and newest items (of any kind) recorded in the event log at the time
of the reply, or are `""` if the log is empty.
The `oldest` and `newest` fields of the reply report the cursors of the oldest
and newest items (of any kind) recorded in the event log at the time of the
reply, or are `""` if the log is empty.
The `data` field contains the type-specific event datum. The datum carries any
ABCI events that may have been defined.
@ -412,26 +413,26 @@ The semantics of the reply are as follows:
- If `more` is true, there is at least one additional, older item in the
event log that was not returned (in excess of `max_results`).
In this case the client can fetch the next page by setting `before_item`
in a new request, to the cursor of the oldest item fetched (i.e., the
last one in `items`).
In this case the client can fetch the next page by setting `before` in a
new request, to the cursor of the oldest item fetched (i.e., the last one
in `items`).
- Otherwise (if `more` is false), all the matching results have been
reported (pagination is complete).
- The first element of `items` identifies the newest item considered.
Subsequent poll requests can set `after_item` to this cursor to skip
items that were already retrieved.
Subsequent poll requests can set `after` to this cursor to skip items
that were already retrieved.
- If `items` is empty:
- If the `before_item` was set in the request, there are no further
eligible items for this query in the log (pagination is complete).
- If the `before` was set in the request, there are no further eligible
items for this query in the log (pagination is complete).
This is just a safety case; the client can detect this without issuing
another call by consulting the `more` field of the previous reply.
- If the `before_item` was empty in the request, no eligible items were
- If the `before` was empty in the request, no eligible items were
available before the `wait_time` expired. The client may poll again to
wait for more event items.
@ -453,12 +454,11 @@ crashes and connectivity issues:
1. In ordinary operation, clients will **long-poll** the head of the event
log for new events matching their criteria (by setting a `wait_time` and
no `before_item`).
no `before`).
2. If there are more events than the client requested, or if the client needs
to to read older events to recover from a stall or crash, clients will
**page** backward through the event log (by setting `before_item` and
possibly `after_item`).
**page** backward through the event log (by setting `before` and `after`).
- While the new API requires explicit polling by the client, it makes better
use of the node's existing HTTP infrastructure (e.g., connection pools).
@ -479,7 +479,7 @@ crashes and connectivity issues:
The initial implementation will do this by checking the tail of the event log
after each new item is published. If the number of items in the log exceeds
the item limit, it will delete oldest items until the log is under the limit;
then discard any older than the time window before present.
then discard any older than the time window before the latest.
To minimize coordination interference between the publisher (the event bus)
and the subcribers (the `events` service handlers), the event log will be
@ -664,13 +664,14 @@ The following alternative approaches were considered:
- [rpc: remove duplication of events when querying][i7273] (#7273)
[rfc006]: https://github.com/tendermint/tendermint/blob/master/docs/rfc/rfc-006-event-subscription.md
[rpc-service]: https://docs.tendermint.com/master/rpc
[rpc-service]: https://github.com/tendermint/tendermint/blob/master/rpc/openapi/openapi.yaml
[query-grammar]: https://pkg.go.dev/github.com/tendermint/tendermint@master/internal/pubsub/query/syntax
[ws]: https://datatracker.ietf.org/doc/html/rfc6455
[jsonrpc2]: https://www.jsonrpc.org/specification
[nginx]: https://nginx.org/en/docs/
[fcgi]: http://www.mit.edu/~yandros/doc/specs/fcgi-spec.html
[rp-ws]: https://nginx.org/en/docs/http/websocket.html
<!-- markdown-link-check-disable-next-line -->
[ng-xm]: https://www.nginx.com/resources/wiki/extending/
[abci-event]: https://pkg.go.dev/github.com/tendermint/tendermint/abci/types#Event
[rfc001]: https://github.com/tendermint/tendermint/blob/master/docs/rfc/rfc-001-storage-engine.rst


+ 100
- 0
internal/eventlog/cursor/cursor.go View File

@ -0,0 +1,100 @@
// Package cursor implements time-ordered item cursors for an event log.
package cursor
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
)
// A Source produces cursors based on a time index generator and a sequence
// counter. A zero-valued Source is ready for use with defaults as described.
type Source struct {
// This function is called to produce the current time index.
// If nil, it defaults to time.Now().UnixNano().
TimeIndex func() int64
// The current counter value used for sequence number generation. It is
// incremented in-place each time a cursor is generated.
Counter int64
}
func (s *Source) timeIndex() int64 {
if s.TimeIndex == nil {
return time.Now().UnixNano()
}
return s.TimeIndex()
}
func (s *Source) nextCounter() int64 {
s.Counter++
return s.Counter
}
// Cursor produces a fresh cursor from s at the current time index and counter.
func (s *Source) Cursor() Cursor {
return Cursor{
timestamp: uint64(s.timeIndex()),
sequence: uint16(s.nextCounter() & 0xffff),
}
}
// A Cursor is a unique identifier for an item in a time-ordered event log.
// It is safe to copy and compare cursors by value.
type Cursor struct {
timestamp uint64 // ns since Unix epoch
sequence uint16 // sequence number
}
// Before reports whether c is prior to o in time ordering. This comparison
// ignores sequence numbers.
func (c Cursor) Before(o Cursor) bool { return c.timestamp < o.timestamp }
// Diff returns the time duration between c and o. The duration is negative if
// c is before o in time order.
func (c Cursor) Diff(o Cursor) time.Duration {
return time.Duration(c.timestamp) - time.Duration(o.timestamp)
}
// IsZero reports whether c is the zero cursor.
func (c Cursor) IsZero() bool { return c == Cursor{} }
// MarshalText implements the encoding.TextMarshaler interface.
// A zero cursor marshals as "", otherwise the format used by the String method.
func (c Cursor) MarshalText() ([]byte, error) {
if c.IsZero() {
return nil, nil
}
return []byte(c.String()), nil
}
// UnmarshalText implements the encoding.TextUnmarshaler interface.
// An empty text unmarshals without error to a zero cursor.
func (c *Cursor) UnmarshalText(data []byte) error {
if len(data) == 0 {
*c = Cursor{} // set zero
return nil
}
ps := strings.SplitN(string(data), "-", 2)
if len(ps) != 2 {
return errors.New("invalid cursor format")
}
ts, err := strconv.ParseUint(ps[0], 16, 64)
if err != nil {
return fmt.Errorf("invalid timestamp: %w", err)
}
sn, err := strconv.ParseUint(ps[1], 16, 16)
if err != nil {
return fmt.Errorf("invalid sequence: %w", err)
}
c.timestamp = ts
c.sequence = uint16(sn)
return nil
}
// String returns a printable text representation of a cursor.
func (c Cursor) String() string {
return fmt.Sprintf("%016x-%04x", c.timestamp, c.sequence)
}

+ 141
- 0
internal/eventlog/cursor/cursor_test.go View File

@ -0,0 +1,141 @@
package cursor_test
import (
"fmt"
"testing"
"time"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
)
func mustParse(t *testing.T, s string) cursor.Cursor {
t.Helper()
var c cursor.Cursor
if err := c.UnmarshalText([]byte(s)); err != nil {
t.Fatalf("Unmarshal %q: unexpected error: %v", s, err)
}
return c
}
func TestSource_counter(t *testing.T) {
src := &cursor.Source{
TimeIndex: func() int64 { return 255 },
}
for i := 1; i <= 5; i++ {
want := fmt.Sprintf("00000000000000ff-%04x", i)
got := src.Cursor().String()
if got != want {
t.Errorf("Cursor %d: got %q, want %q", i, got, want)
}
}
}
func TestSource_timeIndex(t *testing.T) {
times := []int64{0, 1, 100, 65535, 0x76543210fecdba98}
src := &cursor.Source{
TimeIndex: func() int64 {
out := times[0]
times = append(times[1:], out)
return out
},
Counter: 160,
}
results := []string{
"0000000000000000-00a1",
"0000000000000001-00a2",
"0000000000000064-00a3",
"000000000000ffff-00a4",
"76543210fecdba98-00a5",
}
for i, want := range results {
if got := src.Cursor().String(); got != want {
t.Errorf("Cursor %d: got %q, want %q", i+1, got, want)
}
}
}
func TestCursor_roundTrip(t *testing.T) {
const text = `0123456789abcdef-fce9`
c := mustParse(t, text)
if got := c.String(); got != text {
t.Errorf("Wrong string format: got %q, want %q", got, text)
}
cmp, err := c.MarshalText()
if err != nil {
t.Fatalf("Marshal %+v failed: %v", c, err)
}
if got := string(cmp); got != text {
t.Errorf("Wrong text format: got %q, want %q", got, text)
}
}
func TestCursor_ordering(t *testing.T) {
// Condition: text1 precedes text2 in time order.
// Condition: text2 has an earlier sequence than text1.
const zero = ""
const text1 = "0000000012345678-0005"
const text2 = "00000000fecdeba9-0002"
zc := mustParse(t, zero)
c1 := mustParse(t, text1)
c2 := mustParse(t, text2)
// Confirm for all pairs that string order respects time order.
pairs := []struct {
t1, t2 string
c1, c2 cursor.Cursor
}{
{zero, zero, zc, zc},
{zero, text1, zc, c1},
{zero, text2, zc, c2},
{text1, zero, c1, zc},
{text1, text1, c1, c1},
{text1, text2, c1, c2},
{text2, zero, c2, zc},
{text2, text1, c2, c1},
{text2, text2, c2, c2},
}
for _, pair := range pairs {
want := pair.t1 < pair.t2
if got := pair.c1.Before(pair.c2); got != want {
t.Errorf("(%s).Before(%s): got %v, want %v", pair.t1, pair.t2, got, want)
}
}
}
func TestCursor_IsZero(t *testing.T) {
tests := []struct {
text string
want bool
}{
{"", true},
{"0000000000000000-0000", true},
{"0000000000000001-0000", false},
{"0000000000000000-0001", false},
{"0000000000000001-0001", false},
}
for _, test := range tests {
c := mustParse(t, test.text)
if got := c.IsZero(); got != test.want {
t.Errorf("IsZero(%q): got %v, want %v", test.text, got, test.want)
}
}
}
func TestCursor_Diff(t *testing.T) {
const time1 = 0x1ac0193001
const time2 = 0x0ac0193001
text1 := fmt.Sprintf("%016x-0001", time1)
text2 := fmt.Sprintf("%016x-0005", time2)
want := time.Duration(time1 - time2)
c1 := mustParse(t, text1)
c2 := mustParse(t, text2)
got := c1.Diff(c2)
if got != want {
t.Fatalf("Diff %q - %q: got %v, want %v", text1, text2, got, want)
}
}

+ 217
- 0
internal/eventlog/eventlog.go View File

@ -0,0 +1,217 @@
// Package eventlog defines a reverse time-ordered log of events over a sliding
// window of time before the most recent item in the log.
//
// New items are added to the head of the log (the newest end), and items that
// fall outside the designated window are pruned from its tail (the oldest).
// Items within the log are indexed by lexicographically-ordered cursors.
package eventlog
import (
"context"
"errors"
"sync"
"time"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// A Log is a reverse time-ordered log of events in a sliding window of time
// before the newest item. Use Add to add new items to the front (head) of the
// log, and Scan or WaitScan to traverse the current contents of the log.
//
// After construction, a *Log is safe for concurrent access by one writer and
// any number of readers.
type Log struct {
// These values do not change after construction.
windowSize time.Duration
maxItems int
numItemsGauge gauge
// Protects access to the fields below. Lock to modify the values of these
// fields, or to read or snapshot the values.
mu sync.Mutex
numItems int // total number of items in the log
oldestCursor cursor.Cursor // cursor of the oldest item
head *logEntry // pointer to the newest item
ready chan struct{} // closed when head changes
source cursor.Source // generator of cursors
}
// New constructs a new empty log with the given settings.
func New(opts LogSettings) (*Log, error) {
if opts.WindowSize <= 0 {
return nil, errors.New("window size must be positive")
}
lg := &Log{
windowSize: opts.WindowSize,
maxItems: opts.MaxItems,
numItemsGauge: discard{},
ready: make(chan struct{}),
source: opts.Source,
}
if opts.Metrics != nil {
lg.numItemsGauge = opts.Metrics.numItemsGauge
}
return lg, nil
}
// Add adds a new item to the front of the log. If necessary, the log is pruned
// to fit its constraints on size and age. Add blocks until both steps are done.
//
// Any error reported by Add arises from pruning; the new item was added to the
// log regardless whether an error occurs.
func (lg *Log) Add(etype string, data types.EventData) error {
lg.mu.Lock()
head := &logEntry{
item: newItem(lg.source.Cursor(), etype, data),
next: lg.head,
}
lg.numItems++
lg.updateHead(head)
size := lg.numItems
age := head.item.Cursor.Diff(lg.oldestCursor)
// If the log requires pruning, do the pruning step outside the lock. This
// permits readers to continue to make progress while we're working.
lg.mu.Unlock()
return lg.checkPrune(head, size, age)
}
// Scan scans the current contents of the log, calling f with each item until
// all items are visited or f reports an error. If f returns ErrStopScan, Scan
// returns nil, otherwise it returns the error reported by f.
//
// The Info value returned is valid even if Scan reports an error.
func (lg *Log) Scan(f func(*Item) error) (Info, error) {
return lg.scanState(lg.state(), f)
}
// WaitScan blocks until the cursor of the frontmost log item is different from
// c, then executes a Scan on the contents of the log. If ctx ends before the
// head is updated, WaitScan returns an error without calling f.
//
// The Info value returned is valid even if WaitScan reports an error.
func (lg *Log) WaitScan(ctx context.Context, c cursor.Cursor, f func(*Item) error) (Info, error) {
st := lg.state()
for st.head == nil || st.head.item.Cursor == c {
var err error
st, err = lg.waitStateChange(ctx)
if err != nil {
return st.info(), err
}
}
return lg.scanState(st, f)
}
// Info returns the current state of the log.
func (lg *Log) Info() Info { return lg.state().info() }
// ErrStopScan is returned by a Scan callback to signal that scanning should be
// terminated without error.
var ErrStopScan = errors.New("stop scanning")
// ErrLogPruned is returned by Add to signal that at least some events within
// the time window were discarded by pruning in excess of the size limit.
// This error may be wrapped, use errors.Is to test for it.
var ErrLogPruned = errors.New("log pruned")
// LogSettings configure the construction of an event log.
type LogSettings struct {
// The size of the time window measured in time before the newest item.
// This value must be positive.
WindowSize time.Duration
// The maximum number of items that will be retained in memory within the
// designated time window. A value ≤ 0 imposes no limit, otherwise items in
// excess of this number will be dropped from the log.
MaxItems int
// The cursor source to use for log entries. If not set, use wallclock time.
Source cursor.Source
// If non-nil, exported metrics to update. If nil, metrics are discarded.
Metrics *Metrics
}
// Info records the current state of the log at the time of a scan operation.
type Info struct {
Oldest cursor.Cursor // the cursor of the oldest item in the log
Newest cursor.Cursor // the cursor of the newest item in the log
Size int // the number of items in the log
}
// logState is a snapshot of the state of the log.
type logState struct {
oldest cursor.Cursor
newest cursor.Cursor
size int
head *logEntry
}
func (st logState) info() Info {
return Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
}
// state returns a snapshot of the current log contents. The caller may freely
// traverse the internal structure of the list without locking, provided it
// does not modify either the entries or their items.
func (lg *Log) state() logState {
lg.mu.Lock()
defer lg.mu.Unlock()
if lg.head == nil {
return logState{} // empty
}
return logState{
oldest: lg.oldestCursor,
newest: lg.head.item.Cursor,
size: lg.numItems,
head: lg.head,
}
}
// waitStateChange blocks until either ctx ends or the head of the log is
// modified, then returns the state of the log. An error is reported only if
// ctx terminates before head changes.
func (lg *Log) waitStateChange(ctx context.Context) (logState, error) {
lg.mu.Lock()
ch := lg.ready // capture
lg.mu.Unlock()
select {
case <-ctx.Done():
return lg.state(), ctx.Err()
case <-ch:
return lg.state(), nil
}
}
// scanState scans the contents of the log at st. See the Scan method for a
// description of the callback semantics.
func (lg *Log) scanState(st logState, f func(*Item) error) (Info, error) {
info := Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
for cur := st.head; cur != nil; cur = cur.next {
if err := f(cur.item); err != nil {
if errors.Is(err, ErrStopScan) {
return info, nil
}
return info, err
}
}
return info, nil
}
// updateHead replaces the current head with newHead, signals any waiters, and
// resets the wait signal. The caller must hold log.mu exclusively.
func (lg *Log) updateHead(newHead *logEntry) {
lg.head = newHead
close(lg.ready) // signal
lg.ready = make(chan struct{})
}
// A logEntry is the backbone of the event log queue. Entries are not mutated
// after construction, so it is safe to read item and next without locking.
type logEntry struct {
item *Item
next *logEntry
}

+ 222
- 0
internal/eventlog/eventlog_test.go View File

@ -0,0 +1,222 @@
package eventlog_test
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/google/go-cmp/cmp"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// fakeTime is a fake clock to use to control cursor assignment.
// The timeIndex method reports the current "time" and advance manually updates
// the apparent time.
type fakeTime struct{ now int64 }
func newFakeTime(init int64) *fakeTime { return &fakeTime{now: init} }
func (f *fakeTime) timeIndex() int64 { return f.now }
func (f *fakeTime) advance(d time.Duration) { f.now += int64(d) }
// eventData is a placeholder event data implementation for testing.
type eventData string
func (eventData) TypeTag() string { return "eventData" }
func TestNewError(t *testing.T) {
lg, err := eventlog.New(eventlog.LogSettings{})
if err == nil {
t.Fatalf("New: got %+v, wanted error", lg)
} else {
t.Logf("New: got expected error: %v", err)
}
}
func TestPruneTime(t *testing.T) {
clk := newFakeTime(0)
// Construct a log with a 60-second time window.
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 60 * time.Second,
Source: cursor.Source{
TimeIndex: clk.timeIndex,
},
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
// Add events up to the time window, at seconds 0, 15, 30, 45, 60.
// None of these should be pruned (yet).
var want []string // cursor strings
for i := 1; i <= 5; i++ {
want = append(want, fmt.Sprintf("%016x-%04x", clk.timeIndex(), i))
mustAdd(t, lg, "test-event", eventData("whatever"))
clk.advance(15 * time.Second)
}
// time now: 75 sec.
// Verify that all the events we added are present.
got := cursors(t, lg)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("Cursors before pruning: (-want, +got)\n%s", diff)
}
// Add an event past the end of the window at second 90, and verify that
// this triggered an age-based prune of the oldest events (0, 15) that are
// outside the 60-second window.
clk.advance(15 * time.Second) // time now: 90 sec.
want = append(want[2:], fmt.Sprintf("%016x-%04x", clk.timeIndex(), 6))
mustAdd(t, lg, "test-event", eventData("extra"))
got = cursors(t, lg)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("Cursors after pruning: (-want, +got)\n%s", diff)
}
}
// Run a publisher and concurrent subscribers to tickle the race detector with
// concurrent add and scan operations.
func TestConcurrent(t *testing.T) {
defer leaktest.Check(t)
if testing.Short() {
t.Skip("Skipping concurrency exercise because -short is set")
}
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 30 * time.Second,
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
// Publisher: Add events and handle expirations.
wg.Add(1)
go func() {
defer wg.Done()
tick := time.NewTimer(0)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-tick.C:
_ = lg.Add("test-event", eventData(t.Format(time.RFC3339Nano)))
tick.Reset(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}
}()
// Subscribers: Wait for new events at the head of the queue. This
// simulates the typical operation of a subscriber by waiting for the head
// cursor to change and then scanning down toward the unconsumed item.
const numSubs = 16
for i := 0; i < numSubs; i++ {
task := i
wg.Add(1)
go func() {
defer wg.Done()
tick := time.NewTimer(0)
var cur cursor.Cursor
for {
// Simulate the subscriber being busy with other things.
select {
case <-ctx.Done():
return
case <-tick.C:
tick.Reset(time.Duration(rand.Intn(150)) * time.Millisecond)
}
// Wait for new data to arrive.
info, err := lg.WaitScan(ctx, cur, func(itm *eventlog.Item) error {
if itm.Cursor == cur {
return eventlog.ErrStopScan
}
return nil
})
if err != nil {
if !errors.Is(err, context.Canceled) {
t.Errorf("Wait scan for task %d failed: %v", task, err)
}
return
}
cur = info.Newest
}
}()
}
time.AfterFunc(2*time.Second, cancel)
wg.Wait()
}
func TestPruneSize(t *testing.T) {
const maxItems = 25
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 60 * time.Second,
MaxItems: maxItems,
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
// Add a lot of items to the log and verify that we never exceed the
// specified cap.
for i := 0; i < 60; i++ {
mustAdd(t, lg, "test-event", eventData(strconv.Itoa(i+1)))
if got := lg.Info().Size; got > maxItems {
t.Errorf("After add %d: log size is %d, want ≤ %d", i+1, got, maxItems)
}
}
}
// mustAdd adds a single event to lg. If Add reports an error other than for
// pruning, the test fails; otherwise the error is returned.
func mustAdd(t *testing.T, lg *eventlog.Log, etype string, data types.EventData) {
t.Helper()
err := lg.Add(etype, data)
if err != nil && !errors.Is(err, eventlog.ErrLogPruned) {
t.Fatalf("Add %q failed: %v", etype, err)
}
}
// cursors extracts the cursors from lg in ascending order of time.
func cursors(t *testing.T, lg *eventlog.Log) []string {
t.Helper()
var cursors []string
if _, err := lg.Scan(func(itm *eventlog.Item) error {
cursors = append(cursors, itm.Cursor.String())
return nil
}); err != nil {
t.Fatalf("Scan failed: %v", err)
}
reverse(cursors) // put in forward-time order for comparison
return cursors
}
func reverse(ss []string) {
for i, j := 0, len(ss)-1; i < j; {
ss[i], ss[j] = ss[j], ss[i]
i++
j--
}
}

+ 78
- 0
internal/eventlog/item.go View File

@ -0,0 +1,78 @@
package eventlog
import (
"strings"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// Cached constants for the pieces of reserved event names.
var (
tmTypeTag string
tmTypeKey string
)
func init() {
parts := strings.SplitN(types.EventTypeKey, ".", 2)
if len(parts) != 2 {
panic("invalid event type key: " + types.EventTypeKey)
}
tmTypeTag = parts[0]
tmTypeKey = parts[1]
}
// ABCIEventer is an optional extension interface that may be implemented by
// event data types, to expose ABCI metadata to the event log. If an event item
// does not implement this interface, it is presumed to have no ABCI metadata.
type ABCIEventer interface {
// Return any ABCI events metadata the receiver contains.
// The reported slice must not contain a type (tm.event) record, since some
// events share the same structure among different event types.
ABCIEvents() []abci.Event
}
// An Item is a single event item.
type Item struct {
Cursor cursor.Cursor
Type string
Data types.EventData
Events []abci.Event
}
// newItem constructs a new item with the specified cursor, type, and data.
func newItem(cursor cursor.Cursor, etype string, data types.EventData) *Item {
return &Item{Cursor: cursor, Type: etype, Data: data, Events: makeEvents(etype, data)}
}
// makeEvents returns a slice of ABCI events comprising the type tag along with
// any internal events exported by the data value.
func makeEvents(etype string, data types.EventData) []abci.Event {
base := []abci.Event{{
Type: tmTypeTag,
Attributes: []abci.EventAttribute{{
Key: tmTypeKey, Value: etype,
}},
}}
if evt, ok := data.(ABCIEventer); ok {
return append(base, evt.ABCIEvents()...)
}
return base
}
// FindType reports whether events contains a tm.event event, and if so returns
// its value, which is the type of the underlying event item.
func FindType(events []abci.Event) (string, bool) {
for _, evt := range events {
if evt.Type != tmTypeTag {
continue
}
for _, attr := range evt.Attributes {
if attr.Key == tmTypeKey {
return attr.Value, true
}
}
}
return "", false
}

+ 39
- 0
internal/eventlog/metrics.go View File

@ -0,0 +1,39 @@
package eventlog
import (
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
// gauge is the subset of the Prometheus gauge interface used here.
type gauge interface {
Set(float64)
}
// Metrics define the metrics exported by the eventlog package.
type Metrics struct {
numItemsGauge gauge
}
// discard is a no-op implementation of the gauge interface.
type discard struct{}
func (discard) Set(float64) {}
const eventlogSubsystem = "eventlog"
// PrometheusMetrics returns a collection of eventlog metrics for Prometheus.
func PrometheusMetrics(ns string, fields ...string) *Metrics {
var labels []string
for i := 0; i < len(fields); i += 2 {
labels = append(labels, fields[i])
}
return &Metrics{
numItemsGauge: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: ns,
Subsystem: eventlogSubsystem,
Name: "num_items",
Help: "Number of items currently resident in the event log.",
}, labels).With(fields...),
}
}

+ 111
- 0
internal/eventlog/prune.go View File

@ -0,0 +1,111 @@
package eventlog
import (
"time"
)
// checkPrune checks whether the log has exceeded its boundaries of size or
// age, and if so prunes the log and updates the head.
func (lg *Log) checkPrune(head *logEntry, size int, age time.Duration) error {
// To avoid potentially re-pruning for every event, don't trigger an age
// prune until we're at least this far beyond the designated size.
const windowSlop = 30 * time.Second
if age < (lg.windowSize+windowSlop) && (lg.maxItems <= 0 || size <= lg.maxItems) {
lg.numItemsGauge.Set(float64(lg.numItems))
return nil // no pruning is needed
}
var newState logState
var err error
switch {
case lg.maxItems > 0 && size > lg.maxItems:
// We exceeded the size cap. In this case, age does not matter: count off
// the newest items and drop the unconsumed tail. Note that we prune by a
// fraction rather than an absolute amount so that we only have to prune
// for size occasionally.
// TODO(creachadair): We may want to spill dropped events to secondary
// storage rather than dropping them. The size cap is meant as a safety
// valve against unexpected extremes, but if a network has "expected"
// spikes that nevertheless exceed any safe buffer size (e.g., Osmosis
// epochs), we may want to have a fallback so that we don't lose events
// that would otherwise fall within the window.
newSize := 3 * size / 4
newState, err = lg.pruneSize(head, newSize)
default:
// We did not exceed the size cap, but some items are too old.
newState = lg.pruneAge(head)
}
// Note that when we update the head after pruning, we do not need to signal
// any waiters; pruning never adds new material to the log so anyone waiting
// should continue doing so until a subsequent Add occurs.
lg.mu.Lock()
defer lg.mu.Unlock()
lg.numItems = newState.size
lg.numItemsGauge.Set(float64(newState.size))
lg.oldestCursor = newState.oldest
lg.head = newState.head
return err
}
// pruneSize returns a new log state by pruning head to newSize.
// Precondition: newSize ≤ len(head).
func (lg *Log) pruneSize(head *logEntry, newSize int) (logState, error) {
// Special case for size 0 to simplify the logic below.
if newSize == 0 {
return logState{}, ErrLogPruned // drop everything
}
// Initialize: New head has the same item as the old head.
first := &logEntry{item: head.item} // new head
last := first // new tail (last copied cons)
cur := head.next
for i := 1; i < newSize; i++ {
cp := &logEntry{item: cur.item}
last.next = cp
last = cp
cur = cur.next
}
var err error
if head.item.Cursor.Diff(last.item.Cursor) <= lg.windowSize {
err = ErrLogPruned
}
return logState{
oldest: last.item.Cursor,
newest: first.item.Cursor,
size: newSize,
head: first,
}, err
}
// pruneAge returns a new log state by pruning items older than the window
// prior to the head element.
func (lg *Log) pruneAge(head *logEntry) logState {
first := &logEntry{item: head.item}
last := first
size := 1
for cur := head.next; cur != nil; cur = cur.next {
diff := head.item.Cursor.Diff(cur.item.Cursor)
if diff > lg.windowSize {
break // all remaining items are older than the window
}
cp := &logEntry{item: cur.item}
last.next = cp
last = cp
size++
}
return logState{
oldest: last.item.Cursor,
newest: first.item.Cursor,
size: size,
head: first,
}
}

Loading…
Cancel
Save