Browse Source

final updates for state

pull/1015/head
Ethan Buchman 7 years ago
parent
commit
0acca7fe69
17 changed files with 192 additions and 165 deletions
  1. +1
    -1
      blockchain/reactor_test.go
  2. +17
    -19
      consensus/common_test.go
  3. +1
    -1
      consensus/reactor.go
  4. +41
    -28
      consensus/replay.go
  5. +14
    -8
      consensus/replay_file.go
  6. +34
    -37
      consensus/replay_test.go
  7. +19
    -20
      consensus/state.go
  8. +4
    -4
      consensus/wal_generator.go
  9. +12
    -9
      evidence/pool.go
  10. +22
    -16
      node/node.go
  11. +1
    -2
      rpc/core/blocks.go
  12. +1
    -2
      rpc/core/consensus.go
  13. +7
    -1
      rpc/core/pipe.go
  14. +2
    -4
      state/db.go
  15. +12
    -9
      state/execution.go
  16. +1
    -1
      state/state.go
  17. +3
    -3
      state/state_test.go

+ 1
- 1
blockchain/reactor_test.go View File

@ -21,7 +21,7 @@ func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) {
// Get State // Get State
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()
state, _ := sm.GetState(stateDB, config.GenesisFile()) state, _ := sm.GetState(stateDB, config.GenesisFile())
sm.SaveState(stateDB, state, state.AppHash)
sm.SaveState(stateDB, state)
return state, blockStore return state, blockStore
} }


+ 17
- 19
consensus/common_test.go View File

@ -235,16 +235,16 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
//------------------------------------------------------------------------------- //-------------------------------------------------------------------------------
// consensus states // consensus states
func newConsensusState(state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
func newConsensusState(state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
return newConsensusStateWithConfig(config, state, pv, app) return newConsensusStateWithConfig(config, state, pv, app)
} }
func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
func newConsensusStateWithConfig(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
blockDB := dbm.NewMemDB() blockDB := dbm.NewMemDB()
return newConsensusStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB) return newConsensusStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB)
} }
func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState {
func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState {
// Get BlockStore // Get BlockStore
blockStore := bc.NewBlockStore(blockDB) blockStore := bc.NewBlockStore(blockDB)
@ -264,7 +264,10 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm.
evpool := types.MockEvidencePool{} evpool := types.MockEvidencePool{}
// Make ConsensusReactor // Make ConsensusReactor
cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool, evpool)
stateDB := dbm.NewMemDB() // XXX !!
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(),
nil, proxyAppConnCon, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger()) cs.SetLogger(log.TestingLogger())
cs.SetPrivValidator(pv) cs.SetPrivValidator(pv)
@ -284,9 +287,7 @@ func loadPrivValidator(config *cfg.Config) *types.PrivValidatorFS {
} }
func fixedConsensusStateDummy(config *cfg.Config, logger log.Logger) *ConsensusState { func fixedConsensusStateDummy(config *cfg.Config, logger log.Logger) *ConsensusState {
stateDB := dbm.NewMemDB()
state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
state.SetLogger(logger.With("module", "state"))
state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
privValidator := loadPrivValidator(config) privValidator := loadPrivValidator(config)
cs := newConsensusState(state, privValidator, dummy.NewDummyApplication()) cs := newConsensusState(state, privValidator, dummy.NewDummyApplication())
cs.SetLogger(logger) cs.SetLogger(logger)
@ -354,10 +355,9 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
css := make([]*ConsensusState, nValidators) css := make([]*ConsensusState, nValidators)
logger := consensusLogger() logger := consensusLogger()
for i := 0; i < nValidators; i++ { for i := 0; i < nValidators; i++ {
db := dbm.NewMemDB() // each state needs its own db
state, _ := sm.MakeGenesisState(db, genDoc)
state.SetLogger(logger.With("module", "state", "validator", i))
state.Save()
stateDB := dbm.NewMemDB() // each state needs its own db
state, _ := sm.MakeGenesisState(genDoc)
sm.SaveState(stateDB, state)
thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i))
for _, opt := range configOpts { for _, opt := range configOpts {
opt(thisConfig) opt(thisConfig)
@ -380,10 +380,9 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
css := make([]*ConsensusState, nPeers) css := make([]*ConsensusState, nPeers)
logger := consensusLogger() logger := consensusLogger()
for i := 0; i < nPeers; i++ { for i := 0; i < nPeers; i++ {
db := dbm.NewMemDB() // each state needs its own db
state, _ := sm.MakeGenesisState(db, genDoc)
state.SetLogger(logger.With("module", "state", "validator", i))
state.Save()
stateDB := dbm.NewMemDB() // each state needs its own db
state, _ := sm.MakeGenesisState(genDoc)
sm.SaveState(stateDB, state)
thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i))
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
var privVal types.PrivValidator var privVal types.PrivValidator
@ -437,12 +436,11 @@ func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.G
}, privValidators }, privValidators
} }
func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidatorFS) {
func randGenesisState(numValidators int, randPower bool, minPower int64) (sm.State, []*types.PrivValidatorFS) {
genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower) genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower)
s0, _ := sm.MakeGenesisState(genDoc)
db := dbm.NewMemDB() db := dbm.NewMemDB()
s0, _ := sm.MakeGenesisState(db, genDoc)
s0.SetLogger(log.TestingLogger().With("module", "state"))
s0.Save()
sm.SaveState(db, s0)
return s0, privValidators return s0, privValidators
} }


+ 1
- 1
consensus/reactor.go View File

