You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

344 lines
8.8 KiB

9 years ago
9 years ago
7 years ago
9 years ago
8 years ago
7 years ago
8 years ago
7 years ago
8 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. "github.com/tendermint/tmlibs/log"
  14. cfg "github.com/tendermint/tendermint/config"
  15. )
  16. var (
  17. config *cfg.P2PConfig
  18. )
  19. func init() {
  20. config = cfg.DefaultP2PConfig()
  21. config.PexReactor = true
  22. }
  23. type PeerMessage struct {
  24. PeerKey string
  25. Bytes []byte
  26. Counter int
  27. }
  28. type TestReactor struct {
  29. BaseReactor
  30. mtx sync.Mutex
  31. channels []*ChannelDescriptor
  32. peersAdded []Peer
  33. peersRemoved []Peer
  34. logMessages bool
  35. msgsCounter int
  36. msgsReceived map[byte][]PeerMessage
  37. }
  38. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  39. tr := &TestReactor{
  40. channels: channels,
  41. logMessages: logMessages,
  42. msgsReceived: make(map[byte][]PeerMessage),
  43. }
  44. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  45. tr.SetLogger(log.TestingLogger())
  46. return tr
  47. }
  48. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  49. return tr.channels
  50. }
  51. func (tr *TestReactor) AddPeer(peer Peer) {
  52. tr.mtx.Lock()
  53. defer tr.mtx.Unlock()
  54. tr.peersAdded = append(tr.peersAdded, peer)
  55. }
  56. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {
  57. tr.mtx.Lock()
  58. defer tr.mtx.Unlock()
  59. tr.peersRemoved = append(tr.peersRemoved, peer)
  60. }
  61. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  62. if tr.logMessages {
  63. tr.mtx.Lock()
  64. defer tr.mtx.Unlock()
  65. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  66. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key(), msgBytes, tr.msgsCounter})
  67. tr.msgsCounter++
  68. }
  69. }
  70. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  71. tr.mtx.Lock()
  72. defer tr.mtx.Unlock()
  73. return tr.msgsReceived[chID]
  74. }
  75. //-----------------------------------------------------------------------------
  76. // convenience method for creating two switches connected to each other.
  77. // XXX: note this uses net.Pipe and not a proper TCP conn
  78. func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  79. // Create two switches that will be interconnected.
  80. switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
  81. return switches[0], switches[1]
  82. }
  83. func initSwitchFunc(i int, sw *Switch) *Switch {
  84. // Make two reactors of two channels each
  85. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  86. {ID: byte(0x00), Priority: 10},
  87. {ID: byte(0x01), Priority: 10},
  88. }, true))
  89. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  90. {ID: byte(0x02), Priority: 10},
  91. {ID: byte(0x03), Priority: 10},
  92. }, true))
  93. return sw
  94. }
  95. func TestSwitches(t *testing.T) {
  96. s1, s2 := makeSwitchPair(t, initSwitchFunc)
  97. defer s1.Stop()
  98. defer s2.Stop()
  99. if s1.Peers().Size() != 1 {
  100. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  101. }
  102. if s2.Peers().Size() != 1 {
  103. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  104. }
  105. // Lets send some messages
  106. ch0Msg := "channel zero"
  107. ch1Msg := "channel foo"
  108. ch2Msg := "channel bar"
  109. s1.Broadcast(byte(0x00), ch0Msg)
  110. s1.Broadcast(byte(0x01), ch1Msg)
  111. s1.Broadcast(byte(0x02), ch2Msg)
  112. assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  113. assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  114. assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
  115. }
  116. func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
  117. ticker := time.NewTicker(checkPeriod)
  118. for {
  119. select {
  120. case <-ticker.C:
  121. msgs := reactor.getMsgs(channel)
  122. if len(msgs) > 0 {
  123. if !bytes.Equal(msgs[0].Bytes, wire.BinaryBytes(msg)) {
  124. t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(msg), msgs[0].Bytes)
  125. }
  126. return
  127. }
  128. case <-time.After(timeout):
  129. t.Fatalf("Expected to have received 1 message in channel #%v, got zero", channel)
  130. }
  131. }
  132. }
  133. func TestConnAddrFilter(t *testing.T) {
  134. s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  135. s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  136. defer s1.Stop()
  137. defer s2.Stop()
  138. c1, c2 := netPipe()
  139. s1.SetAddrFilter(func(addr net.Addr) error {
  140. if addr.String() == c1.RemoteAddr().String() {
  141. return fmt.Errorf("Error: pipe is blacklisted")
  142. }
  143. return nil
  144. })
  145. // connect to good peer
  146. go func() {
  147. err := s1.addPeerWithConnection(c1)
  148. assert.NotNil(t, err, "expected err")
  149. }()
  150. go func() {
  151. err := s2.addPeerWithConnection(c2)
  152. assert.NotNil(t, err, "expected err")
  153. }()
  154. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  155. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  156. }
  157. func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
  158. time.Sleep(timeout)
  159. if sw.Peers().Size() != 0 {
  160. t.Fatalf("Expected %v to not connect to some peers, got %d", sw, sw.Peers().Size())
  161. }
  162. }
  163. func TestConnPubKeyFilter(t *testing.T) {
  164. s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  165. s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  166. defer s1.Stop()
  167. defer s2.Stop()
  168. c1, c2 := netPipe()
  169. // set pubkey filter
  170. s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  171. if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
  172. return fmt.Errorf("Error: pipe is blacklisted")
  173. }
  174. return nil
  175. })
  176. // connect to good peer
  177. go func() {
  178. err := s1.addPeerWithConnection(c1)
  179. assert.NotNil(t, err, "expected error")
  180. }()
  181. go func() {
  182. err := s2.addPeerWithConnection(c2)
  183. assert.NotNil(t, err, "expected error")
  184. }()
  185. assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
  186. assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
  187. }
  188. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  189. assert, require := assert.New(t), require.New(t)
  190. sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  191. err := sw.Start()
  192. if err != nil {
  193. t.Error(err)
  194. }
  195. defer sw.Stop()
  196. // simulate remote peer
  197. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  198. rp.Start()
  199. defer rp.Stop()
  200. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
  201. require.Nil(err)
  202. err = sw.addPeer(peer)
  203. require.Nil(err)
  204. // simulate failure by closing connection
  205. peer.CloseConn()
  206. assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
  207. assert.False(peer.IsRunning())
  208. }
  209. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  210. assert, require := assert.New(t), require.New(t)
  211. sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  212. err := sw.Start()
  213. if err != nil {
  214. t.Error(err)
  215. }
  216. defer sw.Stop()
  217. // simulate remote peer
  218. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  219. rp.Start()
  220. defer rp.Stop()
  221. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
  222. peer.makePersistent()
  223. require.Nil(err)
  224. err = sw.addPeer(peer)
  225. require.Nil(err)
  226. // simulate failure by closing connection
  227. peer.CloseConn()
  228. // TODO: remove sleep, detect the disconnection, wait for reconnect
  229. npeers := sw.Peers().Size()
  230. for i := 0; i < 20; i++ {
  231. time.Sleep(250 * time.Millisecond)
  232. npeers = sw.Peers().Size()
  233. if npeers > 0 {
  234. break
  235. }
  236. }
  237. assert.NotZero(npeers)
  238. assert.False(peer.IsRunning())
  239. }
  240. func TestSwitchFullConnectivity(t *testing.T) {
  241. switches := MakeConnectedSwitches(config, 3, initSwitchFunc, Connect2Switches)
  242. defer func() {
  243. for _, sw := range switches {
  244. sw.Stop()
  245. }
  246. }()
  247. for i, sw := range switches {
  248. if sw.Peers().Size() != 2 {
  249. t.Fatalf("Expected each switch to be connected to 2 other, but %d switch only connected to %d", sw.Peers().Size(), i)
  250. }
  251. }
  252. }
  253. func BenchmarkSwitches(b *testing.B) {
  254. b.StopTimer()
  255. s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
  256. // Make bar reactors of bar channels each
  257. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  258. {ID: byte(0x00), Priority: 10},
  259. {ID: byte(0x01), Priority: 10},
  260. }, false))
  261. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  262. {ID: byte(0x02), Priority: 10},
  263. {ID: byte(0x03), Priority: 10},
  264. }, false))
  265. return sw
  266. })
  267. defer s1.Stop()
  268. defer s2.Stop()
  269. // Allow time for goroutines to boot up
  270. time.Sleep(1 * time.Second)
  271. b.StartTimer()
  272. numSuccess, numFailure := 0, 0
  273. // Send random message from foo channel to another
  274. for i := 0; i < b.N; i++ {
  275. chID := byte(i % 4)
  276. successChan := s1.Broadcast(chID, "test data")
  277. for s := range successChan {
  278. if s {
  279. numSuccess++
  280. } else {
  281. numFailure++
  282. }
  283. }
  284. }
  285. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  286. // Allow everything to flush before stopping switches & closing connections.
  287. b.StopTimer()
  288. }