Browse Source

No global config

pull/456/head
Jae Kwon 9 years ago
parent
commit
7d997ca8e6
4 changed files with 57 additions and 69 deletions
  1. +20
    -25
      config.go
  2. +21
    -28
      connection.go
  3. +3
    -2
      peer.go
  4. +13
    -14
      switch.go

+ 20
- 25
config.go View File

@ -4,31 +4,26 @@ import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
) )
var config cfg.Config = nil
func init() {
initConfigureable(dialTimeoutKey, 3)
initConfigureable(handshakeTimeoutKey, 20)
initConfigureable(maxNumPeersKey, 50)
initConfigureable(sendRateKey, 512000) // 500KB/s
initConfigureable(recvRateKey, 512000) // 500KB/s
initConfigureable(maxPayloadSizeKey, 1024)
initConfigureable(authEncKey, true)
cfg.OnConfig(func(newConfig cfg.Config) {
config = newConfig
// fill in any config values that might be missing
for key, value := range defaultConfigValues {
config.SetDefault(key, value)
}
})
}
const (
// Switch config keys
configKeyDialTimeoutSeconds = "p2p_dial_timeout_seconds"
configKeyHandshakeTimeoutSeconds = "p2p_handshake_timeout_seconds"
configKeyMaxNumPeers = "p2p_max_num_peers"
configKeyAuthEnc = "p2p_authenticated_encryption"
// MConnection config keys
configKeySendRate = "p2p_send_rate"
configKeyRecvRate = "p2p_recv_rate"
)
// default config map
var defaultConfigValues = make(map[string]interface{})
func setConfigDefaults(config cfg.Config) {
// Switch default config
config.SetDefault(configKeyDialTimeoutSeconds, 3)
config.SetDefault(configKeyHandshakeTimeoutSeconds, 20)
config.SetDefault(configKeyMaxNumPeers, 50)
config.SetDefault(configKeyAuthEnc, true)
func initConfigureable(key string, value interface{}) {
defaultConfigValues[key] = value
// MConnection default config
config.SetDefault(configKeySendRate, 512000) // 500KB/s
config.SetDefault(configKeyRecvRate, 512000) // 500KB/s
} }

+ 21
- 28
connection.go View File

@ -12,30 +12,25 @@ import (
flow "github.com/tendermint/flowcontrol" flow "github.com/tendermint/flowcontrol"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-wire" //"github.com/tendermint/log15" "github.com/tendermint/go-wire" //"github.com/tendermint/log15"
) )
const ( const (
numBatchMsgPackets = 10
minReadBufferSize = 1024
minWriteBufferSize = 65536
idleTimeoutMinutes = 5
updateStatsSeconds = 2
pingTimeoutSeconds = 40
flushThrottleMS = 100
numBatchMsgPackets = 10
minReadBufferSize = 1024
minWriteBufferSize = 65536
idleTimeoutMinutes = 5
updateStatsSeconds = 2
pingTimeoutSeconds = 40
flushThrottleMS = 100
defaultSendQueueCapacity = 1 defaultSendQueueCapacity = 1
defaultRecvBufferCapacity = 4096 defaultRecvBufferCapacity = 4096
defaultRecvMessageCapacity = 22020096 // 21MB defaultRecvMessageCapacity = 22020096 // 21MB
defaultSendTimeoutSeconds = 10 defaultSendTimeoutSeconds = 10
) )
// config keys
const (
sendRateKey = "p2p_send_rate"
recvRateKey = "p2p_recv_rate"
maxPayloadSizeKey = "p2p_max_msg_packet_payload_size"
)
type receiveCbFunc func(chID byte, msgBytes []byte) type receiveCbFunc func(chID byte, msgBytes []byte)
type errorCbFunc func(interface{}) type errorCbFunc func(interface{})
@ -91,7 +86,8 @@ type MConnection struct {
RemoteAddress *NetAddress RemoteAddress *NetAddress
} }
func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
func NewMConnection(config cfg.Config, conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
setConfigDefaults(config)
mconn := &MConnection{ mconn := &MConnection{
conn: conn, conn: conn,
@ -99,8 +95,8 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
sendMonitor: flow.New(0, 0), sendMonitor: flow.New(0, 0),
recvMonitor: flow.New(0, 0), recvMonitor: flow.New(0, 0),
sendRate: int64(config.GetInt(sendRateKey)),
recvRate: int64(config.GetInt(recvRateKey)),
sendRate: int64(config.GetInt(configKeySendRate)),
recvRate: int64(config.GetInt(configKeyRecvRate)),
send: make(chan struct{}, 1), send: make(chan struct{}, 1),
pong: make(chan struct{}), pong: make(chan struct{}),
onReceive: onReceive, onReceive: onReceive,
@ -319,7 +315,7 @@ func (c *MConnection) sendSomeMsgPackets() bool {
// Block until .sendMonitor says we can write. // Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for, // Once we're ready we send more than we asked for,
// but amortized it should even out. // but amortized it should even out.
c.sendMonitor.Limit(maxMsgPacketTotalSize(), atomic.LoadInt64(&c.sendRate), true)
c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true)
// Now send some msgPackets. // Now send some msgPackets.
for i := 0; i < numBatchMsgPackets; i++ { for i := 0; i < numBatchMsgPackets; i++ {
@ -377,7 +373,7 @@ func (c *MConnection) recvRoutine() {
FOR_LOOP: FOR_LOOP:
for { for {
// Block until .recvMonitor says we can read. // Block until .recvMonitor says we can read.
c.recvMonitor.Limit(maxMsgPacketTotalSize(), atomic.LoadInt64(&c.recvRate), true)
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true)
/* /*
// Peek into bufReader for debugging // Peek into bufReader for debugging
@ -418,7 +414,7 @@ FOR_LOOP:
log.Debug("Receive Pong") log.Debug("Receive Pong")
case packetTypeMsg: case packetTypeMsg:
pkt, n, err := msgPacket{}, int(0), error(nil) pkt, n, err := msgPacket{}, int(0), error(nil)
wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize(), &n, &err)
wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
c.recvMonitor.Update(int(n)) c.recvMonitor.Update(int(n))
if err != nil { if err != nil {
if c.IsRunning() { if c.IsRunning() {
@ -598,15 +594,14 @@ func (ch *Channel) isSendPending() bool {
func (ch *Channel) nextMsgPacket() msgPacket { func (ch *Channel) nextMsgPacket() msgPacket {
packet := msgPacket{} packet := msgPacket{}
packet.ChannelID = byte(ch.id) packet.ChannelID = byte(ch.id)
maxPayloadSize := config.GetInt(maxPayloadSizeKey)
packet.Bytes = ch.sending[:MinInt(maxPayloadSize, len(ch.sending))]
if len(ch.sending) <= maxPayloadSize {
packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
if len(ch.sending) <= maxMsgPacketPayloadSize {
packet.EOF = byte(0x01) packet.EOF = byte(0x01)
ch.sending = nil ch.sending = nil
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
} else { } else {
packet.EOF = byte(0x00) packet.EOF = byte(0x00)
ch.sending = ch.sending[MinInt(maxPayloadSize, len(ch.sending)):]
ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
} }
return packet return packet
} }
@ -654,12 +649,10 @@ func (ch *Channel) updateStats() {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
func maxMsgPacketTotalSize() int {
return config.GetInt(maxPayloadSizeKey) + maxMsgPacketOverheadSize
}
const ( const (
maxMsgPacketPayloadSize = 1024
maxMsgPacketOverheadSize = 10 // It's actually lower but good enough maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
packetTypePing = byte(0x01) packetTypePing = byte(0x01)
packetTypePong = byte(0x02) packetTypePong = byte(0x02)
packetTypeMsg = byte(0x03) packetTypeMsg = byte(0x03)


+ 3
- 2
peer.go View File

@ -6,6 +6,7 @@ import (
"net" "net"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
) )
@ -47,7 +48,7 @@ func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) {
} }
// NOTE: call peerHandshake on conn before calling newPeer(). // NOTE: call peerHandshake on conn before calling newPeer().
func newPeer(conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
func newPeer(config cfg.Config, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
var p *Peer var p *Peer
onReceive := func(chID byte, msgBytes []byte) { onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID] reactor := reactorsByCh[chID]
@ -60,7 +61,7 @@ func newPeer(conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh
p.Stop() p.Stop()
onPeerError(p, r) onPeerError(p, r)
} }
mconn := NewMConnection(conn, chDescs, onReceive, onError)
mconn := NewMConnection(config, conn, chDescs, onReceive, onError)
p = &Peer{ p = &Peer{
outbound: outbound, outbound: outbound,
mconn: mconn, mconn: mconn,


+ 13
- 14
switch.go View File

@ -8,6 +8,7 @@ import (
"time" "time"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
"github.com/tendermint/log15" "github.com/tendermint/log15"
) )
@ -55,6 +56,7 @@ incoming messages are received on the reactor.
type Switch struct { type Switch struct {
BaseService BaseService
config cfg.Config
listeners []Listener listeners []Listener
reactors map[string]Reactor reactors map[string]Reactor
chDescs []*ChannelDescriptor chDescs []*ChannelDescriptor
@ -70,16 +72,11 @@ var (
ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers") ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers")
) )
// config keys
const (
dialTimeoutKey = "p2p_dial_timeout_seconds"
handshakeTimeoutKey = "p2p_handshake_timeout_seconds"
maxNumPeersKey = "p2p_max_num_peers"
authEncKey = "p2p_authenticated_encryption"
)
func NewSwitch(config cfg.Config) *Switch {
setConfigDefaults(config)
func NewSwitch() *Switch {
sw := &Switch{ sw := &Switch{
config: config,
reactors: make(map[string]Reactor), reactors: make(map[string]Reactor),
chDescs: make([]*ChannelDescriptor, 0), chDescs: make([]*ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor), reactorsByCh: make(map[byte]Reactor),
@ -196,11 +193,12 @@ func (sw *Switch) OnStop() {
// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed. // CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
// Set deadline for handshake so we don't block forever on conn.ReadFull // Set deadline for handshake so we don't block forever on conn.ReadFull
conn.SetDeadline(time.Now().Add(time.Duration(config.GetInt(handshakeTimeoutKey)) * time.Second))
conn.SetDeadline(time.Now().Add(
time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second))
// First, encrypt the connection. // First, encrypt the connection.
var sconn net.Conn = conn var sconn net.Conn = conn
if config.GetBool(authEncKey) {
if sw.config.GetBool(configKeyAuthEnc) {
var err error var err error
sconn, err = MakeSecretConnection(conn, sw.nodePrivKey) sconn, err = MakeSecretConnection(conn, sw.nodePrivKey)
if err != nil { if err != nil {
@ -214,7 +212,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
sconn.Close() sconn.Close()
return nil, err return nil, err
} }
if config.GetBool("p2p_authenticated_encryption") {
if sw.config.GetBool(configKeyAuthEnc) {
// Check that the professed PubKey matches the sconn's. // Check that the professed PubKey matches the sconn's.
if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) { if !peerNodeInfo.PubKey.Equals(sconn.(*SecretConnection).RemotePubKey()) {
sconn.Close() sconn.Close()
@ -233,7 +231,7 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er
return nil, err return nil, err
} }
peer := newPeer(sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
peer := newPeer(sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Add the peer to .peers // Add the peer to .peers
// ignore if duplicate or if we already have too many for that IP range // ignore if duplicate or if we already have too many for that IP range
@ -287,7 +285,8 @@ func (sw *Switch) dialSeed(addr *NetAddress) {
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
log.Info("Dialing address", "address", addr) log.Info("Dialing address", "address", addr)
sw.dialing.Set(addr.IP.String(), addr) sw.dialing.Set(addr.IP.String(), addr)
conn, err := addr.DialTimeout(time.Duration(config.GetInt(dialTimeoutKey)) * time.Second)
conn, err := addr.DialTimeout(time.Duration(
sw.config.GetInt(configKeyDialTimeoutSeconds)) * time.Second)
sw.dialing.Delete(addr.IP.String()) sw.dialing.Delete(addr.IP.String())
if err != nil { if err != nil {
log.Info("Failed dialing address", "address", addr, "error", err) log.Info("Failed dialing address", "address", addr, "error", err)
@ -378,7 +377,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
} }
// ignore connection if we already have enough // ignore connection if we already have enough
maxPeers := config.GetInt(maxNumPeersKey)
maxPeers := sw.config.GetInt(configKeyMaxNumPeers)
if maxPeers <= sw.peers.Size() { if maxPeers <= sw.peers.Size() {
log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers) log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
continue continue


Loading…
Cancel
Save