Browse Source

shrink and fix

pull/8127/head
tycho garen 3 years ago
committed by M. J. Fromberger
parent
commit
a40b52237e
3 changed files with 110 additions and 36 deletions
  1. +1
    -1
      internal/consensus/invalid_test.go
  2. +32
    -29
      internal/consensus/reactor_test.go
  3. +77
    -6
      internal/consensus/state.go

+ 1
- 1
internal/consensus/invalid_test.go View File

@ -26,7 +26,7 @@ func TestReactorInvalidPrecommit(t *testing.T) {
config := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx, t,
config, n, "consensus_reactor_test",
newMockTickerFunc(true))


+ 32
- 29
internal/consensus/reactor_test.go View File

@ -355,7 +355,7 @@ func TestReactorBasic(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx, t,
cfg, n, "consensus_reactor_test",
newMockTickerFunc(true))
@ -368,7 +368,8 @@ func TestReactorBasic(t *testing.T) {
}
var wg sync.WaitGroup
errCh := make(chan error, len(rts.subs))
bErrCh := make(chan error, len(rts.subs))
bsErrCh := make(chan error, len(rts.blocksyncSubs))
for _, sub := range rts.subs {
wg.Add(1)
@ -382,14 +383,16 @@ func TestReactorBasic(t *testing.T) {
return
case errors.Is(err, context.Canceled):
return
case err != nil:
default:
select {
case errCh <- err:
case bErrCh <- err:
if err != nil {
cancel() // terminate other workers
}
return
case <-ctx.Done():
return
}
cancel() // terminate other workers
return
}
}(sub)
}
@ -401,20 +404,19 @@ func TestReactorBasic(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("encountered timeout")
case err := <-errCh:
case err := <-bErrCh:
if err != nil {
t.Fatal(err)
}
}
case <-ctx.Done():
t.Fatal("encountered timeout")
case err := <-errCh:
case err := <-bErrCh:
if err != nil {
t.Fatal(err)
}
}
errCh = make(chan error, len(rts.blocksyncSubs))
for _, sub := range rts.blocksyncSubs {
wg.Add(1)
@ -428,7 +430,7 @@ func TestReactorBasic(t *testing.T) {
case errors.Is(err, context.Canceled):
return
case err != nil:
errCh <- err
bsErrCh <- err
cancel() // terminate other workers
return
}
@ -442,7 +444,7 @@ func TestReactorBasic(t *testing.T) {
}
select {
case err := <-errCh:
case err := <-bsErrCh:
if err != nil {
t.Fatal(err)
}
@ -456,7 +458,7 @@ func TestReactorWithEvidence(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
testName := "consensus_reactor_test"
tickerFunc := newMockTickerFunc(true)
@ -576,7 +578,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx,
t,
cfg,
@ -599,7 +601,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
// send a tx
require.NoError(
t,
assertMempool(t, states[3].txNotifier).CheckTx(
assertMempool(t, states[1].txNotifier).CheckTx(
ctx,
[]byte{1, 2, 3},
nil,
@ -642,7 +644,7 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx, t,
cfg, n, "consensus_reactor_test",
newMockTickerFunc(true))
@ -720,7 +722,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
cfg := configSetup(t)
n := 4
n := 2
states, cleanup := makeConsensusState(ctx,
t,
cfg,
@ -795,11 +797,12 @@ func TestReactorVotingPowerChange(t *testing.T) {
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
waitForAndValidateBlock(ctx, t, n, activeVals, blocksSubs, states)
newVotingPower := states[0].GetRoundState().LastValidators.TotalVotingPower()
require.NotEqualf(
t, previousTotalVotingPower, states[0].GetRoundState().LastValidators.TotalVotingPower(),
t, previousTotalVotingPower, newVotingPower,
"expected voting power to change (before: %d, after: %d)",
previousTotalVotingPower,
states[0].GetRoundState().LastValidators.TotalVotingPower(),
newVotingPower,
)
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2)
@ -833,13 +836,13 @@ func TestReactorVotingPowerChange(t *testing.T) {
}
func TestReactorValidatorSetChanges(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
cfg := configSetup(t)
nPeers := 7
nVals := 4
nPeers := 2
nVals := 2
states, _, _, cleanup := randConsensusNetWithPeers(
ctx,
t,
@ -852,7 +855,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
)
t.Cleanup(cleanup)
rts := setup(ctx, t, nPeers, states, 100) // buffer must be large enough to not deadlock
rts := setup(ctx, t, nPeers, states, 512) // buffer must be large enough to not deadlock
for _, reactor := range rts.reactors {
state := reactor.state.GetState()
@ -908,7 +911,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
t.Fatal("subscription encountered unexpected error")
}
newValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx)
newValidatorPubKey1, err := states[nVals-1].privValidator.GetPubKey(ctx)
require.NoError(t, err)
valPubKey1ABCI, err := encoding.PubKeyToProto(newValidatorPubKey1)
@ -941,14 +944,14 @@ func TestReactorValidatorSetChanges(t *testing.T) {
// it includes the commit for block 4, which should have the updated validator set
waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states)
updateValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx)
updateValidatorPubKey1, err := states[nVals-1].privValidator.GetPubKey(ctx)
require.NoError(t, err)
updatePubKey1ABCI, err := encoding.PubKeyToProto(updateValidatorPubKey1)
require.NoError(t, err)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
previousTotalVotingPower := states[nVals].GetRoundState().LastValidators.TotalVotingPower()
previousTotalVotingPower := states[nVals-1].GetRoundState().LastValidators.TotalVotingPower()
waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1)
waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1)
@ -956,12 +959,12 @@ func TestReactorValidatorSetChanges(t *testing.T) {
waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states)
require.NotEqualf(
t, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower,
t, states[nVals-1].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower,
"expected voting power to change (before: %d, after: %d)",
previousTotalVotingPower, states[nVals].GetRoundState().LastValidators.TotalVotingPower(),
previousTotalVotingPower, states[nVals-1].GetRoundState().LastValidators.TotalVotingPower(),
)
newValidatorPubKey2, err := states[nVals+1].privValidator.GetPubKey(ctx)
newValidatorPubKey2, err := states[nVals-2].privValidator.GetPubKey(ctx)
require.NoError(t, err)
newVal2ABCI, err := encoding.PubKeyToProto(newValidatorPubKey2)
@ -969,7 +972,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
newValidatorPubKey3, err := states[nVals+2].privValidator.GetPubKey(ctx)
newValidatorPubKey3, err := states[nVals-2].privValidator.GetPubKey(ctx)
require.NoError(t, err)
newVal3ABCI, err := encoding.PubKeyToProto(newValidatorPubKey3)


+ 77
- 6
internal/consensus/state.go View File

@ -514,6 +514,8 @@ func (cs *State) OnStop() {
}
}
close(cs.getOnStopCh())
if cs.timeoutTicker.IsRunning() {
cs.timeoutTicker.Stop()
}
@ -636,6 +638,8 @@ func (cs *State) SetProposalAndBlock(
func (cs *State) updateHeight(height int64) {
cs.metrics.Height.Set(float64(height))
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Height = height
}
@ -757,7 +761,13 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) {
switch {
case state.LastBlockHeight == 0: // Very first commit should be empty.
cs.LastCommit = (*types.VoteSet)(nil)
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.LastCommit = (*types.VoteSet)(nil)
}()
case cs.CommitRound > -1 && cs.Votes != nil: // Otherwise, use cs.Votes
if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() {
panic(fmt.Sprintf(
@ -766,7 +776,12 @@ func (cs *State) updateToState(ctx context.Context, state sm.State) {
))
}
cs.LastCommit = cs.Votes.Precommits(cs.CommitRound)
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.LastCommit = cs.Votes.Precommits(cs.CommitRound)
}()
case cs.LastCommit == nil:
// NOTE: when Tendermint starts, it has no votes. reconstructLastCommit
@ -1802,8 +1817,13 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3
// Done enterCommit:
// keep cs.Round the same, commitRound points to the right Precommits set.
cs.updateRoundStep(cs.Round, cstypes.RoundStepCommit)
cs.CommitRound = commitRound
cs.CommitTime = tmtime.Now()
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.CommitRound = commitRound
cs.CommitTime = tmtime.Now()
}()
cs.newStep(ctx)
// Maybe finalize immediately.
@ -2215,6 +2235,47 @@ func (cs *State) addProposalBlockPart(
if err := cs.eventBus.PublishEventCompleteProposal(ctx, cs.CompleteProposalEvent()); err != nil {
cs.logger.Error("failed publishing event complete proposal", "err", err)
}
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)
blockID, hasTwoThirds := prevotes.TwoThirdsMajority()
if hasTwoThirds && !blockID.IsNil() && (cs.ValidRound < cs.Round) {
if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.logger.Debug(
"updating valid block to new proposal block",
"valid_round", cs.Round,
"valid_block_hash", cs.ProposalBlock.Hash(),
)
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.ValidRound = cs.Round
cs.ValidBlock = cs.ProposalBlock
cs.ValidBlockParts = cs.ProposalBlockParts
}()
}
// TODO: In case there is +2/3 majority in Prevotes set for some
// block and cs.ProposalBlock contains different block, either
// proposer is faulty or voting power of faulty processes is more
// than 1/3. We should trigger in the future accountability
// procedure at this point.
}
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(ctx, height, cs.Round)
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
cs.enterPrecommit(ctx, height, cs.Round)
}
} else if cs.Step == cstypes.RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(ctx, height)
}
return added, nil
}
return added, nil
@ -2403,12 +2464,22 @@ func (cs *State) addVote(
)
// we're getting the wrong block
cs.ProposalBlock = nil
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.ProposalBlock = nil
}()
}
if !cs.ProposalBlockParts.HasHeader(blockID.PartSetHeader) {
cs.metrics.MarkBlockGossipStarted()
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
func() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.ProposalBlockParts = types.NewPartSetFromHeader(blockID.PartSetHeader)
}()
}
cs.evsw.FireEvent(ctx, types.EventValidBlockValue, &cs.RoundState)


Loading…
Cancel
Save