You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

245 lines
6.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package mempool
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "time"
  8. . "github.com/tendermint/tendermint/common"
  9. "github.com/tendermint/tendermint/events"
  10. "github.com/tendermint/tendermint/p2p"
  11. sm "github.com/tendermint/tendermint/state"
  12. "github.com/tendermint/tendermint/types"
  13. "github.com/tendermint/tendermint/wire"
  14. )
var (
	// MempoolChannel is the p2p channel ID this reactor registers in
	// GetChannels and uses when sending TxMessages to peers.
	MempoolChannel = byte(0x30)

	// checkExecutedTxsMilliseconds is the period, in milliseconds, of the
	// per-peer ticker that triggers a check for new mempool txs to send.
	checkExecutedTxsMilliseconds = 1
	// txsToSendPerCheck caps how many txs are sent to a peer per ticker check.
	txsToSendPerCheck = 64
	// newBlockChCapacity is the buffer size of each peer's ResetInfo channel;
	// ResetForBlockAndState stops a peer whose channel is full.
	newBlockChCapacity = 100
)
// MempoolReactor handles mempool tx broadcasting amongst peers.
type MempoolReactor struct {
	p2p.BaseReactor // embedded base reactor; initialised in NewMempoolReactor

	// Mempool is the transaction pool that received txs are added to and
	// that the per-peer broadcast routines read from.
	Mempool *Mempool

	// evsw is stored by SetFireable; nothing else in this file reads it.
	evsw events.Fireable
}
  27. func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
  28. memR := &MempoolReactor{
  29. Mempool: mempool,
  30. }
  31. memR.BaseReactor = *p2p.NewBaseReactor(log, "MempoolReactor", memR)
  32. return memR
  33. }
  34. // Implements Reactor
  35. func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
  36. return []*p2p.ChannelDescriptor{
  37. &p2p.ChannelDescriptor{
  38. ID: MempoolChannel,
  39. Priority: 5,
  40. },
  41. }
  42. }
  43. // Implements Reactor
  44. func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) {
  45. // Each peer gets a go routine on which we broadcast transactions in the same order we applied them to our state.
  46. newBlockChan := make(chan ResetInfo, newBlockChCapacity)
  47. peer.Data.Set(types.PeerMempoolChKey, newBlockChan)
  48. timer := time.NewTicker(time.Millisecond * time.Duration(checkExecutedTxsMilliseconds))
  49. go memR.broadcastTxRoutine(timer.C, newBlockChan, peer)
  50. }
// RemovePeer implements Reactor. It is intentionally a no-op: the per-peer
// broadcast routine checks whether its peer is still running and returns on
// its own once the peer is gone.
func (memR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	// broadcast routine checks if peer is gone and returns
}
  55. // Implements Reactor
  56. func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
  57. _, msg, err := DecodeMessage(msgBytes)
  58. if err != nil {
  59. log.Warn("Error decoding message", "error", err)
  60. return
  61. }
  62. log.Notice("MempoolReactor received message", "msg", msg)
  63. switch msg := msg.(type) {
  64. case *TxMessage:
  65. err := memR.Mempool.AddTx(msg.Tx)
  66. if err != nil {
  67. // Bad, seen, or conflicting tx.
  68. log.Info("Could not add tx", "tx", msg.Tx)
  69. return
  70. } else {
  71. log.Info("Added valid tx", "tx", msg.Tx)
  72. }
  73. // broadcasting happens from go routines per peer
  74. default:
  75. log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg)))
  76. }
  77. }
  78. // "block" is the new block being committed.
  79. // "state" is the result of state.AppendBlock("block").
  80. // Txs that are present in "block" are discarded from mempool.
  81. // Txs that have become invalid in the new "state" are also discarded.
  82. func (memR *MempoolReactor) ResetForBlockAndState(block *types.Block, state *sm.State) {
  83. ri := memR.Mempool.ResetForBlockAndState(block, state)
  84. for _, peer := range memR.Switch.Peers().List() {
  85. peerMempoolCh := peer.Data.Get(types.PeerMempoolChKey).(chan ResetInfo)
  86. select {
  87. case peerMempoolCh <- ri:
  88. default:
  89. memR.Switch.StopPeerForError(peer, errors.New("Peer's mempool push channel full"))
  90. }
  91. }
  92. }
  93. // Just an alias for AddTx since broadcasting happens in peer routines
  94. func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
  95. return memR.Mempool.AddTx(tx)
  96. }
