Browse Source

excessive logging. update tmlibs for timer fix

pull/983/head
Ethan Buchman 7 years ago
parent
commit
5d04ccbe51
11 changed files with 113 additions and 44 deletions
  1. +1
    -1
      config/config.go
  2. +1
    -1
      consensus/mempool_test.go
  3. +5
    -0
      consensus/reactor.go
  4. +72
    -33
      consensus/reactor_test.go
  5. +5
    -4
      consensus/replay_test.go
  6. +4
    -0
      consensus/state.go
  7. +8
    -2
      consensus/wal_generator.go
  8. +1
    -1
      glide.lock
  9. +6
    -0
      p2p/connection.go
  10. +2
    -0
      p2p/peer.go
  11. +8
    -2
      p2p/switch.go

+ 1
- 1
config/config.go View File

@ -389,7 +389,7 @@ func DefaultConsensusConfig() *ConsensusConfig {
// TestConsensusConfig returns a configuration for testing the consensus service // TestConsensusConfig returns a configuration for testing the consensus service
func TestConsensusConfig() *ConsensusConfig { func TestConsensusConfig() *ConsensusConfig {
config := DefaultConsensusConfig() config := DefaultConsensusConfig()
config.TimeoutPropose = 2000
config.TimeoutPropose = 100
config.TimeoutProposeDelta = 1 config.TimeoutProposeDelta = 1
config.TimeoutPrevote = 10 config.TimeoutPrevote = 10
config.TimeoutPrevoteDelta = 1 config.TimeoutPrevoteDelta = 1


+ 1
- 1
consensus/mempool_test.go View File

@ -104,8 +104,8 @@ func TestTxConcurrentWithCommit(t *testing.T) {
go deliverTxsRange(cs, 0, NTxs) go deliverTxsRange(cs, 0, NTxs)
startTestRound(cs, height, round) startTestRound(cs, height, round)
ticker := time.NewTicker(time.Second * 20)
for nTxs := 0; nTxs < NTxs; { for nTxs := 0; nTxs < NTxs; {
ticker := time.NewTicker(time.Second * 30)
select { select {
case b := <-newBlockCh: case b := <-newBlockCh:
evt := b.(types.TMEventData).Unwrap().(types.EventDataNewBlock) evt := b.(types.TMEventData).Unwrap().(types.EventDataNewBlock)


+ 5
- 0
consensus/reactor.go View File

@ -76,8 +76,11 @@ func (conR *ConsensusReactor) OnStart() error {
// OnStop implements BaseService // OnStop implements BaseService
func (conR *ConsensusReactor) OnStop() { func (conR *ConsensusReactor) OnStop() {
conR.Logger.Debug("conR.OnStop")
conR.BaseReactor.OnStop() conR.BaseReactor.OnStop()
conR.Logger.Debug("conR.OnStop: Stopping ConsensusState")
conR.conS.Stop() conR.conS.Stop()
conR.Logger.Debug("conR.OnStop: DONE")
} }
// SwitchToConsensus switches from fast_sync mode to consensus mode. // SwitchToConsensus switches from fast_sync mode to consensus mode.
@ -1197,6 +1200,8 @@ func (ps *PeerState) String() string {
// StringIndented returns a string representation of the PeerState // StringIndented returns a string representation of the PeerState
func (ps *PeerState) StringIndented(indent string) string { func (ps *PeerState) StringIndented(indent string) string {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return fmt.Sprintf(`PeerState{ return fmt.Sprintf(`PeerState{
%s Key %v %s Key %v
%s PRS %v %s PRS %v


+ 72
- 33
consensus/reactor_test.go View File

@ -10,11 +10,13 @@ import (
"time" "time"
"github.com/tendermint/abci/example/dummy" "github.com/tendermint/abci/example/dummy"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -53,6 +55,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
// make connected switches and start all reactors // make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i]) s.AddReactor("CONSENSUS", reactors[i])
s.SetLogger(reactors[i].Logger.With("module", "p2p", "validator", i))
return s return s
}, p2p.Connect2Switches) }, p2p.Connect2Switches)
@ -67,13 +70,17 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
return reactors, eventChans, eventBuses return reactors, eventChans, eventBuses
} }
func stopConsensusNet(reactors []*ConsensusReactor, eventBuses []*types.EventBus) {
for _, r := range reactors {
func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) {
logger.Info("stopConsensusNet", "n", len(reactors))
for i, r := range reactors {
logger.Info("stopConsensusNet: Stopping ConsensusReactor", "i", i)
r.Switch.Stop() r.Switch.Stop()
} }
for _, b := range eventBuses {
for i, b := range eventBuses {
logger.Info("stopConsensusNet: Stopping eventBus", "i", i)
b.Stop() b.Stop()
} }
logger.Info("stopConsensusNet: DONE", "n", len(reactors))
} }
// Ensure a testnet makes blocks // Ensure a testnet makes blocks
@ -81,7 +88,7 @@ func TestReactor(t *testing.T) {
N := 4 N := 4
css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
reactors, eventChans, eventBuses := startConsensusNet(t, css, N) reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(reactors, eventBuses)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block // wait till everyone makes the first new block
timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) { timeoutWaitGroup(t, N, func(wg *sync.WaitGroup, j int) {
<-eventChans[j] <-eventChans[j]
@ -97,7 +104,7 @@ func TestReactorProposalHeartbeats(t *testing.T) {
c.Consensus.CreateEmptyBlocks = false c.Consensus.CreateEmptyBlocks = false
}) })
reactors, eventChans, eventBuses := startConsensusNet(t, css, N) reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
defer stopConsensusNet(reactors, eventBuses)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
heartbeatChans := make([]chan interface{}, N) heartbeatChans := make([]chan interface{}, N)
var err error var err error
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
@ -126,11 +133,12 @@ func TestReactorProposalHeartbeats(t *testing.T) {
//------------------------------------------------------------- //-------------------------------------------------------------
// ensure we can make blocks despite cycling a validator set // ensure we can make blocks despite cycling a validator set
func TestVotingPowerChange(t *testing.T) {
func TestReactorVotingPowerChange(t *testing.T) {
nVals := 4 nVals := 4
logger := log.TestingLogger()
css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy) css := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentDummy)
reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals) reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals)
defer stopConsensusNet(reactors, eventBuses)
defer stopConsensusNet(logger, reactors, eventBuses)
// map of active validators // map of active validators
activeVals := make(map[string]struct{}) activeVals := make(map[string]struct{})
@ -145,14 +153,14 @@ func TestVotingPowerChange(t *testing.T) {
}, css) }, css)
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
t.Log("---------------------------- Testing changing the voting power of one validator a few times")
logger.Debug("---------------------------- Testing changing the voting power of one validator a few times")
val1PubKey := css[0].privValidator.GetPubKey() val1PubKey := css[0].privValidator.GetPubKey()
updateValidatorTx := dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 25) updateValidatorTx := dummy.MakeValSetChangeTx(val1PubKey.Bytes(), 25)
previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower() previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
@ -164,7 +172,7 @@ func TestVotingPowerChange(t *testing.T) {
previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower() previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
@ -176,7 +184,7 @@ func TestVotingPowerChange(t *testing.T) {
previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower() previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css) waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
@ -185,13 +193,15 @@ func TestVotingPowerChange(t *testing.T) {
} }
} }
func TestValidatorSetChanges(t *testing.T) {
func TestReactorValidatorSetChanges(t *testing.T) {
nPeers := 7 nPeers := 7
nVals := 4 nVals := 4
css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy) css := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentDummy)
logger := log.TestingLogger()
reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers) reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers)
defer stopConsensusNet(reactors, eventBuses)
defer stopConsensusNet(logger, reactors, eventBuses)
// map of active validators // map of active validators
activeVals := make(map[string]struct{}) activeVals := make(map[string]struct{})
@ -206,7 +216,7 @@ func TestValidatorSetChanges(t *testing.T) {
}, css) }, css)
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
t.Log("---------------------------- Testing adding one validator")
logger.Info("---------------------------- Testing adding one validator")
newValidatorPubKey1 := css[nVals].privValidator.GetPubKey() newValidatorPubKey1 := css[nVals].privValidator.GetPubKey()
newValidatorTx1 := dummy.MakeValSetChangeTx(newValidatorPubKey1.Bytes(), testMinPower) newValidatorTx1 := dummy.MakeValSetChangeTx(newValidatorPubKey1.Bytes(), testMinPower)
@ -218,7 +228,7 @@ func TestValidatorSetChanges(t *testing.T) {
// wait till everyone makes block 3. // wait till everyone makes block 3.
// it includes the commit for block 2, which is by the original validator set // it includes the commit for block 2, which is by the original validator set
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
// wait till everyone makes block 4. // wait till everyone makes block 4.
// it includes the commit for block 3, which is by the original validator set // it includes the commit for block 3, which is by the original validator set
@ -232,14 +242,14 @@ func TestValidatorSetChanges(t *testing.T) {
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
t.Log("---------------------------- Testing changing the voting power of one validator")
logger.Info("---------------------------- Testing changing the voting power of one validator")
updateValidatorPubKey1 := css[nVals].privValidator.GetPubKey() updateValidatorPubKey1 := css[nVals].privValidator.GetPubKey()
updateValidatorTx1 := dummy.MakeValSetChangeTx(updateValidatorPubKey1.Bytes(), 25) updateValidatorTx1 := dummy.MakeValSetChangeTx(updateValidatorPubKey1.Bytes(), 25)
previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower() previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
@ -248,7 +258,7 @@ func TestValidatorSetChanges(t *testing.T) {
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
t.Log("---------------------------- Testing adding two validators at once")
logger.Info("---------------------------- Testing adding two validators at once")
newValidatorPubKey2 := css[nVals+1].privValidator.GetPubKey() newValidatorPubKey2 := css[nVals+1].privValidator.GetPubKey()
newValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), testMinPower) newValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), testMinPower)
@ -257,20 +267,20 @@ func TestValidatorSetChanges(t *testing.T) {
newValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), testMinPower) newValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), testMinPower)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
activeVals[string(newValidatorPubKey2.Address())] = struct{}{} activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
activeVals[string(newValidatorPubKey3.Address())] = struct{}{} activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css) waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
t.Log("---------------------------- Testing removing two validators at once")
logger.Info("---------------------------- Testing removing two validators at once")
removeValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), 0) removeValidatorTx2 := dummy.MakeValSetChangeTx(newValidatorPubKey2.Bytes(), 0)
removeValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), 0) removeValidatorTx3 := dummy.MakeValSetChangeTx(newValidatorPubKey3.Bytes(), 0)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css) waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
delete(activeVals, string(newValidatorPubKey2.Address())) delete(activeVals, string(newValidatorPubKey2.Address()))
delete(activeVals, string(newValidatorPubKey3.Address())) delete(activeVals, string(newValidatorPubKey3.Address()))
@ -287,7 +297,7 @@ func TestReactorWithTimeoutCommit(t *testing.T) {
} }
reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1) reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1)
defer stopConsensusNet(reactors, eventBuses)
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
// wait till everyone makes the first new block // wait till everyone makes the first new block
timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) { timeoutWaitGroup(t, N-1, func(wg *sync.WaitGroup, j int) {
@ -300,21 +310,51 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}
timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
defer wg.Done() defer wg.Done()
css[j].Logger.Debug("waitForAndValidateBlock")
newBlockI, ok := <-eventChans[j] newBlockI, ok := <-eventChans[j]
if !ok { if !ok {
return return
} }
newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
t.Logf("Got block height=%v validator=%v", newBlock.Height, j)
css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals) err := validateBlock(newBlock, activeVals)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, err)
for _, tx := range txs { for _, tx := range txs {
if err = css[j].mempool.CheckTx(tx, nil); err != nil {
t.Fatal(err)
css[j].mempool.CheckTx(tx, nil)
assert.Nil(t, err)
}
}, css)
}
func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) {
defer wg.Done()
ntxs := 0
BLOCK_TX_LOOP:
for {
css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs)
newBlockI, ok := <-eventChans[j]
if !ok {
return
}
newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
// check that txs match the txs we're waiting for.
// note they could be spread over multiple blocks,
// but they should be in order.
for _, tx := range newBlock.Data.Txs {
assert.EqualValues(t, txs[ntxs], tx)
ntxs += 1
}
if ntxs == len(txs) {
break BLOCK_TX_LOOP
} }
} }
}, css) }, css)
} }
@ -325,23 +365,22 @@ func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals m
var newBlock *types.Block var newBlock *types.Block
LOOP: LOOP:
for { for {
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt")
newBlockI, ok := <-eventChans[j] newBlockI, ok := <-eventChans[j]
if !ok { if !ok {
return return
} }
newBlock = newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block newBlock = newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block
if newBlock.LastCommit.Size() == len(updatedVals) { if newBlock.LastCommit.Size() == len(updatedVals) {
t.Logf("Block with new validators height=%v validator=%v", newBlock.Height, j)
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height)
break LOOP break LOOP
} else { } else {
t.Logf("Block with no new validators height=%v validator=%v. Skipping...", newBlock.Height, j)
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block with no new validators. Skipping", "height", newBlock.Height)
} }
} }
err := validateBlock(newBlock, updatedVals) err := validateBlock(newBlock, updatedVals)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, err)
}, css) }, css)
} }


