]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[docs] Consistent use of `e.g.` (#4643)
[yt-dlp.git] / yt_dlp / utils.py
index fd6c206823130a92f6eb2b60123917ffd64bb1ed..e64d35936556d0e33de07a61591f2eb1e7818f70 100644 (file)
@@ -1,3 +1,4 @@
+import asyncio
 import atexit
 import base64
 import binascii
@@ -5,7 +6,6 @@
 import codecs
 import collections
 import contextlib
-import ctypes
 import datetime
 import email.header
 import email.utils
@@ -18,6 +18,7 @@
 import http.client
 import http.cookiejar
 import importlib.util
+import inspect
 import io
 import itertools
 import json
 import time
 import traceback
 import types
+import unicodedata
 import urllib.error
 import urllib.parse
 import urllib.request
 import xml.etree.ElementTree
 import zlib
 
-from .compat import asyncio, functools  # isort: split
+from .compat import functools  # isort: split
 from .compat import (
     compat_etree_fromstring,
     compat_expanduser,
@@ -148,20 +150,15 @@ def random_user_agent():
         'juillet', 'août', 'septembre', 'octobre', 'novembre', 'décembre'],
 }
 
-KNOWN_EXTENSIONS = (
-    'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'aac',
-    'flv', 'f4v', 'f4a', 'f4b',
-    'webm', 'ogg', 'ogv', 'oga', 'ogx', 'spx', 'opus',
-    'mkv', 'mka', 'mk3d',
-    'avi', 'divx',
-    'mov',
-    'asf', 'wmv', 'wma',
-    '3gp', '3g2',
-    'mp3',
-    'flac',
-    'ape',
-    'wav',
-    'f4f', 'f4m', 'm3u8', 'smil')
+# From https://github.com/python/cpython/blob/3.11/Lib/email/_parseaddr.py#L36-L42
+TIMEZONE_NAMES = {
+    'UT': 0, 'UTC': 0, 'GMT': 0, 'Z': 0,
+    'AST': -4, 'ADT': -3,  # Atlantic (used in Canada)
+    'EST': -5, 'EDT': -4,  # Eastern
+    'CST': -6, 'CDT': -5,  # Central
+    'MST': -7, 'MDT': -6,  # Mountain
+    'PST': -8, 'PDT': -7   # Pacific
+}
 
 # needed for sanitizing filenames in restricted mode
 ACCENT_CHARS = dict(zip('ÂÃÄÀÁÅÆÇÈÉÊËÌÍÎÏÐÑÒÓÔÕÖŐØŒÙÚÛÜŰÝÞßàáâãäåæçèéêëìíîïðñòóôõöőøœùúûüűýþÿ',
@@ -221,6 +218,7 @@ def random_user_agent():
     '%d/%m/%Y',
     '%d/%m/%y',
     '%d/%m/%Y %H:%M:%S',
+    '%d-%m-%Y %H:%M',
 ])
 
 DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
@@ -233,7 +231,7 @@ def random_user_agent():
 ])
 
 PACKED_CODES_RE = r"}\('(.+)',(\d+),(\d+),'([^']+)'\.split\('\|'\)"
-JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>(?P<json_ld>.+?)</script>'
+JSON_LD_RE = r'(?is)<script[^>]+type=(["\']?)application/ld\+json\1[^>]*>\s*(?P<json_ld>{.+?})\s*</script>'
 
 NUMBER_RE = r'\d+(?:\.\d+)?'
 
@@ -611,7 +609,10 @@ def sanitize_open(filename, open_mode):
     if filename == '-':
         if sys.platform == 'win32':
             import msvcrt
-            msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+
+            # stdout may be any IO stream, e.g. when using contextlib.redirect_stdout
+            with contextlib.suppress(io.UnsupportedOperation):
+                msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
         return (sys.stdout.buffer if hasattr(sys.stdout, 'buffer') else sys.stdout, filename)
 
     for attempt in range(2):
@@ -657,6 +658,9 @@ def replace_insane(char):
             return ACCENT_CHARS[char]
         elif not restricted and char == '\n':
             return '\0 '
