You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

169 lines
4.7 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package evidence
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. wire "github.com/tendermint/go-wire"
  8. "github.com/tendermint/tmlibs/log"
  9. "github.com/tendermint/tendermint/p2p"
  10. "github.com/tendermint/tendermint/types"
  11. )
  // Channel and timing parameters used by the evidence reactor.
  const (
      // EvidenceChannel is the p2p channel ID that evidence messages are sent on.
      EvidenceChannel byte = 0x38

      // maxEvidenceMessageSize is the size limit handed to the decoder (1MB).
      // TODO make it configurable
      maxEvidenceMessageSize = 1 << 20

      // broadcastEvidenceIntervalS is how often, in seconds, uncommitted
      // evidence is re-broadcast.
      broadcastEvidenceIntervalS = 60
  )
  17. // EvidenceReactor handles evpool evidence broadcasting amongst peers.
  18. type EvidenceReactor struct {
  19. p2p.BaseReactor
  20. evpool *EvidencePool
  21. eventBus *types.EventBus
  22. }
  23. // NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
  24. func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
  25. evR := &EvidenceReactor{
  26. evpool: evpool,
  27. }
  28. evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
  29. return evR
  30. }
  31. // SetLogger sets the Logger on the reactor and the underlying Evidence.
  32. func (evR *EvidenceReactor) SetLogger(l log.Logger) {
  33. evR.Logger = l
  34. evR.evpool.SetLogger(l)
  35. }
  36. // OnStart implements cmn.Service
  37. func (evR *EvidenceReactor) OnStart() error {
  38. if err := evR.BaseReactor.OnStart(); err != nil {
  39. return err
  40. }
  41. go evR.broadcastRoutine()
  42. return nil
  43. }
  44. // GetChannels implements Reactor.
  45. // It returns the list of channels for this reactor.
  46. func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
  47. return []*p2p.ChannelDescriptor{
  48. &p2p.ChannelDescriptor{
  49. ID: EvidenceChannel,
  50. Priority: 5,
  51. },
  52. }
  53. }
  54. // AddPeer implements Reactor.
  55. func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
  56. // send the peer our high-priority evidence.
  57. // the rest will be sent by the broadcastRoutine
  58. evidences := evR.evpool.PriorityEvidence()
  59. msg := &EvidenceListMessage{evidences}
  60. success := peer.Send(EvidenceChannel, struct{ EvidenceMessage }{msg})
  61. if !success {
  62. // TODO: remove peer ?
  63. }
  64. }
  65. // RemovePeer implements Reactor.
  66. func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  67. // nothing to do
  68. }
  69. // Receive implements Reactor.
  70. // It adds any received evidence to the evpool.
  71. func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  72. _, msg, err := DecodeMessage(msgBytes)
  73. if err != nil {
  74. evR.Logger.Error("Error decoding message", "err", err)
  75. return
  76. }
  77. evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  78. switch msg := msg.(type) {
  79. case *EvidenceListMessage:
  80. for _, ev := range msg.Evidence {
  81. err := evR.evpool.AddEvidence(ev)
  82. if err != nil {
  83. evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
  84. // TODO: punish peer
  85. }
  86. }
  87. default:
  88. evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  89. }
  90. }
  91. // SetEventSwitch implements events.Eventable.
  92. func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
  93. evR.eventBus = b
  94. }
  95. // Broadcast new evidence to all peers.
  96. // Broadcasts must be non-blocking so routine is always available to read off EvidenceChan.
  97. func (evR *EvidenceReactor) broadcastRoutine() {
  98. ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS)
  99. for {
  100. select {
  101. case evidence := <-evR.evpool.EvidenceChan():
  102. // broadcast some new evidence
  103. msg := &EvidenceListMessage{[]types.Evidence{evidence}}
  104. evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
  105. // TODO: Broadcast runs asynchronously, so this should wait on the successChan
  106. // in another routine before marking to be proper.
  107. evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
  108. case <-ticker.C:
  109. // broadcast all pending evidence
  110. msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
  111. evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
  112. case <-evR.Quit:
  113. return
  114. }
  115. }
  116. }
  117. //-----------------------------------------------------------------------------
  118. // Messages
  // msgTypeEvidence is the type byte that EvidenceListMessage is registered under.
  const msgTypeEvidence byte = 0x01
  122. // EvidenceMessage is a message sent or received by the EvidenceReactor.
  123. type EvidenceMessage interface{}
  124. var _ = wire.RegisterInterface(
  125. struct{ EvidenceMessage }{},
  126. wire.ConcreteType{&EvidenceListMessage{}, msgTypeEvidence},
  127. )
  128. // DecodeMessage decodes a byte-array into a EvidenceMessage.
  129. func DecodeMessage(bz []byte) (msgType byte, msg EvidenceMessage, err error) {
  130. msgType = bz[0]
  131. n := new(int)
  132. r := bytes.NewReader(bz)
  133. msg = wire.ReadBinary(struct{ EvidenceMessage }{}, r, maxEvidenceMessageSize, n, &err).(struct{ EvidenceMessage }).EvidenceMessage
  134. return
  135. }
  136. //-------------------------------------
  137. // EvidenceMessage contains a list of evidence.
  138. type EvidenceListMessage struct {
  139. Evidence []types.Evidence
  140. }
  141. // String returns a string representation of the EvidenceListMessage.
  142. func (m *EvidenceListMessage) String() string {
  143. return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
  144. }