Browse Source

Conform to go-wire 1.0

pull/456/head
Jae Kwon 9 years ago
parent
commit
1c628a97ad
6 changed files with 54 additions and 54 deletions
  1. +2
    -1
      addrbook.go
  2. +39
    -45
      connection.go
  3. +6
    -4
      peer.go
  4. +3
    -2
      pex_reactor.go
  5. +2
    -2
      secret_connection.go
  6. +2
    -0
      types.go

+ 2
- 1
addrbook.go View File

@ -65,7 +65,8 @@ const (
minGetSelection = 32 minGetSelection = 32
// max addresses returned by GetSelection // max addresses returned by GetSelection
maxGetSelection = 2500
// NOTE: this must match "maxPexMessageSize"
maxGetSelection = 250
// current version of the on-disk format. // current version of the on-disk format.
serializationVersion = 1 serializationVersion = 1


+ 39
- 45
connection.go View File

@ -16,18 +16,19 @@ import (
) )
const ( const (
numBatchMsgPackets = 10
minReadBufferSize = 1024
minWriteBufferSize = 1024
idleTimeoutMinutes = 5
updateStatsSeconds = 2
pingTimeoutSeconds = 40
defaultSendRate = 51200 // 50Kb/s
defaultRecvRate = 51200 // 50Kb/s
flushThrottleMS = 100
defaultSendQueueCapacity = 1
defaultRecvBufferCapacity = 4096
defaultSendTimeoutSeconds = 10
numBatchMsgPackets = 10
minReadBufferSize = 1024
minWriteBufferSize = 1024
idleTimeoutMinutes = 5
updateStatsSeconds = 2
pingTimeoutSeconds = 40
defaultSendRate = 51200 // 50KB/s
defaultRecvRate = 51200 // 50KB/s
flushThrottleMS = 100
defaultSendQueueCapacity = 1
defaultRecvBufferCapacity = 4096
defaultRecvMessageCapacity = 22020096 // 21MB
defaultSendTimeoutSeconds = 10
) )
type receiveCbFunc func(chID byte, msgBytes []byte) type receiveCbFunc func(chID byte, msgBytes []byte)
@ -259,7 +260,7 @@ func (c *MConnection) sendRoutine() {
FOR_LOOP: FOR_LOOP:
for { for {
var n int64
var n int
var err error var err error
select { select {
case <-c.flushTimer.Ch: case <-c.flushTimer.Ch:
@ -313,7 +314,7 @@ func (c *MConnection) sendSomeMsgPackets() bool {
// Block until .sendMonitor says we can write. // Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for, // Once we're ready we send more than we asked for,
// but amortized it should even out. // but amortized it should even out.
c.sendMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.sendRate), true)
c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true)
// Now send some msgPackets. // Now send some msgPackets.
for i := 0; i < numBatchMsgPackets; i++ { for i := 0; i < numBatchMsgPackets; i++ {
@ -371,7 +372,7 @@ func (c *MConnection) recvRoutine() {
FOR_LOOP: FOR_LOOP:
for { for {
// Block until .recvMonitor says we can read. // Block until .recvMonitor says we can read.
c.recvMonitor.Limit(maxMsgPacketSize, atomic.LoadInt64(&c.recvRate), true)
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true)
/* /*
// Peek into bufReader for debugging // Peek into bufReader for debugging
@ -389,7 +390,7 @@ FOR_LOOP:
*/ */
// Read packet type // Read packet type
var n int64
var n int
var err error var err error
pktType := wire.ReadByte(c.bufReader, &n, &err) pktType := wire.ReadByte(c.bufReader, &n, &err)
c.recvMonitor.Update(int(n)) c.recvMonitor.Update(int(n))
@ -411,8 +412,8 @@ FOR_LOOP:
// do nothing // do nothing
log.Info("Receive Pong") log.Info("Receive Pong")
case packetTypeMsg: case packetTypeMsg:
pkt, n, err := msgPacket{}, int64(0), error(nil)
wire.ReadBinaryPtr(&pkt, c.bufReader, &n, &err)
pkt, n, err := msgPacket{}, int(0), error(nil)
wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
c.recvMonitor.Update(int(n)) c.recvMonitor.Update(int(n))
if err != nil { if err != nil {
if c.IsRunning() { if c.IsRunning() {
@ -456,10 +457,11 @@ FOR_LOOP:
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
type ChannelDescriptor struct { type ChannelDescriptor struct {
ID byte
Priority int
SendQueueCapacity int
RecvBufferCapacity int
ID byte
Priority int
SendQueueCapacity int
RecvBufferCapacity int
RecvMessageCapacity int
} }
func (chDesc *ChannelDescriptor) FillDefaults() { func (chDesc *ChannelDescriptor) FillDefaults() {
@ -469,6 +471,9 @@ func (chDesc *ChannelDescriptor) FillDefaults() {
if chDesc.RecvBufferCapacity == 0 { if chDesc.RecvBufferCapacity == 0 {
chDesc.RecvBufferCapacity = defaultRecvBufferCapacity chDesc.RecvBufferCapacity = defaultRecvBufferCapacity
} }
if chDesc.RecvMessageCapacity == 0 {
chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
}
} }
// TODO: lowercase. // TODO: lowercase.
@ -557,27 +562,27 @@ func (ch *Channel) isSendPending() bool {
func (ch *Channel) nextMsgPacket() msgPacket { func (ch *Channel) nextMsgPacket() msgPacket {
packet := msgPacket{} packet := msgPacket{}
packet.ChannelID = byte(ch.id) packet.ChannelID = byte(ch.id)
packet.Bytes = ch.sending[:MinInt(maxMsgPacketSize, len(ch.sending))]
if len(ch.sending) <= maxMsgPacketSize {
packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
if len(ch.sending) <= maxMsgPacketPayloadSize {
packet.EOF = byte(0x01) packet.EOF = byte(0x01)
ch.sending = nil ch.sending = nil
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
} else { } else {
packet.EOF = byte(0x00) packet.EOF = byte(0x00)
ch.sending = ch.sending[MinInt(maxMsgPacketSize, len(ch.sending)):]
ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
} }
return packet return packet
} }
// Writes next msgPacket to w. // Writes next msgPacket to w.
// Not goroutine-safe // Not goroutine-safe
func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int64, err error) {
func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
packet := ch.nextMsgPacket() packet := ch.nextMsgPacket()
log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
wire.WriteByte(packetTypeMsg, w, &n, &err) wire.WriteByte(packetTypeMsg, w, &n, &err)
wire.WriteBinary(packet, w, &n, &err) wire.WriteBinary(packet, w, &n, &err)
if err != nil { if err != nil {
ch.recentlySent += n
ch.recentlySent += int64(n)
} }
return return
} }
@ -586,7 +591,7 @@ func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int64, err error) {
// Not goroutine-safe // Not goroutine-safe
func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
// log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
if wire.MaxBinaryReadSize < len(ch.recving)+len(packet.Bytes) {
if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
return nil, wire.ErrBinaryReadSizeOverflow return nil, wire.ErrBinaryReadSizeOverflow
} }
ch.recving = append(ch.recving, packet.Bytes...) ch.recving = append(ch.recving, packet.Bytes...)
@ -609,10 +614,12 @@ func (ch *Channel) updateStats() {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
const ( const (
maxMsgPacketSize = 1024
packetTypePing = byte(0x01)
packetTypePong = byte(0x02)
packetTypeMsg = byte(0x03)
maxMsgPacketPayloadSize = 1024
maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
packetTypePing = byte(0x01)
packetTypePong = byte(0x02)
packetTypeMsg = byte(0x03)
) )
// Messages in channels are chopped into smaller msgPackets for multiplexing. // Messages in channels are chopped into smaller msgPackets for multiplexing.
@ -625,16 +632,3 @@ type msgPacket struct {
func (p msgPacket) String() string { func (p msgPacket) String() string {
return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF) return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF)
} }
//-----------------------------------------------------------------------------
// Convenience struct for writing typed messages.
// Reading requires a custom decoder that switches on the first type byte of a byteslice.
type TypedMessage struct {
Type byte
Msg interface{}
}
func (tm TypedMessage) String() string {
return fmt.Sprintf("TMsg{%X:%v}", tm.Type, tm.Msg)
}

+ 6
- 4
peer.go View File

@ -28,12 +28,12 @@ func peerHandshake(conn net.Conn, ourNodeInfo *NodeInfo) (*NodeInfo, error) {
var err2 error var err2 error
Parallel( Parallel(
func() { func() {
var n int64
var n int
wire.WriteBinary(ourNodeInfo, conn, &n, &err1) wire.WriteBinary(ourNodeInfo, conn, &n, &err1)
}, },
func() { func() {
var n int64
wire.ReadBinary(peerNodeInfo, conn, &n, &err2)
var n int
wire.ReadBinary(peerNodeInfo, conn, maxNodeInfoSize, &n, &err2)
log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo) log.Notice("Peer handshake", "peerNodeInfo", peerNodeInfo)
}) })
if err1 != nil { if err1 != nil {
@ -112,7 +112,9 @@ func (p *Peer) CanSend(chID byte) bool {
} }
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
wire.WriteString(p.Key, w, &n, &err)
var n_ int
wire.WriteString(p.Key, w, &n_, &err)
n += int64(n_)
return return
} }


+ 3
- 2
pex_reactor.go View File

@ -18,6 +18,7 @@ const (
PexChannel = byte(0x00) PexChannel = byte(0x00)
ensurePeersPeriodSeconds = 30 ensurePeersPeriodSeconds = 30
minNumOutboundPeers = 10 minNumOutboundPeers = 10
maxPexMessageSize = 1048576 // 1MB
) )
/* /*
@ -227,9 +228,9 @@ var _ = wire.RegisterInterface(
func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) { func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
msgType = bz[0] msgType = bz[0]
n := new(int64)
n := new(int)
r := bytes.NewReader(bz) r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ PexMessage }{}, r, n, &err).(struct{ PexMessage }).PexMessage
msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
return return
} }


+ 2
- 2
secret_connection.go View File

@ -279,8 +279,8 @@ func shareAuthSignature(sc *SecretConnection, pubKey crypto.PubKeyEd25519, signa
if err2 != nil { if err2 != nil {
return return
} }
n := int64(0) // not used.
recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), &n, &err2).(authSigMessage)
n := int(0) // not used.
recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), authSigMsgSize, &n, &err2).(authSigMessage)
}) })
if err1 != nil { if err1 != nil {


+ 2
- 0
types.go View File

@ -9,6 +9,8 @@ import (
"github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
) )
const maxNodeInfoSize = 10240 // 10Kb
type NodeInfo struct { type NodeInfo struct {
PubKey crypto.PubKeyEd25519 `json:"pub_key"` PubKey crypto.PubKeyEd25519 `json:"pub_key"`
Moniker string `json:"moniker"` Moniker string `json:"moniker"`


Loading…
Cancel
Save