You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

145 lines
3.0 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sync/atomic"
  6. "github.com/tendermint/tendermint2/binary"
  7. "github.com/tendermint/tendermint2/p2p"
  8. "github.com/tendermint/tendermint2/types"
  9. )
  10. var (
  11. MempoolChannel = byte(0x30)
  12. )
  13. // MempoolReactor handles mempool tx broadcasting amongst peers.
  14. type MempoolReactor struct {
  15. sw *p2p.Switch
  16. quit chan struct{}
  17. started uint32
  18. stopped uint32
  19. Mempool *Mempool
  20. }
  21. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  22. memR := &MempoolReactor{
  23. quit: make(chan struct{}),
  24. Mempool: mempool,
  25. }
  26. return memR
  27. }
  28. // Implements Reactor
  29. func (memR *MempoolReactor) Start(sw *p2p.Switch) {
  30. if atomic.CompareAndSwapUint32(&memR.started, 0, 1) {
  31. memR.sw = sw
  32. log.Info("Starting MempoolReactor")
  33. }
  34. }
  35. // Implements Reactor
  36. func (memR *MempoolReactor) Stop() {
  37. if atomic.CompareAndSwapUint32(&memR.stopped, 0, 1) {
  38. log.Info("Stopping MempoolReactor")
  39. close(memR.quit)
  40. }
  41. }
  42. // Implements Reactor
  43. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  44. return []*p2p.ChannelDescriptor{
  45. &p2p.ChannelDescriptor{
  46. Id: MempoolChannel,
  47. Priority: 5,
  48. },
  49. }
  50. }
  51. // Implements Reactor
  52. func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  53. }
  54. // Implements Reactor
  55. func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  56. }
  57. // Implements Reactor
  58. func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  59. _, msg_, err := DecodeMessage(msgBytes)
  60. if err != nil {
  61. log.Warn("Error decoding message", "error", err)
  62. return
  63. }
  64. log.Info("MempoolReactor received message", "msg", msg_)
  65. switch msg := msg_.(type) {
  66. case *TxMessage:
  67. err := memR.Mempool.AddTx(msg.Tx)
  68. if err != nil {
  69. // Bad, seen, or conflicting tx.
  70. log.Debug("Could not add tx", "tx", msg.Tx)
  71. return
  72. } else {
  73. log.Debug("Added valid tx", "tx", msg.Tx)
  74. }
  75. // Share tx.
  76. // We use a simple shotgun approach for now.
  77. // TODO: improve efficiency
  78. for _, peer := range memR.sw.Peers().List() {
  79. if peer.Key == src.Key {
  80. continue
  81. }
  82. peer.TrySend(MempoolChannel, msg)
  83. }
  84. default:
  85. // Ignore unknown message
  86. }
  87. }
  88. func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
  89. err := memR.Mempool.AddTx(tx)
  90. if err != nil {
  91. return err
  92. }
  93. msg := &TxMessage{Tx: tx}
  94. memR.sw.Broadcast(MempoolChannel, msg)
  95. return nil
  96. }
  97. //-----------------------------------------------------------------------------
  98. // Messages
  99. const (
  100. msgTypeUnknown = byte(0x00)
  101. msgTypeTx = byte(0x10)
  102. )
  103. // TODO: check for unnecessary extra bytes at the end.
  104. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  105. n := new(int64)
  106. msgType = bz[0]
  107. r := bytes.NewReader(bz)
  108. switch msgType {
  109. case msgTypeTx:
  110. msg = binary.ReadBinary(&TxMessage{}, r, n, &err)
  111. default:
  112. msg = nil
  113. }
  114. return
  115. }
  116. //-------------------------------------
  117. type TxMessage struct {
  118. Tx types.Tx
  119. }
  120. func (m *TxMessage) TypeByte() byte { return msgTypeTx }
  121. func (m *TxMessage) String() string {
  122. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  123. }