You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

333 lines
8.7 KiB

9 years ago
9 years ago
8 years ago
8 years ago
9 years ago
9 years ago
8 years ago
9 years ago
8 years ago
9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
8 years ago
7 years ago
8 years ago
8 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
8 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. cfg "github.com/tendermint/tendermint/config"
  14. "github.com/tendermint/tmlibs/log"
  15. )
  16. var (
  17. config *cfg.P2PConfig
  18. )
  19. func init() {
  20. config = cfg.DefaultP2PConfig()
  21. config.PexReactor = true
  22. }
  // PeerMessage is a single message recorded by TestReactor.Receive.
  //
  // The field order matters: Receive may build values positionally.
  type PeerMessage struct {
  	PeerKey string // result of Key() on the peer that delivered the message
  	Bytes   []byte // message bytes exactly as passed to Receive
  	Counter int    // value of the reactor's message counter when stored
  }
  28. type TestReactor struct {
  29. BaseReactor
  30. mtx sync.Mutex
  31. channels []*ChannelDescriptor
  32. peersAdded []Peer
  33. peersRemoved []Peer
  34. logMessages bool
  35. msgsCounter int
  36. msgsReceived map[byte][]PeerMessage
  37. }
  38. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  39. tr := &TestReactor{
  40. channels: channels,
  41. logMessages: logMessages,
  42. msgsReceived: make(map[byte][]PeerMessage),
  43. }
  44. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  45. tr.SetLogger(log.TestingLogger())
  46. return tr
  47. }
  48. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  49. return tr.channels
  50. }
  51. func (tr *TestReactor) AddPeer(peer Peer) {
  52. tr.mtx.Lock()
  53. defer tr.mtx.Unlock()
  54. tr.peersAdded = append(tr.peersAdded, peer)
  55. }
  56. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {
  57. tr.mtx.Lock()
  58. defer tr.mtx.Unlock()
  59. tr.peersRemoved = append(tr.peersRemoved, peer)
  60. }
  61. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  62. if tr.logMessages {
  63. tr.mtx.Lock()
  64. defer tr.mtx.Unlock()
  65. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  66. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key(), msgBytes, tr.msgsCounter})
  67. tr.msgsCounter++
  68. }
  69. }
  70. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  71. tr.mtx.Lock()
  72. defer tr.mtx.Unlock()
  73. return tr.msgsReceived[chID]
  74. }
  75. //-----------------------------------------------------------------------------
  76. // convenience method for creating two switches connected to each other.
  77. // XXX: note this uses net.Pipe and not a proper TCP conn
  78. func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  79. // Create two switches that will be interconnected.
  80. switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
  81. return switches[0], switches[1]
  82. }
  83. func initSwitchFunc(i int, sw *Switch) *Switch {
  84. // Make two reactors of two channels each
  85. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  86. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  87. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  88. }, true))
  89. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  90. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  91. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  92. }, true))
  93. return sw
  94. }
  95. func TestSwitches(t *testing.T) {
  96. s1, s2 := makeSwitchPair(t, initSwitchFunc)
  97. defer s1.Stop()
  98. defer s2.Stop()
  99. if s1.Peers().Size() != 1 {
  100. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  101. }
  102. if s2.Peers().Size() != 1 {
  103. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  104. }
  105. // Lets send some messages
  106. ch0Msg := "channel zero"
  107. ch1Msg := "channel foo"
  108. ch2Msg := "channel bar"
  109. s1.Broadcast(byte(0x00), ch0Msg)
  110. s1.Broadcast(byte(0x01), ch1Msg)
  111. s1.Broadcast(byte(0x02), ch2Msg)
  112. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  113. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  114. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  115. }
  116. func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  117. ticker := time.NewTicker(checkPeriod)
  118. for {
  119. select {
  120. case <-ticker.C:
  121. msgs := reactor.getMsgs(channel)
  122. if len(msgs) > 0 {
  123. if !bytes.Equal(msgs[0].Bytes, wire.BinaryBytes(msg)) {
  124. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(msg), msgs[0].Bytes)
  125. }
  126. return
  127. }
  128. case <-time.After(timeout):
  129. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  130. }
  131. }
  132. }
  133. func TestConnAddrFilter(t *testing.T) {
  134. s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  135. s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  136. defer s1.Stop()
  137. defer s2.Stop()
  138. c1, c2 := netPipe()
  139. s1.SetAddrFilter(func(addr net.Addr) error {
  140. if addr.String() == c1.RemoteAddr().String() {
  141. return fmt.Errorf("Error: pipe is blacklisted")
  142. }
  143. return nil
  144. })
  145. // connect to good peer
  146. go func() {
  147. s1.addPeerWithConnection(c1)
  148. }()
  149. go func() {
  150. s2.addPeerWithConnection(c2)
  151. }()
  152. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  153. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  154. }
  155. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  156. time.Sleep(timeout)
  157. if sw.Peers().Size() != 0 {
  158. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  159. }
  160. }
  161. func TestConnPubKeyFilter(t *testing.T) {
  162. s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  163. s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  164. defer s1.Stop()
  165. defer s2.Stop()
  166. c1, c2 := netPipe()
  167. // set pubkey filter
  168. s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  169. if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
  170. return fmt.Errorf("Error: pipe is blacklisted")
  171. }
  172. return nil
  173. })
  174. // connect to good peer
  175. go func() {
  176. s1.addPeerWithConnection(c1)
  177. }()
  178. go func() {
  179. s2.addPeerWithConnection(c2)
  180. }()
  181. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  182. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  183. }
  184. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  185. assert, require := assert.New(t), require.New(t)
  186. sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  187. sw.Start()
  188. defer sw.Stop()
  189. // simulate remote peer
  190. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  191. rp.Start()
  192. defer rp.Stop()
  193. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
  194. require.Nil(err)
  195. err = sw.addPeer(peer)
  196. require.Nil(err)
  197. // simulate failure by closing connection
  198. peer.CloseConn()
  199. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  200. assert.False(peer.IsRunning())
  201. }
  202. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  203. assert, require := assert.New(t), require.New(t)
  204. sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  205. sw.Start()
  206. defer sw.Stop()
  207. // simulate remote peer
  208. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  209. rp.Start()
  210. defer rp.Stop()
  211. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
  212. peer.makePersistent()
  213. require.Nil(err)
  214. err = sw.addPeer(peer)
  215. require.Nil(err)
  216. // simulate failure by closing connection
  217. peer.CloseConn()
  218. // TODO: actually detect the disconnection and wait for reconnect
  219. npeers := sw.Peers().Size()
  220. for i := 0; i < 20; i++ {
  221. time.Sleep(100 * time.Millisecond)
  222. npeers = sw.Peers().Size()
  223. if npeers > 0 {
  224. break
  225. }
  226. }
  227. assert.NotZero(npeers)
  228. assert.False(peer.IsRunning())
  229. }
  230. func TestSwitchFullConnectivity(t *testing.T) {
  231. switches := MakeConnectedSwitches(config, 3, initSwitchFunc, Connect2Switches)
  232. defer func() {
  233. for _, sw := range switches {
  234. sw.Stop()
  235. }
  236. }()
  237. for i, sw := range switches {
  238. if sw.Peers().Size() != 2 {
  239. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  240. }
  241. }
  242. }
  243. func BenchmarkSwitches(b *testing.B) {
  244. b.StopTimer()
  245. s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
  246. // Make bar reactors of bar channels each
  247. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  248. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  249. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  250. }, false))
  251. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  252. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  253. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  254. }, false))
  255. return sw
  256. })
  257. defer s1.Stop()
  258. defer s2.Stop()
  259. // Allow time for goroutines to boot up
  260. time.Sleep(1 * time.Second)
  261. b.StartTimer()
  262. numSuccess, numFailure := 0, 0
  263. // Send random message from foo channel to another
  264. for i := 0; i < b.N; i++ {
  265. chID := byte(i % 4)
  266. successChan := s1.Broadcast(chID, "test data")
  267. for s := range successChan {
  268. if s {
  269. numSuccess++
  270. } else {
  271. numFailure++
  272. }
  273. }
  274. }
  275. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  276. // Allow everything to flush before stopping switches & closing connections.
  277. b.StopTimer()
  278. }