]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils/_utils.py
[cleanup] Misc
[yt-dlp.git] / yt_dlp / utils / _utils.py
index d78022295b19e1141e8e661c6c18b4abe7c259c9..10052009fcc5b3991ba74bbb430f0e73031e6215 100644 (file)
@@ -25,6 +25,7 @@
 import locale
 import math
 import mimetypes
+import netrc
 import operator
 import os
 import platform
@@ -223,6 +224,7 @@ def IDENTITY(x):
     '%d/%m/%y',
     '%d/%m/%Y %H:%M:%S',
     '%d-%m-%Y %H:%M',
+    '%H:%M %d/%m/%Y',
 ])
 
 DATE_FORMATS_MONTH_FIRST = list(DATE_FORMATS)
@@ -863,10 +865,11 @@ def escapeHTML(text):
     )
 
 
-def process_communicate_or_kill(p, *args, **kwargs):
-    deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
-                        f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
-    return Popen.communicate_or_kill(p, *args, **kwargs)
+class netrc_from_content(netrc.netrc):
+    def __init__(self, content):
+        self.hosts, self.macros = {}, {}
+        with io.StringIO(content) as stream:
+            self._parse('-', stream, False)
 
 
 class Popen(subprocess.Popen):
@@ -1361,6 +1364,23 @@ def brotli(data):
             return data
         return brotli.decompress(data)
 
+    @staticmethod
+    def gz(data):
+        gz = gzip.GzipFile(fileobj=io.BytesIO(data), mode='rb')
+        try:
+            return gz.read()
+        except OSError as original_oserror:
+            # There may be junk add the end of the file
+            # See http://stackoverflow.com/q/4928560/35070 for details
+            for i in range(1, 1024):
+                try:
+                    gz = gzip.GzipFile(fileobj=io.BytesIO(data[:-i]), mode='rb')
+                    return gz.read()
+                except OSError:
+                    continue
+            else:
+                raise original_oserror
+
     def http_request(self, req):
         # According to RFC 3986, URLs can not contain non-ASCII characters, however this is not
         # always respected by websites, some tend to give out URLs with non percent-encoded
@@ -1394,35 +1414,21 @@ def http_request(self, req):
 
     def http_response(self, req, resp):
         old_resp = resp
-        # gzip
-        if resp.headers.get('Content-encoding', '') == 'gzip':
-            content = resp.read()
-            gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
-            try:
-                uncompressed = io.BytesIO(gz.read())
-            except OSError as original_ioerror:
-                # There may be junk add the end of the file
-                # See http://stackoverflow.com/q/4928560/35070 for details
-                for i in range(1, 1024):
-                    try:
-                        gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
-                        uncompressed = io.BytesIO(gz.read())
-                    except OSError:
-                        continue
-                    break
-                else:
-                    raise original_ioerror
-            resp = urllib.request.addinfourl(uncompressed, old_resp.headers, old_resp.url, old_resp.code)
-            resp.msg = old_resp.msg
-        # deflate
-        if resp.headers.get('Content-encoding', '') == 'deflate':
-            gz = io.BytesIO(self.deflate(resp.read()))
-            resp = urllib.request.addinfourl(gz, old_resp.headers, old_resp.url, old_resp.code)
-            resp.msg = old_resp.msg
-        # brotli
-        if resp.headers.get('Content-encoding', '') == 'br':
-            resp = urllib.request.addinfourl(
-                io.BytesIO(self.brotli(resp.read())), old_resp.headers, old_resp.url, old_resp.code)
+
+        # Content-Encoding header lists the encodings in order that they were applied [1].
+        # To decompress, we simply do the reverse.
+        # [1]: https://datatracker.ietf.org/doc/html/rfc9110#name-content-encoding
+        decoded_response = None
+        for encoding in (e.strip() for e in reversed(resp.headers.get('Content-encoding', '').split(','))):
+            if encoding == 'gzip':
+                decoded_response = self.gz(decoded_response or resp.read())
+            elif encoding == 'deflate':
+                decoded_response = self.deflate(decoded_response or resp.read())
+            elif encoding == 'br' and brotli:
+                decoded_response = self.brotli(decoded_response or resp.read())
+
+        if decoded_response is not None:
+            resp = urllib.request.addinfourl(io.BytesIO(decoded_response), old_resp.headers, old_resp.url, old_resp.code)
             resp.msg = old_resp.msg
         # Percent-encode redirect URL of Location HTTP header to satisfy RFC 3986 (see
         # https://github.com/ytdl-org/youtube-dl/issues/6457).
