From d9030570113d178e13fbbb8772cebf1e5e94fbc1 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 14 Jul 2018 14:50:56 +0100 Subject: [PATCH] fix stopping pubsub --- libs/pubsub/pubsub.go | 8 ++++++++ node/node.go | 10 +++++++--- node/node_test.go | 10 ++++++++++ types/event_bus.go | 4 ++-- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 4280ca1ea..4c0d97e2f 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -163,6 +163,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou return nil case <-ctx.Done(): return ctx.Err() + case <-s.Quit(): + return nil } } @@ -190,6 +192,8 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query) return nil case <-ctx.Done(): return ctx.Err() + case <-s.Quit(): + return nil } } @@ -211,6 +215,8 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { return nil case <-ctx.Done(): return ctx.Err() + case <-s.Quit(): + return nil } } @@ -229,6 +235,8 @@ func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagM return nil case <-ctx.Done(): return ctx.Err() + case <-s.Quit(): + return nil } } diff --git a/node/node.go b/node/node.go index 9f6428ec1..faf33d88a 100644 --- a/node/node.go +++ b/node/node.go @@ -486,9 +486,16 @@ func (n *Node) OnStop() { n.BaseService.OnStop() n.Logger.Info("Stopping Node") + + // first stop the non-reactor services + n.eventBus.Stop() + n.indexerService.Stop() + + // now stop the reactors // TODO: gracefully disconnect from peers. n.sw.Stop() + // finally stop the listeners / external services for _, l := range n.rpcListeners { n.Logger.Info("Closing rpc listener", "listener", l) if err := l.Close(); err != nil { @@ -496,9 +503,6 @@ func (n *Node) OnStop() { } } - n.eventBus.Stop() - n.indexerService.Stop() - if pvsc, ok := n.privValidator.(*privval.SocketPV); ok { if err := pvsc.Stop(); err != nil { n.Logger.Error("Error stopping priv validator socket client", "err", err) diff --git a/node/node_test.go b/node/node_test.go index 80f6f02c2..ca074e1bc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -2,6 +2,9 @@ package node import ( "context" + "fmt" + "os" + "syscall" "testing" "time" @@ -43,6 +46,13 @@ func TestNodeStartStop(t *testing.T) { select { case <-n.Quit(): case <-time.After(5 * time.Second): + pid := os.Getpid() + p, err := os.FindProcess(pid) + if err != nil { + panic(err) + } + err = p.Signal(syscall.SIGABRT) + fmt.Println(err) t.Fatal("timed out waiting for shutdown") } } diff --git a/types/event_bus.go b/types/event_bus.go index 54fc60c7b..b4965feee 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -4,9 +4,9 @@ import ( "context" "fmt" - tmpubsub "github.com/tendermint/tendermint/libs/pubsub" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) const defaultCapacity = 0 @@ -49,7 +49,7 @@ func (b *EventBus) OnStart() error { } func (b *EventBus) OnStop() { - b.pubsub.OnStop() + b.pubsub.Stop() } func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {