From 6a8bb68888d9084a22546ab2424f258914861f4b Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 8 Sep 2014 16:18:50 -0700 Subject: [PATCH] vote ranks are tracked and used for efficiency. --- consensus/consensus.go | 103 +++++++++++++++++++++++++++-------------- consensus/state.go | 9 ++-- consensus/vote.go | 31 ++++++++++--- 3 files changed, 98 insertions(+), 45 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index f578e73be..0aa0c245e 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -222,7 +222,7 @@ OUTER_LOOP: if !ok { break OUTER_LOOP // Client has stopped } - msg_ := decodeMessage(inMsg.Bytes) + _, msg_ := decodeMessage(inMsg.Bytes) log.Info("gossipProposalRoutine received %v", msg_) switch msg_.(type) { @@ -292,7 +292,7 @@ OUTER_LOOP: if !ok { break OUTER_LOOP // Client has stopped } - msg_ := decodeMessage(inMsg.Bytes) + _, msg_ := decodeMessage(inMsg.Bytes) log.Info("knownPartsRoutine received %v", msg_) msg, ok := msg_.(*KnownBlockPartsMessage) @@ -582,7 +582,7 @@ OUTER_LOOP: if !ok { break // Client has stopped } - msg_ := decodeMessage(inMsg.Bytes) + type_, msg_ := decodeMessage(inMsg.Bytes) log.Info("gossipVoteRoutine received %v", msg_) switch msg_.(type) { @@ -593,7 +593,16 @@ OUTER_LOOP: continue OUTER_LOOP } - added, err := rs.AddVote(vote) + added, rank, err := rs.AddVote(vote, inMsg.MConn.Peer.Key) + // Send peer VoteRankMessage if needed + if type_ == msgTypeVoteAskRank { + msg := &VoteRankMessage{ + ValidatorId: vote.SignerId, + Rank: rank, + } + inMsg.MConn.Peer.TrySend(VoteCh, msg) + } + // Process vote if !added { log.Info("Error adding vote %v", err) } @@ -615,12 +624,30 @@ OUTER_LOOP: // Gossip vote. for _, peer := range cm.sw.Peers().List() { peerState := cm.getPeerState(peer) - if peerState.WantsVote(vote) { - msg := p2p.TypedMessage{msgTypeVote, vote} - peer.TrySend(VoteCh, msg) + wantsVote, unsolicited := peerState.WantsVote(vote) + if wantsVote { + if unsolicited { + // If we're sending an unsolicited vote, + // ask for the rank so we know whether it's good. + msg := p2p.TypedMessage{msgTypeVoteAskRank, vote} + peer.TrySend(VoteCh, msg) + } else { + msg := p2p.TypedMessage{msgTypeVote, vote} + peer.TrySend(VoteCh, msg) + } } } + case *VoteRankMessage: + msg := msg_.(*VoteRankMessage) + + peerState := cm.getPeerState(inMsg.MConn.Peer) + if !peerState.IsConnected() { + // Peer disconnected before we were able to process. + continue OUTER_LOOP + } + peerState.ApplyVoteRankMessage(msg) + default: // Ignore unknown message // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) @@ -737,7 +764,6 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { if !commitTime.IsZero() { // We already set up ConsensusState for the next height // (it happens in the call to cm.commitProposal). - // XXX: call cm.cs.SetupHeight() } else { // Round is over. This is a special case. // Prepare a new RoundState for the next state. @@ -824,39 +850,44 @@ func (ps *PeerState) WantsBlockPart(part *BlockPart) bool { return false } -func (ps *PeerState) WantsVote(vote *Vote) bool { +func (ps *PeerState) WantsVote(vote *Vote) (wants bool, unsolicited bool) { if ps == nil { - return false + return false, false } ps.mtx.Lock() defer ps.mtx.Unlock() if !ps.connected { - return false - } - // Only wants the vote if voteRank is low. - if ps.voteRanks[vote.SignerId] > voteRankCutoff { - // Sometimes, send unsolicited votes to see if peer wants it. - if rand.Float32() < unsolicitedVoteRate { - // Continue on... - } else { - // Rank too high. Do not send vote. - return false - } + return false, false } // Only wants the vote if peer's current height and round matches. if ps.height == vote.Height { round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime) if round == vote.Round { if vote.Type == VoteTypeBare && elapsedRatio > roundDeadlineBare { - return false + return false, false } if vote.Type == VoteTypePrecommit && elapsedRatio > roundDeadlinePrecommit { - return false + return false, false + } else { + // continue on ... } - return true + } else { + return false, false } + } else { + return false, false } - return false + // Only wants the vote if voteRank is low. + if ps.voteRanks[vote.SignerId] > voteRankCutoff { + // Sometimes, send unsolicited votes to see if peer wants it. + if rand.Float32() < unsolicitedVoteRate { + return true, true + } else { + // Rank too high. Do not send vote. + return false, false + } + } + return true, false } func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) error { @@ -960,25 +991,30 @@ const ( msgTypeBlockPart = byte(0x10) msgTypeKnownBlockParts = byte(0x11) msgTypeVote = byte(0x20) - msgTypeVoteRank = byte(0x21) + msgTypeVoteAskRank = byte(0x21) + msgTypeVoteRank = byte(0x22) ) // TODO: check for unnecessary extra bytes at the end. -func decodeMessage(bz []byte) (msg interface{}) { +func decodeMessage(bz []byte) (msgType byte, msg interface{}) { n, err := new(int64), new(error) // log.Debug("decoding msg bytes: %X", bz) - switch bz[0] { + msgType = bz[0] + switch msgType { case msgTypeBlockPart: - return readBlockPartMessage(bytes.NewReader(bz[1:]), n, err) + msg = readBlockPartMessage(bytes.NewReader(bz[1:]), n, err) case msgTypeKnownBlockParts: - return readKnownBlockPartsMessage(bytes.NewReader(bz[1:]), n, err) + msg = readKnownBlockPartsMessage(bytes.NewReader(bz[1:]), n, err) case msgTypeVote: - return ReadVote(bytes.NewReader(bz[1:]), n, err) + msg = ReadVote(bytes.NewReader(bz[1:]), n, err) + case msgTypeVoteAskRank: + msg = ReadVote(bytes.NewReader(bz[1:]), n, err) case msgTypeVoteRank: - return readVoteRankMessage(bytes.NewReader(bz[1:]), n, err) + msg = readVoteRankMessage(bytes.NewReader(bz[1:]), n, err) default: - return nil + msg = nil } + return } //------------------------------------- @@ -1034,7 +1070,6 @@ func (m *KnownBlockPartsMessage) String() string { //------------------------------------- -// XXX use this. type VoteRankMessage struct { ValidatorId uint64 Rank uint8 diff --git a/consensus/state.go b/consensus/state.go index 83c31cc47..fbc80d39f 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -168,14 +168,15 @@ func NewRoundState(height uint32, round uint16, startTime time.Time, return rs } -func (rs *RoundState) AddVote(vote *Vote) (bool, error) { +// "source" is typically the Peer.Key of the peer that gave us this vote. +func (rs *RoundState) AddVote(vote *Vote, source string) (added bool, rank uint8, err error) { switch vote.Type { case VoteTypeBare: - return rs.RoundBareVotes.AddVote(vote) + return rs.RoundBareVotes.AddVote(vote, source) case VoteTypePrecommit: - return rs.RoundPrecommits.AddVote(vote) + return rs.RoundPrecommits.AddVote(vote, source) case VoteTypeCommit: - return rs.Commits.AddVote(vote) + return rs.Commits.AddVote(vote, source) default: panic("Unknown vote type") } diff --git a/consensus/vote.go b/consensus/vote.go index db0add109..774134e1b 100644 --- a/consensus/vote.go +++ b/consensus/vote.go @@ -70,6 +70,7 @@ type VoteSet struct { type_ byte validators *ValidatorSet votes map[uint64]*Vote + votesSources map[uint64][]string votesByHash map[string]uint64 totalVotes uint64 totalVotingPower uint64 @@ -92,6 +93,7 @@ func NewVoteSet(height uint32, round uint16, type_ byte, validators *ValidatorSe type_: type_, validators: validators, votes: make(map[uint64]*Vote, validators.Size()), + votesSources: make(map[uint64][]string, validators.Size()), votesByHash: make(map[string]uint64), totalVotes: 0, totalVotingPower: totalVotingPower, @@ -100,7 +102,7 @@ func NewVoteSet(height uint32, round uint16, type_ byte, validators *ValidatorSe // True if added, false if not. // Returns ErrVote[UnexpectedPhase|InvalidAccount|InvalidSignature|InvalidHash|ConflictingSignature] -func (vs *VoteSet) AddVote(vote *Vote) (bool, error) { +func (vs *VoteSet) AddVote(vote *Vote, source string) (bool, uint8, error) { vs.mtx.Lock() defer vs.mtx.Unlock() @@ -108,25 +110,40 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) { if vote.Height != vs.height || (vote.Type != VoteTypeCommit && vote.Round != vs.round) || vote.Type != vs.type_ { - return false, ErrVoteUnexpectedPhase + return false, 0, ErrVoteUnexpectedPhase } val := vs.validators.Get(vote.SignerId) // Ensure that signer is a validator. if val == nil { - return false, ErrVoteInvalidAccount + return false, 0, ErrVoteInvalidAccount } // Check signature. if !val.Verify([]byte(vote.GetDocument()), vote.Signature) { // Bad signature. - return false, ErrVoteInvalidSignature + return false, 0, ErrVoteInvalidSignature + } + // Get rank of vote & append provider key + var priorSources = vs.votesSources[vote.SignerId] + var rank = uint8(len(priorSources) + 1) + var alreadyProvidedByPeer = false + for i, otherPeer := range priorSources { + if otherPeer == source { + alreadyProvidedByPeer = true + rank = uint8(i + 1) + } + } + if !alreadyProvidedByPeer { + if len(priorSources) < voteRankCutoff { + vs.votesSources[vote.SignerId] = append(priorSources, source) + } } // If vote already exists, return false. if existingVote, ok := vs.votes[vote.SignerId]; ok { if bytes.Equal(existingVote.Hash, vote.Hash) { - return false, nil + return false, rank, nil } else { - return false, ErrVoteConflictingSignature + return false, rank, ErrVoteConflictingSignature } } vs.votes[vote.SignerId] = vote @@ -141,7 +158,7 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) { (totalHashVotes-val.VotingPower) <= vs.totalVotingPower*2/3 { vs.twoThirdsCommitTime = time.Now() } - return true, nil + return true, rank, nil } // Returns either a blockhash (or nil) that received +2/3 majority.