Browse Source

precommit nil if locked and no POL

pull/142/head
Ethan Buchman 9 years ago
parent
commit
555ecb095d
7 changed files with 269 additions and 111 deletions
  1. +3
    -3
      consensus/height_vote_set.go
  2. +14
    -34
      consensus/reactor.go
  3. +139
    -34
      consensus/state.go
  4. +0
    -38
      consensus/state_test.go
  5. +113
    -0
      consensus/test.go
  6. +0
    -1
      mempool/reactor.go
  7. +0
    -1
      p2p/switch.go

+ 3
- 3
consensus/height_vote_set.go View File

@ -77,7 +77,7 @@ func (hvs *HeightVoteSet) addRound(round int) {
if _, ok := hvs.roundVoteSets[round]; ok { if _, ok := hvs.roundVoteSets[round]; ok {
PanicSanity("addRound() for an existing round") PanicSanity("addRound() for an existing round")
} }
log.Info("addRound(round)", "round", round)
log.Debug("addRound(round)", "round", round)
prevotes := types.NewVoteSet(hvs.height, round, types.VoteTypePrevote, hvs.valSet) prevotes := types.NewVoteSet(hvs.height, round, types.VoteTypePrevote, hvs.valSet)
precommits := types.NewVoteSet(hvs.height, round, types.VoteTypePrecommit, hvs.valSet) precommits := types.NewVoteSet(hvs.height, round, types.VoteTypePrecommit, hvs.valSet)
hvs.roundVoteSets[round] = RoundVoteSet{ hvs.roundVoteSets[round] = RoundVoteSet{
@ -120,7 +120,7 @@ func (hvs *HeightVoteSet) Precommits(round int) *types.VoteSet {
return hvs.getVoteSet(round, types.VoteTypePrecommit) return hvs.getVoteSet(round, types.VoteTypePrecommit)
} }
// Last round that has +2/3 prevotes for a particular block or nik.
// Last round that has +2/3 prevotes for a particular block or nil.
// Returns -1 if no such round exists. // Returns -1 if no such round exists.
func (hvs *HeightVoteSet) POLRound() int { func (hvs *HeightVoteSet) POLRound() int {
hvs.mtx.Lock() hvs.mtx.Lock()
@ -134,7 +134,7 @@ func (hvs *HeightVoteSet) POLRound() int {
} }
func (hvs *HeightVoteSet) getVoteSet(round int, type_ byte) *types.VoteSet { func (hvs *HeightVoteSet) getVoteSet(round int, type_ byte) *types.VoteSet {
log.Info("getVoteSet(round)", "round", round, "type", type_)
log.Debug("getVoteSet(round)", "round", round, "type", type_)
rvs, ok := hvs.roundVoteSets[round] rvs, ok := hvs.roundVoteSets[round]
if !ok { if !ok {
return nil return nil


+ 14
- 34
consensus/reactor.go View File

@ -142,7 +142,8 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
ps := peer.Data.Get(PeerStateKey).(*PeerState) ps := peer.Data.Get(PeerStateKey).(*PeerState)
_, msg, err := DecodeMessage(msgBytes) _, msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
// TODO punish peer?
return return
} }
log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg, "rsHeight", rs.Height) log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg, "rsHeight", rs.Height)
@ -185,50 +186,29 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
} }
switch msg := msg.(type) { switch msg := msg.(type) {
case *VoteMessage: case *VoteMessage:
vote := msg.Vote
var validators *types.ValidatorSet
if rs.Height == vote.Height {
validators = rs.Validators
} else if rs.Height == vote.Height+1 {
if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
return // Wrong height, not a LastCommit straggler commit.
}
validators = rs.LastValidators
} else {
return // Wrong height. Not necessarily a bad peer.
vote, valIndex := msg.Vote, msg.ValidatorIndex
// attempt to add the vote and dupeout the validator if its a duplicate signature
added, err := conR.conS.TryAddVote(rs, vote, valIndex, peer.Key)
if err == ErrAddingVote {
// TODO: punish peer
} else if err != nil {
return
} }
// We have vote/validators. Height may not be rs.Height
address, _ := validators.GetByIndex(msg.ValidatorIndex)
added, index, err := conR.conS.AddVote(address, vote, peer.Key)
if err != nil {
// If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
log.Warn("Found conflicting vote. Publish evidence")
evidenceTx := &types.DupeoutTx{
Address: address,
VoteA: *errDupe.VoteA,
VoteB: *errDupe.VoteB,
}
conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
} else {
// Probably an invalid signature. Bad peer.
log.Warn("Error attempting to add vote", "error", err)
// TODO: punish peer
}
}
ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size()) ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size())
ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size()) ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size())
ps.SetHasVote(vote, index)
ps.SetHasVote(vote, valIndex)
if added { if added {
// If rs.Height == vote.Height && rs.Round < vote.Round, // If rs.Height == vote.Height && rs.Round < vote.Round,
// the peer is sending us CatchupCommit precommits. // the peer is sending us CatchupCommit precommits.
// We could make note of this and help filter in broadcastHasVoteMessage(). // We could make note of this and help filter in broadcastHasVoteMessage().
conR.broadcastHasVoteMessage(vote, index)
conR.broadcastHasVoteMessage(vote, valIndex)
} }
default: default:
// TODO: should these be punishable?
log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
default: default:


