Browse Source

BaseService and BaseReactor refactor and unification

pull/118/head
Jae Kwon 9 years ago
parent
commit
571a134318
21 changed files with 349 additions and 343 deletions
  1. +14
    -24
      blockchain/pool.go
  2. +17
    -29
      blockchain/reactor.go
  3. +112
    -28
      common/service.go
  4. +24
    -41
      consensus/reactor.go
  5. +8
    -17
      consensus/state.go
  6. +16
    -15
      events/events.go
  7. +7
    -0
      events/log.go
  8. +7
    -22
      mempool/reactor.go
  9. +6
    -6
      node/node.go
  10. +12
    -22
      p2p/addrbook.go
  11. +6
    -8
      p2p/connection.go
  12. +13
    -12
      p2p/listener.go
  13. +4
    -5
      p2p/peer.go
  14. +12
    -26
      p2p/pex_reactor.go
  15. +57
    -46
      p2p/switch.go
  16. +7
    -9
      p2p/switch_test.go
  17. +3
    -3
      p2p/upnp/upnp.go
  18. +21
    -27
      rpc/server/handlers.go
  19. +1
    -1
      state/permissions_test.go
  20. +1
    -1
      vm/test/log_event_test.go
  21. +1
    -1
      vm/test/vm_test.go

+ 14
- 24
blockchain/pool.go View File

@ -2,7 +2,6 @@ package blockchain
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
@ -34,6 +33,8 @@ var (
*/ */
type BlockPool struct { type BlockPool struct {
BaseService
// block requests // block requests
requestsMtx sync.Mutex requestsMtx sync.Mutex
requests map[int]*bpRequest requests map[int]*bpRequest
@ -48,12 +49,10 @@ type BlockPool struct {
requestsCh chan<- BlockRequest requestsCh chan<- BlockRequest
timeoutsCh chan<- string timeoutsCh chan<- string
repeater *RepeatTimer repeater *RepeatTimer
running int32 // atomic
} }
func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool { func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
return &BlockPool{
bp := &BlockPool{
peers: make(map[string]*bpPeer), peers: make(map[string]*bpPeer),
requests: make(map[int]*bpRequest), requests: make(map[int]*bpRequest),
@ -63,35 +62,26 @@ func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- s
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, timeoutsCh: timeoutsCh,
repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
running: 0,
repeater: nil,
} }
bp.BaseService = *NewBaseService(log, "BlockPool", bp)
return bp
} }
func (pool *BlockPool) Start() {
if atomic.CompareAndSwapInt32(&pool.running, 0, 1) {
log.Notice("Starting BlockPool")
go pool.run()
}
}
func (pool *BlockPool) Stop() {
if atomic.CompareAndSwapInt32(&pool.running, 1, 0) {
log.Notice("Stopping BlockPool")
pool.repeater.Stop()
}
func (pool *BlockPool) AfterStart() {
pool.repeater = NewRepeatTimer("", requestIntervalMS*time.Millisecond)
go pool.run()
} }
func (pool *BlockPool) IsRunning() bool {
return atomic.LoadInt32(&pool.running) == 1
func (pool *BlockPool) AfterStop() {
pool.repeater.Stop()
} }
// Run spawns requests as needed. // Run spawns requests as needed.
func (pool *BlockPool) run() { func (pool *BlockPool) run() {
RUN_LOOP: RUN_LOOP:
for { for {
if atomic.LoadInt32(&pool.running) == 0 {
if !pool.IsRunning() {
break RUN_LOOP break RUN_LOOP
} }
_, numPending, _ := pool.GetStatus() _, numPending, _ := pool.GetStatus()
@ -301,14 +291,14 @@ func (pool *BlockPool) makeNextRequest() {
} }
func (pool *BlockPool) sendRequest(height int, peerId string) { func (pool *BlockPool) sendRequest(height int, peerId string) {
if atomic.LoadInt32(&pool.running) == 0 {
if !pool.IsRunning() {
return return
} }
pool.requestsCh <- BlockRequest{height, peerId} pool.requestsCh <- BlockRequest{height, peerId}
} }
func (pool *BlockPool) sendTimeout(peerId string) { func (pool *BlockPool) sendTimeout(peerId string) {
if atomic.LoadInt32(&pool.running) == 0 {
if !pool.IsRunning() {
return return
} }
pool.timeoutsCh <- peerId pool.timeoutsCh <- peerId


+ 17
- 29
blockchain/reactor.go View File

@ -5,7 +5,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"reflect" "reflect"
"sync/atomic"
"time" "time"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
@ -39,6 +38,8 @@ type consensusReactor interface {
// BlockchainReactor handles long-term catchup syncing. // BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct { type BlockchainReactor struct {
p2p.BaseReactor
sw *p2p.Switch sw *p2p.Switch
state *sm.State state *sm.State
store *BlockStore store *BlockStore
@ -47,8 +48,6 @@ type BlockchainReactor struct {
requestsCh chan BlockRequest requestsCh chan BlockRequest
timeoutsCh chan string timeoutsCh chan string
lastBlock *types.Block lastBlock *types.Block
quit chan struct{}
running uint32
evsw events.Fireable evsw events.Fireable
} }
@ -74,31 +73,20 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *Blockc
sync: sync, sync: sync,
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, timeoutsCh: timeoutsCh,
quit: make(chan struct{}),
running: uint32(0),
} }
bcR.BaseReactor = *p2p.NewBaseReactor(log, "BlockchainReactor", bcR)
return bcR return bcR
} }
// Implements Reactor
func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
log.Notice("Starting BlockchainReactor")
bcR.sw = sw
if bcR.sync {
bcR.pool.Start()
go bcR.poolRoutine()
}
func (bcR *BlockchainReactor) AfterStart() {
if bcR.sync {
bcR.pool.Start()
go bcR.poolRoutine()
} }
} }
// Implements Reactor
func (bcR *BlockchainReactor) Stop() {
if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
log.Notice("Stopping BlockchainReactor")
close(bcR.quit)
bcR.pool.Stop()
}
func (bcR *BlockchainReactor) AfterStop() {
bcR.pool.Stop()
} }
// Implements Reactor // Implements Reactor
@ -177,7 +165,7 @@ FOR_LOOP:
for { for {
select { select {
case request := <-bcR.requestsCh: // chan BlockRequest case request := <-bcR.requestsCh: // chan BlockRequest
peer := bcR.sw.Peers().Get(request.PeerId)
peer := bcR.Switch.Peers().Get(request.PeerId)
if peer == nil { if peer == nil {
// We can't assign the request. // We can't assign the request.
continue FOR_LOOP continue FOR_LOOP
@ -191,16 +179,16 @@ FOR_LOOP:
} }
case peerId := <-bcR.timeoutsCh: // chan string case peerId := <-bcR.timeoutsCh: // chan string
// Peer timed out. // Peer timed out.
peer := bcR.sw.Peers().Get(peerId)
peer := bcR.Switch.Peers().Get(peerId)
if peer != nil { if peer != nil {
bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
} }
case _ = <-statusUpdateTicker.C: case _ = <-statusUpdateTicker.C:
// ask for status updates // ask for status updates
go bcR.BroadcastStatusRequest() go bcR.BroadcastStatusRequest()
case _ = <-switchToConsensusTicker.C: case _ = <-switchToConsensusTicker.C:
height, numPending, numUnassigned := bcR.pool.GetStatus() height, numPending, numUnassigned := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.sw.NumPeers()
outbound, inbound, _ := bcR.Switch.NumPeers()
log.Info("Consensus ticker", "numUnassigned", numUnassigned, "numPending", numPending, log.Info("Consensus ticker", "numUnassigned", numUnassigned, "numPending", numPending,
"total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound) "total", len(bcR.pool.requests), "outbound", outbound, "inbound", inbound)
// NOTE: this condition is very strict right now. may need to weaken // NOTE: this condition is very strict right now. may need to weaken
@ -213,7 +201,7 @@ FOR_LOOP:
log.Notice("Time to switch to consensus reactor!", "height", height) log.Notice("Time to switch to consensus reactor!", "height", height)
bcR.pool.Stop() bcR.pool.Stop()
conR := bcR.sw.Reactor("CONSENSUS").(consensusReactor)
conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
conR.SwitchToConsensus(bcR.state) conR.SwitchToConsensus(bcR.state)
break FOR_LOOP break FOR_LOOP
@ -250,19 +238,19 @@ FOR_LOOP:
} }
} }
continue FOR_LOOP continue FOR_LOOP
case <-bcR.quit:
case <-bcR.Quit:
break FOR_LOOP break FOR_LOOP
} }
} }
} }
func (bcR *BlockchainReactor) BroadcastStatusResponse() error { func (bcR *BlockchainReactor) BroadcastStatusResponse() error {
bcR.sw.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
bcR.Switch.Broadcast(BlockchainChannel, &bcStatusResponseMessage{bcR.store.Height()})
return nil return nil
} }
func (bcR *BlockchainReactor) BroadcastStatusRequest() error { func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
bcR.sw.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()})
bcR.Switch.Broadcast(BlockchainChannel, &bcStatusRequestMessage{bcR.store.Height()})
return nil return nil
} }


