You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

138 lines
3.0 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. "github.com/tendermint/tendermint/binary"
  7. . "github.com/tendermint/tendermint/common"
  8. "github.com/tendermint/tendermint/events"
  9. "github.com/tendermint/tendermint/p2p"
  10. "github.com/tendermint/tendermint/types"
  11. )
  // MempoolChannel is the p2p channel id used for mempool traffic. It is the id
  // reported by GetChannels and the channel that tx messages are sent on.
  var MempoolChannel = byte(0x30)
  15. // MempoolReactor handles mempool tx broadcasting amongst peers.
  16. type MempoolReactor struct {
  17. p2p.BaseReactor
  18. sw *p2p.Switch
  19. Mempool *Mempool
  20. evsw events.Fireable
  21. }
  22. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  23. memR := &MempoolReactor{
  24. Mempool: mempool,
  25. }
  26. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  27. return memR
  28. }
  29. // func (memR *MempoolReactor) AfterStart() {}
  30. // func (memR *MempoolReactor) AfterStop() {}
  31. // Implements Reactor
  32. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  33. return []*p2p.ChannelDescriptor{
  34. &p2p.ChannelDescriptor{
  35. Id: MempoolChannel,
  36. Priority: 5,
  37. },
  38. }
  39. }
  40. // Implements Reactor
  41. func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  42. }
  43. // Implements Reactor
  44. func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  45. }
  46. // Implements Reactor
  47. func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
  48. _, msg, err := DecodeMessage(msgBytes)
  49. if err != nil {
  50. log.Warn("Error decoding message", "error", err)
  51. return
  52. }
  53. log.Notice("MempoolReactor received message", "msg", msg)
  54. switch msg := msg.(type) {
  55. case *TxMessage:
  56. err := memR.Mempool.AddTx(msg.Tx)
  57. if err != nil {
  58. // Bad, seen, or conflicting tx.
  59. log.Info("Could not add tx", "tx", msg.Tx)
  60. return
  61. } else {
  62. log.Info("Added valid tx", "tx", msg.Tx)
  63. }
  64. // Share tx.
  65. // We use a simple shotgun approach for now.
  66. // TODO: improve efficiency
  67. for _, peer := range memR.Switch.Peers().List() {
  68. if peer.Key == src.Key {
  69. continue
  70. }
  71. peer.TrySend(MempoolChannel, msg)
  72. }
  73. default:
  74. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  75. }
  76. }
  77. func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
  78. err := memR.Mempool.AddTx(tx)
  79. if err != nil {
  80. return err
  81. }
  82. msg := &TxMessage{Tx: tx}
  83. memR.Switch.Broadcast(MempoolChannel, msg)
  84. return nil
  85. }
  86. // implements events.Eventable
  87. func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
  88. memR.evsw = evsw
  89. }
  90. //-----------------------------------------------------------------------------
  91. // Messages
  // msgTypeTx is the type byte under which TxMessage is registered with the
  // binary package.
  const msgTypeTx = byte(0x01)
  // MempoolMessage is the interface type under which mempool wire messages are
  // registered and decoded. It declares no methods.
  type MempoolMessage interface{}
  96. var _ = binary.RegisterInterface(
  97. struct{ MempoolMessage }{},
  98. binary.ConcreteType{&TxMessage{}, msgTypeTx},
  99. )
  100. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  101. msgType = bz[0]
  102. n := new(int64)
  103. r := bytes.NewReader(bz)
  104. msg = binary.ReadBinary(struct{ MempoolMessage }{}, r, n, &err).(struct{ MempoolMessage }).MempoolMessage
  105. return
  106. }
  107. //-------------------------------------
  108. type TxMessage struct {
  109. Tx types.Tx
  110. }
  111. func (m *TxMessage) String() string {
  112. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  113. }