From 660e72a12f977a9b52659e263d096406b181b2ba Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 9 Jun 2020 18:09:51 +0200 Subject: [PATCH] p2p/conn: migrate to Protobuf (#4990) Migrates the p2p connections to Protobuf. Supersedes #4800. gogoproto's `NewDelimitedReader()` uses an internal buffer, which makes it unsuitable for reading individual messages from a shared reader (since any remaining data in the buffer will be discarded). We therefore add a new `protoio` package with an unbuffered `NewDelimitedReader()`. Additionally, the `NewDelimitedWriter()` returns the number of bytes written, and we've added `MarshalDelimited()` and `UnmarshalDelimited()`, to ease migration of existing code. --- libs/protoio/io.go | 96 ++++++++ libs/protoio/io_test.go | 157 ++++++++++++++ libs/protoio/reader.go | 88 ++++++++ libs/protoio/writer.go | 100 +++++++++ p2p/conn/codec.go | 14 -- p2p/conn/connection.go | 149 +++++++------ p2p/conn/connection_test.go | 145 +++++++------ p2p/conn/evil_secret_connection_test.go | 27 ++- p2p/conn/secret_connection.go | 52 +++-- p2p/conn/secret_connection_test.go | 32 +-- proto/p2p/conn_msgs.pb.go | 277 ++++++++++++++++++++++-- proto/p2p/conn_msgs.proto | 6 + 12 files changed, 926 insertions(+), 217 deletions(-) create mode 100644 libs/protoio/io.go create mode 100644 libs/protoio/io_test.go create mode 100644 libs/protoio/reader.go create mode 100644 libs/protoio/writer.go delete mode 100644 p2p/conn/codec.go diff --git a/libs/protoio/io.go b/libs/protoio/io.go new file mode 100644 index 000000000..91acbb71b --- /dev/null +++ b/libs/protoio/io.go @@ -0,0 +1,96 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// Modified to return number of bytes written by Writer.WriteMsg(), and added byteReader. + +package protoio + +import ( + "io" + + "github.com/gogo/protobuf/proto" +) + +type Writer interface { + WriteMsg(proto.Message) (int, error) +} + +type WriteCloser interface { + Writer + io.Closer +} + +type Reader interface { + ReadMsg(msg proto.Message) error +} + +type ReadCloser interface { + Reader + io.Closer +} + +type marshaler interface { + MarshalTo(data []byte) (n int, err error) +} + +func getSize(v interface{}) (int, bool) { + if sz, ok := v.(interface { + Size() (n int) + }); ok { + return sz.Size(), true + } else if sz, ok := v.(interface { + ProtoSize() (n int) + }); ok { + return sz.ProtoSize(), true + } else { + return 0, false + } +} + +// byteReader wraps an io.Reader and implements io.ByteReader. Reading one byte at a +// time is extremely slow, but this is what Amino did already, and the caller can +// wrap the reader in bufio.Reader if appropriate. +type byteReader struct { + io.Reader + bytes []byte +} + +func newByteReader(r io.Reader) *byteReader { + return &byteReader{ + Reader: r, + bytes: make([]byte, 1), + } +} + +func (r *byteReader) ReadByte() (byte, error) { + _, err := r.Read(r.bytes) + if err != nil { + return 0, err + } + return r.bytes[0], nil +} diff --git a/libs/protoio/io_test.go b/libs/protoio/io_test.go new file mode 100644 index 000000000..5ec5627d2 --- /dev/null +++ b/libs/protoio/io_test.go @@ -0,0 +1,157 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package protoio_test + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "math/rand" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/test" + + "github.com/tendermint/tendermint/libs/protoio" +) + +func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error { + varint := make([]byte, binary.MaxVarintLen64) + size := 1000 + msgs := make([]*test.NinOptNative, size) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := range msgs { + msgs[i] = test.NewPopulatedNinOptNative(r, true) + //issue 31 + if i == 5 { + msgs[i] = &test.NinOptNative{} + } + //issue 31 + if i == 999 { + msgs[i] = &test.NinOptNative{} + } + // FIXME Check size + bz, err := proto.Marshal(msgs[i]) + if err != nil { + return err + } + visize := binary.PutUvarint(varint, uint64(len(bz))) + n, err := writer.WriteMsg(msgs[i]) + if err != nil { + return err + } + if n != len(bz)+visize { + return fmt.Errorf("WriteMsg() wrote %v bytes, expected %v", n, len(bz)+visize) // nolint + } + } + if err := writer.Close(); err != nil { + return err + } + i := 0 + for { + msg := &test.NinOptNative{} + if err := reader.ReadMsg(msg); err != nil { + if err == io.EOF { + break + } + return err + } + if err := msg.VerboseEqual(msgs[i]); err != nil { + return err + } + i++ + } + if i != size { + panic("not enough messages read") + } + if err := reader.Close(); err != nil { + return err + } + return nil +} + +type buffer struct { + *bytes.Buffer + closed bool +} + +func (b *buffer) Close() error { + b.closed = true + return nil +} + +func newBuffer() *buffer { + return &buffer{bytes.NewBuffer(nil), false} +} + +func TestVarintNormal(t *testing.T) { + buf := newBuffer() + writer := protoio.NewDelimitedWriter(buf) + reader := protoio.NewDelimitedReader(buf, 1024*1024) + if err := iotest(writer, reader); err != nil { + t.Error(err) + } + if !buf.closed { + t.Fatalf("did not close buffer") + } +} + +func TestVarintNoClose(t *testing.T) { + buf := bytes.NewBuffer(nil) + writer := protoio.NewDelimitedWriter(buf) + reader := protoio.NewDelimitedReader(buf, 1024*1024) + if err := iotest(writer, reader); err != nil { + t.Error(err) + } +} + +//issue 32 +func TestVarintMaxSize(t *testing.T) { + buf := newBuffer() + writer := protoio.NewDelimitedWriter(buf) + reader := protoio.NewDelimitedReader(buf, 20) + if err := iotest(writer, reader); err == nil { + t.Error(err) + } else { + t.Logf("%s", err) + } +} + +func TestVarintError(t *testing.T) { + buf := newBuffer() + buf.Write([]byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f}) + reader := protoio.NewDelimitedReader(buf, 1024*1024) + msg := &test.NinOptNative{} + err := reader.ReadMsg(msg) + if err == nil { + t.Fatalf("Expected error") + } +} diff --git a/libs/protoio/reader.go b/libs/protoio/reader.go new file mode 100644 index 000000000..15a84899f --- /dev/null +++ b/libs/protoio/reader.go @@ -0,0 +1,88 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// Modified from original GoGo Protobuf to not buffer the reader. + +package protoio + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/gogo/protobuf/proto" +) + +// NewDelimitedReader reads varint-delimited Protobuf messages from a reader. Unlike the gogoproto +// NewDelimitedReader, this does not buffer the reader, which may cause poor performance but is +// necessary when only reading single messages (e.g. in the p2p package). +func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser { + var closer io.Closer + if c, ok := r.(io.Closer); ok { + closer = c + } + return &varintReader{newByteReader(r), nil, maxSize, closer} +} + +type varintReader struct { + r *byteReader + buf []byte + maxSize int + closer io.Closer +} + +func (r *varintReader) ReadMsg(msg proto.Message) error { + length64, err := binary.ReadUvarint(newByteReader(r.r)) + if err != nil { + return err + } + length := int(length64) + if length < 0 || length > r.maxSize { + return fmt.Errorf("message exceeds max size (%v > %v)", length, r.maxSize) + } + if len(r.buf) < length { + r.buf = make([]byte, length) + } + buf := r.buf[:length] + if _, err := io.ReadFull(r.r, buf); err != nil { + return err + } + return proto.Unmarshal(buf, msg) +} + +func (r *varintReader) Close() error { + if r.closer != nil { + return r.closer.Close() + } + return nil +} + +func UnmarshalDelimited(data []byte, msg proto.Message) error { + return NewDelimitedReader(bytes.NewReader(data), len(data)).ReadMsg(msg) +} diff --git a/libs/protoio/writer.go b/libs/protoio/writer.go new file mode 100644 index 000000000..d4c66798f --- /dev/null +++ b/libs/protoio/writer.go @@ -0,0 +1,100 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// Modified from original GoGo Protobuf to return number of bytes written. + +package protoio + +import ( + "bytes" + "encoding/binary" + "io" + + "github.com/gogo/protobuf/proto" +) + +// NewDelimitedWriter writes a varint-delimited Protobuf message to a writer. It is +// equivalent to the gogoproto NewDelimitedWriter, except WriteMsg() also returns the +// number of bytes written, which is necessary in the p2p package. +func NewDelimitedWriter(w io.Writer) WriteCloser { + return &varintWriter{w, make([]byte, binary.MaxVarintLen64), nil} +} + +type varintWriter struct { + w io.Writer + lenBuf []byte + buffer []byte +} + +func (w *varintWriter) WriteMsg(msg proto.Message) (int, error) { + if m, ok := msg.(marshaler); ok { + n, ok := getSize(m) + if ok { + if n+binary.MaxVarintLen64 >= len(w.buffer) { + w.buffer = make([]byte, n+binary.MaxVarintLen64) + } + lenOff := binary.PutUvarint(w.buffer, uint64(n)) + _, err := m.MarshalTo(w.buffer[lenOff:]) + if err != nil { + return 0, err + } + _, err = w.w.Write(w.buffer[:lenOff+n]) + return lenOff + n, err + } + } + + // fallback + data, err := proto.Marshal(msg) + if err != nil { + return 0, err + } + length := uint64(len(data)) + n := binary.PutUvarint(w.lenBuf, length) + _, err = w.w.Write(w.lenBuf[:n]) + if err != nil { + return 0, err + } + _, err = w.w.Write(data) + return len(data) + n, err +} + +func (w *varintWriter) Close() error { + if closer, ok := w.w.(io.Closer); ok { + return closer.Close() + } + return nil +} + +func MarshalDelimited(msg proto.Message) ([]byte, error) { + var buf bytes.Buffer + _, err := NewDelimitedWriter(&buf).WriteMsg(msg) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} diff --git a/p2p/conn/codec.go b/p2p/conn/codec.go deleted file mode 100644 index 0625c7a38..000000000 --- a/p2p/conn/codec.go +++ /dev/null @@ -1,14 +0,0 @@ -package conn - -import ( - amino "github.com/tendermint/go-amino" - - cryptoamino "github.com/tendermint/tendermint/crypto/encoding/amino" -) - -var cdc *amino.Codec = amino.NewCodec() - -func init() { - cryptoamino.RegisterAmino(cdc) - RegisterPacket(cdc) -} diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index f2e02fbc9..dc53bcd95 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -2,25 +2,26 @@ package conn import ( "bufio" - "runtime/debug" - "errors" "fmt" "io" "math" "net" "reflect" + "runtime/debug" "sync" "sync/atomic" "time" - amino "github.com/tendermint/go-amino" + "github.com/gogo/protobuf/proto" flow "github.com/tendermint/tendermint/libs/flowrate" "github.com/tendermint/tendermint/libs/log" tmmath "github.com/tendermint/tendermint/libs/math" + "github.com/tendermint/tendermint/libs/protoio" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/libs/timer" + tmp2p "github.com/tendermint/tendermint/proto/p2p" ) const ( @@ -66,7 +67,7 @@ There are two methods for sending messages: `Send(chID, msgBytes)` is a blocking call that waits until `msg` is successfully queued for the channel with the given id byte `chID`, or until the -request times out. The message `msg` is serialized using Go-Amino. +request times out. The message `msg` is serialized using Protobuf. `TrySend(chID, msgBytes)` is a nonblocking call that returns false if the channel's queue is full. @@ -418,9 +419,11 @@ func (c *MConnection) CanSend(chID byte) bool { func (c *MConnection) sendRoutine() { defer c._recover() + protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter) + FOR_LOOP: for { - var _n int64 + var _n int var err error SELECTION: select { @@ -434,11 +437,12 @@ FOR_LOOP: } case <-c.pingTimer.C: c.Logger.Debug("Send Ping") - _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{}) + _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) if err != nil { + c.Logger.Error("Failed to send PacketPing", "err", err) break SELECTION } - c.sendMonitor.Update(int(_n)) + c.sendMonitor.Update(_n) c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout) c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() { select { @@ -456,11 +460,12 @@ FOR_LOOP: } case <-c.pong: c.Logger.Debug("Send Pong") - _n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{}) + _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) if err != nil { + c.Logger.Error("Failed to send PacketPong", "err", err) break SELECTION } - c.sendMonitor.Update(int(_n)) + c.sendMonitor.Update(_n) c.flush() case <-c.quitSendRoutine: break FOR_LOOP @@ -540,7 +545,7 @@ func (c *MConnection) sendPacketMsg() bool { c.stopForError(err) return true } - c.sendMonitor.Update(int(_n)) + c.sendMonitor.Update(_n) c.flushTimer.Set() return false } @@ -552,6 +557,8 @@ func (c *MConnection) sendPacketMsg() bool { func (c *MConnection) recvRoutine() { defer c._recover() + protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize) + FOR_LOOP: for { // Block until .recvMonitor says we can read. @@ -572,12 +579,9 @@ FOR_LOOP: */ // Read packet type - var packet Packet - var _n int64 - var err error - _n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize)) - c.recvMonitor.Update(int(_n)) + var packet tmp2p.Packet + err := protoReader.ReadMsg(&packet) if err != nil { // stopServices was invoked and we are shutting down // receiving is excpected to fail since we will close the connection @@ -599,8 +603,8 @@ FOR_LOOP: } // Read more depending on packet type. - switch pkt := packet.(type) { - case PacketPing: + switch pkt := packet.Sum.(type) { + case *tmp2p.Packet_PacketPing: // TODO: prevent abuse, as they cause flush()'s. // https://github.com/tendermint/tendermint/issues/1190 c.Logger.Debug("Receive Ping") @@ -609,23 +613,23 @@ FOR_LOOP: default: // never block } - case PacketPong: + case *tmp2p.Packet_PacketPong: c.Logger.Debug("Receive Pong") select { case c.pongTimeoutCh <- false: default: // never block } - case PacketMsg: - channel, ok := c.channelsIdx[pkt.ChannelID] + case *tmp2p.Packet_PacketMsg: + channel, ok := c.channelsIdx[byte(pkt.PacketMsg.ChannelID)] if !ok || channel == nil { - err := fmt.Errorf("unknown channel %X", pkt.ChannelID) + err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID) c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) c.stopForError(err) break FOR_LOOP } - msgBytes, err := channel.recvPacketMsg(pkt) + msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg) if err != nil { if c.IsRunning() { c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) @@ -634,9 +638,9 @@ FOR_LOOP: break FOR_LOOP } if msgBytes != nil { - c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes)) + c.Logger.Debug("Received bytes", "chID", pkt.PacketMsg.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes)) // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine - c.onReceive(pkt.ChannelID, msgBytes) + c.onReceive(byte(pkt.PacketMsg.ChannelID), msgBytes) } default: err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet)) @@ -661,14 +665,17 @@ func (c *MConnection) stopPongTimer() { } } -// maxPacketMsgSize returns a maximum size of PacketMsg, including the overhead -// of amino encoding. +// maxPacketMsgSize returns a maximum size of PacketMsg func (c *MConnection) maxPacketMsgSize() int { - return len(cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{ + bz, err := proto.Marshal(mustWrapPacket(&tmp2p.PacketMsg{ ChannelID: 0x01, EOF: 1, - Bytes: make([]byte, c.config.MaxPacketMsgPayloadSize), - })) + 10 // leave room for changes in amino + Data: make([]byte, c.config.MaxPacketMsgPayloadSize), + })) + if err != nil { + panic(err) + } + return len(bz) } type ConnectionStatus struct { @@ -814,17 +821,16 @@ func (ch *Channel) isSendPending() bool { // Creates a new PacketMsg to send. // Not goroutine-safe -func (ch *Channel) nextPacketMsg() PacketMsg { - packet := PacketMsg{} - packet.ChannelID = ch.desc.ID +func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { + packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)} maxSize := ch.maxPacketMsgPayloadSize - packet.Bytes = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))] + packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))] if len(ch.sending) <= maxSize { - packet.EOF = byte(0x01) + packet.EOF = 0x01 ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { - packet.EOF = byte(0x00) + packet.EOF = 0x00 ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):] } return packet @@ -832,24 +838,24 @@ func (ch *Channel) nextPacketMsg() PacketMsg { // Writes next PacketMsg to w and updates c.recentlySent. // Not goroutine-safe -func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) { - var packet = ch.nextPacketMsg() - n, err = cdc.MarshalBinaryLengthPrefixedWriter(w, packet) - atomic.AddInt64(&ch.recentlySent, n) +func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { + packet := ch.nextPacketMsg() + n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet)) + atomic.AddInt64(&ch.recentlySent, int64(n)) return } // Handles incoming PacketMsgs. It returns a message bytes if message is // complete. NOTE message bytes may change on next call to recvPacketMsg. // Not goroutine-safe -func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) { +func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet) - var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Bytes) + var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data) if recvCap < recvReceived { return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived) } - ch.recving = append(ch.recving, packet.Bytes...) - if packet.EOF == byte(0x01) { + ch.recving = append(ch.recving, packet.Data...) + if packet.EOF == 0x01 { msgBytes := ch.recving // clear the slice without re-allocating. @@ -873,33 +879,34 @@ func (ch *Channel) updateStats() { //---------------------------------------- // Packet -type Packet interface { - AssertIsPacket() -} - -func RegisterPacket(cdc *amino.Codec) { - cdc.RegisterInterface((*Packet)(nil), nil) - cdc.RegisterConcrete(PacketPing{}, "tendermint/p2p/PacketPing", nil) - cdc.RegisterConcrete(PacketPong{}, "tendermint/p2p/PacketPong", nil) - cdc.RegisterConcrete(PacketMsg{}, "tendermint/p2p/PacketMsg", nil) -} - -func (PacketPing) AssertIsPacket() {} -func (PacketPong) AssertIsPacket() {} -func (PacketMsg) AssertIsPacket() {} - -type PacketPing struct { -} - -type PacketPong struct { -} - -type PacketMsg struct { - ChannelID byte - EOF byte // 1 means message ends here. - Bytes []byte -} +// mustWrapPacket takes a packet kind (oneof) and wraps it in a tmp2p.Packet message. +func mustWrapPacket(pb proto.Message) *tmp2p.Packet { + var msg tmp2p.Packet + + switch pb := pb.(type) { + case *tmp2p.Packet: // already a packet + msg = *pb + case *tmp2p.PacketPing: + msg = tmp2p.Packet{ + Sum: &tmp2p.Packet_PacketPing{ + PacketPing: pb, + }, + } + case *tmp2p.PacketPong: + msg = tmp2p.Packet{ + Sum: &tmp2p.Packet_PacketPong{ + PacketPong: pb, + }, + } + case *tmp2p.PacketMsg: + msg = tmp2p.Packet{ + Sum: &tmp2p.Packet_PacketMsg{ + PacketMsg: pb, + }, + } + default: + panic(fmt.Errorf("unknown packet type %T", pb)) + } -func (mp PacketMsg) String() string { - return fmt.Sprintf("PacketMsg{%X:%X T:%X}", mp.ChannelID, mp.Bytes, mp.EOF) + return &msg } diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 29d29fc6e..f5fe45bde 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -1,7 +1,6 @@ package conn import ( - "bytes" "net" "testing" "time" @@ -10,9 +9,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - amino "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/protoio" + tmp2p "github.com/tendermint/tendermint/proto/p2p" + "github.com/tendermint/tendermint/proto/types" ) const maxPingPongPacketSize = 1024 // bytes @@ -54,12 +54,12 @@ func TestMConnectionSendFlushStop(t *testing.T) { msg := []byte("abc") assert.True(t, clientConn.Send(0x01, msg)) - aminoMsgLength := 14 + msgLength := 14 // start the reader in a new routine, so we can flush errCh := make(chan error) go func() { - msgB := make([]byte, aminoMsgLength) + msgB := make([]byte, msgLength) _, err := server.Read(msgB) if err != nil { t.Error(err) @@ -182,9 +182,9 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) { serverGotPing := make(chan struct{}) go func() { // read ping - var pkt PacketPing - _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize) - assert.Nil(t, err) + var pkt tmp2p.Packet + err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&pkt) + require.NoError(t, err) serverGotPing <- struct{}{} }() <-serverGotPing @@ -219,26 +219,28 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { defer mconn.Stop() // sending 3 pongs in a row (abuse) - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{})) - require.Nil(t, err) - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{})) - require.Nil(t, err) - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{})) - require.Nil(t, err) + protoWriter := protoio.NewDelimitedWriter(server) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) + require.NoError(t, err) serverGotPing := make(chan struct{}) go func() { // read ping (one byte) - var ( - packet Packet - err error - ) - _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &packet, maxPingPongPacketSize) - require.Nil(t, err) + var packet tmp2p.Packet + err := protoio.NewDelimitedReader(server, maxPingPongPacketSize).ReadMsg(&packet) + require.NoError(t, err) serverGotPing <- struct{}{} + // respond with pong - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{})) - require.Nil(t, err) + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) + require.NoError(t, err) }() <-serverGotPing @@ -273,19 +275,27 @@ func TestMConnectionMultiplePings(t *testing.T) { // sending 3 pings in a row (abuse) // see https://github.com/tendermint/tendermint/issues/1190 - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{})) - require.Nil(t, err) - var pkt PacketPong - _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize) - require.Nil(t, err) - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{})) - require.Nil(t, err) - _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize) - require.Nil(t, err) - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPing{})) - require.Nil(t, err) - _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize) - require.Nil(t, err) + protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize) + protoWriter := protoio.NewDelimitedWriter(server) + var pkt tmp2p.Packet + + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) + require.NoError(t, err) + + err = protoReader.ReadMsg(&pkt) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) + require.NoError(t, err) + + err = protoReader.ReadMsg(&pkt) + require.NoError(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) + require.NoError(t, err) + + err = protoReader.ReadMsg(&pkt) + require.NoError(t, err) assert.True(t, mconn.IsRunning()) } @@ -314,25 +324,32 @@ func TestMConnectionPingPongs(t *testing.T) { serverGotPing := make(chan struct{}) go func() { + protoReader := protoio.NewDelimitedReader(server, maxPingPongPacketSize) + protoWriter := protoio.NewDelimitedWriter(server) + var pkt tmp2p.PacketPing + // read ping - var pkt PacketPing - _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize) - require.Nil(t, err) + err = protoReader.ReadMsg(&pkt) + require.NoError(t, err) serverGotPing <- struct{}{} + // respond with pong - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{})) - require.Nil(t, err) + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) + require.NoError(t, err) time.Sleep(mconn.config.PingInterval) // read ping - _, err = cdc.UnmarshalBinaryLengthPrefixedReader(server, &pkt, maxPingPongPacketSize) - require.Nil(t, err) + err = protoReader.ReadMsg(&pkt) + require.NoError(t, err) + serverGotPing <- struct{}{} + // respond with pong - _, err = server.Write(cdc.MustMarshalBinaryLengthPrefixed(PacketPong{})) - require.Nil(t, err) + _, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) + require.NoError(t, err) }() <-serverGotPing + <-serverGotPing pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2 select { @@ -425,13 +442,9 @@ func TestMConnectionReadErrorBadEncoding(t *testing.T) { client := mconnClient.conn - // send badly encoded msgPacket - bz := cdc.MustMarshalBinaryLengthPrefixed(PacketMsg{}) - bz[4] += 0x01 // Invalid prefix bytes. - // Write it. - _, err := client.Write(bz) - assert.Nil(t, err) + _, err := client.Write([]byte{1, 2, 3, 4, 5}) + require.NoError(t, err) assert.True(t, expectSend(chOnErr), "badly encoded msgPacket") } @@ -465,32 +478,28 @@ func TestMConnectionReadErrorLongMessage(t *testing.T) { } client := mconnClient.conn + protoWriter := protoio.NewDelimitedWriter(client) // send msg thats just right - var err error - var buf = new(bytes.Buffer) - var packet = PacketMsg{ + var packet = tmp2p.PacketMsg{ ChannelID: 0x01, EOF: 1, - Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize), + Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize), } - _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet) - assert.Nil(t, err) - _, err = client.Write(buf.Bytes()) - assert.Nil(t, err) + + _, err := protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.NoError(t, err) assert.True(t, expectSend(chOnRcv), "msg just right") // send msg thats too long - buf = new(bytes.Buffer) - packet = PacketMsg{ + packet = tmp2p.PacketMsg{ ChannelID: 0x01, EOF: 1, - Bytes: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100), + Data: make([]byte, mconnClient.config.MaxPacketMsgPayloadSize+100), } - _, err = cdc.MarshalBinaryLengthPrefixedWriter(buf, packet) - assert.Nil(t, err) - _, err = client.Write(buf.Bytes()) - assert.NotNil(t, err) + + _, err = protoWriter.WriteMsg(mustWrapPacket(&packet)) + require.Error(t, err) assert.True(t, expectSend(chOnErr), "msg too long") } @@ -501,10 +510,8 @@ func TestMConnectionReadErrorUnknownMsgType(t *testing.T) { defer mconnServer.Stop() // send msg with unknown msg type - err := amino.EncodeUvarint(mconnClient.conn, 4) - assert.Nil(t, err) - _, err = mconnClient.conn.Write([]byte{0xFF, 0xFF, 0xFF, 0xFF}) - assert.Nil(t, err) + _, err := protoio.NewDelimitedWriter(mconnClient.conn).WriteMsg(&types.Header{ChainID: "x"}) + require.NoError(t, err) assert.True(t, expectSend(chOnErr), "unknown msg type") } diff --git a/p2p/conn/evil_secret_connection_test.go b/p2p/conn/evil_secret_connection_test.go index 1f662ee2a..993b07f5a 100644 --- a/p2p/conn/evil_secret_connection_test.go +++ b/p2p/conn/evil_secret_connection_test.go @@ -6,12 +6,16 @@ import ( "io" "testing" + gogotypes "github.com/gogo/protobuf/types" "github.com/gtank/merlin" "github.com/stretchr/testify/assert" "golang.org/x/crypto/chacha20poly1305" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" + cryptoenc "github.com/tendermint/tendermint/crypto/encoding" + "github.com/tendermint/tendermint/libs/protoio" + tmp2p "github.com/tendermint/tendermint/proto/p2p" ) type buffer struct { @@ -80,14 +84,15 @@ func (c *evilConn) Read(data []byte) (n int, err error) { switch c.readStep { case 0: if !c.badEphKey { - bz, err := cdc.MarshalBinaryLengthPrefixed(c.locEphPub) + lc := *c.locEphPub + bz, err := protoio.MarshalDelimited(&gogotypes.BytesValue{Value: lc[:]}) if err != nil { panic(err) } copy(data, bz[c.readOffset:]) n = len(data) } else { - bz, err := cdc.MarshalBinaryLengthPrefixed([]byte("drop users;")) + bz, err := protoio.MarshalDelimited(&gogotypes.BytesValue{Value: []byte("drop users;")}) if err != nil { panic(err) } @@ -108,7 +113,11 @@ func (c *evilConn) Read(data []byte) (n int, err error) { case 1: signature := c.signChallenge() if !c.badAuthSignature { - bz, err := cdc.MarshalBinaryLengthPrefixed(authSigMessage{c.privKey.PubKey(), signature}) + pkpb, err := cryptoenc.PubKeyToProto(c.privKey.PubKey()) + if err != nil { + panic(err) + } + bz, err := protoio.MarshalDelimited(&tmp2p.AuthSigMessage{PubKey: pkpb, Sig: signature}) if err != nil { panic(err) } @@ -121,7 +130,7 @@ func (c *evilConn) Read(data []byte) (n int, err error) { } copy(data, c.buffer.Bytes()[c.readOffset:]) } else { - bz, err := cdc.MarshalBinaryLengthPrefixed([]byte("select * from users;")) + bz, err := protoio.MarshalDelimited(&gogotypes.BytesValue{Value: []byte("select * from users;")}) if err != nil { panic(err) } @@ -144,10 +153,16 @@ func (c *evilConn) Read(data []byte) (n int, err error) { func (c *evilConn) Write(data []byte) (n int, err error) { switch c.writeStep { case 0: - err := cdc.UnmarshalBinaryLengthPrefixed(data, c.remEphPub) + var ( + bytes gogotypes.BytesValue + remEphPub [32]byte + ) + err := protoio.UnmarshalDelimited(data, &bytes) if err != nil { panic(err) } + copy(remEphPub[:], bytes.Value) + c.remEphPub = &remEphPub c.writeStep = 1 if !c.shareAuthSignature { c.writeStep = 2 @@ -235,7 +250,7 @@ func TestMakeSecretConnection(t *testing.T) { errMsg string }{ {"refuse to share ethimeral key", newEvilConn(false, false, false, false), "EOF"}, - {"share bad ethimeral key", newEvilConn(true, true, false, false), "Insufficient bytes to decode"}, + {"share bad ethimeral key", newEvilConn(true, true, false, false), "wrong wireType"}, {"refuse to share auth signature", newEvilConn(true, false, false, false), "EOF"}, {"share bad auth signature", newEvilConn(true, false, true, true), "failed to decrypt SecretConnection"}, {"all good", newEvilConn(true, false, true, false), ""}, diff --git a/p2p/conn/secret_connection.go b/p2p/conn/secret_connection.go index 5d48fa1c6..4d3ea3491 100644 --- a/p2p/conn/secret_connection.go +++ b/p2p/conn/secret_connection.go @@ -14,6 +14,7 @@ import ( "sync" "time" + gogotypes "github.com/gogo/protobuf/types" "github.com/gtank/merlin" pool "github.com/libp2p/go-buffer-pool" "golang.org/x/crypto/chacha20poly1305" @@ -23,7 +24,10 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" + cryptoenc "github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/libs/async" + "github.com/tendermint/tendermint/libs/protoio" + tmp2p "github.com/tendermint/tendermint/proto/p2p" ) // 4 + 1024 == 1028 total frame size @@ -250,7 +254,7 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) { defer pool.Put(frame) _, err = sc.recvAead.Open(frame[:0], sc.recvNonce[:], sealedFrame, nil) if err != nil { - return n, errors.New("failed to decrypt SecretConnection") + return n, fmt.Errorf("failed to decrypt SecretConnection: %w", err) } incrNonce(sc.recvNonce) // end decryption @@ -300,18 +304,22 @@ func shareEphPubKey(conn io.ReadWriter, locEphPub *[32]byte) (remEphPub *[32]byt // Send our pubkey and receive theirs in tandem. var trs, _ = async.Parallel( func(_ int) (val interface{}, abort bool, err error) { - var _, err1 = cdc.MarshalBinaryLengthPrefixedWriter(conn, locEphPub) - if err1 != nil { - return nil, true, err1 // abort + lc := *locEphPub + _, err = protoio.NewDelimitedWriter(conn).WriteMsg(&gogotypes.BytesValue{Value: lc[:]}) + if err != nil { + return nil, true, err // abort } return nil, false, nil }, func(_ int) (val interface{}, abort bool, err error) { - var _remEphPub [32]byte - var _, err2 = cdc.UnmarshalBinaryLengthPrefixedReader(conn, &_remEphPub, 1024*1024) // TODO - if err2 != nil { - return nil, true, err2 // abort + var bytes gogotypes.BytesValue + err = protoio.NewDelimitedReader(conn, 1024*1024).ReadMsg(&bytes) + if err != nil { + return nil, true, err // abort } + + var _remEphPub [32]byte + copy(_remEphPub[:], bytes.Value) return _remEphPub, false, nil }, ) @@ -399,17 +407,31 @@ func shareAuthSignature(sc io.ReadWriter, pubKey crypto.PubKey, signature []byte // Send our info and receive theirs in tandem. var trs, _ = async.Parallel( func(_ int) (val interface{}, abort bool, err error) { - var _, err1 = cdc.MarshalBinaryLengthPrefixedWriter(sc, authSigMessage{pubKey, signature}) - if err1 != nil { - return nil, true, err1 // abort + pbpk, err := cryptoenc.PubKeyToProto(pubKey) + if err != nil { + return nil, true, err + } + _, err = protoio.NewDelimitedWriter(sc).WriteMsg(&tmp2p.AuthSigMessage{PubKey: pbpk, Sig: signature}) + if err != nil { + return nil, true, err // abort } return nil, false, nil }, func(_ int) (val interface{}, abort bool, err error) { - var _recvMsg authSigMessage - var _, err2 = cdc.UnmarshalBinaryLengthPrefixedReader(sc, &_recvMsg, 1024*1024) // TODO - if err2 != nil { - return nil, true, err2 // abort + var pba tmp2p.AuthSigMessage + err = protoio.NewDelimitedReader(sc, 1024*1024).ReadMsg(&pba) + if err != nil { + return nil, true, err // abort + } + + pk, err := cryptoenc.PubKeyFromProto(pba.PubKey) + if err != nil { + return nil, true, err // abort + } + + _recvMsg := authSigMessage{ + Key: pk, + Sig: pba.Sig, } return _recvMsg, false, nil }, diff --git a/p2p/conn/secret_connection_test.go b/p2p/conn/secret_connection_test.go index 0edf2e243..9ded86a15 100644 --- a/p2p/conn/secret_connection_test.go +++ b/p2p/conn/secret_connection_test.go @@ -257,38 +257,30 @@ func TestDeriveSecretsAndChallengeGolden(t *testing.T) { func TestNilPubkey(t *testing.T) { var fooConn, barConn = makeKVStoreConnPair() + defer fooConn.Close() + defer barConn.Close() var fooPrvKey = ed25519.GenPrivKey() var barPrvKey = privKeyWithNilPubKey{ed25519.GenPrivKey()} - go func() { - _, err := MakeSecretConnection(barConn, barPrvKey) - assert.NoError(t, err) - }() + go MakeSecretConnection(fooConn, fooPrvKey) - assert.NotPanics(t, func() { - _, err := MakeSecretConnection(fooConn, fooPrvKey) - if assert.Error(t, err) { - assert.Equal(t, "expected ed25519 pubkey, got ", err.Error()) - } - }) + _, err := MakeSecretConnection(barConn, barPrvKey) + require.Error(t, err) + assert.Equal(t, "toproto: key type is not supported", err.Error()) } func TestNonEd25519Pubkey(t *testing.T) { var fooConn, barConn = makeKVStoreConnPair() + defer fooConn.Close() + defer barConn.Close() var fooPrvKey = ed25519.GenPrivKey() var barPrvKey = secp256k1.GenPrivKey() - go func() { - _, err := MakeSecretConnection(barConn, barPrvKey) - assert.NoError(t, err) - }() + go MakeSecretConnection(fooConn, fooPrvKey) - assert.NotPanics(t, func() { - _, err := MakeSecretConnection(fooConn, fooPrvKey) - if assert.Error(t, err) { - assert.Equal(t, "expected ed25519 pubkey, got secp256k1.PubKey", err.Error()) - } - }) + _, err := MakeSecretConnection(barConn, barPrvKey) + require.Error(t, err) + assert.Contains(t, err.Error(), "is not supported") } func writeLots(t *testing.T, wg *sync.WaitGroup, conn io.Writer, txt string, n int) { diff --git a/proto/p2p/conn_msgs.pb.go b/proto/p2p/conn_msgs.pb.go index bd0ab6358..86afe9dc7 100644 --- a/proto/p2p/conn_msgs.pb.go +++ b/proto/p2p/conn_msgs.pb.go @@ -7,6 +7,7 @@ import ( fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" + keys "github.com/tendermint/tendermint/proto/crypto/keys" io "io" math "math" math_bits "math/bits" @@ -253,38 +254,96 @@ func (*Packet) XXX_OneofWrappers() []interface{} { } } +type AuthSigMessage struct { + PubKey keys.PublicKey `protobuf:"bytes,1,opt,name=pub_key,json=pubKey,proto3" json:"pub_key"` + Sig []byte `protobuf:"bytes,2,opt,name=sig,proto3" json:"sig,omitempty"` +} + +func (m *AuthSigMessage) Reset() { *m = AuthSigMessage{} } +func (m *AuthSigMessage) String() string { return proto.CompactTextString(m) } +func (*AuthSigMessage) ProtoMessage() {} +func (*AuthSigMessage) Descriptor() ([]byte, []int) { + return fileDescriptor_8c680f0b24d73fe7, []int{4} +} +func (m *AuthSigMessage) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AuthSigMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AuthSigMessage.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AuthSigMessage) XXX_Merge(src proto.Message) { + xxx_messageInfo_AuthSigMessage.Merge(m, src) +} +func (m *AuthSigMessage) XXX_Size() int { + return m.Size() +} +func (m *AuthSigMessage) XXX_DiscardUnknown() { + xxx_messageInfo_AuthSigMessage.DiscardUnknown(m) +} + +var xxx_messageInfo_AuthSigMessage proto.InternalMessageInfo + +func (m *AuthSigMessage) GetPubKey() keys.PublicKey { + if m != nil { + return m.PubKey + } + return keys.PublicKey{} +} + +func (m *AuthSigMessage) GetSig() []byte { + if m != nil { + return m.Sig + } + return nil +} + func init() { proto.RegisterType((*PacketPing)(nil), "tendermint.proto.p2p.PacketPing") proto.RegisterType((*PacketPong)(nil), "tendermint.proto.p2p.PacketPong") proto.RegisterType((*PacketMsg)(nil), "tendermint.proto.p2p.PacketMsg") proto.RegisterType((*Packet)(nil), "tendermint.proto.p2p.Packet") + proto.RegisterType((*AuthSigMessage)(nil), "tendermint.proto.p2p.AuthSigMessage") } func init() { proto.RegisterFile("proto/p2p/conn_msgs.proto", fileDescriptor_8c680f0b24d73fe7) } var fileDescriptor_8c680f0b24d73fe7 = []byte{ - // 324 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0x41, 0x4b, 0xfb, 0x30, - 0x18, 0xc6, 0x9b, 0x7f, 0xff, 0x9b, 0xf4, 0xdd, 0xbc, 0x04, 0x0f, 0x9b, 0x87, 0x6c, 0xec, 0x20, - 0x43, 0xa4, 0x85, 0xfa, 0x05, 0x64, 0x9b, 0xe2, 0x0e, 0xc3, 0xd1, 0xa3, 0x97, 0xd2, 0xb5, 0x35, - 0x0d, 0xda, 0x24, 0xb4, 0xd9, 0xc1, 0x6f, 0xe1, 0xc7, 0xf2, 0xb8, 0xa3, 0x20, 0x0c, 0xe9, 0xbe, - 0x88, 0x2c, 0x99, 0x5b, 0x05, 0xd1, 0xdb, 0xf3, 0x3c, 0xbc, 0xf9, 0xbd, 0x4f, 0x12, 0xe8, 0xca, - 0x42, 0x28, 0xe1, 0x49, 0x5f, 0x7a, 0xb1, 0xe0, 0x3c, 0xcc, 0x4b, 0x5a, 0xba, 0x3a, 0xc3, 0x27, - 0x2a, 0xe5, 0x49, 0x5a, 0xe4, 0x8c, 0x2b, 0x93, 0xb8, 0xd2, 0x97, 0xa7, 0x67, 0x2a, 0x63, 0x45, - 0x12, 0xca, 0xa8, 0x50, 0xcf, 0x9e, 0x39, 0x4c, 0x05, 0x15, 0x07, 0x65, 0x66, 0x07, 0x6d, 0x80, - 0x79, 0x14, 0x3f, 0xa6, 0x6a, 0xce, 0x38, 0xad, 0x39, 0xc1, 0xe9, 0x20, 0x03, 0xc7, 0xb8, 0x59, - 0x49, 0xf1, 0x05, 0x40, 0x9c, 0x45, 0x9c, 0xa7, 0x4f, 0x21, 0x4b, 0x3a, 0xa8, 0x8f, 0x86, 0x8d, - 0xd1, 0x71, 0xb5, 0xee, 0x39, 0x63, 0x93, 0x4e, 0x27, 0x81, 0xb3, 0x1b, 0x98, 0x26, 0xb8, 0x0b, - 0x76, 0x2a, 0x1e, 0x3a, 0xff, 0xf4, 0xd8, 0x51, 0xb5, 0xee, 0xd9, 0xd7, 0x77, 0x37, 0xc1, 0x36, - 0xc3, 0x18, 0xfe, 0x27, 0x91, 0x8a, 0x3a, 0x76, 0x1f, 0x0d, 0xdb, 0x81, 0xd6, 0x83, 0x77, 0x04, - 0x4d, 0xb3, 0x0a, 0x8f, 0xa1, 0x25, 0xb5, 0x0a, 0x25, 0xe3, 0x54, 0x2f, 0x6a, 0xf9, 0x7d, 0xf7, - 0xa7, 0x4b, 0xba, 0x87, 0xe6, 0xb7, 0x56, 0x00, 0x72, 0xef, 0xea, 0x10, 0xc1, 0xa9, 0xae, 0xf1, - 0x17, 0x44, 0x7c, 0x83, 0x08, 0x4e, 0xf1, 0x15, 0xec, 0xdc, 0xf6, 0xb5, 0x75, 0xdd, 0x96, 0xdf, - 0xfb, 0x8d, 0x31, 0x2b, 0xb7, 0x08, 0x47, 0x7e, 0x99, 0x51, 0x03, 0xec, 0x72, 0x99, 0x8f, 0x26, - 0xaf, 0x15, 0x41, 0xab, 0x8a, 0xa0, 0x8f, 0x8a, 0xa0, 0x97, 0x0d, 0xb1, 0x56, 0x1b, 0x62, 0xbd, - 0x6d, 0x88, 0x75, 0x7f, 0x4e, 0x99, 0xca, 0x96, 0x0b, 0x37, 0x16, 0xb9, 0x77, 0x00, 0xd7, 0xe5, - 0xfe, 0xdf, 0x17, 0x4d, 0x2d, 0x2f, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xad, 0x8c, 0xf9, 0x97, - 0x0b, 0x02, 0x00, 0x00, + // 409 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xdf, 0x6a, 0xd4, 0x40, + 0x14, 0xc6, 0x13, 0xd3, 0x6e, 0xd9, 0xb3, 0xab, 0xc8, 0xe0, 0xc5, 0xb6, 0x60, 0x76, 0xc9, 0x85, + 0x16, 0x91, 0x04, 0xe2, 0x0b, 0x68, 0x5a, 0x8b, 0xa5, 0x2c, 0x2e, 0xf1, 0xce, 0x9b, 0x90, 0x3f, + 0xe3, 0x64, 0xdc, 0x66, 0x66, 0xc8, 0x4c, 0x2e, 0xf2, 0x16, 0x3e, 0x56, 0x2f, 0x7b, 0x29, 0x08, + 0x8b, 0x64, 0x5f, 0x44, 0x32, 0x13, 0x77, 0x57, 0x14, 0x7b, 0xf7, 0x7d, 0x1f, 0x27, 0xbf, 0x73, + 0x4e, 0xce, 0xc0, 0xa9, 0xa8, 0xb9, 0xe2, 0x81, 0x08, 0x45, 0x90, 0x73, 0xc6, 0x92, 0x4a, 0x12, + 0xe9, 0xeb, 0x0c, 0x3d, 0x53, 0x98, 0x15, 0xb8, 0xae, 0x28, 0x53, 0x26, 0xf1, 0x45, 0x28, 0xce, + 0x5e, 0xa8, 0x92, 0xd6, 0x45, 0x22, 0xd2, 0x5a, 0xb5, 0x81, 0xf9, 0x98, 0x70, 0xc2, 0xf7, 0xca, + 0xd4, 0x9e, 0x3d, 0x37, 0x49, 0x5e, 0xb7, 0x42, 0xf1, 0x60, 0x8d, 0x5b, 0x19, 0xa8, 0x56, 0xe0, + 0x01, 0xee, 0x4d, 0x01, 0x56, 0x69, 0xbe, 0xc6, 0x6a, 0x45, 0x19, 0x39, 0x70, 0x9c, 0x11, 0xaf, + 0x84, 0xb1, 0x71, 0x4b, 0x49, 0xd0, 0x6b, 0x80, 0xbc, 0x4c, 0x19, 0xc3, 0xb7, 0x09, 0x2d, 0x66, + 0xf6, 0xc2, 0x3e, 0x3f, 0x8e, 0x1e, 0x77, 0x9b, 0xf9, 0xf8, 0xc2, 0xa4, 0xd7, 0x97, 0xf1, 0x78, + 0x28, 0xb8, 0x2e, 0xd0, 0x29, 0x38, 0x98, 0x7f, 0x99, 0x3d, 0xd2, 0x65, 0x27, 0xdd, 0x66, 0xee, + 0xbc, 0xff, 0x78, 0x15, 0xf7, 0x19, 0x42, 0x70, 0x54, 0xa4, 0x2a, 0x9d, 0x39, 0x0b, 0xfb, 0x7c, + 0x1a, 0x6b, 0xed, 0xfd, 0xb0, 0x61, 0x64, 0x5a, 0xa1, 0x0b, 0x98, 0x08, 0xad, 0x12, 0x41, 0x19, + 0xd1, 0x8d, 0x26, 0xe1, 0xc2, 0xff, 0xd7, 0x3f, 0xf0, 0xf7, 0x93, 0x7f, 0xb0, 0x62, 0x10, 0x3b, + 0x77, 0x08, 0xe1, 0x8c, 0xe8, 0x31, 0x1e, 0x82, 0xf0, 0x3f, 0x20, 0x9c, 0x11, 0xf4, 0x16, 0x06, + 0xd7, 0x1f, 0x43, 0x8f, 0x3b, 0x09, 0xe7, 0xff, 0x63, 0x2c, 0x65, 0x8f, 0x18, 0x8b, 0xdf, 0x26, + 0x3a, 0x06, 0x47, 0x36, 0x95, 0xf7, 0x15, 0x9e, 0xbc, 0x6b, 0x54, 0xf9, 0x89, 0x92, 0x25, 0x96, + 0x32, 0x25, 0x18, 0x5d, 0xc1, 0x89, 0x68, 0xb2, 0x64, 0x8d, 0xdb, 0x61, 0xc1, 0x97, 0x7f, 0x73, + 0xcd, 0xc5, 0xfc, 0xfe, 0x62, 0xfe, 0xaa, 0xc9, 0x6e, 0x69, 0x7e, 0x83, 0xdb, 0xe8, 0xe8, 0x6e, + 0x33, 0xb7, 0xe2, 0x91, 0x68, 0xb2, 0x1b, 0xdc, 0xa2, 0xa7, 0xe0, 0x48, 0x6a, 0xf6, 0x9b, 0xc6, + 0xbd, 0x8c, 0x2e, 0xef, 0x3a, 0xd7, 0xbe, 0xef, 0x5c, 0xfb, 0x67, 0xe7, 0xda, 0xdf, 0xb6, 0xae, + 0x75, 0xbf, 0x75, 0xad, 0xef, 0x5b, 0xd7, 0xfa, 0xfc, 0x8a, 0x50, 0x55, 0x36, 0x99, 0x9f, 0xf3, + 0x2a, 0xd8, 0x37, 0x3b, 0x94, 0xbb, 0x27, 0x98, 0x8d, 0xb4, 0x7c, 0xf3, 0x2b, 0x00, 0x00, 0xff, + 0xff, 0xd2, 0xaa, 0xb3, 0xc0, 0x96, 0x02, 0x00, 0x00, } func (m *PacketPing) Marshal() (dAtA []byte, err error) { @@ -468,6 +527,46 @@ func (m *Packet_PacketMsg) MarshalToSizedBuffer(dAtA []byte) (int, error) { } return len(dAtA) - i, nil } +func (m *AuthSigMessage) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AuthSigMessage) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AuthSigMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Sig) > 0 { + i -= len(m.Sig) + copy(dAtA[i:], m.Sig) + i = encodeVarintConnMsgs(dAtA, i, uint64(len(m.Sig))) + i-- + dAtA[i] = 0x12 + } + { + size, err := m.PubKey.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintConnMsgs(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} + func encodeVarintConnMsgs(dAtA []byte, offset int, v uint64) int { offset -= sovConnMsgs(v) base := offset @@ -564,6 +663,20 @@ func (m *Packet_PacketMsg) Size() (n int) { } return n } +func (m *AuthSigMessage) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.PubKey.Size() + n += 1 + l + sovConnMsgs(uint64(l)) + l = len(m.Sig) + if l > 0 { + n += 1 + l + sovConnMsgs(uint64(l)) + } + return n +} func sovConnMsgs(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 @@ -960,6 +1073,126 @@ func (m *Packet) Unmarshal(dAtA []byte) error { } return nil } +func (m *AuthSigMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowConnMsgs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AuthSigMessage: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AuthSigMessage: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PubKey", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowConnMsgs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthConnMsgs + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthConnMsgs + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.PubKey.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Sig", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowConnMsgs + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthConnMsgs + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthConnMsgs + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Sig = append(m.Sig[:0], dAtA[iNdEx:postIndex]...) + if m.Sig == nil { + m.Sig = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipConnMsgs(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthConnMsgs + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthConnMsgs + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipConnMsgs(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/proto/p2p/conn_msgs.proto b/proto/p2p/conn_msgs.proto index 6c362d7ad..c501ca4f0 100644 --- a/proto/p2p/conn_msgs.proto +++ b/proto/p2p/conn_msgs.proto @@ -4,6 +4,7 @@ package tendermint.proto.p2p; option go_package = "github.com/tendermint/tendermint/proto/p2p"; import "third_party/proto/gogoproto/gogo.proto"; +import "proto/crypto/keys/types.proto"; message PacketPing {} @@ -22,3 +23,8 @@ message Packet { PacketMsg packet_msg = 3; } } + +message AuthSigMessage { + tendermint.proto.crypto.keys.PublicKey pub_key = 1 [(gogoproto.nullable) = false]; + bytes sig = 2; +}