+ 112
- 28
common/service.go View File

@ -1,63 +1,147 @@
/*
Classical-inheritance-style service declarations.
Services can be started, then stopped.
Users can override the AfterStart/AfterStop methods.
These methods are guaranteed to be called at most once.
Caller must ensure that Start() and Stop() are not called concurrently.
It is ok to call Stop() without calling Start() first.
Services cannot be re-started unless otherwise documented.
Typical usage:
type FooService struct {
BaseService
// private fields
}
func NewFooService() *FooService {
fs := &FooService{
// init
}
fs.BaseService = *BaseService(log, "FooService", fs)
return fs
}
func (fs *FooService) AfterStart() {
// initialize private fields
// start subroutines, etc.
}
func (fs *FooService) AfterStart() {
// close/destroy private fields
// stop subroutines, etc.
}
*/
package common package common
import "sync/atomic" import "sync/atomic"
import "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15"
type Service interface {
Start() bool
BeforeStart()
AfterStart()
Stop() bool
BeforeStop()
AfterStop()
IsRunning() bool
}
// BaseService represents a service that can be started then stopped,
// but cannot be restarted.
// .Start() calls the onStart callback function, and .Stop() calls onStop.
// It is meant to be embedded into service structs.
// The user must ensure that Start() and Stop() are not called concurrently.
// It is ok to call Stop() without calling Start() first -- the onStop
// callback will be called, and the service will never start.
type BaseService struct { type BaseService struct {
log log15.Logger
name string name string
service interface{} // for log statements.
started uint32 // atomic
stopped uint32 // atomic
onStart func()
onStop func()
started uint32 // atomic
stopped uint32 // atomic
// The "subclass" of BaseService
impl Service
} }
func NewBaseService(name string, service interface{}, onStart, onStop func()) *BaseService {
func NewBaseService(log log15.Logger, name string, impl Service) *BaseService {
return &BaseService{ return &BaseService{
name: name,
service: service,
onStart: onStart,
onStop: onStop,
log: log,
name: name,
impl: impl,
} }
} }
// Implements Servce
func (bs *BaseService) Start() bool { func (bs *BaseService) Start() bool {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) { if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 { if atomic.LoadUint32(&bs.stopped) == 1 {
log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "service", bs.service)
bs.log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
return false return false
} else { } else {
log.Notice(Fmt("Starting %v", bs.name), "service", bs.service)
}
if bs.onStart != nil {
bs.onStart()
bs.log.Notice(Fmt("Starting %v", bs.name), "impl", bs.impl)
} }
bs.impl.BeforeStart()
bs.impl.AfterStart()
return true return true
} else { } else {
log.Info(Fmt("Not starting %v -- already started", bs.name), "service", bs.service)
bs.log.Info(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
return false return false
} }
} }
// Implements Service
func (bs *BaseService) BeforeStart() {}
// Implements Service
func (bs *BaseService) AfterStart() {}
// Implements Service
func (bs *BaseService) Stop() bool { func (bs *BaseService) Stop() bool {
if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) {
log.Notice(Fmt("Stopping %v", bs.name), "service", bs.service)
if bs.onStop != nil {
bs.onStop()
}
bs.log.Notice(Fmt("Stopping %v", bs.name), "impl", bs.impl)
bs.impl.BeforeStop()
bs.impl.AfterStop()
return true return true
} else { } else {
log.Notice(Fmt("Not stopping %v", bs.name), "service", bs.service)
bs.log.Notice(Fmt("Not stopping %v", bs.name), "impl", bs.impl)
return false return false
} }
} }
// Implements Service
func (bs *BaseService) BeforeStop() {}
// Implements Service
func (bs *BaseService) AfterStop() {}
// Implements Service
func (bs *BaseService) IsRunning() bool { func (bs *BaseService) IsRunning() bool {
return atomic.LoadUint32(&bs.started) == 1 && atomic.LoadUint32(&bs.stopped) == 0 return atomic.LoadUint32(&bs.started) == 1 && atomic.LoadUint32(&bs.stopped) == 0
} }
//----------------------------------------
type QuitService struct {
BaseService
Quit chan struct{}
}
func NewQuitService(log log15.Logger, name string, impl Service) *QuitService {
return &QuitService{
BaseService: *NewBaseService(log, name, impl),
Quit: nil,
}
}
// Init .Quit in BeforeStart such that AfterStart of impls have access to Quit.
// NOTE: When overriding BeforeStart, call QuitService.BeforeStart() manually.
func (qs *QuitService) BeforeStart() {
qs.Quit = make(chan struct{})
}
// Close .Quit after Stop/BeforeStop/AfterStop
func (qs *QuitService) Stop() bool {
res := qs.BaseService.Stop()
if res {
close(qs.Quit)
}
return res
}

+ 24
- 41
consensus/reactor.go View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
@ -32,50 +31,45 @@ const (
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
type ConsensusReactor struct { type ConsensusReactor struct {
sw *p2p.Switch
running uint32
quit chan struct{}
p2p.BaseReactor
blockStore *bc.BlockStore blockStore *bc.BlockStore
conS *ConsensusState conS *ConsensusState
fastSync bool fastSync bool
evsw events.Fireable
evsw events.Fireable
} }
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
conR := &ConsensusReactor{ conR := &ConsensusReactor{
quit: make(chan struct{}),
blockStore: blockStore, blockStore: blockStore,
conS: consensusState, conS: consensusState,
fastSync: fastSync, fastSync: fastSync,
} }
conR.BaseReactor = *p2p.NewBaseReactor(log, "ConsensusReactor", conR)
return conR return conR
} }
// Implements Reactor
func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
log.Notice("Starting ConsensusReactor", "fastSync", conR.fastSync)
conR.sw = sw
if !conR.fastSync {
conR.conS.Start()
}
go conR.broadcastNewRoundStepRoutine()
func (conR *ConsensusReactor) AfterStart() {
log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
if !conR.fastSync {
conR.conS.Start()
} }
go conR.broadcastNewRoundStepRoutine()
} }
// Implements Reactor
func (conR *ConsensusReactor) Stop() {
if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
log.Notice("Stopping ConsensusReactor")
conR.conS.Stop()
close(conR.quit)
}
func (conR *ConsensusReactor) AfterStop() {
conR.conS.Stop()
} }
func (conR *ConsensusReactor) IsRunning() bool {
return atomic.LoadUint32(&conR.running) == 1
// Switch from the fast_sync to the consensus:
// reset the state, turn off fast_sync, start the consensus-state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
log.Notice("SwitchToConsensus")
// NOTE: The line below causes broadcastNewRoundStepRoutine() to
// broadcast a NewRoundStepMessage.
conR.conS.updateToState(state, false)
conR.fastSync = false
conR.conS.Start()
} }
// Implements Reactor // Implements Reactor
@ -249,10 +243,10 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in
Type: vote.Type, Type: vote.Type,
Index: index, Index: index,
} }
conR.sw.Broadcast(StateChannel, msg)
conR.Switch.Broadcast(StateChannel, msg)
/* /*
// TODO: Make this broadcast more selective. // TODO: Make this broadcast more selective.
for _, peer := range conR.sw.Peers().List() {
for _, peer := range conR.Switch.Peers().List() {
ps := peer.Data.Get(PeerStateKey).(*PeerState) ps := peer.Data.Get(PeerStateKey).(*PeerState)
prs := ps.GetRoundState() prs := ps.GetRoundState()
if prs.Height == vote.Height { if prs.Height == vote.Height {
@ -272,17 +266,6 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
conR.conS.SetPrivValidator(priv) conR.conS.SetPrivValidator(priv)
} }
// Switch from the fast_sync to the consensus:
// reset the state, turn off fast_sync, start the consensus-state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
log.Notice("SwitchToConsensus")
// NOTE: The line below causes broadcastNewRoundStepRoutine() to
// broadcast a NewRoundStepMessage.
conR.conS.updateToState(state, false)
conR.fastSync = false
conR.conS.Start()
}
// implements events.Eventable // implements events.Eventable
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
conR.evsw = evsw conR.evsw = evsw
@ -317,16 +300,16 @@ func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
var rs *RoundState var rs *RoundState
select { select {
case rs = <-conR.conS.NewStepCh(): case rs = <-conR.conS.NewStepCh():
case <-conR.quit:
case <-conR.Quit:
return return
} }
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
conR.sw.Broadcast(StateChannel, nrsMsg)
conR.Switch.Broadcast(StateChannel, nrsMsg)
} }
if csMsg != nil { if csMsg != nil {
conR.sw.Broadcast(StateChannel, csMsg)
conR.Switch.Broadcast(StateChannel, csMsg)
} }
} }
} }


+ 8
- 17
consensus/state.go View File

@ -155,7 +155,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
acm "github.com/tendermint/tendermint/account" acm "github.com/tendermint/tendermint/account"
@ -285,9 +284,7 @@ func (rs *RoundState) StringShort() string {
// Tracks consensus state across block heights and rounds. // Tracks consensus state across block heights and rounds.
type ConsensusState struct { type ConsensusState struct {
started uint32
stopped uint32
quit chan struct{}
BaseService
blockStore *bc.BlockStore blockStore *bc.BlockStore
mempoolReactor *mempl.MempoolReactor mempoolReactor *mempl.MempoolReactor
@ -306,7 +303,6 @@ type ConsensusState struct {
func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
cs := &ConsensusState{ cs := &ConsensusState{
quit: make(chan struct{}),
blockStore: blockStore, blockStore: blockStore,
mempoolReactor: mempoolReactor, mempoolReactor: mempoolReactor,
newStepCh: make(chan *RoundState, 10), newStepCh: make(chan *RoundState, 10),
@ -316,6 +312,7 @@ func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReacto
// We do that upon Start(). // We do that upon Start().
cs.maybeRebond() cs.maybeRebond()
cs.reconstructLastCommit(state) cs.reconstructLastCommit(state)
cs.BaseService = *NewBaseService(log, "ConsensusState", cs)
return cs return cs
} }
@ -363,11 +360,12 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState {
return cs.newStepCh return cs.newStepCh
} }
func (cs *ConsensusState) Start() {
if atomic.CompareAndSwapUint32(&cs.started, 0, 1) {
log.Notice("Starting ConsensusState")
cs.scheduleRound0(cs.Height)
}
func (cs *ConsensusState) AfterStart() {
cs.scheduleRound0(cs.Height)
}
func (cs *ConsensusState) AfterStop() {
// It's mostly asynchronous so, there's not much to stop.
} }
// EnterNewRound(height, 0) at cs.StartTime. // EnterNewRound(height, 0) at cs.StartTime.
@ -382,13 +380,6 @@ func (cs *ConsensusState) scheduleRound0(height int) {
}() }()
} }
func (cs *ConsensusState) Stop() {
if atomic.CompareAndSwapUint32(&cs.stopped, 0, 1) {
log.Notice("Stopping ConsensusState")
close(cs.quit)
}
}
// Updates ConsensusState and increments height to match that of state. // Updates ConsensusState and increments height to match that of state.
// The round becomes 0 and cs.Step becomes RoundStepNewHeight. // The round becomes 0 and cs.Step becomes RoundStepNewHeight.
func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) {


+ 16
- 15
events/events.go View File

@ -2,7 +2,8 @@ package events
import ( import (
"sync" "sync"
"sync/atomic"
. "github.com/tendermint/tendermint/common"
) )
// reactors and other modules should export // reactors and other modules should export
@ -17,27 +18,27 @@ type Fireable interface {
} }
type EventSwitch struct { type EventSwitch struct {
BaseService
mtx sync.RWMutex mtx sync.RWMutex
eventCells map[string]*eventCell eventCells map[string]*eventCell
listeners map[string]*eventListener listeners map[string]*eventListener
running uint32
quit chan struct{}
} }
func (evsw *EventSwitch) Start() {
if atomic.CompareAndSwapUint32(&evsw.running, 0, 1) {
evsw.eventCells = make(map[string]*eventCell)
evsw.listeners = make(map[string]*eventListener)
evsw.quit = make(chan struct{})
}
func NewEventSwitch() *EventSwitch {
evsw := &EventSwitch{}
evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw)
return evsw
} }
func (evsw *EventSwitch) Stop() {
if atomic.CompareAndSwapUint32(&evsw.running, 1, 0) {
evsw.eventCells = nil
evsw.listeners = nil
close(evsw.quit)
}
func (evsw *EventSwitch) AfterStart() {
evsw.eventCells = make(map[string]*eventCell)
evsw.listeners = make(map[string]*eventListener)
}
func (evsw *EventSwitch) AfterStop() {
evsw.eventCells = nil
evsw.listeners = nil
} }
func (evsw *EventSwitch) AddListenerForEvent(listenerId, event string, cb eventCallback) { func (evsw *EventSwitch) AddListenerForEvent(listenerId, event string, cb eventCallback) {


+ 7
- 0
events/log.go View File

@ -0,0 +1,7 @@
package events
import (
"github.com/tendermint/tendermint/logger"
)
var log = logger.New("module", "events")

+ 7
- 22
mempool/reactor.go View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"reflect" "reflect"
"sync/atomic"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
@ -19,11 +18,9 @@ var (
// MempoolReactor handles mempool tx broadcasting amongst peers. // MempoolReactor handles mempool tx broadcasting amongst peers.
type MempoolReactor struct { type MempoolReactor struct {
sw *p2p.Switch
quit chan struct{}
started uint32
stopped uint32
p2p.BaseReactor
sw *p2p.Switch
Mempool *Mempool Mempool *Mempool
evsw events.Fireable evsw events.Fireable
@ -31,27 +28,15 @@ type MempoolReactor struct {
func NewMempoolReactor(mempool *Mempool) *MempoolReactor { func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
memR := &MempoolReactor{ memR := &MempoolReactor{
quit: make(chan struct{}),
Mempool: mempool, Mempool: mempool,
} }
memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
return memR return memR
} }
// Implements Reactor
func (memR *MempoolReactor) Start(sw *p2p.Switch) {
if atomic.CompareAndSwapUint32(&memR.started, 0, 1) {
memR.sw = sw
log.Notice("Starting MempoolReactor")
}
}
// func (memR *MempoolReactor) AfterStart() {}
// Implements Reactor
func (memR *MempoolReactor) Stop() {
if atomic.CompareAndSwapUint32(&memR.stopped, 0, 1) {
log.Notice("Stopping MempoolReactor")
close(memR.quit)
}
}
// func (memR *MempoolReactor) AfterStop() {}
// Implements Reactor // Implements Reactor
func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
@ -93,7 +78,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
// Share tx. // Share tx.
// We use a simple shotgun approach for now. // We use a simple shotgun approach for now.
// TODO: improve efficiency // TODO: improve efficiency
for _, peer := range memR.sw.Peers().List() {
for _, peer := range memR.Switch.Peers().List() {
if peer.Key == src.Key { if peer.Key == src.Key {
continue continue
} }
@ -111,7 +96,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
return err return err
} }
msg := &TxMessage{Tx: tx} msg := &TxMessage{Tx: tx}
memR.sw.Broadcast(MempoolChannel, msg)
memR.Switch.Broadcast(MempoolChannel, msg)
return nil return nil
} }


+ 6
- 6
node/node.go View File

@ -97,28 +97,28 @@ func NewNode() *Node {
privKey := acm.GenPrivKeyEd25519() privKey := acm.GenPrivKeyEd25519()
// Make event switch // Make event switch
eventSwitch := new(events.EventSwitch)
eventSwitch := events.NewEventSwitch()
eventSwitch.Start() eventSwitch.Start()
// Get PEXReactor
// Make PEXReactor
book := p2p.NewAddrBook(config.GetString("addrbook_file")) book := p2p.NewAddrBook(config.GetString("addrbook_file"))
pexReactor := p2p.NewPEXReactor(book) pexReactor := p2p.NewPEXReactor(book)
// Get BlockchainReactor
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockStore, config.GetBool("fast_sync")) bcReactor := bc.NewBlockchainReactor(state.Copy(), blockStore, config.GetBool("fast_sync"))
// Get MempoolReactor
// Make MempoolReactor
mempool := mempl.NewMempool(state.Copy()) mempool := mempl.NewMempool(state.Copy())
mempoolReactor := mempl.NewMempoolReactor(mempool) mempoolReactor := mempl.NewMempoolReactor(mempool)
// Get ConsensusReactor
// Make ConsensusReactor
consensusState := consensus.NewConsensusState(state.Copy(), blockStore, mempoolReactor) consensusState := consensus.NewConsensusState(state.Copy(), blockStore, mempoolReactor)
consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync")) consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, config.GetBool("fast_sync"))
if privValidator != nil { if privValidator != nil {
consensusReactor.SetPrivValidator(privValidator) consensusReactor.SetPrivValidator(privValidator)
} }
// Make Switch
// Make p2p network switch
sw := p2p.NewSwitch() sw := p2p.NewSwitch()
sw.AddReactor("PEX", pexReactor) sw.AddReactor("PEX", pexReactor)
sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("MEMPOOL", mempoolReactor)


+ 12
- 22
p2p/addrbook.go View File

@ -12,7 +12,6 @@ import (
"net" "net"
"os" "os"
"sync" "sync"
"sync/atomic"
"time" "time"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
@ -74,19 +73,17 @@ const (
/* AddrBook - concurrency safe peer address manager */ /* AddrBook - concurrency safe peer address manager */
type AddrBook struct { type AddrBook struct {
filePath string
QuitService
mtx sync.Mutex mtx sync.Mutex
filePath string
rand *rand.Rand rand *rand.Rand
key string key string
ourAddrs map[string]*NetAddress ourAddrs map[string]*NetAddress
addrLookup map[string]*knownAddress // new & old addrLookup map[string]*knownAddress // new & old
addrNew []map[string]*knownAddress addrNew []map[string]*knownAddress
addrOld []map[string]*knownAddress addrOld []map[string]*knownAddress
started uint32
stopped uint32
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{}
nOld int nOld int
nNew int nNew int
} }
@ -98,15 +95,15 @@ const (
// Use Start to begin processing asynchronous address updates. // Use Start to begin processing asynchronous address updates.
func NewAddrBook(filePath string) *AddrBook { func NewAddrBook(filePath string) *AddrBook {
am := AddrBook{
am := &AddrBook{
rand: rand.New(rand.NewSource(time.Now().UnixNano())), rand: rand.New(rand.NewSource(time.Now().UnixNano())),
ourAddrs: make(map[string]*NetAddress), ourAddrs: make(map[string]*NetAddress),
addrLookup: make(map[string]*knownAddress), addrLookup: make(map[string]*knownAddress),
quit: make(chan struct{}),
filePath: filePath, filePath: filePath,
} }
am.init() am.init()
return &am
am.QuitService = *NewQuitService(log, "AddrBook", am)
return am
} }
// When modifying this, don't forget to update loadFromFile() // When modifying this, don't forget to update loadFromFile()
@ -124,21 +121,14 @@ func (a *AddrBook) init() {
} }
} }
func (a *AddrBook) Start() {
if atomic.CompareAndSwapUint32(&a.started, 0, 1) {
log.Notice("Starting AddrBook")
a.loadFromFile(a.filePath)
a.wg.Add(1)
go a.saveRoutine()
}
func (a *AddrBook) AfterStart() {
a.loadFromFile(a.filePath)
a.wg.Add(1)
go a.saveRoutine()
} }
func (a *AddrBook) Stop() {
if atomic.CompareAndSwapUint32(&a.stopped, 0, 1) {
log.Notice("Stopping AddrBook")
close(a.quit)
a.wg.Wait()
}
func (a *AddrBook) AfterStop() {
a.wg.Wait()
} }
func (a *AddrBook) AddOurAddress(addr *NetAddress) { func (a *AddrBook) AddOurAddress(addr *NetAddress) {
@ -381,7 +371,7 @@ out:
case <-dumpAddressTicker.C: case <-dumpAddressTicker.C:
log.Info("Saving AddrBook to file", "size", a.Size()) log.Info("Saving AddrBook to file", "size", a.Size())
a.saveToFile(a.filePath) a.saveToFile(a.filePath)
case <-a.quit:
case <-a.Quit:
break out break out
} }
} }


+ 6
- 8
p2p/connection.go View File

@ -122,12 +122,12 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
mconn.channels = channels mconn.channels = channels
mconn.channelsIdx = channelsIdx mconn.channelsIdx = channelsIdx
mconn.BaseService = *NewBaseService("MConnection", mconn, mconn.onStart, mconn.onStop)
mconn.BaseService = *NewBaseService(log, "MConnection", mconn)
return mconn return mconn
} }
func (c *MConnection) onStart() {
func (c *MConnection) AfterStart() {
c.quit = make(chan struct{}) c.quit = make(chan struct{})
go c.sendRoutine() go c.sendRoutine()
go c.recvRoutine() go c.recvRoutine()
@ -136,7 +136,7 @@ func (c *MConnection) onStart() {
c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second) c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second)
} }
func (c *MConnection) onStop() {
func (c *MConnection) AfterStop() {
c.flushTimer.Stop() c.flushTimer.Stop()
c.pingTimer.Stop() c.pingTimer.Stop()
c.chStatsTimer.Stop() c.chStatsTimer.Stop()
@ -391,7 +391,7 @@ FOR_LOOP:
pktType := binary.ReadByte(c.bufReader, &n, &err) pktType := binary.ReadByte(c.bufReader, &n, &err)
c.recvMonitor.Update(int(n)) c.recvMonitor.Update(int(n))
if err != nil { if err != nil {
if !c.IsRunning() {
if c.IsRunning() {
log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err) log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
c.stopForError(err) c.stopForError(err)
} }
@ -412,8 +412,7 @@ FOR_LOOP:
binary.ReadBinaryPtr(&pkt, c.bufReader, &n, &err) binary.ReadBinaryPtr(&pkt, c.bufReader, &n, &err)
c.recvMonitor.Update(int(n)) c.recvMonitor.Update(int(n))
if err != nil { if err != nil {
if !c.IsRunning() {
if c.IsRunning() {
log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err) log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
c.stopForError(err) c.stopForError(err)
} }
@ -425,8 +424,7 @@ FOR_LOOP:
} }
msgBytes, err := channel.recvMsgPacket(pkt) msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil { if err != nil {
if !c.IsRunning() {
if c.IsRunning() {
log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err) log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
c.stopForError(err) c.stopForError(err)
} }


+ 13
- 12
p2p/listener.go View File

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
"sync/atomic"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/p2p/upnp" "github.com/tendermint/tendermint/p2p/upnp"
@ -15,16 +14,17 @@ type Listener interface {
InternalAddress() *NetAddress InternalAddress() *NetAddress
ExternalAddress() *NetAddress ExternalAddress() *NetAddress
String() string String() string
Stop()
Stop() bool
} }
// Implements Listener // Implements Listener
type DefaultListener struct { type DefaultListener struct {
BaseService
listener net.Listener listener net.Listener
intAddr *NetAddress intAddr *NetAddress
extAddr *NetAddress extAddr *NetAddress
connections chan net.Conn connections chan net.Conn
stopped uint32
} }
const ( const (
@ -92,10 +92,17 @@ SKIP_UPNP:
extAddr: extAddr, extAddr: extAddr,
connections: make(chan net.Conn, numBufferedConnections), connections: make(chan net.Conn, numBufferedConnections),
} }
dl.BaseService = *NewBaseService(log, "DefaultListener", dl)
dl.Start() // Started upon construction
return dl
}
go dl.listenRoutine()
func (l *DefaultListener) AfterStart() {
go l.listenRoutine()
}
return dl
func (l *DefaultListener) AfterStop() {
l.listener.Close()
} }
// Accept connections and pass on the channel // Accept connections and pass on the channel
@ -103,7 +110,7 @@ func (l *DefaultListener) listenRoutine() {
for { for {
conn, err := l.listener.Accept() conn, err := l.listener.Accept()
if atomic.LoadUint32(&l.stopped) == 1 {
if !l.IsRunning() {
break // Go to cleanup break // Go to cleanup
} }
@ -143,12 +150,6 @@ func (l *DefaultListener) NetListener() net.Listener {
return l.listener return l.listener
} }
func (l *DefaultListener) Stop() {
if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) {
l.listener.Close()
}
}
func (l *DefaultListener) String() string { func (l *DefaultListener) String() string {
return fmt.Sprintf("Listener(@%v)", l.extAddr) return fmt.Sprintf("Listener(@%v)", l.extAddr)
} }


+ 4
- 5
p2p/peer.go View File

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync/atomic"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
@ -58,7 +57,7 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor
reactor.Receive(chId, p, msgBytes) reactor.Receive(chId, p, msgBytes)
} }
onError := func(r interface{}) { onError := func(r interface{}) {
p.stop()
p.Stop()
onPeerError(p, r) onPeerError(p, r)
} }
mconn := NewMConnection(conn, chDescs, onReceive, onError) mconn := NewMConnection(conn, chDescs, onReceive, onError)
@ -69,15 +68,15 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor
Key: peerNodeInfo.PubKey.KeyString(), Key: peerNodeInfo.PubKey.KeyString(),
Data: NewCMap(), Data: NewCMap(),
} }
p.BaseService = *NewBaseService("Peer", p, p.onStart, p.onStop)
p.BaseService = *NewBaseService(log, "Peer", p)
return p return p
} }
func (p *Peer) onStart() {
func (p *Peer) AfterStart() {
p.mconn.Start() p.mconn.Start()
} }
func (p *Peer) onStop() {
func (p *Peer) AfterStop() {
p.mconn.Stop() p.mconn.Stop()
} }


