|
|
@ -35,14 +35,16 @@ type ConsensusReactor struct { |
|
|
|
stopped uint32 |
|
|
|
quit chan struct{} |
|
|
|
|
|
|
|
conS *ConsensusState |
|
|
|
blockStore *BlockStore |
|
|
|
conS *ConsensusState |
|
|
|
} |
|
|
|
|
|
|
|
func NewConsensusReactor(blockStore *BlockStore, mempoolReactor *mempool.MempoolReactor, state *state.State) *ConsensusReactor { |
|
|
|
conS := NewConsensusState(state, blockStore, mempoolReactor) |
|
|
|
conR := &ConsensusReactor{ |
|
|
|
quit: make(chan struct{}), |
|
|
|
conS: conS, |
|
|
|
blockStore: blockStore, |
|
|
|
quit: make(chan struct{}), |
|
|
|
conS: conS, |
|
|
|
} |
|
|
|
return conR |
|
|
|
} |
|
|
@ -263,13 +265,13 @@ OUTER_LOOP: |
|
|
|
// won't necessarily match, but that's OK.
|
|
|
|
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockParts) { |
|
|
|
log.Debug("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts) |
|
|
|
if index, ok := rs.ProposalBlockParts.BitArray().Sub( |
|
|
|
prs.ProposalBlockBitArray).PickRandom(); ok { |
|
|
|
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockBitArray.Copy()).PickRandom(); ok { |
|
|
|
part := rs.ProposalBlockParts.GetPart(index) |
|
|
|
msg := &PartMessage{ |
|
|
|
Height: rs.Height, |
|
|
|
Round: rs.Round, |
|
|
|
Type: partTypeProposalBlock, |
|
|
|
Part: rs.ProposalBlockParts.GetPart(index), |
|
|
|
Part: part, |
|
|
|
} |
|
|
|
peer.Send(DataCh, msg) |
|
|
|
ps.SetHasProposalBlockPart(rs.Height, rs.Round, index) |
|
|
@ -277,9 +279,46 @@ OUTER_LOOP: |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// If the peer is on a previous height, help catch up.
|
|
|
|
if rs.Height > prs.Height { |
|
|
|
log.Debug("Data catchup", "height", rs.Height, "peerHeight", prs.Height) |
|
|
|
if index, ok := prs.ProposalBlockBitArray.Not().PickRandom(); ok { |
|
|
|
// Ensure that the peer's PartSetHeaeder is correct
|
|
|
|
blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) |
|
|
|
if !blockMeta.Parts.Equals(prs.ProposalBlockParts) { |
|
|
|
log.Debug("Peer ProposalBlockParts mismatch, sleeping", |
|
|
|
"peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts) |
|
|
|
time.Sleep(peerGossipSleepDuration) |
|
|
|
continue OUTER_LOOP |
|
|
|
} |
|
|
|
// Load the part
|
|
|
|
part := conR.blockStore.LoadBlockPart(prs.Height, index) |
|
|
|
if part == nil { |
|
|
|
log.Warn("Could not load part", "index", index, |
|
|
|
"peerHeight", prs.Height, "blockParts", blockMeta.Parts, "peerBlockParts", prs.ProposalBlockParts) |
|
|
|
time.Sleep(peerGossipSleepDuration) |
|
|
|
continue OUTER_LOOP |
|
|
|
} |
|
|
|
// Send the part
|
|
|
|
msg := &PartMessage{ |
|
|
|
Height: prs.Height, |
|
|
|
Round: prs.Round, |
|
|
|
Type: partTypeProposalBlock, |
|
|
|
Part: part, |
|
|
|
} |
|
|
|
peer.Send(DataCh, msg) |
|
|
|
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) |
|
|
|
continue OUTER_LOOP |
|
|
|
} else { |
|
|
|
log.Debug("No parts to send in catch-up, sleeping") |
|
|
|
time.Sleep(peerGossipSleepDuration) |
|
|
|
continue OUTER_LOOP |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// If height and round doesn't match, sleep.
|
|
|
|
if rs.Height != prs.Height || rs.Round != prs.Round { |
|
|
|
log.Debug("Height or Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) |
|
|
|
log.Debug("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) |
|
|
|
time.Sleep(peerGossipSleepDuration) |
|
|
|
continue OUTER_LOOP |
|
|
|
} |
|
|
@ -294,8 +333,7 @@ OUTER_LOOP: |
|
|
|
|
|
|
|
// Send proposal POL parts?
|
|
|
|
if rs.ProposalPOLParts.HasHeader(prs.ProposalPOLParts) { |
|
|
|
if index, ok := rs.ProposalPOLParts.BitArray().Sub( |
|
|
|
prs.ProposalPOLBitArray).PickRandom(); ok { |
|
|
|
if index, ok := rs.ProposalPOLParts.BitArray().Sub(prs.ProposalPOLBitArray.Copy()).PickRandom(); ok { |
|
|
|
msg := &PartMessage{ |
|
|
|
Height: rs.Height, |
|
|
|
Round: rs.Round, |
|
|
@ -327,12 +365,7 @@ OUTER_LOOP: |
|
|
|
|
|
|
|
trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) { |
|
|
|
// TODO: give priority to our vote.
|
|
|
|
// peerVoteSet BitArray is being accessed concurrently with
|
|
|
|
// writes from Receive() routines. We must lock like so here:
|
|
|
|
ps.mtx.Lock() |
|
|
|
index, ok := voteSet.BitArray().Sub(peerVoteSet).PickRandom() |
|
|
|
ps.mtx.Unlock() |
|
|
|
if ok { |
|
|
|
if index, ok := voteSet.BitArray().Sub(peerVoteSet.Copy()).PickRandom(); ok { |
|
|
|
vote := voteSet.GetByIndex(index) |
|
|
|
// NOTE: vote may be a commit.
|
|
|
|
msg := &VoteMessage{index, vote} |
|
|
@ -392,8 +425,7 @@ OUTER_LOOP: |
|
|
|
} |
|
|
|
|
|
|
|
// If peer is lagging by more than 1, load and send Validation and send Commits.
|
|
|
|
if prs.Height != 0 && !prs.HasAllValidationCommits && |
|
|
|
rs.Height >= prs.Height+2 { |
|
|
|
if prs.Height != 0 && !prs.HasAllCatchupCommits && rs.Height >= prs.Height+2 { |
|
|
|
|
|
|
|
// Load the block header and validation for prs.Height+1,
|
|
|
|
// which contains commit signatures for prs.Height.
|
|
|
@ -404,8 +436,7 @@ OUTER_LOOP: |
|
|
|
// Initialize Commits if needed
|
|
|
|
ps.EnsureVoteBitArrays(prs.Height, size) |
|
|
|
|
|
|
|
index, ok := validation.BitArray().Sub(prs.Commits).PickRandom() |
|
|
|
if ok { |
|
|
|
if index, ok := validation.BitArray().Sub(prs.Commits.Copy()).PickRandom(); ok { |
|
|
|
commit := validation.Commits[index] |
|
|
|
log.Debug("Picked commit to send", "index", index, "commit", commit) |
|
|
|
// Reconstruct vote.
|
|
|
@ -423,7 +454,7 @@ OUTER_LOOP: |
|
|
|
continue OUTER_LOOP |
|
|
|
} else { |
|
|
|
log.Debug("No commits to send", "ours", validation.BitArray(), "theirs", prs.Commits) |
|
|
|
ps.SetHasAllValidationCommits(prs.Height) |
|
|
|
ps.SetHasAllCatchupCommits(prs.Height) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -437,20 +468,20 @@ OUTER_LOOP: |
|
|
|
|
|
|
|
// Read only when returned by PeerState.GetRoundState().
|
|
|
|
type PeerRoundState struct { |
|
|
|
Height uint // Height peer is at
|
|
|
|
Round uint // Round peer is at
|
|
|
|
Step RoundStep // Step peer is at
|
|
|
|
StartTime time.Time // Estimated start of round 0 at this height
|
|
|
|
Proposal bool // True if peer has proposal for this round
|
|
|
|
ProposalBlockParts PartSetHeader //
|
|
|
|
ProposalBlockBitArray BitArray // True bit -> has part
|
|
|
|
ProposalPOLParts PartSetHeader //
|
|
|
|
ProposalPOLBitArray BitArray // True bit -> has part
|
|
|
|
Prevotes BitArray // All votes peer has for this round
|
|
|
|
Precommits BitArray // All precommits peer has for this round
|
|
|
|
Commits BitArray // All commits peer has for this height
|
|
|
|
HasAllValidationCommits bool // Used for catch-up
|
|
|
|
LastCommits BitArray // All commits peer has for last height
|
|
|
|
Height uint // Height peer is at
|
|
|
|
Round uint // Round peer is at
|
|
|
|
Step RoundStep // Step peer is at
|
|
|
|
StartTime time.Time // Estimated start of round 0 at this height
|
|
|
|
Proposal bool // True if peer has proposal for this round
|
|
|
|
ProposalBlockParts PartSetHeader //
|
|
|
|
ProposalBlockBitArray BitArray // True bit -> has part
|
|
|
|
ProposalPOLParts PartSetHeader //
|
|
|
|
ProposalPOLBitArray BitArray // True bit -> has part
|
|
|
|
Prevotes BitArray // All votes peer has for this round
|
|
|
|
Precommits BitArray // All precommits peer has for this round
|
|
|
|
Commits BitArray // All commits peer has for this height
|
|
|
|
LastCommits BitArray // All commits peer has for last height
|
|
|
|
HasAllCatchupCommits bool // Used for catch-up
|
|
|
|
} |
|
|
|
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
@ -474,6 +505,7 @@ func NewPeerState(peer *p2p.Peer) *PeerState { |
|
|
|
func (ps *PeerState) GetRoundState() *PeerRoundState { |
|
|
|
ps.mtx.Lock() |
|
|
|
defer ps.mtx.Unlock() |
|
|
|
|
|
|
|
prs := ps.PeerRoundState // copy
|
|
|
|
return &prs |
|
|
|
} |
|
|
@ -540,6 +572,7 @@ func (ps *PeerState) EnsureVoteBitArrays(height uint, numValidators uint) { |
|
|
|
func (ps *PeerState) SetHasVote(vote *Vote, index uint) { |
|
|
|
ps.mtx.Lock() |
|
|
|
defer ps.mtx.Unlock() |
|
|
|
|
|
|
|
ps.setHasVote(vote.Height, vote.Round, vote.Type, index) |
|
|
|
} |
|
|
|
|
|
|
@ -571,11 +604,12 @@ func (ps *PeerState) setHasVote(height uint, round uint, type_ byte, index uint) |
|
|
|
|
|
|
|
// When catching up, this helps keep track of whether
|
|
|
|
// we should send more commit votes from the block (validation) store
|
|
|
|
func (ps *PeerState) SetHasAllValidationCommits(height uint) { |
|
|
|
func (ps *PeerState) SetHasAllCatchupCommits(height uint) { |
|
|
|
ps.mtx.Lock() |
|
|
|
defer ps.mtx.Unlock() |
|
|
|
|
|
|
|
if ps.Height == height { |
|
|
|
ps.HasAllValidationCommits = true |
|
|
|
ps.HasAllCatchupCommits = true |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -612,7 +646,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun |
|
|
|
} |
|
|
|
// We'll update the BitArray capacity later.
|
|
|
|
ps.Commits = BitArray{} |
|
|
|
ps.HasAllValidationCommits = false |
|
|
|
ps.HasAllCatchupCommits = false |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|