diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 20525e675..f0dd14147 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -8,11 +8,26 @@ import ( type Reactor interface { cmn.Service // Start, Stop + // SetSwitch allows setting a switch. SetSwitch(*Switch) + + // GetChannels returns the list of channel descriptors. GetChannels() []*conn.ChannelDescriptor + + // AddPeer is called by the switch when a new peer is added. AddPeer(peer Peer) + + // RemovePeer is called by the switch when the peer is stopped (due to error + // or other reason). RemovePeer(peer Peer, reason interface{}) - Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil + + // Receive is called when msgBytes is received from peer. + // + // NOTE reactor can not keep msgBytes around after Receive completes without + // copying. + // + // CONTRACT: msgBytes are not nil. + Receive(chID byte, peer Peer, msgBytes []byte) } //-------------------------------------- diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 451f35bbe..7727ee321 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -406,6 +406,7 @@ func (c *MConnection) sendMsgPacket() bool { // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer. // After a whole message has been assembled, it's pushed to onReceive(). // Blocks depending on how the connection is throttled. +// Otherwise, it never blocks. func (c *MConnection) recvRoutine() { defer c._recover() @@ -449,8 +450,8 @@ FOR_LOOP: c.Logger.Debug("Receive Ping") select { case c.pong <- struct{}{}: - case <-c.quit: - break FOR_LOOP + default: + // never block } case packetTypePong: // do nothing @@ -493,10 +494,6 @@ FOR_LOOP: c.stopForError(err) break FOR_LOOP } - - // TODO: shouldn't this go in the sendRoutine? - // Better to send a ping packet when *we* haven't sent anything for a while. - c.pingTimer.Reset() } // Cleanup @@ -682,7 +679,8 @@ func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) { wire.WriteBinary(packet, w, n, err) } -// Handles incoming msgPackets. Returns a msg bytes if msg is complete. +// Handles incoming msgPackets. It returns a message bytes if message is +// complete. NOTE message bytes may change on next call to recvMsgPacket. // Not goroutine-safe func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) @@ -692,6 +690,7 @@ func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { ch.recving = append(ch.recving, packet.Bytes...) if packet.EOF == byte(0x01) { msgBytes := ch.recving + // clear the slice without re-allocating. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,