From 5790ea9f43580e6258c397d5475b8c46686cd025 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 4 Jun 2015 13:36:47 -0700 Subject: [PATCH] consensus refactor: reconstruct LastCommits upon restart --- consensus/reactor.go | 31 ++++++--------------- consensus/state.go | 51 +++++++++++++++++++++++++--------- consensus/vote_set.go | 56 +++++++++++++++++++++++++------------- consensus/vote_set_test.go | 22 +++++++-------- 4 files changed, 95 insertions(+), 65 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 8671123ca..8cae3cbb3 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -524,28 +524,15 @@ OUTER_LOOP: // Catchup logic if prs.Height != 0 && !prs.HasAllCatchupCommits { - // If peer is lagging by height 1, match our LastCommits or SeenValidation to peer's Commits. - if rs.Height == prs.Height+1 && rs.LastCommits.Size() > 0 { - // If there are lastcommits to send... - if trySendVote(prs.Height, rs.LastCommits, prs.Commits) { - continue OUTER_LOOP - } else { - ps.SetHasAllCatchupCommits(prs.Height) - } - } - - // Or, if peer is lagging by 1 and we don't have LastCommits, send SeenValidation. - if rs.Height == prs.Height+1 && rs.LastCommits.Size() == 0 { - // Load the blockMeta for block at prs.Height - blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) - // Load the seen validation for prs.Height - validation := conR.blockStore.LoadSeenValidation(prs.Height) - log.Debug("Loaded SeenValidation for catch-up", "height", prs.Height, "blockMeta", blockMeta, "validation", validation) - - if trySendCommitFromValidation(blockMeta, validation, prs.Commits) { - continue OUTER_LOOP - } else { - ps.SetHasAllCatchupCommits(prs.Height) + // If peer is lagging by height 1 + if rs.Height == prs.Height+1 { + if rs.LastCommits.Size() > 0 { + // Sync peer to rs.LastCommits + if trySendVote(prs.Height, rs.LastCommits, prs.Commits) { + continue OUTER_LOOP + } else { + ps.SetHasAllCatchupCommits(prs.Height) + } } } diff --git a/consensus/state.go b/consensus/state.go index de7b5fd20..78527f072 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -268,9 +268,38 @@ func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReacto newStepCh: make(chan *RoundState, 1), } cs.updateToState(state, true) + cs.reconstructLastCommits(state) return cs } +// Reconstruct LastCommits from SeenValidation, which we saved along with the block, +// (which happens even before saving the state) +func (cs *ConsensusState) reconstructLastCommits(state *sm.State) { + if state.LastBlockHeight == 0 { + return + } + lastCommits := NewVoteSet(state.LastBlockHeight, 0, types.VoteTypeCommit, state.LastBondedValidators) + seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight) + for idx, commit := range seenValidation.Commits { + commitVote := &types.Vote{ + Height: state.LastBlockHeight, + Round: commit.Round, + Type: types.VoteTypeCommit, + BlockHash: state.LastBlockHash, + BlockParts: state.LastBlockParts, + Signature: commit.Signature, + } + added, _, err := lastCommits.AddByIndex(uint(idx), commitVote) + if !added || err != nil { + panic(Fmt("Failed to reconstruct LastCommits: %v", err)) + } + } + if !lastCommits.HasTwoThirdsMajority() { + panic("Failed to reconstruct LastCommits: Does not have +2/3 maj") + } + cs.LastCommits = lastCommits +} + func (cs *ConsensusState) GetState() *sm.State { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -645,13 +674,9 @@ func (cs *ConsensusState) RunActionPropose(height uint, round uint) { // Make the validation from LastCommits validation = cs.LastCommits.MakeValidation() } else { - // Upon reboot, we may have to use SeenValidation - validation = cs.blockStore.LoadSeenValidation(height - 1) - if validation == nil { - // We just don't have any validation for the previous block - log.Debug("Cannot propose anything: No validation for the previous block.") - return - } + // We just don't have any validation for the previous block + log.Debug("Cannot propose anything: No validation for the previous block.") + return } txs := cs.mempoolReactor.Mempool.GetProposalTxs() block = &types.Block{ @@ -1024,14 +1049,14 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote) (added bool, switch vote.Type { case types.VoteTypePrevote: // Prevotes checks for height+round match. - added, index, err = cs.Prevotes.Add(address, vote) + added, index, err = cs.Prevotes.AddByAddress(address, vote) if added { log.Debug(Fmt("Added prevote: %v", cs.Prevotes.StringShort())) } return case types.VoteTypePrecommit: // Precommits checks for height+round match. - added, index, err = cs.Precommits.Add(address, vote) + added, index, err = cs.Precommits.AddByAddress(address, vote) if added { log.Debug(Fmt("Added precommit: %v", cs.Precommits.StringShort())) } @@ -1040,9 +1065,9 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote) (added bool, if vote.Height == cs.Height { // No need to check if vote.Round < cs.Round ... // Prevotes && Precommits already checks that. - cs.Prevotes.Add(address, vote) - cs.Precommits.Add(address, vote) - added, index, err = cs.Commits.Add(address, vote) + cs.Prevotes.AddByAddress(address, vote) + cs.Precommits.AddByAddress(address, vote) + added, index, err = cs.Commits.AddByAddress(address, vote) if added && cs.Commits.HasTwoThirdsMajority() && cs.CommitTime.IsZero() { cs.CommitTime = time.Now() log.Debug(Fmt("Set CommitTime to %v", cs.CommitTime)) @@ -1061,7 +1086,7 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote) (added bool, return } if vote.Height+1 == cs.Height { - added, index, err = cs.LastCommits.Add(address, vote) + added, index, err = cs.LastCommits.AddByAddress(address, vote) log.Debug(Fmt("Added lastCommits: %v", cs.LastCommits.StringShort())) return } diff --git a/consensus/vote_set.go b/consensus/vote_set.go index ca2f44eb8..39f7747b5 100644 --- a/consensus/vote_set.go +++ b/consensus/vote_set.go @@ -70,14 +70,46 @@ func (voteSet *VoteSet) Size() uint { } } -// True if added, false if not. -// Returns ErrVote[UnexpectedStep|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature] +// Returns added=true, index if vote was added +// Otherwise returns err=ErrVote[UnexpectedStep|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature] +// CONTRACT: if err == nil, added == true // NOTE: vote should not be mutated after adding. -// Returns the validator index of the vote unless error is set. -func (voteSet *VoteSet) Add(address []byte, vote *types.Vote) (bool, uint, error) { +func (voteSet *VoteSet) AddByIndex(valIndex uint, vote *types.Vote) (added bool, index uint, err error) { voteSet.mtx.Lock() defer voteSet.mtx.Unlock() + return voteSet.addByIndex(valIndex, vote) +} + +// Returns added=true, index if vote was added +// Otherwise returns err=ErrVote[UnexpectedStep|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature] +// CONTRACT: if err == nil, added == true +// NOTE: vote should not be mutated after adding. +func (voteSet *VoteSet) AddByAddress(address []byte, vote *types.Vote) (added bool, index uint, err error) { + voteSet.mtx.Lock() + defer voteSet.mtx.Unlock() + + // Ensure that signer is a validator. + valIndex, val := voteSet.valSet.GetByAddress(address) + if val == nil { + return false, 0, types.ErrVoteInvalidAccount + } + + return voteSet.addVote(val, valIndex, vote) +} + +func (voteSet *VoteSet) addByIndex(valIndex uint, vote *types.Vote) (bool, uint, error) { + // Ensure that signer is a validator. + _, val := voteSet.valSet.GetByIndex(valIndex) + if val == nil { + return false, 0, types.ErrVoteInvalidAccount + } + + return voteSet.addVote(val, valIndex, vote) +} + +func (voteSet *VoteSet) addVote(val *sm.Validator, valIndex uint, vote *types.Vote) (bool, uint, error) { + // Make sure the step matches. (or that vote is commit && round < voteSet.round) if vote.Height != voteSet.height || (vote.Type != types.VoteTypeCommit && vote.Round != voteSet.round) || @@ -86,22 +118,12 @@ func (voteSet *VoteSet) Add(address []byte, vote *types.Vote) (bool, uint, error return false, 0, types.ErrVoteUnexpectedStep } - // Ensure that signer is a validator. - valIndex, val := voteSet.valSet.GetByAddress(address) - if val == nil { - return false, 0, types.ErrVoteInvalidAccount - } - // Check signature. if !val.PubKey.VerifyBytes(account.SignBytes(config.GetString("chain_id"), vote), vote.Signature) { // Bad signature. return false, 0, types.ErrVoteInvalidSignature } - return voteSet.addVote(valIndex, vote) -} - -func (voteSet *VoteSet) addVote(valIndex uint, vote *types.Vote) (bool, uint, error) { // If vote already exists, return false. if existingVote := voteSet.votes[valIndex]; existingVote != nil { if bytes.Equal(existingVote.BlockHash, vote.BlockHash) { @@ -115,10 +137,6 @@ func (voteSet *VoteSet) addVote(valIndex uint, vote *types.Vote) (bool, uint, er } // Add vote. - _, val := voteSet.valSet.GetByIndex(valIndex) - if val == nil { - panic(fmt.Sprintf("Missing validator for index %v", valIndex)) - } voteSet.votes[valIndex] = vote voteSet.votesBitArray.SetIndex(valIndex, true) blockKey := string(vote.BlockHash) + string(binary.BinaryBytes(vote.BlockParts)) @@ -144,7 +162,7 @@ func (voteSet *VoteSet) AddFromCommits(commits *VoteSet) { continue } if commit.Round < voteSet.round { - voteSet.addVote(uint(valIndex), commit) + voteSet.addByIndex(uint(valIndex), commit) } } } diff --git a/consensus/vote_set_test.go b/consensus/vote_set_test.go index 212fcf665..8cc919f31 100644 --- a/consensus/vote_set_test.go +++ b/consensus/vote_set_test.go @@ -51,7 +51,7 @@ func withBlockParts(vote *types.Vote, blockParts types.PartSetHeader) *types.Vot func signAddVote(privVal *sm.PrivValidator, vote *types.Vote, voteSet *VoteSet) (bool, error) { privVal.SignVoteUnsafe(config.GetString("chain_id"), vote) - added, _, err := voteSet.Add(privVal.Address, vote) + added, _, err := voteSet.AddByAddress(privVal.Address, vote) return added, err } @@ -197,31 +197,31 @@ func TestBadVotes(t *testing.T) { vote := &types.Vote{Height: height, Round: round, Type: types.VoteTypePrevote, BlockHash: nil} added, err := signAddVote(privValidators[0], vote, voteSet) if !added || err != nil { - t.Errorf("Expected Add() to succeed") + t.Errorf("Expected VoteSet.Add to succeed") } // val0 votes again for some block. added, err = signAddVote(privValidators[0], withBlockHash(vote, RandBytes(32)), voteSet) if added || err == nil { - t.Errorf("Expected Add() to fail, dupeout.") + t.Errorf("Expected VoteSet.Add to fail, dupeout.") } // val1 votes on another height added, err = signAddVote(privValidators[1], withHeight(vote, height+1), voteSet) if added { - t.Errorf("Expected Add() to fail, wrong height") + t.Errorf("Expected VoteSet.Add to fail, wrong height") } // val2 votes on another round added, err = signAddVote(privValidators[2], withRound(vote, round+1), voteSet) if added { - t.Errorf("Expected Add() to fail, wrong round") + t.Errorf("Expected VoteSet.Add to fail, wrong round") } // val3 votes of another type. added, err = signAddVote(privValidators[3], withType(vote, types.VoteTypePrecommit), voteSet) if added { - t.Errorf("Expected Add() to fail, wrong type") + t.Errorf("Expected VoteSet.Add to fail, wrong type") } } @@ -243,35 +243,35 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) { vote = &types.Vote{Height: height - 1, Round: round, Type: types.VoteTypeCommit, BlockHash: nil} added, _ := signAddVote(privValidators[6], vote, voteSet) if added { - t.Errorf("Expected Add() to fail, wrong height.") + t.Errorf("Expected VoteSet.Add to fail, wrong height.") } // Attempt to add a commit from val6 at a later round vote = &types.Vote{Height: height, Round: round + 1, Type: types.VoteTypeCommit, BlockHash: nil} added, _ = signAddVote(privValidators[6], vote, voteSet) if added { - t.Errorf("Expected Add() to fail, cannot add future round vote.") + t.Errorf("Expected VoteSet.Add to fail, cannot add future round vote.") } // Attempt to add a commit from val6 for currrent height/round. vote = &types.Vote{Height: height, Round: round, Type: types.VoteTypeCommit, BlockHash: nil} added, err := signAddVote(privValidators[6], vote, voteSet) if added || err == nil { - t.Errorf("Expected Add() to fail, only prior round commits can be added.") + t.Errorf("Expected VoteSet.Add to fail, only prior round commits can be added.") } // Add commit from val6 at a previous round vote = &types.Vote{Height: height, Round: round - 1, Type: types.VoteTypeCommit, BlockHash: nil} added, err = signAddVote(privValidators[6], vote, voteSet) if !added || err != nil { - t.Errorf("Expected Add() to succeed, commit for prior rounds are relevant.") + t.Errorf("Expected VoteSet.Add to succeed, commit for prior rounds are relevant.") } // Also add commit from val7 for previous round. vote = &types.Vote{Height: height, Round: round - 2, Type: types.VoteTypeCommit, BlockHash: nil} added, err = signAddVote(privValidators[7], vote, voteSet) if !added || err != nil { - t.Errorf("Expected Add() to succeed. err: %v", err) + t.Errorf("Expected VoteSet.Add to succeed. err: %v", err) } // We should have 2/3 majority