Browse Source

Merge pull request #815 from tendermint/p2p-readme-tests

P2P: readme and tests
pull/821/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
e8e512f1fa
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 197 additions and 27 deletions
  1. +45
    -2
      p2p/README.md
  2. +13
    -4
      p2p/connection.go
  3. +138
    -9
      p2p/connection_test.go
  4. +1
    -1
      p2p/pex_reactor.go
  5. +0
    -11
      p2p/switch.go

+ 45
- 2
p2p/README.md View File

@ -4,9 +4,9 @@
`tendermint/tendermint/p2p` provides an abstraction around peer-to-peer communication.<br/> `tendermint/tendermint/p2p` provides an abstraction around peer-to-peer communication.<br/>
## Peer/MConnection/Channel
## MConnection
Each peer has one `MConnection` (multiplex connection) instance.
`MConnection` is a multiplex connection:
__multiplex__ *noun* a system or signal involving simultaneous transmission of __multiplex__ *noun* a system or signal involving simultaneous transmission of
several messages along a single channel of communication. several messages along a single channel of communication.
@ -16,6 +16,43 @@ Each `MConnection` handles message transmission on multiple abstract communicati
The byte id and the relative priorities of each `Channel` are configured upon The byte id and the relative priorities of each `Channel` are configured upon
initialization of the connection. initialization of the connection.
The `MConnection` supports three packet types: Ping, Pong, and Msg.
### Ping and Pong
The ping and pong messages consist of writing a single byte to the connection; 0x1 and 0x2, respectively
When we haven't received any messages on an `MConnection` in a time `pingTimeout`, we send a ping message.
When a ping is received on the `MConnection`, a pong is sent in response.
If a pong is not received in sufficient time, the peer's score should be decremented (TODO).
### Msg
Messages in channels are chopped into smaller msgPackets for multiplexing.
```
type msgPacket struct {
ChannelID byte
EOF byte // 1 means message ends here.
Bytes []byte
}
```
The msgPacket is serialized using go-wire, and prefixed with a 0x3.
The received `Bytes` of a sequential set of packets are appended together
until a packet with `EOF=1` is received, at which point the complete serialized message
is returned for processing by the corresponding channels `onReceive` function.
### Multiplexing
Messages are sent from a single `sendRoutine`, which loops over a select statement that results in the sending
of a ping, a pong, or a batch of data messages. The batch of data messages may include messages from multiple channels.
Message bytes are queued for sending in their respective channel, with each channel holding one unsent message at a time.
Messages are chosen for a batch one a time from the channel with the lowest ratio of recently sent bytes to channel priority.
## Sending Messages
There are two methods for sending messages: There are two methods for sending messages:
```go ```go
func (m MConnection) Send(chID byte, msg interface{}) bool {} func (m MConnection) Send(chID byte, msg interface{}) bool {}
@ -31,6 +68,12 @@ queue is full.
`Send()` and `TrySend()` are also exposed for each `Peer`. `Send()` and `TrySend()` are also exposed for each `Peer`.
## Peer
Each peer has one `MConnection` instance, and includes other information such as whether the connection
was outbound, whether the connection should be recreated if it closes, various identity information about the node,
and other higher level thread-safe data used by the reactors.
## Switch/Reactor ## Switch/Reactor
The `Switch` handles peer connections and exposes an API to receive incoming messages The `Switch` handles peer connections and exposes an API to receive incoming messages


+ 13
- 4
p2p/connection.go View File

