You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

194 lines
4.8 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  import (
  	"bytes"
  	"fmt"
  	"path/filepath"
  	"reflect"
  	"time"

  	abci "github.com/tendermint/abci/types"
  	"github.com/tendermint/go-wire"
  	"github.com/tendermint/tmlibs/clist"

  	"github.com/tendermint/tendermint/p2p"
  	"github.com/tendermint/tendermint/types"
  )
  // Constants used by the mempool reactor.
  const (
  	// MempoolChannel is the ID of the p2p channel that carries mempool
  	// messages; it is the ID reported by GetChannels and used for Send.
  	MempoolChannel byte = 0x30

  	// maxMempoolMessageSize is the limit handed to wire.ReadBinary when a
  	// message is decoded: 1 MiB (1048576 bytes).
  	// TODO: make it configurable.
  	maxMempoolMessageSize = 1 << 20

  	// peerCatchupSleepIntervalMS is how long, in milliseconds, the broadcast
  	// routine sleeps before retrying when a peer is behind or a send fails.
  	peerCatchupSleepIntervalMS = 100
  )
  // Config holds the settings of the mempool reactor. The values are loaded
  // through the `mapstructure` tags; the trailing comment on each field gives
  // the default that NewDefaultConfig assigns.
  type Config struct {
  	Recheck      bool   `mapstructure:"recheck"`       // default: true
  	RecheckEmpty bool   `mapstructure:"recheck_empty"` // default: true
  	Broadcast    bool   `mapstructure:"broadcast"`     // default: true; when false broadcastTxRoutine returns without sending
  	WalDir       string `mapstructure:"wal_dir"`       // default: <rootDir>/data/mempool.wal
  }

  // NewDefaultConfig returns a Config with every boolean option enabled and
  // WalDir pointing at data/mempool.wal underneath rootDir.
  //
  // The path is assembled with filepath.Join rather than string concatenation,
  // so it uses the platform's separator and a trailing separator on rootDir
  // does not produce a doubled one.
  func NewDefaultConfig(rootDir string) *Config {
  	return &Config{
  		Recheck:      true,
  		RecheckEmpty: true,
  		Broadcast:    true,
  		WalDir:       filepath.Join(rootDir, "data", "mempool.wal"),
  	}
  }
  32. // MempoolReactor handles mempool tx broadcasting amongst peers.
  33. type MempoolReactor struct {
  34. p2p.BaseReactor
  35. config *Config
  36. Mempool *Mempool
  37. evsw types.EventSwitch
  38. }
  39. func NewMempoolReactor(config *Config, mempool *Mempool) *MempoolReactor {
  40. memR := &MempoolReactor{
  41. config: config,
  42. Mempool: mempool,
  43. }
  44. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  45. return memR
  46. }
  47. // Implements Reactor
  48. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  49. return []*p2p.ChannelDescriptor{
  50. &p2p.ChannelDescriptor{
  51. ID: MempoolChannel,
  52. Priority: 5,
  53. },
  54. }
  55. }
  56. // Implements Reactor
  57. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  58. go memR.broadcastTxRoutine(peer)
  59. }
  60. // Implements Reactor
  61. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  62. // broadcast routine checks if peer is gone and returns
  63. }
  64. // Implements Reactor
  65. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  66. _, msg, err := DecodeMessage(msgBytes)
  67. if err != nil {
  68. log.Warn("Error decoding message", "error", err)
  69. return
  70. }
  71. log.Debug("Receive", "src", src, "chId", chID, "msg", msg)
  72. switch msg := msg.(type) {
  73. case *TxMessage:
  74. err := memR.Mempool.CheckTx(msg.Tx, nil)
  75. if err != nil {
  76. // Bad, seen, or conflicting tx.
  77. log.Info("Could not add tx", "tx", msg.Tx)
  78. return
  79. } else {
  80. log.Info("Added valid tx", "tx", msg.Tx)
  81. }
  82. // broadcasting happens from go routines per peer
  83. default:
  84. log.Warn(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
  85. }
  86. }
  87. // Just an alias for CheckTx since broadcasting happens in peer routines
  88. func (memR *MempoolReactor) BroadcastTx(tx types.Tx, cb func(*abci.Response)) error {
  89. return memR.Mempool.CheckTx(tx, cb)
  90. }
  // PeerState is what the broadcast routine requires of the value a peer
  // stores under types.PeerStateKey: a way to read the peer's height.
  type PeerState interface {
  	// GetHeight returns the peer's height, which is compared against the
  	// height of each tx before that tx is sent.
  	GetHeight() int
  }
  // Peer is the subset of peer behaviour that broadcastTxRoutine relies on.
  type Peer interface {
  	// IsRunning reports whether the peer is still running; the broadcast
  	// routine exits once it returns false.
  	IsRunning() bool
  	// Send submits msg on the channel chID and reports whether it succeeded.
  	Send(chID byte, msg interface{}) bool
  	// Get returns the value stored under key; the broadcast routine treats
  	// a nil result as "no value stored".
  	Get(key string) interface{}
  }
  99. // Send new mempool txs to peer.
  100. // TODO: Handle mempool or reactor shutdown?
  101. // As is this routine may block forever if no new txs come in.
  102. func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
  103. if !memR.config.Broadcast {
  104. return
  105. }
  106. var next *clist.CElement
  107. for {
  108. if !memR.IsRunning() || !peer.IsRunning() {
  109. return // Quit!
  110. }
  111. if next == nil {
  112. // This happens because the CElement we were looking at got
  113. // garbage collected (removed). That is, .NextWait() returned nil.
  114. // Go ahead and start from the beginning.
  115. next = memR.Mempool.TxsFrontWait() // Wait until a tx is available
  116. }
  117. memTx := next.Value.(*mempoolTx)
  118. // make sure the peer is up to date
  119. height := memTx.Height()
  120. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  121. peerState := peerState_i.(PeerState)
  122. if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
  123. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  124. continue
  125. }
  126. }
  127. // send memTx
  128. msg := &TxMessage{Tx: memTx.tx}
  129. success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg})
  130. if !success {
  131. time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
  132. continue
  133. }
  134. next = next.NextWait()
  135. continue
  136. }
  137. }
  138. // implements events.Eventable
  139. func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) {
  140. memR.evsw = evsw
  141. }
  142. //-----------------------------------------------------------------------------
  143. // Messages
  144. const (
  145. msgTypeTx = byte(0x01)
  146. )
  147. type MempoolMessage interface{}
  148. var _ = wire.RegisterInterface(
  149. struct{ MempoolMessage }{},
  150. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  151. )
  152. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  153. msgType = bz[0]
  154. n := new(int)
  155. r := bytes.NewReader(bz)
  156. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  157. return
  158. }
  159. //-------------------------------------
  160. type TxMessage struct {
  161. Tx types.Tx
  162. }
  163. func (m *TxMessage) String() string {
  164. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  165. }