diff --git a/consensus/common_test.go b/consensus/common_test.go index 40609c6e9..a9da1cdb6 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -78,6 +78,7 @@ func decideProposal(cs1 *ConsensusState, cs2 *validatorStub, height, round int) //------------------------------------------------------------------------------- // utils +/* func nilRound(t *testing.T, cs1 *ConsensusState, vss ...*validatorStub) { cs1.mtx.Lock() height, round := cs1.Height, cs1.Round @@ -93,6 +94,7 @@ func nilRound(t *testing.T, cs1 *ConsensusState, vss ...*validatorStub) { waitFor(t, cs1, height, round+1, RoundStepNewRound) } +*/ // NOTE: this switches the propser as far as `perspectiveOf` is concerned, // but for simplicity we return a block it generated. @@ -172,6 +174,17 @@ func signAddVoteToFrom(voteType byte, to *ConsensusState, from *validatorStub, h return vote } +func ensureNoNewStep(stepCh chan interface{}) { + timeout := time.NewTicker(ensureTimeout * time.Second) + select { + case <-timeout.C: + break + case <-stepCh: + panic("We should be stuck waiting for more votes, not moving to the next step") + } +} + +/* func ensureNoNewStep(t *testing.T, cs *ConsensusState) { timeout := time.NewTicker(ensureTimeout * time.Second) select { @@ -202,6 +215,7 @@ func waitFor(t *testing.T, cs *ConsensusState, height int, round int, step Round } } } +*/ func incrementHeight(vss ...*validatorStub) { for _, vs := range vss { @@ -309,17 +323,9 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { cs := NewConsensusState(state, proxyAppCtxCon, blockStore, mempool) cs.SetPrivValidator(privVals[0]) - // from the updateToState in NewConsensusState - <-cs.NewStepCh() - evsw := events.NewEventSwitch() cs.SetFireable(evsw) evsw.OnStart() - go func() { - for { - <-cs.NewStepCh() - } - }() // start the transition routines // cs.startRoutines() diff --git a/consensus/reactor.go b/consensus/reactor.go index 7b6100a4a..ab2fb4913 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -50,13 +50,17 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto func (conR *ConsensusReactor) OnStart() error { log.Notice("ConsensusReactor ", "fastSync", conR.fastSync) conR.BaseReactor.OnStart() + + // callbacks for broadcasting new steps and votes to peers + // upon their respective events (ie. uses evsw) + conR.registerEventCallbacks() + if !conR.fastSync { _, err := conR.conS.Start() if err != nil { return err } } - go conR.broadcastNewRoundStepRoutine() return nil } @@ -132,7 +136,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // NOTE: We process these messages even when we're fast_syncing. // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of -// proposals, block parts, and votes are ordered. +// proposals, block parts, and votes are ordered by the receiveRoutine func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { if !conR.IsRunning() { log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes) @@ -211,6 +215,47 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte } } +// Sets our private validator account for signing votes. +func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { + conR.conS.SetPrivValidator(priv) +} + +// implements events.Eventable +func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { + conR.evsw = evsw + conR.conS.SetFireable(evsw) +} + +//-------------------------------------- + +// Listens for new steps and votes, +// broadcasting the result to peers +func (conR *ConsensusReactor) registerEventCallbacks() { + // XXX: should we change SetFireable to just use EventSwitch so we don't need these assertions? + evsw := conR.evsw.(*events.EventSwitch) + + evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { + rs := data.(*types.EventDataRoundState) + conR.broadcastNewRoundStep(rs) + }) + + evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + edv := data.(*types.EventDataVote) + conR.broadcastHasVoteMessage(edv.Vote, edv.Index) + }) +} + +func (conR *ConsensusReactor) broadcastNewRoundStep(rs *types.EventDataRoundState) { + + nrsMsg, csMsg := makeRoundStepMessages(rs) + if nrsMsg != nil { + conR.Switch.Broadcast(StateChannel, nrsMsg) + } + if csMsg != nil { + conR.Switch.Broadcast(StateChannel, csMsg) + } +} + // Broadcasts HasVoteMessage to peers that care. func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) { msg := &HasVoteMessage{ @@ -237,62 +282,28 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in */ } -// Sets our private validator account for signing votes. -func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { - conR.conS.SetPrivValidator(priv) -} - -// implements events.Eventable -func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { - conR.evsw = evsw - conR.conS.SetFireable(evsw) -} - -//-------------------------------------- - -func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { +func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { + step := RoundStepType(rs.Step) nrsMsg = &NewRoundStepMessage{ Height: rs.Height, Round: rs.Round, - Step: rs.Step, + Step: step, SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()), - LastCommitRound: rs.LastCommit.Round(), + LastCommitRound: rs.LastCommitRound, } - if rs.Step == RoundStepCommit { + if step == RoundStepCommit { csMsg = &CommitStepMessage{ Height: rs.Height, - BlockPartsHeader: rs.ProposalBlockParts.Header(), - BlockParts: rs.ProposalBlockParts.BitArray(), + BlockPartsHeader: rs.BlockPartsHeader, + BlockParts: rs.BlockParts, } } return } -// Listens for changes to the ConsensusState.Step by pulling -// on conR.conS.NewStepCh(). -func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() { - for { - // Get RoundState with new Step or quit. - var rs *RoundState - select { - case rs = <-conR.conS.NewStepCh(): - case <-conR.Quit: - return - } - - nrsMsg, csMsg := makeRoundStepMessages(rs) - if nrsMsg != nil { - conR.Switch.Broadcast(StateChannel, nrsMsg) - } - if csMsg != nil { - conR.Switch.Broadcast(StateChannel, csMsg) - } - } -} - func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { rs := conR.conS.GetRoundState() - nrsMsg, csMsg := makeRoundStepMessages(rs) + nrsMsg, csMsg := makeRoundStepMessages(rs.RoundStateEvent()) if nrsMsg != nil { peer.Send(StateChannel, nrsMsg) } diff --git a/consensus/state.go b/consensus/state.go index 4f4723615..168f2e4ed 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -98,18 +98,26 @@ type RoundState struct { } func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState { + var header types.PartSetHeader + var parts *BitArray + if rs.ProposalBlockParts != nil { + header = rs.ProposalBlockParts.Header() + parts = rs.ProposalBlockParts.BitArray() + } return &types.EventDataRoundState{ - CurrentTime: time.Now(), - Height: rs.Height, - Round: rs.Round, - Step: rs.Step.String(), - StartTime: rs.StartTime, - CommitTime: rs.CommitTime, - Proposal: rs.Proposal, - ProposalBlock: rs.ProposalBlock, - LockedRound: rs.LockedRound, - LockedBlock: rs.LockedBlock, - POLRound: rs.Votes.POLRound(), + CurrentTime: time.Now(), + Height: rs.Height, + Round: rs.Round, + Step: int(rs.Step), + StartTime: rs.StartTime, + CommitTime: rs.CommitTime, + Proposal: rs.Proposal, + ProposalBlock: rs.ProposalBlock, + LockedRound: rs.LockedRound, + LockedBlock: rs.LockedBlock, + POLRound: rs.Votes.POLRound(), + BlockPartsHeader: header, + BlockParts: parts, } } @@ -183,7 +191,6 @@ type ConsensusState struct { blockStore *bc.BlockStore mempool *mempl.Mempool privValidator *types.PrivValidator - newStepCh chan *RoundState mtx sync.Mutex RoundState @@ -208,7 +215,6 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore proxyAppCtx: proxyAppCtx, blockStore: blockStore, mempool: mempool, - newStepCh: make(chan *RoundState, 10), peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: new(time.Ticker), @@ -258,10 +264,6 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { cs.privValidator = priv } -func (cs *ConsensusState) NewStepCh() chan *RoundState { - return cs.newStepCh -} - func (cs *ConsensusState) OnStart() error { cs.BaseService.OnStart() @@ -466,7 +468,10 @@ func (cs *ConsensusState) updateToState(state *sm.State) { func (cs *ConsensusState) newStep() { cs.nSteps += 1 - cs.newStepCh <- cs.getRoundState() + // newStep is called by updateToStep in NewConsensusState before the evsw is set! + if cs.evsw != nil { + cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent()) + } } //----------------------------------------- @@ -529,7 +534,7 @@ func (cs *ConsensusState) stopTimer() { // receiveRoutine handles messages which may cause state transitions. // it's argument (n) is the number of messages to process before exiting - use 0 to run forever // It keeps the RoundState and is the only thing that updates it. -// Updates happen on timeouts, complete proposals, and 2/3 majorities +// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities func (cs *ConsensusState) receiveRoutine(maxSteps int) { for { if maxSteps > 0 { @@ -579,19 +584,17 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { case *VoteMessage: // attempt to add the vote and dupeout the validator if its a duplicate signature // if the vote gives us a 2/3-any or 2/3-one, we transition - added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) + err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey) if err == ErrAddingVote { // TODO: punish peer } - if added { - // If rs.Height == vote.Height && rs.Round < vote.Round, - // the peer is sending us CatchupCommit precommits. - // We could make note of this and help filter in broadcastHasVoteMessage(). + // NOTE: the vote is broadcast to peers by the reactor listening + // for vote events - // XXX TODO: do this - // conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex) - } + // TODO: If rs.Height == vote.Height && rs.Round < vote.Round, + // the peer is sending us CatchupCommit precommits. + // We could make note of this and help filter in broadcastHasVoteMessage(). default: log.Warn("Unknown msg type", reflect.TypeOf(msg)) } @@ -1233,14 +1236,14 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad } // Attempt to add the vote. if its a duplicate signature, dupeout the validator -func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) (bool, error) { - added, _, err := cs.addVote(valIndex, vote, peerKey) +func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) error { + _, _, err := cs.addVote(valIndex, vote, peerKey) if err != nil { // If the vote height is off, we'll just ignore it, // But if it's a conflicting sig, broadcast evidence tx for slashing. // If it's otherwise invalid, punish peer. if err == ErrVoteHeightMismatch { - return added, err + return err } else if _, ok := err.(*types.ErrVoteConflictingSignature); ok { log.Warn("Found conflicting vote. Publish evidence") /* TODO @@ -1251,14 +1254,14 @@ func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey str } cs.mempool.BroadcastTx(evidenceTx) // shouldn't need to check returned err */ - return added, err + return err } else { // Probably an invalid signature. Bad peer. log.Warn("Error attempting to add vote", "error", err) - return added, ErrAddingVote + return ErrAddingVote } } - return added, nil + return nil } //----------------------------------------------------------------------------- @@ -1408,9 +1411,8 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part } vote, err := cs.signVote(type_, hash, header) if err == nil { - // NOTE: store our index in the cs so we don't have to do this every time + // TODO: store our index in the cs so we don't have to do this every time valIndex, _ := cs.Validators.GetByAddress(cs.privValidator.Address) - // _, _, err := cs.addVote(valIndex, vote, "") cs.sendInternalMessage(msgInfo{&VoteMessage{valIndex, vote}, ""}) log.Notice("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err) return vote diff --git a/consensus/state_test.go b/consensus/state_test.go index 13619965a..459af95e7 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -316,8 +316,7 @@ func TestFullRound2(t *testing.T) { // LockSuite // two validators, 4 rounds. -// val1 proposes the first 2 rounds, and is locked in the first. -// val2 proposes the next two. val1 should precommit nil on all (except first where he locks) +// two vals take turns proposing. val1 locks on first one, precommits nil on everything else func TestLockNoPOL(t *testing.T) { cs1, vss := simpleConsensusState(2) cs2 := vss[1] @@ -431,10 +430,8 @@ func TestLockNoPOL(t *testing.T) { <-voteCh // prevote - // TODO: is the round right?! - validatePrevote(t, cs1, 0, vss[0], rs.LockedBlock.Hash()) + validatePrevote(t, cs1, 2, vss[0], rs.LockedBlock.Hash()) - // TODO: quick fastforward to new round, set proposer signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header()) <-voteCh @@ -783,14 +780,14 @@ func TestLockPOLSafety1(t *testing.T) { // we should prevote what we're locked on validatePrevote(t, cs1, 2, vss[0], propBlockHash) + newStepCh := subscribeToEvent(cs1, types.EventStringNewRoundStep()) + // add prevotes from the earlier round addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) log.Warn("Done adding prevotes!") - // ensureNoNewStep(t, cs1) - // TODO: subscribe to NewStep ... - + ensureNoNewStep(newStepCh) } // 4 vals. diff --git a/types/events.go b/types/events.go index 3b1598414..b046e9656 100644 --- a/types/events.go +++ b/types/events.go @@ -3,6 +3,7 @@ package types import ( "time" + "github.com/tendermint/go-common" "github.com/tendermint/go-wire" ) @@ -17,6 +18,7 @@ func EventStringFork() string { return "Fork" } func EventStringNewBlock() string { return "NewBlock" } func EventStringNewRound() string { return "NewRound" } +func EventStringNewRoundStep() string { return "NewRoundStep" } func EventStringTimeoutPropose() string { return "TimeoutPropose" } func EventStringCompleteProposal() string { return "CompleteProposal" } func EventStringPolka() string { return "Polka" } @@ -77,16 +79,20 @@ type EventDataApp struct { type EventDataRoundState struct { CurrentTime time.Time `json:"current_time"` - Height int `json:"height"` - Round int `json:"round"` - Step string `json:"step"` - StartTime time.Time `json:"start_time"` - CommitTime time.Time `json:"commit_time"` - Proposal *Proposal `json:"proposal"` - ProposalBlock *Block `json:"proposal_block"` - LockedRound int `json:"locked_round"` - LockedBlock *Block `json:"locked_block"` - POLRound int `json:"pol_round"` + Height int `json:"height"` + Round int `json:"round"` + Step int `json:"step"` + LastCommitRound int `json:"last_commit_round"` + StartTime time.Time `json:"start_time"` + CommitTime time.Time `json:"commit_time"` + Proposal *Proposal `json:"proposal"` + ProposalBlock *Block `json:"proposal_block"` + LockedRound int `json:"locked_round"` + LockedBlock *Block `json:"locked_block"` + POLRound int `json:"pol_round"` + + BlockPartsHeader PartSetHeader `json:"block_parts_header"` + BlockParts *common.BitArray `json:"block_parts"` } type EventDataVote struct {