diff --git a/p2p/README.md b/p2p/README.md index bf0a5c4d0..d653b2caf 100644 --- a/p2p/README.md +++ b/p2p/README.md @@ -4,9 +4,9 @@ `tendermint/tendermint/p2p` provides an abstraction around peer-to-peer communication.
-## Peer/MConnection/Channel +## MConnection -Each peer has one `MConnection` (multiplex connection) instance. +`MConnection` is a multiplex connection: __multiplex__ *noun* a system or signal involving simultaneous transmission of several messages along a single channel of communication. @@ -16,6 +16,43 @@ Each `MConnection` handles message transmission on multiple abstract communicati The byte id and the relative priorities of each `Channel` are configured upon initialization of the connection. +The `MConnection` supports three packet types: Ping, Pong, and Msg. + +### Ping and Pong + +The ping and pong messages consist of writing a single byte to the connection; 0x1 and 0x2, respectively + +When we haven't received any messages on an `MConnection` in a time `pingTimeout`, we send a ping message. +When a ping is received on the `MConnection`, a pong is sent in response. + +If a pong is not received in sufficient time, the peer's score should be decremented (TODO). + +### Msg + +Messages in channels are chopped into smaller msgPackets for multiplexing. + +``` +type msgPacket struct { + ChannelID byte + EOF byte // 1 means message ends here. + Bytes []byte +} +``` + +The msgPacket is serialized using go-wire, and prefixed with a 0x3. +The received `Bytes` of a sequential set of packets are appended together +until a packet with `EOF=1` is received, at which point the complete serialized message +is returned for processing by the corresponding channels `onReceive` function. + +### Multiplexing + +Messages are sent from a single `sendRoutine`, which loops over a select statement that results in the sending +of a ping, a pong, or a batch of data messages. The batch of data messages may include messages from multiple channels. +Message bytes are queued for sending in their respective channel, with each channel holding one unsent message at a time. +Messages are chosen for a batch one a time from the channel with the lowest ratio of recently sent bytes to channel priority. + +## Sending Messages + There are two methods for sending messages: ```go func (m MConnection) Send(chID byte, msg interface{}) bool {} @@ -31,6 +68,12 @@ queue is full. `Send()` and `TrySend()` are also exposed for each `Peer`. +## Peer + +Each peer has one `MConnection` instance, and includes other information such as whether the connection +was outbound, whether the connection should be recreated if it closes, various identity information about the node, +and other higher level thread-safe data used by the reactors. + ## Switch/Reactor The `Switch` handles peer connections and exposes an API to receive incoming messages diff --git a/p2p/connection.go b/p2p/connection.go index 97d54635d..30935c71e 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -459,8 +459,11 @@ FOR_LOOP: } channel, ok := c.channelsIdx[pkt.ChannelID] if !ok || channel == nil { - cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID)) + err := fmt.Errorf("Unknown channel %X", pkt.ChannelID) + c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) + c.stopForError(err) } + msgBytes, err := channel.recvMsgPacket(pkt) if err != nil { if c.IsRunning() { @@ -475,7 +478,9 @@ FOR_LOOP: c.onReceive(pkt.ChannelID, msgBytes) } default: - cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType)) + err := fmt.Errorf("Unknown message type %X", pktType) + c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) + c.stopForError(err) } // TODO: shouldn't this go in the sendRoutine? @@ -648,14 +653,18 @@ func (ch *Channel) nextMsgPacket() msgPacket { func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) - wire.WriteByte(packetTypeMsg, w, &n, &err) - wire.WriteBinary(packet, w, &n, &err) + writeMsgPacketTo(packet, w, &n, &err) if err == nil { ch.recentlySent += int64(n) } return } +func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) { + wire.WriteByte(packetTypeMsg, w, n, err) + wire.WriteBinary(packet, w, n, err) +} + // Handles incoming msgPackets. Returns a msg bytes if msg is complete. // Not goroutine-safe func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { diff --git a/p2p/connection_test.go b/p2p/connection_test.go index 71c3d64c2..a96734c0b 100644 --- a/p2p/connection_test.go +++ b/p2p/connection_test.go @@ -1,4 +1,4 @@ -package p2p_test +package p2p import ( "net" @@ -7,11 +7,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - p2p "github.com/tendermint/tendermint/p2p" + wire "github.com/tendermint/go-wire" "github.com/tendermint/tmlibs/log" ) -func createMConnection(conn net.Conn) *p2p.MConnection { +func createTestMConnection(conn net.Conn) *MConnection { onReceive := func(chID byte, msgBytes []byte) { } onError := func(r interface{}) { @@ -21,9 +21,9 @@ func createMConnection(conn net.Conn) *p2p.MConnection { return c } -func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection { - chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} - c := p2p.NewMConnection(conn, chDescs, onReceive, onError) +func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection { + chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} + c := NewMConnection(conn, chDescs, onReceive, onError) c.SetLogger(log.TestingLogger()) return c } @@ -35,7 +35,7 @@ func TestMConnectionSend(t *testing.T) { defer server.Close() defer client.Close() - mconn := createMConnection(client) + mconn := createTestMConnection(client) _, err := mconn.Start() require.Nil(err) defer mconn.Stop() @@ -75,7 +75,7 @@ func TestMConnectionReceive(t *testing.T) { require.Nil(err) defer mconn1.Stop() - mconn2 := createMConnection(server) + mconn2 := createTestMConnection(server) _, err = mconn2.Start() require.Nil(err) defer mconn2.Stop() @@ -100,7 +100,7 @@ func TestMConnectionStatus(t *testing.T) { defer server.Close() defer client.Close() - mconn := createMConnection(client) + mconn := createTestMConnection(client) _, err := mconn.Start() require.Nil(err) defer mconn.Stop() @@ -142,3 +142,132 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) { t.Fatal("Did not receive error in 500ms") } } + +func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr chan struct{}) (*MConnection, *MConnection) { + server, client := net.Pipe() + + onReceive := func(chID byte, msgBytes []byte) {} + onError := func(r interface{}) {} + + // create client conn with two channels + chDescs := []*ChannelDescriptor{ + {ID: 0x01, Priority: 1, SendQueueCapacity: 1}, + {ID: 0x02, Priority: 1, SendQueueCapacity: 1}, + } + mconnClient := NewMConnection(client, chDescs, onReceive, onError) + mconnClient.SetLogger(log.TestingLogger().With("module", "client")) + _, err := mconnClient.Start() + require.Nil(err) + + // create server conn with 1 channel + // it fires on chOnErr when there's an error + serverLogger := log.TestingLogger().With("module", "server") + onError = func(r interface{}) { + chOnErr <- struct{}{} + } + mconnServer := createMConnectionWithCallbacks(server, onReceive, onError) + mconnServer.SetLogger(serverLogger) + _, err = mconnServer.Start() + require.Nil(err) + return mconnClient, mconnServer +} + +func expectSend(ch chan struct{}) bool { + after := time.After(time.Second * 5) + select { + case <-ch: + return true + case <-after: + return false + } +} + +func TestMConnectionReadErrorBadEncoding(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + chOnErr := make(chan struct{}) + mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr) + defer mconnClient.Stop() + defer mconnServer.Stop() + + client := mconnClient.conn + msg := "Ant-Man" + + // send badly encoded msgPacket + var n int + var err error + wire.WriteByte(packetTypeMsg, client, &n, &err) + wire.WriteByteSlice([]byte(msg), client, &n, &err) + assert.True(expectSend(chOnErr), "badly encoded msgPacket") +} + +func TestMConnectionReadErrorUnknownChannel(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + chOnErr := make(chan struct{}) + mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr) + defer mconnClient.Stop() + defer mconnServer.Stop() + + msg := "Ant-Man" + + // fail to send msg on channel unknown by client + assert.False(mconnClient.Send(0x03, msg)) + + // send msg on channel unknown by the server. + // should cause an error + assert.True(mconnClient.Send(0x02, msg)) + assert.True(expectSend(chOnErr), "unknown channel") +} + +func TestMConnectionReadErrorLongMessage(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + chOnErr := make(chan struct{}) + chOnRcv := make(chan struct{}) + + mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr) + defer mconnClient.Stop() + defer mconnServer.Stop() + + mconnServer.onReceive = func(chID byte, msgBytes []byte) { + chOnRcv <- struct{}{} + } + + client := mconnClient.conn + + // send msg thats just right + var n int + var err error + packet := msgPacket{ + ChannelID: 0x01, + Bytes: make([]byte, mconnClient.config.maxMsgPacketTotalSize()-5), + EOF: 1, + } + writeMsgPacketTo(packet, client, &n, &err) + assert.True(expectSend(chOnRcv), "msg just right") + + // send msg thats too long + packet = msgPacket{ + ChannelID: 0x01, + Bytes: make([]byte, mconnClient.config.maxMsgPacketTotalSize()-4), + EOF: 1, + } + writeMsgPacketTo(packet, client, &n, &err) + assert.True(expectSend(chOnErr), "msg too long") +} + +func TestMConnectionReadErrorUnknownMsgType(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + chOnErr := make(chan struct{}) + mconnClient, mconnServer := newClientAndServerConnsForReadErrors(require, chOnErr) + defer mconnClient.Stop() + defer mconnServer.Stop() + + // send msg with unknown msg type + var n int + var err error + wire.WriteByte(0x04, mconnClient.conn, &n, &err) + assert.True(expectSend(chOnErr), "unknown msg type") +} diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 54c2d06b5..2f13703ef 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -143,7 +143,7 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { r.SendAddrs(src, r.book.GetSelection()) case *pexAddrsMessage: // We received some peer addresses from src. - // (We don't want to get spammed with bad peers) + // TODO: (We don't want to get spammed with bad peers) for _, addr := range msg.Addrs { if addr != nil { r.book.AddAddress(addr, srcAddr) diff --git a/p2p/switch.go b/p2p/switch.go index af9324a9b..994a3344d 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -481,17 +481,6 @@ func (sw *Switch) listenerRoutine(l Listener) { // cleanup } -//----------------------------------------------------------------------------- - -type SwitchEventNewPeer struct { - Peer Peer -} - -type SwitchEventDonePeer struct { - Peer Peer - Error interface{} -} - //------------------------------------------------------------------ // Connects switches via arbitrary net.Conn. Used for testing.