Browse Source

started/stopped -> running; contiguous vs fast forward ConsensusState

updates.
pull/43/merge
Jae Kwon 10 years ago
parent
commit
788f9bfb93
5 changed files with 48 additions and 31 deletions
  1. +12
    -8
      blockchain/reactor.go
  2. +22
    -8
      consensus/reactor.go
  3. +4
    -4
      consensus/state.go
  4. +1
    -1
      daemon/daemon.go
  5. +9
    -10
      p2p/peer.go

+ 12
- 8
blockchain/reactor.go View File

@ -35,8 +35,7 @@ type BlockchainReactor struct {
timeoutsCh chan string timeoutsCh chan string
lastBlock *types.Block lastBlock *types.Block
quit chan struct{} quit chan struct{}
started uint32
stopped uint32
running uint32
} }
func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor { func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor {
@ -57,15 +56,14 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, timeoutsCh: timeoutsCh,
quit: make(chan struct{}), quit: make(chan struct{}),
started: 0,
stopped: 0,
running: uint32(0),
} }
return bcR return bcR
} }
// Implements Reactor // Implements Reactor
func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
if atomic.CompareAndSwapUint32(&bcR.started, 0, 1) {
if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) {
log.Info("Starting BlockchainReactor") log.Info("Starting BlockchainReactor")
bcR.sw = sw bcR.sw = sw
bcR.pool.Start() bcR.pool.Start()
@ -75,7 +73,7 @@ func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
// Implements Reactor // Implements Reactor
func (bcR *BlockchainReactor) Stop() { func (bcR *BlockchainReactor) Stop() {
if atomic.CompareAndSwapUint32(&bcR.stopped, 0, 1) {
if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) {
log.Info("Stopping BlockchainReactor") log.Info("Stopping BlockchainReactor")
close(bcR.quit) close(bcR.quit)
bcR.pool.Stop() bcR.pool.Stop()
@ -201,8 +199,14 @@ FOR_LOOP:
// method of syncing in the consensus reactor. // method of syncing in the consensus reactor.
if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute {
go func() { go func() {
bcR.sw.Reactor("BLOCKCHAIN").Stop()
bcR.sw.Reactor("CONSENSUS").Start(bcR.sw)
log.Info("Stopping blockpool syncing, turning on consensus...")
//bcR.sw.Reactor("BLOCKCHAIN").Stop()
trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others.
conR := bcR.sw.Reactor("CONSENSUS")
conR.Start(bcR.sw)
for _, peer := range bcR.sw.Peers().List() {
conR.AddPeer(peer)
}
}() }()
break FOR_LOOP break FOR_LOOP
} }


+ 22
- 8
consensus/reactor.go View File

@ -33,8 +33,7 @@ const (
// We atomically copy the RoundState struct before using it. // We atomically copy the RoundState struct before using it.
type ConsensusReactor struct { type ConsensusReactor struct {
sw *p2p.Switch sw *p2p.Switch
started uint32
stopped uint32
running uint32
quit chan struct{} quit chan struct{}
blockStore *bc.BlockStore blockStore *bc.BlockStore
@ -52,7 +51,7 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto
// Implements Reactor // Implements Reactor
func (conR *ConsensusReactor) Start(sw *p2p.Switch) { func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
if atomic.CompareAndSwapUint32(&conR.started, 0, 1) {
if atomic.CompareAndSwapUint32(&conR.running, 0, 1) {
log.Info("Starting ConsensusReactor") log.Info("Starting ConsensusReactor")
conR.sw = sw conR.sw = sw
conR.conS.Start() conR.conS.Start()
@ -62,15 +61,15 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) {
// Implements Reactor // Implements Reactor
func (conR *ConsensusReactor) Stop() { func (conR *ConsensusReactor) Stop() {
if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) {
if atomic.CompareAndSwapUint32(&conR.running, 1, 0) {
log.Info("Stopping ConsensusReactor") log.Info("Stopping ConsensusReactor")
conR.conS.Stop() conR.conS.Stop()
close(conR.quit) close(conR.quit)
} }
} }
func (conR *ConsensusReactor) IsStopped() bool {
return atomic.LoadUint32(&conR.stopped) == 1
func (conR *ConsensusReactor) IsRunning() bool {
return atomic.LoadUint32(&conR.running) == 0
} }
// Implements Reactor // Implements Reactor
@ -94,6 +93,10 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
// Implements Reactor // Implements Reactor
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
if !conR.IsRunning() {
return
}
// Create peerState for peer // Create peerState for peer
peerState := NewPeerState(peer) peerState := NewPeerState(peer)
peer.Data.Set(peerStateKey, peerState) peer.Data.Set(peerStateKey, peerState)
@ -108,11 +111,18 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
// Implements Reactor // Implements Reactor
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
if !conR.IsRunning() {
return
}
//peer.Data.Get(peerStateKey).(*PeerState).Disconnect() //peer.Data.Get(peerStateKey).(*PeerState).Disconnect()
} }
// Implements Reactor // Implements Reactor
func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
return
}
// Get round state // Get round state
rs := conR.conS.GetRoundState() rs := conR.conS.GetRoundState()
@ -215,6 +225,10 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) {
conR.conS.SetPrivValidator(priv) conR.conS.SetPrivValidator(priv)
} }
func (conR *ConsensusReactor) UpdateToState(state *sm.State) {
conR.conS.updateToState(state, false)
}
//-------------------------------------- //--------------------------------------
func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
@ -279,7 +293,7 @@ func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
OUTER_LOOP: OUTER_LOOP:
for { for {
// Manage disconnects from self or peer. // Manage disconnects from self or peer.
if peer.IsStopped() || conR.IsStopped() {
if !peer.IsRunning() || !conR.IsRunning() {
log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer)) log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer))
return return
} }
@ -382,7 +396,7 @@ func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState)
OUTER_LOOP: OUTER_LOOP:
for { for {
// Manage disconnects from self or peer. // Manage disconnects from self or peer.
if peer.IsStopped() || conR.IsStopped() {
if !peer.IsRunning() || !conR.IsRunning() {
log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer)) log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer))
return return
} }


