You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

246 lines
6.5 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. . "github.com/tendermint/go-common"
  9. "github.com/tendermint/go-p2p"
  10. "github.com/tendermint/go-wire"
  11. "github.com/tendermint/tendermint/events"
  12. sm "github.com/tendermint/tendermint/state"
  13. "github.com/tendermint/tendermint/types"
  14. )
  15. var (
  16. MempoolChannel = byte(0x30)
  17. checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer
  18. txsToSendPerCheck = 64 // send up to this many txs from the mempool per check
  19. newBlockChCapacity = 100 // queue to process this many ResetInfos per peer
  20. maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable
  21. )
  22. // MempoolReactor handles mempool tx broadcasting amongst peers.
  23. type MempoolReactor struct {
  24. p2p.BaseReactor
  25. Mempool *Mempool
  26. evsw events.Fireable
  27. }
  28. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  29. memR := &MempoolReactor{
  30. Mempool: mempool,
  31. }
  32. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  33. return memR
  34. }
  35. // Implements Reactor
  36. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  37. return []*p2p.ChannelDescriptor{
  38. &p2p.ChannelDescriptor{
  39. ID: MempoolChannel,
  40. Priority: 5,
  41. },
  42. }
  43. }
  44. // Implements Reactor
  45. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  46. // Each peer gets a go routine on which we broadcast transactions in the same order we applied them to our state.
  47. newBlockChan := make(chan ResetInfo, newBlockChCapacity)
  48. peer.Data.Set(types.PeerMempoolChKey, newBlockChan)
  49. timer := time.NewTicker(time.Millisecond * time.Duration(checkExecutedTxsMilliseconds))
  50. go memR.broadcastTxRoutine(timer.C, newBlockChan, peer)
  51. }
  52. // Implements Reactor
  53. func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
  54. // broadcast routine checks if peer is gone and returns
  55. }
  56. // Implements Reactor
  57. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  58. _, msg, err := DecodeMessage(msgBytes)
  59. if err != nil {
  60. log.Warn("Error decoding message", "error", err)
  61. return
  62. }
  63. log.Notice("MempoolReactor received message", "msg", msg)
  64. switch msg := msg.(type) {
  65. case *TxMessage:
  66. err := memR.Mempool.AddTx(msg.Tx)
  67. if err != nil {
  68. // Bad, seen, or conflicting tx.
  69. log.Info("Could not add tx", "tx", msg.Tx)
  70. return
  71. } else {
  72. log.Info("Added valid tx", "tx", msg.Tx)
  73. }
  74. // broadcasting happens from go routines per peer
  75. default:
  76. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  77. }
  78. }
  79. // "block" is the new block being committed.
  80. // "state" is the result of state.AppendBlock("block").
  81. // Txs that are present in "block" are discarded from mempool.
  82. // Txs that have become invalid in the new "state" are also discarded.
  83. func (memR *MempoolReactor) ResetForBlockAndState(block *types.Block, state *sm.State) {
  84. ri := memR.Mempool.ResetForBlockAndState(block, state)
  85. for _, peer := range memR.Switch.Peers().List() {
  86. peerMempoolCh := peer.Data.Get(types.PeerMempoolChKey).(chan ResetInfo)
  87. select {
  88. case peerMempoolCh <- ri:
  89. default:
  90. memR.Switch.StopPeerForError(peer, errors.New("Peer's mempool push channel full"))
  91. }
  92. }
  93. }
  94. // Just an alias for AddTx since broadcasting happens in peer routines
  95. func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
  96. return memR.Mempool.AddTx(tx)
  97. }
  98. type PeerState interface {
  99. GetHeight() int
  100. }
  101. type Peer interface {
  102. IsRunning() bool
  103. Send(byte, interface{}) bool
  104. Get(string) interface{}
  105. }
  106. // send new mempool txs to peer, strictly in order we applied them to our state.
  107. // new blocks take chunks out of the mempool, but we've already sent some txs to the peer.
  108. // so we wait to hear that the peer has progressed to the new height, and then continue sending txs from where we left off
  109. func (memR *MempoolReactor) broadcastTxRoutine(tickerChan <-chan time.Time, newBlockChan chan ResetInfo, peer Peer) {
  110. var height = memR.Mempool.GetHeight()
  111. var txsSent int // new txs sent for height. (reset every new height)
  112. for {
  113. select {
  114. case <-tickerChan:
  115. if !peer.IsRunning() {
  116. return
  117. }
  118. // make sure the peer is up to date
  119. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  120. peerState := peerState_i.(PeerState)
  121. if peerState.GetHeight() < height {
  122. continue
  123. }
  124. } else {
  125. continue
  126. }
  127. // check the mempool for new transactions
  128. newTxs := memR.getNewTxs(height)
  129. txsSentLoop := 0
  130. start := time.Now()
  131. TX_LOOP:
  132. for i := txsSent; i < len(newTxs) && txsSentLoop < txsToSendPerCheck; i++ {
  133. tx := newTxs[i]
  134. msg := &TxMessage{Tx: tx}
  135. success := peer.Send(MempoolChannel, msg)
  136. if !success {
  137. break TX_LOOP
  138. } else {
  139. txsSentLoop += 1
  140. }
  141. }
  142. if txsSentLoop > 0 {
  143. txsSent += txsSentLoop
  144. log.Info("Sent txs to peer", "txsSentLoop", txsSentLoop,
  145. "took", time.Since(start), "txsSent", txsSent, "newTxs", len(newTxs))
  146. }
  147. case ri := <-newBlockChan:
  148. height = ri.Height
  149. // find out how many txs below what we've sent were included in a block and how many became invalid
  150. included := tallyRangesUpTo(ri.Included, txsSent)
  151. invalidated := tallyRangesUpTo(ri.Invalid, txsSent)
  152. txsSent -= included + invalidated
  153. }
  154. }
  155. }
  156. // fetch new txs from the mempool
  157. func (memR *MempoolReactor) getNewTxs(height int) (txs []types.Tx) {
  158. memR.Mempool.mtx.Lock()
  159. defer memR.Mempool.mtx.Unlock()
  160. // if the mempool got ahead of us just return empty txs
  161. if memR.Mempool.state.LastBlockHeight != height {
  162. return
  163. }
  164. return memR.Mempool.txs
  165. }
  166. // return the size of ranges less than upTo
  167. func tallyRangesUpTo(ranger []Range, upTo int) int {
  168. totalUpTo := 0
  169. for _, r := range ranger {
  170. if r.Start >= upTo {
  171. break
  172. }
  173. if r.Start+r.Length >= upTo {
  174. totalUpTo += upTo - r.Start
  175. break
  176. }
  177. totalUpTo += r.Length
  178. }
  179. return totalUpTo
  180. }
  181. // implements events.Eventable
  182. func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
  183. memR.evsw = evsw
  184. }
  185. //-----------------------------------------------------------------------------
  186. // Messages
  187. const (
  188. msgTypeTx = byte(0x01)
  189. )
  190. type MempoolMessage interface{}
  191. var _ = wire.RegisterInterface(
  192. struct{ MempoolMessage }{},
  193. wire.ConcreteType{&TxMessage{}, msgTypeTx},
  194. )
  195. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  196. msgType = bz[0]
  197. n := new(int)
  198. r := bytes.NewReader(bz)
  199. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, maxMempoolMessageSize, n, &err).(struct{ MempoolMessage }).MempoolMessage
  200. return
  201. }
  202. //-------------------------------------
  203. type TxMessage struct {
  204. Tx types.Tx
  205. }
  206. func (m *TxMessage) String() string {
  207. return fmt.Sprintf("[TxMessage %v]", m.Tx)
  208. }