Browse Source

fix stopping pubsub

pull/1973/head
Ethan Buchman 6 years ago
parent
commit
d903057011
4 changed files with 27 additions and 5 deletions
  1. +8
    -0
      libs/pubsub/pubsub.go
  2. +7
    -3
      node/node.go
  3. +10
    -0
      node/node_test.go
  4. +2
    -2
      types/event_bus.go

+ 8
- 0
libs/pubsub/pubsub.go View File

@ -163,6 +163,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou
return nil
case <-ctx.Done():
return ctx.Err()
case <-s.Quit():
return nil
}
}
@ -190,6 +192,8 @@ func (s *Server) Unsubscribe(ctx context.Context, clientID string, query Query)
return nil
case <-ctx.Done():
return ctx.Err()
case <-s.Quit():
return nil
}
}
@ -211,6 +215,8 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
return nil
case <-ctx.Done():
return ctx.Err()
case <-s.Quit():
return nil
}
}
@ -229,6 +235,8 @@ func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagM
return nil
case <-ctx.Done():
return ctx.Err()
case <-s.Quit():
return nil
}
}


+ 7
- 3
node/node.go View File

@ -486,9 +486,16 @@ func (n *Node) OnStop() {
n.BaseService.OnStop()
n.Logger.Info("Stopping Node")
// first stop the non-reactor services
n.eventBus.Stop()
n.indexerService.Stop()
// now stop the reactors
// TODO: gracefully disconnect from peers.
n.sw.Stop()
// finally stop the listeners / external services
for _, l := range n.rpcListeners {
n.Logger.Info("Closing rpc listener", "listener", l)
if err := l.Close(); err != nil {
@ -496,9 +503,6 @@ func (n *Node) OnStop() {
}
}
n.eventBus.Stop()
n.indexerService.Stop()
if pvsc, ok := n.privValidator.(*privval.SocketPV); ok {
if err := pvsc.Stop(); err != nil {
n.Logger.Error("Error stopping priv validator socket client", "err", err)


+ 10
- 0
node/node_test.go View File

@ -2,6 +2,9 @@ package node
import (
"context"
"fmt"
"os"
"syscall"
"testing"
"time"
@ -43,6 +46,13 @@ func TestNodeStartStop(t *testing.T) {
select {
case <-n.Quit():
case <-time.After(5 * time.Second):
pid := os.Getpid()
p, err := os.FindProcess(pid)
if err != nil {
panic(err)
}
err = p.Signal(syscall.SIGABRT)
fmt.Println(err)
t.Fatal("timed out waiting for shutdown")
}
}

+ 2
- 2
types/event_bus.go View File

@ -4,9 +4,9 @@ import (
"context"
"fmt"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)
const defaultCapacity = 0
@ -49,7 +49,7 @@ func (b *EventBus) OnStart() error {
}
func (b *EventBus) OnStop() {
b.pubsub.OnStop()
b.pubsub.Stop()
}
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {


Loading…
Cancel
Save