You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

265 lines
9.9 KiB

10 years ago
10 years ago
10 years ago
10 years ago
12 years ago
10 years ago
12 years ago
10 years ago
10 years ago
12 years ago
10 years ago
10 years ago
12 years ago
10 years ago
10 years ago
10 years ago
  1. #!/usr/bin/env python
  2. from __future__ import unicode_literals
  3. # Allow direct execution
  4. import os
  5. import sys
  6. import unittest
  7. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  8. from test.helper import (
  9. assertGreaterEqual,
  10. expect_warnings,
  11. get_params,
  12. gettestcases,
  13. expect_info_dict,
  14. try_rm,
  15. report_warning,
  16. )
  17. import hashlib
  18. import io
  19. import json
  20. import socket
  21. import youtube_dl.YoutubeDL
  22. from youtube_dl.compat import (
  23. compat_http_client,
  24. compat_urllib_error,
  25. compat_HTTPError,
  26. )
  27. from youtube_dl.utils import (
  28. DownloadError,
  29. ExtractorError,
  30. format_bytes,
  31. UnavailableVideoError,
  32. )
  33. from youtube_dl.extractor import get_info_extractor
  34. RETRIES = 3
  35. class YoutubeDL(youtube_dl.YoutubeDL):
  36. def __init__(self, *args, **kwargs):
  37. self.to_stderr = self.to_screen
  38. self.processed_info_dicts = []
  39. super(YoutubeDL, self).__init__(*args, **kwargs)
  40. def report_warning(self, message):
  41. # Don't accept warnings during tests
  42. raise ExtractorError(message)
  43. def process_info(self, info_dict):
  44. self.processed_info_dicts.append(info_dict)
  45. return super(YoutubeDL, self).process_info(info_dict)
  46. def _file_md5(fn):
  47. with open(fn, 'rb') as f:
  48. return hashlib.md5(f.read()).hexdigest()
# Test definitions returned by test.helper.gettestcases(). Each definition is
# turned into one TestDownload method by the loop at the end of this file.
defs = gettestcases()
  50. class TestDownload(unittest.TestCase):
  51. # Parallel testing in nosetests. See
  52. # http://nose.readthedocs.org/en/latest/doc_tests/test_multiprocess/multiprocess.html
  53. _multiprocess_shared_ = True
  54. maxDiff = None
  55. def __str__(self):
  56. """Identify each test with the `add_ie` attribute, if available."""
  57. def strclass(cls):
  58. """From 2.7's unittest; 2.6 had _strclass so we can't import it."""
  59. return '%s.%s' % (cls.__module__, cls.__name__)
  60. add_ie = getattr(self, self._testMethodName).add_ie
  61. return '%s (%s)%s:' % (self._testMethodName,
  62. strclass(self.__class__),
  63. ' [%s]' % add_ie if add_ie else '')
  64. def setUp(self):
  65. self.defs = defs
# Dynamically generate tests


