From 5d1bdc6b1ac036c9761536697ba3f56a913c31eb Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sat, 1 Nov 2014 22:42:04 -0700 Subject: [PATCH] Don't send bitarrays, just send height/round/type/index. --- blocks/block.go | 40 +++--- blocks/block_test.go | 6 +- blocks/store.go | 37 ++++-- common/bit_array.go | 4 + consensus/reactor.go | 247 +++++++++++++++++++++---------------- consensus/state.go | 56 ++++++--- consensus/vote_set.go | 26 ++-- consensus/vote_set_test.go | 20 +-- 8 files changed, 268 insertions(+), 168 deletions(-) diff --git a/blocks/block.go b/blocks/block.go index a73074ce9..253cececf 100644 --- a/blocks/block.go +++ b/blocks/block.go @@ -10,14 +10,15 @@ import ( "time" . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/merkle" ) type Block struct { - Header - Validation - Data + *Header + *Validation + *Data // Volatile hash []byte @@ -32,9 +33,9 @@ func ReadBlock(r io.Reader, n *int64, err *error) *Block { } func (b *Block) WriteTo(w io.Writer) (n int64, err error) { - WriteBinary(w, &b.Header, &n, &err) - WriteBinary(w, &b.Validation, &n, &err) - WriteBinary(w, &b.Data, &n, &err) + WriteBinary(w, b.Header, &n, &err) + WriteBinary(w, b.Validation, &n, &err) + WriteBinary(w, b.Data, &n, &err) return } @@ -125,11 +126,11 @@ type Header struct { hash []byte } -func ReadHeader(r io.Reader, n *int64, err *error) (h Header) { +func ReadHeader(r io.Reader, n *int64, err *error) *Header { if *err != nil { - return Header{} + return nil } - return Header{ + return &Header{ Network: ReadString(r, n, err), Height: ReadUInt32(r, n, err), Time: ReadTime(r, n, err), @@ -189,11 +190,12 @@ type Validation struct { Commits []RoundSignature // Volatile - hash []byte + hash []byte + bitArray BitArray } -func ReadValidation(r io.Reader, n *int64, err *error) Validation { - return Validation{ +func ReadValidation(r io.Reader, n *int64, err *error) *Validation { + return &Validation{ Commits: ReadRoundSignatures(r, n, err), } } @@ -226,6 +228,16 @@ func (v *Validation) StringWithIndent(indent string) string { indent, v.hash) } +func (v *Validation) BitArray() BitArray { + if v.bitArray.IsZero() { + v.bitArray = NewBitArray(uint(len(v.Commits))) + for i, rsig := range v.Commits { + v.bitArray.SetIndex(uint(i), !rsig.IsZero()) + } + } + return v.bitArray +} + //----------------------------------------------------------------------------- type Data struct { @@ -235,13 +247,13 @@ type Data struct { hash []byte } -func ReadData(r io.Reader, n *int64, err *error) Data { +func ReadData(r io.Reader, n *int64, err *error) *Data { numTxs := ReadUInt32(r, n, err) txs := make([]Tx, 0, numTxs) for i := uint32(0); i < numTxs; i++ { txs = append(txs, ReadTx(r, n, err)) } - return Data{Txs: txs} + return &Data{Txs: txs} } func (data *Data) WriteTo(w io.Writer) (n int64, err error) { diff --git a/blocks/block_test.go b/blocks/block_test.go index 19be47c12..15723c876 100644 --- a/blocks/block_test.go +++ b/blocks/block_test.go @@ -59,7 +59,7 @@ func randBlock() *Block { // Block block := &Block{ - Header: Header{ + Header: &Header{ Network: "Tendermint", Height: RandUInt32Exp(), Fees: RandUInt64Exp(), @@ -67,10 +67,10 @@ func randBlock() *Block { LastBlockHash: RandBytes(32), StateHash: RandBytes(32), }, - Validation: Validation{ + Validation: &Validation{ Commits: []RoundSignature{randRoundSig(), randRoundSig()}, }, - Data: Data{ + Data: &Data{ Txs: []Tx{sendTx, nameTx, bondTx, unbondTx, dupeoutTx}, }, } diff --git a/blocks/store.go b/blocks/store.go index 6f89369b1..060623934 100644 --- a/blocks/store.go +++ b/blocks/store.go @@ -71,6 +71,16 @@ func (bs *BlockStore) LoadBlockPart(height uint32, index uint16) *Part { return part } +func (bs *BlockStore) LoadBlockMeta(height uint32) *BlockMeta { + var n int64 + var err error + meta := ReadBlockMeta(bs.GetReader(calcBlockMetaKey(height)), &n, &err) + if err != nil { + Panicf("Error reading block meta: %v", err) + } + return meta +} + func (bs *BlockStore) LoadBlockValidation(height uint32) *Validation { var n int64 var err error @@ -78,7 +88,7 @@ func (bs *BlockStore) LoadBlockValidation(height uint32) *Validation { if err != nil { Panicf("Error reading validation: %v", err) } - return &validation + return validation } func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) { @@ -89,7 +99,11 @@ func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) { if !blockParts.IsComplete() { Panicf("BlockStore can only save complete block part sets") } - meta := BlockMeta{Hash: block.Hash(), Parts: blockParts.Header()} + meta := &BlockMeta{ + Hash: block.Hash(), + Parts: blockParts.Header(), + Header: block.Header, + } // Save block meta metaBytes := BinaryBytes(meta) bs.db.Set(calcBlockMetaKey(height), metaBytes) @@ -98,7 +112,7 @@ func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) { bs.saveBlockPart(height, i, blockParts.GetPart(i)) } // Save block validation (duplicate and separate) - validationBytes := BinaryBytes(&block.Validation) + validationBytes := BinaryBytes(block.Validation) bs.db.Set(calcBlockValidationKey(height), validationBytes) // Save new BlockStoreJSON descriptor BlockStoreJSON{Height: height}.Save(bs.db) @@ -115,20 +129,23 @@ func (bs *BlockStore) saveBlockPart(height uint32, index uint16, part *Part) { //----------------------------------------------------------------------------- type BlockMeta struct { - Hash []byte - Parts PartSetHeader + Hash []byte // The BlockHash + Parts PartSetHeader // The PartSetHeader, for transfer + Header *Header // The block's Header } -func ReadBlockMeta(r io.Reader, n *int64, err *error) BlockMeta { - return BlockMeta{ - Hash: ReadByteSlice(r, n, err), - Parts: ReadPartSetHeader(r, n, err), +func ReadBlockMeta(r io.Reader, n *int64, err *error) *BlockMeta { + return &BlockMeta{ + Hash: ReadByteSlice(r, n, err), + Parts: ReadPartSetHeader(r, n, err), + Header: ReadHeader(r, n, err), } } -func (bm BlockMeta) WriteTo(w io.Writer) (n int64, err error) { +func (bm *BlockMeta) WriteTo(w io.Writer) (n int64, err error) { WriteByteSlice(w, bm.Hash, &n, &err) WriteBinary(w, bm.Parts, &n, &err) + WriteBinary(w, bm.Header, &n, &err) return } diff --git a/common/bit_array.go b/common/bit_array.go index 667cf3e6f..3d46184f0 100644 --- a/common/bit_array.go +++ b/common/bit_array.go @@ -60,6 +60,10 @@ func (bA BitArray) WriteTo(w io.Writer) (n int64, err error) { return } +func (bA BitArray) Size() uint { + return bA.bits +} + func (bA BitArray) IsZero() bool { return bA.bits == 0 } diff --git a/consensus/reactor.go b/consensus/reactor.go index eee180cf7..70761364a 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -25,7 +25,6 @@ const ( peerStateKey = "ConsensusReactor.peerState" peerGossipSleepDuration = 50 * time.Millisecond // Time to sleep if there's nothing to send. - hasVotesThreshold = 50 // After this many new votes we'll send a HasVotesMessage. ) //----------------------------------------------------------------------------- @@ -113,7 +112,6 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte rs := conR.conS.GetRoundState() ps := peer.Data.Get(peerStateKey).(*PeerState) _, msg_ := decodeMessage(msgBytes) - voteAddCounter := 0 var err error = nil log.Debug("[%X][%v] Receive: %v", chId, peer, msg_) @@ -129,9 +127,9 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte msg := msg_.(*CommitMessage) ps.ApplyCommitMessage(msg) - case *HasVotesMessage: - msg := msg_.(*HasVotesMessage) - ps.ApplyHasVotesMessage(msg) + case *HasVoteMessage: + msg := msg_.(*HasVoteMessage) + ps.ApplyHasVoteMessage(msg) default: // Ignore unknown message @@ -164,36 +162,21 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte switch msg_.(type) { case *Vote: vote := msg_.(*Vote) - // We can't deal with votes from another height, - // as they have a different validator set. - if vote.Height != rs.Height || vote.Height != ps.Height { - return - } - index, val := rs.Validators.GetById(vote.SignerId) - if val == nil { - log.Warning("Peer gave us an invalid vote.") - return - } - ps.EnsureVoteBitArrays(rs.Height, rs.Round, rs.Validators.Size()) - ps.SetHasVote(rs.Height, rs.Round, index, vote) - added, err := conR.conS.AddVote(vote) + added, index, err := conR.conS.AddVote(vote) if err != nil { log.Warning("Error attempting to add vote: %v", err) } + // Initialize Prevotes/Precommits/Commits if needed + ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size()) + ps.SetHasVote(vote, index) if added { - // Maybe send HasVotesMessage - // TODO optimize. It would be better to just acks for each vote! - voteAddCounter++ - if voteAddCounter%hasVotesThreshold == 0 { - msg := &HasVotesMessage{ - Height: rs.Height, - Round: rs.Round, - Prevotes: rs.Prevotes.BitArray(), - Precommits: rs.Precommits.BitArray(), - Commits: rs.Commits.BitArray(), - } - conR.sw.Broadcast(StateCh, msg) + msg := &HasVoteMessage{ + Height: vote.Height, + Round: vote.Round, + Type: vote.Type, + Index: index, } + conR.sw.Broadcast(StateCh, msg) } default: @@ -320,7 +303,6 @@ OUTER_LOOP: } } -// XXX Need to also send commits for LastComits. func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) { OUTER_LOOP: for { @@ -332,15 +314,6 @@ OUTER_LOOP: rs := conR.conS.GetRoundState() prs := ps.GetRoundState() - // If height doesn't match, sleep. - if rs.Height != prs.Height { - time.Sleep(peerGossipSleepDuration) - continue OUTER_LOOP - } - - // Ensure that peer's prevote/precommit/commit bitarrays of of sufficient capacity - ps.EnsureVoteBitArrays(rs.Height, rs.Round, rs.Validators.Size()) - trySendVote := func(voteSet *VoteSet, peerVoteSet BitArray) (sent bool) { // TODO: give priority to our vote. index, ok := voteSet.BitArray().Sub(peerVoteSet).PickRandom() @@ -349,29 +322,88 @@ OUTER_LOOP: // NOTE: vote may be a commit. msg := p2p.TypedMessage{msgTypeVote, vote} peer.Send(VoteCh, msg) - ps.SetHasVote(rs.Height, rs.Round, index, vote) + ps.SetHasVote(vote, index) return true } return false } - // If there are prevotes to send... - if rs.Round == prs.Round && prs.Step <= RoundStepPrevote { - if trySendVote(rs.Prevotes, prs.Prevotes) { + // If height matches, then send LastCommits, Prevotes, Precommits, or Commits. + if rs.Height == prs.Height { + + // If there are lastcommits to send... + if prs.Round == 0 && prs.Step == RoundStepNewHeight { + if prs.LastCommits.Size() == rs.LastCommits.Size() { + if trySendVote(rs.LastCommits, prs.LastCommits) { + continue OUTER_LOOP + } + } + } + + // Initialize Prevotes/Precommits/Commits if needed + ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size()) + + // If there are prevotes to send... + if rs.Round == prs.Round && prs.Step <= RoundStepPrevote { + if trySendVote(rs.Prevotes, prs.Prevotes) { + continue OUTER_LOOP + } + } + + // If there are precommits to send... + if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit { + if trySendVote(rs.Precommits, prs.Precommits) { + continue OUTER_LOOP + } + } + + // If there are any commits to send... + if trySendVote(rs.Commits, prs.Commits) { continue OUTER_LOOP } } - // If there are precommits to send... - if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit { - if trySendVote(rs.Precommits, prs.Precommits) { + // If peer is lagging by height 1, match our LastCommits to peer's Commits. + if rs.Height == prs.Height+1 { + + // Initialize Commits if needed + ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommits.Size()) + + // If there are lastcommits to send... + if trySendVote(rs.LastCommits, prs.Commits) { continue OUTER_LOOP } + } - // If there are any commits to send... - if trySendVote(rs.Commits, prs.Commits) { - continue OUTER_LOOP + // If peer is lagging by more than 1, load and send Validation and send Commits. + if rs.Height >= prs.Height+2 { + + // Load the block header and validation for prs.Height+1, + // which contains commit signatures for prs.Height. + header, validation := conR.conS.LoadHeaderValidation(prs.Height + 1) + size := uint(len(validation.Commits)) + + // Initialize Commits if needed + ps.EnsureVoteBitArrays(prs.Height, size) + + index, ok := validation.BitArray().Sub(prs.Commits).PickRandom() + if ok { + rsig := validation.Commits[index] + // Reconstruct vote. + vote := &Vote{ + Height: prs.Height, + Round: rsig.Round, + Type: VoteTypeCommit, + BlockHash: header.LastBlockHash, + BlockParts: header.LastBlockParts, + Signature: rsig.Signature, + } + msg := p2p.TypedMessage{msgTypeVote, vote} + peer.Send(VoteCh, msg) + ps.SetHasVote(vote, index) + continue OUTER_LOOP + } } // We sent nothing. Sleep... @@ -396,6 +428,7 @@ type PeerRoundState struct { Prevotes BitArray // All votes peer has for this round Precommits BitArray // All precommits peer has for this round Commits BitArray // All commits peer has for this height + LastCommits BitArray // All commits peer has for last height } //----------------------------------------------------------------------------- @@ -463,11 +496,11 @@ func (ps *PeerState) SetHasProposalPOLPart(height uint32, round uint16, index ui ps.ProposalPOLBitArray.SetIndex(uint(index), true) } -func (ps *PeerState) EnsureVoteBitArrays(height uint32, round uint16, numValidators uint) { +func (ps *PeerState) EnsureVoteBitArrays(height uint32, numValidators uint) { ps.mtx.Lock() defer ps.mtx.Unlock() - if ps.Height != height || ps.Round != round { + if ps.Height != height { return } @@ -482,21 +515,29 @@ func (ps *PeerState) EnsureVoteBitArrays(height uint32, round uint16, numValidat } } -func (ps *PeerState) SetHasVote(height uint32, round uint16, index uint, vote *Vote) { +func (ps *PeerState) SetHasVote(vote *Vote, index uint) { ps.mtx.Lock() defer ps.mtx.Unlock() + ps.setHasVote(vote.Height, vote.Round, vote.Type, index) +} - if ps.Height != height { +func (ps *PeerState) setHasVote(height uint32, round uint16, type_ byte, index uint) { + if ps.Height == height+1 && type_ == VoteTypeCommit { + // Special case for LastCommits. + ps.LastCommits.SetIndex(index, true) + return + } else if ps.Height != height { + // Does not apply. return } - switch vote.Type { + switch type_ { case VoteTypePrevote: ps.Prevotes.SetIndex(index, true) case VoteTypePrecommit: ps.Precommits.SetIndex(index, true) case VoteTypeCommit: - if vote.Round < round { + if round < ps.Round { ps.Prevotes.SetIndex(index, true) ps.Precommits.SetIndex(index, true) } @@ -531,6 +572,12 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage, rs *Roun ps.Precommits = BitArray{} } if psHeight != msg.Height { + // Shift Commits to LastCommits + if psHeight+1 == msg.Height { + ps.LastCommits = ps.Commits + } else { + ps.LastCommits = BitArray{} + } // We'll update the BitArray capacity later. ps.Commits = BitArray{} } @@ -548,22 +595,19 @@ func (ps *PeerState) ApplyCommitMessage(msg *CommitMessage) { ps.ProposalBlockBitArray = msg.BlockBitArray } -func (ps *PeerState) ApplyHasVotesMessage(msg *HasVotesMessage) { +func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() - if ps.Height != msg.Height { + // Special case for LastCommits + if ps.Height == msg.Height+1 && msg.Type == VoteTypeCommit { + ps.LastCommits.SetIndex(msg.Index, true) + return + } else if ps.Height != msg.Height { return } - ps.Commits = ps.Commits.Or(msg.Commits) - if ps.Round == msg.Round { - ps.Prevotes = ps.Prevotes.Or(msg.Prevotes) - ps.Precommits = ps.Precommits.Or(msg.Precommits) - } else { - ps.Prevotes = msg.Prevotes - ps.Precommits = msg.Precommits - } + ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) } //----------------------------------------------------------------------------- @@ -574,11 +618,11 @@ const ( // Messages for communicating state changes msgTypeNewRoundStep = byte(0x01) msgTypeCommit = byte(0x02) - msgTypeHasVotes = byte(0x03) // Messages of data msgTypeProposal = byte(0x11) msgTypePart = byte(0x12) // both block & POL msgTypeVote = byte(0x13) + msgTypeHasVote = byte(0x14) ) // TODO: check for unnecessary extra bytes at the end. @@ -593,8 +637,6 @@ func decodeMessage(bz []byte) (msgType byte, msg interface{}) { msg = readNewRoundStepMessage(r, n, err) case msgTypeCommit: msg = readCommitMessage(r, n, err) - case msgTypeHasVotes: - msg = readHasVotesMessage(r, n, err) // Messages of data case msgTypeProposal: msg = ReadProposal(r, n, err) @@ -602,6 +644,8 @@ func decodeMessage(bz []byte) (msgType byte, msg interface{}) { msg = readPartMessage(r, n, err) case msgTypeVote: msg = ReadVote(r, n, err) + case msgTypeHasVote: + msg = readHasVoteMessage(r, n, err) default: msg = nil } @@ -669,40 +713,6 @@ func (m *CommitMessage) String() string { //------------------------------------- -type HasVotesMessage struct { - Height uint32 - Round uint16 - Prevotes BitArray - Precommits BitArray - Commits BitArray -} - -func readHasVotesMessage(r io.Reader, n *int64, err *error) *HasVotesMessage { - return &HasVotesMessage{ - Height: ReadUInt32(r, n, err), - Round: ReadUInt16(r, n, err), - Prevotes: ReadBitArray(r, n, err), - Precommits: ReadBitArray(r, n, err), - Commits: ReadBitArray(r, n, err), - } -} - -func (m *HasVotesMessage) WriteTo(w io.Writer) (n int64, err error) { - WriteByte(w, msgTypeHasVotes, &n, &err) - WriteUInt32(w, m.Height, &n, &err) - WriteUInt16(w, m.Round, &n, &err) - WriteBinary(w, m.Prevotes, &n, &err) - WriteBinary(w, m.Precommits, &n, &err) - WriteBinary(w, m.Commits, &n, &err) - return -} - -func (m *HasVotesMessage) String() string { - return fmt.Sprintf("[HasVotesMessage H:%v R:%v]", m.Height, m.Round) -} - -//------------------------------------- - const ( partTypeProposalBlock = byte(0x01) partTypeProposalPOL = byte(0x02) @@ -736,3 +746,34 @@ func (m *PartMessage) WriteTo(w io.Writer) (n int64, err error) { func (m *PartMessage) String() string { return fmt.Sprintf("[PartMessage H:%v R:%v T:%X]", m.Height, m.Round, m.Type) } + +//------------------------------------- + +type HasVoteMessage struct { + Height uint32 + Round uint16 + Type byte + Index uint +} + +func readHasVoteMessage(r io.Reader, n *int64, err *error) *HasVoteMessage { + return &HasVoteMessage{ + Height: ReadUInt32(r, n, err), + Round: ReadUInt16(r, n, err), + Type: ReadByte(r, n, err), + Index: ReadUVarInt(r, n, err), + } +} + +func (m *HasVoteMessage) WriteTo(w io.Writer) (n int64, err error) { + WriteByte(w, msgTypeHasVote, &n, &err) + WriteUInt32(w, m.Height, &n, &err) + WriteUInt16(w, m.Round, &n, &err) + WriteByte(w, m.Type, &n, &err) + WriteUVarInt(w, m.Index, &n, &err) + return +} + +func (m *HasVoteMessage) String() string { + return fmt.Sprintf("[HasVoteMessage H:%v R:%v T:%X]", m.Height, m.Round, m.Type) +} diff --git a/consensus/state.go b/consensus/state.go index a4711c7ae..3bb6618ad 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -496,10 +496,11 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { pol = cs.LockedPOL } else { // Otherwise we should create a new proposal. - var validation Validation + var validation *Validation if cs.Height == 1 { // We're creating a proposal for the first block. // The validation is empty. + validation = &Validation{} } else { // We need to create a proposal. // If we don't have enough commits from the last height, @@ -512,7 +513,7 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { } txs, state := cs.mempool.GetProposalTxs() // TODO: cache state block = &Block{ - Header: Header{ + Header: &Header{ Network: Config.Network, Height: cs.Height, Time: time.Now(), @@ -522,7 +523,7 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { StateHash: state.Hash(), }, Validation: validation, - Data: Data{ + Data: &Data{ Txs: txs, }, } @@ -827,8 +828,26 @@ func (cs *ConsensusState) AddProposalPOLPart(height uint32, round uint16, part * return true, nil } -// NOTE: This function may increment the height. -func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) { +func (cs *ConsensusState) AddVote(vote *Vote) (added bool, index uint, err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + + return cs.addVote(vote) +} + +// TODO: Maybe move this out of here? +func (cs *ConsensusState) LoadHeaderValidation(height uint32) (*Header, *Validation) { + meta := cs.blockStore.LoadBlockMeta(height) + if meta == nil { + return nil, nil + } + validation := cs.blockStore.LoadBlockValidation(height) + return meta.Header, validation +} + +//----------------------------------------------------------------------------- + +func (cs *ConsensusState) addVote(vote *Vote) (added bool, index uint, err error) { switch vote.Type { case VoteTypePrevote: // Prevotes checks for height+round match. @@ -837,23 +856,26 @@ func (cs *ConsensusState) AddVote(vote *Vote) (added bool, err error) { // Precommits checks for height+round match. return cs.Precommits.Add(vote) case VoteTypeCommit: - // Commits checks for height match. - // No need to check if vote.Round < cs.Round ... - // Prevotes && Precommits already checks that. - cs.Prevotes.Add(vote) - cs.Precommits.Add(vote) - added, err = cs.Commits.Add(vote) - if added && cs.Commits.HasTwoThirdsMajority() { - cs.runActionCh <- RoundAction{cs.Height, cs.Round, RoundActionTryFinalize} + if vote.Height == cs.Height { + // No need to check if vote.Round < cs.Round ... + // Prevotes && Precommits already checks that. + cs.Prevotes.Add(vote) + cs.Precommits.Add(vote) + added, index, err = cs.Commits.Add(vote) + if added && cs.Commits.HasTwoThirdsMajority() { + cs.runActionCh <- RoundAction{cs.Height, cs.Round, RoundActionTryFinalize} + } + return added, index, err } - return added, err + if vote.Height+1 == cs.Height { + return cs.LastCommits.Add(vote) + } + return false, 0, nil default: panic("Unknown vote type") } } -//----------------------------------------------------------------------------- - func (cs *ConsensusState) stageBlock(block *Block, blockParts *PartSet) error { if block == nil { panic("Cannot stage nil block") @@ -891,7 +913,7 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header PartSetHea BlockParts: header, } cs.PrivValidator.Sign(vote) - cs.AddVote(vote) + cs.addVote(vote) return vote } diff --git a/consensus/vote_set.go b/consensus/vote_set.go index 7b8e2fd24..845e90c32 100644 --- a/consensus/vote_set.go +++ b/consensus/vote_set.go @@ -54,10 +54,14 @@ func NewVoteSet(height uint32, round uint16, type_ byte, vset *state.ValidatorSe } } +func (vs *VoteSet) Size() uint { + return vs.vset.Size() +} + // True if added, false if not. // Returns ErrVote[UnexpectedStep|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature] // NOTE: vote should not be mutated after adding. -func (vs *VoteSet) Add(vote *Vote) (bool, error) { +func (vs *VoteSet) Add(vote *Vote) (bool, uint, error) { vs.mtx.Lock() defer vs.mtx.Unlock() @@ -66,31 +70,31 @@ func (vs *VoteSet) Add(vote *Vote) (bool, error) { (vote.Type != VoteTypeCommit && vote.Round != vs.round) || (vote.Type != VoteTypeCommit && vote.Type != vs.type_) || (vote.Type == VoteTypeCommit && vs.type_ != VoteTypeCommit && vote.Round >= vs.round) { - return false, ErrVoteUnexpectedStep + return false, 0, ErrVoteUnexpectedStep } // Ensure that signer is a validator. _, val := vs.vset.GetById(vote.SignerId) if val == nil { - return false, ErrVoteInvalidAccount + return false, 0, ErrVoteInvalidAccount } // Check signature. if !val.Verify(vote) { // Bad signature. - return false, ErrVoteInvalidSignature + return false, 0, ErrVoteInvalidSignature } return vs.addVote(vote) } -func (vs *VoteSet) addVote(vote *Vote) (bool, error) { +func (vs *VoteSet) addVote(vote *Vote) (bool, uint, error) { // If vote already exists, return false. if existingVote, ok := vs.votes[vote.SignerId]; ok { if bytes.Equal(existingVote.BlockHash, vote.BlockHash) { - return false, nil + return false, 0, nil } else { - return false, ErrVoteConflictingSignature + return false, 0, ErrVoteConflictingSignature } } @@ -98,7 +102,7 @@ func (vs *VoteSet) addVote(vote *Vote) (bool, error) { vs.votes[vote.SignerId] = vote voterIndex, val := vs.vset.GetById(vote.SignerId) if val == nil { - return false, ErrVoteInvalidAccount + return false, 0, ErrVoteInvalidAccount } vs.votesBitArray.SetIndex(uint(voterIndex), true) blockKey := string(vote.BlockHash) + string(BinaryBytes(vote.BlockParts)) @@ -114,7 +118,7 @@ func (vs *VoteSet) addVote(vote *Vote) (bool, error) { vs.maj23Exists = true } - return true, nil + return true, voterIndex, nil } // Assumes that commits VoteSet is valid. @@ -215,7 +219,7 @@ func (vs *VoteSet) MakePOL() *POL { return pol } -func (vs *VoteSet) MakeValidation() Validation { +func (vs *VoteSet) MakeValidation() *Validation { if vs.type_ != VoteTypeCommit { panic("Cannot MakeValidation() unless VoteSet.Type is VoteTypeCommit") } @@ -240,7 +244,7 @@ func (vs *VoteSet) MakeValidation() Validation { rsigs[index] = RoundSignature{vote.Round, vote.Signature} return false }) - return Validation{ + return &Validation{ Commits: rsigs, } } diff --git a/consensus/vote_set_test.go b/consensus/vote_set_test.go index 428a06f2f..f33dd1d6c 100644 --- a/consensus/vote_set_test.go +++ b/consensus/vote_set_test.go @@ -169,7 +169,7 @@ func TestBadVotes(t *testing.T) { // val0 votes for nil. vote := &Vote{Height: height, Round: round, Type: VoteTypePrevote, BlockHash: nil} privAccounts[0].Sign(vote) - added, err := voteSet.Add(vote) + added, _, err := voteSet.Add(vote) if !added || err != nil { t.Errorf("Expected Add(vote) to succeed") } @@ -177,7 +177,7 @@ func TestBadVotes(t *testing.T) { // val0 votes again for some block. vote = &Vote{Height: height, Round: round, Type: VoteTypePrevote, BlockHash: CRandBytes(32)} privAccounts[0].Sign(vote) - added, err = voteSet.Add(vote) + added, _, err = voteSet.Add(vote) if added || err == nil { t.Errorf("Expected Add(vote) to fail, dupeout.") } @@ -185,7 +185,7 @@ func TestBadVotes(t *testing.T) { // val1 votes on another height vote = &Vote{Height: height + 1, Round: round, Type: VoteTypePrevote, BlockHash: nil} privAccounts[1].Sign(vote) - added, err = voteSet.Add(vote) + added, _, err = voteSet.Add(vote) if added { t.Errorf("Expected Add(vote) to fail, wrong height") } @@ -193,7 +193,7 @@ func TestBadVotes(t *testing.T) { // val2 votes on another round vote = &Vote{Height: height, Round: round + 1, Type: VoteTypePrevote, BlockHash: nil} privAccounts[2].Sign(vote) - added, err = voteSet.Add(vote) + added, _, err = voteSet.Add(vote) if added { t.Errorf("Expected Add(vote) to fail, wrong round") } @@ -201,7 +201,7 @@ func TestBadVotes(t *testing.T) { // val3 votes of another type. vote = &Vote{Height: height, Round: round, Type: VoteTypePrecommit, BlockHash: nil} privAccounts[3].Sign(vote) - added, err = voteSet.Add(vote) + added, _, err = voteSet.Add(vote) if added { t.Errorf("Expected Add(vote) to fail, wrong type") } @@ -226,7 +226,7 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) { // Attempt to add a commit from val6 at a previous height vote := &Vote{Height: height - 1, Round: round, Type: VoteTypeCommit, BlockHash: nil} privAccounts[6].Sign(vote) - added, _ := voteSet.Add(vote) + added, _, _ := voteSet.Add(vote) if added { t.Errorf("Expected Add(vote) to fail, wrong height.") } @@ -234,7 +234,7 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) { // Attempt to add a commit from val6 at a later round vote = &Vote{Height: height, Round: round + 1, Type: VoteTypeCommit, BlockHash: nil} privAccounts[6].Sign(vote) - added, _ = voteSet.Add(vote) + added, _, _ = voteSet.Add(vote) if added { t.Errorf("Expected Add(vote) to fail, cannot add future round vote.") } @@ -242,7 +242,7 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) { // Attempt to add a commit from val6 for currrent height/round. vote = &Vote{Height: height, Round: round, Type: VoteTypeCommit, BlockHash: nil} privAccounts[6].Sign(vote) - added, err := voteSet.Add(vote) + added, _, err := voteSet.Add(vote) if added || err == nil { t.Errorf("Expected Add(vote) to fail, only prior round commits can be added.") } @@ -250,7 +250,7 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) { // Add commit from val6 at a previous round vote = &Vote{Height: height, Round: round - 1, Type: VoteTypeCommit, BlockHash: nil} privAccounts[6].Sign(vote) - added, err = voteSet.Add(vote) + added, _, err = voteSet.Add(vote) if !added || err != nil { t.Errorf("Expected Add(vote) to succeed, commit for prior rounds are relevant.") } @@ -258,7 +258,7 @@ func TestAddCommitsToPrevoteVotes(t *testing.T) { // Also add commit from val7 for previous round. vote = &Vote{Height: height, Round: round - 2, Type: VoteTypeCommit, BlockHash: nil} privAccounts[7].Sign(vote) - added, err = voteSet.Add(vote) + added, _, err = voteSet.Add(vote) if !added || err != nil { t.Errorf("Expected Add(vote) to succeed. err: %v", err) }