You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

189 lines
5.1 KiB

10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
8 years ago
10 years ago
8 years ago
8 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. abci "github.com/tendermint/abci/types"
  8. wire "github.com/tendermint/go-wire"
  9. "github.com/tendermint/tmlibs/clist"
  10. cfg "github.com/tendermint/tendermint/config"
  11. "github.com/tendermint/tendermint/p2p"
  12. "github.com/tendermint/tendermint/types"
  13. )
  const (
  	// MempoolChannel is the p2p channel ID this reactor advertises in
  	// GetChannels and uses when sending txs to peers.
  	MempoolChannel = byte(0x30)

  	// maxMempoolMessageSize is the byte limit handed to the wire decoder
  	// when reading a message (1 MiB). TODO: make it configurable.
  	maxMempoolMessageSize = 1 << 20

  	// peerCatchupSleepIntervalMS is the pause, in milliseconds, taken by the
  	// broadcast routine before retrying when a peer is behind.
  	peerCatchupSleepIntervalMS = 100
  )
  19. // MempoolReactor handles mempool tx broadcasting amongst peers.
  20. type MempoolReactor struct {
  21. p2p.BaseReactor
  22. config *cfg.MempoolConfig
  23. Mempool *Mempool
  24. evsw types.EventSwitch
  25. }
  26. // NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
  27. func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
  28. memR := &MempoolReactor{
  29. config: config,
  30. Mempool: mempool,
  31. }
  32. memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
  33. return memR
  34. }
  35. // GetChannels implements Reactor.
  36. // It returns the list of channels for this reactor.
  37. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  38. return []*p2p.ChannelDescriptor{
  39. &p2p.ChannelDescriptor{
  40. ID: MempoolChannel,
  41. Priority: 5,
  42. },
  43. }
  44. }
  45. // AddPeer implements Reactor.
  46. // It starts a broadcast routine ensuring all txs are forwarded to the given peer.
  47. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  48. go memR.broadcastTxRoutine(peer)
  49. }
  50. // RemovePeer implements Reactor.
  51. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  52. // broadcast routine checks if peer is gone and returns
  53. }
  54. // Receive implements Reactor.
  55. // It adds any received transactions to the mempool.
  56. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  57. _, msg, err := DecodeMessage(msgBytes)
  58. if err != nil {
  59. memR.Logger.Error("Error decoding message", "err", err)
  60. return
  61. }
  62. memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  63. switch msg := msg.(type) {
  64. case *TxMessage:
  65. err := memR.Mempool.CheckTx(msg.Tx, nil)
  66. if err != nil {
  67. // Bad, seen, or conflicting tx.
  68. memR.Logger.Info("Could not add tx", "tx", msg.Tx)
  69. return
  70. } else {
  71. memR.Logger.Info("Added valid tx", "tx", msg.Tx)
  72. }
  73. // broadcasting happens from go routines per peer
  74. default:
  75. memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  76. }
  77. }
  78. // BroadcastTx is an alias for Mempool.CheckTx. Broadcasting itself happens in peer routines.
  79. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
  80. return memR.Mempool.CheckTx(tx, cb)
  81. }
  // PeerState exposes the one piece of per-peer state the mempool reactor
  // needs: the peer's height.
  type PeerState interface {
  	// GetHeight reports the peer's height.
  	GetHeight() int
  }
  // Peer is the subset of peer behaviour that broadcastTxRoutine depends on.
  type Peer interface {
  	// IsRunning reports whether the peer is still running.
  	IsRunning() bool
  	// Send delivers a message on the given channel ID and reports success.
  	Send(byte, interface{}) bool
  	// Get looks up a value attached to the peer under the given key.
  	Get(string) interface{}
  }
  92. // Send new mempool txs to peer.
  93. // TODO: Handle mempool or reactor shutdown?
  94. // As is this routine may block forever if no new txs come in.
  95. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  96. if !memR.config.Broadcast {
  97. return
  98. }
  99. var next *clist.CElement
  100. for {
  101. if !memR.IsRunning() || !peer.IsRunning() {
  102. return // Quit!
  103. }
  104. if next == nil {
  105. // This happens because the CElement we were looking at got
  106. // garbage collected (removed). That is, .NextWait() returned nil.
  107. // Go ahead and start from the beginning.
  108. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  109. }
  110. memTx := next.Value.(*mempoolTx)
  111. // make sure the peer is up to date
  112. height := memTx.Height()
  113. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  114. peerState := peerState_i.(PeerState)
  115. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  116. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  117. continue
  118. }
  119. }
  120. // send memTx
  121. msg := &TxMessage{Tx: memTx.tx}
  122. success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
  123. if !success {
  124. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  125. continue
  126. }
  127. next = next.NextWait()
  128. continue
  129. }
  130. }
  131. // SetEventSwitch implements events.Eventable.
  132. func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) {
  133. memR.evsw = evsw
  134. }
  135. //-----------------------------------------------------------------------------
  136. // Messages
  const (
  	// msgTypeTx is the wire type byte registered for TxMessage.
  	msgTypeTx = byte(0x01)
  )
  // MempoolMessage is the common type for every message the MempoolReactor
  // sends or receives. It carries no methods; concrete message types are
  // distinguished with a type switch.
  type MempoolMessage interface{}
  142. var _ = wire.RegisterInterface(
  143. struct{ MempoolMessage }{},
  144. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  145. )
  146. // DecodeMessage decodes a byte-array into a MempoolMessage.
  147. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  148. msgType = bz[0]
  149. n := new(int)
  150. r := bytes.NewReader(bz)
  151. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  152. return
  153. }
  154. //-------------------------------------
  155. // TxMessage is a MempoolMessage containing a transaction.
  156. type TxMessage struct {
  157. Tx types.Tx
  158. }
  159. // String returns a string representation of the TxMessage.
  160. func (m *TxMessage) String() string {
  161. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  162. }