diff --git a/main.go b/main.go new file mode 100644 index 000000000..818394308 --- /dev/null +++ b/main.go @@ -0,0 +1,53 @@ +package main + +import ( + "github.com/tendermint/tendermint/p2p" +) + +func initPeer(peer *p2p.Peer) { + // +} + +func main() { + + // Define channels for our app + chDescs := []ChannelDescriptor{ + ChannelDescriptor{ + Name: "PEX", + SendBufferSize: 2, + RecvBuffersize: 2, + }, + ChannelDescriptor{ + Name: "block", + SendBufferSize: 10, + RecvBufferSize: 10, + }, + ChannelDescriptor{ + Name: "mempool", + SendBufferSize: 100, + RecvBufferSize: 100, + }, + ChannelDescriptor{ + Name: "consensus", + SendBufferSize: 1000, + RecvBufferSize: 1000, + }, + } + + // Create the switch + sw := NewSwitch(chDescs) + + // Create a listener for incoming connections + l := NewDefaultListener("tcp", ":8001") + go func() { + for { + inConn, ok := <-l.Connections() + if !ok { + break + } + sw.AddPeerWithConnection(inConn, false) + } + }() + + // TODO +} diff --git a/p2p/listener.go b/p2p/listener.go index a6fa7a1da..a99a2381d 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -7,7 +7,7 @@ import ( . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" - "github.com/tendermint/tendermint/peer/upnp" + "github.com/tendermint/tendermint/p2p/upnp" ) /* diff --git a/p2p/peer.go b/p2p/peer.go index e77765064..65008c962 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -77,6 +77,8 @@ func (p *Peer) TryQueue(pkt Packet) bool { return false } + sendQueue <- pkt + return true select { case sendQueue <- pkt: return true diff --git a/p2p/peer_set.go b/p2p/peer_set.go new file mode 100644 index 000000000..2151a707d --- /dev/null +++ b/p2p/peer_set.go @@ -0,0 +1,76 @@ +package p2p + +import ( + "sync" +) + +/* +PeerSet is a special structure for keeping a table of peers. +Iteration over the peers is super fast and thread-safe. +*/ +type PeerSet struct { + mtx sync.Mutex + lookup map[string]*peerSetItem + list []*Peer +} + +type peerSetItem struct { + peer *Peer + index int +} + +func NewPeerSet() *PeerSet { + return &PeerSet{ + lookup: make(map[string]*peerSetItem), + list: make([]*Peer, 0, 256), + } +} + +func (ps *PeerSet) Add(peer *Peer) bool { + ps.mtx.Lock() + defer ps.mtx.Unlock() + addr := peer.RemoteAddress().String() + if ps.lookup[addr] != nil { + return false + } + index := len(ps.list) + // Appending is safe even with other goroutines + // iterating over the ps.list slice. + ps.list = append(ps.list, peer) + ps.lookup[addr] = &peerSetItem{peer, index} + return true +} + +func (ps *PeerSet) Remove(peer *Peer) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + addr := peer.RemoteAddress().String() + item := ps.lookup[addr] + if item == nil { + return + } + index := item.index + // If it's the last peer, that's an easy special case. + if index == len(ps.list)-1 { + ps.list = ps.list[:len(ps.list)-1] + return + } + // Copy the list but without the last element. + newList := make([]*Peer, len(ps.list)-1) + copy(newList, ps.list) + // Move the last item from ps.list to "index" in list. + lastPeer := ps.list[len(ps.list)-1] + lastPeerAddr := lastPeer.RemoteAddress().String() + lastPeerItem := ps.lookup[lastPeerAddr] + newList[index] = lastPeer + lastPeerItem.index = index + ps.list = newList + delete(ps.lookup, addr) +} + +// threadsafe list of peers. +func (ps *PeerSet) List() []*Peer { + ps.mtx.Lock() + defer ps.mtx.Unlock() + return ps.list +} diff --git a/p2p/switch.go b/p2p/switch.go index 38f98b572..43d9e0556 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -2,11 +2,9 @@ package p2p import ( "errors" - "sync" "sync/atomic" . "github.com/tendermint/tendermint/common" - "github.com/tendermint/tendermint/merkle" ) /* @@ -23,8 +21,7 @@ Incoming messages are received by calling ".Receive()". type Switch struct { channels []ChannelDescriptor pktRecvQueues map[string]chan *InboundPacket - peersMtx sync.Mutex - peers merkle.Tree // addr -> *Peer + peers *PeerSet quit chan struct{} stopped uint32 } @@ -44,7 +41,7 @@ func NewSwitch(channels []ChannelDescriptor) *Switch { s := &Switch{ channels: channels, pktRecvQueues: pktRecvQueues, - peers: merkle.NewIAVLTree(nil), + peers: NewPeerSet(), quit: make(chan struct{}), stopped: 0, } @@ -54,20 +51,15 @@ func NewSwitch(channels []ChannelDescriptor) *Switch { func (s *Switch) Stop() { log.Infof("Stopping switch") - // lock - s.peersMtx.Lock() if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { close(s.quit) // stop each peer. - for peerValue := range s.peers.Values() { - peer := peerValue.(*Peer) + for _, peer := range s.peers.List() { peer.stop() } // empty tree. - s.peers = merkle.NewIAVLTree(nil) + s.peers = NewPeerSet() } - s.peersMtx.Unlock() - // unlock } func (s *Switch) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) { @@ -99,8 +91,7 @@ func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { } log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) - for v := range s.peers.Values() { - peer := v.(*Peer) + for _, peer := range s.peers.List() { success := peer.TryQueue(pkt) log.Tracef("Broadcast for peer %v success: %v", peer, success) if success { @@ -135,12 +126,8 @@ func (s *Switch) Receive(chName string) *InboundPacket { } } -func (s *Switch) Peers() merkle.Tree { - // lock & defer - s.peersMtx.Lock() - defer s.peersMtx.Unlock() - return s.peers.Copy() - // unlock deferred +func (s *Switch) Peers() []*Peer { + return s.peers.List() } // Disconnect from a peer due to external error. @@ -154,35 +141,20 @@ func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) { // If graceful is true, last message sent is a disconnect message. // TODO: handle graceful disconnects. func (s *Switch) StopPeer(peer *Peer, graceful bool) { - // lock - s.peersMtx.Lock() - peerValue, _ := s.peers.Remove(peer.RemoteAddress()) - s.peersMtx.Unlock() - // unlock - - peer_ := peerValue.(*Peer) - if peer_ != nil { - peer_.stop() - } + s.peers.Remove(peer) + peer.stop() } func (s *Switch) addPeer(peer *Peer) error { - addr := peer.RemoteAddress() - - // lock & defer - s.peersMtx.Lock() - defer s.peersMtx.Unlock() if s.stopped == 1 { return ErrSwitchStopped } - if !s.peers.Has(addr) { - log.Tracef("Actually putting addr: %v, peer: %v", addr, peer) - s.peers.Put(addr, peer) + if s.peers.Add(peer) { + log.Tracef("Adding: %v", peer) return nil } else { - // ignore duplicate peer for addr. - log.Infof("Ignoring duplicate peer for addr %v", addr) + // ignore duplicate peer + log.Infof("Ignoring duplicate: %v", peer) return ErrSwitchDuplicatePeer } - // unlock deferred } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 57933dded..fc50c84ef 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -59,11 +59,11 @@ func TestSwitches(t *testing.T) { defer s2.Stop() // Lets send a message from s1 to s2. - if s1.Peers().Size() != 1 { - t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size()) + if len(s1.Peers()) != 1 { + t.Errorf("Expected exactly 1 peer in s1, got %v", len(s1.Peers())) } - if s2.Peers().Size() != 1 { - t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size()) + if len(s2.Peers()) != 1 { + t.Errorf("Expected exactly 1 peer in s2, got %v", len(s2.Peers())) } // Broadcast a message on ch1