Browse Source

sync+p2p: remove closer (#7805)

pull/7811/head
Sam Kleinman 2 years ago
committed by GitHub
parent
commit
dbb7d6ecdd
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 62 additions and 138 deletions
  1. +0
    -31
      internal/libs/sync/closer.go
  2. +0
    -28
      internal/libs/sync/closer_test.go
  3. +19
    -28
      internal/p2p/pqueue.go
  4. +12
    -18
      internal/p2p/queue.go
  5. +1
    -2
      internal/p2p/router_filter_test.go
  6. +8
    -8
      internal/p2p/router_test.go
  7. +22
    -23
      internal/p2p/transport_memory.go

+ 0
- 31
internal/libs/sync/closer.go View File

@ -1,31 +0,0 @@
package sync
import "sync"
// Closer implements a primitive to close a channel that signals process
// termination while allowing a caller to call Close multiple times safely. It
// should be used in cases where guarantees cannot be made about when and how
// many times closure is executed.
type Closer struct {
closeOnce sync.Once
doneCh chan struct{}
}
// NewCloser returns a reference to a new Closer.
func NewCloser() *Closer {
return &Closer{doneCh: make(chan struct{})}
}
// Done returns the internal done channel allowing the caller either block or wait
// for the Closer to be terminated/closed.
func (c *Closer) Done() <-chan struct{} {
return c.doneCh
}
// Close gracefully closes the Closer. A caller should only call Close once, but
// it is safe to call it successive times.
func (c *Closer) Close() {
c.closeOnce.Do(func() {
close(c.doneCh)
})
}

+ 0
- 28
internal/libs/sync/closer_test.go View File

@ -1,28 +0,0 @@
package sync_test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
)
func TestCloser(t *testing.T) {
closer := tmsync.NewCloser()
var timeout bool
select {
case <-closer.Done():
case <-time.After(time.Second):
timeout = true
}
for i := 0; i < 10; i++ {
closer.Close()
}
require.True(t, timeout)
<-closer.Done()
}

+ 19
- 28
internal/p2p/pqueue.go View File

