diff --git a/consensus/reactor.go b/consensus/reactor.go index 3e12a3148..222339241 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -32,9 +32,11 @@ const ( type ConsensusReactor struct { p2p.BaseReactor // BaseService + p2p.Switch - conS *ConsensusState + conS *ConsensusState + evsw types.EventSwitch + + mtx sync.RWMutex fastSync bool - evsw types.EventSwitch } // NewConsensusReactor returns a new ConsensusReactor with the given consensusState. @@ -49,14 +51,14 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens // OnStart implements BaseService. func (conR *ConsensusReactor) OnStart() error { - conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync) + conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) conR.BaseReactor.OnStart() // callbacks for broadcasting new steps and votes to peers // upon their respective events (ie. uses evsw) conR.registerEventCallbacks() - if !conR.fastSync { + if !conR.FastSync() { _, err := conR.conS.Start() if err != nil { return err @@ -79,7 +81,11 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { // NOTE: The line below causes broadcastNewRoundStepRoutine() to // broadcast a NewRoundStepMessage. conR.conS.updateToState(state) + + conR.mtx.Lock() conR.fastSync = false + conR.mtx.Unlock() + conR.conS.Start() } @@ -130,7 +136,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { // Send our state to peer. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). - if !conR.fastSync { + if !conR.FastSync() { conR.sendNewRoundStepMessages(peer) } } @@ -210,7 +216,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case DataChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -228,7 +234,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case VoteChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -250,7 +256,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case VoteSetBitsChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -296,6 +302,13 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { conR.conS.SetEventSwitch(evsw) } +// FastSync returns whether the consensus reactor is in fast-sync mode. +func (conR *ConsensusReactor) FastSync() bool { + conR.mtx.RLock() + defer conR.mtx.RUnlock() + return conR.fastSync +} + //-------------------------------------- // Listens for new steps and votes, diff --git a/node/node.go b/node/node.go index e55731f4f..96c913cb4 100644 --- a/node/node.go +++ b/node/node.go @@ -319,6 +319,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetAddrBook(n.addrBook) rpccore.SetProxyAppQuery(n.proxyApp.Query()) rpccore.SetTxIndexer(n.txIndexer) + rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetLogger(n.Logger.With("module", "rpc")) } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index a18de2ad8..92e5746a6 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -28,6 +28,8 @@ type P2P interface { } //---------------------------------------------- +// These package level globals come with setters +// that are expected to be called only once, on startup var ( // external, thread safe interfaces @@ -41,10 +43,11 @@ var ( p2pSwitch P2P // objects - pubKey crypto.PubKey - genDoc *types.GenesisDoc // cache the genesis structure - addrBook *p2p.AddrBook - txIndexer txindex.TxIndexer + pubKey crypto.PubKey + genDoc *types.GenesisDoc // cache the genesis structure + addrBook *p2p.AddrBook + txIndexer txindex.TxIndexer + consensusReactor *consensus.ConsensusReactor logger log.Logger ) @@ -89,6 +92,10 @@ func SetTxIndexer(indexer txindex.TxIndexer) { txIndexer = indexer } +func SetConsensusReactor(conR *consensus.ConsensusReactor) { + consensusReactor = conR +} + func SetLogger(l log.Logger) { logger = l } diff --git a/rpc/core/status.go b/rpc/core/status.go index 7493aeb0a..ba19c0e9b 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -27,5 +27,6 @@ func Status() (*ctypes.ResultStatus, error) { LatestBlockHash: latestBlockHash, LatestAppHash: latestAppHash, LatestBlockHeight: latestHeight, - LatestBlockTime: latestBlockTime}, nil + LatestBlockTime: latestBlockTime, + Syncing: consensusReactor.FastSync()}, nil } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index b9c22a8b0..6b66f1168 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -38,6 +38,7 @@ type ResultStatus struct { LatestAppHash data.Bytes `json:"latest_app_hash"` LatestBlockHeight int `json:"latest_block_height"` LatestBlockTime int64 `json:"latest_block_time"` // nano + Syncing bool `json:"syncing"` } func (s *ResultStatus) TxIndexEnabled() bool {