You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
5.2 KiB

9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "sync/atomic"
  6. "github.com/tendermint/tendermint/libs/log"
  7. )
  8. var (
  9. // ErrAlreadyStarted is returned when somebody tries to start an already
  10. // running service.
  11. ErrAlreadyStarted = errors.New("already started")
  12. // ErrAlreadyStopped is returned when somebody tries to stop an already
  13. // stopped service (without resetting it).
  14. ErrAlreadyStopped = errors.New("already stopped")
  15. // ErrNotStarted is returned when somebody tries to stop a not running
  16. // service.
  17. ErrNotStarted = errors.New("not started")
  18. )
  19. // Service defines a service that can be started, stopped, and reset.
  20. type Service interface {
  21. // Start is called to start the service, which should run until
  22. // the context terminates. If the service is already running, Start
  23. // must report an error.
  24. Start(context.Context) error
  25. // Return true if the service is running
  26. IsRunning() bool
  27. // String representation of the service
  28. String() string
  29. // Wait blocks until the service is stopped.
  30. Wait()
  31. }
// Implementation describes the concrete service that a BaseService wraps:
// the Service methods plus the OnStart and OnStop hooks that BaseService
// invokes from its own Start and Stop.
type Implementation interface {
	Service

	// OnStart is called by BaseService.Start with the context that was
	// passed to Start. If it returns an error, the service is not marked
	// as started.
	OnStart(context.Context) error

	// OnStop is called by BaseService.Stop, which runs either when Stop is
	// called explicitly or when the context passed to Start is canceled.
	OnStop()
}
/*
BaseService provides classical-inheritance-style service declarations: a
concrete service embeds BaseService and supplies its OnStart/OnStop hooks
through the Implementation interface.

Start calls the implementation's OnStart and Stop calls its OnStop. In the
absence of errors, each hook is called at most once. If OnStart returns an
error, the service is not marked as started, so the user can call Start
again.

Stop does not clear the started flag, so calling Start on a service that
has been started and then stopped returns ErrAlreadyStarted.

NOTE(review): earlier versions of this comment referred to Reset/OnReset
for restarting a stopped service; neither is defined in the visible
source — confirm before relying on restart.

The caller must ensure that Start and Stop are not called concurrently.

Calling Stop without calling Start first returns ErrNotStarted and leaves
the service startable.

Typical usage:

	type FooService struct {
		BaseService
		// private fields
	}

	func NewFooService(logger log.Logger) *FooService {
		fs := &FooService{
			// init
		}
		fs.BaseService = *NewBaseService(logger, "FooService", fs)
		return fs
	}

	func (fs *FooService) OnStart(ctx context.Context) error {
		// initialize private fields
		// start subroutines, etc.
		return nil
	}

	func (fs *FooService) OnStop() {
		// close/destroy private fields
		// stop subroutines, etc.
	}
*/
type BaseService struct {
	logger log.Logger
	// name is returned by String and logged under the "service" key.
	name string

	started uint32        // atomic; set to 1 by Start, reset to 0 if starting fails
	stopped uint32        // atomic; set to 1 by Stop, reset to 0 if the service was not started
	quit    chan struct{} // closed by Stop; Wait blocks on it

	// The "subclass" of BaseService
	impl Implementation
}
  83. // NewBaseService creates a new BaseService.
  84. func NewBaseService(logger log.Logger, name string, impl Implementation) *BaseService {
  85. return &BaseService{
  86. logger: logger,
  87. name: name,
  88. quit: make(chan struct{}),
  89. impl: impl,
  90. }
  91. }
  92. // Start starts the Service and calls its OnStart method. An error will be
  93. // returned if the service is already running or stopped. To restart a
  94. // stopped service, call Reset.
  95. func (bs *BaseService) Start(ctx context.Context) error {
  96. if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
  97. if atomic.LoadUint32(&bs.stopped) == 1 {
  98. bs.logger.Error("not starting service; already stopped", "service", bs.name, "impl", bs.impl.String())
  99. atomic.StoreUint32(&bs.started, 0)
  100. return ErrAlreadyStopped
  101. }
  102. bs.logger.Info("starting service", "service", bs.name, "impl", bs.impl.String())
  103. if err := bs.impl.OnStart(ctx); err != nil {
  104. // revert flag
  105. atomic.StoreUint32(&bs.started, 0)
  106. return err
  107. }
  108. go func(ctx context.Context) {
  109. select {
  110. case <-bs.quit:
  111. // someone else explicitly called stop
  112. // and then we shouldn't.
  113. return
  114. case <-ctx.Done():
  115. // if nothing is running, no need to
  116. // shut down again.
  117. if !bs.impl.IsRunning() {
  118. return
  119. }
  120. // the context was cancel and we
  121. // should stop.
  122. if err := bs.Stop(); err != nil {
  123. bs.logger.Error("stopped service",
  124. "err", err.Error(),
  125. "service", bs.name,
  126. "impl", bs.impl.String())
  127. }
  128. bs.logger.Info("stopped service",
  129. "service", bs.name,
  130. "impl", bs.impl.String())
  131. }
  132. }(ctx)
  133. return nil
  134. }
  135. return ErrAlreadyStarted
  136. }
  137. // Stop implements Service by calling OnStop (if defined) and closing quit
  138. // channel. An error will be returned if the service is already stopped.
  139. func (bs *BaseService) Stop() error {
  140. if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) {
  141. if atomic.LoadUint32(&bs.started) == 0 {
  142. bs.logger.Error("not stopping service; not started yet", "service", bs.name, "impl", bs.impl.String())
  143. atomic.StoreUint32(&bs.stopped, 0)
  144. return ErrNotStarted
  145. }
  146. bs.logger.Info("stopping service", "service", bs.name, "impl", bs.impl.String())
  147. bs.impl.OnStop()
  148. close(bs.quit)
  149. return nil
  150. }
  151. return ErrAlreadyStopped
  152. }
  153. // IsRunning implements Service by returning true or false depending on the
  154. // service's state.
  155. func (bs *BaseService) IsRunning() bool {
  156. return atomic.LoadUint32(&bs.started) == 1 && atomic.LoadUint32(&bs.stopped) == 0
  157. }
  158. // Wait blocks until the service is stopped.
  159. func (bs *BaseService) Wait() { <-bs.quit }
  160. // String implements Service by returning a string representation of the service.
  161. func (bs *BaseService) String() string { return bs.name }