You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

192 lines
5.2 KiB

10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
10 years ago
10 years ago
7 years ago
10 years ago
7 years ago
10 years ago
7 years ago
7 years ago
10 years ago
10 years ago
10 years ago
10 years ago
7 years ago
8 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. abci "github.com/tendermint/abci/types"
  8. wire "github.com/tendermint/go-wire"
  9. "github.com/tendermint/tmlibs/clist"
  10. "github.com/tendermint/tmlibs/log"
  11. cfg "github.com/tendermint/tendermint/config"
  12. "github.com/tendermint/tendermint/p2p"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. const (
  16. MempoolChannel = byte(0x30)
  17. maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable
  18. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  19. )
  20. // MempoolReactor handles mempool tx broadcasting amongst peers.
  21. type MempoolReactor struct {
  22. p2p.BaseReactor
  23. config *cfg.MempoolConfig
  24. Mempool *Mempool
  25. evsw types.EventSwitch
  26. }
  27. // NewMempoolReactor returns a new MempoolReactor with the given config and mempool.
  28. func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
  29. memR := &MempoolReactor{
  30. config: config,
  31. Mempool: mempool,
  32. }
  33. memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
  34. return memR
  35. }
  36. // SetLogger sets the Logger on the reactor and the underlying Mempool.
  37. func (memR *MempoolReactor) SetLogger(l log.Logger) {
  38. memR.Logger = l
  39. memR.Mempool.SetLogger(l)
  40. }
  41. // GetChannels implements Reactor.
  42. // It returns the list of channels for this reactor.
  43. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  44. return []*p2p.ChannelDescriptor{
  45. &p2p.ChannelDescriptor{
  46. ID: MempoolChannel,
  47. Priority: 5,
  48. },
  49. }
  50. }
  51. // AddPeer implements Reactor.
  52. // It starts a broadcast routine ensuring all txs are forwarded to the given peer.
  53. func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
  54. go memR.broadcastTxRoutine(peer)
  55. }
  56. // RemovePeer implements Reactor.
  57. func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
  58. // broadcast routine checks if peer is gone and returns
  59. }
  60. // Receive implements Reactor.
  61. // It adds any received transactions to the mempool.
  62. func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
  63. _, msg, err := DecodeMessage(msgBytes)
  64. if err != nil {
  65. memR.Logger.Error("Error decoding message", "err", err)
  66. return
  67. }
  68. memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  69. switch msg := msg.(type) {
  70. case *TxMessage:
  71. err := memR.Mempool.CheckTx(msg.Tx, nil)
  72. if err != nil {
  73. memR.Logger.Info("Could not check tx", "tx", msg.Tx, "err", err)
  74. }
  75. // broadcasting happens from go routines per peer
  76. default:
  77. memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  78. }
  79. }
  80. // BroadcastTx is an alias for Mempool.CheckTx. Broadcasting itself happens in peer routines.
  81. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
  82. return memR.Mempool.CheckTx(tx, cb)
  83. }
  84. // PeerState describes the state of a peer.
  85. type PeerState interface {
  86. GetHeight() int
  87. }
  88. // Peer describes a peer.
  89. type Peer interface {
  90. IsRunning() bool
  91. Send(byte, interface{}) bool
  92. Get(string) interface{}
  93. }
  94. // Send new mempool txs to peer.
  95. // TODO: Handle mempool or reactor shutdown?
  96. // As is this routine may block forever if no new txs come in.
  97. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  98. if !memR.config.Broadcast {
  99. return
  100. }
  101. var next *clist.CElement
  102. for {
  103. if !memR.IsRunning() || !peer.IsRunning() {
  104. return // Quit!
  105. }
  106. if next == nil {
  107. // This happens because the CElement we were looking at got
  108. // garbage collected (removed). That is, .NextWait() returned nil.
  109. // Go ahead and start from the beginning.
  110. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  111. }
  112. memTx := next.Value.(*mempoolTx)
  113. // make sure the peer is up to date
  114. height := memTx.Height()
  115. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  116. peerState := peerState_i.(PeerState)
  117. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  118. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  119. continue
  120. }
  121. }
  122. // send memTx
  123. msg := &TxMessage{Tx: memTx.tx}
  124. success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
  125. if !success {
  126. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  127. continue
  128. }
  129. next = next.NextWait()
  130. continue
  131. }
  132. }
  133. // SetEventSwitch implements events.Eventable.
  134. func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) {
  135. memR.evsw = evsw
  136. }
  137. //-----------------------------------------------------------------------------
  138. // Messages
  139. const (
  140. msgTypeTx = byte(0x01)
  141. )
  142. // MempoolMessage is a message sent or received by the MempoolReactor.
  143. type MempoolMessage interface{}
  144. var _ = wire.RegisterInterface(
  145. struct{ MempoolMessage }{},
  146. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  147. )
  148. // DecodeMessage decodes a byte-array into a MempoolMessage.
  149. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  150. msgType = bz[0]
  151. n := new(int)
  152. r := bytes.NewReader(bz)
  153. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  154. return
  155. }
  156. //-------------------------------------
  157. // TxMessage is a MempoolMessage containing a transaction.
  158. type TxMessage struct {
  159. Tx types.Tx
  160. }
  161. // String returns a string representation of the TxMessage.
  162. func (m *TxMessage) String() string {
  163. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  164. }