Browse Source

p2p/pex: migrate to Protobuf (#4973)

## Description

migrate p2p/pex to protobuf

Closes: #XXX
pull/4968/head
Marko 4 years ago
committed by GitHub
parent
commit
3b256ccb05
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 58 deletions
  1. +1
    -0
      CHANGELOG_PENDING.md
  2. +51
    -0
      p2p/netaddress.go
  3. +0
    -11
      p2p/pex/codec.go
  4. +46
    -39
      p2p/pex/pex_reactor.go
  5. +6
    -8
      p2p/pex/pex_reactor_test.go

+ 1
- 0
CHANGELOG_PENDING.md View File

@ -35,6 +35,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [state] \#4679 `TxResult` is a Protobuf type defined in `abci` types directory
- [state] \#4679 `state` reactor migration to Protobuf encoding
- [evidence] \#4959 Add json tags to `DuplicateVoteEvidence`
- [p2p/pex] \#4973 `p2p/pex` reactor migration to Protobuf encoding
- [light] \#4964 `light` reactor migration to Protobuf encoding
- [store] \#4778 Transition store module to protobuf encoding
- `BlockStoreStateJSON` is now `BlockStoreState` and is encoded as binary in the database


+ 51
- 0
p2p/netaddress.go View File

@ -13,6 +13,8 @@ import (
"strconv"
"strings"
"time"
tmp2p "github.com/tendermint/tendermint/proto/p2p"
)
// NetAddress defines information about a peer on the network
@ -137,6 +139,55 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress {
}
}
// NetAddressFromProto converts a Protobuf NetAddress into a native struct.
func NetAddressFromProto(pb tmp2p.NetAddress) (*NetAddress, error) {
ip := net.ParseIP(pb.IP)
if ip == nil {
return nil, fmt.Errorf("invalid IP address %v", pb.IP)
}
if pb.Port >= 1<<16 {
return nil, fmt.Errorf("invalid port number %v", pb.Port)
}
return &NetAddress{
ID: ID(pb.ID),
IP: ip,
Port: uint16(pb.Port),
}, nil
}
// NetAddressesFromProto converts a slice of Protobuf NetAddresses into a native slice.
func NetAddressesFromProto(pbs []tmp2p.NetAddress) ([]*NetAddress, error) {
nas := make([]*NetAddress, 0, len(pbs))
for _, pb := range pbs {
na, err := NetAddressFromProto(pb)
if err != nil {
return nil, err
}
nas = append(nas, na)
}
return nas, nil
}
// NetAddressesToProto converts a slice of NetAddresses into a Protobuf slice.
func NetAddressesToProto(nas []*NetAddress) []tmp2p.NetAddress {
pbs := make([]tmp2p.NetAddress, 0, len(nas))
for _, na := range nas {
if na != nil {
pbs = append(pbs, na.ToProto())
}
}
return pbs
}
// ToProto converts a NetAddress to Protobuf.
func (na *NetAddress) ToProto() tmp2p.NetAddress {
return tmp2p.NetAddress{
ID: string(na.ID),
IP: na.IP.String(),
Port: uint32(na.Port),
}
}
// Equals reports whether na and other are the same addresses,
// including their ID, IP, and Port.
func (na *NetAddress) Equals(other interface{}) bool {


+ 0
- 11
p2p/pex/codec.go View File

@ -1,11 +0,0 @@
package pex
import (
amino "github.com/tendermint/go-amino"
)
var cdc *amino.Codec = amino.NewCodec()
func init() {
RegisterMessages(cdc)
}

+ 46
- 39
p2p/pex/pex_reactor.go View File

@ -3,11 +3,10 @@ package pex
import (
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/tendermint/go-amino"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/libs/cmap"
tmmath "github.com/tendermint/tendermint/libs/math"
@ -15,6 +14,7 @@ import (
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/p2p"
)
type Peer = p2p.Peer
@ -244,7 +244,7 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
switch msg := msg.(type) {
case *pexRequestMessage:
case *tmp2p.PexRequest:
// NOTE: this is a prime candidate for amplification attacks,
// so it's important we
@ -281,17 +281,25 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
r.SendAddrs(src, r.book.GetSelection())
}
case *pexAddrsMessage:
case *tmp2p.PexAddrs:
// If we asked for addresses, add them to the book
if err := r.ReceiveAddrs(msg.Addrs, src); err != nil {
addrs, err := p2p.NetAddressesFromProto(msg.Addrs)
if err != nil {
r.Switch.StopPeerForError(src, err)
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
return
}
err = r.ReceiveAddrs(addrs, src)
if err != nil {
r.Switch.StopPeerForError(src, err)
if err == ErrUnsolicitedList {
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
}
return
}
default:
r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
r.Logger.Error(fmt.Sprintf("Unknown message type %T", msg))
}
}
@ -338,7 +346,7 @@ func (r *Reactor) RequestAddrs(p Peer) {
}
r.Logger.Debug("Request addrs", "from", p)
r.requestsSent.Set(id, struct{}{})
p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexRequestMessage{}))
p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{}))
}
// ReceiveAddrs adds the given addrs to the addrbook if theres an open
@ -397,7 +405,7 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
// SendAddrs sends addrs to the peer.
func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: netAddrs}))
p.Send(PexChannel, mustEncode(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}))
}
// SetEnsurePeersPeriod sets period to ensure peers connected.
@ -759,38 +767,37 @@ func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
//-----------------------------------------------------------------------------
// Messages
// Message is a primary type for PEX messages. Underneath, it could contain
// either pexRequestMessage, or pexAddrsMessage messages.
type Message interface{}
func RegisterMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*Message)(nil), nil)
cdc.RegisterConcrete(&pexRequestMessage{}, "tendermint/p2p/PexRequestMessage", nil)
cdc.RegisterConcrete(&pexAddrsMessage{}, "tendermint/p2p/PexAddrsMessage", nil)
}
func decodeMsg(bz []byte) (msg Message, err error) {
err = cdc.UnmarshalBinaryBare(bz, &msg)
return
}
/*
A pexRequestMessage requests additional peer addresses.
*/
type pexRequestMessage struct {
}
func (m *pexRequestMessage) String() string {
return "[pexRequest]"
}
// mustEncode proto encodes a tmp2p.Message
func mustEncode(pb proto.Message) []byte {
msg := tmp2p.Message{}
switch pb := pb.(type) {
case *tmp2p.PexRequest:
msg.Sum = &tmp2p.Message_PexRequest{PexRequest: pb}
case *tmp2p.PexAddrs:
msg.Sum = &tmp2p.Message_PexAddrs{PexAddrs: pb}
default:
panic(fmt.Sprintf("Unknown message type %T", pb))
}
/*
A message with announced peer addresses.
*/
type pexAddrsMessage struct {
Addrs []*p2p.NetAddress
bz, err := proto.Marshal(&msg)
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
func (m *pexAddrsMessage) String() string {
return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &tmp2p.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *tmp2p.Message_PexRequest:
return msg.PexRequest, nil
case *tmp2p.Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

+ 6
- 8
p2p/pex/pex_reactor_test.go View File

@ -15,6 +15,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/mock"
tmp2p "github.com/tendermint/tendermint/proto/p2p"
)
var (
@ -127,12 +128,11 @@ func TestPEXReactorReceive(t *testing.T) {
r.RequestAddrs(peer)
size := book.Size()
addrs := []*p2p.NetAddress{peer.SocketAddr()}
msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
r.Receive(PexChannel, peer, msg)
assert.Equal(t, size+1, book.Size())
msg = cdc.MustMarshalBinaryBare(&pexRequestMessage{})
msg = mustEncode(&tmp2p.PexRequest{})
r.Receive(PexChannel, peer, msg) // should not panic.
}
@ -151,7 +151,7 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
require.True(t, book.HasAddress(peerAddr))
id := string(peer.ID())
msg := cdc.MustMarshalBinaryBare(&pexRequestMessage{})
msg := mustEncode(&tmp2p.PexRequest{})
// first time creates the entry
r.Receive(PexChannel, peer, msg)
@ -188,8 +188,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
assert.True(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
addrs := []*p2p.NetAddress{peer.SocketAddr()}
msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
// receive some addrs. should clear the request
r.Receive(PexChannel, peer, msg)
@ -481,8 +480,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
pexR.RequestAddrs(peer)
size := book.Size()
addrs := []*p2p.NetAddress{peer.SocketAddr()}
msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
pexR.Receive(PexChannel, peer, msg)
assert.Equal(t, size, book.Size())


Loading…
Cancel
Save