@@ -1518,136 +1524,6 @@ def is_path_like(f):
     return isinstance(f, (str, bytes, os.PathLike))
 
 
-class YoutubeDLCookieJar(http.cookiejar.MozillaCookieJar):
-    """
-    See [1] for cookie file format.
-
-    1. https://curl.haxx.se/docs/http-cookies.html
-    """
-    _HTTPONLY_PREFIX = '#HttpOnly_'
-    _ENTRY_LEN = 7
-    _HEADER = '''# Netscape HTTP Cookie File
-# This file is generated by yt-dlp.  Do not edit.
-
-'''
-    _CookieFileEntry = collections.namedtuple(
-        'CookieFileEntry',
-        ('domain_name', 'include_subdomains', 'path', 'https_only', 'expires_at', 'name', 'value'))
-
-    def __init__(self, filename=None, *args, **kwargs):
-        super().__init__(None, *args, **kwargs)
-        if is_path_like(filename):
-            filename = os.fspath(filename)
-        self.filename = filename
-
-    @staticmethod
-    def _true_or_false(cndn):
-        return 'TRUE' if cndn else 'FALSE'
-
-    @contextlib.contextmanager
-    def open(self, file, *, write=False):
-        if is_path_like(file):
-            with open(file, 'w' if write else 'r', encoding='utf-8') as f:
-                yield f
-        else:
-            if write:
-                file.truncate(0)
-            yield file
-
-    def _really_save(self, f, ignore_discard=False, ignore_expires=False):
-        now = time.time()
-        for cookie in self:
-            if (not ignore_discard and cookie.discard
-                    or not ignore_expires and cookie.is_expired(now)):
-                continue
-            name, value = cookie.name, cookie.value
-            if value is None:
-                # cookies.txt regards 'Set-Cookie: foo' as a cookie
-                # with no name, whereas http.cookiejar regards it as a
-                # cookie with no value.
-                name, value = '', name
-            f.write('%s\n' % '\t'.join((
-                cookie.domain,
-                self._true_or_false(cookie.domain.startswith('.')),
-                cookie.path,
-                self._true_or_false(cookie.secure),
-                str_or_none(cookie.expires, default=''),
-                name, value
-            )))
-
-    def save(self, filename=None, *args, **kwargs):
-        """
-        Save cookies to a file.
-        Code is taken from CPython 3.6
-        https://github.com/python/cpython/blob/8d999cbf4adea053be6dbb612b9844635c4dfb8e/Lib/http/cookiejar.py#L2091-L2117 """
-
-        if filename is None:
-            if self.filename is not None:
-                filename = self.filename
-            else:
-                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
-
-        # Store session cookies with `expires` set to 0 instead of an empty string
-        for cookie in self:
-            if cookie.expires is None:
-                cookie.expires = 0
-
-        with self.open(filename, write=True) as f:
-            f.write(self._HEADER)
-            self._really_save(f, *args, **kwargs)
-
-    def load(self, filename=None, ignore_discard=False, ignore_expires=False):
-        """Load cookies from a file."""
-        if filename is None:
-            if self.filename is not None:
-                filename = self.filename
-            else:
-                raise ValueError(http.cookiejar.MISSING_FILENAME_TEXT)
-
-        def prepare_line(line):
-            if line.startswith(self._HTTPONLY_PREFIX):
-                line = line[len(self._HTTPONLY_PREFIX):]
-            # comments and empty lines are fine
-            if line.startswith('#') or not line.strip():
-                return line
-            cookie_list = line.split('\t')
-            if len(cookie_list) != self._ENTRY_LEN:
-                raise http.cookiejar.LoadError('invalid length %d' % len(cookie_list))
-            cookie = self._CookieFileEntry(*cookie_list)
-            if cookie.expires_at and not cookie.expires_at.isdigit():
-                raise http.cookiejar.LoadError('invalid expires at %s' % cookie.expires_at)
-            return line
-
-        cf = io.StringIO()
-        with self.open(filename) as f:
-            for line in f:
-                try:
-                    cf.write(prepare_line(line))
-                except http.cookiejar.LoadError as e:
-                    if f'{line.strip()} '[0] in '[{"':
-                        raise http.cookiejar.LoadError(
-                            'Cookies file must be Netscape formatted, not JSON. See  '
-                            'https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp')
-                    write_string(f'WARNING: skipping cookie file entry due to {e}: {line!r}\n')
-                    continue
-        cf.seek(0)
-        self._really_load(cf, filename, ignore_discard, ignore_expires)
-        # Session cookies are denoted by either `expires` field set to
-        # an empty string or 0. MozillaCookieJar only recognizes the former
-        # (see [1]). So we need force the latter to be recognized as session
-        # cookies on our own.
-        # Session cookies may be important for cookies-based authentication,
-        # e.g. usually, when user does not check 'Remember me' check box while
-        # logging in on a site, some important cookies are stored as session
-        # cookies so that not recognizing them will result in failed login.
-        # 1. https://bugs.python.org/issue17164
-        for cookie in self:
-            # Treat `expires=0` cookies as session cookies
-            if cookie.expires == 0:
-                cookie.expires = None
-                cookie.discard = True
-
-
 class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
     def __init__(self, cookiejar=None):
         urllib.request.HTTPCookieProcessor.__init__(self, cookiejar)
