You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

163 lines
4.7 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package evidence
  2. import (
  3. "fmt"
  4. "github.com/tendermint/go-amino"
  5. "github.com/tendermint/tmlibs/log"
  6. "reflect"
  7. "time"
  8. "github.com/tendermint/tendermint/p2p"
  9. "github.com/tendermint/tendermint/types"
  10. )
  const (
  	// EvidenceChannel is the p2p channel ID on which evidence messages are
  	// sent, broadcast and received by this reactor.
  	EvidenceChannel = byte(0x38)

  	// maxEvidenceMessageSize is the size limit, in bytes, for one evidence message.
  	maxEvidenceMessageSize = 1 << 20 // 1MB TODO make it configurable

  	// broadcastEvidenceIntervalS is the period, in seconds, between periodic
  	// broadcasts of uncommitted (pending) evidence.
  	broadcastEvidenceIntervalS = 60
  )
  16. // EvidenceReactor handles evpool evidence broadcasting amongst peers.
  17. type EvidenceReactor struct {
  18. p2p.BaseReactor
  19. evpool *EvidencePool
  20. eventBus *types.EventBus
  21. }
  22. // NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
  23. func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
  24. evR := &EvidenceReactor{
  25. evpool: evpool,
  26. }
  27. evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
  28. return evR
  29. }
  30. // SetLogger sets the Logger on the reactor and the underlying Evidence.
  31. func (evR *EvidenceReactor) SetLogger(l log.Logger) {
  32. evR.Logger = l
  33. evR.evpool.SetLogger(l)
  34. }
  35. // OnStart implements cmn.Service
  36. func (evR *EvidenceReactor) OnStart() error {
  37. if err := evR.BaseReactor.OnStart(); err != nil {
  38. return err
  39. }
  40. go evR.broadcastRoutine()
  41. return nil
  42. }
  43. // GetChannels implements Reactor.
  44. // It returns the list of channels for this reactor.
  45. func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
  46. return []*p2p.ChannelDescriptor{
  47. &p2p.ChannelDescriptor{
  48. ID: EvidenceChannel,
  49. Priority: 5,
  50. },
  51. }
  52. }
  53. // AddPeer implements Reactor.
  54. func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
  55. // send the peer our high-priority evidence.
  56. // the rest will be sent by the broadcastRoutine
  57. evidences := evR.evpool.PriorityEvidence()
  58. msg := &EvidenceListMessage{evidences}
  59. success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
  60. if !success {
  61. // TODO: remove peer ?
  62. }
  63. }
  64. // RemovePeer implements Reactor.
  65. func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  66. // nothing to do
  67. }
  68. // Receive implements Reactor.
  69. // It adds any received evidence to the evpool.
  70. func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  71. msg, err := DecodeMessage(msgBytes)
  72. if err != nil {
  73. evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  74. evR.Switch.StopPeerForError(src, err)
  75. return
  76. }
  77. evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  78. switch msg := msg.(type) {
  79. case *EvidenceListMessage:
  80. for _, ev := range msg.Evidence {
  81. err := evR.evpool.AddEvidence(ev)
  82. if err != nil {
  83. evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
  84. // punish peer
  85. evR.Switch.StopPeerForError(src, err)
  86. }
  87. }
  88. default:
  89. evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  90. }
  91. }
  92. // SetEventSwitch implements events.Eventable.
  93. func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
  94. evR.eventBus = b
  95. }
  96. // Broadcast new evidence to all peers.
  97. // Broadcasts must be non-blocking so routine is always available to read off EvidenceChan.
  98. func (evR *EvidenceReactor) broadcastRoutine() {
  99. ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS)
  100. for {
  101. select {
  102. case evidence := <-evR.evpool.EvidenceChan():
  103. // broadcast some new evidence
  104. msg := &EvidenceListMessage{[]types.Evidence{evidence}}
  105. evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
  106. // TODO: Broadcast runs asynchronously, so this should wait on the successChan
  107. // in another routine before marking to be proper.
  108. evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
  109. case <-ticker.C:
  110. // broadcast all pending evidence
  111. msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
  112. evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
  113. case <-evR.Quit():
  114. return
  115. }
  116. }
  117. }
  118. //-----------------------------------------------------------------------------
  119. // Messages
  // EvidenceMessage is the interface type under which messages exchanged by the
  // EvidenceReactor are encoded and decoded. It is an empty interface, so it
  // places no method requirements on concrete message types.
  type EvidenceMessage interface{}
  122. func RegisterEvidenceMessages(cdc *amino.Codec) {
  123. cdc.RegisterInterface((*EvidenceMessage)(nil), nil)
  124. cdc.RegisterConcrete(&EvidenceListMessage{},
  125. "tendermint/evidence/EvidenceListMessagE", nil)
  126. }
  127. // DecodeMessage decodes a byte-array into a EvidenceMessage.
  128. func DecodeMessage(bz []byte) (msg EvidenceMessage, err error) {
  129. err = cdc.UnmarshalBinaryBare(bz, &msg)
  130. return
  131. }
  132. //-------------------------------------
  133. // EvidenceMessage contains a list of evidence.
  134. type EvidenceListMessage struct {
  135. Evidence []types.Evidence
  136. }
  137. // String returns a string representation of the EvidenceListMessage.
  138. func (m *EvidenceListMessage) String() string {
  139. return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
  140. }