Browse Source

internal/libs/protoio: optimize MarshalDelimited by plain byteslice allocations+sync.Pool (#7325)

Noticed in profiles that invoking *VoteSignBytes always created a
bytes.Buffer, then discarded it inside protoio.MarshalDelimited.
I dug further and examined the call paths and noticed that we
unconditionally create the bytes.Buffer, even though we might
have proto messages (in the common case) that implement
MarshalTo([]byte), and invoked varintWriter. Instead by inlining
this case, we skip a bunch of allocations and CPU cycles,
which then reflects properly on all calling functions. Here
are the benchmark results:

```shell
$ benchstat before.txt after.txt
name                                        old time/op    new time/op      delta
types.VoteSignBytes-8                       705ns ± 3%     573ns ± 6%       -18.74% (p=0.000 n=18+20)
types.CommitVoteSignBytes-8                 8.15µs ± 9%    6.81µs ± 4%      -16.51% (p=0.000 n=20+19)
protoio.MarshalDelimitedWithMarshalTo-8     788ns ± 8%     772ns ± 3%       -2.01%  (p=0.050 n=20+20)
protoio.MarshalDelimitedNoMarshalTo-8       989ns ± 4%     845ns ± 2%       -14.51% (p=0.000 n=20+18)

name                                        old alloc/op   new alloc/op    delta
types.VoteSignBytes-8                       792B ± 0%      600B ± 0%       -24.24%  (p=0.000 n=20+20)
types.CommitVoteSignBytes-8                 9.52kB ± 0%    7.60kB ± 0%     -20.17%  (p=0.000 n=20+20)
protoio.MarshalDelimitedNoMarshalTo-8       808B ± 0%      440B ± 0%       -45.54%  (p=0.000 n=20+20)

name                                        old allocs/op  new allocs/op   delta
types.VoteSignBytes-8                       13.0 ± 0%      10.0 ± 0%       -23.08%  (p=0.000 n=20+20)
types.CommitVoteSignBytes-8                 140 ± 0%       110 ± 0%        -21.43%  (p=0.000 n=20+20)
protoio.MarshalDelimitedNoMarshalTo-8       10.0 ± 0%      7.0 ± 0%        -30.00%  (p=0.000 n=20+20)
```

Thanks to Tharsis who tasked me to help them increase TPS and who
are keen on improving Tendermint and efficiency.
pull/7423/head
Emmanuel T Odeke 3 years ago
committed by GitHub
parent
commit
3e92899bd9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 178 additions and 3 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +37
    -3
      internal/libs/protoio/writer.go
  3. +91
    -0
      internal/libs/protoio/writer_test.go
  4. +49
    -0
      types/vote_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -44,6 +44,7 @@ Special thanks to external contributors on this release:
- [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish)
### IMPROVEMENTS
- [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em)
- [pubsub] \#7319 Performance improvements for the event query API (@creachadair)


+ 37
- 3
internal/libs/protoio/writer.go View File

@ -34,6 +34,7 @@ import (
"bytes"
"encoding/binary"
"io"
"sync"
"github.com/gogo/protobuf/proto"
)
@ -90,11 +91,44 @@ func (w *varintWriter) Close() error {
return nil
}
func varintWrittenBytes(m marshaler, size int) ([]byte, error) {
buf := make([]byte, size+binary.MaxVarintLen64)
n := binary.PutUvarint(buf, uint64(size))
nw, err := m.MarshalTo(buf[n:])
if err != nil {
return nil, err
}
return buf[:n+nw], nil
}
var bufPool = &sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func MarshalDelimited(msg proto.Message) ([]byte, error) {
var buf bytes.Buffer
_, err := NewDelimitedWriter(&buf).WriteMsg(msg)
// The goal here is to write proto message as is knowning already if
// the exact size can be retrieved and if so just use that.
if m, ok := msg.(marshaler); ok {
size, ok := getSize(msg)
if ok {
return varintWrittenBytes(m, size)
}
}
// Otherwise, go down the route of using proto.Marshal,
// and use the buffer pool to retrieve a writer.
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
_, err := NewDelimitedWriter(buf).WriteMsg(msg)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
// Given that we are reusing buffers, we should
// make a copy of the returned bytes.
bytesCopy := make([]byte, buf.Len())
copy(bytesCopy, buf.Bytes())
return bytesCopy, nil
}

+ 91
- 0
internal/libs/protoio/writer_test.go View File

@ -0,0 +1,91 @@
package protoio_test
import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/internal/libs/protoio"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
func aVote() *types.Vote {
var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z")
if err != nil {
panic(err)
}
return &types.Vote{
Type: tmproto.SignedMsgType(byte(tmproto.PrevoteType)),
Height: 12345,
Round: 2,
Timestamp: stamp,
BlockID: types.BlockID{
Hash: tmhash.Sum([]byte("blockID_hash")),
PartSetHeader: types.PartSetHeader{
Total: 1000000,
Hash: tmhash.Sum([]byte("blockID_part_set_header_hash")),
},
},
ValidatorAddress: crypto.AddressHash([]byte("validator_address")),
ValidatorIndex: 56789,
}
}
type excludedMarshalTo struct {
msg proto.Message
}
func (emt *excludedMarshalTo) ProtoMessage() {}
func (emt *excludedMarshalTo) String() string {
return emt.msg.String()
}
func (emt *excludedMarshalTo) Reset() {
emt.msg.Reset()
}
func (emt *excludedMarshalTo) Marshal() ([]byte, error) {
return proto.Marshal(emt.msg)
}
var _ proto.Message = (*excludedMarshalTo)(nil)
var sink interface{}
func BenchmarkMarshalDelimitedWithMarshalTo(b *testing.B) {
msgs := []proto.Message{
aVote().ToProto(),
}
benchmarkMarshalDelimited(b, msgs)
}
func BenchmarkMarshalDelimitedNoMarshalTo(b *testing.B) {
msgs := []proto.Message{
&excludedMarshalTo{aVote().ToProto()},
}
benchmarkMarshalDelimited(b, msgs)
}
func benchmarkMarshalDelimited(b *testing.B, msgs []proto.Message) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for _, msg := range msgs {
blob, err := protoio.MarshalDelimited(msg)
require.Nil(b, err)
sink = blob
}
}
if sink == nil {
b.Fatal("Benchmark did not run")
}
// Reset the sink.
sink = (interface{})(nil)
}

+ 49
- 0
types/vote_test.go View File

@ -294,3 +294,52 @@ func TestVoteProtobuf(t *testing.T) {
}
}
}
var sink interface{}
var protoVote *tmproto.Vote
var sampleCommit *Commit
func init() {
protoVote = examplePrecommit().ToProto()
lastID := makeBlockIDRandom()
voteSet, _, vals := randVoteSet(2, 1, tmproto.PrecommitType, 10, 1)
commit, err := makeCommit(lastID, 2, 1, voteSet, vals, time.Now())
if err != nil {
panic(err)
}
sampleCommit = commit
}
func BenchmarkVoteSignBytes(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
sink = VoteSignBytes("test_chain_id", protoVote)
}
if sink == nil {
b.Fatal("Benchmark did not run")
}
// Reset the sink.
sink = (interface{})(nil)
}
func BenchmarkCommitVoteSignBytes(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for index := range sampleCommit.Signatures {
sink = sampleCommit.VoteSignBytes("test_chain_id", int32(index))
}
}
if sink == nil {
b.Fatal("Benchmark did not run")
}
// Reset the sink.
sink = (interface{})(nil)
}

Loading…
Cancel
Save