diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 930dd47bc..3ac2c1940 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -106,7 +106,6 @@ type Server struct { queue chan item done <-chan struct{} // closed when server should exit - stop func() // signal the server to exit pubs sync.RWMutex // excl: shutdown; shared: active publisher exited chan struct{} // server exited @@ -333,15 +332,15 @@ func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events return s.publish(ctx, msg, events) } -// OnStop implements Service.OnStop by shutting down the server. -func (s *Server) OnStop() { s.stop() } +// OnStop implements part of the Service interface. It is a no-op. +func (s *Server) OnStop() {} // Wait implements Service.Wait by blocking until the server has exited, then // yielding to the base service wait. func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() } // OnStart implements Service.OnStart by starting the server. -func (s *Server) OnStart(ctx context.Context) error { s.run(); return nil } +func (s *Server) OnStart(ctx context.Context) error { s.run(ctx); return nil } // OnReset implements Service.OnReset. It has no effect for this service. func (s *Server) OnReset() error { return nil } @@ -363,11 +362,10 @@ func (s *Server) publish(ctx context.Context, data interface{}, events []types.E } } -func (s *Server) run() { +func (s *Server) run(ctx context.Context) { // The server runs until ctx is canceled. - ctx, cancel := context.WithCancel(context.Background()) s.done = ctx.Done() - s.stop = cancel + queue := s.queue // Shutdown monitor: When the context ends, wait for any active publish // calls to exit, then close the queue to signal the sender to exit. @@ -376,6 +374,7 @@ func (s *Server) run() { s.pubs.Lock() defer s.pubs.Unlock() close(s.queue) + s.queue = nil }() s.exited = make(chan struct{}) @@ -383,7 +382,7 @@ func (s *Server) run() { defer close(s.exited) // Sender: Service the queue and forward messages to subscribers. - for it := range s.queue { + for it := range queue { if err := s.send(it.Data, it.Events); err != nil { s.Logger.Error("Error sending event", "err", err) }