diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 6c53ab6d2..0bada29ab 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -15,6 +15,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - P2P Protocol - Go API + - [libs/protoio] Return number of bytes read in `Reader.ReadMsg()` (@erikgrinaker) - Blockchain Protocol @@ -23,3 +24,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### IMPROVEMENTS ### BUG FIXES + +- [p2p] \#5868 Fix inbound traffic statistics and rate limiting in `MConnection` (@erikgrinaker) diff --git a/libs/protoio/io.go b/libs/protoio/io.go index 91acbb71b..b12a1d482 100644 --- a/libs/protoio/io.go +++ b/libs/protoio/io.go @@ -46,7 +46,7 @@ type WriteCloser interface { } type Reader interface { - ReadMsg(msg proto.Message) error + ReadMsg(msg proto.Message) (int, error) } type ReadCloser interface { @@ -72,25 +72,28 @@ func getSize(v interface{}) (int, bool) { } } -// byteReader wraps an io.Reader and implements io.ByteReader. Reading one byte at a -// time is extremely slow, but this is what Amino did already, and the caller can -// wrap the reader in bufio.Reader if appropriate. +// byteReader wraps an io.Reader and implements io.ByteReader, required by +// binary.ReadUvarint(). Reading one byte at a time is extremely slow, but this +// is what Amino did previously anyway, and the caller can wrap the underlying +// reader in a bufio.Reader if appropriate. type byteReader struct { - io.Reader - bytes []byte + reader io.Reader + buf []byte + bytesRead int // keeps track of bytes read via ReadByte() } func newByteReader(r io.Reader) *byteReader { return &byteReader{ - Reader: r, - bytes: make([]byte, 1), + reader: r, + buf: make([]byte, 1), } } func (r *byteReader) ReadByte() (byte, error) { - _, err := r.Read(r.bytes) + n, err := r.reader.Read(r.buf) + r.bytesRead += n if err != nil { - return 0, err + return 0x00, err } - return r.bytes[0], nil + return r.buf[0], nil } diff --git a/libs/protoio/io_test.go b/libs/protoio/io_test.go index f4556b31f..e6e51d051 100644 --- a/libs/protoio/io_test.go +++ b/libs/protoio/io_test.go @@ -39,6 +39,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/test" + "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/protoio" ) @@ -47,6 +48,7 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error { varint := make([]byte, binary.MaxVarintLen64) size := 1000 msgs := make([]*test.NinOptNative, size) + lens := make([]int, size) r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := range msgs { msgs[i] = test.NewPopulatedNinOptNative(r, true) @@ -71,6 +73,7 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error { if n != len(bz)+visize { return fmt.Errorf("WriteMsg() wrote %v bytes, expected %v", n, len(bz)+visize) // nolint } + lens[i] = n } if err := writer.Close(); err != nil { return err @@ -78,11 +81,13 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error { i := 0 for { msg := &test.NinOptNative{} - if err := reader.ReadMsg(msg); err != nil { + if n, err := reader.ReadMsg(msg); err != nil { if err == io.EOF { break } return err + } else if n != lens[i] { + return fmt.Errorf("read %v bytes, expected %v", n, lens[i]) } if err := msg.VerboseEqual(msgs[i]); err != nil { return err @@ -116,21 +121,17 @@ func TestVarintNormal(t *testing.T) { buf := newBuffer() writer := protoio.NewDelimitedWriter(buf) reader := protoio.NewDelimitedReader(buf, 1024*1024) - if err := iotest(writer, reader); err != nil { - t.Error(err) - } - if !buf.closed { - t.Fatalf("did not close buffer") - } + err := iotest(writer, reader) + require.NoError(t, err) + require.True(t, buf.closed, "did not close buffer") } func TestVarintNoClose(t *testing.T) { buf := bytes.NewBuffer(nil) writer := protoio.NewDelimitedWriter(buf) reader := protoio.NewDelimitedReader(buf, 1024*1024) - if err := iotest(writer, reader); err != nil { - t.Error(err) - } + err := iotest(writer, reader) + require.NoError(t, err) } // issue 32 @@ -138,11 +139,8 @@ func TestVarintMaxSize(t *testing.T) { buf := newBuffer() writer := protoio.NewDelimitedWriter(buf) reader := protoio.NewDelimitedReader(buf, 20) - if err := iotest(writer, reader); err == nil { - t.Error(err) - } else { - t.Logf("%s", err) - } + err := iotest(writer, reader) + require.Error(t, err) } func TestVarintError(t *testing.T) { @@ -150,8 +148,37 @@ func TestVarintError(t *testing.T) { buf.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f}) reader := protoio.NewDelimitedReader(buf, 1024*1024) msg := &test.NinOptNative{} - err := reader.ReadMsg(msg) - if err == nil { - t.Fatalf("Expected error") - } + n, err := reader.ReadMsg(msg) + require.Error(t, err) + require.Equal(t, 10, n) +} + +func TestVarintTruncated(t *testing.T) { + buf := newBuffer() + buf.Write([]byte{0xff, 0xff}) + reader := protoio.NewDelimitedReader(buf, 1024*1024) + msg := &test.NinOptNative{} + n, err := reader.ReadMsg(msg) + require.Error(t, err) + require.Equal(t, 2, n) +} + +func TestShort(t *testing.T) { + buf := newBuffer() + + varintBuf := make([]byte, binary.MaxVarintLen64) + varintLen := binary.PutUvarint(varintBuf, 100) + _, err := buf.Write(varintBuf[:varintLen]) + require.NoError(t, err) + + bz, err := proto.Marshal(&test.NinOptNative{Field15: []byte{0x01, 0x02, 0x03}}) + require.NoError(t, err) + buf.Write(bz) + + reader := protoio.NewDelimitedReader(buf, 1024*1024) + require.NoError(t, err) + msg := &test.NinOptNative{} + n, err := reader.ReadMsg(msg) + require.Error(t, err) + require.Equal(t, varintLen+len(bz), n) } diff --git a/libs/protoio/reader.go b/libs/protoio/reader.go index 15a84899f..66eed707c 100644 --- a/libs/protoio/reader.go +++ b/libs/protoio/reader.go @@ -39,41 +39,58 @@ import ( "github.com/gogo/protobuf/proto" ) -// NewDelimitedReader reads varint-delimited Protobuf messages from a reader. Unlike the gogoproto -// NewDelimitedReader, this does not buffer the reader, which may cause poor performance but is -// necessary when only reading single messages (e.g. in the p2p package). +// NewDelimitedReader reads varint-delimited Protobuf messages from a reader. +// Unlike the gogoproto NewDelimitedReader, this does not buffer the reader, +// which may cause poor performance but is necessary when only reading single +// messages (e.g. in the p2p package). It also returns the number of bytes +// read, which is necessary for the p2p package. func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser { var closer io.Closer if c, ok := r.(io.Closer); ok { closer = c } - return &varintReader{newByteReader(r), nil, maxSize, closer} + return &varintReader{r, nil, maxSize, closer} } type varintReader struct { - r *byteReader + r io.Reader buf []byte maxSize int closer io.Closer } -func (r *varintReader) ReadMsg(msg proto.Message) error { - length64, err := binary.ReadUvarint(newByteReader(r.r)) +func (r *varintReader) ReadMsg(msg proto.Message) (int, error) { + // ReadUvarint needs an io.ByteReader, and we also need to keep track of the + // number of bytes read, so we use our own byteReader. This can't be + // buffered, so the caller should pass a buffered io.Reader to avoid poor + // performance. + byteReader := newByteReader(r.r) + l, err := binary.ReadUvarint(byteReader) + n := byteReader.bytesRead if err != nil { - return err + return n, err } - length := int(length64) - if length < 0 || length > r.maxSize { - return fmt.Errorf("message exceeds max size (%v > %v)", length, r.maxSize) + + // Make sure length doesn't overflow the native int size (e.g. 32-bit), + // and that the returned sum of n+length doesn't overflow either. + length := int(l) + if l >= uint64(^uint(0)>>1) || length < 0 || n+length < 0 { + return n, fmt.Errorf("invalid out-of-range message length %v", l) + } + if length > r.maxSize { + return n, fmt.Errorf("message exceeds max size (%v > %v)", length, r.maxSize) } + if len(r.buf) < length { r.buf = make([]byte, length) } buf := r.buf[:length] - if _, err := io.ReadFull(r.r, buf); err != nil { - return err + nr, err := io.ReadFull(r.r, buf) + n += nr + if err != nil { + return n, err } - return proto.Unmarshal(buf, msg) + return n, proto.Unmarshal(buf, msg) } func (r *varintReader) Close() error { @@ -84,5 +101,6 @@ func (r *varintReader) Close() error { } func UnmarshalDelimited(data []byte, msg proto.Message) error { - return NewDelimitedReader(bytes.NewReader(data), len(data)).ReadMsg(msg) + _, err := NewDelimitedReader(bytes.NewReader(data), len(data)).ReadMsg(msg) + return err } diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index b5290116d..65495074f 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -583,7 +583,8 @@ FOR_LOOP: // Read packet type var packet tmp2p.Packet - err := protoReader.ReadMsg(&packet) + _n, err := protoReader.ReadMsg(&packet) + c.recvMonitor.Update(_n) if err != nil { // stopServices was invoked and we are shutting down // receiving is excpected to fail since we will close the connection diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index a189e8b89..6dadfb486 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -185,7 +185,7 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) { go func() { // read ping var pkt tmp2p.Packet - err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt) + _, err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt) require.NoError(t, err) serverGotPing <- struct{}{} }() @@ -236,7 +236,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { go func() { // read ping (one byte) var packet tmp2p.Packet - err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet) + _, err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet) require.NoError(t, err) serverGotPing <- struct{}{} @@ -284,19 +284,19 @@ func TestMConnectionMultiplePings(t *testing.T) { _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) require.NoError(t, err) - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) require.NoError(t, err) - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) require.NoError(t, err) - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) assert.True(t, mconn.IsRunning()) @@ -331,7 +331,7 @@ func TestMConnectionPingPongs(t *testing.T) { var pkt tmp2p.PacketPing // read ping - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) serverGotPing <- struct{}{} @@ -342,7 +342,7 @@ func TestMConnectionPingPongs(t *testing.T) { time.Sleep(mconn.config.PingInterval) // read ping - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) serverGotPing <- struct{}{} diff --git a/p2p/conn/secret_connection.go b/p2p/conn/secret_connection.go index 041224772..febb975f3 100644 --- a/p2p/conn/secret_connection.go +++ b/p2p/conn/secret_connection.go @@ -313,7 +313,7 @@ func shareEphPubKey(conn io.ReadWriter, locEphPub *[32]byte) (remEphPub *[32]byt }, func(_ int) (val interface{}, abort bool, err error) { var bytes gogotypes.BytesValue - err = protoio.NewDelimitedReader(conn, 1024*1024).ReadMsg(&bytes) + _, err = protoio.NewDelimitedReader(conn, 1024*1024).ReadMsg(&bytes) if err != nil { return nil, true, err // abort } @@ -419,7 +419,7 @@ func shareAuthSignature(sc io.ReadWriter, pubKey crypto.PubKey, signature []byte }, func(_ int) (val interface{}, abort bool, err error) { var pba tmp2p.AuthSigMessage - err = protoio.NewDelimitedReader(sc, 1024*1024).ReadMsg(&pba) + _, err = protoio.NewDelimitedReader(sc, 1024*1024).ReadMsg(&pba) if err != nil { return nil, true, err // abort } diff --git a/p2p/transport.go b/p2p/transport.go index e597ac0a1..8448888cb 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -537,7 +537,7 @@ func handshake( }(errc, c) go func(errc chan<- error, c net.Conn) { protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) - err := protoReader.ReadMsg(&pbpeerNodeInfo) + _, err := protoReader.ReadMsg(&pbpeerNodeInfo) errc <- err }(errc, c) diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 9b81dcd63..77db5a3bd 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -594,7 +594,7 @@ func TestTransportHandshake(t *testing.T) { ) protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) - err := protoReader.ReadMsg(&pbni) + _, err := protoReader.ReadMsg(&pbni) if err != nil { t.Error(err) } diff --git a/privval/signer_endpoint.go b/privval/signer_endpoint.go index eb2ed442f..e1594438c 100644 --- a/privval/signer_endpoint.go +++ b/privval/signer_endpoint.go @@ -96,7 +96,7 @@ func (se *signerEndpoint) ReadMessage() (msg privvalproto.Message, err error) { } const maxRemoteSignerMsgSize = 1024 * 10 protoReader := protoio.NewDelimitedReader(se.conn, maxRemoteSignerMsgSize) - err = protoReader.ReadMsg(&msg) + _, err = protoReader.ReadMsg(&msg) if _, ok := err.(timeoutError); ok { if err != nil { err = fmt.Errorf("%v: %w", err, ErrReadTimeout)