if sys.platform == 'win32':
import msvcrt
- # stdout may be any IO stream. Eg, when using contextlib.redirect_stdout
+ # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
with contextlib.suppress(io.UnsupportedOperation):
msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
if entity in html.entities.name2codepoint:
return chr(html.entities.name2codepoint[entity])
- # TODO: HTML5 allows entities without a semicolon. For example,
- # 'Éric' should be decoded as 'Éric'.
+ # TODO: HTML5 allows entities without a semicolon.
+ # E.g. 'Éric' should be decoded as 'Éric'.
if entity_with_semicolon in html.entities.html5:
return html.entities.html5[entity_with_semicolon]
def process_communicate_or_kill(p, *args, **kwargs):
- write_string('DeprecationWarning: yt_dlp.utils.process_communicate_or_kill is deprecated '
- 'and may be removed in a future version. Use yt_dlp.utils.Popen.communicate_or_kill instead')
+ deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
+ f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
return Popen.communicate_or_kill(p, *args, **kwargs)
else:
_startupinfo = None
- def __init__(self, *args, text=False, **kwargs):
+ @staticmethod
+ def _fix_pyinstaller_ld_path(env):
+ """Restore LD_LIBRARY_PATH when using PyInstaller
+ Ref: https://github.com/pyinstaller/pyinstaller/blob/develop/doc/runtime-information.rst#ld_library_path--libpath-considerations
+ https://github.com/yt-dlp/yt-dlp/issues/4573
+ """
+ if not hasattr(sys, '_MEIPASS'):
+ return
+
+ def _fix(key):
+ orig = env.get(f'{key}_ORIG')
+ if orig is None:
+ env.pop(key, None)
+ else:
+ env[key] = orig
+
+ _fix('LD_LIBRARY_PATH') # Linux
+ _fix('DYLD_LIBRARY_PATH') # macOS
+
+ def __init__(self, *args, env=None, text=False, **kwargs):
+ if env is None:
+ env = os.environ.copy()
+ self._fix_pyinstaller_ld_path(env)
+
if text is True:
kwargs['universal_newlines'] = True # For 3.6 compatibility
kwargs.setdefault('encoding', 'utf-8')
kwargs.setdefault('errors', 'replace')
- super().__init__(*args, **kwargs, startupinfo=self._startupinfo)
+ super().__init__(*args, env=env, **kwargs, startupinfo=self._startupinfo)
def communicate_or_kill(self, *args, **kwargs):
try:
self.wait(timeout=timeout)
@classmethod
- def run(cls, *args, **kwargs):
+ def run(cls, *args, timeout=None, **kwargs):
with cls(*args, **kwargs) as proc:
- stdout, stderr = proc.communicate_or_kill()
+ stdout, stderr = proc.communicate_or_kill(timeout=timeout)
return stdout or '', stderr or '', proc.returncode
raise
+def is_path_like(f):
+ return isinstance(f, (str, bytes, os.PathLike))
+
+
class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
"""
See [1] for cookie file format.
def __init__(self, filename=None, *args, **kwargs):
super().__init__(None, *args, **kwargs)
- if self.is_path(filename):
+ if is_path_like(filename):
filename = os.fspath(filename)
self.filename = filename
def _true_or_false(cndn):
return 'TRUE' if cndn else 'FALSE'
- @staticmethod
- def is_path(file):
- return isinstance(file, (str, bytes, os.PathLike))
-
@contextlib.contextmanager
def open(self, file, *, write=False):
- if self.is_path(file):
+ if is_path_like(file):
with open(file, 'w' if write else 'r', encoding='utf-8') as f:
yield f
else:
if f'{line.strip()} '[0] in '[{"':
raise http.cookiejar.LoadError(
'Cookies file must be Netscape formatted, not JSON. See '
- 'https://github.com/ytdl-org/youtube-dl#how-do-i-pass-cookies-to-youtube-dl')
+ 'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
continue
cf.seek(0)
def platform_name():
""" Returns the platform name as a str """
- write_string('DeprecationWarning: yt_dlp.utils.platform_name is deprecated, use platform.platform instead')
+ deprecation_warning(f'"{__name__}.platform_name" is deprecated, use "platform.platform" instead')
return platform.platform()
out.flush()
+def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
+ from . import _IN_CLI
+ if _IN_CLI:
+ if msg in deprecation_warning._cache:
+ return
+ deprecation_warning._cache.add(msg)
+ if printer:
+ return printer(f'{msg}{bug_reports_message()}', **kwargs)
+ return write_string(f'ERROR: {msg}{bug_reports_message()}\n', **kwargs)
+ else:
+ import warnings
+ warnings.warn(DeprecationWarning(msg), stacklevel=stacklevel + 3)
+
+
+deprecation_warning._cache = set()
+
+
def bytes_to_intlist(bs):
if not bs:
return []
def base_url(url):
- return re.match(r'https?://[^?#&]+/', url).group()
+ return re.match(r'https?://[^?#]+/', url).group()
def urljoin(base, path):
},
}
- sanitize_codec = functools.partial(try_get, getter=lambda x: x.split('.')[0].replace('0', ''))
- vcodec, acodec = sanitize_codec(vcodecs[0]), sanitize_codec(acodecs[0])
+ sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+ vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
for ext in preferences or COMPATIBLE_CODECS.keys():
codec_set = COMPATIBLE_CODECS.get(ext, set())
ext = determine_ext(url)
if ext == 'm3u8':
- return 'm3u8'
+ return 'm3u8' if info_dict.get('is_live') else 'm3u8_native'
elif ext == 'f4m':
return 'f4m'
def decode_base(value, digits):
- write_string('DeprecationWarning: yt_dlp.utils.decode_base is deprecated '
- 'and may be removed in a future version. Use yt_dlp.decode_base_n instead')
+ deprecation_warning(f'{__name__}.decode_base is deprecated and may be removed '
+ f'in a future version. Use {__name__}.decode_base_n instead')
return decode_base_n(value, table=digits)
@param path_list A list of paths which are checked one by one.
Each path is a list of keys where each key is a:
- None: Do nothing
- - string: A dictionary key
+ - string: A dictionary key / regex group
- int: An index into a list
- tuple: A list of keys all of which will be traversed
- Ellipsis: Fetch all values in the object
@param expected_type Only accept final value of this type (Can also be any callable)
@param get_all Return all the values obtained from a path or only the first one
@param casesense Whether to consider dictionary keys as case sensitive
+
+ The following are only meant to be used by YoutubeDL.prepare_outtmpl and is not part of the API
+
+ @param path_list In addition to the above,
+ - dict: Given {k:v, ...}; return {k: traverse_obj(obj, v), ...}
@param is_user_input Whether the keys are generated from user input. If True,
strings are converted to int/slice if necessary
@param traverse_string Whether to traverse inside strings. If True, any
non-compatible object will also be converted into a string
- # TODO: Write tests
- '''
+ ''' # TODO: Write tests
if not casesense:
_lower = lambda k: (k.lower() if isinstance(k, str) else k)
path_list = (map(_lower, variadic(path)) for path in path_list)
if isinstance(key, (list, tuple)):
obj = [_traverse_obj(obj, sub_key, _current_depth) for sub_key in key]
key = ...
+
if key is ...:
obj = (obj.values() if isinstance(obj, dict)
else obj if isinstance(obj, (list, tuple, LazyList))
_current_depth += 1
depth = max(depth, _current_depth)
return [_traverse_obj(inner_obj, path[i + 1:], _current_depth) for inner_obj in obj]
+ elif isinstance(key, dict):
+ obj = filter_dict({k: _traverse_obj(obj, v, _current_depth) for k, v in key.items()})
elif callable(key):
if isinstance(obj, (list, tuple, LazyList)):
obj = enumerate(obj)
def traverse_dict(dictn, keys, casesense=True):
- write_string('DeprecationWarning: yt_dlp.utils.traverse_dict is deprecated '
- 'and may be removed in a future version. Use yt_dlp.utils.traverse_obj instead')
+ deprecation_warning(f'"{__name__}.traverse_dict" is deprecated and may be removed '
+ f'in a future version. Use "{__name__}.traverse_obj" instead')
return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True)
if not count:
return warn(e)
elif isinstance(e, ExtractorError):
- e = remove_end(str(e.cause) or e.orig_msg, '.')
+ e = remove_end(str_or_none(e.cause) or e.orig_msg, '.')
warn(f'{e}. Retrying{format_field(suffix, None, " %s")} ({count}/{retries})...')
delay = float_or_none(sleep_func(n=count - 1)) if callable(sleep_func) else sleep_func
return f'{s[:left-3]}...{s[-right:]}'
+def orderedSet_from_options(options, alias_dict, *, use_regex=False, start=None):
+ assert 'all' in alias_dict, '"all" alias is required'
+ requested = list(start or [])
+ for val in options:
+ discard = val.startswith('-')
+ if discard:
+ val = val[1:]
+
+ if val in alias_dict:
+ val = alias_dict[val] if not discard else [
+ i[1:] if i.startswith('-') else f'-{i}' for i in alias_dict[val]]
+ # NB: Do not allow regex in aliases for performance
+ requested = orderedSet_from_options(val, alias_dict, start=requested)
+ continue
+
+ current = (filter(re.compile(val, re.I).fullmatch, alias_dict['all']) if use_regex
+ else [val] if val in alias_dict['all'] else None)
+ if current is None:
+ raise ValueError(val)
+
+ if discard:
+ for item in current:
+ while item in requested:
+ requested.remove(item)
+ else:
+ requested.extend(current)
+
+ return orderedSet(requested)
+
+
# Deprecated
has_certifi = bool(certifi)
has_websockets = bool(websockets)