X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/776995bc109c5cd1aa56b684fada2ce718a386ec..af86873218c24c3859ccf575a87f2b00a73b49d0:/test/test_utils.py diff --git a/test/test_utils.py b/test/test_utils.py index ffe1b729f..b36bc04c2 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -5,6 +5,7 @@ import re import sys import unittest +import warnings sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -50,6 +51,7 @@ escape_url, expand_path, extract_attributes, + extract_basic_auth, find_xpath_attr, fix_xml_ampersands, float_or_none, @@ -102,7 +104,6 @@ sanitize_filename, sanitize_path, sanitize_url, - sanitized_Request, shell_quote, smuggle_url, str_or_none, @@ -112,6 +113,7 @@ subtitles_filename, timeconvert, traverse_obj, + try_call, unescapeHTML, unified_strdate, unified_timestamp, @@ -123,12 +125,14 @@ urlencode_postdata, urljoin, urshift, + variadic, version_tuple, xpath_attr, xpath_element, xpath_text, xpath_with_ns, ) +from yt_dlp.utils.networking import HTTPHeaderDict class TestUtil(unittest.TestCase): @@ -255,15 +259,6 @@ def test_sanitize_url(self): self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar') self.assertEqual(sanitize_url('foo bar'), 'foo bar') - def test_extract_basic_auth(self): - auth_header = lambda url: sanitized_Request(url).get_header('Authorization') - self.assertFalse(auth_header('http://foo.bar')) - self.assertFalse(auth_header('http://:foo.bar')) - self.assertEqual(auth_header('http://@foo.bar'), 'Basic Og==') - self.assertEqual(auth_header('http://:pass@foo.bar'), 'Basic OnBhc3M=') - self.assertEqual(auth_header('http://user:@foo.bar'), 'Basic dXNlcjo=') - self.assertEqual(auth_header('http://user:pass@foo.bar'), 'Basic dXNlcjpwYXNz') - def test_expand_path(self): def env(var): return f'%{var}%' if sys.platform == 'win32' else f'${var}' @@ -660,6 +655,8 @@ def test_parse_duration(self): self.assertEqual(parse_duration('P0Y0M0DT0H4M20.880S'), 260.88) self.assertEqual(parse_duration('01:02:03:050'), 3723.05) self.assertEqual(parse_duration('103:050'), 103.05) + self.assertEqual(parse_duration('1HR 3MIN'), 3780) + self.assertEqual(parse_duration('2hrs 3mins'), 7380) def test_fix_xml_ampersands(self): self.assertEqual( @@ -1190,6 +1187,13 @@ def test_js_to_json_malformed(self): self.assertEqual(js_to_json('42a1'), '42"a1"') self.assertEqual(js_to_json('42a-1'), '42"a"-1') + def test_js_to_json_template_literal(self): + self.assertEqual(js_to_json('`Hello ${name}`', {'name': '"world"'}), '"Hello world"') + self.assertEqual(js_to_json('`${name}${name}`', {'name': '"X"'}), '"XX"') + self.assertEqual(js_to_json('`${name}${name}`', {'name': '5'}), '"55"') + self.assertEqual(js_to_json('`${name}"${name}"`', {'name': '5'}), '"5\\"5\\""') + self.assertEqual(js_to_json('`${name}`', {}), '"name"') + def test_extract_attributes(self): self.assertEqual(extract_attributes(''), {'x': 'y'}) self.assertEqual(extract_attributes(""), {'x': 'y'}) @@ -1825,6 +1829,8 @@ def test_iri_to_uri(self): def test_clean_podcast_url(self): self.assertEqual(clean_podcast_url('https://www.podtrac.com/pts/redirect.mp3/chtbl.com/track/5899E/traffic.megaphone.fm/HSW7835899191.mp3'), 'https://traffic.megaphone.fm/HSW7835899191.mp3') self.assertEqual(clean_podcast_url('https://play.podtrac.com/npr-344098539/edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3'), 'https://edge1.pod.npr.org/anon.npr-podcasts/podcast/npr/waitwait/2020/10/20201003_waitwait_wwdtmpodcast201003-015621a5-f035-4eca-a9a1-7c118d90bc3c.mp3') + self.assertEqual(clean_podcast_url('https://pdst.fm/e/2.gum.fm/chtbl.com/track/chrt.fm/track/34D33/pscrb.fm/rss/p/traffic.megaphone.fm/ITLLC7765286967.mp3?updated=1687282661'), 'https://traffic.megaphone.fm/ITLLC7765286967.mp3?updated=1687282661') + self.assertEqual(clean_podcast_url('https://pdst.fm/e/https://mgln.ai/e/441/www.buzzsprout.com/1121972/13019085-ep-252-the-deep-life-stack.mp3'), 'https://www.buzzsprout.com/1121972/13019085-ep-252-the-deep-life-stack.mp3') def test_LazyList(self): it = list(range(10)) @@ -1967,6 +1973,35 @@ def test_get_compatible_ext(self): self.assertEqual(get_compatible_ext( vcodecs=['av1'], acodecs=['mp4a'], vexts=['webm'], aexts=['m4a'], preferences=('webm', 'mkv')), 'mkv') + def test_try_call(self): + def total(*x, **kwargs): + return sum(x) + sum(kwargs.values()) + + self.assertEqual(try_call(None), None, + msg='not a fn should give None') + self.assertEqual(try_call(lambda: 1), 1, + msg='int fn with no expected_type should give int') + self.assertEqual(try_call(lambda: 1, expected_type=int), 1, + msg='int fn with expected_type int should give int') + self.assertEqual(try_call(lambda: 1, expected_type=dict), None, + msg='int fn with wrong expected_type should give None') + self.assertEqual(try_call(total, args=(0, 1, 0, ), expected_type=int), 1, + msg='fn should accept arglist') + self.assertEqual(try_call(total, kwargs={'a': 0, 'b': 1, 'c': 0}, expected_type=int), 1, + msg='fn should accept kwargs') + self.assertEqual(try_call(lambda: 1, expected_type=dict), None, + msg='int fn with no expected_type should give None') + self.assertEqual(try_call(lambda x: {}, total, args=(42, ), expected_type=int), 42, + msg='expect first int result with expected_type int') + + def test_variadic(self): + self.assertEqual(variadic(None), (None, )) + self.assertEqual(variadic('spam'), ('spam', )) + self.assertEqual(variadic('spam', allowed_types=dict), 'spam') + with warnings.catch_warnings(): + warnings.simplefilter('ignore') + self.assertEqual(variadic('spam', allowed_types=[dict]), 'spam') + def test_traverse_obj(self): _TEST_DATA = { 100: 100, @@ -2000,8 +2035,8 @@ def test_traverse_obj(self): # Test Ellipsis behavior self.assertCountEqual(traverse_obj(_TEST_DATA, ...), - (item for item in _TEST_DATA.values() if item is not None), - msg='`...` should give all values except `None`') + (item for item in _TEST_DATA.values() if item not in (None, {})), + msg='`...` should give all non discarded values') self.assertCountEqual(traverse_obj(_TEST_DATA, ('urls', 0, ...)), _TEST_DATA['urls'][0].values(), msg='`...` selection for dicts should select all values') self.assertEqual(traverse_obj(_TEST_DATA, (..., ..., 'url')), @@ -2009,6 +2044,8 @@ def test_traverse_obj(self): msg='nested `...` queries should work') self.assertCountEqual(traverse_obj(_TEST_DATA, (..., ..., 'index')), range(4), msg='`...` query result should be flattened') + self.assertEqual(traverse_obj(iter(range(4)), ...), list(range(4)), + msg='`...` should accept iterables') # Test function as key self.assertEqual(traverse_obj(_TEST_DATA, lambda x, y: x == 'urls' and isinstance(y, list)), @@ -2016,6 +2053,8 @@ def test_traverse_obj(self): msg='function as query key should perform a filter based on (key, value)') self.assertCountEqual(traverse_obj(_TEST_DATA, lambda _, x: isinstance(x[0], str)), {'str'}, msg='exceptions in the query function should be catched') + self.assertEqual(traverse_obj(iter(range(4)), lambda _, x: x % 2 == 0), [0, 2], + msg='function key should accept iterables') if __debug__: with self.assertRaises(Exception, msg='Wrong function signature should raise in debug'): traverse_obj(_TEST_DATA, lambda a: ...) @@ -2040,6 +2079,17 @@ def test_traverse_obj(self): with self.assertRaises(Exception, msg='Sets with length != 1 should raise in debug'): traverse_obj(_TEST_DATA, {str.upper, str}) + # Test `slice` as a key + _SLICE_DATA = [0, 1, 2, 3, 4] + self.assertEqual(traverse_obj(_TEST_DATA, ('dict', slice(1))), None, + msg='slice on a dictionary should not throw') + self.assertEqual(traverse_obj(_SLICE_DATA, slice(1)), _SLICE_DATA[:1], + msg='slice key should apply slice to sequence') + self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 2)), _SLICE_DATA[1:2], + msg='slice key should apply slice to sequence') + self.assertEqual(traverse_obj(_SLICE_DATA, slice(1, 4, 2)), _SLICE_DATA[1:4:2], + msg='slice key should apply slice to sequence') + # Test alternative paths self.assertEqual(traverse_obj(_TEST_DATA, 'fail', 'str'), 'str', msg='multiple `paths` should be treated as alternative paths') @@ -2084,15 +2134,23 @@ def test_traverse_obj(self): {0: ['https://www.example.com/1', 'https://www.example.com/0']}, msg='tripple nesting in dict path should be treated as branches') self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}), {}, - msg='remove `None` values when dict key') + msg='remove `None` values when top level dict key fails') self.assertEqual(traverse_obj(_TEST_DATA, {0: 'fail'}, default=...), {0: ...}, - msg='do not remove `None` values if `default`') - self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {0: {}}, - msg='do not remove empty values when dict key') - self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=...), {0: {}}, - msg='do not remove empty values when dict key and a default') - self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', ...)}), {0: []}, - msg='if branch in dict key not successful, return `[]`') + msg='use `default` if key fails and `default`') + self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}), {}, + msg='remove empty values when dict key') + self.assertEqual(traverse_obj(_TEST_DATA, {0: 'dict'}, default=...), {0: ...}, + msg='use `default` when dict key and `default`') + self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}), {}, + msg='remove empty values when nested dict key fails') + self.assertEqual(traverse_obj(None, {0: 'fail'}), {}, + msg='default to dict if pruned') + self.assertEqual(traverse_obj(None, {0: 'fail'}, default=...), {0: ...}, + msg='default to dict if pruned and default is given') + self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 'fail'}}, default=...), {0: {0: ...}}, + msg='use nested `default` when nested dict key fails and `default`') + self.assertEqual(traverse_obj(_TEST_DATA, {0: ('dict', ...)}), {}, + msg='remove key if branch in dict key not successful') # Testing default parameter behavior _DEFAULT_DATA = {'None': None, 'int': 0, 'list': []} @@ -2116,34 +2174,55 @@ def test_traverse_obj(self): msg='if branched but not successful return `[]`, not `default`') self.assertEqual(traverse_obj(_DEFAULT_DATA, ('list', ...)), [], msg='if branched but object is empty return `[]`, not `default`') + self.assertEqual(traverse_obj(None, ...), [], + msg='if branched but object is `None` return `[]`, not `default`') + self.assertEqual(traverse_obj({0: None}, (0, ...)), [], + msg='if branched but state is `None` return `[]`, not `default`') + + branching_paths = [ + ('fail', ...), + (..., 'fail'), + 100 * ('fail',) + (...,), + (...,) + 100 * ('fail',), + ] + for branching_path in branching_paths: + self.assertEqual(traverse_obj({}, branching_path), [], + msg='if branched but state is `None`, return `[]` (not `default`)') + self.assertEqual(traverse_obj({}, 'fail', branching_path), [], + msg='if branching in last alternative and previous did not match, return `[]` (not `default`)') + self.assertEqual(traverse_obj({0: 'x'}, 0, branching_path), 'x', + msg='if branching in last alternative and previous did match, return single value') + self.assertEqual(traverse_obj({0: 'x'}, branching_path, 0), 'x', + msg='if branching in first alternative and non-branching path does match, return single value') + self.assertEqual(traverse_obj({}, branching_path, 'fail'), None, + msg='if branching in first alternative and non-branching path does not match, return `default`') # Testing expected_type behavior _EXPECTED_TYPE_DATA = {'str': 'str', 'int': 0} - self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=str), 'str', - msg='accept matching `expected_type` type') - self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int), None, - msg='reject non matching `expected_type` type') - self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: str(x)), '0', - msg='transform type using type function') - self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', - expected_type=lambda _: 1 / 0), None, - msg='wrap expected_type fuction in try_call') - self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, ..., expected_type=str), ['str'], - msg='eliminate items that expected_type fails on') - self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}, expected_type=int), {0: 100}, - msg='type as expected_type should filter dict values') - self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2, 2: 'None'}, expected_type=str_or_none), {0: '100', 1: '1.2'}, - msg='function as expected_type should transform dict values') - self.assertEqual(traverse_obj(_TEST_DATA, ({0: 1.2}, 0, {int_or_none}), expected_type=int), 1, - msg='expected_type should not filter non final dict values') - self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 100, 1: 'str'}}, expected_type=int), {0: {0: 100}}, - msg='expected_type should transform deep dict values') - self.assertEqual(traverse_obj(_TEST_DATA, [({0: '...'}, {0: '...'})], expected_type=type(...)), [{0: ...}, {0: ...}], - msg='expected_type should transform branched dict values') - self.assertEqual(traverse_obj({1: {3: 4}}, [(1, 2), 3], expected_type=int), [4], - msg='expected_type regression for type matching in tuple branching') - self.assertEqual(traverse_obj(_TEST_DATA, ['data', ...], expected_type=int), [], - msg='expected_type regression for type matching in dict result') + self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=str), + 'str', msg='accept matching `expected_type` type') + self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=int), + None, msg='reject non matching `expected_type` type') + self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'int', expected_type=lambda x: str(x)), + '0', msg='transform type using type function') + self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, 'str', expected_type=lambda _: 1 / 0), + None, msg='wrap expected_type fuction in try_call') + self.assertEqual(traverse_obj(_EXPECTED_TYPE_DATA, ..., expected_type=str), + ['str'], msg='eliminate items that expected_type fails on') + self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2}, expected_type=int), + {0: 100}, msg='type as expected_type should filter dict values') + self.assertEqual(traverse_obj(_TEST_DATA, {0: 100, 1: 1.2, 2: 'None'}, expected_type=str_or_none), + {0: '100', 1: '1.2'}, msg='function as expected_type should transform dict values') + self.assertEqual(traverse_obj(_TEST_DATA, ({0: 1.2}, 0, {int_or_none}), expected_type=int), + 1, msg='expected_type should not filter non final dict values') + self.assertEqual(traverse_obj(_TEST_DATA, {0: {0: 100, 1: 'str'}}, expected_type=int), + {0: {0: 100}}, msg='expected_type should transform deep dict values') + self.assertEqual(traverse_obj(_TEST_DATA, [({0: '...'}, {0: '...'})], expected_type=type(...)), + [{0: ...}, {0: ...}], msg='expected_type should transform branched dict values') + self.assertEqual(traverse_obj({1: {3: 4}}, [(1, 2), 3], expected_type=int), + [4], msg='expected_type regression for type matching in tuple branching') + self.assertEqual(traverse_obj(_TEST_DATA, ['data', ...], expected_type=int), + [], msg='expected_type regression for type matching in dict result') # Test get_all behavior _GET_ALL_DATA = {'key': [0, 1, 2]} @@ -2183,14 +2262,23 @@ def test_traverse_obj(self): traverse_string=True), '.', msg='traverse into converted data if `traverse_string`') self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', ...), - traverse_string=True), list('str'), - msg='`...` branching into string should result in list') + traverse_string=True), 'str', + msg='`...` should result in string (same value) if `traverse_string`') + self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', slice(0, None, 2)), + traverse_string=True), 'sr', + msg='`slice` should result in string if `traverse_string`') + self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda i, v: i or v == "s"), + traverse_string=True), 'str', + msg='function should result in string if `traverse_string`') self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', (0, 2)), traverse_string=True), ['s', 'r'], - msg='branching into string should result in list') - self.assertEqual(traverse_obj(_TRAVERSE_STRING_DATA, ('str', lambda _, x: x), - traverse_string=True), list('str'), - msg='function branching into string should result in list') + msg='branching should result in list if `traverse_string`') + self.assertEqual(traverse_obj({}, (0, ...), traverse_string=True), [], + msg='branching should result in list if `traverse_string`') + self.assertEqual(traverse_obj({}, (0, lambda x, y: True), traverse_string=True), [], + msg='branching should result in list if `traverse_string`') + self.assertEqual(traverse_obj({}, (0, slice(1)), traverse_string=True), [], + msg='branching should result in list if `traverse_string`') # Test is_user_input behavior _IS_USER_INPUT_DATA = {'range8': list(range(8))} @@ -2230,6 +2318,44 @@ def test_traverse_obj(self): self.assertEqual(traverse_obj(mobj, lambda k, _: k in (0, 'group')), ['0123', '3'], msg='function on a `re.Match` should give group name as well') + def test_http_header_dict(self): + headers = HTTPHeaderDict() + headers['ytdl-test'] = 1 + self.assertEqual(list(headers.items()), [('Ytdl-Test', '1')]) + headers['Ytdl-test'] = '2' + self.assertEqual(list(headers.items()), [('Ytdl-Test', '2')]) + self.assertTrue('ytDl-Test' in headers) + self.assertEqual(str(headers), str(dict(headers))) + self.assertEqual(repr(headers), str(dict(headers))) + + headers.update({'X-dlp': 'data'}) + self.assertEqual(set(headers.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data')}) + self.assertEqual(dict(headers), {'Ytdl-Test': '2', 'X-Dlp': 'data'}) + self.assertEqual(len(headers), 2) + self.assertEqual(headers.copy(), headers) + headers2 = HTTPHeaderDict({'X-dlp': 'data3'}, **headers, **{'X-dlp': 'data2'}) + self.assertEqual(set(headers2.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data2')}) + self.assertEqual(len(headers2), 2) + headers2.clear() + self.assertEqual(len(headers2), 0) + + # ensure we prefer latter headers + headers3 = HTTPHeaderDict({'Ytdl-TeSt': 1}, {'Ytdl-test': 2}) + self.assertEqual(set(headers3.items()), {('Ytdl-Test', '2')}) + del headers3['ytdl-tesT'] + self.assertEqual(dict(headers3), {}) + + headers4 = HTTPHeaderDict({'ytdl-test': 'data;'}) + self.assertEqual(set(headers4.items()), {('Ytdl-Test', 'data;')}) + + def test_extract_basic_auth(self): + assert extract_basic_auth('http://:foo.bar') == ('http://:foo.bar', None) + assert extract_basic_auth('http://foo.bar') == ('http://foo.bar', None) + assert extract_basic_auth('http://@foo.bar') == ('http://foo.bar', 'Basic Og==') + assert extract_basic_auth('http://:pass@foo.bar') == ('http://foo.bar', 'Basic OnBhc3M=') + assert extract_basic_auth('http://user:@foo.bar') == ('http://foo.bar', 'Basic dXNlcjo=') + assert extract_basic_auth('http://user:pass@foo.bar') == ('http://foo.bar', 'Basic dXNlcjpwYXNz') + if __name__ == '__main__': unittest.main()