You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

119 lines
4.1 KiB

10 years ago
  1. #!/usr/bin/env python
  2. from __future__ import unicode_literals
  3. # Allow direct execution
  4. import os
  5. import sys
  6. import unittest
  7. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  8. from youtube_dl import YoutubeDL
  9. from youtube_dl.compat import compat_http_server, compat_urllib_request
  10. import ssl
  11. import threading
  12. TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  13. class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
  14. def log_message(self, format, *args):
  15. pass
  16. def do_GET(self):
  17. if self.path == '/video.html':
  18. self.send_response(200)
  19. self.send_header('Content-Type', 'text/html; charset=utf-8')
  20. self.end_headers()
  21. self.wfile.write(b'<html><video src="/vid.mp4" /></html>')
  22. elif self.path == '/vid.mp4':
  23. self.send_response(200)
  24. self.send_header('Content-Type', 'video/mp4')
  25. self.end_headers()
  26. self.wfile.write(b'\x00\x00\x00\x00\x20\x66\x74[video]')
  27. else:
  28. assert False
  29. class FakeLogger(object):
  30. def debug(self, msg):
  31. pass
  32. def warning(self, msg):
  33. pass
  34. def error(self, msg):
  35. pass
  36. class TestHTTP(unittest.TestCase):
  37. def setUp(self):
  38. certfn = os.path.join(TEST_DIR, 'testcert.pem')
  39. self.httpd = compat_http_server.HTTPServer(
  40. ('localhost', 0), HTTPTestRequestHandler)
  41. self.httpd.socket = ssl.wrap_socket(
  42. self.httpd.socket, certfile=certfn, server_side=True)
  43. self.port = self.httpd.socket.getsockname()[1]
  44. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  45. self.server_thread.daemon = True
  46. self.server_thread.start()
  47. def test_nocheckcertificate(self):
  48. if sys.version_info >= (2, 7, 9): # No certificate checking anyways
  49. ydl = YoutubeDL({'logger': FakeLogger()})
  50. self.assertRaises(
  51. Exception,
  52. ydl.extract_info, 'https://localhost:%d/video.html' % self.port)
  53. ydl = YoutubeDL({'logger': FakeLogger(), 'nocheckcertificate': True})
  54. r = ydl.extract_info('https://localhost:%d/video.html' % self.port)
  55. self.assertEqual(r['url'], 'https://localhost:%d/vid.mp4' % self.port)
  56. def _build_proxy_handler(name):
  57. class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
  58. proxy_name = name
  59. def log_message(self, format, *args):
  60. pass
  61. def do_GET(self):
  62. self.send_response(200)
  63. self.send_header('Content-Type', 'text/plain; charset=utf-8')
  64. self.end_headers()
  65. self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8'))
  66. return HTTPTestRequestHandler
  67. class TestProxy(unittest.TestCase):
  68. def setUp(self):
  69. self.proxy = compat_http_server.HTTPServer(
  70. ('localhost', 0), _build_proxy_handler('normal'))
  71. self.port = self.proxy.socket.getsockname()[1]
  72. self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
  73. self.proxy_thread.daemon = True
  74. self.proxy_thread.start()
  75. self.cn_proxy = compat_http_server.HTTPServer(
  76. ('localhost', 0), _build_proxy_handler('cn'))
  77. self.cn_port = self.cn_proxy.socket.getsockname()[1]
  78. self.cn_proxy_thread = threading.Thread(target=self.cn_proxy.serve_forever)
  79. self.cn_proxy_thread.daemon = True
  80. self.cn_proxy_thread.start()
  81. def test_proxy(self):
  82. cn_proxy = 'localhost:{0}'.format(self.cn_port)
  83. ydl = YoutubeDL({
  84. 'proxy': 'localhost:{0}'.format(self.port),
  85. 'cn_verification_proxy': cn_proxy,
  86. })
  87. url = 'http://foo.com/bar'
  88. response = ydl.urlopen(url).read().decode('utf-8')
  89. self.assertEqual(response, 'normal: {0}'.format(url))
  90. req = compat_urllib_request.Request(url)
  91. req.add_header('Ytdl-request-proxy', cn_proxy)
  92. response = ydl.urlopen(req).read().decode('utf-8')
  93. self.assertEqual(response, 'cn: {0}'.format(url))
  94. if __name__ == '__main__':
  95. unittest.main()