From 21530bf00f05fe5cef4eea0270640c559fef620e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 18 Mar 2015 12:33:48 -0700 Subject: [PATCH] p2p: broadcast spawns goroutine to Send on each peer and times out after 10 seconds. Closes #7 --- p2p/connection.go | 19 ++++++++++++++----- p2p/switch.go | 22 ++++++++++------------ 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/p2p/connection.go b/p2p/connection.go index edb4389da..89086bc2a 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -28,6 +28,7 @@ const ( defaultRecvRate = 51200 // 5Kb/s defaultSendQueueCapacity = 1 defaultRecvBufferCapacity = 4096 + defaultSendTimeoutSeconds = 10 ) type receiveCbFunc func(chId byte, msgBytes []byte) @@ -191,7 +192,7 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool { return false } - channel.sendBytes(binary.BinaryBytes(msg)) + success := channel.sendBytes(binary.BinaryBytes(msg)) // Wake up sendRoutine if necessary select { @@ -199,7 +200,7 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool { default: } - return true + return success } // Queues a message to be sent to channel. @@ -470,9 +471,17 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { // Queues message to send to this channel. // Goroutine-safe -func (ch *Channel) sendBytes(bytes []byte) { - ch.sendQueue <- bytes - atomic.AddUint32(&ch.sendQueueSize, 1) +// Times out (and returns false) after defaultSendTimeoutSeconds +func (ch *Channel) sendBytes(bytes []byte) bool { + sendTicker := time.NewTicker(defaultSendTimeoutSeconds * time.Second) + select { + case <-sendTicker.C: + // timeout + return false + case ch.sendQueue <- bytes: + atomic.AddUint32(&ch.sendQueueSize, 1) + return true + } } // Queues message to send to this channel. diff --git a/p2p/switch.go b/p2p/switch.go index 0b348003d..99afe561e 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -161,24 +161,22 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.String()) } -// XXX: This is wrong, we can't just ignore failures on TrySend. -func (sw *Switch) Broadcast(chId byte, msg interface{}) (numSuccess, numFailure int) { +// Broadcast runs a go routine for each attemptted send, which will block +// trying to send for defaultSendTimeoutSeconds. Returns a channel +// which receives success values for each attempted send (false if times out) +func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { if atomic.LoadUint32(&sw.stopped) == 1 { return } - + successChan := make(chan bool, len(sw.peers.List())) log.Debug("Broadcast", "channel", chId, "msg", msg) for _, peer := range sw.peers.List() { - // XXX XXX Change. - // success := peer.TrySend(chId, msg) - success := peer.Send(chId, msg) - if success { - numSuccess += 1 - } else { - numFailure += 1 - } + go func() { + success := peer.Send(chId, msg) + successChan <- success + }() } - return + return successChan }