Browse Source

Initial pass at bft_fix_2 completion

pull/314/head
Jae Kwon 8 years ago
committed by Ethan Buchman
parent
commit
b73a6905a1
5 changed files with 337 additions and 69 deletions
  1. +20
    -0
      consensus/height_vote_set.go
  2. +276
    -61
      consensus/reactor.go
  3. +6
    -0
      types/block.go
  4. +18
    -6
      types/vote.go
  5. +17
    -2
      types/vote_set.go

+ 20
- 0
consensus/height_vote_set.go View File

@ -103,6 +103,9 @@ func (hvs *HeightVoteSet) addRound(round int) {
func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerKey string) (added bool, err error) {
hvs.mtx.Lock()
defer hvs.mtx.Unlock()
if !types.IsVoteTypeValid(vote.Type) {
return
}
voteSet := hvs.getVoteSet(vote.Round, vote.Type)
if voteSet == nil {
if _, ok := hvs.peerCatchupRounds[peerKey]; !ok {
@ -196,3 +199,20 @@ func (hvs *HeightVoteSet) StringIndented(indent string) string {
indent, strings.Join(vsStrings, "\n"+indent+" "),
indent)
}
// If a peer claims that it has 2/3 majority for given blockKey, call this.
// NOTE: if there are too many peers, or too much peer churn,
// this can cause memory issues.
// TODO: implement ability to remove peers too
func (hvs *HeightVoteSet) SetPeerMaj23(round int, type_ byte, peerID string, blockID types.BlockID) {
hvs.mtx.Lock()
defer hvs.mtx.Unlock()
if !types.IsVoteTypeValid(type_) {
return
}
voteSet := hvs.getVoteSet(round, type_)
if voteSet == nil {
return
}
voteSet.SetPeerMaj23(peerID, blockID)
}

+ 276
- 61
consensus/reactor.go View File

@ -17,12 +17,14 @@ import (
)
const (
StateChannel = byte(0x20)
DataChannel = byte(0x21)
VoteChannel = byte(0x22)
peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.
StateChannel = byte(0x20)
DataChannel = byte(0x21)
VoteChannel = byte(0x22)
VoteSetBitsChannel = byte(0x23)
peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send.
peerQueryMaj23SleepDuration = 5 * time.Second // Time to sleep after each VoteSetMaj23Message sent
maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.
)
//-----------------------------------------------------------------------------
@ -101,6 +103,12 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
},
&p2p.ChannelDescriptor{
ID: VoteSetBitsChannel,
Priority: 1,
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
},
}
}
@ -114,9 +122,11 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
peerState := NewPeerState(peer)
peer.Data.Set(types.PeerStateKey, peerState)
// Begin gossip routines for this peer.
// Begin routines for this peer.
go conR.gossipDataRoutine(peer, peerState)
go conR.gossipVotesRoutine(peer, peerState)
go conR.queryMaj23Routine(peer, peerState)
go conR.replyMaj23Routine(peer, peerState)
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
@ -166,6 +176,15 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
ps.ApplyCommitStepMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height == msg.Height {
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID)
}
ps.ApplyVoteSetMaj23Message(msg)
default:
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
@ -209,6 +228,39 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
// don't punish (leave room for soft upgrades)
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
case VoteSetBitsChannel:
if conR.fastSync {
log.Warn("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
if height == msg.Height {
var ourVotes *BitArray
switch msg.Type {
case types.VoteTypePrevote:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case types.VoteTypePrecommit:
ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
log.Warn("Bad VoteSetBitsMessage field Type")
return
}
ps.ApplyVoteSetBitsMessage(msg, ourVotes)
} else {
ps.ApplyVoteSetBitsMessage(msg, nil)
}
default:
// don't punish (leave room for soft upgrades)
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
default:
log.Warn(Fmt("Unknown chId %X", chID))
}
@ -516,6 +568,131 @@ OUTER_LOOP:
}
}
func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) {
log := log.New("peer", peer)
OUTER_LOOP:
for {
// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
log.Notice(Fmt("Stopping queryMaj23Routine for %v.", peer))
return
}
// Maybe send Height/Round/Prevotes
{
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(DataChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: types.VoteTypePrevote,
BlockID: maj23,
}})
time.Sleep(peerQueryMaj23SleepDuration)
rs = conR.conS.GetRoundState()
prs = ps.GetRoundState()
}
}
}
// Maybe send Height/Round/Precommits
{
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(DataChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: types.VoteTypePrecommit,
BlockID: maj23,
}})
time.Sleep(peerQueryMaj23SleepDuration)
}
}
}
// Maybe send Height/Round/ProposalPOL
{
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(DataChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: types.VoteTypePrevote,
BlockID: maj23,
}})
time.Sleep(peerQueryMaj23SleepDuration)
}
}
}
// Little point sending LastCommitRound/LastCommit,
// These are fleeting and non-blocking.
// Maybe send Height/CatchupCommitRound/CatchupCommit.
{
rs := conR.conS.GetRoundState()
prs := ps.GetRoundState()
if prs.CatchupCommitRound != -1 {
commit := conR.blockStore.LoadBlockCommit(prs.Height)
peer.TrySend(DataChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round(),
Type: types.VoteTypePrecommit,
BlockID: commit.BlockID,
}})
time.Sleep(peerQueryMaj23SleepDuration)
}
}
continue OUTER_LOOP
}
}
func (conR *ConsensusReactor) replyMaj23Routine(peer *p2p.Peer, ps *PeerState) {
log := log.New("peer", peer)
OUTER_LOOP:
for {
// Manage disconnects from self or peer.
if !peer.IsRunning() || !conR.IsRunning() {
log.Notice(Fmt("Stopping replyMaj23Routine for %v.", peer))
return
}
rs := conR.conS.GetRoundState()
// Process a VoteSetMaj23Message
msg := <-ps.Maj23Queue
if rs.Height == msg.Height {
var ourVotes *BitArray
switch msg.Type {
case types.VoteTypePrevote:
ourVotes = rs.Votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
case types.VoteTypePrecommit:
ourVotes = rs.Votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID)
default:
log.Warn("Bad VoteSetBitsMessage field Type")
return
}
peer.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}})
}
continue OUTER_LOOP
}
}
//-----------------------------------------------------------------------------
// Read only when returned by PeerState.GetRoundState().
@ -549,6 +726,7 @@ type PeerState struct {
mtx sync.Mutex
PeerRoundState
Maj23Queue chan *VoteSetMaj23Message
}
func NewPeerState(peer *p2p.Peer) *PeerState {
@ -560,6 +738,7 @@ func NewPeerState(peer *p2p.Peer) *PeerState {
LastCommitRound: -1,
CatchupCommitRound: -1,
},
Maj23Queue: make(chan *VoteSetMaj23Message, 2),
}
}
@ -650,6 +829,10 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote
}
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
PanicSanity("Invalid vote type")
}
if ps.Height == height {
if ps.Round == round {
switch type_ {
@ -657,8 +840,6 @@ func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
return ps.Prevotes
case types.VoteTypePrecommit:
return ps.Precommits
default:
PanicSanity(Fmt("Unexpected vote type %X", type_))
}
}
if ps.CatchupCommitRound == round {
@ -667,8 +848,14 @@ func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
return nil
case types.VoteTypePrecommit:
return ps.CatchupCommit
default:
PanicSanity(Fmt("Unexpected vote type %X", type_))
}
}
if ps.ProposalPOLRound == round {
switch type_ {
case types.VoteTypePrevote:
return ps.ProposalPOL
case types.VoteTypePrecommit:
return nil
}
}
return nil
@ -680,8 +867,6 @@ func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray {
return nil
case types.VoteTypePrecommit:
return ps.LastCommit
default:
PanicSanity(Fmt("Unexpected vote type %X", type_))
}
}
return nil
@ -750,47 +935,10 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) {
func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round)
if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit {
PanicSanity("Invalid vote type")
}
log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
if ps.Height == height {
if ps.Round == round {
switch type_ {
case types.VoteTypePrevote:
ps.Prevotes.SetIndex(index, true)
log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index)
case types.VoteTypePrecommit:
ps.Precommits.SetIndex(index, true)
log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index)
}
} else if ps.CatchupCommitRound == round {
switch type_ {
case types.VoteTypePrevote:
case types.VoteTypePrecommit:
ps.CatchupCommit.SetIndex(index, true)
log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index)
}
} else if ps.ProposalPOLRound == round {
switch type_ {
case types.VoteTypePrevote:
ps.ProposalPOL.SetIndex(index, true)
log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index)
case types.VoteTypePrecommit:
}
}
} else if ps.Height == height+1 {
if ps.LastCommitRound == round {
switch type_ {
case types.VoteTypePrevote:
case types.VoteTypePrecommit:
ps.LastCommit.SetIndex(index, true)
log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
}
}
} else {
// Does not apply.
}
// NOTE: some may be nil BitArrays -> no side effects.
ps.getVoteBitArray(height, round, type_).SetIndex(index, true)
}
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
@ -858,31 +1006,67 @@ func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
ps.ProposalBlockParts = msg.BlockParts
}
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != msg.Height {
return
}
if ps.ProposalPOLRound != msg.ProposalPOLRound {
return
}
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
// TODO: Merge onto existing ps.ProposalPOL?
// We might have sent some prevotes in the meantime.
ps.ProposalPOL = msg.ProposalPOL
}
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.Height != msg.Height {
return
}
if ps.ProposalPOLRound != msg.ProposalPOLRound {
return
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
}
// When a peer claims to have a maj23 for some BlockID at H,R,S,
// we will try to respond with a VoteSetBitsMessage showing which
// bits we already have (and which we don't yet have),
// but that happens in another goroutine.
func (ps *PeerState) ApplyVoteSetMaj23Message(msg *VoteSetMaj23Message) {
// ps.mtx.Lock()
// defer ps.mtx.Unlock()
select {
case ps.Maj23Queue <- msg:
default:
// Just ignore if we're already processing messages.
}
}
// TODO: Merge onto existing ps.ProposalPOL?
// We might have sent some prevotes in the meantime.
ps.ProposalPOL = msg.ProposalPOL
// The peer has responded with a bitarray of votes that it has
// of the corresponding BlockID.
// ourVotes: BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *BitArray) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type)
if votes != nil {
if ourVotes == nil {
votes.Update(msg.Votes)
} else {
otherVotes := votes.Sub(ourVotes)
hasVotes := otherVotes.Or(msg.Votes)
votes.Update(hasVotes)
}
}
}
//-----------------------------------------------------------------------------
@ -896,6 +1080,8 @@ const (
msgTypeBlockPart = byte(0x13) // both block & POL
msgTypeVote = byte(0x14)
msgTypeHasVote = byte(0x15)
msgTypeVoteSetMaj23 = byte(0x16)
msgTypeVoteSetBits = byte(0x17)
)
type ConsensusMessage interface{}
@ -909,6 +1095,8 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart},
wire.ConcreteType{&VoteMessage{}, msgTypeVote},
wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23},
wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits},
)
// TODO: check for unnecessary extra bytes at the end.
@ -1004,3 +1192,30 @@ type HasVoteMessage struct {
func (m *HasVoteMessage) String() string {
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
}
//-------------------------------------
type VoteSetMaj23Message struct {
Height int
Round int
Type byte
BlockID types.BlockID
}
func (m *VoteSetMaj23Message) String() string {
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
}
//-------------------------------------
type VoteSetBitsMessage struct {
Height int
Round int
Type byte
BlockID types.BlockID
Votes *BitArray
}
func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
}

