You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
3.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sync/atomic"
  6. "github.com/tendermint/tendermint/binary"
  7. . "github.com/tendermint/tendermint/common"
  8. "github.com/tendermint/tendermint/events"
  9. "github.com/tendermint/tendermint/p2p"
  10. "github.com/tendermint/tendermint/types"
  11. )
  // MempoolChannel is the p2p channel ID used by the mempool reactor: it is
  // advertised by GetChannels and used when sending transactions to peers.
  var MempoolChannel byte = 0x30
  15. // MempoolReactor handles mempool tx broadcasting amongst peers.
  16. type MempoolReactor struct {
  17. sw *p2p.Switch
  18. quit chan struct{}
  19. started uint32
  20. stopped uint32
  21. Mempool *Mempool
  22. evsw *events.EventSwitch
  23. }
  24. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  25. memR := &MempoolReactor{
  26. quit: make(chan struct{}),
  27. Mempool: mempool,
  28. }
  29. return memR
  30. }
  31. // Implements Reactor
  32. func (memR *MempoolReactor) Start(sw *p2p.Switch) {
  33. if atomic.CompareAndSwapUint32(&memR.started, 0, 1) {
  34. memR.sw = sw
  35. log.Info("Starting MempoolReactor")
  36. }
  37. }
  38. // Implements Reactor
  39. func (memR *MempoolReactor) Stop() {
  40. if atomic.CompareAndSwapUint32(&memR.stopped, 0, 1) {
  41. log.Info("Stopping MempoolReactor")
  42. close(memR.quit)
  43. }
  44. }
  45. // Implements Reactor
  46. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  47. return []*p2p.ChannelDescriptor{
  48. &p2p.ChannelDescriptor{
  49. Id: MempoolChannel,
  50. Priority: 5,
  51. },
  52. }
  53. }
  54. // Implements Reactor
  55. func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  56. }
  57. // Implements Reactor
  58. func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  59. }
  60. // Implements Reactor
  61. func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  62. _, msg_, err := DecodeMessage(msgBytes)
  63. if err != nil {
  64. log.Warn("Error decoding message", "error", err)
  65. return
  66. }
  67. log.Info("MempoolReactor received message", "msg", msg_)
  68. switch msg := msg_.(type) {
  69. case *TxMessage:
  70. err := memR.Mempool.AddTx(msg.Tx)
  71. if err != nil {
  72. // Bad, seen, or conflicting tx.
  73. log.Debug("Could not add tx", "tx", msg.Tx)
  74. return
  75. } else {
  76. log.Debug("Added valid tx", "tx", msg.Tx)
  77. }
  78. // Share tx.
  79. // We use a simple shotgun approach for now.
  80. // TODO: improve efficiency
  81. for _, peer := range memR.sw.Peers().List() {
  82. if peer.Key == src.Key {
  83. continue
  84. }
  85. peer.TrySend(MempoolChannel, msg)
  86. }
  87. default:
  88. // Ignore unknown message
  89. }
  90. }
  91. func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
  92. err := memR.Mempool.AddTx(tx)
  93. if err != nil {
  94. return err
  95. }
  96. msg := &TxMessage{Tx: tx}
  97. memR.sw.Broadcast(MempoolChannel, msg)
  98. return nil
  99. }
  100. // implements events.Eventable
  101. func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
  102. memR.evsw = evsw
  103. }
  104. //-----------------------------------------------------------------------------
  105. // Messages
  // Type bytes that identify mempool wire messages; DecodeMessage switches on
  // the first byte of a payload using these values.
  const (
  	msgTypeUnknown byte = 0x00
  	msgTypeTx      byte = 0x01
  )
  110. // TODO: check for unnecessary extra bytes at the end.
  111. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  112. n := new(int64)
  113. msgType = bz[0]
  114. r := bytes.NewReader(bz)
  115. switch msgType {
  116. case msgTypeTx:
  117. msg = binary.ReadBinary(&TxMessage{}, r, n, &err)
  118. default:
  119. log.Warn(Fmt("Ignoring unknown message %X", bz))
  120. msg = nil
  121. }
  122. return
  123. }
  124. //-------------------------------------
  125. type TxMessage struct {
  126. Tx types.Tx
  127. }
  128. func (m *TxMessage) TypeByte() byte { return msgTypeTx }
  129. func (m *TxMessage) String() string {
  130. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  131. }