import calendar
import codecs
import collections
+import collections.abc
import contextlib
-import ctypes
import datetime
import email.header
import email.utils
import time
import traceback
import types
+import unicodedata
import urllib.error
import urllib.parse
import urllib.request
'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
}
-KNOWN_EXTENSIONS = (
- 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
- 'flv', 'f4v', 'f4a', 'f4b',
- 'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
- 'mkv', 'mka', 'mk3d',
- 'avi', 'divx',
- 'mov',
- 'asf', 'wmv', 'wma',
- '3gp', '3g2',
- 'mp3',
- 'flac',
- 'ape',
- 'wav',
- 'f4f', 'f4m', 'm3u8', 'smil')
+# From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
+TIMEZONE_NAMES = {
+ 'UT': 0, 'UTC': 0, 'GMT': 0, 'Z': 0,
+ 'AST': -4, 'ADT': -3, # Atlantic (used in Canada)
+ 'EST': -5, 'EDT': -4, # Eastern
+ 'CST': -6, 'CDT': -5, # Central
+ 'MST': -7, 'MDT': -6, # Mountain
+ 'PST': -8, 'PDT': -7 # Pacific
+}
# needed for sanitizing filenames in restricted mode
ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
'%d/%m/%Y',
'%d/%m/%y',
'%d/%m/%Y %H:%M:%S',
+ '%d-%m-%Y %H:%M',
])
DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
])
PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
-JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?}|\[.+?\])\s*</script>'
NUMBER_RE = r'\d+(?:\.\d+)?'
return [whole for _, whole in get_elements_text_and_html_by_attribute(*args, **kwargs)]
-def get_elements_text_and_html_by_attribute(attribute, value, html, escape_value=True):
+def get_elements_text_and_html_by_attribute(attribute, value, html, *, tag=r'[\w:.-]+', escape_value=True):
"""
Return the text (content) and the html (whole) of the tag with the specified
attribute in the passed HTML document
value = re.escape(value) if escape_value else value
partial_element_re = rf'''(?x)
- <(?P<tag>[a-zA-Z0-9:._-]+)
+ <(?P<tag>{tag})
(?:\s(?:[^>"']|"[^"]*"|'[^']*')*)?
\s{re.escape(attribute)}\s*=\s*(?P<_q>['"]{quote})(?-x:{value})(?P=_q)
'''
def decode(self, s):
if self.transform_source:
s = self.transform_source(s)
- if self.ignore_extra:
- return self.raw_decode(s.lstrip())[0]
- return super().decode(s)
+ try:
+ if self.ignore_extra:
+ return self.raw_decode(s.lstrip())[0]
+ return super().decode(s)
+ except json.JSONDecodeError as e:
+ if e.pos is not None:
+ raise type(e)(f'{e.msg} in {s[e.pos-10:e.pos+10]!r}', s, e.pos)
+ raise
def sanitize_open(filename, open_mode):
if filename == '-':
if sys.platform == 'win32':
import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+
+ # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
+ with contextlib.suppress(io.UnsupportedOperation):
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
for attempt in range(2):
return ACCENT_CHARS[char]
elif not restricted and char == '\n':
return '\0 '
+ elif is_id is NO_DEFAULT and not restricted and char in '"*:<>?|/\\':
+ # Replace with their full-width unicode counterparts
+ return {'/': '\u29F8', '\\': '\u29f9'}.get(char, chr(ord(char) + 0xfee0))
elif char == '?' or ord(char) < 32 or ord(char) == 127:
return ''
elif char == '"':
return '\0_'
return char
+ if restricted and is_id is NO_DEFAULT:
+ s = unicodedata.normalize('NFKC', s)
s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s) # Handle timestamps
result = ''.join(map(replace_insane, s))
if is_id is NO_DEFAULT:
return os.path.join(*sanitized_path)
-def sanitize_url(url):
+def sanitize_url(url, *, scheme='http'):
# Prepend protocol-less URLs with `http:` scheme in order to mitigate
# the number of unwanted failures due to missing protocol
if url is None:
return
elif url.startswith('//'):
- return 'http:%s' % url
+ return f'{scheme}:{url}'
# Fix some common typos seen so far
COMMON_TYPOS = (
# https://github.com/ytdl-org/youtube-dl/issues/15649
if entity in html.entities.name2codepoint:
return chr(html.entities.name2codepoint[entity])
- # TODO: HTML5 allows entities without a semicolon. For example,
- # 'Éric' should be decoded as 'Éric'.
+ # TODO: HTML5 allows entities without a semicolon.
+ # E.g. 'Éric' should be decoded as 'Éric'.
if entity_with_semicolon in html.entities.html5:
return html.entities.html5[entity_with_semicolon]
def process_communicate_or_kill(p, *args, **kwargs):
- write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
- 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+ deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
+ f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
return Popen.communicate_or_kill(p, *args, **kwargs)
else:
_startupinfo = None
- def __init__(self, *args, text=False, **kwargs):
+ @staticmethod
+ def _fix_pyinstaller_ld_path(env):
+ """Restore LD_LIBRARY_PATH when using PyInstaller
+ Ref: https://github.com/pyinstaller/pyinstaller/blob/develop/doc/runtime-information.rst#ld_library_path--libpath-considerations
+ https://github.com/yt-dlp/yt-dlp/issues/4573
+ """
+ if not hasattr(sys, '_MEIPASS'):
+ return
+
+ def _fix(key):
+ orig = env.get(f'{key}_ORIG')
+ if orig is None:
+ env.pop(key, None)
+ else:
+ env[key] = orig
+
+ _fix('LD_LIBRARY_PATH') # Linux
+ _fix('DYLD_LIBRARY_PATH') # macOS
+
+ def __init__(self, *args, env=None, text=False, **kwargs):
+ if env is None:
+ env = os.environ.copy()
+ self._fix_pyinstaller_ld_path(env)
+
if text is True:
kwargs['universal_newlines'] = True # For 3.6 compatibility
kwargs.setdefault('encoding', 'utf-8')
kwargs.setdefault('errors', 'replace')
- super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
+ super().__init__(*args, env=env, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
try:
self.wait(timeout=timeout)
@classmethod
- def run(cls, *args, **kwargs):
+ def run(cls, *args, timeout=None, **kwargs):
with cls(*args, **kwargs) as proc:
- stdout, stderr = proc.communicate_or_kill()
- return stdout or '', stderr or '', proc.returncode
+ default = '' if proc.text_mode else b''
+ stdout, stderr = proc.communicate_or_kill(timeout=timeout)
+ return stdout or default, stderr or default, proc.returncode
def get_subprocess_encoding():
raise
+def is_path_like(f):
+ return isinstance(f, (str, bytes, os.PathLike))
+
+
class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
"""
See [1] for cookie file format.
def __init__(self, filename=None, *args, **kwargs):
super().__init__(None, *args, **kwargs)
- if self.is_path(filename):
+ if is_path_like(filename):
filename = os.fspath(filename)
self.filename = filename
def _true_or_false(cndn):
return 'TRUE' if cndn else 'FALSE'
- @staticmethod
- def is_path(file):
- return isinstance(file, (str, bytes, os.PathLike))
-
@contextlib.contextmanager
def open(self, file, *, write=False):
- if self.is_path(file):
+ if is_path_like(file):
with open(file, 'w' if write else 'r', encoding='utf-8') as f:
yield f
else:
if f'{line.strip()} '[0] in '[{"':
raise http.cookiejar.LoadError(
'Cookies file must be Netscape formatted, not JSON. See '
- 'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
+ 'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
continue
cf.seek(0)
$)
''', date_str)
if not m:
- timezone = datetime.timedelta()
+ m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
+ timezone = TIMEZONE_NAMES.get(m and m.group('tz').strip())
+ if timezone is not None:
+ date_str = date_str[:-len(m.group('tz'))]
+ timezone = datetime.timedelta(hours=timezone or 0)
else:
date_str = date_str[:-len(m.group('tz'))]
if not m.group('sign'):
if date_str is None:
return None
- date_str = re.sub(r'[,|]', '', date_str)
+ date_str = re.sub(r'\s+', ' ', re.sub(
+ r'(?i)[,|]|(mon|tues?|wed(nes)?|thu(rs)?|fri|sat(ur)?)(day)?', '', date_str))
pm_delta = 12 if re.search(r'(?i)PM', date_str) else 0
timezone, date_str = extract_timezone(date_str)
with contextlib.suppress(ValueError):
dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
return calendar.timegm(dt.timetuple())
+
timetuple = email.utils.parsedate_tz(date_str)
if timetuple:
- return calendar.timegm(timetuple) + pm_delta * 3600
+ return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds()
def determine_ext(url, default_ext='unknown_video'):
def platform_name():
""" Returns the platform name as a str """
- write_string('DeprecationWarning: yt_dlp.utils.platform_name is deprecated, use platform.platform instead')
+ deprecation_warning(f'"{__name__}.platform_name" is deprecated, use "platform.platform" instead')
return platform.platform()
python_implementation = platform.python_implementation()
if python_implementation == 'PyPy' and hasattr(sys, 'pypy_version_info'):
python_implementation += ' version %d.%d.%d' % sys.pypy_version_info[:3]
+ libc_ver = []
+ with contextlib.suppress(OSError): # We may not have access to the executable
+ libc_ver = platform.libc_ver()
return 'Python %s (%s %s) - %s %s' % (
platform.python_version(),
python_implementation,
platform.architecture()[0],
platform.platform(),
- format_field(join_nonempty(*platform.libc_ver(), delim=' '), None, '(%s)'),
+ format_field(join_nonempty(*libc_ver, delim=' '), None, '(%s)'),
)
out.flush()
+def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
+ from . import _IN_CLI
+ if _IN_CLI:
+ if msg in deprecation_warning._cache:
+ return
+ deprecation_warning._cache.add(msg)
+ if printer:
+ return printer(f'{msg}{bug_reports_message()}', **kwargs)
+ return write_string(f'ERROR: {msg}{bug_reports_message()}\n', **kwargs)
+ else:
+ import warnings
+ warnings.warn(DeprecationWarning(msg), stacklevel=stacklevel + 3)
+
+
+deprecation_warning._cache = set()
+
+
def bytes_to_intlist(bs):
if not bs:
return []
# Cross-platform file locking
if sys.platform == 'win32':
+ import ctypes
import ctypes.wintypes
import msvcrt
def setproctitle(title):
assert isinstance(title, str)
- # ctypes in Jython is not complete
- # http://bugs.jython.org/issue2148
- if sys.platform.startswith('java'):
+ # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4541
+ try:
+ import ctypes
+ except ImportError:
return
try:
def base_url(url):
- return re.match(r'https?://[^?#&]+/', url).group()
+ return re.match(r'https?://[^?#]+/', url).group()
def urljoin(base, path):
datetime_object = None
try:
if isinstance(timestamp, (int, float)): # unix timestamp
- datetime_object = datetime.datetime.utcfromtimestamp(timestamp)
+ # Using naive datetime here can break timestamp() in Windows
+ # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
+ datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
elif isinstance(timestamp, str): # assume YYYYMMDD
datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
+ date_format = re.sub( # Support %s on windows
+ r'(?<!%)(%%)*%s', rf'\g<1>{int(datetime_object.timestamp())}', date_format)
return datetime_object.strftime(date_format)
except (ValueError, TypeError, AttributeError):
return default
return out, content_type
+def variadic(x, allowed_types=(str, bytes, dict)):
+ return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
def dict_get(d, key_or_keys, default=None, skip_false_values=True):
for val in map(d.get, variadic(key_or_keys)):
if val is not None and (val or not skip_false_values):
for f in funcs:
try:
val = f(*args, **kwargs)
- except (AttributeError, KeyError, TypeError, IndexError, ZeroDivisionError):
+ except (AttributeError, KeyError, TypeError, IndexError, ValueError, ZeroDivisionError):
pass
else:
if expected_type is None or isinstance(val, expected_type):
r'\g<callback_data>', code)
-def js_to_json(code, vars={}):
+def js_to_json(code, vars={}, *, strict=False):
# vars is a dict of var, val pairs to substitute
+ STRING_QUOTES = '\'"'
+ STRING_RE = '|'.join(rf'{q}(?:\\.|[^\\{q}])*{q}' for q in STRING_QUOTES)
COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
INTEGER_TABLE = (
(fr'(?s)^(0+[0-7]+){SKIP_RE}:?$', 8),
)
+ def process_escape(match):
+ JSON_PASSTHROUGH_ESCAPES = R'"\bfnrtu'
+ escape = match.group(1) or match.group(2)
+
+ return (Rf'\{escape}' if escape in JSON_PASSTHROUGH_ESCAPES
+ else R'\u00' if escape == 'x'
+ else '' if escape == '\n'
+ else escape)
+
def fix_kv(m):
v = m.group(0)
if v in ('true', 'false', 'null'):
elif v in ('undefined', 'void 0'):
return 'null'
elif v.startswith('/*') or v.startswith('//') or v.startswith('!') or v == ',':
- return ""
-
- if v[0] in ("'", '"'):
- v = re.sub(r'(?s)\\.|"', lambda m: {
- '"': '\\"',
- "\\'": "'",
- '\\\n': '',
- '\\x': '\\u00',
- }.get(m.group(0), m.group(0)), v[1:-1])
- else:
- for regex, base in INTEGER_TABLE:
- im = re.match(regex, v)
- if im:
- i = int(im.group(1), base)
- return '"%d":' % i if v.endswith(':') else '%d' % i
+ return ''
- if v in vars:
- return vars[v]
+ if v[0] in STRING_QUOTES:
+ escaped = re.sub(r'(?s)(")|\\(.)', process_escape, v[1:-1])
+ return f'"{escaped}"'
- return '"%s"' % v
+ for regex, base in INTEGER_TABLE:
+ im = re.match(regex, v)
+ if im:
+ i = int(im.group(1), base)
+ return f'"{i}":' if v.endswith(':') else str(i)
+
+ if v in vars:
+ return json.dumps(vars[v])
+
+ if not strict:
+ return f'"{v}"'
+
+ raise ValueError(f'Unknown value: {v}')
def create_map(mobj):
return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
- code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
+ if not strict:
+ code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
+ code = re.sub(r'new \w+\((.*?)\)', lambda m: json.dumps(m.group(0)), code)
- return re.sub(r'''(?sx)
- "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
- '(?:[^'\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^'\\]*'|
- {comment}|,(?={skip}[\]}}])|
+ return re.sub(rf'''(?sx)
+ {STRING_RE}|
+ {COMMENT_RE}|,(?={SKIP_RE}[\]}}])|
void\s0|(?:(?<![0-9])[eE]|[a-df-zA-DF-Z_$])[.a-zA-Z_$0-9]*|
- \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{skip}:)?|
- [0-9]+(?={skip}:)|
+ \b(?:0[xX][0-9a-fA-F]+|0+[0-7]+)(?:{SKIP_RE}:)?|
+ [0-9]+(?={SKIP_RE}:)|
!+
- '''.format(comment=COMMENT_RE, skip=SKIP_RE), fix_kv, code)
+ ''', fix_kv, code)
def qualities(quality_ids):
return {}
+def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
+ assert len(vcodecs) == len(vexts) and len(acodecs) == len(aexts)
+
+ allow_mkv = not preferences or 'mkv' in preferences
+
+ if allow_mkv and max(len(acodecs), len(vcodecs)) > 1:
+ return 'mkv' # TODO: any other format allows this?
+
+ # TODO: All codecs supported by parse_codecs isn't handled here
+ COMPATIBLE_CODECS = {
+ 'mp4': {
+ 'av1', 'hevc', 'avc1', 'mp4a', # fourcc (m3u8, mpd)
+ 'h264', 'aacl', 'ec-3', # Set in ISM
+ },
+ 'webm': {
+ 'av1', 'vp9', 'vp8', 'opus', 'vrbs',
+ 'vp9x', 'vp8x', # in the webm spec
+ },
+ }
+
+ sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+ vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
+
+ for ext in preferences or COMPATIBLE_CODECS.keys():
+ codec_set = COMPATIBLE_CODECS.get(ext, set())
+ if ext == 'mkv' or codec_set.issuperset((vcodec, acodec)):
+ return ext
+
+ COMPATIBLE_EXTS = (
+ {'mp3', 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'ismv', 'isma', 'mov'},
+ {'webm'},
+ )
+ for ext in preferences or vexts:
+ current_exts = {ext, *vexts, *aexts}
+ if ext == 'mkv' or current_exts == {ext} or any(
+ ext_sets.issuperset(current_exts) for ext_sets in COMPATIBLE_EXTS):
+ return ext
+ return 'mkv' if allow_mkv else preferences[-1]
+
+
def urlhandle_detect_ext(url_handle):
getheader = url_handle.headers.get
ext = determine_ext(url)
if ext == 'm3u8':
- return 'm3u8'
+ return 'm3u8' if info_dict.get('is_live') else 'm3u8_native'
elif ext == 'f4m':
return 'f4m'
self.chapters, self.ranges = chapters, ranges
def __call__(self, info_dict, ydl):
+ if not self.ranges and not self.chapters:
+ yield {}
+
warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
else 'Cannot match chapters since chapter information is unavailable')
for regex in self.chapters or []:
def decode_base(value, digits):
- write_string('DeprecationWarning: yt_dlp.utils.decode_base is deprecated '
- 'and may be removed in a future version. Use yt_dlp.decode_base_n instead')
+ deprecation_warning(f'{__name__}.decode_base is deprecated and may be removed '
+ f'in a future version. Use {__name__}.decode_base_n instead')
return decode_base_n(value, table=digits)
def traverse_obj(
- obj, *path_list, default=None, expected_type=None, get_all=True,
+ obj, *paths, default=NO_DEFAULT, expected_type=None, get_all=True,
casesense=True, is_user_input=False, traverse_string=False):
- ''' Traverse nested list/dict/tuple
- @param path_list A list of paths which are checked one by one.
- Each path is a list of keys where each key is a:
- - None: Do nothing
- - string: A dictionary key
- - int: An index into a list
- - tuple: A list of keys all of which will be traversed
- - Ellipsis: Fetch all values in the object
- - Function: Takes the key and value as arguments
- and returns whether the key matches or not
- @param default Default value to return
- @param expected_type Only accept final value of this type (Can also be any callable)
- @param get_all Return all the values obtained from a path or only the first one
- @param casesense Whether to consider dictionary keys as case sensitive
- @param is_user_input Whether the keys are generated from user input. If True,
- strings are converted to int/slice if necessary
- @param traverse_string Whether to traverse inside strings. If True, any
- non-compatible object will also be converted into a string
- # TODO: Write tests
- '''
- if not casesense:
- _lower = lambda k: (k.lower() if isinstance(k, str) else k)
- path_list = (map(_lower, variadic(path)) for path in path_list)
-
- def _traverse_obj(obj, path, _current_depth=0):
- nonlocal depth
- path = tuple(variadic(path))
- for i, key in enumerate(path):
- if None in (key, obj):
- return obj
- if isinstance(key, (list, tuple)):
- obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
- key = ...
- if key is ...:
- obj = (obj.values() if isinstance(obj, dict)
- else obj if isinstance(obj, (list, tuple, LazyList))
- else str(obj) if traverse_string else [])
- _current_depth += 1
- depth = max(depth, _current_depth)
- return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
- elif callable(key):
- if isinstance(obj, (list, tuple, LazyList)):
- obj = enumerate(obj)
- elif isinstance(obj, dict):
- obj = obj.items()
- else:
- if not traverse_string:
- return None
- obj = str(obj)
- _current_depth += 1
- depth = max(depth, _current_depth)
- return [_traverse_obj(v, path[i + 1:], _current_depth) for k, v in obj if try_call(key, args=(k, v))]
- elif isinstance(obj, dict) and not (is_user_input and key == ':'):
- obj = (obj.get(key) if casesense or (key in obj)
- else next((v for k, v in obj.items() if _lower(k) == key), None))
- else:
- if is_user_input:
- key = (int_or_none(key) if ':' not in key
- else slice(*map(int_or_none, key.split(':'))))
- if key == slice(None):
- return _traverse_obj(obj, (..., *path[i + 1:]), _current_depth)
- if not isinstance(key, (int, slice)):
- return None
- if not isinstance(obj, (list, tuple, LazyList)):
- if not traverse_string:
- return None
- obj = str(obj)
- try:
- obj = obj[key]
- except IndexError:
- return None
- return obj
+ """
+ Safely traverse nested `dict`s and `Sequence`s
+
+ >>> obj = [{}, {"key": "value"}]
+ >>> traverse_obj(obj, (1, "key"))
+ "value"
+
+ Each of the provided `paths` is tested and the first producing a valid result will be returned.
+ The next path will also be tested if the path branched but no results could be found.
+ Supported values for traversal are `Mapping`, `Sequence` and `re.Match`.
+ A value of None is treated as the absence of a value.
+
+ The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`.
+
+ The keys in the path can be one of:
+ - `None`: Return the current object.
+ - `str`/`int`: Return `obj[key]`. For `re.Match, return `obj.group(key)`.
+ - `slice`: Branch out and return all values in `obj[key]`.
+ - `Ellipsis`: Branch out and return a list of all values.
+ - `tuple`/`list`: Branch out and return a list of all matching values.
+ Read as: `[traverse_obj(obj, branch) for branch in branches]`.
+ - `function`: Branch out and return values filtered by the function.
+ Read as: `[value for key, value in obj if function(key, value)]`.
+ For `Sequence`s, `key` is the index of the value.
+ - `dict` Transform the current object and return a matching dict.
+ Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`.
+
+ `tuple`, `list`, and `dict` all support nested paths and branches.
+
+ @params paths Paths which to traverse by.
+ @param default Value to return if the paths do not match.
+ @param expected_type If a `type`, only accept final values of this type.
+ If any other callable, try to call the function on each result.
+ @param get_all If `False`, return the first matching result, otherwise all matching ones.
+ @param casesense If `False`, consider string dictionary keys as case insensitive.
+
+ The following are only meant to be used by YoutubeDL.prepare_outtmpl and are not part of the API
+
+ @param is_user_input Whether the keys are generated from user input.
+ If `True` strings get converted to `int`/`slice` if needed.
+ @param traverse_string Whether to traverse into objects as strings.
+ If `True`, any non-compatible object will first be
+ converted into a string and then traversed into.
+
+
+ @returns The result of the object traversal.
+ If successful, `get_all=True`, and the path branches at least once,
+ then a list of results is returned instead.
+ A list is always returned if the last path branches and no `default` is given.
+ """
+ is_sequence = lambda x: isinstance(x, collections.abc.Sequence) and not isinstance(x, (str, bytes))
+ casefold = lambda k: k.casefold() if isinstance(k, str) else k
if isinstance(expected_type, type):
type_test = lambda val: val if isinstance(val, expected_type) else None
else:
- type_test = expected_type or IDENTITY
-
- for path in path_list:
- depth = 0
- val = _traverse_obj(obj, path)
- if val is not None:
- if depth:
- for _ in range(depth - 1):
- val = itertools.chain.from_iterable(v for v in val if v is not None)
- val = [v for v in map(type_test, val) if v is not None]
- if val:
- return val if get_all else val[0]
+ type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,))
+
+ def apply_key(key, obj):
+ if obj is None:
+ return
+
+ elif key is None:
+ yield obj
+
+ elif isinstance(key, (list, tuple)):
+ for branch in key:
+ _, result = apply_path(obj, branch)
+ yield from result
+
+ elif key is ...:
+ if isinstance(obj, collections.abc.Mapping):
+ yield from obj.values()
+ elif is_sequence(obj):
+ yield from obj
+ elif isinstance(obj, re.Match):
+ yield from obj.groups()
+ elif traverse_string:
+ yield from str(obj)
+
+ elif callable(key):
+ if is_sequence(obj):
+ iter_obj = enumerate(obj)
+ elif isinstance(obj, collections.abc.Mapping):
+ iter_obj = obj.items()
+ elif isinstance(obj, re.Match):
+ iter_obj = enumerate((obj.group(), *obj.groups()))
+ elif traverse_string:
+ iter_obj = enumerate(str(obj))
else:
- val = type_test(val)
- if val is not None:
- return val
- return default
+ return
+ yield from (v for k, v in iter_obj if try_call(key, args=(k, v)))
+
+ elif isinstance(key, dict):
+ iter_obj = ((k, _traverse_obj(obj, v)) for k, v in key.items())
+ yield {k: v if v is not None else default for k, v in iter_obj
+ if v is not None or default is not NO_DEFAULT}
+
+ elif isinstance(obj, collections.abc.Mapping):
+ yield (obj.get(key) if casesense or (key in obj)
+ else next((v for k, v in obj.items() if casefold(k) == key), None))
+
+ elif isinstance(obj, re.Match):
+ if isinstance(key, int) or casesense:
+ with contextlib.suppress(IndexError):
+ yield obj.group(key)
+ return
+
+ if not isinstance(key, str):
+ return
+
+ yield next((v for k, v in obj.groupdict().items() if casefold(k) == key), None)
+
+ else:
+ if is_user_input:
+ key = (int_or_none(key) if ':' not in key
+ else slice(*map(int_or_none, key.split(':'))))
+
+ if not isinstance(key, (int, slice)):
+ return
+
+ if not is_sequence(obj):
+ if not traverse_string:
+ return
+ obj = str(obj)
+
+ with contextlib.suppress(IndexError):
+ yield obj[key]
+
+ def apply_path(start_obj, path):
+ objs = (start_obj,)
+ has_branched = False
+
+ for key in variadic(path):
+ if is_user_input and key == ':':
+ key = ...
+
+ if not casesense and isinstance(key, str):
+ key = key.casefold()
+
+ if key is ... or isinstance(key, (list, tuple)) or callable(key):
+ has_branched = True
+
+ key_func = functools.partial(apply_key, key)
+ objs = itertools.chain.from_iterable(map(key_func, objs))
+
+ return has_branched, objs
+
+ def _traverse_obj(obj, path, use_list=True):
+ has_branched, results = apply_path(obj, path)
+ results = LazyList(x for x in map(type_test, results) if x is not None)
+
+ if get_all and has_branched:
+ return results.exhaust() if results or use_list else None
+
+ return results[0] if results else None
+
+ for index, path in enumerate(paths, 1):
+ use_list = default is NO_DEFAULT and index == len(paths)
+ result = _traverse_obj(obj, path, use_list)
+ if result is not None:
+ return result
+
+ return None if default is NO_DEFAULT else default
def traverse_dict(dictn, keys, casesense=True):
- write_string('DeprecationWarning: yt_dlp.utils.traverse_dict is deprecated '
- 'and may be removed in a future version. Use yt_dlp.utils.traverse_obj instead')
+ deprecation_warning(f'"{__name__}.traverse_dict" is deprecated and may be removed '
+ f'in a future version. Use "{__name__}.traverse_obj" instead')
return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
return traverse_obj(obj, (..., *variadic(keys)), **kwargs, get_all=False)
-def variadic(x, allowed_types=(str, bytes, dict)):
- return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
-
-
def time_seconds(**kwargs):
t = datetime.datetime.now(datetime.timezone(datetime.timedelta(**kwargs)))
return t.timestamp()
# can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
def jwt_decode_hs256(jwt):
header_b64, payload_b64, signature_b64 = jwt.split('.')
- payload_data = json.loads(base64.urlsafe_b64decode(payload_b64))
+ # add trailing ='s that may have been stripped, superfluous ='s are ignored
+ payload_data = json.loads(base64.urlsafe_b64decode(f'{payload_b64}==='))
return payload_data
self.parsed_args = self.own_args
for location in opts.config_locations or []:
if location == '-':
+ if location in self._loaded_paths:
+ continue
+ self._loaded_paths.add(location)
self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
continue
location = os.path.join(directory, expand_path(location))
return self.__dict__.items()
+MEDIA_EXTENSIONS = Namespace(
+ common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
+ video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
+ common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
+ audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma'),
+ thumbnails=('jpg', 'png', 'webp'),
+ storyboards=('mhtml', ),
+ subtitles=('srt', 'vtt', 'ass', 'lrc'),
+ manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
+)
+MEDIA_EXTENSIONS.video += MEDIA_EXTENSIONS.common_video
+MEDIA_EXTENSIONS.audio += MEDIA_EXTENSIONS.common_audio
+
+KNOWN_EXTENSIONS = (*MEDIA_EXTENSIONS.video, *MEDIA_EXTENSIONS.audio, *MEDIA_EXTENSIONS.manifests)
+
+
+class RetryManager:
+ """Usage:
+ for retry in RetryManager(...):
+ try:
+ ...
+ except SomeException as err:
+ retry.error = err
+ continue
+ """
+ attempt, _error = 0, None
+
+ def __init__(self, _retries, _error_callback, **kwargs):
+ self.retries = _retries or 0
+ self.error_callback = functools.partial(_error_callback, **kwargs)
+
+ def _should_retry(self):
+ return self._error is not NO_DEFAULT and self.attempt <= self.retries
+
+ @property
+ def error(self):
+ if self._error is NO_DEFAULT:
+ return None
+ return self._error
+
+ @error.setter
+ def error(self, value):
+ self._error = value
+
+ def __iter__(self):
+ while self._should_retry():
+ self.error = NO_DEFAULT
+ self.attempt += 1
+ yield self
+ if self.error:
+ self.error_callback(self.error, self.attempt, self.retries)
+
+ @staticmethod
+ def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
+ """Utility function for reporting retries"""
+ if count > retries:
+ if error:
+ return error(f'{e}. Giving up after {count - 1} retries') if count > 1 else error(str(e))
+ raise e
+
+ if not count:
+ return warn(e)
+ elif isinstance(e, ExtractorError):
+ e = remove_end(str_or_none(e.cause) or e.orig_msg, '.')
+ warn(f'{e}. Retrying{format_field(suffix, None, " %s")} ({count}/{retries})...')
+
+ delay = float_or_none(sleep_func(n=count - 1)) if callable(sleep_func) else sleep_func
+ if delay:
+ info(f'Sleeping {delay:.2f} seconds ...')
+ time.sleep(delay)
+
+
+def make_archive_id(ie, video_id):
+ ie_key = ie if isinstance(ie, str) else ie.ie_key()
+ return f'{ie_key.lower()} {video_id}'
+
+
+def truncate_string(s, left, right=0):
+ assert left > 3 and right >= 0
+ if s is None or len(s) <= left + right:
+ return s
+ return f'{s[:left-3]}...{s[-right:]}'
+
+
+def orderedSet_from_options(options, alias_dict, *, use_regex=False, start=None):
+ assert 'all' in alias_dict, '"all" alias is required'
+ requested = list(start or [])
+ for val in options:
+ discard = val.startswith('-')
+ if discard:
+ val = val[1:]
+
+ if val in alias_dict:
+ val = alias_dict[val] if not discard else [
+ i[1:] if i.startswith('-') else f'-{i}' for i in alias_dict[val]]
+ # NB: Do not allow regex in aliases for performance
+ requested = orderedSet_from_options(val, alias_dict, start=requested)
+ continue
+
+ current = (filter(re.compile(val, re.I).fullmatch, alias_dict['all']) if use_regex
+ else [val] if val in alias_dict['all'] else None)
+ if current is None:
+ raise ValueError(val)
+
+ if discard:
+ for item in current:
+ while item in requested:
+ requested.remove(item)
+ else:
+ requested.extend(current)
+
+ return orderedSet(requested)
+
+
# Deprecated
has_certifi = bool(certifi)
has_websockets = bool(websockets)