NoSupportingHandlers,
RequestError,
SSLError,
- _CompatHTTPError,
network_exceptions,
)
+from .networking.impersonate import ImpersonateRequestHandler
from .plugins import directories as plugin_directories
from .postprocessor import _PLUGIN_CLASSES as plugin_pps
from .postprocessor import (
get_postprocessor,
)
from .postprocessor.ffmpeg import resolve_mapping as resolve_recode_mapping
-from .update import REPOSITORY, _get_system_deprecation, current_git_head, detect_variant
+from .update import (
+ REPOSITORY,
+ _get_system_deprecation,
+ _make_label,
+ current_git_head,
+ detect_variant,
+)
from .utils import (
DEFAULT_OUTTMPL,
IDENTITY,
SameFileError,
UnavailableVideoError,
UserNotLive,
+ YoutubeDLError,
age_restricted,
args_to_str,
bug_reports_message,
clean_proxies,
std_headers,
)
-from .version import CHANNEL, RELEASE_GIT_HEAD, VARIANT, __version__
+from .version import CHANNEL, ORIGIN, RELEASE_GIT_HEAD, VARIANT, __version__
if compat_os_name == 'nt':
import ctypes
- "detect_or_warn": check whether we can do anything
about it, warn otherwise (default)
source_address: Client-side IP address to bind to.
+ impersonate: Client to impersonate for requests.
+ An ImpersonateTarget (from yt_dlp.networking.impersonate)
sleep_interval_requests: Number of seconds to sleep between requests
during extraction
sleep_interval: Number of seconds to sleep before each download when
'url', 'manifest_url', 'manifest_stream_number', 'ext', 'format', 'format_id', 'format_note',
'width', 'height', 'aspect_ratio', 'resolution', 'dynamic_range', 'tbr', 'abr', 'acodec', 'asr', 'audio_channels',
'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx', 'rows', 'columns',
- 'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start',
+ 'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start', 'is_dash_periods', 'request_data',
'preference', 'language', 'language_preference', 'quality', 'source_preference', 'cookies',
'http_headers', 'stretched_ratio', 'no_resume', 'has_drm', 'extra_param_to_segment_url', 'hls_aes', 'downloader_options',
'page_url', 'app', 'play_path', 'tc_url', 'flash_version', 'rtmp_live', 'rtmp_conn', 'rtmp_protocol', 'rtmp_real_time'
}
+ _deprecated_multivalue_fields = {  # deprecated comma-joined string key -> replacement list-valued key
+ 'album_artist': 'album_artists',
+ 'artist': 'artists',
+ 'composer': 'composers',
+ 'creator': 'creators',
+ 'genre': 'genres',
+ }  # when only one key of a pair is set, the other is derived via split(', ') / ', '.join(...)
_format_selection_exts = {
'audio': set(MEDIA_EXTENSIONS.common_audio),
'video': set(MEDIA_EXTENSIONS.common_video + ('3gp', )),
'Overwriting params from "color" with "no_color"')
self.params['color'] = 'no_color'
- term_allow_color = os.environ.get('TERM', '').lower() != 'dumb'
+ term_allow_color = os.getenv('TERM', '').lower() != 'dumb'
+ no_color = bool(os.getenv('NO_COLOR'))
def process_color_policy(stream):
stream_name = {sys.stdout: 'stdout', sys.stderr: 'stderr'}[stream]
policy = traverse_obj(self.params, ('color', (stream_name, None), {str}), get_all=False)
if policy in ('auto', None):
- return term_allow_color and supports_terminal_sequences(stream)
+ if term_allow_color and supports_terminal_sequences(stream):
+ return 'no_color' if no_color else True
+ return False
assert policy in ('always', 'never', 'no_color'), policy
return {'always': True, 'never': False}.get(policy, policy)
self.params['http_headers'] = HTTPHeaderDict(std_headers, self.params.get('http_headers'))
self._load_cookies(self.params['http_headers'].get('Cookie')) # compat
self.params['http_headers'].pop('Cookie', None)
- self._request_director = self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES)
if auto_init and auto_init != 'no_verbose_header':
self.print_debug_header()
for msg in self.params.get('_deprecation_warnings', []):
self.deprecated_feature(msg)
+ if impersonate_target := self.params.get('impersonate'):
+ if not self._impersonate_target_available(impersonate_target):
+ raise YoutubeDLError(
+ f'Impersonate target "{impersonate_target}" is not available. '
+ f'Use --list-impersonate-targets to see available targets. '
+ f'You may be missing dependencies required to support this target.')
+
if 'list-formats' in self.params['compat_opts']:
self.params['listformats_table'] = False
def close(self):
self.save_cookies()
- self._request_director.close()
+ if '_request_director' in self.__dict__:
+ self._request_director.close()
+ del self._request_director
def trouble(self, message=None, tb=None, is_error=True):
"""Determine action to take when a download problem appears.
MATH_FUNCTIONS = {
'+': float.__add__,
'-': float.__sub__,
+ '*': float.__mul__,
}
# Field is of the form key1.key2...
# where keys (except first) can be string, int, slice or "{field, ...}"
(?:\|(?P<default>.*?))?
)$''')
+ def _from_user_input(field):  # Convert one user-typed path segment (a str) into a key for traverse_obj
+ if field == ':':
+ return ...  # a lone ':' maps to Ellipsis
+ elif ':' in field:
+ return slice(*map(int_or_none, field.split(':')))  # 'a:b[:c]' -> slice; each part is passed through int_or_none
+ elif int_or_none(field) is not None:
+ return int(field)  # integer-like text becomes an int key
+ return field  # anything else is kept as a plain string key
+
def _traverse_infodict(fields):
fields = [f for x in re.split(r'\.({.+?})\.?', fields)
for f in ([x] if x.startswith('{') else x.split('.'))]
for i, f in enumerate(fields):
if not f.startswith('{'):
+ fields[i] = _from_user_input(f)
continue
assert f.endswith('}'), f'No closing brace for {f} in {fields}'
- fields[i] = {k: k.split('.') for k in f[1:-1].split(',')}
+ fields[i] = {k: list(map(_from_user_input, k.split('.'))) for k in f[1:-1].split(',')}
- return traverse_obj(info_dict, fields, is_user_input=True, traverse_string=True)
+ return traverse_obj(info_dict, fields, traverse_string=True)
def get_value(mdict):
# Object traversal
selectors = []
current_selector = None
for type, string_, start, _, _ in tokens:
- # ENCODING is only defined in python 3.x
+ # ENCODING is only defined in Python 3.x
if type == getattr(tokenize, 'ENCODING', None):
continue
elif type in [tokenize.NAME, tokenize.NUMBER]:
# for extractors with incomplete formats (audio only (soundcloud)
# or video only (imgur)) best/worst will fallback to
# best/worst {video,audio}-only format
- matches = formats
+ matches = list(filter(lambda f: f.get('vcodec') != 'none' or f.get('acodec') != 'none', formats))
elif seperate_fallback and not ctx['has_merged_format']:
# for compatibility with youtube-dl when there is no pre-merged format
matches = list(filter(seperate_fallback, formats))
return selector_function(ctx_copy)
return final_selector
- stream = io.BytesIO(format_spec.encode())
+ # HACK: Python 3.12 changed the underlying parser, rendering '7_a' invalid
+ # Prefix numbers with random letters to avoid it being classified as a number
+ # See: https://github.com/yt-dlp/yt-dlp/pull/8797
+ # TODO: Implement parser not reliant on tokenize.tokenize
+ prefix = ''.join(random.choices(string.ascii_letters, k=32))
+ stream = io.BytesIO(re.sub(r'\d[_\d]*', rf'{prefix}\g<0>', format_spec).encode())
try:
- tokens = list(_remove_unused_ops(tokenize.tokenize(stream.readline)))
+ tokens = list(_remove_unused_ops(
+ token._replace(string=token.string.replace(prefix, ''))
+ for token in tokenize.tokenize(stream.readline)))
except tokenize.TokenError:
raise syntax_error('Missing closing/opening brackets or parenthesis', (0, len(format_spec)))
upload_date = datetime.datetime.fromtimestamp(info_dict[ts_key], datetime.timezone.utc)
info_dict[date_key] = upload_date.strftime('%Y%m%d')
+ if not info_dict.get('release_year'):
+ info_dict['release_year'] = traverse_obj(info_dict, ('release_date', {lambda x: int(x[:4])}))
+
live_keys = ('is_live', 'was_live')
live_status = info_dict.get('live_status')
if live_status is None:
if final and info_dict.get('%s_number' % field) is not None and not info_dict.get(field):
info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field])
+ for old_key, new_key in self._deprecated_multivalue_fields.items():
+ if new_key in info_dict and old_key in info_dict:
+ if '_version' not in info_dict: # HACK: Do not warn when using --load-info-json
+ self.deprecation_warning(f'Do not return {old_key!r} when {new_key!r} is present')
+ elif old_value := info_dict.get(old_key):
+ info_dict[new_key] = old_value.split(', ')
+ elif new_value := info_dict.get(new_key):
+ info_dict[old_key] = ', '.join(v.replace(',', '\N{FULLWIDTH COMMA}') for v in new_value)
+
def _raise_pending_errors(self, info):
err = info.pop('__pending_error', None)
if err:
or info_dict.get('is_live') and self.params.get('hls_use_mpegts') is None,
'Possible MPEG-TS in MP4 container or malformed AAC timestamps',
FFmpegFixupM3u8PP)
- ffmpeg_fixup(info_dict.get('is_live') and downloader == 'dashsegments',
+ ffmpeg_fixup(downloader == 'dashsegments'
+ and (info_dict.get('is_live') or info_dict.get('is_dash_periods')),
'Possible duplicate MOOV atoms', FFmpegFixupDuplicateMoovPP)
ffmpeg_fixup(downloader == 'web_socket_fragment', 'Malformed timestamps detected', FFmpegFixupTimestampPP)
raise
self.report_warning(f'The info failed to download: {e}; trying with URL {webpage_url}')
self.download([webpage_url])
+ except ExtractorError as e:
+ self.report_error(e)
return self._download_retcode
@staticmethod
'version': __version__,
'current_git_head': current_git_head(),
'release_git_head': RELEASE_GIT_HEAD,
- 'repository': REPOSITORY,
+ 'repository': ORIGIN,
})
if remove_private_keys:
source += '*'
klass = type(self)
write_debug(join_nonempty(
- f'{"yt-dlp" if REPOSITORY == "yt-dlp/yt-dlp" else REPOSITORY} version',
- f'{CHANNEL}@{__version__}',
+ f'{REPOSITORY.rpartition("/")[2]} version',
+ _make_label(ORIGIN, CHANNEL.partition('@')[2] or __version__, __version__),
f'[{RELEASE_GIT_HEAD[:9]}]' if RELEASE_GIT_HEAD else '',
'' if source == 'unknown' else f'({source})',
'' if _IN_CLI else 'API' if klass == YoutubeDL else f'API:{self.__module__}.{klass.__qualname__}',
handler = self._request_director.handlers['Urllib']
return handler._get_instance(cookiejar=self.cookiejar, proxies=self.proxies)
+ def _get_available_impersonate_targets(self):  # Return a list of (target, handler RH_NAME) pairs
+ # todo(future): make available as public API
+ return [
+ (target, rh.RH_NAME)
+ for rh in self._request_director.handlers.values()
+ if isinstance(rh, ImpersonateRequestHandler)  # only handlers that are ImpersonateRequestHandler instances
+ for target in rh.supported_targets  # one entry per target listed by that handler
+ ]
+
+ def _impersonate_target_available(self, target):  # True if any impersonation handler supports `target`
+ # todo(future): make available as public API
+ return any(
+ rh.is_supported_target(target)  # each handler decides for itself whether it supports the target
+ for rh in self._request_director.handlers.values()
+ if isinstance(rh, ImpersonateRequestHandler))  # non-impersonation handlers are never consulted
+
def urlopen(self, req):
""" Start an HTTP download """
if isinstance(req, str):
return self._request_director.send(req)
except NoSupportingHandlers as e:
for ue in e.unsupported_errors:
+ # FIXME: This depends on the order of errors.
if not (ue.handler and ue.msg):
continue
if ue.handler.RH_KEY == 'Urllib' and 'unsupported url scheme: "file"' in ue.msg.lower():
raise RequestError(
'file:// URLs are disabled by default in yt-dlp for security reasons. '
'Use --enable-file-urls to enable at your own risk.', cause=ue) from ue
- if 'unsupported proxy type: "https"' in ue.msg.lower():
+ if (
+ 'unsupported proxy type: "https"' in ue.msg.lower()
+ and 'requests' not in self._request_director.handlers
+ and 'curl_cffi' not in self._request_director.handlers
+ ):
+ raise RequestError(
+ 'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests, curl_cffi')
+
+ elif (
+ re.match(r'unsupported url scheme: "wss?"', ue.msg.lower())
+ and 'websockets' not in self._request_director.handlers
+ ):
raise RequestError(
- 'To use an HTTPS proxy for this request, one of the following dependencies needs to be installed: requests')
+ 'This request requires WebSocket support. '
+ 'Ensure one of the following dependencies are installed: websockets',
+ cause=ue) from ue
+
+ elif re.match(r'unsupported (?:extensions: impersonate|impersonate target)', ue.msg.lower()):
+ raise RequestError(
+ f'Impersonate target "{req.extensions["impersonate"]}" is not available.'
+ f' See --list-impersonate-targets for available targets.'
+ f' This request requires browser impersonation, however you may be missing dependencies'
+ f' required to support this target.')
raise
except SSLError as e:
if 'UNSAFE_LEGACY_RENEGOTIATION_DISABLED' in str(e):
'SSLV3_ALERT_HANDSHAKE_FAILURE: The server may not support the current cipher list. '
'Try using --legacy-server-connect', cause=e) from e
raise
- except HTTPError as e: # TODO: Remove in a future release
- raise _CompatHTTPError(e) from e
def build_request_director(self, handlers, preferences=None):
logger = _YDLLogger(self)
'timeout': 'socket_timeout',
'legacy_ssl_support': 'legacyserverconnect',
'enable_file_urls': 'enable_file_urls',
+ 'impersonate': 'impersonate',
'client_cert': {
'client_certificate': 'client_certificate',
'client_certificate_key': 'client_certificate_key',
director.preferences.add(lambda rh, _: 500 if rh.RH_KEY == 'Urllib' else 0)
return director
+ @functools.cached_property  # built on first attribute access, then stored in the instance __dict__
+ def _request_director(self):
+ return self.build_request_director(_REQUEST_HANDLERS.values(), _RH_PREFERENCES)  # handlers from _REQUEST_HANDLERS, preferences from _RH_PREFERENCES
+
def encode(self, s):
if isinstance(s, bytes):
return s # Already encoded