You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

347 lines
8.9 KiB

9 years ago
9 years ago
8 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. cfg "github.com/tendermint/tendermint/config"
  14. "github.com/tendermint/tmlibs/log"
  15. )
  16. var (
  17. config *cfg.P2PConfig
  18. )
  19. func init() {
  20. config = cfg.DefaultP2PConfig()
  21. config.PexReactor = true
  22. }
// PeerMessage is one message recorded by TestReactor.Receive.
type PeerMessage struct {
	PeerKey string // result of peer.Key() for the peer the message came from
	Bytes   []byte // message bytes exactly as handed to Receive
	Counter int    // value of the reactor's msgsCounter when the message was recorded
}
// TestReactor is a reactor for tests. It remembers the peers passed to
// AddPeer/RemovePeer and, when logMessages is set, every message passed to
// Receive.
type TestReactor struct {
	BaseReactor
	mtx          sync.Mutex            // held by AddPeer, RemovePeer, Receive and getMsgs
	channels     []*ChannelDescriptor  // returned verbatim by GetChannels
	peersAdded   []Peer                // peers seen by AddPeer, in call order
	peersRemoved []Peer                // peers seen by RemovePeer, in call order
	logMessages  bool                  // when false, Receive records nothing
	msgsCounter  int                   // incremented once per recorded message
	msgsReceived map[byte][]PeerMessage // recorded messages, keyed by channel ID
}
  38. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  39. tr := &TestReactor{
  40. channels: channels,
  41. logMessages: logMessages,
  42. msgsReceived: make(map[byte][]PeerMessage),
  43. }
  44. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  45. tr.SetLogger(log.TestingLogger())
  46. return tr
  47. }
  48. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  49. return tr.channels
  50. }
  51. func (tr *TestReactor) AddPeer(peer Peer) {
  52. tr.mtx.Lock()
  53. defer tr.mtx.Unlock()
  54. tr.peersAdded = append(tr.peersAdded, peer)
  55. }
  56. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {
  57. tr.mtx.Lock()
  58. defer tr.mtx.Unlock()
  59. tr.peersRemoved = append(tr.peersRemoved, peer)
  60. }
  61. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  62. if tr.logMessages {
  63. tr.mtx.Lock()
  64. defer tr.mtx.Unlock()
  65. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  66. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key(), msgBytes, tr.msgsCounter})
  67. tr.msgsCounter++
  68. }
  69. }
  70. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  71. tr.mtx.Lock()
  72. defer tr.mtx.Unlock()
  73. return tr.msgsReceived[chID]
  74. }
  75. //-----------------------------------------------------------------------------
  76. // convenience method for creating two switches connected to each other.
  77. // XXX: note this uses net.Pipe and not a proper TCP conn
  78. func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  79. // Create two switches that will be interconnected.
  80. switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
  81. return switches[0], switches[1]
  82. }
  83. func initSwitchFunc(i int, sw *Switch) *Switch {
  84. // Make two reactors of two channels each
  85. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  86. {ID: byte(0x00), Priority: 10},
  87. {ID: byte(0x01), Priority: 10},
  88. }, true))
  89. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  90. {ID: byte(0x02), Priority: 10},
  91. {ID: byte(0x03), Priority: 10},
  92. }, true))
  93. return sw
  94. }
  95. func TestSwitches(t *testing.T) {
  96. s1, s2 := makeSwitchPair(t, initSwitchFunc)
  97. defer s1.Stop()
  98. defer s2.Stop()
  99. if s1.Peers().Size() != 1 {
  100. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  101. }
  102. if s2.Peers().Size() != 1 {
  103. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  104. }
  105. // Lets send some messages
  106. ch0Msg := "channel zero"
  107. ch1Msg := "channel foo"
  108. ch2Msg := "channel bar"
  109. s1.Broadcast(byte(0x00), ch0Msg)
  110. s1.Broadcast(byte(0x01), ch1Msg)
  111. s1.Broadcast(byte(0x02), ch2Msg)
  112. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  113. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  114. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  115. }
  116. func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  117. ticker := time.NewTicker(checkPeriod)
  118. for {
  119. select {
  120. case <-ticker.C:
  121. msgs := reactor.getMsgs(channel)
  122. if len(msgs) > 0 {
  123. if !bytes.Equal(msgs[0].Bytes, wire.BinaryBytes(msg)) {
  124. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(msg), msgs[0].Bytes)
  125. }
  126. return
  127. }
  128. case <-time.After(timeout):
  129. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  130. }
  131. }
  132. }
  133. func TestConnAddrFilter(t *testing.T) {
  134. s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  135. s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  136. defer s1.Stop()
  137. defer s2.Stop()
  138. c1, c2 := netPipe()
  139. s1.SetAddrFilter(func(addr net.Addr) error {
  140. if addr.String() == c1.RemoteAddr().String() {
  141. return fmt.Errorf("Error: pipe is blacklisted")
  142. }
  143. return nil
  144. })
  145. // connect to good peer
  146. go func() {
  147. if err := s1.addPeerWithConnection(c1); err != nil {
  148. // t.Error(err) FIXME: fails
  149. }
  150. }()
  151. go func() {
  152. if err := s2.addPeerWithConnection(c2); err != nil {
  153. // t.Error(err) FIXME: fails
  154. }
  155. }()
  156. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  157. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  158. }
  159. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  160. time.Sleep(timeout)
  161. if sw.Peers().Size() != 0 {
  162. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  163. }
  164. }
  165. func TestConnPubKeyFilter(t *testing.T) {
  166. s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  167. s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  168. defer s1.Stop()
  169. defer s2.Stop()
  170. c1, c2 := netPipe()
  171. // set pubkey filter
  172. s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  173. if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
  174. return fmt.Errorf("Error: pipe is blacklisted")
  175. }
  176. return nil
  177. })
  178. // connect to good peer
  179. go func() {
  180. if err := s1.addPeerWithConnection(c1); err != nil {
  181. // t.Error(err) FIXME: fails
  182. }
  183. }()
  184. go func() {
  185. if err := s2.addPeerWithConnection(c2); err != nil {
  186. // t.Error(err) FIXME: fails
  187. }
  188. }()
  189. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  190. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  191. }
  192. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  193. assert, require := assert.New(t), require.New(t)
  194. sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  195. _, err := sw.Start()
  196. if err != nil {
  197. t.Error(err)
  198. }
  199. defer sw.Stop()
  200. // simulate remote peer
  201. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  202. rp.Start()
  203. defer rp.Stop()
  204. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
  205. require.Nil(err)
  206. err = sw.addPeer(peer)
  207. require.Nil(err)
  208. // simulate failure by closing connection
  209. peer.CloseConn()
  210. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  211. assert.False(peer.IsRunning())
  212. }
  213. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  214. assert, require := assert.New(t), require.New(t)
  215. sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  216. _, err := sw.Start()
  217. if err != nil {
  218. t.Error(err)
  219. }
  220. defer sw.Stop()
  221. // simulate remote peer
  222. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  223. rp.Start()
  224. defer rp.Stop()
  225. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
  226. peer.makePersistent()
  227. require.Nil(err)
  228. err = sw.addPeer(peer)
  229. require.Nil(err)
  230. // simulate failure by closing connection
  231. peer.CloseConn()
  232. // TODO: actually detect the disconnection and wait for reconnect
  233. npeers := sw.Peers().Size()
  234. for i := 0; i < 20; i++ {
  235. time.Sleep(100 * time.Millisecond)
  236. npeers = sw.Peers().Size()
  237. if npeers > 0 {
  238. break
  239. }
  240. }
  241. assert.NotZero(npeers)
  242. assert.False(peer.IsRunning())
  243. }
  244. func TestSwitchFullConnectivity(t *testing.T) {
  245. switches := MakeConnectedSwitches(config, 3, initSwitchFunc, Connect2Switches)
  246. defer func() {
  247. for _, sw := range switches {
  248. sw.Stop()
  249. }
  250. }()
  251. for i, sw := range switches {
  252. if sw.Peers().Size() != 2 {
  253. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  254. }
  255. }
  256. }
  257. func BenchmarkSwitches(b *testing.B) {
  258. b.StopTimer()
  259. s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
  260. // Make bar reactors of bar channels each
  261. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  262. {ID: byte(0x00), Priority: 10},
  263. {ID: byte(0x01), Priority: 10},
  264. }, false))
  265. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  266. {ID: byte(0x02), Priority: 10},
  267. {ID: byte(0x03), Priority: 10},
  268. }, false))
  269. return sw
  270. })
  271. defer s1.Stop()
  272. defer s2.Stop()
  273. // Allow time for goroutines to boot up
  274. time.Sleep(1 * time.Second)
  275. b.StartTimer()
  276. numSuccess, numFailure := 0, 0
  277. // Send random message from foo channel to another
  278. for i := 0; i < b.N; i++ {
  279. chID := byte(i % 4)
  280. successChan := s1.Broadcast(chID, "test data")
  281. for s := range successChan {
  282. if s {
  283. numSuccess++
  284. } else {
  285. numFailure++
  286. }
  287. }
  288. }
  289. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  290. // Allow everything to flush before stopping switches & closing connections.
  291. b.StopTimer()
  292. }