Browse Source

[R4R] Add proposer to NewRound event and proposal info to CompleteProposal event (#2767)

* add proposer info to EventCompleteProposal

* create separate EventData structure for CompleteProposal

* cant us rs.Proposal to get BlockID because it is not guaranteed to be set yet

* copying RoundState isnt helping us here

* add Step back to make compatible with original RoundState event. update changelog

* add NewRound event

* fix test

* remove unneeded RoundState

* put height round step into a struct

* pull out ValidatorInfo struct. add ensureProposal assert

* remove height-round-state sub-struct refactor

* minor fixes from review
pull/2863/head
kevlubkcm 6 years ago
committed by Ethan Buchman
parent
commit
a676c71678
9 changed files with 132 additions and 20 deletions
  1. +2
    -0
      CHANGELOG_PENDING.md
  2. +60
    -4
      consensus/common_test.go
  3. +3
    -3
      consensus/mempool_test.go
  4. +3
    -3
      consensus/state.go
  5. +3
    -4
      consensus/state_test.go
  6. +34
    -2
      consensus/types/round_state.go
  7. +2
    -2
      types/event_bus.go
  8. +2
    -2
      types/event_bus_test.go
  9. +23
    -0
      types/events.go

+ 2
- 0
CHANGELOG_PENDING.md View File

@ -29,6 +29,8 @@ program](https://hackerone.com/tendermint).
- [log] new `log_format` config option, which can be set to 'plain' for colored
text or 'json' for JSON output
- [types] \#2767 New event types EventDataNewRound (with ProposerInfo) and EventDataCompleteProposal (with BlockID). (@kevlubkcm)
### IMPROVEMENTS:
### BUG FIXES:

+ 60
- 4
consensus/common_test.go View File

@ -405,8 +405,24 @@ func ensureNewVote(voteCh <-chan interface{}, height int64, round int) {
}
func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
ensureNewEvent(roundCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewRound event")
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewRound event")
case ev := <-roundCh:
rs, ok := ev.(types.EventDataNewRound)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataNewRound, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
}
}
}
func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
@ -416,8 +432,24 @@ func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, tim
}
func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
ensureNewEvent(proposalCh, height, round, ensureTimeout,
"Timeout expired while waiting for NewProposal event")
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
}
}
}
func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) {
@ -492,6 +524,30 @@ func ensureVote(voteCh <-chan interface{}, height int64, round int,
}
}
func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh:
rs, ok := ev.(types.EventDataCompleteProposal)
if !ok {
panic(
fmt.Sprintf(
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
}
if rs.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
}
if rs.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
}
if !rs.BlockID.Equals(propId) {
panic("Proposed block does not match expected block")
}
}
}
func ensurePrecommit(voteCh <-chan interface{}, height int64, round int) {
ensureVote(voteCh, height, round, types.PrecommitType)
}


+ 3
- 3
consensus/mempool_test.go View File

@ -71,18 +71,18 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
}
startTestRound(cs, height, round)
ensureNewRoundStep(newRoundCh, height, round) // first round at first height
ensureNewRound(newRoundCh, height, round) // first round at first height
ensureNewEventOnChannel(newBlockCh) // first block gets committed
height = height + 1 // moving to the next height
round = 0
ensureNewRoundStep(newRoundCh, height, round) // first round at next height
ensureNewRound(newRoundCh, height, round) // first round at next height
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds())
round = round + 1 // moving to the next round
ensureNewRoundStep(newRoundCh, height, round) // wait for the next round
ensureNewRound(newRoundCh, height, round) // wait for the next round
ensureNewEventOnChannel(newBlockCh) // now we can commit the block
}


+ 3
- 3
consensus/state.go View File

@ -772,7 +772,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
cs.triggeredTimeoutPrecommit = false
cs.eventBus.PublishEventNewRound(cs.RoundStateEvent())
cs.eventBus.PublishEventNewRound(cs.NewRoundEvent())
cs.metrics.Rounds.Set(float64(round))
// Wait for txs to be available in the mempool
@ -1404,7 +1404,7 @@ func (cs *ConsensusState) defaultSetProposal(proposal *types.Proposal) error {
return nil
}
// Verify POLRound, which must be -1 or in range [0, proposal.Round).
// Verify POLRound, which must be -1 or in range [0, proposal.Round).
if proposal.POLRound < -1 ||
(proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) {
return ErrInvalidProposalPOLRound
@ -1462,7 +1462,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p
}
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
cs.eventBus.PublishEventCompleteProposal(cs.RoundStateEvent())
cs.eventBus.PublishEventCompleteProposal(cs.CompleteProposalEvent())
// Update Valid* if we can.
prevotes := cs.Votes.Prevotes(cs.Round)


+ 3
- 4
consensus/state_test.go View File

