Browse Source

cs/replay: execCommitBlock should not read from state.lastValidators (#3067)

* execCommitBlock should not read from state.lastValidators

* fix height 1

* fix blockchain/reactor_test

* fix consensus/mempool_test

* fix consensus/reactor_test

* fix consensus/replay_test

* add CHANGELOG

* fix consensus/reactor_test

* fix consensus/replay_test

* add a test for replay validators change

* fix mem_pool test

* fix byzantine test

* remove a redundant code

* reduce validator change blocks to 6

* fix

* return peer0 config

* seperate testName

* seperate testName 1

* seperate testName 2

* seperate app db path

* seperate app db path 1

* add a lock before startNet

* move the lock to reactor_test

* simulate just once

* try to find problem

* handshake only saveState when app version changed

* update gometalinter to 3.0.0 (#3233)

in the attempt to fix https://circleci.com/gh/tendermint/tendermint/43165

also

    code is simplified by running gofmt -s .
    remove unused vars
    enable linters we're currently passing
    remove deprecated linters
(cherry picked from commit d470945503)

* gofmt code

* goimport code

* change the bool name to testValidatorsChange

* adjust receive kvstore.ProtocolVersion

* adjust receive kvstore.ProtocolVersion 1

* adjust receive kvstore.ProtocolVersion 3

* fix merge execution.go

* fix merge develop

* fix merge develop 1

* fix run cleanupFunc

* adjust code according to reviewers' opinion

* modify the func name match the convention

* simplify simulate a chain containing some validator change txs 1

* test CI error

* Merge remote-tracking branch 'upstream/develop' into fixReplay 1

* fix pubsub_test

* subscribeUnbuffered vote channel
pull/3612/head
JamesRay 6 years ago
committed by Ethan Buchman
parent
commit
2c26d95ab9
13 changed files with 444 additions and 98 deletions
  1. +2
    -1
      CHANGELOG_PENDING.md
  2. +3
    -1
      blockchain/reactor_test.go
  3. +3
    -1
      consensus/byzantine_test.go
  4. +39
    -8
      consensus/common_test.go
  5. +7
    -2
      consensus/mempool_test.go
  6. +6
    -1
      consensus/reactor_test.go
  7. +8
    -4
      consensus/replay.go
  8. +327
    -53
      consensus/replay_test.go
  9. +13
    -4
      consensus/state_test.go
  10. +2
    -1
      consensus/wal_generator.go
  11. +5
    -2
      libs/pubsub/pubsub_test.go
  12. +27
    -18
      state/execution.go
  13. +2
    -2
      state/execution_test.go

+ 2
- 1
CHANGELOG_PENDING.md View File

@ -28,4 +28,5 @@
- [p2p] \#3532 limit the number of attempts to connect to a peer in seed mode - [p2p] \#3532 limit the number of attempts to connect to a peer in seed mode
to 16 (as a result, the node will stop retrying after a 35 hours time window) to 16 (as a result, the node will stop retrying after a 35 hours time window)
- [consensus] \#2723, \#3451 and \#3317 Fix non-deterministic tests - [consensus] \#2723, \#3451 and \#3317 Fix non-deterministic tests
- [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie)
- [consensus] \#3067 getBeginBlockValidatorInfo loads validators from stateDB instead of state
- [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie)

+ 3
- 1
blockchain/reactor_test.go View File

@ -91,8 +91,10 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
// NOTE we have to create and commit the blocks first because // NOTE we have to create and commit the blocks first because
// pool.height is determined from the store. // pool.height is determined from the store.
fastSync := true fastSync := true
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(),
db := dbm.NewMemDB()
blockExec := sm.NewBlockExecutor(db, log.TestingLogger(), proxyApp.Consensus(),
sm.MockMempool{}, sm.MockEvidencePool{}) sm.MockMempool{}, sm.MockEvidencePool{})
sm.SaveState(db, state)
// let's add some blocks in // let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {


+ 3
- 1
consensus/byzantine_test.go View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -69,7 +70,7 @@ func TestByzantine(t *testing.T) {
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err) require.NoError(t, err)
conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states
conR := NewConsensusReactor(css[i], true) // so we don't start the consensus states
conR.SetLogger(logger.With("validator", i)) conR.SetLogger(logger.With("validator", i))
conR.SetEventBus(eventBus) conR.SetEventBus(eventBus)
@ -81,6 +82,7 @@ func TestByzantine(t *testing.T) {
} }
reactors[i] = conRI reactors[i] = conRI
sm.SaveState(css[i].blockExec.DB(), css[i].state) //for save height 1's validators info
} }
defer func() { defer func() {


+ 39
- 8
consensus/common_test.go View File

@ -14,6 +14,8 @@ import (
"github.com/go-kit/kit/log/term" "github.com/go-kit/kit/log/term"
"path"
abcicli "github.com/tendermint/tendermint/abci/client" abcicli "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/counter"
"github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/example/kvstore"
@ -119,6 +121,24 @@ func incrementRound(vss ...*validatorStub) {
} }
} }
type ValidatorStubsByAddress []*validatorStub
func (vss ValidatorStubsByAddress) Len() int {
return len(vss)
}
func (vss ValidatorStubsByAddress) Less(i, j int) bool {
return bytes.Compare(vss[i].GetPubKey().Address(), vss[j].GetPubKey().Address()) == -1
}
func (vss ValidatorStubsByAddress) Swap(i, j int) {
it := vss[i]
vss[i] = vss[j]
vss[i].Index = i
vss[j] = it
vss[j].Index = j
}
//------------------------------------------------------------------------------- //-------------------------------------------------------------------------------
// Functions for transitioning the consensus state // Functions for transitioning the consensus state
@ -228,7 +248,7 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo
} }
func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.Message { func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.Message {
votesSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote)
votesSub, err := cs.eventBus.SubscribeUnbuffered(context.Background(), testSubscriber, types.EventQueryVote)
if err != nil { if err != nil {
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote)) panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote))
} }
@ -278,7 +298,8 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
evpool := sm.MockEvidencePool{} evpool := sm.MockEvidencePool{}
// Make ConsensusState // Make ConsensusState
stateDB := dbm.NewMemDB()
stateDB := blockDB
sm.SaveState(stateDB, state) //for save height 1's validators info
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool) blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetLogger(log.TestingLogger().With("module", "consensus"))
@ -564,7 +585,7 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
vals := types.TM2PB.ValidatorUpdates(state.Validators) vals := types.TM2PB.ValidatorUpdates(state.Validators)
app.InitChain(abci.RequestInitChain{Validators: vals}) app.InitChain(abci.RequestInitChain{Validators: vals})
css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], app)
css[i] = newConsensusStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB)
css[i].SetTimeoutTicker(tickerFunc()) css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus")) css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
} }
@ -576,12 +597,11 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
} }
// nPeers = nValidators + nNotValidator // nPeers = nValidators + nNotValidator
func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerFunc func() TimeoutTicker,
appFunc func() abci.Application) ([]*ConsensusState, cleanupFunc) {
func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerFunc func() TimeoutTicker, appFunc func(string) abci.Application) ([]*ConsensusState, *types.GenesisDoc, *cfg.Config, cleanupFunc) {
genDoc, privVals := randGenesisDoc(nValidators, false, testMinPower) genDoc, privVals := randGenesisDoc(nValidators, false, testMinPower)
css := make([]*ConsensusState, nPeers) css := make([]*ConsensusState, nPeers)
logger := consensusLogger() logger := consensusLogger()
var peer0Config *cfg.Config
configRootDirs := make([]string, 0, nPeers) configRootDirs := make([]string, 0, nPeers)
for i := 0; i < nPeers; i++ { for i := 0; i < nPeers; i++ {
stateDB := dbm.NewMemDB() // each state needs its own db stateDB := dbm.NewMemDB() // each state needs its own db
@ -589,6 +609,9 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i))
configRootDirs = append(configRootDirs, thisConfig.RootDir) configRootDirs = append(configRootDirs, thisConfig.RootDir)
ensureDir(filepath.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal ensureDir(filepath.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
if i == 0 {
peer0Config = thisConfig
}
var privVal types.PrivValidator var privVal types.PrivValidator
if i < nValidators { if i < nValidators {
privVal = privVals[i] privVal = privVals[i]
@ -605,15 +628,19 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
privVal = privval.GenFilePV(tempKeyFile.Name(), tempStateFile.Name()) privVal = privval.GenFilePV(tempKeyFile.Name(), tempStateFile.Name())
} }
app := appFunc()
app := appFunc(path.Join(config.DBDir(), fmt.Sprintf("%s_%d", testName, i)))
vals := types.TM2PB.ValidatorUpdates(state.Validators) vals := types.TM2PB.ValidatorUpdates(state.Validators)
if _, ok := app.(*kvstore.PersistentKVStoreApplication); ok {
state.Version.Consensus.App = kvstore.ProtocolVersion //simulate handshake, receive app version. If don't do this, replay test will fail
}
app.InitChain(abci.RequestInitChain{Validators: vals}) app.InitChain(abci.RequestInitChain{Validators: vals})
//sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above
css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, app) css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, app)
css[i].SetTimeoutTicker(tickerFunc()) css[i].SetTimeoutTicker(tickerFunc())
css[i].SetLogger(logger.With("validator", i, "module", "consensus")) css[i].SetLogger(logger.With("validator", i, "module", "consensus"))
} }
return css, func() {
return css, genDoc, peer0Config, func() {
for _, dir := range configRootDirs { for _, dir := range configRootDirs {
os.RemoveAll(dir) os.RemoveAll(dir)
} }
@ -719,3 +746,7 @@ func newPersistentKVStore() abci.Application {
} }
return kvstore.NewPersistentKVStoreApplication(dir) return kvstore.NewPersistentKVStoreApplication(dir)
} }
func newPersistentKVStoreWithPath(dbDir string) abci.Application {
return kvstore.NewPersistentKVStoreApplication(dbDir)
}

