From b73a6905a1cb0f2f69b850f6e09f5dbe25aac664 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 5 Sep 2016 17:33:02 -0700 Subject: [PATCH] Initial pass at bft_fix_2 completion --- consensus/height_vote_set.go | 20 +++ consensus/reactor.go | 337 ++++++++++++++++++++++++++++------- types/block.go | 6 + types/vote.go | 24 ++- types/vote_set.go | 19 +- 5 files changed, 337 insertions(+), 69 deletions(-) diff --git a/consensus/height_vote_set.go b/consensus/height_vote_set.go index dadc2710f..7ba53b9cf 100644 --- a/consensus/height_vote_set.go +++ b/consensus/height_vote_set.go @@ -103,6 +103,9 @@ func (hvs *HeightVoteSet) addRound(round int) { func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerKey string) (added bool, err error) { hvs.mtx.Lock() defer hvs.mtx.Unlock() + if !types.IsVoteTypeValid(vote.Type) { + return + } voteSet := hvs.getVoteSet(vote.Round, vote.Type) if voteSet == nil { if _, ok := hvs.peerCatchupRounds[peerKey]; !ok { @@ -196,3 +199,20 @@ func (hvs *HeightVoteSet) StringIndented(indent string) string { indent, strings.Join(vsStrings, "\n"+indent+" "), indent) } + +// If a peer claims that it has 2/3 majority for given blockKey, call this. +// NOTE: if there are too many peers, or too much peer churn, +// this can cause memory issues. +// TODO: implement ability to remove peers too +func (hvs *HeightVoteSet) SetPeerMaj23(round int, type_ byte, peerID string, blockID types.BlockID) { + hvs.mtx.Lock() + defer hvs.mtx.Unlock() + if !types.IsVoteTypeValid(type_) { + return + } + voteSet := hvs.getVoteSet(round, type_) + if voteSet == nil { + return + } + voteSet.SetPeerMaj23(peerID, blockID) +} diff --git a/consensus/reactor.go b/consensus/reactor.go index f6cd25484..468bc32ea 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -17,12 +17,14 @@ import ( ) const ( - StateChannel = byte(0x20) - DataChannel = byte(0x21) - VoteChannel = byte(0x22) - - peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. - maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. + StateChannel = byte(0x20) + DataChannel = byte(0x21) + VoteChannel = byte(0x22) + VoteSetBitsChannel = byte(0x23) + + peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. + peerQueryMaj23SleepDuration = 5 * time.Second // Time to sleep after each VoteSetMaj23Message sent + maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. ) //----------------------------------------------------------------------------- @@ -101,6 +103,12 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { SendQueueCapacity: 100, RecvBufferCapacity: 100 * 100, }, + &p2p.ChannelDescriptor{ + ID: VoteSetBitsChannel, + Priority: 1, + SendQueueCapacity: 2, + RecvBufferCapacity: 1024, + }, } } @@ -114,9 +122,11 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { peerState := NewPeerState(peer) peer.Data.Set(types.PeerStateKey, peerState) - // Begin gossip routines for this peer. + // Begin routines for this peer. go conR.gossipDataRoutine(peer, peerState) go conR.gossipVotesRoutine(peer, peerState) + go conR.queryMaj23Routine(peer, peerState) + go conR.replyMaj23Routine(peer, peerState) // Send our state to peer. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). @@ -166,6 +176,15 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) ps.ApplyCommitStepMessage(msg) case *HasVoteMessage: ps.ApplyHasVoteMessage(msg) + case *VoteSetMaj23Message: + cs := conR.conS + cs.mtx.Lock() + height, votes := cs.Height, cs.Votes + cs.mtx.Unlock() + if height == msg.Height { + votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID) + } + ps.ApplyVoteSetMaj23Message(msg) default: log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) } @@ -209,6 +228,39 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) // don't punish (leave room for soft upgrades) log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) } + + case VoteSetBitsChannel: + if conR.fastSync { + log.Warn("Ignoring message received during fastSync", "msg", msg) + return + } + switch msg := msg.(type) { + case *VoteSetBitsMessage: + cs := conR.conS + cs.mtx.Lock() + height, votes := cs.Height, cs.Votes + cs.mtx.Unlock() + + if height == msg.Height { + var ourVotes *BitArray + switch msg.Type { + case types.VoteTypePrevote: + ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) + case types.VoteTypePrecommit: + ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) + default: + log.Warn("Bad VoteSetBitsMessage field Type") + return + } + ps.ApplyVoteSetBitsMessage(msg, ourVotes) + } else { + ps.ApplyVoteSetBitsMessage(msg, nil) + } + default: + // don't punish (leave room for soft upgrades) + log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + } + default: log.Warn(Fmt("Unknown chId %X", chID)) } @@ -516,6 +568,131 @@ OUTER_LOOP: } } +func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) { + log := log.New("peer", peer) + +OUTER_LOOP: + for { + // Manage disconnects from self or peer. + if !peer.IsRunning() || !conR.IsRunning() { + log.Notice(Fmt("Stopping queryMaj23Routine for %v.", peer)) + return + } + + // Maybe send Height/Round/Prevotes + { + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + if rs.Height == prs.Height { + if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { + peer.TrySend(DataChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ + Height: prs.Height, + Round: prs.Round, + Type: types.VoteTypePrevote, + BlockID: maj23, + }}) + time.Sleep(peerQueryMaj23SleepDuration) + rs = conR.conS.GetRoundState() + prs = ps.GetRoundState() + } + } + } + + // Maybe send Height/Round/Precommits + { + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + if rs.Height == prs.Height { + if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { + peer.TrySend(DataChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ + Height: prs.Height, + Round: prs.Round, + Type: types.VoteTypePrecommit, + BlockID: maj23, + }}) + time.Sleep(peerQueryMaj23SleepDuration) + } + } + } + + // Maybe send Height/Round/ProposalPOL + { + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + if rs.Height == prs.Height { + if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { + peer.TrySend(DataChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ + Height: prs.Height, + Round: prs.ProposalPOLRound, + Type: types.VoteTypePrevote, + BlockID: maj23, + }}) + time.Sleep(peerQueryMaj23SleepDuration) + } + } + } + + // Little point sending LastCommitRound/LastCommit, + // These are fleeting and non-blocking. + + // Maybe send Height/CatchupCommitRound/CatchupCommit. + { + rs := conR.conS.GetRoundState() + prs := ps.GetRoundState() + if prs.CatchupCommitRound != -1 { + commit := conR.blockStore.LoadBlockCommit(prs.Height) + peer.TrySend(DataChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ + Height: prs.Height, + Round: commit.Round(), + Type: types.VoteTypePrecommit, + BlockID: commit.BlockID, + }}) + time.Sleep(peerQueryMaj23SleepDuration) + } + } + + continue OUTER_LOOP + } +} + +func (conR *ConsensusReactor) replyMaj23Routine(peer *p2p.Peer, ps *PeerState) { + log := log.New("peer", peer) + +OUTER_LOOP: + for { + // Manage disconnects from self or peer. + if !peer.IsRunning() || !conR.IsRunning() { + log.Notice(Fmt("Stopping replyMaj23Routine for %v.", peer)) + return + } + rs := conR.conS.GetRoundState() + + // Process a VoteSetMaj23Message + msg := <-ps.Maj23Queue + if rs.Height == msg.Height { + var ourVotes *BitArray + switch msg.Type { + case types.VoteTypePrevote: + ourVotes = rs.Votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) + case types.VoteTypePrecommit: + ourVotes = rs.Votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) + default: + log.Warn("Bad VoteSetBitsMessage field Type") + return + } + peer.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{ + Height: msg.Height, + Round: msg.Round, + Type: msg.Type, + BlockID: msg.BlockID, + Votes: ourVotes, + }}) + } + + continue OUTER_LOOP + } +} + //----------------------------------------------------------------------------- // Read only when returned by PeerState.GetRoundState(). @@ -549,6 +726,7 @@ type PeerState struct { mtx sync.Mutex PeerRoundState + Maj23Queue chan *VoteSetMaj23Message } func NewPeerState(peer *p2p.Peer) *PeerState { @@ -560,6 +738,7 @@ func NewPeerState(peer *p2p.Peer) *PeerState { LastCommitRound: -1, CatchupCommitRound: -1, }, + Maj23Queue: make(chan *VoteSetMaj23Message, 2), } } @@ -650,6 +829,10 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote } func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { + if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit { + PanicSanity("Invalid vote type") + } + if ps.Height == height { if ps.Round == round { switch type_ { @@ -657,8 +840,6 @@ func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { return ps.Prevotes case types.VoteTypePrecommit: return ps.Precommits - default: - PanicSanity(Fmt("Unexpected vote type %X", type_)) } } if ps.CatchupCommitRound == round { @@ -667,8 +848,14 @@ func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { return nil case types.VoteTypePrecommit: return ps.CatchupCommit - default: - PanicSanity(Fmt("Unexpected vote type %X", type_)) + } + } + if ps.ProposalPOLRound == round { + switch type_ { + case types.VoteTypePrevote: + return ps.ProposalPOL + case types.VoteTypePrecommit: + return nil } } return nil @@ -680,8 +867,6 @@ func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { return nil case types.VoteTypePrecommit: return ps.LastCommit - default: - PanicSanity(Fmt("Unexpected vote type %X", type_)) } } return nil @@ -750,47 +935,10 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) { func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { log := log.New("peer", ps.Peer, "peerRound", ps.Round, "height", height, "round", round) - if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit { - PanicSanity("Invalid vote type") - } + log.Info("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) - if ps.Height == height { - if ps.Round == round { - switch type_ { - case types.VoteTypePrevote: - ps.Prevotes.SetIndex(index, true) - log.Debug("SetHasVote(round-match)", "prevotes", ps.Prevotes, "index", index) - case types.VoteTypePrecommit: - ps.Precommits.SetIndex(index, true) - log.Debug("SetHasVote(round-match)", "precommits", ps.Precommits, "index", index) - } - } else if ps.CatchupCommitRound == round { - switch type_ { - case types.VoteTypePrevote: - case types.VoteTypePrecommit: - ps.CatchupCommit.SetIndex(index, true) - log.Debug("SetHasVote(CatchupCommit)", "precommits", ps.Precommits, "index", index) - } - } else if ps.ProposalPOLRound == round { - switch type_ { - case types.VoteTypePrevote: - ps.ProposalPOL.SetIndex(index, true) - log.Debug("SetHasVote(ProposalPOL)", "prevotes", ps.Prevotes, "index", index) - case types.VoteTypePrecommit: - } - } - } else if ps.Height == height+1 { - if ps.LastCommitRound == round { - switch type_ { - case types.VoteTypePrevote: - case types.VoteTypePrecommit: - ps.LastCommit.SetIndex(index, true) - log.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index) - } - } - } else { - // Does not apply. - } + // NOTE: some may be nil BitArrays -> no side effects. + ps.getVoteBitArray(height, round, type_).SetIndex(index, true) } func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { @@ -858,31 +1006,67 @@ func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) { ps.ProposalBlockParts = msg.BlockParts } -func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { +func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() if ps.Height != msg.Height { return } + if ps.ProposalPOLRound != msg.ProposalPOLRound { + return + } - ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) + // TODO: Merge onto existing ps.ProposalPOL? + // We might have sent some prevotes in the meantime. + ps.ProposalPOL = msg.ProposalPOL } -func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { +func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() if ps.Height != msg.Height { return } - if ps.ProposalPOLRound != msg.ProposalPOLRound { - return + + ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) +} + +// When a peer claims to have a maj23 for some BlockID at H,R,S, +// we will try to respond with a VoteSetBitsMessage showing which +// bits we already have (and which we don't yet have), +// but that happens in another goroutine. +func (ps *PeerState) ApplyVoteSetMaj23Message(msg *VoteSetMaj23Message) { + // ps.mtx.Lock() + // defer ps.mtx.Unlock() + + select { + case ps.Maj23Queue <- msg: + default: + // Just ignore if we're already processing messages. } +} - // TODO: Merge onto existing ps.ProposalPOL? - // We might have sent some prevotes in the meantime. - ps.ProposalPOL = msg.ProposalPOL +// The peer has responded with a bitarray of votes that it has +// of the corresponding BlockID. +// ourVotes: BitArray of votes we have for msg.BlockID +// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height), +// we conservatively overwrite ps's votes w/ msg.Votes. +func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *BitArray) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + votes := ps.getVoteBitArray(msg.Height, msg.Round, msg.Type) + if votes != nil { + if ourVotes == nil { + votes.Update(msg.Votes) + } else { + otherVotes := votes.Sub(ourVotes) + hasVotes := otherVotes.Or(msg.Votes) + votes.Update(hasVotes) + } + } } //----------------------------------------------------------------------------- @@ -896,6 +1080,8 @@ const ( msgTypeBlockPart = byte(0x13) // both block & POL msgTypeVote = byte(0x14) msgTypeHasVote = byte(0x15) + msgTypeVoteSetMaj23 = byte(0x16) + msgTypeVoteSetBits = byte(0x17) ) type ConsensusMessage interface{} @@ -909,6 +1095,8 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart}, wire.ConcreteType{&VoteMessage{}, msgTypeVote}, wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote}, + wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23}, + wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits}, ) // TODO: check for unnecessary extra bytes at the end. @@ -1004,3 +1192,30 @@ type HasVoteMessage struct { func (m *HasVoteMessage) String() string { return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index) } + +//------------------------------------- + +type VoteSetMaj23Message struct { + Height int + Round int + Type byte + BlockID types.BlockID +} + +func (m *VoteSetMaj23Message) String() string { + return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID) +} + +//------------------------------------- + +type VoteSetBitsMessage struct { + Height int + Round int + Type byte + BlockID types.BlockID + Votes *BitArray +} + +func (m *VoteSetBitsMessage) String() string { + return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) +} diff --git a/types/block.go b/types/block.go index 56e85a73c..179c8b77b 100644 --- a/types/block.go +++ b/types/block.go @@ -191,6 +191,7 @@ type Commit struct { // NOTE: The Precommits are in order of address to preserve the bonded ValidatorSet order. // Any peer with a block can gossip precommits by index with a peer without recalculating the // active ValidatorSet. + BlockID BlockID `json:"blockID"` Precommits []*Vote `json:"precommits"` // Volatile @@ -262,6 +263,9 @@ func (commit *Commit) IsCommit() bool { } func (commit *Commit) ValidateBasic() error { + if commit.BlockID.IsZero() { + return errors.New("Commit cannot be for nil block") + } if len(commit.Precommits) == 0 { return errors.New("No precommits in commit") } @@ -310,8 +314,10 @@ func (commit *Commit) StringIndented(indent string) string { precommitStrings[i] = precommit.String() } return fmt.Sprintf(`Commit{ +%s BlockID: %v %s Precommits: %v %s}#%X`, + indent, commit.BlockID, indent, strings.Join(precommitStrings, "\n"+indent+" "), indent, commit.hash) } diff --git a/types/vote.go b/types/vote.go index 4584b749b..92d99f485 100644 --- a/types/vote.go +++ b/types/vote.go @@ -27,6 +27,24 @@ func (err *ErrVoteConflictingVotes) Error() string { return "Conflicting votes" } +// Types of votes +// TODO Make a new type "VoteType" +const ( + VoteTypePrevote = byte(0x01) + VoteTypePrecommit = byte(0x02) +) + +func IsVoteTypeValid(type_ byte) bool { + switch type_ { + case VoteTypePrevote: + return true + case VoteTypePrecommit: + return true + default: + return false + } +} + // Represents a prevote, precommit, or commit vote from validators for consensus. type Vote struct { ValidatorAddress []byte `json:"validator_address"` @@ -38,12 +56,6 @@ type Vote struct { Signature crypto.SignatureEd25519 `json:"signature"` } -// Types of votes -const ( - VoteTypePrevote = byte(0x01) - VoteTypePrecommit = byte(0x02) -) - func (vote *Vote) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) { wire.WriteTo([]byte(Fmt(`{"chain_id":"%s"`, chainID)), w, n, err) wire.WriteTo([]byte(`,"vote":{"block_id":`), w, n, err) diff --git a/types/vote_set.go b/types/vote_set.go index 4ee0588e9..d42301e08 100644 --- a/types/vote_set.go +++ b/types/vote_set.go @@ -317,6 +317,19 @@ func (voteSet *VoteSet) BitArray() *BitArray { return voteSet.votesBitArray.Copy() } +func (voteSet *VoteSet) BitArrayByBlockID(blockID BlockID) *BitArray { + if voteSet == nil { + return nil + } + voteSet.mtx.Lock() + defer voteSet.mtx.Unlock() + votesByBlock, ok := voteSet.votesByBlock[blockID.Key()] + if ok { + return votesByBlock.bitArray + } + return nil +} + // NOTE: if validator has conflicting votes, picks random. func (voteSet *VoteSet) GetByIndex(valIndex int) *Vote { voteSet.mtx.Lock() @@ -429,9 +442,11 @@ func (voteSet *VoteSet) MakeCommit() *Commit { } // For every validator, get the precommit - maj23Votes := voteSet.votesByBlock[voteSet.maj23.Key()] + votesCopy := make([]*Vote, len(voteSet.votes)) + copy(votesCopy, voteSet.votes) return &Commit{ - Precommits: maj23Votes.votes, + BlockID: *voteSet.maj23, + Precommits: votesCopy, } }