Browse Source

pubsub: Make the queue unwritable after shutdown. (#7316)

Prior to this change, shutting down the pubsub server could cause
any laggard publishers to race with the shutdown plumbing.

Fix that race condition, and plumb in the service context to the
runner so that it will respect the external signal directly.
Remove now-redundant local shutdown plumbing.
pull/7346/head
M. J. Fromberger 2 years ago
committed by GitHub
parent
commit
4734f7d806
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 8 deletions
  1. +7
    -8
      libs/pubsub/pubsub.go

+ 7
- 8
libs/pubsub/pubsub.go View File

@ -106,7 +106,6 @@ type Server struct {
queue chan item
done <-chan struct{} // closed when server should exit
stop func() // signal the server to exit
pubs sync.RWMutex // excl: shutdown; shared: active publisher
exited chan struct{} // server exited
@ -333,15 +332,15 @@ func (s *Server) PublishWithEvents(ctx context.Context, msg interface{}, events
return s.publish(ctx, msg, events)
}
// OnStop implements Service.OnStop by shutting down the server.
func (s *Server) OnStop() { s.stop() }
// OnStop implements part of the Service interface. It is a no-op.
func (s *Server) OnStop() {}
// Wait implements Service.Wait by blocking until the server has exited, then
// yielding to the base service wait.
func (s *Server) Wait() { <-s.exited; s.BaseService.Wait() }
// OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart(ctx context.Context) error { s.run(); return nil }
func (s *Server) OnStart(ctx context.Context) error { s.run(ctx); return nil }
// OnReset implements Service.OnReset. It has no effect for this service.
func (s *Server) OnReset() error { return nil }
@ -363,11 +362,10 @@ func (s *Server) publish(ctx context.Context, data interface{}, events []types.E
}
}
func (s *Server) run() {
func (s *Server) run(ctx context.Context) {
// The server runs until ctx is canceled.
ctx, cancel := context.WithCancel(context.Background())
s.done = ctx.Done()
s.stop = cancel
queue := s.queue
// Shutdown monitor: When the context ends, wait for any active publish
// calls to exit, then close the queue to signal the sender to exit.
@ -376,6 +374,7 @@ func (s *Server) run() {
s.pubs.Lock()
defer s.pubs.Unlock()
close(s.queue)
s.queue = nil
}()
s.exited = make(chan struct{})
@ -383,7 +382,7 @@ func (s *Server) run() {
defer close(s.exited)
// Sender: Service the queue and forward messages to subscribers.
for it := range s.queue {
for it := range queue {
if err := s.send(it.Data, it.Events); err != nil {
s.Logger.Error("Error sending event", "err", err)
}


Loading…
Cancel
Save