You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

173 lines
4.9 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package evpool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. wire "github.com/tendermint/go-wire"
  8. "github.com/tendermint/tmlibs/log"
  9. cfg "github.com/tendermint/tendermint/config"
  10. "github.com/tendermint/tendermint/p2p"
  11. "github.com/tendermint/tendermint/types"
  12. )
  13. const (
  14. EvidenceChannel = byte(0x38)
  15. maxEvidenceMessageSize = 1048576 // 1MB TODO make it configurable
  16. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  17. broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
  18. )
  19. // EvidenceReactor handles evpool evidence broadcasting amongst peers.
  20. type EvidenceReactor struct {
  21. p2p.BaseReactor
  22. config *cfg.EvidenceConfig
  23. evpool *EvidencePool
  24. eventBus *types.EventBus
  25. }
  26. // NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
  27. func NewEvidenceReactor(config *cfg.EvidenceConfig, evpool *EvidencePool) *EvidenceReactor {
  28. evR := &EvidenceReactor{
  29. config: config,
  30. evpool: evpool,
  31. }
  32. evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
  33. return evR
  34. }
  35. // SetLogger sets the Logger on the reactor and the underlying Evidence.
  36. func (evR *EvidenceReactor) SetLogger(l log.Logger) {
  37. evR.Logger = l
  38. evR.evpool.SetLogger(l)
  39. }
  40. // OnStart implements cmn.Service
  41. func (evR *EvidenceReactor) OnStart() error {
  42. if err := evR.BaseReactor.OnStart(); err != nil {
  43. return err
  44. }
  45. go evR.broadcastRoutine()
  46. return nil
  47. }
  48. // GetChannels implements Reactor.
  49. // It returns the list of channels for this reactor.
  50. func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
  51. return []*p2p.ChannelDescriptor{
  52. &p2p.ChannelDescriptor{
  53. ID: EvidenceChannel,
  54. Priority: 5,
  55. },
  56. }
  57. }
  58. // AddPeer implements Reactor.
  59. func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
  60. // send the peer our high-priority evidence.
  61. // the rest will be sent by the broadcastRoutine
  62. evidence := evR.evpool.PriorityEvidence()
  63. msg := EvidenceListMessage{evidence}
  64. success := peer.Send(EvidenceChannel, struct{ EvidenceMessage }{msg})
  65. if !success {
  66. // TODO: remove peer ?
  67. }
  68. }
  69. // RemovePeer implements Reactor.
  70. func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  71. // nothing to do
  72. }
  73. // Receive implements Reactor.
  74. // It adds any received evidence to the evpool.
  75. func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  76. _, msg, err := DecodeMessage(msgBytes)
  77. if err != nil {
  78. evR.Logger.Error("Error decoding message", "err", err)
  79. return
  80. }
  81. evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  82. switch msg := msg.(type) {
  83. case *EvidenceListMessage:
  84. for _, ev := range msg.Evidence {
  85. err := evR.evpool.AddEvidence(ev)
  86. if err != nil {
  87. evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
  88. // TODO: punish peer
  89. }
  90. }
  91. default:
  92. evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  93. }
  94. }
  95. // SetEventSwitch implements events.Eventable.
  96. func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
  97. evR.eventBus = b
  98. }
  99. // broadcast new evidence to all peers.
  100. // broadcasts must be non-blocking so routine is always available to read off EvidenceChan.
  101. func (evR *EvidenceReactor) broadcastRoutine() {
  102. ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS)
  103. for {
  104. select {
  105. case evidence := <-evR.evpool.EvidenceChan():
  106. // broadcast some new evidence
  107. msg := EvidenceListMessage{[]types.Evidence{evidence}}
  108. evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
  109. // TODO: Broadcast runs asynchronously, so this should wait on the successChan
  110. // in another routine before marking to be proper.
  111. evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
  112. case <-ticker.C:
  113. // broadcast all pending evidence
  114. msg := EvidenceListMessage{evR.evpool.PendingEvidence()}
  115. evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
  116. case <-evR.Quit:
  117. return
  118. }
  119. }
  120. }
  121. //-----------------------------------------------------------------------------
  122. // Messages
  123. const (
  124. msgTypeEvidence = byte(0x01)
  125. )
  126. // EvidenceMessage is a message sent or received by the EvidenceReactor.
  127. type EvidenceMessage interface{}
  128. var _ = wire.RegisterInterface(
  129. struct{ EvidenceMessage }{},
  130. wire.ConcreteType{&EvidenceListMessage{}, msgTypeEvidence},
  131. )
  132. // DecodeMessage decodes a byte-array into a EvidenceMessage.
  133. func DecodeMessage(bz []byte) (msgType byte, msg EvidenceMessage, err error) {
  134. msgType = bz[0]
  135. n := new(int)
  136. r := bytes.NewReader(bz)
  137. msg = wire.ReadBinary(struct{ EvidenceMessage }{}, r, maxEvidenceMessageSize, n, &err).(struct{ EvidenceMessage }).EvidenceMessage
  138. return
  139. }
  140. //-------------------------------------
  141. // EvidenceMessage contains a list of evidence.
  142. type EvidenceListMessage struct {
  143. Evidence []types.Evidence
  144. }
  145. // String returns a string representation of the EvidenceListMessage.
  146. func (m *EvidenceListMessage) String() string {
  147. return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
  148. }