Browse Source

broadcast proposer heartbeat msg

pull/591/head
Ethan Buchman 7 years ago
parent
commit
530626dab7
3 changed files with 59 additions and 3 deletions
  1. +30
    -0
      consensus/reactor.go
  2. +8
    -2
      consensus/state.go
  3. +21
    -1
      types/events.go

+ 30
- 0
consensus/reactor.go View File

@ -311,6 +311,21 @@ func (conR *ConsensusReactor) registerEventCallbacks() {
edv := data.Unwrap().(types.EventDataVote) edv := data.Unwrap().(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote) conR.broadcastHasVoteMessage(edv.Vote)
}) })
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposerHeartbeat(), func(data types.TMEventData) {
heartbeat := data.Unwrap().(types.EventDataProposerHeartbeat)
conR.broadcastProposerHeartbeatMessage(heartbeat)
})
}
func (conR *ConsensusReactor) broadcastProposerHeartbeatMessage(heartbeat types.EventDataProposerHeartbeat) {
msg := &ProposerHeartbeatMessage{
Height: heartbeat.Height,
Round: heartbeat.Round,
Proposer: heartbeat.Proposer,
Sequence: heartbeat.Sequence,
}
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
} }
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) { func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
@ -1305,3 +1320,18 @@ type VoteSetBitsMessage struct {
func (m *VoteSetBitsMessage) String() string { func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
} }
//-------------------------------------
// ProposerHeartbeatMessage is sent to signal that the proposer is alive and waiting for transactions
type ProposerHeartbeatMessage struct {
Height int
Round int
Proposer []byte
Sequence int
}
// String returns a string representation.
func (m *ProposerHeartbeatMessage) String() string {
return fmt.Sprintf("[HEARTBEAT %v/%02d %X %d]", m.Height, m.Round, m.Proposer, m.Sequence)
}

+ 8
- 2
consensus/state.go View File

@ -808,11 +808,17 @@ func (cs *ConsensusState) needProofBlock(height int) bool {
} }
func (cs *ConsensusState) proposalHeartbeat() { func (cs *ConsensusState) proposalHeartbeat() {
counter := 0
addr := cs.privValidator.GetAddress()
for { for {
select { select {
default: default:
// TODO: broadcast heartbeat
if cs.evsw != nil {
rs := cs.RoundStateEvent()
heartbeat := types.EventDataProposerHeartbeat{rs, addr, counter}
types.FireEventProposerHeartbeat(cs.evsw, heartbeat)
counter += 1
}
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} }


+ 21
- 1
types/events.go View File

@ -31,6 +31,8 @@ func EventStringRelock() string { return "Relock" }
func EventStringTimeoutWait() string { return "TimeoutWait" } func EventStringTimeoutWait() string { return "TimeoutWait" }
func EventStringVote() string { return "Vote" } func EventStringVote() string { return "Vote" }
func EventStringProposerHeartbeat() string { return "ProposerHeartbeat" }
//---------------------------------------- //----------------------------------------
var ( var (
@ -39,6 +41,8 @@ var (
EventDataNameTx = "tx" EventDataNameTx = "tx"
EventDataNameRoundState = "round_state" EventDataNameRoundState = "round_state"
EventDataNameVote = "vote" EventDataNameVote = "vote"
EventDataNameProposerHeartbeat = "proposer_heartbeat"
) )
//---------------------------------------- //----------------------------------------
@ -84,6 +88,8 @@ const (
EventDataTypeRoundState = byte(0x11) EventDataTypeRoundState = byte(0x11)
EventDataTypeVote = byte(0x12) EventDataTypeVote = byte(0x12)
EventDataTypeProposerHeartbeat = byte(0x20)
) )
var tmEventDataMapper = data.NewMapper(TMEventData{}). var tmEventDataMapper = data.NewMapper(TMEventData{}).
@ -91,7 +97,8 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}).
RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader). RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader).
RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx).
RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState).
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote)
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote).
RegisterImplementation(EventDataProposerHeartbeat{}, EventDataNameProposerHeartbeat, EventDataTypeProposerHeartbeat)
// Most event messages are basic types (a block, a transaction) // Most event messages are basic types (a block, a transaction)
// but some (an input to a call tx or a receive) are more exotic // but some (an input to a call tx or a receive) are more exotic
@ -115,6 +122,13 @@ type EventDataTx struct {
Error string `json:"error"` // this is redundant information for now Error string `json:"error"` // this is redundant information for now
} }
type EventDataProposerHeartbeat struct {
EventDataRoundState
Proposer []byte `json:"proposer"`
Sequence int `json:"sequence"`
}
// NOTE: This goes into the replay WAL // NOTE: This goes into the replay WAL
type EventDataRoundState struct { type EventDataRoundState struct {
Height int `json:"height"` Height int `json:"height"`
@ -135,6 +149,8 @@ func (_ EventDataTx) AssertIsTMEventData() {}
func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {}
func (_ EventDataVote) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {}
func (_ EventDataProposerHeartbeat) AssertIsTMEventData() {}
//---------------------------------------- //----------------------------------------
// Wrappers for type safety // Wrappers for type safety
@ -232,3 +248,7 @@ func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) {
func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
fireEvent(fireable, EventStringLock(), TMEventData{rs}) fireEvent(fireable, EventStringLock(), TMEventData{rs})
} }
func FireEventProposerHeartbeat(fireable events.Fireable, rs EventDataProposerHeartbeat) {
fireEvent(fireable, EventStringProposerHeartbeat(), TMEventData{rs})
}

Loading…
Cancel
Save