[extractor/generic] Avoid catastrophic backtracking in KVS regex

diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py
index b1d009280e4ec017b97e45588c92ebc51f5875ae..fd280726f9b66bbf92df344828a0568f51138aa8 100644
--- a/yt_dlp/YoutubeDL.py
+++ b/yt_dlp/YoutubeDL.py
@@ -32,7 +32,8 @@
 from .extractor.common import UnsupportedURLIE
 from .extractor.openload import PhantomJSwrapper
 from .minicurses import format_text
-from .postprocessor import _PLUGIN_CLASSES as plugin_postprocessors
+from .plugins import directories as plugin_directories
+from .postprocessor import _PLUGIN_CLASSES as plugin_pps
 from .postprocessor import (
     EmbedThumbnailPP,
     FFmpegFixupDuplicateMoovPP,
@@ -317,6 +318,7 @@ class YoutubeDL:
                         If not provided and the key is encrypted, yt-dlp will ask interactively
     prefer_insecure:   Use HTTP instead of HTTPS to retrieve information.
                        (Only supported by some extractors)
+    enable_file_urls:  Enable file:// URLs. This is disabled by default for security reasons.
     http_headers:      A dictionary of custom headers to be used for all requests
     proxy:             URL of the proxy server to use
     geo_verification_proxy:  URL of the proxy to use for IP address verification
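
Illustrative note (not part of the diff): the new enable_file_urls option is an ordinary YoutubeDL parameter, so opting in from the Python API looks roughly like the sketch below; the local path is made up.

    from yt_dlp import YoutubeDL

    # Hypothetical opt-in; file:// URLs stay disabled unless this is set
    with YoutubeDL({'enable_file_urls': True}) as ydl:
        ydl.download(['file:///home/user/sample.mp4'])
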
@@ -584,7 +586,6 @@ def __init__(self, params=None, auto_init=True):
         self._playlist_urls = set()
         self.cache = Cache(self)
 
-        windows_enable_vt_mode()
         stdout = sys.stderr if self.params.get('logtostderr') else sys.stdout
         self._out_files = Namespace(
             out=stdout,
@@ -593,6 +594,12 @@ def __init__(self, params=None, auto_init=True):
             console=None if compat_os_name == 'nt' else next(
                 filter(supports_terminal_sequences, (sys.stderr, sys.stdout)), None)
         )
+
+        try:
+            windows_enable_vt_mode()
+        except Exception as e:
+            self.write_debug(f'Failed to enable VT mode: {e}')
+
         self._allow_colors = Namespace(**{
             type_: not self.params.get('no_color') and supports_terminal_sequences(stream)
             for type_, stream in self._out_files.items_ if type_ != 'console'
@@ -1068,7 +1075,7 @@ def _outtmpl_expandpath(outtmpl):
         # correspondingly that is not what we want since we need to keep
         # '%%' intact for template dict substitution step. Working around
         # with boundary-alike separator hack.
-        sep = ''.join([random.choice(ascii_letters) for _ in range(32)])
+        sep = ''.join(random.choices(ascii_letters, k=32))
         outtmpl = outtmpl.replace('%%', f'%{sep}%').replace('$$', f'${sep}$')
 
         # outtmpl should be expand_path'ed before template dict substitution
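
Quick equivalence check for the change above (illustrative only): random.choices samples with replacement, so it yields the same kind of 32-letter separator as the old per-character random.choice loop.

    import random
    from string import ascii_letters

    sep = ''.join(random.choices(ascii_letters, k=32))
    assert len(sep) == 32 and sep.isalpha()
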
@@ -1626,8 +1633,8 @@ def process_ie_result(self, ie_result, download=True, extra_info=None):
         if result_type in ('url', 'url_transparent'):
             ie_result['url'] = sanitize_url(
                 ie_result['url'], scheme='http' if self.params.get('prefer_insecure') else 'https')
-            if ie_result.get('original_url'):
-                extra_info.setdefault('original_url', ie_result['original_url'])
+            if ie_result.get('original_url') and not extra_info.get('original_url'):
+                extra_info = {'original_url': ie_result['original_url'], **extra_info}
 
             extract_flat = self.params.get('extract_flat', False)
             if ((extract_flat == 'in_playlist' and 'playlist' in extra_info)
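
Sketch of the dict construction above (toy values): a new mapping is built instead of mutating the caller's extra_info via setdefault, with 'original_url' placed first and any keys already present in extra_info taking precedence.

    extra_info = {'playlist': 'demo'}
    extra_info = {'original_url': 'https://example.com/page', **extra_info}
    assert list(extra_info) == ['original_url', 'playlist']
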
@@ -1770,7 +1777,7 @@ def _playlist_infodict(ie_result, strict=False, **kwargs):
         return {
             **info,
             'playlist_index': 0,
-            '__last_playlist_index': max(ie_result['requested_entries'] or (0, 0)),
+            '__last_playlist_index': max(ie_result.get('requested_entries') or (0, 0)),
             'extractor': ie_result['extractor'],
             'extractor_key': ie_result['extractor_key'],
         }
@@ -1862,11 +1869,10 @@ def __process_playlist(self, ie_result, download):
             self.to_screen('[download] Downloading item %s of %s' % (
                 self._format_screen(i + 1, self.Styles.ID), self._format_screen(n_entries, self.Styles.EMPHASIS)))
 
-            extra.update({
+            entry_result = self.__process_iterable_entry(entry, download, collections.ChainMap({
                 'playlist_index': playlist_index,
                 'playlist_autonumber': i + 1,
-            })
-            entry_result = self.__process_iterable_entry(entry, download, extra)
+            }, extra))
             if not entry_result:
                 failures += 1
             if failures >= max_failures:
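
Sketch of the ChainMap behaviour relied on above (toy values): the per-entry mapping is consulted first, and the shared extra dict is no longer mutated between playlist entries.

    import collections

    extra = {'playlist': 'demo', 'playlist_index': None}
    entry_extra = collections.ChainMap(
        {'playlist_index': 3, 'playlist_autonumber': 1}, extra)
    assert entry_extra['playlist_index'] == 3
    assert extra['playlist_index'] is None  # shared dict left untouched
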
@@ -2977,6 +2983,16 @@ def process_info(self, info_dict):
 
         # Does nothing under normal operation - for backward compatibility of process_info
         self.post_extract(info_dict)
+
+        def replace_info_dict(new_info):
+            nonlocal info_dict
+            if new_info == info_dict:
+                return
+            info_dict.clear()
+            info_dict.update(new_info)
+
+        new_info, _ = self.pre_process(info_dict, 'video')
+        replace_info_dict(new_info)
         self._num_downloads += 1
 
         # info_dict['_filename'] needs to be set for backward compatibility
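
Sketch of the in-place replacement pattern moved up here (toy dicts): clearing and updating the existing object keeps every outside reference to info_dict pointing at the new data.

    info_dict = {'id': '1', 'title': 'old title'}
    alias = info_dict                      # e.g. a reference held by a caller
    new_info = {'id': '1', 'title': 'new title'}
    if new_info != info_dict:
        info_dict.clear()
        info_dict.update(new_info)
    assert alias['title'] == 'new title'
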
@@ -3090,13 +3106,6 @@ def _write_link_file(link_type):
                for link_type, should_write in write_links.items()):
             return
 
-        def replace_info_dict(new_info):
-            nonlocal info_dict
-            if new_info == info_dict:
-                return
-            info_dict.clear()
-            info_dict.update(new_info)
-
         new_info, files_to_move = self.pre_process(info_dict, 'before_dl', files_to_move)
         replace_info_dict(new_info)
 
@@ -3123,7 +3132,7 @@ def existing_video_file(*filepaths):
                 fd, success = None, True
                 if info_dict.get('protocol') or info_dict.get('url'):
                     fd = get_suitable_downloader(info_dict, self.params, to_stdout=temp_filename == '-')
-                    if fd is not FFmpegFD and (
+                    if fd is not FFmpegFD and 'no-direct-merge' not in self.params['compat_opts'] and (
                             info_dict.get('section_start') or info_dict.get('section_end')):
                         msg = ('This format cannot be partially downloaded' if FFmpegFD.available()
                                else 'You have requested downloading the video partially, but ffmpeg is not installed')
@@ -3388,6 +3397,7 @@ def sanitize_info(info_dict, remove_private_keys=False):
             reject = lambda k, v: v is None or k.startswith('__') or k in {
                 'requested_downloads', 'requested_formats', 'requested_subtitles', 'requested_entries',
                 'entries', 'filepath', '_filename', 'infojson_filename', 'original_url', 'playlist_autonumber',
+                '_format_sort_fields',
             }
         else:
             reject = lambda k, v: False
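
Usage sketch (not from the diff; the URL is hypothetical): sanitize_info(..., remove_private_keys=True) strips these internal keys, now including '_format_sort_fields', before an info dict is dumped as JSON.

    import json
    from yt_dlp import YoutubeDL

    with YoutubeDL({'quiet': True}) as ydl:
        info = ydl.extract_info('https://example.com/some-video', download=False)
        print(json.dumps(ydl.sanitize_info(info, remove_private_keys=True), indent=2))
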
@@ -3457,7 +3467,8 @@ def run_pp(self, pp, infodict):
         return infodict
 
     def run_all_pps(self, key, info, *, additional_pps=None):
-        self._forceprint(key, info)
+        if key != 'video':
+            self._forceprint(key, info)
         for pp in (additional_pps or []) + self._pps[key]:
             info = self.run_pp(pp, info)
         return info
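
For context (illustrative, not part of the diff): run_all_pps iterates self._pps[key], which is where postprocessors registered through add_post_processor end up. A minimal sketch of such a postprocessor, assuming the standard PostProcessor API; the class name is invented.

    from yt_dlp import YoutubeDL
    from yt_dlp.postprocessor.common import PostProcessor

    class AnnouncePP(PostProcessor):
        def run(self, info):
            self.to_screen(f'Seen {info.get("id")}')
            return [], info  # (files_to_delete, updated info)

    ydl = YoutubeDL()
    ydl.add_post_processor(AnnouncePP(), when='pre_process')
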
@@ -3726,7 +3737,10 @@ def print_debug_header(self):
 
         # These imports can be slow. So import them only as needed
         from .extractor.extractors import _LAZY_LOADER
-        from .extractor.extractors import _PLUGIN_CLASSES as plugin_extractors
+        from .extractor.extractors import (
+            _PLUGIN_CLASSES as plugin_ies,
+            _PLUGIN_OVERRIDES as plugin_ie_overrides
+        )
 
         def get_encoding(stream):
             ret = str(getattr(stream, 'encoding', 'missing (%s)' % type(stream).__name__))
@@ -3771,10 +3785,6 @@ def get_encoding(stream):
                 write_debug('Lazy loading extractors is forcibly disabled')
             else:
                 write_debug('Lazy loading extractors is disabled')
-        if plugin_extractors or plugin_postprocessors:
-            write_debug('Plugins: %s' % [
-                '%s%s' % (klass.__name__, '' if klass.__name__ == name else f' as {name}')
-                for name, klass in itertools.chain(plugin_extractors.items(), plugin_postprocessors.items())])
         if self.params['compat_opts']:
             write_debug('Compatibility options: %s' % ', '.join(self.params['compat_opts']))
 
@@ -3808,6 +3818,21 @@ def get_encoding(stream):
                 proxy_map.update(handler.proxies)
         write_debug(f'Proxy map: {proxy_map}')
 
+        for plugin_type, plugins in {'Extractor': plugin_ies, 'Post-Processor': plugin_pps}.items():
+            display_list = ['%s%s' % (
+                klass.__name__, '' if klass.__name__ == name else f' as {name}')
+                for name, klass in plugins.items()]
+            if plugin_type == 'Extractor':
+                display_list.extend(f'{plugins[-1].IE_NAME.partition("+")[2]} ({parent.__name__})'
+                                    for parent, plugins in plugin_ie_overrides.items())
+            if not display_list:
+                continue
+            write_debug(f'{plugin_type} Plugins: {", ".join(sorted(display_list))}')
+
+        plugin_dirs = plugin_directories()
+        if plugin_dirs:
+            write_debug(f'Plugin directories: {plugin_dirs}')
+
         # Not implemented
         if False and self.params.get('call_home'):
             ipaddr = self.urlopen('https://yt-dl.org/ip').read().decode()
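
Hedged sketch of the helper used above: plugin_directories is the directories() function from yt_dlp.plugins, called with no arguments; an empty result means no 'Plugin directories' line is written.

    from yt_dlp.plugins import directories

    print(directories())  # plugin search paths, if any plugin packages are installed
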
@@ -3857,9 +3882,12 @@ def _setup_opener(self):
         # https://github.com/ytdl-org/youtube-dl/issues/8227)
         file_handler = urllib.request.FileHandler()
 
-        def file_open(*args, **kwargs):
-            raise urllib.error.URLError('file:// scheme is explicitly disabled in yt-dlp for security reasons')
-        file_handler.file_open = file_open
+        if not self.params.get('enable_file_urls'):
+            def file_open(*args, **kwargs):
+                raise urllib.error.URLError(
+                    'file:// URLs are explicitly disabled in yt-dlp for security reasons. '
+                    'Use --enable-file-urls to enable at your own risk.')
+            file_handler.file_open = file_open
 
         opener = urllib.request.build_opener(
             proxy_handler, https_handler, cookie_processor, ydlh, redirect_handler, data_handler, file_handler)
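
Standalone sketch of the mechanism above (paths made up): overriding file_open on a FileHandler instance makes any file:// request fail with URLError instead of touching the local filesystem, which is what still happens unless enable_file_urls is set.

    import urllib.error
    import urllib.request

    def _refuse_file_open(*args, **kwargs):
        raise urllib.error.URLError('file:// scheme disabled')

    file_handler = urllib.request.FileHandler()
    file_handler.file_open = _refuse_file_open

    opener = urllib.request.build_opener(file_handler)
    try:
        opener.open('file:///etc/hostname')
    except urllib.error.URLError as exc:
        print(exc.reason)  # file:// scheme disabled
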
@@ -3921,7 +3949,7 @@ def _write_description(self, label, ie_result, descfn):
         elif not self.params.get('overwrites', True) and os.path.exists(descfn):
             self.to_screen(f'[info] {label.title()} description is already present')
         elif ie_result.get('description') is None:
-            self.report_warning(f'There\'s no {label} description to write')
+            self.to_screen(f'[info] There\'s no {label} description to write')
             return False
         else:
             try:
@@ -3937,15 +3965,18 @@ def _write_subtitles(self, info_dict, filename):
         ''' Write subtitles to file and return list of (sub_filename, final_sub_filename); or None if error'''
         ret = []
         subtitles = info_dict.get('requested_subtitles')
-        if not subtitles or not (self.params.get('writesubtitles') or self.params.get('writeautomaticsub')):
+        if not (self.params.get('writesubtitles') or self.params.get('writeautomaticsub')):
             # subtitles download errors are already managed as troubles in relevant IE
             # that way it will silently go on when used with unsupporting IE
             return ret
-
+        elif not subtitles:
+            self.to_screen('[info] There\'s no subtitles for the requested languages')
+            return ret
         sub_filename_base = self.prepare_filename(info_dict, 'subtitle')
         if not sub_filename_base:
             self.to_screen('[info] Skipping writing video subtitles')
             return ret
+
         for sub_lang, sub_info in subtitles.items():
             sub_format = sub_info['ext']
             sub_filename = subtitles_filename(filename, sub_lang, sub_format, info_dict.get('ext'))
@@ -3992,6 +4023,9 @@ def _write_thumbnails(self, label, info_dict, filename, thumb_filename_base=None
         thumbnails, ret = [], []
         if write_all or self.params.get('writethumbnail', False):
             thumbnails = info_dict.get('thumbnails') or []
+            if not thumbnails:
+                self.to_screen(f'[info] There\'s no {label} thumbnails to download')
+                return ret
         multiple = write_all and len(thumbnails) > 1
 
         if thumb_filename_base is None: