diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 7267e6a4b..06773ebed 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -35,6 +35,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [proto/p2p] Rename `NodeInfo.default_node_id` to `node_id` (@erikgrinaker) - [libs/os] `EnsureDir` now propagates IO errors and checks the file type (@erikgrinaker) - [libs/os] Kill() and {Must,}{Read,Write}File() functions have been removed. (@alessio) + - [libs/protoio] Return number of bytes read in `Reader.ReadMsg()` (@erikgrinaker) - Blockchain Protocol @@ -59,6 +60,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES +- [p2p] \#5868 Fix inbound traffic statistics and rate limiting in `MConnection` (@erikgrinaker) - [types] \#5523 Change json naming of `PartSetHeader` within `BlockID` from `parts` to `part_set_header` (@marbar3778) - [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash) - [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes) diff --git a/abci/types/messages.go b/abci/types/messages.go index a80663646..cced13c1d 100644 --- a/abci/types/messages.go +++ b/abci/types/messages.go @@ -24,8 +24,8 @@ func WriteMessage(msg proto.Message, w io.Writer) error { // ReadMessage reads a varint length-delimited protobuf message. func ReadMessage(r io.Reader, msg proto.Message) error { - reader := protoio.NewDelimitedReader(r, maxMsgSize) - return reader.ReadMsg(msg) + _, err := protoio.NewDelimitedReader(r, maxMsgSize).ReadMsg(msg) + return err } //---------------------------------------- diff --git a/libs/protoio/io.go b/libs/protoio/io.go index 91acbb71b..b12a1d482 100644 --- a/libs/protoio/io.go +++ b/libs/protoio/io.go @@ -46,7 +46,7 @@ type WriteCloser interface { } type Reader interface { - ReadMsg(msg proto.Message) error + ReadMsg(msg proto.Message) (int, error) } type ReadCloser interface { @@ -72,25 +72,28 @@ func getSize(v interface{}) (int, bool) { } } -// byteReader wraps an io.Reader and implements io.ByteReader. Reading one byte at a -// time is extremely slow, but this is what Amino did already, and the caller can -// wrap the reader in bufio.Reader if appropriate. +// byteReader wraps an io.Reader and implements io.ByteReader, required by +// binary.ReadUvarint(). Reading one byte at a time is extremely slow, but this +// is what Amino did previously anyway, and the caller can wrap the underlying +// reader in a bufio.Reader if appropriate. type byteReader struct { - io.Reader - bytes []byte + reader io.Reader + buf []byte + bytesRead int // keeps track of bytes read via ReadByte() } func newByteReader(r io.Reader) *byteReader { return &byteReader{ - Reader: r, - bytes: make([]byte, 1), + reader: r, + buf: make([]byte, 1), } } func (r *byteReader) ReadByte() (byte, error) { - _, err := r.Read(r.bytes) + n, err := r.reader.Read(r.buf) + r.bytesRead += n if err != nil { - return 0, err + return 0x00, err } - return r.bytes[0], nil + return r.buf[0], nil } diff --git a/libs/protoio/io_test.go b/libs/protoio/io_test.go index f4556b31f..e6e51d051 100644 --- a/libs/protoio/io_test.go +++ b/libs/protoio/io_test.go @@ -39,6 +39,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/test" + "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/protoio" ) @@ -47,6 +48,7 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error { varint := make([]byte, binary.MaxVarintLen64) size := 1000 msgs := make([]*test.NinOptNative, size) + lens := make([]int, size) r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := range msgs { msgs[i] = test.NewPopulatedNinOptNative(r, true) @@ -71,6 +73,7 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error { if n != len(bz)+visize { return fmt.Errorf("WriteMsg() wrote %v bytes, expected %v", n, len(bz)+visize) // nolint } + lens[i] = n } if err := writer.Close(); err != nil { return err @@ -78,11 +81,13 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error { i := 0 for { msg := &test.NinOptNative{} - if err := reader.ReadMsg(msg); err != nil { + if n, err := reader.ReadMsg(msg); err != nil { if err == io.EOF { break } return err + } else if n != lens[i] { + return fmt.Errorf("read %v bytes, expected %v", n, lens[i]) } if err := msg.VerboseEqual(msgs[i]); err != nil { return err @@ -116,21 +121,17 @@ func TestVarintNormal(t *testing.T) { buf := newBuffer() writer := protoio.NewDelimitedWriter(buf) reader := protoio.NewDelimitedReader(buf, 1024*1024) - if err := iotest(writer, reader); err != nil { - t.Error(err) - } - if !buf.closed { - t.Fatalf("did not close buffer") - } + err := iotest(writer, reader) + require.NoError(t, err) + require.True(t, buf.closed, "did not close buffer") } func TestVarintNoClose(t *testing.T) { buf := bytes.NewBuffer(nil) writer := protoio.NewDelimitedWriter(buf) reader := protoio.NewDelimitedReader(buf, 1024*1024) - if err := iotest(writer, reader); err != nil { - t.Error(err) - } + err := iotest(writer, reader) + require.NoError(t, err) } // issue 32 @@ -138,11 +139,8 @@ func TestVarintMaxSize(t *testing.T) { buf := newBuffer() writer := protoio.NewDelimitedWriter(buf) reader := protoio.NewDelimitedReader(buf, 20) - if err := iotest(writer, reader); err == nil { - t.Error(err) - } else { - t.Logf("%s", err) - } + err := iotest(writer, reader) + require.Error(t, err) } func TestVarintError(t *testing.T) { @@ -150,8 +148,37 @@ func TestVarintError(t *testing.T) { buf.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f}) reader := protoio.NewDelimitedReader(buf, 1024*1024) msg := &test.NinOptNative{} - err := reader.ReadMsg(msg) - if err == nil { - t.Fatalf("Expected error") - } + n, err := reader.ReadMsg(msg) + require.Error(t, err) + require.Equal(t, 10, n) +} + +func TestVarintTruncated(t *testing.T) { + buf := newBuffer() + buf.Write([]byte{0xff, 0xff}) + reader := protoio.NewDelimitedReader(buf, 1024*1024) + msg := &test.NinOptNative{} + n, err := reader.ReadMsg(msg) + require.Error(t, err) + require.Equal(t, 2, n) +} + +func TestShort(t *testing.T) { + buf := newBuffer() + + varintBuf := make([]byte, binary.MaxVarintLen64) + varintLen := binary.PutUvarint(varintBuf, 100) + _, err := buf.Write(varintBuf[:varintLen]) + require.NoError(t, err) + + bz, err := proto.Marshal(&test.NinOptNative{Field15: []byte{0x01, 0x02, 0x03}}) + require.NoError(t, err) + buf.Write(bz) + + reader := protoio.NewDelimitedReader(buf, 1024*1024) + require.NoError(t, err) + msg := &test.NinOptNative{} + n, err := reader.ReadMsg(msg) + require.Error(t, err) + require.Equal(t, varintLen+len(bz), n) } diff --git a/libs/protoio/reader.go b/libs/protoio/reader.go index 15a84899f..66eed707c 100644 --- a/libs/protoio/reader.go +++ b/libs/protoio/reader.go @@ -39,41 +39,58 @@ import ( "github.com/gogo/protobuf/proto" ) -// NewDelimitedReader reads varint-delimited Protobuf messages from a reader. Unlike the gogoproto -// NewDelimitedReader, this does not buffer the reader, which may cause poor performance but is -// necessary when only reading single messages (e.g. in the p2p package). +// NewDelimitedReader reads varint-delimited Protobuf messages from a reader. +// Unlike the gogoproto NewDelimitedReader, this does not buffer the reader, +// which may cause poor performance but is necessary when only reading single +// messages (e.g. in the p2p package). It also returns the number of bytes +// read, which is necessary for the p2p package. func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser { var closer io.Closer if c, ok := r.(io.Closer); ok { closer = c } - return &varintReader{newByteReader(r), nil, maxSize, closer} + return &varintReader{r, nil, maxSize, closer} } type varintReader struct { - r *byteReader + r io.Reader buf []byte maxSize int closer io.Closer } -func (r *varintReader) ReadMsg(msg proto.Message) error { - length64, err := binary.ReadUvarint(newByteReader(r.r)) +func (r *varintReader) ReadMsg(msg proto.Message) (int, error) { + // ReadUvarint needs an io.ByteReader, and we also need to keep track of the + // number of bytes read, so we use our own byteReader. This can't be + // buffered, so the caller should pass a buffered io.Reader to avoid poor + // performance. + byteReader := newByteReader(r.r) + l, err := binary.ReadUvarint(byteReader) + n := byteReader.bytesRead if err != nil { - return err + return n, err } - length := int(length64) - if length < 0 || length > r.maxSize { - return fmt.Errorf("message exceeds max size (%v > %v)", length, r.maxSize) + + // Make sure length doesn't overflow the native int size (e.g. 32-bit), + // and that the returned sum of n+length doesn't overflow either. + length := int(l) + if l >= uint64(^uint(0)>>1) || length < 0 || n+length < 0 { + return n, fmt.Errorf("invalid out-of-range message length %v", l) + } + if length > r.maxSize { + return n, fmt.Errorf("message exceeds max size (%v > %v)", length, r.maxSize) } + if len(r.buf) < length { r.buf = make([]byte, length) } buf := r.buf[:length] - if _, err := io.ReadFull(r.r, buf); err != nil { - return err + nr, err := io.ReadFull(r.r, buf) + n += nr + if err != nil { + return n, err } - return proto.Unmarshal(buf, msg) + return n, proto.Unmarshal(buf, msg) } func (r *varintReader) Close() error { @@ -84,5 +101,6 @@ func (r *varintReader) Close() error { } func UnmarshalDelimited(data []byte, msg proto.Message) error { - return NewDelimitedReader(bytes.NewReader(data), len(data)).ReadMsg(msg) + _, err := NewDelimitedReader(bytes.NewReader(data), len(data)).ReadMsg(msg) + return err } diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index e6455b0cf..10892564a 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -584,7 +584,8 @@ FOR_LOOP: // Read packet type var packet tmp2p.Packet - err := protoReader.ReadMsg(&packet) + _n, err := protoReader.ReadMsg(&packet) + c.recvMonitor.Update(_n) if err != nil { // stopServices was invoked and we are shutting down // receiving is excpected to fail since we will close the connection diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 12f68f437..dec59d330 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -180,7 +180,7 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) { go func() { // read ping var pkt tmp2p.Packet - err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt) + _, err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt) require.NoError(t, err) serverGotPing <- struct{}{} }() @@ -230,7 +230,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { go func() { // read ping (one byte) var packet tmp2p.Packet - err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet) + _, err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet) require.NoError(t, err) serverGotPing <- struct{}{} @@ -277,19 +277,19 @@ func TestMConnectionMultiplePings(t *testing.T) { _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) require.NoError(t, err) - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) require.NoError(t, err) - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) require.NoError(t, err) - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) assert.True(t, mconn.IsRunning()) @@ -322,7 +322,7 @@ func TestMConnectionPingPongs(t *testing.T) { var pkt tmp2p.PacketPing // read ping - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) serverGotPing <- struct{}{} @@ -333,7 +333,7 @@ func TestMConnectionPingPongs(t *testing.T) { time.Sleep(mconn.config.PingInterval) // read ping - err = protoReader.ReadMsg(&pkt) + _, err = protoReader.ReadMsg(&pkt) require.NoError(t, err) serverGotPing <- struct{}{} diff --git a/p2p/conn/secret_connection.go b/p2p/conn/secret_connection.go index 041224772..febb975f3 100644 --- a/p2p/conn/secret_connection.go +++ b/p2p/conn/secret_connection.go @@ -313,7 +313,7 @@ func shareEphPubKey(conn io.ReadWriter, locEphPub *[32]byte) (remEphPub *[32]byt }, func(_ int) (val interface{}, abort bool, err error) { var bytes gogotypes.BytesValue - err = protoio.NewDelimitedReader(conn, 1024*1024).ReadMsg(&bytes) + _, err = protoio.NewDelimitedReader(conn, 1024*1024).ReadMsg(&bytes) if err != nil { return nil, true, err // abort } @@ -419,7 +419,7 @@ func shareAuthSignature(sc io.ReadWriter, pubKey crypto.PubKey, signature []byte }, func(_ int) (val interface{}, abort bool, err error) { var pba tmp2p.AuthSigMessage - err = protoio.NewDelimitedReader(sc, 1024*1024).ReadMsg(&pba) + _, err = protoio.NewDelimitedReader(sc, 1024*1024).ReadMsg(&pba) if err != nil { return nil, true, err // abort } diff --git a/p2p/transport_mconn.go b/p2p/transport_mconn.go index d59a58cea..186162ca5 100644 --- a/p2p/transport_mconn.go +++ b/p2p/transport_mconn.go @@ -539,7 +539,8 @@ func (c *mConnConnection) handshake() (NodeInfo, error) { chErr <- err }() go func() { - chErr <- protoio.NewDelimitedReader(c.secretConn, MaxNodeInfoSize()).ReadMsg(&pbNodeInfo) + _, err := protoio.NewDelimitedReader(c.secretConn, MaxNodeInfoSize()).ReadMsg(&pbNodeInfo) + chErr <- err }() for i := 0; i < cap(chErr); i++ { if err := <-chErr; err != nil { diff --git a/privval/secret_connection.go b/privval/secret_connection.go index 10f7653d7..2b59e57fe 100644 --- a/privval/secret_connection.go +++ b/privval/secret_connection.go @@ -317,7 +317,7 @@ func shareEphPubKey(conn io.ReadWriter, locEphPub *[32]byte) (remEphPub *[32]byt }, func(_ int) (val interface{}, abort bool, err error) { var bytes gogotypes.BytesValue - err = protoio.NewDelimitedReader(conn, 1024*1024).ReadMsg(&bytes) + _, err = protoio.NewDelimitedReader(conn, 1024*1024).ReadMsg(&bytes) if err != nil { return nil, true, err // abort } @@ -423,7 +423,7 @@ func shareAuthSignature(sc io.ReadWriter, pubKey crypto.PubKey, signature []byte }, func(_ int) (val interface{}, abort bool, err error) { var pba tmprivval.AuthSigMessage - err = protoio.NewDelimitedReader(sc, 1024*1024).ReadMsg(&pba) + _, err = protoio.NewDelimitedReader(sc, 1024*1024).ReadMsg(&pba) if err != nil { return nil, true, err // abort } diff --git a/privval/signer_endpoint.go b/privval/signer_endpoint.go index eb2ed442f..e1594438c 100644 --- a/privval/signer_endpoint.go +++ b/privval/signer_endpoint.go @@ -96,7 +96,7 @@ func (se *signerEndpoint) ReadMessage() (msg privvalproto.Message, err error) { } const maxRemoteSignerMsgSize = 1024 * 10 protoReader := protoio.NewDelimitedReader(se.conn, maxRemoteSignerMsgSize) - err = protoReader.ReadMsg(&msg) + _, err = protoReader.ReadMsg(&msg) if _, ok := err.(timeoutError); ok { if err != nil { err = fmt.Errorf("%v: %w", err, ErrReadTimeout)