+ 12
- 26
p2p/pex_reactor.go View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"reflect" "reflect"
"sync/atomic"
"time" "time"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
@ -27,39 +26,26 @@ PEXReactor handles PEX (peer exchange) and ensures that an
adequate number of peers are connected to the switch. adequate number of peers are connected to the switch.
*/ */
type PEXReactor struct { type PEXReactor struct {
sw *Switch
quit chan struct{}
started uint32
stopped uint32
BaseReactor
sw *Switch
book *AddrBook book *AddrBook
evsw events.Fireable evsw events.Fireable
} }
func NewPEXReactor(book *AddrBook) *PEXReactor { func NewPEXReactor(book *AddrBook) *PEXReactor {
pexR := &PEXReactor{ pexR := &PEXReactor{
quit: make(chan struct{}),
book: book, book: book,
} }
pexR.BaseReactor = *NewBaseReactor(log, "PEXReactor", pexR)
return pexR return pexR
} }
// Implements Reactor
func (pexR *PEXReactor) Start(sw *Switch) {
if atomic.CompareAndSwapUint32(&pexR.started, 0, 1) {
log.Notice("Starting PEXReactor")
pexR.sw = sw
go pexR.ensurePeersRoutine()
}
func (pexR *PEXReactor) AfterStart() {
go pexR.ensurePeersRoutine()
} }
// Implements Reactor
func (pexR *PEXReactor) Stop() {
if atomic.CompareAndSwapUint32(&pexR.stopped, 0, 1) {
log.Notice("Stopping PEXReactor")
close(pexR.quit)
}
func (pexR *PEXReactor) AfterStop() {
} }
// Implements Reactor // Implements Reactor
@ -147,7 +133,7 @@ FOR_LOOP:
select { select {
case <-timer.Ch: case <-timer.Ch:
pexR.ensurePeers() pexR.ensurePeers()
case <-pexR.quit:
case <-pexR.Quit:
break FOR_LOOP break FOR_LOOP
} }
} }
@ -158,7 +144,7 @@ FOR_LOOP:
// Ensures that sufficient peers are connected. (once) // Ensures that sufficient peers are connected. (once)
func (pexR *PEXReactor) ensurePeers() { func (pexR *PEXReactor) ensurePeers() {
numOutPeers, _, numDialing := pexR.sw.NumPeers()
numOutPeers, _, numDialing := pexR.Switch.NumPeers()
numToDial := minNumOutboundPeers - (numOutPeers + numDialing) numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) log.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
if numToDial <= 0 { if numToDial <= 0 {
@ -179,8 +165,8 @@ func (pexR *PEXReactor) ensurePeers() {
break break
} }
alreadySelected := toDial.Has(try.IP.String()) alreadySelected := toDial.Has(try.IP.String())
alreadyDialing := pexR.sw.IsDialing(try)
alreadyConnected := pexR.sw.Peers().Has(try.IP.String())
alreadyDialing := pexR.Switch.IsDialing(try)
alreadyConnected := pexR.Switch.Peers().Has(try.IP.String())
if alreadySelected || alreadyDialing || alreadyConnected { if alreadySelected || alreadyDialing || alreadyConnected {
/* /*
log.Info("Cannot dial address", "addr", try, log.Info("Cannot dial address", "addr", try,
@ -204,7 +190,7 @@ func (pexR *PEXReactor) ensurePeers() {
// Dial picked addresses // Dial picked addresses
for _, item := range toDial.Values() { for _, item := range toDial.Values() {
go func(picked *NetAddress) { go func(picked *NetAddress) {
_, err := pexR.sw.DialPeerWithAddress(picked)
_, err := pexR.Switch.DialPeerWithAddress(picked)
if err != nil { if err != nil {
pexR.book.MarkAttempt(picked) pexR.book.MarkAttempt(picked)
} }
@ -213,7 +199,7 @@ func (pexR *PEXReactor) ensurePeers() {
// If we need more addresses, pick a random peer and ask for more. // If we need more addresses, pick a random peer and ask for more.
if pexR.book.NeedMoreAddrs() { if pexR.book.NeedMoreAddrs() {
if peers := pexR.sw.Peers().List(); len(peers) > 0 {
if peers := pexR.Switch.Peers().List(); len(peers) > 0 {
i := rand.Int() % len(peers) i := rand.Int() % len(peers)
peer := peers[i] peer := peers[i]
log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer) log.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer)


+ 57
- 46
p2p/switch.go View File

@ -5,17 +5,18 @@ import (
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
"sync/atomic"
"time" "time"
"github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15"
acm "github.com/tendermint/tendermint/account" acm "github.com/tendermint/tendermint/account"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
type Reactor interface { type Reactor interface {
Start(sw *Switch)
Stop()
Service // Start, Stop
SetSwitch(*Switch)
GetChannels() []*ChannelDescriptor GetChannels() []*ChannelDescriptor
AddPeer(peer *Peer) AddPeer(peer *Peer)
RemovePeer(peer *Peer, reason interface{}) RemovePeer(peer *Peer, reason interface{})
@ -24,14 +25,25 @@ type Reactor interface {
//-------------------------------------- //--------------------------------------
type BaseReactor struct{}
type BaseReactor struct {
QuitService // Provides Start, Stop, .Quit
Switch *Switch
}
func NewBaseReactor(log log15.Logger, name string, impl Reactor) *BaseReactor {
return &BaseReactor{
QuitService: *NewQuitService(log, name, impl),
Switch: nil,
}
}
func (_ BaseReactor) Start(sw *Switch) {}
func (_ BaseReactor) Stop() {}
func (_ BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
func (_ BaseReactor) AddPeer(peer *Peer) {}
func (_ BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
func (_ BaseReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {}
func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}
func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
func (_ *BaseReactor) AddPeer(peer *Peer) {}
func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
func (_ *BaseReactor) Receive(chId byte, peer *Peer, msgBytes []byte) {}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -42,13 +54,14 @@ or more `Channels`. So while sending outgoing messages is typically performed o
incoming messages are received on the reactor. incoming messages are received on the reactor.
*/ */
type Switch struct { type Switch struct {
BaseService
listeners []Listener listeners []Listener
reactors map[string]Reactor reactors map[string]Reactor
chDescs []*ChannelDescriptor chDescs []*ChannelDescriptor
reactorsByCh map[byte]Reactor reactorsByCh map[byte]Reactor
peers *PeerSet peers *PeerSet
dialing *CMap dialing *CMap
running uint32
nodeInfo *types.NodeInfo // our node info nodeInfo *types.NodeInfo // our node info
nodePrivKey acm.PrivKeyEd25519 // our node privkey nodePrivKey acm.PrivKeyEd25519 // our node privkey
} }
@ -71,9 +84,9 @@ func NewSwitch() *Switch {
reactorsByCh: make(map[byte]Reactor), reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(), peers: NewPeerSet(),
dialing: NewCMap(), dialing: NewCMap(),
running: 0,
nodeInfo: nil, nodeInfo: nil,
} }
sw.BaseService = *NewBaseService(log, "P2P Switch", sw)
return sw return sw
} }
@ -91,6 +104,7 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
sw.reactorsByCh[chId] = reactor sw.reactorsByCh[chId] = reactor
} }
sw.reactors[name] = reactor sw.reactors[name] = reactor
reactor.SetSwitch(sw)
return reactor return reactor
} }
@ -138,39 +152,36 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey acm.PrivKeyEd25519) {
} }
} }
func (sw *Switch) Start() {
if atomic.CompareAndSwapUint32(&sw.running, 0, 1) {
// Start reactors
for _, reactor := range sw.reactors {
reactor.Start(sw)
}
// Start peers
for _, peer := range sw.peers.List() {
sw.startInitPeer(peer)
}
// Start listeners
for _, listener := range sw.listeners {
go sw.listenerRoutine(listener)
}
// Switch.Start() starts all the reactors, peers, and listeners.
func (sw *Switch) AfterStart() {
// Start reactors
for _, reactor := range sw.reactors {
reactor.Start()
}
// Start peers
for _, peer := range sw.peers.List() {
sw.startInitPeer(peer)
}
// Start listeners
for _, listener := range sw.listeners {
go sw.listenerRoutine(listener)
} }
} }
func (sw *Switch) Stop() {
if atomic.CompareAndSwapUint32(&sw.running, 1, 0) {
// Stop listeners
for _, listener := range sw.listeners {
listener.Stop()
}
sw.listeners = nil
// Stop peers
for _, peer := range sw.peers.List() {
peer.stop()
}
sw.peers = NewPeerSet()
// Stop reactors
for _, reactor := range sw.reactors {
reactor.Stop()
}
func (sw *Switch) AfterStop() {
// Stop listeners
for _, listener := range sw.listeners {
listener.Stop()
}
sw.listeners = nil
// Stop peers
for _, peer := range sw.peers.List() {
peer.Stop()
}
sw.peers = NewPeerSet()
// Stop reactors
for _, reactor := range sw.reactors {
reactor.Stop()
} }
} }
@ -230,7 +241,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
// remove deadline and start peer // remove deadline and start peer
conn.SetDeadline(time.Time{}) conn.SetDeadline(time.Time{})
if atomic.LoadUint32(&sw.running) == 1 {
if sw.IsRunning() {
sw.startInitPeer(peer) sw.startInitPeer(peer)
} }
@ -239,7 +250,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
} }
func (sw *Switch) startInitPeer(peer *Peer) { func (sw *Switch) startInitPeer(peer *Peer) {
peer.start() // spawn send/recv routines
peer.Start() // spawn send/recv routines
sw.addPeerToReactors(peer) // run AddPeer on each reactor sw.addPeerToReactors(peer) // run AddPeer on each reactor
} }
@ -304,7 +315,7 @@ func (sw *Switch) Peers() IPeerSet {
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
log.Notice("Stopping peer for error", "peer", peer, "error", reason) log.Notice("Stopping peer for error", "peer", peer, "error", reason)
sw.peers.Remove(peer) sw.peers.Remove(peer)
peer.stop()
peer.Stop()
sw.removePeerFromReactors(peer, reason) sw.removePeerFromReactors(peer, reason)
} }
@ -313,7 +324,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
func (sw *Switch) StopPeerGracefully(peer *Peer) { func (sw *Switch) StopPeerGracefully(peer *Peer) {
log.Notice("Stopping peer gracefully") log.Notice("Stopping peer gracefully")
sw.peers.Remove(peer) sw.peers.Remove(peer)
peer.stop()
peer.Stop()
sw.removePeerFromReactors(peer, nil) sw.removePeerFromReactors(peer, nil)
} }


