From a676c7167854b45ce8a23ccc44292cb00d1509cd Mon Sep 17 00:00:00 2001 From: kevlubkcm <36485490+kevlubkcm@users.noreply.github.com> Date: Thu, 15 Nov 2018 18:40:42 -0500 Subject: [PATCH] [R4R] Add proposer to NewRound event and proposal info to CompleteProposal event (#2767) * add proposer info to EventCompleteProposal * create separate EventData structure for CompleteProposal * cant us rs.Proposal to get BlockID because it is not guaranteed to be set yet * copying RoundState isnt helping us here * add Step back to make compatible with original RoundState event. update changelog * add NewRound event * fix test * remove unneeded RoundState * put height round step into a struct * pull out ValidatorInfo struct. add ensureProposal assert * remove height-round-state sub-struct refactor * minor fixes from review --- CHANGELOG_PENDING.md | 2 ++ consensus/common_test.go | 64 +++++++++++++++++++++++++++++++--- consensus/mempool_test.go | 6 ++-- consensus/state.go | 6 ++-- consensus/state_test.go | 7 ++-- consensus/types/round_state.go | 36 +++++++++++++++++-- types/event_bus.go | 4 +-- types/event_bus_test.go | 4 +-- types/events.go | 23 ++++++++++++ 9 files changed, 132 insertions(+), 20 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 63b277110..19ee4e7da 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -29,6 +29,8 @@ program](https://hackerone.com/tendermint). - [log] new `log_format` config option, which can be set to 'plain' for colored text or 'json' for JSON output +- [types] \#2767 New event types EventDataNewRound (with ProposerInfo) and EventDataCompleteProposal (with BlockID). (@kevlubkcm) + ### IMPROVEMENTS: ### BUG FIXES: diff --git a/consensus/common_test.go b/consensus/common_test.go index 4f48f4424..46be5cbd7 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -405,8 +405,24 @@ func ensureNewVote(voteCh <-chan interface{}, height int64, round int) { } func ensureNewRound(roundCh <-chan interface{}, height int64, round int) { - ensureNewEvent(roundCh, height, round, ensureTimeout, - "Timeout expired while waiting for NewRound event") + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewRound event") + case ev := <-roundCh: + rs, ok := ev.(types.EventDataNewRound) + if !ok { + panic( + fmt.Sprintf( + "expected a EventDataNewRound, got %v.Wrong subscription channel?", + reflect.TypeOf(rs))) + } + if rs.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) + } + if rs.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) + } + } } func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) { @@ -416,8 +432,24 @@ func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, tim } func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) { - ensureNewEvent(proposalCh, height, round, ensureTimeout, - "Timeout expired while waiting for NewProposal event") + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewProposal event") + case ev := <-proposalCh: + rs, ok := ev.(types.EventDataCompleteProposal) + if !ok { + panic( + fmt.Sprintf( + "expected a EventDataCompleteProposal, got %v.Wrong subscription channel?", + reflect.TypeOf(rs))) + } + if rs.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) + } + if rs.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) + } + } } func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) { @@ -492,6 +524,30 @@ func ensureVote(voteCh <-chan interface{}, height int64, round int, } } +func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) { + select { + case <-time.After(ensureTimeout): + panic("Timeout expired while waiting for NewProposal event") + case ev := <-proposalCh: + rs, ok := ev.(types.EventDataCompleteProposal) + if !ok { + panic( + fmt.Sprintf( + "expected a EventDataCompleteProposal, got %v.Wrong subscription channel?", + reflect.TypeOf(rs))) + } + if rs.Height != height { + panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) + } + if rs.Round != round { + panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) + } + if !rs.BlockID.Equals(propId) { + panic("Proposed block does not match expected block") + } + } +} + func ensurePrecommit(voteCh <-chan interface{}, height int64, round int) { ensureVote(voteCh, height, round, types.PrecommitType) } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 3dc1cd5ff..6d36d1e74 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -71,18 +71,18 @@ func TestMempoolProgressInHigherRound(t *testing.T) { } startTestRound(cs, height, round) - ensureNewRoundStep(newRoundCh, height, round) // first round at first height + ensureNewRound(newRoundCh, height, round) // first round at first height ensureNewEventOnChannel(newBlockCh) // first block gets committed height = height + 1 // moving to the next height round = 0 - ensureNewRoundStep(newRoundCh, height, round) // first round at next height + ensureNewRound(newRoundCh, height, round) // first round at next height deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds()) round = round + 1 // moving to the next round - ensureNewRoundStep(newRoundCh, height, round) // wait for the next round + ensureNewRound(newRoundCh, height, round) // wait for the next round ensureNewEventOnChannel(newBlockCh) // now we can commit the block } diff --git a/consensus/state.go b/consensus/state.go index e8603011f..110a0e9fb 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -772,7 +772,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) { cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping cs.triggeredTimeoutPrecommit = false - cs.eventBus.PublishEventNewRound(cs.RoundStateEvent()) + cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()) cs.metrics.Rounds.Set(float64(round)) // Wait for txs to be available in the mempool @@ -1404,7 +1404,7 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error { return nil } - // Verify POLRound, which must be -1 or in range [0, proposal.Round). + // Verify POLRound, which must be -1 or in range [0, proposal.Round). if proposal.POLRound < -1 || (proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) { return ErrInvalidProposalPOLRound @@ -1462,7 +1462,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p } // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) - cs.eventBus.PublishEventCompleteProposal(cs.RoundStateEvent()) + cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent()) // Update Valid* if we can. prevotes := cs.Votes.Prevotes(cs.Round) diff --git a/consensus/state_test.go b/consensus/state_test.go index 9bf4fada5..ddab6404a 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -197,9 +197,8 @@ func TestStateBadProposal(t *testing.T) { stateHash[0] = byte((stateHash[0] + 1) % 255) propBlock.AppHash = stateHash propBlockParts := propBlock.MakePartSet(partSize) - proposal := types.NewProposal( - vs2.Height, round, -1, - types.BlockID{propBlock.Hash(), propBlockParts.Header()}) + blockID := types.BlockID{propBlock.Hash(), propBlockParts.Header()} + proposal := types.NewProposal(vs2.Height, round, -1, blockID) if err := vs2.SignProposal(config.ChainID(), proposal); err != nil { t.Fatal("failed to sign bad proposal", err) } @@ -213,7 +212,7 @@ func TestStateBadProposal(t *testing.T) { startTestRound(cs1, height, round) // wait for proposal - ensureNewProposal(proposalCh, height, round) + ensureProposal(proposalCh, height, round, blockID) // wait for prevote ensurePrevote(voteCh, height, round) diff --git a/consensus/types/round_state.go b/consensus/types/round_state.go index ef4236118..6359a6555 100644 --- a/consensus/types/round_state.go +++ b/consensus/types/round_state.go @@ -112,18 +112,50 @@ func (rs *RoundState) RoundStateSimple() RoundStateSimple { } } +// NewRoundEvent returns the RoundState with proposer information as an event. +func (rs *RoundState) NewRoundEvent() types.EventDataNewRound { + addr := rs.Validators.GetProposer().Address + idx, _ := rs.Validators.GetByAddress(addr) + + return types.EventDataNewRound{ + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), + Proposer: types.ValidatorInfo{ + Address: addr, + Index: idx, + }, + } +} + +// CompleteProposalEvent returns information about a proposed block as an event. +func (rs *RoundState) CompleteProposalEvent() types.EventDataCompleteProposal { + // We must construct BlockID from ProposalBlock and ProposalBlockParts + // cs.Proposal is not guaranteed to be set when this function is called + blockId := types.BlockID{ + Hash: rs.ProposalBlock.Hash(), + PartsHeader: rs.ProposalBlockParts.Header(), + } + + return types.EventDataCompleteProposal{ + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), + BlockID: blockId, + } +} + // RoundStateEvent returns the H/R/S of the RoundState as an event. func (rs *RoundState) RoundStateEvent() types.EventDataRoundState { // copy the RoundState. // TODO: if we want to avoid this, we may need synchronous events after all rsCopy := *rs - edrs := types.EventDataRoundState{ + return types.EventDataRoundState{ Height: rs.Height, Round: rs.Round, Step: rs.Step.String(), RoundState: &rsCopy, } - return edrs } // String returns a string diff --git a/types/event_bus.go b/types/event_bus.go index 65206e938..fbe5ac478 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -136,11 +136,11 @@ func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error { return b.Publish(EventTimeoutWait, data) } -func (b *EventBus) PublishEventNewRound(data EventDataRoundState) error { +func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error { return b.Publish(EventNewRound, data) } -func (b *EventBus) PublishEventCompleteProposal(data EventDataRoundState) error { +func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error { return b.Publish(EventCompleteProposal, data) } diff --git a/types/event_bus_test.go b/types/event_bus_test.go index f0e825d5d..4056dacd4 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -96,9 +96,9 @@ func TestEventBusPublish(t *testing.T) { require.NoError(t, err) err = eventBus.PublishEventTimeoutWait(EventDataRoundState{}) require.NoError(t, err) - err = eventBus.PublishEventNewRound(EventDataRoundState{}) + err = eventBus.PublishEventNewRound(EventDataNewRound{}) require.NoError(t, err) - err = eventBus.PublishEventCompleteProposal(EventDataRoundState{}) + err = eventBus.PublishEventCompleteProposal(EventDataCompleteProposal{}) require.NoError(t, err) err = eventBus.PublishEventPolka(EventDataRoundState{}) require.NoError(t, err) diff --git a/types/events.go b/types/events.go index 33aa712ef..2f9dc76ee 100644 --- a/types/events.go +++ b/types/events.go @@ -43,6 +43,8 @@ func RegisterEventDatas(cdc *amino.Codec) { cdc.RegisterConcrete(EventDataNewBlockHeader{}, "tendermint/event/NewBlockHeader", nil) cdc.RegisterConcrete(EventDataTx{}, "tendermint/event/Tx", nil) cdc.RegisterConcrete(EventDataRoundState{}, "tendermint/event/RoundState", nil) + cdc.RegisterConcrete(EventDataNewRound{}, "tendermint/event/NewRound", nil) + cdc.RegisterConcrete(EventDataCompleteProposal{}, "tendermint/event/CompleteProposal", nil) cdc.RegisterConcrete(EventDataVote{}, "tendermint/event/Vote", nil) cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/event/ProposalHeartbeat", nil) cdc.RegisterConcrete(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates", nil) @@ -80,6 +82,27 @@ type EventDataRoundState struct { RoundState interface{} `json:"-"` } +type ValidatorInfo struct { + Address Address `json:"address"` + Index int `json:"index"` +} + +type EventDataNewRound struct { + Height int64 `json:"height"` + Round int `json:"round"` + Step string `json:"step"` + + Proposer ValidatorInfo `json:"proposer"` +} + +type EventDataCompleteProposal struct { + Height int64 `json:"height"` + Round int `json:"round"` + Step string `json:"step"` + + BlockID BlockID `json:"block_id"` +} + type EventDataVote struct { Vote *Vote }