From 92ada55e5ab539ad8a45e22eb8abb2d8035ec312 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 9 Aug 2017 14:55:21 -0400 Subject: [PATCH] make conR.FastSync() thread safe --- consensus/reactor.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 801e010ce..27d1537ad 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -29,9 +29,11 @@ const ( type ConsensusReactor struct { p2p.BaseReactor // BaseService + p2p.Switch - conS *ConsensusState + conS *ConsensusState + evsw types.EventSwitch + + mtx sync.RWMutex fastSync bool - evsw types.EventSwitch } func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { @@ -44,14 +46,14 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens } func (conR *ConsensusReactor) OnStart() error { - conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync) + conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) conR.BaseReactor.OnStart() // callbacks for broadcasting new steps and votes to peers // upon their respective events (ie. uses evsw) conR.registerEventCallbacks() - if !conR.fastSync { + if !conR.FastSync() { _, err := conR.conS.Start() if err != nil { return err @@ -73,7 +75,11 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { // NOTE: The line below causes broadcastNewRoundStepRoutine() to // broadcast a NewRoundStepMessage. conR.conS.updateToState(state) + + conR.mtx.Lock() conR.fastSync = false + conR.mtx.Unlock() + conR.conS.Start() } @@ -124,7 +130,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { // Send our state to peer. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). - if !conR.fastSync { + if !conR.FastSync() { conR.sendNewRoundStepMessages(peer) } } @@ -204,7 +210,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case DataChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -222,7 +228,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case VoteChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -244,7 +250,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case VoteSetBitsChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -292,6 +298,8 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { // FastSync returns whether the consensus reactor is currently fast syncing func (conR *ConsensusReactor) FastSync() bool { + conR.mtx.RLock() + defer conR.mtx.RUnlock() return conR.fastSync }