You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

177 lines
4.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. "github.com/tendermint/go-clist"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-events"
  10. "github.com/tendermint/go-p2p"
  11. "github.com/tendermint/go-wire"
  12. "github.com/tendermint/tendermint/types"
  13. tmsp "github.com/tendermint/tmsp/types"
  14. )
  15. const (
  16. MempoolChannel = byte(0x30)
  17. maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable
  18. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  19. )
  20. // MempoolReactor handles mempool tx broadcasting amongst peers.
  21. type MempoolReactor struct {
  22. p2p.BaseReactor
  23. Mempool *Mempool // TODO: un-expose
  24. evsw *events.EventSwitch
  25. }
  26. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  27. memR := &MempoolReactor{
  28. Mempool: mempool,
  29. }
  30. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  31. return memR
  32. }
  33. // Implements Reactor
  34. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  35. return []*p2p.ChannelDescriptor{
  36. &p2p.ChannelDescriptor{
  37. ID: MempoolChannel,
  38. Priority: 5,
  39. },
  40. }
  41. }
  42. // Implements Reactor
  43. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  44. go memR.broadcastTxRoutine(peer)
  45. }
  46. // Implements Reactor
  47. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  48. // broadcast routine checks if peer is gone and returns
  49. }
  50. // Implements Reactor
  51. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  52. _, msg, err := DecodeMessage(msgBytes)
  53. if err != nil {
  54. log.Warn("Error decoding message", "error", err)
  55. return
  56. }
  57. log.Info("Receive", "src", src, "chId", chID, "msg", msg)
  58. switch msg := msg.(type) {
  59. case *TxMessage:
  60. err := memR.Mempool.CheckTx(msg.Tx, nil)
  61. if err != nil {
  62. // Bad, seen, or conflicting tx.
  63. log.Info("Could not add tx", "tx", msg.Tx)
  64. return
  65. } else {
  66. log.Info("Added valid tx", "tx", msg.Tx)
  67. }
  68. // broadcasting happens from go routines per peer
  69. default:
  70. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  71. }
  72. }
  73. // Just an alias for CheckTx since broadcasting happens in peer routines
  74. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*tmsp.Response)) error {
  75. return memR.Mempool.CheckTx(tx, cb)
  76. }
  77. type PeerState interface {
  78. GetHeight() int
  79. }
  80. type Peer interface {
  81. IsRunning() bool
  82. Send(byte, interface{}) bool
  83. Get(string) interface{}
  84. }
  85. // Send new mempool txs to peer.
  86. // TODO: Handle mempool or reactor shutdown?
  87. // As is this routine may block forever if no new txs come in.
  88. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  89. if !config.GetBool("mempool_broadcast") {
  90. return
  91. }
  92. var next *clist.CElement
  93. for {
  94. if !memR.IsRunning() {
  95. return // Quit!
  96. }
  97. if next == nil {
  98. // This happens because the CElement we were looking at got
  99. // garbage collected (removed). That is, .NextWait() returned nil.
  100. // Go ahead and start from the beginning.
  101. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  102. }
  103. memTx := next.Value.(*mempoolTx)
  104. // make sure the peer is up to date
  105. height := memTx.Height()
  106. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  107. peerState := peerState_i.(PeerState)
  108. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  109. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  110. continue
  111. }
  112. }
  113. // send memTx
  114. msg := &TxMessage{Tx: memTx.tx}
  115. success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
  116. if !success {
  117. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  118. continue
  119. }
  120. next = next.NextWait()
  121. continue
  122. }
  123. }
  124. // implements events.Eventable
  125. func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
  126. memR.evsw = evsw
  127. }
  128. //-----------------------------------------------------------------------------
  129. // Messages
  130. const (
  131. msgTypeTx = byte(0x01)
  132. )
  133. type MempoolMessage interface{}
  134. var _ = wire.RegisterInterface(
  135. struct{ MempoolMessage }{},
  136. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  137. )
  138. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  139. msgType = bz[0]
  140. n := new(int)
  141. r := bytes.NewReader(bz)
  142. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  143. return
  144. }
  145. //-------------------------------------
  146. type TxMessage struct {
  147. Tx types.Tx
  148. }
  149. func (m *TxMessage) String() string {
  150. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  151. }