diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 76b951255..5abe0ac62 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -35,6 +35,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [state] \#4679 `TxResult` is a Protobuf type defined in `abci` types directory - [state] \#4679 `state` reactor migration to Protobuf encoding - [evidence] \#4959 Add json tags to `DuplicateVoteEvidence` + - [p2p/pex] \#4973 `p2p/pex` reactor migration to Protobuf encoding - [light] \#4964 `light` reactor migration to Protobuf encoding - [store] \#4778 Transition store module to protobuf encoding - `BlockStoreStateJSON` is now `BlockStoreState` and is encoded as binary in the database diff --git a/p2p/netaddress.go b/p2p/netaddress.go index 299696056..abc828915 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -13,6 +13,8 @@ import ( "strconv" "strings" "time" + + tmp2p "github.com/tendermint/tendermint/proto/p2p" ) // NetAddress defines information about a peer on the network @@ -137,6 +139,55 @@ func NewNetAddressIPPort(ip net.IP, port uint16) *NetAddress { } } +// NetAddressFromProto converts a Protobuf NetAddress into a native struct. +func NetAddressFromProto(pb tmp2p.NetAddress) (*NetAddress, error) { + ip := net.ParseIP(pb.IP) + if ip == nil { + return nil, fmt.Errorf("invalid IP address %v", pb.IP) + } + if pb.Port >= 1<<16 { + return nil, fmt.Errorf("invalid port number %v", pb.Port) + } + return &NetAddress{ + ID: ID(pb.ID), + IP: ip, + Port: uint16(pb.Port), + }, nil +} + +// NetAddressesFromProto converts a slice of Protobuf NetAddresses into a native slice. +func NetAddressesFromProto(pbs []tmp2p.NetAddress) ([]*NetAddress, error) { + nas := make([]*NetAddress, 0, len(pbs)) + for _, pb := range pbs { + na, err := NetAddressFromProto(pb) + if err != nil { + return nil, err + } + nas = append(nas, na) + } + return nas, nil +} + +// NetAddressesToProto converts a slice of NetAddresses into a Protobuf slice. +func NetAddressesToProto(nas []*NetAddress) []tmp2p.NetAddress { + pbs := make([]tmp2p.NetAddress, 0, len(nas)) + for _, na := range nas { + if na != nil { + pbs = append(pbs, na.ToProto()) + } + } + return pbs +} + +// ToProto converts a NetAddress to Protobuf. +func (na *NetAddress) ToProto() tmp2p.NetAddress { + return tmp2p.NetAddress{ + ID: string(na.ID), + IP: na.IP.String(), + Port: uint32(na.Port), + } +} + // Equals reports whether na and other are the same addresses, // including their ID, IP, and Port. func (na *NetAddress) Equals(other interface{}) bool { diff --git a/p2p/pex/codec.go b/p2p/pex/codec.go deleted file mode 100644 index 79ab56380..000000000 --- a/p2p/pex/codec.go +++ /dev/null @@ -1,11 +0,0 @@ -package pex - -import ( - amino "github.com/tendermint/go-amino" -) - -var cdc *amino.Codec = amino.NewCodec() - -func init() { - RegisterMessages(cdc) -} diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 26f714772..cb0879c11 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -3,11 +3,10 @@ package pex import ( "errors" "fmt" - "reflect" "sync" "time" - "github.com/tendermint/go-amino" + "github.com/gogo/protobuf/proto" "github.com/tendermint/tendermint/libs/cmap" tmmath "github.com/tendermint/tendermint/libs/math" @@ -15,6 +14,7 @@ import ( "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/conn" + tmp2p "github.com/tendermint/tendermint/proto/p2p" ) type Peer = p2p.Peer @@ -244,7 +244,7 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) { r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg) switch msg := msg.(type) { - case *pexRequestMessage: + case *tmp2p.PexRequest: // NOTE: this is a prime candidate for amplification attacks, // so it's important we @@ -281,17 +281,25 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) { r.SendAddrs(src, r.book.GetSelection()) } - case *pexAddrsMessage: + case *tmp2p.PexAddrs: // If we asked for addresses, add them to the book - if err := r.ReceiveAddrs(msg.Addrs, src); err != nil { + addrs, err := p2p.NetAddressesFromProto(msg.Addrs) + if err != nil { + r.Switch.StopPeerForError(src, err) + r.book.MarkBad(src.SocketAddr(), defaultBanTime) + return + } + err = r.ReceiveAddrs(addrs, src) + if err != nil { r.Switch.StopPeerForError(src, err) if err == ErrUnsolicitedList { r.book.MarkBad(src.SocketAddr(), defaultBanTime) } return } + default: - r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + r.Logger.Error(fmt.Sprintf("Unknown message type %T", msg)) } } @@ -338,7 +346,7 @@ func (r *Reactor) RequestAddrs(p Peer) { } r.Logger.Debug("Request addrs", "from", p) r.requestsSent.Set(id, struct{}{}) - p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexRequestMessage{})) + p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{})) } // ReceiveAddrs adds the given addrs to the addrbook if theres an open @@ -397,7 +405,7 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error { // SendAddrs sends addrs to the peer. func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) { - p.Send(PexChannel, cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: netAddrs})) + p.Send(PexChannel, mustEncode(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)})) } // SetEnsurePeersPeriod sets period to ensure peers connected. @@ -759,38 +767,37 @@ func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) { //----------------------------------------------------------------------------- // Messages -// Message is a primary type for PEX messages. Underneath, it could contain -// either pexRequestMessage, or pexAddrsMessage messages. -type Message interface{} - -func RegisterMessages(cdc *amino.Codec) { - cdc.RegisterInterface((*Message)(nil), nil) - cdc.RegisterConcrete(&pexRequestMessage{}, "tendermint/p2p/PexRequestMessage", nil) - cdc.RegisterConcrete(&pexAddrsMessage{}, "tendermint/p2p/PexAddrsMessage", nil) -} - -func decodeMsg(bz []byte) (msg Message, err error) { - err = cdc.UnmarshalBinaryBare(bz, &msg) - return -} - -/* -A pexRequestMessage requests additional peer addresses. -*/ -type pexRequestMessage struct { -} - -func (m *pexRequestMessage) String() string { - return "[pexRequest]" -} +// mustEncode proto encodes a tmp2p.Message +func mustEncode(pb proto.Message) []byte { + msg := tmp2p.Message{} + switch pb := pb.(type) { + case *tmp2p.PexRequest: + msg.Sum = &tmp2p.Message_PexRequest{PexRequest: pb} + case *tmp2p.PexAddrs: + msg.Sum = &tmp2p.Message_PexAddrs{PexAddrs: pb} + default: + panic(fmt.Sprintf("Unknown message type %T", pb)) + } -/* -A message with announced peer addresses. -*/ -type pexAddrsMessage struct { - Addrs []*p2p.NetAddress + bz, err := proto.Marshal(&msg) + if err != nil { + panic(fmt.Errorf("unable to marshal %T: %w", pb, err)) + } + return bz } -func (m *pexAddrsMessage) String() string { - return fmt.Sprintf("[pexAddrs %v]", m.Addrs) +func decodeMsg(bz []byte) (proto.Message, error) { + pb := &tmp2p.Message{} + err := proto.Unmarshal(bz, pb) + if err != nil { + return nil, err + } + switch msg := pb.Sum.(type) { + case *tmp2p.Message_PexRequest: + return msg.PexRequest, nil + case *tmp2p.Message_PexAddrs: + return msg.PexAddrs, nil + default: + return nil, fmt.Errorf("unknown message: %T", msg) + } } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 04f4149eb..80d6c7175 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -15,6 +15,7 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/mock" + tmp2p "github.com/tendermint/tendermint/proto/p2p" ) var ( @@ -127,12 +128,11 @@ func TestPEXReactorReceive(t *testing.T) { r.RequestAddrs(peer) size := book.Size() - addrs := []*p2p.NetAddress{peer.SocketAddr()} - msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs}) + msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) r.Receive(PexChannel, peer, msg) assert.Equal(t, size+1, book.Size()) - msg = cdc.MustMarshalBinaryBare(&pexRequestMessage{}) + msg = mustEncode(&tmp2p.PexRequest{}) r.Receive(PexChannel, peer, msg) // should not panic. } @@ -151,7 +151,7 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) { require.True(t, book.HasAddress(peerAddr)) id := string(peer.ID()) - msg := cdc.MustMarshalBinaryBare(&pexRequestMessage{}) + msg := mustEncode(&tmp2p.PexRequest{}) // first time creates the entry r.Receive(PexChannel, peer, msg) @@ -188,8 +188,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { assert.True(t, r.requestsSent.Has(id)) assert.True(t, sw.Peers().Has(peer.ID())) - addrs := []*p2p.NetAddress{peer.SocketAddr()} - msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs}) + msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) // receive some addrs. should clear the request r.Receive(PexChannel, peer, msg) @@ -481,8 +480,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) { pexR.RequestAddrs(peer) size := book.Size() - addrs := []*p2p.NetAddress{peer.SocketAddr()} - msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs}) + msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) pexR.Receive(PexChannel, peer, msg) assert.Equal(t, size, book.Size())