Browse Source

close return channel when we're done

Benchmark results:

```
BenchmarkSwitchBroadcast-2         30000             71275 ns/op
--- BENCH: BenchmarkSwitchBroadcast-2
        switch_test.go:339: success: 1, failure: 0
        switch_test.go:339: success: 100, failure: 0
        switch_test.go:339: success: 10000, failure: 0
        switch_test.go:339: success: 30000, failure: 0
```
pull/1095/head
Anton Kaliaev 7 years ago
parent
commit
161e100a24
No known key found for this signature in database GPG Key ID: 7B6881D965918214
3 changed files with 18 additions and 13 deletions
  1. +1
    -1
      p2p/conn/secret_connection_test.go
  2. +13
    -4
      p2p/switch.go
  3. +4
    -8
      p2p/switch_test.go

+ 1
- 1
p2p/conn/secret_connection_test.go View File

@ -4,7 +4,7 @@ import (
"io"
"testing"
"github.com/tendermint/go-crypto"
crypto "github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common"
)


+ 13
- 4
p2p/switch.go View File

@ -5,6 +5,7 @@ import (
"math"
"math/rand"
"net"
"sync"
"time"
"github.com/pkg/errors"
@ -200,20 +201,28 @@ func (sw *Switch) OnStop() {
//---------------------------------------------------------------------
// Peers
// Broadcast runs a go routine for each attempted send, which will block
// trying to send for defaultSendTimeoutSeconds. Returns a channel
// which receives success values for each attempted send (false if times out).
// Broadcast runs a go routine for each attempted send, which will block trying
// to send for defaultSendTimeoutSeconds. Returns a channel which receives
// success values for each attempted send (false if times out). Channel will be
// closed once msg send to all peers.
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
// TODO: Something more intelligent.
func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
successChan := make(chan bool, len(sw.peers.List()))
sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
var wg sync.WaitGroup
for _, peer := range sw.peers.List() {
wg.Add(1)
go func(peer Peer) {
defer wg.Done()
success := peer.Send(chID, msg)
successChan <- success
}(peer)
}
go func() {
wg.Wait()
close(successChan)
}()
return successChan
}


+ 4
- 8
p2p/switch_test.go View File

@ -300,10 +300,8 @@ func TestSwitchFullConnectivity(t *testing.T) {
}
}
func BenchmarkSwitches(b *testing.B) {
b.StopTimer()
s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
func BenchmarkSwitchBroadcast(b *testing.B) {
s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
// Make bar reactors of bar channels each
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
@ -320,7 +318,8 @@ func BenchmarkSwitches(b *testing.B) {
// Allow time for goroutines to boot up
time.Sleep(1 * time.Second)
b.StartTimer()
b.ResetTimer()
numSuccess, numFailure := 0, 0
@ -338,7 +337,4 @@ func BenchmarkSwitches(b *testing.B) {
}
b.Logf("success: %v, failure: %v", numSuccess, numFailure)
// Allow everything to flush before stopping switches & closing connections.
b.StopTimer()
}

Loading…
Cancel
Save