@ -197,9 +197,8 @@ func TestStateBadProposal(t *testing.T) {
stateHash[0] = byte((stateHash[0] + 1) % 255)
propBlock.AppHash = stateHash
propBlockParts := propBlock.MakePartSet(partSize)
proposal := types.NewProposal(
vs2.Height, round, -1,
types.BlockID{propBlock.Hash(), propBlockParts.Header()})
blockID := types.BlockID{propBlock.Hash(), propBlockParts.Header()}
proposal := types.NewProposal(vs2.Height, round, -1, blockID)
if err := vs2.SignProposal(config.ChainID(), proposal); err != nil {
t.Fatal("failed to sign bad proposal", err)
}
@ -213,7 +212,7 @@ func TestStateBadProposal(t *testing.T) {
startTestRound(cs1, height, round)
// wait for proposal
ensureNewProposal(proposalCh, height, round)
ensureProposal(proposalCh, height, round, blockID)
// wait for prevote
ensurePrevote(voteCh, height, round)


+ 34
- 2
consensus/types/round_state.go View File

@ -112,18 +112,50 @@ func (rs *RoundState) RoundStateSimple() RoundStateSimple {
}
}
// NewRoundEvent returns the RoundState with proposer information as an event.
func (rs *RoundState) NewRoundEvent() types.EventDataNewRound {
addr := rs.Validators.GetProposer().Address
idx, _ := rs.Validators.GetByAddress(addr)
return types.EventDataNewRound{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step.String(),
Proposer: types.ValidatorInfo{
Address: addr,
Index: idx,
},
}
}
// CompleteProposalEvent returns information about a proposed block as an event.
func (rs *RoundState) CompleteProposalEvent() types.EventDataCompleteProposal {
// We must construct BlockID from ProposalBlock and ProposalBlockParts
// cs.Proposal is not guaranteed to be set when this function is called
blockId := types.BlockID{
Hash: rs.ProposalBlock.Hash(),
PartsHeader: rs.ProposalBlockParts.Header(),
}
return types.EventDataCompleteProposal{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step.String(),
BlockID: blockId,
}
}
// RoundStateEvent returns the H/R/S of the RoundState as an event.
func (rs *RoundState) RoundStateEvent() types.EventDataRoundState {
// copy the RoundState.
// TODO: if we want to avoid this, we may need synchronous events after all
rsCopy := *rs
edrs := types.EventDataRoundState{
return types.EventDataRoundState{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step.String(),
RoundState: &rsCopy,
}
return edrs
}
// String returns a string


+ 2
- 2
types/event_bus.go View File

@ -136,11 +136,11 @@ func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
return b.Publish(EventTimeoutWait, data)
}
func (b *EventBus) PublishEventNewRound(data EventDataRoundState) error {
func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error {
return b.Publish(EventNewRound, data)
}
func (b *EventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error {
return b.Publish(EventCompleteProposal, data)
}


+ 2
- 2
types/event_bus_test.go View File

@ -96,9 +96,9 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err)
err = eventBus.PublishEventTimeoutWait(EventDataRoundState{})
require.NoError(t, err)
err = eventBus.PublishEventNewRound(EventDataRoundState{})
err = eventBus.PublishEventNewRound(EventDataNewRound{})
require.NoError(t, err)
err = eventBus.PublishEventCompleteProposal(EventDataRoundState{})
err = eventBus.PublishEventCompleteProposal(EventDataCompleteProposal{})
require.NoError(t, err)
err = eventBus.PublishEventPolka(EventDataRoundState{})
require.NoError(t, err)


+ 23
- 0
types/events.go View File

@ -43,6 +43,8 @@ func RegisterEventDatas(cdc *amino.Codec) {
cdc.RegisterConcrete(EventDataNewBlockHeader{}, "tendermint/event/NewBlockHeader", nil)
cdc.RegisterConcrete(EventDataTx{}, "tendermint/event/Tx", nil)
cdc.RegisterConcrete(EventDataRoundState{}, "tendermint/event/RoundState", nil)
cdc.RegisterConcrete(EventDataNewRound{}, "tendermint/event/NewRound", nil)
cdc.RegisterConcrete(EventDataCompleteProposal{}, "tendermint/event/CompleteProposal", nil)
cdc.RegisterConcrete(EventDataVote{}, "tendermint/event/Vote", nil)
cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/event/ProposalHeartbeat", nil)
cdc.RegisterConcrete(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates", nil)
@ -80,6 +82,27 @@ type EventDataRoundState struct {
RoundState interface{} `json:"-"`
}
type ValidatorInfo struct {
Address Address `json:"address"`
Index int `json:"index"`
}
type EventDataNewRound struct {
Height int64 `json:"height"`
Round int `json:"round"`
Step string `json:"step"`
Proposer ValidatorInfo `json:"proposer"`
}
type EventDataCompleteProposal struct {
Height int64 `json:"height"`
Round int `json:"round"`
Step string `json:"step"`
BlockID BlockID `json:"block_id"`
}
type EventDataVote struct {
Vote *Vote
}


Loading…
Cancel
Save