diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 9f39edaa6..70b307961 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -44,6 +44,7 @@ Special thanks to external contributors on this release: - [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish) ### IMPROVEMENTS +- [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em) - [pubsub] \#7319 Performance improvements for the event query API (@creachadair) diff --git a/internal/libs/protoio/writer.go b/internal/libs/protoio/writer.go index d4c66798f..93be1f851 100644 --- a/internal/libs/protoio/writer.go +++ b/internal/libs/protoio/writer.go @@ -34,6 +34,7 @@ import ( "bytes" "encoding/binary" "io" + "sync" "github.com/gogo/protobuf/proto" ) @@ -90,11 +91,44 @@ func (w *varintWriter) Close() error { return nil } +func varintWrittenBytes(m marshaler, size int) ([]byte, error) { + buf := make([]byte, size+binary.MaxVarintLen64) + n := binary.PutUvarint(buf, uint64(size)) + nw, err := m.MarshalTo(buf[n:]) + if err != nil { + return nil, err + } + return buf[:n+nw], nil +} + +var bufPool = &sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + func MarshalDelimited(msg proto.Message) ([]byte, error) { - var buf bytes.Buffer - _, err := NewDelimitedWriter(&buf).WriteMsg(msg) + // The goal here is to write proto message as is knowning already if + // the exact size can be retrieved and if so just use that. + if m, ok := msg.(marshaler); ok { + size, ok := getSize(msg) + if ok { + return varintWrittenBytes(m, size) + } + } + + // Otherwise, go down the route of using proto.Marshal, + // and use the buffer pool to retrieve a writer. + buf := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(buf) + buf.Reset() + _, err := NewDelimitedWriter(buf).WriteMsg(msg) if err != nil { return nil, err } - return buf.Bytes(), nil + // Given that we are reusing buffers, we should + // make a copy of the returned bytes. + bytesCopy := make([]byte, buf.Len()) + copy(bytesCopy, buf.Bytes()) + return bytesCopy, nil } diff --git a/internal/libs/protoio/writer_test.go b/internal/libs/protoio/writer_test.go new file mode 100644 index 000000000..a4c0b6552 --- /dev/null +++ b/internal/libs/protoio/writer_test.go @@ -0,0 +1,91 @@ +package protoio_test + +import ( + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/libs/protoio" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + "github.com/tendermint/tendermint/types" +) + +func aVote() *types.Vote { + var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z") + if err != nil { + panic(err) + } + + return &types.Vote{ + Type: tmproto.SignedMsgType(byte(tmproto.PrevoteType)), + Height: 12345, + Round: 2, + Timestamp: stamp, + BlockID: types.BlockID{ + Hash: tmhash.Sum([]byte("blockID_hash")), + PartSetHeader: types.PartSetHeader{ + Total: 1000000, + Hash: tmhash.Sum([]byte("blockID_part_set_header_hash")), + }, + }, + ValidatorAddress: crypto.AddressHash([]byte("validator_address")), + ValidatorIndex: 56789, + } +} + +type excludedMarshalTo struct { + msg proto.Message +} + +func (emt *excludedMarshalTo) ProtoMessage() {} +func (emt *excludedMarshalTo) String() string { + return emt.msg.String() +} +func (emt *excludedMarshalTo) Reset() { + emt.msg.Reset() +} +func (emt *excludedMarshalTo) Marshal() ([]byte, error) { + return proto.Marshal(emt.msg) +} + +var _ proto.Message = (*excludedMarshalTo)(nil) + +var sink interface{} + +func BenchmarkMarshalDelimitedWithMarshalTo(b *testing.B) { + msgs := []proto.Message{ + aVote().ToProto(), + } + benchmarkMarshalDelimited(b, msgs) +} + +func BenchmarkMarshalDelimitedNoMarshalTo(b *testing.B) { + msgs := []proto.Message{ + &excludedMarshalTo{aVote().ToProto()}, + } + benchmarkMarshalDelimited(b, msgs) +} + +func benchmarkMarshalDelimited(b *testing.B, msgs []proto.Message) { + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + for _, msg := range msgs { + blob, err := protoio.MarshalDelimited(msg) + require.Nil(b, err) + sink = blob + } + } + + if sink == nil { + b.Fatal("Benchmark did not run") + } + + // Reset the sink. + sink = (interface{})(nil) +} diff --git a/types/vote_test.go b/types/vote_test.go index 3ffb60324..29c29baac 100644 --- a/types/vote_test.go +++ b/types/vote_test.go @@ -294,3 +294,52 @@ func TestVoteProtobuf(t *testing.T) { } } } + +var sink interface{} + +var protoVote *tmproto.Vote +var sampleCommit *Commit + +func init() { + protoVote = examplePrecommit().ToProto() + + lastID := makeBlockIDRandom() + voteSet, _, vals := randVoteSet(2, 1, tmproto.PrecommitType, 10, 1) + commit, err := makeCommit(lastID, 2, 1, voteSet, vals, time.Now()) + if err != nil { + panic(err) + } + sampleCommit = commit +} + +func BenchmarkVoteSignBytes(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + sink = VoteSignBytes("test_chain_id", protoVote) + } + + if sink == nil { + b.Fatal("Benchmark did not run") + } + + // Reset the sink. + sink = (interface{})(nil) +} + +func BenchmarkCommitVoteSignBytes(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + for index := range sampleCommit.Signatures { + sink = sampleCommit.VoteSignBytes("test_chain_id", int32(index)) + } + } + + if sink == nil { + b.Fatal("Benchmark did not run") + } + + // Reset the sink. + sink = (interface{})(nil) +}