You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

179 lines
4.4 KiB

10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. "github.com/tendermint/go-clist"
  8. . "github.com/tendermint/go-common"
  9. cfg "github.com/tendermint/go-config"
  10. "github.com/tendermint/go-p2p"
  11. "github.com/tendermint/go-wire"
  12. "github.com/tendermint/tendermint/types"
  13. abci "github.com/tendermint/abci/types"
  14. )
  15. const (
  16. MempoolChannel = byte(0x30)
  17. maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable
  18. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  19. )
  20. // MempoolReactor handles mempool tx broadcasting amongst peers.
  21. type MempoolReactor struct {
  22. p2p.BaseReactor
  23. config cfg.Config
  24. Mempool *Mempool
  25. evsw types.EventSwitch
  26. }
  27. func NewMempoolReactor(config cfg.Config, mempool *Mempool) *MempoolReactor {
  28. memR := &MempoolReactor{
  29. config: config,
  30. Mempool: mempool,
  31. }
  32. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  33. return memR
  34. }
  35. // Implements Reactor
  36. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  37. return []*p2p.ChannelDescriptor{
  38. &p2p.ChannelDescriptor{
  39. ID: MempoolChannel,
  40. Priority: 5,
  41. },
  42. }
  43. }
  44. // Implements Reactor
  45. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  46. go memR.broadcastTxRoutine(peer)
  47. }
  48. // Implements Reactor
  49. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  50. // broadcast routine checks if peer is gone and returns
  51. }
  52. // Implements Reactor
  53. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  54. _, msg, err := DecodeMessage(msgBytes)
  55. if err != nil {
  56. log.Warn("Error decoding message", "error", err)
  57. return
  58. }
  59. log.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  60. switch msg := msg.(type) {
  61. case *TxMessage:
  62. err := memR.Mempool.CheckTx(msg.Tx, nil)
  63. if err != nil {
  64. // Bad, seen, or conflicting tx.
  65. log.Info("Could not add tx", "tx", msg.Tx)
  66. return
  67. } else {
  68. log.Info("Added valid tx", "tx", msg.Tx)
  69. }
  70. // broadcasting happens from go routines per peer
  71. default:
  72. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  73. }
  74. }
  75. // Just an alias for CheckTx since broadcasting happens in peer routines
  76. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
  77. return memR.Mempool.CheckTx(tx, cb)
  78. }
  79. type PeerState interface {
  80. GetHeight() int
  81. }
  82. type Peer interface {
  83. IsRunning() bool
  84. Send(byte, interface{}) bool
  85. Get(string) interface{}
  86. }
  87. // Send new mempool txs to peer.
  88. // TODO: Handle mempool or reactor shutdown?
  89. // As is this routine may block forever if no new txs come in.
  90. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  91. if !memR.config.GetBool("mempool_broadcast") {
  92. return
  93. }
  94. var next *clist.CElement
  95. for {
  96. if !memR.IsRunning() || !peer.IsRunning() {
  97. return // Quit!
  98. }
  99. if next == nil {
  100. // This happens because the CElement we were looking at got
  101. // garbage collected (removed). That is, .NextWait() returned nil.
  102. // Go ahead and start from the beginning.
  103. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  104. }
  105. memTx := next.Value.(*mempoolTx)
  106. // make sure the peer is up to date
  107. height := memTx.Height()
  108. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  109. peerState := peerState_i.(PeerState)
  110. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  111. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  112. continue
  113. }
  114. }
  115. // send memTx
  116. msg := &TxMessage{Tx: memTx.tx}
  117. success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
  118. if !success {
  119. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  120. continue
  121. }
  122. next = next.NextWait()
  123. continue
  124. }
  125. }
  126. // implements events.Eventable
  127. func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) {
  128. memR.evsw = evsw
  129. }
  130. //-----------------------------------------------------------------------------
  131. // Messages
  132. const (
  133. msgTypeTx = byte(0x01)
  134. )
  135. type MempoolMessage interface{}
  136. var _ = wire.RegisterInterface(
  137. struct{ MempoolMessage }{},
  138. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  139. )
  140. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  141. msgType = bz[0]
  142. n := new(int)
  143. r := bytes.NewReader(bz)
  144. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  145. return
  146. }
  147. //-------------------------------------
  148. type TxMessage struct {
  149. Tx types.Tx
  150. }
  151. func (m *TxMessage) String() string {
  152. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  153. }