+        elif is_id is NO_DEFAULT and not restricted and char in '"*:<>?|/\\':
+            # Replace with their full-width unicode counterparts
+            return {'/': '\u29F8', '\\': '\u29f9'}.get(char, chr(ord(char) + 0xfee0))
         elif char == '?' or ord(char) < 32 or ord(char) == 127:
             return ''
         elif char == '"':
@@ -669,11 +673,13 @@ def replace_insane(char):
             return '\0_'
         return char
 
+    if restricted and is_id is NO_DEFAULT:
+        s = unicodedata.normalize('NFKC', s)
     s = re.sub(r'[0-9]+(?::[0-9]+)+', lambda m: m.group(0).replace(':', '_'), s)  # Handle timestamps
     result = ''.join(map(replace_insane, s))
     if is_id is NO_DEFAULT:
-        result = re.sub('(\0.)(?:(?=\\1)..)+', r'\1', result)  # Remove repeated substitute chars
-        STRIP_RE = '(?:\0.|[ _-])*'
+        result = re.sub(r'(\0.)(?:(?=\1)..)+', r'\1', result)  # Remove repeated substitute chars
+        STRIP_RE = r'(?:\0.|[ _-])*'
         result = re.sub(f'^\0.{STRIP_RE}|{STRIP_RE}\0.$', '', result)  # Remove substitute chars from start/end
     result = result.replace('\0', '') or '_'
 
@@ -715,13 +721,13 @@ def sanitize_path(s, force=False):
     return os.path.join(*sanitized_path)
 
 
-def sanitize_url(url):
+def sanitize_url(url, *, scheme='http'):
     # Prepend protocol-less URLs with `http:` scheme in order to mitigate
     # the number of unwanted failures due to missing protocol
     if url is None:
         return
     elif url.startswith('//'):
