]>
Commit | Line | Data |
---|---|---|
1 | #!/usr/bin/env python3 | |
2 | ||
3 | # Allow direct execution | |
4 | import os | |
5 | import sys | |
6 | import unittest | |
7 | ||
8 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
9 | ||
10 | ||
11 | import collections | |
12 | import hashlib | |
13 | import json | |
14 | ||
15 | from test.helper import ( | |
16 | assertGreaterEqual, | |
17 | expect_info_dict, | |
18 | expect_warnings, | |
19 | get_params, | |
20 | gettestcases, | |
21 | getwebpagetestcases, | |
22 | is_download_test, | |
23 | report_warning, | |
24 | try_rm, | |
25 | ) | |
26 | ||
27 | import yt_dlp.YoutubeDL # isort: split | |
28 | from yt_dlp.extractor import get_info_extractor | |
29 | from yt_dlp.networking.exceptions import HTTPError, TransportError | |
30 | from yt_dlp.utils import ( | |
31 | DownloadError, | |
32 | ExtractorError, | |
33 | UnavailableVideoError, | |
34 | YoutubeDLError, | |
35 | format_bytes, | |
36 | join_nonempty, | |
37 | ) | |
38 | ||
39 | RETRIES = 3 | |
40 | ||
41 | ||
class YoutubeDL(yt_dlp.YoutubeDL):
    """yt_dlp.YoutubeDL subclass instrumented for the download tests.

    Differences from the real downloader:
    - every warning is escalated to an ExtractorError (tests must be clean),
    - stderr output is rerouted through ``to_screen``,
    - each info dict handed to ``process_info`` is recorded for inspection.
    """

    def __init__(self, *params, **opts):
        # Keep a log of everything process_info() sees, and send would-be
        # stderr output to the screen, before the real initialisation runs.
        self.processed_info_dicts = []
        self.to_stderr = self.to_screen
        super().__init__(*params, **opts)

    def report_warning(self, message, *params, **opts):
        # Don't accept warnings during tests
        raise ExtractorError(message)

    def process_info(self, info_dict):
        # Snapshot the dict so later mutations by the downloader
        # don't affect the recorded copy.
        self.processed_info_dicts.append(info_dict.copy())
        return super().process_info(info_dict)
55 | ||
56 | ||
57 | def _file_md5(fn): | |
58 | with open(fn, 'rb') as f: | |
59 | return hashlib.md5(f.read()).hexdigest() | |
60 | ||
61 | ||
# Collect the declarative test-case definitions from all extractors.
normal_test_cases = gettestcases()
webpage_test_cases = getwebpagetestcases()
# Maps extractor name -> label ('' or 'webpage') -> number of tests
# generated so far; used to build unique test-method names.
tests_counter = collections.defaultdict(collections.Counter)
65 | ||
66 | ||
@is_download_test
class TestDownload(unittest.TestCase):
    # Parallel testing in nosetests. See
    # http://nose.readthedocs.org/en/latest/doc_tests/test_multiprocess/multiprocess.html
    _multiprocess_shared_ = True

    # Show full diffs for failed info-dict comparisons.
    maxDiff = None

    # Tracks which generated tests have already run (shared across instances
    # so batch tests don't re-run individually executed cases).
    COMPLETED_TESTS = {}

    def __str__(self):
        """Identify each test with the `add_ie` attribute, if available."""
        cls = type(self)
        bound_test = getattr(self, self._testMethodName)
        add_ie = bound_test.add_ie
        suffix = f' [{add_ie}]' if add_ie else ''
        return f'{self._testMethodName} ({cls.__module__}.{cls.__name__}){suffix}:'
