From e7a12f8e38d8cc7d998c4cab3018cb84cb07b019 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 12 Jan 2017 14:44:42 -0500 Subject: [PATCH] cs.Wait() --- consensus/common_test.go | 17 ++++++++++++----- consensus/replay_test.go | 9 +++++++++ consensus/state.go | 11 +++++++++++ glide.lock | 4 ++-- 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/consensus/common_test.go b/consensus/common_test.go index a45c45dba..2082ff73a 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "io/ioutil" + "os" "path" "sort" "sync" @@ -29,6 +30,12 @@ import ( var config cfg.Config // NOTE: must be reset for each _test.go file var ensureTimeout = time.Duration(2) +func ensureDir(dir string, mode os.FileMode) { + if err := EnsureDir(dir, mode); err != nil { + panic(err) + } +} + //------------------------------------------------------------------------------- // validator stub (a dummy consensus peer we control) @@ -249,9 +256,9 @@ func newConsensusStateWithConfig(thisConfig cfg.Config, state *sm.State, pv *typ return cs } -func loadPrivValidator(config cfg.Config) *types.PrivValidator { - privValidatorFile := config.GetString("priv_validator_file") - EnsureDir(path.Dir(privValidatorFile), 0700) +func loadPrivValidator(conf cfg.Config) *types.PrivValidator { + privValidatorFile := conf.GetString("priv_validator_file") + ensureDir(path.Dir(privValidatorFile), 0700) privValidator := types.LoadOrGenPrivValidator(privValidatorFile) privValidator.Reset() return privValidator @@ -313,7 +320,7 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou state := sm.MakeGenesisState(db, genDoc) state.Save() thisConfig := tendermint_test.ResetConfig(Fmt("%s_%d", testName, i)) - EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal + ensureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], appFunc()) css[i].SetTimeoutTicker(tickerFunc()) } @@ -329,7 +336,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF state := sm.MakeGenesisState(db, genDoc) state.Save() thisConfig := tendermint_test.ResetConfig(Fmt("%s_%d", testName, i)) - EnsureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal + ensureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal var privVal *types.PrivValidator if i < nValidators { privVal = privVals[i] diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 823b74a89..d60fa9f2a 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -118,7 +118,16 @@ func runReplayTest(t *testing.T, cs *ConsensusState, walDir string, newBlockCh c // Assuming the consensus state is running, replay of any WAL, including the empty one, // should eventually be followed by a new block, or else something is wrong waitForBlock(newBlockCh, thisCase, i) + cs.evsw.Stop() cs.Stop() +LOOP: + for { + select { + case <-newBlockCh: + default: + break LOOP + } + } cs.Wait() } diff --git a/consensus/state.go b/consensus/state.go index 769bb1e47..bc3eb18fa 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -251,6 +251,8 @@ type ConsensusState struct { decideProposal func(height, round int) doPrevote func(height, round int) setProposal func(proposal *types.Proposal) error + + done chan struct{} } func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState { @@ -263,6 +265,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: NewTimeoutTicker(), timeoutParams: InitTimeoutParamsFromConfig(config), + done: make(chan struct{}), } // set function defaults (may be overwritten before calling Start) cs.decideProposal = cs.defaultDecideProposal @@ -410,6 +413,12 @@ func (cs *ConsensusState) OnStop() { } } +// NOTE: be sure to Stop() the event switch and drain +// any event channels or this may deadlock +func (cs *ConsensusState) Wait() { + <-cs.done +} + // Open file to log all consensus messages and timeouts for deterministic accountability func (cs *ConsensusState) OpenWAL(walDir string) (err error) { cs.mtx.Lock() @@ -659,6 +668,8 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { if cs.wal != nil { cs.wal.Stop() } + + close(cs.done) return } } diff --git a/glide.lock b/glide.lock index 1fa4f5920..c9ebbfee6 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: 2c623322ed0ff7136db54be910e62e679af6e989b18804bb2e6c457fa79533ff -updated: 2017-01-12T11:00:17.295872167-05:00 +updated: 2017-01-12T14:33:09.663440725-05:00 imports: - name: github.com/btcsuite/btcd version: afec1bd1245a4a19e6dfe1306974b733e7cbb9b8 @@ -68,7 +68,7 @@ imports: - name: github.com/tendermint/go-db version: 2645626c33d8702739e52a61a55d705c2dfe4530 - name: github.com/tendermint/go-events - version: 1c85cb98a4e8ca9e92fe585bc9687fd69b98f841 + version: 2337086736a6adeb2de6f66739b66ecd77535997 - name: github.com/tendermint/go-flowrate version: a20c98e61957faa93b4014fbd902f20ab9317a6a subpackages: