Browse Source

blockchain: update for new state

pull/1015/head
Ethan Buchman 7 years ago
parent
commit
bac60f2067
2 changed files with 52 additions and 34 deletions
  1. +40
    -27
      blockchain/reactor.go
  2. +12
    -7
      blockchain/reactor_test.go

+ 40
- 27
blockchain/reactor.go View File

@ -4,11 +4,11 @@ import (
"bytes"
"errors"
"reflect"
"sync"
"time"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
@ -34,29 +34,33 @@ const (
type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to
// the consensus machine
SwitchToConsensus(*sm.State, int)
SwitchToConsensus(sm.State, int)
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
state *sm.State
proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
store *BlockStore
pool *BlockPool
fastSync bool
requestsCh chan BlockRequest
timeoutsCh chan string
mtx sync.Mutex
params types.ConsensusParams
eventBus *types.EventBus
// immutable
initialState sm.State
blockExec *sm.BlockExecutor
store *BlockStore
pool *BlockPool
fastSync bool
requestsCh chan BlockRequest
timeoutsCh chan string
}
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore, fastSync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() {
cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
}
requestsCh := make(chan BlockRequest, defaultChannelCapacity)
timeoutsCh := make(chan string, defaultChannelCapacity)
pool := NewBlockPool(
@ -65,8 +69,9 @@ func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus,
timeoutsCh,
)
bcR := &BlockchainReactor{
state: state,
proxyAppConn: proxyAppConn,
params: state.ConsensusParams,
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
fastSync: fastSync,
@ -183,7 +188,16 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
// maxMsgSize returns the maximum allowable size of a
// message on the blockchain reactor.
func (bcR *BlockchainReactor) maxMsgSize() int {
return bcR.state.ConsensusParams.BlockSize.MaxBytes + 2
bcR.mtx.Lock()
defer bcR.mtx.Unlock()
return bcR.params.BlockSize.MaxBytes + 2
}
// updateConsensusParams updates the internal consensus params
func (bcR *BlockchainReactor) updateConsensusParams(params types.ConsensusParams) {
bcR.mtx.Lock()
defer bcR.mtx.Unlock()
bcR.params = params
}
// Handle messages from the poolReactor telling the reactor what to do.
@ -197,7 +211,8 @@ func (bcR *BlockchainReactor) poolRoutine() {
blocksSynced := 0
chainID := bcR.state.ChainID
chainID := bcR.initialState.ChainID
state := bcR.initialState
lastHundred := time.Now()
lastRate := 0.0
@ -236,7 +251,7 @@ FOR_LOOP:
bcR.pool.Stop()
conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
conR.SwitchToConsensus(bcR.state, blocksSynced)
conR.SwitchToConsensus(state, blocksSynced)
break FOR_LOOP
}
@ -251,14 +266,15 @@ FOR_LOOP:
// We need both to sync the first block.
break SYNC_LOOP
}
firstParts := first.MakePartSet(bcR.state.ConsensusParams.BlockPartSizeBytes)
firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes)
firstPartsHeader := firstParts.Header()
firstID := types.BlockID{first.Hash(), firstPartsHeader}
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := bcR.state.Validators.VerifyCommit(
chainID, types.BlockID{first.Hash(), firstPartsHeader}, first.Height, second.LastCommit)
err := state.Validators.VerifyCommit(
chainID, firstID, first.Height, second.LastCommit)
if err != nil {
bcR.Logger.Error("Error in validation", "err", err)
bcR.pool.RedoRequest(first.Height)
@ -272,15 +288,17 @@ FOR_LOOP:
// NOTE: we could improve performance if we
// didn't make the app commit to disk every block
// ... but we would need a way to get the hash without it persisting
err := bcR.state.ApplyBlock(bcR.eventBus, bcR.proxyAppConn,
first, firstPartsHeader,
types.MockMempool{}, types.MockEvidencePool{}) // TODO unmock!
var err error
state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
blocksSynced += 1
// update the consensus params
bcR.updateConsensusParams(state.ConsensusParams)
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
@ -302,11 +320,6 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
return nil
}
// SetEventBus sets event bus.
func (bcR *BlockchainReactor) SetEventBus(b *types.EventBus) {
bcR.eventBus = b
}
//-----------------------------------------------------------------------------
// Messages


+ 12
- 7
blockchain/reactor_test.go View File

@ -14,14 +14,14 @@ import (
"github.com/tendermint/tendermint/types"
)
func makeStateAndBlockStore(logger log.Logger) (*sm.State, *BlockStore) {
func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
blockStore := NewBlockStore(dbm.NewMemDB())
// Get State
state, _ := sm.GetState(dbm.NewMemDB(), config.GenesisFile())
state.SetLogger(logger.With("module", "state"))
state.Save()
stateDB := dbm.NewMemDB()
state, _ := sm.GetState(stateDB, config.GenesisFile())
sm.SaveState(stateDB, state, state.AppHash)
return state, blockStore
}
@ -31,7 +31,10 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
// Make the blockchainReactor itself
fastSync := true
bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync)
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(),
nil, nil, types.MockMempool{}, types.MockEvidencePool{})
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Next: we need to set a switch in order for peers to be added in
@ -51,7 +54,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
func TestNoBlockMessageResponse(t *testing.T) {
maxBlockHeight := int64(20)
bcr := newBlockchainReactor(log.NewNopLogger(), maxBlockHeight)
bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight)
bcr.Start()
defer bcr.Stop()
@ -71,6 +74,8 @@ func TestNoBlockMessageResponse(t *testing.T) {
{100, false},
}
// receive a request message from peer,
// wait to hear response
for _, tt := range tests {
reqBlockMsg := &bcBlockRequestMessage{tt.height}
reqBlockBytes := wire.BinaryBytes(struct{ BlockchainMessage }{reqBlockMsg})
@ -104,7 +109,7 @@ func makeTxs(height int64) (txs []types.Tx) {
return txs
}
func makeBlock(height int64, state *sm.State) *types.Block {
func makeBlock(height int64, state sm.State) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit))
return block
}


Loading…
Cancel
Save