You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123 lines
4.2 KiB

12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
  1. #!/usr/bin/env python
  2. import errno
  3. import hashlib
  4. import io
  5. import os
  6. import json
  7. import unittest
  8. import sys
  9. import hashlib
  10. import socket
  11. # Allow direct execution
  12. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  13. import youtube_dl.FileDownloader
  14. import youtube_dl.InfoExtractors
  15. from youtube_dl.utils import *
  16. DEF_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tests.json')
  17. PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "parameters.json")
  18. # General configuration (from __init__, not very elegant...)
  19. jar = compat_cookiejar.CookieJar()
  20. cookie_processor = compat_urllib_request.HTTPCookieProcessor(jar)
  21. proxy_handler = compat_urllib_request.ProxyHandler()
  22. opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, YoutubeDLHandler())
  23. compat_urllib_request.install_opener(opener)
  24. def _try_rm(filename):
  25. """ Remove a file if it exists """
  26. try:
  27. os.remove(filename)
  28. except OSError as ose:
  29. if ose.errno != errno.ENOENT:
  30. raise
  31. class FileDownloader(youtube_dl.FileDownloader):
  32. def __init__(self, *args, **kwargs):
  33. self.to_stderr = self.to_screen
  34. self.processed_info_dicts = []
  35. return youtube_dl.FileDownloader.__init__(self, *args, **kwargs)
  36. def process_info(self, info_dict):
  37. self.processed_info_dicts.append(info_dict)
  38. return youtube_dl.FileDownloader.process_info(self, info_dict)
  39. def _file_md5(fn):
  40. with open(fn, 'rb') as f:
  41. return hashlib.md5(f.read()).hexdigest()
  42. with io.open(DEF_FILE, encoding='utf-8') as deff:
  43. defs = json.load(deff)
  44. with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:
  45. parameters = json.load(pf)
  46. class TestDownload(unittest.TestCase):
  47. def setUp(self):
  48. self.parameters = parameters
  49. self.defs = defs
  50. ### Dynamically generate tests
  51. def generator(test_case):
  52. def test_template(self):
  53. ie = getattr(youtube_dl.InfoExtractors, test_case['name'] + 'IE')
  54. if not ie._WORKING:
  55. print('Skipping: IE marked as not _WORKING')
  56. return
  57. if 'playlist' not in test_case and not test_case['file']:
  58. print('Skipping: No output file specified')
  59. return
  60. if 'skip' in test_case:
  61. print('Skipping: {0}'.format(test_case['skip']))
  62. return
  63. params = self.parameters.copy()
  64. params.update(test_case.get('params', {}))
  65. fd = FileDownloader(params)
  66. fd.add_info_extractor(ie())
  67. for ien in test_case.get('add_ie', []):
  68. fd.add_info_extractor(getattr(youtube_dl.InfoExtractors, ien + 'IE')())
  69. test_cases = test_case.get('playlist', [test_case])
  70. for tc in test_cases:
  71. _try_rm(tc['file'])
  72. _try_rm(tc['file'] + '.info.json')
  73. try:
  74. fd.download([test_case['url']])
  75. for tc in test_cases:
  76. if not test_case.get('params', {}).get('skip_download', False):
  77. self.assertTrue(os.path.exists(tc['file']))
  78. self.assertTrue(os.path.exists(tc['file'] + '.info.json'))
  79. if 'md5' in tc:
  80. md5_for_file = _file_md5(tc['file'])
  81. self.assertEqual(md5_for_file, tc['md5'])
  82. with io.open(tc['file'] + '.info.json', encoding='utf-8') as infof:
  83. info_dict = json.load(infof)
  84. for (info_field, value) in tc.get('info_dict', {}).items():
  85. if value.startswith('md5:'):
  86. md5_info_value = hashlib.md5(info_dict.get(info_field, '')).hexdigest()
  87. self.assertEqual(value[3:], md5_info_value)
  88. else:
  89. self.assertEqual(value, info_dict.get(info_field))
  90. finally:
  91. for tc in test_cases:
  92. _try_rm(tc['file'])
  93. _try_rm(tc['file'] + '.info.json')
  94. return test_template
  95. ### And add them to TestDownload
  96. for test_case in defs:
  97. test_method = generator(test_case)
  98. test_method.__name__ = "test_{0}".format(test_case["name"])
  99. setattr(TestDownload, test_method.__name__, test_method)
  100. del test_method
  101. if __name__ == '__main__':
  102. unittest.main()