jfr.im git - yt-dlp.git/blob - yt_dlp/plugins.py
[ie/youtube] Extract upload timestamp if available (#9856)
[yt-dlp.git] / yt_dlp / plugins.py
1 import contextlib
2 import importlib
3 import importlib.abc
4 import importlib.machinery
5 import importlib.util
6 import inspect
7 import itertools
8 import pkgutil
9 import sys
10 import traceback
11 import zipimport
12 from pathlib import Path
13 from zipfile import ZipFile
14
15 from .compat import functools # isort: split
16 from .utils import (
17 get_executable_path,
18 get_system_config_dirs,
19 get_user_config_dirs,
20 orderedSet,
21 write_string,
22 )
23
# Top-level namespace package under which plugin subpackages are imported
PACKAGE_NAME = 'yt_dlp_plugins'
# Folder name (below the executable path) used by the old `__init__.py`-based plugin system
COMPAT_PACKAGE_NAME = 'ytdlp_plugins'
26
27
class PluginLoader(importlib.abc.Loader):
    """No-op loader used for the virtual namespace packages.

    The packages it is attached to have no code of their own, so there is
    nothing to run when one of them is "executed".
    """

    def exec_module(self, module):
        """Intentionally do nothing; always returns None."""
33
34
@functools.cache
def dirs_in_zip(archive):
    """Return the set of directory paths implied by the member names of a zip archive.

    Every parent path of every archive member is included. The result is
    memoized per `archive` argument.

    A missing archive silently yields an empty set; any other failure to
    read it is reported through `write_string` and also yields an empty set.
    """
    try:
        with ZipFile(archive) as opened:
            return {
                parent
                for member in opened.namelist()
                for parent in Path(member).parents
            }
    except FileNotFoundError:
        # A non-existent archive simply contributes no directories
        return set()
    except Exception as e:
        write_string(f'WARNING: Could not read zip file {archive}: {e}\n')
        return set()
46
47
class PluginFinder(importlib.abc.MetaPathFinder):
    """
    Meta path finder that serves one or more virtual namespace packages.

    For a served package it gathers the matching subdirectories found in
    the yt-dlp config folders, the yt-dlp-plugins folders and sys.path
    (directories as well as zip/egg/whl archives), so that modules can be
    imported from all of them.
    """

    def __init__(self, *packages):
        self._zip_content_cache = {}
        # Serve every dotted prefix as well: 'a.b.c' -> 'a', 'a.b', 'a.b.c'
        served = set()
        for dotted_name in packages:
            pieces = dotted_name.split('.')
            served.update('.'.join(pieces[:end]) for end in range(1, len(pieces) + 1))
        self.packages = served

    def search_locations(self, fullname):
        """Yield every location that provides the package `fullname`.

        Candidates are checked in order (duplicates skipped): entries of the
        `plugins` folder in the yt-dlp config dirs, entries of the
        `yt-dlp-plugins` folders, then the sys.path entries. A candidate
        matches if the package path exists below it as a directory, or if it
        is a zip/egg/whl file containing that directory.
        """

        def _entries_below(*roots, containing_folder='plugins'):
            for root in orderedSet(map(Path, roots), lazy=True):
                # Missing or unreadable folders are skipped
                with contextlib.suppress(OSError):
                    yield from (root / containing_folder).iterdir()

        # yt-dlp config folders
        locations = list(_entries_below(
            *get_user_config_dirs('yt-dlp'),
            *get_system_config_dirs('yt-dlp'),
            containing_folder='plugins'))

        # yt-dlp-plugins folders
        locations += _entries_below(
            get_executable_path(),
            *get_user_config_dirs(''),
            *get_system_config_dirs(''),
            containing_folder='yt-dlp-plugins')

        # PYTHONPATH
        locations += map(Path, sys.path)
        try:
            # This entry is added when running __main__.py directly
            locations.remove(Path(__file__).parent)
        except ValueError:
            pass

        relative = Path(*fullname.split('.'))
        archive_suffixes = ('.zip', '.egg', '.whl')
        for root in orderedSet(locations, lazy=True):
            target = root / relative
            try:
                if target.is_dir() or (
                        root.suffix in archive_suffixes
                        and root.is_file()
                        and relative in dirs_in_zip(root)):
                    yield target
            except PermissionError as e:
                write_string(f'Permission error while accessing modules in "{e.filename}"\n')

    def find_spec(self, fullname, path=None, target=None):
        """Return a package spec for `fullname`, or None if it is not served or has no location."""
        locations = (
            [str(location) for location in self.search_locations(fullname)]
            if fullname in self.packages else [])
        if not locations:
            return None

        spec = importlib.machinery.ModuleSpec(fullname, PluginLoader(), is_package=True)
        spec.submodule_search_locations = locations
        return spec

    def invalidate_caches(self):
        """Clear the zip directory cache and drop the served packages from sys.modules."""
        dirs_in_zip.cache_clear()
        for served in self.packages:
            sys.modules.pop(served, None)
115
116
def directories():
    """Return the search locations of the plugin namespace package, or [] when no spec is found."""
    plugin_spec = importlib.util.find_spec(PACKAGE_NAME)
    if not plugin_spec:
        return []
    return plugin_spec.submodule_search_locations
120
121
def iter_modules(subpackage):
    """Yield the `pkgutil.iter_modules` entries of the given plugin subpackage.

    Module names are prefixed with the full package name. Nothing is
    yielded if the subpackage cannot be imported (ModuleNotFoundError).
    """
    fullname = f'{PACKAGE_NAME}.{subpackage}'
    try:
        package = importlib.import_module(fullname)
        yield from pkgutil.iter_modules(path=package.__path__, prefix=f'{fullname}.')
    except ModuleNotFoundError:
        return
127
128
def load_module(module, module_name, suffix):
    """Collect the plugin classes exposed by `module`.

    Returns the `inspect.getmembers` list of `(name, class)` pairs for every
    class that
      * has a name ending in `suffix` and not starting with an underscore,
      * reports a `__module__` starting with `module_name`, and
      * is listed in the module's `__all__`, if the module defines one.
    """

    def _is_plugin_class(obj):
        if not inspect.isclass(obj):
            return False
        class_name = obj.__name__
        if not class_name.endswith(suffix):
            return False
        if not obj.__module__.startswith(module_name):
            return False
        if class_name.startswith('_'):
            return False
        # Without an `__all__`, every remaining class counts as exported
        return class_name in getattr(module, '__all__', [class_name])

    return inspect.getmembers(module, _is_plugin_class)
136
137
def _exec_registered(spec, module_name):
    """Create the module for `spec`, register it as `module_name` and execute it.

    If execution fails, the entry is removed from sys.modules again before
    the exception is re-raised, so no half-initialized module is left behind.
    """
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Only remove what was registered here
        if sys.modules.get(module_name) is module:
            del sys.modules[module_name]
        raise
    return module


def load_plugins(name, suffix):
    """Import all plugin modules of a subpackage and collect their plugin classes.

    @param name    Plugin subpackage to load, e.g. 'extractor'
    @param suffix  Required ending of the plugin class names
    @returns       Dict mapping class name to class; a class loaded later
                   replaces an earlier one with the same name

    Modules with a name part starting with an underscore are skipped.
    A module that fails to import is reported through `write_string` and
    skipped; it does not stay registered in sys.modules.
    """
    classes = {}

    for finder, module_name, _ in iter_modules(name):
        if any(x.startswith('_') for x in module_name.split('.')):
            continue
        try:
            if sys.version_info < (3, 10) and isinstance(finder, zipimport.zipimporter):
                # zipimporter.load_module() is deprecated in 3.10 and removed in 3.12
                # The exec_module branch below is the replacement for >= 3.10
                # See: https://docs.python.org/3/library/zipimport.html#zipimport.zipimporter.exec_module
                module = finder.load_module(module_name)
            else:
                spec = finder.find_spec(module_name)
                module = _exec_registered(spec, module_name)
        except Exception:
            write_string(f'Error while importing module {module_name!r}\n{traceback.format_exc(limit=-1)}')
            continue
        classes.update(load_module(module, module_name, suffix))

    # Compat: old plugin system using __init__.py
    # Note: plugins imported this way do not show up in directories()
    # nor are considered part of the yt_dlp_plugins namespace package
    with contextlib.suppress(FileNotFoundError):
        spec = importlib.util.spec_from_file_location(
            name, Path(get_executable_path(), COMPAT_PACKAGE_NAME, name, '__init__.py'))
        # The spec is created even if the file does not exist; executing it then
        # raises FileNotFoundError, and the module must not remain registered
        # in sys.modules under the bare subpackage name
        plugins = _exec_registered(spec, spec.name)
        classes.update(load_module(plugins, spec.name, suffix))

    return classes
172
173
# Register the finder at the front of sys.meta_path so that it is consulted
# first for the plugin namespace packages (it ignores every other name)
sys.meta_path.insert(0, PluginFinder(f'{PACKAGE_NAME}.extractor', f'{PACKAGE_NAME}.postprocessor'))

# Public API of this module
__all__ = ['directories', 'load_plugins', 'PACKAGE_NAME', 'COMPAT_PACKAGE_NAME']