@ -82,7 +82,7 @@ func (conR *ConsensusReactor) OnStop() {
// SwitchToConsensus switches from fast_sync mode to consensus mode. // SwitchToConsensus switches from fast_sync mode to consensus mode.
// It resets the state, turns off fast_sync, and starts the consensus state-machine // It resets the state, turns off fast_sync, and starts the consensus state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) {
func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int) {
conR.Logger.Info("SwitchToConsensus") conR.Logger.Info("SwitchToConsensus")
conR.conS.reconstructLastCommit(state) conR.conS.reconstructLastCommit(state)
// NOTE: The line below causes broadcastNewRoundStepRoutine() to // NOTE: The line below causes broadcastNewRoundStepRoutine() to


+ 41
- 28
consensus/replay.go View File

@ -13,6 +13,7 @@ import (
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
//auto "github.com/tendermint/tmlibs/autofile" //auto "github.com/tendermint/tmlibs/autofile"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
@ -186,15 +187,16 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc {
// we were last and using the WAL to recover there // we were last and using the WAL to recover there
type Handshaker struct { type Handshaker struct {
state *sm.State
store types.BlockStore
logger log.Logger
stateDB dbm.DB
initialState sm.State
store types.BlockStore
logger log.Logger
nBlocks int // number of blocks applied to the state nBlocks int // number of blocks applied to the state
} }
func NewHandshaker(state *sm.State, store types.BlockStore) *Handshaker {
return &Handshaker{state, store, log.NewNopLogger(), 0}
func NewHandshaker(stateDB dbm.DB, state sm.State, store types.BlockStore) *Handshaker {
return &Handshaker{stateDB, state, store, log.NewNopLogger(), 0}
} }
func (h *Handshaker) SetLogger(l log.Logger) { func (h *Handshaker) SetLogger(l log.Logger) {
@ -224,7 +226,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// TODO: check version // TODO: check version
// replay blocks up to the latest in the blockstore // replay blocks up to the latest in the blockstore
_, err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
if err != nil { if err != nil {
return fmt.Errorf("Error on replay: %v", err) return fmt.Errorf("Error on replay: %v", err)
} }
@ -238,15 +240,15 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// Replay all blocks since appBlockHeight and ensure the result matches the current state. // Replay all blocks since appBlockHeight and ensure the result matches the current state.
// Returns the final AppHash or an error // Returns the final AppHash or an error
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) {
func (h *Handshaker) ReplayBlocks(state sm.State, appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) {
storeBlockHeight := h.store.Height() storeBlockHeight := h.store.Height()
stateBlockHeight := h.state.LastBlockHeight
stateBlockHeight := state.LastBlockHeight
h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight) h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight)
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain // If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain
if appBlockHeight == 0 { if appBlockHeight == 0 {
validators := types.TM2PB.Validators(h.state.Validators)
validators := types.TM2PB.Validators(state.Validators)
if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
return nil, err return nil, err
} }
@ -254,7 +256,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
// First handle edge cases and constraints on the storeBlockHeight // First handle edge cases and constraints on the storeBlockHeight
if storeBlockHeight == 0 { if storeBlockHeight == 0 {
return appHash, h.checkAppHash(appHash)
return appHash, checkAppHash(state, appHash)
} else if storeBlockHeight < appBlockHeight { } else if storeBlockHeight < appBlockHeight {
// the app should never be ahead of the store (but this is under app's control) // the app should never be ahead of the store (but this is under app's control)
@ -269,6 +271,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
cmn.PanicSanity(cmn.Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) cmn.PanicSanity(cmn.Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
} }
var err error
// Now either store is equal to state, or one ahead. // Now either store is equal to state, or one ahead.
// For each, consider all cases of where the app could be, given app <= store // For each, consider all cases of where the app could be, given app <= store
if storeBlockHeight == stateBlockHeight { if storeBlockHeight == stateBlockHeight {
@ -276,11 +279,11 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
// Either the app is asking for replay, or we're all synced up. // Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight { if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We're good! // We're good!
return appHash, h.checkAppHash(appHash)
return appHash, checkAppHash(state, appHash)
} }
} else if storeBlockHeight == stateBlockHeight+1 { } else if storeBlockHeight == stateBlockHeight+1 {
@ -289,7 +292,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
if appBlockHeight < stateBlockHeight { if appBlockHeight < stateBlockHeight {
// the app is further behind than it should be, so replay blocks // the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL // but leave the last block to go through the WAL
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
} else if appBlockHeight == stateBlockHeight { } else if appBlockHeight == stateBlockHeight {
// We haven't run Commit (both the state and app are one block behind), // We haven't run Commit (both the state and app are one block behind),
@ -297,17 +300,19 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
// NOTE: We could instead use the cs.WAL on cs.Start, // NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT // but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT
h.logger.Info("Replay last block using real app") h.logger.Info("Replay last block using real app")
return h.replayBlock(storeBlockHeight, proxyApp.Consensus())
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We ran Commit, but didn't save the state, so replayBlock with mock app // We ran Commit, but didn't save the state, so replayBlock with mock app
abciResponses, err := sm.LoadABCIResponses(h.state.DB(), storeBlockHeight)
abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mockApp := newMockProxyApp(appHash, abciResponses) mockApp := newMockProxyApp(appHash, abciResponses)
h.logger.Info("Replay last block using mock app") h.logger.Info("Replay last block using mock app")
return h.replayBlock(storeBlockHeight, mockApp)
state, err = h.replayBlock(state, storeBlockHeight, mockApp)
return state.AppHash, err
} }
} }
@ -316,7 +321,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
return nil, nil return nil, nil
} }
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) {
func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) {
// App is further behind than it should be, so we need to replay blocks. // App is further behind than it should be, so we need to replay blocks.
// We replay all blocks from appBlockHeight+1. // We replay all blocks from appBlockHeight+1.
// //
@ -336,7 +341,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
for i := appBlockHeight + 1; i <= finalBlock; i++ { for i := appBlockHeight + 1; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i) h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i) block := h.store.LoadBlock(i)
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.state.LastValidators)
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -346,33 +351,41 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
if mutateState { if mutateState {
// sync the final block // sync the final block
return h.replayBlock(storeBlockHeight, proxyApp.Consensus())
state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
if err != nil {
return nil, err
}
appHash = state.AppHash
} }
return appHash, h.checkAppHash(appHash)
return appHash, checkAppHash(state, appHash)
} }
// ApplyBlock on the proxyApp with the last block. // ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(height int64, proxyApp proxy.AppConnConsensus) ([]byte, error) {
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
mempool := types.MockMempool{} mempool := types.MockMempool{}
evpool := types.MockEvidencePool{} evpool := types.MockEvidencePool{}
block := h.store.LoadBlock(height) block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height) meta := h.store.LoadBlockMeta(height)
if err := h.state.ApplyBlock(types.NopEventBus{}, proxyApp,
block, meta.BlockID.PartsHeader, mempool, evpool); err != nil {
return nil, err
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger,
types.NopEventBus{}, proxyApp, mempool, evpool)
var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
} }
h.nBlocks += 1 h.nBlocks += 1
return h.state.AppHash, nil
return state, nil
} }
func (h *Handshaker) checkAppHash(appHash []byte) error {
if !bytes.Equal(h.state.AppHash, appHash) {
panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash).Error())
func checkAppHash(state sm.State, appHash []byte) error {
if !bytes.Equal(state.AppHash, appHash) {
panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, state.AppHash).Error())
} }
return nil return nil
} }


