You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
8.9 KiB

9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
9 years ago
8 years ago
9 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. "github.com/tendermint/tmlibs/log"
  14. cfg "github.com/tendermint/tendermint/config"
  15. "github.com/tendermint/tendermint/p2p/conn"
  16. )
  17. var (
  18. config *cfg.P2PConfig
  19. )
  20. func init() {
  21. config = cfg.DefaultP2PConfig()
  22. config.PexReactor = true
  23. }
// PeerMessage is one entry in a TestReactor's message log.
type PeerMessage struct {
	// PeerID is the ID of the peer the message was received from.
	PeerID ID
	// Bytes is the raw message payload as handed to Receive.
	Bytes []byte
	// Counter is the reactor's message counter at the time of arrival.
	Counter int
}
  29. type TestReactor struct {
  30. BaseReactor
  31. mtx sync.Mutex
  32. channels []*conn.ChannelDescriptor
  33. peersAdded []Peer
  34. peersRemoved []Peer
  35. logMessages bool
  36. msgsCounter int
  37. msgsReceived map[byte][]PeerMessage
  38. }
  39. func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
  40. tr := &TestReactor{
  41. channels: channels,
  42. logMessages: logMessages,
  43. msgsReceived: make(map[byte][]PeerMessage),
  44. }
  45. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  46. tr.SetLogger(log.TestingLogger())
  47. return tr
  48. }
