You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

180 lines
4.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. "github.com/tendermint/go-clist"
  8. . "github.com/tendermint/go-common"
  9. cfg "github.com/tendermint/go-config"
  10. "github.com/tendermint/go-events"
  11. "github.com/tendermint/go-p2p"
  12. "github.com/tendermint/go-wire"
  13. "github.com/tendermint/tendermint/types"
  14. tmsp "github.com/tendermint/tmsp/types"
  15. )
  16. const (
  17. MempoolChannel = byte(0x30)
  18. maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable
  19. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  20. )
  21. // MempoolReactor handles mempool tx broadcasting amongst peers.
  22. type MempoolReactor struct {
  23. p2p.BaseReactor
  24. config cfg.Config
  25. Mempool *Mempool
  26. evsw *events.EventSwitch
  27. }
  28. func NewMempoolReactor(config cfg.Config, mempool *Mempool) *MempoolReactor {
  29. memR := &MempoolReactor{
  30. config: config,
  31. Mempool: mempool,
  32. }
  33. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  34. return memR
  35. }
  36. // Implements Reactor
  37. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  38. return []*p2p.ChannelDescriptor{
  39. &p2p.ChannelDescriptor{
  40. ID: MempoolChannel,
  41. Priority: 5,
  42. },
  43. }
  44. }
  45. // Implements Reactor
  46. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  47. go memR.broadcastTxRoutine(peer)
  48. }
  49. // Implements Reactor
  50. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  51. // broadcast routine checks if peer is gone and returns
  52. }
  53. // Implements Reactor
  54. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  55. _, msg, err := DecodeMessage(msgBytes)
  56. if err != nil {
  57. log.Warn("Error decoding message", "error", err)
  58. return
  59. }
  60. log.Info("Receive", "src", src, "chId", chID, "msg", msg)
  61. switch msg := msg.(type) {
  62. case *TxMessage:
  63. err := memR.Mempool.CheckTx(msg.Tx, nil)
  64. if err != nil {
  65. // Bad, seen, or conflicting tx.
  66. log.Info("Could not add tx", "tx", msg.Tx)
  67. return
  68. } else {
  69. log.Info("Added valid tx", "tx", msg.Tx)
  70. }
  71. // broadcasting happens from go routines per peer
  72. default:
  73. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  74. }
  75. }
  76. // Just an alias for CheckTx since broadcasting happens in peer routines
  77. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*tmsp.Response)) error {
  78. return memR.Mempool.CheckTx(tx, cb)
  79. }
  80. type PeerState interface {
  81. GetHeight() int
  82. }
  83. type Peer interface {
  84. IsRunning() bool
  85. Send(byte, interface{}) bool
  86. Get(string) interface{}
  87. }
  88. // Send new mempool txs to peer.
  89. // TODO: Handle mempool or reactor shutdown?
  90. // As is this routine may block forever if no new txs come in.
  91. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  92. if !memR.config.GetBool("mempool_broadcast") {
  93. return
  94. }
  95. var next *clist.CElement
  96. for {
  97. if !memR.IsRunning() {
  98. return // Quit!
  99. }
  100. if next == nil {
  101. // This happens because the CElement we were looking at got
  102. // garbage collected (removed). That is, .NextWait() returned nil.
  103. // Go ahead and start from the beginning.
  104. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  105. }
  106. memTx := next.Value.(*mempoolTx)
  107. // make sure the peer is up to date
  108. height := memTx.Height()
  109. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  110. peerState := peerState_i.(PeerState)
  111. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  112. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  113. continue
  114. }
  115. }
  116. // send memTx
  117. msg := &TxMessage{Tx: memTx.tx}
  118. success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
  119. if !success {
  120. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  121. continue
  122. }
  123. next = next.NextWait()
  124. continue
  125. }
  126. }
  127. // implements events.Eventable
  128. func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
  129. memR.evsw = evsw
  130. }
  131. //-----------------------------------------------------------------------------
  132. // Messages
  133. const (
  134. msgTypeTx = byte(0x01)
  135. )
  136. type MempoolMessage interface{}
  137. var _ = wire.RegisterInterface(
  138. struct{ MempoolMessage }{},
  139. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  140. )
  141. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  142. msgType = bz[0]
  143. n := new(int)
  144. r := bytes.NewReader(bz)
  145. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  146. return
  147. }
  148. //-------------------------------------
  149. type TxMessage struct {
  150. Tx types.Tx
  151. }
  152. func (m *TxMessage) String() string {
  153. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  154. }