+ 14
- 8
consensus/replay_file.go View File

@ -18,6 +18,7 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
) )
const ( const (
@ -104,11 +105,11 @@ type playback struct {
count int // how many lines/msgs into the file are we count int // how many lines/msgs into the file are we
// replays can be reset to beginning // replays can be reset to beginning
fileName string // so we can close/reopen the file
genesisState *sm.State // so the replay session knows where to restart from
fileName string // so we can close/reopen the file
genesisState sm.State // so the replay session knows where to restart from
} }
func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback {
func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.State) *playback {
return &playback{ return &playback{
cs: cs, cs: cs,
fp: fp, fp: fp,
@ -123,7 +124,7 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
pb.cs.Stop() pb.cs.Stop()
pb.cs.Wait() pb.cs.Wait()
newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn,
newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.mempool, pb.cs.evpool) pb.cs.blockStore, pb.cs.mempool, pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus) newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay() newCS.startForReplay()
@ -285,14 +286,14 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
// Get State // Get State
stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir()) stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir())
state, err := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
state, err := sm.MakeGenesisStateFromFile(config.GenesisFile())
if err != nil { if err != nil {
cmn.Exit(err.Error()) cmn.Exit(err.Error())
} }
// Create proxyAppConn connection (consensus, mempool, query) // Create proxyAppConn connection (consensus, mempool, query)
clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore))
proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(stateDB, state, blockStore))
err = proxyApp.Start() err = proxyApp.Start()
if err != nil { if err != nil {
cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err)) cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err))
@ -303,8 +304,13 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err))
} }
consensusState := NewConsensusState(csConfig, state.Copy(), proxyApp.Consensus(),
blockStore, types.MockMempool{}, types.MockEvidencePool{})
mempool, evpool := types.MockMempool{}, types.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(),
nil, proxyApp.Consensus(),
mempool, evpool)
consensusState := NewConsensusState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
consensusState.SetEventBus(eventBus) consensusState.SetEventBus(eventBus)
return consensusState return consensusState


+ 34
- 37
consensus/replay_test.go View File