+ 7
- 2
consensus/mempool_test.go View File

@ -11,6 +11,7 @@ import (
"github.com/tendermint/tendermint/abci/example/code" "github.com/tendermint/tendermint/abci/example/code"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tendermint/libs/db"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -106,7 +107,9 @@ func deliverTxsRange(cs *ConsensusState, start, end int) {
func TestMempoolTxConcurrentWithCommit(t *testing.T) { func TestMempoolTxConcurrentWithCommit(t *testing.T) {
state, privVals := randGenesisState(1, false, 10) state, privVals := randGenesisState(1, false, 10)
cs := newConsensusState(state, privVals[0], NewCounterApplication())
blockDB := dbm.NewMemDB()
cs := newConsensusStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB)
sm.SaveState(blockDB, state)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
@ -129,7 +132,9 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
func TestMempoolRmBadTx(t *testing.T) { func TestMempoolRmBadTx(t *testing.T) {
state, privVals := randGenesisState(1, false, 10) state, privVals := randGenesisState(1, false, 10)
app := NewCounterApplication() app := NewCounterApplication()
cs := newConsensusState(state, privVals[0], app)
blockDB := dbm.NewMemDB()
cs := newConsensusStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB)
sm.SaveState(blockDB, state)
// increment the counter by 1 // increment the counter by 1
txBytes := make([]byte, 8) txBytes := make([]byte, 8)


+ 6
- 1
consensus/reactor_test.go View File

@ -51,6 +51,10 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) (
blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err) require.NoError(t, err)
blocksSubs = append(blocksSubs, blocksSub) blocksSubs = append(blocksSubs, blocksSub)
if css[i].state.LastBlockHeight == 0 { //simulate handle initChain in handshake
sm.SaveState(css[i].blockExec.DB(), css[i].state)
}
} }
// make connected switches and start all reactors // make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
@ -329,7 +333,8 @@ func TestReactorVotingPowerChange(t *testing.T) {
func TestReactorValidatorSetChanges(t *testing.T) { func TestReactorValidatorSetChanges(t *testing.T) {
nPeers := 7 nPeers := 7
nVals := 4 nVals := 4
css, cleanup := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentKVStore)
css, _, _, cleanup := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentKVStoreWithPath)
defer cleanup() defer cleanup()
logger := log.TestingLogger() logger := log.TestingLogger()


+ 8
- 4
consensus/replay.go View File

@ -258,8 +258,10 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
) )
// Set AppVersion on the state. // Set AppVersion on the state.
h.initialState.Version.Consensus.App = version.Protocol(res.AppVersion)
sm.SaveState(h.stateDB, h.initialState)
if h.initialState.Version.Consensus.App != version.Protocol(res.AppVersion) {
h.initialState.Version.Consensus.App = version.Protocol(res.AppVersion)
sm.SaveState(h.stateDB, h.initialState)
}
// Replay blocks up to the latest in the blockstore. // Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
@ -421,13 +423,12 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl
for i := appBlockHeight + 1; i <= finalBlock; i++ { for i := appBlockHeight + 1; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i) h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i) block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have. // Extra check to ensure the app was not changed in a way it shouldn't have.
if len(appHash) > 0 { if len(appHash) > 0 {
assertAppHashEqualsOneFromBlock(appHash, block) assertAppHashEqualsOneFromBlock(appHash, block)
} }
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, state.LastValidators, h.stateDB)
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.stateDB)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -517,6 +518,9 @@ type mockProxyApp struct {
func (mock *mockProxyApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { func (mock *mockProxyApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
r := mock.abciResponses.DeliverTx[mock.txCount] r := mock.abciResponses.DeliverTx[mock.txCount]
mock.txCount++ mock.txCount++
if r == nil { //it could be nil because of amino unMarshall, it will cause an empty ResponseDeliverTx to become nil
return abci.ResponseDeliverTx{}
}
return *r return *r
} }


+ 327
- 53
consensus/replay_test.go View File

@ -15,6 +15,8 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"sort"
"github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/example/kvstore"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
@ -140,10 +142,10 @@ LOOP:
// create consensus state from a clean slate // create consensus state from a clean slate
logger := log.NewNopLogger() logger := log.NewNopLogger()
stateDB := dbm.NewMemDB()
blockDB := dbm.NewMemDB()
stateDB := blockDB
state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile()) state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
privValidator := loadPrivValidator(consensusReplayConfig) privValidator := loadPrivValidator(consensusReplayConfig)
blockDB := dbm.NewMemDB()
cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB) cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB)
cs.SetLogger(logger) cs.SetLogger(logger)
@ -262,7 +264,13 @@ func (w *crashingWAL) Stop() error { return w.next.Stop() }
func (w *crashingWAL) Wait() { w.next.Wait() } func (w *crashingWAL) Wait() { w.next.Wait() }
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
// Handshake Tests
type testSim struct {
GenesisState sm.State
Config *cfg.Config
Chain []*types.Block
Commits []*types.Commit
CleanupFunc cleanupFunc
}
const ( const (
numBlocks = 6 numBlocks = 6
@ -271,6 +279,8 @@ const (
var ( var (
mempool = sm.MockMempool{} mempool = sm.MockMempool{}
evpool = sm.MockEvidencePool{} evpool = sm.MockEvidencePool{}
sim testSim
) )
//--------------------------------------- //---------------------------------------
@ -281,46 +291,285 @@ var (
// 2 - save block and committed but state is behind // 2 - save block and committed but state is behind
var modes = []uint{0, 1, 2} var modes = []uint{0, 1, 2}
// This is actually not a test, it's for storing validator change tx data for testHandshakeReplay
func TestSimulateValidatorsChange(t *testing.T) {
nPeers := 7
nVals := 4
css, genDoc, config, cleanup := randConsensusNetWithPeers(nVals, nPeers, "replay_test", newMockTickerFunc(true), newPersistentKVStoreWithPath)
sim.Config = config
sim.GenesisState, _ = sm.MakeGenesisState(genDoc)
sim.CleanupFunc = cleanup
partSize := types.BlockPartSizeBytes
newRoundCh := subscribe(css[0].eventBus, types.EventQueryNewRound)
proposalCh := subscribe(css[0].eventBus, types.EventQueryCompleteProposal)
vss := make([]*validatorStub, nPeers)
for i := 0; i < nPeers; i++ {
vss[i] = NewValidatorStub(css[i].privValidator, i)
}
height, round := css[0].Height, css[0].Round
// start the machine
startTestRound(css[0], height, round)
incrementHeight(vss...)
ensureNewRound(newRoundCh, height, 0)
ensureNewProposal(proposalCh, height, round)
rs := css[0].GetRoundState()
signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...)
ensureNewRound(newRoundCh, height+1, 0)
//height 2
height++
incrementHeight(vss...)
newValidatorPubKey1 := css[nVals].privValidator.GetPubKey()
valPubKey1ABCI := types.TM2PB.PubKey(newValidatorPubKey1)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err := assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil)
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
blockID := types.BlockID{Hash: propBlock.Hash(), PartsHeader: propBlockParts.Header()}
proposal := types.NewProposal(vss[1].Height, round, -1, blockID)
if err := vss[1].SignProposal(config.ChainID(), proposal); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
// set the proposal block
if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil {
t.Fatal(err)
}
ensureNewProposal(proposalCh, height, round)
rs = css[0].GetRoundState()
signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...)
ensureNewRound(newRoundCh, height+1, 0)
//height 3
height++
incrementHeight(vss...)
updateValidatorPubKey1 := css[nVals].privValidator.GetPubKey()
updatePubKey1ABCI := types.TM2PB.PubKey(updateValidatorPubKey1)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil)
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartsHeader: propBlockParts.Header()}
proposal = types.NewProposal(vss[2].Height, round, -1, blockID)
if err := vss[2].SignProposal(config.ChainID(), proposal); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
// set the proposal block
if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil {
t.Fatal(err)
}
ensureNewProposal(proposalCh, height, round)
rs = css[0].GetRoundState()
signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...)
ensureNewRound(newRoundCh, height+1, 0)
//height 4
height++
incrementHeight(vss...)
newValidatorPubKey2 := css[nVals+1].privValidator.GetPubKey()
newVal2ABCI := types.TM2PB.PubKey(newValidatorPubKey2)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil)
assert.Nil(t, err)
newValidatorPubKey3 := css[nVals+2].privValidator.GetPubKey()
newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil)
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartsHeader: propBlockParts.Header()}
newVss := make([]*validatorStub, nVals+1)
copy(newVss, vss[:nVals+1])
sort.Sort(ValidatorStubsByAddress(newVss))
selfIndex := 0
for i, vs := range newVss {
if vs.GetPubKey().Equals(css[0].privValidator.GetPubKey()) {
selfIndex = i
break
}
}
proposal = types.NewProposal(vss[3].Height, round, -1, blockID)
if err := vss[3].SignProposal(config.ChainID(), proposal); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
// set the proposal block
if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil {
t.Fatal(err)
}
ensureNewProposal(proposalCh, height, round)
removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil)
assert.Nil(t, err)
rs = css[0].GetRoundState()
for i := 0; i < nVals+1; i++ {
if i == selfIndex {
continue
}
signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i])
}
ensureNewRound(newRoundCh, height+1, 0)
//height 5
height++
incrementHeight(vss...)
ensureNewProposal(proposalCh, height, round)
rs = css[0].GetRoundState()
for i := 0; i < nVals+1; i++ {
if i == selfIndex {
continue
}
signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i])
}
ensureNewRound(newRoundCh, height+1, 0)
//height 6
height++
incrementHeight(vss...)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil)
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartsHeader: propBlockParts.Header()}
newVss = make([]*validatorStub, nVals+3)
copy(newVss, vss[:nVals+3])
sort.Sort(ValidatorStubsByAddress(newVss))
for i, vs := range newVss {
if vs.GetPubKey().Equals(css[0].privValidator.GetPubKey()) {
selfIndex = i
break
}
}
proposal = types.NewProposal(vss[1].Height, round, -1, blockID)
if err := vss[1].SignProposal(config.ChainID(), proposal); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
// set the proposal block
if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil {
t.Fatal(err)
}
ensureNewProposal(proposalCh, height, round)
rs = css[0].GetRoundState()
for i := 0; i < nVals+3; i++ {
if i == selfIndex {
continue
}
signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i])
}
ensureNewRound(newRoundCh, height+1, 0)
sim.Chain = make([]*types.Block, 0)
sim.Commits = make([]*types.Commit, 0)
for i := 1; i <= numBlocks; i++ {
sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i)))
sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i)))
}
}
// Sync from scratch // Sync from scratch
func TestHandshakeReplayAll(t *testing.T) { func TestHandshakeReplayAll(t *testing.T) {
for i, m := range modes {
config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
defer os.RemoveAll(config.RootDir)
testHandshakeReplay(t, config, 0, m)
for _, m := range modes {
testHandshakeReplay(t, config, 0, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, 0, m, true)
} }
} }
// Sync many, not from scratch // Sync many, not from scratch
func TestHandshakeReplaySome(t *testing.T) { func TestHandshakeReplaySome(t *testing.T) {
for i, m := range modes {
config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
defer os.RemoveAll(config.RootDir)
testHandshakeReplay(t, config, 1, m)
for _, m := range modes {
testHandshakeReplay(t, config, 1, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, 1, m, true)
} }
} }
// Sync from lagging by one // Sync from lagging by one
func TestHandshakeReplayOne(t *testing.T) { func TestHandshakeReplayOne(t *testing.T) {
for i, m := range modes {
config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
defer os.RemoveAll(config.RootDir)
testHandshakeReplay(t, config, numBlocks-1, m)
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks-1, m, true)
} }
} }
// Sync from caught up // Sync from caught up
func TestHandshakeReplayNone(t *testing.T) { func TestHandshakeReplayNone(t *testing.T) {
for i, m := range modes {
config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i))
defer os.RemoveAll(config.RootDir)
testHandshakeReplay(t, config, numBlocks, m)
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, false)
}
for _, m := range modes {
testHandshakeReplay(t, config, numBlocks, m, true)
} }
} }
// Test mockProxyApp should not panic when app return ABCIResponses with some empty ResponseDeliverTx
func TestMockProxyApp(t *testing.T) {
sim.CleanupFunc() //clean the test env created in TestSimulateValidatorsChange
logger := log.TestingLogger()
var validTxs, invalidTxs = 0, 0
txIndex := 0
assert.NotPanics(t, func() {
abciResWithEmptyDeliverTx := new(sm.ABCIResponses)
abciResWithEmptyDeliverTx.DeliverTx = make([]*abci.ResponseDeliverTx, 0)
abciResWithEmptyDeliverTx.DeliverTx = append(abciResWithEmptyDeliverTx.DeliverTx, &abci.ResponseDeliverTx{})
// called when saveABCIResponses:
bytes := cdc.MustMarshalBinaryBare(abciResWithEmptyDeliverTx)
loadedAbciRes := new(sm.ABCIResponses)
// this also happens sm.LoadABCIResponses
err := cdc.UnmarshalBinaryBare(bytes, loadedAbciRes)
require.NoError(t, err)
mock := newMockProxyApp([]byte("mock_hash"), loadedAbciRes)
abciRes := new(sm.ABCIResponses)
abciRes.DeliverTx = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTx))
// Execute transactions and get hash.
proxyCb := func(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_DeliverTx:
// TODO: make use of res.Log
// TODO: make use of this info
// Blocks may include invalid txs.
txRes := r.DeliverTx
if txRes.Code == abci.CodeTypeOK {
validTxs++
} else {
logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
invalidTxs++
}
abciRes.DeliverTx[txIndex] = txRes
txIndex++
}
}
mock.SetResponseCallback(proxyCb)
someTx := []byte("tx")
mock.DeliverTxAsync(someTx)
})
assert.True(t, validTxs == 1)
assert.True(t, invalidTxs == 0)
}
func tempWALWithData(data []byte) string { func tempWALWithData(data []byte) string {
walFile, err := ioutil.TempFile("", "wal") walFile, err := ioutil.TempFile("", "wal")
if err != nil { if err != nil {
@ -336,54 +585,68 @@ func tempWALWithData(data []byte) string {
return walFile.Name() return walFile.Name()
} }
// Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart
// the app and sync it up with the remaining blocks.
func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint) {
walBody, err := WALWithNBlocks(t, numBlocks)
require.NoError(t, err)
walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile)
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
// Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks
func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint, testValidatorsChange bool) {
var chain []*types.Block
var commits []*types.Commit
var store *mockBlockStore
var stateDB dbm.DB
var genisisState sm.State
if testValidatorsChange {
testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode))
defer os.RemoveAll(testConfig.RootDir)
stateDB = dbm.NewMemDB()
genisisState = sim.GenesisState
config = sim.Config
chain = sim.Chain
commits = sim.Commits
store = newMockBlockStore(config, genisisState.ConsensusParams)
} else { //test single node
testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode))
defer os.RemoveAll(testConfig.RootDir)
walBody, err := WALWithNBlocks(t, numBlocks)
require.NoError(t, err)
walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile)
wal, err := NewWAL(walFile)
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
err = wal.Start()
require.NoError(t, err)
defer wal.Stop()
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
chain, commits, err := makeBlockchainFromWAL(wal)
require.NoError(t, err)
wal, err := NewWAL(walFile)
require.NoError(t, err)
wal.SetLogger(log.TestingLogger())
err = wal.Start()
require.NoError(t, err)
defer wal.Stop()
stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
chain, commits, err = makeBlockchainFromWAL(wal)
require.NoError(t, err)
stateDB, genisisState, store = stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
}
store.chain = chain store.chain = chain
store.commits = commits store.commits = commits
state := genisisState.Copy()
// run the chain through state.ApplyBlock to build up the tendermint state // run the chain through state.ApplyBlock to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "1")))
proxyApp := proxy.NewAppConns(clientCreator)
err = proxyApp.Start()
require.NoError(t, err)
state = buildTMStateFromChain(config, stateDB, state, chain, proxyApp, mode)
proxyApp.Stop()
state = buildTMStateFromChain(config, stateDB, state, chain, nBlocks, mode)
latestAppHash := state.AppHash latestAppHash := state.AppHash
// make a new client creator // make a new client creator
kvstoreApp := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "2"))
kvstoreApp := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_a", nBlocks, mode)))
clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp) clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp)
if nBlocks > 0 { if nBlocks > 0 {
// run nBlocks against a new client to build up the app state. // run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state // use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2) proxyApp := proxy.NewAppConns(clientCreator2)
stateDB, state, _ := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
stateDB1 := dbm.NewMemDB()
sm.SaveState(stateDB1, genisisState)
buildAppStateFromChain(proxyApp, stateDB1, genisisState, chain, nBlocks, mode)
} }
// now start the app using the handshake - it should sync // now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
handshaker := NewHandshaker(stateDB, state, store, genDoc) handshaker := NewHandshaker(stateDB, state, store, genDoc)
proxyApp = proxy.NewAppConns(clientCreator2)
proxyApp := proxy.NewAppConns(clientCreator2)
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err) t.Fatalf("Error starting proxy app connections: %v", err)
} }
@ -435,12 +698,14 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
} }
defer proxyApp.Stop() defer proxyApp.Stop()
state.Version.Consensus.App = kvstore.ProtocolVersion //simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators) validators := types.TM2PB.ValidatorUpdates(state.Validators)
if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{ if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
Validators: validators, Validators: validators,
}); err != nil { }); err != nil {
panic(err) panic(err)
} }
sm.SaveState(stateDB, state) //save height 1's validatorsInfo
switch mode { switch mode {
case 0: case 0:
@ -463,15 +728,23 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
} }
func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State,
chain []*types.Block, proxyApp proxy.AppConns, mode uint) sm.State {
func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, nBlocks int, mode uint) sm.State {
// run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode))))
proxyApp := proxy.NewAppConns(clientCreator)
if err := proxyApp.Start(); err != nil {
panic(err)
}
defer proxyApp.Stop()
state.Version.Consensus.App = kvstore.ProtocolVersion //simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators) validators := types.TM2PB.ValidatorUpdates(state.Validators)
if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{ if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{
Validators: validators, Validators: validators,
}); err != nil { }); err != nil {
panic(err) panic(err)
} }
sm.SaveState(stateDB, state) //save height 1's validatorsInfo
switch mode { switch mode {
case 0: case 0:
@ -743,6 +1016,7 @@ func stateAndStore(config *cfg.Config, pubKey crypto.PubKey, appVersion version.
state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile()) state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
state.Version.Consensus.App = appVersion state.Version.Consensus.App = appVersion
store := newMockBlockStore(config, state.ConsensusParams) store := newMockBlockStore(config, state.ConsensusParams)
sm.SaveState(stateDB, state)
return stateDB, state, store return stateDB, state, store
} }


