You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

146 lines
3.0 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "sync/atomic"
  6. "github.com/tendermint/tendermint/binary"
  7. blk "github.com/tendermint/tendermint/block"
  8. "github.com/tendermint/tendermint/p2p"
  9. )
  // MempoolCh is the p2p channel id used by the mempool reactor: it is the Id
  // of the descriptor returned by GetChannels and the channel on which
  // transactions are sent and broadcast.
  var MempoolCh byte = 0x30
  13. // MempoolReactor handles mempool tx broadcasting amongst peers.
  14. type MempoolReactor struct {
  15. sw *p2p.Switch
  16. quit chan struct{}
  17. started uint32
  18. stopped uint32
  19. Mempool *Mempool
  20. }
  21. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  22. memR := &MempoolReactor{
  23. quit: make(chan struct{}),
  24. Mempool: mempool,
  25. }
  26. return memR
  27. }
  28. // Implements Reactor
  29. func (memR *MempoolReactor) Start(sw *p2p.Switch) {
  30. if atomic.CompareAndSwapUint32(&memR.started, 0, 1) {
  31. memR.sw = sw
  32. log.Info("Starting MempoolReactor")
  33. }
  34. }
  35. // Implements Reactor
  36. func (memR *MempoolReactor) Stop() {
  37. if atomic.CompareAndSwapUint32(&memR.stopped, 0, 1) {
  38. log.Info("Stopping MempoolReactor")
  39. close(memR.quit)
  40. }
  41. }
  42. // Implements Reactor
  43. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  44. return []*p2p.ChannelDescriptor{
  45. &p2p.ChannelDescriptor{
  46. Id: MempoolCh,
  47. Priority: 5,
  48. },
  49. }
  50. }
  51. // Implements Reactor
  52. func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  53. }
  54. // Implements Reactor
  55. func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  56. }
  57. // Implements Reactor
  58. func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  59. _, msg_, err := DecodeMessage(msgBytes)
  60. if err != nil {
  61. log.Warn("Error decoding message", "error", err)
  62. return
  63. }
  64. log.Info("MempoolReactor received message", "msg", msg_)
  65. switch msg_.(type) {
  66. case *TxMessage:
  67. msg := msg_.(*TxMessage)
  68. err := memR.Mempool.AddTx(msg.Tx)
  69. if err != nil {
  70. // Bad, seen, or conflicting tx.
  71. log.Debug("Could not add tx", "tx", msg.Tx)
  72. return
  73. } else {
  74. log.Debug("Added valid tx", "tx", msg.Tx)
  75. }
  76. // Share tx.
  77. // We use a simple shotgun approach for now.
  78. // TODO: improve efficiency
  79. for _, peer := range memR.sw.Peers().List() {
  80. if peer.Key == src.Key {
  81. continue
  82. }
  83. peer.TrySend(MempoolCh, msg)
  84. }
  85. default:
  86. // Ignore unknown message
  87. }
  88. }
  89. func (memR *MempoolReactor) BroadcastTx(tx blk.Tx) error {
  90. err := memR.Mempool.AddTx(tx)
  91. if err != nil {
  92. return err
  93. }
  94. msg := &TxMessage{Tx: tx}
  95. memR.sw.Broadcast(MempoolCh, msg)
  96. return nil
  97. }
  98. //-----------------------------------------------------------------------------
  99. // Messages
  // Type bytes identifying mempool messages; the first byte of an encoded
  // message is compared against these.
  const (
  	// msgTypeUnknown is the zero type byte; presumably a placeholder for an
  	// unset or unrecognized message type.
  	msgTypeUnknown byte = 0x00
  	// msgTypeTx tags a TxMessage.
  	msgTypeTx byte = 0x10
  )
  104. // TODO: check for unnecessary extra bytes at the end.
  105. func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
  106. n := new(int64)
  107. msgType = bz[0]
  108. r := bytes.NewReader(bz)
  109. switch msgType {
  110. case msgTypeTx:
  111. msg = binary.ReadBinary(&TxMessage{}, r, n, &err)
  112. default:
  113. msg = nil
  114. }
  115. return
  116. }
  117. //-------------------------------------
  118. type TxMessage struct {
  119. Tx blk.Tx
  120. }
  121. func (m *TxMessage) TypeByte() byte { return msgTypeTx }
  122. func (m *TxMessage) String() string {
  123. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  124. }