Browse Source

p2p/conn: migrate to Protobuf (#4990)

Migrates the p2p connections to Protobuf. Supersedes #4800.

gogoproto's `NewDelimitedReader()` uses an internal buffer, which makes it unsuitable for reading individual messages from a shared reader (since any remaining data in the buffer will be discarded). We therefore add a new `protoio` package with an unbuffered `NewDelimitedReader()`. Additionally, the `NewDelimitedWriter()` returns the number of bytes written, and we've added `MarshalDelimited()` and `UnmarshalDelimited()`, to ease migration of existing code.
pull/4994/head
Erik Grinaker 4 years ago
committed by GitHub
parent
commit
660e72a12f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 926 additions and 217 deletions
  1. +96
    -0
      libs/protoio/io.go
  2. +157
    -0
      libs/protoio/io_test.go
  3. +88
    -0
      libs/protoio/reader.go
  4. +100
    -0
      libs/protoio/writer.go
  5. +0
    -14
      p2p/conn/codec.go
  6. +78
    -71
      p2p/conn/connection.go
  7. +76
    -69
      p2p/conn/connection_test.go
  8. +21
    -6
      p2p/conn/evil_secret_connection_test.go
  9. +37
    -15
      p2p/conn/secret_connection.go
  10. +12
    -20
      p2p/conn/secret_connection_test.go
  11. +255
    -22
      proto/p2p/conn_msgs.pb.go
  12. +6
    -0
      proto/p2p/conn_msgs.proto

+ 96
- 0
libs/protoio/io.go View File

@ -0,0 +1,96 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Modified to return number of bytes written by Writer.WriteMsg(), and added byteReader.
package protoio
import (
"io"
"github.com/gogo/protobuf/proto"
)
type Writer interface {
WriteMsg(proto.Message) (int, error)
}
type WriteCloser interface {
Writer
io.Closer
}
type Reader interface {
ReadMsg(msg proto.Message) error
}
type ReadCloser interface {
Reader
io.Closer
}
type marshaler interface {
MarshalTo(data []byte) (n int, err error)
}
func getSize(v interface{}) (int, bool) {
if sz, ok := v.(interface {
Size() (n int)
}); ok {
return sz.Size(), true
} else if sz, ok := v.(interface {
ProtoSize() (n int)
}); ok {
return sz.ProtoSize(), true
} else {
return 0, false
}
}
// byteReader wraps an io.Reader and implements io.ByteReader. Reading one byte at a
// time is extremely slow, but this is what Amino did already, and the caller can
// wrap the reader in bufio.Reader if appropriate.
type byteReader struct {
io.Reader
bytes []byte
}
func newByteReader(r io.Reader) *byteReader {
return &byteReader{
Reader: r,
bytes: make([]byte, 1),
}
}
func (r *byteReader) ReadByte() (byte, error) {
_, err := r.Read(r.bytes)
if err != nil {
return 0, err
}
return r.bytes[0], nil
}

+ 157
- 0
libs/protoio/io_test.go View File

@ -0,0 +1,157 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package protoio_test
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math/rand"
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/test"
"github.com/tendermint/tendermint/libs/protoio"
)
func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error {
varint := make([]byte, binary.MaxVarintLen64)
size := 1000
msgs := make([]*test.NinOptNative, size)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range msgs {
msgs[i] = test.NewPopulatedNinOptNative(r, true)
//issue 31
if i == 5 {
msgs[i] = &test.NinOptNative{}
}
//issue 31
if i == 999 {
msgs[i] = &test.NinOptNative{}
}
// FIXME Check size
bz, err := proto.Marshal(msgs[i])
if err != nil {
return err
}
visize := binary.PutUvarint(varint, uint64(len(bz)))
n, err := writer.WriteMsg(msgs[i])
if err != nil {
return err
}
if n != len(bz)+visize {
return fmt.Errorf("WriteMsg() wrote %v bytes, expected %v", n, len(bz)+visize) // nolint
}
}
if err := writer.Close(); err != nil {
return err
}
i := 0
for {
msg := &test.NinOptNative{}
if err := reader.ReadMsg(msg); err != nil {
if err == io.EOF {
break
}
return err
}
if err := msg.VerboseEqual(msgs[i]); err != nil {
return err
}
i++
}
if i != size {
panic("not enough messages read")
}
if err := reader.Close(); err != nil {
return err
}
return nil
}
type buffer struct {
*bytes.Buffer
closed bool
}
func (b *buffer) Close() error {
b.closed = true
return nil
}
func newBuffer() *buffer {
return &buffer{bytes.NewBuffer(nil), false}
}
func TestVarintNormal(t *testing.T) {
buf := newBuffer()
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 1024*1024)
if err := iotest(writer, reader); err != nil {
t.Error(err)
}
if !buf.closed {
t.Fatalf("did not close buffer")
}
}
func TestVarintNoClose(t *testing.T) {
buf := bytes.NewBuffer(nil)
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 1024*1024)
if err := iotest(writer, reader); err != nil {
t.Error(err)
}
}
//issue 32
func TestVarintMaxSize(t *testing.T) {
buf := newBuffer()
writer := protoio.NewDelimitedWriter(buf)
reader := protoio.NewDelimitedReader(buf, 20)
if err := iotest(writer, reader); err == nil {
t.Error(err)
} else {
t.Logf("%s", err)
}
}
func TestVarintError(t *testing.T) {
buf := newBuffer()
buf.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f})
reader := protoio.NewDelimitedReader(buf, 1024*1024)
msg := &test.NinOptNative{}
err := reader.ReadMsg(msg)
if err == nil {
t.Fatalf("Expected error")
}
}

+ 88
- 0
libs/protoio/reader.go View File

