Browse Source

Simplify Service/Reactor pattern

pull/119/head
Jae Kwon 10 years ago
parent
commit
e7c1febb65
14 changed files with 67 additions and 56 deletions
  1. +4
    -2
      blockchain/pool.go
  2. +4
    -2
      blockchain/reactor.go
  3. +17
    -30
      common/service.go
  4. +4
    -2
      consensus/reactor.go
  5. +4
    -2
      consensus/state.go
  6. +4
    -2
      events/events.go
  7. +2
    -2
      mempool/reactor.go
  8. +4
    -2
      p2p/addrbook.go
  9. +4
    -2
      p2p/connection.go
  10. +4
    -2
      p2p/listener.go
  11. +4
    -2
      p2p/peer.go
  12. +4
    -2
      p2p/pex_reactor.go
  13. +4
    -2
      p2p/switch.go
  14. +4
    -2
      rpc/server/handlers.go

+ 4
- 2
blockchain/pool.go View File

@ -68,12 +68,14 @@ func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- s
return bp return bp
} }
func (pool *BlockPool) AfterStart() {
func (pool *BlockPool) OnStart() {
pool.BaseService.OnStart()
pool.repeater = NewRepeatTimer("", requestIntervalMS*time.Millisecond) pool.repeater = NewRepeatTimer("", requestIntervalMS*time.Millisecond)
go pool.run() go pool.run()
} }
func (pool *BlockPool) AfterStop() {
func (pool *BlockPool) OnStop() {
pool.BaseService.OnStop()
pool.repeater.Stop() pool.repeater.Stop()
} }


+ 4
- 2
blockchain/reactor.go View File

@ -76,14 +76,16 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *Blockc
return bcR return bcR
} }
func (bcR *BlockchainReactor) AfterStart() {
func (bcR *BlockchainReactor) OnStart() {
bcR.BaseReactor.OnStart()
if bcR.sync { if bcR.sync {
bcR.pool.Start() bcR.pool.Start()
go bcR.poolRoutine() go bcR.poolRoutine()
} }
} }
func (bcR *BlockchainReactor) AfterStop() {
func (bcR *BlockchainReactor) OnStop() {
bcR.BaseReactor.OnStop()
bcR.pool.Stop() bcR.pool.Stop()
} }


+ 17
- 30
common/service.go View File

@ -2,7 +2,7 @@
Classical-inheritance-style service declarations. Classical-inheritance-style service declarations.
Services can be started, then stopped. Services can be started, then stopped.
Users can override the AfterStart/AfterStop methods.
Users can override the OnStart/OnStop methods.
These methods are guaranteed to be called at most once. These methods are guaranteed to be called at most once.
Caller must ensure that Start() and Stop() are not called concurrently. Caller must ensure that Start() and Stop() are not called concurrently.
It is ok to call Stop() without calling Start() first. It is ok to call Stop() without calling Start() first.
@ -19,16 +19,18 @@ func NewFooService() *FooService {
fs := &FooService{ fs := &FooService{
// init // init
} }
fs.BaseService = *BaseService(log, "FooService", fs)
fs.BaseService = *NewBaseService(log, "FooService", fs)
return fs return fs
} }
func (fs *FooService) AfterStart() {
func (fs *FooService) OnStart() {
fs.BaseService.OnStart() // Always call the overridden method.
// initialize private fields // initialize private fields
// start subroutines, etc. // start subroutines, etc.
} }
func (fs *FooService) AfterStart() {
func (fs *FooService) OnStop() {
fs.BaseService.OnStop() // Always call the overridden method.
// close/destroy private fields // close/destroy private fields
// stop subroutines, etc. // stop subroutines, etc.
} }
@ -41,12 +43,10 @@ import "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tender
type Service interface { type Service interface {
Start() bool Start() bool
BeforeStart()
AfterStart()
OnStart()
Stop() bool Stop() bool
BeforeStop()
AfterStop()
OnStop()
IsRunning() bool IsRunning() bool
@ -80,8 +80,7 @@ func (bs *BaseService) Start() bool {
} else { } else {
bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl) bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl)
} }
bs.impl.BeforeStart()
bs.impl.AfterStart()
bs.impl.OnStart()
return true return true
} else { } else {
bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl) bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
@ -90,17 +89,13 @@ func (bs *BaseService) Start() bool {
} }
// Implements Service // Implements Service
func (bs *BaseService) BeforeStart() {}
// Implements Service
func (bs *BaseService) AfterStart() {}
func (bs *BaseService) OnStart() {}
// Implements Service // Implements Service
func (bs *BaseService) Stop() bool { func (bs *BaseService) Stop() bool {
if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) {
bs.log.Notice(Fmt("Stopping %v", bs.name), "impl", bs.impl) bs.log.Notice(Fmt("Stopping %v", bs.name), "impl", bs.impl)
bs.impl.BeforeStop()
bs.impl.AfterStop()
bs.impl.OnStop()
return true return true
} else { } else {
bs.log.Notice(Fmt("Not stopping %v", bs.name), "impl", bs.impl) bs.log.Notice(Fmt("Not stopping %v", bs.name), "impl", bs.impl)
@ -109,10 +104,7 @@ func (bs *BaseService) Stop() bool {
} }
// Implements Service // Implements Service
func (bs *BaseService) BeforeStop() {}
// Implements Service
func (bs *BaseService) AfterStop() {}
func (bs *BaseService) OnStop() {}
// Implements Service // Implements Service
func (bs *BaseService) IsRunning() bool { func (bs *BaseService) IsRunning() bool {
@ -138,17 +130,12 @@ func NewQuitService(log log15.Logger, name string, impl Service) *QuitService {
} }
} }
// Init .Quit in BeforeStart such that AfterStart of impls have access to Quit.
// NOTE: When overriding BeforeStart, call QuitService.BeforeStart() manually.
func (qs *QuitService) BeforeStart() {
// NOTE: when overriding OnStart, must call .QuitService.OnStart().
func (qs *QuitService) OnStart() {
qs.Quit = make(chan struct{}) qs.Quit = make(chan struct{})
} }
// Close .Quit after Stop/BeforeStop/AfterStop
func (qs *QuitService) Stop() bool {
res := qs.BaseService.Stop()
if res {
close(qs.Quit)
}
return res
// NOTE: when overriding OnStop, must call .QuitService.OnStop().
func (qs *QuitService) OnStop() {
close(qs.Quit)
} }

