You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

171 lines
4.8 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package evidence
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. wire "github.com/tendermint/go-wire"
  8. "github.com/tendermint/tmlibs/log"
  9. "github.com/tendermint/tendermint/p2p"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. const (
  13. EvidenceChannel = byte(0x38)
  14. maxEvidenceMessageSize = 1048576 // 1MB TODO make it configurable
  15. broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
  16. )
  17. // EvidenceReactor handles evpool evidence broadcasting amongst peers.
  18. type EvidenceReactor struct {
  19. p2p.BaseReactor
  20. evpool *EvidencePool
  21. eventBus *types.EventBus
  22. }
  23. // NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
  24. func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
  25. evR := &EvidenceReactor{
  26. evpool: evpool,
  27. }
  28. evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
  29. return evR
  30. }
  31. // SetLogger sets the Logger on the reactor and the underlying Evidence.
  32. func (evR *EvidenceReactor) SetLogger(l log.Logger) {
  33. evR.Logger = l
  34. evR.evpool.SetLogger(l)
  35. }
  36. // OnStart implements cmn.Service
  37. func (evR *EvidenceReactor) OnStart() error {
  38. if err := evR.BaseReactor.OnStart(); err != nil {
  39. return err
  40. }
  41. go evR.broadcastRoutine()
  42. return nil
  43. }
  44. // GetChannels implements Reactor.
  45. // It returns the list of channels for this reactor.
  46. func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
  47. return []*p2p.ChannelDescriptor{
  48. &p2p.ChannelDescriptor{
  49. ID: EvidenceChannel,
  50. Priority: 5,
  51. },
  52. }
  53. }
  54. // AddPeer implements Reactor.
  55. func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
  56. // send the peer our high-priority evidence.
  57. // the rest will be sent by the broadcastRoutine
  58. evidences := evR.evpool.PriorityEvidence()
  59. msg := &EvidenceListMessage{evidences}
  60. success := peer.Send(EvidenceChannel, struct{ EvidenceMessage }{msg})
  61. if !success {
  62. // TODO: remove peer ?
  63. }
  64. }
  65. // RemovePeer implements Reactor.
  66. func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  67. // nothing to do
  68. }
  69. // Receive implements Reactor.
  70. // It adds any received evidence to the evpool.
  71. func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  72. _, msg, err := DecodeMessage(msgBytes)
  73. if err != nil {
  74. evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  75. evR.Switch.StopPeerForError(src, err)
  76. return
  77. }
  78. evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  79. switch msg := msg.(type) {
  80. case *EvidenceListMessage:
  81. for _, ev := range msg.Evidence {
  82. err := evR.evpool.AddEvidence(ev)
  83. if err != nil {
  84. evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
  85. // punish peer
  86. evR.Switch.StopPeerForError(src, err)
  87. }
  88. }
  89. default:
  90. evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  91. }
  92. }
  93. // SetEventSwitch implements events.Eventable.
  94. func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
  95. evR.eventBus = b
  96. }
  97. // Broadcast new evidence to all peers.
  98. // Broadcasts must be non-blocking so routine is always available to read off EvidenceChan.
  99. func (evR *EvidenceReactor) broadcastRoutine() {
  100. ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS)
  101. for {
  102. select {
  103. case evidence := <-evR.evpool.EvidenceChan():
  104. // broadcast some new evidence
  105. msg := &EvidenceListMessage{[]types.Evidence{evidence}}
  106. evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
  107. // TODO: Broadcast runs asynchronously, so this should wait on the successChan
  108. // in another routine before marking to be proper.
  109. evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
  110. case <-ticker.C:
  111. // broadcast all pending evidence
  112. msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
  113. evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
  114. case <-evR.Quit():
  115. return
  116. }
  117. }
  118. }
  119. //-----------------------------------------------------------------------------
  120. // Messages
  121. const (
  122. msgTypeEvidence = byte(0x01)
  123. )
  124. // EvidenceMessage is a message sent or received by the EvidenceReactor.
  125. type EvidenceMessage interface{}
  126. var _ = wire.RegisterInterface(
  127. struct{ EvidenceMessage }{},
  128. wire.ConcreteType{&EvidenceListMessage{}, msgTypeEvidence},
  129. )
  130. // DecodeMessage decodes a byte-array into a EvidenceMessage.
  131. func DecodeMessage(bz []byte) (msgType byte, msg EvidenceMessage, err error) {
  132. msgType = bz[0]
  133. n := new(int)
  134. r := bytes.NewReader(bz)
  135. msg = wire.ReadBinary(struct{ EvidenceMessage }{}, r, maxEvidenceMessageSize, n, &err).(struct{ EvidenceMessage }).EvidenceMessage
  136. return
  137. }
  138. //-------------------------------------
  139. // EvidenceMessage contains a list of evidence.
  140. type EvidenceListMessage struct {
  141. Evidence []types.Evidence
  142. }
  143. // String returns a string representation of the EvidenceListMessage.
  144. func (m *EvidenceListMessage) String() string {
  145. return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
  146. }