-        return 'http:%s' % url
+        return f'{scheme}:{url}'
     # Fix some common typos seen so far
     COMMON_TYPOS = (
         # https://github.com/ytdl-org/youtube-dl/issues/15649
@@ -780,8 +786,8 @@ def _htmlentity_transform(entity_with_semicolon):
     if entity in html.entities.name2codepoint:
         return chr(html.entities.name2codepoint[entity])
 
-    # TODO: HTML5 allows entities without a semicolon. For example,
-    # '&Eacuteric' should be decoded as 'Éric'.
+    # TODO: HTML5 allows entities without a semicolon.
+    # E.g. '&Eacuteric' should be decoded as 'Éric'.
     if entity_with_semicolon in html.entities.html5:
         return html.entities.html5[entity_with_semicolon]
 
@@ -949,17 +955,18 @@ def make_HTTPS_handler(params, **kwargs):
     if opts_check_certificate:
         if has_certifi and 'no-certifi' not in params.get('compat_opts', []):
             context.load_verify_locations(cafile=certifi.where())
-        try:
-            context.load_default_certs()
-        # Work around the issue in load_default_certs when there are bad certificates. See:
-        # https://github.com/yt-dlp/yt-dlp/issues/1060,
-        # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
-        except ssl.SSLError:
-            # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
-            if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
-                for storename in ('CA', 'ROOT'):
-                    _ssl_load_windows_store_certs(context, storename)
-            context.set_default_verify_paths()
+        else:
+            try:
+                context.load_default_certs()
+                # Work around the issue in load_default_certs when there are bad certificates. See:
+                # https://github.com/yt-dlp/yt-dlp/issues/1060,
+                # https://bugs.python.org/issue35665, https://bugs.python.org/issue45312
+            except ssl.SSLError:
+                # enum_certificates is not present in mingw python. See https://github.com/yt-dlp/yt-dlp/issues/1151
+                if sys.platform == 'win32' and hasattr(ssl, 'enum_certificates'):
+                    for storename in ('CA', 'ROOT'):
+                        _ssl_load_windows_store_certs(context, storename)
+                context.set_default_verify_paths()
 
     client_certfile = params.get('client_certificate')
     if client_certfile:
@@ -1069,6 +1076,14 @@ def __init__(self, msg, countries=None, **kwargs):
         self.countries = countries
 
 
+class UserNotLive(ExtractorError):
+    """Error when a channel/user is not live"""
+
+    def __init__(self, msg=None, **kwargs):
+        kwargs['expected'] = True
+        super().__init__(msg or 'The channel is not currently live', **kwargs)
+
+
 class DownloadError(YoutubeDLError):
     """Download Error exception.
 
@@ -1679,7 +1694,11 @@ def extract_timezone(date_str):
             $)
         ''', date_str)
     if not m:
-        timezone = datetime.timedelta()
+        m = re.search(r'\d{1,2}:\d{1,2}(?:\.\d+)?(?P<tz>\s*[A-Z]+)$', date_str)
+        timezone = TIMEZONE_NAMES.get(m and m.group('tz').strip())
+        if timezone is not None:
+            date_str = date_str[:-len(m.group('tz'))]
+        timezone = datetime.timedelta(hours=timezone or 0)
     else:
         date_str = date_str[:-len(m.group('tz'))]
         if not m.group('sign'):
@@ -1741,7 +1760,8 @@ def unified_timestamp(date_str, day_first=True):
     if date_str is None:
         return None
 
-    date_str = re.sub(r'[,|]', '', date_str)
+    date_str = re.sub(r'\s+', ' ', re.sub(
+        r'(?i)[,|]|(mon|tues?|wed(nes)?|thu(rs)?|fri|sat(ur)?)(day)?', '', date_str))
 
     pm_delta = 12 if re.search(r'(?i)PM', date_str) else 0
     timezone, date_str = extract_timezone(date_str)
@@ -1763,9 +1783,10 @@ def unified_timestamp(date_str, day_first=True):
         with contextlib.suppress(ValueError):
             dt = datetime.datetime.strptime(date_str, expression) - timezone + datetime.timedelta(hours=pm_delta)
             return calendar.timegm(dt.timetuple())
+
     timetuple = email.utils.parsedate_tz(date_str)
     if timetuple:
-        return calendar.timegm(timetuple) + pm_delta * 3600
+        return calendar.timegm(timetuple) + pm_delta * 3600 - timezone.total_seconds()
 
 
 def determine_ext(url, default_ext='unknown_video'):
@@ -1906,15 +1927,30 @@ def __contains__(self, date):
     def __str__(self):
         return f'{self.start.isoformat()} - {self.end.isoformat()}'
 
+    def __eq__(self, other):
+        return (isinstance(other, DateRange)
+                and self.start == other.start and self.end == other.end)
+
 
 def platform_name():
     """ Returns the platform name as a str """
-    res = platform.platform()
-    if isinstance(res, bytes):
-        res = res.decode(preferredencoding())
+    write_string('DeprecationWarning: yt_dlp.utils.platform_name is deprecated, use platform.platform instead')
+    return platform.platform()
+
 
-    assert isinstance(res, str)
-    return res
+@functools.cache
+def system_identifier():
+    python_implementation = platform.python_implementation()
+    if python_implementation == 'PyPy' and hasattr(sys, 'pypy_version_info'):
+        python_implementation += ' version %d.%d.%d' % sys.pypy_version_info[:3]
+
+    return 'Python %s (%s %s) - %s %s' % (
+        platform.python_version(),
+        python_implementation,
+        platform.architecture()[0],
+        platform.platform(),
+        format_field(join_nonempty(*platform.libc_ver(), delim=' '), None, '(%s)'),
+    )
 
 
 @functools.cache
@@ -1968,6 +2004,7 @@ def __init__(self):
 
 # Cross-platform file locking
 if sys.platform == 'win32':
+    import ctypes
     import ctypes.wintypes
     import msvcrt
 
@@ -2347,9 +2384,10 @@ def fix_xml_ampersands(xml_str):
 def setproctitle(title):
     assert isinstance(title, str)
 
-    # ctypes in Jython is not complete
-    # http://bugs.jython.org/issue2148
-    if sys.platform.startswith('java'):
+    # Workaround for https://github.com/yt-dlp/yt-dlp/issues/4541
+    try:
+        import ctypes
+    except ImportError:
         return
 
     try:
@@ -2388,8 +2426,11 @@ def remove_quotes(s):
 
 
 def get_domain(url):
-    domain = re.match(r'(?:https?:\/\/)?(?:www\.)?(?P<domain>[^\n\/]+\.[^\n\/]+)(?:\/(.*))?', url)
-    return domain.group('domain') if domain else None
+    """
+    This implementation is inconsistent, but is kept for compatibility.
+    Use this only for "webpage_url_domain"
+    """
+    return remove_start(urllib.parse.urlparse(url).netloc, 'www.') or None
 
 
 def url_basename(url):
@@ -2648,7 +2689,7 @@ def exhaust(self):
 
     @staticmethod
     def _reverse_index(x):
-        return None if x is None else -(x + 1)
+        return None if x is None else ~x
 
     def __getitem__(self, idx):
         if isinstance(idx, slice):
@@ -2994,7 +3035,7 @@ def fixup(url):
         if not url or url.startswith(('#', ';', ']')):
             return False
         # "#" cannot be stripped out since it is part of the URI
-        # However, it can be safely stipped out if follwing a whitespace
+        # However, it can be safely stripped out if following a whitespace
         return re.split(r'\s#', url, 1)[0].rstrip()
 
     with contextlib.closing(batch_fd) as fd:
@@ -3015,9 +3056,9 @@ def update_url_query(url, query):
         query=urllib.parse.urlencode(qs, True)))
 
 
-def update_Request(req, url=None, data=None, headers={}, query={}):
+def update_Request(req, url=None, data=None, headers=None, query=None):
     req_headers = req.headers.copy()
-    req_headers.update(headers)
+    req_headers.update(headers or {})
     req_data = data or req.data
     req_url = update_url_query(url or req.get_full_url(), query)
     req_get_method = req.get_method()
@@ -3174,7 +3215,7 @@ def strip_jsonp(code):
         r'\g<callback_data>', code)
 
 
-def js_to_json(code, vars={}):
+def js_to_json(code, vars={}, *, strict=False):
     # vars is a dict of var, val pairs to substitute
     COMMENT_RE = r'/\*(?:(?!\*/).)*?\*/|//[^\n]*\n'
     SKIP_RE = fr'\s*(?:{COMMENT_RE})?\s*'
@@ -3208,14 +3249,17 @@ def fix_kv(m):
 
             if v in vars:
                 return vars[v]
+            if strict:
+                raise ValueError(f'Unknown value: {v}')
 
         return '"%s"' % v
 
     def create_map(mobj):
         return json.dumps(dict(json.loads(js_to_json(mobj.group(1) or '[]', vars=vars))))
 
-    code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
     code = re.sub(r'new Map\((\[.*?\])?\)', create_map, code)
+    if not strict:
+        code = re.sub(r'new Date\((".+")\)', r'\g<1>', code)
 
     return re.sub(r'''(?sx)
         "(?:[^"\\]*(?:\\\\|\\['"nurtbfx/\n]))*[^"\\]*"|
@@ -3403,24 +3447,23 @@ def parse_codecs(codecs_str):
         str.strip, codecs_str.strip().strip(',').split(','))))
     vcodec, acodec, scodec, hdr = None, None, None, None
     for full_codec in split_codecs:
-        parts = full_codec.split('.')
-        codec = parts[0].replace('0', '')
-        if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
-                     'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
-            if not vcodec:
-                vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1', 'hvc1') else full_codec
-                if codec in ('dvh1', 'dvhe'):
-                    hdr = 'DV'
-                elif codec == 'av1' and len(parts) > 3 and parts[3] == '10':
-                    hdr = 'HDR10'
-                elif full_codec.replace('0', '').startswith('vp9.2'):
-                    hdr = 'HDR10'
-        elif codec in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
-            if not acodec:
-                acodec = full_codec
-        elif codec in ('stpp', 'wvtt',):
-            if not scodec:
-                scodec = full_codec
+        parts = re.sub(r'0+(?=\d)', '', full_codec).split('.')
+        if parts[0] in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
+                        'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
+            if vcodec:
+                continue
+            vcodec = full_codec
+            if parts[0] in ('dvh1', 'dvhe'):
+                hdr = 'DV'
+            elif parts[0] == 'av1' and traverse_obj(parts, 3) == '10':
+                hdr = 'HDR10'
+            elif parts[:2] == ['vp9', '2']:
+                hdr = 'HDR10'
+        elif parts[0] in ('flac', 'mp4a', 'opus', 'vorbis', 'mp3', 'aac',
+                          'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
+            acodec = acodec or full_codec
+        elif parts[0] in ('stpp', 'wvtt'):
+            scodec = scodec or full_codec
         else:
             write_string(f'WARNING: Unknown codec {full_codec}\n')
     if vcodec or acodec or scodec:
@@ -3438,6 +3481,46 @@ def parse_codecs(codecs_str):
     return {}
 
 
+def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
+    assert len(vcodecs) == len(vexts) and len(acodecs) == len(aexts)
+
+    allow_mkv = not preferences or 'mkv' in preferences
+
+    if allow_mkv and max(len(acodecs), len(vcodecs)) > 1:
+        return 'mkv'  # TODO: any other format allows this?
+
+    # TODO: All codecs supported by parse_codecs isn't handled here
+    COMPATIBLE_CODECS = {
+        'mp4': {
+            'av1', 'hevc', 'avc1', 'mp4a',  # fourcc (m3u8, mpd)
+            'h264', 'aacl',  # Set in ISM
+        },
+        'webm': {
+            'av1', 'vp9', 'vp8', 'opus', 'vrbs',
+            'vp9x', 'vp8x',  # in the webm spec
+        },
+    }
+
+    sanitize_codec = functools.partial(try_get, getter=lambda x: x.split('.')[0].replace('0', ''))
+    vcodec, acodec = sanitize_codec(vcodecs[0]), sanitize_codec(acodecs[0])
+
+    for ext in preferences or COMPATIBLE_CODECS.keys():
+        codec_set = COMPATIBLE_CODECS.get(ext, set())
+        if ext == 'mkv' or codec_set.issuperset((vcodec, acodec)):
+            return ext
+
+    COMPATIBLE_EXTS = (
+        {'mp3', 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'ismv', 'isma', 'mov'},
+        {'webm'},
+    )
+    for ext in preferences or vexts:
+        current_exts = {ext, *vexts, *aexts}
+        if ext == 'mkv' or current_exts == {ext} or any(
+                ext_sets.issuperset(current_exts) for ext_sets in COMPATIBLE_EXTS):
+            return ext
+    return 'mkv' if allow_mkv else preferences[-1]
+
+
 def urlhandle_detect_ext(url_handle):
     getheader = url_handle.headers.get
 
@@ -3466,17 +3549,19 @@ def age_restricted(content_limit, age_limit):
     return age_limit < content_limit
 
 
+# List of known byte-order-marks (BOM)
+BOMS = [
+    (b'\xef\xbb\xbf', 'utf-8'),
+    (b'\x00\x00\xfe\xff', 'utf-32-be'),
+    (b'\xff\xfe\x00\x00', 'utf-32-le'),
+    (b'\xff\xfe', 'utf-16-le'),
+    (b'\xfe\xff', 'utf-16-be'),
+]
+
+
 def is_html(first_bytes):
     """ Detect whether a file contains HTML by examining its first bytes. """
 
-    BOMS = [
-        (b'\xef\xbb\xbf', 'utf-8'),
-        (b'\x00\x00\xfe\xff', 'utf-32-be'),
-        (b'\xff\xfe\x00\x00', 'utf-32-le'),
-        (b'\xff\xfe', 'utf-16-le'),
-        (b'\xfe\xff', 'utf-16-be'),
-    ]
-
     encoding = 'utf-8'
     for bom, enc in BOMS:
         while first_bytes.startswith(bom):
@@ -3644,27 +3729,32 @@ def _match_func(info_dict, incomplete=False):
         if not filters or any(match_str(f, info_dict, incomplete) for f in filters):
             return NO_DEFAULT if interactive and not incomplete else None
         else:
-            video_title = info_dict.get('title') or info_dict.get('id') or 'video'
+            video_title = info_dict.get('title') or info_dict.get('id') or 'entry'
             filter_str = ') | ('.join(map(str.strip, filters))
             return f'{video_title} does not pass filter ({filter_str}), skipping ..'
     return _match_func
 
 
-def download_range_func(chapters, ranges):
-    def inner(info_dict, ydl):
+class download_range_func:
+    def __init__(self, chapters, ranges):
+        self.chapters, self.ranges = chapters, ranges
+
+    def __call__(self, info_dict, ydl):
         warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
                    else 'Cannot match chapters since chapter information is unavailable')
-        for regex in chapters or []:
+        for regex in self.chapters or []:
             for i, chapter in enumerate(info_dict.get('chapters') or []):
                 if re.search(regex, chapter['title']):
                     warning = None
                     yield {**chapter, 'index': i}
-        if chapters and warning:
+        if self.chapters and warning:
             ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
 
-        yield from ({'start_time': start, 'end_time': end} for start, end in ranges or [])
+        yield from ({'start_time': start, 'end_time': end} for start, end in self.ranges or [])
 
-    return inner
+    def __eq__(self, other):
+        return (isinstance(other, download_range_func)
+                and self.chapters == other.chapters and self.ranges == other.ranges)
 
 
 def parse_dfxp_time_expr(time_expr):
@@ -4744,7 +4834,7 @@ def _base_n_table(n, table):
         raise ValueError('Either table or n must be specified')
     table = (table or '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ')[:n]
 
-    if n != len(table):
+    if n and n != len(table):
         raise ValueError(f'base {n} exceeds table length {len(table)}')
     return table
 
@@ -5370,6 +5460,24 @@ def read_stdin(what):
     return sys.stdin
 
 
+def determine_file_encoding(data):
+    """
+    Detect the text encoding used
+    @returns (encoding, bytes to skip)
+    """
+
+    # BOM marks are given priority over declarations
+    for bom, enc in BOMS:
+        if data.startswith(bom):
+            return enc, len(bom)
+
+    # Strip off all null bytes to match even when UTF-16 or UTF-32 is used.
+    # We ignore the endianness to get a good enough match
+    data = data.replace(b'\0', b'')
+    mobj = re.match(rb'(?m)^#\s*coding\s*:\s*(\S+)\s*$', data)
+    return mobj.group(1).decode() if mobj else None, 0
+
+
 class Config:
     own_args = None
     parsed_args = None
@@ -5382,18 +5490,21 @@ def __init__(self, parser, label=None):
 
     def init(self, args=None, filename=None):
         assert not self.__initialized
+        self.own_args, self.filename = args, filename
+        return self.load_configs()
+
+    def load_configs(self):
         directory = ''
-        if filename:
-            location = os.path.realpath(filename)
+        if self.filename:
+            location = os.path.realpath(self.filename)
             directory = os.path.dirname(location)
             if location in self._loaded_paths:
                 return False
             self._loaded_paths.add(location)
 
-        self.own_args, self.__initialized = args, True
-        opts, _ = self.parser.parse_known_args(args)
-        self.parsed_args, self.filename = args, filename
-
+        self.__initialized = True
+        opts, _ = self.parser.parse_known_args(self.own_args)
+        self.parsed_args = self.own_args
         for location in opts.config_locations or []:
             if location == '-':
                 self.append_config(shlex.split(read_stdin('options'), comments=True), label='stdin')
@@ -5418,12 +5529,17 @@ def __str__(self):
     @staticmethod
     def read_file(filename, default=[]):
         try:
-            optionf = open(filename)
+            optionf = open(filename, 'rb')
         except OSError:
             return default  # silently skip if file is not present
+        try:
+            enc, skip = determine_file_encoding(optionf.read(512))
+            optionf.seek(skip, io.SEEK_SET)
+        except OSError:
+            enc = None  # silently skip read errors
         try:
             # FIXME: https://github.com/ytdl-org/youtube-dl/commit/dfe5fa49aed02cf36ba9f743b11b0903554b5e56
-            contents = optionf.read()
+            contents = optionf.read().decode(enc or preferredencoding())
             res = shlex.split(contents, comments=True)
         except Exception as err:
             raise ValueError(f'Unable to parse "{filename}": {err}')
@@ -5544,8 +5660,27 @@ def merge_headers(*dicts):
     return {k.title(): v for k, v in itertools.chain.from_iterable(map(dict.items, dicts))}
 
 
+def cached_method(f):
+    """Cache a method"""
+    signature = inspect.signature(f)
+
+    @functools.wraps(f)
+    def wrapper(self, *args, **kwargs):
+        bound_args = signature.bind(self, *args, **kwargs)
+        bound_args.apply_defaults()
+        key = tuple(bound_args.arguments.values())
+
+        if not hasattr(self, '__cached_method__cache'):
+            self.__cached_method__cache = {}
+        cache = self.__cached_method__cache.setdefault(f.__name__, {})
+        if key not in cache:
+            cache[key] = f(self, *args, **kwargs)
+        return cache[key]
+    return wrapper
+
+
 class classproperty:
-    """classmethod(property(func)) that works in py < 3.9"""
+    """property access for class methods"""
 
     def __init__(self, func):
         functools.update_wrapper(self, func)
@@ -5566,6 +5701,90 @@ def items_(self):
         return self.__dict__.items()
 
 
+MEDIA_EXTENSIONS = Namespace(
+    common_video=('avi', 'flv', 'mkv', 'mov', 'mp4', 'webm'),
+    video=('3g2', '3gp', 'f4v', 'mk3d', 'divx', 'mpg', 'ogv', 'm4v', 'wmv'),
+    common_audio=('aiff', 'alac', 'flac', 'm4a', 'mka', 'mp3', 'ogg', 'opus', 'wav'),
+    audio=('aac', 'ape', 'asf', 'f4a', 'f4b', 'm4b', 'm4p', 'm4r', 'oga', 'ogx', 'spx', 'vorbis', 'wma'),
+    thumbnails=('jpg', 'png', 'webp'),
+    storyboards=('mhtml', ),
+    subtitles=('srt', 'vtt', 'ass', 'lrc'),
+    manifests=('f4f', 'f4m', 'm3u8', 'smil', 'mpd'),
+)
+MEDIA_EXTENSIONS.video += MEDIA_EXTENSIONS.common_video
+MEDIA_EXTENSIONS.audio += MEDIA_EXTENSIONS.common_audio
+
+KNOWN_EXTENSIONS = (*MEDIA_EXTENSIONS.video, *MEDIA_EXTENSIONS.audio, *MEDIA_EXTENSIONS.manifests)
+
+
+class RetryManager:
+    """Usage:
+        for retry in RetryManager(...):
+            try:
+                ...
+            except SomeException as err:
+                retry.error = err
+                continue
+    """
+    attempt, _error = 0, None
+
+    def __init__(self, _retries, _error_callback, **kwargs):
+        self.retries = _retries or 0
+        self.error_callback = functools.partial(_error_callback, **kwargs)
+
+    def _should_retry(self):
+        return self._error is not NO_DEFAULT and self.attempt <= self.retries
+
+    @property
+    def error(self):
+        if self._error is NO_DEFAULT:
+            return None
+        return self._error
+
+    @error.setter
+    def error(self, value):
+        self._error = value
+
+    def __iter__(self):
+        while self._should_retry():
+            self.error = NO_DEFAULT
+            self.attempt += 1
+            yield self
+            if self.error:
+                self.error_callback(self.error, self.attempt, self.retries)
+
+    @staticmethod
+    def report_retry(e, count, retries, *, sleep_func, info, warn, error=None, suffix=None):
+        """Utility function for reporting retries"""
+        if count > retries:
+            if error:
+                return error(f'{e}. Giving up after {count - 1} retries') if count > 1 else error(str(e))
+            raise e
+
+        if not count:
+            return warn(e)
+        elif isinstance(e, ExtractorError):
+            e = remove_end(str(e.cause) or e.orig_msg, '.')
+        warn(f'{e}. Retrying{format_field(suffix, None, " %s")} ({count}/{retries})...')
+
+        delay = float_or_none(sleep_func(n=count - 1)) if callable(sleep_func) else sleep_func
+        if delay:
+            info(f'Sleeping {delay:.2f} seconds ...')
+            time.sleep(delay)
+
+
+def make_archive_id(ie, video_id):
+    ie_key = ie if isinstance(ie, str) else ie.ie_key()
+    return f'{ie_key.lower()} {video_id}'
+
+
+def truncate_string(s, left, right=0):
+    assert left > 3 and right >= 0
+    if s is None or len(s) <= left + right:
+        return s
+    return f'{s[:left-3]}...{s[-right:]}'
+
+
 # Deprecated
 has_certifi = bool(certifi)
 has_websockets = bool(websockets)