+ 4
- 4
consensus/state.go View File

@ -256,7 +256,7 @@ func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReacto
runActionCh: make(chan RoundAction, 1), runActionCh: make(chan RoundAction, 1),
newStepCh: make(chan *RoundState, 1), newStepCh: make(chan *RoundState, 1),
} }
cs.updateToState(state)
cs.updateToState(state, true)
return cs return cs
} }
@ -457,9 +457,9 @@ ACTION_LOOP:
// If calculated round is greater than 0 (based on BlockTime or calculated StartTime) // If calculated round is greater than 0 (based on BlockTime or calculated StartTime)
// then also sets up the appropriate round, and cs.Step becomes RoundStepNewRound. // then also sets up the appropriate round, and cs.Step becomes RoundStepNewRound.
// Otherwise the round is 0 and cs.Step becomes RoundStepNewHeight. // Otherwise the round is 0 and cs.Step becomes RoundStepNewHeight.
func (cs *ConsensusState) updateToState(state *sm.State) {
func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) {
// Sanity check state. // Sanity check state.
if cs.Height > 0 && cs.Height != state.LastBlockHeight {
if contiguous && cs.Height > 0 && cs.Height != state.LastBlockHeight {
panic(Fmt("updateToState() expected state height of %v but found %v", panic(Fmt("updateToState() expected state height of %v but found %v",
cs.Height, state.LastBlockHeight)) cs.Height, state.LastBlockHeight))
} }
@ -859,7 +859,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool {
// We have the block, so save/stage/sign-commit-vote. // We have the block, so save/stage/sign-commit-vote.
cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Commits) cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Commits)
// Increment height. // Increment height.
cs.updateToState(cs.stagedState)
cs.updateToState(cs.stagedState, true)
// cs.Step is now RoundStepNewHeight or RoundStepNewRound // cs.Step is now RoundStepNewHeight or RoundStepNewRound
cs.newStepCh <- cs.getRoundState() cs.newStepCh <- cs.getRoundState()
return true return true


+ 1
- 1
daemon/daemon.go View File

@ -94,7 +94,7 @@ func (n *Node) Start() {
go n.inboundConnectionRoutine(l) go n.inboundConnectionRoutine(l)
} }
n.book.Start() n.book.Start()
n.sw.StartReactors()
//n.sw.StartReactors()
} }
func (n *Node) Stop() { func (n *Node) Stop() {


+ 9
- 10
p2p/peer.go View File

@ -13,8 +13,7 @@ import (
type Peer struct { type Peer struct {
outbound bool outbound bool
mconn *MConnection mconn *MConnection
started uint32
stopped uint32
running uint32
Key string Key string
Data *CMap // User data. Data *CMap // User data.
@ -37,7 +36,7 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc
p = &Peer{ p = &Peer{
outbound: outbound, outbound: outbound,
mconn: mconn, mconn: mconn,
stopped: 0,
running: 0,
Key: mconn.RemoteAddress.String(), Key: mconn.RemoteAddress.String(),
Data: NewCMap(), Data: NewCMap(),
} }
@ -45,21 +44,21 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc
} }
func (p *Peer) start() { func (p *Peer) start() {
if atomic.CompareAndSwapUint32(&p.started, 0, 1) {
if atomic.CompareAndSwapUint32(&p.running, 0, 1) {
log.Debug("Starting Peer", "peer", p) log.Debug("Starting Peer", "peer", p)
p.mconn.Start() p.mconn.Start()
} }
} }
func (p *Peer) stop() { func (p *Peer) stop() {
if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) {
if atomic.CompareAndSwapUint32(&p.running, 1, 0) {
log.Debug("Stopping Peer", "peer", p) log.Debug("Stopping Peer", "peer", p)
p.mconn.Stop() p.mconn.Stop()
} }
} }
func (p *Peer) IsStopped() bool {
return atomic.LoadUint32(&p.stopped) == 1
func (p *Peer) IsRunning() bool {
return atomic.LoadUint32(&p.running) == 1
} }
func (p *Peer) Connection() *MConnection { func (p *Peer) Connection() *MConnection {
@ -71,21 +70,21 @@ func (p *Peer) IsOutbound() bool {
} }
func (p *Peer) Send(chId byte, msg interface{}) bool { func (p *Peer) Send(chId byte, msg interface{}) bool {
if atomic.LoadUint32(&p.stopped) == 1 {
if atomic.LoadUint32(&p.running) == 0 {
return false return false
} }
return p.mconn.Send(chId, msg) return p.mconn.Send(chId, msg)
} }
func (p *Peer) TrySend(chId byte, msg interface{}) bool { func (p *Peer) TrySend(chId byte, msg interface{}) bool {
if atomic.LoadUint32(&p.stopped) == 1 {
if atomic.LoadUint32(&p.running) == 0 {
return false return false
} }
return p.mconn.TrySend(chId, msg) return p.mconn.TrySend(chId, msg)
} }
func (p *Peer) CanSend(chId byte) bool { func (p *Peer) CanSend(chId byte) bool {
if atomic.LoadUint32(&p.stopped) == 1 {
if atomic.LoadUint32(&p.running) == 0 {
return false return false
} }
return p.mconn.CanSend(chId) return p.mconn.CanSend(chId)


Loading…
Cancel
Save