@ -0,0 +1,88 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Modified from original GoGo Protobuf to not buffer the reader.
package protoio
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"github.com/gogo/protobuf/proto"
)
// NewDelimitedReader reads varint-delimited Protobuf messages from a reader. Unlike the gogoproto
// NewDelimitedReader, this does not buffer the reader, which may cause poor performance but is
// necessary when only reading single messages (e.g. in the p2p package).
func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser {
var closer io.Closer
if c, ok := r.(io.Closer); ok {
closer = c
}
return &varintReader{newByteReader(r), nil, maxSize, closer}
}
type varintReader struct {
r *byteReader
buf []byte
maxSize int
closer io.Closer
}
func (r *varintReader) ReadMsg(msg proto.Message) error {
length64, err := binary.ReadUvarint(newByteReader(r.r))
if err != nil {
return err
}
length := int(length64)
if length < 0 || length > r.maxSize {
return fmt.Errorf("message exceeds max size (%v > %v)", length, r.maxSize)
}
if len(r.buf) < length {
r.buf = make([]byte, length)
}
buf := r.buf[:length]
if _, err := io.ReadFull(r.r, buf); err != nil {
return err
}
return proto.Unmarshal(buf, msg)
}
func (r *varintReader) Close() error {
if r.closer != nil {
return r.closer.Close()
}
return nil
}
func UnmarshalDelimited(data []byte, msg proto.Message) error {
return NewDelimitedReader(bytes.NewReader(data), len(data)).ReadMsg(msg)
}

+ 100
- 0
libs/protoio/writer.go View File