+ 5
- 4
consensus/replay_test.go View File

@ -65,9 +65,7 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64,
err := cs.Start() err := cs.Start()
require.NoError(t, err) require.NoError(t, err)
defer func() {
cs.Stop()
}()
defer cs.Stop()
// This is just a signal that we haven't halted; its not something contained // This is just a signal that we haven't halted; its not something contained
// in the WAL itself. Assuming the consensus state is running, replay of any // in the WAL itself. Assuming the consensus state is running, replay of any
@ -337,6 +335,8 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
if err := wal.Start(); err != nil { if err := wal.Start(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer wal.Stop()
chain, commits, err := makeBlockchainFromWAL(wal) chain, commits, err := makeBlockchainFromWAL(wal)
if err != nil { if err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
@ -366,6 +366,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err) t.Fatalf("Error starting proxy app connections: %v", err)
} }
defer proxyApp.Stop()
// get the latest app hash from the app // get the latest app hash from the app
res, err := proxyApp.Query().InfoSync(abci.RequestInfo{""}) res, err := proxyApp.Query().InfoSync(abci.RequestInfo{""})
@ -404,13 +405,13 @@ func buildAppStateFromChain(proxyApp proxy.AppConns,
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
} }
defer proxyApp.Stop()
validators := types.TM2PB.Validators(state.Validators) validators := types.TM2PB.Validators(state.Validators)
if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
panic(err) panic(err)
} }
defer proxyApp.Stop()
switch mode { switch mode {
case 0: case 0:
for i := 0; i < nBlocks; i++ { for i := 0; i < nBlocks; i++ {


+ 4
- 0
consensus/state.go View File

@ -267,14 +267,18 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
// OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish. // OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish.
func (cs *ConsensusState) OnStop() { func (cs *ConsensusState) OnStop() {
cs.Logger.Debug("conS.OnStop")
cs.BaseService.OnStop() cs.BaseService.OnStop()
cs.Logger.Debug("conS.OnStop: Stopping ticker")
cs.timeoutTicker.Stop() cs.timeoutTicker.Stop()
// Make BaseService.Wait() wait until cs.wal.Wait() // Make BaseService.Wait() wait until cs.wal.Wait()
cs.Logger.Debug("conS.OnStop: Waiting for WAL")
if cs.IsRunning() { if cs.IsRunning() {
cs.wal.Wait() cs.wal.Wait()
} }
cs.Logger.Debug("conS.OnStop: DONE")
} }
// Wait waits for the the main routine to return. // Wait waits for the the main routine to return.


+ 8
- 2
consensus/wal_generator.go View File

@ -77,7 +77,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
var b bytes.Buffer var b bytes.Buffer
wr := bufio.NewWriter(&b) wr := bufio.NewWriter(&b)
numBlocksWritten := make(chan struct{}) numBlocksWritten := make(chan struct{})
wal := newByteBufferWAL(NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
wal := newByteBufferWAL(logger, NewWALEncoder(wr), int64(numBlocks), numBlocksWritten)
// see wal.go#103 // see wal.go#103
wal.Save(EndHeightMessage{0}) wal.Save(EndHeightMessage{0})
consensusState.wal = wal consensusState.wal = wal
@ -142,16 +142,18 @@ type byteBufferWAL struct {
stopped bool stopped bool
heightToStop int64 heightToStop int64
signalWhenStopsTo chan struct{} signalWhenStopsTo chan struct{}
logger log.Logger
} }
// needed for determinism // needed for determinism
var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z") var fixedTime, _ = time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
func newByteBufferWAL(enc *WALEncoder, nBlocks int64, signalStop chan struct{}) *byteBufferWAL {
func newByteBufferWAL(logger log.Logger, enc *WALEncoder, nBlocks int64, signalStop chan struct{}) *byteBufferWAL {
return &byteBufferWAL{ return &byteBufferWAL{
enc: enc, enc: enc,
heightToStop: nBlocks, heightToStop: nBlocks,
signalWhenStopsTo: signalStop, signalWhenStopsTo: signalStop,
logger: logger,
} }
} }
@ -160,17 +162,21 @@ func newByteBufferWAL(enc *WALEncoder, nBlocks int64, signalStop chan struct{})
// skip writing. // skip writing.
func (w *byteBufferWAL) Save(m WALMessage) { func (w *byteBufferWAL) Save(m WALMessage) {
if w.stopped { if w.stopped {
w.logger.Debug("WAL already stopped. Not writing message", "msg", m)
return return
} }
if endMsg, ok := m.(EndHeightMessage); ok { if endMsg, ok := m.(EndHeightMessage); ok {
w.logger.Debug("WAL write end height message", "height", endMsg.Height, "stopHeight", w.heightToStop)
if endMsg.Height == w.heightToStop { if endMsg.Height == w.heightToStop {
w.logger.Debug("Stopping WAL at height", "height", endMsg.Height)
w.signalWhenStopsTo <- struct{}{} w.signalWhenStopsTo <- struct{}{}
w.stopped = true w.stopped = true
return return
} }
} }
w.logger.Debug("WAL Write Message", "msg", m)
err := w.enc.Encode(&TimedWALMessage{fixedTime, m}) err := w.enc.Encode(&TimedWALMessage{fixedTime, m})
if err != nil { if err != nil {
panic(fmt.Sprintf("failed to encode the msg %v", m)) panic(fmt.Sprintf("failed to encode the msg %v", m))


+ 1
- 1
glide.lock View File

@ -129,7 +129,7 @@ imports:
subpackages: subpackages:
- iavl - iavl
- name: github.com/tendermint/tmlibs - name: github.com/tendermint/tmlibs
version: e4ef2835f0081c2ece83b9c1f777cf071f956e81
version: a483e1ff486b577ba94e6a20f08bf52fbb7bff14
subpackages: subpackages:
- autofile - autofile
- cli - cli


+ 6
- 0
p2p/connection.go View File

@ -185,13 +185,19 @@ func (c *MConnection) OnStart() error {
// OnStop implements BaseService // OnStop implements BaseService
func (c *MConnection) OnStop() { func (c *MConnection) OnStop() {
c.Logger.Debug("MConn.OnStop")
c.BaseService.OnStop() c.BaseService.OnStop()
c.Logger.Debug("MConn.flushTimer.Stop")
c.flushTimer.Stop() c.flushTimer.Stop()
c.Logger.Debug("MConn.pingTimer.Stop")
c.pingTimer.Stop() c.pingTimer.Stop()
c.Logger.Debug("MConn.chStatsTimer.Stop")
c.chStatsTimer.Stop() c.chStatsTimer.Stop()
if c.quit != nil { if c.quit != nil {
c.Logger.Debug("MConn: Close Quit")
close(c.quit) close(c.quit)
} }
c.Logger.Debug("MConn.conn.Close()")
c.conn.Close() // nolint: errcheck c.conn.Close() // nolint: errcheck
// We can't close pong safely here because // We can't close pong safely here because
// recvRoutine may write to it after we've stopped. // recvRoutine may write to it after we've stopped.


+ 2
- 0
p2p/peer.go View File

@ -237,7 +237,9 @@ func (p *peer) OnStart() error {
// OnStop implements BaseService. // OnStop implements BaseService.
func (p *peer) OnStop() { func (p *peer) OnStop() {
p.Logger.Debug("Peer.OnStop")
p.BaseService.OnStop() p.BaseService.OnStop()
p.Logger.Debug("Peer.mconn.Stop")
p.mconn.Stop() p.mconn.Stop()
} }


+ 8
- 2
p2p/switch.go View File

@ -210,17 +210,23 @@ func (sw *Switch) OnStart() error {
// OnStop implements BaseService. It stops all listeners, peers, and reactors. // OnStop implements BaseService. It stops all listeners, peers, and reactors.
func (sw *Switch) OnStop() { func (sw *Switch) OnStop() {
// Stop listeners // Stop listeners
sw.Logger.Debug("Switch: Stopping listeners")
for _, listener := range sw.listeners { for _, listener := range sw.listeners {
listener.Stop() listener.Stop()
} }
sw.listeners = nil sw.listeners = nil
// Stop peers // Stop peers
for _, peer := range sw.peers.List() {
sw.Logger.Debug("Switch: Stopping Peers")
for i, peer := range sw.peers.List() {
sw.Logger.Debug("Switch: Stopping peer", "i", i, "peer", peer)
peer.Stop() peer.Stop()
sw.Logger.Debug("Switch: Removing peer", "i", i, "peer", peer)
sw.peers.Remove(peer) sw.peers.Remove(peer)
} }
// Stop reactors // Stop reactors
for _, reactor := range sw.reactors {
sw.Logger.Debug("Switch: Stopping reactors")
for name, reactor := range sw.reactors {
sw.Logger.Debug("Switch: Stopping reactor", "name", name)
reactor.Stop() reactor.Stop()
} }
} }


Loading…
Cancel
Save