]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils.py
[utils] Add `join_nonempty`
[yt-dlp.git] / yt_dlp / utils.py
index be93b0ef27219776ac6069673f5afe1751b837dc..75b4ed61b6d47c19e503d106ffc9b1876f782bb9 100644 (file)
@@ -2006,6 +2006,23 @@ def handle_starttag(self, tag, attrs):
         self.attrs = dict(attrs)
 
 
+class HTMLListAttrsParser(compat_HTMLParser):
+    """HTML parser to gather the attributes for the elements of a list"""
+
+    def __init__(self):
+        compat_HTMLParser.__init__(self)
+        self.items = []
+        self._level = 0
+
+    def handle_starttag(self, tag, attrs):
+        if tag == 'li' and self._level == 0:
+            self.items.append(dict(attrs))
+        self._level += 1
+
+    def handle_endtag(self, tag):
+        self._level -= 1
+
+
 def extract_attributes(html_element):
     """Given a string for an HTML element such as
     <el
@@ -2032,6 +2049,15 @@ def extract_attributes(html_element):
     return parser.attrs
 
 
+def parse_list(webpage):
+    """Given a string for an series of HTML <li> elements,
+    return a dictionary of their attributes"""
+    parser = HTMLListAttrsParser()
+    parser.feed(webpage)
+    parser.close()
+    return parser.items
+
+
 def clean_html(html):
     """Clean an HTML snippet into a readable string"""
 
@@ -2492,9 +2518,9 @@ class GeoRestrictedError(ExtractorError):
     geographic location due to geographic restrictions imposed by a website.
     """
 
-    def __init__(self, msg, countries=None):
-        super(GeoRestrictedError, self).__init__(msg, expected=True)
-        self.msg = msg
+    def __init__(self, msg, countries=None, **kwargs):
+        kwargs['expected'] = True
+        super(GeoRestrictedError, self).__init__(msg, **kwargs)
         self.countries = countries
 
 
@@ -2542,23 +2568,33 @@ def __init__(self, msg):
         self.msg = msg
 
 
-class ExistingVideoReached(YoutubeDLError):
-    """ --max-downloads limit has been reached. """
-    pass
+class DownloadCancelled(YoutubeDLError):
+    """ Exception raised when the download queue should be interrupted """
+    msg = 'The download was cancelled'
 
+    def __init__(self, msg=None):
+        if msg is not None:
+            self.msg = msg
+        YoutubeDLError.__init__(self, self.msg)
 
-class RejectedVideoReached(YoutubeDLError):
-    """ --max-downloads limit has been reached. """
-    pass
 
+class ExistingVideoReached(DownloadCancelled):
+    """ --break-on-existing triggered """
+    msg = 'Encountered a video that is already in the archive, stopping due to --break-on-existing'
 
-class ThrottledDownload(YoutubeDLError):
-    """ Download speed below --throttled-rate. """
-    pass
+
+class RejectedVideoReached(DownloadCancelled):
+    """ --break-on-reject triggered """
+    msg = 'Encountered a video that did not match filter, stopping due to --break-on-reject'
 
 
-class MaxDownloadsReached(YoutubeDLError):
+class MaxDownloadsReached(DownloadCancelled):
     """ --max-downloads limit has been reached. """
+    msg = 'Maximum number of downloads reached, stopping due to --max-downloads'
+
+
+class ThrottledDownload(YoutubeDLError):
+    """ Download speed below --throttled-rate. """
     pass
 
 
@@ -3861,7 +3897,7 @@ def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
         return default
     try:
         return int(v) * invscale // scale
-    except (ValueError, TypeError):
+    except (ValueError, TypeError, OverflowError):
         return default
 
 
@@ -3997,10 +4033,7 @@ def check_executable(exe, args=[]):
     return exe
 
 
-def get_exe_version(exe, args=['--version'],
-                    version_re=None, unrecognized='present'):
-    """ Returns the version of the specified executable,
-    or False if the executable is not present """
+def _get_exe_version_output(exe, args):
     try:
         # STDIN should be redirected too. On UNIX-like systems, ffmpeg triggers
         # SIGTTOU if yt-dlp is run in the background.