@ -0,0 +1,100 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Modified from original GoGo Protobuf to return number of bytes written.
package protoio
import (
"bytes"
"encoding/binary"
"io"
"github.com/gogo/protobuf/proto"
)
// NewDelimitedWriter writes a varint-delimited Protobuf message to a writer. It is
// equivalent to the gogoproto NewDelimitedWriter, except WriteMsg() also returns the
// number of bytes written, which is necessary in the p2p package.
func NewDelimitedWriter(w io.Writer) WriteCloser {
return &varintWriter{w, make([]byte, binary.MaxVarintLen64), nil}
}
type varintWriter struct {
w io.Writer
lenBuf []byte
buffer []byte
}
func (w *varintWriter) WriteMsg(msg proto.Message) (int, error) {
if m, ok := msg.(marshaler); ok {
n, ok := getSize(m)
if ok {
if n+binary.MaxVarintLen64 >= len(w.buffer) {
w.buffer = make([]byte, n+binary.MaxVarintLen64)
}
lenOff := binary.PutUvarint(w.buffer, uint64(n))
_, err := m.MarshalTo(w.buffer[lenOff:])
if err != nil {
return 0, err
}
_, err = w.w.Write(w.buffer[:lenOff+n])
return lenOff + n, err
}
}
// fallback
data, err := proto.Marshal(msg)
if err != nil {
return 0, err
}
length := uint64(len(data))
n := binary.PutUvarint(w.lenBuf, length)
_, err = w.w.Write(w.lenBuf[:n])
if err != nil {
return 0, err
}
_, err = w.w.Write(data)
return len(data) + n, err
}
func (w *varintWriter) Close() error {
if closer, ok := w.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
func MarshalDelimited(msg proto.Message) ([]byte, error) {
var buf bytes.Buffer
_, err := NewDelimitedWriter(&buf).WriteMsg(msg)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

+ 0
- 14
p2p/conn/codec.go View File

@ -1,14 +0,0 @@
package conn
import (
amino "github.com/tendermint/go-amino"
cryptoamino "github.com/tendermint/tendermint/crypto/encoding/amino"
)
var cdc *amino.Codec = amino.NewCodec()
func init() {
cryptoamino.RegisterAmino(cdc)
RegisterPacket(cdc)
}

+ 78
- 71
p2p/conn/connection.go View File

@ -2,25 +2,26 @@ package conn
import (
"bufio"
"runtime/debug"
"errors"
"fmt"
"io"
"math"
"net"
"reflect"
"runtime/debug"
"sync"
"sync/atomic"
"time"
amino "github.com/tendermint/go-amino"
"github.com/gogo/protobuf/proto"
flow "github.com/tendermint/tendermint/libs/flowrate"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/libs/timer"
tmp2p "github.com/tendermint/tendermint/proto/p2p"
)
const (
@ -66,7 +67,7 @@ There are two methods for sending messages:
`Send(chID, msgBytes)` is a blocking call that waits until `msg` is
successfully queued for the channel with the given id byte `chID`, or until the
request times out. The message `msg` is serialized using Go-Amino.
request times out. The message `msg` is serialized using Protobuf.
`TrySend(chID, msgBytes)` is a nonblocking call that returns false if the
channel's queue is full.
@ -418,9 +419,11 @@ func (c *MConnection) CanSend(chID byte) bool {
func (c *MConnection) sendRoutine() {
defer c._recover()
protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)
FOR_LOOP:
for {
var _n int64
var _n int
var err error
SELECTION:
select {
@ -434,11 +437,12 @@ FOR_LOOP:
}
case <-c.pingTimer.C:
c.Logger.Debug("Send Ping")
_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
if err != nil {
c.Logger.Error("Failed to send PacketPing", "err", err)
break SELECTION
}
c.sendMonitor.Update(int(_n))
c.sendMonitor.Update(_n)
c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
select {
@ -456,11 +460,12 @@ FOR_LOOP:
}
case <-c.pong:
c.Logger.Debug("Send Pong")
_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
if err != nil {
c.Logger.Error("Failed to send PacketPong", "err", err)
break SELECTION
}
c.sendMonitor.Update(int(_n))
c.sendMonitor.Update(_n)
c.flush()
case <-c.quitSendRoutine:
break FOR_LOOP
@ -540,7 +545,7 @@ func (c *MConnection) sendPacketMsg() bool {
c.stopForError(err)
return true
}
c.sendMonitor.Update(int(_n))
c.sendMonitor.Update(_n)
c.flushTimer.Set()
return false
}
@ -552,6 +557,8 @@ func (c *MConnection) sendPacketMsg() bool {
func (c *MConnection) recvRoutine() {
defer c._recover()
protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)
FOR_LOOP:
for {
// Block until .recvMonitor says we can read.
@ -572,12 +579,9 @@ FOR_LOOP:
*/
// Read packet type
var packet Packet
var _n int64
var err error
_n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
c.recvMonitor.Update(int(_n))
var packet tmp2p.Packet
err := protoReader.ReadMsg(&packet)
if err != nil {
// stopServices was invoked and we are shutting down
// receiving is excpected to fail since we will close the connection
@ -599,8 +603,8 @@ FOR_LOOP:
}
// Read more depending on packet type.
switch pkt := packet.(type) {
case PacketPing:
switch pkt := packet.Sum.(type) {
case *tmp2p.Packet_PacketPing:
// TODO: prevent abuse, as they cause flush()'s.
// https://github.com/tendermint/tendermint/issues/1190
c.Logger.Debug("Receive Ping")
@ -609,23 +613,23 @@ FOR_LOOP:
default:
// never block
}
case PacketPong:
case *tmp2p.Packet_PacketPong:
c.Logger.Debug("Receive Pong")
select {
case c.pongTimeoutCh <- false:
default:
// never block
}
case PacketMsg:
channel, ok := c.channelsIdx[pkt.ChannelID]
case *tmp2p.Packet_PacketMsg:
channel, ok := c.channelsIdx[byte(pkt.PacketMsg.ChannelID)]
if !ok || channel == nil {
err := fmt.Errorf("unknown channel %X", pkt.ChannelID)
err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
c.stopForError(err)
break FOR_LOOP
}
msgBytes, err := channel.recvPacketMsg(pkt)
msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
if err != nil {
if c.IsRunning() {
c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
@ -634,9 +638,9 @@ FOR_LOOP:
break FOR_LOOP
}
if msgBytes != nil {
c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
c.Logger.Debug("Received bytes", "chID", pkt.PacketMsg.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
c.onReceive(pkt.ChannelID, msgBytes)
c.onReceive(byte(pkt.PacketMsg.ChannelID), msgBytes)
}
default:
err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
@ -661,14 +665,17 @@ func (c *MConnection) stopPongTimer() {
}
}
// maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead
// of amino encoding.
// maxPacketMsgSize returns a maximum size of PacketMsg
func (c *MConnection) maxPacketMsgSize() int {
return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{
bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{
ChannelID: 0x01,
EOF: 1,
Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize),
})) + 10 // leave room for changes in amino
Data: make([]byte, c.config.MaxPacketMsgPayloadSize),
}))
if err != nil {
panic(err)
}
return len(bz)
}
type ConnectionStatus struct {
@ -814,17 +821,16 @@ func (ch *Channel) isSendPending() bool {
// Creates a new PacketMsg to send.
// Not goroutine-safe
func (ch *Channel) nextPacketMsg() PacketMsg {
packet := PacketMsg{}
packet.ChannelID = ch.desc.ID
func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)}
maxSize := ch.maxPacketMsgPayloadSize
packet.Bytes = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
if len(ch.sending) <= maxSize {
packet.EOF = byte(0x01)
packet.EOF = 0x01
ch.sending = nil
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
} else {
packet.EOF = byte(0x00)
packet.EOF = 0x00
ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):]
}
return packet
@ -832,24 +838,24 @@ func (ch *Channel) nextPacketMsg() PacketMsg {
// Writes next PacketMsg to w and updates c.recentlySent.
// Not goroutine-safe
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
var packet = ch.nextPacketMsg()
n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet)
atomic.AddInt64(&ch.recentlySent, n)
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
packet := ch.nextPacketMsg()
n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
atomic.AddInt64(&ch.recentlySent, int64(n))
return
}
// Handles incoming PacketMsgs. It returns a message bytes if message is
// complete. NOTE message bytes may change on next call to recvPacketMsg.
// Not goroutine-safe
func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes)
var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
if recvCap < recvReceived {
return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
}
ch.recving = append(ch.recving, packet.Bytes...)
if packet.EOF == byte(0x01) {
ch.recving = append(ch.recving, packet.Data...)
if packet.EOF == 0x01 {
msgBytes := ch.recving
// clear the slice without re-allocating.
@ -873,33 +879,34 @@ func (ch *Channel) updateStats() {
//----------------------------------------
// Packet
type Packet interface {
AssertIsPacket()
}
func RegisterPacket(cdc *amino.Codec) {
cdc.RegisterInterface((*Packet)(nil), nil)
cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil)
cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil)
cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil)
}
func (PacketPing) AssertIsPacket() {}
func (PacketPong) AssertIsPacket() {}
func (PacketMsg) AssertIsPacket() {}
type PacketPing struct {
}
type PacketPong struct {
}
type PacketMsg struct {
ChannelID byte
EOF byte // 1 means message ends here.
Bytes []byte
}
// mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message.
func mustWrapPacket(pb proto.Message) *tmp2p.Packet {
var msg tmp2p.Packet
switch pb := pb.(type) {
case *tmp2p.Packet: // already a packet
msg = *pb
case *tmp2p.PacketPing:
msg = tmp2p.Packet{
Sum: &tmp2p.Packet_PacketPing{
PacketPing: pb,
},
}
case *tmp2p.PacketPong:
msg = tmp2p.Packet{
Sum: &tmp2p.Packet_PacketPong{
PacketPong: pb,
},
}
case *tmp2p.PacketMsg:
msg = tmp2p.Packet{
Sum: &tmp2p.Packet_PacketMsg{
PacketMsg: pb,
},
}
default:
panic(fmt.Errorf("unknown packet type %T", pb))
}
func (mp PacketMsg) String() string {
return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF)
return &msg
}

+ 76
- 69
p2p/conn/connection_test.go View File

