diff --git a/binary/byteslice.go b/binary/byteslice.go index d134ed1a9..0390b0177 100644 --- a/binary/byteslice.go +++ b/binary/byteslice.go @@ -35,21 +35,35 @@ func (self ByteSlice) WriteTo(w io.Writer) (n int64, err error) { return int64(n_ + 4), err } -func ReadByteSliceSafe(r io.Reader) (ByteSlice, error) { - length, err := ReadUInt32Safe(r) +func (self ByteSlice) Reader() io.Reader { + return bytes.NewReader([]byte(self)) +} + +func ReadByteSliceSafe(r io.Reader) (bytes ByteSlice, n int64, err error) { + length, n_, err := ReadUInt32Safe(r) + n += n_ if err != nil { - return nil, err + return nil, n, err } - bytes := make([]byte, int(length)) - _, err = io.ReadFull(r, bytes) + bytes = make([]byte, int(length)) + n__, err := io.ReadFull(r, bytes) + n += int64(n__) if err != nil { - return nil, err + return nil, n, err + } + return bytes, n, nil +} + +func ReadByteSliceN(r io.Reader) (bytes ByteSlice, n int64) { + bytes, n, err := ReadByteSliceSafe(r) + if err != nil { + panic(err) } - return bytes, nil + return bytes, n } -func ReadByteSlice(r io.Reader) ByteSlice { - bytes, err := ReadByteSliceSafe(r) +func ReadByteSlice(r io.Reader) (bytes ByteSlice) { + bytes, _, err := ReadByteSliceSafe(r) if err != nil { panic(err) } diff --git a/binary/codec.go b/binary/codec.go index 4544c53ff..432ad0bea 100644 --- a/binary/codec.go +++ b/binary/codec.go @@ -5,21 +5,19 @@ import ( ) const ( - TYPE_NIL = Byte(0x00) - TYPE_BYTE = Byte(0x01) - TYPE_INT8 = Byte(0x02) - TYPE_UINT8 = Byte(0x03) - TYPE_INT16 = Byte(0x04) - TYPE_UINT16 = Byte(0x05) - TYPE_INT32 = Byte(0x06) - TYPE_UINT32 = Byte(0x07) - TYPE_INT64 = Byte(0x08) - TYPE_UINT64 = Byte(0x09) - + TYPE_NIL = Byte(0x00) + TYPE_BYTE = Byte(0x01) + TYPE_INT8 = Byte(0x02) + TYPE_UINT8 = Byte(0x03) + TYPE_INT16 = Byte(0x04) + TYPE_UINT16 = Byte(0x05) + TYPE_INT32 = Byte(0x06) + TYPE_UINT32 = Byte(0x07) + TYPE_INT64 = Byte(0x08) + TYPE_UINT64 = Byte(0x09) TYPE_STRING = Byte(0x10) TYPE_BYTESLICE = Byte(0x11) - - TYPE_TIME = Byte(0x20) + TYPE_TIME = Byte(0x20) ) func GetBinaryType(o Binary) Byte { @@ -44,57 +42,50 @@ func GetBinaryType(o Binary) Byte { return TYPE_INT64 case UInt64: return TYPE_UINT64 - case Int: - panic("Int not supported") - case UInt: - panic("UInt not supported") - case String: return TYPE_STRING case ByteSlice: return TYPE_BYTESLICE - case Time: return TYPE_TIME - default: panic("Unsupported type") } } -func ReadBinary(r io.Reader) Binary { - type_ := ReadByte(r) +func ReadBinaryN(r io.Reader) (o Binary, n int64) { + type_, n_ := ReadByteN(r) + n += n_ switch type_ { case TYPE_NIL: - return nil + o, n_ = nil, 0 case TYPE_BYTE: - return ReadByte(r) + o, n_ = ReadByteN(r) case TYPE_INT8: - return ReadInt8(r) + o, n_ = ReadInt8N(r) case TYPE_UINT8: - return ReadUInt8(r) + o, n_ = ReadUInt8N(r) case TYPE_INT16: - return ReadInt16(r) + o, n_ = ReadInt16N(r) case TYPE_UINT16: - return ReadUInt16(r) + o, n_ = ReadUInt16N(r) case TYPE_INT32: - return ReadInt32(r) + o, n_ = ReadInt32N(r) case TYPE_UINT32: - return ReadUInt32(r) + o, n_ = ReadUInt32N(r) case TYPE_INT64: - return ReadInt64(r) + o, n_ = ReadInt64N(r) case TYPE_UINT64: - return ReadUInt64(r) - + o, n_ = ReadUInt64N(r) case TYPE_STRING: - return ReadString(r) + o, n_ = ReadStringN(r) case TYPE_BYTESLICE: - return ReadByteSlice(r) - + o, n_ = ReadByteSliceN(r) case TYPE_TIME: - return ReadTime(r) - + o, n_ = ReadTimeN(r) default: panic("Unsupported type") } + n += n_ + return o, n } diff --git a/binary/int.go b/binary/int.go index d64d37473..81382419a 100644 --- a/binary/int.go +++ b/binary/int.go @@ -40,17 +40,25 @@ func (self Byte) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadByteSafe(r io.Reader) (Byte, error) { +func ReadByteSafe(r io.Reader) (Byte, int64, error) { buf := [1]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) if err != nil { - return 0, err + return 0, int64(n), err } - return Byte(buf[0]), nil + return Byte(buf[0]), int64(n), nil +} + +func ReadByteN(r io.Reader) (Byte, int64) { + b, n, err := ReadByteSafe(r) + if err != nil { + panic(err) + } + return b, n } func ReadByte(r io.Reader) Byte { - b, err := ReadByteSafe(r) + b, _, err := ReadByteSafe(r) if err != nil { panic(err) } @@ -80,17 +88,25 @@ func (self Int8) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadInt8Safe(r io.Reader) (Int8, error) { +func ReadInt8Safe(r io.Reader) (Int8, int64, error) { buf := [1]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) + if err != nil { + return Int8(0), int64(n), err + } + return Int8(buf[0]), int64(n), nil +} + +func ReadInt8N(r io.Reader) (Int8, int64) { + b, n, err := ReadInt8Safe(r) if err != nil { - return Int8(0), err + panic(err) } - return Int8(buf[0]), nil + return b, n } func ReadInt8(r io.Reader) Int8 { - b, err := ReadInt8Safe(r) + b, _, err := ReadInt8Safe(r) if err != nil { panic(err) } @@ -120,17 +136,25 @@ func (self UInt8) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadUInt8Safe(r io.Reader) (UInt8, error) { +func ReadUInt8Safe(r io.Reader) (UInt8, int64, error) { buf := [1]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) if err != nil { - return UInt8(0), err + return UInt8(0), int64(n), err } - return UInt8(buf[0]), nil + return UInt8(buf[0]), int64(n), nil +} + +func ReadUInt8N(r io.Reader) (UInt8, int64) { + b, n, err := ReadUInt8Safe(r) + if err != nil { + panic(err) + } + return b, n } func ReadUInt8(r io.Reader) UInt8 { - b, err := ReadUInt8Safe(r) + b, _, err := ReadUInt8Safe(r) if err != nil { panic(err) } @@ -162,17 +186,25 @@ func (self Int16) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadInt16Safe(r io.Reader) (Int16, error) { +func ReadInt16Safe(r io.Reader) (Int16, int64, error) { buf := [2]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) if err != nil { - return Int16(0), err + return Int16(0), int64(n), err } - return Int16(binary.LittleEndian.Uint16(buf[:])), nil + return Int16(binary.LittleEndian.Uint16(buf[:])), int64(n), nil +} + +func ReadInt16N(r io.Reader) (Int16, int64) { + b, n, err := ReadInt16Safe(r) + if err != nil { + panic(err) + } + return b, n } func ReadInt16(r io.Reader) Int16 { - b, err := ReadInt16Safe(r) + b, _, err := ReadInt16Safe(r) if err != nil { panic(err) } @@ -204,17 +236,25 @@ func (self UInt16) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadUInt16Safe(r io.Reader) (UInt16, error) { +func ReadUInt16Safe(r io.Reader) (UInt16, int64, error) { buf := [2]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) if err != nil { - return UInt16(0), err + return UInt16(0), int64(n), err } - return UInt16(binary.LittleEndian.Uint16(buf[:])), nil + return UInt16(binary.LittleEndian.Uint16(buf[:])), int64(n), nil +} + +func ReadUInt16N(r io.Reader) (UInt16, int64) { + b, n, err := ReadUInt16Safe(r) + if err != nil { + panic(err) + } + return b, n } func ReadUInt16(r io.Reader) UInt16 { - b, err := ReadUInt16Safe(r) + b, _, err := ReadUInt16Safe(r) if err != nil { panic(err) } @@ -246,17 +286,25 @@ func (self Int32) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadInt32Safe(r io.Reader) (Int32, error) { +func ReadInt32Safe(r io.Reader) (Int32, int64, error) { buf := [4]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) if err != nil { - return Int32(0), err + return Int32(0), int64(n), err } - return Int32(binary.LittleEndian.Uint32(buf[:])), nil + return Int32(binary.LittleEndian.Uint32(buf[:])), int64(n), nil +} + +func ReadInt32N(r io.Reader) (Int32, int64) { + b, n, err := ReadInt32Safe(r) + if err != nil { + panic(err) + } + return b, n } func ReadInt32(r io.Reader) Int32 { - b, err := ReadInt32Safe(r) + b, _, err := ReadInt32Safe(r) if err != nil { panic(err) } @@ -288,17 +336,25 @@ func (self UInt32) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadUInt32Safe(r io.Reader) (UInt32, error) { +func ReadUInt32Safe(r io.Reader) (UInt32, int64, error) { buf := [4]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) + if err != nil { + return UInt32(0), int64(n), err + } + return UInt32(binary.LittleEndian.Uint32(buf[:])), int64(n), nil +} + +func ReadUInt32N(r io.Reader) (UInt32, int64) { + b, n, err := ReadUInt32Safe(r) if err != nil { - return UInt32(0), err + panic(err) } - return UInt32(binary.LittleEndian.Uint32(buf[:])), nil + return b, n } func ReadUInt32(r io.Reader) UInt32 { - b, err := ReadUInt32Safe(r) + b, _, err := ReadUInt32Safe(r) if err != nil { panic(err) } @@ -330,17 +386,25 @@ func (self Int64) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadInt64Safe(r io.Reader) (Int64, error) { +func ReadInt64Safe(r io.Reader) (Int64, int64, error) { buf := [8]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) + if err != nil { + return Int64(0), int64(n), err + } + return Int64(binary.LittleEndian.Uint64(buf[:])), int64(n), nil +} + +func ReadInt64N(r io.Reader) (Int64, int64) { + b, n, err := ReadInt64Safe(r) if err != nil { - return Int64(0), err + panic(err) } - return Int64(binary.LittleEndian.Uint64(buf[:])), nil + return b, n } func ReadInt64(r io.Reader) Int64 { - b, err := ReadInt64Safe(r) + b, _, err := ReadInt64Safe(r) if err != nil { panic(err) } @@ -372,87 +436,27 @@ func (self UInt64) WriteTo(w io.Writer) (int64, error) { return int64(n), err } -func ReadUInt64Safe(r io.Reader) (UInt64, error) { +func ReadUInt64Safe(r io.Reader) (UInt64, int64, error) { buf := [8]byte{0} - _, err := io.ReadFull(r, buf[:]) + n, err := io.ReadFull(r, buf[:]) if err != nil { - return UInt64(0), err + return UInt64(0), int64(n), err } - return UInt64(binary.LittleEndian.Uint64(buf[:])), nil + return UInt64(binary.LittleEndian.Uint64(buf[:])), int64(n), nil } -func ReadUInt64(r io.Reader) UInt64 { - b, err := ReadUInt64Safe(r) +func ReadUInt64N(r io.Reader) (UInt64, int64) { + b, n, err := ReadUInt64Safe(r) if err != nil { panic(err) } - return b + return b, n } -// Int - -func (self Int) Equals(other Binary) bool { - return self == other -} - -func (self Int) Less(other Binary) bool { - if o, ok := other.(Int); ok { - return self < o - } else { - panic("Cannot compare unequal types") - } -} - -func (self Int) ByteSize() int { - return 8 -} - -func (self Int) WriteTo(w io.Writer) (int64, error) { - buf := []byte{0, 0, 0, 0, 0, 0, 0, 0} - binary.LittleEndian.PutUint64(buf, uint64(self)) - n, err := w.Write(buf) - return int64(n), err -} - -func ReadInt(r io.Reader) Int { - buf := [8]byte{0} - _, err := io.ReadFull(r, buf[:]) - if err != nil { - panic(err) - } - return Int(binary.LittleEndian.Uint64(buf[:])) -} - -// UInt - -func (self UInt) Equals(other Binary) bool { - return self == other -} - -func (self UInt) Less(other Binary) bool { - if o, ok := other.(UInt); ok { - return self < o - } else { - panic("Cannot compare unequal types") - } -} - -func (self UInt) ByteSize() int { - return 8 -} - -func (self UInt) WriteTo(w io.Writer) (int64, error) { - buf := []byte{0, 0, 0, 0, 0, 0, 0, 0} - binary.LittleEndian.PutUint64(buf, uint64(self)) - n, err := w.Write(buf) - return int64(n), err -} - -func ReadUInt(r io.Reader) UInt { - buf := [8]byte{0} - _, err := io.ReadFull(r, buf[:]) +func ReadUInt64(r io.Reader) UInt64 { + b, _, err := ReadUInt64Safe(r) if err != nil { panic(err) } - return UInt(binary.LittleEndian.Uint64(buf[:])) + return b } diff --git a/binary/string.go b/binary/string.go index 9dd37d891..b5ad69d02 100644 --- a/binary/string.go +++ b/binary/string.go @@ -32,21 +32,31 @@ func (self String) WriteTo(w io.Writer) (n int64, err error) { return int64(n_ + 4), err } -func ReadStringSafe(r io.Reader) (String, error) { - length, err := ReadUInt32Safe(r) +func ReadStringSafe(r io.Reader) (str String, n int64, err error) { + length, n_, err := ReadUInt32Safe(r) + n += n_ if err != nil { - return "", err + return "", n, err } bytes := make([]byte, int(length)) - _, err = io.ReadFull(r, bytes) + n__, err := io.ReadFull(r, bytes) + n += int64(n__) if err != nil { - return "", err + return "", n, err } - return String(bytes), nil + return String(bytes), n, nil } -func ReadString(r io.Reader) String { - str, err := ReadStringSafe(r) +func ReadStringN(r io.Reader) (str String, n int64) { + str, n, err := ReadStringSafe(r) + if err != nil { + panic(err) + } + return str, n +} + +func ReadString(r io.Reader) (str String) { + str, _, err := ReadStringSafe(r) if err != nil { panic(err) } diff --git a/binary/time.go b/binary/time.go index 62f7f0663..f2fe1ea66 100644 --- a/binary/time.go +++ b/binary/time.go @@ -33,6 +33,26 @@ func (self Time) WriteTo(w io.Writer) (int64, error) { return Int64(self.Unix()).WriteTo(w) } +func ReadTimeSafe(r io.Reader) (Time, int64, error) { + t, n, err := ReadInt64Safe(r) + if err != nil { + return Time{}, n, err + } + return Time{time.Unix(int64(t), 0)}, n, nil +} + +func ReadTimeN(r io.Reader) (Time, int64) { + t, n, err := ReadTimeSafe(r) + if err != nil { + panic(err) + } + return t, n +} + func ReadTime(r io.Reader) Time { - return Time{time.Unix(int64(ReadInt64(r)), 0)} + t, _, err := ReadTimeSafe(r) + if err != nil { + panic(err) + } + return t } diff --git a/blocks/block_manager.go b/blocks/block_manager.go index 4dd44959b..e94eacb1e 100644 --- a/blocks/block_manager.go +++ b/blocks/block_manager.go @@ -1,6 +1,16 @@ package blocks import ( + "bytes" + "encoding/json" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" db_ "github.com/tendermint/tendermint/db" "github.com/tendermint/tendermint/p2p" ) @@ -26,7 +36,7 @@ const ( dataTypeTxs = byte(0x03) ) -func _dataKey(dataType byte, height int) { +func _dataKey(dataType byte, height int) string { switch dataType { case dataTypeHeader: return fmt.Sprintf("H%v", height) @@ -35,11 +45,12 @@ func _dataKey(dataType byte, height int) { case dataTypeTxs: return fmt.Sprintf("T%v", height) default: - panic("Unknown datatype %X", dataType) + Panicf("Unknown datatype %X", dataType) + return "" // should not happen } } -func dataTypeFromObj(data interface{}) { +func dataTypeFromObj(data interface{}) byte { switch data.(type) { case *Header: return dataTypeHeader @@ -48,7 +59,8 @@ func dataTypeFromObj(data interface{}) { case *Txs: return dataTypeTxs default: - panic("Unexpected datatype: %v", data) + Panicf("Unexpected datatype: %v", data) + return byte(0x00) // should not happen } } @@ -99,8 +111,8 @@ func (bm *BlockManager) Stop() { // NOTE: assumes that data is already validated. func (bm *BlockManager) StoreData(dataObj interface{}) { - bm.mtx.Lock() - defer bm.mtx.Unlock() + //bm.mtx.Lock() + //defer bm.mtx.Unlock() dataType := dataTypeForObj(dataObj) dataKey := _dataKey(dataType, dataObj) // Update state diff --git a/blocks/block_test.go b/blocks/block_test.go index 95d06a33d..744ce4a9c 100644 --- a/blocks/block_test.go +++ b/blocks/block_test.go @@ -91,13 +91,13 @@ func TestBlock(t *testing.T) { Time: randVar(), PrevHash: randBytes(32), ValidationHash: randBytes(32), - DataHash: randBytes(32), + TxsHash: randBytes(32), }, Validation{ Signatures: []Signature{randSig(), randSig()}, Adjustments: []Adjustment{bond, unbond, timeout, dupeout}, }, - Data{ + Txs{ Txs: []Tx{sendTx, nameTx}, }, } diff --git a/blocks/log.go b/blocks/log.go new file mode 100644 index 000000000..1ddb0d940 --- /dev/null +++ b/blocks/log.go @@ -0,0 +1,15 @@ +package blocks + +import ( + "github.com/op/go-logging" +) + +var log = logging.MustGetLogger("block") + +func init() { + logging.SetFormatter(logging.MustStringFormatter("[%{level:.1s}] %{message}")) +} + +func SetLogger(l *logging.Logger) { + log = l +} diff --git a/common/throttler.go b/common/throttle_timer.go similarity index 53% rename from common/throttler.go rename to common/throttle_timer.go index 2076ddf1e..62ad60ec6 100644 --- a/common/throttler.go +++ b/common/throttle_timer.go @@ -6,12 +6,12 @@ import ( ) /* -Throttler fires an event at most "dur" after each .Set() call. -If a short burst of .Set() calls happens, Throttler fires once. -If a long continuous burst of .Set() calls happens, Throttler fires +ThrottleTimer fires an event at most "dur" after each .Set() call. +If a short burst of .Set() calls happens, ThrottleTimer fires once. +If a long continuous burst of .Set() calls happens, ThrottleTimer fires at most once every "dur". */ -type Throttler struct { +type ThrottleTimer struct { Ch chan struct{} quit chan struct{} dur time.Duration @@ -19,16 +19,16 @@ type Throttler struct { isSet uint32 } -func NewThrottler(dur time.Duration) *Throttler { +func NewThrottleTimer(dur time.Duration) *ThrottleTimer { var ch = make(chan struct{}) var quit = make(chan struct{}) - var t = &Throttler{Ch: ch, dur: dur, quit: quit} + var t = &ThrottleTimer{Ch: ch, dur: dur, quit: quit} t.timer = time.AfterFunc(dur, t.fireHandler) t.timer.Stop() return t } -func (t *Throttler) fireHandler() { +func (t *ThrottleTimer) fireHandler() { select { case t.Ch <- struct{}{}: atomic.StoreUint32(&t.isSet, 0) @@ -36,13 +36,13 @@ func (t *Throttler) fireHandler() { } } -func (t *Throttler) Set() { +func (t *ThrottleTimer) Set() { if atomic.CompareAndSwapUint32(&t.isSet, 0, 1) { t.timer.Reset(t.dur) } } -func (t *Throttler) Stop() bool { +func (t *ThrottleTimer) Stop() bool { close(t.quit) return t.timer.Stop() } diff --git a/log.go b/log.go index 9aad621ee..50c89fe12 100644 --- a/log.go +++ b/log.go @@ -4,6 +4,7 @@ import ( "os" "github.com/op/go-logging" + "github.com/tendermint/tendermint/block" "github.com/tendermint/tendermint/p2p" ) @@ -27,4 +28,5 @@ func init() { */ p2p.SetLogger(log) + block.SetLogger(log) } diff --git a/merkle/iavl_node.go b/merkle/iavl_node.go index 72286f167..31f2c4e38 100644 --- a/merkle/iavl_node.go +++ b/merkle/iavl_node.go @@ -305,23 +305,21 @@ func (self *IAVLNode) fill(db Db) { self.height = uint8(ReadUInt8(r)) self.size = uint64(ReadUInt64(r)) // key - key := ReadBinary(r) + key, _ := ReadBinaryN(r) self.key = key.(Key) if self.height == 0 { // value - self.value = ReadBinary(r) + self.value, _ = ReadBinaryN(r) } else { // left - var leftHash ByteSlice - leftHash = ReadByteSlice(r) + leftHash := ReadByteSlice(r) self.left = &IAVLNode{ hash: leftHash, flags: IAVLNODE_FLAG_PERSISTED | IAVLNODE_FLAG_PLACEHOLDER, } // right - var rightHash ByteSlice - rightHash = ReadByteSlice(r) + rightHash := ReadByteSlice(r) self.right = &IAVLNode{ hash: rightHash, flags: IAVLNODE_FLAG_PERSISTED | IAVLNODE_FLAG_PLACEHOLDER, diff --git a/p2p/connection.go b/p2p/connection.go index 9af897bd2..a3e5b7ce9 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -3,85 +3,110 @@ package p2p import ( "bufio" "fmt" + "io" + "math" "net" "sync/atomic" "time" + flow "code.google.com/p/mxk/go1/flowcontrol" "github.com/op/go-logging" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" ) const ( + numBatchPackets = 10 minReadBufferSize = 1024 minWriteBufferSize = 1024 flushThrottleMS = 50 - outQueueSize = 50 idleTimeoutMinutes = 5 + updateStatsSeconds = 2 pingTimeoutMinutes = 2 + defaultSendRate = 51200 // 5Kb/s + defaultRecvRate = 51200 // 5Kb/s ) /* -A Connection wraps a network connection and handles buffering and multiplexing. -"Packets" are sent with ".Send(Packet)". -Packets received are sent to channels as commanded by the ".Start(...)" method. +A MConnection wraps a network connection and handles buffering and multiplexing. +ByteSlices are sent with ".Send(channelId, bytes)". +Inbound ByteSlices are pushed to the designated chan<- InboundBytes. */ -type Connection struct { - ioStats IOStats - - sendQueue chan Packet // never closes - conn net.Conn - bufReader *bufio.Reader - bufWriter *bufio.Writer - flushThrottler *Throttler - quit chan struct{} - pingRepeatTimer *RepeatTimer - pong chan struct{} - channels map[string]*Channel - onError func(interface{}) - started uint32 - stopped uint32 - errored uint32 +type MConnection struct { + conn net.Conn + bufReader *bufio.Reader + bufWriter *bufio.Writer + sendMonitor *flow.Monitor + recvMonitor *flow.Monitor + sendRate int64 + recvRate int64 + flushTimer *ThrottleTimer // flush writes as necessary but throttled. + canSend chan struct{} + quit chan struct{} + pingTimer *RepeatTimer // send pings periodically + pong chan struct{} + chStatsTimer *RepeatTimer // update channel stats periodically + channels []*Channel + channelsIdx map[byte]*Channel + onError func(interface{}) + started uint32 + stopped uint32 + errored uint32 + + _peer *Peer // hacky optimization } -const ( - packetTypePing = UInt8(0x00) - packetTypePong = UInt8(0x01) - packetTypeMessage = UInt8(0x10) -) +func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onError func(interface{})) *MConnection { + + mconn := &MConnection{ + conn: conn, + bufReader: bufio.NewReaderSize(conn, minReadBufferSize), + bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), + sendMonitor: flow.New(0, 0), + recvMonitor: flow.New(0, 0), + sendRate: defaultSendRate, + recvRate: defaultRecvRate, + flushTimer: NewThrottleTimer(flushThrottleMS * time.Millisecond), + canSend: make(chan struct{}, 1), + quit: make(chan struct{}), + pingTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute), + pong: make(chan struct{}), + chStatsTimer: NewRepeatTimer(updateStatsSeconds * time.Second), + onError: onError, + } + + // Create channels + var channelsIdx = map[byte]*Channel{} + var channels = []*Channel{} -func NewConnection(conn net.Conn) *Connection { - return &Connection{ - sendQueue: make(chan Packet, outQueueSize), - conn: conn, - bufReader: bufio.NewReaderSize(conn, minReadBufferSize), - bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), - flushThrottler: NewThrottler(flushThrottleMS * time.Millisecond), - quit: make(chan struct{}), - pingRepeatTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute), - pong: make(chan struct{}), + for _, desc := range chDescs { + channel := newChannel(mconn, desc) + channelsIdx[channel.id] = channel + channels = append(channels, channel) } + mconn.channels = channels + mconn.channelsIdx = channelsIdx + + return mconn } // .Start() begins multiplexing packets to and from "channels". -// If an error occurs, the recovered reason is passed to "onError". -func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) { +func (c *MConnection) Start() { if atomic.CompareAndSwapUint32(&c.started, 0, 1) { log.Debug("Starting %v", c) - c.channels = channels - c.onError = onError go c.sendHandler() go c.recvHandler() } } -func (c *Connection) Stop() { +func (c *MConnection) Stop() { if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { log.Debug("Stopping %v", c) close(c.quit) c.conn.Close() - c.flushThrottler.Stop() - c.pingRepeatTimer.Stop() + c.flushTimer.Stop() + c.chStatsTimer.Stop() + c.pingTimer.Stop() // We can't close pong safely here because // recvHandler may write to it after we've stopped. // Though it doesn't need to get closed at all, @@ -90,81 +115,123 @@ func (c *Connection) Stop() { } } -func (c *Connection) LocalAddress() *NetAddress { +func (c *MConnection) LocalAddress() *NetAddress { return NewNetAddress(c.conn.LocalAddr()) } -func (c *Connection) RemoteAddress() *NetAddress { +func (c *MConnection) RemoteAddress() *NetAddress { return NewNetAddress(c.conn.RemoteAddr()) } -// Returns true if successfully queued, -// Returns false if connection was closed. -// Blocks. -func (c *Connection) Send(pkt Packet) bool { - select { - case c.sendQueue <- pkt: - return true - case <-c.quit: - return false - } -} - -func (c *Connection) String() string { +func (c *MConnection) String() string { return fmt.Sprintf("/%v/", c.conn.RemoteAddr()) } -func (c *Connection) flush() { - // TODO: this is pretty naive. - // We end up flushing when we don't have to (yet). - // A better solution might require us implementing our own buffered writer. +func (c *MConnection) flush() { err := c.bufWriter.Flush() if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Warning("Connection flush failed: %v", err) + log.Warning("MConnection flush failed: %v", err) } } } // Catch panics, usually caused by remote disconnects. -func (c *Connection) _recover() { +func (c *MConnection) _recover() { if r := recover(); r != nil { - c.Stop() - if atomic.CompareAndSwapUint32(&c.errored, 0, 1) { - if c.onError != nil { - c.onError(r) - } + c.stopForError(r) + } +} + +func (c *MConnection) stopForError(r interface{}) { + c.Stop() + if atomic.CompareAndSwapUint32(&c.errored, 0, 1) { + if c.onError != nil { + c.onError(r) } } } -// sendHandler pulls from .sendQueue and writes to .bufWriter -func (c *Connection) sendHandler() { +// Queues a message to be sent to channel. +func (c *MConnection) Send(chId byte, bytes ByteSlice) bool { + // Send message to channel. + channel, ok := c.channelsIdx[chId] + if !ok { + log.Error("Cannot send bytes, unknown channel %X", chId) + return false + } + + channel.sendBytes(bytes) + + // Wake up sendHandler if necessary + select { + case c.canSend <- struct{}{}: + default: + } + + return true +} + +// Queues a message to be sent to channel. +// Nonblocking, returns true if successful. +func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool { + // Send message to channel. + channel, ok := c.channelsIdx[chId] + if !ok { + log.Error("Cannot send bytes, unknown channel %X", chId) + return false + } + + ok = channel.trySendBytes(bytes) + if ok { + // Wake up sendHandler if necessary + select { + case c.canSend <- struct{}{}: + default: + } + } + + return ok +} + +// sendHandler polls for packets to send from channels. +func (c *MConnection) sendHandler() { defer c._recover() FOR_LOOP: for { var err error select { - case sendPkt := <-c.sendQueue: - _, err = packetTypeMessage.WriteTo(c.bufWriter) - if err != nil { - break - } - _, err = sendPkt.WriteTo(c.bufWriter) - c.flushThrottler.Set() - case <-c.flushThrottler.Ch: + case <-c.flushTimer.Ch: + // NOTE: flushTimer.Set() must be called every time + // something is written to .bufWriter. c.flush() - case <-c.pingRepeatTimer.Ch: - _, err = packetTypePing.WriteTo(c.bufWriter) - // log.Debug("PING %v", c) + case <-c.chStatsTimer.Ch: + for _, channel := range c.channels { + channel.updateStats() + } + case <-c.pingTimer.Ch: + var n int64 + n, err = packetTypePing.WriteTo(c.bufWriter) + c.sendMonitor.Update(int(n)) c.flush() case <-c.pong: - _, err = packetTypePong.WriteTo(c.bufWriter) - // log.Debug("PONG %v", c) + var n int64 + n, err = packetTypePong.WriteTo(c.bufWriter) + c.sendMonitor.Update(int(n)) c.flush() case <-c.quit: break FOR_LOOP + case <-c.canSend: + // Send some packets + eof := c.sendSomePackets() + if !eof { + // Keep sendHandler awake. + select { + case c.canSend <- struct{}{}: + default: + } + } } if atomic.LoadUint32(&c.stopped) == 1 { @@ -180,22 +247,75 @@ FOR_LOOP: // Cleanup } -// recvHandler reads from .bufReader and pushes to the appropriate -// channel's recvQueue. -func (c *Connection) recvHandler() { +// Returns true if messages from channels were exhausted. +// Blocks in accordance to .sendMonitor throttling. +func (c *MConnection) sendSomePackets() bool { + // Block until .sendMonitor says we can write. + // Once we're ready we send more than we asked for, + // but amortized it should even out. + c.sendMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.sendRate), true) + + // Now send some packets. + for i := 0; i < numBatchPackets; i++ { + if c.sendPacket() { + return true + } + } + return false +} + +// Returns true if messages from channels were exhausted. +func (c *MConnection) sendPacket() bool { + // Choose a channel to create a packet from. + // The chosen channel will be the one whose recentlySent/priority is the least. + var leastRatio float32 = math.MaxFloat32 + var leastChannel *Channel + for _, channel := range c.channels { + // If nothing to send, skip this channel + if !channel.sendPending() { + continue + } + // Get ratio, and keep track of lowest ratio. + ratio := float32(channel.recentlySent) / float32(channel.priority) + if ratio < leastRatio { + leastRatio = ratio + leastChannel = channel + } + } + + // Nothing to send? + if leastChannel == nil { + return true + } else { + log.Debug("Found a packet to send") + } + + // Make & send a packet from this channel + n, err := leastChannel.writePacketTo(c.bufWriter) + if err != nil { + log.Warning("Failed to write packet. Error: %v", err) + c.stopForError(err) + return true + } + c.sendMonitor.Update(int(n)) + c.flushTimer.Set() + return false +} + +// recvHandler reads packets and reconstructs the message using the channels' "recving" buffer. +// After a whole message has been assembled, it's pushed to the Channel's recvQueue. +// Blocks depending on how the connection is throttled. +func (c *MConnection) recvHandler() { defer c._recover() FOR_LOOP: for { - pktType, err := ReadUInt8Safe(c.bufReader) - if log.IsEnabledFor(logging.DEBUG) { - // peeking into bufReader - numBytes := c.bufReader.Buffered() - bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) - if err != nil { - log.Debug("recvHandler packet type %X, peeked: %X", pktType, bytes) - } - } + // Block until .recvMonitor says we can read. + c.recvMonitor.Limit(maxPacketSize, atomic.LoadInt64(&c.recvRate), true) + + // Read packet type + pktType, n, err := ReadUInt8Safe(c.bufReader) + c.recvMonitor.Update(int(n)) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { log.Info("%v failed @ recvHandler with err: %v", c, err) @@ -204,15 +324,25 @@ FOR_LOOP: break FOR_LOOP } + // Peek into bufReader for debugging + if log.IsEnabledFor(logging.DEBUG) { + numBytes := c.bufReader.Buffered() + bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) + if err != nil { + log.Debug("recvHandler packet type %X, peeked: %X", pktType, bytes) + } + } + + // Read more depending on packet type. switch pktType { case packetTypePing: - // TODO: keep track of these, make sure it isn't abused - // as they cause flush()'s in the send buffer. + // TODO: prevent abuse, as they cause flush()'s. c.pong <- struct{}{} case packetTypePong: // do nothing case packetTypeMessage: - pkt, err := ReadPacketSafe(c.bufReader) + pkt, n, err := readPacketSafe(c.bufReader) + c.recvMonitor.Update(int(n)) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { log.Info("%v failed @ recvHandler", c) @@ -220,16 +350,18 @@ FOR_LOOP: } break FOR_LOOP } - channel := c.channels[string(pkt.Channel)] + channel := c.channels[pkt.ChannelId] if channel == nil { - Panicf("Unknown channel %v", pkt.Channel) + Panicf("Unknown channel %v", pkt.ChannelId) } - channel.recvQueue <- pkt + channel.recvPacket(pkt) default: Panicf("Unknown message type %v", pktType) } - c.pingRepeatTimer.Reset() + // TODO: shouldn't this go in the sendHandler? + // Better to send a packet when *we* haven't sent anything for a while. + c.pingTimer.Reset() } // Cleanup @@ -239,13 +371,191 @@ FOR_LOOP: } } -/* IOStats */ -type IOStats struct { - TimeConnected Time - LastSent Time - LastRecv Time - BytesRecv UInt64 - BytesSent UInt64 - PktsRecv UInt64 - PktsSent UInt64 +//----------------------------------------------------------------------------- + +type ChannelDescriptor struct { + Id byte + SendQueueCapacity int // One per MConnection. + RecvQueueCapacity int // Global for this channel. + RecvBufferSize int + DefaultPriority uint + + // TODO: kinda hacky. + // This is created by the switch, one per channel. + recvQueue chan InboundBytes +} + +// TODO: lowercase. +// NOTE: not goroutine-safe. +type Channel struct { + conn *MConnection + desc *ChannelDescriptor + id byte + recvQueue chan InboundBytes + sendQueue chan ByteSlice + recving ByteSlice + sending ByteSlice + priority uint + recentlySent int64 // exponential moving average +} + +func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { + if desc.DefaultPriority <= 0 { + panic("Channel default priority must be a postive integer") + } + return &Channel{ + conn: conn, + desc: desc, + id: desc.Id, + recvQueue: desc.recvQueue, + sendQueue: make(chan ByteSlice, desc.SendQueueCapacity), + recving: make([]byte, 0, desc.RecvBufferSize), + priority: desc.DefaultPriority, + } +} + +// Queues message to send to this channel. +func (ch *Channel) sendBytes(bytes ByteSlice) { + ch.sendQueue <- bytes +} + +// Queues message to send to this channel. +// Nonblocking, returns true if successful. +func (ch *Channel) trySendBytes(bytes ByteSlice) bool { + select { + case ch.sendQueue <- bytes: + return true + default: + return false + } +} + +// Returns true if any packets are pending to be sent. +func (ch *Channel) sendPending() bool { + if len(ch.sending) == 0 { + if len(ch.sendQueue) == 0 { + return false + } + ch.sending = <-ch.sendQueue + } + return true +} + +// Creates a new packet to send. +func (ch *Channel) nextPacket() packet { + packet := packet{} + packet.ChannelId = Byte(ch.id) + packet.Bytes = ch.sending[:MinInt(maxPacketSize, len(ch.sending))] + if len(ch.sending) <= maxPacketSize { + packet.EOF = Byte(0x01) + ch.sending = nil + } else { + packet.EOF = Byte(0x00) + ch.sending = ch.sending[MinInt(maxPacketSize, len(ch.sending)):] + } + return packet +} + +// Writes next packet to w. +func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) { + packet := ch.nextPacket() + n, err = WriteTo(packetTypeMessage, w, n, err) + n, err = WriteTo(packet, w, n, err) + if err != nil { + ch.recentlySent += n + } + return +} + +// Handles incoming packets. +func (ch *Channel) recvPacket(pkt packet) { + ch.recving = append(ch.recving, pkt.Bytes...) + if pkt.EOF == Byte(0x01) { + ch.recvQueue <- InboundBytes{ch.conn, ch.recving} + ch.recving = make([]byte, 0, ch.desc.RecvBufferSize) + } +} + +// Call this periodically to update stats for throttling purposes. +func (ch *Channel) updateStats() { + // Exponential decay of stats. + // TODO: optimize. + ch.recentlySent = int64(float64(ch.recentlySent) * 0.5) +} + +//----------------------------------------------------------------------------- + +const ( + maxPacketSize = 1024 + packetTypePing = UInt8(0x00) + packetTypePong = UInt8(0x01) + packetTypeMessage = UInt8(0x10) +) + +// Messages in channels are chopped into smaller packets for multiplexing. +type packet struct { + ChannelId Byte + EOF Byte // 1 means message ends here. + Bytes ByteSlice +} + +func (p packet) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(p.ChannelId, w, n, err) + n, err = WriteTo(p.EOF, w, n, err) + n, err = WriteTo(p.Bytes, w, n, err) + return +} + +func (p packet) String() string { + return fmt.Sprintf("%v:%X", p.ChannelId, p.Bytes) +} + +func readPacketSafe(r io.Reader) (pkt packet, n int64, err error) { + chId, n_, err := ReadByteSafe(r) + n += n_ + if err != nil { + return + } + eof, n_, err := ReadByteSafe(r) + n += n_ + if err != nil { + return + } + // TODO: packet length sanity check. + bytes, n_, err := ReadByteSliceSafe(r) + n += n_ + if err != nil { + return + } + return packet{chId, eof, bytes}, n, nil +} + +//----------------------------------------------------------------------------- + +type InboundBytes struct { + MConn *MConnection + Bytes ByteSlice +} + +//----------------------------------------------------------------------------- + +// Convenience struct for writing typed messages. +// Reading requires a custom decoder that switches on the first type byte of a ByteSlice. +type TypedMessage struct { + Type Byte + Msg Binary +} + +func (tm TypedMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(tm.Type, w, n, err) + n, err = WriteTo(tm.Msg, w, n, err) + return +} + +func (tm TypedMessage) String() string { + return fmt.Sprintf("<%X:%v>", tm.Type, tm.Msg) +} + +func (tm TypedMessage) Bytes() ByteSlice { + return BinaryBytes(tm) } diff --git a/p2p/listener.go b/p2p/listener.go index 9b505a33e..168a43d16 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -15,18 +15,18 @@ import ( Listener is part of a Server. */ type Listener interface { - Connections() <-chan *Connection + Connections() <-chan net.Conn ExternalAddress() *NetAddress Stop() } /* -DefaultListener is an implementation that works on the golang network stack. +DefaultListener is an implementation of Listener. */ type DefaultListener struct { listener net.Listener extAddr *NetAddress - connections chan *Connection + connections chan net.Conn stopped uint32 } @@ -77,7 +77,7 @@ func NewDefaultListener(protocol string, lAddr string) Listener { dl := &DefaultListener{ listener: listener, extAddr: extAddr, - connections: make(chan *Connection, numBufferedConnections), + connections: make(chan net.Conn, numBufferedConnections), } go dl.listenHandler() @@ -100,8 +100,7 @@ func (l *DefaultListener) listenHandler() { panic(err) } - c := NewConnection(conn) - l.connections <- c + l.connections <- conn } // Cleanup @@ -113,7 +112,7 @@ func (l *DefaultListener) listenHandler() { // A channel of inbound connections. // It gets closed when the listener closes. -func (l *DefaultListener) Connections() <-chan *Connection { +func (l *DefaultListener) Connections() <-chan net.Conn { return l.connections } diff --git a/p2p/msg.go b/p2p/msg.go deleted file mode 100644 index 57eeb9b63..000000000 --- a/p2p/msg.go +++ /dev/null @@ -1,94 +0,0 @@ -package p2p - -import ( - "bytes" - "fmt" - "io" - - . "github.com/tendermint/tendermint/binary" -) - -/* -A Message is anything that can be serialized. -The resulting serialized bytes of Message don't contain type information, -so messages are typically wrapped in a TypedMessage before put in the wire. -*/ -type Message interface { - Binary -} - -/* -A TypedMessage extends a Message with a single byte of type information. -When deserializing a message from the wire, a switch statement is needed -to dispatch to the correct constructor, typically named "ReadXXXMessage". -*/ -type TypedMessage struct { - Type Byte - Message Message -} - -func (tm TypedMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(tm.Type, w, n, err) - n, err = WriteTo(tm.Message, w, n, err) - return -} - -func (tm TypedMessage) String() string { - return fmt.Sprintf("0x%X⋺%v", tm.Type, tm.Message) -} - -//----------------------------------------------------------------------------- - -/* -Packet encapsulates a ByteSlice on a Channel. -Typically the Bytes are the serialized form of a TypedMessage. -*/ -type Packet struct { - Channel String - Bytes ByteSlice - // Hash -} - -func NewPacket(chName String, msg Binary) Packet { - msgBytes := BinaryBytes(msg) - return Packet{ - Channel: chName, - Bytes: msgBytes, - } -} - -func (p Packet) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteTo(p.Channel, w, n, err) - n, err = WriteTo(p.Bytes, w, n, err) - return -} - -func (p Packet) Reader() io.Reader { - return bytes.NewReader(p.Bytes) -} - -func (p Packet) String() string { - return fmt.Sprintf("%v:%X", p.Channel, p.Bytes) -} - -func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { - chName, err := ReadStringSafe(r) - if err != nil { - return - } - // TODO: packet length sanity check. - bytes, err := ReadByteSliceSafe(r) - if err != nil { - return - } - return Packet{Channel: chName, Bytes: bytes}, nil -} - -/* -InboundPacket extends Packet with fields relevant to inbound packets. -*/ -type InboundPacket struct { - Peer *Peer - Time Time - Packet -} diff --git a/p2p/netaddress.go b/p2p/netaddress.go index 09977d36b..410a1102f 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -55,9 +55,11 @@ func NewNetAddressIPPort(ip net.IP, port UInt16) *NetAddress { } func ReadNetAddress(r io.Reader) *NetAddress { + ipBytes := ReadByteSlice(r) + port := ReadUInt16(r) return &NetAddress{ - IP: net.IP(ReadByteSlice(r)), - Port: ReadUInt16(r), + IP: net.IP(ipBytes), + Port: port, } } @@ -89,20 +91,20 @@ func (na *NetAddress) String() string { return addr } -func (na *NetAddress) Dial() (*Connection, error) { +func (na *NetAddress) Dial() (net.Conn, error) { conn, err := net.Dial("tcp", na.String()) if err != nil { return nil, err } - return NewConnection(conn), nil + return conn, nil } -func (na *NetAddress) DialTimeout(timeout time.Duration) (*Connection, error) { +func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) { conn, err := net.DialTimeout("tcp", na.String(), timeout) if err != nil { return nil, err } - return NewConnection(conn), nil + return conn, nil } func (na *NetAddress) Routable() bool { diff --git a/p2p/peer.go b/p2p/peer.go index 1fa2d0cac..f6039e955 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -3,8 +3,8 @@ package p2p import ( "fmt" "io" + "net" "sync/atomic" - "time" . "github.com/tendermint/tendermint/binary" ) @@ -13,45 +13,38 @@ import ( type Peer struct { outbound bool - conn *Connection - channels map[string]*Channel - quit chan struct{} + mconn *MConnection started uint32 stopped uint32 } -func newPeer(conn *Connection, channels map[string]*Channel) *Peer { - return &Peer{ - conn: conn, - channels: channels, - quit: make(chan struct{}), +func newPeer(conn net.Conn, outbound bool, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { + var p *Peer + onError := func(r interface{}) { + p.stop() + onPeerError(p, r) + } + mconn := NewMConnection(conn, chDescs, onError) + p = &Peer{ + outbound: outbound, + mconn: mconn, stopped: 0, } + mconn._peer = p // hacky optimization + return p } -func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) { - log.Debug("Starting %v", p) - +func (p *Peer) start() { if atomic.CompareAndSwapUint32(&p.started, 0, 1) { - // on connection error - onError := func(r interface{}) { - p.stop() - onPeerError(p, r) - } - p.conn.Start(p.channels, onError) - for chName, _ := range p.channels { - chInQueue := pktRecvQueues[chName] - go p.recvHandler(chName, chInQueue) - go p.sendHandler(chName) - } + log.Debug("Starting %v", p) + p.mconn.Start() } } func (p *Peer) stop() { if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { log.Debug("Stopping %v", p) - close(p.quit) - p.conn.Stop() + p.mconn.Stop() } } @@ -60,44 +53,21 @@ func (p *Peer) IsOutbound() bool { } func (p *Peer) RemoteAddress() *NetAddress { - return p.conn.RemoteAddress() + return p.mconn.RemoteAddress() } -func (p *Peer) Channel(chName string) *Channel { - return p.channels[chName] -} - -// TrySend returns true if the packet was successfully queued. -// Returning true does not imply that the packet will be sent. -func (p *Peer) TrySend(pkt Packet) bool { - channel := p.Channel(string(pkt.Channel)) - sendQueue := channel.sendQueue - +func (p *Peer) TrySend(chId byte, bytes ByteSlice) bool { if atomic.LoadUint32(&p.stopped) == 1 { return false } - - select { - case sendQueue <- pkt: - log.Debug("SEND %v: %v", p, pkt) - return true - default: // buffer full - log.Debug("FAIL SEND %v: %v", p, pkt) - return false - } + return p.mconn.TrySend(chId, bytes) } -func (p *Peer) Send(pkt Packet) bool { - channel := p.Channel(string(pkt.Channel)) - sendQueue := channel.sendQueue - +func (p *Peer) Send(chId byte, bytes ByteSlice) bool { if atomic.LoadUint32(&p.stopped) == 1 { return false } - - sendQueue <- pkt - log.Debug("SEND %v: %v", p, pkt) - return true + return p.mconn.Send(chId, bytes) } func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { @@ -106,104 +76,8 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { func (p *Peer) String() string { if p.outbound { - return fmt.Sprintf("P(->%v)", p.conn) + return fmt.Sprintf("P(->%v)", p.mconn) } else { - return fmt.Sprintf("P(%v->)", p.conn) - } -} - -// sendHandler pulls from a channel and pushes to the connection. -// Each channel gets its own sendHandler goroutine; -// Golang's channel implementation handles the scheduling. -func (p *Peer) sendHandler(chName string) { - // log.Debug("%v sendHandler [%v]", p, chName) - channel := p.channels[chName] - sendQueue := channel.sendQueue -FOR_LOOP: - for { - select { - case <-p.quit: - break FOR_LOOP - case pkt := <-sendQueue: - // blocks until the connection is Stop'd, - // which happens when this peer is Stop'd. - p.conn.Send(pkt) - } - } - - // log.Debug("%v sendHandler [%v] closed", p, chName) - // Cleanup -} - -// recvHandler pulls from a channel and pushes to the given pktRecvQueue. -// Each channel gets its own recvHandler goroutine. -// Many peers have goroutines that push to the same pktRecvQueue. -// Golang's channel implementation handles the scheduling. -func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) { - // log.Debug("%v recvHandler [%v]", p, chName) - channel := p.channels[chName] - recvQueue := channel.recvQueue - -FOR_LOOP: - for { - select { - case <-p.quit: - break FOR_LOOP - case pkt := <-recvQueue: - // send to pktRecvQueue - inboundPacket := &InboundPacket{ - Peer: p, - Time: Time{time.Now()}, - Packet: pkt, - } - select { - case <-p.quit: - break FOR_LOOP - case pktRecvQueue <- inboundPacket: - continue - } - } + return fmt.Sprintf("P(%v->)", p.mconn) } - - // log.Debug("%v recvHandler [%v] closed", p, chName) - // Cleanup -} - -//----------------------------------------------------------------------------- - -/* ChannelDescriptor */ - -type ChannelDescriptor struct { - Name string - SendBufferSize int - RecvBufferSize int -} - -/* Channel */ - -type Channel struct { - name string - recvQueue chan Packet - sendQueue chan Packet - //stats Stats -} - -func newChannel(desc ChannelDescriptor) *Channel { - return &Channel{ - name: desc.Name, - recvQueue: make(chan Packet, desc.RecvBufferSize), - sendQueue: make(chan Packet, desc.SendBufferSize), - } -} - -func (c *Channel) Name() string { - return c.name -} - -func (c *Channel) RecvQueue() <-chan Packet { - return c.recvQueue -} - -func (c *Channel) SendQueue() chan<- Packet { - return c.sendQueue } diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index bbd8753bc..78d4688c3 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -15,7 +15,7 @@ import ( var pexErrInvalidMessage = errors.New("Invalid PEX message") const ( - pexCh = "PEX" + pexCh = byte(0x00) ensurePeersPeriodSeconds = 30 minNumOutboundPeers = 10 maxNumPeers = 50 @@ -67,13 +67,13 @@ func (pm *PeerManager) Stop() { func (pm *PeerManager) RequestPEX(peer *Peer) { msg := &pexRequestMessage{} tm := TypedMessage{msgTypeRequest, msg} - peer.TrySend(NewPacket(pexCh, tm)) + peer.TrySend(pexCh, tm.Bytes()) } func (pm *PeerManager) SendAddrs(peer *Peer, addrs []*NetAddress) { msg := &pexAddrsMessage{Addrs: addrs} tm := TypedMessage{msgTypeAddrs, msg} - peer.Send(NewPacket(pexCh, tm)) + peer.Send(pexCh, tm.Bytes()) } // For new outbound peers, announce our listener addresses if any, @@ -170,38 +170,38 @@ func (pm *PeerManager) ensurePeers() { func (pm *PeerManager) requestHandler() { for { - inPkt := pm.sw.Receive(pexCh) // {Peer, Time, Packet} - if inPkt == nil { + inBytes, ok := pm.sw.Receive(pexCh) // {Peer, Time, Packet} + if !ok { // Client has stopped break } // decode message - msg := decodeMessage(inPkt.Bytes) + msg := decodeMessage(inBytes.Bytes) log.Info("requestHandler received %v", msg) switch msg.(type) { case *pexRequestMessage: - // inPkt.Peer requested some peers. + // inBytes.MConn._peer requested some peers. // TODO: prevent abuse. addrs := pm.book.GetSelection() msg := &pexAddrsMessage{Addrs: addrs} tm := TypedMessage{msgTypeRequest, msg} - queued := inPkt.Peer.TrySend(NewPacket(pexCh, tm)) + queued := inBytes.MConn._peer.TrySend(pexCh, tm.Bytes()) if !queued { // ignore } case *pexAddrsMessage: - // We received some peer addresses from inPkt.Peer. + // We received some peer addresses from inBytes.MConn._peer. // TODO: prevent abuse. // (We don't want to get spammed with bad peers) - srcAddr := inPkt.Peer.RemoteAddress() + srcAddr := inBytes.MConn._peer.RemoteAddress() for _, addr := range msg.(*pexAddrsMessage).Addrs { pm.book.AddAddress(addr, srcAddr) } default: // Ignore unknown message. - // pm.sw.StopPeerForError(inPkt.Peer, pexErrInvalidMessage) + // pm.sw.StopPeerForError(inBytes.MConn._peer, pexErrInvalidMessage) } } @@ -220,7 +220,7 @@ const ( ) // TODO: check for unnecessary extra bytes at the end. -func decodeMessage(bz ByteSlice) (msg Message) { +func decodeMessage(bz ByteSlice) (msg interface{}) { // log.Debug("decoding msg bytes: %X", bz) switch Byte(bz[0]) { case msgTypeRequest: diff --git a/p2p/switch.go b/p2p/switch.go index 03acb326e..bea91110b 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -2,9 +2,11 @@ package p2p import ( "errors" + "net" "sync/atomic" "time" + . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" ) @@ -12,22 +14,26 @@ import ( All communication amongst peers are multiplexed by "channels". (Not the same as Go "channels") -To send a message, encapsulate it into a "Packet" and send it to each peer. +To send a message, serialize it into a ByteSlice and send it to each peer. +For best performance, re-use the same immutable ByteSlice to each peer. +You can also use a TypedBytes{} struct for convenience. You can find all connected and active peers by iterating over ".Peers().List()". ".Broadcast()" is provided for convenience, but by iterating over the peers manually the caller can decide which subset receives a message. Inbound messages are received by calling ".Receive()". +The receiver is responsible for decoding the message bytes, which may be preceded +by a single type byte if a TypedBytes{} was used. */ type Switch struct { - channels []ChannelDescriptor - pktRecvQueues map[string]chan *InboundPacket - peers *PeerSet - dialing *CMap - listeners *CMap // name -> chan interface{} - quit chan struct{} - started uint32 - stopped uint32 + chDescs []*ChannelDescriptor + recvQueues map[byte]chan InboundBytes + peers *PeerSet + dialing *CMap + listeners *CMap // listenerName -> chan interface{} + quit chan struct{} + started uint32 + stopped uint32 } var ( @@ -39,22 +45,22 @@ const ( peerDialTimeoutSeconds = 30 ) -func NewSwitch(channels []ChannelDescriptor) *Switch { - // make pktRecvQueues... - pktRecvQueues := make(map[string]chan *InboundPacket) - for _, chDesc := range channels { - // XXX: buffer this - pktRecvQueues[chDesc.Name] = make(chan *InboundPacket) +func NewSwitch(chDescs []*ChannelDescriptor) *Switch { + s := &Switch{ + chDescs: chDescs, + recvQueues: make(map[byte]chan InboundBytes), + peers: NewPeerSet(), + dialing: NewCMap(), + listeners: NewCMap(), + quit: make(chan struct{}), + stopped: 0, } - s := &Switch{ - channels: channels, - pktRecvQueues: pktRecvQueues, - peers: NewPeerSet(), - dialing: NewCMap(), - listeners: NewCMap(), - quit: make(chan struct{}), - stopped: 0, + // Create global recvQueues, one per channel. + for _, chDesc := range chDescs { + recvQueue := make(chan InboundBytes, chDesc.RecvQueueCapacity) + chDesc.recvQueue = recvQueue + s.recvQueues[chDesc.Id] = recvQueue } return s @@ -79,18 +85,12 @@ func (s *Switch) Stop() { } } -func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, error) { +func (s *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { if atomic.LoadUint32(&s.stopped) == 1 { return nil, ErrSwitchStopped } - // Create channels for peer - channels := map[string]*Channel{} - for _, chDesc := range s.channels { - channels[chDesc.Name] = newChannel(chDesc) - } - peer := newPeer(conn, channels) - peer.outbound = outbound + peer := newPeer(conn, outbound, s.chDescs, s.StopPeerForError) // Add the peer to .peers if s.peers.Add(peer) { @@ -101,7 +101,7 @@ func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, } // Start the peer - go peer.start(s.pktRecvQueues, s.StopPeerForError) + go peer.start() // Notify listeners. s.emit(SwitchEventNewPeer{Peer: peer}) @@ -132,14 +132,16 @@ func (s *Switch) IsDialing(addr *NetAddress) bool { return s.dialing.Has(addr.String()) } -func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { +func (s *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) { if atomic.LoadUint32(&s.stopped) == 1 { return } - log.Debug("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) + msgBytes := BinaryBytes(msg) + + log.Debug("Broadcast on [%X] len: %v", chId, len(msgBytes)) for _, peer := range s.peers.List() { - success := peer.TrySend(pkt) + success := peer.TrySend(chId, msgBytes) log.Debug("Broadcast for peer %v success: %v", peer, success) if success { numSuccess += 1 @@ -164,22 +166,22 @@ func (s *Switch) RemoveEventListener(name string) { /* Receive blocks on a channel until a message is found. */ -func (s *Switch) Receive(chName string) *InboundPacket { +func (s *Switch) Receive(chId byte) (InboundBytes, bool) { if atomic.LoadUint32(&s.stopped) == 1 { - return nil + return InboundBytes{}, false } - q := s.pktRecvQueues[chName] + q := s.recvQueues[chId] if q == nil { - Panicf("Expected pktRecvQueues[%f], found none", chName) + Panicf("Expected recvQueues[%X], found none", chId) } select { case <-s.quit: - return nil - case inPacket := <-q: - log.Debug("RECV %v", inPacket) - return inPacket + return InboundBytes{}, false + case inBytes := <-q: + log.Debug("RECV %v", inBytes) + return inBytes, true } } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 36a957fc1..2ad9cbf5b 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -8,14 +8,23 @@ import ( ) // convenience method for creating two switches connected to each other. -func makeSwitchPair(t testing.TB, bufferSize int, chNames []string) (*Switch, *Switch) { - - chDescs := []ChannelDescriptor{} - for _, chName := range chNames { - chDescs = append(chDescs, ChannelDescriptor{ - Name: chName, - SendBufferSize: bufferSize, - RecvBufferSize: bufferSize, +func makeSwitchPair(t testing.TB, numChannels int, sendQueueCapacity int, recvBufferSize int, recvQueueCapacity int) (*Switch, *Switch, []*ChannelDescriptor) { + + // Make numChannels channels starting at byte(0x00) + chIds := []byte{} + for i := 0; i < numChannels; i++ { + chIds = append(chIds, byte(i)) + } + + // Make some channel descriptors. + chDescs := []*ChannelDescriptor{} + for _, chId := range chIds { + chDescs = append(chDescs, &ChannelDescriptor{ + Id: chId, + SendQueueCapacity: sendQueueCapacity, + RecvBufferSize: recvBufferSize, + RecvQueueCapacity: recvQueueCapacity, + DefaultPriority: 1, }) } @@ -48,13 +57,11 @@ func makeSwitchPair(t testing.TB, bufferSize int, chNames []string) (*Switch, *S // Close the server, no longer needed. l.Stop() - return s1, s2 + return s1, s2, chDescs } func TestSwitches(t *testing.T) { - - channels := []string{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"} - s1, s2 := makeSwitchPair(t, 10, channels) + s1, s2, _ := makeSwitchPair(t, 10, 10, 1024, 10) defer s1.Stop() defer s2.Stop() @@ -66,26 +73,32 @@ func TestSwitches(t *testing.T) { t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size()) } + // Broadcast a message on ch0 + s1.Broadcast(byte(0x00), String("channel zero")) // Broadcast a message on ch1 - s1.Broadcast(NewPacket("ch1", String("channel one"))) + s1.Broadcast(byte(0x01), String("channel one")) // Broadcast a message on ch2 - s1.Broadcast(NewPacket("ch2", String("channel two"))) - // Broadcast a message on ch3 - s1.Broadcast(NewPacket("ch3", String("channel three"))) + s1.Broadcast(byte(0x02), String("channel two")) // Wait for things to settle... time.Sleep(100 * time.Millisecond) - // Receive message from channel 2 and check - inMsg := s2.Receive("ch2") - if ReadString(inMsg.Reader()) != "channel two" { - t.Errorf("Unexpected received message bytes: %X = [%v]", inMsg.Bytes, ReadString(inMsg.Reader())) + // Receive message from channel 1 and check + inMsg, ok := s2.Receive(byte(0x01)) + if !ok { + t.Errorf("Failed to receive from channel one") + } + if ReadString(inMsg.Bytes.Reader()) != "channel one" { + t.Errorf("Unexpected received message bytes: %X = [%v]", inMsg.Bytes, ReadString(inMsg.Bytes.Reader())) } - // Receive message from channel 1 and check - inMsg = s2.Receive("ch1") - if ReadString(inMsg.Reader()) != "channel one" { - t.Errorf("Unexpected received message bytes: %X = [%v]", inMsg.Bytes, ReadString(inMsg.Reader())) + // Receive message from channel 0 and check + inMsg, ok = s2.Receive(byte(0x00)) + if !ok { + t.Errorf("Failed to receive from channel zero") + } + if ReadString(inMsg.Bytes.Reader()) != "channel zero" { + t.Errorf("Unexpected received message bytes: %X = [%v]", inMsg.Bytes, ReadString(inMsg.Bytes.Reader())) } } @@ -93,24 +106,24 @@ func BenchmarkSwitches(b *testing.B) { b.StopTimer() - channels := []string{"ch1", "ch2", "ch3", "ch4", "ch5", "ch6", "ch7", "ch8", "ch9", "ch0"} - s1, s2 := makeSwitchPair(b, 10, channels) + s1, s2, chDescs := makeSwitchPair(b, 10, 10, 1024, 10) defer s1.Stop() defer s2.Stop() // Create a sink on either channel to just pop off messages. - recvHandler := func(c *Switch, chName string) { + recvHandler := func(c *Switch, chId byte) { for { - it := c.Receive(chName) - if it == nil { + _, ok := c.Receive(chId) + if !ok { break } } } - for _, chName := range channels { - go recvHandler(s1, chName) - go recvHandler(s2, chName) + // Create routines to consume from recvQueues. + for _, chDesc := range chDescs { + go recvHandler(s1, chDesc.Id) + go recvHandler(s2, chDesc.Id) } // Allow time for goroutines to boot up @@ -121,9 +134,8 @@ func BenchmarkSwitches(b *testing.B) { // Send random message from one channel to another for i := 0; i < b.N; i++ { - chName := channels[i%len(channels)] - pkt := NewPacket(String(chName), ByteSlice("test data")) - nS, nF := s1.Broadcast(pkt) + chId := chDescs[i%len(chDescs)].Id + nS, nF := s1.Broadcast(chId, String("test data")) numSuccess += nS numFailure += nF }