-try:
- import contextvars # noqa: F401
-except Exception:
- raise Exception(
- f'You are using an unsupported version of Python. Only Python versions 3.7 and above are supported by yt-dlp') # noqa: F541
+import sys
+
+if sys.version_info < (3, 8):
+ raise ImportError(
+ f'You are using an unsupported version of Python. Only Python versions 3.8 and above are supported by yt-dlp') # noqa: F541
-__license__ = 'Public Domain'
+__license__ = 'The Unlicense'
import collections
import getpass
import optparse
import os
import re
-import sys
import traceback
-from .compat import compat_shlex_quote
+from .compat import compat_os_name
from .cookies import SUPPORTED_BROWSERS, SUPPORTED_KEYRINGS
from .downloader.external import get_external_downloader
from .extractor import list_extractor_classes
from .extractor.adobepass import MSO_INFO
+from .networking.impersonate import ImpersonateTarget
from .options import parseOpts
from .postprocessor import (
FFmpegExtractAudioPP,
float_or_none,
format_field,
int_or_none,
+ join_nonempty,
match_filter_func,
parse_bytes,
parse_duration,
read_stdin,
render_table,
setproctitle,
- std_headers,
+ shell_quote,
traverse_obj,
variadic,
write_string,
)
+from .utils.networking import std_headers
+from .utils._utils import _UnsafeExtensionError
from .YoutubeDL import YoutubeDL
_IN_CLI = False
def get_urls(urls, batchfile, verbose):
- # Batch file verification
+ """
+ @param verbose -1: quiet, 0: normal, 1: verbose
+ """
batch_urls = []
if batchfile is not None:
try:
batch_urls = read_batch_urls(
- read_stdin('URLs') if batchfile == '-'
+ read_stdin(None if verbose == -1 else 'URLs') if batchfile == '-'
else open(expand_path(batchfile), encoding='utf-8', errors='ignore'))
- if verbose:
+ if verbose == 1:
write_string('[debug] Batch file urls: ' + repr(batch_urls) + '\n')
except OSError:
_exit(f'ERROR: batch file {batchfile} could not be read')
ie.description(markdown=False, search_examples=_SEARCHES)
for ie in list_extractor_classes(opts.age_limit) if ie.working() and ie.IE_DESC is not False)
elif opts.ap_list_mso:
- out = 'Supported TV Providers:\n%s\n' % render_table(
+ out = 'Supported TV Providers:\n{}\n'.format(render_table(
['mso', 'mso name'],
- [[mso_id, mso_info['name']] for mso_id, mso_info in MSO_INFO.items()])
+ [[mso_id, mso_info['name']] for mso_id, mso_info in MSO_INFO.items()]))
else:
return False
write_string(out, out=sys.stdout)
if name not in opts.compat_opts:
return False
opts.compat_opts.discard(name)
- opts.compat_opts.update(['*%s' % name])
+ opts.compat_opts.update([f'*{name}'])
return True
def set_default_compat(compat_name, opt_name, default=True, remove_compat=True):
raise ValueError(f'{max_name} "{max_val}" must be must be greater than or equal to {min_name} "{min_val}"')
# Usernames and passwords
- validate(not opts.usenetrc or (opts.username is None and opts.password is None),
- '.netrc', msg='using {name} conflicts with giving username/password')
+ validate(sum(map(bool, (opts.usenetrc, opts.netrc_cmd, opts.username))) <= 1, '.netrc',
+ msg='{name}, netrc command and username/password are mutually exclusive options')
validate(opts.password is None or opts.username is not None, 'account username', msg='{name} missing')
validate(opts.ap_password is None or opts.ap_username is not None,
'TV Provider account username', msg='{name} missing')
validate_minmax(opts.sleep_interval, opts.max_sleep_interval, 'sleep interval')
if opts.wait_for_video is not None:
- min_wait, max_wait, *_ = map(parse_duration, opts.wait_for_video.split('-', 1) + [None])
+ min_wait, max_wait, *_ = map(parse_duration, [*opts.wait_for_video.split('-', 1), None])
validate(min_wait is not None and not (max_wait is None and '-' in opts.wait_for_video),
'time range to wait for video', opts.wait_for_video)
validate_minmax(min_wait, max_wait, 'time range to wait for video')
# Retry sleep function
def parse_sleep_func(expr):
NUMBER_RE = r'\d+(?:\.\d+)?'
- op, start, limit, step, *_ = tuple(re.fullmatch(
+ op, start, limit, step, *_ = (*tuple(re.fullmatch(
rf'(?:(linear|exp)=)?({NUMBER_RE})(?::({NUMBER_RE})?)?(?::({NUMBER_RE}))?',
- expr.strip()).groups()) + (None, None)
+ expr.strip()).groups()), None, None)
if op == 'exp':
return lambda n: min(float(start) * (float(step or 2) ** n), float(limit or 'inf'))
opts.skip_download = None
del opts.outtmpl['default']
- def parse_chapters(name, value):
- chapters, ranges = [], []
+ def parse_chapters(name, value, advanced=False):
parse_timestamp = lambda x: float('inf') if x in ('inf', 'infinite') else parse_duration(x)
+ TIMESTAMP_RE = r'''(?x)(?:
+ (?P<start_sign>-?)(?P<start>[^-]+)
+ )?\s*-\s*(?:
+ (?P<end_sign>-?)(?P<end>[^-]+)
+ )?'''
+
+ chapters, ranges, from_url = [], [], False
for regex in value or []:
- if regex.startswith('*'):
- for range_ in map(str.strip, regex[1:].split(',')):
- mobj = range_ != '-' and re.fullmatch(r'([^-]+)?\s*-\s*([^-]+)?', range_)
- dur = mobj and (parse_timestamp(mobj.group(1) or '0'), parse_timestamp(mobj.group(2) or 'inf'))
- if None in (dur or [None]):
- raise ValueError(f'invalid {name} time range "{regex}". Must be of the form "*start-end"')
- ranges.append(dur)
+ if advanced and regex == '*from-url':
+ from_url = True
+ continue
+ elif not regex.startswith('*'):
+ try:
+ chapters.append(re.compile(regex))
+ except re.error as err:
+ raise ValueError(f'invalid {name} regex "{regex}" - {err}')
continue
- try:
- chapters.append(re.compile(regex))
- except re.error as err:
- raise ValueError(f'invalid {name} regex "{regex}" - {err}')
- return chapters, ranges
- opts.remove_chapters, opts.remove_ranges = parse_chapters('--remove-chapters', opts.remove_chapters)
- opts.download_ranges = download_range_func(*parse_chapters('--download-sections', opts.download_ranges))
+ for range_ in map(str.strip, regex[1:].split(',')):
+ mobj = range_ != '-' and re.fullmatch(TIMESTAMP_RE, range_)
+ dur = mobj and [parse_timestamp(mobj.group('start') or '0'), parse_timestamp(mobj.group('end') or 'inf')]
+ signs = mobj and (mobj.group('start_sign'), mobj.group('end_sign'))
+
+ err = None
+ if None in (dur or [None]):
+ err = 'Must be of the form "*start-end"'
+ elif not advanced and any(signs):
+ err = 'Negative timestamps are not allowed'
+ else:
+ dur[0] *= -1 if signs[0] else 1
+ dur[1] *= -1 if signs[1] else 1
+ if dur[1] == float('-inf'):
+ err = '"-inf" is not a valid end'
+ if err:
+ raise ValueError(f'invalid {name} time range "{regex}". {err}')
+ ranges.append(dur)
+
+ return chapters, ranges, from_url
+
+ opts.remove_chapters, opts.remove_ranges, _ = parse_chapters('--remove-chapters', opts.remove_chapters)
+ opts.download_ranges = download_range_func(*parse_chapters('--download-sections', opts.download_ranges, True))
# Cookies from browser
if opts.cookiesfrombrowser:
f'Supported keyrings are: {", ".join(sorted(SUPPORTED_KEYRINGS))}')
opts.cookiesfrombrowser = (browser_name, profile, keyring, container)
+ if opts.impersonate is not None:
+ opts.impersonate = ImpersonateTarget.from_str(opts.impersonate.lower())
+
# MetadataParser
def metadataparser_actions(f):
if isinstance(f, str):
- cmd = '--parse-metadata %s' % compat_shlex_quote(f)
+ cmd = f'--parse-metadata {shell_quote(f)}'
try:
actions = [MetadataFromFieldPP.to_action(f)]
except Exception as err:
raise ValueError(f'{cmd} is invalid; {err}')
else:
- cmd = '--replace-in-metadata %s' % ' '.join(map(compat_shlex_quote, f))
+ cmd = f'--replace-in-metadata {shell_quote(f)}'
actions = ((MetadataParserPP.Actions.REPLACE, x, *f[1:]) for x in f[0].split(','))
for action in actions:
yield action
if opts.metafromtitle is not None:
- opts.parse_metadata.setdefault('pre_process', []).append('title:%s' % opts.metafromtitle)
+ opts.parse_metadata.setdefault('pre_process', []).append(f'title:{opts.metafromtitle}')
opts.parse_metadata = {
k: list(itertools.chain(*map(metadataparser_actions, v)))
for k, v in opts.parse_metadata.items()
elif ed and proto == 'default':
default_downloader = ed.get_basename()
+ for policy in opts.color.values():
+ if policy not in ('always', 'auto', 'no_color', 'never'):
+ raise ValueError(f'"{policy}" is not a valid color policy')
+
warnings, deprecation_warnings = [], []
# Common mistake: -f best
if opts.ap_username is not None and opts.ap_password is None:
opts.ap_password = getpass.getpass('Type TV provider account password and press [Return]: ')
+ # compat option changes global state destructively; only allow from cli
+ if 'allow-unsafe-ext' in opts.compat_opts:
+ warnings.append(
+ 'Using allow-unsafe-ext opens you up to potential attacks. '
+ 'Use with great care!')
+ _UnsafeExtensionError.sanitize_extension = lambda x, prepend=False: x
+
return warnings, deprecation_warnings
yield {
'key': 'MetadataParser',
'actions': actions,
- 'when': when
+ 'when': when,
}
sponsorblock_query = opts.sponsorblock_mark | opts.sponsorblock_remove
if sponsorblock_query:
'key': 'SponsorBlock',
'categories': sponsorblock_query,
'api': opts.sponsorblock_api,
- 'when': 'after_filter'
+ 'when': 'after_filter',
}
if opts.convertsubtitles:
yield {
'key': 'FFmpegSubtitlesConvertor',
'format': opts.convertsubtitles,
- 'when': 'before_dl'
+ 'when': 'before_dl',
}
if opts.convertthumbnails:
yield {
'key': 'FFmpegThumbnailsConvertor',
'format': opts.convertthumbnails,
- 'when': 'before_dl'
+ 'when': 'before_dl',
}
if opts.extractaudio:
yield {
yield {
'key': 'FFmpegEmbedSubtitle',
# already_have_subtitle = True prevents the file from being deleted after embedding
- 'already_have_subtitle': opts.writesubtitles and keep_subs
+ 'already_have_subtitle': opts.writesubtitles and keep_subs,
}
if not opts.writeautomaticsub and keep_subs:
opts.writesubtitles = True
'remove_sponsor_segments': opts.sponsorblock_remove,
'remove_ranges': opts.remove_ranges,
'sponsorblock_chapter_title': opts.sponsorblock_chapter_title,
- 'force_keyframes': opts.force_keyframes_at_cuts
+ 'force_keyframes': opts.force_keyframes_at_cuts,
}
# FFmpegMetadataPP should be run after FFmpegVideoConvertorPP and
# FFmpegExtractAudioPP as containers before conversion may not support
yield {
'key': 'EmbedThumbnail',
# already_have_thumbnail = True prevents the file from being deleted after embedding
- 'already_have_thumbnail': opts.writethumbnail
+ 'already_have_thumbnail': opts.writethumbnail,
}
if not opts.writethumbnail:
opts.writethumbnail = True
def parse_options(argv=None):
"""@returns ParsedOptions(parser, opts, urls, ydl_opts)"""
parser, opts, urls = parseOpts(argv)
- urls = get_urls(urls, opts.batchfile, opts.verbose)
+ urls = get_urls(urls, opts.batchfile, -1 if opts.quiet and not opts.verbose else opts.verbose)
set_compat_opts(opts)
try:
print_only = bool(opts.forceprint) and all(k not in opts.forceprint for k in POSTPROCESS_WHEN[3:])
any_getting = any(getattr(opts, k) for k in (
'dumpjson', 'dump_single_json', 'getdescription', 'getduration', 'getfilename',
- 'getformat', 'getid', 'getthumbnail', 'gettitle', 'geturl'
+ 'getformat', 'getid', 'getthumbnail', 'gettitle', 'geturl',
))
if opts.quiet is None:
opts.quiet = any_getting or opts.print_json or bool(opts.forceprint)
return ParsedOptions(parser, opts, urls, {
'usenetrc': opts.usenetrc,
'netrc_location': opts.netrc_location,
+ 'netrc_cmd': opts.netrc_cmd,
'username': opts.username,
'password': opts.password,
'twofactor': opts.twofactor,
'noprogress': opts.quiet if opts.noprogress is None else opts.noprogress,
'progress_with_newline': opts.progress_with_newline,
'progress_template': opts.progress_template,
+ 'progress_delta': opts.progress_delta,
'playliststart': opts.playliststart,
'playlistend': opts.playlistend,
'playlistreverse': opts.playlist_reverse,
'postprocessors': postprocessors,
'fixup': opts.fixup,
'source_address': opts.source_address,
+ 'impersonate': opts.impersonate,
'call_home': opts.call_home,
'sleep_interval_requests': opts.sleep_interval_requests,
'sleep_interval': opts.sleep_interval,
'playlist_items': opts.playlist_items,
'xattr_set_filesize': opts.xattr_set_filesize,
'match_filter': opts.match_filter,
- 'no_color': opts.no_color,
+ 'color': opts.color,
'ffmpeg_location': opts.ffmpeg_location,
'hls_prefer_native': opts.hls_prefer_native,
'hls_use_mpegts': opts.hls_use_mpegts,
ydl.cache.remove()
try:
- updater = Updater(ydl, opts.update_self if isinstance(opts.update_self, str) else None)
+ updater = Updater(ydl, opts.update_self)
if opts.update_self and updater.update() and actual_use:
if updater.cmd:
return updater.restart()
traceback.print_exc()
ydl._download_retcode = 100
+ if opts.list_impersonate_targets:
+
+ known_targets = [
+ # List of simplified targets we know are supported,
+ # to help users know what dependencies may be required.
+ (ImpersonateTarget('chrome'), 'curl_cffi'),
+ (ImpersonateTarget('edge'), 'curl_cffi'),
+ (ImpersonateTarget('safari'), 'curl_cffi'),
+ ]
+
+ available_targets = ydl._get_available_impersonate_targets()
+
+ def make_row(target, handler):
+ return [
+ join_nonempty(target.client.title(), target.version, delim='-') or '-',
+ join_nonempty((target.os or '').title(), target.os_version, delim='-') or '-',
+ handler,
+ ]
+
+ rows = [make_row(target, handler) for target, handler in available_targets]
+
+ for known_target, known_handler in known_targets:
+ if not any(
+ known_target in target and handler == known_handler
+ for target, handler in available_targets
+ ):
+ rows.append([
+ ydl._format_out(text, ydl.Styles.SUPPRESS)
+ for text in make_row(known_target, f'{known_handler} (not available)')
+ ])
+
+ ydl.to_screen('[info] Available impersonate targets')
+ ydl.to_stdout(render_table(['Client', 'OS', 'Source'], rows, extra_gap=2, delim='-'))
+ return
+
if not actual_use:
if pre_process:
return ydl._download_retcode
- ydl.warn_if_short_id(sys.argv[1:] if argv is None else argv)
+ args = sys.argv[1:] if argv is None else argv
+ ydl.warn_if_short_id(args)
+
+ # Show a useful error message and wait for keypress if not launched from shell on Windows
+ if not args and compat_os_name == 'nt' and getattr(sys, 'frozen', False):
+ import ctypes.wintypes
+ import msvcrt
+
+ kernel32 = ctypes.WinDLL('Kernel32')
+
+ buffer = (1 * ctypes.wintypes.DWORD)()
+ attached_processes = kernel32.GetConsoleProcessList(buffer, 1)
+ # If we only have a single process attached, then the executable was double clicked
+ # When using `pyinstaller` with `--onefile`, two processes get attached
+ is_onefile = hasattr(sys, '_MEIPASS') and os.path.basename(sys._MEIPASS).startswith('_MEI')
+ if attached_processes == 1 or is_onefile and attached_processes == 2:
+ print(parser._generate_error_message(
+ 'Do not double-click the executable, instead call it from a command line.\n'
+ 'Please read the README for further information on how to use yt-dlp: '
+ 'https://github.com/yt-dlp/yt-dlp#readme'))
+ msvcrt.getch()
+ _exit(2)
parser.error(
'You must provide at least one URL.\n'
'Type yt-dlp --help to see a list of all options.')