|
|
@ -29,9 +29,11 @@ const ( |
|
|
|
type ConsensusReactor struct { |
|
|
|
p2p.BaseReactor // BaseService + p2p.Switch
|
|
|
|
|
|
|
|
conS *ConsensusState |
|
|
|
conS *ConsensusState |
|
|
|
evsw types.EventSwitch |
|
|
|
|
|
|
|
mtx sync.RWMutex |
|
|
|
fastSync bool |
|
|
|
evsw types.EventSwitch |
|
|
|
} |
|
|
|
|
|
|
|
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { |
|
|
@ -44,14 +46,14 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens |
|
|
|
} |
|
|
|
|
|
|
|
func (conR *ConsensusReactor) OnStart() error { |
|
|
|
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync) |
|
|
|
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) |
|
|
|
conR.BaseReactor.OnStart() |
|
|
|
|
|
|
|
// callbacks for broadcasting new steps and votes to peers
|
|
|
|
// upon their respective events (ie. uses evsw)
|
|
|
|
conR.registerEventCallbacks() |
|
|
|
|
|
|
|
if !conR.fastSync { |
|
|
|
if !conR.FastSync() { |
|
|
|
_, err := conR.conS.Start() |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
@ -73,7 +75,11 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { |
|
|
|
// NOTE: The line below causes broadcastNewRoundStepRoutine() to
|
|
|
|
// broadcast a NewRoundStepMessage.
|
|
|
|
conR.conS.updateToState(state) |
|
|
|
|
|
|
|
conR.mtx.Lock() |
|
|
|
conR.fastSync = false |
|
|
|
conR.mtx.Unlock() |
|
|
|
|
|
|
|
conR.conS.Start() |
|
|
|
} |
|
|
|
|
|
|
@ -124,7 +130,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { |
|
|
|
|
|
|
|
// Send our state to peer.
|
|
|
|
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
|
|
|
|
if !conR.fastSync { |
|
|
|
if !conR.FastSync() { |
|
|
|
conR.sendNewRoundStepMessages(peer) |
|
|
|
} |
|
|
|
} |
|
|
@ -204,7 +210,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) |
|
|
|
} |
|
|
|
|
|
|
|
case DataChannel: |
|
|
|
if conR.fastSync { |
|
|
|
if conR.FastSync() { |
|
|
|
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) |
|
|
|
return |
|
|
|
} |
|
|
@ -222,7 +228,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) |
|
|
|
} |
|
|
|
|
|
|
|
case VoteChannel: |
|
|
|
if conR.fastSync { |
|
|
|
if conR.FastSync() { |
|
|
|
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) |
|
|
|
return |
|
|
|
} |
|
|
@ -244,7 +250,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) |
|
|
|
} |
|
|
|
|
|
|
|
case VoteSetBitsChannel: |
|
|
|
if conR.fastSync { |
|
|
|
if conR.FastSync() { |
|
|
|
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) |
|
|
|
return |
|
|
|
} |
|
|
@ -292,6 +298,8 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { |
|
|
|
|
|
|
|
// FastSync returns whether the consensus reactor is currently fast syncing
|
|
|
|
func (conR *ConsensusReactor) FastSync() bool { |
|
|
|
conR.mtx.RLock() |
|
|
|
defer conR.mtx.RUnlock() |
|
|
|
return conR.fastSync |
|
|
|
} |
|
|
|
|
|
|
|