You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

275 lines
7.1 KiB

9 years ago
  1. package mempool
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. "time"
  7. acm "github.com/tendermint/tendermint/account"
  8. _ "github.com/tendermint/tendermint/config/tendermint_test"
  9. sm "github.com/tendermint/tendermint/state"
  10. "github.com/tendermint/tendermint/types"
  11. )
  // Package-level fixtures shared by the mempool tests.
  var (
  	// someAddr is a fixed 20-byte destination address; addTxs uses it as the
  	// recipient of the sends made from the spare account.
  	someAddr = []byte("ABCDEFGHIJABCDEFGHIJ")

  	// nTxs is the number of txs addTxs puts into the mempool.
  	nTxs = 100
  )
  15. // what the ResetInfo should look like after ResetForBlockAndState
  16. var TestResetInfoData = ResetInfo{
  17. Included: []Range{
  18. Range{0, 5},
  19. Range{10, 10},
  20. Range{30, 5},
  21. },
  22. Invalid: []Range{
  23. Range{5, 5},
  24. Range{20, 8}, // let 28 and 29 be valid
  25. Range{35, 64}, // let 99 be valid
  26. },
  27. }
  // notInvalidNotIncluded is the complement of TestResetInfoData: the tx
  // indices that fall in neither its Included nor its Invalid ranges. addTxs
  // sends these txs from the spare account.
  var notInvalidNotIncluded = map[int]struct{}{
  	28: {},
  	29: {},
  	99: {},
  }
  34. func newSendTx(t *testing.T, mempool *Mempool, from *acm.PrivAccount, to []byte, amt int64) types.Tx {
  35. tx := types.NewSendTx()
  36. tx.AddInput(mempool.GetCache(), from.PubKey, amt)
  37. tx.AddOutput(to, amt)
  38. tx.SignInput(config.GetString("chain_id"), 0, from)
  39. if err := mempool.AddTx(tx); err != nil {
  40. t.Fatal(err)
  41. }
  42. return tx
  43. }
  44. func addTxs(t *testing.T, mempool *Mempool, lastAcc *acm.PrivAccount, privAccs []*acm.PrivAccount) []types.Tx {
  45. txs := make([]types.Tx, nTxs)
  46. for i := 0; i < nTxs; i++ {
  47. if _, ok := notInvalidNotIncluded[i]; ok {
  48. txs[i] = newSendTx(t, mempool, lastAcc, someAddr, 10)
  49. } else {
  50. txs[i] = newSendTx(t, mempool, privAccs[i%len(privAccs)], privAccs[(i+1)%len(privAccs)].Address, 5)
  51. }
  52. }
  53. return txs
  54. }
  55. func makeBlock(mempool *Mempool) *types.Block {
  56. txs := mempool.GetProposalTxs()
  57. var includedTxs []types.Tx
  58. for _, rid := range TestResetInfoData.Included {
  59. includedTxs = append(includedTxs, txs[rid.Start:rid.Start+rid.Length]...)
  60. }
  61. mempool.mtx.Lock()
  62. state := mempool.state
  63. state.LastBlockHeight += 1
  64. mempool.mtx.Unlock()
  65. return &types.Block{
  66. Header: &types.Header{
  67. ChainID: state.ChainID,
  68. Height: state.LastBlockHeight,
  69. NumTxs: len(includedTxs),
  70. },
  71. Data: &types.Data{
  72. Txs: includedTxs,
  73. },
  74. }
  75. }
  76. // Add txs. Grab chunks to put in block. All the others become invalid because of nonce errors except those in notInvalidNotIncluded
  77. func TestResetInfo(t *testing.T) {
  78. amtPerAccount := int64(100000)
  79. state, privAccs, _ := sm.RandGenesisState(6, false, amtPerAccount, 1, true, 100)
  80. mempool := NewMempool(state)
  81. lastAcc := privAccs[5] // we save him (his tx wont become invalid)
  82. privAccs = privAccs[:5]
  83. txs := addTxs(t, mempool, lastAcc, privAccs)
  84. // its actually an invalid block since we're skipping nonces
  85. // but all we care about is how the mempool responds after
  86. block := makeBlock(mempool)
  87. mempool.ResetForBlockAndState(block, state)
  88. ri := mempool.resetInfo
  89. if len(ri.Included) != len(TestResetInfoData.Included) {
  90. t.Fatalf("invalid number of included ranges. Got %d, expected %d\n", len(ri.Included), len(TestResetInfoData.Included))
  91. }
  92. if len(ri.Invalid) != len(TestResetInfoData.Invalid) {
  93. t.Fatalf("invalid number of invalid ranges. Got %d, expected %d\n", len(ri.Invalid), len(TestResetInfoData.Invalid))
  94. }
  95. for i, rid := range ri.Included {
  96. inc := TestResetInfoData.Included[i]
  97. if rid.Start != inc.Start {
  98. t.Fatalf("Invalid start of range. Got %d, expected %d\n", inc.Start, rid.Start)
  99. }
  100. if rid.Length != inc.Length {
  101. t.Fatalf("Invalid length of range. Got %d, expected %d\n", inc.Length, rid.Length)
  102. }
  103. }
  104. txs = mempool.GetProposalTxs()
  105. if len(txs) != len(notInvalidNotIncluded) {
  106. t.Fatalf("Expected %d txs left in mempool. Got %d", len(notInvalidNotIncluded), len(txs))
  107. }
  108. }
  109. //------------------------------------------------------------------------------------------
  110. type TestPeer struct {
  111. sync.Mutex
  112. running bool
  113. height int
  114. t *testing.T
  115. received int
  116. txs map[string]int
  117. timeoutFail int
  118. done chan int
  119. }
  120. func newPeer(t *testing.T, state *sm.State) *TestPeer {
  121. return &TestPeer{
  122. running: true,
  123. height: state.LastBlockHeight,
  124. t: t,
  125. txs: make(map[string]int),
  126. done: make(chan int),
  127. }
  128. }
  129. func (tp *TestPeer) IsRunning() bool {
  130. tp.Lock()
  131. defer tp.Unlock()
  132. return tp.running
  133. }
  134. func (tp *TestPeer) SetRunning(running bool) {
  135. tp.Lock()
  136. defer tp.Unlock()
  137. tp.running = running
  138. }
  139. func (tp *TestPeer) Send(chID byte, msg interface{}) bool {
  140. if tp.timeoutFail > 0 {
  141. time.Sleep(time.Second * time.Duration(tp.timeoutFail))
  142. return false
  143. }
  144. tx := msg.(*TxMessage).Tx
  145. id := types.TxID(config.GetString("chain_id"), tx)
  146. if _, ok := tp.txs[string(id)]; ok {
  147. tp.t.Fatal("received the same tx twice!")
  148. }
  149. tp.txs[string(id)] = tp.received
  150. tp.received += 1
  151. tp.done <- tp.received
  152. return true
  153. }
  154. func (tp *TestPeer) Get(key string) interface{} {
  155. return tp
  156. }
  157. func (tp *TestPeer) GetHeight() int {
  158. return tp.height
  159. }
  160. func TestBroadcast(t *testing.T) {
  161. state, privAccs, _ := sm.RandGenesisState(6, false, 10000, 1, true, 100)
  162. mempool := NewMempool(state)
  163. reactor := NewMempoolReactor(mempool)
  164. reactor.Start()
  165. lastAcc := privAccs[5] // we save him (his tx wont become invalid)
  166. privAccs = privAccs[:5]
  167. peer := newPeer(t, state)
  168. newBlockChan := make(chan ResetInfo)
  169. tickerChan := make(chan time.Time)
  170. go reactor.broadcastTxRoutine(tickerChan, newBlockChan, peer)
  171. // we don't broadcast any before updating
  172. fmt.Println("dont broadcast any")
  173. addTxs(t, mempool, lastAcc, privAccs)
  174. block := makeBlock(mempool)
  175. mempool.ResetForBlockAndState(block, state)
  176. newBlockChan <- mempool.resetInfo
  177. peer.height = mempool.resetInfo.Height
  178. tickerChan <- time.Now()
  179. pullTxs(t, peer, len(mempool.txs)) // should have sent whatever txs are left (3)
  180. toBroadcast := []int{1, 3, 7, 9, 11, 12, 18, 20, 21, 28, 29, 30, 31, 34, 35, 36, 50, 90, 99, 100}
  181. for _, N := range toBroadcast {
  182. peer = resetPeer(t, reactor, mempool, state, tickerChan, newBlockChan, peer)
  183. // we broadcast N txs before updating
  184. fmt.Println("broadcast", N)
  185. addTxs(t, mempool, lastAcc, privAccs)
  186. txsToSendPerCheck = N
  187. tickerChan <- time.Now()
  188. pullTxs(t, peer, txsToSendPerCheck) // should have sent N txs
  189. block = makeBlock(mempool)
  190. mempool.ResetForBlockAndState(block, state)
  191. newBlockChan <- mempool.resetInfo
  192. peer.height = mempool.resetInfo.Height
  193. txsToSendPerCheck = 100
  194. tickerChan <- time.Now()
  195. left := len(mempool.txs)
  196. if N > 99 {
  197. left -= 3
  198. } else if N > 29 {
  199. left -= 2
  200. } else if N > 28 {
  201. left -= 1
  202. }
  203. pullTxs(t, peer, left) // should have sent whatever txs are left that havent been sent
  204. }
  205. }
  206. func pullTxs(t *testing.T, peer *TestPeer, N int) {
  207. timer := time.NewTicker(time.Second * 2)
  208. for i := 0; i < N; i++ {
  209. select {
  210. case <-peer.done:
  211. case <-timer.C:
  212. panic(fmt.Sprintf("invalid number of received messages. Got %d, expected %d\n", i, N))
  213. }
  214. }
  215. if N == 0 {
  216. select {
  217. case <-peer.done:
  218. t.Fatalf("should not have sent any more txs")
  219. case <-timer.C:
  220. }
  221. }
  222. }
  223. func resetPeer(t *testing.T, reactor *MempoolReactor, mempool *Mempool, state *sm.State, tickerChan chan time.Time, newBlockChan chan ResetInfo, peer *TestPeer) *TestPeer {
  224. // reset peer
  225. mempool.txs = []types.Tx{}
  226. mempool.state = state
  227. mempool.cache = sm.NewBlockCache(state)
  228. peer.SetRunning(false)
  229. tickerChan <- time.Now()
  230. peer = newPeer(t, state)
  231. go reactor.broadcastTxRoutine(tickerChan, newBlockChan, peer)
  232. return peer
  233. }