Browse Source

shrink and fix

pull/8091/head
tycho garen 3 years ago
parent
commit
e550e15924
3 changed files with 76 additions and 46 deletions
  1. +1
    -1
      internal/consensus/invalid_test.go
  2. +32
    -29
      internal/consensus/reactor_test.go
  3. +43
    -16
      internal/consensus/state.go

+ 1
- 1
internal/consensus/invalid_test.go View File

@ -26,7 +26,7 @@ func TestReactorInvalidPrecommit(t *testing.T) {
config := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx, t,
config, n, "consensus_reactor_test",
newMockTickerFunc(true))


+ 32
- 29
internal/consensus/reactor_test.go View File

@ -356,7 +356,7 @@ func TestReactorBasic(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx, t,
cfg, n, "consensus_reactor_test",
newMockTickerFunc(true))
@ -369,7 +369,8 @@ func TestReactorBasic(t *testing.T) {
}
var wg sync.WaitGroup
errCh := make(chan error, len(rts.subs))
bErrCh := make(chan error, len(rts.subs))
bsErrCh := make(chan error, len(rts.blocksyncSubs))
for _, sub := range rts.subs {
wg.Add(1)
@ -383,14 +384,16 @@ func TestReactorBasic(t *testing.T) {
return
case errors.Is(err, context.Canceled):
return
case err != nil:
default:
select {
case errCh <- err:
case bErrCh <- err:
if err != nil {
cancel() // terminate other workers
}
return
case <-ctx.Done():
return
}
cancel() // terminate other workers
return
}
}(sub)
}
@ -402,20 +405,19 @@ func TestReactorBasic(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("encountered timeout")
case err := <-errCh:
case err := <-bErrCh:
if err != nil {
t.Fatal(err)
}
}
case <-ctx.Done():
t.Fatal("encountered timeout")
case err := <-errCh:
case err := <-bErrCh:
if err != nil {
t.Fatal(err)
}
}
errCh = make(chan error, len(rts.blocksyncSubs))
for _, sub := range rts.blocksyncSubs {
wg.Add(1)
@ -429,7 +431,7 @@ func TestReactorBasic(t *testing.T) {
case errors.Is(err, context.Canceled):
return
case err != nil:
errCh <- err
bsErrCh <- err
cancel() // terminate other workers
return
}
@ -443,7 +445,7 @@ func TestReactorBasic(t *testing.T) {
}
select {
case err := <-errCh:
case err := <-bsErrCh:
if err != nil {
t.Fatal(err)
}
@ -457,7 +459,7 @@ func TestReactorWithEvidence(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
testName := "consensus_reactor_test"
tickerFunc := newMockTickerFunc(true)
@ -578,7 +580,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx,
t,
cfg,
@ -602,7 +604,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
// send a tx
require.NoError(
t,
assertMempool(t, states[3].txNotifier).CheckTx(
assertMempool(t, states[1].txNotifier).CheckTx(
ctx,
[]byte{1, 2, 3},
nil,
@ -645,7 +647,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx, t,
cfg, n, "consensus_reactor_test",
newMockTickerFunc(true))
@ -723,7 +725,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx,
t,
cfg,
@ -798,11 +800,12 @@ func TestReactorVotingPowerChange(t *testing.T) {
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
newVotingPower := states[0].GetRoundState().LastValidators.TotalVotingPower()
require.NotEqualf(
t, previousTotalVotingPower, states[0].GetRoundState().LastValidators.TotalVotingPower(),
t, previousTotalVotingPower, newVotingPower,
"expected voting power to change (before: %d, after: %d)",
previousTotalVotingPower,
states[0].GetRoundState().LastValidators.TotalVotingPower(),
newVotingPower,
)
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2)
@ -836,13 +839,13 @@ func TestReactorVotingPowerChange(t *testing.T) {
}
func TestReactorValidatorSetChanges(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
cfg := configSetup(t)
nPeers := 7
nVals := 4
nPeers := 2
nVals := 2
states, _, _, cleanup := randConsensusNetWithPeers(
ctx,
t,
@ -855,7 +858,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
)
t.Cleanup(cleanup)
rts := setup(ctx, t, nPeers, states, 100) // buffer must be large enough to not deadlock
rts := setup(ctx, t, nPeers, states, 512) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
@ -911,7 +914,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
t.Fatal("subscription encountered unexpected error")
}
newValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx)
newValidatorPubKey1, err := states[nVals-1].privValidator.GetPubKey(ctx)
require.NoError(t, err)
valPubKey1ABCI, err := encoding.PubKeyToProto(newValidatorPubKey1)
@ -944,14 +947,14 @@ func TestReactorValidatorSetChanges(t *testing.T) {
// it includes the commit for block 4, which should have the updated validator set
waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states)
updateValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx)
updateValidatorPubKey1, err := states[nVals-1].privValidator.GetPubKey(ctx)
require.NoError(t, err)
updatePubKey1ABCI, err := encoding.PubKeyToProto(updateValidatorPubKey1)
require.NoError(t, err)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
previousTotalVotingPower := states[nVals].GetRoundState().LastValidators.TotalVotingPower()
previousTotalVotingPower := states[nVals-1].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1)
waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1)
@ -959,12 +962,12 @@ func TestReactorValidatorSetChanges(t *testing.T) {
waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states)
require.NotEqualf(
t, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower,
t, states[nVals-1].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower,
"expected voting power to change (before: %d, after: %d)",
previousTotalVotingPower, states[nVals].GetRoundState().LastValidators.TotalVotingPower(),
previousTotalVotingPower, states[nVals-1].GetRoundState().LastValidators.TotalVotingPower(),
)
newValidatorPubKey2, err := states[nVals+1].privValidator.GetPubKey(ctx)
newValidatorPubKey2, err := states[nVals-2].privValidator.GetPubKey(ctx)
require.NoError(t, err)
newVal2ABCI, err := encoding.PubKeyToProto(newValidatorPubKey2)
@ -972,7 +975,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
newValidatorPubKey3, err := states[nVals+2].privValidator.GetPubKey(ctx)
newValidatorPubKey3, err := states[nVals-2].privValidator.GetPubKey(ctx)
require.NoError(t, err)
newVal3ABCI, err := encoding.PubKeyToProto(newValidatorPubKey3)


+ 43
- 16
internal/consensus/state.go View File

@ -514,13 +514,6 @@ func (cs *State) OnStop() {
}
}
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
close(cs.onStopCh)
}()
if cs.evsw.IsRunning() {
cs.evsw.Stop()
}
@ -655,6 +648,8 @@ func (cs *State) SetProposalAndBlock(
func (cs *State) updateHeight(height int64) {
cs.metrics.Height.Set(float64(height))
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Height = height
}
@ -776,7 +771,13 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) {
switch {
case state.LastBlockHeight == 0: // Very first commit should be empty.
cs.LastCommit = (*types.VoteSet)(nil)
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.LastCommit = (*types.VoteSet)(nil)
}()
case cs.CommitRound > -1 && cs.Votes != nil: // Otherwise, use cs.Votes
if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() {
panic(fmt.Sprintf(
@ -785,7 +786,12 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) {
))
}
cs.LastCommit = cs.Votes.Precommits(cs.CommitRound)
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.LastCommit = cs.Votes.Precommits(cs.CommitRound)
}()
case cs.LastCommit == nil:
// NOTE: when Tendermint starts, it has no votes. reconstructLastCommit
@ -1790,8 +1796,13 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3
// Done enterCommit:
// keep cs.Round the same, commitRound points to the right Precommits set.
cs.updateRoundStep(cs.Round, cstypes.RoundStepCommit)
cs.CommitRound = commitRound
cs.CommitTime = tmtime.Now()
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.CommitRound = commitRound
cs.CommitTime = tmtime.Now()
}()
cs.newStep(ctx)
// Maybe finalize immediately.
@ -2215,9 +2226,15 @@ func (cs *State) addProposalBlockPart(
"valid_block_hash", cs.ProposalBlock.Hash(),
)
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}()
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
@ -2392,12 +2409,22 @@ func (cs *State) addVote(
)
// we're getting the wrong block
cs.ProposalBlock = nil
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.ProposalBlock = nil
}()
}
if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) {
cs.metrics.MarkBlockGossipStarted()
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
}()
}
cs.evsw.FireEvent(ctx, types.EventValidBlockValue, &cs.RoundState)


Loading…
Cancel
Save