+ 4
- 2
consensus/reactor.go View File

@ -49,15 +49,17 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto
return conR return conR
} }
func (conR *ConsensusReactor) AfterStart() {
func (conR *ConsensusReactor) OnStart() {
log.Notice("ConsensusReactor ", "fastSync", conR.fastSync) log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
conR.BaseReactor.OnStart()
if !conR.fastSync { if !conR.fastSync {
conR.conS.Start() conR.conS.Start()
} }
go conR.broadcastNewRoundStepRoutine() go conR.broadcastNewRoundStepRoutine()
} }
func (conR *ConsensusReactor) AfterStop() {
func (conR *ConsensusReactor) OnStop() {
conR.BaseReactor.OnStop()
conR.conS.Stop() conR.conS.Stop()
} }


+ 4
- 2
consensus/state.go View File

@ -360,12 +360,14 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState {
return cs.newStepCh return cs.newStepCh
} }
func (cs *ConsensusState) AfterStart() {
func (cs *ConsensusState) OnStart() {
cs.BaseService.OnStart()
cs.scheduleRound0(cs.Height) cs.scheduleRound0(cs.Height)
} }
func (cs *ConsensusState) AfterStop() {
func (cs *ConsensusState) OnStop() {
// It's mostly asynchronous so, there's not much to stop. // It's mostly asynchronous so, there's not much to stop.
cs.BaseService.OnStop()
} }
// EnterNewRound(height, 0) at cs.StartTime. // EnterNewRound(height, 0) at cs.StartTime.


+ 4
- 2
events/events.go View File

@ -31,12 +31,14 @@ func NewEventSwitch() *EventSwitch {
return evsw return evsw
} }
func (evsw *EventSwitch) AfterStart() {
func (evsw *EventSwitch) OnStart() {
evsw.BaseService.OnStart()
evsw.eventCells = make(map[string]*eventCell) evsw.eventCells = make(map[string]*eventCell)
evsw.listeners = make(map[string]*eventListener) evsw.listeners = make(map[string]*eventListener)
} }
func (evsw *EventSwitch) AfterStop() {
func (evsw *EventSwitch) OnStop() {
evsw.BaseService.OnStop()
evsw.eventCells = nil evsw.eventCells = nil
evsw.listeners = nil evsw.listeners = nil
} }


+ 2
- 2
mempool/reactor.go View File

@ -34,9 +34,9 @@ func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
return memR return memR
} }
// func (memR *MempoolReactor) AfterStart() {}
// func (memR *MempoolReactor) OnStart() { memR.BaseReactor.OnStart() }
// func (memR *MempoolReactor) AfterStop() {}
// func (memR *MempoolReactor) OnStop() { memR.BaseReactor.OnStop() }
// Implements Reactor // Implements Reactor
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {


+ 4
- 2
p2p/addrbook.go View File

@ -121,13 +121,15 @@ func (a *AddrBook) init() {
} }
} }
func (a *AddrBook) AfterStart() {
func (a *AddrBook) OnStart() {
a.QuitService.OnStart()
a.loadFromFile(a.filePath) a.loadFromFile(a.filePath)
a.wg.Add(1) a.wg.Add(1)
go a.saveRoutine() go a.saveRoutine()
} }
func (a *AddrBook) AfterStop() {
func (a *AddrBook) OnStop() {
a.QuitService.OnStop()
a.wg.Wait() a.wg.Wait()
} }