@ -459,8 +459,11 @@ FOR_LOOP:
} }
channel, ok := c.channelsIdx[pkt.ChannelID] channel, ok := c.channelsIdx[pkt.ChannelID]
if !ok || channel == nil { if !ok || channel == nil {
cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
err := fmt.Errorf("Unknown channel %X", pkt.ChannelID)
c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
c.stopForError(err)
} }
msgBytes, err := channel.recvMsgPacket(pkt) msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil { if err != nil {
if c.IsRunning() { if c.IsRunning() {
@ -475,7 +478,9 @@ FOR_LOOP:
c.onReceive(pkt.ChannelID, msgBytes) c.onReceive(pkt.ChannelID, msgBytes)
} }
default: default:
cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
err := fmt.Errorf("Unknown message type %X", pktType)
c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
c.stopForError(err)
} }
// TODO: shouldn't this go in the sendRoutine? // TODO: shouldn't this go in the sendRoutine?
@ -648,14 +653,18 @@ func (ch *Channel) nextMsgPacket() msgPacket {
func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
packet := ch.nextMsgPacket() packet := ch.nextMsgPacket()
// log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
wire.WriteByte(packetTypeMsg, w, &n, &err)
wire.WriteBinary(packet, w, &n, &err)
writeMsgPacketTo(packet, w, &n, &err)
if err == nil { if err == nil {
ch.recentlySent += int64(n) ch.recentlySent += int64(n)
} }
return return
} }
func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
wire.WriteByte(packetTypeMsg, w, n, err)
wire.WriteBinary(packet, w, n, err)
}
// Handles incoming msgPackets. Returns a msg bytes if msg is complete. // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
// Not goroutine-safe // Not goroutine-safe
func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {


+ 138
- 9
p2p/connection_test.go View File

@ -1,4 +1,4 @@
package p2p_test
package p2p
import ( import (
"net" "net"
@ -7,11 +7,11 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
p2p "github.com/tendermint/tendermint/p2p"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
) )
func createMConnection(conn net.Conn) *p2p.MConnection {
func createTestMConnection(conn net.Conn) *MConnection {
onReceive := func(chID byte, msgBytes []byte) { onReceive := func(chID byte, msgBytes []byte) {
} }
onError := func(r interface{}) { onError := func(r interface{}) {
@ -21,9 +21,9 @@ func createMConnection(conn net.Conn) *p2p.MConnection {
return c return c
} }
func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
c := p2p.NewMConnection(conn, chDescs, onReceive, onError)
func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection {
chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
c := NewMConnection(conn, chDescs, onReceive, onError)
c.SetLogger(log.TestingLogger()) c.SetLogger(log.TestingLogger())
return c return c
} }
@ -35,7 +35,7 @@ func TestMConnectionSend(t *testing.T) {
defer server.Close() defer server.Close()
defer client.Close() defer client.Close()
mconn := createMConnection(client)
mconn := createTestMConnection(client)
_, err := mconn.Start() _, err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()
@ -75,7 +75,7 @@ func TestMConnectionReceive(t *testing.T) {
require.Nil(err) require.Nil(err)
defer mconn1.Stop() defer mconn1.Stop()
mconn2 := createMConnection(server)
mconn2 := createTestMConnection(server)
_, err = mconn2.Start() _, err = mconn2.Start()
require.Nil(err) require.Nil(err)
defer mconn2.Stop() defer mconn2.Stop()
@ -100,7 +100,7 @@ func TestMConnectionStatus(t *testing.T) {
defer server.Close() defer server.Close()
defer client.Close() defer client.Close()
mconn := createMConnection(client)
mconn := createTestMConnection(client)
_, err := mconn.Start() _, err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()
@ -142,3 +142,132 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) {
t.Fatal("Did not receive error in 500ms") t.Fatal("Did not receive error in 500ms")
} }
} }
func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr chan struct{}) (*MConnection, *MConnection) {
server, client := net.Pipe()
onReceive := func(chID byte, msgBytes []byte) {}
onError := func(r interface{}) {}
// create client conn with two channels
chDescs := []*ChannelDescriptor{
{ID: 0x01, Priority: 1, SendQueueCapacity: 1},
{ID: 0x02, Priority: 1, SendQueueCapacity: 1},
}
mconnClient := NewMConnection(client, chDescs, onReceive, onError)
mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
_, err := mconnClient.Start()
require.Nil(err)
// create server conn with 1 channel
// it fires on chOnErr when there's an error
serverLogger := log.TestingLogger().With("module", "server")
onError = func(r interface{}) {
chOnErr <- struct{}{}
}
mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
mconnServer.SetLogger(serverLogger)
_, err = mconnServer.Start()
require.Nil(err)
return mconnClient, mconnServer
}
func expectSend(ch chan struct{}) bool {
after := time.After(time.Second * 5)
select {
case <-ch:
return true
case <-after:
return false
}
}
func TestMConnectionReadErrorBadEncoding(t *testing.T) {
assert, require := assert.New(t), require.New(t)
chOnErr := make(chan struct{})
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
defer mconnClient.Stop()
defer mconnServer.Stop()
client := mconnClient.conn
msg := "Ant-Man"
// send badly encoded msgPacket
var n int
var err error
wire.WriteByte(packetTypeMsg, client, &n, &err)
wire.WriteByteSlice([]byte(msg), client, &n, &err)
assert.True(expectSend(chOnErr), "badly encoded msgPacket")
}
func TestMConnectionReadErrorUnknownChannel(t *testing.T) {
assert, require := assert.New(t), require.New(t)
chOnErr := make(chan struct{})
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
defer mconnClient.Stop()
defer mconnServer.Stop()
msg := "Ant-Man"
// fail to send msg on channel unknown by client
assert.False(mconnClient.Send(0x03, msg))
// send msg on channel unknown by the server.
// should cause an error
assert.True(mconnClient.Send(0x02, msg))
assert.True(expectSend(chOnErr), "unknown channel")
}
func TestMConnectionReadErrorLongMessage(t *testing.T) {
assert, require := assert.New(t), require.New(t)
chOnErr := make(chan struct{})
chOnRcv := make(chan struct{})
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
defer mconnClient.Stop()
defer mconnServer.Stop()
mconnServer.onReceive = func(chID byte, msgBytes []byte) {
chOnRcv <- struct{}{}
}
client := mconnClient.conn
// send msg thats just right
var n int
var err error
packet := msgPacket{
ChannelID: 0x01,
Bytes: make([]byte, mconnClient.config.maxMsgPacketTotalSize()-5),
EOF: 1,
}
writeMsgPacketTo(packet, client, &n, &err)
assert.True(expectSend(chOnRcv), "msg just right")
// send msg thats too long
packet = msgPacket{
ChannelID: 0x01,
Bytes: make([]byte, mconnClient.config.maxMsgPacketTotalSize()-4),
EOF: 1,
}
writeMsgPacketTo(packet, client, &n, &err)
assert.True(expectSend(chOnErr), "msg too long")
}
func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
assert, require := assert.New(t), require.New(t)
chOnErr := make(chan struct{})
mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr)
defer mconnClient.Stop()
defer mconnServer.Stop()
// send msg with unknown msg type
var n int
var err error
wire.WriteByte(0x04, mconnClient.conn, &n, &err)
assert.True(expectSend(chOnErr), "unknown msg type")
}

+ 1
- 1
p2p/pex_reactor.go View File

@ -143,7 +143,7 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
r.SendAddrs(src, r.book.GetSelection()) r.SendAddrs(src, r.book.GetSelection())
case *pexAddrsMessage: case *pexAddrsMessage:
// We received some peer addresses from src. // We received some peer addresses from src.
// (We don't want to get spammed with bad peers)
// TODO: (We don't want to get spammed with bad peers)
for _, addr := range msg.Addrs { for _, addr := range msg.Addrs {
if addr != nil { if addr != nil {
r.book.AddAddress(addr, srcAddr) r.book.AddAddress(addr, srcAddr)


+ 0
- 11
p2p/switch.go View File

@ -481,17 +481,6 @@ func (sw *Switch) listenerRoutine(l Listener) {
// cleanup // cleanup
} }
//-----------------------------------------------------------------------------
type SwitchEventNewPeer struct {
Peer Peer
}
type SwitchEventDonePeer struct {
Peer Peer
Error interface{}
}
//------------------------------------------------------------------ //------------------------------------------------------------------
// Connects switches via arbitrary net.Conn. Used for testing. // Connects switches via arbitrary net.Conn. Used for testing.


Loading…
Cancel
Save