diff --git a/binary/int.go b/binary/int.go index 1a0223674..d64d37473 100644 --- a/binary/int.go +++ b/binary/int.go @@ -156,8 +156,10 @@ func (self Int16) ByteSize() int { } func (self Int16) WriteTo(w io.Writer) (int64, error) { - err := binary.Write(w, binary.LittleEndian, int16(self)) - return 2, err + buf := []byte{0, 0} + binary.LittleEndian.PutUint16(buf, uint16(self)) + n, err := w.Write(buf) + return int64(n), err } func ReadInt16Safe(r io.Reader) (Int16, error) { @@ -196,8 +198,10 @@ func (self UInt16) ByteSize() int { } func (self UInt16) WriteTo(w io.Writer) (int64, error) { - err := binary.Write(w, binary.LittleEndian, uint16(self)) - return 2, err + buf := []byte{0, 0} + binary.LittleEndian.PutUint16(buf, uint16(self)) + n, err := w.Write(buf) + return int64(n), err } func ReadUInt16Safe(r io.Reader) (UInt16, error) { @@ -236,8 +240,10 @@ func (self Int32) ByteSize() int { } func (self Int32) WriteTo(w io.Writer) (int64, error) { - err := binary.Write(w, binary.LittleEndian, int32(self)) - return 4, err + buf := []byte{0, 0, 0, 0} + binary.LittleEndian.PutUint32(buf, uint32(self)) + n, err := w.Write(buf) + return int64(n), err } func ReadInt32Safe(r io.Reader) (Int32, error) { @@ -276,8 +282,10 @@ func (self UInt32) ByteSize() int { } func (self UInt32) WriteTo(w io.Writer) (int64, error) { - err := binary.Write(w, binary.LittleEndian, uint32(self)) - return 4, err + buf := []byte{0, 0, 0, 0} + binary.LittleEndian.PutUint32(buf, uint32(self)) + n, err := w.Write(buf) + return int64(n), err } func ReadUInt32Safe(r io.Reader) (UInt32, error) { @@ -316,8 +324,10 @@ func (self Int64) ByteSize() int { } func (self Int64) WriteTo(w io.Writer) (int64, error) { - err := binary.Write(w, binary.LittleEndian, int64(self)) - return 8, err + buf := []byte{0, 0, 0, 0, 0, 0, 0, 0} + binary.LittleEndian.PutUint64(buf, uint64(self)) + n, err := w.Write(buf) + return int64(n), err } func ReadInt64Safe(r io.Reader) (Int64, error) { @@ -356,8 +366,10 @@ func (self UInt64) ByteSize() int { } func (self UInt64) WriteTo(w io.Writer) (int64, error) { - err := binary.Write(w, binary.LittleEndian, uint64(self)) - return 8, err + buf := []byte{0, 0, 0, 0, 0, 0, 0, 0} + binary.LittleEndian.PutUint64(buf, uint64(self)) + n, err := w.Write(buf) + return int64(n), err } func ReadUInt64Safe(r io.Reader) (UInt64, error) { @@ -396,8 +408,10 @@ func (self Int) ByteSize() int { } func (self Int) WriteTo(w io.Writer) (int64, error) { - err := binary.Write(w, binary.LittleEndian, int64(self)) - return 8, err + buf := []byte{0, 0, 0, 0, 0, 0, 0, 0} + binary.LittleEndian.PutUint64(buf, uint64(self)) + n, err := w.Write(buf) + return int64(n), err } func ReadInt(r io.Reader) Int { @@ -428,8 +442,10 @@ func (self UInt) ByteSize() int { } func (self UInt) WriteTo(w io.Writer) (int64, error) { - err := binary.Write(w, binary.LittleEndian, uint64(self)) - return 8, err + buf := []byte{0, 0, 0, 0, 0, 0, 0, 0} + binary.LittleEndian.PutUint64(buf, uint64(self)) + n, err := w.Write(buf) + return int64(n), err } func ReadUInt(r io.Reader) UInt { diff --git a/peer/connection.go b/peer/connection.go index 3317057f0..d143dd7ec 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -1,6 +1,7 @@ package peer import ( + "bufio" "fmt" "net" "sync/atomic" @@ -22,6 +23,8 @@ type Connection struct { sendQueue chan Packet // never closes conn net.Conn + bufWriter *bufio.Writer + bufReader *bufio.Reader quit chan struct{} stopped uint32 pingDebouncer *Debouncer @@ -38,6 +41,8 @@ func NewConnection(conn net.Conn) *Connection { return &Connection{ sendQueue: make(chan Packet, OUT_QUEUE_SIZE), conn: conn, + bufWriter: bufio.NewWriterSize(conn, 1024), + bufReader: bufio.NewReaderSize(conn, 1024), quit: make(chan struct{}), pingDebouncer: NewDebouncer(PING_TIMEOUT_MINUTES * time.Minute), pong: make(chan struct{}), @@ -102,16 +107,16 @@ FOR_LOOP: var err error select { case <-c.pingDebouncer.Ch: - _, err = PACKET_TYPE_PING.WriteTo(c.conn) + _, err = PACKET_TYPE_PING.WriteTo(c.bufWriter) case sendPkt := <-c.sendQueue: log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection") - _, err = PACKET_TYPE_MSG.WriteTo(c.conn) + _, err = PACKET_TYPE_MSG.WriteTo(c.bufWriter) if err != nil { break } - _, err = sendPkt.WriteTo(c.conn) + _, err = sendPkt.WriteTo(c.bufWriter) case <-c.pong: - _, err = PACKET_TYPE_PONG.WriteTo(c.conn) + _, err = PACKET_TYPE_PONG.WriteTo(c.bufWriter) case <-c.quit: break FOR_LOOP } @@ -138,7 +143,7 @@ func (c *Connection) recvHandler(channels map[String]*Channel) { FOR_LOOP: for { - pktType, err := ReadUInt8Safe(c.conn) + pktType, err := ReadUInt8Safe(c.bufReader) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { log.Infof("%v failed @ recvHandler", c) @@ -155,7 +160,7 @@ FOR_LOOP: case PACKET_TYPE_PONG: // do nothing case PACKET_TYPE_MSG: - pkt, err := ReadPacketSafe(c.conn) + pkt, err := ReadPacketSafe(c.bufReader) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { log.Infof("%v failed @ recvHandler", c) diff --git a/peer/listener.go b/peer/listener.go index 0796479d1..5060d434d 100644 --- a/peer/listener.go +++ b/peer/listener.go @@ -8,6 +8,7 @@ import ( ) const ( + // TODO REMOVE DEFAULT_PORT = 8001 )