+ 7
- 9
p2p/switch_test.go View File

@ -19,6 +19,8 @@ type PeerMessage struct {
} }
type TestReactor struct { type TestReactor struct {
BaseReactor
mtx sync.Mutex mtx sync.Mutex
channels []*ChannelDescriptor channels []*ChannelDescriptor
peersAdded []*Peer peersAdded []*Peer
@ -29,17 +31,13 @@ type TestReactor struct {
} }
func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor { func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
return &TestReactor{
tr := &TestReactor{
channels: channels, channels: channels,
logMessages: logMessages, logMessages: logMessages,
msgsReceived: make(map[byte][]PeerMessage), msgsReceived: make(map[byte][]PeerMessage),
} }
}
func (tr *TestReactor) Start(sw *Switch) {
}
func (tr *TestReactor) Stop() {
tr.BaseReactor = *NewBaseReactor(log, "TestReactor", tr)
return tr
} }
func (tr *TestReactor) GetChannels() []*ChannelDescriptor { func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
@ -132,11 +130,11 @@ func TestSwitches(t *testing.T) {
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
&ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x00), Priority: 10},
&ChannelDescriptor{Id: byte(0x01), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10},
}, true)).Start(sw) // Start the reactor
}, true)).Start() // Start the reactor
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
&ChannelDescriptor{Id: byte(0x02), Priority: 10}, &ChannelDescriptor{Id: byte(0x02), Priority: 10},
&ChannelDescriptor{Id: byte(0x03), Priority: 10}, &ChannelDescriptor{Id: byte(0x03), Priority: 10},
}, true)).Start(sw) // Start the reactor
}, true)).Start() // Start the reactor
return sw return sw
}) })
defer s1.Stop() defer s1.Stop()


