diff --git a/consensus/reactor.go b/consensus/reactor.go index ce25442fe..cd0ebbd3f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -311,6 +311,21 @@ func (conR *ConsensusReactor) registerEventCallbacks() { edv := data.Unwrap().(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote) }) + + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposerHeartbeat(), func(data types.TMEventData) { + heartbeat := data.Unwrap().(types.EventDataProposerHeartbeat) + conR.broadcastProposerHeartbeatMessage(heartbeat) + }) +} + +func (conR *ConsensusReactor) broadcastProposerHeartbeatMessage(heartbeat types.EventDataProposerHeartbeat) { + msg := &ProposerHeartbeatMessage{ + Height: heartbeat.Height, + Round: heartbeat.Round, + Proposer: heartbeat.Proposer, + Sequence: heartbeat.Sequence, + } + conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg}) } func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) { @@ -1305,3 +1320,18 @@ type VoteSetBitsMessage struct { func (m *VoteSetBitsMessage) String() string { return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) } + +//------------------------------------- + +// ProposerHeartbeatMessage is sent to signal that the proposer is alive and waiting for transactions +type ProposerHeartbeatMessage struct { + Height int + Round int + Proposer []byte + Sequence int +} + +// String returns a string representation. +func (m *ProposerHeartbeatMessage) String() string { + return fmt.Sprintf("[HEARTBEAT %v/%02d %X %d]", m.Height, m.Round, m.Proposer, m.Sequence) +} diff --git a/consensus/state.go b/consensus/state.go index 4695a6304..222e2eb10 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -808,11 +808,17 @@ func (cs *ConsensusState) needProofBlock(height int) bool { } func (cs *ConsensusState) proposalHeartbeat() { + counter := 0 + addr := cs.privValidator.GetAddress() for { select { default: - // TODO: broadcast heartbeat - + if cs.evsw != nil { + rs := cs.RoundStateEvent() + heartbeat := types.EventDataProposerHeartbeat{rs, addr, counter} + types.FireEventProposerHeartbeat(cs.evsw, heartbeat) + counter += 1 + } time.Sleep(time.Second) } } diff --git a/types/events.go b/types/events.go index 8c29c4445..625e0aa9d 100644 --- a/types/events.go +++ b/types/events.go @@ -31,6 +31,8 @@ func EventStringRelock() string { return "Relock" } func EventStringTimeoutWait() string { return "TimeoutWait" } func EventStringVote() string { return "Vote" } +func EventStringProposerHeartbeat() string { return "ProposerHeartbeat" } + //---------------------------------------- var ( @@ -39,6 +41,8 @@ var ( EventDataNameTx = "tx" EventDataNameRoundState = "round_state" EventDataNameVote = "vote" + + EventDataNameProposerHeartbeat = "proposer_heartbeat" ) //---------------------------------------- @@ -84,6 +88,8 @@ const ( EventDataTypeRoundState = byte(0x11) EventDataTypeVote = byte(0x12) + + EventDataTypeProposerHeartbeat = byte(0x20) ) var tmEventDataMapper = data.NewMapper(TMEventData{}). @@ -91,7 +97,8 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}). RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader). RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). - RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote) + RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote). + RegisterImplementation(EventDataProposerHeartbeat{}, EventDataNameProposerHeartbeat, EventDataTypeProposerHeartbeat) // Most event messages are basic types (a block, a transaction) // but some (an input to a call tx or a receive) are more exotic @@ -115,6 +122,13 @@ type EventDataTx struct { Error string `json:"error"` // this is redundant information for now } +type EventDataProposerHeartbeat struct { + EventDataRoundState + + Proposer []byte `json:"proposer"` + Sequence int `json:"sequence"` +} + // NOTE: This goes into the replay WAL type EventDataRoundState struct { Height int `json:"height"` @@ -135,6 +149,8 @@ func (_ EventDataTx) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {} +func (_ EventDataProposerHeartbeat) AssertIsTMEventData() {} + //---------------------------------------- // Wrappers for type safety @@ -232,3 +248,7 @@ func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { fireEvent(fireable, EventStringLock(), TMEventData{rs}) } + +func FireEventProposerHeartbeat(fireable events.Fireable, rs EventDataProposerHeartbeat) { + fireEvent(fireable, EventStringProposerHeartbeat(), TMEventData{rs}) +}