81 | ||
82 | ||
83 | # Dynamically generate tests | |
84 | ||
def generator(test_case, tname):
    """Build a test method for one extractor test case.

    *test_case* is the declarative dict from the extractor's ``_TESTS``
    (or a playlist variant); *tname* is the unique method name used both
    for deduplication and to prefix output filenames.
    Returns the test function to be attached to TestDownload.
    """
    def test_template(self):
        # Each generated test may also be invoked by the batch runner;
        # the COMPLETED_TESTS dict ensures it only runs once per session.
        if self.COMPLETED_TESTS.get(tname):
            return
        self.COMPLETED_TESTS[tname] = True
        ie = yt_dlp.extractor.get_info_extractor(test_case['name'])()
        other_ies = [get_info_extractor(ie_key)() for ie_key in test_case.get('add_ie', [])]
        # A case is treated as a playlist test if any of its keys starts
        # with 'playlist' (playlist, playlist_mincount, playlist_count, ...).
        is_playlist = any(k.startswith('playlist') for k in test_case)
        test_cases = test_case.get(
            'playlist', [] if is_playlist else [test_case])

        def print_skipping(reason):
            # Announce the skip on stdout, then raise unittest's SkipTest.
            print('Skipping {}: {}'.format(test_case['name'], reason))
            self.skipTest(reason)

        if not ie.working():
            print_skipping('IE marked as not _WORKING')

        # Validate the test definitions themselves before doing any work:
        # every video case must declare 'id', and (unless downloads are
        # skipped with no-formats tolerated) an 'ext' for the output file.
        for tc in test_cases:
            if tc.get('expected_exception'):
                continue
            info_dict = tc.get('info_dict', {})
            params = tc.get('params', {})
            if not info_dict.get('id'):
                raise Exception(f'Test {tname} definition incorrect - "id" key is not present')
            elif not info_dict.get('ext') and info_dict.get('_type', 'video') == 'video':
                if params.get('skip_download') and params.get('ignore_no_formats_error'):
                    continue
                raise Exception(f'Test {tname} definition incorrect - "ext" key must be present to define the output file')

        if 'skip' in test_case:
            print_skipping(test_case['skip'])

        # A case also depends on any extractor listed in 'add_ie'.
        for other_ie in other_ies:
            if not other_ie.working():
                print_skipping(f'test depends on {other_ie.ie_key()}IE, marked as not WORKING')

        params = get_params(test_case.get('params', {}))
        # Prefix output filenames with the test name so parallel tests
        # never collide on disk.
        params['outtmpl'] = tname + '_' + params['outtmpl']
        if is_playlist and 'playlist' not in test_case:
            # Playlist-count-only tests: extract flat and stop one entry
            # past the expected count, without downloading media.
            params.setdefault('extract_flat', 'in_playlist')
            params.setdefault('playlistend', test_case.get(
                'playlist_mincount', test_case.get('playlist_count', -2) + 1))
            params.setdefault('skip_download', True)

        ydl = YoutubeDL(params, auto_init=False)
        ydl.add_default_info_extractors()
        # Filenames for which a 'finished' progress event was observed.
        finished_hook_called = set()

        def _hook(status):
            if status['status'] == 'finished':
                finished_hook_called.add(status['filename'])
        ydl.add_progress_hook(_hook)
        expect_warnings(ydl, test_case.get('expected_warnings', []))

        def get_tc_filename(tc):
            # Resolve the output filename a test case's info_dict maps to.
            return ydl.prepare_filename(dict(tc.get('info_dict', {})))

        res_dict = None

        def match_exception(err):
            # True if *err* (or anything in its exc_info chain) matches the
            # case's declared 'expected_exception' class name.
            expected_exception = test_case.get('expected_exception')
            if not expected_exception:
                return False
            if err.__class__.__name__ == expected_exception:
                return True
            return any(exc.__class__.__name__ == expected_exception for exc in err.exc_info)

        def try_rm_tcs_files(tcs=None):
            # Best-effort removal of output, partial and info-JSON files
            # for the given (default: all) test cases.
            if tcs is None:
                tcs = test_cases
            for tc in tcs:
                tc_filename = get_tc_filename(tc)
                try_rm(tc_filename)
                try_rm(tc_filename + '.part')
                try_rm(os.path.splitext(tc_filename)[0] + '.info.json')
        try_rm_tcs_files()
        try:
            # Extraction is retried up to RETRIES times on network errors.
            try_num = 1
            while True:
                try:
                    # We're not using .download here since that is just a shim
                    # for outside error handling, and returns the exit code
                    # instead of the result dict.
                    res_dict = ydl.extract_info(
                        test_case['url'],
                        force_generic_extractor=params.get('force_generic_extractor', False))
                except (DownloadError, ExtractorError) as err:
                    # Check if the exception is not a network related one
                    if not isinstance(err.exc_info[1], (TransportError, UnavailableVideoError)) or (isinstance(err.exc_info[1], HTTPError) and err.exc_info[1].status == 503):
                        if match_exception(err):
                            return
                        # Tag the error message with the test name for easier
                        # attribution in CI logs, then re-raise.
                        err.msg = f'{getattr(err, "msg", err)} ({tname})'
                        raise

                    if try_num == RETRIES:
                        report_warning(f'{tname} failed due to network errors, skipping...')
                        return

                    print(f'Retrying: {try_num} failed tries\n\n##########\n\n')

                    try_num += 1
                except YoutubeDLError as err:
                    if match_exception(err):
                        return
                    raise
                else:
                    break

            if is_playlist:
                self.assertTrue(res_dict['_type'] in ['playlist', 'multi_video'])
                self.assertTrue('entries' in res_dict)
                expect_info_dict(self, res_dict, test_case.get('info_dict', {}))

                if 'playlist_mincount' in test_case:
                    assertGreaterEqual(
                        self,
                        len(res_dict['entries']),
                        test_case['playlist_mincount'],
                        'Expected at least %d in playlist %s, but got only %d' % (
                            test_case['playlist_mincount'], test_case['url'],
                            len(res_dict['entries'])))
                if 'playlist_count' in test_case:
                    self.assertEqual(
                        len(res_dict['entries']),
                        test_case['playlist_count'],
                        'Expected %d entries in playlist %s, but got %d.' % (
                            test_case['playlist_count'],
                            test_case['url'],
                            len(res_dict['entries']),
                        ))
                if 'playlist_duration_sum' in test_case:
                    got_duration = sum(e['duration'] for e in res_dict['entries'])
                    self.assertEqual(
                        test_case['playlist_duration_sum'], got_duration)

            # Generalize both playlists and single videos to unified format for
            # simplicity
            if 'entries' not in res_dict:
                res_dict['entries'] = [res_dict]

            for tc_num, tc in enumerate(test_cases):
                tc_res_dict = res_dict['entries'][tc_num]
                # First, check test cases' data against extracted data alone
                expect_info_dict(self, tc_res_dict, tc.get('info_dict', {}))
                if tc_res_dict.get('_type', 'video') != 'video':
                    continue
                # Now, check downloaded file consistency
                tc_filename = get_tc_filename(tc)
                if not test_case.get('params', {}).get('skip_download', False):
                    self.assertTrue(os.path.exists(tc_filename), msg='Missing file ' + tc_filename)
                    self.assertTrue(tc_filename in finished_hook_called)
                    expected_minsize = tc.get('file_minsize', 10000)
                    if expected_minsize is not None:
                        # In test mode only a fragment is downloaded, so the
                        # size floor is capped at 10000 bytes.
                        if params.get('test'):
                            expected_minsize = max(expected_minsize, 10000)
                        got_fsize = os.path.getsize(tc_filename)
                        assertGreaterEqual(
                            self, got_fsize, expected_minsize,
                            f'Expected {tc_filename} to be at least {format_bytes(expected_minsize)}, '
                            f'but it\'s only {format_bytes(got_fsize)} ')
                    if 'md5' in tc:
                        md5_for_file = _file_md5(tc_filename)
                        self.assertEqual(tc['md5'], md5_for_file)
                # Finally, check test cases' data again but this time against
                # extracted data from info JSON file written during processing
                info_json_fn = os.path.splitext(tc_filename)[0] + '.info.json'
                self.assertTrue(
                    os.path.exists(info_json_fn),
                    f'Missing info file {info_json_fn}')
                with open(info_json_fn, encoding='utf-8') as infof:
                    info_dict = json.load(infof)
                expect_info_dict(self, info_dict, tc.get('info_dict', {}))
        finally:
            # Always clean up downloaded artifacts, even on failure.
            try_rm_tcs_files()
            if is_playlist and res_dict is not None and res_dict.get('entries'):
                # Remove all other files that may have been extracted if the
                # extractor returns full results even with extract_flat
                res_tcs = [{'info_dict': e} for e in res_dict['entries']]
                try_rm_tcs_files(res_tcs)
            ydl.close()
    return test_template
