You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

179 lines
4.4 KiB

10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
8 years ago
10 years ago
8 years ago
10 years ago
8 years ago
8 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. abci "github.com/tendermint/abci/types"
  8. wire "github.com/tendermint/go-wire"
  9. "github.com/tendermint/tmlibs/clist"
  10. cfg "github.com/tendermint/tendermint/config"
  11. "github.com/tendermint/tendermint/p2p"
  12. "github.com/tendermint/tendermint/types"
  13. )
  14. const (
  15. MempoolChannel = byte(0x30)
  16. maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable
  17. peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
  18. )
  19. // MempoolReactor handles mempool tx broadcasting amongst peers.
  20. type MempoolReactor struct {
  21. p2p.BaseReactor
  22. config *cfg.MempoolConfig
  23. Mempool *Mempool
  24. evsw types.EventSwitch
  25. }
  26. func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
  27. memR := &MempoolReactor{
  28. config: config,
  29. Mempool: mempool,
  30. }
  31. memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
  32. return memR
  33. }
  34. // Implements Reactor
  35. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  36. return []*p2p.ChannelDescriptor{
  37. &p2p.ChannelDescriptor{
  38. ID: MempoolChannel,
  39. Priority: 5,
  40. },
  41. }
  42. }
  43. // Implements Reactor
  44. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  45. go memR.broadcastTxRoutine(peer)
  46. }
  47. // Implements Reactor
  48. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  49. // broadcast routine checks if peer is gone and returns
  50. }
  51. // Implements Reactor
  52. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  53. _, msg, err := DecodeMessage(msgBytes)
  54. if err != nil {
  55. memR.Logger.Error("Error decoding message", "err", err)
  56. return
  57. }
  58. memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  59. switch msg := msg.(type) {
  60. case *TxMessage:
  61. err := memR.Mempool.CheckTx(msg.Tx, nil)
  62. if err != nil {
  63. // Bad, seen, or conflicting tx.
  64. memR.Logger.Info("Could not add tx", "tx", msg.Tx)
  65. return
  66. } else {
  67. memR.Logger.Info("Added valid tx", "tx", msg.Tx)
  68. }
  69. // broadcasting happens from go routines per peer
  70. default:
  71. memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  72. }
  73. }
  74. // Just an alias for CheckTx since broadcasting happens in peer routines
  75. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
  76. return memR.Mempool.CheckTx(tx, cb)
  77. }
  78. type PeerState interface {
  79. GetHeight() int
  80. }
  81. type Peer interface {
  82. IsRunning() bool
  83. Send(byte, interface{}) bool
  84. Get(string) interface{}
  85. }
  86. // Send new mempool txs to peer.
  87. // TODO: Handle mempool or reactor shutdown?
  88. // As is this routine may block forever if no new txs come in.
  89. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  90. if !memR.config.Broadcast {
  91. return
  92. }
  93. var next *clist.CElement
  94. for {
  95. if !memR.IsRunning() || !peer.IsRunning() {
  96. return // Quit!
  97. }
  98. if next == nil {
  99. // This happens because the CElement we were looking at got
  100. // garbage collected (removed). That is, .NextWait() returned nil.
  101. // Go ahead and start from the beginning.
  102. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  103. }
  104. memTx := next.Value.(*mempoolTx)
  105. // make sure the peer is up to date
  106. height := memTx.Height()
  107. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  108. peerState := peerState_i.(PeerState)
  109. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  110. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  111. continue
  112. }
  113. }
  114. // send memTx
  115. msg := &TxMessage{Tx: memTx.tx}
  116. success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
  117. if !success {
  118. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  119. continue
  120. }
  121. next = next.NextWait()
  122. continue
  123. }
  124. }
  125. // implements events.Eventable
  126. func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) {
  127. memR.evsw = evsw
  128. }
  129. //-----------------------------------------------------------------------------
  130. // Messages
  131. const (
  132. msgTypeTx = byte(0x01)
  133. )
  134. type MempoolMessage interface{}
  135. var _ = wire.RegisterInterface(
  136. struct{ MempoolMessage }{},
  137. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  138. )
  139. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  140. msgType = bz[0]
  141. n := new(int)
  142. r := bytes.NewReader(bz)
  143. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  144. return
  145. }
  146. //-------------------------------------
  147. type TxMessage struct {
  148. Tx types.Tx
  149. }
  150. func (m *TxMessage) String() string {
  151. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  152. }