From bac60f2067cfd08f09e02e060bab520bb51f3c3c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 27 Dec 2017 20:40:36 -0500 Subject: [PATCH] blockchain: update for new state --- blockchain/reactor.go | 67 +++++++++++++++++++++++--------------- blockchain/reactor_test.go | 19 +++++++---- 2 files changed, 52 insertions(+), 34 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index f985e2849..c8e794a13 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -4,11 +4,11 @@ import ( "bytes" "errors" "reflect" + "sync" "time" wire "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" @@ -34,29 +34,33 @@ const ( type consensusReactor interface { // for when we switch from blockchain reactor and fast sync to // the consensus machine - SwitchToConsensus(*sm.State, int) + SwitchToConsensus(sm.State, int) } // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { p2p.BaseReactor - state *sm.State - proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn - store *BlockStore - pool *BlockPool - fastSync bool - requestsCh chan BlockRequest - timeoutsCh chan string + mtx sync.Mutex + params types.ConsensusParams - eventBus *types.EventBus + // immutable + initialState sm.State + + blockExec *sm.BlockExecutor + store *BlockStore + pool *BlockPool + fastSync bool + requestsCh chan BlockRequest + timeoutsCh chan string } // NewBlockchainReactor returns new reactor instance. -func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { +func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore, fastSync bool) *BlockchainReactor { if state.LastBlockHeight != store.Height() { cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) } + requestsCh := make(chan BlockRequest, defaultChannelCapacity) timeoutsCh := make(chan string, defaultChannelCapacity) pool := NewBlockPool( @@ -65,8 +69,9 @@ func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, timeoutsCh, ) bcR := &BlockchainReactor{ - state: state, - proxyAppConn: proxyAppConn, + params: state.ConsensusParams, + initialState: state, + blockExec: blockExec, store: store, pool: pool, fastSync: fastSync, @@ -183,7 +188,16 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // maxMsgSize returns the maximum allowable size of a // message on the blockchain reactor. func (bcR *BlockchainReactor) maxMsgSize() int { - return bcR.state.ConsensusParams.BlockSize.MaxBytes + 2 + bcR.mtx.Lock() + defer bcR.mtx.Unlock() + return bcR.params.BlockSize.MaxBytes + 2 +} + +// updateConsensusParams updates the internal consensus params +func (bcR *BlockchainReactor) updateConsensusParams(params types.ConsensusParams) { + bcR.mtx.Lock() + defer bcR.mtx.Unlock() + bcR.params = params } // Handle messages from the poolReactor telling the reactor what to do. @@ -197,7 +211,8 @@ func (bcR *BlockchainReactor) poolRoutine() { blocksSynced := 0 - chainID := bcR.state.ChainID + chainID := bcR.initialState.ChainID + state := bcR.initialState lastHundred := time.Now() lastRate := 0.0 @@ -236,7 +251,7 @@ FOR_LOOP: bcR.pool.Stop() conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - conR.SwitchToConsensus(bcR.state, blocksSynced) + conR.SwitchToConsensus(state, blocksSynced) break FOR_LOOP } @@ -251,14 +266,15 @@ FOR_LOOP: // We need both to sync the first block. break SYNC_LOOP } - firstParts := first.MakePartSet(bcR.state.ConsensusParams.BlockPartSizeBytes) + firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) firstPartsHeader := firstParts.Header() + firstID := types.BlockID{first.Hash(), firstPartsHeader} // Finally, verify the first block using the second's commit // NOTE: we can probably make this more efficient, but note that calling // first.Hash() doesn't verify the tx contents, so MakePartSet() is // currently necessary. - err := bcR.state.Validators.VerifyCommit( - chainID, types.BlockID{first.Hash(), firstPartsHeader}, first.Height, second.LastCommit) + err := state.Validators.VerifyCommit( + chainID, firstID, first.Height, second.LastCommit) if err != nil { bcR.Logger.Error("Error in validation", "err", err) bcR.pool.RedoRequest(first.Height) @@ -272,15 +288,17 @@ FOR_LOOP: // NOTE: we could improve performance if we // didn't make the app commit to disk every block // ... but we would need a way to get the hash without it persisting - err := bcR.state.ApplyBlock(bcR.eventBus, bcR.proxyAppConn, - first, firstPartsHeader, - types.MockMempool{}, types.MockEvidencePool{}) // TODO unmock! + var err error + state, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { // TODO This is bad, are we zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } blocksSynced += 1 + // update the consensus params + bcR.updateConsensusParams(state.ConsensusParams) + if blocksSynced%100 == 0 { lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, @@ -302,11 +320,6 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error { return nil } -// SetEventBus sets event bus. -func (bcR *BlockchainReactor) SetEventBus(b *types.EventBus) { - bcR.eventBus = b -} - //----------------------------------------------------------------------------- // Messages diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 36cdc080d..ecb4a9e63 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -14,14 +14,14 @@ import ( "github.com/tendermint/tendermint/types" ) -func makeStateAndBlockStore(logger log.Logger) (*sm.State, *BlockStore) { +func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) { config := cfg.ResetTestRoot("blockchain_reactor_test") blockStore := NewBlockStore(dbm.NewMemDB()) // Get State - state, _ := sm.GetState(dbm.NewMemDB(), config.GenesisFile()) - state.SetLogger(logger.With("module", "state")) - state.Save() + stateDB := dbm.NewMemDB() + state, _ := sm.GetState(stateDB, config.GenesisFile()) + sm.SaveState(stateDB, state, state.AppHash) return state, blockStore } @@ -31,7 +31,10 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe // Make the blockchainReactor itself fastSync := true - bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync) + blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), + nil, nil, types.MockMempool{}, types.MockEvidencePool{}) + + bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) // Next: we need to set a switch in order for peers to be added in @@ -51,7 +54,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe func TestNoBlockMessageResponse(t *testing.T) { maxBlockHeight := int64(20) - bcr := newBlockchainReactor(log.NewNopLogger(), maxBlockHeight) + bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight) bcr.Start() defer bcr.Stop() @@ -71,6 +74,8 @@ func TestNoBlockMessageResponse(t *testing.T) { {100, false}, } + // receive a request message from peer, + // wait to hear response for _, tt := range tests { reqBlockMsg := &bcBlockRequestMessage{tt.height} reqBlockBytes := wire.BinaryBytes(struct{ BlockchainMessage }{reqBlockMsg}) @@ -104,7 +109,7 @@ func makeTxs(height int64) (txs []types.Tx) { return txs } -func makeBlock(height int64, state *sm.State) *types.Block { +func makeBlock(height int64, state sm.State) *types.Block { block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit)) return block }