From 7640e6a29fd43f4eef1f735487d4b59d62b2e7d3 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 26 Jan 2018 13:41:29 -0500 Subject: [PATCH 1/4] add some p2p TODOs --- p2p/conn/connection.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 451f35bbe..c23ed7f82 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -406,6 +406,7 @@ func (c *MConnection) sendMsgPacket() bool { // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer. // After a whole message has been assembled, it's pushed to onReceive(). // Blocks depending on how the connection is throttled. +// Otherwise, it never blocks. func (c *MConnection) recvRoutine() { defer c._recover() @@ -449,8 +450,8 @@ FOR_LOOP: c.Logger.Debug("Receive Ping") select { case c.pong <- struct{}{}: - case <-c.quit: - break FOR_LOOP + default: + // never block } case packetTypePong: // do nothing @@ -494,8 +495,9 @@ FOR_LOOP: break FOR_LOOP } - // TODO: shouldn't this go in the sendRoutine? - // Better to send a ping packet when *we* haven't sent anything for a while. + // TODO: don't bother with this "only ping when we havent heard from them". + // lets just always ping every peer from the sendRoutine every 10s no matter what. + // if they dont pong within pongTimeout, disconnect :) c.pingTimer.Reset() } @@ -691,7 +693,13 @@ func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { } ch.recving = append(ch.recving, packet.Bytes...) if packet.EOF == byte(0x01) { + // TODO: document that these returned msgBytes will change under you after Receive finishes. + // TODO: document it in the Reactor interface especially - implementations of a Reactor + // can not keep these bytes around after the Receive completes without copying! + // In general that's fine, because the first thing we do is unmarshal into a msg type and then + // we never use the bytes again msgBytes := ch.recving + // clear the slice without re-allocating. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes, From 2b2c233977591dadc3646844afe61f1c7f579d49 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 8 Feb 2018 13:07:40 +0400 Subject: [PATCH 2/4] write docs for Reactor interface --- p2p/base_reactor.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 20525e675..8217d46bc 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -8,11 +8,23 @@ import ( type Reactor interface { cmn.Service // Start, Stop + // SetSwitch allows setting a switch. SetSwitch(*Switch) + + // GetChannels returns the list of channel descriptors. GetChannels() []*conn.ChannelDescriptor + + // AddPeer is called by the switch when a new peer is added. AddPeer(peer Peer) + + // RemovePeer is called by the switch when the peer is stopped (due to error + // or other reason). RemovePeer(peer Peer, reason interface{}) - Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil + + // Receive is called when msgBytes is received from peer. + // + // CONTRACT: msgBytes are not nil + Receive(chID byte, peer Peer, msgBytes []byte) } //-------------------------------------- From d6d1f8512d1ecb96034e081bd0cbe9d802b6faa2 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 8 Feb 2018 13:08:11 +0400 Subject: [PATCH 3/4] do not reset pingTimer don't bother with this "only ping when we havent heard from them". lets just always ping every peer from the sendRoutine every 10s no matter what. if they dont pong within pongTimeout, disconnect :) --- p2p/conn/connection.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index c23ed7f82..6b4912502 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -494,11 +494,6 @@ FOR_LOOP: c.stopForError(err) break FOR_LOOP } - - // TODO: don't bother with this "only ping when we havent heard from them". - // lets just always ping every peer from the sendRoutine every 10s no matter what. - // if they dont pong within pongTimeout, disconnect :) - c.pingTimer.Reset() } // Cleanup From 3f9aa8d8fa328fee897d0295fb8024f1cf67c9f4 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 8 Feb 2018 13:09:46 +0400 Subject: [PATCH 4/4] document that msgBytes in p2p/connection change --- p2p/base_reactor.go | 5 ++++- p2p/conn/connection.go | 8 ++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 8217d46bc..f0dd14147 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -23,7 +23,10 @@ type Reactor interface { // Receive is called when msgBytes is received from peer. // - // CONTRACT: msgBytes are not nil + // NOTE reactor can not keep msgBytes around after Receive completes without + // copying. + // + // CONTRACT: msgBytes are not nil. Receive(chID byte, peer Peer, msgBytes []byte) } diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 6b4912502..7727ee321 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -679,7 +679,8 @@ func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) { wire.WriteBinary(packet, w, n, err) } -// Handles incoming msgPackets. Returns a msg bytes if msg is complete. +// Handles incoming msgPackets. It returns a message bytes if message is +// complete. NOTE message bytes may change on next call to recvMsgPacket. // Not goroutine-safe func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) @@ -688,11 +689,6 @@ func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { } ch.recving = append(ch.recving, packet.Bytes...) if packet.EOF == byte(0x01) { - // TODO: document that these returned msgBytes will change under you after Receive finishes. - // TODO: document it in the Reactor interface especially - implementations of a Reactor - // can not keep these bytes around after the Receive completes without copying! - // In general that's fine, because the first thing we do is unmarshal into a msg type and then - // we never use the bytes again msgBytes := ch.recving // clear the slice without re-allocating.