X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/14f25df2b6233553e968df023430ca96c0b1df9f..8e15177b4113c355989881e4e030f695a9b59c3a:/test/test_download.py diff --git a/test/test_download.py b/test/test_download.py index b397b3ecf..253079249 100755 --- a/test/test_download.py +++ b/test/test_download.py @@ -8,11 +8,9 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import collections import hashlib -import http.client import json -import socket -import urllib.error from test.helper import ( assertGreaterEqual, @@ -20,6 +18,7 @@ expect_warnings, get_params, gettestcases, + getwebpagetestcases, is_download_test, report_warning, try_rm, @@ -27,11 +26,14 @@ import yt_dlp.YoutubeDL # isort: split from yt_dlp.extractor import get_info_extractor +from yt_dlp.networking.exceptions import HTTPError, TransportError from yt_dlp.utils import ( DownloadError, ExtractorError, UnavailableVideoError, + YoutubeDLError, format_bytes, + join_nonempty, ) RETRIES = 3 @@ -57,7 +59,9 @@ def _file_md5(fn): return hashlib.md5(f.read()).hexdigest() -defs = gettestcases() +normal_test_cases = gettestcases() +webpage_test_cases = getwebpagetestcases() +tests_counter = collections.defaultdict(collections.Counter) @is_download_test @@ -72,24 +76,13 @@ class TestDownload(unittest.TestCase): def __str__(self): """Identify each test with the `add_ie` attribute, if available.""" + cls, add_ie = type(self), getattr(self, self._testMethodName).add_ie + return f'{self._testMethodName} ({cls.__module__}.{cls.__name__}){f" [{add_ie}]" if add_ie else ""}:' - def strclass(cls): - """From 2.7's unittest; 2.6 had _strclass so we can't import it.""" - return f'{cls.__module__}.{cls.__name__}' - - add_ie = getattr(self, self._testMethodName).add_ie - return '%s (%s)%s:' % (self._testMethodName, - strclass(self.__class__), - ' [%s]' % add_ie if add_ie else '') - - def setUp(self): - self.defs = defs # Dynamically generate tests - def generator(test_case, tname): - def test_template(self): if self.COMPLETED_TESTS.get(tname): return @@ -108,14 +101,16 @@ def print_skipping(reason): print_skipping('IE marked as not _WORKING') for tc in test_cases: + if tc.get('expected_exception'): + continue info_dict = tc.get('info_dict', {}) params = tc.get('params', {}) if not info_dict.get('id'): - raise Exception('Test definition incorrect. \'id\' key is not present') - elif not info_dict.get('ext'): + raise Exception(f'Test {tname} definition incorrect - "id" key is not present') + elif not info_dict.get('ext') and info_dict.get('_type', 'video') == 'video': if params.get('skip_download') and params.get('ignore_no_formats_error'): continue - raise Exception('Test definition incorrect. The output file cannot be known. \'ext\' key is not present') + raise Exception(f'Test {tname} definition incorrect - "ext" key must be present to define the output file') if 'skip' in test_case: print_skipping(test_case['skip']) @@ -128,7 +123,8 @@ def print_skipping(reason): params['outtmpl'] = tname + '_' + params['outtmpl'] if is_playlist and 'playlist' not in test_case: params.setdefault('extract_flat', 'in_playlist') - params.setdefault('playlistend', test_case.get('playlist_mincount')) + params.setdefault('playlistend', test_case.get( + 'playlist_mincount', test_case.get('playlist_count', -2) + 1)) params.setdefault('skip_download', True) ydl = YoutubeDL(params, auto_init=False) @@ -146,6 +142,17 @@ def get_tc_filename(tc): res_dict = None + def match_exception(err): + expected_exception = test_case.get('expected_exception') + if not expected_exception: + return False + if err.__class__.__name__ == expected_exception: + return True + for exc in err.exc_info: + if exc.__class__.__name__ == expected_exception: + return True + return False + def try_rm_tcs_files(tcs=None): if tcs is None: tcs = test_cases @@ -167,7 +174,10 @@ def try_rm_tcs_files(tcs=None): force_generic_extractor=params.get('force_generic_extractor', False)) except (DownloadError, ExtractorError) as err: # Check if the exception is not a network related one - if not err.exc_info[0] in (urllib.error.URLError, socket.timeout, UnavailableVideoError, http.client.BadStatusLine) or (err.exc_info[0] == urllib.error.HTTPError and err.exc_info[1].code == 503): + if not isinstance(err.exc_info[1], (TransportError, UnavailableVideoError)) or (isinstance(err.exc_info[1], HTTPError) and err.exc_info[1].status == 503): + if match_exception(err): + return + err.msg = f'{getattr(err, "msg", err)} ({tname})' raise if try_num == RETRIES: @@ -177,6 +187,10 @@ def try_rm_tcs_files(tcs=None): print(f'Retrying: {try_num} failed tries\n\n##########\n\n') try_num += 1 + except YoutubeDLError as err: + if match_exception(err): + return + raise else: break @@ -216,6 +230,8 @@ def try_rm_tcs_files(tcs=None): tc_res_dict = res_dict['entries'][tc_num] # First, check test cases' data against extracted data alone expect_info_dict(self, tc_res_dict, tc.get('info_dict', {})) + if tc_res_dict.get('_type', 'video') != 'video': + continue # Now, check downloaded file consistency tc_filename = get_tc_filename(tc) if not test_case.get('params', {}).get('skip_download', False): @@ -250,40 +266,48 @@ def try_rm_tcs_files(tcs=None): # extractor returns full results even with extract_flat res_tcs = [{'info_dict': e} for e in res_dict['entries']] try_rm_tcs_files(res_tcs) - + ydl.close() return test_template # And add them to TestDownload -tests_counter = {} -for test_case in defs: - name = test_case['name'] - i = tests_counter.get(name, 0) - tests_counter[name] = i + 1 - tname = f'test_{name}_{i}' if i else f'test_{name}' - test_method = generator(test_case, tname) - test_method.__name__ = str(tname) - ie_list = test_case.get('add_ie') - test_method.add_ie = ie_list and ','.join(ie_list) - setattr(TestDownload, test_method.__name__, test_method) - del test_method +def inject_tests(test_cases, label=''): + for test_case in test_cases: + name = test_case['name'] + tname = join_nonempty('test', name, label, tests_counter[name][label], delim='_') + tests_counter[name][label] += 1 + + test_method = generator(test_case, tname) + test_method.__name__ = tname + test_method.add_ie = ','.join(test_case.get('add_ie', [])) + setattr(TestDownload, test_method.__name__, test_method) -def batch_generator(name, num_tests): +inject_tests(normal_test_cases) +# TODO: disable redirection to the IE to ensure we are actually testing the webpage extraction +inject_tests(webpage_test_cases, 'webpage') + + +def batch_generator(name): def test_template(self): - for i in range(num_tests): - getattr(self, f'test_{name}_{i}' if i else f'test_{name}')() + for label, num_tests in tests_counter[name].items(): + for i in range(num_tests): + test_name = join_nonempty('test', name, label, i, delim='_') + try: + getattr(self, test_name)() + except unittest.SkipTest: + print(f'Skipped {test_name}') return test_template -for name, num_tests in tests_counter.items(): - test_method = batch_generator(name, num_tests) +for name in tests_counter: + test_method = batch_generator(name) test_method.__name__ = f'test_{name}_all' test_method.add_ie = '' setattr(TestDownload, test_method.__name__, test_method) - del test_method +del test_method if __name__ == '__main__':