|
|
@ -177,16 +177,22 @@ func (c *Local) Subscribe( |
|
|
|
if err != nil { |
|
|
|
return nil, errors.Wrap(err, "failed to parse query") |
|
|
|
} |
|
|
|
sub, err := c.EventBus.Subscribe(ctx, subscriber, q) |
|
|
|
if err != nil { |
|
|
|
return nil, errors.Wrap(err, "failed to subscribe") |
|
|
|
} |
|
|
|
|
|
|
|
outCap := 1 |
|
|
|
if len(outCapacity) > 0 { |
|
|
|
outCap = outCapacity[0] |
|
|
|
} |
|
|
|
|
|
|
|
var sub types.Subscription |
|
|
|
if outCap > 0 { |
|
|
|
sub, err = c.EventBus.Subscribe(ctx, subscriber, q, outCap) |
|
|
|
} else { |
|
|
|
sub, err = c.EventBus.SubscribeUnbuffered(ctx, subscriber, q) |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
return nil, errors.Wrap(err, "failed to subscribe") |
|
|
|
} |
|
|
|
|
|
|
|
outc := make(chan ctypes.ResultEvent, outCap) |
|
|
|
go c.eventsRoutine(sub, subscriber, q, outc) |
|
|
|
|
|
|
|