Browse Source

uuid branch tidying

pull/111/merge
Jae Kwon 10 years ago
parent
commit
d91f073676
8 changed files with 94 additions and 52 deletions
  1. +1
    -0
      config/tendermint_test/config.go
  2. +8
    -8
      consensus/reactor.go
  3. +3
    -3
      mempool/reactor.go
  4. +61
    -24
      p2p/peer_set.go
  5. +5
    -2
      p2p/peer_set_test.go
  6. +11
    -9
      p2p/pex_reactor.go
  7. +3
    -4
      p2p/switch.go
  8. +2
    -2
      state/execution.go

+ 1
- 0
config/tendermint_test/config.go View File

@ -78,6 +78,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("db_dir", rootDir+"/data")
mapConfig.SetDefault("log_level", "debug")
mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657")
mapConfig.SetDefault("revisions_file", rootDir+"/revisions")
return mapConfig
}


+ 8
- 8
consensus/reactor.go View File

@ -141,16 +141,16 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
// Get round state
rs := conR.conS.GetRoundState()
ps := peer.Data.Get(PeerStateKey).(*PeerState)
_, msg_, err := DecodeMessage(msgBytes)
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes)
log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes)
return
}
log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "rsHeight", rs.Height) //, "bytes", msgBytes)
log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg, "rsHeight", rs.Height) //, "bytes", msgBytes)
switch chId {
case StateChannel:
switch msg := msg_.(type) {
switch msg := msg.(type) {
case *NewRoundStepMessage:
ps.ApplyNewRoundStepMessage(msg, rs)
case *CommitStepMessage:
@ -163,10 +163,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
case DataChannel:
if conR.fastSync {
log.Warn("Ignoring message received during fastSync", "msg", msg_)
log.Warn("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg_.(type) {
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
err = conR.conS.SetProposal(msg.Proposal)
@ -181,10 +181,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
case VoteChannel:
if conR.fastSync {
log.Warn("Ignoring message received during fastSync", "msg", msg_)
log.Warn("Ignoring message received during fastSync", "msg", msg)
return
}
switch msg := msg_.(type) {
switch msg := msg.(type) {
case *VoteMessage:
vote := msg.Vote
var validators *sm.ValidatorSet


+ 3
- 3
mempool/reactor.go View File

@ -73,14 +73,14 @@ func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Implements Reactor
func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
_, msg_, err := DecodeMessage(msgBytes)
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
log.Warn("Error decoding message", "error", err)
return
}
log.Info("MempoolReactor received message", "msg", msg_)
log.Info("MempoolReactor received message", "msg", msg)
switch msg := msg_.(type) {
switch msg := msg.(type) {
case *TxMessage:
err := memR.Mempool.AddTx(msg.Tx)
if err != nil {


+ 61
- 24
p2p/peer_set.go View File

@ -1,6 +1,7 @@
package p2p
import (
"fmt"
"net"
"strings"
"sync"
@ -54,7 +55,7 @@ func (ps *PeerSet) Add(peer *Peer) error {
// ensure we havent maxed out connections for the peer's IP range yet
// and update the IP range counters
if !ps.updateIPRangeCounts(peer.Host) {
if !ps.incrIPRangeCounts(peer.Host) {
return ErrSwitchMaxPeersPerIPRange
}
@ -91,6 +92,10 @@ func (ps *PeerSet) Remove(peer *Peer) {
if item == nil {
return
}
// update the IP range counters
ps.decrIPRangeCounts(peer.Host)
index := item.index
// Copy the list but without the last element.
// (we must copy because we're mutating the list)
@ -102,6 +107,7 @@ func (ps *PeerSet) Remove(peer *Peer) {
delete(ps.lookup, peer.Key)
return
}
// Move the last item from ps.list to "index" in list.
lastPeer := ps.list[len(ps.list)-1]
lastPeerKey := lastPeer.Key
@ -110,6 +116,7 @@ func (ps *PeerSet) Remove(peer *Peer) {
lastPeerItem.index = index
ps.list = newList
delete(ps.lookup, peer.Key)
}
func (ps *PeerSet) Size() int {
@ -147,50 +154,80 @@ func (ps *PeerSet) HasMaxForIPRange(conn net.Conn) (ok bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
spl := strings.Split(ip, ".")
ipBytes := strings.Split(ip, ".")
c := ps.connectedIPs
for i, ipByte := range spl {
for i, ipByte := range ipBytes {
if c, ok = c.children[ipByte]; !ok {
return false
}
if c.count == maxPeersPerIPRange[i] {
if maxPeersPerIPRange[i] <= c.count {
return true
}
}
return false
}
// Update counts for this address' IP range
// Increments counts for this address' IP range
// Returns false if we already have enough connections
// Not thread safe (only called by ps.Add())
func (ps *PeerSet) updateIPRangeCounts(address string) bool {
spl := strings.Split(address, ".")
func (ps *PeerSet) incrIPRangeCounts(address string) bool {
addrParts := strings.Split(address, ".")
c := ps.connectedIPs
return updateNestedCountRecursive(c, spl, 0)
return incrNestedCounters(c, addrParts, 0)
}
// recursively descend the IP hierarchy, checking if we have
// max peers for each range and updating if not
func updateNestedCountRecursive(c *nestedCounter, ipBytes []string, index int) bool {
if index == len(ipBytes) {
return true
}
// Recursively descend the IP hierarchy, checking if we have
// max peers for each range and incrementing if not.
// Returns false if incr failed because max peers reached for some range counter.
func incrNestedCounters(c *nestedCounter, ipBytes []string, index int) bool {
fmt.Println("incr:", c.count, ipBytes, index)
ipByte := ipBytes[index]
if c2, ok := c.children[ipByte]; !ok {
c2 = NewNestedCounter()
c.children[ipByte] = c2
c = c2
} else {
c = c2
if c.count == maxPeersPerIPRange[index] {
child := c.children[ipByte]
if child == nil {
child = NewNestedCounter()
c.children[ipByte] = child
}
fmt.Println("incr child:", child.count)
if index+1 < len(ipBytes) {
fmt.Println("1>>")
if !incrNestedCounters(child, ipBytes, index+1) {
return false
}
} else {
fmt.Println("2>>")
}
if !updateNestedCountRecursive(c, ipBytes, index+1) {
if maxPeersPerIPRange[index] <= child.count {
return false
} else {
child.count += 1
return true
}
}
// Decrement counts for this address' IP range
func (ps *PeerSet) decrIPRangeCounts(address string) {
addrParts := strings.Split(address, ".")
c := ps.connectedIPs
decrNestedCounters(c, addrParts, 0)
}
// Recursively descend the IP hierarchy, decrementing by one.
// If the counter is zero, deletes the child.
func decrNestedCounters(c *nestedCounter, ipBytes []string, index int) {
ipByte := ipBytes[index]
child := c.children[ipByte]
if child == nil {
log.Error("p2p/peer_set decrNestedCounters encountered a missing child counter")
return
}
if index+1 < len(ipBytes) {
decrNestedCounters(child, ipBytes, index+1)
}
child.count -= 1
if child.count <= 0 {
delete(c.children, ipByte)
}
c.count += 1
return true
}

+ 5
- 2
p2p/peer_set_test.go View File

@ -1,6 +1,7 @@
package p2p
import (
"fmt"
"math/rand"
"strings"
"testing"
@ -24,8 +25,8 @@ func TestAddRemoveOne(t *testing.T) {
peerSet := NewPeerSet()
peer := randPeer()
added := peerSet.Add(peer)
if err := added; err != nil {
err := peerSet.Add(peer)
if err != nil {
t.Errorf("Failed to add new peer")
}
if peerSet.Size() != 1 {
@ -92,6 +93,7 @@ func TestIPRanges(t *testing.T) {
peerSet := NewPeerSet()
// test /8
fmt.Println("---")
maxPeersPerIPRange = [4]int{2, 2, 2, 2}
peer := newPeerInIPRange("54")
if err := peerSet.Add(peer); err != nil {
@ -109,6 +111,7 @@ func TestIPRanges(t *testing.T) {
if err := peerSet.Add(peer); err != nil {
t.Errorf("Failed to add new peer")
}
fmt.Println("---END")
// test /16
peerSet = NewPeerSet()


+ 11
- 9
p2p/pex_reactor.go View File

@ -20,7 +20,6 @@ const (
PexChannel = byte(0x00)
ensurePeersPeriodSeconds = 30
minNumOutboundPeers = 10
maxNumPeers = 50
)
/*
@ -78,12 +77,14 @@ func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor {
func (pexR *PEXReactor) AddPeer(peer *Peer) {
// Add the peer to the address book
netAddr := NewNetAddressString(fmt.Sprintf("%s:%d", peer.Host, peer.P2PPort))
pexR.book.AddAddress(netAddr, netAddr) // the peer is its own source
if peer.IsOutbound() {
if pexR.book.NeedMoreAddrs() {
pexR.RequestPEX(peer)
}
} else {
// For inbound connections, the peer is its own source
// (For outbound peers, the address is already in the books)
pexR.book.AddAddress(netAddr, netAddr)
}
}
@ -104,7 +105,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
}
log.Info("Received message", "msg", msg)
switch msgT := msg.(type) {
switch msg := msg.(type) {
case *pexRequestMessage:
// src requested some peers.
// TODO: prevent abuse.
@ -114,7 +115,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
// TODO: prevent abuse.
// (We don't want to get spammed with bad peers)
srcAddr := src.Connection().RemoteAddress
for _, addr := range msgT.Addrs {
for _, addr := range msg.Addrs {
pexR.book.AddAddress(addr, srcAddr)
}
default:
@ -210,12 +211,13 @@ func (pexR *PEXReactor) ensurePeers() {
}(item.(*NetAddress))
}
// if no addresses to dial, pick a random connected peer and ask for more peers
if toDial.Size() == 0 {
// If we need more addresses, pick a random peer and ask for more.
if pexR.book.NeedMoreAddrs() {
if peers := pexR.sw.Peers().List(); len(peers) > 0 {
i := rand.Int() % len(peers)
log.Debug("No addresses to dial. Sending pexRequest to random peer", "peer", peers[i])
pexR.RequestPEX(peers[i])
peer := peers[i]
log.Debug("No addresses to dial. Sending pexRequest to random peer", "peer", peer)
pexR.RequestPEX(peer)
}
}
}


+ 3
- 4
p2p/switch.go View File

@ -57,7 +57,8 @@ var (
)
const (
peerDialTimeoutSeconds = 3
peerDialTimeoutSeconds = 3 // TODO make this configurable
maxNumPeers = 50 // TODO make this configurable
)
func NewSwitch() *Switch {
@ -308,9 +309,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
}
// ignore connection if we already have enough
// note we might exceed the maxNumPeers in order to
// achieve minNumOutboundPeers
if sw.peers.Size() >= maxNumPeers {
if maxNumPeers <= sw.peers.Size() {
log.Debug("Ignoring inbound connection: already have enough peers", "conn", inConn, "numPeers", sw.peers.Size(), "max", maxNumPeers)
continue
}


+ 2
- 2
state/execution.go View File

@ -293,14 +293,14 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu
// If the tx is invalid, an error will be returned.
// Unlike ExecBlock(), state will not be altered.
func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool, evc events.Fireable) (err error) {
func ExecTx(blockCache *BlockCache, tx types.Tx, runCall bool, evc events.Fireable) (err error) {
// TODO: do something with fees
fees := int64(0)
_s := blockCache.State() // hack to access validators and block height
// Exec tx
switch tx := tx_.(type) {
switch tx := tx.(type) {
case *types.SendTx:
accounts, err := getInputs(blockCache, tx.Inputs)
if err != nil {


Loading…
Cancel
Save