You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

735 lines
19 KiB

package p2p
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p/conn"
)
var (
cfg *config.P2PConfig
)
func init() {
cfg = config.DefaultP2PConfig()
cfg.PexReactor = true
cfg.AllowDuplicateIP = true
}
type PeerMessage struct {
PeerID ID
Bytes []byte
Counter int
}
type TestReactor struct {
BaseReactor
mtx sync.Mutex
channels []*conn.ChannelDescriptor
logMessages bool
msgsCounter int
msgsReceived map[byte][]PeerMessage
}
func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
tr := &TestReactor{
channels: channels,
logMessages: logMessages,
msgsReceived: make(map[byte][]PeerMessage),
}
tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
tr.SetLogger(log.TestingLogger())
return tr
}
func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
return tr.channels
}
func (tr *TestReactor) AddPeer(peer Peer) {}
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
if tr.logMessages {
tr.mtx.Lock()
defer tr.mtx.Unlock()
//fmt.Printf("Received: %X, %X\n", chID, msgBytes)
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
tr.msgsCounter++
}
}
func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
tr.mtx.Lock()
defer tr.mtx.Unlock()
return tr.msgsReceived[chID]
}
//-----------------------------------------------------------------------------
// convenience method for creating two switches connected to each other.
// XXX: note this uses net.Pipe and not a proper TCP conn
func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
// Create two switches that will be interconnected.
switches := MakeConnectedSwitches(cfg, 2, initSwitch, Connect2Switches)
return switches[0], switches[1]
}
func initSwitchFunc(i int, sw *Switch) *Switch {
sw.SetAddrBook(&addrBookMock{
addrs: make(map[string]struct{}),
ourAddrs: make(map[string]struct{})})
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
}, true))
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
}, true))
return sw
}
func TestSwitches(t *testing.T) {
s1, s2 := MakeSwitchPair(t, initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
if s1.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
}
if s2.Peers().Size() != 1 {
t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
}
// Lets send some messages
ch0Msg := []byte("channel zero")
ch1Msg := []byte("channel foo")
ch2Msg := []byte("channel bar")
s1.Broadcast(byte(0x00), ch0Msg)
s1.Broadcast(byte(0x01), ch1Msg)
s1.Broadcast(byte(0x02), ch2Msg)
assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
}
func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
ticker := time.NewTicker(checkPeriod)
for {
select {
case <-ticker.C:
msgs := reactor.getMsgs(channel)
if len(msgs) > 0 {
if !bytes.Equal(msgs[0].Bytes, msgBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
}
return
}
case <-time.After(timeout):
t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
}
}
}
func TestSwitchFiltersOutItself(t *testing.T) {
s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
// simulate s1 having a public IP by creating a remote peer with the same ID
rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
rp.Start()
// addr should be rejected in addPeer based on the same ID
err := s1.DialPeerWithAddress(rp.Addr())
if assert.Error(t, err) {
if err, ok := err.(ErrRejected); ok {
if !err.IsSelf() {
t.Errorf("expected self to be rejected")
}
} else {
t.Errorf("expected ErrRejected")
}
}
assert.True(t, s1.addrBook.OurAddress(rp.Addr()))
assert.False(t, s1.addrBook.HasAddress(rp.Addr()))
rp.Stop()
assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond)
}
func TestSwitchPeerFilter(t *testing.T) {
var (
filters = []PeerFilterFunc{
func(_ IPeerSet, _ Peer) error { return nil },
func(_ IPeerSet, _ Peer) error { return fmt.Errorf("denied!") },
func(_ IPeerSet, _ Peer) error { return nil },
}
sw = MakeSwitch(
cfg,
1,
"testing",
"123.123.123",
initSwitchFunc,
SwitchPeerFilters(filters...),
)
)
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.isPeerPersistentFn(),
reactorsByCh: sw.reactorsByCh,
})
if err != nil {
t.Fatal(err)
}
err = sw.addPeer(p)
if err, ok := err.(ErrRejected); ok {
if !err.IsFiltered() {
t.Errorf("expected peer to be filtered")
}
} else {
t.Errorf("expected ErrRejected")
}
}
func TestSwitchPeerFilterTimeout(t *testing.T) {
var (
filters = []PeerFilterFunc{
func(_ IPeerSet, _ Peer) error {
time.Sleep(10 * time.Millisecond)
return nil
},
}
sw = MakeSwitch(
cfg,
1,
"testing",
"123.123.123",
initSwitchFunc,
SwitchFilterTimeout(5*time.Millisecond),
SwitchPeerFilters(filters...),
)
)
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.isPeerPersistentFn(),
reactorsByCh: sw.reactorsByCh,
})
if err != nil {
t.Fatal(err)
}
err = sw.addPeer(p)
if _, ok := err.(ErrFilterTimeout); !ok {
t.Errorf("expected ErrFilterTimeout")
}
}
func TestSwitchPeerFilterDuplicate(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw.Start()
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.isPeerPersistentFn(),
reactorsByCh: sw.reactorsByCh,
})
if err != nil {
t.Fatal(err)
}
if err := sw.addPeer(p); err != nil {
t.Fatal(err)
}
err = sw.addPeer(p)
if errRej, ok := err.(ErrRejected); ok {
if !errRej.IsDuplicate() {
t.Errorf("expected peer to be duplicate. got %v", errRej)
}
} else {
t.Errorf("expected ErrRejected, got %v", err)
}
}
func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
time.Sleep(timeout)
if sw.Peers().Size() != 0 {
t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
}
}
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
if err != nil {
t.Error(err)
}
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.isPeerPersistentFn(),
reactorsByCh: sw.reactorsByCh,
})
require.Nil(err)
err = sw.addPeer(p)
require.Nil(err)
require.NotNil(sw.Peers().Get(rp.ID()))
// simulate failure by closing connection
p.(*peer).CloseConn()
assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
assert.False(p.IsRunning())
}
func TestSwitchStopPeerForError(t *testing.T) {
s := httptest.NewServer(stdprometheus.UninstrumentedHandler())
defer s.Close()
scrapeMetrics := func() string {
resp, _ := http.Get(s.URL)
buf, _ := ioutil.ReadAll(resp.Body)
return string(buf)
}
namespace, subsystem, name := config.TestInstrumentationConfig().Namespace, MetricsSubsystem, "peers"
re := regexp.MustCompile(namespace + `_` + subsystem + `_` + name + ` ([0-9\.]+)`)
peersMetricValue := func() float64 {
matches := re.FindStringSubmatch(scrapeMetrics())
f, _ := strconv.ParseFloat(matches[1], 64)
return f
}
p2pMetrics := PrometheusMetrics(namespace)
// make two connected switches
sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
// set metrics on sw1
if i == 0 {
opt := WithMetrics(p2pMetrics)
opt(sw)
}
return initSwitchFunc(i, sw)
})
assert.Equal(t, len(sw1.Peers().List()), 1)
assert.EqualValues(t, 1, peersMetricValue())
// send messages to the peer from sw1
p := sw1.Peers().List()[0]
p.Send(0x1, []byte("here's a message to send"))
// stop sw2. this should cause the p to fail,
// which results in calling StopPeerForError internally
sw2.Stop()
// now call StopPeerForError explicitly, eg. from a reactor
sw1.StopPeerForError(p, fmt.Errorf("some err"))
assert.Equal(t, len(sw1.Peers().List()), 0)
assert.EqualValues(t, 0, peersMetricValue())
}
func TestSwitchReconnectsToOutboundPersistentPeer(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
require.NoError(t, err)
defer sw.Stop()
// 1. simulate failure by closing connection
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
err = sw.AddPersistentPeers([]string{rp.Addr().String()})
require.NoError(t, err)
err = sw.DialPeerWithAddress(rp.Addr())
require.Nil(t, err)
require.NotNil(t, sw.Peers().Get(rp.ID()))
p := sw.Peers().List()[0]
p.(*peer).CloseConn()
waitUntilSwitchHasAtLeastNPeers(sw, 1)
assert.False(t, p.IsRunning()) // old peer instance
assert.Equal(t, 1, sw.Peers().Size()) // new peer instance
// 2. simulate first time dial failure
rp = &remotePeer{
PrivKey: ed25519.GenPrivKey(),
Config: cfg,
// Use different interface to prevent duplicate IP filter, this will break
// beyond two peers.
listenAddr: "127.0.0.1:0",
}
rp.Start()
defer rp.Stop()
conf := config.DefaultP2PConfig()
conf.TestDialFail = true // will trigger a reconnect
err = sw.addOutboundPeerWithConfig(rp.Addr(), conf)
require.NotNil(t, err)
// DialPeerWithAddres - sw.peerConfig resets the dialer
waitUntilSwitchHasAtLeastNPeers(sw, 2)
assert.Equal(t, 2, sw.Peers().Size())
}
func TestSwitchReconnectsToInboundPersistentPeer(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
require.NoError(t, err)
defer sw.Stop()
// 1. simulate failure by closing the connection
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
err = sw.AddPersistentPeers([]string{rp.Addr().String()})
require.NoError(t, err)
conn, err := rp.Dial(sw.NetAddress())
require.NoError(t, err)
time.Sleep(50 * time.Millisecond)
require.NotNil(t, sw.Peers().Get(rp.ID()))
conn.Close()
waitUntilSwitchHasAtLeastNPeers(sw, 1)
assert.Equal(t, 1, sw.Peers().Size())
}
func TestSwitchDialPeersAsync(t *testing.T) {
if testing.Short() {
return
}
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
require.NoError(t, err)
defer sw.Stop()
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
err = sw.DialPeersAsync([]string{rp.Addr().String()})
require.NoError(t, err)
time.Sleep(dialRandomizerIntervalMilliseconds * time.Millisecond)
require.NotNil(t, sw.Peers().Get(rp.ID()))
}
func waitUntilSwitchHasAtLeastNPeers(sw *Switch, n int) {
for i := 0; i < 20; i++ {
time.Sleep(250 * time.Millisecond)
has := sw.Peers().Size()
if has >= n {
break
}
}
}
func TestSwitchFullConnectivity(t *testing.T) {
switches := MakeConnectedSwitches(cfg, 3, initSwitchFunc, Connect2Switches)
defer func() {
for _, sw := range switches {
sw.Stop()
}
}()
for i, sw := range switches {
if sw.Peers().Size() != 2 {
t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
}
}
}
func TestSwitchAcceptRoutine(t *testing.T) {
cfg.MaxNumInboundPeers = 5
// make switch
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
require.NoError(t, err)
defer sw.Stop()
remotePeers := make([]*remotePeer, 0)
assert.Equal(t, 0, sw.Peers().Size())
// 1. check we connect up to MaxNumInboundPeers
for i := 0; i < cfg.MaxNumInboundPeers; i++ {
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
remotePeers = append(remotePeers, rp)
rp.Start()
c, err := rp.Dial(sw.NetAddress())
require.NoError(t, err)
// spawn a reading routine to prevent connection from closing
go func(c net.Conn) {
for {
one := make([]byte, 1)
_, err := c.Read(one)
if err != nil {
return
}
}
}(c)
}
time.Sleep(10 * time.Millisecond)
assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
// 2. check we close new connections if we already have MaxNumInboundPeers peers
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
conn, err := rp.Dial(sw.NetAddress())
require.NoError(t, err)
// check conn is closed
one := make([]byte, 1)
conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
_, err = conn.Read(one)
assert.Equal(t, io.EOF, err)
assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
rp.Stop()
// stop remote peers
for _, rp := range remotePeers {
rp.Stop()
}
}
type errorTransport struct {
acceptErr error
}
func (et errorTransport) NetAddress() NetAddress {
panic("not implemented")
}
func (et errorTransport) Accept(c peerConfig) (Peer, error) {
return nil, et.acceptErr
}
func (errorTransport) Dial(NetAddress, peerConfig) (Peer, error) {
panic("not implemented")
}
func (errorTransport) Cleanup(Peer) {
panic("not implemented")
}
func TestSwitchAcceptRoutineErrorCases(t *testing.T) {
sw := NewSwitch(cfg, errorTransport{ErrFilterTimeout{}})
assert.NotPanics(t, func() {
err := sw.Start()
assert.NoError(t, err)
sw.Stop()
})
sw = NewSwitch(cfg, errorTransport{ErrRejected{conn: nil, err: errors.New("filtered"), isFiltered: true}})
assert.NotPanics(t, func() {
err := sw.Start()
assert.NoError(t, err)
sw.Stop()
})
// TODO(melekes) check we remove our address from addrBook
sw = NewSwitch(cfg, errorTransport{ErrTransportClosed{}})
assert.NotPanics(t, func() {
err := sw.Start()
assert.NoError(t, err)
sw.Stop()
})
}
// mockReactor checks that InitPeer never called before RemovePeer. If that's
// not true, InitCalledBeforeRemoveFinished will return true.
type mockReactor struct {
*BaseReactor
// atomic
removePeerInProgress uint32
initCalledBeforeRemoveFinished uint32
}
func (r *mockReactor) RemovePeer(peer Peer, reason interface{}) {
atomic.StoreUint32(&r.removePeerInProgress, 1)
defer atomic.StoreUint32(&r.removePeerInProgress, 0)
time.Sleep(100 * time.Millisecond)
}
func (r *mockReactor) InitPeer(peer Peer) Peer {
if atomic.LoadUint32(&r.removePeerInProgress) == 1 {
atomic.StoreUint32(&r.initCalledBeforeRemoveFinished, 1)
}
return peer
}
func (r *mockReactor) InitCalledBeforeRemoveFinished() bool {
return atomic.LoadUint32(&r.initCalledBeforeRemoveFinished) == 1
}
// see stopAndRemovePeer
func TestSwitchInitPeerIsNotCalledBeforeRemovePeer(t *testing.T) {
// make reactor
reactor := &mockReactor{}
reactor.BaseReactor = NewBaseReactor("mockReactor", reactor)
// make switch
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", func(i int, sw *Switch) *Switch {
sw.AddReactor("mock", reactor)
return sw
})
err := sw.Start()
require.NoError(t, err)
defer sw.Stop()
// add peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
_, err = rp.Dial(sw.NetAddress())
require.NoError(t, err)
// wait till the switch adds rp to the peer set
time.Sleep(50 * time.Millisecond)
// stop peer asynchronously
go sw.StopPeerForError(sw.Peers().Get(rp.ID()), "test")
// simulate peer reconnecting to us
_, err = rp.Dial(sw.NetAddress())
require.NoError(t, err)
// wait till the switch adds rp to the peer set
time.Sleep(50 * time.Millisecond)
// make sure reactor.RemovePeer is finished before InitPeer is called
assert.False(t, reactor.InitCalledBeforeRemoveFinished())
}
func BenchmarkSwitchBroadcast(b *testing.B) {
s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
// Make bar reactors of bar channels each
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
}, false))
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
}, false))
return sw
})
defer s1.Stop()
defer s2.Stop()
// Allow time for goroutines to boot up
time.Sleep(1 * time.Second)
b.ResetTimer()
numSuccess, numFailure := 0, 0
// Send random message from foo channel to another
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
successChan := s1.Broadcast(chID, []byte("test data"))
for s := range successChan {
if s {
numSuccess++
} else {
numFailure++
}
}
}
b.Logf("success: %v, failure: %v", numSuccess, numFailure)
}
type addrBookMock struct {
addrs map[string]struct{}
ourAddrs map[string]struct{}
}
var _ AddrBook = (*addrBookMock)(nil)
func (book *addrBookMock) AddAddress(addr *NetAddress, src *NetAddress) error {
book.addrs[addr.String()] = struct{}{}
return nil
}
func (book *addrBookMock) AddOurAddress(addr *NetAddress) { book.ourAddrs[addr.String()] = struct{}{} }
func (book *addrBookMock) OurAddress(addr *NetAddress) bool {
_, ok := book.ourAddrs[addr.String()]
return ok
}
func (book *addrBookMock) MarkGood(ID) {}
func (book *addrBookMock) HasAddress(addr *NetAddress) bool {
_, ok := book.addrs[addr.String()]
return ok
}
func (book *addrBookMock) RemoveAddress(addr *NetAddress) {
delete(book.addrs, addr.String())
}
func (book *addrBookMock) Save() {}