From aed4bbf0f0e236b28b76d9d456e91cb01de60103 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 25 Mar 2015 02:36:59 -0700 Subject: [PATCH] Fix switch tests --- daemon/daemon.go | 4 ++-- p2p/pex_reactor.go | 5 ++-- p2p/switch.go | 57 +++++++++++++++++++--------------------------- p2p/switch_test.go | 52 ++++++++++++++++++++---------------------- 4 files changed, 53 insertions(+), 65 deletions(-) diff --git a/daemon/daemon.go b/daemon/daemon.go index 89e5de697..a4b573bee 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -94,13 +94,13 @@ func (n *Node) Start() { go n.inboundConnectionRoutine(l) } n.book.Start() - n.sw.StartAll() + n.sw.StartReactors() } func (n *Node) Stop() { log.Info("Stopping Node") // TODO: gracefully disconnect from peers. - n.sw.StopAll() + n.sw.Stop() n.book.Stop() } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index de742645f..926859316 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -62,8 +62,9 @@ func (pexR *PEXReactor) Stop() { func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { return []*ChannelDescriptor{ &ChannelDescriptor{ - Id: PexChannel, - Priority: 1, + Id: PexChannel, + Priority: 1, + SendQueueCapacity: 10, }, } } diff --git a/p2p/switch.go b/p2p/switch.go index 27c61cb46..5216f77dd 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net" - "sync/atomic" "time" . "github.com/tendermint/tendermint/common" @@ -35,13 +34,11 @@ type Switch struct { reactorsByCh map[byte]Reactor peers *PeerSet dialing *CMap - listeners *CMap // listenerName -> chan interface{} - running uint32 // atomic + listeners *CMap // listenerName -> chan interface{} } var ( ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrSwitchStopped = errors.New("Switch stopped") ) const ( @@ -58,17 +55,18 @@ func NewSwitch() *Switch { peers: NewPeerSet(), dialing: NewCMap(), listeners: NewCMap(), - running: 0, } return sw } +// Not goroutine safe. func (sw *Switch) SetChainId(hash []byte, network string) { sw.chainId = hex.EncodeToString(hash) + "-" + network } -func (sw *Switch) AddReactor(name string, reactor Reactor) { +// Not goroutine safe. +func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { // Validate the reactor. // No two reactors can share the same channel. reactorChannels := reactor.GetChannels() @@ -81,51 +79,49 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) { sw.reactorsByCh[chId] = reactor } sw.reactors[name] = reactor - time.Sleep(1 * time.Second) + return reactor } -func (sw *Switch) StartReactor(name string) { - atomic.StoreUint32(&sw.running, 1) - sw.reactors[name].Start(sw) +func (sw *Switch) Reactor(name string) Reactor { + return sw.reactors[name] } // Convenience function -func (sw *Switch) StartAll() { - atomic.StoreUint32(&sw.running, 1) +func (sw *Switch) StartReactors() { for _, reactor := range sw.reactors { reactor.Start(sw) } } -func (sw *Switch) StopReactor(name string) { - sw.reactors[name].Stop() +// Convenience function +func (sw *Switch) StopReactors() { + // Stop all reactors. + for _, reactor := range sw.reactors { + reactor.Stop() + } } // Convenience function -// Not goroutine safe -func (sw *Switch) StopAll() { - atomic.StoreUint32(&sw.running, 0) +func (sw *Switch) StopPeers() { // Stop each peer. for _, peer := range sw.peers.List() { peer.stop() } sw.peers = NewPeerSet() - // Stop all reactors. - for _, reactor := range sw.reactors { - reactor.Stop() - } } -// Not goroutine safe +// Convenience function +func (sw *Switch) Stop() { + sw.StopPeers() + sw.StopReactors() +} + +// Not goroutine safe to modify. func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - if atomic.LoadUint32(&sw.running) == 0 { - return nil, ErrSwitchStopped - } - peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) // Add the peer to .peers @@ -150,10 +146,6 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er } func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { - if atomic.LoadUint32(&sw.running) == 0 { - return nil, ErrSwitchStopped - } - log.Debug("Dialing address", "address", addr) sw.dialing.Set(addr.String(), addr) conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) @@ -179,9 +171,6 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) func (sw *Switch) Broadcast(chId byte, msg interface{}) chan bool { - if atomic.LoadUint32(&sw.running) == 0 { - return nil - } successChan := make(chan bool, len(sw.peers.List())) log.Debug("Broadcast", "channel", chId, "msg", msg) for _, peer := range sw.peers.List() { @@ -239,7 +228,7 @@ func (sw *Switch) IsListening() bool { } func (sw *Switch) doAddPeer(peer *Peer) { - for name, reactor := range sw.reactors { + for _, reactor := range sw.reactors { reactor.AddPeer(peer) } } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index a260df27a..f486230f5 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -68,12 +68,12 @@ func (tr *TestReactor) Receive(chId byte, peer *Peer, msgBytes []byte) { //----------------------------------------------------------------------------- -// convenience method for creating two switches connected to each other. -func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, *Switch) { +// convenience method for creating bar switches connected to each other. +func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *Switch) { - // Create two switches that will be interconnected. - s1 := NewSwitch(reactorsGenerator()) - s2 := NewSwitch(reactorsGenerator()) + // Create bar switches that will be interconnected. + s1 := initSwitch(NewSwitch()) + s2 := initSwitch(NewSwitch()) // Create a listener for s1 l := NewDefaultListener("tcp", ":8001", true) @@ -104,18 +104,17 @@ func makeSwitchPair(t testing.TB, reactorsGenerator func() []Reactor) (*Switch, } func TestSwitches(t *testing.T) { - s1, s2 := makeSwitchPair(t, func() []Reactor { - // Make two reactors of two channels each - reactors := make([]Reactor, 2) - reactors[0] = NewTestReactor([]*ChannelDescriptor{ + s1, s2 := makeSwitchPair(t, func(sw *Switch) *Switch { + // Make bar reactors of bar channels each + sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10}, - }, true) - reactors[1] = NewTestReactor([]*ChannelDescriptor{ + }, true)).Start(sw) // Start the reactor + sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x02), Priority: 10}, &ChannelDescriptor{Id: byte(0x03), Priority: 10}, - }, true) - return reactors + }, true)).Start(sw) // Start the reactor + return sw }) defer s1.Stop() defer s2.Stop() @@ -129,8 +128,8 @@ func TestSwitches(t *testing.T) { } ch0Msg := "channel zero" - ch1Msg := "channel one" - ch2Msg := "channel two" + ch1Msg := "channel foo" + ch2Msg := "channel bar" s1.Broadcast(byte(0x00), ch0Msg) s1.Broadcast(byte(0x01), ch1Msg) @@ -140,7 +139,7 @@ func TestSwitches(t *testing.T) { time.Sleep(5000 * time.Millisecond) // Check message on ch0 - ch0Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x00)] + ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)] if len(ch0Msgs) != 2 { t.Errorf("Expected to have received 1 message in ch0") } @@ -149,7 +148,7 @@ func TestSwitches(t *testing.T) { } // Check message on ch1 - ch1Msgs := s2.Reactors()[0].(*TestReactor).msgsReceived[byte(0x01)] + ch1Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x01)] if len(ch1Msgs) != 1 { t.Errorf("Expected to have received 1 message in ch1") } @@ -158,7 +157,7 @@ func TestSwitches(t *testing.T) { } // Check message on ch2 - ch2Msgs := s2.Reactors()[1].(*TestReactor).msgsReceived[byte(0x02)] + ch2Msgs := s2.Reactor("bar").(*TestReactor).msgsReceived[byte(0x02)] if len(ch2Msgs) != 1 { t.Errorf("Expected to have received 1 message in ch2") } @@ -172,18 +171,17 @@ func BenchmarkSwitches(b *testing.B) { b.StopTimer() - s1, s2 := makeSwitchPair(b, func() []Reactor { - // Make two reactors of two channels each - reactors := make([]Reactor, 2) - reactors[0] = NewTestReactor([]*ChannelDescriptor{ + s1, s2 := makeSwitchPair(b, func(sw *Switch) *Switch { + // Make bar reactors of bar channels each + sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x00), Priority: 10}, &ChannelDescriptor{Id: byte(0x01), Priority: 10}, - }, false) - reactors[1] = NewTestReactor([]*ChannelDescriptor{ + }, false)) + sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ &ChannelDescriptor{Id: byte(0x02), Priority: 10}, &ChannelDescriptor{Id: byte(0x03), Priority: 10}, - }, false) - return reactors + }, false)) + return sw }) defer s1.Stop() defer s2.Stop() @@ -194,7 +192,7 @@ func BenchmarkSwitches(b *testing.B) { numSuccess, numFailure := 0, 0 - // Send random message from one channel to another + // Send random message from foo channel to another for i := 0; i < b.N; i++ { chId := byte(i % 4) successChan := s1.Broadcast(chId, "test data")