@ -1,7 +1,6 @@
package conn
import (
"bytes"
"net"
"testing"
"time"
@ -10,9 +9,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/protoio"
tmp2p "github.com/tendermint/tendermint/proto/p2p"
"github.com/tendermint/tendermint/proto/types"
)
const maxPingPongPacketSize = 1024 // bytes
@ -54,12 +54,12 @@ func TestMConnectionSendFlushStop(t *testing.T) {
msg := []byte("abc")
assert.True(t, clientConn.Send(0x01, msg))
aminoMsgLength := 14
msgLength := 14
// start the reader in a new routine, so we can flush
errCh := make(chan error)
go func() {
msgB := make([]byte, aminoMsgLength)
msgB := make([]byte, msgLength)
_, err := server.Read(msgB)
if err != nil {
t.Error(err)
@ -182,9 +182,9 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) {
serverGotPing := make(chan struct{})
go func() {
// read ping
var pkt PacketPing
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
assert.Nil(t, err)
var pkt tmp2p.Packet
err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt)
require.NoError(t, err)
serverGotPing <- struct{}{}
}()
<-serverGotPing
@ -219,26 +219,28 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) {
defer mconn.Stop()
// sending 3 pongs in a row (abuse)
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
require.Nil(t, err)
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
require.Nil(t, err)
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
require.Nil(t, err)
protoWriter := protoio.NewDelimitedWriter(server)
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
serverGotPing := make(chan struct{})
go func() {
// read ping (one byte)
var (
packet Packet
err error
)
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &packet, maxPingPongPacketSize)
require.Nil(t, err)
var packet tmp2p.Packet
err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet)
require.NoError(t, err)
serverGotPing <- struct{}{}
// respond with pong
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
require.Nil(t, err)
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
}()
<-serverGotPing
@ -273,19 +275,27 @@ func TestMConnectionMultiplePings(t *testing.T) {
// sending 3 pings in a row (abuse)
// see https://github.com/tendermint/tendermint/issues/1190
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
require.Nil(t, err)
var pkt PacketPong
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
require.Nil(t, err)
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
require.Nil(t, err)
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
require.Nil(t, err)
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{}))
require.Nil(t, err)
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
require.Nil(t, err)
protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize)
protoWriter := protoio.NewDelimitedWriter(server)
var pkt tmp2p.Packet
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
require.NoError(t, err)
err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
require.NoError(t, err)
err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
require.NoError(t, err)
err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
assert.True(t, mconn.IsRunning())
}
@ -314,25 +324,32 @@ func TestMConnectionPingPongs(t *testing.T) {
serverGotPing := make(chan struct{})
go func() {
protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize)
protoWriter := protoio.NewDelimitedWriter(server)
var pkt tmp2p.PacketPing
// read ping
var pkt PacketPing
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
require.Nil(t, err)
err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
serverGotPing <- struct{}{}
// respond with pong
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
require.Nil(t, err)
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
time.Sleep(mconn.config.PingInterval)
// read ping
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize)
require.Nil(t, err)
err = protoReader.ReadMsg(&pkt)
require.NoError(t, err)
serverGotPing <- struct{}{}
// respond with pong
_, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{}))
require.Nil(t, err)
_, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
require.NoError(t, err)
}()
<-serverGotPing
<-serverGotPing
pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2
select {
@ -425,13 +442,9 @@ func TestMConnectionReadErrorBadEncoding(t *testing.T) {
client := mconnClient.conn
// send badly encoded msgPacket
bz := cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{})
bz[4] += 0x01 // Invalid prefix bytes.
// Write it.
_, err := client.Write(bz)
assert.Nil(t, err)
_, err := client.Write([]byte{1, 2, 3, 4, 5})
require.NoError(t, err)
assert.True(t, expectSend(chOnErr), "badly encoded msgPacket")
}
@ -465,32 +478,28 @@ func TestMConnectionReadErrorLongMessage(t *testing.T) {
}
client := mconnClient.conn
protoWriter := protoio.NewDelimitedWriter(client)
// send msg thats just right
var err error
var buf = new(bytes.Buffer)
var packet = PacketMsg{
var packet = tmp2p.PacketMsg{
ChannelID: 0x01,
EOF: 1,
Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize),
}
_, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
assert.Nil(t, err)
_, err = client.Write(buf.Bytes())
assert.Nil(t, err)
_, err := protoWriter.WriteMsg(mustWrapPacket(&packet))
require.NoError(t, err)
assert.True(t, expectSend(chOnRcv), "msg just right")
// send msg thats too long
buf = new(bytes.Buffer)
packet = PacketMsg{
packet = tmp2p.PacketMsg{
ChannelID: 0x01,
EOF: 1,
Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100),
Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100),
}
_, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet)
assert.Nil(t, err)
_, err = client.Write(buf.Bytes())
assert.NotNil(t, err)
_, err = protoWriter.WriteMsg(mustWrapPacket(&packet))
require.Error(t, err)
assert.True(t, expectSend(chOnErr), "msg too long")
}
@ -501,10 +510,8 @@ func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
defer mconnServer.Stop()
// send msg with unknown msg type
err := amino.EncodeUvarint(mconnClient.conn, 4)
assert.Nil(t, err)
_, err = mconnClient.conn.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF})
assert.Nil(t, err)
_, err := protoio.NewDelimitedWriter(mconnClient.conn).WriteMsg(&types.Header{ChainID: "x"})
require.NoError(t, err)
assert.True(t, expectSend(chOnErr), "unknown msg type")
}


+ 21
- 6
p2p/conn/evil_secret_connection_test.go View File

