Browse Source

PeerSet is for fast iteration of peers

pull/9/head
Jae Kwon 11 years ago
parent
commit
ce51af9d4d
6 changed files with 149 additions and 46 deletions
  1. +53
    -0
      main.go
  2. +1
    -1
      p2p/listener.go
  3. +2
    -0
      p2p/peer.go
  4. +76
    -0
      p2p/peer_set.go
  5. +13
    -41
      p2p/switch.go
  6. +4
    -4
      p2p/switch_test.go

+ 53
- 0
main.go View File

@ -0,0 +1,53 @@
package main
import (
"github.com/tendermint/tendermint/p2p"
)
func initPeer(peer *p2p.Peer) {
//
}
func main() {
// Define channels for our app
chDescs := []ChannelDescriptor{
ChannelDescriptor{
Name: "PEX",
SendBufferSize: 2,
RecvBuffersize: 2,
},
ChannelDescriptor{
Name: "block",
SendBufferSize: 10,
RecvBufferSize: 10,
},
ChannelDescriptor{
Name: "mempool",
SendBufferSize: 100,
RecvBufferSize: 100,
},
ChannelDescriptor{
Name: "consensus",
SendBufferSize: 1000,
RecvBufferSize: 1000,
},
}
// Create the switch
sw := NewSwitch(chDescs)
// Create a listener for incoming connections
l := NewDefaultListener("tcp", ":8001")
go func() {
for {
inConn, ok := <-l.Connections()
if !ok {
break
}
sw.AddPeerWithConnection(inConn, false)
}
}()
// TODO
}

+ 1
- 1
p2p/listener.go View File

@ -7,7 +7,7 @@ import (
. "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/peer/upnp"
"github.com/tendermint/tendermint/p2p/upnp"
)
/*


+ 2
- 0
p2p/peer.go View File

@ -77,6 +77,8 @@ func (p *Peer) TryQueue(pkt Packet) bool {
return false
}
sendQueue <- pkt
return true
select {
case sendQueue <- pkt:
return true


+ 76
- 0
p2p/peer_set.go View File

@ -0,0 +1,76 @@
package p2p
import (
"sync"
)
/*
PeerSet is a special structure for keeping a table of peers.
Iteration over the peers is super fast and thread-safe.
*/
type PeerSet struct {
mtx sync.Mutex
lookup map[string]*peerSetItem
list []*Peer
}
type peerSetItem struct {
peer *Peer
index int
}
func NewPeerSet() *PeerSet {
return &PeerSet{
lookup: make(map[string]*peerSetItem),
list: make([]*Peer, 0, 256),
}
}
func (ps *PeerSet) Add(peer *Peer) bool {
ps.mtx.Lock()
defer ps.mtx.Unlock()
addr := peer.RemoteAddress().String()
if ps.lookup[addr] != nil {
return false
}
index := len(ps.list)
// Appending is safe even with other goroutines
// iterating over the ps.list slice.
ps.list = append(ps.list, peer)
ps.lookup[addr] = &peerSetItem{peer, index}
return true
}
func (ps *PeerSet) Remove(peer *Peer) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
addr := peer.RemoteAddress().String()
item := ps.lookup[addr]
if item == nil {
return
}
index := item.index
// If it's the last peer, that's an easy special case.
if index == len(ps.list)-1 {
ps.list = ps.list[:len(ps.list)-1]
return
}
// Copy the list but without the last element.
newList := make([]*Peer, len(ps.list)-1)
copy(newList, ps.list)
// Move the last item from ps.list to "index" in list.
lastPeer := ps.list[len(ps.list)-1]
lastPeerAddr := lastPeer.RemoteAddress().String()
lastPeerItem := ps.lookup[lastPeerAddr]
newList[index] = lastPeer
lastPeerItem.index = index
ps.list = newList
delete(ps.lookup, addr)
}
// threadsafe list of peers.
func (ps *PeerSet) List() []*Peer {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.list
}

+ 13
- 41
p2p/switch.go View File