+ 4
- 2
p2p/connection.go View File

@ -127,7 +127,8 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
return mconn return mconn
} }
func (c *MConnection) AfterStart() {
func (c *MConnection) OnStart() {
c.BaseService.OnStart()
c.quit = make(chan struct{}) c.quit = make(chan struct{})
c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond) c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond)
c.pingTimer = NewRepeatTimer("ping", pingTimeoutSeconds*time.Second) c.pingTimer = NewRepeatTimer("ping", pingTimeoutSeconds*time.Second)
@ -136,7 +137,8 @@ func (c *MConnection) AfterStart() {
go c.recvRoutine() go c.recvRoutine()
} }
func (c *MConnection) AfterStop() {
func (c *MConnection) OnStop() {
c.BaseService.OnStop()
c.flushTimer.Stop() c.flushTimer.Stop()
c.pingTimer.Stop() c.pingTimer.Stop()
c.chStatsTimer.Stop() c.chStatsTimer.Stop()


+ 4
- 2
p2p/listener.go View File

@ -97,11 +97,13 @@ SKIP_UPNP:
return dl return dl
} }
func (l *DefaultListener) AfterStart() {
func (l *DefaultListener) OnStart() {
l.BaseService.OnStart()
go l.listenRoutine() go l.listenRoutine()
} }
func (l *DefaultListener) AfterStop() {
func (l *DefaultListener) OnStop() {
l.BaseService.OnStop()
l.listener.Close() l.listener.Close()
} }


+ 4
- 2
p2p/peer.go View File

@ -72,11 +72,13 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor
return p return p
} }
func (p *Peer) AfterStart() {
func (p *Peer) OnStart() {
p.BaseService.OnStart()
p.mconn.Start() p.mconn.Start()
} }
func (p *Peer) AfterStop() {
func (p *Peer) OnStop() {
p.BaseService.OnStop()
p.mconn.Stop() p.mconn.Stop()
} }


+ 4
- 2
p2p/pex_reactor.go View File

@ -41,11 +41,13 @@ func NewPEXReactor(book *AddrBook) *PEXReactor {
return pexR return pexR
} }
func (pexR *PEXReactor) AfterStart() {
func (pexR *PEXReactor) OnStart() {
pexR.BaseReactor.OnStart()
go pexR.ensurePeersRoutine() go pexR.ensurePeersRoutine()
} }
func (pexR *PEXReactor) AfterStop() {
func (pexR *PEXReactor) OnStop() {
pexR.BaseReactor.OnStop()
} }
// Implements Reactor // Implements Reactor


+ 4
- 2
p2p/switch.go View File

@ -153,7 +153,8 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey acm.PrivKeyEd25519) {
} }
// Switch.Start() starts all the reactors, peers, and listeners. // Switch.Start() starts all the reactors, peers, and listeners.
func (sw *Switch) AfterStart() {
func (sw *Switch) OnStart() {
sw.BaseService.OnStart()
// Start reactors // Start reactors
for _, reactor := range sw.reactors { for _, reactor := range sw.reactors {
reactor.Start() reactor.Start()
@ -168,7 +169,8 @@ func (sw *Switch) AfterStart() {
} }
} }
func (sw *Switch) AfterStop() {
func (sw *Switch) OnStop() {
sw.BaseService.OnStop()
// Stop listeners // Stop listeners
for _, listener := range sw.listeners { for _, listener := range sw.listeners {
listener.Stop() listener.Stop()


+ 4
- 2
rpc/server/handlers.go View File

@ -241,14 +241,16 @@ func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
return con return con
} }
func (con *WSConnection) AfterStart() {
func (con *WSConnection) OnStart() {
con.QuitService.OnStart()
// read subscriptions/unsubscriptions to events // read subscriptions/unsubscriptions to events
go con.read() go con.read()
// write responses // write responses
con.write() con.write()
} }
func (con *WSConnection) AfterStop() {
func (con *WSConnection) OnStop() {
con.QuitService.OnStop()
con.evsw.RemoveListener(con.id) con.evsw.RemoveListener(con.id)
// the write loop closes the websocket connection // the write loop closes the websocket connection
// when it exits its loop, and the read loop // when it exits its loop, and the read loop


Loading…
Cancel
Save