You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
2.8 KiB

10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "sync/atomic"
  7. . "github.com/tendermint/tendermint/binary"
  8. . "github.com/tendermint/tendermint/blocks"
  9. . "github.com/tendermint/tendermint/p2p"
  10. )
  11. var (
  12. MempoolCh = byte(0x30)
  13. )
  14. // MempoolReactor handles mempool tx broadcasting amongst peers.
  15. type MempoolReactor struct {
  16. sw *Switch
  17. quit chan struct{}
  18. started uint32
  19. stopped uint32
  20. mempool *Mempool
  21. }
  22. func NewMempoolReactor(sw *Switch, mempool *Mempool) *MempoolReactor {
  23. memR := &MempoolReactor{
  24. sw: sw,
  25. quit: make(chan struct{}),
  26. mempool: mempool,
  27. }
  28. return memR
  29. }
  30. func (memR *MempoolReactor) Start() {
  31. if atomic.CompareAndSwapUint32(&memR.started, 0, 1) {
  32. log.Info("Starting MempoolReactor")
  33. }
  34. }
  35. func (memR *MempoolReactor) Stop() {
  36. if atomic.CompareAndSwapUint32(&memR.stopped, 0, 1) {
  37. log.Info("Stopping MempoolReactor")
  38. close(memR.quit)
  39. }
  40. }
  41. func (memR *MempoolReactor) BroadcastTx(tx Tx) error {
  42. err := memR.mempool.AddTx(tx)
  43. if err != nil {
  44. return err
  45. }
  46. msg := &TxMessage{Tx: tx}
  47. memR.sw.Broadcast(MempoolCh, msg)
  48. return nil
  49. }
  50. // Implements Reactor
  51. func (pexR *MempoolReactor) AddPeer(peer *Peer) {
  52. }
  53. // Implements Reactor
  54. func (pexR *MempoolReactor) RemovePeer(peer *Peer, err error) {
  55. }
  56. func (memR *MempoolReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
  57. _, msg_ := decodeMessage(msgBytes)
  58. log.Info("MempoolReactor received %v", msg_)
  59. switch msg_.(type) {
  60. case *TxMessage:
  61. msg := msg_.(*TxMessage)
  62. err := memR.mempool.AddTx(msg.Tx)
  63. if err != nil {
  64. // Bad, seen, or conflicting tx.
  65. log.Debug("Could not add tx %v", msg.Tx)
  66. return
  67. } else {
  68. log.Debug("Added valid tx %V", msg.Tx)
  69. }
  70. // Share tx.
  71. // We use a simple shotgun approach for now.
  72. // TODO: improve efficiency
  73. for _, peer := range memR.sw.Peers().List() {
  74. if peer.Key == src.Key {
  75. continue
  76. }
  77. peer.TrySend(MempoolCh, msg)
  78. }
  79. default:
  80. // Ignore unknown message
  81. }
  82. }
  83. //-----------------------------------------------------------------------------
  84. // Messages
  85. const (
  86. msgTypeUnknown = byte(0x00)
  87. msgTypeTx = byte(0x10)
  88. )
  89. // TODO: check for unnecessary extra bytes at the end.
  90. func decodeMessage(bz []byte) (msgType byte, msg interface{}) {
  91. n, err := new(int64), new(error)
  92. // log.Debug("decoding msg bytes: %X", bz)
  93. msgType = bz[0]
  94. switch msgType {
  95. case msgTypeTx:
  96. msg = readTxMessage(bytes.NewReader(bz[1:]), n, err)
  97. // case ...:
  98. default:
  99. msg = nil
  100. }
  101. return
  102. }
  103. //-------------------------------------
  104. type TxMessage struct {
  105. Tx Tx
  106. }
  107. func readTxMessage(r io.Reader, n *int64, err *error) *TxMessage {
  108. return &TxMessage{
  109. Tx: ReadTx(r, n, err),
  110. }
  111. }
  112. func (m *TxMessage) WriteTo(w io.Writer) (n int64, err error) {
  113. WriteByte(w, msgTypeTx, &n, &err)
  114. WriteBinary(w, m.Tx, &n, &err)
  115. return
  116. }
  117. func (m *TxMessage) String() string {
  118. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  119. }