You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

172 lines
4.2 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. "github.com/tendermint/go-clist"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-p2p"
  10. "github.com/tendermint/go-wire"
  11. "github.com/tendermint/tendermint/events"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. const (
  15. MempoolChannel = byte(0x30)
  16. maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable
  17. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  18. )
  19. // MempoolReactor handles mempool tx broadcasting amongst peers.
  20. type MempoolReactor struct {
  21. p2p.BaseReactor
  22. Mempool *Mempool // TODO: un-expose
  23. evsw *events.EventSwitch
  24. }
  25. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  26. memR := &MempoolReactor{
  27. Mempool: mempool,
  28. }
  29. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  30. return memR
  31. }
  32. // Implements Reactor
  33. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  34. return []*p2p.ChannelDescriptor{
  35. &p2p.ChannelDescriptor{
  36. ID: MempoolChannel,
  37. Priority: 5,
  38. },
  39. }
  40. }
  41. // Implements Reactor
  42. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  43. go memR.broadcastTxRoutine(peer)
  44. }
  45. // Implements Reactor
  46. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  47. // broadcast routine checks if peer is gone and returns
  48. }
  49. // Implements Reactor
  50. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  51. _, msg, err := DecodeMessage(msgBytes)
  52. if err != nil {
  53. log.Warn("Error decoding message", "error", err)
  54. return
  55. }
  56. log.Info("MempoolReactor received message", "msg", msg)
  57. switch msg := msg.(type) {
  58. case *TxMessage:
  59. err := memR.Mempool.AppendTx(msg.Tx)
  60. if err != nil {
  61. // Bad, seen, or conflicting tx.
  62. log.Info("Could not add tx", "tx", msg.Tx)
  63. return
  64. } else {
  65. log.Info("Added valid tx", "tx", msg.Tx)
  66. }
  67. // broadcasting happens from go routines per peer
  68. default:
  69. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  70. }
  71. }
  72. // Just an alias for AppendTx since broadcasting happens in peer routines
  73. func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
  74. return memR.Mempool.AppendTx(tx)
  75. }
  76. type PeerState interface {
  77. GetHeight() int
  78. }
  79. type Peer interface {
  80. IsRunning() bool
  81. Send(byte, interface{}) bool
  82. Get(string) interface{}
  83. }
  84. // Send new mempool txs to peer.
  85. // TODO: Handle mempool or reactor shutdown?
  86. // As is this routine may block forever if no new txs come in.
  87. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  88. var next *clist.CElement
  89. for {
  90. if !memR.IsRunning() {
  91. return // Quit!
  92. }
  93. if next == nil {
  94. // This happens because the CElement we were looking at got
  95. // garbage collected (removed). That is, .NextWait() returned nil.
  96. // Go ahead and start from the beginning.
  97. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  98. }
  99. memTx := next.Value.(*mempoolTx)
  100. // make sure the peer is up to date
  101. height := memTx.Height()
  102. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  103. peerState := peerState_i.(PeerState)
  104. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  105. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  106. continue
  107. }
  108. }
  109. // send memTx
  110. msg := &TxMessage{Tx: memTx.tx}
  111. success := peer.Send(MempoolChannel, msg)
  112. if !success {
  113. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  114. continue
  115. }
  116. next = next.NextWait()
  117. continue
  118. }
  119. }
  120. // implements events.Eventable
  121. func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
  122. memR.evsw = evsw
  123. }
  124. //-----------------------------------------------------------------------------
  125. // Messages
  126. const (
  127. msgTypeTx = byte(0x01)
  128. )
  129. type MempoolMessage interface{}
  130. var _ = wire.RegisterInterface(
  131. struct{ MempoolMessage }{},
  132. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  133. )
  134. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  135. msgType = bz[0]
  136. n := new(int)
  137. r := bytes.NewReader(bz)
  138. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  139. return
  140. }
  141. //-------------------------------------
  142. type TxMessage struct {
  143. Tx types.Tx
  144. }
  145. func (m *TxMessage) String() string {
  146. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  147. }