@ -130,7 +130,6 @@ type BlockchainReactor struct {
p2p . BaseReactor
p2p . BaseReactor
events chan Event // XXX: Rename eventsFromPeers
events chan Event // XXX: Rename eventsFromPeers
stopDemux chan struct { }
scheduler * Routine
scheduler * Routine
processor * Routine
processor * Routine
logger log . Logger
logger log . Logger
@ -166,7 +165,6 @@ func newReactor(state state.State, store blockStore, reporter behaviour.Reporter
return & BlockchainReactor {
return & BlockchainReactor {
events : make ( chan Event , bufferSize ) ,
events : make ( chan Event , bufferSize ) ,
stopDemux : make ( chan struct { } ) ,
scheduler : newRoutine ( "scheduler" , scheduler . handle , bufferSize ) ,
scheduler : newRoutine ( "scheduler" , scheduler . handle , bufferSize ) ,
processor : newRoutine ( "processor" , processor . handle , bufferSize ) ,
processor : newRoutine ( "processor" , processor . handle , bufferSize ) ,
store : store ,
store : store ,
@ -306,19 +304,29 @@ func (r *BlockchainReactor) demux() {
processBlockFreq = 20 * time . Millisecond
processBlockFreq = 20 * time . Millisecond
doProcessBlockCh = make ( chan struct { } , 1 )
doProcessBlockCh = make ( chan struct { } , 1 )
doProcessBlockTk = time . NewTicker ( processBlockFreq )
doProcessBlockTk = time . NewTicker ( processBlockFreq )
)
defer doProcessBlockTk . Stop ( )
var (
prunePeerFreq = 1 * time . Second
prunePeerFreq = 1 * time . Second
doPrunePeerCh = make ( chan struct { } , 1 )
doPrunePeerCh = make ( chan struct { } , 1 )
doPrunePeerTk = time . NewTicker ( prunePeerFreq )
doPrunePeerTk = time . NewTicker ( prunePeerFreq )
)
defer doPrunePeerTk . Stop ( )
var (
scheduleFreq = 20 * time . Millisecond
scheduleFreq = 20 * time . Millisecond
doScheduleCh = make ( chan struct { } , 1 )
doScheduleCh = make ( chan struct { } , 1 )
doScheduleTk = time . NewTicker ( scheduleFreq )
doScheduleTk = time . NewTicker ( scheduleFreq )
)
defer doScheduleTk . Stop ( )
var (
statusFreq = 10 * time . Second
statusFreq = 10 * time . Second
doStatusCh = make ( chan struct { } , 1 )
doStatusCh = make ( chan struct { } , 1 )
doStatusTk = time . NewTicker ( statusFreq )
doStatusTk = time . NewTicker ( statusFreq )
)
)
defer doStatusTk . Stop ( )
// XXX: Extract timers to make testing atemporal
// XXX: Extract timers to make testing atemporal
for {
for {
@ -355,14 +363,20 @@ func (r *BlockchainReactor) demux() {
case <- doStatusCh :
case <- doStatusCh :
r . io . broadcastStatusRequest ( r . store . Base ( ) , r . SyncHeight ( ) )
r . io . broadcastStatusRequest ( r . store . Base ( ) , r . SyncHeight ( ) )
// Events from peers
case event := <- r . events :
// Events from peers. Closing the channel signals event loop termination.
case event , ok := <- r . events :
if ! ok {
r . logger . Info ( "Stopping event processing" )
return
}
switch event := event . ( type ) {
switch event := event . ( type ) {
case bcStatusResponse :
case bcStatusResponse :
r . setMaxPeerHeight ( event . height )
r . setMaxPeerHeight ( event . height )
r . scheduler . send ( event )
r . scheduler . send ( event )
case bcAddNewPeer , bcRemovePeer , bcBlockResponse , bcNoBlockResponse :
case bcAddNewPeer , bcRemovePeer , bcBlockResponse , bcNoBlockResponse :
r . scheduler . send ( event )
r . scheduler . send ( event )
default :
r . logger . Error ( "Received unknown event" , "event" , fmt . Sprintf ( "%T" , event ) )
}
}
// Incremental events form scheduler
// Incremental events form scheduler
@ -378,6 +392,9 @@ func (r *BlockchainReactor) demux() {
case scFinishedEv :
case scFinishedEv :
r . processor . send ( event )
r . processor . send ( event )
r . scheduler . stop ( )
r . scheduler . stop ( )
case noOpEvent :
default :
r . logger . Error ( "Received unknown scheduler event" , "event" , fmt . Sprintf ( "%T" , event ) )
}
}
// Incremental events from processor
// Incremental events from processor
@ -397,20 +414,28 @@ func (r *BlockchainReactor) demux() {
case pcFinished :
case pcFinished :
r . io . trySwitchToConsensus ( event . tmState , event . blocksSynced > 0 )
r . io . trySwitchToConsensus ( event . tmState , event . blocksSynced > 0 )
r . processor . stop ( )
r . processor . stop ( )
case noOpEvent :
default :
r . logger . Error ( "Received unknown processor event" , "event" , fmt . Sprintf ( "%T" , event ) )
}
}
// Terminal events from scheduler
// Terminal event from scheduler
case err := <- r . scheduler . final ( ) :
case err := <- r . scheduler . final ( ) :
r . logger . Info ( fmt . Sprintf ( "scheduler final %s" , err ) )
// send the processor stop?
switch err {
case nil :
r . logger . Info ( "Scheduler stopped" )
default :
r . logger . Error ( "Scheduler aborted with error" , "err" , err )
}
// Terminal event from processor
// Terminal event from processor
case event := <- r . processor . final ( ) :
r . logger . Info ( fmt . Sprintf ( "processor final %s" , event ) )
case <- r . stopDemux :
r . logger . Info ( "demuxing stopped" )
return
case err := <- r . processor . final ( ) :
switch err {
case nil :
r . logger . Info ( "Processor stopped" )
default :
r . logger . Error ( "Processor aborted with error" , "err" , err )
}
}
}
}
}
}
}
@ -421,7 +446,6 @@ func (r *BlockchainReactor) Stop() error {
r . scheduler . stop ( )
r . scheduler . stop ( )
r . processor . stop ( )
r . processor . stop ( )
close ( r . stopDemux )
close ( r . events )
close ( r . events )
r . logger . Info ( "reactor stopped" )
r . logger . Info ( "reactor stopped" )