Browse Source

conR uses events to trigger newstep & hasvote broadcasts

pull/167/head
Ethan Buchman 9 years ago
parent
commit
4483971776
5 changed files with 127 additions and 105 deletions
  1. +14
    -8
      consensus/common_test.go
  2. +55
    -44
      consensus/reactor.go
  3. +37
    -35
      consensus/state.go
  4. +5
    -8
      consensus/state_test.go
  5. +16
    -10
      types/events.go

+ 14
- 8
consensus/common_test.go View File

@ -78,6 +78,7 @@ func decideProposal(cs1 *ConsensusState, cs2 *validatorStub, height, round int)
//-------------------------------------------------------------------------------
// utils
/*
func nilRound(t *testing.T, cs1 *ConsensusState, vss ...*validatorStub) {
cs1.mtx.Lock()
height, round := cs1.Height, cs1.Round
@ -93,6 +94,7 @@ func nilRound(t *testing.T, cs1 *ConsensusState, vss ...*validatorStub) {
waitFor(t, cs1, height, round+1, RoundStepNewRound)
}
*/
// NOTE: this switches the propser as far as `perspectiveOf` is concerned,
// but for simplicity we return a block it generated.
@ -172,6 +174,17 @@ func signAddVoteToFrom(voteType byte, to *ConsensusState, from *validatorStub, h
return vote
}
func ensureNoNewStep(stepCh chan interface{}) {
timeout := time.NewTicker(ensureTimeout * time.Second)
select {
case <-timeout.C:
break
case <-stepCh:
panic("We should be stuck waiting for more votes, not moving to the next step")
}
}
/*
func ensureNoNewStep(t *testing.T, cs *ConsensusState) {
timeout := time.NewTicker(ensureTimeout * time.Second)
select {
@ -202,6 +215,7 @@ func waitFor(t *testing.T, cs *ConsensusState, height int, round int, step Round
}
}
}
*/
func incrementHeight(vss ...*validatorStub) {
for _, vs := range vss {
@ -309,17 +323,9 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
cs := NewConsensusState(state, proxyAppCtxCon, blockStore, mempool)
cs.SetPrivValidator(privVals[0])
// from the updateToState in NewConsensusState
<-cs.NewStepCh()
evsw := events.NewEventSwitch()
cs.SetFireable(evsw)
evsw.OnStart()
go func() {
for {
<-cs.NewStepCh()
}
}()
// start the transition routines
// cs.startRoutines()


+ 55
- 44
consensus/reactor.go View File

@ -50,13 +50,17 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto
func (conR *ConsensusReactor) OnStart() error {
log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
conR.BaseReactor.OnStart()
// callbacks for broadcasting new steps and votes to peers
// upon their respective events (ie. uses evsw)
conR.registerEventCallbacks()
if !conR.fastSync {
_, err := conR.conS.Start()
if err != nil {
return err
}
}
go conR.broadcastNewRoundStepRoutine()
return nil
}
@ -132,7 +136,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// NOTE: We process these messages even when we're fast_syncing.
// Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered.
// proposals, block parts, and votes are ordered by the receiveRoutine
func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes)
@ -211,6 +215,47 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
}
}
// Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
conR.conS.SetPrivValidator(priv)
}
// implements events.Eventable
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
conR.evsw = evsw
conR.conS.SetFireable(evsw)
}
//--------------------------------------
// Listens for new steps and votes,
// broadcasting the result to peers
func (conR *ConsensusReactor) registerEventCallbacks() {
// XXX: should we change SetFireable to just use EventSwitch so we don't need these assertions?
evsw := conR.evsw.(*events.EventSwitch)
evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) {
rs := data.(*types.EventDataRoundState)
conR.broadcastNewRoundStep(rs)
})
evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) {
edv := data.(*types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
})
}
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *types.EventDataRoundState) {
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, nrsMsg)
}
if csMsg != nil {
conR.Switch.Broadcast(StateChannel, csMsg)
}
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
msg := &HasVoteMessage{
@ -237,62 +282,28 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in
*/
}
// Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
conR.conS.SetPrivValidator(priv)
}
// implements events.Eventable
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
conR.evsw = evsw
conR.conS.SetFireable(evsw)
}
//--------------------------------------
func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
step := RoundStepType(rs.Step)
nrsMsg = &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
Step: step,
SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.Round(),
LastCommitRound: rs.LastCommitRound,
}
if rs.Step == RoundStepCommit {
if step == RoundStepCommit {
csMsg = &CommitStepMessage{
Height: rs.Height,
BlockPartsHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
BlockPartsHeader: rs.BlockPartsHeader,
BlockParts: rs.BlockParts,
}
}
return
}
// Listens for changes to the ConsensusState.Step by pulling
// on conR.conS.NewStepCh().
func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
for {
// Get RoundState with new Step or quit.
var rs *RoundState
select {
case rs = <-conR.conS.NewStepCh():
case <-conR.Quit:
return
}
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, nrsMsg)
}
if csMsg != nil {
conR.Switch.Broadcast(StateChannel, csMsg)
}
}
}
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs)
nrsMsg, csMsg := makeRoundStepMessages(rs.RoundStateEvent())
if nrsMsg != nil {
peer.Send(StateChannel, nrsMsg)
}