// GetChannels returns the channel descriptors the reactor was constructed
// with.
func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
	return tr.channels
}
  52. func (tr *TestReactor) AddPeer(peer Peer) {
  53. tr.mtx.Lock()
  54. defer tr.mtx.Unlock()
  55. tr.peersAdded = append(tr.peersAdded, peer)
  56. }
  57. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {
  58. tr.mtx.Lock()
  59. defer tr.mtx.Unlock()
  60. tr.peersRemoved = append(tr.peersRemoved, peer)
  61. }
  62. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  63. if tr.logMessages {
  64. tr.mtx.Lock()
  65. defer tr.mtx.Unlock()
  66. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  67. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
  68. tr.msgsCounter++
  69. }
  70. }
  71. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  72. tr.mtx.Lock()
  73. defer tr.mtx.Unlock()
  74. return tr.msgsReceived[chID]
  75. }
  76. //-----------------------------------------------------------------------------
  77. // convenience method for creating two switches connected to each other.
  78. // XXX: note this uses net.Pipe and not a proper TCP conn
  79. func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  80. // Create two switches that will be interconnected.
  81. switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
  82. return switches[0], switches[1]
  83. }
  84. func initSwitchFunc(i int, sw *Switch) *Switch {
  85. // Make two reactors of two channels each
  86. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  87. {ID: byte(0x00), Priority: 10},
  88. {ID: byte(0x01), Priority: 10},
  89. }, true))
  90. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  91. {ID: byte(0x02), Priority: 10},
  92. {ID: byte(0x03), Priority: 10},
  93. }, true))
  94. return sw
  95. }
  96. func TestSwitches(t *testing.T) {
  97. s1, s2 := MakeSwitchPair(t, initSwitchFunc)
  98. defer s1.Stop()
  99. defer s2.Stop()
  100. if s1.Peers().Size() != 1 {
  101. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  102. }
  103. if s2.Peers().Size() != 1 {
  104. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  105. }
  106. // Lets send some messages
  107. ch0Msg := "channel zero"
  108. ch1Msg := "channel foo"
  109. ch2Msg := "channel bar"
  110. s1.Broadcast(byte(0x00), ch0Msg)
  111. s1.Broadcast(byte(0x01), ch1Msg)
  112. s1.Broadcast(byte(0x02), ch2Msg)
  113. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  114. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  115. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  116. }
  117. func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  118. ticker := time.NewTicker(checkPeriod)
  119. for {
  120. select {
  121. case <-ticker.C:
  122. msgs := reactor.getMsgs(channel)
  123. if len(msgs) > 0 {
  124. if !bytes.Equal(msgs[0].Bytes, wire.BinaryBytes(msg)) {
  125. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(msg), msgs[0].Bytes)
  126. }
  127. return
  128. }
  129. case <-time.After(timeout):
  130. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  131. }
  132. }
  133. }
  134. func TestConnAddrFilter(t *testing.T) {
  135. s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  136. s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  137. defer s1.Stop()
  138. defer s2.Stop()
  139. c1, c2 := conn.NetPipe()
  140. s1.SetAddrFilter(func(addr net.Addr) error {
  141. if addr.String() == c1.RemoteAddr().String() {
  142. return fmt.Errorf("Error: pipe is blacklisted")
  143. }
  144. return nil
  145. })
  146. // connect to good peer
  147. go func() {
  148. err := s1.addPeerWithConnection(c1)
  149. assert.NotNil(t, err, "expected err")
  150. }()
  151. go func() {
  152. err := s2.addPeerWithConnection(c2)
  153. assert.NotNil(t, err, "expected err")
  154. }()
  155. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  156. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  157. }
  158. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  159. time.Sleep(timeout)
  160. if sw.Peers().Size() != 0 {
  161. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  162. }
  163. }
  164. func TestConnPubKeyFilter(t *testing.T) {
  165. s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  166. s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  167. defer s1.Stop()
  168. defer s2.Stop()
  169. c1, c2 := conn.NetPipe()
  170. // set pubkey filter
  171. s1.SetPubKeyFilter(func(pubkey crypto.PubKey) error {
  172. if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
  173. return fmt.Errorf("Error: pipe is blacklisted")
  174. }
  175. return nil
  176. })
  177. // connect to good peer
  178. go func() {
  179. err := s1.addPeerWithConnection(c1)
  180. assert.NotNil(t, err, "expected error")
  181. }()
  182. go func() {
  183. err := s2.addPeerWithConnection(c2)
  184. assert.NotNil(t, err, "expected error")
  185. }()
  186. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  187. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  188. }
  189. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  190. assert, require := assert.New(t), require.New(t)
  191. sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  192. err := sw.Start()
  193. if err != nil {
  194. t.Error(err)
  195. }
  196. defer sw.Stop()
  197. // simulate remote peer
  198. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519().Wrap(), Config: DefaultPeerConfig()}
  199. rp.Start()
  200. defer rp.Stop()
  201. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), false)
  202. require.Nil(err)
  203. err = sw.addPeer(peer)
  204. require.Nil(err)
  205. // simulate failure by closing connection
  206. peer.CloseConn()
  207. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  208. assert.False(peer.IsRunning())
  209. }
  210. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  211. assert, require := assert.New(t), require.New(t)
  212. sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  213. err := sw.Start()
  214. if err != nil {
  215. t.Error(err)
  216. }
  217. defer sw.Stop()
  218. // simulate remote peer
  219. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519().Wrap(), Config: DefaultPeerConfig()}
  220. rp.Start()
  221. defer rp.Stop()
  222. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, DefaultPeerConfig(), true)
  223. require.Nil(err)
  224. err = sw.addPeer(peer)
  225. require.Nil(err)
  226. // simulate failure by closing connection
  227. peer.CloseConn()
  228. // TODO: remove sleep, detect the disconnection, wait for reconnect
  229. npeers := sw.Peers().Size()
  230. for i := 0; i < 20; i++ {
  231. time.Sleep(250 * time.Millisecond)
  232. npeers = sw.Peers().Size()
  233. if npeers > 0 {
  234. break
  235. }
  236. }
  237. assert.NotZero(npeers)
  238. assert.False(peer.IsRunning())
  239. }
  240. func TestSwitchFullConnectivity(t *testing.T) {
  241. switches := MakeConnectedSwitches(config, 3, initSwitchFunc, Connect2Switches)
  242. defer func() {
  243. for _, sw := range switches {
  244. sw.Stop()
  245. }
  246. }()
  247. for i, sw := range switches {
  248. if sw.Peers().Size() != 2 {
  249. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  250. }
  251. }
  252. }
  253. func BenchmarkSwitches(b *testing.B) {
  254. b.StopTimer()
  255. s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
  256. // Make bar reactors of bar channels each
  257. sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
  258. {ID: byte(0x00), Priority: 10},
  259. {ID: byte(0x01), Priority: 10},
  260. }, false))
  261. sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
  262. {ID: byte(0x02), Priority: 10},
  263. {ID: byte(0x03), Priority: 10},
  264. }, false))
  265. return sw
  266. })
  267. defer s1.Stop()
  268. defer s2.Stop()
  269. // Allow time for goroutines to boot up
  270. time.Sleep(1 * time.Second)
  271. b.StartTimer()
  272. numSuccess, numFailure := 0, 0
  273. // Send random message from foo channel to another
  274. for i := 0; i < b.N; i++ {
  275. chID := byte(i % 4)
  276. successChan := s1.Broadcast(chID, "test data")
  277. for s := range successChan {
  278. if s {
  279. numSuccess++
  280. } else {
  281. numFailure++
  282. }
  283. }
  284. }
  285. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  286. // Allow everything to flush before stopping switches & closing connections.
  287. b.StopTimer()
  288. }