@ -5,10 +5,10 @@ import (
"context"
"sort"
"strconv"
"sync"
"time"
"github.com/gogo/protobuf/proto"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
)
@ -78,8 +78,10 @@ type pqScheduler struct {
enqueueCh chan Envelope
dequeueCh chan Envelope
closer *tmsync.Closer
done *tmsync.Closer
closeFn func()
closeCh <-chan struct{}
done chan struct{}
}
func newPQScheduler(
@ -108,6 +110,9 @@ func newPQScheduler(
pq := make(priorityQueue, 0)
heap.Init(&pq)
closeCh := make(chan struct{})
once := &sync.Once{}
return &pqScheduler{
logger: logger.With("router", "scheduler"),
metrics: m,
@ -118,32 +123,18 @@ func newPQScheduler(
sizes: sizes,
enqueueCh: make(chan Envelope, enqueueBuf),
dequeueCh: make(chan Envelope, dequeueBuf),
closer: tmsync.NewCloser(),
done: tmsync.NewCloser(),
closeFn: func() { once.Do(func() { close(closeCh) }) },
closeCh: closeCh,
done: make(chan struct{}),
}
}
func (s *pqScheduler) enqueue() chan<- Envelope {
return s.enqueueCh
}
func (s *pqScheduler) dequeue() <-chan Envelope {
return s.dequeueCh
}
func (s *pqScheduler) close() {
s.closer.Close()
<-s.done.Done()
}
func (s *pqScheduler) closed() <-chan struct{} {
return s.closer.Done()
}
// start starts non-blocking process that starts the priority queue scheduler.
func (s *pqScheduler) start(ctx context.Context) {
go s.process(ctx)
}
func (s *pqScheduler) start(ctx context.Context) { go s.process(ctx) }
func (s *pqScheduler) enqueue() chan<- Envelope { return s.enqueueCh }
func (s *pqScheduler) dequeue() <-chan Envelope { return s.dequeueCh }
func (s *pqScheduler) close() { s.closeFn() }
func (s *pqScheduler) closed() <-chan struct{} { return s.done }
// process starts a block process where we listen for Envelopes to enqueue. If
// there is sufficient capacity, it will be enqueued into the priority queue,
@ -155,7 +146,7 @@ func (s *pqScheduler) start(ctx context.Context) {
// After we attempt to enqueue the incoming Envelope, if the priority queue is
// non-empty, we pop the top Envelope and send it on the dequeueCh.
func (s *pqScheduler) process(ctx context.Context) {
defer s.done.Close()
defer close(s.done)
for {
select {
@ -264,13 +255,13 @@ func (s *pqScheduler) process(ctx context.Context) {
"peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
select {
case s.dequeueCh <- pqEnv.envelope:
case <-s.closer.Done():
case <-s.closeCh:
return
}
}
case <-ctx.Done():
return
case <-s.closer.Done():
case <-s.closeCh:
return
}
}


+ 12
- 18
internal/p2p/queue.go View File

@ -1,7 +1,7 @@
package p2p
import (
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"sync"
)
// default capacity for the size of a queue
@ -32,28 +32,22 @@ type queue interface {
// in the order they were received, and blocks until message is received.
type fifoQueue struct {
queueCh chan Envelope
closer *tmsync.Closer
closeFn func()
closeCh <-chan struct{}
}
func newFIFOQueue(size int) queue {
closeCh := make(chan struct{})
once := &sync.Once{}
return &fifoQueue{
queueCh: make(chan Envelope, size),
closer: tmsync.NewCloser(),
closeFn: func() { once.Do(func() { close(closeCh) }) },
closeCh: closeCh,
}
}
func (q *fifoQueue) enqueue() chan<- Envelope {
return q.queueCh
}
func (q *fifoQueue) dequeue() <-chan Envelope {
return q.queueCh
}
func (q *fifoQueue) close() {
q.closer.Close()
}
func (q *fifoQueue) closed() <-chan struct{} {
return q.closer.Done()
}
func (q *fifoQueue) enqueue() chan<- Envelope { return q.queueCh }
func (q *fifoQueue) dequeue() <-chan Envelope { return q.queueCh }
func (q *fifoQueue) close() { q.closeFn() }
func (q *fifoQueue) closed() <-chan struct{} { return q.closeCh }

+ 1
- 2
internal/p2p/router_filter_test.go View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
)
@ -29,6 +28,6 @@ func TestConnectionFiltering(t *testing.T) {
},
}
require.Equal(t, 0, filterByIPCount)
router.openConnection(ctx, &MemoryConnection{logger: logger, closer: sync.NewCloser()})
router.openConnection(ctx, &MemoryConnection{logger: logger, closeFn: func() {}})
require.Equal(t, 1, filterByIPCount)
}

+ 8
- 8
internal/p2p/router_test.go View File

@ -19,7 +19,6 @@ import (
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/crypto"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/mocks"
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
@ -385,12 +384,12 @@ func TestRouter_AcceptPeers(t *testing.T) {
t.Cleanup(leaktest.Check(t))
// Set up a mock transport that handshakes.
closer := tmsync.NewCloser()
connCtx, connCancel := context.WithCancel(context.Background())
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
Return(tc.peerInfo, tc.peerKey, nil)
mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil).Maybe()
mockConnection.On("Close").Run(func(_ mock.Arguments) { connCancel() }).Return(nil).Maybe()
mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{})
if tc.ok {
mockConnection.On("ReceiveMessage", mock.Anything).Return(chID, nil, io.EOF).Maybe()
@ -433,7 +432,7 @@ func TestRouter_AcceptPeers(t *testing.T) {
time.Sleep(time.Millisecond)
} else {
select {
case <-closer.Done():
case <-connCtx.Done():
case <-time.After(100 * time.Millisecond):
require.Fail(t, "connection not closed")
}
@ -620,13 +619,14 @@ func TestRouter_DialPeers(t *testing.T) {
endpoint := p2p.Endpoint{Protocol: "mock", Path: string(tc.dialID)}
// Set up a mock transport that handshakes.
closer := tmsync.NewCloser()
connCtx, connCancel := context.WithCancel(context.Background())
defer connCancel()
mockConnection := &mocks.Connection{}
mockConnection.On("String").Maybe().Return("mock")
if tc.dialErr == nil {
mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey).
Return(tc.peerInfo, tc.peerKey, nil)
mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil).Maybe()
mockConnection.On("Close").Run(func(_ mock.Arguments) { connCancel() }).Return(nil).Maybe()
}
if tc.ok {
mockConnection.On("ReceiveMessage", mock.Anything).Return(chID, nil, io.EOF).Maybe()
@ -644,7 +644,7 @@ func TestRouter_DialPeers(t *testing.T) {
mockTransport.On("Dial", mock.Anything, endpoint).Maybe().Return(nil, io.EOF)
} else {
mockTransport.On("Dial", mock.Anything, endpoint).Once().
Run(func(_ mock.Arguments) { closer.Close() }).
Run(func(_ mock.Arguments) { connCancel() }).
Return(nil, tc.dialErr)
}
@ -681,7 +681,7 @@ func TestRouter_DialPeers(t *testing.T) {
time.Sleep(time.Millisecond)
} else {
select {
case <-closer.Done():
case <-connCtx.Done():
case <-time.After(100 * time.Millisecond):
require.Fail(t, "connection not closed")
}


+ 22
- 23
internal/p2p/transport_memory.go View File

@ -9,7 +9,6 @@ import (
"sync"
"github.com/tendermint/tendermint/crypto"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
@ -175,10 +174,17 @@ func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connecti
inCh := make(chan memoryMessage, t.bufferSize)
outCh := make(chan memoryMessage, t.bufferSize)
closer := tmsync.NewCloser()
outConn := newMemoryConnection(t.logger, t.nodeID, peer.nodeID, inCh, outCh, closer)
inConn := newMemoryConnection(peer.logger, peer.nodeID, t.nodeID, outCh, inCh, closer)
once := &sync.Once{}
closeCh := make(chan struct{})
closeFn := func() { once.Do(func() { close(closeCh) }) }
outConn := newMemoryConnection(t.logger, t.nodeID, peer.nodeID, inCh, outCh)
outConn.closeCh = closeCh
outConn.closeFn = closeFn
inConn := newMemoryConnection(peer.logger, peer.nodeID, t.nodeID, outCh, inCh)
inConn.closeCh = closeCh
inConn.closeFn = closeFn
select {
case peer.acceptCh <- inConn:
@ -202,7 +208,9 @@ type MemoryConnection struct {
receiveCh <-chan memoryMessage
sendCh chan<- memoryMessage
closer *tmsync.Closer
closeFn func()
closeCh <-chan struct{}
}
// memoryMessage is passed internally, containing either a message or handshake.
@ -222,7 +230,6 @@ func newMemoryConnection(
remoteID types.NodeID,
receiveCh <-chan memoryMessage,
sendCh chan<- memoryMessage,
closer *tmsync.Closer,
) *MemoryConnection {
return &MemoryConnection{
logger: logger.With("remote", remoteID),
@ -230,7 +237,6 @@ func newMemoryConnection(
remoteID: remoteID,
receiveCh: receiveCh,
sendCh: sendCh,
closer: closer,
}
}
@ -264,7 +270,7 @@ func (c *MemoryConnection) Handshake(
select {
case c.sendCh <- memoryMessage{nodeInfo: &nodeInfo, pubKey: privKey.PubKey()}:
c.logger.Debug("sent handshake", "nodeInfo", nodeInfo)
case <-c.closer.Done():
case <-c.closeCh:
return types.NodeInfo{}, nil, io.EOF
case <-ctx.Done():
return types.NodeInfo{}, nil, ctx.Err()
@ -277,7 +283,7 @@ func (c *MemoryConnection) Handshake(
}
c.logger.Debug("received handshake", "peerInfo", msg.nodeInfo)
return *msg.nodeInfo, msg.pubKey, nil
case <-c.closer.Done():
case <-c.closeCh:
return types.NodeInfo{}, nil, io.EOF
case <-ctx.Done():
return types.NodeInfo{}, nil, ctx.Err()
@ -289,7 +295,7 @@ func (c *MemoryConnection) ReceiveMessage(ctx context.Context) (ChannelID, []byt
// Check close first, since channels are buffered. Otherwise, below select
// may non-deterministically return non-error even when closed.
select {
case <-c.closer.Done():
case <-c.closeCh:
return 0, nil, io.EOF
case <-ctx.Done():
return 0, nil, io.EOF
@ -300,7 +306,9 @@ func (c *MemoryConnection) ReceiveMessage(ctx context.Context) (ChannelID, []byt
case msg := <-c.receiveCh:
c.logger.Debug("received message", "chID", msg.channelID, "msg", msg.message)
return msg.channelID, msg.message, nil
case <-c.closer.Done():
case <-ctx.Done():
return 0, nil, io.EOF
case <-c.closeCh:
return 0, nil, io.EOF
}
}
@ -310,7 +318,7 @@ func (c *MemoryConnection) SendMessage(ctx context.Context, chID ChannelID, msg
// Check close first, since channels are buffered. Otherwise, below select
// may non-deterministically return non-error even when closed.
select {
case <-c.closer.Done():
case <-c.closeCh:
return io.EOF
case <-ctx.Done():
return io.EOF
@ -323,19 +331,10 @@ func (c *MemoryConnection) SendMessage(ctx context.Context, chID ChannelID, msg
return nil
case <-ctx.Done():
return io.EOF
case <-c.closer.Done():
case <-c.closeCh:
return io.EOF
}
}
// Close implements Connection.
func (c *MemoryConnection) Close() error {
select {
case <-c.closer.Done():
return nil
default:
c.closer.Close()
c.logger.Info("closed connection")
}
return nil
}
func (c *MemoryConnection) Close() error { c.closeFn(); return nil }

Loading…
Cancel
Save