@@ -1780,7 +1656,7 @@ def unified_strdate(date_str, day_first=True):
 
 
 def unified_timestamp(date_str, day_first=True):
-    if date_str is None:
+    if not isinstance(date_str, str):
         return None
 
     date_str = re.sub(r'\s+', ' ', re.sub(
@@ -2572,13 +2448,16 @@ def request_to_url(req):
         return req
 
 
-def strftime_or_none(timestamp, date_format, default=None):
+def strftime_or_none(timestamp, date_format='%Y%m%d', default=None):
     datetime_object = None
     try:
         if isinstance(timestamp, (int, float)):  # unix timestamp
             # Using naive datetime here can break timestamp() in Windows
             # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
-            datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
+            # Also, datetime.datetime.fromtimestamp breaks for negative timestamps
+            # Ref: https://github.com/yt-dlp/yt-dlp/issues/6706#issuecomment-1496842642
+            datetime_object = (datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
+                               + datetime.timedelta(seconds=timestamp))
         elif isinstance(timestamp, str):  # assume YYYYMMDD
             datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
         date_format = re.sub(  # Support %s on windows
@@ -3425,7 +3304,7 @@ def q(qid):
 '''
 
 
-STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrsa'
 
 
 def limit_length(s, length):
@@ -3628,7 +3507,8 @@ def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
         },
     }
 
-    sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+    sanitize_codec = functools.partial(
+        try_get, getter=lambda x: x[0].split('.')[0].replace('0', '').lower())
     vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
 
     for ext in preferences or COMPATIBLE_CODECS.keys():
@@ -3874,11 +3754,11 @@ def _match_func(info_dict, incomplete=False):
 
 
 class download_range_func:
-    def __init__(self, chapters, ranges):
-        self.chapters, self.ranges = chapters, ranges
+    def __init__(self, chapters, ranges, from_info=False):
+        self.chapters, self.ranges, self.from_info = chapters, ranges, from_info
 
     def __call__(self, info_dict, ydl):
-        if not self.ranges and not self.chapters:
+        if not any((self.ranges, self.chapters, self.from_info)):
             yield {}
 
         warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
@@ -3891,7 +3771,21 @@ def __call__(self, info_dict, ydl):
         if self.chapters and warning:
             ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
 
-        yield from ({'start_time': start, 'end_time': end} for start, end in self.ranges or [])
+        for start, end in self.ranges or []:
+            yield {
+                'start_time': self._handle_negative_timestamp(start, info_dict),
+                'end_time': self._handle_negative_timestamp(end, info_dict),
+            }
+
+        if self.from_info and (info_dict.get('start_time') or info_dict.get('end_time')):
+            yield {
+                'start_time': info_dict.get('start_time'),
+                'end_time': info_dict.get('end_time'),
+            }
+
+    @staticmethod
+    def _handle_negative_timestamp(time, info):
+        return max(info['duration'] + time, 0) if info.get('duration') and time < 0 else time
 
     def __eq__(self, other):
         return (isinstance(other, download_range_func)
@@ -4273,6 +4167,7 @@ class ISO639Utils:
         'or': 'ori',
         'os': 'oss',
         'pa': 'pan',
+        'pe': 'per',
         'pi': 'pli',
         'pl': 'pol',
         'ps': 'pus',
@@ -5794,6 +5689,7 @@ def orderedSet_from_options(options, alias_dict, *, use_regex=False, start=None)
     return orderedSet(requested)
 
 
+# TODO: Rewrite
 class FormatSorter:
     regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
 
@@ -5842,8 +5738,10 @@ class FormatSorter:
         'source': {'convert': 'float', 'field': 'source_preference', 'default': -1},
 
         'codec': {'type': 'combined', 'field': ('vcodec', 'acodec')},
-        'br': {'type': 'combined', 'field': ('tbr', 'vbr', 'abr'), 'same_limit': True},
-        'size': {'type': 'combined', 'same_limit': True, 'field': ('filesize', 'fs_approx')},
+        'br': {'type': 'multiple', 'field': ('tbr', 'vbr', 'abr'), 'convert': 'float_none',
+               'function': lambda it: next(filter(None, it), None)},
+        'size': {'type': 'multiple', 'field': ('filesize', 'fs_approx'), 'convert': 'bytes',
+                 'function': lambda it: next(filter(None, it), None)},
         'ext': {'type': 'combined', 'field': ('vext', 'aext')},
         'res': {'type': 'multiple', 'field': ('height', 'width'),
                 'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
@@ -6074,13 +5972,15 @@ def calculate_preference(self, format):
             format['preference'] = -100
 
         # Determine missing bitrates
-        if format.get('tbr') is None:
-            if format.get('vbr') is not None and format.get('abr') is not None:
-                format['tbr'] = format.get('vbr', 0) + format.get('abr', 0)
-        else:
-            if format.get('vcodec') != 'none' and format.get('vbr') is None:
-                format['vbr'] = format.get('tbr') - format.get('abr', 0)
-            if format.get('acodec') != 'none' and format.get('abr') is None:
-                format['abr'] = format.get('tbr') - format.get('vbr', 0)
+        if format.get('vcodec') == 'none':
+            format['vbr'] = 0
+        if format.get('acodec') == 'none':
+            format['abr'] = 0
+        if not format.get('vbr') and format.get('vcodec') != 'none':
+            format['vbr'] = try_call(lambda: format['tbr'] - format['abr']) or None
+        if not format.get('abr') and format.get('acodec') != 'none':
+            format['abr'] = try_call(lambda: format['tbr'] - format['vbr']) or None
+        if not format.get('tbr'):
+            format['tbr'] = try_call(lambda: format['vbr'] + format['abr']) or None
 
         return tuple(self._calculate_field_preference(format, field) for field in self._order)