You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

133 lines
2.8 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "reflect"
  6. . "github.com/tendermint/tendermint/common"
  7. "github.com/tendermint/tendermint/events"
  8. "github.com/tendermint/tendermint/p2p"
  9. "github.com/tendermint/tendermint/types"
  10. "github.com/tendermint/tendermint/wire"
  11. )
  12. var (
  13. MempoolChannel = byte(0x30)
  14. )
  15. // MempoolReactor handles mempool tx broadcasting amongst peers.
  16. type MempoolReactor struct {
  17. p2p.BaseReactor
  18. Mempool *Mempool
  19. evsw events.Fireable
  20. }
  21. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  22. memR := &MempoolReactor{
  23. Mempool: mempool,
  24. }
  25. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  26. return memR
  27. }
  28. // Implements Reactor
  29. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  30. return []*p2p.ChannelDescriptor{
  31. &p2p.ChannelDescriptor{
  32. ID: MempoolChannel,
  33. Priority: 5,
  34. },
  35. }
  36. }
  37. // Implements Reactor
  38. func (pexR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  39. }
  40. // Implements Reactor
  41. func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  42. }
  43. // Implements Reactor
  44. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  45. _, msg, err := DecodeMessage(msgBytes)
  46. if err != nil {
  47. log.Warn("Error decoding message", "error", err)
  48. return
  49. }
  50. log.Notice("MempoolReactor received message", "msg", msg)
  51. switch msg := msg.(type) {
  52. case *TxMessage:
  53. err := memR.Mempool.AddTx(msg.Tx)
  54. if err != nil {
  55. // Bad, seen, or conflicting tx.
  56. log.Info("Could not add tx", "tx", msg.Tx)
  57. return
  58. } else {
  59. log.Info("Added valid tx", "tx", msg.Tx)
  60. }
  61. // Share tx.
  62. // We use a simple shotgun approach for now.
  63. // TODO: improve efficiency
  64. for _, peer := range memR.Switch.Peers().List() {
  65. if peer.Key == src.Key {
  66. continue
  67. }
  68. peer.TrySend(MempoolChannel, msg)
  69. }
  70. default:
  71. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  72. }
  73. }
  74. func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
  75. err := memR.Mempool.AddTx(tx)
  76. if err != nil {
  77. return err
  78. }
  79. msg := &TxMessage{Tx: tx}
  80. memR.Switch.Broadcast(MempoolChannel, msg)
  81. return nil
  82. }
  83. // implements events.Eventable
  84. func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
  85. memR.evsw = evsw
  86. }
  87. //-----------------------------------------------------------------------------
  88. // Messages
  89. const (
  90. msgTypeTx = byte(0x01)
  91. )
  92. type MempoolMessage interface{}
  93. var _ = wire.RegisterInterface(
  94. struct{ MempoolMessage }{},
  95. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  96. )
  97. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  98. msgType = bz[0]
  99. n := new(int64)
  100. r := bytes.NewReader(bz)
  101. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, n, &err).(struct{ MempoolMessage }).MempoolMessage
  102. return
  103. }
  104. //-------------------------------------
  105. type TxMessage struct {
  106. Tx types.Tx
  107. }
  108. func (m *TxMessage) String() string {
  109. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  110. }