Browse Source

Merge pull request #579 from tendermint/feature/sync_status

Add fast-sync status to Status() call
pull/606/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
49278a7f9c
5 changed files with 36 additions and 13 deletions
  1. +21
    -8
      consensus/reactor.go
  2. +1
    -0
      node/node.go
  3. +11
    -4
      rpc/core/pipe.go
  4. +2
    -1
      rpc/core/status.go
  5. +1
    -0
      rpc/core/types/responses.go

+ 21
- 8
consensus/reactor.go View File

@ -32,9 +32,11 @@ const (
type ConsensusReactor struct {
p2p.BaseReactor // BaseService + p2p.Switch
conS *ConsensusState
conS *ConsensusState
evsw types.EventSwitch
mtx sync.RWMutex
fastSync bool
evsw types.EventSwitch
}
// NewConsensusReactor returns a new ConsensusReactor with the given consensusState.
@ -49,14 +51,14 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens
// OnStart implements BaseService.
func (conR *ConsensusReactor) OnStart() error {
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync)
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
conR.BaseReactor.OnStart()
// callbacks for broadcasting new steps and votes to peers
// upon their respective events (ie. uses evsw)
conR.registerEventCallbacks()
if !conR.fastSync {
if !conR.FastSync() {
_, err := conR.conS.Start()
if err != nil {
return err
@ -79,7 +81,11 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
// NOTE: The line below causes broadcastNewRoundStepRoutine() to
// broadcast a NewRoundStepMessage.
conR.conS.updateToState(state)
conR.mtx.Lock()
conR.fastSync = false
conR.mtx.Unlock()
conR.conS.Start()
}
@ -130,7 +136,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !conR.fastSync {
if !conR.FastSync() {
conR.sendNewRoundStepMessages(peer)
}
}
@ -210,7 +216,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
}
case DataChannel:
if conR.fastSync {
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
@ -228,7 +234,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
}
case VoteChannel:
if conR.fastSync {
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
@ -250,7 +256,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
}
case VoteSetBitsChannel:
if conR.fastSync {
if conR.FastSync() {
conR.Logger.Info("Ignoring message received during fastSync", "msg", msg)
return
}
@ -296,6 +302,13 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
conR.conS.SetEventSwitch(evsw)
}
// FastSync returns whether the consensus reactor is in fast-sync mode.
func (conR *ConsensusReactor) FastSync() bool {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
return conR.fastSync
}
//--------------------------------------
// Listens for new steps and votes,


+ 1
- 0
node/node.go View File

@ -319,6 +319,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetAddrBook(n.addrBook)
rpccore.SetProxyAppQuery(n.proxyApp.Query())
rpccore.SetTxIndexer(n.txIndexer)
rpccore.SetConsensusReactor(n.consensusReactor)
rpccore.SetLogger(n.Logger.With("module", "rpc"))
}


+ 11
- 4
rpc/core/pipe.go View File

@ -28,6 +28,8 @@ type P2P interface {
}
//----------------------------------------------
// These package level globals come with setters
// that are expected to be called only once, on startup
var (
// external, thread safe interfaces
@ -41,10 +43,11 @@ var (
p2pSwitch P2P
// objects
pubKey crypto.PubKey
genDoc *types.GenesisDoc // cache the genesis structure
addrBook *p2p.AddrBook
txIndexer txindex.TxIndexer
pubKey crypto.PubKey
genDoc *types.GenesisDoc // cache the genesis structure
addrBook *p2p.AddrBook
txIndexer txindex.TxIndexer
consensusReactor *consensus.ConsensusReactor
logger log.Logger
)
@ -89,6 +92,10 @@ func SetTxIndexer(indexer txindex.TxIndexer) {
txIndexer = indexer
}
func SetConsensusReactor(conR *consensus.ConsensusReactor) {
consensusReactor = conR
}
func SetLogger(l log.Logger) {
logger = l
}

+ 2
- 1
rpc/core/status.go View File

@ -27,5 +27,6 @@ func Status() (*ctypes.ResultStatus, error) {
LatestBlockHash: latestBlockHash,
LatestAppHash: latestAppHash,
LatestBlockHeight: latestHeight,
LatestBlockTime: latestBlockTime}, nil
LatestBlockTime: latestBlockTime,
Syncing: consensusReactor.FastSync()}, nil
}

+ 1
- 0
rpc/core/types/responses.go View File

@ -38,6 +38,7 @@ type ResultStatus struct {
LatestAppHash data.Bytes `json:"latest_app_hash"`
LatestBlockHeight int `json:"latest_block_height"`
LatestBlockTime int64 `json:"latest_block_time"` // nano
Syncing bool `json:"syncing"`
}
func (s *ResultStatus) TxIndexEnabled() bool {


Loading…
Cancel
Save