@ -54,7 +54,6 @@ func init() {
func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) { func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) {
logger := log.TestingLogger() logger := log.TestingLogger()
state, _ := sm.GetState(stateDB, consensusReplayConfig.GenesisFile()) state, _ := sm.GetState(stateDB, consensusReplayConfig.GenesisFile())
state.SetLogger(logger.With("module", "state"))
privValidator := loadPrivValidator(consensusReplayConfig) privValidator := loadPrivValidator(consensusReplayConfig)
cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB)
cs.SetLogger(logger) cs.SetLogger(logger)
@ -98,22 +97,22 @@ func sendTxs(cs *ConsensusState, ctx context.Context) {
func TestWALCrash(t *testing.T) { func TestWALCrash(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
initFn func(*ConsensusState, context.Context)
initFn func(dbm.DB, *ConsensusState, context.Context)
heightToStop int64 heightToStop int64
}{ }{
{"empty block", {"empty block",
func(cs *ConsensusState, ctx context.Context) {},
func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {},
1}, 1},
{"block with a smaller part size", {"block with a smaller part size",
func(cs *ConsensusState, ctx context.Context) {
func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
// XXX: is there a better way to change BlockPartSizeBytes? // XXX: is there a better way to change BlockPartSizeBytes?
cs.state.ConsensusParams.BlockPartSizeBytes = 512 cs.state.ConsensusParams.BlockPartSizeBytes = 512
cs.state.Save()
sm.SaveState(stateDB, cs.state)
go sendTxs(cs, ctx) go sendTxs(cs, ctx)
}, },
1}, 1},
{"many non-empty blocks", {"many non-empty blocks",
func(cs *ConsensusState, ctx context.Context) {
func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
go sendTxs(cs, ctx) go sendTxs(cs, ctx)
}, },
3}, 3},
@ -126,7 +125,7 @@ func TestWALCrash(t *testing.T) {
} }
} }
func crashWALandCheckLiveness(t *testing.T, initFn func(*ConsensusState, context.Context), heightToStop int64) {
func crashWALandCheckLiveness(t *testing.T, initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) {
walPaniced := make(chan error) walPaniced := make(chan error)
crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop} crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop}
@ -139,8 +138,7 @@ LOOP:
// create consensus state from a clean slate // create consensus state from a clean slate
logger := log.NewNopLogger() logger := log.NewNopLogger()
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()
state, _ := sm.MakeGenesisStateFromFile(stateDB, consensusReplayConfig.GenesisFile())
state.SetLogger(logger.With("module", "state"))
state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
privValidator := loadPrivValidator(consensusReplayConfig) privValidator := loadPrivValidator(consensusReplayConfig)
blockDB := dbm.NewMemDB() blockDB := dbm.NewMemDB()
cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB)
@ -148,7 +146,7 @@ LOOP:
// start sending transactions // start sending transactions
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
initFn(cs, ctx)
initFn(stateDB, cs, ctx)
// clean up WAL file from the previous iteration // clean up WAL file from the previous iteration
walFile := cs.config.WalFile() walFile := cs.config.WalFile()
@ -344,12 +342,13 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
state, store := stateAndStore(config, privVal.GetPubKey())
stateDB, state, store := stateAndStore(config, privVal.GetPubKey())
store.chain = chain store.chain = chain
store.commits = commits store.commits = commits
// run the chain through state.ApplyBlock to build up the tendermint state // run the chain through state.ApplyBlock to build up the tendermint state
latestAppHash := buildTMStateFromChain(config, state, chain, mode)
state = buildTMStateFromChain(config, stateDB, state, chain, mode)
latestAppHash := state.AppHash
// make a new client creator // make a new client creator
dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "2")) dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "2"))
@ -358,12 +357,12 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
// run nBlocks against a new client to build up the app state. // run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state // use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2, nil) proxyApp := proxy.NewAppConns(clientCreator2, nil)
state, _ := stateAndStore(config, privVal.GetPubKey())
buildAppStateFromChain(proxyApp, state, chain, nBlocks, mode)
stateDB, state, _ := stateAndStore(config, privVal.GetPubKey())
buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
} }
// now start the app using the handshake - it should sync // now start the app using the handshake - it should sync
handshaker := NewHandshaker(state, store)
handshaker := NewHandshaker(stateDB, state, store)
proxyApp := proxy.NewAppConns(clientCreator2, handshaker) proxyApp := proxy.NewAppConns(clientCreator2, handshaker)
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err) t.Fatalf("Error starting proxy app connections: %v", err)
@ -393,16 +392,21 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
} }
} }
func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) {
func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
testPartSize := st.ConsensusParams.BlockPartSizeBytes testPartSize := st.ConsensusParams.BlockPartSizeBytes
err := st.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(),
types.NopEventBus{}, proxyApp.Consensus(), mempool, evpool)
blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()}
newState, err := blockExec.ApplyBlock(st, blkID, blk)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return newState
} }
func buildAppStateFromChain(proxyApp proxy.AppConns,
state *sm.State, chain []*types.Block, nBlocks int, mode uint) {
func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
state sm.State, chain []*types.Block, nBlocks int, mode uint) {
// start a new app without handshake, play nBlocks blocks // start a new app without handshake, play nBlocks blocks
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
@ -418,24 +422,24 @@ func buildAppStateFromChain(proxyApp proxy.AppConns,
case 0: case 0:
for i := 0; i < nBlocks; i++ { for i := 0; i < nBlocks; i++ {
block := chain[i] block := chain[i]
applyBlock(state, block, proxyApp)
state = applyBlock(stateDB, state, block, proxyApp)
} }
case 1, 2: case 1, 2:
for i := 0; i < nBlocks-1; i++ { for i := 0; i < nBlocks-1; i++ {
block := chain[i] block := chain[i]
applyBlock(state, block, proxyApp)
state = applyBlock(stateDB, state, block, proxyApp)
} }
if mode == 2 { if mode == 2 {
// update the dummy height and apphash // update the dummy height and apphash
// as if we ran commit but not // as if we ran commit but not
applyBlock(state, chain[nBlocks-1], proxyApp)
state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp)
} }
} }
} }
func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.Block, mode uint) []byte {
func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State {
// run the whole chain against this client to build up the tendermint state // run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1"))) clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1")))
proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
@ -449,31 +453,26 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B
panic(err) panic(err)
} }
var latestAppHash []byte
switch mode { switch mode {
case 0: case 0:
// sync right up // sync right up
for _, block := range chain { for _, block := range chain {
applyBlock(state, block, proxyApp)
state = applyBlock(stateDB, state, block, proxyApp)
} }
latestAppHash = state.AppHash
case 1, 2: case 1, 2:
// sync up to the penultimate as if we stored the block. // sync up to the penultimate as if we stored the block.
// whether we commit or not depends on the appHash // whether we commit or not depends on the appHash
for _, block := range chain[:len(chain)-1] { for _, block := range chain[:len(chain)-1] {
applyBlock(state, block, proxyApp)
state = applyBlock(stateDB, state, block, proxyApp)
} }
// apply the final block to a state copy so we can // apply the final block to a state copy so we can
// get the right next appHash but keep the state back // get the right next appHash but keep the state back
stateCopy := state.Copy()
applyBlock(stateCopy, chain[len(chain)-1], proxyApp)
latestAppHash = stateCopy.AppHash
applyBlock(stateDB, state, chain[len(chain)-1], proxyApp)
} }
return latestAppHash
return state
} }
//-------------------------- //--------------------------
@ -587,13 +586,11 @@ func readPieceFromWAL(msg *TimedWALMessage) interface{} {
} }
// fresh state and mock store // fresh state and mock store
func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBlockStore) {
func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (dbm.DB, sm.State, *mockBlockStore) {
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()
state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
state.SetLogger(log.TestingLogger().With("module", "state"))
state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
store := NewMockBlockStore(config, state.ConsensusParams) store := NewMockBlockStore(config, state.ConsensusParams)
return state, store
return stateDB, state, store
} }
//---------------------------------- //----------------------------------


