You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

173 lines
4.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. "github.com/tendermint/go-clist"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-events"
  10. "github.com/tendermint/go-p2p"
  11. "github.com/tendermint/go-wire"
  12. "github.com/tendermint/tendermint/types"
  13. tmsp "github.com/tendermint/tmsp/types"
  14. )
  15. const (
  16. MempoolChannel = byte(0x30)
  17. maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable
  18. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  19. )
  20. // MempoolReactor handles mempool tx broadcasting amongst peers.
  21. type MempoolReactor struct {
  22. p2p.BaseReactor
  23. Mempool *Mempool // TODO: un-expose
  24. evsw *events.EventSwitch
  25. }
  26. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  27. memR := &MempoolReactor{
  28. Mempool: mempool,
  29. }
  30. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  31. return memR
  32. }
  33. // Implements Reactor
  34. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  35. return []*p2p.ChannelDescriptor{
  36. &p2p.ChannelDescriptor{
  37. ID: MempoolChannel,
  38. Priority: 5,
  39. },
  40. }
  41. }
  42. // Implements Reactor
  43. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  44. go memR.broadcastTxRoutine(peer)
  45. }
  46. // Implements Reactor
  47. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  48. // broadcast routine checks if peer is gone and returns
  49. }
  50. // Implements Reactor
  51. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  52. _, msg, err := DecodeMessage(msgBytes)
  53. if err != nil {
  54. log.Warn("Error decoding message", "error", err)
  55. return
  56. }
  57. log.Info("Receive", "src", src, "chId", chID, "msg", msg)
  58. switch msg := msg.(type) {
  59. case *TxMessage:
  60. err := memR.Mempool.CheckTx(msg.Tx, nil)
  61. if err != nil {
  62. // Bad, seen, or conflicting tx.
  63. log.Info("Could not add tx", "tx", msg.Tx)
  64. return
  65. } else {
  66. log.Info("Added valid tx", "tx", msg.Tx)
  67. }
  68. // broadcasting happens from go routines per peer
  69. default:
  70. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  71. }
  72. }
  73. // Just an alias for CheckTx since broadcasting happens in peer routines
  74. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*tmsp.Response)) error {
  75. return memR.Mempool.CheckTx(tx, cb)
  76. }
  77. type PeerState interface {
  78. GetHeight() int
  79. }
  80. type Peer interface {
  81. IsRunning() bool
  82. Send(byte, interface{}) bool
  83. Get(string) interface{}
  84. }
  85. // Send new mempool txs to peer.
  86. // TODO: Handle mempool or reactor shutdown?
  87. // As is this routine may block forever if no new txs come in.
  88. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  89. var next *clist.CElement
  90. for {
  91. if !memR.IsRunning() {
  92. return // Quit!
  93. }
  94. if next == nil {
  95. // This happens because the CElement we were looking at got
  96. // garbage collected (removed). That is, .NextWait() returned nil.
  97. // Go ahead and start from the beginning.
  98. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  99. }
  100. memTx := next.Value.(*mempoolTx)
  101. // make sure the peer is up to date
  102. height := memTx.Height()
  103. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  104. peerState := peerState_i.(PeerState)
  105. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  106. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  107. continue
  108. }
  109. }
  110. // send memTx
  111. msg := &TxMessage{Tx: memTx.tx}
  112. success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
  113. if !success {
  114. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  115. continue
  116. }
  117. next = next.NextWait()
  118. continue
  119. }
  120. }
  121. // implements events.Eventable
  122. func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
  123. memR.evsw = evsw
  124. }
  125. //-----------------------------------------------------------------------------
  126. // Messages
  127. const (
  128. msgTypeTx = byte(0x01)
  129. )
  130. type MempoolMessage interface{}
  131. var _ = wire.RegisterInterface(
  132. struct{ MempoolMessage }{},
  133. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  134. )
  135. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  136. msgType = bz[0]
  137. n := new(int)
  138. r := bytes.NewReader(bz)
  139. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  140. return
  141. }
  142. //-------------------------------------
  143. type TxMessage struct {
  144. Tx types.Tx
  145. }
  146. func (m *TxMessage) String() string {
  147. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  148. }