+ 13
- 4
consensus/state_test.go View File

@ -239,7 +239,7 @@ func TestStateFullRound1(t *testing.T) {
cs.SetEventBus(eventBus) cs.SetEventBus(eventBus)
eventBus.Start() eventBus.Start()
voteCh := subscribe(cs.eventBus, types.EventQueryVote)
voteCh := subscribeUnBuffered(cs.eventBus, types.EventQueryVote)
propCh := subscribe(cs.eventBus, types.EventQueryCompleteProposal) propCh := subscribe(cs.eventBus, types.EventQueryCompleteProposal)
newRoundCh := subscribe(cs.eventBus, types.EventQueryNewRound) newRoundCh := subscribe(cs.eventBus, types.EventQueryNewRound)
@ -267,7 +267,7 @@ func TestStateFullRoundNil(t *testing.T) {
cs, vss := randConsensusState(1) cs, vss := randConsensusState(1)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
voteCh := subscribe(cs.eventBus, types.EventQueryVote)
voteCh := subscribeUnBuffered(cs.eventBus, types.EventQueryVote)
cs.enterPrevote(height, round) cs.enterPrevote(height, round)
cs.startRoutines(4) cs.startRoutines(4)
@ -286,7 +286,7 @@ func TestStateFullRound2(t *testing.T) {
vs2 := vss[1] vs2 := vss[1]
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
voteCh := subscribe(cs1.eventBus, types.EventQueryVote)
voteCh := subscribeUnBuffered(cs1.eventBus, types.EventQueryVote)
newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlock) newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlock)
// start round and wait for propose and prevote // start round and wait for propose and prevote
@ -330,7 +330,7 @@ func TestStateLockNoPOL(t *testing.T) {
timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose)
timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait)
voteCh := subscribe(cs1.eventBus, types.EventQueryVote)
voteCh := subscribeUnBuffered(cs1.eventBus, types.EventQueryVote)
proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal)
newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound)
@ -1623,3 +1623,12 @@ func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Messa
} }
return sub.Out() return sub.Out()
} }
// subscribe subscribes test client to the given query and returns a channel with cap = 0.
func subscribeUnBuffered(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Message {
sub, err := eventBus.SubscribeUnbuffered(context.Background(), testSubscriber, q)
if err != nil {
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q))
}
return sub.Out()
}

