|
|
@ -1,5 +1,6 @@ |
|
|
|
#!/usr/bin/env python |
|
|
|
|
|
|
|
import errno |
|
|
|
import hashlib |
|
|
|
import io |
|
|
|
import os |
|
|
@ -26,6 +27,14 @@ proxy_handler = compat_urllib_request.ProxyHandler() |
|
|
|
opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, YoutubeDLHandler()) |
|
|
|
compat_urllib_request.install_opener(opener) |
|
|
|
|
|
|
|
def _try_rm(filename): |
|
|
|
""" Remove a file if it exists """ |
|
|
|
try: |
|
|
|
os.remove(filename) |
|
|
|
except OSError as ose: |
|
|
|
if ose.errno != errno.ENOENT: |
|
|
|
raise |
|
|
|
|
|
|
|
class FileDownloader(youtube_dl.FileDownloader): |
|
|
|
def __init__(self, *args, **kwargs): |
|
|
|
self.to_stderr = self.to_screen |
|
|
@ -50,15 +59,6 @@ class TestDownload(unittest.TestCase): |
|
|
|
self.parameters = parameters |
|
|
|
self.defs = defs |
|
|
|
|
|
|
|
# Clear old files |
|
|
|
self.tearDown() |
|
|
|
|
|
|
|
def tearDown(self): |
|
|
|
for fn in [ test.get('file', False) for test in self.defs ]: |
|
|
|
if fn and os.path.exists(fn): |
|
|
|
os.remove(fn) |
|
|
|
|
|
|
|
|
|
|
|
### Dynamically generate tests |
|
|
|
def generator(test_case): |
|
|
|
|
|
|
@ -67,7 +67,7 @@ def generator(test_case): |
|
|
|
if not ie._WORKING: |
|
|
|
print('Skipping: IE marked as not _WORKING') |
|
|
|
return |
|
|
|
if not test_case['file']: |
|
|
|
if 'playlist' not in test_case and not test_case['file']: |
|
|
|
print('Skipping: No output file specified') |
|
|
|
return |
|
|
|
if 'skip' in test_case: |
|
|
@ -82,19 +82,32 @@ def generator(test_case): |
|
|
|
fd.add_info_extractor(ie()) |
|
|
|
for ien in test_case.get('add_ie', []): |
|
|
|
fd.add_info_extractor(getattr(youtube_dl.InfoExtractors, ien + 'IE')()) |
|
|
|
fd.download([test_case['url']]) |
|
|
|
|
|
|
|
self.assertTrue(os.path.exists(test_case['file'])) |
|
|
|
if 'md5' in test_case: |
|
|
|
md5_for_file = _file_md5(test_case['file']) |
|
|
|
self.assertEqual(md5_for_file, test_case['md5']) |
|
|
|
info_dict = fd.processed_info_dicts[0] |
|
|
|
for (info_field, value) in test_case.get('info_dict', {}).items(): |
|
|
|
if value.startswith('md5:'): |
|
|
|
md5_info_value = hashlib.md5(info_dict.get(info_field, '')).hexdigest() |
|
|
|
self.assertEqual(value[3:], md5_info_value) |
|
|
|
else: |
|
|
|
self.assertEqual(value, info_dict.get(info_field)) |
|
|
|
|
|
|
|
test_cases = test_case.get('playlist', [test_case]) |
|
|
|
for tc in test_cases: |
|
|
|
_try_rm(tc['file']) |
|
|
|
_try_rm(tc['file'] + '.info.json') |
|
|
|
try: |
|
|
|
fd.download([test_case['url']]) |
|
|
|
|
|
|
|
for tc in test_cases: |
|
|
|
self.assertTrue(os.path.exists(tc['file'])) |
|
|
|
self.assertTrue(os.path.exists(tc['file'] + '.info.json')) |
|
|
|
if 'md5' in tc: |
|
|
|
md5_for_file = _file_md5(tc['file']) |
|
|
|
self.assertEqual(md5_for_file, tc['md5']) |
|
|
|
with io.open(tc['file'] + '.info.json', encoding='utf-8') as infof: |
|
|
|
info_dict = json.load(infof) |
|
|
|
for (info_field, value) in tc.get('info_dict', {}).items(): |
|
|
|
if value.startswith('md5:'): |
|
|
|
md5_info_value = hashlib.md5(info_dict.get(info_field, '')).hexdigest() |
|
|
|
self.assertEqual(value[3:], md5_info_value) |
|
|
|
else: |
|
|
|
self.assertEqual(value, info_dict.get(info_field)) |
|
|
|
finally: |
|
|
|
for tc in test_cases: |
|
|
|
_try_rm(tc['file']) |
|
|
|
_try_rm(tc['file'] + '.info.json') |
|
|
|
|
|
|
|
return test_template |
|
|
|
|
|
|
|