From 5fa540bdc9eac01cc3df6de01119550d1122114b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 27 Mar 2019 16:45:34 +0100 Subject: [PATCH] mempool: add a safety check, write tests for mempoolIDs (#3487) * mempool: add a safety check, write tests for mempoolIDs and document 65536 limit in the mempool reactor spec follow-up to https://github.com/tendermint/tendermint/pull/2778 * rename the test * fixes after Ismail's review --- consensus/state_test.go | 6 +- docs/spec/reactors/mempool/reactor.md | 8 ++- mempool/mempool.go | 5 +- mempool/reactor.go | 24 +++++-- mempool/reactor_test.go | 35 +++++++++ p2p/dummy/peer.go | 100 -------------------------- p2p/mock/peer.go | 68 ++++++++++++++++++ p2p/pex/pex_reactor_test.go | 53 ++------------ 8 files changed, 140 insertions(+), 159 deletions(-) delete mode 100644 p2p/dummy/peer.go create mode 100644 p2p/mock/peer.go diff --git a/consensus/state_test.go b/consensus/state_test.go index a4d01e584..fc1e3e949 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -14,7 +14,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" - p2pdummy "github.com/tendermint/tendermint/p2p/dummy" + p2pmock "github.com/tendermint/tendermint/p2p/mock" "github.com/tendermint/tendermint/types" ) @@ -1548,7 +1548,7 @@ func TestStateHalt1(t *testing.T) { func TestStateOutputsBlockPartsStats(t *testing.T) { // create dummy peer cs, _ := randConsensusState(1) - peer := p2pdummy.NewPeer() + peer := p2pmock.NewPeer(nil) // 1) new block part parts := types.NewPartSetFromData(cmn.RandBytes(100), 10) @@ -1591,7 +1591,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) { func TestStateOutputVoteStats(t *testing.T) { cs, vss := randConsensusState(2) // create dummy peer - peer := p2pdummy.NewPeer() + peer := p2pmock.NewPeer(nil) vote := signVote(vss[1], types.PrecommitType, []byte("test"), types.PartSetHeader{}) diff --git a/docs/spec/reactors/mempool/reactor.md b/docs/spec/reactors/mempool/reactor.md index d0b19f7ca..d349fc7cc 100644 --- a/docs/spec/reactors/mempool/reactor.md +++ b/docs/spec/reactors/mempool/reactor.md @@ -13,4 +13,10 @@ for details. Sending incorrectly encoded data or data exceeding `maxMsgSize` will result in stopping the peer. -The mempool will not send a tx back to any peer which it received it from. \ No newline at end of file +The mempool will not send a tx back to any peer which it received it from. + +The reactor assigns an `uint16` number for each peer and maintains a map from +p2p.ID to `uint16`. Each mempool transaction carries a list of all the senders +(`[]uint16`). The list is updated every time mempool receives a transaction it +is already seen. `uint16` assumes that a node will never have over 65535 active +peers (0 is reserved for unknown source - e.g. RPC). diff --git a/mempool/mempool.go b/mempool/mempool.go index 2064b7bce..bd3cbf7d9 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -382,7 +382,10 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo if !mem.cache.Push(tx) { // record the sender e, ok := mem.txsMap[sha256.Sum256(tx)] - if ok { // tx may be in cache, but not in the mempool + // The check is needed because tx may be in cache, but not in the mempool. + // E.g. after we've committed a block, txs are removed from the mempool, + // but not from the cache. + if ok { memTx := e.Value.(*mempoolTx) if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded { // TODO: consider punishing peer for dups, diff --git a/mempool/reactor.go b/mempool/reactor.go index 23fec2700..e1376b287 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -2,6 +2,7 @@ package mempool import ( "fmt" + "math" "reflect" "sync" "time" @@ -26,6 +27,8 @@ const ( // UnknownPeerID is the peer ID to use when running CheckTx when there is // no peer (e.g. RPC) UnknownPeerID uint16 = 0 + + maxActiveIDs = math.MaxUint16 ) // MempoolReactor handles mempool tx broadcasting amongst peers. @@ -45,7 +48,8 @@ type mempoolIDs struct { activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter } -// Reserve searches for the next unused ID and assignes it to the peer. +// Reserve searches for the next unused ID and assignes it to the +// peer. func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { ids.mtx.Lock() defer ids.mtx.Unlock() @@ -58,6 +62,10 @@ func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { // nextPeerID returns the next unused peer ID to use. // This assumes that ids's mutex is already locked. func (ids *mempoolIDs) nextPeerID() uint16 { + if len(ids.activeIDs) == maxActiveIDs { + panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", maxActiveIDs)) + } + _, idExists := ids.activeIDs[ids.nextID] for idExists { ids.nextID++ @@ -88,16 +96,20 @@ func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { return ids.peerMap[peer.ID()] } +func newMempoolIDs() *mempoolIDs { + return &mempoolIDs{ + peerMap: make(map[p2p.ID]uint16), + activeIDs: map[uint16]struct{}{0: {}}, + nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx + } +} + // NewMempoolReactor returns a new MempoolReactor with the given config and mempool. func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor { memR := &MempoolReactor{ config: config, Mempool: mempool, - ids: &mempoolIDs{ - peerMap: make(map[p2p.ID]uint16), - activeIDs: map[uint16]struct{}{0: {}}, - nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx - }, + ids: newMempoolIDs(), } memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR) return memR diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index f16f84479..c9cf49809 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -2,6 +2,7 @@ package mempool import ( "fmt" + "net" "sync" "testing" "time" @@ -15,6 +16,7 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/mock" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -189,3 +191,36 @@ func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { // i.e. broadcastTxRoutine finishes when reactor is stopped leaktest.CheckTimeout(t, 10*time.Second)() } + +func TestMempoolIDsBasic(t *testing.T) { + ids := newMempoolIDs() + + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + + ids.ReserveForPeer(peer) + assert.EqualValues(t, 1, ids.GetForPeer(peer)) + ids.Reclaim(peer) + + ids.ReserveForPeer(peer) + assert.EqualValues(t, 2, ids.GetForPeer(peer)) + ids.Reclaim(peer) +} + +func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { + if testing.Short() { + return + } + + // 0 is already reserved for UnknownPeerID + ids := newMempoolIDs() + + for i := 0; i < maxActiveIDs-1; i++ { + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + ids.ReserveForPeer(peer) + } + + assert.Panics(t, func() { + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + ids.ReserveForPeer(peer) + }) +} diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go deleted file mode 100644 index 57edafc67..000000000 --- a/p2p/dummy/peer.go +++ /dev/null @@ -1,100 +0,0 @@ -package dummy - -import ( - "net" - - cmn "github.com/tendermint/tendermint/libs/common" - p2p "github.com/tendermint/tendermint/p2p" - tmconn "github.com/tendermint/tendermint/p2p/conn" -) - -type peer struct { - cmn.BaseService - kv map[string]interface{} -} - -var _ p2p.Peer = (*peer)(nil) - -// NewPeer creates new dummy peer. -func NewPeer() *peer { - p := &peer{ - kv: make(map[string]interface{}), - } - p.BaseService = *cmn.NewBaseService(nil, "peer", p) - - return p -} - -// FlushStop just calls Stop. -func (p *peer) FlushStop() { - p.Stop() -} - -// ID always returns dummy. -func (p *peer) ID() p2p.ID { - return p2p.ID("dummy") -} - -// IsOutbound always returns false. -func (p *peer) IsOutbound() bool { - return false -} - -// IsPersistent always returns false. -func (p *peer) IsPersistent() bool { - return false -} - -// NodeInfo always returns empty node info. -func (p *peer) NodeInfo() p2p.NodeInfo { - return p2p.DefaultNodeInfo{} -} - -// RemoteIP always returns localhost. -func (p *peer) RemoteIP() net.IP { - return net.ParseIP("127.0.0.1") -} - -// Addr always returns tcp://localhost:8800. -func (p *peer) RemoteAddr() net.Addr { - return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} -} - -// CloseConn always returns nil. -func (p *peer) CloseConn() error { - return nil -} - -// Status always returns empry connection status. -func (p *peer) Status() tmconn.ConnectionStatus { - return tmconn.ConnectionStatus{} -} - -// Send does not do anything and just returns true. -func (p *peer) Send(byte, []byte) bool { - return true -} - -// TrySend does not do anything and just returns true. -func (p *peer) TrySend(byte, []byte) bool { - return true -} - -// Set records value under key specified in the map. -func (p *peer) Set(key string, value interface{}) { - p.kv[key] = value -} - -// Get returns a value associated with the key. Nil is returned if no value -// found. -func (p *peer) Get(key string) interface{} { - if value, ok := p.kv[key]; ok { - return value - } - return nil -} - -// OriginalAddr always returns nil. -func (p *peer) OriginalAddr() *p2p.NetAddress { - return nil -} diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go new file mode 100644 index 000000000..5ee81f67e --- /dev/null +++ b/p2p/mock/peer.go @@ -0,0 +1,68 @@ +package mock + +import ( + "net" + + "github.com/tendermint/tendermint/crypto/ed25519" + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/conn" +) + +type Peer struct { + *cmn.BaseService + ip net.IP + id p2p.ID + addr *p2p.NetAddress + kv map[string]interface{} + Outbound, Persistent bool +} + +// NewPeer creates and starts a new mock peer. If the ip +// is nil, random routable address is used. +func NewPeer(ip net.IP) *Peer { + var netAddr *p2p.NetAddress + if ip == nil { + _, netAddr = p2p.CreateRoutableAddr() + } else { + netAddr = p2p.NewNetAddressIPPort(ip, 26656) + } + nodeKey := p2p.NodeKey{PrivKey: ed25519.GenPrivKey()} + netAddr.ID = nodeKey.ID() + mp := &Peer{ + ip: ip, + id: nodeKey.ID(), + addr: netAddr, + kv: make(map[string]interface{}), + } + mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp) + mp.Start() + return mp +} + +func (mp *Peer) FlushStop() { mp.Stop() } +func (mp *Peer) TrySend(chID byte, msgBytes []byte) bool { return true } +func (mp *Peer) Send(chID byte, msgBytes []byte) bool { return true } +func (mp *Peer) NodeInfo() p2p.NodeInfo { + return p2p.DefaultNodeInfo{ + ID_: mp.addr.ID, + ListenAddr: mp.addr.DialString(), + } +} +func (mp *Peer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } +func (mp *Peer) ID() p2p.ID { return mp.id } +func (mp *Peer) IsOutbound() bool { return mp.Outbound } +func (mp *Peer) IsPersistent() bool { return mp.Persistent } +func (mp *Peer) Get(key string) interface{} { + if value, ok := mp.kv[key]; ok { + return value + } + return nil +} +func (mp *Peer) Set(key string, value interface{}) { + mp.kv[key] = value +} +func (mp *Peer) RemoteIP() net.IP { return mp.ip } +func (mp *Peer) OriginalAddr() *p2p.NetAddress { return mp.addr } +func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } +func (mp *Peer) CloseConn() error { return nil } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 4f4ccb039..9e23058a5 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -3,7 +3,6 @@ package pex import ( "fmt" "io/ioutil" - "net" "os" "path/filepath" "testing" @@ -12,14 +11,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/crypto" - "github.com/tendermint/tendermint/crypto/ed25519" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/p2p/conn" + "github.com/tendermint/tendermint/p2p/mock" ) var ( @@ -148,7 +143,7 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) { sw := createSwitchAndAddReactors(r) sw.SetAddrBook(book) - peer := newMockPeer() + peer := mock.NewPeer(nil) p2p.AddPeerToSwitch(sw, peer) assert.True(t, sw.Peers().Has(peer.ID())) @@ -178,7 +173,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { sw := createSwitchAndAddReactors(r) sw.SetAddrBook(book) - peer := newMockPeer() + peer := mock.NewPeer(nil) p2p.AddPeerToSwitch(sw, peer) assert.True(t, sw.Peers().Has(peer.ID())) @@ -418,7 +413,7 @@ func TestPEXReactorDialPeer(t *testing.T) { sw := createSwitchAndAddReactors(pexR) sw.SetAddrBook(book) - peer := newMockPeer() + peer := mock.NewPeer(nil) addr := peer.NodeInfo().NetAddress() assert.Equal(t, 0, pexR.AttemptsToDial(addr)) @@ -444,44 +439,6 @@ func TestPEXReactorDialPeer(t *testing.T) { } } -type mockPeer struct { - *cmn.BaseService - pubKey crypto.PubKey - addr *p2p.NetAddress - outbound, persistent bool -} - -func newMockPeer() mockPeer { - _, netAddr := p2p.CreateRoutableAddr() - mp := mockPeer{ - addr: netAddr, - pubKey: ed25519.GenPrivKey().PubKey(), - } - mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp) - mp.Start() - return mp -} - -func (mp mockPeer) FlushStop() { mp.Stop() } -func (mp mockPeer) ID() p2p.ID { return mp.addr.ID } -func (mp mockPeer) IsOutbound() bool { return mp.outbound } -func (mp mockPeer) IsPersistent() bool { return mp.persistent } -func (mp mockPeer) NodeInfo() p2p.NodeInfo { - return p2p.DefaultNodeInfo{ - ID_: mp.addr.ID, - ListenAddr: mp.addr.DialString(), - } -} -func (mockPeer) RemoteIP() net.IP { return net.ParseIP("127.0.0.1") } -func (mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } -func (mockPeer) Send(byte, []byte) bool { return false } -func (mockPeer) TrySend(byte, []byte) bool { return false } -func (mockPeer) Set(string, interface{}) {} -func (mockPeer) Get(string) interface{} { return nil } -func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil } -func (mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} } -func (mockPeer) CloseConn() error { return nil } - func assertPeersWithTimeout( t *testing.T, switches []*p2p.Switch,