@ -6,12 +6,16 @@ import (
"io"
"testing"
gogotypes "github.com/gogo/protobuf/types"
"github.com/gtank/merlin"
"github.com/stretchr/testify/assert"
"golang.org/x/crypto/chacha20poly1305"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
cryptoenc "github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/libs/protoio"
tmp2p "github.com/tendermint/tendermint/proto/p2p"
)
type buffer struct {
@ -80,14 +84,15 @@ func (c *evilConn) Read(data []byte) (n int, err error) {
switch c.readStep {
case 0:
if !c.badEphKey {
bz, err := cdc.MarshalBinaryLengthPrefixed(c.locEphPub)
lc := *c.locEphPub
bz, err := protoio.MarshalDelimited(&gogotypes.BytesValue{Value: lc[:]})
if err != nil {
panic(err)
}
copy(data, bz[c.readOffset:])
n = len(data)
} else {
bz, err := cdc.MarshalBinaryLengthPrefixed([]byte("drop users;"))
bz, err := protoio.MarshalDelimited(&gogotypes.BytesValue{Value: []byte("drop users;")})
if err != nil {
panic(err)
}
@ -108,7 +113,11 @@ func (c *evilConn) Read(data []byte) (n int, err error) {
case 1:
signature := c.signChallenge()
if !c.badAuthSignature {
bz, err := cdc.MarshalBinaryLengthPrefixed(authSigMessage{c.privKey.PubKey(), signature})
pkpb, err := cryptoenc.PubKeyToProto(c.privKey.PubKey())
if err != nil {
panic(err)
}
bz, err := protoio.MarshalDelimited(&tmp2p.AuthSigMessage{PubKey: pkpb, Sig: signature})
if err != nil {
panic(err)
}
@ -121,7 +130,7 @@ func (c *evilConn) Read(data []byte) (n int, err error) {
}
copy(data, c.buffer.Bytes()[c.readOffset:])
} else {
bz, err := cdc.MarshalBinaryLengthPrefixed([]byte("select * from users;"))
bz, err := protoio.MarshalDelimited(&gogotypes.BytesValue{Value: []byte("select * from users;")})
if err != nil {
panic(err)
}
@ -144,10 +153,16 @@ func (c *evilConn) Read(data []byte) (n int, err error) {
func (c *evilConn) Write(data []byte) (n int, err error) {
switch c.writeStep {
case 0:
err := cdc.UnmarshalBinaryLengthPrefixed(data, c.remEphPub)
var (
bytes gogotypes.BytesValue
remEphPub [32]byte
)
err := protoio.UnmarshalDelimited(data, &bytes)
if err != nil {
panic(err)
}
copy(remEphPub[:], bytes.Value)
c.remEphPub = &remEphPub
c.writeStep = 1
if !c.shareAuthSignature {
c.writeStep = 2
@ -235,7 +250,7 @@ func TestMakeSecretConnection(t *testing.T) {
errMsg string
}{
{"refuse to share ethimeral key", newEvilConn(false, false, false, false), "EOF"},
{"share bad ethimeral key", newEvilConn(true, true, false, false), "Insufficient bytes to decode"},
{"share bad ethimeral key", newEvilConn(true, true, false, false), "wrong wireType"},
{"refuse to share auth signature", newEvilConn(true, false, false, false), "EOF"},
{"share bad auth signature", newEvilConn(true, false, true, true), "failed to decrypt SecretConnection"},
{"all good", newEvilConn(true, false, true, false), ""},


+ 37
- 15
p2p/conn/secret_connection.go View File

@ -14,6 +14,7 @@ import (
"sync"
"time"
gogotypes "github.com/gogo/protobuf/types"
"github.com/gtank/merlin"
pool "github.com/libp2p/go-buffer-pool"
"golang.org/x/crypto/chacha20poly1305"
@ -23,7 +24,10 @@ import (
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
cryptoenc "github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/libs/async"
"github.com/tendermint/tendermint/libs/protoio"
tmp2p "github.com/tendermint/tendermint/proto/p2p"
)
// 4 + 1024 == 1028 total frame size
@ -250,7 +254,7 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
defer pool.Put(frame)
_, err = sc.recvAead.Open(frame[:0], sc.recvNonce[:], sealedFrame, nil)
if err != nil {
return n, errors.New("failed to decrypt SecretConnection")
return n, fmt.Errorf("failed to decrypt SecretConnection: %w", err)
}
incrNonce(sc.recvNonce)
// end decryption
@ -300,18 +304,22 @@ func shareEphPubKey(conn io.ReadWriter, locEphPub *[32]byte) (remEphPub *[32]byt
// Send our pubkey and receive theirs in tandem.
var trs, _ = async.Parallel(
func(_ int) (val interface{}, abort bool, err error) {
var _, err1 = cdc.MarshalBinaryLengthPrefixedWriter(conn, locEphPub)
if err1 != nil {
return nil, true, err1 // abort
lc := *locEphPub
_, err = protoio.NewDelimitedWriter(conn).WriteMsg(&gogotypes.BytesValue{Value: lc[:]})
if err != nil {
return nil, true, err // abort
}
return nil, false, nil
},
func(_ int) (val interface{}, abort bool, err error) {
var _remEphPub [32]byte
var _, err2 = cdc.UnmarshalBinaryLengthPrefixedReader(conn, &_remEphPub, 1024*1024) // TODO
if err2 != nil {
return nil, true, err2 // abort
var bytes gogotypes.BytesValue
err = protoio.NewDelimitedReader(conn, 1024*1024).ReadMsg(&bytes)
if err != nil {
return nil, true, err // abort
}
var _remEphPub [32]byte
copy(_remEphPub[:], bytes.Value)
return _remEphPub, false, nil
},
)
@ -399,17 +407,31 @@ func shareAuthSignature(sc io.ReadWriter, pubKey crypto.PubKey, signature []byte
// Send our info and receive theirs in tandem.
var trs, _ = async.Parallel(
func(_ int) (val interface{}, abort bool, err error) {
var _, err1 = cdc.MarshalBinaryLengthPrefixedWriter(sc, authSigMessage{pubKey, signature})
if err1 != nil {
return nil, true, err1 // abort
pbpk, err := cryptoenc.PubKeyToProto(pubKey)
if err != nil {
return nil, true, err
}
_, err = protoio.NewDelimitedWriter(sc).WriteMsg(&tmp2p.AuthSigMessage{PubKey: pbpk, Sig: signature})
if err != nil {
return nil, true, err // abort
}
return nil, false, nil
},
func(_ int) (val interface{}, abort bool, err error) {
var _recvMsg authSigMessage
var _, err2 = cdc.UnmarshalBinaryLengthPrefixedReader(sc, &_recvMsg, 1024*1024) // TODO
if err2 != nil {
return nil, true, err2 // abort
var pba tmp2p.AuthSigMessage
err = protoio.NewDelimitedReader(sc, 1024*1024).ReadMsg(&pba)
if err != nil {
return nil, true, err // abort
}
pk, err := cryptoenc.PubKeyFromProto(pba.PubKey)
if err != nil {
return nil, true, err // abort
}
_recvMsg := authSigMessage{
Key: pk,
Sig: pba.Sig,
}
return _recvMsg, false, nil
},


+ 12
- 20
p2p/conn/secret_connection_test.go View File

@ -257,38 +257,30 @@ func TestDeriveSecretsAndChallengeGolden(t *testing.T) {
func TestNilPubkey(t *testing.T) {
var fooConn, barConn = makeKVStoreConnPair()
defer fooConn.Close()
defer barConn.Close()
var fooPrvKey = ed25519.GenPrivKey()
var barPrvKey = privKeyWithNilPubKey{ed25519.GenPrivKey()}
go func() {
_, err := MakeSecretConnection(barConn, barPrvKey)
assert.NoError(t, err)
}()
go MakeSecretConnection(fooConn, fooPrvKey)
assert.NotPanics(t, func() {
_, err := MakeSecretConnection(fooConn, fooPrvKey)
if assert.Error(t, err) {
assert.Equal(t, "expected ed25519 pubkey, got <nil>", err.Error())
}
})
_, err := MakeSecretConnection(barConn, barPrvKey)
require.Error(t, err)
assert.Equal(t, "toproto: key type <nil> is not supported", err.Error())
}
func TestNonEd25519Pubkey(t *testing.T) {
var fooConn, barConn = makeKVStoreConnPair()
defer fooConn.Close()
defer barConn.Close()
var fooPrvKey = ed25519.GenPrivKey()
var barPrvKey = secp256k1.GenPrivKey()
go func() {
_, err := MakeSecretConnection(barConn, barPrvKey)
assert.NoError(t, err)
}()
go MakeSecretConnection(fooConn, fooPrvKey)
assert.NotPanics(t, func() {
_, err := MakeSecretConnection(fooConn, fooPrvKey)
if assert.Error(t, err) {
assert.Equal(t, "expected ed25519 pubkey, got secp256k1.PubKey", err.Error())
}
})
_, err := MakeSecretConnection(barConn, barPrvKey)
require.Error(t, err)
assert.Contains(t, err.Error(), "is not supported")
}
func writeLots(t *testing.T, wg *sync.WaitGroup, conn io.Writer, txt string, n int) {


+ 255
- 22
proto/p2p/conn_msgs.pb.go View File

@ -7,6 +7,7 @@ import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
keys "github.com/tendermint/tendermint/proto/crypto/keys"
io "io"
math "math"
math_bits "math/bits"
@ -253,38 +254,96 @@ func (*Packet) XXX_OneofWrappers() []interface{} {
}
}
type AuthSigMessage struct {
PubKey keys.PublicKey `protobuf:"bytes,1,opt,name=pub_key,json=pubKey,proto3" json:"pub_key"`
Sig []byte `protobuf:"bytes,2,opt,name=sig,proto3" json:"sig,omitempty"`
}
func (m *AuthSigMessage) Reset() { *m = AuthSigMessage{} }
func (m *AuthSigMessage) String() string { return proto.CompactTextString(m) }
func (*AuthSigMessage) ProtoMessage() {}
func (*AuthSigMessage) Descriptor() ([]byte, []int) {
return fileDescriptor_8c680f0b24d73fe7, []int{4}
}
func (m *AuthSigMessage) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *AuthSigMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_AuthSigMessage.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *AuthSigMessage) XXX_Merge(src proto.Message) {
xxx_messageInfo_AuthSigMessage.Merge(m, src)
}
func (m *AuthSigMessage) XXX_Size() int {
return m.Size()
}
func (m *AuthSigMessage) XXX_DiscardUnknown() {
xxx_messageInfo_AuthSigMessage.DiscardUnknown(m)
}
var xxx_messageInfo_AuthSigMessage proto.InternalMessageInfo
func (m *AuthSigMessage) GetPubKey() keys.PublicKey {
if m != nil {
return m.PubKey
}
return keys.PublicKey{}
}
func (m *AuthSigMessage) GetSig() []byte {
if m != nil {
return m.Sig
}
return nil
}
func init() {
proto.RegisterType((*PacketPing)(nil), "tendermint.proto.p2p.PacketPing")
proto.RegisterType((*PacketPong)(nil), "tendermint.proto.p2p.PacketPong")
proto.RegisterType((*PacketMsg)(nil), "tendermint.proto.p2p.PacketMsg")
proto.RegisterType((*Packet)(nil), "tendermint.proto.p2p.Packet")
proto.RegisterType((*AuthSigMessage)(nil), "tendermint.proto.p2p.AuthSigMessage")
}
func init() { proto.RegisterFile("proto/p2p/conn_msgs.proto", fileDescriptor_8c680f0b24d73fe7) }
var fileDescriptor_8c680f0b24d73fe7 = []byte{
// 324 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0x41, 0x4b, 0xfb, 0x30,
0x18, 0xc6, 0x9b, 0x7f, 0xff, 0x9b, 0xf4, 0xdd, 0xbc, 0x04, 0x0f, 0x9b, 0x87, 0x6c, 0xec, 0x20,
0x43, 0xa4, 0x85, 0xfa, 0x05, 0x64, 0x9b, 0xe2, 0x0e, 0xc3, 0xd1, 0xa3, 0x97, 0xd2, 0xb5, 0x35,
0x0d, 0xda, 0x24, 0xb4, 0xd9, 0xc1, 0x6f, 0xe1, 0xc7, 0xf2, 0xb8, 0xa3, 0x20, 0x0c, 0xe9, 0xbe,
0x88, 0x2c, 0x99, 0x5b, 0x05, 0xd1, 0xdb, 0xf3, 0x3c, 0xbc, 0xf9, 0xbd, 0x4f, 0x12, 0xe8, 0xca,
0x42, 0x28, 0xe1, 0x49, 0x5f, 0x7a, 0xb1, 0xe0, 0x3c, 0xcc, 0x4b, 0x5a, 0xba, 0x3a, 0xc3, 0x27,
0x2a, 0xe5, 0x49, 0x5a, 0xe4, 0x8c, 0x2b, 0x93, 0xb8, 0xd2, 0x97, 0xa7, 0x67, 0x2a, 0x63, 0x45,
0x12, 0xca, 0xa8, 0x50, 0xcf, 0x9e, 0x39, 0x4c, 0x05, 0x15, 0x07, 0x65, 0x66, 0x07, 0x6d, 0x80,
0x79, 0x14, 0x3f, 0xa6, 0x6a, 0xce, 0x38, 0xad, 0x39, 0xc1, 0xe9, 0x20, 0x03, 0xc7, 0xb8, 0x59,
0x49, 0xf1, 0x05, 0x40, 0x9c, 0x45, 0x9c, 0xa7, 0x4f, 0x21, 0x4b, 0x3a, 0xa8, 0x8f, 0x86, 0x8d,
0xd1, 0x71, 0xb5, 0xee, 0x39, 0x63, 0x93, 0x4e, 0x27, 0x81, 0xb3, 0x1b, 0x98, 0x26, 0xb8, 0x0b,
0x76, 0x2a, 0x1e, 0x3a, 0xff, 0xf4, 0xd8, 0x51, 0xb5, 0xee, 0xd9, 0xd7, 0x77, 0x37, 0xc1, 0x36,
0xc3, 0x18, 0xfe, 0x27, 0x91, 0x8a, 0x3a, 0x76, 0x1f, 0x0d, 0xdb, 0x81, 0xd6, 0x83, 0x77, 0x04,
0x4d, 0xb3, 0x0a, 0x8f, 0xa1, 0x25, 0xb5, 0x0a, 0x25, 0xe3, 0x54, 0x2f, 0x6a, 0xf9, 0x7d, 0xf7,
0xa7, 0x4b, 0xba, 0x87, 0xe6, 0xb7, 0x56, 0x00, 0x72, 0xef, 0xea, 0x10, 0xc1, 0xa9, 0xae, 0xf1,
0x17, 0x44, 0x7c, 0x83, 0x08, 0x4e, 0xf1, 0x15, 0xec, 0xdc, 0xf6, 0xb5, 0x75, 0xdd, 0x96, 0xdf,
0xfb, 0x8d, 0x31, 0x2b, 0xb7, 0x08, 0x47, 0x7e, 0x99, 0x51, 0x03, 0xec, 0x72, 0x99, 0x8f, 0x26,
0xaf, 0x15, 0x41, 0xab, 0x8a, 0xa0, 0x8f, 0x8a, 0xa0, 0x97, 0x0d, 0xb1, 0x56, 0x1b, 0x62, 0xbd,
0x6d, 0x88, 0x75, 0x7f, 0x4e, 0x99, 0xca, 0x96, 0x0b, 0x37, 0x16, 0xb9, 0x77, 0x00, 0xd7, 0xe5,
0xfe, 0xdf, 0x17, 0x4d, 0x2d, 0x2f, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xad, 0x8c, 0xf9, 0x97,
0x0b, 0x02, 0x00, 0x00,
// 409 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xdf, 0x6a, 0xd4, 0x40,
0x14, 0xc6, 0x13, 0xd3, 0x6e, 0xd9, 0xb3, 0xab, 0xc8, 0xe0, 0xc5, 0xb6, 0x60, 0x76, 0xc9, 0x85,
0x16, 0x91, 0x04, 0xe2, 0x0b, 0x68, 0x5a, 0x8b, 0xa5, 0x2c, 0x2e, 0xf1, 0xce, 0x9b, 0x90, 0x3f,
0xe3, 0x64, 0xdc, 0x66, 0x66, 0xc8, 0x4c, 0x2e, 0xf2, 0x16, 0x3e, 0x56, 0x2f, 0x7b, 0x29, 0x08,
0x8b, 0x64, 0x5f, 0x44, 0x32, 0x13, 0x77, 0x57, 0x14, 0x7b, 0xf7, 0x7d, 0x1f, 0x27, 0xbf, 0x73,
0x4e, 0xce, 0xc0, 0xa9, 0xa8, 0xb9, 0xe2, 0x81, 0x08, 0x45, 0x90, 0x73, 0xc6, 0x92, 0x4a, 0x12,
0xe9, 0xeb, 0x0c, 0x3d, 0x53, 0x98, 0x15, 0xb8, 0xae, 0x28, 0x53, 0x26, 0xf1, 0x45, 0x28, 0xce,
0x5e, 0xa8, 0x92, 0xd6, 0x45, 0x22, 0xd2, 0x5a, 0xb5, 0x81, 0xf9, 0x98, 0x70, 0xc2, 0xf7, 0xca,
0xd4, 0x9e, 0x3d, 0x37, 0x49, 0x5e, 0xb7, 0x42, 0xf1, 0x60, 0x8d, 0x5b, 0x19, 0xa8, 0x56, 0xe0,
0x01, 0xee, 0x4d, 0x01, 0x56, 0x69, 0xbe, 0xc6, 0x6a, 0x45, 0x19, 0x39, 0x70, 0x9c, 0x11, 0xaf,
0x84, 0xb1, 0x71, 0x4b, 0x49, 0xd0, 0x6b, 0x80, 0xbc, 0x4c, 0x19, 0xc3, 0xb7, 0x09, 0x2d, 0x66,
0xf6, 0xc2, 0x3e, 0x3f, 0x8e, 0x1e, 0x77, 0x9b, 0xf9, 0xf8, 0xc2, 0xa4, 0xd7, 0x97, 0xf1, 0x78,
0x28, 0xb8, 0x2e, 0xd0, 0x29, 0x38, 0x98, 0x7f, 0x99, 0x3d, 0xd2, 0x65, 0x27, 0xdd, 0x66, 0xee,
0xbc, 0xff, 0x78, 0x15, 0xf7, 0x19, 0x42, 0x70, 0x54, 0xa4, 0x2a, 0x9d, 0x39, 0x0b, 0xfb, 0x7c,
0x1a, 0x6b, 0xed, 0xfd, 0xb0, 0x61, 0x64, 0x5a, 0xa1, 0x0b, 0x98, 0x08, 0xad, 0x12, 0x41, 0x19,
0xd1, 0x8d, 0x26, 0xe1, 0xc2, 0xff, 0xd7, 0x3f, 0xf0, 0xf7, 0x93, 0x7f, 0xb0, 0x62, 0x10, 0x3b,
0x77, 0x08, 0xe1, 0x8c, 0xe8, 0x31, 0x1e, 0x82, 0xf0, 0x3f, 0x20, 0x9c, 0x11, 0xf4, 0x16, 0x06,
0xd7, 0x1f, 0x43, 0x8f, 0x3b, 0x09, 0xe7, 0xff, 0x63, 0x2c, 0x65, 0x8f, 0x18, 0x8b, 0xdf, 0x26,
0x3a, 0x06, 0x47, 0x36, 0x95, 0xf7, 0x15, 0x9e, 0xbc, 0x6b, 0x54, 0xf9, 0x89, 0x92, 0x25, 0x96,
0x32, 0x25, 0x18, 0x5d, 0xc1, 0x89, 0x68, 0xb2, 0x64, 0x8d, 0xdb, 0x61, 0xc1, 0x97, 0x7f, 0x73,
0xcd, 0xc5, 0xfc, 0xfe, 0x62, 0xfe, 0xaa, 0xc9, 0x6e, 0x69, 0x7e, 0x83, 0xdb, 0xe8, 0xe8, 0x6e,
0x33, 0xb7, 0xe2, 0x91, 0x68, 0xb2, 0x1b, 0xdc, 0xa2, 0xa7, 0xe0, 0x48, 0x6a, 0xf6, 0x9b, 0xc6,
0xbd, 0x8c, 0x2e, 0xef, 0x3a, 0xd7, 0xbe, 0xef, 0x5c, 0xfb, 0x67, 0xe7, 0xda, 0xdf, 0xb6, 0xae,
0x75, 0xbf, 0x75, 0xad, 0xef, 0x5b, 0xd7, 0xfa, 0xfc, 0x8a, 0x50, 0x55, 0x36, 0x99, 0x9f, 0xf3,
0x2a, 0xd8, 0x37, 0x3b, 0x94, 0xbb, 0x27, 0x98, 0x8d, 0xb4, 0x7c, 0xf3, 0x2b, 0x00, 0x00, 0xff,
0xff, 0xd2, 0xaa, 0xb3, 0xc0, 0x96, 0x02, 0x00, 0x00,
}
func (m *PacketPing) Marshal() (dAtA []byte, err error) {
@ -468,6 +527,46 @@ func (m *Packet_PacketMsg) MarshalToSizedBuffer(dAtA []byte) (int, error) {
}
return len(dAtA) - i, nil
}
func (m *AuthSigMessage) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *AuthSigMessage) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *AuthSigMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Sig) > 0 {
i -= len(m.Sig)
copy(dAtA[i:], m.Sig)
i = encodeVarintConnMsgs(dAtA, i, uint64(len(m.Sig)))
i--
dAtA[i] = 0x12
}
{
size, err := m.PubKey.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintConnMsgs(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
return len(dAtA) - i, nil
}
func encodeVarintConnMsgs(dAtA []byte, offset int, v uint64) int {
offset -= sovConnMsgs(v)
base := offset
@ -564,6 +663,20 @@ func (m *Packet_PacketMsg) Size() (n int) {
}
return n
}
func (m *AuthSigMessage) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = m.PubKey.Size()
n += 1 + l + sovConnMsgs(uint64(l))
l = len(m.Sig)
if l > 0 {
n += 1 + l + sovConnMsgs(uint64(l))
}
return n
}
func sovConnMsgs(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
@ -960,6 +1073,126 @@ func (m *Packet) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *AuthSigMessage) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowConnMsgs
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: AuthSigMessage: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: AuthSigMessage: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PubKey", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowConnMsgs
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthConnMsgs
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthConnMsgs
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := m.PubKey.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Sig", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowConnMsgs
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthConnMsgs
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthConnMsgs
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Sig = append(m.Sig[:0], dAtA[iNdEx:postIndex]...)
if m.Sig == nil {
m.Sig = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipConnMsgs(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthConnMsgs
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthConnMsgs
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipConnMsgs(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0


+ 6
- 0
proto/p2p/conn_msgs.proto View File

@ -4,6 +4,7 @@ package tendermint.proto.p2p;
option go_package = "github.com/tendermint/tendermint/proto/p2p";
import "third_party/proto/gogoproto/gogo.proto";
import "proto/crypto/keys/types.proto";
message PacketPing {}
@ -22,3 +23,8 @@ message Packet {
PacketMsg packet_msg = 3;
}
}
message AuthSigMessage {
tendermint.proto.crypto.keys.PublicKey pub_key = 1 [(gogoproto.nullable) = false];
bytes sig = 2;
}

Loading…
Cancel
Save