]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/utils/_utils.py
[utils] clean_podcast_url: Handle protocol in redirect URL
[yt-dlp.git] / yt_dlp / utils / _utils.py
index 64621011653145f8ce9ff9f6eac13fa0f48262be..f68cdb96868b45ae0b393a806094dd69778856a7 100644 (file)
@@ -25,6 +25,7 @@
 import locale
 import math
 import mimetypes
+import netrc
 import operator
 import os
 import platform
@@ -864,10 +865,11 @@ def escapeHTML(text):
     )
 
 
-def process_communicate_or_kill(p, *args, **kwargs):
-    deprecation_warning(f'"{__name__}.process_communicate_or_kill" is deprecated and may be removed '
-                        f'in a future version. Use "{__name__}.Popen.communicate_or_kill" instead')
-    return Popen.communicate_or_kill(p, *args, **kwargs)
+class netrc_from_content(netrc.netrc):
+    def __init__(self, content):
+        self.hosts, self.macros = {}, {}
+        with io.StringIO(content) as stream:
+            self._parse('-', stream, False)
 
 
 class Popen(subprocess.Popen):
@@ -1654,7 +1656,7 @@ def unified_strdate(date_str, day_first=True):
 
 
 def unified_timestamp(date_str, day_first=True):
-    if date_str is None:
+    if not isinstance(date_str, str):
         return None
 
     date_str = re.sub(r'\s+', ' ', re.sub(
@@ -2446,13 +2448,16 @@ def request_to_url(req):
         return req
 
 
-def strftime_or_none(timestamp, date_format, default=None):
+def strftime_or_none(timestamp, date_format='%Y%m%d', default=None):
     datetime_object = None
     try:
         if isinstance(timestamp, (int, float)):  # unix timestamp
             # Using naive datetime here can break timestamp() in Windows
             # Ref: https://github.com/yt-dlp/yt-dlp/issues/5185, https://github.com/python/cpython/issues/94414
-            datetime_object = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
+            # Also, datetime.datetime.fromtimestamp breaks for negative timestamps
+            # Ref: https://github.com/yt-dlp/yt-dlp/issues/6706#issuecomment-1496842642
+            datetime_object = (datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
+                               + datetime.timedelta(seconds=timestamp))
         elif isinstance(timestamp, str):  # assume YYYYMMDD
             datetime_object = datetime.datetime.strptime(timestamp, '%Y%m%d')
         date_format = re.sub(  # Support %s on windows
@@ -3299,7 +3304,7 @@ def q(qid):
 '''
 
 
-STR_FORMAT_TYPES = 'diouxXeEfFgGcrs'
+STR_FORMAT_TYPES = 'diouxXeEfFgGcrsa'
 
 
 def limit_length(s, length):
@@ -3502,7 +3507,8 @@ def get_compatible_ext(*, vcodecs, acodecs, vexts, aexts, preferences=None):
         },
     }
 
-    sanitize_codec = functools.partial(try_get, getter=lambda x: x[0].split('.')[0].replace('0', ''))
+    sanitize_codec = functools.partial(
+        try_get, getter=lambda x: x[0].split('.')[0].replace('0', '').lower())
     vcodec, acodec = sanitize_codec(vcodecs), sanitize_codec(acodecs)
 
     for ext in preferences or COMPATIBLE_CODECS.keys():
@@ -3748,12 +3754,10 @@ def _match_func(info_dict, incomplete=False):
 
 
 class download_range_func:
-    def __init__(self, chapters, ranges):
-        self.chapters, self.ranges = chapters, ranges
+    def __init__(self, chapters, ranges, from_info=False):
+        self.chapters, self.ranges, self.from_info = chapters, ranges, from_info
 
     def __call__(self, info_dict, ydl):
-        if not self.ranges and not self.chapters:
-            yield {}
 
         warning = ('There are no chapters matching the regex' if info_dict.get('chapters')
                    else 'Cannot match chapters since chapter information is unavailable')
@@ -3765,7 +3769,23 @@ def __call__(self, info_dict, ydl):
         if self.chapters and warning:
             ydl.to_screen(f'[info] {info_dict["id"]}: {warning}')
 
-        yield from ({'start_time': start, 'end_time': end} for start, end in self.ranges or [])
+        for start, end in self.ranges or []:
+            yield {
+                'start_time': self._handle_negative_timestamp(start, info_dict),
+                'end_time': self._handle_negative_timestamp(end, info_dict),
+            }
+
+        if self.from_info and (info_dict.get('start_time') or info_dict.get('end_time')):
+            yield {
+                'start_time': info_dict.get('start_time') or 0,
+                'end_time': info_dict.get('end_time') or float('inf'),
+            }
+        elif not self.ranges and not self.chapters:
+            yield {}
+
+    @staticmethod
+    def _handle_negative_timestamp(time, info):
+        return max(info['duration'] + time, 0) if info.get('duration') and time < 0 else time
 
     def __eq__(self, other):
         return (isinstance(other, download_range_func)
@@ -5093,7 +5113,7 @@ def format_field(obj, field=None, template='%s', ignore=NO_DEFAULT, default='',
 
 
 def clean_podcast_url(url):
-    return re.sub(r'''(?x)
+    url = re.sub(r'''(?x)
         (?:
             (?:
                 chtbl\.com/track|
@@ -5107,6 +5127,7 @@ def clean_podcast_url(url):
                 st\.fm # https://podsights.com/docs/
             )/e
         )/''', '', url)
+    return re.sub(r'^\w+://(\w+://)', r'\1', url)
 
 
 _HEX_TABLE = '0123456789abcdef'
@@ -5669,6 +5690,7 @@ def orderedSet_from_options(options, alias_dict, *, use_regex=False, start=None)
     return orderedSet(requested)
 
 
+# TODO: Rewrite
 class FormatSorter:
     regex = r' *((?P<reverse>\+)?(?P<field>[a-zA-Z0-9_]+)((?P<separator>[~:])(?P<limit>.*?))?)? *$'
 
@@ -5717,8 +5739,10 @@ class FormatSorter:
         'source': {'convert': 'float', 'field': 'source_preference', 'default': -1},
 
         'codec': {'type': 'combined', 'field': ('vcodec', 'acodec')},
-        'br': {'type': 'combined', 'field': ('tbr', 'vbr', 'abr'), 'same_limit': True},
-        'size': {'type': 'combined', 'same_limit': True, 'field': ('filesize', 'fs_approx')},
+        'br': {'type': 'multiple', 'field': ('tbr', 'vbr', 'abr'), 'convert': 'float_none',
+               'function': lambda it: next(filter(None, it), None)},
+        'size': {'type': 'multiple', 'field': ('filesize', 'fs_approx'), 'convert': 'bytes',
+                 'function': lambda it: next(filter(None, it), None)},
         'ext': {'type': 'combined', 'field': ('vext', 'aext')},
         'res': {'type': 'multiple', 'field': ('height', 'width'),
                 'function': lambda it: (lambda l: min(l) if l else 0)(tuple(filter(None, it)))},
@@ -5949,13 +5973,15 @@ def calculate_preference(self, format):
             format['preference'] = -100
 
         # Determine missing bitrates
-        if format.get('tbr') is None:
-            if format.get('vbr') is not None and format.get('abr') is not None:
-                format['tbr'] = format.get('vbr', 0) + format.get('abr', 0)
-        else:
-            if format.get('vcodec') != 'none' and format.get('vbr') is None:
-                format['vbr'] = format.get('tbr') - format.get('abr', 0)
-            if format.get('acodec') != 'none' and format.get('abr') is None:
-                format['abr'] = format.get('tbr') - format.get('vbr', 0)
+        if format.get('vcodec') == 'none':
+            format['vbr'] = 0
+        if format.get('acodec') == 'none':
+            format['abr'] = 0
+        if not format.get('vbr') and format.get('vcodec') != 'none':
+            format['vbr'] = try_call(lambda: format['tbr'] - format['abr']) or None
+        if not format.get('abr') and format.get('acodec') != 'none':
+            format['abr'] = try_call(lambda: format['tbr'] - format['vbr']) or None
+        if not format.get('tbr'):
+            format['tbr'] = try_call(lambda: format['vbr'] + format['abr']) or None
 
         return tuple(self._calculate_field_preference(format, field) for field in self._order)