You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
5.1 KiB

10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
7 years ago
10 years ago
7 years ago
10 years ago
7 years ago
7 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
8 years ago
10 years ago
7 years ago
10 years ago
10 years ago
7 years ago
10 years ago
7 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "fmt"
  4. "reflect"
  5. "time"
  6. abci "github.com/tendermint/abci/types"
  7. "github.com/tendermint/go-amino"
  8. "github.com/tendermint/tmlibs/clist"
  9. "github.com/tendermint/tmlibs/log"
  10. cfg "github.com/tendermint/tendermint/config"
  11. "github.com/tendermint/tendermint/p2p"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. const (
  15. MempoolChannel = byte(0x30)
  16. maxMsgSize = 1048576 // 1MB TODO make it configurable
  17. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  18. )
  19. // MempoolReactor handles mempool tx broadcasting amongst peers.
  20. type MempoolReactor struct {
  21. p2p.BaseReactor
  22. config *cfg.MempoolConfig
  23. Mempool *Mempool
  24. }
  25. // NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
  26. func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
  27. memR := &MempoolReactor{
  28. config: config,
  29. Mempool: mempool,
  30. }
  31. memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
  32. return memR
  33. }
  34. // SetLogger sets the Logger on the reactor and the underlying Mempool.
  35. func (memR *MempoolReactor) SetLogger(l log.Logger) {
  36. memR.Logger = l
  37. memR.Mempool.SetLogger(l)
  38. }
  39. // GetChannels implements Reactor.
  40. // It returns the list of channels for this reactor.
  41. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  42. return []*p2p.ChannelDescriptor{
  43. {
  44. ID: MempoolChannel,
  45. Priority: 5,
  46. },
  47. }
  48. }
  49. // AddPeer implements Reactor.
  50. // It starts a broadcast routine ensuring all txs are forwarded to the given peer.
  51. func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
  52. go memR.broadcastTxRoutine(peer)
  53. }
  54. // RemovePeer implements Reactor.
  55. func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  56. // broadcast routine checks if peer is gone and returns
  57. }
  58. // Receive implements Reactor.
  59. // It adds any received transactions to the mempool.
  60. func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  61. msg, err := DecodeMessage(msgBytes)
  62. if err != nil {
  63. memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  64. memR.Switch.StopPeerForError(src, err)
  65. return
  66. }
  67. memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  68. switch msg := msg.(type) {
  69. case *TxMessage:
  70. err := memR.Mempool.CheckTx(msg.Tx, nil)
  71. if err != nil {
  72. memR.Logger.Info("Could not check tx", "tx", msg.Tx, "err", err)
  73. }
  74. // broadcasting happens from go routines per peer
  75. default:
  76. memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  77. }
  78. }
  79. // BroadcastTx is an alias for Mempool.CheckTx. Broadcasting itself happens in peer routines.
  80. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
  81. return memR.Mempool.CheckTx(tx, cb)
  82. }
  83. // PeerState describes the state of a peer.
  84. type PeerState interface {
  85. GetHeight() int64
  86. }
  87. // Send new mempool txs to peer.
  88. func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
  89. if !memR.config.Broadcast {
  90. return
  91. }
  92. var next *clist.CElement
  93. for {
  94. // This happens because the CElement we were looking at got garbage
  95. // collected (removed). That is, .NextWait() returned nil. Go ahead and
  96. // start from the beginning.
  97. if next == nil {
  98. select {
  99. case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available
  100. if next = memR.Mempool.TxsFront(); next == nil {
  101. continue
  102. }
  103. case <-peer.Quit():
  104. return
  105. case <-memR.Quit():
  106. return
  107. }
  108. }
  109. memTx := next.Value.(*mempoolTx)
  110. // make sure the peer is up to date
  111. height := memTx.Height()
  112. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  113. peerState := peerState_i.(PeerState)
  114. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  115. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  116. continue
  117. }
  118. }
  119. // send memTx
  120. msg := &TxMessage{Tx: memTx.tx}
  121. success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
  122. if !success {
  123. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  124. continue
  125. }
  126. select {
  127. case <-next.NextWaitChan():
  128. // see the start of the for loop for nil check
  129. next = next.Next()
  130. case <-peer.Quit():
  131. return
  132. case <-memR.Quit():
  133. return
  134. }
  135. }
  136. }
  137. //-----------------------------------------------------------------------------
  138. // Messages
  139. // MempoolMessage is a message sent or received by the MempoolReactor.
  140. type MempoolMessage interface{}
  141. func RegisterMempoolMessages(cdc *amino.Codec) {
  142. cdc.RegisterInterface((*MempoolMessage)(nil), nil)
  143. cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil)
  144. }
  145. // DecodeMessage decodes a byte-array into a MempoolMessage.
  146. func DecodeMessage(bz []byte) (msg MempoolMessage, err error) {
  147. if len(bz) > maxMsgSize {
  148. return msg, fmt.Errorf("Msg exceeds max size (%d > %d)",
  149. len(bz), maxMsgSize)
  150. }
  151. err = cdc.UnmarshalBinaryBare(bz, &msg)
  152. return
  153. }
  154. //-------------------------------------
  155. // TxMessage is a MempoolMessage containing a transaction.
  156. type TxMessage struct {
  157. Tx types.Tx
  158. }
  159. // String returns a string representation of the TxMessage.
  160. func (m *TxMessage) String() string {
  161. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  162. }