@ -2,11 +2,9 @@ package p2p
import (
"errors"
"sync"
"sync/atomic"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/merkle"
)
/*
@ -23,8 +21,7 @@ Incoming messages are received by calling ".Receive()".
type Switch struct {
channels []ChannelDescriptor
pktRecvQueues map[string]chan *InboundPacket
peersMtx sync.Mutex
peers merkle.Tree // addr -> *Peer
peers *PeerSet
quit chan struct{}
stopped uint32
}
@ -44,7 +41,7 @@ func NewSwitch(channels []ChannelDescriptor) *Switch {
s := &Switch{
channels: channels,
pktRecvQueues: pktRecvQueues,
peers: merkle.NewIAVLTree(nil),
peers: NewPeerSet(),
quit: make(chan struct{}),
stopped: 0,
}
@ -54,20 +51,15 @@ func NewSwitch(channels []ChannelDescriptor) *Switch {
func (s *Switch) Stop() {
log.Infof("Stopping switch")
// lock
s.peersMtx.Lock()
if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
close(s.quit)
// stop each peer.
for peerValue := range s.peers.Values() {
peer := peerValue.(*Peer)
for _, peer := range s.peers.List() {
peer.stop()
}
// empty tree.
s.peers = merkle.NewIAVLTree(nil)
s.peers = NewPeerSet()
}
s.peersMtx.Unlock()
// unlock
}
func (s *Switch) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, error) {
@ -99,8 +91,7 @@ func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) {
}
log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes))
for v := range s.peers.Values() {
peer := v.(*Peer)
for _, peer := range s.peers.List() {
success := peer.TryQueue(pkt)
log.Tracef("Broadcast for peer %v success: %v", peer, success)
if success {
@ -135,12 +126,8 @@ func (s *Switch) Receive(chName string) *InboundPacket {
}
}
func (s *Switch) Peers() merkle.Tree {
// lock & defer
s.peersMtx.Lock()
defer s.peersMtx.Unlock()
return s.peers.Copy()
// unlock deferred
func (s *Switch) Peers() []*Peer {
return s.peers.List()
}
// Disconnect from a peer due to external error.
@ -154,35 +141,20 @@ func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) {
// If graceful is true, last message sent is a disconnect message.
// TODO: handle graceful disconnects.
func (s *Switch) StopPeer(peer *Peer, graceful bool) {
// lock
s.peersMtx.Lock()
peerValue, _ := s.peers.Remove(peer.RemoteAddress())
s.peersMtx.Unlock()
// unlock
peer_ := peerValue.(*Peer)
if peer_ != nil {
peer_.stop()
}
s.peers.Remove(peer)
peer.stop()
}
func (s *Switch) addPeer(peer *Peer) error {
addr := peer.RemoteAddress()
// lock & defer
s.peersMtx.Lock()
defer s.peersMtx.Unlock()
if s.stopped == 1 {
return ErrSwitchStopped
}
if !s.peers.Has(addr) {
log.Tracef("Actually putting addr: %v, peer: %v", addr, peer)
s.peers.Put(addr, peer)
if s.peers.Add(peer) {
log.Tracef("Adding: %v", peer)
return nil
} else {
// ignore duplicate peer for addr.
log.Infof("Ignoring duplicate peer for addr %v", addr)
// ignore duplicate peer
log.Infof("Ignoring duplicate: %v", peer)
return ErrSwitchDuplicatePeer
}
// unlock deferred
}

+ 4
- 4
p2p/switch_test.go View File

@ -59,11 +59,11 @@ func TestSwitches(t *testing.T) {
defer s2.Stop()
// Lets send a message from s1 to s2.
if s1.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
if len(s1.Peers()) != 1 {
t.Errorf("Expected exactly 1 peer in s1, got %v", len(s1.Peers()))
}
if s2.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
if len(s2.Peers()) != 1 {
t.Errorf("Expected exactly 1 peer in s2, got %v", len(s2.Peers()))
}
// Broadcast a message on ch1


Loading…
Cancel
Save