You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

134 lines
2.9 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. . "github.com/tendermint/tendermint/common"
  7. "github.com/tendermint/tendermint/events"
  8. "github.com/tendermint/tendermint/p2p"
  9. "github.com/tendermint/tendermint/types"
  10. "github.com/tendermint/tendermint/wire"
  11. )
  12. var (
  13. MempoolChannel = byte(0x30)
  14. )
  15. // MempoolReactor handles mempool tx broadcasting amongst peers.
  16. type MempoolReactor struct {
  17. p2p.BaseReactor
  18. sw *p2p.Switch
  19. Mempool *Mempool
  20. evsw events.Fireable
  21. }
  22. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  23. memR := &MempoolReactor{
  24. Mempool: mempool,
  25. }
  26. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  27. return memR
  28. }
  29. // Implements Reactor
  30. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  31. return []*p2p.ChannelDescriptor{
  32. &p2p.ChannelDescriptor{
  33. ID: MempoolChannel,
  34. Priority: 5,
  35. },
  36. }
  37. }
  38. // Implements Reactor
  39. func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  40. }
  41. // Implements Reactor
  42. func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  43. }
  44. // Implements Reactor
  45. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  46. _, msg, err := DecodeMessage(msgBytes)
  47. if err != nil {
  48. log.Warn("Error decoding message", "error", err)
  49. return
  50. }
  51. log.Notice("MempoolReactor received message", "msg", msg)
  52. switch msg := msg.(type) {
  53. case *TxMessage:
  54. err := memR.Mempool.AddTx(msg.Tx)
  55. if err != nil {
  56. // Bad, seen, or conflicting tx.
  57. log.Info("Could not add tx", "tx", msg.Tx)
  58. return
  59. } else {
  60. log.Info("Added valid tx", "tx", msg.Tx)
  61. }
  62. // Share tx.
  63. // We use a simple shotgun approach for now.
  64. // TODO: improve efficiency
  65. for _, peer := range memR.Switch.Peers().List() {
  66. if peer.Key == src.Key {
  67. continue
  68. }
  69. peer.TrySend(MempoolChannel, msg)
  70. }
  71. default:
  72. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  73. }
  74. }
  75. func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
  76. err := memR.Mempool.AddTx(tx)
  77. if err != nil {
  78. return err
  79. }
  80. msg := &TxMessage{Tx: tx}
  81. memR.Switch.Broadcast(MempoolChannel, msg)
  82. return nil
  83. }
  84. // implements events.Eventable
  85. func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
  86. memR.evsw = evsw
  87. }
  88. //-----------------------------------------------------------------------------
  89. // Messages
  90. const (
  91. msgTypeTx = byte(0x01)
  92. )
  93. type MempoolMessage interface{}
  94. var _ = wire.RegisterInterface(
  95. struct{ MempoolMessage }{},
  96. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  97. )
  98. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  99. msgType = bz[0]
  100. n := new(int64)
  101. r := bytes.NewReader(bz)
  102. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, n, &err).(struct{ MempoolMessage }).MempoolMessage
  103. return
  104. }
  105. //-------------------------------------
  106. type TxMessage struct {
  107. Tx types.Tx
  108. }
  109. func (m *TxMessage) String() string {
  110. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  111. }