Browse Source

Merge pull request #1196 from tendermint/1149-TestReactorValidatorSetChanges-fails-non-deterministically

WIP: TestReactorValidatorSetChanges fails non deterministically
pull/1200/head
Ethan Buchman 7 years ago
committed by GitHub
parent
commit
199ea40980
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 8 deletions
  1. +16
    -1
      p2p/base_reactor.go
  2. +6
    -7
      p2p/conn/connection.go

+ 16
- 1
p2p/base_reactor.go View File

@ -8,11 +8,26 @@ import (
type Reactor interface { type Reactor interface {
cmn.Service // Start, Stop cmn.Service // Start, Stop
// SetSwitch allows setting a switch.
SetSwitch(*Switch) SetSwitch(*Switch)
// GetChannels returns the list of channel descriptors.
GetChannels() []*conn.ChannelDescriptor GetChannels() []*conn.ChannelDescriptor
// AddPeer is called by the switch when a new peer is added.
AddPeer(peer Peer) AddPeer(peer Peer)
// RemovePeer is called by the switch when the peer is stopped (due to error
// or other reason).
RemovePeer(peer Peer, reason interface{}) RemovePeer(peer Peer, reason interface{})
Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil
// Receive is called when msgBytes is received from peer.
//
// NOTE reactor can not keep msgBytes around after Receive completes without
// copying.
//
// CONTRACT: msgBytes are not nil.
Receive(chID byte, peer Peer, msgBytes []byte)
} }
//-------------------------------------- //--------------------------------------


+ 6
- 7
p2p/conn/connection.go View File

@ -406,6 +406,7 @@ func (c *MConnection) sendMsgPacket() bool {
// recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer. // recvRoutine reads msgPackets and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive(). // After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled. // Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine() { func (c *MConnection) recvRoutine() {
defer c._recover() defer c._recover()
@ -449,8 +450,8 @@ FOR_LOOP:
c.Logger.Debug("Receive Ping") c.Logger.Debug("Receive Ping")
select { select {
case c.pong <- struct{}{}: case c.pong <- struct{}{}:
case <-c.quit:
break FOR_LOOP
default:
// never block
} }
case packetTypePong: case packetTypePong:
// do nothing // do nothing
@ -493,10 +494,6 @@ FOR_LOOP:
c.stopForError(err) c.stopForError(err)
break FOR_LOOP break FOR_LOOP
} }
// TODO: shouldn't this go in the sendRoutine?
// Better to send a ping packet when *we* haven't sent anything for a while.
c.pingTimer.Reset()
} }
// Cleanup // Cleanup
@ -682,7 +679,8 @@ func writeMsgPacketTo(packet msgPacket, w io.Writer, n *int, err *error) {
wire.WriteBinary(packet, w, n, err) wire.WriteBinary(packet, w, n, err)
} }
// Handles incoming msgPackets. Returns a msg bytes if msg is complete.
// Handles incoming msgPackets. It returns a message bytes if message is
// complete. NOTE message bytes may change on next call to recvMsgPacket.
// Not goroutine-safe // Not goroutine-safe
func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) ch.Logger.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
@ -692,6 +690,7 @@ func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
ch.recving = append(ch.recving, packet.Bytes...) ch.recving = append(ch.recving, packet.Bytes...)
if packet.EOF == byte(0x01) { if packet.EOF == byte(0x01) {
msgBytes := ch.recving msgBytes := ch.recving
// clear the slice without re-allocating. // clear the slice without re-allocating.
// http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
// suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes, // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,


Loading…
Cancel
Save