Browse Source

p2p: broadcast spawns goroutine to Send on each peer and times out after 10 seconds. Closes #7

pull/32/head
Ethan Buchman 10 years ago
committed by Jae Kwon
parent
commit
21530bf00f
2 changed files with 24 additions and 17 deletions
  1. +14
    -5
      p2p/connection.go
  2. +10
    -12
      p2p/switch.go

+ 14
- 5
p2p/connection.go View File

@ -28,6 +28,7 @@ const (
defaultRecvRate = 51200 // 5Kb/s defaultRecvRate = 51200 // 5Kb/s
defaultSendQueueCapacity = 1 defaultSendQueueCapacity = 1
defaultRecvBufferCapacity = 4096 defaultRecvBufferCapacity = 4096
defaultSendTimeoutSeconds = 10
) )
type receiveCbFunc func(chId byte, msgBytes []byte) type receiveCbFunc func(chId byte, msgBytes []byte)
@ -191,7 +192,7 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool {
return false return false
} }
channel.sendBytes(binary.BinaryBytes(msg))
success := channel.sendBytes(binary.BinaryBytes(msg))
// Wake up sendRoutine if necessary // Wake up sendRoutine if necessary
select { select {
@ -199,7 +200,7 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool {
default: default:
} }
return true
return success
} }
// Queues a message to be sent to channel. // Queues a message to be sent to channel.
@ -470,9 +471,17 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
// Queues message to send to this channel. // Queues message to send to this channel.
// Goroutine-safe // Goroutine-safe
func (ch *Channel) sendBytes(bytes []byte) {
ch.sendQueue <- bytes
atomic.AddUint32(&ch.sendQueueSize, 1)
// Times out (and returns false) after defaultSendTimeoutSeconds
func (ch *Channel) sendBytes(bytes []byte) bool {
sendTicker := time.NewTicker(defaultSendTimeoutSeconds * time.Second)
select {
case <-sendTicker.C:
// timeout
return false
case ch.sendQueue <- bytes:
atomic.AddUint32(&ch.sendQueueSize, 1)
return true
}
} }
// Queues message to send to this channel. // Queues message to send to this channel.


+ 10
- 12
p2p/switch.go View File

@ -161,24 +161,22 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
return sw.dialing.Has(addr.String()) return sw.dialing.Has(addr.String())
} }
// XXX: This is wrong, we can't just ignore failures on TrySend.
func (sw *Switch) Broadcast(chId byte, msg interface{}) (numSuccess, numFailure int) {
// Broadcast runs a go routine for each attemptted send, which will block
// trying to send for defaultSendTimeoutSeconds. Returns a channel
// which receives success values for each attempted send (false if times out)
func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool {
if atomic.LoadUint32(&sw.stopped) == 1 { if atomic.LoadUint32(&sw.stopped) == 1 {
return return
} }
successChan := make(chan bool, len(sw.peers.List()))
log.Debug("Broadcast", "channel", chId, "msg", msg) log.Debug("Broadcast", "channel", chId, "msg", msg)
for _, peer := range sw.peers.List() { for _, peer := range sw.peers.List() {
// XXX XXX Change.
// success := peer.TrySend(chId, msg)
success := peer.Send(chId, msg)
if success {
numSuccess += 1
} else {
numFailure += 1
}
go func() {
success := peer.Send(chId, msg)
successChan <- success
}()
} }
return
return successChan
} }


Loading…
Cancel
Save