Browse Source

rewrite broadcastTxRoutine to use channels

https://play.golang.org/p/gN21yO9IRs3

```
func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement {
	el := make(chan *clist.CElement, 1)
	select {
	case el <- f():
```
will just run f() blockingly, so this doesn't change much in terms of behavior.
pull/1173/head
Anton Kaliaev 7 years ago
parent
commit
11b68f1934
No known key found for this signature in database GPG Key ID: 7B6881D965918214
7 changed files with 58 additions and 50 deletions
  1. +7
    -5
      blockchain/reactor_test.go
  2. +9
    -6
      glide.lock
  3. +5
    -1
      glide.yaml
  4. +11
    -4
      mempool/mempool.go
  5. +19
    -34
      mempool/reactor.go
  6. +6
    -0
      p2p/peer.go
  7. +1
    -0
      p2p/pex/pex_reactor_test.go

+ 7
- 5
blockchain/reactor_test.go View File

@ -157,7 +157,7 @@ func makeBlock(height int64, state sm.State) *types.Block {
// The Test peer
type bcrTestPeer struct {
cmn.Service
*cmn.BaseService
id p2p.ID
ch chan interface{}
}
@ -165,11 +165,12 @@ type bcrTestPeer struct {
var _ p2p.Peer = (*bcrTestPeer)(nil)
func newbcrTestPeer(id p2p.ID) *bcrTestPeer {
return &bcrTestPeer{
Service: cmn.NewBaseService(nil, "bcrTestPeer", nil),
id: id,
ch: make(chan interface{}, 2),
bcr := &bcrTestPeer{
id: id,
ch: make(chan interface{}, 2),
}
bcr.BaseService = cmn.NewBaseService(nil, "bcrTestPeer", bcr)
return bcr
}
func (tp *bcrTestPeer) lastValue() interface{} { return <-tp.ch }
@ -195,3 +196,4 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false }
func (tp *bcrTestPeer) IsPersistent() bool { return true }
func (tp *bcrTestPeer) Get(s string) interface{} { return s }
func (tp *bcrTestPeer) Set(string, interface{}) {}
func (tp *bcrTestPeer) QuitChan() <-chan struct{} { return tp.Quit }

+ 9
- 6
glide.lock View File

@ -1,5 +1,5 @@
hash: 78f23456c3ca7af06fc26e59107de92a7b208776643bda398b0a05348153da1b
updated: 2018-02-03T03:31:46.976175875-05:00
hash: 94a3f8a3cf531e0cdde8bc160a2f4bab6f269d99a9a9633404e5badb0481f02c
updated: 2018-02-05T10:04:25.7693634Z
imports:
- name: github.com/btcsuite/btcd
version: 50de9da05b50eb15658bb350f6ea24368a111ab7
@ -116,9 +116,12 @@ imports:
version: b6fc872b42d41158a60307db4da051dd6f179415
subpackages:
- data
- nowriter/tmlegacy
- name: github.com/tendermint/iavl
version: 1a59ec0c82dc940c25339dd7c834df5cb76a95cb
subpackages:
- iavl
- name: github.com/tendermint/tmlibs
version: deaaf014d8b8d1095054380a38b1b00e293f725f
version: 51684dabf79c2079f32cc25d6bccb748ee098386
subpackages:
- autofile
- cli
@ -194,6 +197,8 @@ testImports:
version: 346938d642f2ec3594ed81d874461961cd0faa76
subpackages:
- spew
- name: github.com/fortytw2/leaktest
version: 3b724c3d7b8729a35bf4e577f71653aec6e53513
- name: github.com/pmezard/go-difflib
version: 792786c7400a136282c1664665ae0a8db921c6c2
subpackages:
@ -203,5 +208,3 @@ testImports:
subpackages:
- assert
- require
- name: github.com/fortytw2/leaktest
version: 3b724c3d7b8729a35bf4e577f71653aec6e53513

+ 5
- 1
glide.yaml View File

@ -29,9 +29,13 @@ import:
version: master
subpackages:
- data
- package: github.com/tendermint/tmlibs
- package: github.com/tendermint/iavl
version: develop
subpackages:
- iavl
- package: github.com/tendermint/tmlibs
version: 51684dabf79c2079f32cc25d6bccb748ee098386
subpackages:
- autofile
- cli
- cli/flags


+ 11
- 4
mempool/mempool.go View File

@ -178,10 +178,17 @@ func (mem *Mempool) Flush() {
}
}
// TxsFrontWait returns the first transaction in the ordered list for peer goroutines to call .NextWait() on.
// It blocks until the mempool is not empty (ie. until the internal `mem.txs` has at least one element)
func (mem *Mempool) TxsFrontWait() *clist.CElement {
return mem.txs.FrontWait()
// TxsFront returns the first transaction in the ordered list for peer
// goroutines to call .NextWait() on.
func (mem *Mempool) TxsFront() *clist.CElement {
return mem.txs.Front()
}
// TxsWaitChan returns a channel to wait on transactions. It will be closed
// once the mempool is not empty (ie. the internal `mem.txs` has at least one
// element)
func (mem *Mempool) TxsWaitChan() <-chan struct{} {
return mem.txs.WaitChan()
}
// CheckTx executes a new transaction against the application to determine its validity


+ 19
- 34
mempool/reactor.go View File

@ -2,7 +2,6 @@ package mempool
import (
"bytes"
"context"
"fmt"
"reflect"
"time"
@ -107,30 +106,20 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
return
}
// used to abort waiting until a tx available
// otherwise TxsFrontWait/NextWait could block forever if there are
// no txs
ctx, cancel := context.WithCancel(context.Background())
go func() {
const healthCheckInterval = 5 * time.Second
for {
if !memR.IsRunning() || !peer.IsRunning() {
cancel()
return
}
time.Sleep(healthCheckInterval)
}
}()
var next *clist.CElement
for {
// This happens because the CElement we were looking at got
// garbage collected (removed). That is, .NextWait() returned nil.
// Go ahead and start from the beginning.
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
if next == nil {
// Wait until a tx is available
next = waitWithCancel(memR.Mempool.TxsFrontWait, ctx)
if ctx.Err() != nil {
select {
case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available
if next = memR.Mempool.TxsFront(); next == nil {
continue
}
case <-peer.QuitChan():
return
case <-memR.Quit:
return
}
}
@ -152,23 +141,19 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
}
next = waitWithCancel(next.NextWait, ctx)
if ctx.Err() != nil {
select {
case <-next.NextWaitChan():
// see the start of the for loop for nil check
next = next.Next()
case <-peer.QuitChan():
return
case <-memR.Quit:
return
}
}
}
func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement {
el := make(chan *clist.CElement, 1)
select {
case el <- f():
return <-el
case <-ctx.Done():
return nil
}
}
//-----------------------------------------------------------------------------
// Messages


+ 6
- 0
p2p/peer.go View File

@ -18,6 +18,7 @@ import (
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
cmn.Service
QuitChan() <-chan struct{}
ID() ID // peer's cryptographic ID
IsOutbound() bool // did we dial the peer
@ -331,6 +332,11 @@ func (p *peer) String() string {
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
}
// QuitChan returns a channel, which will be closed once peer is stopped.
func (p *peer) QuitChan() <-chan struct{} {
return p.Quit
}
//------------------------------------------------------------------
// helper funcs


+ 1
- 0
p2p/pex/pex_reactor_test.go View File

@ -368,3 +368,4 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false }
func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
func (mp mockPeer) Set(string, interface{}) {}
func (mp mockPeer) Get(string) interface{} { return nil }
func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit }

Loading…
Cancel
Save