267 | ||
268 | ||
269 | # And add them to TestDownload | |
# And add them to TestDownload
def inject_tests(test_cases, label=''):
    """Attach one generated test method per test case to TestDownload.

    *label* distinguishes test flavours (e.g. 'webpage') and becomes part
    of the generated method name; a per-(name, label) counter keeps the
    names unique when an extractor declares several cases.
    """
    for case in test_cases:
        ie_name = case['name']
        case_index = tests_counter[ie_name][label]
        tests_counter[ie_name][label] += 1
        tname = join_nonempty('test', ie_name, label, case_index, delim='_')

        method = generator(case, tname)
        method.__name__ = tname
        # Record dependent extractors for TestDownload.__str__.
        method.add_ie = ','.join(case.get('add_ie', []))
        setattr(TestDownload, tname, method)
280 | ||
281 | ||
# Register the regular extractor test cases under their plain names.
inject_tests(normal_test_cases)

# TODO: disable redirection to the IE to ensure we are actually testing the webpage extraction
inject_tests(webpage_test_cases, 'webpage')
286 | ||
287 | ||
def batch_generator(name):
    """Build a method that runs every generated test for extractor *name*.

    Skipped sub-tests are reported but do not fail the batch.
    """
    def test_template(self):
        # Reconstruct each generated method name from the per-label counts.
        sub_tests = [
            join_nonempty('test', name, label, idx, delim='_')
            for label, count in tests_counter[name].items()
            for idx in range(count)
        ]
        for sub_test in sub_tests:
            try:
                getattr(self, sub_test)()
            except unittest.SkipTest:
                print(f'Skipped {sub_test}')

    return test_template
299 | ||
300 | ||
# Attach a `test_<name>_all` batch method per extractor so a whole
# extractor's cases can be run with a single test selector.
for ie_name in tests_counter:
    batch_method = batch_generator(ie_name)
    batch_method.__name__ = f'test_{ie_name}_all'
    batch_method.add_ie = ''
    setattr(TestDownload, batch_method.__name__, batch_method)
    # Avoid leaking the loop temporary into the module namespace.
    del batch_method
307 | ||
308 | ||
# Allow running this module directly (see sys.path tweak at the top).
if __name__ == '__main__':
    unittest.main()