+ 19
- 20
consensus/state.go View File

@ -17,7 +17,6 @@ import (
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types" cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -75,15 +74,16 @@ type ConsensusState struct {
privValidator types.PrivValidator // for signing votes privValidator types.PrivValidator // for signing votes
// services for creating and executing blocks // services for creating and executing blocks
proxyAppConn proxy.AppConnConsensus
blockStore types.BlockStore
mempool types.Mempool
evpool types.EvidencePool
// TODO: encapsulate all of this in one "BlockManager"
blockExec *sm.BlockExecutor
blockStore types.BlockStore
mempool types.Mempool
evpool types.EvidencePool
// internal state // internal state
mtx sync.Mutex mtx sync.Mutex
cstypes.RoundState cstypes.RoundState
state *sm.State // State until height-1.
state sm.State // State until height-1.
// state changes may be triggered by msgs from peers, // state changes may be triggered by msgs from peers,
// msgs from ourself, or by timeouts // msgs from ourself, or by timeouts
@ -114,10 +114,10 @@ type ConsensusState struct {
} }
// NewConsensusState returns a new ConsensusState. // NewConsensusState returns a new ConsensusState.
func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState {
func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState {
cs := &ConsensusState{ cs := &ConsensusState{
config: config, config: config,
proxyAppConn: proxyAppConn,
blockExec: blockExec,
blockStore: blockStore, blockStore: blockStore,
mempool: mempool, mempool: mempool,
peerMsgQueue: make(chan msgInfo, msgQueueSize), peerMsgQueue: make(chan msgInfo, msgQueueSize),
@ -162,7 +162,7 @@ func (cs *ConsensusState) String() string {
} }
// GetState returns a copy of the chain state. // GetState returns a copy of the chain state.
func (cs *ConsensusState) GetState() *sm.State {
func (cs *ConsensusState) GetState() sm.State {
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
return cs.state.Copy() return cs.state.Copy()
@ -399,7 +399,7 @@ func (cs *ConsensusState) sendInternalMessage(mi msgInfo) {
// Reconstruct LastCommit from SeenCommit, which we saved along with the block, // Reconstruct LastCommit from SeenCommit, which we saved along with the block,
// (which happens even before saving the state) // (which happens even before saving the state)
func (cs *ConsensusState) reconstructLastCommit(state *sm.State) {
func (cs *ConsensusState) reconstructLastCommit(state sm.State) {
if state.LastBlockHeight == 0 { if state.LastBlockHeight == 0 {
return return
} }
@ -422,12 +422,12 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) {
// Updates ConsensusState and increments height to match that of state. // Updates ConsensusState and increments height to match that of state.
// The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight. // The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight.
func (cs *ConsensusState) updateToState(state *sm.State) {
func (cs *ConsensusState) updateToState(state sm.State) {
if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight {
cmn.PanicSanity(cmn.Fmt("updateToState() expected state height of %v but found %v", cmn.PanicSanity(cmn.Fmt("updateToState() expected state height of %v but found %v",
cs.Height, state.LastBlockHeight)) cs.Height, state.LastBlockHeight))
} }
if cs.state != nil && cs.state.LastBlockHeight+1 != cs.Height {
if !cs.state.IsEmpty() && cs.state.LastBlockHeight+1 != cs.Height {
// This might happen when someone else is mutating cs.state. // This might happen when someone else is mutating cs.state.
// Someone forgot to pass in state.Copy() somewhere?! // Someone forgot to pass in state.Copy() somewhere?!
cmn.PanicSanity(cmn.Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v", cmn.PanicSanity(cmn.Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v",
@ -437,7 +437,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
// If state isn't further out than cs.state, just ignore. // If state isn't further out than cs.state, just ignore.
// This happens when SwitchToConsensus() is called in the reactor. // This happens when SwitchToConsensus() is called in the reactor.
// We don't want to reset e.g. the Votes. // We don't want to reset e.g. the Votes.
if cs.state != nil && (state.LastBlockHeight <= cs.state.LastBlockHeight) {
if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) {
cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1) cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1)
return return
} }
@ -922,7 +922,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int64, round int) {
} }
// Validate proposal block // Validate proposal block
err := cs.state.ValidateBlock(cs.ProposalBlock)
err := sm.ValidateBlock(cs.state, cs.ProposalBlock)
if err != nil { if err != nil {
// ProposalBlock is invalid, prevote nil. // ProposalBlock is invalid, prevote nil.
logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) logger.Error("enterPrevote: ProposalBlock is invalid", "err", err)
@ -1030,7 +1030,7 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) {
if cs.ProposalBlock.HashesTo(blockID.Hash) { if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", blockID.Hash) cs.Logger.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", blockID.Hash)
// Validate the block. // Validate the block.
if err := cs.state.ValidateBlock(cs.ProposalBlock); err != nil {
if err := sm.ValidateBlock(cs.state, cs.ProposalBlock); err != nil {
cmn.PanicConsensus(cmn.Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) cmn.PanicConsensus(cmn.Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err))
} }
cs.LockedRound = round cs.LockedRound = round
@ -1165,7 +1165,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
if !block.HashesTo(blockID.Hash) { if !block.HashesTo(blockID.Hash) {
cmn.PanicSanity(cmn.Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash")) cmn.PanicSanity(cmn.Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash"))
} }
if err := cs.state.ValidateBlock(block); err != nil {
if err := sm.ValidateBlock(cs.state, block); err != nil {
cmn.PanicConsensus(cmn.Fmt("+2/3 committed an invalid block: %v", err)) cmn.PanicConsensus(cmn.Fmt("+2/3 committed an invalid block: %v", err))
} }
@ -1204,13 +1204,12 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// and an event cache for txs // and an event cache for txs
stateCopy := cs.state.Copy() stateCopy := cs.state.Copy()
txEventBuffer := types.NewTxEventBuffer(cs.eventBus, int(block.NumTxs)) txEventBuffer := types.NewTxEventBuffer(cs.eventBus, int(block.NumTxs))
cs.blockExec.SetTxEventPublisher(txEventBuffer)
// Execute and commit the block, update and save the state, and update the mempool. // Execute and commit the block, update and save the state, and update the mempool.
// All calls to the proxyAppConn come here.
// NOTE: the block.AppHash wont reflect these txs until the next block // NOTE: the block.AppHash wont reflect these txs until the next block
err := stateCopy.ApplyBlock(txEventBuffer, cs.proxyAppConn,
block, blockParts.Header(),
cs.mempool, cs.evpool)
var err error
stateCopy, err = cs.blockExec.ApplyBlock(stateCopy, types.BlockID{block.Hash(), blockParts.Header()}, block)
if err != nil { if err != nil {
cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err) cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err)
err := cmn.Kill() err := cmn.Kill()


+ 4
- 4
consensus/wal_generator.go View File

@ -47,13 +47,12 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
} }
stateDB := db.NewMemDB() stateDB := db.NewMemDB()
blockStoreDB := db.NewMemDB() blockStoreDB := db.NewMemDB()
state, err := sm.MakeGenesisState(stateDB, genDoc)
state.SetLogger(logger.With("module", "state"))
state, err := sm.MakeGenesisState(genDoc)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to make genesis state") return nil, errors.Wrap(err, "failed to make genesis state")
} }
blockStore := bc.NewBlockStore(blockStoreDB) blockStore := bc.NewBlockStore(blockStoreDB)
handshaker := NewHandshaker(state, blockStore)
handshaker := NewHandshaker(stateDB, state, blockStore)
proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
@ -68,7 +67,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
defer eventBus.Stop() defer eventBus.Stop()
mempool := types.MockMempool{} mempool := types.MockMempool{}
evpool := types.MockEvidencePool{} evpool := types.MockEvidencePool{}
consensusState := NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), nil, proxyApp.Consensus(), mempool, evpool)
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger) consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus) consensusState.SetEventBus(eventBus)
if privValidator != nil { if privValidator != nil {


+ 12
- 9
evidence/pool.go View File

@ -21,13 +21,13 @@ type EvidencePool struct {
evidenceChan chan types.Evidence evidenceChan chan types.Evidence
} }
func NewEvidencePool(params types.EvidenceParams, evidenceStore *EvidenceStore, state types.State) *EvidencePool {
func NewEvidencePool(params types.EvidenceParams, evidenceStore *EvidenceStore) *EvidencePool {
evpool := &EvidencePool{ evpool := &EvidencePool{
params: params, params: params,
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
evidenceStore: evidenceStore, evidenceStore: evidenceStore,
state: *state,
evidenceChan: make(chan types.Evidence),
// state: *state,
evidenceChan: make(chan types.Evidence),
} }
return evpool return evpool
} }
@ -58,12 +58,15 @@ func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) {
// TODO: check if we already have evidence for this // TODO: check if we already have evidence for this
// validator at this height so we dont get spammed // validator at this height so we dont get spammed
priority, err := sm.VerifyEvidence(evpool.state, evidence)
if err != nil {
// TODO: if err is just that we cant find it cuz we pruned, ignore.
// TODO: if its actually bad evidence, punish peer
return err
}
// TODO
var priority int64
/*
priority, err := sm.VerifyEvidence(evpool.state, evidence)
if err != nil {
// TODO: if err is just that we cant find it cuz we pruned, ignore.
// TODO: if its actually bad evidence, punish peer
return err
}*/
added := evpool.evidenceStore.AddNewEvidence(evidence, priority) added := evpool.evidenceStore.AddNewEvidence(evidence, priority)
if !added { if !added {


+ 22
- 16
node/node.go View File

@ -102,7 +102,8 @@ type Node struct {
trustMetricStore *trust.TrustMetricStore // trust metrics for all peers trustMetricStore *trust.TrustMetricStore // trust metrics for all peers
// services // services
eventBus *types.EventBus // pub/sub for services
eventBus *types.EventBus // pub/sub for services
stateDB dbm.DB
blockStore *bc.BlockStore // store the blockchain to disk blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions mempoolReactor *mempl.MempoolReactor // for gossipping transactions
@ -148,21 +149,20 @@ func NewNode(config *cfg.Config,
saveGenesisDoc(stateDB, genDoc) saveGenesisDoc(stateDB, genDoc)
} }
stateLogger := logger.With("module", "state")
state := sm.LoadState(stateDB) state := sm.LoadState(stateDB)
if state == nil {
state, err = sm.MakeGenesisState(stateDB, genDoc)
if state.IsEmpty() {
state, err = sm.MakeGenesisState(genDoc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
state.Save()
sm.SaveState(stateDB, state)
} }
state.SetLogger(stateLogger)
// Create the proxyApp, which manages connections (consensus, mempool, query) // Create the proxyApp, which manages connections (consensus, mempool, query)
// and sync tendermint and the app by replaying any necessary blocks
// and sync tendermint and the app by performing a handshake
// and replaying any necessary blocks
consensusLogger := logger.With("module", "consensus") consensusLogger := logger.With("module", "consensus")
handshaker := consensus.NewHandshaker(state, blockStore)
handshaker := consensus.NewHandshaker(stateDB, state, blockStore)
handshaker.SetLogger(consensusLogger) handshaker.SetLogger(consensusLogger)
proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp := proxy.NewAppConns(clientCreator, handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
@ -172,7 +172,6 @@ func NewNode(config *cfg.Config,
// reload the state (it may have been updated by the handshake) // reload the state (it may have been updated by the handshake)
state = sm.LoadState(stateDB) state = sm.LoadState(stateDB)
state.SetLogger(stateLogger)
// Generate node PrivKey // Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519() privKey := crypto.GenPrivKeyEd25519()
@ -194,10 +193,6 @@ func NewNode(config *cfg.Config,
consensusLogger.Info("This node is not a validator") consensusLogger.Info("This node is not a validator")
} }
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Make MempoolReactor // Make MempoolReactor
mempoolLogger := logger.With("module", "mempool") mempoolLogger := logger.With("module", "mempool")
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
@ -216,14 +211,24 @@ func NewNode(config *cfg.Config,
} }
evidenceLogger := logger.With("module", "evidence") evidenceLogger := logger.With("module", "evidence")
evidenceStore := evidence.NewEvidenceStore(evidenceDB) evidenceStore := evidence.NewEvidenceStore(evidenceDB)
evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore, state.Copy())
evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore) // , state.Copy())
evidencePool.SetLogger(evidenceLogger) evidencePool.SetLogger(evidenceLogger)
evidenceReactor := evidence.NewEvidenceReactor(evidencePool) evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger) evidenceReactor.SetLogger(evidenceLogger)
blockExecLogger := logger.With("module", "state")
// make block executor for consensus and blockchain reactors to execute blocks
blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger,
nil, proxyApp.Consensus(),
mempool, evidencePool)
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Make ConsensusReactor // Make ConsensusReactor
consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(),
proxyApp.Consensus(), blockStore, mempool, evidencePool)
blockExec, blockStore, mempool, evidencePool)
consensusState.SetLogger(consensusLogger) consensusState.SetLogger(consensusLogger)
if privValidator != nil { if privValidator != nil {
consensusState.SetPrivValidator(privValidator) consensusState.SetPrivValidator(privValidator)
@ -291,7 +296,6 @@ func NewNode(config *cfg.Config,
eventBus.SetLogger(logger.With("module", "events")) eventBus.SetLogger(logger.With("module", "events"))
// services which will be publishing and/or subscribing for messages (events) // services which will be publishing and/or subscribing for messages (events)
bcReactor.SetEventBus(eventBus)
consensusReactor.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus)
// Transaction indexing // Transaction indexing
@ -333,6 +337,7 @@ func NewNode(config *cfg.Config,
addrBook: addrBook, addrBook: addrBook,
trustMetricStore: trustMetricStore, trustMetricStore: trustMetricStore,
stateDB: stateDB,
blockStore: blockStore, blockStore: blockStore,
bcReactor: bcReactor, bcReactor: bcReactor,
mempoolReactor: mempoolReactor, mempoolReactor: mempoolReactor,
@ -429,6 +434,7 @@ func (n *Node) AddListener(l p2p.Listener) {
// ConfigureRPC sets all variables in rpccore so they will serve // ConfigureRPC sets all variables in rpccore so they will serve
// rpc calls from this node // rpc calls from this node
func (n *Node) ConfigureRPC() { func (n *Node) ConfigureRPC() {
rpccore.SetStateDB(n.stateDB)
rpccore.SetBlockStore(n.blockStore) rpccore.SetBlockStore(n.blockStore)
rpccore.SetConsensusState(n.consensusState) rpccore.SetConsensusState(n.consensusState)
rpccore.SetMempool(n.mempoolReactor.Mempool) rpccore.SetMempool(n.mempoolReactor.Mempool)


+ 1
- 2
rpc/core/blocks.go View File

@ -337,8 +337,7 @@ func BlockResults(heightPtr *int64) (*ctypes.ResultBlockResults, error) {
} }
// load the results // load the results
state := consensusState.GetState()
results, err := sm.LoadABCIResponses(state.DB(), height)
results, err := sm.LoadABCIResponses(stateDB, height)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 1
- 2
rpc/core/consensus.go View File

@ -50,8 +50,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) {
return nil, err return nil, err
} }
state := consensusState.GetState()
validators, err := sm.LoadValidators(state.DB(), height)
validators, err := sm.LoadValidators(stateDB, height)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 7
- 1
rpc/core/pipe.go View File

@ -11,6 +11,7 @@ import (
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
) )
@ -20,7 +21,7 @@ var subscribeTimeout = 5 * time.Second
// These interfaces are used by RPC and must be thread safe // These interfaces are used by RPC and must be thread safe
type Consensus interface { type Consensus interface {
GetState() *sm.State
GetState() sm.State
GetValidators() (int64, []*types.Validator) GetValidators() (int64, []*types.Validator)
GetRoundState() *cstypes.RoundState GetRoundState() *cstypes.RoundState
} }
@ -43,6 +44,7 @@ var (
proxyAppQuery proxy.AppConnQuery proxyAppQuery proxy.AppConnQuery
// interfaces defined in types and above // interfaces defined in types and above
stateDB dbm.DB
blockStore types.BlockStore blockStore types.BlockStore
mempool types.Mempool mempool types.Mempool
evidencePool types.EvidencePool evidencePool types.EvidencePool
@ -60,6 +62,10 @@ var (
logger log.Logger logger log.Logger
) )
func SetStateDB(db dbm.DB) {
stateDB = db
}
func SetBlockStore(bs types.BlockStore) { func SetBlockStore(bs types.BlockStore) {
blockStore = bs blockStore = bs
} }


+ 2
- 4
state/db.go View File

@ -36,7 +36,7 @@ func GetState(stateDB dbm.DB, genesisFile string) (State, error) {
if err != nil { if err != nil {
return state, err return state, err
} }
SaveState(stateDB, state, state.AppHash)
SaveState(stateDB, state)
} }
return state, nil return state, nil
@ -66,9 +66,7 @@ func loadState(db dbm.DB, key []byte) (state State) {
} }
// SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database. // SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database.
// It sets the given appHash on the state before persisting.
func SaveState(db dbm.DB, s State, appHash []byte) {
s.AppHash = appHash
func SaveState(db dbm.DB, s State) {
nextHeight := s.LastBlockHeight + 1 nextHeight := s.LastBlockHeight + 1
saveValidatorsInfo(db, nextHeight, s.LastHeightValidatorsChanged, s.Validators) saveValidatorsInfo(db, nextHeight, s.LastHeightValidatorsChanged, s.Validators)
saveConsensusParamsInfo(db, nextHeight, s.LastHeightConsensusParamsChanged, s.ConsensusParams) saveConsensusParamsInfo(db, nextHeight, s.LastHeightConsensusParamsChanged, s.ConsensusParams)


+ 12
- 9
state/execution.go View File

@ -30,6 +30,10 @@ type BlockExecutor struct {
evpool types.EvidencePool evpool types.EvidencePool
} }
func (blockExec *BlockExecutor) SetTxEventPublisher(txEventPublisher types.TxEventPublisher) {
blockExec.txEventPublisher = txEventPublisher
}
// NewBlockExecutor returns a new BlockExecutor. // NewBlockExecutor returns a new BlockExecutor.
func NewBlockExecutor(db dbm.DB, logger log.Logger, func NewBlockExecutor(db dbm.DB, logger log.Logger,
txEventer types.TxEventPublisher, proxyApp proxy.AppConnConsensus, txEventer types.TxEventPublisher, proxyApp proxy.AppConnConsensus,
@ -82,8 +86,9 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block
fail.Fail() // XXX fail.Fail() // XXX
// save the state and the validators
SaveState(blockExec.db, s, appHash)
// update the app hash and save the state
s.AppHash = appHash
SaveState(blockExec.db, s)
return s, nil return s, nil
} }
@ -110,7 +115,7 @@ func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) {
blockExec.logger.Debug("Commit.Log: " + res.Log) blockExec.logger.Debug("Commit.Log: " + res.Log)
} }
blockExec.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "hash", res.Data)
blockExec.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "appHash", res.Data)
// Update evpool // Update evpool
blockExec.evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence) blockExec.evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence)
@ -343,16 +348,14 @@ func updateState(s State, blockID types.BlockID, header *types.Header,
} }
func fireEvents(txEventPublisher types.TxEventPublisher, block *types.Block, abciResponses *ABCIResponses) { func fireEvents(txEventPublisher types.TxEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
// TODO: Fire events
/*
tx := types.Tx(req.GetDeliverTx().Tx)
for i, tx := range block.Data.Txs {
txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{ txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{
Height: block.Height, Height: block.Height,
Index: uint32(txIndex),
Index: uint32(i),
Tx: tx, Tx: tx,
Result: *txRes,
Result: *(abciResponses.DeliverTx[i]),
}}) }})
*/
}
} }
//---------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------


