X-Git-Url: https://jfr.im/git/yt-dlp.git/blobdiff_plain/0bfc53d05c7ecd7762313f0cd8578c46cd916519..10331a2672bd1fdcbe72f7ca60d1b6202c3783a6:/yt_dlp/YoutubeDL.py diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py index 11708774e..355369c21 100644 --- a/yt_dlp/YoutubeDL.py +++ b/yt_dlp/YoutubeDL.py @@ -32,6 +32,7 @@ from .compat import ( compat_basestring, + compat_brotli, compat_get_terminal_size, compat_kwargs, compat_numeric_types, @@ -72,6 +73,7 @@ GeoRestrictedError, get_domain, HEADRequest, + InAdvancePagedList, int_or_none, iri_to_uri, ISO3166Utils, @@ -82,6 +84,7 @@ make_dir, make_HTTPS_handler, MaxDownloadsReached, + merge_headers, network_exceptions, number_of_digits, orderedSet, @@ -200,9 +203,12 @@ class YoutubeDL(object): verbose: Print additional info to stdout. quiet: Do not print messages to stdout. no_warnings: Do not print out anything for warnings. - forceprint: A dict with keys video/playlist mapped to - a list of templates to force print to stdout + forceprint: A dict with keys WHEN mapped to a list of templates to + print to stdout. The allowed keys are video or any of the + items in utils.POSTPROCESS_WHEN. For compatibility, a single list is also accepted + print_to_file: A dict with keys WHEN (same as forceprint) mapped to + a list of tuples with (template, filename) forceurl: Force printing final URL. (Deprecated) forcetitle: Force printing title. (Deprecated) forceid: Force printing ID. (Deprecated) @@ -229,6 +235,8 @@ class YoutubeDL(object): See "Sorting Formats" for more details. format_sort_force: Force the given format_sort. see "Sorting Formats" for more details. + prefer_free_formats: Whether to prefer video formats with free containers + over non-free ones of same quality. allow_multiple_video_streams: Allow multiple video streams to be merged into a single file allow_multiple_audio_streams: Allow multiple audio streams to be merged @@ -328,6 +336,7 @@ class YoutubeDL(object): nocheckcertificate: Do not verify SSL certificates prefer_insecure: Use HTTP instead of HTTPS to retrieve information. At the moment, this is only supported by YouTube. + http_headers: A dictionary of custom headers to be used for all requests proxy: URL of the proxy server to use geo_verification_proxy: URL of the proxy to use for IP address verification on geo-restricted sites. @@ -348,8 +357,8 @@ class YoutubeDL(object): postprocessors: A list of dictionaries, each with an entry * key: The name of the postprocessor. See yt_dlp/postprocessor/__init__.py for a list. - * when: When to run the postprocessor. Can be one of - pre_process|before_dl|post_process|after_move. + * when: When to run the postprocessor. Allowed values are + the entries of utils.POSTPROCESS_WHEN Assumed to be 'post_process' if not given post_hooks: Deprecated - Register a custom postprocessor instead A list of functions that get called as the final step @@ -480,6 +489,7 @@ class YoutubeDL(object): extractor_args: A dictionary of arguments to be passed to the extractors. See "EXTRACTOR ARGUMENTS" for details. Eg: {'youtube': {'skip': ['dash', 'hls']}} + mark_watched: Mark videos watched (even with --simulate). Only for YouTube youtube_include_dash_manifest: Deprecated - Use extractor_args instead. If True (default), DASH manifests and related data will be downloaded and processed by extractor. @@ -591,12 +601,14 @@ def check_deprecated(param, option, suggestion): else: self.params['nooverwrites'] = not self.params['overwrites'] + self.params.setdefault('forceprint', {}) + self.params.setdefault('print_to_file', {}) + # Compatibility with older syntax - params.setdefault('forceprint', {}) if not isinstance(params['forceprint'], dict): - params['forceprint'] = {'video': params['forceprint']} + self.params['forceprint'] = {'video': params['forceprint']} - if params.get('bidi_workaround', False): + if self.params.get('bidi_workaround', False): try: import pty master, slave = pty.openpty() @@ -624,7 +636,7 @@ def check_deprecated(param, option, suggestion): if (sys.platform != 'win32' and sys.getfilesystemencoding() in ['ascii', 'ANSI_X3.4-1968'] - and not params.get('restrictfilenames', False)): + and not self.params.get('restrictfilenames', False)): # Unicode filesystem API will throw errors (#1474, #13027) self.report_warning( 'Assuming --restrict-filenames since file system encoding ' @@ -640,6 +652,9 @@ def check_deprecated(param, option, suggestion): else self.params['format'] if callable(self.params['format']) else self.build_format_selector(self.params['format'])) + # Set http_headers defaults according to std_headers + self.params['http_headers'] = merge_headers(std_headers, self.params.get('http_headers', {})) + self._setup_opener() if auto_init: @@ -881,7 +896,8 @@ class Styles(Enum): def _format_text(self, handle, allow_colors, text, f, fallback=None, *, test_encoding=False): if test_encoding: original_text = text - encoding = self.params.get('encoding') or getattr(handle, 'encoding', 'ascii') + # handle.encoding can be None. See https://github.com/yt-dlp/yt-dlp/issues/2711 + encoding = self.params.get('encoding') or getattr(handle, 'encoding', None) or 'ascii' text = text.encode(encoding, 'ignore').decode(encoding) if fallback is not None and text != original_text: text = fallback @@ -946,13 +962,13 @@ def report_file_delete(self, file_name): except UnicodeEncodeError: self.to_screen('Deleting existing file') - def raise_no_formats(self, info, forced=False): + def raise_no_formats(self, info, forced=False, *, msg=None): has_drm = info.get('__has_drm') - msg = 'This video is DRM protected' if has_drm else 'No video formats found!' - expected = self.params.get('ignore_no_formats_error') - if forced or not expected: + ignored, expected = self.params.get('ignore_no_formats_error'), bool(msg) + msg = msg or has_drm and 'This video is DRM protected' or 'No video formats found!' + if forced or not ignored: raise ExtractorError(msg, video_id=info['id'], ie=info['extractor'], - expected=has_drm or expected) + expected=has_drm or ignored or expected) else: self.report_warning(msg) @@ -1029,8 +1045,7 @@ def validate_outtmpl(cls, outtmpl): @staticmethod def _copy_infodict(info_dict): info_dict = dict(info_dict) - for key in ('__original_infodict', '__postprocessors'): - info_dict.pop(key, None) + info_dict.pop('__postprocessors', None) return info_dict def prepare_outtmpl(self, outtmpl, info_dict, sanitize=False): @@ -1215,10 +1230,17 @@ def _prepare_filename(self, info_dict, tmpl_type='default'): try: outtmpl = self._outtmpl_expandpath(self.outtmpl_dict.get(tmpl_type, self.outtmpl_dict['default'])) filename = self.evaluate_outtmpl(outtmpl, info_dict, True) + if not filename: + return None - force_ext = OUTTMPL_TYPES.get(tmpl_type) - if filename and force_ext is not None: - filename = replace_extension(filename, force_ext, info_dict.get('ext')) + if tmpl_type in ('default', 'temp'): + final_ext, ext = self.params.get('final_ext'), info_dict.get('ext') + if final_ext and ext and final_ext != ext and filename.endswith(f'.{final_ext}'): + filename = replace_extension(filename, ext, final_ext) + else: + force_ext = OUTTMPL_TYPES[tmpl_type] + if force_ext: + filename = replace_extension(filename, force_ext, info_dict.get('ext')) # https://github.com/blackjack4494/youtube-dlc/issues/85 trim_file_name = self.params.get('trim_file_name', False) @@ -1457,8 +1479,12 @@ def add_default_extra_info(self, ie_result, ie, url): self.add_extra_info(ie_result, { 'webpage_url': url, 'original_url': url, - 'webpage_url_basename': url_basename(url), - 'webpage_url_domain': get_domain(url), + }) + webpage_url = ie_result.get('webpage_url') + if webpage_url: + self.add_extra_info(ie_result, { + 'webpage_url_basename': url_basename(webpage_url), + 'webpage_url_domain': get_domain(webpage_url), }) if ie is not None: self.add_extra_info(ie_result, { @@ -1566,6 +1592,7 @@ def process_ie_result(self, ie_result, download=True, extra_info=None): self._playlist_level += 1 self._playlist_urls.add(webpage_url) + self._fill_common_fields(ie_result, False) self._sanitize_thumbnails(ie_result) try: return self.__process_playlist(ie_result, download) @@ -1662,6 +1689,9 @@ def get_entry(i): msg = 'Downloading %d videos' if not isinstance(ie_entries, (PagedList, LazyList)): ie_entries = LazyList(ie_entries) + elif isinstance(ie_entries, InAdvancePagedList): + if ie_entries._pagesize == 1: + playlist_count = ie_entries._pagecount def get_entry(i): return YoutubeDL.__handle_extraction_exceptions( @@ -1825,15 +1855,21 @@ def _build_format_filter(self, filter_spec): '^=': lambda attr, value: attr.startswith(value), '$=': lambda attr, value: attr.endswith(value), '*=': lambda attr, value: value in attr, + '~=': lambda attr, value: value.search(attr) is not None } str_operator_rex = re.compile(r'''(?x)\s* (?P[a-zA-Z0-9._-]+)\s* - (?P!\s*)?(?P%s)(?P\s*\?)?\s* - (?P[a-zA-Z0-9._-]+)\s* + (?P!\s*)?(?P%s)\s*(?P\?\s*)? + (?P["'])? + (?P(?(quote)(?:(?!(?P=quote))[^\\]|\\.)+|[\w.-]+)) + (?(quote)(?P=quote))\s* ''' % '|'.join(map(re.escape, STR_OPERATORS.keys()))) m = str_operator_rex.fullmatch(filter_spec) if m: - comparison_value = m.group('value') + if m.group('op') == '~=': + comparison_value = re.compile(m.group('value')) + else: + comparison_value = re.sub(r'''\\([\\"'])''', r'\1', m.group('value')) str_op = STR_OPERATORS[m.group('op')] if m.group('negation'): op = lambda attr, value: not str_op(attr, value) @@ -2222,11 +2258,7 @@ def restore_last_token(self): return _build_selector_function(parsed_selector) def _calc_headers(self, info_dict): - res = std_headers.copy() - - add_headers = info_dict.get('http_headers') - if add_headers: - res.update(add_headers) + res = merge_headers(self.params['http_headers'], info_dict.get('http_headers') or {}) cookies = self._calc_cookies(info_dict) if cookies: @@ -2284,15 +2316,66 @@ def check_thumbnails(thumbnails): else: info_dict['thumbnails'] = thumbnails + def _fill_common_fields(self, info_dict, is_video=True): + # TODO: move sanitization here + if is_video: + # playlists are allowed to lack "title" + info_dict['fulltitle'] = info_dict.get('title') + if 'title' not in info_dict: + raise ExtractorError('Missing "title" field in extractor result', + video_id=info_dict['id'], ie=info_dict['extractor']) + elif not info_dict.get('title'): + self.report_warning('Extractor failed to obtain "title". Creating a generic title instead') + info_dict['title'] = f'{info_dict["extractor"]} video #{info_dict["id"]}' + + if info_dict.get('duration') is not None: + info_dict['duration_string'] = formatSeconds(info_dict['duration']) + + for ts_key, date_key in ( + ('timestamp', 'upload_date'), + ('release_timestamp', 'release_date'), + ('modified_timestamp', 'modified_date'), + ): + if info_dict.get(date_key) is None and info_dict.get(ts_key) is not None: + # Working around out-of-range timestamp values (e.g. negative ones on Windows, + # see http://bugs.python.org/issue1646728) + try: + upload_date = datetime.datetime.utcfromtimestamp(info_dict[ts_key]) + info_dict[date_key] = upload_date.strftime('%Y%m%d') + except (ValueError, OverflowError, OSError): + pass + + live_keys = ('is_live', 'was_live') + live_status = info_dict.get('live_status') + if live_status is None: + for key in live_keys: + if info_dict.get(key) is False: + continue + if info_dict.get(key): + live_status = key + break + if all(info_dict.get(key) is False for key in live_keys): + live_status = 'not_live' + if live_status: + info_dict['live_status'] = live_status + for key in live_keys: + if info_dict.get(key) is None: + info_dict[key] = (live_status == key) + + # Auto generate title fields corresponding to the *_number fields when missing + # in order to always have clean titles. This is very common for TV series. + for field in ('chapter', 'season', 'episode'): + if info_dict.get('%s_number' % field) is not None and not info_dict.get(field): + info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field]) + def process_video_result(self, info_dict, download=True): assert info_dict.get('_type', 'video') == 'video' self._num_videos += 1 if 'id' not in info_dict: - raise ExtractorError('Missing "id" field in extractor result') - if 'title' not in info_dict: - raise ExtractorError('Missing "title" field in extractor result', - video_id=info_dict['id'], ie=info_dict['extractor']) + raise ExtractorError('Missing "id" field in extractor result', ie=info_dict['extractor']) + elif not info_dict.get('id'): + raise ExtractorError('Extractor failed to obtain "id"', ie=info_dict['extractor']) def report_force_conversion(field, field_not, conversion): self.report_warning( @@ -2316,6 +2399,8 @@ def sanitize_numeric_fields(info): sanitize_string_field(info_dict, 'id') sanitize_numeric_fields(info_dict) + if (info_dict.get('duration') or 0) <= 0 and info_dict.pop('duration', None): + self.report_warning('"duration" field is negative, there is an error in extractor') if 'playlist' not in info_dict: # It isn't part of a playlist @@ -2334,45 +2419,7 @@ def sanitize_numeric_fields(info): if info_dict.get('display_id') is None and 'id' in info_dict: info_dict['display_id'] = info_dict['id'] - if info_dict.get('duration') is not None: - info_dict['duration_string'] = formatSeconds(info_dict['duration']) - - for ts_key, date_key in ( - ('timestamp', 'upload_date'), - ('release_timestamp', 'release_date'), - ('modified_timestamp', 'modified_date'), - ): - if info_dict.get(date_key) is None and info_dict.get(ts_key) is not None: - # Working around out-of-range timestamp values (e.g. negative ones on Windows, - # see http://bugs.python.org/issue1646728) - try: - upload_date = datetime.datetime.utcfromtimestamp(info_dict[ts_key]) - info_dict[date_key] = upload_date.strftime('%Y%m%d') - except (ValueError, OverflowError, OSError): - pass - - live_keys = ('is_live', 'was_live') - live_status = info_dict.get('live_status') - if live_status is None: - for key in live_keys: - if info_dict.get(key) is False: - continue - if info_dict.get(key): - live_status = key - break - if all(info_dict.get(key) is False for key in live_keys): - live_status = 'not_live' - if live_status: - info_dict['live_status'] = live_status - for key in live_keys: - if info_dict.get(key) is None: - info_dict[key] = (live_status == key) - - # Auto generate title fields corresponding to the *_number fields when missing - # in order to always have clean titles. This is very common for TV series. - for field in ('chapter', 'season', 'episode'): - if info_dict.get('%s_number' % field) is not None and not info_dict.get(field): - info_dict[field] = '%s %d' % (field.capitalize(), info_dict['%s_number' % field]) + self._fill_common_fields(info_dict) for cc_kind in ('subtitles', 'automatic_captions'): cc = info_dict.get(cc_kind) @@ -2400,14 +2447,14 @@ def sanitize_numeric_fields(info): if not self.params.get('allow_unplayable_formats'): formats = [f for f in formats if not f.get('has_drm')] - # backward compatibility - info_dict['fulltitle'] = info_dict['title'] - - if info_dict.get('is_live'): - get_from_start = bool(self.params.get('live_from_start')) + get_from_start = not info_dict.get('is_live') or bool(self.params.get('live_from_start')) + if not get_from_start: + info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M') + if info_dict.get('is_live') and formats: formats = [f for f in formats if bool(f.get('is_from_start')) == get_from_start] - if not get_from_start: - info_dict['title'] += ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M') + if get_from_start and not formats: + self.raise_no_formats(info_dict, msg='--live-from-start is passed, but there are no formats that can be downloaded from the start. ' + 'If you want to download from the current time, pass --no-live-from-start') if not formats: self.raise_no_formats(info_dict) @@ -2483,8 +2530,6 @@ def is_wellformed(f): if '__x_forwarded_for_ip' in info_dict: del info_dict['__x_forwarded_for_ip'] - # TODO Central sorting goes here - if self.params.get('check_formats') is True: formats = LazyList(self._check_formats(formats[::-1]), reverse=True) @@ -2497,6 +2542,12 @@ def is_wellformed(f): info_dict, _ = self.pre_process(info_dict) + if self._match_entry(info_dict) is not None: + return info_dict + + self.post_extract(info_dict) + info_dict, _ = self.pre_process(info_dict, 'after_filter') + # The pre-processors may have modified the formats formats = info_dict.get('formats', [info_dict]) @@ -2581,15 +2632,12 @@ def is_wellformed(f): + ', '.join([f['format_id'] for f in formats_to_download])) max_downloads_reached = False for i, fmt in enumerate(formats_to_download): - formats_to_download[i] = new_info = dict(info_dict) - # Save a reference to the original info_dict so that it can be modified in process_info if needed + formats_to_download[i] = new_info = self._copy_infodict(info_dict) new_info.update(fmt) - new_info['__original_infodict'] = info_dict try: self.process_info(new_info) except MaxDownloadsReached: max_downloads_reached = True - new_info.pop('__original_infodict') # Remove copied info for key, val in tuple(new_info.items()): if info_dict.get(key) == val: @@ -2634,12 +2682,15 @@ def process_subtitles(self, video_id, normal_subtitles, automatic_captions): # given in subtitleslangs. See https://github.com/yt-dlp/yt-dlp/issues/1041 requested_langs = [] for lang_re in self.params.get('subtitleslangs'): - if lang_re == 'all': - requested_langs.extend(all_sub_langs) - continue discard = lang_re[0] == '-' if discard: lang_re = lang_re[1:] + if lang_re == 'all': + if discard: + requested_langs = [] + else: + requested_langs.extend(all_sub_langs) + continue current_langs = filter(re.compile(lang_re + '$').match, all_sub_langs) if discard: for lang in current_langs: @@ -2679,19 +2730,33 @@ def process_subtitles(self, video_id, normal_subtitles, automatic_captions): subs[lang] = f return subs - def _forceprint(self, tmpl, info_dict): - mobj = re.match(r'\w+(=?)$', tmpl) - if mobj and mobj.group(1): - tmpl = f'{tmpl[:-1]} = %({tmpl[:-1]})s' - elif mobj: - tmpl = '%({})s'.format(tmpl) + def _forceprint(self, key, info_dict): + if info_dict is None: + return + info_copy = info_dict.copy() + info_copy['formats_table'] = self.render_formats_table(info_dict) + info_copy['thumbnails_table'] = self.render_thumbnails_table(info_dict) + info_copy['subtitles_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('subtitles')) + info_copy['automatic_captions_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('automatic_captions')) + + def format_tmpl(tmpl): + mobj = re.match(r'\w+(=?)$', tmpl) + if mobj and mobj.group(1): + return f'{tmpl[:-1]} = %({tmpl[:-1]})r' + elif mobj: + return f'%({tmpl})s' + return tmpl - info_dict = info_dict.copy() - info_dict['formats_table'] = self.render_formats_table(info_dict) - info_dict['thumbnails_table'] = self.render_thumbnails_table(info_dict) - info_dict['subtitles_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('subtitles')) - info_dict['automatic_captions_table'] = self.render_subtitles_table(info_dict.get('id'), info_dict.get('automatic_captions')) - self.to_stdout(self.evaluate_outtmpl(tmpl, info_dict)) + for tmpl in self.params['forceprint'].get(key, []): + self.to_stdout(self.evaluate_outtmpl(format_tmpl(tmpl), info_copy)) + + for tmpl, file_tmpl in self.params['print_to_file'].get(key, []): + filename = self.evaluate_outtmpl(file_tmpl, info_dict) + tmpl = format_tmpl(tmpl) + self.to_screen(f'[info] Writing {tmpl!r} to: {filename}') + if self._ensure_dir_exists(filename): + with io.open(filename, 'a', encoding='utf-8') as f: + f.write(self.evaluate_outtmpl(tmpl, info_copy) + '\n') def __forced_printings(self, info_dict, filename, incomplete): def print_mandatory(field, actual_field=None): @@ -2712,13 +2777,14 @@ def print_optional(field): if info_dict.get('requested_formats') is not None: # For RTMP URLs, also include the playpath info_dict['urls'] = '\n'.join(f['url'] + f.get('play_path', '') for f in info_dict['requested_formats']) - elif 'url' in info_dict: + elif info_dict.get('url'): info_dict['urls'] = info_dict['url'] + info_dict.get('play_path', '') - if self.params['forceprint'].get('video') or self.params.get('forcejson'): + if (self.params.get('forcejson') + or self.params['forceprint'].get('video') + or self.params['print_to_file'].get('video')): self.post_extract(info_dict) - for tmpl in self.params['forceprint'].get('video', []): - self._forceprint(tmpl, info_dict) + self._forceprint('video', info_dict) print_mandatory('title') print_mandatory('id') @@ -2779,7 +2845,7 @@ def existing_file(self, filepaths, *, default_overwrite=True): return None def process_info(self, info_dict): - """Process a single resolved IE result. (Modified it in-place)""" + """Process a single resolved IE result. (Modifies it in-place)""" assert info_dict.get('_type', 'video') == 'video' original_infodict = info_dict @@ -2787,18 +2853,22 @@ def process_info(self, info_dict): if 'format' not in info_dict and 'ext' in info_dict: info_dict['format'] = info_dict['ext'] + # This is mostly just for backward compatibility of process_info + # As a side-effect, this allows for format-specific filters if self._match_entry(info_dict) is not None: info_dict['__write_download_archive'] = 'ignore' return + # Does nothing under normal operation - for backward compatibility of process_info self.post_extract(info_dict) - self._num_downloads += 1 # info_dict['_filename'] needs to be set for backward compatibility info_dict['_filename'] = full_filename = self.prepare_filename(info_dict, warn=True) temp_filename = self.prepare_filename(info_dict, 'temp') files_to_move = {} + self._num_downloads += 1 + # Forced printings self.__forced_printings(info_dict, full_filename, incomplete=('format' not in info_dict)) @@ -2861,9 +2931,11 @@ def process_info(self, info_dict): # Write internet shortcut files def _write_link_file(link_type): - if 'webpage_url' not in info_dict: - self.report_error('Cannot write internet shortcut file because the "webpage_url" field is missing in the media information') - return False + url = try_get(info_dict['webpage_url'], iri_to_uri) + if not url: + self.report_warning( + f'Cannot write internet shortcut file because the actual URL of "{info_dict["webpage_url"]}" is unknown') + return True linkfn = replace_extension(self.prepare_filename(info_dict, 'link'), link_type, info_dict.get('ext')) if not self._ensure_dir_exists(encodeFilename(linkfn)): return False @@ -2874,7 +2946,7 @@ def _write_link_file(link_type): self.to_screen(f'[info] Writing internet shortcut (.{link_type}) to: {linkfn}') with io.open(encodeFilename(to_high_limit_path(linkfn)), 'w', encoding='utf-8', newline='\r\n' if link_type == 'url' else '\n') as linkfile: - template_vars = {'url': iri_to_uri(info_dict['webpage_url'])} + template_vars = {'url': url} if link_type == 'desktop': template_vars['filename'] = linkfn[:-(len(link_type) + 1)] linkfile.write(LINK_TEMPLATES[link_type] % template_vars) @@ -3009,9 +3081,11 @@ def correct_ext(filename, ext=new_ext): 'while also allowing unplayable formats to be downloaded. ' 'The formats won\'t be merged to prevent data corruption.') elif not merger.available: - self.report_warning( - 'You have requested merging of multiple formats but ffmpeg is not installed. ' - 'The formats won\'t be merged.') + msg = 'You have requested merging of multiple formats but ffmpeg is not installed' + if not self.params.get('ignoreerrors'): + self.report_error(f'{msg}. Aborting due to --abort-on-error') + return + self.report_warning(f'{msg}. The formats won\'t be merged') if temp_filename == '-': reason = ('using a downloader other than ffmpeg' if FFmpegFD.can_merge_formats(info_dict, self.params) @@ -3207,17 +3281,15 @@ def sanitize_info(info_dict, remove_private_keys=False): if info_dict is None: return info_dict info_dict.setdefault('epoch', int(time.time())) - remove_keys = {'__original_infodict'} # Always remove this since this may contain a copy of the entire dict - keep_keys = ['_type'] # Always keep this to facilitate load-info-json + info_dict.setdefault('_type', 'video') + if remove_private_keys: - remove_keys |= { + reject = lambda k, v: v is None or (k.startswith('_') and k != '_type') or k in { 'requested_downloads', 'requested_formats', 'requested_subtitles', 'requested_entries', 'entries', 'filepath', 'infojson_filename', 'original_url', 'playlist_autonumber', } - reject = lambda k, v: k not in keep_keys and ( - k.startswith('_') or k in remove_keys or v is None) else: - reject = lambda k, v: k in remove_keys + reject = lambda k, v: False def filter_fn(obj): if isinstance(obj, dict): @@ -3244,14 +3316,8 @@ def actual_post_extract(info_dict): actual_post_extract(video_dict or {}) return - post_extractor = info_dict.get('__post_extractor') or (lambda: {}) - extra = post_extractor().items() - info_dict.update(extra) - info_dict.pop('__post_extractor', None) - - original_infodict = info_dict.get('__original_infodict') or {} - original_infodict.update(extra) - original_infodict.pop('__post_extractor', None) + post_extractor = info_dict.pop('__post_extractor', None) or (lambda: {}) + info_dict.update(post_extractor()) actual_post_extract(info_dict or {}) @@ -3285,8 +3351,7 @@ def run_pp(self, pp, infodict): return infodict def run_all_pps(self, key, info, *, additional_pps=None): - for tmpl in self.params['forceprint'].get(key, []): - self._forceprint(tmpl, info) + self._forceprint(key, info) for pp in (additional_pps or []) + self._pps[key]: info = self.run_pp(pp, info) return info @@ -3481,12 +3546,12 @@ def render_formats_table(self, info_dict): delim=self._format_screen('\u2500', self.Styles.DELIM, '-', test_encoding=True)) def render_thumbnails_table(self, info_dict): - thumbnails = list(info_dict.get('thumbnails')) + thumbnails = list(info_dict.get('thumbnails') or []) if not thumbnails: return None return render_table( self._list_format_headers('ID', 'Width', 'Height', 'URL'), - [[t['id'], t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails]) + [[t.get('id'), t.get('width', 'unknown'), t.get('height', 'unknown'), t['url']] for t in thumbnails]) def render_subtitles_table(self, video_id, subtitles): def _row(lang, formats): @@ -3530,7 +3595,7 @@ def print_debug_header(self): return def get_encoding(stream): - ret = getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__) + ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__)) if not supports_terminal_sequences(stream): from .compat import WINDOWS_VT_MODE ret += ' (No VT)' if WINDOWS_VT_MODE is False else ' (No ANSI)' @@ -3613,6 +3678,7 @@ def python_implementation(): from .cookies import SQLITE_AVAILABLE, SECRETSTORAGE_AVAILABLE lib_str = join_nonempty( + compat_brotli and compat_brotli.__name__, compat_pycrypto_AES and compat_pycrypto_AES.__name__.split('.')[0], SECRETSTORAGE_AVAILABLE and 'secretstorage', has_mutagen and 'mutagen', @@ -3828,7 +3894,7 @@ def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None else: self.to_screen(f'[info] Downloading {thumb_display_id} ...') try: - uf = self.urlopen(t['url']) + uf = self.urlopen(sanitized_Request(t['url'], headers=t.get('http_headers', {}))) self.to_screen(f'[info] Writing {thumb_display_id} to: {thumb_filename}') with open(encodeFilename(thumb_filename), 'wb') as thumbf: shutil.copyfileobj(uf, thumbf)