From 4734f7d80686e2c74d49d54745b52da4143830f1 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 30 Nov 2021 07:30:35 -0800 Subject: [PATCH] pubsub: Make the queue unwritable after shutdown. (#7316) Prior to this change, shutting down the pubsub server could cause any laggard publishers to race with the shutdown plumbing. Fix that race condition, and plumb in the service context to the runner so that it will respect the external signal directly. Remove now-redundant local shutdown plumbing. --- libs/pubsub/pubsub.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 930dd47bc..3ac2c1940 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -106,7 +106,6 @@ type Server struct { queue chan item done <-chan struct{} // closed when server should exit - stop func() // signal the server to exit pubs sync.RWMutex // excl: shutdown; shared: active publisher exited chan struct{} // server exited @@ -333,15 +332,15 @@ func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events return s.publish(ctx, msg, events) } -// OnStop implements Service.OnStop by shutting down the server. -func (s *Server) OnStop() { s.stop() } +// OnStop implements part of the Service interface. It is a no-op. +func (s *Server) OnStop() {} // Wait implements Service.Wait by blocking until the server has exited, then // yielding to the base service wait. func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() } // OnStart implements Service.OnStart by starting the server. -func (s *Server) OnStart(ctx context.Context) error { s.run(); return nil } +func (s *Server) OnStart(ctx context.Context) error { s.run(ctx); return nil } // OnReset implements Service.OnReset. It has no effect for this service. func (s *Server) OnReset() error { return nil } @@ -363,11 +362,10 @@ func (s *Server) publish(ctx context.Context, data interface{}, events []types.E } } -func (s *Server) run() { +func (s *Server) run(ctx context.Context) { // The server runs until ctx is canceled. - ctx, cancel := context.WithCancel(context.Background()) s.done = ctx.Done() - s.stop = cancel + queue := s.queue // Shutdown monitor: When the context ends, wait for any active publish // calls to exit, then close the queue to signal the sender to exit. @@ -376,6 +374,7 @@ func (s *Server) run() { s.pubs.Lock() defer s.pubs.Unlock() close(s.queue) + s.queue = nil }() s.exited = make(chan struct{}) @@ -383,7 +382,7 @@ func (s *Server) run() { defer close(s.exited) // Sender: Service the queue and forward messages to subscribers. - for it := range s.queue { + for it := range queue { if err := s.send(it.Data, it.Events); err != nil { s.Logger.Error("Error sending event", "err", err) }