]> jfr.im git - yt-dlp.git/blobdiff - yt_dlp/YoutubeDL.py
[core] Fix format selection parse error for CPython 3.12 (#8797)
[yt-dlp.git] / yt_dlp / YoutubeDL.py
index 1fb3e4ad2b3337401aba66a418626c1e923a4989..5e28fd0e21c00c6d1b1d563ea5cf38e5a0b05545 100644 (file)
@@ -60,7 +60,7 @@
     get_postprocessor,
 )
 from .postprocessor.ffmpeg import resolve_mapping as resolve_recode_mapping
-from .update import REPOSITORY, _get_system_deprecation, current_git_head, detect_variant
+from .update import REPOSITORY, _get_system_deprecation, _make_label, current_git_head, detect_variant
 from .utils import (
     DEFAULT_OUTTMPL,
     IDENTITY,
@@ -625,13 +625,16 @@ def __init__(self, params=None, auto_init=True):
                     'Overwriting params from "color" with "no_color"')
             self.params['color'] = 'no_color'
 
-        term_allow_color = os.environ.get('TERM', '').lower() != 'dumb'
+        term_allow_color = os.getenv('TERM', '').lower() != 'dumb'
+        no_color = bool(os.getenv('NO_COLOR'))
 
         def process_color_policy(stream):
             stream_name = {sys.stdout: 'stdout', sys.stderr: 'stderr'}[stream]
             policy = traverse_obj(self.params, ('color', (stream_name, None), {str}), get_all=False)
             if policy in ('auto', None):
-                return term_allow_color and supports_terminal_sequences(stream)
+                if term_allow_color and supports_terminal_sequences(stream):
+                    return 'no_color' if no_color else True
+                return False
             assert policy in ('always', 'never', 'no_color'), policy
             return {'always': True, 'never': False}.get(policy, policy)
 
@@ -1176,6 +1179,7 @@ def prepare_outtmpl(self, outtmpl, info_dict, sanitize=False):
         MATH_FUNCTIONS = {
             '+': float.__add__,
             '-': float.__sub__,
+            '*': float.__mul__,
         }
         # Field is of the form key1.key2...
         # where keys (except first) can be string, int, slice or "{field, ...}"
@@ -1197,6 +1201,15 @@ def prepare_outtmpl(self, outtmpl, info_dict, sanitize=False):
                 (?:\|(?P<default>.*?))?
             )$''')
 
+        def _from_user_input(field):
+            if field == ':':
+                return ...
+            elif ':' in field:
+                return slice(*map(int_or_none, field.split(':')))
+            elif int_or_none(field) is not None:
+                return int(field)
+            return field
+
         def _traverse_infodict(fields):
             fields = [f for x in re.split(r'\.({.+?})\.?', fields)
                       for f in ([x] if x.startswith('{') else x.split('.'))]
@@ -1206,11 +1219,12 @@ def _traverse_infodict(fields):
 
             for i, f in enumerate(fields):
                 if not f.startswith('{'):
+                    fields[i] = _from_user_input(f)
                     continue
                 assert f.endswith('}'), f'No closing brace for {f} in {fields}'
-                fields[i] = {k: k.split('.') for k in f[1:-1].split(',')}
+                fields[i] = {k: list(map(_from_user_input, k.split('.'))) for k in f[1:-1].split(',')}
 
-            return traverse_obj(info_dict, fields, is_user_input=True, traverse_string=True)
+            return traverse_obj(info_dict, fields, traverse_string=True)
 
         def get_value(mdict):
             # Object traversal
@@ -2451,9 +2465,16 @@ def final_selector(ctx):
                 return selector_function(ctx_copy)
             return final_selector
 
-        stream = io.BytesIO(format_spec.encode())
+        # HACK: Python 3.12 changed the underlying parser, rendering '7_a' invalid
+        #       Prefix numbers with random letters to avoid it being classified as a number
+        #       See: https://github.com/yt-dlp/yt-dlp/pulls/8797
+        # TODO: Implement parser not reliant on tokenize.tokenize
+        prefix = ''.join(random.choices(string.ascii_letters, k=32))
+        stream = io.BytesIO(re.sub(r'\d[_\d]*', rf'{prefix}\g<0>', format_spec).encode())
         try:
-            tokens = list(_remove_unused_ops(tokenize.tokenize(stream.readline)))
+            tokens = list(_remove_unused_ops(
+                token._replace(string=token.string.replace(prefix, ''))
+                for token in tokenize.tokenize(stream.readline)))
         except tokenize.TokenError:
             raise syntax_error('Missing closing/opening brackets or parenthesis', (0, len(format_spec)))
 
@@ -2586,6 +2607,9 @@ def _fill_common_fields(self, info_dict, final=True):
                     upload_date = datetime.datetime.fromtimestamp(info_dict[ts_key], datetime.timezone.utc)
                     info_dict[date_key] = upload_date.strftime('%Y%m%d')
 
+        if not info_dict.get('release_year'):
+            info_dict['release_year'] = traverse_obj(info_dict, ('release_date', {lambda x: int(x[:4])}))
+
         live_keys = ('is_live', 'was_live')
         live_status = info_dict.get('live_status')
         if live_status is None:
@@ -3928,8 +3952,7 @@ def get_encoding(stream):
         klass = type(self)
         write_debug(join_nonempty(
             f'{REPOSITORY.rpartition("/")[2]} version',
-            f'{CHANNEL.rpartition("@")[2]}@{__version__}',
-            not ORIGIN.startswith('yt-dlp/') and f'from {ORIGIN}',
+            _make_label(ORIGIN, CHANNEL.partition('@')[2] or __version__, __version__),
             f'[{RELEASE_GIT_HEAD[:9]}]' if RELEASE_GIT_HEAD else '',
             '' if source == 'unknown' else f'({source})',
             '' if _IN_CLI else 'API' if klass == YoutubeDL else f'API:{self.__module__}.{klass.__qualname__}',
@@ -4053,6 +4076,7 @@ def urlopen(self, req):
             return self._request_director.send(req)
         except NoSupportingHandlers as e:
             for ue in e.unsupported_errors:
+                # FIXME: This depends on the order of errors.
                 if not (ue.handler and ue.msg):
                     continue
                 if ue.handler.RH_KEY == 'Urllib' and 'unsupported url scheme: "file"' in ue.msg.lower():
@@ -4062,6 +4086,15 @@ def urlopen(self, req):
                 if 'unsupported proxy type: "https"' in ue.msg.lower():
                     raise RequestError(
                         'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
+
+                elif (
+                    re.match(r'unsupported url scheme: "wss?"', ue.msg.lower())
+                    and 'websockets' not in self._request_director.handlers
+                ):
+                    raise RequestError(
+                        'This request requires WebSocket support. '
+                        'Ensure one of the following dependencies are installed: websockets',
+                        cause=ue) from ue
             raise
         except SSLError as e:
             if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):