get_postprocessor,
)
from .postprocessor.ffmpeg import resolve_mapping as resolve_recode_mapping
-from .update import REPOSITORY, _get_system_deprecation, current_git_head, detect_variant
+from .update import REPOSITORY, _get_system_deprecation, _make_label, current_git_head, detect_variant
from .utils import (
DEFAULT_OUTTMPL,
IDENTITY,
clean_proxies,
std_headers,
)
-from .version import CHANNEL, RELEASE_GIT_HEAD, VARIANT, __version__
+from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
if compat_os_name == 'nt':
import ctypes
'Overwriting params from "color" with "no_color"')
self.params['color'] = 'no_color'
- term_allow_color = os.environ.get('TERM', '').lower() != 'dumb'
+ term_allow_color = os.getenv('TERM', '').lower() != 'dumb'
+ no_color = bool(os.getenv('NO_COLOR'))
def process_color_policy(stream):
stream_name = {sys.stdout: 'stdout', sys.stderr: 'stderr'}[stream]
policy = traverse_obj(self.params, ('color', (stream_name, None), {str}), get_all=False)
if policy in ('auto', None):
- return term_allow_color and supports_terminal_sequences(stream)
+ if term_allow_color and supports_terminal_sequences(stream):
+ return 'no_color' if no_color else True
+ return False
assert policy in ('always', 'never', 'no_color'), policy
return {'always': True, 'never': False}.get(policy, policy)
MATH_FUNCTIONS = {
'+': float.__add__,
'-': float.__sub__,
+ '*': float.__mul__,
}
# Field is of the form key1.key2...
# where keys (except first) can be string, int, slice or "{field, ...}"
(?:\|(?P<default>.*?))?
)$''')
+ def _from_user_input(field):
+ if field == ':':
+ return ...
+ elif ':' in field:
+ return slice(*map(int_or_none, field.split(':')))
+ elif int_or_none(field) is not None:
+ return int(field)
+ return field
+
def _traverse_infodict(fields):
fields = [f for x in re.split(r'\.({.+?})\.?', fields)
for f in ([x] if x.startswith('{') else x.split('.'))]
for i, f in enumerate(fields):
if not f.startswith('{'):
+ fields[i] = _from_user_input(f)
continue
assert f.endswith('}'), f'No closing brace for {f} in {fields}'
- fields[i] = {k: k.split('.') for k in f[1:-1].split(',')}
+ fields[i] = {k: list(map(_from_user_input, k.split('.'))) for k in f[1:-1].split(',')}
- return traverse_obj(info_dict, fields, is_user_input=True, traverse_string=True)
+ return traverse_obj(info_dict, fields, traverse_string=True)
def get_value(mdict):
# Object traversal
return
for f in formats:
- if f.get('has_drm'):
+ if f.get('has_drm') or f.get('__needs_testing'):
yield from self._check_formats([f])
else:
yield f
return selector_function(ctx_copy)
return final_selector
- stream = io.BytesIO(format_spec.encode())
+ # HACK: Python 3.12 changed the underlying parser, rendering '7_a' invalid
+ # Prefix numbers with random letters to avoid it being classified as a number
+ # See: https://github.com/yt-dlp/yt-dlp/pulls/8797
+ # TODO: Implement parser not reliant on tokenize.tokenize
+ prefix = ''.join(random.choices(string.ascii_letters, k=32))
+ stream = io.BytesIO(re.sub(r'\d[_\d]*', rf'{prefix}\g<0>', format_spec).encode())
try:
- tokens = list(_remove_unused_ops(tokenize.tokenize(stream.readline)))
+ tokens = list(_remove_unused_ops(
+ token._replace(string=token.string.replace(prefix, ''))
+ for token in tokenize.tokenize(stream.readline)))
except tokenize.TokenError:
raise syntax_error('Missing closing/opening brackets or parenthesis', (0, len(format_spec)))
upload_date = datetime.datetime.fromtimestamp(info_dict[ts_key], datetime.timezone.utc)
info_dict[date_key] = upload_date.strftime('%Y%m%d')
+ if not info_dict.get('release_year'):
+ info_dict['release_year'] = traverse_obj(info_dict, ('release_date', {lambda x: int(x[:4])}))
+
live_keys = ('is_live', 'was_live')
live_status = info_dict.get('live_status')
if live_status is None:
format['dynamic_range'] = 'SDR'
if format.get('aspect_ratio') is None:
format['aspect_ratio'] = try_call(lambda: round(format['width'] / format['height'], 2))
- if (not format.get('manifest_url') # For fragmented formats, "tbr" is often max bitrate and not average
+ # For fragmented formats, "tbr" is often max bitrate and not average
+ if (('manifest-filesize-approx' in self.params['compat_opts'] or not format.get('manifest_url'))
and info_dict.get('duration') and format.get('tbr')
and not format.get('filesize') and not format.get('filesize_approx')):
format['filesize_approx'] = int(info_dict['duration'] * format['tbr'] * (1024 / 8))
'version': __version__,
'current_git_head': current_git_head(),
'release_git_head': RELEASE_GIT_HEAD,
- 'repository': REPOSITORY,
+ 'repository': ORIGIN,
})
if remove_private_keys:
reject = lambda k, v: v is None or k.startswith('__') or k in {
'requested_downloads', 'requested_formats', 'requested_subtitles', 'requested_entries',
'entries', 'filepath', '_filename', 'filename', 'infojson_filename', 'original_url',
- 'playlist_autonumber', '_format_sort_fields',
+ 'playlist_autonumber',
}
else:
reject = lambda k, v: False
source += '*'
klass = type(self)
write_debug(join_nonempty(
- f'{"yt-dlp" if REPOSITORY == "yt-dlp/yt-dlp" else REPOSITORY} version',
- f'{CHANNEL}@{__version__}',
+ f'{REPOSITORY.rpartition("/")[2]} version',
+ _make_label(ORIGIN, CHANNEL.partition('@')[2] or __version__, __version__),
f'[{RELEASE_GIT_HEAD[:9]}]' if RELEASE_GIT_HEAD else '',
'' if source == 'unknown' else f'({source})',
'' if _IN_CLI else 'API' if klass == YoutubeDL else f'API:{self.__module__}.{klass.__qualname__}',
return self._request_director.send(req)
except NoSupportingHandlers as e:
for ue in e.unsupported_errors:
+ # FIXME: This depends on the order of errors.
if not (ue.handler and ue.msg):
continue
if ue.handler.RH_KEY == 'Urllib' and 'unsupported url scheme: "file"' in ue.msg.lower():
if 'unsupported proxy type: "https"' in ue.msg.lower():
raise RequestError(
'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
+
+ elif (
+ re.match(r'unsupported url scheme: "wss?"', ue.msg.lower())
+ and 'websockets' not in self._request_director.handlers
+ ):
+ raise RequestError(
+ 'This request requires WebSocket support. '
+ 'Ensure one of the following dependencies are installed: websockets',
+ cause=ue) from ue
raise
except SSLError as e:
if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):