+ 1
- 1
state/state.go View File

@ -91,7 +91,7 @@ func (s State) Bytes() []byte {
// IsEmpty returns true if the State is equal to the empty State. // IsEmpty returns true if the State is equal to the empty State.
func (s State) IsEmpty() bool { func (s State) IsEmpty() bool {
return s.LastBlockHeight == 0 // XXX can't compare to Empty
return s.Validators == nil // XXX can't compare to Empty
} }
// GetValidators returns the last and current validator sets. // GetValidators returns the last and current validator sets.


+ 3
- 3
state/state_test.go View File

@ -57,7 +57,7 @@ func TestStateSaveLoad(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
state.LastBlockHeight++ state.LastBlockHeight++
SaveState(stateDB, state, state.AppHash)
SaveState(stateDB, state)
loadedState := LoadState(stateDB) loadedState := LoadState(stateDB)
assert.True(state.Equals(loadedState), assert.True(state.Equals(loadedState),
@ -261,7 +261,7 @@ func TestManyValidatorChangesSaveLoad(t *testing.T) {
const valSetSize = 7 const valSetSize = 7
tearDown, stateDB, state := setupTestCase(t) tearDown, stateDB, state := setupTestCase(t)
state.Validators = genValSet(valSetSize) state.Validators = genValSet(valSetSize)
SaveState(stateDB, state, state.AppHash)
SaveState(stateDB, state)
defer tearDown(t) defer tearDown(t)
const height = 1 const height = 1
@ -425,7 +425,7 @@ func TestLessThanOneThirdOfVotingPowerPerBlockEnforced(t *testing.T) {
for i, tc := range testCases { for i, tc := range testCases {
tearDown, stateDB, state := setupTestCase(t) tearDown, stateDB, state := setupTestCase(t)
state.Validators = genValSet(tc.initialValSetSize) state.Validators = genValSet(tc.initialValSetSize)
SaveState(stateDB, state, state.AppHash)
SaveState(stateDB, state)
height := state.LastBlockHeight + 1 height := state.LastBlockHeight + 1
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{


Loading…
Cancel
Save