diff --git a/consensus/height_vote_set.go b/consensus/height_vote_set.go index 795b1eb2a..6613bdba3 100644 --- a/consensus/height_vote_set.go +++ b/consensus/height_vote_set.go @@ -77,7 +77,7 @@ func (hvs *HeightVoteSet) addRound(round int) { if _, ok := hvs.roundVoteSets[round]; ok { PanicSanity("addRound() for an existing round") } - log.Info("addRound(round)", "round", round) + log.Debug("addRound(round)", "round", round) prevotes := types.NewVoteSet(hvs.height, round, types.VoteTypePrevote, hvs.valSet) precommits := types.NewVoteSet(hvs.height, round, types.VoteTypePrecommit, hvs.valSet) hvs.roundVoteSets[round] = RoundVoteSet{ @@ -120,7 +120,7 @@ func (hvs *HeightVoteSet) Precommits(round int) *types.VoteSet { return hvs.getVoteSet(round, types.VoteTypePrecommit) } -// Last round that has +2/3 prevotes for a particular block or nik. +// Last round that has +2/3 prevotes for a particular block or nil. // Returns -1 if no such round exists. func (hvs *HeightVoteSet) POLRound() int { hvs.mtx.Lock() @@ -134,7 +134,7 @@ func (hvs *HeightVoteSet) POLRound() int { } func (hvs *HeightVoteSet) getVoteSet(round int, type_ byte) *types.VoteSet { - log.Info("getVoteSet(round)", "round", round, "type", type_) + log.Debug("getVoteSet(round)", "round", round, "type", type_) rvs, ok := hvs.roundVoteSets[round] if !ok { return nil diff --git a/consensus/reactor.go b/consensus/reactor.go index 4605c7bb7..0869cf8af 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -142,7 +142,8 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte ps := peer.Data.Get(PeerStateKey).(*PeerState) _, msg, err := DecodeMessage(msgBytes) if err != nil { - log.Warn("Error decoding message", "channel", chID, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes) + log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes) + // TODO punish peer? return } log.Debug("Receive", "channel", chID, "peer", peer, "msg", msg, "rsHeight", rs.Height) @@ -185,50 +186,29 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte } switch msg := msg.(type) { case *VoteMessage: - vote := msg.Vote - var validators *types.ValidatorSet - if rs.Height == vote.Height { - validators = rs.Validators - } else if rs.Height == vote.Height+1 { - if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) { - return // Wrong height, not a LastCommit straggler commit. - } - validators = rs.LastValidators - } else { - return // Wrong height. Not necessarily a bad peer. + vote, valIndex := msg.Vote, msg.ValidatorIndex + + // attempt to add the vote and dupeout the validator if its a duplicate signature + added, err := conR.conS.TryAddVote(rs, vote, valIndex, peer.Key) + if err == ErrAddingVote { + // TODO: punish peer + } else if err != nil { + return } - // We have vote/validators. Height may not be rs.Height - - address, _ := validators.GetByIndex(msg.ValidatorIndex) - added, index, err := conR.conS.AddVote(address, vote, peer.Key) - if err != nil { - // If conflicting sig, broadcast evidence tx for slashing. Else punish peer. - if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok { - log.Warn("Found conflicting vote. Publish evidence") - evidenceTx := &types.DupeoutTx{ - Address: address, - VoteA: *errDupe.VoteA, - VoteB: *errDupe.VoteB, - } - conR.conS.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err - } else { - // Probably an invalid signature. Bad peer. - log.Warn("Error attempting to add vote", "error", err) - // TODO: punish peer - } - } ps.EnsureVoteBitArrays(rs.Height, rs.Validators.Size()) ps.EnsureVoteBitArrays(rs.Height-1, rs.LastCommit.Size()) - ps.SetHasVote(vote, index) + ps.SetHasVote(vote, valIndex) + if added { // If rs.Height == vote.Height && rs.Round < vote.Round, // the peer is sending us CatchupCommit precommits. // We could make note of this and help filter in broadcastHasVoteMessage(). - conR.broadcastHasVoteMessage(vote, index) + conR.broadcastHasVoteMessage(vote, valIndex) } default: + // TODO: should these be punishable? log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) } default: diff --git a/consensus/state.go b/consensus/state.go index 14e2b36e6..4d4cf0e1f 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -89,7 +89,7 @@ it locks (changes lock to) that block and precommits that block. * Else, if the validator had seen +2/3 of prevotes for , it unlocks and precommits . * Else, if +2/3 of prevotes for a particular block or is not received on time, - it precommits what it's locked on, or . + it precommits . * The Precommit step ends: * After +2/3 precommits for a particular block. --> goto Commit(H) * After +2/3 precommits for . --> goto NewRound(H,R+1) @@ -179,6 +179,7 @@ var ( var ( ErrInvalidProposalSignature = errors.New("Error invalid proposal signature") ErrInvalidProposalPOLRound = errors.New("Error invalid proposal POL round") + ErrAddingVote = errors.New("Error adding vote") ) //----------------------------------------------------------------------------- @@ -298,13 +299,18 @@ type ConsensusState struct { evsw events.Fireable evc *events.EventCache // set in stageBlock and passed into state + + timeoutChan chan TimeoutEvent // RoundState instead? + timeoutQuitChan chan struct{} } func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState { cs := &ConsensusState{ - blockStore: blockStore, - mempoolReactor: mempoolReactor, - newStepCh: make(chan *RoundState, 10), + blockStore: blockStore, + mempoolReactor: mempoolReactor, + newStepCh: make(chan *RoundState, 10), + timeoutChan: make(chan TimeoutEvent), // XXX: blocks! + timeoutQuitChan: make(chan struct{}, 1), } cs.updateToState(state, true) // Don't call scheduleRound0 yet. @@ -361,12 +367,15 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState { func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() + go cs.logTimeouts(cs.timeoutChan, cs.timeoutQuitChan) cs.scheduleRound0(cs.Height) return nil } func (cs *ConsensusState) OnStop() { // It's mostly asynchronous so, there's not much to stop. + // just the timeout tracker + close(cs.timeoutQuitChan) cs.BaseService.OnStop() } @@ -377,6 +386,7 @@ func (cs *ConsensusState) scheduleRound0(height int) { go func() { if 0 < sleepDuration { time.Sleep(sleepDuration) + // TODO: fire on timeoutCh ? } cs.EnterNewRound(height, 0) }() @@ -533,21 +543,28 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { } log.Info(Fmt("EnterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) + var enterPrevote = make(chan struct{}, 1) + defer func() { // Done EnterPropose: cs.Round = round cs.Step = RoundStepPropose - cs.newStepCh <- cs.getRoundState() // If we already have the proposal + POL, then goto Prevote if cs.isProposalComplete() { - go cs.EnterPrevote(height, cs.Round) + enterPrevote <- struct{}{} } }() - // This step times out after `timeoutPropose` + // EnterPrevote after timeoutPropose or if the proposal is complete go func() { - time.Sleep(timeoutPropose) + ticker := time.NewTicker(timeoutPropose) + select { + case <-ticker.C: + cs.timeoutChan <- TimeoutEvent{RoundStepPropose, height, round} + case <-enterPrevote: + } + cs.newStepCh <- cs.getRoundState() cs.EnterPrevote(height, round) }() @@ -560,7 +577,7 @@ func (cs *ConsensusState) EnterPropose(height int, round int) { log.Info("EnterPropose: Not our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) } else { log.Info("EnterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator) - cs.decideProposal(height, round) + cs.decideProposal(height, round) // if this takes longer than timeoutPropose we'll catch it in a later EnterPropose } } @@ -583,7 +600,7 @@ func (cs *ConsensusState) decideProposal(height int, round int) { err := cs.privValidator.SignProposal(cs.state.ChainID, proposal) if err == nil { log.Notice("Signed and set proposal", "height", height, "round", round, "proposal", proposal) - log.Info(Fmt("Signed and set proposal block: %v", block)) + log.Debug(Fmt("Signed and set proposal block: %v", block)) // Set fields cs.Proposal = proposal cs.ProposalBlock = block @@ -600,15 +617,19 @@ func (cs *ConsensusState) isProposalComplete() bool { if cs.Proposal == nil || cs.ProposalBlock == nil { return false } + // we have the proposal. if there's a POLRound, + // make sure we have the prevotes from it too if cs.Proposal.POLRound < 0 { return true } else { + // if this is false the proposer is lying or we haven't received the POL yet return cs.Votes.Prevotes(cs.Proposal.POLRound).HasTwoThirdsMajority() } } // Create the next block to propose and return it. // NOTE: make it side-effect free for clarity. +// XXX: where are the side-effects? func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts *types.PartSet) { var validation *types.Validation if cs.Height == 1 { @@ -705,6 +726,8 @@ func (cs *ConsensusState) doPrevote(height int, round int) { } // Prevote cs.ProposalBlock + // NOTE: the proposal signature is validated when it is received, + // and the proposal block parts are validated as they are received (against the merkle hash in the proposal) cs.signAddVote(types.VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header()) return } @@ -729,7 +752,8 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { // After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit() go func() { - time.Sleep(timeoutPrevote0 + timeoutPrevote0*time.Duration(round)) + time.Sleep(timeoutPrevote0 + timeoutPrevoteDelta*time.Duration(round)) + cs.timeoutChan <- TimeoutEvent{RoundStepPrevote, height, round} cs.EnterPrecommit(height, round) }() } @@ -739,7 +763,8 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) { // Enter: any +2/3 precommits for next round. // Lock & precommit the ProposalBlock if we have enough prevotes for it, // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, -// else, precommit locked block or nil otherwise. +// else, precommit nil otherwise. +// NOTE: we don't precommit our locked block (unless theres another POL for it) because it complicates unlocking and accountability func (cs *ConsensusState) EnterPrecommit(height int, round int) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -762,25 +787,29 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { hash, partsHeader, ok := cs.Votes.Prevotes(round).TwoThirdsMajority() - // If we don't have two thirds of prevotes, just precommit locked block or nil + // If we don't have two thirds of prevotes, just precommit nil + // NOTE: alternatively, if we have seen a POL since our last precommit, + // we could precommit that if !ok { if cs.LockedBlock != nil { - log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit. Precommitting lock.") - cs.signAddVote(types.VoteTypePrecommit, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header()) + log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit while we're locked. Precommitting nil") + //cs.signAddVote(types.VoteTypePrecommit, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header()) } else { log.Info("EnterPrecommit: No +2/3 prevotes during EnterPrecommit. Precommitting nil.") - cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) } + cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return } + // At this point +2/3 prevoted for a particular block or nil + // +2/3 prevoted nil. Unlock and precommit nil. if len(hash) == 0 { if cs.LockedBlock == nil { log.Info("EnterPrecommit: +2/3 prevoted for nil.") } else { log.Info("EnterPrecommit: +2/3 prevoted for nil. Unlocking") - cs.LockedRound = 0 + cs.LockedRound = 0 //XXX: should be this round cs.LockedBlock = nil cs.LockedBlockParts = nil } @@ -790,16 +819,17 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { // At this point, +2/3 prevoted for a particular block. - // If +2/3 prevoted for already locked block, precommit it. + // If we're already locked on that block, precommit it, and update the LockedRound if cs.LockedBlock.HashesTo(hash) { log.Info("EnterPrecommit: +2/3 prevoted locked block.") + cs.LockedRound = round cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) return } // If +2/3 prevoted for proposal block, stage and precommit it if cs.ProposalBlock.HashesTo(hash) { - log.Info("EnterPrecommit: +2/3 prevoted proposal block.") + log.Info("EnterPrecommit: +2/3 prevoted proposal block.", "hash", fmt.Sprintf("%X", hash)) // Validate the block. if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { PanicConsensus(Fmt("EnterPrecommit: +2/3 prevoted for an invalid block: %v", err)) @@ -817,7 +847,7 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int) { if cs.Votes.POLRound() < round { PanicSanity(Fmt("This POLRound shold be %v but got %", round, cs.Votes.POLRound())) } - cs.LockedRound = 0 + cs.LockedRound = 0 // XXX: shouldn't we set this to this round cs.LockedBlock = nil cs.LockedBlockParts = nil if !cs.ProposalBlockParts.HasHeader(partsHeader) { @@ -849,6 +879,7 @@ func (cs *ConsensusState) EnterPrecommitWait(height int, round int) { // After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound() go func() { time.Sleep(timeoutPrecommit0 + timeoutPrecommitDelta*time.Duration(round)) + cs.timeoutChan <- TimeoutEvent{RoundStepPrecommit, height, round} // If we have +2/3 of precommits for a particular block (or nil), // we already entered commit (or the next round). // So just try to transition to the next round, @@ -885,16 +916,19 @@ func (cs *ConsensusState) EnterCommit(height int) { // The Locked* fields no longer matter. // Move them over to ProposalBlock if they match the commit hash, // otherwise they can now be cleared. + // XXX: can't we just wait to clear them in updateToState ? + // XXX: it's causing a race condition in tests where they get cleared + // before we can check the lock! if cs.LockedBlock.HashesTo(hash) { cs.ProposalBlock = cs.LockedBlock cs.ProposalBlockParts = cs.LockedBlockParts - cs.LockedRound = 0 + /*cs.LockedRound = 0 cs.LockedBlock = nil - cs.LockedBlockParts = nil + cs.LockedBlockParts = nil*/ } else { - cs.LockedRound = 0 + /*cs.LockedRound = 0 cs.LockedBlock = nil - cs.LockedBlockParts = nil + cs.LockedBlockParts = nil*/ } // If we don't have the block being committed, set up to get it. @@ -1033,6 +1067,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (ad log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash()) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step + // XXX: isn't this unecessary since propose will either do this or timeout into it go cs.EnterPrevote(height, cs.Round) } else if cs.Step == RoundStepCommit { // If we're waiting on the proposal block... @@ -1050,10 +1085,49 @@ func (cs *ConsensusState) AddVote(address []byte, vote *types.Vote, peerKey stri return cs.addVote(address, vote, peerKey) } +// Attempt to add the vote. if its a duplicate signature, dupeout the validator +func (cs *ConsensusState) TryAddVote(rs *RoundState, vote *types.Vote, valIndex int, peerKey string) (bool, error) { + var validators *types.ValidatorSet + if rs.Height == vote.Height { + validators = rs.Validators + } else if rs.Height == vote.Height+1 { + if !(rs.Step == RoundStepNewHeight && vote.Type == types.VoteTypePrecommit) { + return false, fmt.Errorf("TryAddVote: Wrong height, not a LastCommit straggler commit.") + } + validators = rs.LastValidators + } else { + return false, fmt.Errorf("TryAddVote: Wrong height. Not necessarily a bad peer.") + } + + // We have vote/validators. Height may not be rs.Height + + address, _ := validators.GetByIndex(valIndex) + added, index, err := cs.AddVote(address, vote, peerKey) + _ = index // should be same as valIndex + if err != nil { + // If conflicting sig, broadcast evidence tx for slashing. Else punish peer. + if errDupe, ok := err.(*types.ErrVoteConflictingSignature); ok { + log.Warn("Found conflicting vote. Publish evidence") + evidenceTx := &types.DupeoutTx{ + Address: address, + VoteA: *errDupe.VoteA, + VoteB: *errDupe.VoteB, + } + cs.mempoolReactor.BroadcastTx(evidenceTx) // shouldn't need to check returned err + return added, err + } else { + // Probably an invalid signature. Bad peer. + log.Warn("Error attempting to add vote", "error", err) + return added, ErrAddingVote + } + } + return added, nil +} + //----------------------------------------------------------------------------- func (cs *ConsensusState) addVote(address []byte, vote *types.Vote, peerKey string) (added bool, index int, err error) { - log.Info("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height) + log.Debug("addVote", "voteHeight", vote.Height, "voteType", vote.Type, "csHeight", cs.Height) // A precommit for the previous height? if vote.Height+1 == cs.Height && vote.Type == types.VoteTypePrecommit { @@ -1082,7 +1156,7 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote, peerKey stri hash, _, ok := prevotes.TwoThirdsMajority() if ok && !cs.LockedBlock.HashesTo(hash) { log.Notice("Unlocking because of POL.", "lockedRound", cs.LockedRound, "POLRound", vote.Round) - cs.LockedRound = 0 + cs.LockedRound = 0 // XXX: shouldn't we set this to the current round? cs.LockedBlock = nil cs.LockedBlockParts = nil } @@ -1102,6 +1176,7 @@ func (cs *ConsensusState) addVote(address []byte, vote *types.Vote, peerKey stri }() } else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round { // If the proposal is now complete, enter prevote of cs.Round. + // XXX: hmph again if cs.isProposalComplete() { go cs.EnterPrevote(height, cs.Round) } @@ -1168,10 +1243,7 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS } } -func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { - if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) { - return nil - } +func (cs *ConsensusState) signVote(type_ byte, hash []byte, header types.PartSetHeader) (*types.Vote, error) { vote := &types.Vote{ Height: cs.Height, Round: cs.Round, @@ -1180,6 +1252,14 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part BlockPartsHeader: header, } err := cs.privValidator.SignVote(cs.state.ChainID, vote) + return vote, err +} + +func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.PartSetHeader) *types.Vote { + if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.Address) { + return nil + } + vote, err := cs.signVote(type_, hash, header) if err == nil { _, _, err := cs.addVote(cs.privValidator.Address, vote, "") log.Notice("Signed and added vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) @@ -1211,10 +1291,12 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe cs.mempoolReactor.Mempool.ResetForBlockAndState(block, cs.stagedState) // Fire off event - go func(block *types.Block) { - cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) - cs.evc.Flush() - }(block) + if cs.evsw != nil && cs.evc != nil { + go func(block *types.Block) { + cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) + cs.evc.Flush() + }(block) + } } @@ -1226,3 +1308,26 @@ func (cs *ConsensusState) SetFireable(evsw events.Fireable) { func (cs *ConsensusState) String() string { return Fmt("ConsensusState(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) } + +//----------------------------------------------------------------------------- +// timeout tracking + +type TimeoutEvent struct { + Type RoundStepType + Height int + Round int +} + +func (cs *ConsensusState) logTimeouts(timeoutChan chan TimeoutEvent, quitChan <-chan struct{}) { + + for { + select { + case timeout := <-timeoutChan: + log.Info("Timeout in consensus state", "height", timeout.Height, "round", timeout.Round, "step", timeout.Type.String()) + timeoutChan <- timeout + case <-quitChan: + return + + } + } +} diff --git a/consensus/state_test.go b/consensus/state_test.go deleted file mode 100644 index 599d3c399..000000000 --- a/consensus/state_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package consensus - -import ( - "testing" - - _ "github.com/tendermint/tendermint/config/tendermint_test" -) - -func TestEnterProposeNoPrivValidator(t *testing.T) { - cs, _ := randConsensusState() - cs.EnterPropose(1, 0) - rs := cs.GetRoundState() - if rs.Proposal != nil { - t.Error("Expected to make no proposal, since no privValidator") - } -} - -func TestEnterPropose(t *testing.T) { - cs, privValidators := randConsensusState() - val0 := privValidators[0] - cs.SetPrivValidator(val0) - - cs.EnterPropose(1, 0) - rs := cs.GetRoundState() - - // Check that Proposal, ProposalBlock, ProposalBlockParts are set. - if rs.Proposal == nil { - t.Error("rs.Proposal should be set") - } - if rs.ProposalBlock == nil { - t.Error("rs.ProposalBlock should be set") - } - if rs.ProposalBlockParts.Total() == 0 { - t.Error("rs.ProposalBlockParts should be set") - } -} - -// TODO write better consensus state tests diff --git a/consensus/test.go b/consensus/test.go index a12c925c0..df8e35fbc 100644 --- a/consensus/test.go +++ b/consensus/test.go @@ -1,13 +1,126 @@ package consensus import ( + "bytes" + "fmt" + "testing" + "time" + bc "github.com/tendermint/tendermint/blockchain" dbm "github.com/tendermint/tendermint/db" mempl "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) +//------------------------------------------------------------------------------- +// utils + +// add vote to one cs from another +func addVoteToFrom(t *testing.T, voteType byte, to, from *ConsensusState, hash []byte, header types.PartSetHeader) { + vote, err := from.signVote(voteType, hash, header) + if err != nil { + panic(fmt.Sprintln("Failed to sign vote", err)) + } + valIndex, _ := to.Validators.GetByAddress(from.privValidator.Address) + added, err := to.TryAddVote(to.GetRoundState(), vote, valIndex, "") + if _, ok := err.(*types.ErrVoteConflictingSignature); ok { + // let it fly + } else if !added { + panic("Failed to add vote") + } else if err != nil { + panic(fmt.Sprintln("Failed to add vote:", err)) + } +} + +func ensureNoNewStep(t *testing.T, cs *ConsensusState) { + timeout := time.NewTicker(2 * time.Second) + select { + case <-timeout.C: + break + case <-cs.NewStepCh(): + t.Fatal("We should be stuck waiting for more prevotes, not moving to the next step") + } +} + +func validatePrevote(t *testing.T, cs *ConsensusState, round int, privVal *types.PrivValidator, blockHash []byte) { + prevotes := cs.Votes.Prevotes(round) + var vote *types.Vote + if vote = prevotes.GetByAddress(privVal.Address); vote == nil { + t.Fatal("Failed to find prevote from validator") + } + if blockHash == nil { + if vote.BlockHash != nil { + t.Fatal("Expected prevote to be for nil") + } + } else { + if !bytes.Equal(vote.BlockHash, blockHash) { + t.Fatal("Expected prevote to be for proposal block") + } + } +} + +func validatePrecommit(t *testing.T, cs *ConsensusState, thisRound, lockRound int, privVal *types.PrivValidator, votedBlock, lockedBlock *types.Block) { + precommits := cs.Votes.Precommits(thisRound) + var vote *types.Vote + if vote = precommits.GetByAddress(privVal.Address); vote == nil { + panic("Failed to find precommit from validator") + } + + if votedBlock == nil { + if vote.BlockHash != nil { + panic("Expected precommit to be for nil") + } + } else { + if !bytes.Equal(vote.BlockHash, votedBlock.Hash()) { + panic("Expected precommit to be for proposal block") + } + } + + if lockedBlock == nil { + if cs.LockedRound != lockRound || cs.LockedBlock != nil { + panic(fmt.Sprintf("Expected to be locked on nil. Got %v", cs.LockedBlock)) + } + } else { + if cs.LockedRound != lockRound || cs.LockedBlock != lockedBlock { + panic(fmt.Sprintf("Expected block to be locked on round %d, got %d. Got locked block %v, expected %v", lockRound, cs.LockedRound, cs.LockedBlock, lockedBlock)) + } + } + +} + +func simpleConsensusState(nValidators int) ([]*ConsensusState, []*types.PrivValidator) { + // Get State + state, privAccs, privVals := sm.RandGenesisState(10, true, 1000, nValidators, false, 10) + _, _ = privAccs, privVals + + fmt.Println(state.BondedValidators) + + css := make([]*ConsensusState, nValidators) + for i := 0; i < nValidators; i++ { + // Get BlockStore + blockDB := dbm.NewMemDB() + blockStore := bc.NewBlockStore(blockDB) + + // Make MempoolReactor + mempool := mempl.NewMempool(state.Copy()) + mempoolReactor := mempl.NewMempoolReactor(mempool) + + mempoolReactor.SetSwitch(p2p.NewSwitch()) + + // Make ConsensusReactor + cs := NewConsensusState(state, blockStore, mempoolReactor) + + // read off the NewHeightStep + <-cs.NewStepCh() + + css[i] = cs + } + + return css, privVals +} + func randConsensusState() (*ConsensusState, []*types.PrivValidator) { state, _, privValidators := sm.RandGenesisState(20, false, 1000, 10, false, 1000) blockStore := bc.NewBlockStore(dbm.NewMemDB()) diff --git a/mempool/reactor.go b/mempool/reactor.go index 664e33467..2e6e36ec6 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -20,7 +20,6 @@ var ( type MempoolReactor struct { p2p.BaseReactor - sw *p2p.Switch Mempool *Mempool evsw events.Fireable diff --git a/p2p/switch.go b/p2p/switch.go index fe5e10d63..9d3a07d46 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -295,7 +295,6 @@ func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { }(peer) } return successChan - } // Returns the count of outbound/inbound and outbound-dialing peers.