@@ -4012,7 +4045,7 @@ def get_exe_version(exe, args=['--version'],
         return False
     if isinstance(out, bytes):  # Python 2.x
         out = out.decode('ascii', 'ignore')
-    return detect_exe_version(out, version_re, unrecognized)
+    return out
 
 
 def detect_exe_version(output, version_re=None, unrecognized='present'):
@@ -4026,6 +4059,14 @@ def detect_exe_version(output, version_re=None, unrecognized='present'):
         return unrecognized
 
 
+def get_exe_version(exe, args=['--version'],
+                    version_re=None, unrecognized='present'):
+    """ Returns the version of the specified executable,
+    or False if the executable is not present """
+    out = _get_exe_version_output(exe, args)
+    return detect_exe_version(out, version_re, unrecognized) if out else False
+
+
 class LazyList(collections.abc.Sequence):
     ''' Lazy immutable list from an iterable
     Note that slices of a LazyList are lists and not LazyList'''
@@ -4503,6 +4544,7 @@ def q(qid):
     'description': 'description',
     'annotation': 'annotations.xml',
     'infojson': 'info.json',
+    'link': None,
     'pl_thumbnail': None,
     'pl_description': 'description',
     'pl_infojson': 'info.json',
@@ -4645,19 +4687,18 @@ def parse_codecs(codecs_str):
         str.strip, codecs_str.strip().strip(',').split(','))))
     vcodec, acodec, hdr = None, None, None
     for full_codec in split_codecs:
-        codec = full_codec.split('.')[0]
-        if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2', 'h263', 'h264', 'mp4v', 'hvc1', 'av01', 'theora', 'dvh1', 'dvhe'):
+        parts = full_codec.split('.')
+        codec = parts[0].replace('0', '')
+        if codec in ('avc1', 'avc2', 'avc3', 'avc4', 'vp9', 'vp8', 'hev1', 'hev2',
+                     'h263', 'h264', 'mp4v', 'hvc1', 'av1', 'theora', 'dvh1', 'dvhe'):
             if not vcodec:
-                vcodec = full_codec
+                vcodec = '.'.join(parts[:4]) if codec in ('vp9', 'av1') else full_codec
                 if codec in ('dvh1', 'dvhe'):
                     hdr = 'DV'
-                elif codec == 'vp9' and vcodec.startswith('vp9.2'):
+                elif codec == 'av1' and len(parts) > 3 and parts[3] == '10':
+                    hdr = 'HDR10'
+                elif full_codec.replace('0', '').startswith('vp9.2'):
                     hdr = 'HDR10'
-                elif codec == 'av01':
-                    parts = full_codec.split('.')
-                    if len(parts) > 3 and parts[3] == '10':
-                        hdr = 'HDR10'
-                        vcodec = '.'.join(parts[:4])
         elif codec in ('mp4a', 'opus', 'vorbis', 'mp3', 'aac', 'ac-3', 'ec-3', 'eac3', 'dtsc', 'dtse', 'dtsh', 'dtsl'):
             if not acodec:
                 acodec = full_codec
@@ -6238,6 +6279,12 @@ def random_birthday(year_field, month_field, day_field):
 Icon=text-html
 '''.lstrip()
 
+LINK_TEMPLATES = {
+    'url': DOT_URL_LINK_TEMPLATE,
+    'desktop': DOT_DESKTOP_LINK_TEMPLATE,
+    'webloc': DOT_WEBLOC_LINK_TEMPLATE,
+}
+
 
 def iri_to_uri(iri):
     """
@@ -6495,6 +6542,13 @@ def jwt_encode_hs256(payload_data, key, headers={}):
     return token
 
 
+# can be extended in future to verify the signature and parse header and return the algorithm used if it's not HS256
+def jwt_decode_hs256(jwt):
+    header_b64, payload_b64, signature_b64 = jwt.split('.')
+    payload_data = json.loads(base64.urlsafe_b64decode(payload_b64))
+    return payload_data
+
+
 def supports_terminal_sequences(stream):
     if compat_os_name == 'nt':
         if get_windows_version() < (10, 0, 10586):
@@ -6516,3 +6570,9 @@ def remove_terminal_sequences(string):
 
 def number_of_digits(number):
     return len('%d' % number)
+
+
+def join_nonempty(*values, delim='-', from_dict=None):
+    if from_dict is not None:
+        values = operator.itemgetter(values)(from_dict)
+    return delim.join(map(str, filter(None, values)))