+ 37
- 35
consensus/state.go View File

@ -98,18 +98,26 @@ type RoundState struct {
}
func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState {
var header types.PartSetHeader
var parts *BitArray
if rs.ProposalBlockParts != nil {
header = rs.ProposalBlockParts.Header()
parts = rs.ProposalBlockParts.BitArray()
}
return &types.EventDataRoundState{
CurrentTime: time.Now(),
Height: rs.Height,
Round: rs.Round,
Step: rs.Step.String(),
StartTime: rs.StartTime,
CommitTime: rs.CommitTime,
Proposal: rs.Proposal,
ProposalBlock: rs.ProposalBlock,
LockedRound: rs.LockedRound,
LockedBlock: rs.LockedBlock,
POLRound: rs.Votes.POLRound(),
CurrentTime: time.Now(),
Height: rs.Height,
Round: rs.Round,
Step: int(rs.Step),
StartTime: rs.StartTime,
CommitTime: rs.CommitTime,
Proposal: rs.Proposal,
ProposalBlock: rs.ProposalBlock,
LockedRound: rs.LockedRound,
LockedBlock: rs.LockedBlock,
POLRound: rs.Votes.POLRound(),
BlockPartsHeader: header,
BlockParts: parts,
}
}
@ -183,7 +191,6 @@ type ConsensusState struct {
blockStore *bc.BlockStore
mempool *mempl.Mempool
privValidator *types.PrivValidator
newStepCh chan *RoundState
mtx sync.Mutex
RoundState
@ -208,7 +215,6 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore
proxyAppCtx: proxyAppCtx,
blockStore: blockStore,
mempool: mempool,
newStepCh: make(chan *RoundState, 10),
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: new(time.Ticker),
@ -258,10 +264,6 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
cs.privValidator = priv
}
func (cs *ConsensusState) NewStepCh() chan *RoundState {
return cs.newStepCh
}
func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart()
@ -466,7 +468,10 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
func (cs *ConsensusState) newStep() {
cs.nSteps += 1
cs.newStepCh <- cs.getRoundState()
// newStep is called by updateToStep in NewConsensusState before the evsw is set!
if cs.evsw != nil {
cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent())
}
}
//-----------------------------------------
@ -529,7 +534,7 @@ func (cs *ConsensusState) stopTimer() {
// receiveRoutine handles messages which may cause state transitions.
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
// It keeps the RoundState and is the only thing that updates it.
// Updates happen on timeouts, complete proposals, and 2/3 majorities
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities
func (cs *ConsensusState) receiveRoutine(maxSteps int) {
for {
if maxSteps > 0 {
@ -579,19 +584,17 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
case *VoteMessage:
// attempt to add the vote and dupeout the validator if its a duplicate signature
// if the vote gives us a 2/3-any or 2/3-one, we transition
added, err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
err := cs.tryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
if err == ErrAddingVote {
// TODO: punish peer
}
if added {
// If rs.Height == vote.Height && rs.Round < vote.Round,
// the peer is sending us CatchupCommit precommits.
// We could make note of this and help filter in broadcastHasVoteMessage().
// NOTE: the vote is broadcast to peers by the reactor listening
// for vote events
// XXX TODO: do this
// conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex)
}
// TODO: If rs.Height == vote.Height && rs.Round < vote.Round,
// the peer is sending us CatchupCommit precommits.
// We could make note of this and help filter in broadcastHasVoteMessage().
default:
log.Warn("Unknown msg type", reflect.TypeOf(msg))
}
@ -1233,14 +1236,14 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad
}
// Attempt to add the vote. if its a duplicate signature, dupeout the validator
func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) (bool, error) {
added, _, err := cs.addVote(valIndex, vote, peerKey)
func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey string) error {
_, _, err := cs.addVote(valIndex, vote, peerKey)
if err != nil {
// If the vote height is off, we'll just ignore it,
// But if it's a conflicting sig, broadcast evidence tx for slashing.
// If it's otherwise invalid, punish peer.
if err == ErrVoteHeightMismatch {
return added, err
return err
} else if _, ok := err.(*types.ErrVoteConflictingSignature); ok {
log.Warn("Found conflicting vote. Publish evidence")
/* TODO
@ -1251,14 +1254,14 @@ func (cs *ConsensusState) tryAddVote(valIndex int, vote *types.Vote, peerKey str
}
cs.mempool.BroadcastTx(evidenceTx) // shouldn't need to check returned err
*/
return added, err
return err
} else {
// Probably an invalid signature. Bad peer.
log.Warn("Error attempting to add vote", "error", err)
return added, ErrAddingVote
return ErrAddingVote
}
}
return added, nil
return nil
}
//-----------------------------------------------------------------------------
@ -1408,9 +1411,8 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
}
vote, err := cs.signVote(type_, hash, header)
if err == nil {
// NOTE: store our index in the cs so we don't have to do this every time
// TODO: store our index in the cs so we don't have to do this every time
valIndex, _ := cs.Validators.GetByAddress(cs.privValidator.Address)
// _, _, err := cs.addVote(valIndex, vote, "")
cs.sendInternalMessage(msgInfo{&VoteMessage{valIndex, vote}, ""})
log.Notice("Signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)
return vote


+ 5
- 8
consensus/state_test.go View File

@ -316,8 +316,7 @@ func TestFullRound2(t *testing.T) {
// LockSuite
// two validators, 4 rounds.
// val1 proposes the first 2 rounds, and is locked in the first.
// val2 proposes the next two. val1 should precommit nil on all (except first where he locks)
// two vals take turns proposing. val1 locks on first one, precommits nil on everything else
func TestLockNoPOL(t *testing.T) {
cs1, vss := simpleConsensusState(2)
cs2 := vss[1]
@ -431,10 +430,8 @@ func TestLockNoPOL(t *testing.T) {
<-voteCh // prevote
// TODO: is the round right?!
validatePrevote(t, cs1, 0, vss[0], rs.LockedBlock.Hash())
validatePrevote(t, cs1, 2, vss[0], rs.LockedBlock.Hash())
// TODO: quick fastforward to new round, set proposer
signAddVoteToFrom(types.VoteTypePrevote, cs1, cs2, hash, rs.ProposalBlock.MakePartSet().Header())
<-voteCh
@ -783,14 +780,14 @@ func TestLockPOLSafety1(t *testing.T) {
// we should prevote what we're locked on
validatePrevote(t, cs1, 2, vss[0], propBlockHash)
newStepCh := subscribeToEvent(cs1, types.EventStringNewRoundStep())
// add prevotes from the earlier round
addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4)
log.Warn("Done adding prevotes!")
// ensureNoNewStep(t, cs1)
// TODO: subscribe to NewStep ...
ensureNoNewStep(newStepCh)
}
// 4 vals.


+ 16
- 10
types/events.go View File

@ -3,6 +3,7 @@ package types
import (
"time"
"github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
)
@ -17,6 +18,7 @@ func EventStringFork() string { return "Fork" }
func EventStringNewBlock() string { return "NewBlock" }
func EventStringNewRound() string { return "NewRound" }
func EventStringNewRoundStep() string { return "NewRoundStep" }
func EventStringTimeoutPropose() string { return "TimeoutPropose" }
func EventStringCompleteProposal() string { return "CompleteProposal" }
func EventStringPolka() string { return "Polka" }
@ -77,16 +79,20 @@ type EventDataApp struct {
type EventDataRoundState struct {
CurrentTime time.Time `json:"current_time"`
Height int `json:"height"`
Round int `json:"round"`
Step string `json:"step"`
StartTime time.Time `json:"start_time"`
CommitTime time.Time `json:"commit_time"`
Proposal *Proposal `json:"proposal"`
ProposalBlock *Block `json:"proposal_block"`
LockedRound int `json:"locked_round"`
LockedBlock *Block `json:"locked_block"`
POLRound int `json:"pol_round"`
Height int `json:"height"`
Round int `json:"round"`
Step int `json:"step"`
LastCommitRound int `json:"last_commit_round"`
StartTime time.Time `json:"start_time"`
CommitTime time.Time `json:"commit_time"`
Proposal *Proposal `json:"proposal"`
ProposalBlock *Block `json:"proposal_block"`
LockedRound int `json:"locked_round"`
LockedBlock *Block `json:"locked_block"`
POLRound int `json:"pol_round"`
BlockPartsHeader PartSetHeader `json:"block_parts_header"`
BlockParts *common.BitArray `json:"block_parts"`
}
type EventDataVote struct {


Loading…
Cancel
Save