You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

225 lines
6.4 KiB

7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
  1. package evidence
  2. import (
  3. "fmt"
  4. "reflect"
  5. "time"
  6. amino "github.com/tendermint/go-amino"
  7. clist "github.com/tendermint/tendermint/libs/clist"
  8. "github.com/tendermint/tendermint/libs/log"
  9. "github.com/tendermint/tendermint/p2p"
  10. "github.com/tendermint/tendermint/types"
  11. )
  12. const (
  13. EvidenceChannel = byte(0x38)
  14. maxMsgSize = 1048576 // 1MB TODO make it configurable
  15. broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
  16. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  17. )
  18. // EvidenceReactor handles evpool evidence broadcasting amongst peers.
  19. type EvidenceReactor struct {
  20. p2p.BaseReactor
  21. evpool *EvidencePool
  22. eventBus *types.EventBus
  23. }
  24. // NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
  25. func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
  26. evR := &EvidenceReactor{
  27. evpool: evpool,
  28. }
  29. evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
  30. return evR
  31. }
  32. // SetLogger sets the Logger on the reactor and the underlying Evidence.
  33. func (evR *EvidenceReactor) SetLogger(l log.Logger) {
  34. evR.Logger = l
  35. evR.evpool.SetLogger(l)
  36. }
  37. // OnStart implements cmn.Service
  38. func (evR *EvidenceReactor) OnStart() error {
  39. return evR.BaseReactor.OnStart()
  40. }
  41. // GetChannels implements Reactor.
  42. // It returns the list of channels for this reactor.
  43. func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
  44. return []*p2p.ChannelDescriptor{
  45. &p2p.ChannelDescriptor{
  46. ID: EvidenceChannel,
  47. Priority: 5,
  48. },
  49. }
  50. }
  51. // AddPeer implements Reactor.
  52. func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
  53. go evR.broadcastEvidenceRoutine(peer)
  54. }
  55. // RemovePeer implements Reactor.
  56. func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  57. // nothing to do
  58. }
  59. // Receive implements Reactor.
  60. // It adds any received evidence to the evpool.
  61. func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  62. msg, err := decodeMsg(msgBytes)
  63. if err != nil {
  64. evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  65. evR.Switch.StopPeerForError(src, err)
  66. return
  67. }
  68. evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  69. switch msg := msg.(type) {
  70. case *EvidenceListMessage:
  71. for _, ev := range msg.Evidence {
  72. err := evR.evpool.AddEvidence(ev)
  73. if err != nil {
  74. evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
  75. // punish peer
  76. evR.Switch.StopPeerForError(src, err)
  77. }
  78. }
  79. default:
  80. evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  81. }
  82. }
  83. // SetEventSwitch implements events.Eventable.
  84. func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
  85. evR.eventBus = b
  86. }
  87. // Modeled after the mempool routine.
  88. // - Evidence accumulates in a clist.
  89. // - Each peer has a routien that iterates through the clist,
  90. // sending available evidence to the peer.
  91. // - If we're waiting for new evidence and the list is not empty,
  92. // start iterating from the beginning again.
  93. func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) {
  94. var next *clist.CElement
  95. for {
  96. // This happens because the CElement we were looking at got garbage
  97. // collected (removed). That is, .NextWait() returned nil. Go ahead and
  98. // start from the beginning.
  99. if next == nil {
  100. select {
  101. case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
  102. if next = evR.evpool.EvidenceFront(); next == nil {
  103. continue
  104. }
  105. case <-peer.Quit():
  106. return
  107. case <-evR.Quit():
  108. return
  109. }
  110. }
  111. ev := next.Value.(types.Evidence)
  112. msg, retry := evR.checkSendEvidenceMessage(peer, ev)
  113. if msg != nil {
  114. success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
  115. retry = !success
  116. }
  117. if retry {
  118. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  119. continue
  120. }
  121. afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
  122. select {
  123. case <-afterCh:
  124. // start from the beginning every tick.
  125. // TODO: only do this if we're at the end of the list!
  126. next = nil
  127. case <-next.NextWaitChan():
  128. // see the start of the for loop for nil check
  129. next = next.Next()
  130. case <-peer.Quit():
  131. return
  132. case <-evR.Quit():
  133. return
  134. }
  135. }
  136. }
  137. // Returns the message to send the peer, or nil if the evidence is invalid for the peer.
  138. // If message is nil, return true if we should sleep and try again.
  139. func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evidence) (msg EvidenceMessage, retry bool) {
  140. // make sure the peer is up to date
  141. evHeight := ev.Height()
  142. peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
  143. if !ok {
  144. evR.Logger.Info("Found peer without PeerState", "peer", peer)
  145. return nil, true
  146. }
  147. // NOTE: We only send evidence to peers where
  148. // peerHeight - maxAge < evidenceHeight < peerHeight
  149. maxAge := evR.evpool.State().ConsensusParams.EvidenceParams.MaxAge
  150. peerHeight := peerState.GetHeight()
  151. if peerHeight < evHeight {
  152. // peer is behind. sleep while he catches up
  153. return nil, true
  154. } else if peerHeight > evHeight+maxAge {
  155. // evidence is too old, skip
  156. // NOTE: if evidence is too old for an honest peer,
  157. // then we're behind and either it already got committed or it never will!
  158. evR.Logger.Info("Not sending peer old evidence", "peerHeight", peerHeight, "evHeight", evHeight, "maxAge", maxAge, "peer", peer)
  159. return nil, false
  160. }
  161. // send evidence
  162. msg = &EvidenceListMessage{[]types.Evidence{ev}}
  163. return msg, false
  164. }
  165. // PeerState describes the state of a peer.
  166. type PeerState interface {
  167. GetHeight() int64
  168. }
  169. //-----------------------------------------------------------------------------
  170. // Messages
  171. // EvidenceMessage is a message sent or received by the EvidenceReactor.
  172. type EvidenceMessage interface{}
  173. func RegisterEvidenceMessages(cdc *amino.Codec) {
  174. cdc.RegisterInterface((*EvidenceMessage)(nil), nil)
  175. cdc.RegisterConcrete(&EvidenceListMessage{},
  176. "tendermint/evidence/EvidenceListMessage", nil)
  177. }
  178. func decodeMsg(bz []byte) (msg EvidenceMessage, err error) {
  179. if len(bz) > maxMsgSize {
  180. return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
  181. }
  182. err = cdc.UnmarshalBinaryBare(bz, &msg)
  183. return
  184. }
  185. //-------------------------------------
  186. // EvidenceMessage contains a list of evidence.
  187. type EvidenceListMessage struct {
  188. Evidence []types.Evidence
  189. }
  190. // String returns a string representation of the EvidenceListMessage.
  191. func (m *EvidenceListMessage) String() string {
  192. return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
  193. }