From 05c0dfac120c5a44f674c80ec3afb3c913ec5264 Mon Sep 17 00:00:00 2001 From: Adrian Brink Date: Mon, 10 Jul 2017 19:30:54 +0200 Subject: [PATCH 1/4] First crack it providing fast-sync endpoint --- blockchain/reactor.go | 5 +++++ node/node.go | 1 + rpc/core/pipe.go | 6 ++++++ rpc/core/status.go | 3 ++- rpc/core/types/responses.go | 1 + 5 files changed, 15 insertions(+), 1 deletion(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 813d8f6b7..eca0a6251 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -267,6 +267,11 @@ func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) { bcR.evsw = evsw } +// FastSync returns whether the blockchain reactor is currently fast syncing +func (bcR *BlockchainReactor) FastSync() bool { + return bcR.fastSync +} + //----------------------------------------------------------------------------- // Messages diff --git a/node/node.go b/node/node.go index 672e384b9..f11c01398 100644 --- a/node/node.go +++ b/node/node.go @@ -315,6 +315,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetAddrBook(n.addrBook) rpccore.SetProxyAppQuery(n.proxyApp.Query()) rpccore.SetTxIndexer(n.txIndexer) + rpccore.SetBlockchainReactor(n.bcReactor) rpccore.SetLogger(n.Logger.With("module", "rpc")) } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index a18de2ad8..e581de8b9 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -2,6 +2,7 @@ package core import ( crypto "github.com/tendermint/go-crypto" + "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" p2p "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" @@ -45,6 +46,7 @@ var ( genDoc *types.GenesisDoc // cache the genesis structure addrBook *p2p.AddrBook txIndexer txindex.TxIndexer + bcReactor *blockchain.BlockchainReactor logger log.Logger ) @@ -89,6 +91,10 @@ func SetTxIndexer(indexer txindex.TxIndexer) { txIndexer = indexer } +func SetBlockchainReactor(bc *blockchain.BlockchainReactor) { + bcReactor = bc +} + func SetLogger(l log.Logger) { logger = l } diff --git a/rpc/core/status.go b/rpc/core/status.go index 7493aeb0a..ca83031c9 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -27,5 +27,6 @@ func Status() (*ctypes.ResultStatus, error) { LatestBlockHash: latestBlockHash, LatestAppHash: latestAppHash, LatestBlockHeight: latestHeight, - LatestBlockTime: latestBlockTime}, nil + LatestBlockTime: latestBlockTime, + Syncing: bcReactor.FastSync()}, nil } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index b9c22a8b0..6b66f1168 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -38,6 +38,7 @@ type ResultStatus struct { LatestAppHash data.Bytes `json:"latest_app_hash"` LatestBlockHeight int `json:"latest_block_height"` LatestBlockTime int64 `json:"latest_block_time"` // nano + Syncing bool `json:"syncing"` } func (s *ResultStatus) TxIndexEnabled() bool { From 6f8d385dfae522e1cb030319bbd800edeb9b29bd Mon Sep 17 00:00:00 2001 From: ramil Date: Mon, 17 Jul 2017 09:44:23 +0300 Subject: [PATCH 2/4] fast sync status --- blockchain/reactor.go | 5 ----- consensus/reactor.go | 5 +++++ node/node.go | 2 +- rpc/core/pipe.go | 15 +++++++-------- rpc/core/status.go | 2 +- 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index eca0a6251..813d8f6b7 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -267,11 +267,6 @@ func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) { bcR.evsw = evsw } -// FastSync returns whether the blockchain reactor is currently fast syncing -func (bcR *BlockchainReactor) FastSync() bool { - return bcR.fastSync -} - //----------------------------------------------------------------------------- // Messages diff --git a/consensus/reactor.go b/consensus/reactor.go index f8a6dfae1..801e010ce 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -290,6 +290,11 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { conR.conS.SetEventSwitch(evsw) } +// FastSync returns whether the consensus reactor is currently fast syncing +func (conR *ConsensusReactor) FastSync() bool { + return conR.fastSync +} + //-------------------------------------- // Listens for new steps and votes, diff --git a/node/node.go b/node/node.go index f11c01398..bd87798cf 100644 --- a/node/node.go +++ b/node/node.go @@ -315,7 +315,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetAddrBook(n.addrBook) rpccore.SetProxyAppQuery(n.proxyApp.Query()) rpccore.SetTxIndexer(n.txIndexer) - rpccore.SetBlockchainReactor(n.bcReactor) + rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetLogger(n.Logger.With("module", "rpc")) } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index e581de8b9..6c1930598 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -2,7 +2,6 @@ package core import ( crypto "github.com/tendermint/go-crypto" - "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" p2p "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" @@ -42,11 +41,11 @@ var ( p2pSwitch P2P // objects - pubKey crypto.PubKey - genDoc *types.GenesisDoc // cache the genesis structure - addrBook *p2p.AddrBook - txIndexer txindex.TxIndexer - bcReactor *blockchain.BlockchainReactor + pubKey crypto.PubKey + genDoc *types.GenesisDoc // cache the genesis structure + addrBook *p2p.AddrBook + txIndexer txindex.TxIndexer + consensusReactor *consensus.ConsensusReactor logger log.Logger ) @@ -91,8 +90,8 @@ func SetTxIndexer(indexer txindex.TxIndexer) { txIndexer = indexer } -func SetBlockchainReactor(bc *blockchain.BlockchainReactor) { - bcReactor = bc +func SetConsensusReactor(conR *consensus.ConsensusReactor) { + consensusReactor = conR } func SetLogger(l log.Logger) { diff --git a/rpc/core/status.go b/rpc/core/status.go index ca83031c9..ba19c0e9b 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -28,5 +28,5 @@ func Status() (*ctypes.ResultStatus, error) { LatestAppHash: latestAppHash, LatestBlockHeight: latestHeight, LatestBlockTime: latestBlockTime, - Syncing: bcReactor.FastSync()}, nil + Syncing: consensusReactor.FastSync()}, nil } From 92ada55e5ab539ad8a45e22eb8abb2d8035ec312 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 9 Aug 2017 14:55:21 -0400 Subject: [PATCH 3/4] make conR.FastSync() thread safe --- consensus/reactor.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 801e010ce..27d1537ad 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -29,9 +29,11 @@ const ( type ConsensusReactor struct { p2p.BaseReactor // BaseService + p2p.Switch - conS *ConsensusState + conS *ConsensusState + evsw types.EventSwitch + + mtx sync.RWMutex fastSync bool - evsw types.EventSwitch } func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { @@ -44,14 +46,14 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens } func (conR *ConsensusReactor) OnStart() error { - conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync) + conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync()) conR.BaseReactor.OnStart() // callbacks for broadcasting new steps and votes to peers // upon their respective events (ie. uses evsw) conR.registerEventCallbacks() - if !conR.fastSync { + if !conR.FastSync() { _, err := conR.conS.Start() if err != nil { return err @@ -73,7 +75,11 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { // NOTE: The line below causes broadcastNewRoundStepRoutine() to // broadcast a NewRoundStepMessage. conR.conS.updateToState(state) + + conR.mtx.Lock() conR.fastSync = false + conR.mtx.Unlock() + conR.conS.Start() } @@ -124,7 +130,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { // Send our state to peer. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). - if !conR.fastSync { + if !conR.FastSync() { conR.sendNewRoundStepMessages(peer) } } @@ -204,7 +210,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case DataChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -222,7 +228,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case VoteChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -244,7 +250,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } case VoteSetBitsChannel: - if conR.fastSync { + if conR.FastSync() { conR.Logger.Info("Ignoring message received during fastSync", "msg", msg) return } @@ -292,6 +298,8 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { // FastSync returns whether the consensus reactor is currently fast syncing func (conR *ConsensusReactor) FastSync() bool { + conR.mtx.RLock() + defer conR.mtx.RUnlock() return conR.fastSync } From b0728260e925ab8ad3b80ba731f277e18676f7e9 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 9 Aug 2017 23:51:09 -0400 Subject: [PATCH 4/4] comments --- consensus/reactor.go | 2 +- rpc/core/pipe.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 27d1537ad..51f6f9902 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -296,7 +296,7 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { conR.conS.SetEventSwitch(evsw) } -// FastSync returns whether the consensus reactor is currently fast syncing +// FastSync returns whether the consensus reactor is in fast-sync mode. func (conR *ConsensusReactor) FastSync() bool { conR.mtx.RLock() defer conR.mtx.RUnlock() diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 6c1930598..92e5746a6 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -28,6 +28,8 @@ type P2P interface { } //---------------------------------------------- +// These package level globals come with setters +// that are expected to be called only once, on startup var ( // external, thread safe interfaces