Browse Source

vote ranks are tracked and used for efficiency.

pull/9/head
Jae Kwon 10 years ago
parent
commit
6a8bb68888
3 changed files with 98 additions and 45 deletions
  1. +69
    -34
      consensus/consensus.go
  2. +5
    -4
      consensus/state.go
  3. +24
    -7
      consensus/vote.go

+ 69
- 34
consensus/consensus.go View File

@ -222,7 +222,7 @@ OUTER_LOOP:
if !ok { if !ok {
break OUTER_LOOP // Client has stopped break OUTER_LOOP // Client has stopped
} }
msg_ := decodeMessage(inMsg.Bytes)
_, msg_ := decodeMessage(inMsg.Bytes)
log.Info("gossipProposalRoutine received %v", msg_) log.Info("gossipProposalRoutine received %v", msg_)
switch msg_.(type) { switch msg_.(type) {
@ -292,7 +292,7 @@ OUTER_LOOP:
if !ok { if !ok {
break OUTER_LOOP // Client has stopped break OUTER_LOOP // Client has stopped
} }
msg_ := decodeMessage(inMsg.Bytes)
_, msg_ := decodeMessage(inMsg.Bytes)
log.Info("knownPartsRoutine received %v", msg_) log.Info("knownPartsRoutine received %v", msg_)
msg, ok := msg_.(*KnownBlockPartsMessage) msg, ok := msg_.(*KnownBlockPartsMessage)
@ -582,7 +582,7 @@ OUTER_LOOP:
if !ok { if !ok {
break // Client has stopped break // Client has stopped
} }
msg_ := decodeMessage(inMsg.Bytes)
type_, msg_ := decodeMessage(inMsg.Bytes)
log.Info("gossipVoteRoutine received %v", msg_) log.Info("gossipVoteRoutine received %v", msg_)
switch msg_.(type) { switch msg_.(type) {
@ -593,7 +593,16 @@ OUTER_LOOP:
continue OUTER_LOOP continue OUTER_LOOP
} }
added, err := rs.AddVote(vote)
added, rank, err := rs.AddVote(vote, inMsg.MConn.Peer.Key)
// Send peer VoteRankMessage if needed
if type_ == msgTypeVoteAskRank {
msg := &VoteRankMessage{
ValidatorId: vote.SignerId,
Rank: rank,
}
inMsg.MConn.Peer.TrySend(VoteCh, msg)
}
// Process vote
if !added { if !added {
log.Info("Error adding vote %v", err) log.Info("Error adding vote %v", err)
} }
@ -615,12 +624,30 @@ OUTER_LOOP:
// Gossip vote. // Gossip vote.
for _, peer := range cm.sw.Peers().List() { for _, peer := range cm.sw.Peers().List() {
peerState := cm.getPeerState(peer) peerState := cm.getPeerState(peer)
if peerState.WantsVote(vote) {
msg := p2p.TypedMessage{msgTypeVote, vote}
peer.TrySend(VoteCh, msg)
wantsVote, unsolicited := peerState.WantsVote(vote)
if wantsVote {
if unsolicited {
// If we're sending an unsolicited vote,
// ask for the rank so we know whether it's good.
msg := p2p.TypedMessage{msgTypeVoteAskRank, vote}
peer.TrySend(VoteCh, msg)
} else {
msg := p2p.TypedMessage{msgTypeVote, vote}
peer.TrySend(VoteCh, msg)
}
} }
} }
case *VoteRankMessage:
msg := msg_.(*VoteRankMessage)
peerState := cm.getPeerState(inMsg.MConn.Peer)
if !peerState.IsConnected() {
// Peer disconnected before we were able to process.
continue OUTER_LOOP
}
peerState.ApplyVoteRankMessage(msg)
default: default:
// Ignore unknown message // Ignore unknown message
// cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage) // cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
@ -737,7 +764,6 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() {
if !commitTime.IsZero() { if !commitTime.IsZero() {
// We already set up ConsensusState for the next height // We already set up ConsensusState for the next height
// (it happens in the call to cm.commitProposal). // (it happens in the call to cm.commitProposal).
// XXX: call cm.cs.SetupHeight()
} else { } else {
// Round is over. This is a special case. // Round is over. This is a special case.
// Prepare a new RoundState for the next state. // Prepare a new RoundState for the next state.
@ -824,39 +850,44 @@ func (ps *PeerState) WantsBlockPart(part *BlockPart) bool {
return false return false
} }
func (ps *PeerState) WantsVote(vote *Vote) bool {
func (ps *PeerState) WantsVote(vote *Vote) (wants bool, unsolicited bool) {
if ps == nil { if ps == nil {
return false
return false, false
} }
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
if !ps.connected { if !ps.connected {
return false
}
// Only wants the vote if voteRank is low.
if ps.voteRanks[vote.SignerId] > voteRankCutoff {
// Sometimes, send unsolicited votes to see if peer wants it.
if rand.Float32() < unsolicitedVoteRate {
// Continue on...
} else {
// Rank too high. Do not send vote.
return false
}
return false, false
} }
// Only wants the vote if peer's current height and round matches. // Only wants the vote if peer's current height and round matches.
if ps.height == vote.Height { if ps.height == vote.Height {
round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime) round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime)
if round == vote.Round { if round == vote.Round {
if vote.Type == VoteTypeBare && elapsedRatio > roundDeadlineBare { if vote.Type == VoteTypeBare && elapsedRatio > roundDeadlineBare {
return false
return false, false
} }
if vote.Type == VoteTypePrecommit && elapsedRatio > roundDeadlinePrecommit { if vote.Type == VoteTypePrecommit && elapsedRatio > roundDeadlinePrecommit {
return false
return false, false
} else {
// continue on ...
} }
return true
} else {
return false, false
} }
} else {
return false, false
} }
return false
// Only wants the vote if voteRank is low.
if ps.voteRanks[vote.SignerId] > voteRankCutoff {
// Sometimes, send unsolicited votes to see if peer wants it.
if rand.Float32() < unsolicitedVoteRate {
return true, true
} else {
// Rank too high. Do not send vote.
return false, false
}
}
return true, false
} }
func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) error { func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) error {
@ -960,25 +991,30 @@ const (
msgTypeBlockPart = byte(0x10) msgTypeBlockPart = byte(0x10)
msgTypeKnownBlockParts = byte(0x11) msgTypeKnownBlockParts = byte(0x11)
msgTypeVote = byte(0x20) msgTypeVote = byte(0x20)
msgTypeVoteRank = byte(0x21)
msgTypeVoteAskRank = byte(0x21)
msgTypeVoteRank = byte(0x22)
) )
// TODO: check for unnecessary extra bytes at the end. // TODO: check for unnecessary extra bytes at the end.
func decodeMessage(bz []byte) (msg interface{}) {
func decodeMessage(bz []byte) (msgType byte, msg interface{}) {
n, err := new(int64), new(error) n, err := new(int64), new(error)
// log.Debug("decoding msg bytes: %X", bz) // log.Debug("decoding msg bytes: %X", bz)
switch bz[0] {
msgType = bz[0]
switch msgType {
case msgTypeBlockPart: case msgTypeBlockPart:
return readBlockPartMessage(bytes.NewReader(bz[1:]), n, err)
msg = readBlockPartMessage(bytes.NewReader(bz[1:]), n, err)
case msgTypeKnownBlockParts: case msgTypeKnownBlockParts:
return readKnownBlockPartsMessage(bytes.NewReader(bz[1:]), n, err)
msg = readKnownBlockPartsMessage(bytes.NewReader(bz[1:]), n, err)
case msgTypeVote: case msgTypeVote:
return ReadVote(bytes.NewReader(bz[1:]), n, err)
msg = ReadVote(bytes.NewReader(bz[1:]), n, err)
case msgTypeVoteAskRank:
msg = ReadVote(bytes.NewReader(bz[1:]), n, err)
case msgTypeVoteRank: case msgTypeVoteRank:
return readVoteRankMessage(bytes.NewReader(bz[1:]), n, err)
msg = readVoteRankMessage(bytes.NewReader(bz[1:]), n, err)
default: default:
return nil
msg = nil
} }
return
} }
//------------------------------------- //-------------------------------------
@ -1034,7 +1070,6 @@ func (m *KnownBlockPartsMessage) String() string {
//------------------------------------- //-------------------------------------
// XXX use this.
type VoteRankMessage struct { type VoteRankMessage struct {
ValidatorId uint64 ValidatorId uint64
Rank uint8 Rank uint8


+ 5
- 4
consensus/state.go View File

@ -168,14 +168,15 @@ func NewRoundState(height uint32, round uint16, startTime time.Time,
return rs return rs
} }
func (rs *RoundState) AddVote(vote *Vote) (bool, error) {
// "source" is typically the Peer.Key of the peer that gave us this vote.
func (rs *RoundState) AddVote(vote *Vote, source string) (added bool, rank uint8, err error) {
switch vote.Type { switch vote.Type {
case VoteTypeBare: case VoteTypeBare:
return rs.RoundBareVotes.AddVote(vote)
return rs.RoundBareVotes.AddVote(vote, source)
case VoteTypePrecommit: case VoteTypePrecommit:
return rs.RoundPrecommits.AddVote(vote)
return rs.RoundPrecommits.AddVote(vote, source)
case VoteTypeCommit: case VoteTypeCommit:
return rs.Commits.AddVote(vote)
return rs.Commits.AddVote(vote, source)
default: default:
panic("Unknown vote type") panic("Unknown vote type")
} }


+ 24
- 7
consensus/vote.go View File

@ -70,6 +70,7 @@ type VoteSet struct {
type_ byte type_ byte
validators *ValidatorSet validators *ValidatorSet
votes map[uint64]*Vote votes map[uint64]*Vote
votesSources map[uint64][]string
votesByHash map[string]uint64 votesByHash map[string]uint64
totalVotes uint64 totalVotes uint64
totalVotingPower uint64 totalVotingPower uint64
@ -92,6 +93,7 @@ func NewVoteSet(height uint32, round uint16, type_ byte, validators *ValidatorSe
type_: type_, type_: type_,
validators: validators, validators: validators,
votes: make(map[uint64]*Vote, validators.Size()), votes: make(map[uint64]*Vote, validators.Size()),
votesSources: make(map[uint64][]string, validators.Size()),
votesByHash: make(map[string]uint64), votesByHash: make(map[string]uint64),
totalVotes: 0, totalVotes: 0,
totalVotingPower: totalVotingPower, totalVotingPower: totalVotingPower,
@ -100,7 +102,7 @@ func NewVoteSet(height uint32, round uint16, type_ byte, validators *ValidatorSe
// True if added, false if not. // True if added, false if not.
// Returns ErrVote[UnexpectedPhase|InvalidAccount|InvalidSignature|InvalidHash|ConflictingSignature] // Returns ErrVote[UnexpectedPhase|InvalidAccount|InvalidSignature|InvalidHash|ConflictingSignature]
func (vs *VoteSet) AddVote(vote *Vote) (bool, error) {
func (vs *VoteSet) AddVote(vote *Vote, source string) (bool, uint8, error) {
vs.mtx.Lock() vs.mtx.Lock()
defer vs.mtx.Unlock() defer vs.mtx.Unlock()
@ -108,25 +110,40 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) {
if vote.Height != vs.height || if vote.Height != vs.height ||
(vote.Type != VoteTypeCommit && vote.Round != vs.round) || (vote.Type != VoteTypeCommit && vote.Round != vs.round) ||
vote.Type != vs.type_ { vote.Type != vs.type_ {
return false, ErrVoteUnexpectedPhase
return false, 0, ErrVoteUnexpectedPhase
} }
val := vs.validators.Get(vote.SignerId) val := vs.validators.Get(vote.SignerId)
// Ensure that signer is a validator. // Ensure that signer is a validator.
if val == nil { if val == nil {
return false, ErrVoteInvalidAccount
return false, 0, ErrVoteInvalidAccount
} }
// Check signature. // Check signature.
if !val.Verify([]byte(vote.GetDocument()), vote.Signature) { if !val.Verify([]byte(vote.GetDocument()), vote.Signature) {
// Bad signature. // Bad signature.
return false, ErrVoteInvalidSignature
return false, 0, ErrVoteInvalidSignature
}
// Get rank of vote & append provider key
var priorSources = vs.votesSources[vote.SignerId]
var rank = uint8(len(priorSources) + 1)
var alreadyProvidedByPeer = false
for i, otherPeer := range priorSources {
if otherPeer == source {
alreadyProvidedByPeer = true
rank = uint8(i + 1)
}
}
if !alreadyProvidedByPeer {
if len(priorSources) < voteRankCutoff {
vs.votesSources[vote.SignerId] = append(priorSources, source)
}
} }
// If vote already exists, return false. // If vote already exists, return false.
if existingVote, ok := vs.votes[vote.SignerId]; ok { if existingVote, ok := vs.votes[vote.SignerId]; ok {
if bytes.Equal(existingVote.Hash, vote.Hash) { if bytes.Equal(existingVote.Hash, vote.Hash) {
return false, nil
return false, rank, nil
} else { } else {
return false, ErrVoteConflictingSignature
return false, rank, ErrVoteConflictingSignature
} }
} }
vs.votes[vote.SignerId] = vote vs.votes[vote.SignerId] = vote
@ -141,7 +158,7 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) {
(totalHashVotes-val.VotingPower) <= vs.totalVotingPower*2/3 { (totalHashVotes-val.VotingPower) <= vs.totalVotingPower*2/3 {
vs.twoThirdsCommitTime = time.Now() vs.twoThirdsCommitTime = time.Now()
} }
return true, nil
return true, rank, nil
} }
// Returns either a blockhash (or nil) that received +2/3 majority. // Returns either a blockhash (or nil) that received +2/3 majority.


Loading…
Cancel
Save