diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index c104439f8..18f098d87 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -9,6 +9,39 @@ // When some message is published, we match it with all queries. If there is a // match, this message will be pushed to all clients, subscribed to that query. // See query subpackage for our implementation. +// +// Due to the blocking send implementation, a single subscriber can freeze an +// entire server by not reading messages before it unsubscribes. To avoid such +// scenario, subscribers must either: +// +// a) make sure they continue to read from the out channel until +// Unsubscribe(All) is called +// +// s.Subscribe(ctx, sub, qry, out) +// go func() { +// for msg := range out { +// // handle msg +// // will exit automatically when out is closed by Unsubscribe(All) +// } +// }() +// s.UnsubscribeAll(ctx, sub) +// +// b) drain the out channel before calling Unsubscribe(All) +// +// s.Subscribe(ctx, sub, qry, out) +// defer func() { +// for range out { +// // drain out to make sure we don't block +// } +// s.UnsubscribeAll(ctx, sub) +// }() +// for msg := range out { +// // handle msg +// if err != nil { +// return err +// } +// } +// package pubsub import (