+ 6
- 0
types/block.go View File

@ -191,6 +191,7 @@ type Commit struct {
// NOTE: The Precommits are in order of address to preserve the bonded ValidatorSet order.
// Any peer with a block can gossip precommits by index with a peer without recalculating the
// active ValidatorSet.
BlockID BlockID `json:"blockID"`
Precommits []*Vote `json:"precommits"`
// Volatile
@ -262,6 +263,9 @@ func (commit *Commit) IsCommit() bool {
}
func (commit *Commit) ValidateBasic() error {
if commit.BlockID.IsZero() {
return errors.New("Commit cannot be for nil block")
}
if len(commit.Precommits) == 0 {
return errors.New("No precommits in commit")
}
@ -310,8 +314,10 @@ func (commit *Commit) StringIndented(indent string) string {
precommitStrings[i] = precommit.String()
}
return fmt.Sprintf(`Commit{
%s BlockID: %v
%s Precommits: %v
%s}#%X`,
indent, commit.BlockID,
indent, strings.Join(precommitStrings, "\n"+indent+" "),
indent, commit.hash)
}


+ 18
- 6
types/vote.go View File

@ -27,6 +27,24 @@ func (err *ErrVoteConflictingVotes) Error() string {
return "Conflicting votes"
}
// Types of votes
// TODO Make a new type "VoteType"
const (
VoteTypePrevote = byte(0x01)
VoteTypePrecommit = byte(0x02)
)
func IsVoteTypeValid(type_ byte) bool {
switch type_ {
case VoteTypePrevote:
return true
case VoteTypePrecommit:
return true
default:
return false
}
}
// Represents a prevote, precommit, or commit vote from validators for consensus.
type Vote struct {
ValidatorAddress []byte `json:"validator_address"`
@ -38,12 +56,6 @@ type Vote struct {
Signature crypto.SignatureEd25519 `json:"signature"`
}
// Types of votes
const (
VoteTypePrevote = byte(0x01)
VoteTypePrecommit = byte(0x02)
)
func (vote *Vote) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) {
wire.WriteTo([]byte(Fmt(`{"chain_id":"%s"`, chainID)), w, n, err)
wire.WriteTo([]byte(`,"vote":{"block_id":`), w, n, err)


+ 17
- 2
types/vote_set.go View File

@ -317,6 +317,19 @@ func (voteSet *VoteSet) BitArray() *BitArray {
return voteSet.votesBitArray.Copy()
}
func (voteSet *VoteSet) BitArrayByBlockID(blockID BlockID) *BitArray {
if voteSet == nil {
return nil
}
voteSet.mtx.Lock()
defer voteSet.mtx.Unlock()
votesByBlock, ok := voteSet.votesByBlock[blockID.Key()]
if ok {
return votesByBlock.bitArray
}
return nil
}
// NOTE: if validator has conflicting votes, picks random.
func (voteSet *VoteSet) GetByIndex(valIndex int) *Vote {
voteSet.mtx.Lock()
@ -429,9 +442,11 @@ func (voteSet *VoteSet) MakeCommit() *Commit {
}
// For every validator, get the precommit
maj23Votes := voteSet.votesByBlock[voteSet.maj23.Key()]
votesCopy := make([]*Vote, len(voteSet.votes))
copy(votesCopy, voteSet.votes)
return &Commit{
Precommits: maj23Votes.votes,
BlockID: *voteSet.maj23,
Precommits: votesCopy,
}
}


Loading…
Cancel
Save