diff --git a/blockchain/pool.go b/blockchain/pool.go index 72f8f3c46..ab931d8f6 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -2,7 +2,6 @@ package blockchain import ( "sync" - "sync/atomic" "time" . "github.com/tendermint/tendermint/common" @@ -34,6 +33,8 @@ var ( */ type BlockPool struct { + BaseService + // block requests requestsMtx sync.Mutex requests map[int]*bpRequest @@ -48,12 +49,10 @@ type BlockPool struct { requestsCh chan<- BlockRequest timeoutsCh chan<- string repeater *RepeatTimer - - running int32 // atomic } func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool { - return &BlockPool{ + bp := &BlockPool{ peers: make(map[string]*bpPeer), requests: make(map[int]*bpRequest), @@ -63,35 +62,26 @@ func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- s requestsCh: requestsCh, timeoutsCh: timeoutsCh, - repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond), - - running: 0, + repeater: nil, } + bp.BaseService = *NewBaseService(log, "BlockPool", bp) + return bp } -func (pool *BlockPool) Start() { - if atomic.CompareAndSwapInt32(&pool.running, 0, 1) { - log.Notice("Starting BlockPool") - go pool.run() - } -} - -func (pool *BlockPool) Stop() { - if atomic.CompareAndSwapInt32(&pool.running, 1, 0) { - log.Notice("Stopping BlockPool") - pool.repeater.Stop() - } +func (pool *BlockPool) AfterStart() { + pool.repeater = NewRepeatTimer("", requestIntervalMS*time.Millisecond) + go pool.run() } -func (pool *BlockPool) IsRunning() bool { - return atomic.LoadInt32(&pool.running) == 1 +func (pool *BlockPool) AfterStop() { + pool.repeater.Stop() } // Run spawns requests as needed. func (pool *BlockPool) run() { RUN_LOOP: for { - if atomic.LoadInt32(&pool.running) == 0 { + if !pool.IsRunning() { break RUN_LOOP } _, numPending, _ := pool.GetStatus() @@ -301,14 +291,14 @@ func (pool *BlockPool) makeNextRequest() { } func (pool *BlockPool) sendRequest(height int, peerId string) { - if atomic.LoadInt32(&pool.running) == 0 { + if !pool.IsRunning() { return } pool.requestsCh <- BlockRequest{height, peerId} } func (pool *BlockPool) sendTimeout(peerId string) { - if atomic.LoadInt32(&pool.running) == 0 { + if !pool.IsRunning() { return } pool.timeoutsCh <- peerId diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 83dba679f..ccd04773e 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "reflect" - "sync/atomic" "time" "github.com/tendermint/tendermint/binary" @@ -39,6 +38,8 @@ type consensusReactor interface { // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { + p2p.BaseReactor + sw *p2p.Switch state *sm.State store *BlockStore @@ -47,8 +48,6 @@ type BlockchainReactor struct { requestsCh chan BlockRequest timeoutsCh chan string lastBlock *types.Block - quit chan struct{} - running uint32 evsw events.Fireable } @@ -74,31 +73,20 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *Blockc sync: sync, requestsCh: requestsCh, timeoutsCh: timeoutsCh, - quit: make(chan struct{}), - running: uint32(0), } + bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR) return bcR } -// Implements Reactor -func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { - if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) { - log.Notice("Starting BlockchainReactor") - bcR.sw = sw - if bcR.sync { - bcR.pool.Start() - go bcR.poolRoutine() - } +func (bcR *BlockchainReactor) AfterStart() { + if bcR.sync { + bcR.pool.Start() + go bcR.poolRoutine() } } -// Implements Reactor -func (bcR *BlockchainReactor) Stop() { - if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) { - log.Notice("Stopping BlockchainReactor") - close(bcR.quit) - bcR.pool.Stop() - } +func (bcR *BlockchainReactor) AfterStop() { + bcR.pool.Stop() } // Implements Reactor @@ -177,7 +165,7 @@ FOR_LOOP: for { select { case request := <-bcR.requestsCh: // chan BlockRequest - peer := bcR.sw.Peers().Get(request.PeerId) + peer := bcR.Switch.Peers().Get(request.PeerId) if peer == nil { // We can't assign the request. continue FOR_LOOP @@ -191,16 +179,16 @@ FOR_LOOP: } case peerId := <-bcR.timeoutsCh: // chan string // Peer timed out. - peer := bcR.sw.Peers().Get(peerId) + peer := bcR.Switch.Peers().Get(peerId) if peer != nil { - bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) + bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) } case _ = <-statusUpdateTicker.C: // ask for status updates go bcR.BroadcastStatusRequest() case _ = <-switchToConsensusTicker.C: height, numPending, numUnassigned := bcR.pool.GetStatus() - outbound, inbound, _ := bcR.sw.NumPeers() + outbound, inbound, _ := bcR.Switch.NumPeers() log.Info("Consensus ticker", "numUnassigned", numUnassigned, "numPending", numPending, "total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound) // NOTE: this condition is very strict right now. may need to weaken @@ -213,7 +201,7 @@ FOR_LOOP: log.Notice("Time to switch to consensus reactor!", "height", height) bcR.pool.Stop() - conR := bcR.sw.Reactor("CONSENSUS").(consensusReactor) + conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) conR.SwitchToConsensus(bcR.state) break FOR_LOOP @@ -250,19 +238,19 @@ FOR_LOOP: } } continue FOR_LOOP - case <-bcR.quit: + case <-bcR.Quit: break FOR_LOOP } } } func (bcR *BlockchainReactor) BroadcastStatusResponse() error { - bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) + bcR.Switch.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()}) return nil } func (bcR *BlockchainReactor) BroadcastStatusRequest() error { - bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()}) + bcR.Switch.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()}) return nil } diff --git a/common/service.go b/common/service.go index 686686f2f..f3829ee0a 100644 --- a/common/service.go +++ b/common/service.go @@ -1,63 +1,147 @@ +/* + +Classical-inheritance-style service declarations. +Services can be started, then stopped. +Users can override the AfterStart/AfterStop methods. +These methods are guaranteed to be called at most once. +Caller must ensure that Start() and Stop() are not called concurrently. +It is ok to call Stop() without calling Start() first. +Services cannot be re-started unless otherwise documented. + +Typical usage: + +type FooService struct { + BaseService + // private fields +} + +func NewFooService() *FooService { + fs := &FooService{ + // init + } + fs.BaseService = *BaseService(log, "FooService", fs) + return fs +} + +func (fs *FooService) AfterStart() { + // initialize private fields + // start subroutines, etc. +} + +func (fs *FooService) AfterStart() { + // close/destroy private fields + // stop subroutines, etc. +} + +*/ package common import "sync/atomic" +import "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15" + +type Service interface { + Start() bool + BeforeStart() + AfterStart() + + Stop() bool + BeforeStop() + AfterStop() + + IsRunning() bool +} -// BaseService represents a service that can be started then stopped, -// but cannot be restarted. -// .Start() calls the onStart callback function, and .Stop() calls onStop. -// It is meant to be embedded into service structs. -// The user must ensure that Start() and Stop() are not called concurrently. -// It is ok to call Stop() without calling Start() first -- the onStop -// callback will be called, and the service will never start. type BaseService struct { + log log15.Logger name string - service interface{} // for log statements. - started uint32 // atomic - stopped uint32 // atomic - onStart func() - onStop func() + started uint32 // atomic + stopped uint32 // atomic + + // The "subclass" of BaseService + impl Service } -func NewBaseService(name string, service interface{}, onStart, onStop func()) *BaseService { +func NewBaseService(log log15.Logger, name string, impl Service) *BaseService { return &BaseService{ - name: name, - service: service, - onStart: onStart, - onStop: onStop, + log: log, + name: name, + impl: impl, } } +// Implements Servce func (bs *BaseService) Start() bool { if atomic.CompareAndSwapUint32(&bs.started, 0, 1) { if atomic.LoadUint32(&bs.stopped) == 1 { - log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "service", bs.service) + bs.log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl) return false } else { - log.Notice(Fmt("Starting %v", bs.name), "service", bs.service) - } - if bs.onStart != nil { - bs.onStart() + bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl) } + bs.impl.BeforeStart() + bs.impl.AfterStart() return true } else { - log.Info(Fmt("Not starting %v -- already started", bs.name), "service", bs.service) + bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl) return false } } +// Implements Service +func (bs *BaseService) BeforeStart() {} + +// Implements Service +func (bs *BaseService) AfterStart() {} + +// Implements Service func (bs *BaseService) Stop() bool { if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) { - log.Notice(Fmt("Stopping %v", bs.name), "service", bs.service) - if bs.onStop != nil { - bs.onStop() - } + bs.log.Notice(Fmt("Stopping %v", bs.name), "impl", bs.impl) + bs.impl.BeforeStop() + bs.impl.AfterStop() return true } else { - log.Notice(Fmt("Not stopping %v", bs.name), "service", bs.service) + bs.log.Notice(Fmt("Not stopping %v", bs.name), "impl", bs.impl) return false } } +// Implements Service +func (bs *BaseService) BeforeStop() {} + +// Implements Service +func (bs *BaseService) AfterStop() {} + +// Implements Service func (bs *BaseService) IsRunning() bool { return atomic.LoadUint32(&bs.started) == 1 && atomic.LoadUint32(&bs.stopped) == 0 } + +//---------------------------------------- + +type QuitService struct { + BaseService + Quit chan struct{} +} + +func NewQuitService(log log15.Logger, name string, impl Service) *QuitService { + return &QuitService{ + BaseService: *NewBaseService(log, name, impl), + Quit: nil, + } +} + +// Init .Quit in BeforeStart such that AfterStart of impls have access to Quit. +// NOTE: When overriding BeforeStart, call QuitService.BeforeStart() manually. +func (qs *QuitService) BeforeStart() { + qs.Quit = make(chan struct{}) +} + +// Close .Quit after Stop/BeforeStop/AfterStop +func (qs *QuitService) Stop() bool { + res := qs.BaseService.Stop() + if res { + close(qs.Quit) + } + return res +} diff --git a/consensus/reactor.go b/consensus/reactor.go index a5f21aeb4..b45670d80 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -6,7 +6,6 @@ import ( "fmt" "reflect" "sync" - "sync/atomic" "time" "github.com/tendermint/tendermint/binary" @@ -32,50 +31,45 @@ const ( //----------------------------------------------------------------------------- type ConsensusReactor struct { - sw *p2p.Switch - running uint32 - quit chan struct{} + p2p.BaseReactor blockStore *bc.BlockStore conS *ConsensusState fastSync bool - - evsw events.Fireable + evsw events.Fireable } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { conR := &ConsensusReactor{ - quit: make(chan struct{}), blockStore: blockStore, conS: consensusState, fastSync: fastSync, } + conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR) return conR } -// Implements Reactor -func (conR *ConsensusReactor) Start(sw *p2p.Switch) { - if atomic.CompareAndSwapUint32(&conR.running, 0, 1) { - log.Notice("Starting ConsensusReactor", "fastSync", conR.fastSync) - conR.sw = sw - if !conR.fastSync { - conR.conS.Start() - } - go conR.broadcastNewRoundStepRoutine() +func (conR *ConsensusReactor) AfterStart() { + log.Notice("ConsensusReactor ", "fastSync", conR.fastSync) + if !conR.fastSync { + conR.conS.Start() } + go conR.broadcastNewRoundStepRoutine() } -// Implements Reactor -func (conR *ConsensusReactor) Stop() { - if atomic.CompareAndSwapUint32(&conR.running, 1, 0) { - log.Notice("Stopping ConsensusReactor") - conR.conS.Stop() - close(conR.quit) - } +func (conR *ConsensusReactor) AfterStop() { + conR.conS.Stop() } -func (conR *ConsensusReactor) IsRunning() bool { - return atomic.LoadUint32(&conR.running) == 1 +// Switch from the fast_sync to the consensus: +// reset the state, turn off fast_sync, start the consensus-state-machine +func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { + log.Notice("SwitchToConsensus") + // NOTE: The line below causes broadcastNewRoundStepRoutine() to + // broadcast a NewRoundStepMessage. + conR.conS.updateToState(state, false) + conR.fastSync = false + conR.conS.Start() } // Implements Reactor @@ -249,10 +243,10 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in Type: vote.Type, Index: index, } - conR.sw.Broadcast(StateChannel, msg) + conR.Switch.Broadcast(StateChannel, msg) /* // TODO: Make this broadcast more selective. - for _, peer := range conR.sw.Peers().List() { + for _, peer := range conR.Switch.Peers().List() { ps := peer.Data.Get(PeerStateKey).(*PeerState) prs := ps.GetRoundState() if prs.Height == vote.Height { @@ -272,17 +266,6 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) } -// Switch from the fast_sync to the consensus: -// reset the state, turn off fast_sync, start the consensus-state-machine -func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { - log.Notice("SwitchToConsensus") - // NOTE: The line below causes broadcastNewRoundStepRoutine() to - // broadcast a NewRoundStepMessage. - conR.conS.updateToState(state, false) - conR.fastSync = false - conR.conS.Start() -} - // implements events.Eventable func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { conR.evsw = evsw @@ -317,16 +300,16 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { var rs *RoundState select { case rs = <-conR.conS.NewStepCh(): - case <-conR.quit: + case <-conR.Quit: return } nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { - conR.sw.Broadcast(StateChannel, nrsMsg) + conR.Switch.Broadcast(StateChannel, nrsMsg) } if csMsg != nil { - conR.sw.Broadcast(StateChannel, csMsg) + conR.Switch.Broadcast(StateChannel, csMsg) } } } diff --git a/consensus/state.go b/consensus/state.go index 85e8da203..d09b9e079 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -155,7 +155,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" acm "github.com/tendermint/tendermint/account" @@ -285,9 +284,7 @@ func (rs *RoundState) StringShort() string { // Tracks consensus state across block heights and rounds. type ConsensusState struct { - started uint32 - stopped uint32 - quit chan struct{} + BaseService blockStore *bc.BlockStore mempoolReactor *mempl.MempoolReactor @@ -306,7 +303,6 @@ type ConsensusState struct { func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { cs := &ConsensusState{ - quit: make(chan struct{}), blockStore: blockStore, mempoolReactor: mempoolReactor, newStepCh: make(chan *RoundState, 10), @@ -316,6 +312,7 @@ func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReacto // We do that upon Start(). cs.maybeRebond() cs.reconstructLastCommit(state) + cs.BaseService = *NewBaseService(log, "ConsensusState", cs) return cs } @@ -363,11 +360,12 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState { return cs.newStepCh } -func (cs *ConsensusState) Start() { - if atomic.CompareAndSwapUint32(&cs.started, 0, 1) { - log.Notice("Starting ConsensusState") - cs.scheduleRound0(cs.Height) - } +func (cs *ConsensusState) AfterStart() { + cs.scheduleRound0(cs.Height) +} + +func (cs *ConsensusState) AfterStop() { + // It's mostly asynchronous so, there's not much to stop. } // EnterNewRound(height, 0) at cs.StartTime. @@ -382,13 +380,6 @@ func (cs *ConsensusState) scheduleRound0(height int) { }() } -func (cs *ConsensusState) Stop() { - if atomic.CompareAndSwapUint32(&cs.stopped, 0, 1) { - log.Notice("Stopping ConsensusState") - close(cs.quit) - } -} - // Updates ConsensusState and increments height to match that of state. // The round becomes 0 and cs.Step becomes RoundStepNewHeight. func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { diff --git a/events/events.go b/events/events.go index 13aca2dac..14ea8b180 100644 --- a/events/events.go +++ b/events/events.go @@ -2,7 +2,8 @@ package events import ( "sync" - "sync/atomic" + + . "github.com/tendermint/tendermint/common" ) // reactors and other modules should export @@ -17,27 +18,27 @@ type Fireable interface { } type EventSwitch struct { + BaseService + mtx sync.RWMutex eventCells map[string]*eventCell listeners map[string]*eventListener - running uint32 - quit chan struct{} } -func (evsw *EventSwitch) Start() { - if atomic.CompareAndSwapUint32(&evsw.running, 0, 1) { - evsw.eventCells = make(map[string]*eventCell) - evsw.listeners = make(map[string]*eventListener) - evsw.quit = make(chan struct{}) - } +func NewEventSwitch() *EventSwitch { + evsw := &EventSwitch{} + evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw) + return evsw } -func (evsw *EventSwitch) Stop() { - if atomic.CompareAndSwapUint32(&evsw.running, 1, 0) { - evsw.eventCells = nil - evsw.listeners = nil - close(evsw.quit) - } +func (evsw *EventSwitch) AfterStart() { + evsw.eventCells = make(map[string]*eventCell) + evsw.listeners = make(map[string]*eventListener) +} + +func (evsw *EventSwitch) AfterStop() { + evsw.eventCells = nil + evsw.listeners = nil } func (evsw *EventSwitch) AddListenerForEvent(listenerId, event string, cb eventCallback) { diff --git a/events/log.go b/events/log.go new file mode 100644 index 000000000..5b301bdf0 --- /dev/null +++ b/events/log.go @@ -0,0 +1,7 @@ +package events + +import ( + "github.com/tendermint/tendermint/logger" +) + +var log = logger.New("module", "events") diff --git a/mempool/reactor.go b/mempool/reactor.go index 440a04502..0a9b97325 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "reflect" - "sync/atomic" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" @@ -19,11 +18,9 @@ var ( // MempoolReactor handles mempool tx broadcasting amongst peers. type MempoolReactor struct { - sw *p2p.Switch - quit chan struct{} - started uint32 - stopped uint32 + p2p.BaseReactor + sw *p2p.Switch Mempool *Mempool evsw events.Fireable @@ -31,27 +28,15 @@ type MempoolReactor struct { func NewMempoolReactor(mempool *Mempool) *MempoolReactor { memR := &MempoolReactor{ - quit: make(chan struct{}), Mempool: mempool, } + memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR) return memR } -// Implements Reactor -func (memR *MempoolReactor) Start(sw *p2p.Switch) { - if atomic.CompareAndSwapUint32(&memR.started, 0, 1) { - memR.sw = sw - log.Notice("Starting MempoolReactor") - } -} +// func (memR *MempoolReactor) AfterStart() {} -// Implements Reactor -func (memR *MempoolReactor) Stop() { - if atomic.CompareAndSwapUint32(&memR.stopped, 0, 1) { - log.Notice("Stopping MempoolReactor") - close(memR.quit) - } -} +// func (memR *MempoolReactor) AfterStop() {} // Implements Reactor func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { @@ -93,7 +78,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { // Share tx. // We use a simple shotgun approach for now. // TODO: improve efficiency - for _, peer := range memR.sw.Peers().List() { + for _, peer := range memR.Switch.Peers().List() { if peer.Key == src.Key { continue } @@ -111,7 +96,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { return err } msg := &TxMessage{Tx: tx} - memR.sw.Broadcast(MempoolChannel, msg) + memR.Switch.Broadcast(MempoolChannel, msg) return nil } diff --git a/node/node.go b/node/node.go index b6d05f7ef..c4a259f76 100644 --- a/node/node.go +++ b/node/node.go @@ -97,28 +97,28 @@ func NewNode() *Node { privKey := acm.GenPrivKeyEd25519() // Make event switch - eventSwitch := new(events.EventSwitch) + eventSwitch := events.NewEventSwitch() eventSwitch.Start() - // Get PEXReactor + // Make PEXReactor book := p2p.NewAddrBook(config.GetString("addrbook_file")) pexReactor := p2p.NewPEXReactor(book) - // Get BlockchainReactor + // Make BlockchainReactor bcReactor := bc.NewBlockchainReactor(state.Copy(), blockStore, config.GetBool("fast_sync")) - // Get MempoolReactor + // Make MempoolReactor mempool := mempl.NewMempool(state.Copy()) mempoolReactor := mempl.NewMempoolReactor(mempool) - // Get ConsensusReactor + // Make ConsensusReactor consensusState := consensus.NewConsensusState(state.Copy(), blockStore, mempoolReactor) consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync")) if privValidator != nil { consensusReactor.SetPrivValidator(privValidator) } - // Make Switch + // Make p2p network switch sw := p2p.NewSwitch() sw.AddReactor("PEX", pexReactor) sw.AddReactor("MEMPOOL", mempoolReactor) diff --git a/p2p/addrbook.go b/p2p/addrbook.go index d6d217758..6397e7042 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -12,7 +12,6 @@ import ( "net" "os" "sync" - "sync/atomic" "time" . "github.com/tendermint/tendermint/common" @@ -74,19 +73,17 @@ const ( /* AddrBook - concurrency safe peer address manager */ type AddrBook struct { - filePath string + QuitService mtx sync.Mutex + filePath string rand *rand.Rand key string ourAddrs map[string]*NetAddress addrLookup map[string]*knownAddress // new & old addrNew []map[string]*knownAddress addrOld []map[string]*knownAddress - started uint32 - stopped uint32 wg sync.WaitGroup - quit chan struct{} nOld int nNew int } @@ -98,15 +95,15 @@ const ( // Use Start to begin processing asynchronous address updates. func NewAddrBook(filePath string) *AddrBook { - am := AddrBook{ + am := &AddrBook{ rand: rand.New(rand.NewSource(time.Now().UnixNano())), ourAddrs: make(map[string]*NetAddress), addrLookup: make(map[string]*knownAddress), - quit: make(chan struct{}), filePath: filePath, } am.init() - return &am + am.QuitService = *NewQuitService(log, "AddrBook", am) + return am } // When modifying this, don't forget to update loadFromFile() @@ -124,21 +121,14 @@ func (a *AddrBook) init() { } } -func (a *AddrBook) Start() { - if atomic.CompareAndSwapUint32(&a.started, 0, 1) { - log.Notice("Starting AddrBook") - a.loadFromFile(a.filePath) - a.wg.Add(1) - go a.saveRoutine() - } +func (a *AddrBook) AfterStart() { + a.loadFromFile(a.filePath) + a.wg.Add(1) + go a.saveRoutine() } -func (a *AddrBook) Stop() { - if atomic.CompareAndSwapUint32(&a.stopped, 0, 1) { - log.Notice("Stopping AddrBook") - close(a.quit) - a.wg.Wait() - } +func (a *AddrBook) AfterStop() { + a.wg.Wait() } func (a *AddrBook) AddOurAddress(addr *NetAddress) { @@ -381,7 +371,7 @@ out: case <-dumpAddressTicker.C: log.Info("Saving AddrBook to file", "size", a.Size()) a.saveToFile(a.filePath) - case <-a.quit: + case <-a.Quit: break out } } diff --git a/p2p/connection.go b/p2p/connection.go index cd9be1080..30452e49d 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -122,12 +122,12 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei mconn.channels = channels mconn.channelsIdx = channelsIdx - mconn.BaseService = *NewBaseService("MConnection", mconn, mconn.onStart, mconn.onStop) + mconn.BaseService = *NewBaseService(log, "MConnection", mconn) return mconn } -func (c *MConnection) onStart() { +func (c *MConnection) AfterStart() { c.quit = make(chan struct{}) go c.sendRoutine() go c.recvRoutine() @@ -136,7 +136,7 @@ func (c *MConnection) onStart() { c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second) } -func (c *MConnection) onStop() { +func (c *MConnection) AfterStop() { c.flushTimer.Stop() c.pingTimer.Stop() c.chStatsTimer.Stop() @@ -391,7 +391,7 @@ FOR_LOOP: pktType := binary.ReadByte(c.bufReader, &n, &err) c.recvMonitor.Update(int(n)) if err != nil { - if !c.IsRunning() { + if c.IsRunning() { log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err) c.stopForError(err) } @@ -412,8 +412,7 @@ FOR_LOOP: binary.ReadBinaryPtr(&pkt, c.bufReader, &n, &err) c.recvMonitor.Update(int(n)) if err != nil { - if !c.IsRunning() { - + if c.IsRunning() { log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err) c.stopForError(err) } @@ -425,8 +424,7 @@ FOR_LOOP: } msgBytes, err := channel.recvMsgPacket(pkt) if err != nil { - if !c.IsRunning() { - + if c.IsRunning() { log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err) c.stopForError(err) } diff --git a/p2p/listener.go b/p2p/listener.go index 86ccd753c..95dddc80d 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -4,7 +4,6 @@ import ( "fmt" "net" "strconv" - "sync/atomic" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/p2p/upnp" @@ -15,16 +14,17 @@ type Listener interface { InternalAddress() *NetAddress ExternalAddress() *NetAddress String() string - Stop() + Stop() bool } // Implements Listener type DefaultListener struct { + BaseService + listener net.Listener intAddr *NetAddress extAddr *NetAddress connections chan net.Conn - stopped uint32 } const ( @@ -92,10 +92,17 @@ SKIP_UPNP: extAddr: extAddr, connections: make(chan net.Conn, numBufferedConnections), } + dl.BaseService = *NewBaseService(log, "DefaultListener", dl) + dl.Start() // Started upon construction + return dl +} - go dl.listenRoutine() +func (l *DefaultListener) AfterStart() { + go l.listenRoutine() +} - return dl +func (l *DefaultListener) AfterStop() { + l.listener.Close() } // Accept connections and pass on the channel @@ -103,7 +110,7 @@ func (l *DefaultListener) listenRoutine() { for { conn, err := l.listener.Accept() - if atomic.LoadUint32(&l.stopped) == 1 { + if !l.IsRunning() { break // Go to cleanup } @@ -143,12 +150,6 @@ func (l *DefaultListener) NetListener() net.Listener { return l.listener } -func (l *DefaultListener) Stop() { - if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) { - l.listener.Close() - } -} - func (l *DefaultListener) String() string { return fmt.Sprintf("Listener(@%v)", l.extAddr) } diff --git a/p2p/peer.go b/p2p/peer.go index 682d8f7d1..0a0973a9c 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "net" - "sync/atomic" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" @@ -58,7 +57,7 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor reactor.Receive(chId, p, msgBytes) } onError := func(r interface{}) { - p.stop() + p.Stop() onPeerError(p, r) } mconn := NewMConnection(conn, chDescs, onReceive, onError) @@ -69,15 +68,15 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor Key: peerNodeInfo.PubKey.KeyString(), Data: NewCMap(), } - p.BaseService = *NewBaseService("Peer", p, p.onStart, p.onStop) + p.BaseService = *NewBaseService(log, "Peer", p) return p } -func (p *Peer) onStart() { +func (p *Peer) AfterStart() { p.mconn.Start() } -func (p *Peer) onStop() { +func (p *Peer) AfterStop() { p.mconn.Stop() } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 4ffb5f438..601e48f06 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -6,7 +6,6 @@ import ( "fmt" "math/rand" "reflect" - "sync/atomic" "time" "github.com/tendermint/tendermint/binary" @@ -27,39 +26,26 @@ PEXReactor handles PEX (peer exchange) and ensures that an adequate number of peers are connected to the switch. */ type PEXReactor struct { - sw *Switch - quit chan struct{} - started uint32 - stopped uint32 + BaseReactor + sw *Switch book *AddrBook - evsw events.Fireable } func NewPEXReactor(book *AddrBook) *PEXReactor { pexR := &PEXReactor{ - quit: make(chan struct{}), book: book, } + pexR.BaseReactor = *NewBaseReactor(log, "PEXReactor", pexR) return pexR } -// Implements Reactor -func (pexR *PEXReactor) Start(sw *Switch) { - if atomic.CompareAndSwapUint32(&pexR.started, 0, 1) { - log.Notice("Starting PEXReactor") - pexR.sw = sw - go pexR.ensurePeersRoutine() - } +func (pexR *PEXReactor) AfterStart() { + go pexR.ensurePeersRoutine() } -// Implements Reactor -func (pexR *PEXReactor) Stop() { - if atomic.CompareAndSwapUint32(&pexR.stopped, 0, 1) { - log.Notice("Stopping PEXReactor") - close(pexR.quit) - } +func (pexR *PEXReactor) AfterStop() { } // Implements Reactor @@ -147,7 +133,7 @@ FOR_LOOP: select { case <-timer.Ch: pexR.ensurePeers() - case <-pexR.quit: + case <-pexR.Quit: break FOR_LOOP } } @@ -158,7 +144,7 @@ FOR_LOOP: // Ensures that sufficient peers are connected. (once) func (pexR *PEXReactor) ensurePeers() { - numOutPeers, _, numDialing := pexR.sw.NumPeers() + numOutPeers, _, numDialing := pexR.Switch.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) if numToDial <= 0 { @@ -179,8 +165,8 @@ func (pexR *PEXReactor) ensurePeers() { break } alreadySelected := toDial.Has(try.IP.String()) - alreadyDialing := pexR.sw.IsDialing(try) - alreadyConnected := pexR.sw.Peers().Has(try.IP.String()) + alreadyDialing := pexR.Switch.IsDialing(try) + alreadyConnected := pexR.Switch.Peers().Has(try.IP.String()) if alreadySelected || alreadyDialing || alreadyConnected { /* log.Info("Cannot dial address", "addr", try, @@ -204,7 +190,7 @@ func (pexR *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial.Values() { go func(picked *NetAddress) { - _, err := pexR.sw.DialPeerWithAddress(picked) + _, err := pexR.Switch.DialPeerWithAddress(picked) if err != nil { pexR.book.MarkAttempt(picked) } @@ -213,7 +199,7 @@ func (pexR *PEXReactor) ensurePeers() { // If we need more addresses, pick a random peer and ask for more. if pexR.book.NeedMoreAddrs() { - if peers := pexR.sw.Peers().List(); len(peers) > 0 { + if peers := pexR.Switch.Peers().List(); len(peers) > 0 { i := rand.Int() % len(peers) peer := peers[i] log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer) diff --git a/p2p/switch.go b/p2p/switch.go index 9325a12bb..fc814a382 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -5,17 +5,18 @@ import ( "fmt" "net" "strconv" - "sync/atomic" "time" + "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15" acm "github.com/tendermint/tendermint/account" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/types" ) type Reactor interface { - Start(sw *Switch) - Stop() + Service // Start, Stop + + SetSwitch(*Switch) GetChannels() []*ChannelDescriptor AddPeer(peer *Peer) RemovePeer(peer *Peer, reason interface{}) @@ -24,14 +25,25 @@ type Reactor interface { //-------------------------------------- -type BaseReactor struct{} +type BaseReactor struct { + QuitService // Provides Start, Stop, .Quit + Switch *Switch +} + +func NewBaseReactor(log log15.Logger, name string, impl Reactor) *BaseReactor { + return &BaseReactor{ + QuitService: *NewQuitService(log, name, impl), + Switch: nil, + } +} -func (_ BaseReactor) Start(sw *Switch) {} -func (_ BaseReactor) Stop() {} -func (_ BaseReactor) GetChannels() []*ChannelDescriptor { return nil } -func (_ BaseReactor) AddPeer(peer *Peer) {} -func (_ BaseReactor) RemovePeer(peer *Peer, reason interface{}) {} -func (_ BaseReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {} +func (br *BaseReactor) SetSwitch(sw *Switch) { + br.Switch = sw +} +func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } +func (_ *BaseReactor) AddPeer(peer *Peer) {} +func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {} +func (_ *BaseReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {} //----------------------------------------------------------------------------- @@ -42,13 +54,14 @@ or more `Channels`. So while sending outgoing messages is typically performed o incoming messages are received on the reactor. */ type Switch struct { + BaseService + listeners []Listener reactors map[string]Reactor chDescs []*ChannelDescriptor reactorsByCh map[byte]Reactor peers *PeerSet dialing *CMap - running uint32 nodeInfo *types.NodeInfo // our node info nodePrivKey acm.PrivKeyEd25519 // our node privkey } @@ -71,9 +84,9 @@ func NewSwitch() *Switch { reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: NewCMap(), - running: 0, nodeInfo: nil, } + sw.BaseService = *NewBaseService(log, "P2P Switch", sw) return sw } @@ -91,6 +104,7 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { sw.reactorsByCh[chId] = reactor } sw.reactors[name] = reactor + reactor.SetSwitch(sw) return reactor } @@ -138,39 +152,36 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey acm.PrivKeyEd25519) { } } -func (sw *Switch) Start() { - if atomic.CompareAndSwapUint32(&sw.running, 0, 1) { - // Start reactors - for _, reactor := range sw.reactors { - reactor.Start(sw) - } - // Start peers - for _, peer := range sw.peers.List() { - sw.startInitPeer(peer) - } - // Start listeners - for _, listener := range sw.listeners { - go sw.listenerRoutine(listener) - } +// Switch.Start() starts all the reactors, peers, and listeners. +func (sw *Switch) AfterStart() { + // Start reactors + for _, reactor := range sw.reactors { + reactor.Start() + } + // Start peers + for _, peer := range sw.peers.List() { + sw.startInitPeer(peer) + } + // Start listeners + for _, listener := range sw.listeners { + go sw.listenerRoutine(listener) } } -func (sw *Switch) Stop() { - if atomic.CompareAndSwapUint32(&sw.running, 1, 0) { - // Stop listeners - for _, listener := range sw.listeners { - listener.Stop() - } - sw.listeners = nil - // Stop peers - for _, peer := range sw.peers.List() { - peer.stop() - } - sw.peers = NewPeerSet() - // Stop reactors - for _, reactor := range sw.reactors { - reactor.Stop() - } +func (sw *Switch) AfterStop() { + // Stop listeners + for _, listener := range sw.listeners { + listener.Stop() + } + sw.listeners = nil + // Stop peers + for _, peer := range sw.peers.List() { + peer.Stop() + } + sw.peers = NewPeerSet() + // Stop reactors + for _, reactor := range sw.reactors { + reactor.Stop() } } @@ -230,7 +241,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er // remove deadline and start peer conn.SetDeadline(time.Time{}) - if atomic.LoadUint32(&sw.running) == 1 { + if sw.IsRunning() { sw.startInitPeer(peer) } @@ -239,7 +250,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er } func (sw *Switch) startInitPeer(peer *Peer) { - peer.start() // spawn send/recv routines + peer.Start() // spawn send/recv routines sw.addPeerToReactors(peer) // run AddPeer on each reactor } @@ -304,7 +315,7 @@ func (sw *Switch) Peers() IPeerSet { func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { log.Notice("Stopping peer for error", "peer", peer, "error", reason) sw.peers.Remove(peer) - peer.stop() + peer.Stop() sw.removePeerFromReactors(peer, reason) } @@ -313,7 +324,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { func (sw *Switch) StopPeerGracefully(peer *Peer) { log.Notice("Stopping peer gracefully") sw.peers.Remove(peer) - peer.stop() + peer.Stop() sw.removePeerFromReactors(peer, nil) } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 11e9c478f..89e417a44 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -19,6 +19,8 @@ type PeerMessage struct { } type TestReactor struct { + BaseReactor + mtx sync.Mutex channels []*ChannelDescriptor peersAdded []*Peer @@ -29,17 +31,13 @@ type TestReactor struct { } func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor { - return &TestReactor{ + tr := &TestReactor{ channels: channels, logMessages: logMessages, msgsReceived: make(map[byte][]PeerMessage), } -} - -func (tr *TestReactor) Start(sw *Switch) { -} - -func (tr *TestReactor) Stop() { + tr.BaseReactor = *NewBaseReactor(log, "TestReactor", tr) + return tr } func (tr *TestReactor) GetChannels() []*ChannelDescriptor { @@ -132,11 +130,11 @@ func TestSwitches(t *testing.T) { sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10}, - }, true)).Start(sw) // Start the reactor + }, true)).Start() // Start the reactor sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x02), Priority: 10}, &ChannelDescriptor{Id: byte(0x03), Priority: 10}, - }, true)).Start(sw) // Start the reactor + }, true)).Start() // Start the reactor return sw }) defer s1.Stop() diff --git a/p2p/upnp/upnp.go b/p2p/upnp/upnp.go index 7aff240ba..3d6c55035 100644 --- a/p2p/upnp/upnp.go +++ b/p2p/upnp/upnp.go @@ -126,7 +126,7 @@ type ExternalIPAddress struct { IP string } -type Service struct { +type UPNPService struct { ServiceType string `xml:"serviceType"` ControlURL string `xml:"controlURL"` } @@ -136,7 +136,7 @@ type DeviceList struct { } type ServiceList struct { - Service []Service `xml:"service"` + Service []UPNPService `xml:"service"` } type Device struct { @@ -160,7 +160,7 @@ func getChildDevice(d *Device, deviceType string) *Device { return nil } -func getChildService(d *Device, serviceType string) *Service { +func getChildService(d *Device, serviceType string) *UPNPService { sl := d.ServiceList.Service for i := 0; i < len(sl); i++ { if strings.Index(sl[i].ServiceType, serviceType) >= 0 { diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index 8869a5c9d..530dc19e4 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -9,11 +9,11 @@ import ( "net/http" "reflect" "sort" - "sync/atomic" "time" "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket" "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/events" . "github.com/tendermint/tendermint/rpc/types" ) @@ -220,50 +220,43 @@ const ( // contains listener id, underlying ws connection, // and the event switch for subscribing to events type WSConnection struct { + QuitService + id string wsConn *websocket.Conn writeChan chan WSResponse - quitChan chan struct{} failedSends int - started uint32 - stopped uint32 evsw *events.EventSwitch } // new websocket connection wrapper func NewWSConnection(wsConn *websocket.Conn) *WSConnection { - return &WSConnection{ + con := &WSConnection{ id: wsConn.RemoteAddr().String(), wsConn: wsConn, writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full - quitChan: make(chan struct{}), } + con.QuitService = *NewQuitService(log, "WSConnection", con) + return con } -// start the connection and hand her the event switch -func (con *WSConnection) Start(evsw *events.EventSwitch) { - if atomic.CompareAndSwapUint32(&con.started, 0, 1) { - con.evsw = evsw - - // read subscriptions/unsubscriptions to events - go con.read() - // write responses - con.write() - } +func (con *WSConnection) AfterStart() { + // read subscriptions/unsubscriptions to events + go con.read() + // write responses + con.write() } -// close the connection -func (con *WSConnection) Stop() { - if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) { - con.evsw.RemoveListener(con.id) - close(con.quitChan) - // the write loop closes the websocket connection - // when it exits its loop, and the read loop - // closes the writeChan - } +func (con *WSConnection) AfterStop() { + con.evsw.RemoveListener(con.id) + // the write loop closes the websocket connection + // when it exits its loop, and the read loop + // closes the writeChan } +func (con *WSConnection) SetEventSwitch(evsw *events.EventSwitch) { con.evsw = evsw } + // attempt to write response to writeChan and record failures func (con *WSConnection) safeWrite(resp WSResponse) { select { @@ -351,7 +344,7 @@ func (con *WSConnection) write() { return } } - case <-con.quitChan: + case <-con.Quit: return } } @@ -389,7 +382,8 @@ func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Requ // register connection con := NewWSConnection(wsConn) log.Notice("New websocket connection", "origin", con.id) - con.Start(wm.evsw) + con.SetEventSwitch(wm.evsw) + con.Start() } // rpc.websocket diff --git a/state/permissions_test.go b/state/permissions_test.go index 85e98daa2..ad3199a9c 100644 --- a/state/permissions_test.go +++ b/state/permissions_test.go @@ -1104,7 +1104,7 @@ func TestSNativeCallTx(t *testing.T) { // run ExecTx and wait for the Receive event on given addr // returns the msg data and an error/exception func execTxWaitEvent(t *testing.T, blockCache *BlockCache, tx types.Tx, eventid string) (interface{}, string) { - evsw := new(events.EventSwitch) + evsw := events.NewEventSwitch() evsw.Start() ch := make(chan interface{}) evsw.AddListenerForEvent("test", eventid, func(msg interface{}) { diff --git a/vm/test/log_event_test.go b/vm/test/log_event_test.go index 3c53b324d..81dd3ef35 100644 --- a/vm/test/log_event_test.go +++ b/vm/test/log_event_test.go @@ -35,7 +35,7 @@ func TestLog4(t *testing.T) { ourVm := NewVM(st, newParams(), Zero256, nil) - eventSwitch := &events.EventSwitch{} + eventSwitch := events.NewEventSwitch() eventSwitch.Start() eventId := types.EventStringLogEvent(account2.Address.Postfix(20)) diff --git a/vm/test/vm_test.go b/vm/test/vm_test.go index b491071be..417f6c9ac 100644 --- a/vm/test/vm_test.go +++ b/vm/test/vm_test.go @@ -154,7 +154,7 @@ func TestSendCall(t *testing.T) { // subscribes to an AccReceive, runs the vm, returns the exception func runVMWaitEvents(t *testing.T, ourVm *VM, caller, callee *Account, subscribeAddr, contractCode []byte, gas int64) string { // we need to catch the event from the CALL to check for exceptions - evsw := new(events.EventSwitch) + evsw := events.NewEventSwitch() evsw.Start() ch := make(chan interface{}) fmt.Printf("subscribe to %x\n", subscribeAddr)