+ 2
- 1
consensus/wal_generator.go View File

@ -45,13 +45,14 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
if err != nil { if err != nil {
return errors.Wrap(err, "failed to read genesis file") return errors.Wrap(err, "failed to read genesis file")
} }
stateDB := db.NewMemDB()
blockStoreDB := db.NewMemDB() blockStoreDB := db.NewMemDB()
stateDB := blockStoreDB
state, err := sm.MakeGenesisState(genDoc) state, err := sm.MakeGenesisState(genDoc)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to make genesis state") return errors.Wrap(err, "failed to make genesis state")
} }
state.Version.Consensus.App = kvstore.ProtocolVersion state.Version.Consensus.App = kvstore.ProtocolVersion
sm.SaveState(stateDB, state)
blockStore := bc.NewBlockStore(blockStoreDB) blockStore := bc.NewBlockStore(blockStoreDB)
proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app)) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))


+ 5
- 2
libs/pubsub/pubsub_test.go View File

@ -46,13 +46,16 @@ func TestSubscribe(t *testing.T) {
err = s.Publish(ctx, "Asylum") err = s.Publish(ctx, "Asylum")
assert.NoError(t, err) assert.NoError(t, err)
err = s.Publish(ctx, "Ivan")
assert.NoError(t, err)
}() }()
select { select {
case <-published: case <-published:
assertReceive(t, "Quicksilver", subscription.Out()) assertReceive(t, "Quicksilver", subscription.Out())
assertCancelled(t, subscription, pubsub.ErrOutOfCapacity) assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
case <-time.After(100 * time.Millisecond):
case <-time.After(3 * time.Second):
t.Fatal("Expected Publish(Asylum) not to block") t.Fatal("Expected Publish(Asylum) not to block")
} }
} }
@ -101,7 +104,7 @@ func TestSubscribeUnbuffered(t *testing.T) {
select { select {
case <-published: case <-published:
t.Fatal("Expected Publish(Darkhawk) to block") t.Fatal("Expected Publish(Darkhawk) to block")
case <-time.After(100 * time.Millisecond):
case <-time.After(3 * time.Second):
assertReceive(t, "Ultron", subscription.Out()) assertReceive(t, "Ultron", subscription.Out())
assertReceive(t, "Darkhawk", subscription.Out()) assertReceive(t, "Darkhawk", subscription.Out())
} }


+ 27
- 18
state/execution.go View File

@ -66,6 +66,10 @@ func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsen
return res return res
} }
func (blockExec *BlockExecutor) DB() dbm.DB {
return blockExec.db
}
// SetEventBus - sets the event bus for publishing block related events. // SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus. // If not called, it defaults to types.NopEventBus.
func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) { func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
@ -116,7 +120,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
} }
startTime := time.Now().UnixNano() startTime := time.Now().UnixNano()
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, blockExec.db)
endTime := time.Now().UnixNano() endTime := time.Now().UnixNano()
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000) blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
if err != nil { if err != nil {
@ -233,7 +237,6 @@ func execBlockOnProxyApp(
logger log.Logger, logger log.Logger,
proxyAppConn proxy.AppConnConsensus, proxyAppConn proxy.AppConnConsensus,
block *types.Block, block *types.Block,
lastValSet *types.ValidatorSet,
stateDB dbm.DB, stateDB dbm.DB,
) (*ABCIResponses, error) { ) (*ABCIResponses, error) {
var validTxs, invalidTxs = 0, 0 var validTxs, invalidTxs = 0, 0
@ -261,7 +264,7 @@ func execBlockOnProxyApp(
} }
proxyAppConn.SetResponseCallback(proxyCb) proxyAppConn.SetResponseCallback(proxyCb)
commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
commitInfo, byzVals := getBeginBlockValidatorInfo(block, stateDB)
// Begin block // Begin block
var err error var err error
@ -296,22 +299,31 @@ func execBlockOnProxyApp(
return abciResponses, nil return abciResponses, nil
} }
func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) {
// Sanity check that commit length matches validator set size -
// only applies after first block
func getBeginBlockValidatorInfo(block *types.Block, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) {
voteInfos := make([]abci.VoteInfo, block.LastCommit.Size())
byzVals := make([]abci.Evidence, len(block.Evidence.Evidence))
var lastValSet *types.ValidatorSet
var err error
if block.Height > 1 { if block.Height > 1 {
precommitLen := len(block.LastCommit.Precommits)
lastValSet, err = LoadValidators(stateDB, block.Height-1)
if err != nil {
panic(err) // shouldn't happen
}
// Sanity check that commit length matches validator set size -
// only applies after first block
precommitLen := block.LastCommit.Size()
valSetLen := len(lastValSet.Validators) valSetLen := len(lastValSet.Validators)
if precommitLen != valSetLen { if precommitLen != valSetLen {
// sanity check // sanity check
panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v", panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v",
precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators)) precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators))
} }
} else {
lastValSet = types.NewValidatorSet(nil)
} }
// Collect the vote info (list of validators and whether or not they signed).
voteInfos := make([]abci.VoteInfo, len(lastValSet.Validators))
for i, val := range lastValSet.Validators { for i, val := range lastValSet.Validators {
var vote *types.CommitSig var vote *types.CommitSig
if i < len(block.LastCommit.Precommits) { if i < len(block.LastCommit.Precommits) {
@ -324,12 +336,6 @@ func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorS
voteInfos[i] = voteInfo voteInfos[i] = voteInfo
} }
commitInfo := abci.LastCommitInfo{
Round: int32(block.LastCommit.Round()),
Votes: voteInfos,
}
byzVals := make([]abci.Evidence, len(block.Evidence.Evidence))
for i, ev := range block.Evidence.Evidence { for i, ev := range block.Evidence.Evidence {
// We need the validator set. We already did this in validateBlock. // We need the validator set. We already did this in validateBlock.
// TODO: Should we instead cache the valset in the evidence itself and add // TODO: Should we instead cache the valset in the evidence itself and add
@ -341,6 +347,10 @@ func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorS
byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time) byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time)
} }
commitInfo := abci.LastCommitInfo{
Round: int32(block.LastCommit.Round()),
Votes: voteInfos,
}
return commitInfo, byzVals return commitInfo, byzVals
} }
@ -469,10 +479,9 @@ func ExecCommitBlock(
appConnConsensus proxy.AppConnConsensus, appConnConsensus proxy.AppConnConsensus,
block *types.Block, block *types.Block,
logger log.Logger, logger log.Logger,
lastValSet *types.ValidatorSet,
stateDB dbm.DB, stateDB dbm.DB,
) ([]byte, error) { ) ([]byte, error) {
_, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
_, err := execBlockOnProxyApp(logger, appConnConsensus, block, stateDB)
if err != nil { if err != nil {
logger.Error("Error executing block on proxy app", "height", block.Height, "err", err) logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
return nil, err return nil, err


+ 2
- 2
state/execution_test.go View File

@ -85,7 +85,7 @@ func TestBeginBlockValidators(t *testing.T) {
// block for height 2 // block for height 2
block, _ := state.MakeBlock(2, makeTxs(2), lastCommit, nil, state.Validators.GetProposer().Address) block, _ := state.MakeBlock(2, makeTxs(2), lastCommit, nil, state.Validators.GetProposer().Address)
_, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), state.Validators, stateDB)
_, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), stateDB)
require.Nil(t, err, tc.desc) require.Nil(t, err, tc.desc)
// -> app receives a list of validators with a bool indicating if they signed // -> app receives a list of validators with a bool indicating if they signed
@ -146,7 +146,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
block, _ := state.MakeBlock(10, makeTxs(2), lastCommit, nil, state.Validators.GetProposer().Address) block, _ := state.MakeBlock(10, makeTxs(2), lastCommit, nil, state.Validators.GetProposer().Address)
block.Time = now block.Time = now
block.Evidence.Evidence = tc.evidence block.Evidence.Evidence = tc.evidence
_, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), state.Validators, stateDB)
_, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), stateDB)
require.Nil(t, err, tc.desc) require.Nil(t, err, tc.desc)
// -> app must receive an index of the byzantine validator // -> app must receive an index of the byzantine validator


Loading…
Cancel
Save