You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

197 lines
5.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
7 years ago
10 years ago
7 years ago
10 years ago
7 years ago
7 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
8 years ago
10 years ago
7 years ago
10 years ago
10 years ago
7 years ago
10 years ago
7 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "fmt"
  4. "reflect"
  5. "time"
  6. abci "github.com/tendermint/tendermint/abci/types"
  7. amino "github.com/tendermint/go-amino"
  8. "github.com/tendermint/tmlibs/clist"
  9. "github.com/tendermint/tmlibs/log"
  10. cfg "github.com/tendermint/tendermint/config"
  11. "github.com/tendermint/tendermint/p2p"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. const (
  15. MempoolChannel = byte(0x30)
  16. maxMsgSize = 1048576 // 1MB TODO make it configurable
  17. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  18. )
  19. // MempoolReactor handles mempool tx broadcasting amongst peers.
  20. type MempoolReactor struct {
  21. p2p.BaseReactor
  22. config *cfg.MempoolConfig
  23. Mempool *Mempool
  24. }
  25. // NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
  26. func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
  27. memR := &MempoolReactor{
  28. config: config,
  29. Mempool: mempool,
  30. }
  31. memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
  32. return memR
  33. }
  34. // SetLogger sets the Logger on the reactor and the underlying Mempool.
  35. func (memR *MempoolReactor) SetLogger(l log.Logger) {
  36. memR.Logger = l
  37. memR.Mempool.SetLogger(l)
  38. }
  39. // OnStart implements p2p.BaseReactor.
  40. func (memR *MempoolReactor) OnStart() error {
  41. if !memR.config.Broadcast {
  42. memR.Logger.Info("Tx broadcasting is disabled")
  43. }
  44. return nil
  45. }
  46. // GetChannels implements Reactor.
  47. // It returns the list of channels for this reactor.
  48. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  49. return []*p2p.ChannelDescriptor{
  50. {
  51. ID: MempoolChannel,
  52. Priority: 5,
  53. },
  54. }
  55. }
  56. // AddPeer implements Reactor.
  57. // It starts a broadcast routine ensuring all txs are forwarded to the given peer.
  58. func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
  59. go memR.broadcastTxRoutine(peer)
  60. }
  61. // RemovePeer implements Reactor.
  62. func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  63. // broadcast routine checks if peer is gone and returns
  64. }
  65. // Receive implements Reactor.
  66. // It adds any received transactions to the mempool.
  67. func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  68. msg, err := DecodeMessage(msgBytes)
  69. if err != nil {
  70. memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
  71. memR.Switch.StopPeerForError(src, err)
  72. return
  73. }
  74. memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  75. switch msg := msg.(type) {
  76. case *TxMessage:
  77. err := memR.Mempool.CheckTx(msg.Tx, nil)
  78. if err != nil {
  79. memR.Logger.Info("Could not check tx", "tx", msg.Tx, "err", err)
  80. }
  81. // broadcasting happens from go routines per peer
  82. default:
  83. memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  84. }
  85. }
  86. // BroadcastTx is an alias for Mempool.CheckTx. Broadcasting itself happens in peer routines.
  87. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
  88. return memR.Mempool.CheckTx(tx, cb)
  89. }
  90. // PeerState describes the state of a peer.
  91. type PeerState interface {
  92. GetHeight() int64
  93. }
  94. // Send new mempool txs to peer.
  95. func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
  96. if !memR.config.Broadcast {
  97. return
  98. }
  99. var next *clist.CElement
  100. for {
  101. // This happens because the CElement we were looking at got garbage
  102. // collected (removed). That is, .NextWait() returned nil. Go ahead and
  103. // start from the beginning.
  104. if next == nil {
  105. select {
  106. case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available
  107. if next = memR.Mempool.TxsFront(); next == nil {
  108. continue
  109. }
  110. case <-peer.Quit():
  111. return
  112. case <-memR.Quit():
  113. return
  114. }
  115. }
  116. memTx := next.Value.(*mempoolTx)
  117. // make sure the peer is up to date
  118. height := memTx.Height()
  119. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  120. peerState := peerState_i.(PeerState)
  121. peerHeight := peerState.GetHeight()
  122. if peerHeight < height-1 { // Allow for a lag of 1 block
  123. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  124. continue
  125. }
  126. }
  127. // send memTx
  128. msg := &TxMessage{Tx: memTx.tx}
  129. success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
  130. if !success {
  131. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  132. continue
  133. }
  134. select {
  135. case <-next.NextWaitChan():
  136. // see the start of the for loop for nil check
  137. next = next.Next()
  138. case <-peer.Quit():
  139. return
  140. case <-memR.Quit():
  141. return
  142. }
  143. }
  144. }
  145. //-----------------------------------------------------------------------------
  146. // Messages
  147. // MempoolMessage is a message sent or received by the MempoolReactor.
  148. type MempoolMessage interface{}
  149. func RegisterMempoolMessages(cdc *amino.Codec) {
  150. cdc.RegisterInterface((*MempoolMessage)(nil), nil)
  151. cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil)
  152. }
  153. // DecodeMessage decodes a byte-array into a MempoolMessage.
  154. func DecodeMessage(bz []byte) (msg MempoolMessage, err error) {
  155. if len(bz) > maxMsgSize {
  156. return msg, fmt.Errorf("Msg exceeds max size (%d > %d)",
  157. len(bz), maxMsgSize)
  158. }
  159. err = cdc.UnmarshalBinaryBare(bz, &msg)
  160. return
  161. }
  162. //-------------------------------------
  163. // TxMessage is a MempoolMessage containing a transaction.
  164. type TxMessage struct {
  165. Tx types.Tx
  166. }
  167. // String returns a string representation of the TxMessage.
  168. func (m *TxMessage) String() string {
  169. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  170. }