diff --git a/blockchain/reactor.go b/blockchain/reactor.go index f80abbe34..56ae6a241 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -35,8 +35,7 @@ type BlockchainReactor struct { timeoutsCh chan string lastBlock *types.Block quit chan struct{} - started uint32 - stopped uint32 + running uint32 } func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor { @@ -57,15 +56,14 @@ func NewBlockchainReactor(state *sm.State, store *BlockStore) *BlockchainReactor requestsCh: requestsCh, timeoutsCh: timeoutsCh, quit: make(chan struct{}), - started: 0, - stopped: 0, + running: uint32(0), } return bcR } // Implements Reactor func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { - if atomic.CompareAndSwapUint32(&bcR.started, 0, 1) { + if atomic.CompareAndSwapUint32(&bcR.running, 0, 1) { log.Info("Starting BlockchainReactor") bcR.sw = sw bcR.pool.Start() @@ -75,7 +73,7 @@ func (bcR *BlockchainReactor) Start(sw *p2p.Switch) { // Implements Reactor func (bcR *BlockchainReactor) Stop() { - if atomic.CompareAndSwapUint32(&bcR.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&bcR.running, 1, 0) { log.Info("Stopping BlockchainReactor") close(bcR.quit) bcR.pool.Stop() @@ -201,8 +199,14 @@ FOR_LOOP: // method of syncing in the consensus reactor. if lastValidatedBlock != nil && time.Now().Sub(lastValidatedBlock.Time) < stopSyncingDurationMinutes*time.Minute { go func() { - bcR.sw.Reactor("BLOCKCHAIN").Stop() - bcR.sw.Reactor("CONSENSUS").Start(bcR.sw) + log.Info("Stopping blockpool syncing, turning on consensus...") + //bcR.sw.Reactor("BLOCKCHAIN").Stop() + trySyncTicker.Stop() // Just stop the block requests. Still serve blocks to others. + conR := bcR.sw.Reactor("CONSENSUS") + conR.Start(bcR.sw) + for _, peer := range bcR.sw.Peers().List() { + conR.AddPeer(peer) + } }() break FOR_LOOP } diff --git a/consensus/reactor.go b/consensus/reactor.go index bbc39c89c..30ebde879 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -33,8 +33,7 @@ const ( // We atomically copy the RoundState struct before using it. type ConsensusReactor struct { sw *p2p.Switch - started uint32 - stopped uint32 + running uint32 quit chan struct{} blockStore *bc.BlockStore @@ -52,7 +51,7 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto // Implements Reactor func (conR *ConsensusReactor) Start(sw *p2p.Switch) { - if atomic.CompareAndSwapUint32(&conR.started, 0, 1) { + if atomic.CompareAndSwapUint32(&conR.running, 0, 1) { log.Info("Starting ConsensusReactor") conR.sw = sw conR.conS.Start() @@ -62,15 +61,15 @@ func (conR *ConsensusReactor) Start(sw *p2p.Switch) { // Implements Reactor func (conR *ConsensusReactor) Stop() { - if atomic.CompareAndSwapUint32(&conR.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&conR.running, 1, 0) { log.Info("Stopping ConsensusReactor") conR.conS.Stop() close(conR.quit) } } -func (conR *ConsensusReactor) IsStopped() bool { - return atomic.LoadUint32(&conR.stopped) == 1 +func (conR *ConsensusReactor) IsRunning() bool { + return atomic.LoadUint32(&conR.running) == 0 } // Implements Reactor @@ -94,6 +93,10 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { + if !conR.IsRunning() { + return + } + // Create peerState for peer peerState := NewPeerState(peer) peer.Data.Set(peerStateKey, peerState) @@ -108,11 +111,18 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { // Implements Reactor func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + if !conR.IsRunning() { + return + } + //peer.Data.Get(peerStateKey).(*PeerState).Disconnect() } // Implements Reactor func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte) { + if !conR.IsRunning() { + return + } // Get round state rs := conR.conS.GetRoundState() @@ -215,6 +225,10 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *sm.PrivValidator) { conR.conS.SetPrivValidator(priv) } +func (conR *ConsensusReactor) UpdateToState(state *sm.State) { + conR.conS.updateToState(state, false) +} + //-------------------------------------- func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { @@ -279,7 +293,7 @@ func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) { OUTER_LOOP: for { // Manage disconnects from self or peer. - if peer.IsStopped() || conR.IsStopped() { + if !peer.IsRunning() || !conR.IsRunning() { log.Info(Fmt("Stopping gossipDataRoutine for %v.", peer)) return } @@ -382,7 +396,7 @@ func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) OUTER_LOOP: for { // Manage disconnects from self or peer. - if peer.IsStopped() || conR.IsStopped() { + if !peer.IsRunning() || !conR.IsRunning() { log.Info(Fmt("Stopping gossipVotesRoutine for %v.", peer)) return } diff --git a/consensus/state.go b/consensus/state.go index e2738ca98..683612496 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -256,7 +256,7 @@ func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReacto runActionCh: make(chan RoundAction, 1), newStepCh: make(chan *RoundState, 1), } - cs.updateToState(state) + cs.updateToState(state, true) return cs } @@ -457,9 +457,9 @@ ACTION_LOOP: // If calculated round is greater than 0 (based on BlockTime or calculated StartTime) // then also sets up the appropriate round, and cs.Step becomes RoundStepNewRound. // Otherwise the round is 0 and cs.Step becomes RoundStepNewHeight. -func (cs *ConsensusState) updateToState(state *sm.State) { +func (cs *ConsensusState) updateToState(state *sm.State, contiguous bool) { // Sanity check state. - if cs.Height > 0 && cs.Height != state.LastBlockHeight { + if contiguous && cs.Height > 0 && cs.Height != state.LastBlockHeight { panic(Fmt("updateToState() expected state height of %v but found %v", cs.Height, state.LastBlockHeight)) } @@ -859,7 +859,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool { // We have the block, so save/stage/sign-commit-vote. cs.saveCommitVoteBlock(cs.ProposalBlock, cs.ProposalBlockParts, cs.Commits) // Increment height. - cs.updateToState(cs.stagedState) + cs.updateToState(cs.stagedState, true) // cs.Step is now RoundStepNewHeight or RoundStepNewRound cs.newStepCh <- cs.getRoundState() return true diff --git a/daemon/daemon.go b/daemon/daemon.go index ddc310762..1b029f9d7 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -94,7 +94,7 @@ func (n *Node) Start() { go n.inboundConnectionRoutine(l) } n.book.Start() - n.sw.StartReactors() + //n.sw.StartReactors() } func (n *Node) Stop() { diff --git a/p2p/peer.go b/p2p/peer.go index 68137a63a..173297eb0 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -13,8 +13,7 @@ import ( type Peer struct { outbound bool mconn *MConnection - started uint32 - stopped uint32 + running uint32 Key string Data *CMap // User data. @@ -37,7 +36,7 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc p = &Peer{ outbound: outbound, mconn: mconn, - stopped: 0, + running: 0, Key: mconn.RemoteAddress.String(), Data: NewCMap(), } @@ -45,21 +44,21 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc } func (p *Peer) start() { - if atomic.CompareAndSwapUint32(&p.started, 0, 1) { + if atomic.CompareAndSwapUint32(&p.running, 0, 1) { log.Debug("Starting Peer", "peer", p) p.mconn.Start() } } func (p *Peer) stop() { - if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { + if atomic.CompareAndSwapUint32(&p.running, 1, 0) { log.Debug("Stopping Peer", "peer", p) p.mconn.Stop() } } -func (p *Peer) IsStopped() bool { - return atomic.LoadUint32(&p.stopped) == 1 +func (p *Peer) IsRunning() bool { + return atomic.LoadUint32(&p.running) == 1 } func (p *Peer) Connection() *MConnection { @@ -71,21 +70,21 @@ func (p *Peer) IsOutbound() bool { } func (p *Peer) Send(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.Send(chId, msg) } func (p *Peer) TrySend(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.TrySend(chId, msg) } func (p *Peer) CanSend(chId byte) bool { - if atomic.LoadUint32(&p.stopped) == 1 { + if atomic.LoadUint32(&p.running) == 0 { return false } return p.mconn.CanSend(chId)