#!/usr/bin/env python3
-# coding: utf-8
-
-from __future__ import absolute_import, unicode_literals
-
import collections
import contextlib
import datetime
import operator
import os
import platform
+import random
import re
import shutil
import subprocess
import time
import tokenize
import traceback
-import random
import unicodedata
-
+import urllib.request
from enum import Enum
from string import ascii_letters
+from .cache import Cache
from .compat import (
- compat_basestring,
compat_brotli,
compat_get_terminal_size,
- compat_kwargs,
- compat_numeric_types,
compat_os_name,
compat_pycrypto_AES,
compat_shlex_quote,
compat_str,
- compat_tokenize_tokenize,
compat_urllib_error,
compat_urllib_request,
- compat_urllib_request_DataHandler,
windows_enable_vt_mode,
)
from .cookies import load_cookies
+from .downloader import FFmpegFD, get_suitable_downloader, shorten_protocol_name
+from .downloader.rtmp import rtmpdump_version
+from .extractor import _LAZY_LOADER
+from .extractor import _PLUGIN_CLASSES as plugin_extractors
+from .extractor import gen_extractor_classes, get_info_extractor
+from .extractor.openload import PhantomJSwrapper
+from .minicurses import format_text
+from .postprocessor import _PLUGIN_CLASSES as plugin_postprocessors
+from .postprocessor import (
+ EmbedThumbnailPP,
+ FFmpegFixupDuplicateMoovPP,
+ FFmpegFixupDurationPP,
+ FFmpegFixupM3u8PP,
+ FFmpegFixupM4aPP,
+ FFmpegFixupStretchedPP,
+ FFmpegFixupTimestampPP,
+ FFmpegMergerPP,
+ FFmpegPostProcessor,
+ MoveFilesAfterDownloadPP,
+ get_postprocessor,
+)
+from .update import detect_variant
from .utils import (
+ DEFAULT_OUTTMPL,
+ LINK_TEMPLATES,
+ NO_DEFAULT,
+ OUTTMPL_TYPES,
+ POSTPROCESS_WHEN,
+ STR_FORMAT_RE_TMPL,
+ STR_FORMAT_TYPES,
+ ContentTooShortError,
+ DateRange,
+ DownloadCancelled,
+ DownloadError,
+ EntryNotInPlaylist,
+ ExistingVideoReached,
+ ExtractorError,
+ GeoRestrictedError,
+ HEADRequest,
+ InAdvancePagedList,
+ ISO3166Utils,
+ LazyList,
+ MaxDownloadsReached,
+ PagedList,
+ PerRequestProxyHandler,
+ Popen,
+ PostProcessingError,
+ ReExtractInfo,
+ RejectedVideoReached,
+ SameFileError,
+ UnavailableVideoError,
+ YoutubeDLCookieProcessor,
+ YoutubeDLHandler,
+ YoutubeDLRedirectHandler,
age_restricted,
args_to_str,
- ContentTooShortError,
date_from_str,
- DateRange,
- DEFAULT_OUTTMPL,
determine_ext,
determine_protocol,
- DownloadCancelled,
- DownloadError,
encode_compat_str,
encodeFilename,
- EntryNotInPlaylist,
error_to_compat_str,
- ExistingVideoReached,
expand_path,
- ExtractorError,
+ filter_dict,
float_or_none,
format_bytes,
- format_field,
format_decimal_suffix,
+ format_field,
formatSeconds,
- GeoRestrictedError,
get_domain,
has_certifi,
- HEADRequest,
- InAdvancePagedList,
int_or_none,
iri_to_uri,
- ISO3166Utils,
join_nonempty,
- LazyList,
- LINK_TEMPLATES,
locked_file,
make_dir,
make_HTTPS_handler,
- MaxDownloadsReached,
merge_headers,
network_exceptions,
number_of_digits,
orderedSet,
- OUTTMPL_TYPES,
- PagedList,
parse_filesize,
- PerRequestProxyHandler,
platform_name,
- Popen,
- POSTPROCESS_WHEN,
- PostProcessingError,
preferredencoding,
prepend_extension,
- ReExtractInfo,
register_socks_protocols,
- RejectedVideoReached,
remove_terminal_sequences,
render_table,
replace_extension,
- SameFileError,
sanitize_filename,
sanitize_path,
sanitize_url,
sanitized_Request,
std_headers,
- STR_FORMAT_RE_TMPL,
- STR_FORMAT_TYPES,
str_or_none,
strftime_or_none,
subtitles_filename,
to_high_limit_path,
traverse_obj,
try_get,
- UnavailableVideoError,
url_basename,
variadic,
version_tuple,
write_json_file,
write_string,
- YoutubeDLCookieProcessor,
- YoutubeDLHandler,
- YoutubeDLRedirectHandler,
-)
-from .cache import Cache
-from .minicurses import format_text
-from .extractor import (
- gen_extractor_classes,
- get_info_extractor,
- _LAZY_LOADER,
- _PLUGIN_CLASSES as plugin_extractors
)
-from .extractor.openload import PhantomJSwrapper
-from .downloader import (
- FFmpegFD,
- get_suitable_downloader,
- shorten_protocol_name
-)
-from .downloader.rtmp import rtmpdump_version
-from .postprocessor import (
- get_postprocessor,
- EmbedThumbnailPP,
- FFmpegFixupDuplicateMoovPP,
- FFmpegFixupDurationPP,
- FFmpegFixupM3u8PP,
- FFmpegFixupM4aPP,
- FFmpegFixupStretchedPP,
- FFmpegFixupTimestampPP,
- FFmpegMergerPP,
- FFmpegPostProcessor,
- MoveFilesAfterDownloadPP,
- _PLUGIN_CLASSES as plugin_postprocessors
-)
-from .update import detect_variant
-from .version import __version__, RELEASE_GIT_HEAD
+from .version import RELEASE_GIT_HEAD, __version__
if compat_os_name == 'nt':
import ctypes
-class YoutubeDL(object):
+class YoutubeDL:
"""YoutubeDL class.
YoutubeDL objects are the ones responsible of downloading the
care about HLS. (only for youtube)
"""
- _NUMERIC_FIELDS = set((
+ _NUMERIC_FIELDS = {
'width', 'height', 'tbr', 'abr', 'asr', 'vbr', 'fps', 'filesize', 'filesize_approx',
'timestamp', 'release_timestamp',
'duration', 'view_count', 'like_count', 'dislike_count', 'repost_count',
'start_time', 'end_time',
'chapter_number', 'season_number', 'episode_number',
'track_number', 'disc_number', 'release_year',
- ))
+ }
_format_fields = {
# NB: Keep in sync with the docstring of extractor/common.py
- 'url', 'manifest_url', 'ext', 'format', 'format_id', 'format_note',
+ 'url', 'manifest_url', 'manifest_stream_number', 'ext', 'format', 'format_id', 'format_note',
'width', 'height', 'resolution', 'dynamic_range', 'tbr', 'abr', 'acodec', 'asr',
'vbr', 'fps', 'vcodec', 'container', 'filesize', 'filesize_approx',
'player_url', 'protocol', 'fragment_base_url', 'fragments', 'is_from_start',
def check_deprecated(param, option, suggestion):
if self.params.get(param) is not None:
- self.report_warning('%s is deprecated. Use %s instead' % (option, suggestion))
+ self.report_warning(f'{option} is deprecated. Use {suggestion} instead')
return True
return False
else:
raise
+ if auto_init:
+ if auto_init != 'no_verbose_header':
+ self.print_debug_header()
+ self.add_default_info_extractors()
+
if (sys.platform != 'win32'
and sys.getfilesystemencoding() in ['ascii', 'ANSI_X3.4-1968']
and not self.params.get('restrictfilenames', False)):
# Set http_headers defaults according to std_headers
self.params['http_headers'] = merge_headers(std_headers, self.params.get('http_headers', {}))
- self._setup_opener()
-
- if auto_init:
- if auto_init != 'no_verbose_header':
- self.print_debug_header()
- self.add_default_info_extractors()
-
hooks = {
'post_hooks': self.add_post_hook,
'progress_hooks': self.add_progress_hook,
pp_def = dict(pp_def_raw)
when = pp_def.pop('when', 'post_process')
self.add_post_processor(
- get_postprocessor(pp_def.pop('key'))(self, **compat_kwargs(pp_def)),
+ get_postprocessor(pp_def.pop('key'))(self, **pp_def),
when=when)
+ self._setup_opener()
register_socks_protocols()
def preload_download_archive(fn):
with locked_file(fn, 'r', encoding='utf-8') as archive_file:
for line in archive_file:
self.archive.add(line.strip())
- except IOError as ioe:
+ except OSError as ioe:
if ioe.errno != errno.ENOENT:
raise
return False
SUPPRESS = 'light black'
def _format_text(self, handle, allow_colors, text, f, fallback=None, *, test_encoding=False):
+ text = str(text)
if test_encoding:
original_text = text
# handle.encoding can be None. See https://github.com/yt-dlp/yt-dlp/issues/2711
text = text.encode(encoding, 'ignore').decode(encoding)
if fallback is not None and text != original_text:
text = fallback
- if isinstance(f, self.Styles):
+ if isinstance(f, Enum):
f = f.value
return format_text(text, f) if allow_colors else text if fallback is None else fallback
def deprecation_warning(self, message):
if self.params.get('logger') is not None:
- self.params['logger'].warning('DeprecationWarning: {message}')
+ self.params['logger'].warning(f'DeprecationWarning: {message}')
else:
self.to_stderr(f'{self._format_err("DeprecationWarning:", self.Styles.ERROR)} {message}', True)
outtmpl_dict.update({
k: sanitize(v) for k, v in DEFAULT_OUTTMPL.items()
if outtmpl_dict.get(k) is None})
- for key, val in outtmpl_dict.items():
+ for _, val in outtmpl_dict.items():
if isinstance(val, bytes):
- self.report_warning(
- 'Parameter outtmpl is bytes, but should be a unicode string. '
- 'Put from __future__ import unicode_literals at the top of your code file or consider switching to Python 3.x.')
+ self.report_warning('Parameter outtmpl is bytes, but should be a unicode string')
return outtmpl_dict
def get_output_path(self, dir_type='', filename=None):
expand_path(paths.get('home', '').strip()),
expand_path(paths.get(dir_type, '').strip()) if dir_type else '',
filename or '')
-
- # Temporary fix for #4787
- # 'Treat' all problem characters by passing filename through preferredencoding
- # to workaround encoding issues with subprocess on python2 @ Windows
- if sys.version_info < (3, 0) and sys.platform == 'win32':
- path = encodeFilename(path, True).decode(preferredencoding())
return sanitize_path(path, force=self.params.get('windowsfilenames'))
@staticmethod
# '%%' intact for template dict substitution step. Working around
# with boundary-alike separator hack.
sep = ''.join([random.choice(ascii_letters) for _ in range(32)])
- outtmpl = outtmpl.replace('%%', '%{0}%'.format(sep)).replace('$$', '${0}$'.format(sep))
+ outtmpl = outtmpl.replace('%%', f'%{sep}%').replace('$$', f'${sep}$')
# outtmpl should be expand_path'ed before template dict substitution
# because meta fields may contain env variables we don't want to
na = self.params.get('outtmpl_na_placeholder', 'NA')
def filename_sanitizer(key, value, restricted=self.params.get('restrictfilenames')):
- return sanitize_filename(str(value), restricted=restricted,
- is_id=re.search(r'(^|[_.])id(\.|$)', key))
+ return sanitize_filename(str(value), restricted=restricted, is_id=(
+ bool(re.search(r'(^|[_.])id(\.|$)', key))
+ if 'filename-sanitization' in self.params.get('compat_opts', [])
+ else NO_DEFAULT))
sanitizer = sanitize if callable(sanitize) else filename_sanitizer
sanitize = bool(sanitize)
fmt = outer_mobj.group('format')
if fmt == 's' and value is not None and key in field_size_compat_map.keys():
- fmt = '0{:d}d'.format(field_size_compat_map[key])
+ fmt = f'0{field_size_compat_map[key]:d}d'
value = default if value is None else value if replacement is None else replacement
value = map(str, variadic(value) if '#' in flags else [value])
value, fmt = ' '.join(map(compat_shlex_quote, value)), str_fmt
elif fmt[-1] == 'B': # bytes
- value = f'%{str_fmt}'.encode('utf-8') % str(value).encode('utf-8')
+ value = f'%{str_fmt}'.encode() % str(value).encode('utf-8')
value, fmt = value.decode('utf-8', 'ignore'), 's'
elif fmt[-1] == 'U': # unicode normalized
value, fmt = unicodedata.normalize(
outtmpl, info_dict = self.prepare_outtmpl(outtmpl, info_dict, *args, **kwargs)
return self.escape_outtmpl(outtmpl) % info_dict
- def _prepare_filename(self, info_dict, tmpl_type='default'):
+ def _prepare_filename(self, info_dict, *, outtmpl=None, tmpl_type=None):
+ assert None in (outtmpl, tmpl_type), 'outtmpl and tmpl_type are mutually exclusive'
+ if outtmpl is None:
+ outtmpl = self.outtmpl_dict.get(tmpl_type or 'default', self.outtmpl_dict['default'])
try:
- outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default']))
+ outtmpl = self._outtmpl_expandpath(outtmpl)
filename = self.evaluate_outtmpl(outtmpl, info_dict, True)
if not filename:
return None
- if tmpl_type in ('default', 'temp'):
+ if tmpl_type in ('', 'temp'):
final_ext, ext = self.params.get('final_ext'), info_dict.get('ext')
if final_ext and ext and final_ext != ext and filename.endswith(f'.{final_ext}'):
filename = replace_extension(filename, ext, final_ext)
- else:
+ elif tmpl_type:
force_ext = OUTTMPL_TYPES[tmpl_type]
if force_ext:
filename = replace_extension(filename, force_ext, info_dict.get('ext'))
self.report_error('Error in output template: ' + str(err) + ' (encoding: ' + repr(preferredencoding()) + ')')
return None
- def prepare_filename(self, info_dict, dir_type='', warn=False):
- """Generate the output filename."""
-
- filename = self._prepare_filename(info_dict, dir_type or 'default')
+ def prepare_filename(self, info_dict, dir_type='', *, outtmpl=None, warn=False):
+ """Generate the output filename"""
+ if outtmpl:
+ assert not dir_type, 'outtmpl and dir_type are mutually exclusive'
+ dir_type = None
+ filename = self._prepare_filename(info_dict, tmpl_type=dir_type, outtmpl=outtmpl)
if not filename and dir_type not in ('', 'temp'):
return ''
if date is not None:
dateRange = self.params.get('daterange', DateRange())
if date not in dateRange:
- return '%s upload date is not in range %s' % (date_from_str(date).isoformat(), dateRange)
+ return f'{date_from_str(date).isoformat()} upload date is not in range {dateRange}'
view_count = info_dict.get('view_count')
if view_count is not None:
min_views = self.params.get('min_views')
if not info:
return info
- force_properties = dict(
- (k, v) for k, v in ie_result.items() if v is not None)
- for f in ('_type', 'url', 'id', 'extractor', 'extractor_key', 'ie_key'):
- if f in force_properties:
- del force_properties[f]
new_result = info.copy()
- new_result.update(force_properties)
+ new_result.update(filter_dict(ie_result, lambda k, v: (
+ v is not None and k not in {'_type', 'url', 'id', 'extractor', 'extractor_key', 'ie_key'})))
# Extracted info may not be a video result (i.e.
# info.get('_type', 'video') != video) but rather an url or
entries.append(entry)
try:
if entry is not None:
+ # TODO: Add auto-generated fields
self._match_entry(entry, incomplete=True, silent=True)
except (ExistingVideoReached, RejectedVideoReached):
broken = True
x_forwarded_for = ie_result.get('__x_forwarded_for_ip')
- self.to_screen('[%s] playlist %s: %s' % (ie_result['extractor'], playlist, msg % n_entries))
+ self.to_screen(f'[{ie_result["extractor"]}] playlist {playlist}: {msg % n_entries}')
failures = 0
max_failures = self.params.get('skip_playlist_after_errors') or float('inf')
for i, entry_tuple in enumerate(entries, 1):
playlist_index, entry = entry_tuple
if 'playlist-index' in self.params.get('compat_opts', []):
playlist_index = playlistitems[i - 1] if playlistitems else i + playliststart - 1
- self.to_screen('[download] Downloading video %s of %s' % (i, n_entries))
+ self.to_screen(f'[download] Downloading video {i} of {n_entries}')
# This __x_forwarded_for_ip thing is a bit ugly but requires
# minimal changes
if x_forwarded_for:
ie_result['entries'] = playlist_results
# Write the updated info to json
- if _infojson_written and self._write_info_json(
+ if _infojson_written is True and self._write_info_json(
'updated playlist', ie_result,
self.prepare_filename(ie_copy, 'pl_infojson'), overwrite=True) is None:
return
def syntax_error(note, start):
message = (
'Invalid format specification: '
- '{0}\n\t{1}\n\t{2}^'.format(note, format_spec, ' ' * start[1]))
+ '{}\n\t{}\n\t{}^'.format(note, format_spec, ' ' * start[1]))
return SyntaxError(message)
PICKFIRST = 'PICKFIRST'
raise syntax_error('Expected a selector', start)
current_selector = FormatSelector(MERGE, (selector_1, selector_2), [])
else:
- raise syntax_error('Operator not recognized: "{0}"'.format(string), start)
+ raise syntax_error(f'Operator not recognized: "{string}"', start)
elif type == tokenize.ENDMARKER:
break
if current_selector:
yield from _check_formats(ctx['formats'][::-1])
elif format_spec == 'mergeall':
def selector_function(ctx):
- formats = list(_check_formats(ctx['formats']))
+ formats = list(_check_formats(
+ f for f in ctx['formats'] if f.get('vcodec') != 'none' or f.get('acodec') != 'none'))
if not formats:
return
merged_format = formats[-1]
yield merged_format
else:
- format_fallback, format_reverse, format_idx = False, True, 1
+ format_fallback, seperate_fallback, format_reverse, format_idx = False, None, True, 1
mobj = re.match(
r'(?P<bw>best|worst|b|w)(?P<type>video|audio|v|a)?(?P<mod>\*)?(?:\.(?P<n>[1-9]\d*))?$',
format_spec)
filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') != 'none'
elif format_spec in self._format_selection_exts['video']:
filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') != 'none' and f.get('vcodec') != 'none'
+ seperate_fallback = lambda f: f.get('ext') == format_spec and f.get('vcodec') != 'none'
elif format_spec in self._format_selection_exts['storyboards']:
filter_f = lambda f: f.get('ext') == format_spec and f.get('acodec') == 'none' and f.get('vcodec') == 'none'
else:
def selector_function(ctx):
formats = list(ctx['formats'])
matches = list(filter(filter_f, formats)) if filter_f is not None else formats
- if format_fallback and ctx['incomplete_formats'] and not matches:
- # for extractors with incomplete formats (audio only (soundcloud)
- # or video only (imgur)) best/worst will fallback to
- # best/worst {video,audio}-only format
- matches = formats
+ if not matches:
+ if format_fallback and ctx['incomplete_formats']:
+ # for extractors with incomplete formats (audio only (soundcloud)
+ # or video only (imgur)) best/worst will fallback to
+ # best/worst {video,audio}-only format
+ matches = formats
+ elif seperate_fallback and not ctx['has_merged_format']:
+ # for compatibility with youtube-dl when there is no pre-merged format
+ matches = list(filter(seperate_fallback, formats))
matches = LazyList(_check_formats(matches[::-1 if format_reverse else 1]))
try:
yield matches[format_idx - 1]
- except IndexError:
+ except LazyList.IndexError:
return
filters = [self._build_format_filter(f) for f in selector.filters]
stream = io.BytesIO(format_spec.encode('utf-8'))
try:
- tokens = list(_remove_unused_ops(compat_tokenize_tokenize(stream.readline)))
+ tokens = list(_remove_unused_ops(tokenize.tokenize(stream.readline)))
except tokenize.TokenError:
raise syntax_error('Missing closing/opening brackets or parenthesis', (0, len(format_spec)))
- class TokenIterator(object):
+ class TokenIterator:
def __init__(self, tokens):
self.tokens = tokens
self.counter = 0
def sanitize_numeric_fields(info):
for numeric_field in self._NUMERIC_FIELDS:
field = info.get(numeric_field)
- if field is None or isinstance(field, compat_numeric_types):
+ if field is None or isinstance(field, (int, float)):
continue
report_force_conversion(numeric_field, 'numeric', 'int')
info[numeric_field] = int_or_none(field)
info_dict['__has_drm'] = any(f.get('has_drm') for f in formats)
if not self.params.get('allow_unplayable_formats'):
formats = [f for f in formats if not f.get('has_drm')]
+ if info_dict['__has_drm'] and all(
+ f.get('acodec') == f.get('vcodec') == 'none' for f in formats):
+ self.report_warning(
+ 'This video is DRM protected and only images are available for download. '
+ 'Use --list-formats to see them')
get_from_start = not info_dict.get('is_live') or bool(self.params.get('live_from_start'))
if not get_from_start:
if info_dict.get('is_live') and formats:
formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start]
if get_from_start and not formats:
- self.raise_no_formats(info_dict, msg='--live-from-start is passed, but there are no formats that can be downloaded from the start. '
- 'If you want to download from the current time, pass --no-live-from-start')
+ self.raise_no_formats(info_dict, msg=(
+ '--live-from-start is passed, but there are no formats that can be downloaded from the start. '
+ 'If you want to download from the current time, use --no-live-from-start'))
if not formats:
self.raise_no_formats(info_dict)
self.report_error(err, tb=False, is_error=False)
continue
- # While in format selection we may need to have an access to the original
- # format set in order to calculate some metrics or do some processing.
- # For now we need to be able to guess whether original formats provided
- # by extractor are incomplete or not (i.e. whether extractor provides only
- # video-only or audio-only formats) for proper formats selection for
- # extractors with such incomplete formats (see
- # https://github.com/ytdl-org/youtube-dl/pull/5556).
- # Since formats may be filtered during format selection and may not match
- # the original formats the results may be incorrect. Thus original formats
- # or pre-calculated metrics should be passed to format selection routines
- # as well.
- # We will pass a context object containing all necessary additional data
- # instead of just formats.
- # This fixes incorrect format selection issue (see
- # https://github.com/ytdl-org/youtube-dl/issues/10083).
- incomplete_formats = (
- # All formats are video-only or
- all(f.get('vcodec') != 'none' and f.get('acodec') == 'none' for f in formats)
- # all formats are audio-only
- or all(f.get('vcodec') == 'none' and f.get('acodec') != 'none' for f in formats))
-
- ctx = {
+ formats_to_download = list(format_selector({
'formats': formats,
- 'incomplete_formats': incomplete_formats,
- }
-
- formats_to_download = list(format_selector(ctx))
+ 'has_merged_format': any('none' not in (f.get('acodec'), f.get('vcodec')) for f in formats),
+ 'incomplete_formats': (
+ # All formats are video-only or
+ all(f.get('vcodec') != 'none' and f.get('acodec') == 'none' for f in formats)
+ # all formats are audio-only
+ or all(f.get('vcodec') == 'none' and f.get('acodec') != 'none' for f in formats)),
+ }))
if interactive_format_selection and not formats_to_download:
self.report_error('Requested format is not available', tb=False, is_error=False)
continue
if not formats_to_download:
if not self.params.get('ignore_no_formats_error'):
- raise ExtractorError('Requested format is not available', expected=True,
- video_id=info_dict['id'], ie=info_dict['extractor'])
+ raise ExtractorError(
+ 'Requested format is not available. Use --list-formats for a list of available formats',
+ expected=True, video_id=info_dict['id'], ie=info_dict['extractor'])
self.report_warning('Requested format is not available')
# Process what we can, even without any available formats.
formats_to_download = [{}]
if max_downloads_reached:
break
- write_archive = set(f.get('__write_download_archive', False) for f in formats_to_download)
+ write_archive = {f.get('__write_download_archive', False) for f in formats_to_download}
assert write_archive.issubset({True, False, 'ignore'})
if True in write_archive and False not in write_archive:
self.record_download_archive(info_dict)
def process_subtitles(self, video_id, normal_subtitles, automatic_captions):
"""Select the requested subtitles and their format"""
- available_subs = {}
+ available_subs, normal_sub_langs = {}, []
if normal_subtitles and self.params.get('writesubtitles'):
available_subs.update(normal_subtitles)
+ normal_sub_langs = tuple(normal_subtitles.keys())
if automatic_captions and self.params.get('writeautomaticsub'):
for lang, cap_info in automatic_captions.items():
if lang not in available_subs:
available_subs):
return None
- all_sub_langs = available_subs.keys()
+ all_sub_langs = tuple(available_subs.keys())
if self.params.get('allsubtitles', False):
requested_langs = all_sub_langs
elif self.params.get('subtitleslangs', False):
else:
requested_langs.extend(current_langs)
requested_langs = orderedSet(requested_langs)
- elif 'en' in available_subs:
- requested_langs = ['en']
+ elif normal_sub_langs:
+ requested_langs = ['en'] if 'en' in normal_sub_langs else normal_sub_langs[:1]
else:
- requested_langs = [list(all_sub_langs)[0]]
+ requested_langs = ['en'] if 'en' in all_sub_langs else all_sub_langs[:1]
if requested_langs:
self.write_debug('Downloading subtitles: %s' % ', '.join(requested_langs))
for lang in requested_langs:
formats = available_subs.get(lang)
if formats is None:
- self.report_warning('%s subtitles not available for %s' % (lang, video_id))
+ self.report_warning(f'{lang} subtitles not available for {video_id}')
continue
for ext in formats_preference:
if ext == 'best':
self.to_stdout(self.evaluate_outtmpl(format_tmpl(tmpl), info_copy))
for tmpl, file_tmpl in self.params['print_to_file'].get(key, []):
- filename = self.evaluate_outtmpl(file_tmpl, info_dict)
+ filename = self.prepare_filename(info_dict, outtmpl=file_tmpl)
tmpl = format_tmpl(tmpl)
self.to_screen(f'[info] Writing {tmpl!r} to: {filename}')
if self._ensure_dir_exists(filename):
- with io.open(filename, 'a', encoding='utf-8') as f:
+ with open(filename, 'a', encoding='utf-8') as f:
f.write(self.evaluate_outtmpl(tmpl, info_copy) + '\n')
def __forced_printings(self, info_dict, filename, incomplete):
else:
try:
self.to_screen('[info] Writing video annotations to: ' + annofn)
- with io.open(encodeFilename(annofn), 'w', encoding='utf-8') as annofile:
+ with open(encodeFilename(annofn), 'w', encoding='utf-8') as annofile:
annofile.write(info_dict['annotations'])
except (KeyError, TypeError):
self.report_warning('There are no annotations to write.')
- except (OSError, IOError):
+ except OSError:
self.report_error('Cannot write annotations file: ' + annofn)
return
return True
try:
self.to_screen(f'[info] Writing internet shortcut (.{link_type}) to: {linkfn}')
- with io.open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8',
- newline='\r\n' if link_type == 'url' else '\n') as linkfile:
+ with open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8',
+ newline='\r\n' if link_type == 'url' else '\n') as linkfile:
template_vars = {'url': url}
if link_type == 'desktop':
template_vars['filename'] = linkfn[:-(len(link_type) + 1)]
linkfile.write(LINK_TEMPLATES[link_type] % template_vars)
- except (OSError, IOError):
+ except OSError:
self.report_error(f'Cannot write internet shortcut {linkfn}')
return False
return True
return False
# Check extension
- exts = set(format.get('ext') for format in formats)
+ exts = {format.get('ext') for format in formats}
COMPATIBLE_EXTS = (
- set(('mp3', 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'ismv', 'isma')),
- set(('webm',)),
+ {'mp3', 'mp4', 'm4a', 'm4p', 'm4b', 'm4r', 'm4v', 'ismv', 'isma'},
+ {'webm'},
)
for ext_sets in COMPATIBLE_EXTS:
if ext_sets.issuperset(exts):
os.path.splitext(filename)[0]
if filename_real_ext in (old_ext, new_ext)
else filename)
- return '%s.%s' % (filename_wo_ext, ext)
+ return f'{filename_wo_ext}.{ext}'
# Ensure filename always has a correct extension for successful merge
full_filename = correct_ext(full_filename)
except network_exceptions as err:
self.report_error('unable to download video data: %s' % error_to_compat_str(err))
return
- except (OSError, IOError) as err:
+ except OSError as err:
raise UnavailableVideoError(err)
except (ContentTooShortError, ) as err:
- self.report_error('content too short (expected %s bytes and served %s)' % (err.expected, err.downloaded))
+ self.report_error(f'content too short (expected {err.expected} bytes and served {err.downloaded})')
return
if success and full_filename != '-':
self.to_screen('Deleting original file %s (pass -k to keep)' % old_filename)
try:
os.remove(encodeFilename(old_filename))
- except (IOError, OSError):
+ except OSError:
self.report_warning('Unable to remove downloaded original file')
if old_filename in infodict['__files_to_move']:
del infodict['__files_to_move'][old_filename]
break
else:
return
- return '%s %s' % (extractor.lower(), video_id)
+ return f'{extractor.lower()} {video_id}'
def in_download_archive(self, info_dict):
fn = self.params.get('download_archive')
def urlopen(self, req):
""" Start an HTTP download """
- if isinstance(req, compat_basestring):
+ if isinstance(req, str):
req = sanitized_Request(req)
return self._opener.open(req, timeout=self._socket_timeout)
) or 'none'
write_debug('exe versions: %s' % exe_str)
+ from .cookies import SECRETSTORAGE_AVAILABLE, SQLITE_AVAILABLE
from .downloader.websocket import has_websockets
from .postprocessor.embedthumbnail import has_mutagen
- from .cookies import SQLITE_AVAILABLE, SECRETSTORAGE_AVAILABLE
lib_str = join_nonempty(
compat_brotli and compat_brotli.__name__,
delim=', ') or 'none'
write_debug('Optional libraries: %s' % lib_str)
+ self._setup_opener()
proxy_map = {}
for handler in self._opener.handlers:
if hasattr(handler, 'proxies'):
latest_version)
def _setup_opener(self):
+ if hasattr(self, '_opener'):
+ return
timeout_val = self.params.get('socket_timeout')
self._socket_timeout = 20 if timeout_val is None else float(timeout_val)
https_handler = make_HTTPS_handler(self.params, debuglevel=debuglevel)
ydlh = YoutubeDLHandler(self.params, debuglevel=debuglevel)
redirect_handler = YoutubeDLRedirectHandler()
- data_handler = compat_urllib_request_DataHandler()
+ data_handler = urllib.request.DataHandler()
# When passing our own FileHandler instance, build_opener won't add the
# default FileHandler and allows us to disable the file protocol, which
return encoding
def _write_info_json(self, label, ie_result, infofn, overwrite=None):
- ''' Write infojson and returns True = written, False = skip, None = error '''
+ ''' Write infojson and returns True = written, 'exists' = Already exists, False = skip, None = error '''
if overwrite is None:
overwrite = self.params.get('overwrites', True)
if not self.params.get('writeinfojson'):
return None
elif not overwrite and os.path.exists(infofn):
self.to_screen(f'[info] {label.title()} metadata is already present')
- else:
- self.to_screen(f'[info] Writing {label} metadata as JSON to: {infofn}')
- try:
- write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
- except (OSError, IOError):
- self.report_error(f'Cannot write {label} metadata to JSON file {infofn}')
- return None
- return True
+ return 'exists'
+
+ self.to_screen(f'[info] Writing {label} metadata as JSON to: {infofn}')
+ try:
+ write_json_file(self.sanitize_info(ie_result, self.params.get('clean_infojson', True)), infofn)
+ return True
+ except OSError:
+ self.report_error(f'Cannot write {label} metadata to JSON file {infofn}')
+ return None
def _write_description(self, label, ie_result, descfn):
''' Write description and returns True = written, False = skip, None = error '''
else:
try:
self.to_screen(f'[info] Writing {label} description to: {descfn}')
- with io.open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
+ with open(encodeFilename(descfn), 'w', encoding='utf-8') as descfile:
descfile.write(ie_result['description'])
- except (OSError, IOError):
+ except OSError:
self.report_error(f'Cannot write {label} description file {descfn}')
return None
return True
try:
# Use newline='' to prevent conversion of newline characters
# See https://github.com/ytdl-org/youtube-dl/issues/10268
- with io.open(sub_filename, 'w', encoding='utf-8', newline='') as subfile:
+ with open(sub_filename, 'w', encoding='utf-8', newline='') as subfile:
subfile.write(sub_info['data'])
sub_info['filepath'] = sub_filename
ret.append((sub_filename, sub_filename_final))
continue
- except (OSError, IOError):
+ except OSError:
self.report_error(f'Cannot write video subtitles file {sub_filename}')
return None