You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
5.3 KiB

  1. package mempool
  2. import (
  3. "fmt"
  4. "reflect"
  5. "time"
  6. amino "github.com/tendermint/go-amino"
  7. abci "github.com/tendermint/tendermint/abci/types"
  8. "github.com/tendermint/tendermint/libs/clist"
  9. "github.com/tendermint/tendermint/libs/log"
  10. cfg "github.com/tendermint/tendermint/config"
  11. "github.com/tendermint/tendermint/p2p"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. const (
  15. MempoolChannel = byte(0x30)
  16. maxMsgSize = 1048576 // 1MB TODO make it configurable
  17. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  18. )
  19. // MempoolReactor handles mempool tx broadcasting amongst peers.
  20. type MempoolReactor struct {
  21. p2p.BaseReactor
  22. config *cfg.MempoolConfig
  23. Mempool *Mempool
  24. }
  25. // NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
  26. func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
  27. memR := &MempoolReactor{
  28. config: config,
  29. Mempool: mempool,
  30. }
  31. memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
  32. return memR
  33. }
  34. // SetLogger sets the Logger on the reactor and the underlying Mempool.
  35. func (memR *MempoolReactor) SetLogger(l log.Logger) {
  36. memR.Logger = l
  37. memR.Mempool.SetLogger(l)
  38. }
  39. // OnStart implements p2p.BaseReactor.
  40. func (memR *MempoolReactor) OnStart() error {
  41. if !memR.config.Broadcast {
  42. memR.Logger.Info("Tx broadcasting is disabled")
  43. }
  44. return nil
  45. }
  46. // GetChannels implements Reactor.
  47. // It returns the list of channels for this reactor.
  48. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  49. return []*p2p.ChannelDescriptor{
  50. {
  51. ID: MempoolChannel,
  52. Priority: 5,
  53. },
  54. }
  55. }
  56. // AddPeer implements Reactor.
  57. // It starts a broadcast routine ensuring all txs are forwarded to the given peer.
  58. func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
  59. go memR.broadcastTxRoutine(peer)
  60. }
  61. // RemovePeer implements Reactor.
  62. func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  63. // broadcast routine checks if peer is gone and returns
  64. }
  65. // Receive implements Reactor.
  66. // It adds any received transactions to the mempool.
  67. func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  68. msg, err := decodeMsg(msgBytes)
  69. if err != nil {
  70. memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  71. memR.Switch.StopPeerForError(src, err)
  72. return
  73. }
  74. memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  75. switch msg := msg.(type) {
  76. case *TxMessage:
  77. err := memR.Mempool.CheckTx(msg.Tx, nil)
  78. if err != nil {
  79. memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err)
  80. }
  81. // broadcasting happens from go routines per peer
  82. default:
  83. memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  84. }
  85. }
  86. // BroadcastTx is an alias for Mempool.CheckTx. Broadcasting itself happens in peer routines.
  87. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
  88. return memR.Mempool.CheckTx(tx, cb)
  89. }
  90. // PeerState describes the state of a peer.
  91. type PeerState interface {
  92. GetHeight() int64
  93. }
  94. // Send new mempool txs to peer.
  95. func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
  96. if !memR.config.Broadcast {
  97. return
  98. }
  99. var next *clist.CElement
  100. for {
  101. // This happens because the CElement we were looking at got garbage
  102. // collected (removed). That is, .NextWait() returned nil. Go ahead and
  103. // start from the beginning.
  104. if next == nil {
  105. select {
  106. case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available
  107. if next = memR.Mempool.TxsFront(); next == nil {
  108. continue
  109. }
  110. case <-peer.Quit():
  111. return
  112. case <-memR.Quit():
  113. return
  114. }
  115. }
  116. memTx := next.Value.(*mempoolTx)
  117. // make sure the peer is up to date
  118. height := memTx.Height()
  119. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  120. peerState := peerState_i.(PeerState)
  121. peerHeight := peerState.GetHeight()
  122. if peerHeight < height-1 { // Allow for a lag of 1 block
  123. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  124. continue
  125. }
  126. }
  127. // send memTx
  128. msg := &TxMessage{Tx: memTx.tx}
  129. success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
  130. if !success {
  131. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  132. continue
  133. }
  134. select {
  135. case <-next.NextWaitChan():
  136. // see the start of the for loop for nil check
  137. next = next.Next()
  138. case <-peer.Quit():
  139. return
  140. case <-memR.Quit():
  141. return
  142. }
  143. }
  144. }
  145. //-----------------------------------------------------------------------------
  146. // Messages
  147. // MempoolMessage is a message sent or received by the MempoolReactor.
  148. type MempoolMessage interface{}
  149. func RegisterMempoolMessages(cdc *amino.Codec) {
  150. cdc.RegisterInterface((*MempoolMessage)(nil), nil)
  151. cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil)
  152. }
  153. func decodeMsg(bz []byte) (msg MempoolMessage, err error) {
  154. if len(bz) > maxMsgSize {
  155. return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
  156. }
  157. err = cdc.UnmarshalBinaryBare(bz, &msg)
  158. return
  159. }
  160. //-------------------------------------
  161. // TxMessage is a MempoolMessage containing a transaction.
  162. type TxMessage struct {
  163. Tx types.Tx
  164. }
  165. // String returns a string representation of the TxMessage.
  166. func (m *TxMessage) String() string {
  167. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  168. }