+ 139
- 34
consensus/state.go View File

@ -89,7 +89,7 @@
it locks (changes lock to) that block and precommits that block. it locks (changes lock to) that block and precommits that block.
* Else, if the validator had seen +2/3 of prevotes for <nil>, it unlocks and precommits <nil>. * Else, if the validator had seen +2/3 of prevotes for <nil>, it unlocks and precommits <nil>.
* Else, if +2/3 of prevotes for a particular block or <nil> is not received on time, * Else, if +2/3 of prevotes for a particular block or <nil> is not received on time,
it precommits what it's locked on, or <nil>.
it precommits <nil>.
* The Precommit step ends: * The Precommit step ends:
* After +2/3 precommits for a particular block. --> goto Commit(H) * After +2/3 precommits for a particular block. --> goto Commit(H)
* After +2/3 precommits for <nil>. --> goto NewRound(H,R+1) * After +2/3 precommits for <nil>. --> goto NewRound(H,R+1)
@ -179,6 +179,7 @@ var (
var ( var (
ErrInvalidProposalSignature = errors.New("Error invalid proposal signature") ErrInvalidProposalSignature = errors.New("Error invalid proposal signature")
ErrInvalidProposalPOLRound = errors.New("Error invalid proposal POL round") ErrInvalidProposalPOLRound = errors.New("Error invalid proposal POL round")
ErrAddingVote = errors.New("Error adding vote")
) )
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -298,13 +299,18 @@ type ConsensusState struct {
evsw events.Fireable evsw events.Fireable
evc *events.EventCache // set in stageBlock and passed into state evc *events.EventCache // set in stageBlock and passed into state
timeoutChan chan TimeoutEvent // RoundState instead?
timeoutQuitChan chan struct{}
} }
func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
cs := &ConsensusState{ cs := &ConsensusState{
blockStore: blockStore,
mempoolReactor: mempoolReactor,
newStepCh: make(chan *RoundState, 10),
blockStore: blockStore,
mempoolReactor: mempoolReactor,
newStepCh: make(chan *RoundState, 10),
timeoutChan: make(chan TimeoutEvent), // XXX: blocks!
timeoutQuitChan: make(chan struct{}, 1),
} }
cs.updateToState(state, true) cs.updateToState(state, true)
// Don't call scheduleRound0 yet. // Don't call scheduleRound0 yet.
@ -361,12 +367,15 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState {
func (cs *ConsensusState) OnStart() error { func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart() cs.BaseService.OnStart()
go cs.logTimeouts(cs.timeoutChan, cs.timeoutQuitChan)
cs.scheduleRound0(cs.Height) cs.scheduleRound0(cs.Height)
return nil return nil
} }
func (cs *ConsensusState) OnStop() { func (cs *ConsensusState) OnStop() {
// It's mostly asynchronous so, there's not much to stop. // It's mostly asynchronous so, there's not much to stop.
// just the timeout tracker
close(cs.timeoutQuitChan)
cs.BaseService.OnStop() cs.BaseService.OnStop()
} }
@ -377,6 +386,7 @@ func (cs *ConsensusState) scheduleRound0(height int) {
go func() { go func() {
if 0 < sleepDuration { if 0 < sleepDuration {
time.Sleep(sleepDuration) time.Sleep(sleepDuration)
// TODO: fire on timeoutCh ?
} }
cs.EnterNewRound(height, 0) cs.EnterNewRound(height, 0)
}() }()
@ -533,21 +543,28 @@ func (cs *ConsensusState) EnterPropose(height int, round int) {
} }
log.Info(Fmt("EnterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) log.Info(Fmt("EnterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
var enterPrevote = make(chan struct{}, 1)
defer func() { defer func() {
// Done EnterPropose: // Done EnterPropose:
cs.Round = round cs.Round = round
cs.Step = RoundStepPropose cs.Step = RoundStepPropose
cs.newStepCh <- cs.getRoundState()
// If we already have the proposal + POL, then goto Prevote // If we already have the proposal + POL, then goto Prevote
if cs.isProposalComplete() { if cs.isProposalComplete() {
go cs.EnterPrevote(height, cs.Round)
enterPrevote <- struct{}{}
} }
}() }()
// This step times out after `timeoutPropose`
// EnterPrevote after timeoutPropose or if the proposal is complete
go func() { go func() {
time.Sleep(timeoutPropose)
ticker := time.NewTicker(timeoutPropose)
select {
case <-ticker.C:
cs.timeoutChan <- TimeoutEvent{RoundStepPropose, height, round}
case <-enterPrevote:
}
cs.newStepCh <- cs.getRoundState()
cs.EnterPrevote(height, round) cs.EnterPrevote(height, round)
}() }()
@ -560,7 +577,7 @@ func (cs *ConsensusState) EnterPropose(height int, round int) {
log.Info("EnterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) log.Info("EnterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator)
} else { } else {
log.Info("EnterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) log.Info("EnterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator)
cs.decideProposal(height, round)
cs.decideProposal(height, round) // if this takes longer than timeoutPropose we'll catch it in a later EnterPropose
} }
} }
@ -583,7 +600,7 @@ func (cs *ConsensusState) decideProposal(height int, round int) {
err := cs.privValidator.SignProposal(cs.state.ChainID, proposal) err := cs.privValidator.SignProposal(cs.state.ChainID, proposal)
if err == nil { if err == nil {
log.Notice("Signed and set proposal", "height", height, "round", round, "proposal", proposal) log.Notice("Signed and set proposal", "height", height, "round", round, "proposal", proposal)
log.Info(Fmt("Signed and set proposal block: %v", block))
log.Debug(Fmt("Signed and set proposal block: %v", block))
// Set fields // Set fields
cs.Proposal = proposal cs.Proposal = proposal
cs.ProposalBlock = block cs.ProposalBlock = block
@ -600,15 +617,19 @@ func (cs *ConsensusState) isProposalComplete() bool {
if cs.Proposal == nil || cs.ProposalBlock == nil { if cs.Proposal == nil || cs.ProposalBlock == nil {
return false return false
} }
// we have the proposal. if there's a POLRound,
// make sure we have the prevotes from it too
if cs.Proposal.POLRound < 0 { if cs.Proposal.POLRound < 0 {
return true return true
} else { } else {
// if this is false the proposer is lying or we haven't received the POL yet
return cs.Votes.Prevotes(cs.Proposal.POLRound).HasTwoThirdsMajority() return cs.Votes.Prevotes(cs.Proposal.POLRound).HasTwoThirdsMajority()
} }
} }
// Create the next block to propose and return it. // Create the next block to propose and return it.
// NOTE: make it side-effect free for clarity. // NOTE: make it side-effect free for clarity.
// XXX: where are the side-effects?
func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts *types.PartSet) { func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts *types.PartSet) {
var validation *types.Validation var validation *types.Validation
if cs.Height == 1 { if cs.Height == 1 {
@ -705,6 +726,8 @@ func (cs *ConsensusState) doPrevote(height int, round int) {
} }
// Prevote cs.ProposalBlock // Prevote cs.ProposalBlock
// NOTE: the proposal signature is validated when it is received,
// and the proposal block parts are validated as they are received (against the merkle hash in the proposal)
cs.signAddVote(types.VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header()) cs.signAddVote(types.VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header())
return return
} }
@ -729,7 +752,8 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) {
// After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit() // After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit()
go func() { go func() {
time.Sleep(timeoutPrevote0 + timeoutPrevote0*time.Duration(round))
time.Sleep(timeoutPrevote0 + timeoutPrevoteDelta*time.Duration(round))
cs.timeoutChan <- TimeoutEvent{RoundStepPrevote, height, round}
cs.EnterPrecommit(height, round) cs.EnterPrecommit(height, round)
}() }()
} }
@ -739,7 +763,8 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) {
// Enter: any +2/3 precommits for next round. // Enter: any +2/3 precommits for next round.
// Lock & precommit the ProposalBlock if we have enough prevotes for it, // Lock & precommit the ProposalBlock if we have enough prevotes for it,
// else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil,
// else, precommit locked block or nil otherwise.
// else, precommit nil otherwise.
// NOTE: we don't precommit our locked block (unless theres another POL for it) because it complicates unlocking and accountability
func (cs *ConsensusState) EnterPrecommit(height int, round int) { func (cs *ConsensusState) EnterPrecommit(height int, round int) {
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
@ -762,25 +787,29 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) {
hash, partsHeader, ok := cs.Votes.Prevotes(round).TwoThirdsMajority() hash, partsHeader, ok := cs.Votes.Prevotes(round).TwoThirdsMajority()
// If we don't have two thirds of prevotes, just precommit locked block or nil
// If we don't have two thirds of prevotes, just precommit nil
// NOTE: alternatively, if we have seen a POL since our last precommit,
// we could precommit that
if !ok { if !ok {
if cs.LockedBlock != nil { if cs.LockedBlock != nil {
log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit. Precommitting lock.")
cs.signAddVote(types.VoteTypePrecommit, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header())
log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit while we're locked. Precommitting nil")
//cs.signAddVote(types.VoteTypePrecommit, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header())
} else { } else {
log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit. Precommitting nil.") log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit. Precommitting nil.")
cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{})
} }
cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{})
return return
} }
// At this point +2/3 prevoted for a particular block or nil
// +2/3 prevoted nil. Unlock and precommit nil. // +2/3 prevoted nil. Unlock and precommit nil.
if len(hash) == 0 { if len(hash) == 0 {
if cs.LockedBlock == nil { if cs.LockedBlock == nil {
log.Info("EnterPrecommit: +2/3 prevoted for nil.") log.Info("EnterPrecommit: +2/3 prevoted for nil.")
} else { } else {
log.Info("EnterPrecommit: +2/3 prevoted for nil. Unlocking") log.Info("EnterPrecommit: +2/3 prevoted for nil. Unlocking")
cs.LockedRound = 0
cs.LockedRound = 0 //XXX: should be this round
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil cs.LockedBlockParts = nil
} }
@ -790,16 +819,17 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) {
// At this point, +2/3 prevoted for a particular block. // At this point, +2/3 prevoted for a particular block.
// If +2/3 prevoted for already locked block, precommit it.
// If we're already locked on that block, precommit it, and update the LockedRound
if cs.LockedBlock.HashesTo(hash) { if cs.LockedBlock.HashesTo(hash) {
log.Info("EnterPrecommit: +2/3 prevoted locked block.") log.Info("EnterPrecommit: +2/3 prevoted locked block.")
cs.LockedRound = round
cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader)
return return
} }
// If +2/3 prevoted for proposal block, stage and precommit it // If +2/3 prevoted for proposal block, stage and precommit it
if cs.ProposalBlock.HashesTo(hash) { if cs.ProposalBlock.HashesTo(hash) {
log.Info("EnterPrecommit: +2/3 prevoted proposal block.")
log.Info("EnterPrecommit: +2/3 prevoted proposal block.", "hash", fmt.Sprintf("%X", hash))
// Validate the block. // Validate the block.
if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil {
PanicConsensus(Fmt("EnterPrecommit: +2/3 prevoted for an invalid block: %v", err)) PanicConsensus(Fmt("EnterPrecommit: +2/3 prevoted for an invalid block: %v", err))
@ -817,7 +847,7 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) {
if cs.Votes.POLRound() < round { if cs.Votes.POLRound() < round {
PanicSanity(Fmt("This POLRound shold be %v but got %", round, cs.Votes.POLRound())) PanicSanity(Fmt("This POLRound shold be %v but got %", round, cs.Votes.POLRound()))
} }
cs.LockedRound = 0
cs.LockedRound = 0 // XXX: shouldn't we set this to this round
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil cs.LockedBlockParts = nil
if !cs.ProposalBlockParts.HasHeader(partsHeader) { if !cs.ProposalBlockParts.HasHeader(partsHeader) {
@ -849,6 +879,7 @@ func (cs *ConsensusState) EnterPrecommitWait(height int, round int) {
// After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound() // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound()
go func() { go func() {
time.Sleep(timeoutPrecommit0 + timeoutPrecommitDelta*time.Duration(round)) time.Sleep(timeoutPrecommit0 + timeoutPrecommitDelta*time.Duration(round))
cs.timeoutChan <- TimeoutEvent{RoundStepPrecommit, height, round}
// If we have +2/3 of precommits for a particular block (or nil), // If we have +2/3 of precommits for a particular block (or nil),
// we already entered commit (or the next round). // we already entered commit (or the next round).
// So just try to transition to the next round, // So just try to transition to the next round,
@ -885,16 +916,19 @@ func (cs *ConsensusState) EnterCommit(height int) {
// The Locked* fields no longer matter. // The Locked* fields no longer matter.
// Move them over to ProposalBlock if they match the commit hash, // Move them over to ProposalBlock if they match the commit hash,
// otherwise they can now be cleared. // otherwise they can now be cleared.
// XXX: can't we just wait to clear them in updateToState ?
// XXX: it's causing a race condition in tests where they get cleared
// before we can check the lock!
if cs.LockedBlock.HashesTo(hash) { if cs.LockedBlock.HashesTo(hash) {
cs.ProposalBlock = cs.LockedBlock cs.ProposalBlock = cs.LockedBlock
cs.ProposalBlockParts = cs.LockedBlockParts cs.ProposalBlockParts = cs.LockedBlockParts
cs.LockedRound = 0
/*cs.LockedRound = 0
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.LockedBlockParts = nil*/
} else { } else {
cs.LockedRound = 0
/*cs.LockedRound = 0
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.LockedBlockParts = nil*/
} }
// If we don't have the block being committed, set up to get it. // If we don't have the block being committed, set up to get it.
@ -1033,6 +1067,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (ad
log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash()) log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash())
if cs.Step == RoundStepPropose && cs.isProposalComplete() { if cs.Step == RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step // Move onto the next step
// XXX: isn't this unecessary since propose will either do this or timeout into it
go cs.EnterPrevote(height, cs.Round) go cs.EnterPrevote(height, cs.Round)
} else if cs.Step == RoundStepCommit { } else if cs.Step == RoundStepCommit {
// If we're waiting on the proposal block... // If we're waiting on the proposal block...
@ -1050,10 +1085,49 @@ func (cs *ConsensusState) AddVote(address []byte, vote *types.Vote, peerKey stri
return cs.addVote(address, vote, peerKey) return cs.addVote(address, vote, peerKey)
} }
// Attempt to add the vote. if its a duplicate signature, dupeout the validator
func (cs *ConsensusState) TryAddVote(rs *RoundState, vote *types.Vote, valIndex int, peerKey string) (bool, error) {
var validators *types.ValidatorSet
if rs.Height == vote.Height {
validators = rs.Validators
} else if rs.Height == vote.Height+1 {
if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) {
return false, fmt.Errorf("TryAddVote: Wrong height, not a LastCommit straggler commit.")
}
validators = rs.LastValidators
} else {
return false, fmt.Errorf("TryAddVote: Wrong height. Not necessarily a bad peer.")
}
// We have vote/validators. Height may not be rs.Height
address, _ := validators.GetByIndex(valIndex)
added, index, err := cs.AddVote(address, vote, peerKey)
_ = index // should be same as valIndex
if err != nil {
// If conflicting sig, broadcast evidence tx for slashing. Else punish peer.
if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok {
log.Warn("Found conflicting vote. Publish evidence")
evidenceTx := &types.DupeoutTx{
Address: address,
VoteA: *errDupe.VoteA,
VoteB: *errDupe.VoteB,
}
cs.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err
return added, err
} else {
// Probably an invalid signature. Bad peer.
log.Warn("Error attempting to add vote", "error", err)
return added, ErrAddingVote
}
}
return added, nil
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
func (cs *ConsensusState) addVote(address []byte, vote *types.Vote, peerKey string) (added bool, index int, err error) { func (cs *ConsensusState) addVote(address []byte, vote *types.Vote, peerKey string) (added bool, index int, err error) {
log.Info("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height)
log.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height)
// A precommit for the previous height? // A precommit for the previous height?
if vote.Height+1 == cs.Height && vote.Type == types.VoteTypePrecommit { if vote.Height+1 == cs.Height && vote.Type == types.VoteTypePrecommit {
@ -1082,7 +1156,7 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote, peerKey stri
hash, _, ok := prevotes.TwoThirdsMajority() hash, _, ok := prevotes.TwoThirdsMajority()
if ok && !cs.LockedBlock.HashesTo(hash) { if ok && !cs.LockedBlock.HashesTo(hash) {
log.Notice("Unlocking because of POL.", "lockedRound", cs.LockedRound, "POLRound", vote.Round) log.Notice("Unlocking because of POL.", "lockedRound", cs.LockedRound, "POLRound", vote.Round)
cs.LockedRound = 0
cs.LockedRound = 0 // XXX: shouldn't we set this to the current round?
cs.LockedBlock = nil cs.LockedBlock = nil
cs.LockedBlockParts = nil cs.LockedBlockParts = nil
} }
@ -1102,6 +1176,7 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote, peerKey stri
}() }()
} else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round { } else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round {
// If the proposal is now complete, enter prevote of cs.Round. // If the proposal is now complete, enter prevote of cs.Round.
// XXX: hmph again
if cs.isProposalComplete() { if cs.isProposalComplete() {
go cs.EnterPrevote(height, cs.Round) go cs.EnterPrevote(height, cs.Round)
} }
@ -1168,10 +1243,7 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS
} }
} }
func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote {
if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) {
return nil
}
func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
vote := &types.Vote{ vote := &types.Vote{
Height: cs.Height, Height: cs.Height,
Round: cs.Round, Round: cs.Round,
@ -1180,6 +1252,14 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
BlockPartsHeader: header, BlockPartsHeader: header,
} }
err := cs.privValidator.SignVote(cs.state.ChainID, vote) err := cs.privValidator.SignVote(cs.state.ChainID, vote)
return vote, err
}
func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote {
if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) {
return nil
}
vote, err := cs.signVote(type_, hash, header)
if err == nil { if err == nil {
_, _, err := cs.addVote(cs.privValidator.Address, vote, "") _, _, err := cs.addVote(cs.privValidator.Address, vote, "")
log.Notice("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) log.Notice("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
@ -1211,10 +1291,12 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe
cs.mempoolReactor.Mempool.ResetForBlockAndState(block, cs.stagedState) cs.mempoolReactor.Mempool.ResetForBlockAndState(block, cs.stagedState)
// Fire off event // Fire off event
go func(block *types.Block) {
cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block})
cs.evc.Flush()
}(block)
if cs.evsw != nil && cs.evc != nil {
go func(block *types.Block) {
cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block})
cs.evc.Flush()
}(block)
}
} }
@ -1226,3 +1308,26 @@ func (cs *ConsensusState) SetFireable(evsw events.Fireable) {
func (cs *ConsensusState) String() string { func (cs *ConsensusState) String() string {
return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step)
} }
//-----------------------------------------------------------------------------
// timeout tracking
type TimeoutEvent struct {
Type RoundStepType
Height int
Round int
}
func (cs *ConsensusState) logTimeouts(timeoutChan chan TimeoutEvent, quitChan <-chan struct{}) {
for {
select {
case timeout := <-timeoutChan:
log.Info("Timeout in consensus state", "height", timeout.Height, "round", timeout.Round, "step", timeout.Type.String())
timeoutChan <- timeout
case <-quitChan:
return
}
}
}

+ 0
- 38
consensus/state_test.go View File

@ -1,38 +0,0 @@
package consensus
import (
"testing"
_ "github.com/tendermint/tendermint/config/tendermint_test"
)
func TestEnterProposeNoPrivValidator(t *testing.T) {
cs, _ := randConsensusState()
cs.EnterPropose(1, 0)
rs := cs.GetRoundState()
if rs.Proposal != nil {
t.Error("Expected to make no proposal, since no privValidator")
}
}
func TestEnterPropose(t *testing.T) {
cs, privValidators := randConsensusState()
val0 := privValidators[0]
cs.SetPrivValidator(val0)
cs.EnterPropose(1, 0)
rs := cs.GetRoundState()
// Check that Proposal, ProposalBlock, ProposalBlockParts are set.
if rs.Proposal == nil {
t.Error("rs.Proposal should be set")
}
if rs.ProposalBlock == nil {
t.Error("rs.ProposalBlock should be set")
}
if rs.ProposalBlockParts.Total() == 0 {
t.Error("rs.ProposalBlockParts should be set")
}
}
// TODO write better consensus state tests

+ 113
- 0
consensus/test.go View File

@ -1,13 +1,126 @@
package consensus package consensus
import ( import (
"bytes"
"fmt"
"testing"
"time"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
dbm "github.com/tendermint/tendermint/db" dbm "github.com/tendermint/tendermint/db"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
//-------------------------------------------------------------------------------
// utils
// add vote to one cs from another
func addVoteToFrom(t *testing.T, voteType byte, to, from *ConsensusState, hash []byte, header types.PartSetHeader) {
vote, err := from.signVote(voteType, hash, header)
if err != nil {
panic(fmt.Sprintln("Failed to sign vote", err))
}
valIndex, _ := to.Validators.GetByAddress(from.privValidator.Address)
added, err := to.TryAddVote(to.GetRoundState(), vote, valIndex, "")
if _, ok := err.(*types.ErrVoteConflictingSignature); ok {
// let it fly
} else if !added {
panic("Failed to add vote")
} else if err != nil {
panic(fmt.Sprintln("Failed to add vote:", err))
}
}
func ensureNoNewStep(t *testing.T, cs *ConsensusState) {
timeout := time.NewTicker(2 * time.Second)
select {
case <-timeout.C:
break
case <-cs.NewStepCh():
t.Fatal("We should be stuck waiting for more prevotes, not moving to the next step")
}
}
func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *types.PrivValidator, blockHash []byte) {
prevotes := cs.Votes.Prevotes(round)
var vote *types.Vote
if vote = prevotes.GetByAddress(privVal.Address); vote == nil {
t.Fatal("Failed to find prevote from validator")
}
if blockHash == nil {
if vote.BlockHash != nil {
t.Fatal("Expected prevote to be for nil")
}
} else {
if !bytes.Equal(vote.BlockHash, blockHash) {
t.Fatal("Expected prevote to be for proposal block")
}
}
}
func validatePrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *types.PrivValidator, votedBlock, lockedBlock *types.Block) {
precommits := cs.Votes.Precommits(thisRound)
var vote *types.Vote
if vote = precommits.GetByAddress(privVal.Address); vote == nil {
panic("Failed to find precommit from validator")
}
if votedBlock == nil {
if vote.BlockHash != nil {
panic("Expected precommit to be for nil")
}
} else {
if !bytes.Equal(vote.BlockHash, votedBlock.Hash()) {
panic("Expected precommit to be for proposal block")
}
}
if lockedBlock == nil {
if cs.LockedRound != lockRound || cs.LockedBlock != nil {
panic(fmt.Sprintf("Expected to be locked on nil. Got %v", cs.LockedBlock))
}
} else {
if cs.LockedRound != lockRound || cs.LockedBlock != lockedBlock {
panic(fmt.Sprintf("Expected block to be locked on round %d, got %d. Got locked block %v, expected %v", lockRound, cs.LockedRound, cs.LockedBlock, lockedBlock))
}
}
}
func simpleConsensusState(nValidators int) ([]*ConsensusState, []*types.PrivValidator) {
// Get State
state, privAccs, privVals := sm.RandGenesisState(10, true, 1000, nValidators, false, 10)
_, _ = privAccs, privVals
fmt.Println(state.BondedValidators)
css := make([]*ConsensusState, nValidators)
for i := 0; i < nValidators; i++ {
// Get BlockStore
blockDB := dbm.NewMemDB()
blockStore := bc.NewBlockStore(blockDB)
// Make MempoolReactor
mempool := mempl.NewMempool(state.Copy())
mempoolReactor := mempl.NewMempoolReactor(mempool)
mempoolReactor.SetSwitch(p2p.NewSwitch())
// Make ConsensusReactor
cs := NewConsensusState(state, blockStore, mempoolReactor)
// read off the NewHeightStep
<-cs.NewStepCh()
css[i] = cs
}
return css, privVals
}
func randConsensusState() (*ConsensusState, []*types.PrivValidator) { func randConsensusState() (*ConsensusState, []*types.PrivValidator) {
state, _, privValidators := sm.RandGenesisState(20, false, 1000, 10, false, 1000) state, _, privValidators := sm.RandGenesisState(20, false, 1000, 10, false, 1000)
blockStore := bc.NewBlockStore(dbm.NewMemDB()) blockStore := bc.NewBlockStore(dbm.NewMemDB())


+ 0
- 1
mempool/reactor.go View File

@ -20,7 +20,6 @@ var (
type MempoolReactor struct { type MempoolReactor struct {
p2p.BaseReactor p2p.BaseReactor
sw *p2p.Switch
Mempool *Mempool Mempool *Mempool
evsw events.Fireable evsw events.Fireable


+ 0
- 1
p2p/switch.go View File

@ -295,7 +295,6 @@ func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
}(peer) }(peer)
} }
return successChan return successChan
} }
// Returns the count of outbound/inbound and outbound-dialing peers. // Returns the count of outbound/inbound and outbound-dialing peers.


Loading…
Cancel
Save