From 3e92899bd917a624fa0d3aca5c1fe7109fe5adab Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 10 Dec 2021 09:36:43 -0800 Subject: [PATCH] internal/libs/protoio: optimize MarshalDelimited by plain byteslice allocations+sync.Pool (#7325) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Noticed in profiles that invoking *VoteSignBytes always created a bytes.Buffer, then discarded it inside protoio.MarshalDelimited. I dug further and examined the call paths and noticed that we unconditionally create the bytes.Buffer, even though we might have proto messages (in the common case) that implement MarshalTo([]byte), and invoked varintWriter. Instead by inlining this case, we skip a bunch of allocations and CPU cycles, which then reflects properly on all calling functions. Here are the benchmark results: ```shell $ benchstat before.txt after.txt name old time/op new time/op delta types.VoteSignBytes-8 705ns ± 3% 573ns ± 6% -18.74% (p=0.000 n=18+20) types.CommitVoteSignBytes-8 8.15µs ± 9% 6.81µs ± 4% -16.51% (p=0.000 n=20+19) protoio.MarshalDelimitedWithMarshalTo-8 788ns ± 8% 772ns ± 3% -2.01% (p=0.050 n=20+20) protoio.MarshalDelimitedNoMarshalTo-8 989ns ± 4% 845ns ± 2% -14.51% (p=0.000 n=20+18) name old alloc/op new alloc/op delta types.VoteSignBytes-8 792B ± 0% 600B ± 0% -24.24% (p=0.000 n=20+20) types.CommitVoteSignBytes-8 9.52kB ± 0% 7.60kB ± 0% -20.17% (p=0.000 n=20+20) protoio.MarshalDelimitedNoMarshalTo-8 808B ± 0% 440B ± 0% -45.54% (p=0.000 n=20+20) name old allocs/op new allocs/op delta types.VoteSignBytes-8 13.0 ± 0% 10.0 ± 0% -23.08% (p=0.000 n=20+20) types.CommitVoteSignBytes-8 140 ± 0% 110 ± 0% -21.43% (p=0.000 n=20+20) protoio.MarshalDelimitedNoMarshalTo-8 10.0 ± 0% 7.0 ± 0% -30.00% (p=0.000 n=20+20) ``` Thanks to Tharsis who tasked me to help them increase TPS and who are keen on improving Tendermint and efficiency. --- CHANGELOG_PENDING.md | 1 + internal/libs/protoio/writer.go | 40 +++++++++++- internal/libs/protoio/writer_test.go | 91 ++++++++++++++++++++++++++++ types/vote_test.go | 49 +++++++++++++++ 4 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 internal/libs/protoio/writer_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 9f39edaa6..70b307961 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -44,6 +44,7 @@ Special thanks to external contributors on this release: - [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish) ### IMPROVEMENTS +- [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em) - [pubsub] \#7319 Performance improvements for the event query API (@creachadair) diff --git a/internal/libs/protoio/writer.go b/internal/libs/protoio/writer.go index d4c66798f..93be1f851 100644 --- a/internal/libs/protoio/writer.go +++ b/internal/libs/protoio/writer.go @@ -34,6 +34,7 @@ import ( "bytes" "encoding/binary" "io" + "sync" "github.com/gogo/protobuf/proto" ) @@ -90,11 +91,44 @@ func (w *varintWriter) Close() error { return nil } +func varintWrittenBytes(m marshaler, size int) ([]byte, error) { + buf := make([]byte, size+binary.MaxVarintLen64) + n := binary.PutUvarint(buf, uint64(size)) + nw, err := m.MarshalTo(buf[n:]) + if err != nil { + return nil, err + } + return buf[:n+nw], nil +} + +var bufPool = &sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + func MarshalDelimited(msg proto.Message) ([]byte, error) { - var buf bytes.Buffer - _, err := NewDelimitedWriter(&buf).WriteMsg(msg) + // The goal here is to write proto message as is knowning already if + // the exact size can be retrieved and if so just use that. + if m, ok := msg.(marshaler); ok { + size, ok := getSize(msg) + if ok { + return varintWrittenBytes(m, size) + } + } + + // Otherwise, go down the route of using proto.Marshal, + // and use the buffer pool to retrieve a writer. + buf := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(buf) + buf.Reset() + _, err := NewDelimitedWriter(buf).WriteMsg(msg) if err != nil { return nil, err } - return buf.Bytes(), nil + // Given that we are reusing buffers, we should + // make a copy of the returned bytes. + bytesCopy := make([]byte, buf.Len()) + copy(bytesCopy, buf.Bytes()) + return bytesCopy, nil } diff --git a/internal/libs/protoio/writer_test.go b/internal/libs/protoio/writer_test.go new file mode 100644 index 000000000..a4c0b6552 --- /dev/null +++ b/internal/libs/protoio/writer_test.go @@ -0,0 +1,91 @@ +package protoio_test + +import ( + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/libs/protoio" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + "github.com/tendermint/tendermint/types" +) + +func aVote() *types.Vote { + var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z") + if err != nil { + panic(err) + } + + return &types.Vote{ + Type: tmproto.SignedMsgType(byte(tmproto.PrevoteType)), + Height: 12345, + Round: 2, + Timestamp: stamp, + BlockID: types.BlockID{ + Hash: tmhash.Sum([]byte("blockID_hash")), + PartSetHeader: types.PartSetHeader{ + Total: 1000000, + Hash: tmhash.Sum([]byte("blockID_part_set_header_hash")), + }, + }, + ValidatorAddress: crypto.AddressHash([]byte("validator_address")), + ValidatorIndex: 56789, + } +} + +type excludedMarshalTo struct { + msg proto.Message +} + +func (emt *excludedMarshalTo) ProtoMessage() {} +func (emt *excludedMarshalTo) String() string { + return emt.msg.String() +} +func (emt *excludedMarshalTo) Reset() { + emt.msg.Reset() +} +func (emt *excludedMarshalTo) Marshal() ([]byte, error) { + return proto.Marshal(emt.msg) +} + +var _ proto.Message = (*excludedMarshalTo)(nil) + +var sink interface{} + +func BenchmarkMarshalDelimitedWithMarshalTo(b *testing.B) { + msgs := []proto.Message{ + aVote().ToProto(), + } + benchmarkMarshalDelimited(b, msgs) +} + +func BenchmarkMarshalDelimitedNoMarshalTo(b *testing.B) { + msgs := []proto.Message{ + &excludedMarshalTo{aVote().ToProto()}, + } + benchmarkMarshalDelimited(b, msgs) +} + +func benchmarkMarshalDelimited(b *testing.B, msgs []proto.Message) { + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + for _, msg := range msgs { + blob, err := protoio.MarshalDelimited(msg) + require.Nil(b, err) + sink = blob + } + } + + if sink == nil { + b.Fatal("Benchmark did not run") + } + + // Reset the sink. + sink = (interface{})(nil) +} diff --git a/types/vote_test.go b/types/vote_test.go index 3ffb60324..29c29baac 100644 --- a/types/vote_test.go +++ b/types/vote_test.go @@ -294,3 +294,52 @@ func TestVoteProtobuf(t *testing.T) { } } } + +var sink interface{} + +var protoVote *tmproto.Vote +var sampleCommit *Commit + +func init() { + protoVote = examplePrecommit().ToProto() + + lastID := makeBlockIDRandom() + voteSet, _, vals := randVoteSet(2, 1, tmproto.PrecommitType, 10, 1) + commit, err := makeCommit(lastID, 2, 1, voteSet, vals, time.Now()) + if err != nil { + panic(err) + } + sampleCommit = commit +} + +func BenchmarkVoteSignBytes(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + sink = VoteSignBytes("test_chain_id", protoVote) + } + + if sink == nil { + b.Fatal("Benchmark did not run") + } + + // Reset the sink. + sink = (interface{})(nil) +} + +func BenchmarkCommitVoteSignBytes(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + for index := range sampleCommit.Signatures { + sink = sampleCommit.VoteSignBytes("test_chain_id", int32(index)) + } + } + + if sink == nil { + b.Fatal("Benchmark did not run") + } + + // Reset the sink. + sink = (interface{})(nil) +}