diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index b044548c2..0bedcc67c 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -78,6 +78,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("db_dir", rootDir+"/data") mapConfig.SetDefault("log_level", "debug") mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") + mapConfig.SetDefault("revisions_file", rootDir+"/revisions") return mapConfig } diff --git a/consensus/reactor.go b/consensus/reactor.go index 6f08a09a8..f357987b1 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -141,16 +141,16 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte // Get round state rs := conR.conS.GetRoundState() ps := peer.Data.Get(PeerStateKey).(*PeerState) - _, msg_, err := DecodeMessage(msgBytes) + _, msg, err := DecodeMessage(msgBytes) if err != nil { - log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg_, "error", err, "bytes", msgBytes) + log.Warn("Error decoding message", "channel", chId, "peer", peer, "msg", msg, "error", err, "bytes", msgBytes) return } - log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg_, "rsHeight", rs.Height) //, "bytes", msgBytes) + log.Debug("Receive", "channel", chId, "peer", peer, "msg", msg, "rsHeight", rs.Height) //, "bytes", msgBytes) switch chId { case StateChannel: - switch msg := msg_.(type) { + switch msg := msg.(type) { case *NewRoundStepMessage: ps.ApplyNewRoundStepMessage(msg, rs) case *CommitStepMessage: @@ -163,10 +163,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte case DataChannel: if conR.fastSync { - log.Warn("Ignoring message received during fastSync", "msg", msg_) + log.Warn("Ignoring message received during fastSync", "msg", msg) return } - switch msg := msg_.(type) { + switch msg := msg.(type) { case *ProposalMessage: ps.SetHasProposal(msg.Proposal) err = conR.conS.SetProposal(msg.Proposal) @@ -181,10 +181,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte case VoteChannel: if conR.fastSync { - log.Warn("Ignoring message received during fastSync", "msg", msg_) + log.Warn("Ignoring message received during fastSync", "msg", msg) return } - switch msg := msg_.(type) { + switch msg := msg.(type) { case *VoteMessage: vote := msg.Vote var validators *sm.ValidatorSet diff --git a/mempool/reactor.go b/mempool/reactor.go index 28af17438..327eef51a 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -73,14 +73,14 @@ func (pexR *MempoolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Implements Reactor func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { - _, msg_, err := DecodeMessage(msgBytes) + _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Warn("Error decoding message", "error", err) return } - log.Info("MempoolReactor received message", "msg", msg_) + log.Info("MempoolReactor received message", "msg", msg) - switch msg := msg_.(type) { + switch msg := msg.(type) { case *TxMessage: err := memR.Mempool.AddTx(msg.Tx) if err != nil { diff --git a/p2p/peer_set.go b/p2p/peer_set.go index 2033529d6..8e85af555 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -1,6 +1,7 @@ package p2p import ( + "fmt" "net" "strings" "sync" @@ -54,7 +55,7 @@ func (ps *PeerSet) Add(peer *Peer) error { // ensure we havent maxed out connections for the peer's IP range yet // and update the IP range counters - if !ps.updateIPRangeCounts(peer.Host) { + if !ps.incrIPRangeCounts(peer.Host) { return ErrSwitchMaxPeersPerIPRange } @@ -91,6 +92,10 @@ func (ps *PeerSet) Remove(peer *Peer) { if item == nil { return } + + // update the IP range counters + ps.decrIPRangeCounts(peer.Host) + index := item.index // Copy the list but without the last element. // (we must copy because we're mutating the list) @@ -102,6 +107,7 @@ func (ps *PeerSet) Remove(peer *Peer) { delete(ps.lookup, peer.Key) return } + // Move the last item from ps.list to "index" in list. lastPeer := ps.list[len(ps.list)-1] lastPeerKey := lastPeer.Key @@ -110,6 +116,7 @@ func (ps *PeerSet) Remove(peer *Peer) { lastPeerItem.index = index ps.list = newList delete(ps.lookup, peer.Key) + } func (ps *PeerSet) Size() int { @@ -147,50 +154,80 @@ func (ps *PeerSet) HasMaxForIPRange(conn net.Conn) (ok bool) { ps.mtx.Lock() defer ps.mtx.Unlock() ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) - spl := strings.Split(ip, ".") + ipBytes := strings.Split(ip, ".") c := ps.connectedIPs - for i, ipByte := range spl { + for i, ipByte := range ipBytes { if c, ok = c.children[ipByte]; !ok { return false } - if c.count == maxPeersPerIPRange[i] { + if maxPeersPerIPRange[i] <= c.count { return true } } return false } -// Update counts for this address' IP range +// Increments counts for this address' IP range // Returns false if we already have enough connections // Not thread safe (only called by ps.Add()) -func (ps *PeerSet) updateIPRangeCounts(address string) bool { - spl := strings.Split(address, ".") +func (ps *PeerSet) incrIPRangeCounts(address string) bool { + addrParts := strings.Split(address, ".") c := ps.connectedIPs - return updateNestedCountRecursive(c, spl, 0) + return incrNestedCounters(c, addrParts, 0) } -// recursively descend the IP hierarchy, checking if we have -// max peers for each range and updating if not -func updateNestedCountRecursive(c *nestedCounter, ipBytes []string, index int) bool { - if index == len(ipBytes) { - return true - } +// Recursively descend the IP hierarchy, checking if we have +// max peers for each range and incrementing if not. +// Returns false if incr failed because max peers reached for some range counter. +func incrNestedCounters(c *nestedCounter, ipBytes []string, index int) bool { + fmt.Println("incr:", c.count, ipBytes, index) ipByte := ipBytes[index] - if c2, ok := c.children[ipByte]; !ok { - c2 = NewNestedCounter() - c.children[ipByte] = c2 - c = c2 - } else { - c = c2 - if c.count == maxPeersPerIPRange[index] { + child := c.children[ipByte] + if child == nil { + child = NewNestedCounter() + c.children[ipByte] = child + } + fmt.Println("incr child:", child.count) + if index+1 < len(ipBytes) { + fmt.Println("1>>") + if !incrNestedCounters(child, ipBytes, index+1) { return false } + } else { + fmt.Println("2>>") } - if !updateNestedCountRecursive(c, ipBytes, index+1) { + if maxPeersPerIPRange[index] <= child.count { return false + } else { + child.count += 1 + return true + } +} + +// Decrement counts for this address' IP range +func (ps *PeerSet) decrIPRangeCounts(address string) { + addrParts := strings.Split(address, ".") + + c := ps.connectedIPs + decrNestedCounters(c, addrParts, 0) +} + +// Recursively descend the IP hierarchy, decrementing by one. +// If the counter is zero, deletes the child. +func decrNestedCounters(c *nestedCounter, ipBytes []string, index int) { + ipByte := ipBytes[index] + child := c.children[ipByte] + if child == nil { + log.Error("p2p/peer_set decrNestedCounters encountered a missing child counter") + return + } + if index+1 < len(ipBytes) { + decrNestedCounters(child, ipBytes, index+1) + } + child.count -= 1 + if child.count <= 0 { + delete(c.children, ipByte) } - c.count += 1 - return true } diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index dfd8ee153..6d47ca73b 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -1,6 +1,7 @@ package p2p import ( + "fmt" "math/rand" "strings" "testing" @@ -24,8 +25,8 @@ func TestAddRemoveOne(t *testing.T) { peerSet := NewPeerSet() peer := randPeer() - added := peerSet.Add(peer) - if err := added; err != nil { + err := peerSet.Add(peer) + if err != nil { t.Errorf("Failed to add new peer") } if peerSet.Size() != 1 { @@ -92,6 +93,7 @@ func TestIPRanges(t *testing.T) { peerSet := NewPeerSet() // test /8 + fmt.Println("---") maxPeersPerIPRange = [4]int{2, 2, 2, 2} peer := newPeerInIPRange("54") if err := peerSet.Add(peer); err != nil { @@ -109,6 +111,7 @@ func TestIPRanges(t *testing.T) { if err := peerSet.Add(peer); err != nil { t.Errorf("Failed to add new peer") } + fmt.Println("---END") // test /16 peerSet = NewPeerSet() diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 93e57e6da..f3d84423d 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -20,7 +20,6 @@ const ( PexChannel = byte(0x00) ensurePeersPeriodSeconds = 30 minNumOutboundPeers = 10 - maxNumPeers = 50 ) /* @@ -78,12 +77,14 @@ func (pexR *PEXReactor) GetChannels() []*ChannelDescriptor { func (pexR *PEXReactor) AddPeer(peer *Peer) { // Add the peer to the address book netAddr := NewNetAddressString(fmt.Sprintf("%s:%d", peer.Host, peer.P2PPort)) - pexR.book.AddAddress(netAddr, netAddr) // the peer is its own source - if peer.IsOutbound() { if pexR.book.NeedMoreAddrs() { pexR.RequestPEX(peer) } + } else { + // For inbound connections, the peer is its own source + // (For outbound peers, the address is already in the books) + pexR.book.AddAddress(netAddr, netAddr) } } @@ -104,7 +105,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { } log.Info("Received message", "msg", msg) - switch msgT := msg.(type) { + switch msg := msg.(type) { case *pexRequestMessage: // src requested some peers. // TODO: prevent abuse. @@ -114,7 +115,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { // TODO: prevent abuse. // (We don't want to get spammed with bad peers) srcAddr := src.Connection().RemoteAddress - for _, addr := range msgT.Addrs { + for _, addr := range msg.Addrs { pexR.book.AddAddress(addr, srcAddr) } default: @@ -210,12 +211,13 @@ func (pexR *PEXReactor) ensurePeers() { }(item.(*NetAddress)) } - // if no addresses to dial, pick a random connected peer and ask for more peers - if toDial.Size() == 0 { + // If we need more addresses, pick a random peer and ask for more. + if pexR.book.NeedMoreAddrs() { if peers := pexR.sw.Peers().List(); len(peers) > 0 { i := rand.Int() % len(peers) - log.Debug("No addresses to dial. Sending pexRequest to random peer", "peer", peers[i]) - pexR.RequestPEX(peers[i]) + peer := peers[i] + log.Debug("No addresses to dial. Sending pexRequest to random peer", "peer", peer) + pexR.RequestPEX(peer) } } } diff --git a/p2p/switch.go b/p2p/switch.go index cbd627b98..21a4d8a1f 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -57,7 +57,8 @@ var ( ) const ( - peerDialTimeoutSeconds = 3 + peerDialTimeoutSeconds = 3 // TODO make this configurable + maxNumPeers = 50 // TODO make this configurable ) func NewSwitch() *Switch { @@ -308,9 +309,7 @@ func (sw *Switch) listenerRoutine(l Listener) { } // ignore connection if we already have enough - // note we might exceed the maxNumPeers in order to - // achieve minNumOutboundPeers - if sw.peers.Size() >= maxNumPeers { + if maxNumPeers <= sw.peers.Size() { log.Debug("Ignoring inbound connection: already have enough peers", "conn", inConn, "numPeers", sw.peers.Size(), "max", maxNumPeers) continue } diff --git a/state/execution.go b/state/execution.go index 11ff01002..7097d7b75 100644 --- a/state/execution.go +++ b/state/execution.go @@ -293,14 +293,14 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu // If the tx is invalid, an error will be returned. // Unlike ExecBlock(), state will not be altered. -func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool, evc events.Fireable) (err error) { +func ExecTx(blockCache *BlockCache, tx types.Tx, runCall bool, evc events.Fireable) (err error) { // TODO: do something with fees fees := int64(0) _s := blockCache.State() // hack to access validators and block height // Exec tx - switch tx := tx_.(type) { + switch tx := tx.(type) { case *types.SendTx: accounts, err := getInputs(blockCache, tx.Inputs) if err != nil {