diff --git a/p2p/conn/secret_connection_test.go b/p2p/conn/secret_connection_test.go index 8af9cdeb5..4cf715dd2 100644 --- a/p2p/conn/secret_connection_test.go +++ b/p2p/conn/secret_connection_test.go @@ -4,7 +4,7 @@ import ( "io" "testing" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" ) diff --git a/p2p/switch.go b/p2p/switch.go index 7c09212b3..f1d02dcfc 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -5,6 +5,7 @@ import ( "math" "math/rand" "net" + "sync" "time" "github.com/pkg/errors" @@ -200,20 +201,28 @@ func (sw *Switch) OnStop() { //--------------------------------------------------------------------- // Peers -// Broadcast runs a go routine for each attempted send, which will block -// trying to send for defaultSendTimeoutSeconds. Returns a channel -// which receives success values for each attempted send (false if times out). +// Broadcast runs a go routine for each attempted send, which will block trying +// to send for defaultSendTimeoutSeconds. Returns a channel which receives +// success values for each attempted send (false if times out). Channel will be +// closed once msg send to all peers. +// // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. -// TODO: Something more intelligent. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { successChan := make(chan bool, len(sw.peers.List())) sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) + var wg sync.WaitGroup for _, peer := range sw.peers.List() { + wg.Add(1) go func(peer Peer) { + defer wg.Done() success := peer.Send(chID, msg) successChan <- success }(peer) } + go func() { + wg.Wait() + close(successChan) + }() return successChan } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 75f9640b1..be1d96e9b 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -300,10 +300,8 @@ func TestSwitchFullConnectivity(t *testing.T) { } } -func BenchmarkSwitches(b *testing.B) { - b.StopTimer() - - s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { +func BenchmarkSwitchBroadcast(b *testing.B) { + s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch { // Make bar reactors of bar channels each sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, @@ -320,7 +318,8 @@ func BenchmarkSwitches(b *testing.B) { // Allow time for goroutines to boot up time.Sleep(1 * time.Second) - b.StartTimer() + + b.ResetTimer() numSuccess, numFailure := 0, 0 @@ -338,7 +337,4 @@ func BenchmarkSwitches(b *testing.B) { } b.Logf("success: %v, failure: %v", numSuccess, numFailure) - - // Allow everything to flush before stopping switches & closing connections. - b.StopTimer() }