|
@ -15,9 +15,6 @@ import ( |
|
|
|
|
|
|
|
|
const ( |
|
|
const ( |
|
|
MemoryProtocol Protocol = "memory" |
|
|
MemoryProtocol Protocol = "memory" |
|
|
|
|
|
|
|
|
// bufferSize is the channel buffer size of MemoryConnection.
|
|
|
|
|
|
bufferSize = 1 |
|
|
|
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
// MemoryNetwork is an in-memory "network" that uses buffered Go channels to
|
|
|
// MemoryNetwork is an in-memory "network" that uses buffered Go channels to
|
|
@ -30,11 +27,13 @@ type MemoryNetwork struct { |
|
|
|
|
|
|
|
|
mtx sync.RWMutex |
|
|
mtx sync.RWMutex |
|
|
transports map[NodeID]*MemoryTransport |
|
|
transports map[NodeID]*MemoryTransport |
|
|
|
|
|
bufferSize int |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// NewMemoryNetwork creates a new in-memory network.
|
|
|
// NewMemoryNetwork creates a new in-memory network.
|
|
|
func NewMemoryNetwork(logger log.Logger) *MemoryNetwork { |
|
|
|
|
|
|
|
|
func NewMemoryNetwork(logger log.Logger, bufferSize int) *MemoryNetwork { |
|
|
return &MemoryNetwork{ |
|
|
return &MemoryNetwork{ |
|
|
|
|
|
bufferSize: bufferSize, |
|
|
logger: logger, |
|
|
logger: logger, |
|
|
transports: map[NodeID]*MemoryTransport{}, |
|
|
transports: map[NodeID]*MemoryTransport{}, |
|
|
} |
|
|
} |
|
@ -89,9 +88,10 @@ func (n *MemoryNetwork) Size() int { |
|
|
// New transports are allocated with MemoryNetwork.CreateTransport(). To contact
|
|
|
// New transports are allocated with MemoryNetwork.CreateTransport(). To contact
|
|
|
// a different endpoint, both transports must be in the same MemoryNetwork.
|
|
|
// a different endpoint, both transports must be in the same MemoryNetwork.
|
|
|
type MemoryTransport struct { |
|
|
type MemoryTransport struct { |
|
|
logger log.Logger |
|
|
|
|
|
network *MemoryNetwork |
|
|
|
|
|
nodeID NodeID |
|
|
|
|
|
|
|
|
logger log.Logger |
|
|
|
|
|
network *MemoryNetwork |
|
|
|
|
|
nodeID NodeID |
|
|
|
|
|
bufferSize int |
|
|
|
|
|
|
|
|
acceptCh chan *MemoryConnection |
|
|
acceptCh chan *MemoryConnection |
|
|
closeCh chan struct{} |
|
|
closeCh chan struct{} |
|
@ -102,12 +102,12 @@ type MemoryTransport struct { |
|
|
// MemoryNetwork, use MemoryNetwork.CreateTransport() instead.
|
|
|
// MemoryNetwork, use MemoryNetwork.CreateTransport() instead.
|
|
|
func newMemoryTransport(network *MemoryNetwork, nodeID NodeID) *MemoryTransport { |
|
|
func newMemoryTransport(network *MemoryNetwork, nodeID NodeID) *MemoryTransport { |
|
|
return &MemoryTransport{ |
|
|
return &MemoryTransport{ |
|
|
logger: network.logger.With("local", nodeID), |
|
|
|
|
|
network: network, |
|
|
|
|
|
nodeID: nodeID, |
|
|
|
|
|
|
|
|
|
|
|
acceptCh: make(chan *MemoryConnection), |
|
|
|
|
|
closeCh: make(chan struct{}), |
|
|
|
|
|
|
|
|
logger: network.logger.With("local", nodeID), |
|
|
|
|
|
network: network, |
|
|
|
|
|
nodeID: nodeID, |
|
|
|
|
|
bufferSize: network.bufferSize, |
|
|
|
|
|
acceptCh: make(chan *MemoryConnection), |
|
|
|
|
|
closeCh: make(chan struct{}), |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -164,8 +164,8 @@ func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connecti |
|
|
return nil, fmt.Errorf("unknown peer %q", nodeID) |
|
|
return nil, fmt.Errorf("unknown peer %q", nodeID) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
inCh := make(chan memoryMessage, bufferSize) |
|
|
|
|
|
outCh := make(chan memoryMessage, bufferSize) |
|
|
|
|
|
|
|
|
inCh := make(chan memoryMessage, t.bufferSize) |
|
|
|
|
|
outCh := make(chan memoryMessage, t.bufferSize) |
|
|
closer := tmsync.NewCloser() |
|
|
closer := tmsync.NewCloser() |
|
|
|
|
|
|
|
|
outConn := newMemoryConnection(t.logger, t.nodeID, peer.nodeID, inCh, outCh, closer) |
|
|
outConn := newMemoryConnection(t.logger, t.nodeID, peer.nodeID, inCh, outCh, closer) |
|
|