diff --git a/connection.go b/connection.go index 8afb3f9e9..e61608896 100644 --- a/connection.go +++ b/connection.go @@ -11,7 +11,6 @@ import ( "time" cmn "github.com/tendermint/go-common" - cfg "github.com/tendermint/go-config" flow "github.com/tendermint/go-flowrate/flowrate" wire "github.com/tendermint/go-wire" ) @@ -25,8 +24,10 @@ const ( flushThrottle = 100 * time.Millisecond defaultSendQueueCapacity = 1 + defaultSendRate = int64(512000) // 500KB/s defaultRecvBufferCapacity = 4096 - defaultRecvMessageCapacity = 22020096 // 21MB + defaultRecvMessageCapacity = 22020096 // 21MB + defaultRecvRate = int64(512000) // 500KB/s defaultSendTimeout = 10 * time.Second ) @@ -66,8 +67,6 @@ type MConnection struct { bufWriter *bufio.Writer sendMonitor *flow.Monitor recvMonitor *flow.Monitor - sendRate int64 - recvRate int64 send chan struct{} pong chan struct{} channels []*Channel @@ -75,6 +74,7 @@ type MConnection struct { onReceive receiveCbFunc onError errorCbFunc errored uint32 + config *MConnectionConfig quit chan struct{} flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. @@ -85,25 +85,38 @@ type MConnection struct { RemoteAddress *NetAddress } -func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { +// MConnectionConfig is a MConnection configuration +type MConnectionConfig struct { + SendRate int64 + RecvRate int64 +} + +// NewMConnection wraps net.Conn and creates multiplex connection +func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { + return NewMConnectionWithConfig( + conn, + chDescs, + onReceive, + onError, + &MConnectionConfig{ + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + }) +} + +// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config +func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnectionConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), sendMonitor: flow.New(0, 0), recvMonitor: flow.New(0, 0), - sendRate: int64(config.GetInt(configKeySendRate)), - recvRate: int64(config.GetInt(configKeyRecvRate)), send: make(chan struct{}, 1), pong: make(chan struct{}), onReceive: onReceive, onError: onError, - - // Initialized in Start() - quit: nil, - flushTimer: nil, - pingTimer: nil, - chStatsTimer: nil, + config: config, LocalAddress: NewNetAddress(conn.LocalAddr()), RemoteAddress: NewNetAddress(conn.RemoteAddr()), @@ -313,7 +326,7 @@ func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. - c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true) + c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { @@ -371,7 +384,7 @@ func (c *MConnection) recvRoutine() { FOR_LOOP: for { // Block until .recvMonitor says we can read. - c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true) + c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) /* // Peek into bufReader for debugging diff --git a/connection_test.go b/connection_test.go index 79a7227a2..cd6bb4a8f 100644 --- a/connection_test.go +++ b/connection_test.go @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - cfg "github.com/tendermint/go-config" p2p "github.com/tendermint/go-p2p" ) @@ -20,10 +19,8 @@ func createMConnection(conn net.Conn) *p2p.MConnection { } func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection { - config := cfg.NewMapConfig(map[string]interface{}{"send_rate": 512000, "recv_rate": 512000}) chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1}} - - return p2p.NewMConnection(config, conn, chDescs, onReceive, onError) + return p2p.NewMConnection(conn, chDescs, onReceive, onError) } func TestMConnectionSend(t *testing.T) { @@ -104,7 +101,7 @@ func TestMConnectionStatus(t *testing.T) { assert.Zero(status.Channels[0].SendQueueSize) } -func TestMConnectionNonPersistent(t *testing.T) { +func TestMConnectionStopsAndReturnsError(t *testing.T) { assert, require := assert.New(t), require.New(t) server, client := net.Pipe() diff --git a/peer.go b/peer.go index 7a2d647f8..6b1b47a1f 100644 --- a/peer.go +++ b/peer.go @@ -61,7 +61,11 @@ func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound p.Stop() onPeerError(p, r) } - mconn := NewMConnection(config, conn, chDescs, onReceive, onError) + mconnConfig := &MConnectionConfig{ + SendRate: int64(config.GetInt(configKeySendRate)), + RecvRate: int64(config.GetInt(configKeyRecvRate)), + } + mconn := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, mconnConfig) p = &Peer{ outbound: outbound, mconn: mconn, diff --git a/switch.go b/switch.go index eed8ceea5..78a3020ed 100644 --- a/switch.go +++ b/switch.go @@ -193,7 +193,7 @@ func (sw *Switch) OnStop() { } // NOTE: This performs a blocking handshake before the peer is added. -// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed. +// CONTRACT: If error is returned, peer is nil, and conn is immediately closed. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { // Filter by addr (ie. ip:port)