// PeerState is what broadcastTxRoutine expects to find stored on a peer under
// types.PeerStateKey. Its height is compared with the routine's own height to
// decide whether the peer is ready to receive more txs.
type PeerState interface {
	GetHeight() int
}
// Peer is the subset of peer behaviour that broadcastTxRoutine relies on.
type Peer interface {
	// IsRunning is polled on every tick; the routine exits when it is false.
	IsRunning() bool
	// Send is called with a channel ID and a message; a false result makes
	// the routine stop sending for the current tick.
	Send(byte, interface{}) bool
	// Get looks up a value stored on the peer by key (used here with
	// types.PeerStateKey).
	Get(string) interface{}
}
  105. // send new mempool txs to peer, strictly in order we applied them to our state.
  106. // new blocks take chunks out of the mempool, but we've already sent some txs to the peer.
  107. // so we wait to hear that the peer has progressed to the new height, and then continue sending txs from where we left off
  108. func (memR *MempoolReactor) broadcastTxRoutine(tickerChan <-chan time.Time, newBlockChan chan ResetInfo, peer Peer) {
  109. var height = memR.Mempool.GetHeight()
  110. var txsSent int // new txs sent for height. (reset every new height)
  111. for {
  112. select {
  113. case <-tickerChan:
  114. if !peer.IsRunning() {
  115. return
  116. }
  117. // make sure the peer is up to date
  118. if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
  119. peerState := peerState_i.(PeerState)
  120. if peerState.GetHeight() < height {
  121. continue
  122. }
  123. } else {
  124. continue
  125. }
  126. // check the mempool for new transactions
  127. newTxs := memR.getNewTxs(height)
  128. txsSentLoop := 0
  129. start := time.Now()
  130. TX_LOOP:
  131. for i := txsSent; i < len(newTxs) && txsSentLoop < txsToSendPerCheck; i++ {
  132. tx := newTxs[i]
  133. msg := &TxMessage{Tx: tx}
  134. success := peer.Send(MempoolChannel, msg)
  135. if !success {
  136. break TX_LOOP
  137. } else {
  138. txsSentLoop += 1
  139. }
  140. }
  141. if txsSentLoop > 0 {
  142. txsSent += txsSentLoop
  143. log.Info("Sent txs to peer", "txsSentLoop", txsSentLoop,
  144. "took", time.Since(start), "txsSent", txsSent, "newTxs", len(newTxs))
  145. }
  146. case ri := <-newBlockChan:
  147. height = ri.Height
  148. // find out how many txs below what we've sent were included in a block and how many became invalid
  149. included := tallyRangesUpTo(ri.Included, txsSent)
  150. invalidated := tallyRangesUpTo(ri.Invalid, txsSent)
  151. txsSent -= included + invalidated
  152. }
  153. }
  154. }
  155. // fetch new txs from the mempool
  156. func (memR *MempoolReactor) getNewTxs(height int) (txs []types.Tx) {
  157. memR.Mempool.mtx.Lock()
  158. defer memR.Mempool.mtx.Unlock()
  159. // if the mempool got ahead of us just return empty txs
  160. if memR.Mempool.state.LastBlockHeight != height {
  161. return
  162. }
  163. return memR.Mempool.txs
  164. }
  165. // return the size of ranges less than upTo
  166. func tallyRangesUpTo(ranger []Range, upTo int) int {
  167. totalUpTo := 0
  168. for _, r := range ranger {
  169. if r.Start >= upTo {
  170. break
  171. }
  172. if r.Start+r.Length >= upTo {
  173. totalUpTo += upTo - r.Start
  174. break
  175. }
  176. totalUpTo += r.Length
  177. }
  178. return totalUpTo
  179. }
// SetFireable implements events.Eventable by storing evsw on the reactor.
func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
	memR.evsw = evsw
}
//-----------------------------------------------------------------------------
// Messages

const (
	// msgTypeTx is the type byte under which *TxMessage is registered with
	// the wire package.
	msgTypeTx = byte(0x01)
)
// MempoolMessage is the interface type under which mempool wire messages are
// registered and decoded.
type MempoolMessage interface{}

// Register the concrete message types of MempoolMessage with the wire
// package; *TxMessage is registered with type byte msgTypeTx.
var _ = wire.RegisterInterface(
	struct{ MempoolMessage }{},
	wire.ConcreteType{&TxMessage{}, msgTypeTx},
)
  194. func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
  195. msgType = bz[0]
  196. n := new(int64)
  197. r := bytes.NewReader(bz)
  198. msg = wire.ReadBinary(struct{ MempoolMessage }{}, r, n, &err).(struct{ MempoolMessage }).MempoolMessage
  199. return
  200. }
//-------------------------------------

// TxMessage is the mempool message that carries a single transaction between
// peers.
type TxMessage struct {
	Tx types.Tx
}
// String returns a short text form of the message, "[TxMessage <tx>]", with
// the tx formatted using %v.
func (m *TxMessage) String() string {
	return fmt.Sprintf("[TxMessage %v]", m.Tx)
}