def generator(test_case, tname):
    """Build and return the test method for one test definition.

    test_case is a dict-like test definition; the keys read here include
    'name', 'url', 'params', 'info_dict', 'add_ie', 'skip', 'playlist',
    'expected_warnings' and the 'playlist_*' expectations.
    tname is the name of the generated test; it is used as a prefix for the
    output template and in the network-failure warning.

    The returned function takes the TestCase instance as its only argument.
    """

    def test_template(self):
        """Extract (and possibly download) the test URL and check the result."""
        ie = youtube_dl.extractor.get_info_extractor(test_case['name'])()
        other_ies = [get_info_extractor(ie_key)() for ie_key in test_case.get('add_ie', [])]
        # Any key starting with 'playlist' marks the definition as a playlist.
        is_playlist = any(k.startswith('playlist') for k in test_case)
        # Per-entry test cases: the explicit 'playlist' list if given,
        # otherwise nothing for a playlist or the definition itself for a
        # single video.
        test_cases = test_case.get(
            'playlist', [] if is_playlist else [test_case])

        def print_skipping(reason):
            print('Skipping %s: %s' % (test_case['name'], reason))
        if not ie.working():
            print_skipping('IE marked as not _WORKING')
            return

        # Every per-entry test case needs 'id' and 'ext' in its info_dict,
        # because the expected output file name is derived from them.
        for tc in test_cases:
            info_dict = tc.get('info_dict', {})
            if not (info_dict.get('id') and info_dict.get('ext')):
                raise Exception('Test definition incorrect. The output file cannot be known. Are both \'id\' and \'ext\' keys present?')

        if 'skip' in test_case:
            print_skipping(test_case['skip'])
            return
        # Skip as well when an additionally required extractor is not working.
        for other_ie in other_ies:
            if not other_ie.working():
                print_skipping('test depends on %sIE, marked as not WORKING' % other_ie.ie_key())
                return

        params = get_params(test_case.get('params', {}))
        # Prefix the output template with the test name (presumably so that
        # files written by different tests do not collide).
        params['outtmpl'] = tname + '_' + params['outtmpl']
        # A playlist without per-entry test cases defaults to flat extraction
        # and no download, unless the definition's params say otherwise.
        if is_playlist and 'playlist' not in test_case:
            params.setdefault('extract_flat', 'in_playlist')
            params.setdefault('skip_download', True)

        ydl = YoutubeDL(params, auto_init=False)
        ydl.add_default_info_extractors()
        # File names for which a 'finished' progress status was reported.
        finished_hook_called = set()

        def _hook(status):
            if status['status'] == 'finished':
                finished_hook_called.add(status['filename'])
        ydl.add_progress_hook(_hook)
        expect_warnings(ydl, test_case.get('expected_warnings', []))

        def get_tc_filename(tc):
            # Output file name for a test case, computed from its info_dict.
            return ydl.prepare_filename(tc.get('info_dict', {}))

        # Stays None until extraction succeeds; the cleanup in `finally`
        # checks for that.
        res_dict = None

        def try_rm_tcs_files(tcs=None):
            # Remove the media file, its '.part' file and its '.info.json'
            # file for each given test case (all test cases by default).
            if tcs is None:
                tcs = test_cases
            for tc in tcs:
                tc_filename = get_tc_filename(tc)
                try_rm(tc_filename)
                try_rm(tc_filename + '.part')
                try_rm(os.path.splitext(tc_filename)[0] + '.info.json')
        # Start from a clean state in case an earlier run left files behind.
        try_rm_tcs_files()
        try:
            try_num = 1
            while True:
                try:
                    # We're not using .download here since that is just a shim
                    # for outside error handling, and returns the exit code
                    # instead of the result dict.
                    res_dict = ydl.extract_info(
                        test_case['url'],
                        force_generic_extractor=params.get('force_generic_extractor', False))
                except (DownloadError, ExtractorError) as err:
                    # Check if the exception is not a network related one.
                    # Re-raise when the underlying exception type is not one
                    # of the listed types, or when it is an HTTP error with
                    # status code 503.
                    # NOTE(review): the membership test matches exact types
                    # only, so subclasses of the listed types are re-raised
                    # too - confirm this is intended.
                    if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError, compat_http_client.BadStatusLine) or (err.exc_info[0] == compat_HTTPError and err.exc_info[1].code == 503):
                        raise

                    # Give up after RETRIES attempts: warn and end the test
                    # without failing it.
                    if try_num == RETRIES:
                        report_warning('%s failed due to network errors, skipping...' % tname)
                        return

                    print('Retrying: {0} failed tries\n\n##########\n\n'.format(try_num))

                    try_num += 1
                else:
                    # Extraction succeeded; leave the retry loop.
                    break

            if is_playlist:
                self.assertTrue(res_dict['_type'] in ['playlist', 'multi_video'])
                self.assertTrue('entries' in res_dict)
                expect_info_dict(self, res_dict, test_case.get('info_dict', {}))

            # Optional playlist-level expectations.
            if 'playlist_mincount' in test_case:
                assertGreaterEqual(
                    self,
                    len(res_dict['entries']),
                    test_case['playlist_mincount'],
                    'Expected at least %d in playlist %s, but got only %d' % (
                        test_case['playlist_mincount'], test_case['url'],
                        len(res_dict['entries'])))
            if 'playlist_count' in test_case:
                self.assertEqual(
                    len(res_dict['entries']),
                    test_case['playlist_count'],
                    'Expected %d entries in playlist %s, but got %d.' % (
                        test_case['playlist_count'],
                        test_case['url'],
                        len(res_dict['entries']),
                    ))
            if 'playlist_duration_sum' in test_case:
                got_duration = sum(e['duration'] for e in res_dict['entries'])
                self.assertEqual(
                    test_case['playlist_duration_sum'], got_duration)

            # Generalize both playlists and single videos to unified format for
            # simplicity
            if 'entries' not in res_dict:
                res_dict['entries'] = [res_dict]

            # Test cases are matched to extracted entries by position.
            for tc_num, tc in enumerate(test_cases):
                tc_res_dict = res_dict['entries'][tc_num]
                # First, check test cases' data against extracted data alone
                expect_info_dict(self, tc_res_dict, tc.get('info_dict', {}))
                # Now, check downloaded file consistency
                tc_filename = get_tc_filename(tc)
                if not test_case.get('params', {}).get('skip_download', False):
                    self.assertTrue(os.path.exists(tc_filename), msg='Missing file ' + tc_filename)
                    self.assertTrue(tc_filename in finished_hook_called)
                    # A 'file_minsize' of None disables the size check.
                    expected_minsize = tc.get('file_minsize', 10000)
                    if expected_minsize is not None:
                        if params.get('test'):
                            expected_minsize = max(expected_minsize, 10000)
                        got_fsize = os.path.getsize(tc_filename)
                        assertGreaterEqual(
                            self, got_fsize, expected_minsize,
                            'Expected %s to be at least %s, but it\'s only %s ' %
                            (tc_filename, format_bytes(expected_minsize),
                             format_bytes(got_fsize)))
                    if 'md5' in tc:
                        md5_for_file = _file_md5(tc_filename)
                        self.assertEqual(tc['md5'], md5_for_file)
                # Finally, check test cases' data again but this time against
                # extracted data from info JSON file written during processing
                info_json_fn = os.path.splitext(tc_filename)[0] + '.info.json'
                self.assertTrue(
                    os.path.exists(info_json_fn),
                    'Missing info file %s' % info_json_fn)
                with io.open(info_json_fn, encoding='utf-8') as infof:
                    info_dict = json.load(infof)
                expect_info_dict(self, info_dict, tc.get('info_dict', {}))
        finally:
            # Always clean up the files belonging to the test cases.
            try_rm_tcs_files()
            if is_playlist and res_dict is not None and res_dict.get('entries'):
                # Remove all other files that may have been extracted if the
                # extractor returns full results even with extract_flat
                res_tcs = [{'info_dict': e} for e in res_dict['entries']]
                try_rm_tcs_files(res_tcs)

    return test_template
  204. # And add them to TestDownload
  205. for n, test_case in enumerate(defs):
  206. tname = 'test_' + str(test_case['name'])
  207. i = 1
  208. while hasattr(TestDownload, tname):
  209. tname = 'test_%s_%d' % (test_case['name'], i)
  210. i += 1
  211. test_method = generator(test_case, tname)
  212. test_method.__name__ = str(tname)
  213. ie_list = test_case.get('add_ie')
  214. test_method.add_ie = ie_list and ','.join(ie_list)
  215. setattr(TestDownload, test_method.__name__, test_method)
  216. del test_method
# Run the generated tests when this file is executed directly.
if __name__ == '__main__':
    unittest.main()