You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

331 lines
8.6 KiB

9 years ago
9 years ago
7 years ago
7 years ago
9 years ago
8 years ago
7 years ago
8 years ago
7 years ago
8 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
7 years ago
9 years ago
9 years ago
9 years ago
9 years ago
7 years ago
9 years ago
  1. package p2p
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. crypto "github.com/tendermint/go-crypto"
  12. wire "github.com/tendermint/go-wire"
  13. cfg "github.com/tendermint/tendermint/config"
  14. "github.com/tendermint/tmlibs/log"
  15. )
  16. var (
  17. config *cfg.P2PConfig
  18. )
  19. func init() {
  20. config = cfg.DefaultP2PConfig()
  21. config.PexReactor = true
  22. }
  23. type PeerMessage struct {
  24. PeerKey string
  25. Bytes []byte
  26. Counter int
  27. }
  28. type TestReactor struct {
  29. BaseReactor
  30. mtx sync.Mutex
  31. channels []*ChannelDescriptor
  32. peersAdded []Peer
  33. peersRemoved []Peer
  34. logMessages bool
  35. msgsCounter int
  36. msgsReceived map[byte][]PeerMessage
  37. }
  38. func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
  39. tr := &TestReactor{
  40. channels: channels,
  41. logMessages: logMessages,
  42. msgsReceived: make(map[byte][]PeerMessage),
  43. }
  44. tr.BaseReactor = *NewBaseReactor("TestReactor", tr)
  45. tr.SetLogger(log.TestingLogger())
  46. return tr
  47. }
  48. func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
  49. return tr.channels
  50. }
  51. func (tr *TestReactor) AddPeer(peer Peer) {
  52. tr.mtx.Lock()
  53. defer tr.mtx.Unlock()
  54. tr.peersAdded = append(tr.peersAdded, peer)
  55. }
  56. func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {
  57. tr.mtx.Lock()
  58. defer tr.mtx.Unlock()
  59. tr.peersRemoved = append(tr.peersRemoved, peer)
  60. }
  61. func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
  62. if tr.logMessages {
  63. tr.mtx.Lock()
  64. defer tr.mtx.Unlock()
  65. //fmt.Printf("Received: %X, %X\n", chID, msgBytes)
  66. tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.Key(), msgBytes, tr.msgsCounter})
  67. tr.msgsCounter++
  68. }
  69. }
  70. func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
  71. tr.mtx.Lock()
  72. defer tr.mtx.Unlock()
  73. return tr.msgsReceived[chID]
  74. }
  75. //-----------------------------------------------------------------------------
  76. // convenience method for creating two switches connected to each other.
  77. // XXX: note this uses net.Pipe and not a proper TCP conn
  78. func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
  79. // Create two switches that will be interconnected.
  80. switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
  81. return switches[0], switches[1]
  82. }
  83. func initSwitchFunc(i int, sw *Switch) *Switch {
  84. // Make two reactors of two channels each
  85. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  86. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  87. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  88. }, true))
  89. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  90. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  91. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  92. }, true))
  93. return sw
  94. }
  95. func TestSwitches(t *testing.T) {
  96. s1, s2 := makeSwitchPair(t, initSwitchFunc)
  97. defer s1.Stop()
  98. defer s2.Stop()
  99. if s1.Peers().Size() != 1 {
  100. t.Errorf("Expected exactly 1 peer in s1, got %v", s1.Peers().Size())
  101. }
  102. if s2.Peers().Size() != 1 {
  103. t.Errorf("Expected exactly 1 peer in s2, got %v", s2.Peers().Size())
  104. }
  105. // Lets send some messages
  106. ch0Msg := "channel zero"
  107. ch1Msg := "channel foo"
  108. ch2Msg := "channel bar"
  109. s1.Broadcast(byte(0x00), ch0Msg)
  110. s1.Broadcast(byte(0x01), ch1Msg)
  111. s1.Broadcast(byte(0x02), ch2Msg)
  112. // Wait for things to settle...
  113. time.Sleep(5000 * time.Millisecond)
  114. // Check message on ch0
  115. ch0Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x00))
  116. if len(ch0Msgs) != 1 {
  117. t.Errorf("Expected to have received 1 message in ch0")
  118. }
  119. if !bytes.Equal(ch0Msgs[0].Bytes, wire.BinaryBytes(ch0Msg)) {
  120. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
  121. }
  122. // Check message on ch1
  123. ch1Msgs := s2.Reactor("foo").(*TestReactor).getMsgs(byte(0x01))
  124. if len(ch1Msgs) != 1 {
  125. t.Errorf("Expected to have received 1 message in ch1")
  126. }
  127. if !bytes.Equal(ch1Msgs[0].Bytes, wire.BinaryBytes(ch1Msg)) {
  128. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch1Msg), ch1Msgs[0].Bytes)
  129. }
  130. // Check message on ch2
  131. ch2Msgs := s2.Reactor("bar").(*TestReactor).getMsgs(byte(0x02))
  132. if len(ch2Msgs) != 1 {
  133. t.Errorf("Expected to have received 1 message in ch2")
  134. }
  135. if !bytes.Equal(ch2Msgs[0].Bytes, wire.BinaryBytes(ch2Msg)) {
  136. t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", wire.BinaryBytes(ch2Msg), ch2Msgs[0].Bytes)
  137. }
  138. }
  139. func TestConnAddrFilter(t *testing.T) {
  140. s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  141. s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  142. c1, c2 := net.Pipe()
  143. s1.SetAddrFilter(func(addr net.Addr) error {
  144. if addr.String() == c1.RemoteAddr().String() {
  145. return fmt.Errorf("Error: pipe is blacklisted")
  146. }
  147. return nil
  148. })
  149. // connect to good peer
  150. go func() {
  151. s1.addPeerWithConnection(c1)
  152. }()
  153. go func() {
  154. s2.addPeerWithConnection(c2)
  155. }()
  156. // Wait for things to happen, peers to get added...
  157. time.Sleep(100 * time.Millisecond * time.Duration(4))
  158. defer s1.Stop()
  159. defer s2.Stop()
  160. if s1.Peers().Size() != 0 {
  161. t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
  162. }
  163. if s2.Peers().Size() != 0 {
  164. t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
  165. }
  166. }
  167. func TestConnPubKeyFilter(t *testing.T) {
  168. s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  169. s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  170. c1, c2 := net.Pipe()
  171. // set pubkey filter
  172. s1.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
  173. if bytes.Equal(pubkey.Bytes(), s2.nodeInfo.PubKey.Bytes()) {
  174. return fmt.Errorf("Error: pipe is blacklisted")
  175. }
  176. return nil
  177. })
  178. // connect to good peer
  179. go func() {
  180. s1.addPeerWithConnection(c1)
  181. }()
  182. go func() {
  183. s2.addPeerWithConnection(c2)
  184. }()
  185. // Wait for things to happen, peers to get added...
  186. time.Sleep(100 * time.Millisecond * time.Duration(4))
  187. defer s1.Stop()
  188. defer s2.Stop()
  189. if s1.Peers().Size() != 0 {
  190. t.Errorf("Expected s1 not to connect to peers, got %d", s1.Peers().Size())
  191. }
  192. if s2.Peers().Size() != 0 {
  193. t.Errorf("Expected s2 not to connect to peers, got %d", s2.Peers().Size())
  194. }
  195. }
  196. func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
  197. assert, require := assert.New(t), require.New(t)
  198. sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  199. sw.Start()
  200. defer sw.Stop()
  201. // simulate remote peer
  202. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  203. rp.Start()
  204. defer rp.Stop()
  205. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
  206. require.Nil(err)
  207. err = sw.addPeer(peer)
  208. require.Nil(err)
  209. // simulate failure by closing connection
  210. peer.CloseConn()
  211. time.Sleep(100 * time.Millisecond)
  212. assert.Zero(sw.Peers().Size())
  213. assert.False(peer.IsRunning())
  214. }
  215. func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
  216. assert, require := assert.New(t), require.New(t)
  217. sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
  218. sw.Start()
  219. defer sw.Stop()
  220. // simulate remote peer
  221. rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: DefaultPeerConfig()}
  222. rp.Start()
  223. defer rp.Stop()
  224. peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
  225. peer.makePersistent()
  226. require.Nil(err)
  227. err = sw.addPeer(peer)
  228. require.Nil(err)
  229. // simulate failure by closing connection
  230. peer.CloseConn()
  231. // TODO: actually detect the disconnection and wait for reconnect
  232. time.Sleep(100 * time.Millisecond)
  233. assert.NotZero(sw.Peers().Size())
  234. assert.False(peer.IsRunning())
  235. }
  236. func BenchmarkSwitches(b *testing.B) {
  237. b.StopTimer()
  238. s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
  239. // Make bar reactors of bar channels each
  240. sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
  241. &ChannelDescriptor{ID: byte(0x00), Priority: 10},
  242. &ChannelDescriptor{ID: byte(0x01), Priority: 10},
  243. }, false))
  244. sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
  245. &ChannelDescriptor{ID: byte(0x02), Priority: 10},
  246. &ChannelDescriptor{ID: byte(0x03), Priority: 10},
  247. }, false))
  248. return sw
  249. })
  250. defer s1.Stop()
  251. defer s2.Stop()
  252. // Allow time for goroutines to boot up
  253. time.Sleep(1000 * time.Millisecond)
  254. b.StartTimer()
  255. numSuccess, numFailure := 0, 0
  256. // Send random message from foo channel to another
  257. for i := 0; i < b.N; i++ {
  258. chID := byte(i % 4)
  259. successChan := s1.Broadcast(chID, "test data")
  260. for s := range successChan {
  261. if s {
  262. numSuccess++
  263. } else {
  264. numFailure++
  265. }
  266. }
  267. }
  268. b.Logf("success: %v, failure: %v", numSuccess, numFailure)
  269. // Allow everything to flush before stopping switches & closing connections.
  270. b.StopTimer()
  271. time.Sleep(1000 * time.Millisecond)
  272. }