|
|
@ -1,72 +1,94 @@ |
|
|
|
#!/usr/bin/env python2 |
|
|
|
#!/usr/bin/env python |
|
|
|
|
|
|
|
import hashlib |
|
|
|
import io |
|
|
|
import os |
|
|
|
import json |
|
|
|
import unittest |
|
|
|
import sys |
|
|
|
import socket |
|
|
|
|
|
|
|
# Allow direct execution |
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
|
|
|
|
|
from youtube_dl.FileDownloader import FileDownloader |
|
|
|
#import all the info extractor |
|
|
|
import youtube_dl |
|
|
|
import youtube_dl.FileDownloader |
|
|
|
import youtube_dl.InfoExtractors |
|
|
|
from youtube_dl.utils import * |
|
|
|
|
|
|
|
DEF_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tests.json') |
|
|
|
PARAM_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'parameters.json' |
|
|
|
) |
|
|
|
PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "parameters.json") |
|
|
|
|
|
|
|
# General configuration (from __init__, not very elegant...) |
|
|
|
jar = compat_cookiejar.CookieJar() |
|
|
|
cookie_processor = compat_urllib_request.HTTPCookieProcessor(jar) |
|
|
|
proxy_handler = compat_urllib_request.ProxyHandler() |
|
|
|
opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, YoutubeDLHandler()) |
|
|
|
compat_urllib_request.install_opener(opener) |
|
|
|
socket.setdefaulttimeout(300) # 5 minutes should be enough (famous last words) |
|
|
|
|
|
|
|
class FileDownloader(youtube_dl.FileDownloader): |
|
|
|
def __init__(self, *args, **kwargs): |
|
|
|
youtube_dl.FileDownloader.__init__(self, *args, **kwargs) |
|
|
|
self.to_stderr = self.to_screen |
|
|
|
|
|
|
|
def _file_md5(fn): |
|
|
|
with open(fn, 'rb') as f: |
|
|
|
return hashlib.md5(f.read()).hexdigest() |
|
|
|
|
|
|
|
with io.open(DEF_FILE, encoding='utf-8') as deff: |
|
|
|
defs = json.load(deff) |
|
|
|
with io.open(PARAMETERS_FILE, encoding='utf-8') as pf: |
|
|
|
parameters = json.load(pf) |
|
|
|
|
|
|
|
class TestDownload(unittest.TestCase): |
|
|
|
pass |
|
|
|
def setUp(self): |
|
|
|
self.parameters = parameters |
|
|
|
self.defs = defs |
|
|
|
|
|
|
|
# Clear old files |
|
|
|
self.tearDown() |
|
|
|
|
|
|
|
def tearDown(self): |
|
|
|
for fn in [ test.get('file', False) for test in self.defs ]: |
|
|
|
if fn and os.path.exists(fn): |
|
|
|
os.remove(fn) |
|
|
|
|
|
|
|
def md5_for_file(filename, block_size=2**20): |
|
|
|
with open(filename) as f: |
|
|
|
md5 = hashlib.md5() |
|
|
|
while True: |
|
|
|
data = f.read(block_size) |
|
|
|
if not data: |
|
|
|
break |
|
|
|
md5.update(data) |
|
|
|
return md5.hexdigest() |
|
|
|
|
|
|
|
### Dinamically generate tests |
|
|
|
def generator(test_case): |
|
|
|
|
|
|
|
def generator(name, url, md5, file, ie_param, optional_ie): |
|
|
|
def test_template(self): |
|
|
|
fd = FileDownloader(ie_param) |
|
|
|
fd.add_info_extractor(getattr(youtube_dl, name + "IE")()) |
|
|
|
fd.download([url]) |
|
|
|
self.assertTrue(os.path.exists(file)) |
|
|
|
self.assertEqual(md5_for_file(file), md5) |
|
|
|
ie = getattr(youtube_dl.InfoExtractors, test_case['name'] + 'IE') |
|
|
|
if not ie._WORKING: |
|
|
|
print('Skipping: IE marked as not _WORKING') |
|
|
|
return |
|
|
|
if not test_case['file']: |
|
|
|
print('Skipping: No output file specified') |
|
|
|
return |
|
|
|
if 'skip' in test_case: |
|
|
|
print('Skipping: {0}'.format(test_case['skip'])) |
|
|
|
return |
|
|
|
params = dict(self.parameters) # Duplicate it locally |
|
|
|
for p in test_case.get('params', {}): |
|
|
|
params[p] = test_case['params'][p] |
|
|
|
fd = FileDownloader(params) |
|
|
|
fd.add_info_extractor(ie()) |
|
|
|
for ien in test_case.get('add_ie', []): |
|
|
|
fd.add_info_extractor(getattr(youtube_dl.InfoExtractors, ien + 'IE')()) |
|
|
|
fd.download([test_case['url']]) |
|
|
|
self.assertTrue(os.path.exists(test_case['file'])) |
|
|
|
if 'md5' in test_case: |
|
|
|
md5_for_file = _file_md5(test_case['file']) |
|
|
|
self.assertEqual(md5_for_file, test_case['md5']) |
|
|
|
|
|
|
|
return test_template |
|
|
|
#only python 2.7 |
|
|
|
|
|
|
|
def clean_generator(files): |
|
|
|
def clean_template(self): |
|
|
|
for file_name in files: |
|
|
|
if os.path.exists(file_name): |
|
|
|
os.remove(file_name) |
|
|
|
return clean_template |
|
|
|
|
|
|
|
with open(DEF_FILE, "r") as f: |
|
|
|
with open(PARAM_FILE) as fp: |
|
|
|
p = json.load(fp) |
|
|
|
test_param = json.load(f) |
|
|
|
files = set() |
|
|
|
for test_case in test_param: |
|
|
|
if test_case.get("broken", False): |
|
|
|
continue |
|
|
|
try: |
|
|
|
files.add(test_case["file"]) |
|
|
|
test_method = generator(test_case['name'], test_case['url'], test_case['md5'], test_case['file'], p, test_case.get('add_ie', [])) |
|
|
|
test_method.__name__ = "test_{0}".format(test_case["name"]) |
|
|
|
setattr(TestDownload, test_method.__name__, test_method) |
|
|
|
del test_method |
|
|
|
except KeyError as e: |
|
|
|
sys.stderr.write("Issue with the parameters of test {0}.\n".format(test_case.get("name", "unknown test"))) |
|
|
|
#clean the files |
|
|
|
ff = clean_generator(files) |
|
|
|
ff.__name__ = "tearDown" |
|
|
|
setattr(TestDownload, ff.__name__, ff) |
|
|
|
del ff |
|
|
|
|
|
|
|
### And add them to TestDownload |
|
|
|
for test_case in defs: |
|
|
|
test_method = generator(test_case) |
|
|
|
test_method.__name__ = "test_{0}".format(test_case["name"]) |
|
|
|
setattr(TestDownload, test_method.__name__, test_method) |
|
|
|
del test_method |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|