+ 3
- 3
p2p/upnp/upnp.go View File

@ -126,7 +126,7 @@ type ExternalIPAddress struct {
IP string IP string
} }
type Service struct {
type UPNPService struct {
ServiceType string `xml:"serviceType"` ServiceType string `xml:"serviceType"`
ControlURL string `xml:"controlURL"` ControlURL string `xml:"controlURL"`
} }
@ -136,7 +136,7 @@ type DeviceList struct {
} }
type ServiceList struct { type ServiceList struct {
Service []Service `xml:"service"`
Service []UPNPService `xml:"service"`
} }
type Device struct { type Device struct {
@ -160,7 +160,7 @@ func getChildDevice(d *Device, deviceType string) *Device {
return nil return nil
} }
func getChildService(d *Device, serviceType string) *Service {
func getChildService(d *Device, serviceType string) *UPNPService {
sl := d.ServiceList.Service sl := d.ServiceList.Service
for i := 0; i < len(sl); i++ { for i := 0; i < len(sl); i++ {
if strings.Index(sl[i].ServiceType, serviceType) >= 0 { if strings.Index(sl[i].ServiceType, serviceType) >= 0 {


+ 21
- 27
rpc/server/handlers.go View File

@ -9,11 +9,11 @@ import (
"net/http" "net/http"
"reflect" "reflect"
"sort" "sort"
"sync/atomic"
"time" "time"
"github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket" "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/gorilla/websocket"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events" "github.com/tendermint/tendermint/events"
. "github.com/tendermint/tendermint/rpc/types" . "github.com/tendermint/tendermint/rpc/types"
) )
@ -220,50 +220,43 @@ const (
// contains listener id, underlying ws connection, // contains listener id, underlying ws connection,
// and the event switch for subscribing to events // and the event switch for subscribing to events
type WSConnection struct { type WSConnection struct {
QuitService
id string id string
wsConn *websocket.Conn wsConn *websocket.Conn
writeChan chan WSResponse writeChan chan WSResponse
quitChan chan struct{}
failedSends int failedSends int
started uint32
stopped uint32
evsw *events.EventSwitch evsw *events.EventSwitch
} }
// new websocket connection wrapper // new websocket connection wrapper
func NewWSConnection(wsConn *websocket.Conn) *WSConnection { func NewWSConnection(wsConn *websocket.Conn) *WSConnection {
return &WSConnection{
con := &WSConnection{
id: wsConn.RemoteAddr().String(), id: wsConn.RemoteAddr().String(),
wsConn: wsConn, wsConn: wsConn,
writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full writeChan: make(chan WSResponse, WriteChanBufferSize), // buffered. we keep track when its full
quitChan: make(chan struct{}),
} }
con.QuitService = *NewQuitService(log, "WSConnection", con)
return con
} }
// start the connection and hand her the event switch
func (con *WSConnection) Start(evsw *events.EventSwitch) {
if atomic.CompareAndSwapUint32(&con.started, 0, 1) {
con.evsw = evsw
// read subscriptions/unsubscriptions to events
go con.read()
// write responses
con.write()
}
func (con *WSConnection) AfterStart() {
// read subscriptions/unsubscriptions to events
go con.read()
// write responses
con.write()
} }
// close the connection
func (con *WSConnection) Stop() {
if atomic.CompareAndSwapUint32(&con.stopped, 0, 1) {
con.evsw.RemoveListener(con.id)
close(con.quitChan)
// the write loop closes the websocket connection
// when it exits its loop, and the read loop
// closes the writeChan
}
func (con *WSConnection) AfterStop() {
con.evsw.RemoveListener(con.id)
// the write loop closes the websocket connection
// when it exits its loop, and the read loop
// closes the writeChan
} }
func (con *WSConnection) SetEventSwitch(evsw *events.EventSwitch) { con.evsw = evsw }
// attempt to write response to writeChan and record failures // attempt to write response to writeChan and record failures
func (con *WSConnection) safeWrite(resp WSResponse) { func (con *WSConnection) safeWrite(resp WSResponse) {
select { select {
@ -351,7 +344,7 @@ func (con *WSConnection) write() {
return return
} }
} }
case <-con.quitChan:
case <-con.Quit:
return return
} }
} }
@ -389,7 +382,8 @@ func (wm *WebsocketManager) websocketHandler(w http.ResponseWriter, r *http.Requ
// register connection // register connection
con := NewWSConnection(wsConn) con := NewWSConnection(wsConn)
log.Notice("New websocket connection", "origin", con.id) log.Notice("New websocket connection", "origin", con.id)
con.Start(wm.evsw)
con.SetEventSwitch(wm.evsw)
con.Start()
} }
// rpc.websocket // rpc.websocket


+ 1
- 1
state/permissions_test.go View File

@ -1104,7 +1104,7 @@ func TestSNativeCallTx(t *testing.T) {
// run ExecTx and wait for the Receive event on given addr // run ExecTx and wait for the Receive event on given addr
// returns the msg data and an error/exception // returns the msg data and an error/exception
func execTxWaitEvent(t *testing.T, blockCache *BlockCache, tx types.Tx, eventid string) (interface{}, string) { func execTxWaitEvent(t *testing.T, blockCache *BlockCache, tx types.Tx, eventid string) (interface{}, string) {
evsw := new(events.EventSwitch)
evsw := events.NewEventSwitch()
evsw.Start() evsw.Start()
ch := make(chan interface{}) ch := make(chan interface{})
evsw.AddListenerForEvent("test", eventid, func(msg interface{}) { evsw.AddListenerForEvent("test", eventid, func(msg interface{}) {


+ 1
- 1
vm/test/log_event_test.go View File

@ -35,7 +35,7 @@ func TestLog4(t *testing.T) {
ourVm := NewVM(st, newParams(), Zero256, nil) ourVm := NewVM(st, newParams(), Zero256, nil)
eventSwitch := &events.EventSwitch{}
eventSwitch := events.NewEventSwitch()
eventSwitch.Start() eventSwitch.Start()
eventId := types.EventStringLogEvent(account2.Address.Postfix(20)) eventId := types.EventStringLogEvent(account2.Address.Postfix(20))


+ 1
- 1
vm/test/vm_test.go View File

@ -154,7 +154,7 @@ func TestSendCall(t *testing.T) {
// subscribes to an AccReceive, runs the vm, returns the exception // subscribes to an AccReceive, runs the vm, returns the exception
func runVMWaitEvents(t *testing.T, ourVm *VM, caller, callee *Account, subscribeAddr, contractCode []byte, gas int64) string { func runVMWaitEvents(t *testing.T, ourVm *VM, caller, callee *Account, subscribeAddr, contractCode []byte, gas int64) string {
// we need to catch the event from the CALL to check for exceptions // we need to catch the event from the CALL to check for exceptions
evsw := new(events.EventSwitch)
evsw := events.NewEventSwitch()
evsw.Start() evsw.Start()
ch := make(chan interface{}) ch := make(chan interface{})
fmt.Printf("subscribe to %x\n", subscribeAddr) fmt.Printf("subscribe to %x\n", subscribeAddr)


Loading…
Cancel
Save