|
@ -9,6 +9,39 @@ |
|
|
// When some message is published, we match it with all queries. If there is a
|
|
|
// When some message is published, we match it with all queries. If there is a
|
|
|
// match, this message will be pushed to all clients, subscribed to that query.
|
|
|
// match, this message will be pushed to all clients, subscribed to that query.
|
|
|
// See query subpackage for our implementation.
|
|
|
// See query subpackage for our implementation.
|
|
|
|
|
|
//
|
|
|
|
|
|
// Due to the blocking send implementation, a single subscriber can freeze an
|
|
|
|
|
|
// entire server by not reading messages before it unsubscribes. To avoid such
|
|
|
|
|
|
// scenario, subscribers must either:
|
|
|
|
|
|
//
|
|
|
|
|
|
// a) make sure they continue to read from the out channel until
|
|
|
|
|
|
// Unsubscribe(All) is called
|
|
|
|
|
|
//
|
|
|
|
|
|
// s.Subscribe(ctx, sub, qry, out)
|
|
|
|
|
|
// go func() {
|
|
|
|
|
|
// for msg := range out {
|
|
|
|
|
|
// // handle msg
|
|
|
|
|
|
// // will exit automatically when out is closed by Unsubscribe(All)
|
|
|
|
|
|
// }
|
|
|
|
|
|
// }()
|
|
|
|
|
|
// s.UnsubscribeAll(ctx, sub)
|
|
|
|
|
|
//
|
|
|
|
|
|
// b) drain the out channel before calling Unsubscribe(All)
|
|
|
|
|
|
//
|
|
|
|
|
|
// s.Subscribe(ctx, sub, qry, out)
|
|
|
|
|
|
// defer func() {
|
|
|
|
|
|
// for range out {
|
|
|
|
|
|
// // drain out to make sure we don't block
|
|
|
|
|
|
// }
|
|
|
|
|
|
// s.UnsubscribeAll(ctx, sub)
|
|
|
|
|
|
// }()
|
|
|
|
|
|
// for msg := range out {
|
|
|
|
|
|
// // handle msg
|
|
|
|
|
|
// if err != nil {
|
|
|
|
|
|
// return err
|
|
|
|
|
|
// }
|
|
|
|
|
|
// }
|
|
|
|
|
|
//
|
|
|
package pubsub |
|
|
package pubsub |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|