Browse Source

Merge pull request #1973 from tendermint/bucky/fix-pubsub-stop

fix stopping pubsub
pull/1972/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
722f8a1b6f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 5 deletions
  1. +8
    -0
      libs/pubsub/pubsub.go
  2. +7
    -3
      node/node.go
  3. +10
    -0
      node/node_test.go
  4. +2
    -2
      types/event_bus.go

+ 8
- 0
libs/pubsub/pubsub.go View File

@ -163,6 +163,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-s.Quit():
return nil
} }
} }
@ -190,6 +192,8 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query)
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-s.Quit():
return nil
} }
} }
@ -211,6 +215,8 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-s.Quit():
return nil
} }
} }
@ -229,6 +235,8 @@ func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagM
return nil return nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-s.Quit():
return nil
} }
} }


+ 7
- 3
node/node.go View File

@ -486,9 +486,16 @@ func (n *Node) OnStop() {
n.BaseService.OnStop() n.BaseService.OnStop()
n.Logger.Info("Stopping Node") n.Logger.Info("Stopping Node")
// first stop the non-reactor services
n.eventBus.Stop()
n.indexerService.Stop()
// now stop the reactors
// TODO: gracefully disconnect from peers. // TODO: gracefully disconnect from peers.
n.sw.Stop() n.sw.Stop()
// finally stop the listeners / external services
for _, l := range n.rpcListeners { for _, l := range n.rpcListeners {
n.Logger.Info("Closing rpc listener", "listener", l) n.Logger.Info("Closing rpc listener", "listener", l)
if err := l.Close(); err != nil { if err := l.Close(); err != nil {
@ -496,9 +503,6 @@ func (n *Node) OnStop() {
} }
} }
n.eventBus.Stop()
n.indexerService.Stop()
if pvsc, ok := n.privValidator.(*privval.SocketPV); ok { if pvsc, ok := n.privValidator.(*privval.SocketPV); ok {
if err := pvsc.Stop(); err != nil { if err := pvsc.Stop(); err != nil {
n.Logger.Error("Error stopping priv validator socket client", "err", err) n.Logger.Error("Error stopping priv validator socket client", "err", err)


+ 10
- 0
node/node_test.go View File

@ -2,6 +2,9 @@ package node
import ( import (
"context" "context"
"fmt"
"os"
"syscall"
"testing" "testing"
"time" "time"
@ -43,6 +46,13 @@ func TestNodeStartStop(t *testing.T) {
select { select {
case <-n.Quit(): case <-n.Quit():
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
pid := os.Getpid()
p, err := os.FindProcess(pid)
if err != nil {
panic(err)
}
err = p.Signal(syscall.SIGABRT)
fmt.Println(err)
t.Fatal("timed out waiting for shutdown") t.Fatal("timed out waiting for shutdown")
} }
} }

+ 2
- 2
types/event_bus.go View File

@ -4,9 +4,9 @@ import (
"context" "context"
"fmt" "fmt"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
) )
const defaultCapacity = 0 const defaultCapacity = 0
@ -49,7 +49,7 @@ func (b *EventBus) OnStart() error {
} }
func (b *EventBus) OnStop() { func (b *EventBus) OnStop() {
b.pubsub.OnStop()
b.pubsub.Stop()
} }
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {


Loading…
Cancel
Save