You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

159 lines
6.0 KiB

12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
  1. #!/usr/bin/env python
  2. import hashlib
  3. import io
  4. import os
  5. import json
  6. import unittest
  7. import sys
  8. import socket
  9. import binascii
  10. # Allow direct execution
  11. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  12. import youtube_dl.YoutubeDL
  13. from youtube_dl.utils import *
  14. PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "parameters.json")
  15. RETRIES = 3
  16. # General configuration (from __init__, not very elegant...)
  17. jar = compat_cookiejar.CookieJar()
  18. cookie_processor = compat_urllib_request.HTTPCookieProcessor(jar)
  19. proxy_handler = compat_urllib_request.ProxyHandler()
  20. opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, YoutubeDLHandler())
  21. compat_urllib_request.install_opener(opener)
  22. socket.setdefaulttimeout(10)
  23. md5 = lambda s: hashlib.md5(s.encode('utf-8')).hexdigest()
  24. class YoutubeDL(youtube_dl.YoutubeDL):
  25. def __init__(self, *args, **kwargs):
  26. self.to_stderr = self.to_screen
  27. self.processed_info_dicts = []
  28. super(YoutubeDL, self).__init__(*args, **kwargs)
  29. def report_warning(self, message):
  30. # Don't accept warnings during tests
  31. raise ExtractorError(message)
  32. def process_info(self, info_dict):
  33. self.processed_info_dicts.append(info_dict)
  34. return super(YoutubeDL, self).process_info(info_dict)
  35. def _file_md5(fn):
  36. with open(fn, 'rb') as f:
  37. return hashlib.md5(f.read()).hexdigest()
  38. from helper import get_testcases, try_rm
  39. defs = get_testcases()
  40. with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:
  41. parameters = json.load(pf)
  42. class TestDownload(unittest.TestCase):
  43. maxDiff = None
  44. def setUp(self):
  45. self.parameters = parameters
  46. self.defs = defs
  47. ### Dynamically generate tests
  48. def generator(test_case):
  49. def test_template(self):
  50. ie = youtube_dl.extractor.get_info_extractor(test_case['name'])
  51. def print_skipping(reason):
  52. print('Skipping %s: %s' % (test_case['name'], reason))
  53. if not ie._WORKING:
  54. print_skipping('IE marked as not _WORKING')
  55. return
  56. if 'playlist' not in test_case and not test_case['file']:
  57. print_skipping('No output file specified')
  58. return
  59. if 'skip' in test_case:
  60. print_skipping(test_case['skip'])
  61. return
  62. params = self.parameters.copy()
  63. params.update(test_case.get('params', {}))
  64. ydl = YoutubeDL(params)
  65. ydl.add_default_info_extractors()
  66. finished_hook_called = set()
  67. def _hook(status):
  68. if status['status'] == 'finished':
  69. finished_hook_called.add(status['filename'])
  70. ydl.fd.add_progress_hook(_hook)
  71. test_cases = test_case.get('playlist', [test_case])
  72. for tc in test_cases:
  73. try_rm(tc['file'])
  74. try_rm(tc['file'] + '.part')
  75. try_rm(tc['file'] + '.info.json')
  76. try:
  77. for retry in range(1, RETRIES + 1):
  78. try:
  79. ydl.download([test_case['url']])
  80. except (DownloadError, ExtractorError) as err:
  81. if retry == RETRIES: raise
  82. # Check if the exception is not a network related one
  83. if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError):
  84. raise
  85. print('Retrying: {0} failed tries\n\n##########\n\n'.format(retry))
  86. else:
  87. break
  88. for tc in test_cases:
  89. if not test_case.get('params', {}).get('skip_download', False):
  90. self.assertTrue(os.path.exists(tc['file']), msg='Missing file ' + tc['file'])
  91. self.assertTrue(tc['file'] in finished_hook_called)
  92. self.assertTrue(os.path.exists(tc['file'] + '.info.json'))
  93. if 'md5' in tc:
  94. md5_for_file = _file_md5(tc['file'])
  95. self.assertEqual(md5_for_file, tc['md5'])
  96. with io.open(tc['file'] + '.info.json', encoding='utf-8') as infof:
  97. info_dict = json.load(infof)
  98. for (info_field, expected) in tc.get('info_dict', {}).items():
  99. if isinstance(expected, compat_str) and expected.startswith('md5:'):
  100. got = 'md5:' + md5(info_dict.get(info_field))
  101. else:
  102. got = info_dict.get(info_field)
  103. self.assertEqual(expected, got,
  104. u'invalid value for field %s, expected %r, got %r' % (info_field, expected, got))
  105. # If checkable fields are missing from the test case, print the info_dict
  106. test_info_dict = dict((key, value if not isinstance(value, compat_str) or len(value) < 250 else 'md5:' + md5(value))
  107. for key, value in info_dict.items()
  108. if value and key in ('title', 'description', 'uploader', 'upload_date', 'uploader_id', 'location'))
  109. if not all(key in tc.get('info_dict', {}).keys() for key in test_info_dict.keys()):
  110. sys.stderr.write(u'\n"info_dict": ' + json.dumps(test_info_dict, ensure_ascii=False, indent=2) + u'\n')
  111. # Check for the presence of mandatory fields
  112. for key in ('id', 'url', 'title', 'ext'):
  113. self.assertTrue(key in info_dict.keys() and info_dict[key])
  114. finally:
  115. for tc in test_cases:
  116. try_rm(tc['file'])
  117. try_rm(tc['file'] + '.part')
  118. try_rm(tc['file'] + '.info.json')
  119. return test_template
  120. ### And add them to TestDownload
  121. for n, test_case in enumerate(defs):
  122. test_method = generator(test_case)
  123. tname = 'test_' + str(test_case['name'])
  124. i = 1
  125. while hasattr(TestDownload, tname):
  126. tname = 'test_' + str(test_case['name']) + '_' + str(i)
  127. i += 1
  128. test_method.__name__ = tname
  129. setattr(TestDownload, test_method.__name__, test_method)
  130. del test_method
  131. if __name__ == '__main__':
  132. unittest.main()