]>
Commit | Line | Data |
---|---|---|
1 | import contextlib | |
2 | import importlib | |
3 | import importlib.abc | |
4 | import importlib.machinery | |
5 | import importlib.util | |
6 | import inspect | |
7 | import itertools | |
8 | import pkgutil | |
9 | import sys | |
10 | import traceback | |
11 | import zipimport | |
12 | from pathlib import Path | |
13 | from zipfile import ZipFile | |
14 | ||
15 | from .compat import functools # isort: split | |
16 | from .utils import ( | |
17 | get_executable_path, | |
18 | get_system_config_dirs, | |
19 | get_user_config_dirs, | |
20 | orderedSet, | |
21 | write_string, | |
22 | ) | |
23 | ||
24 | PACKAGE_NAME = 'yt_dlp_plugins' | |
25 | COMPAT_PACKAGE_NAME = 'ytdlp_plugins' | |
26 | ||
27 | ||
28 | class PluginLoader(importlib.abc.Loader): | |
29 | """Dummy loader for virtual namespace packages""" | |
30 | ||
31 | def exec_module(self, module): | |
32 | return None | |
33 | ||
34 | ||
35 | @functools.cache | |
36 | def dirs_in_zip(archive): | |
37 | try: | |
38 | with ZipFile(archive) as zip_: | |
39 | return set(itertools.chain.from_iterable( | |
40 | Path(file).parents for file in zip_.namelist())) | |
41 | except FileNotFoundError: | |
42 | pass | |
43 | except Exception as e: | |
44 | write_string(f'WARNING: Could not read zip file {archive}: {e}\n') | |
45 | return set() | |
46 | ||
47 | ||
48 | class PluginFinder(importlib.abc.MetaPathFinder): | |
49 | """ | |
50 | This class provides one or multiple namespace packages. | |
51 | It searches in sys.path and yt-dlp config folders for | |
52 | the existing subdirectories from which the modules can be imported | |
53 | """ | |
54 | ||
55 | def __init__(self, *packages): | |
56 | self._zip_content_cache = {} | |
57 | self.packages = set(itertools.chain.from_iterable( | |
58 | itertools.accumulate(name.split('.'), lambda a, b: '.'.join((a, b))) | |
59 | for name in packages)) | |
60 | ||
61 | def search_locations(self, fullname): | |
62 | candidate_locations = [] | |
63 | ||
64 | def _get_package_paths(*root_paths, containing_folder='plugins'): | |
65 | for config_dir in orderedSet(map(Path, root_paths), lazy=True): | |
66 | with contextlib.suppress(OSError): | |
67 | yield from (config_dir / containing_folder).iterdir() | |
68 | ||
69 | # Load from yt-dlp config folders | |
70 | candidate_locations.extend(_get_package_paths( | |
71 | *get_user_config_dirs('yt-dlp'), | |
72 | *get_system_config_dirs('yt-dlp'), | |
73 | containing_folder='plugins')) | |
74 | ||
75 | # Load from yt-dlp-plugins folders | |
76 | candidate_locations.extend(_get_package_paths( | |
77 | get_executable_path(), | |
78 | *get_user_config_dirs(''), | |
79 | *get_system_config_dirs(''), | |
80 | containing_folder='yt-dlp-plugins')) | |
81 | ||
82 | candidate_locations.extend(map(Path, sys.path)) # PYTHONPATH | |
83 | with contextlib.suppress(ValueError): # Added when running __main__.py directly | |
84 | candidate_locations.remove(Path(__file__).parent) | |
85 | ||
86 | parts = Path(*fullname.split('.')) | |
87 | for path in orderedSet(candidate_locations, lazy=True): | |
88 | candidate = path / parts | |
89 | try: | |
90 | if candidate.is_dir(): | |
91 | yield candidate | |
92 | elif path.suffix in ('.zip', '.egg', '.whl') and path.is_file(): | |
93 | if parts in dirs_in_zip(path): | |
94 | yield candidate | |
95 | except PermissionError as e: | |
96 | write_string(f'Permission error while accessing modules in "{e.filename}"\n') | |
97 | ||
98 | def find_spec(self, fullname, path=None, target=None): | |
99 | if fullname not in self.packages: | |
100 | return None | |
101 | ||
102 | search_locations = list(map(str, self.search_locations(fullname))) | |
103 | if not search_locations: | |
104 | return None | |
105 | ||
106 | spec = importlib.machinery.ModuleSpec(fullname, PluginLoader(), is_package=True) | |
107 | spec.submodule_search_locations = search_locations | |
108 | return spec | |
109 | ||
110 | def invalidate_caches(self): | |
111 | dirs_in_zip.cache_clear() | |
112 | for package in self.packages: | |
113 | if package in sys.modules: | |
114 | del sys.modules[package] | |
115 | ||
116 | ||
117 | def directories(): | |
118 | spec = importlib.util.find_spec(PACKAGE_NAME) | |
119 | return spec.submodule_search_locations if spec else [] | |
120 | ||
121 | ||
122 | def iter_modules(subpackage): | |
123 | fullname = f'{PACKAGE_NAME}.{subpackage}' | |
124 | with contextlib.suppress(ModuleNotFoundError): | |
125 | pkg = importlib.import_module(fullname) | |
126 | yield from pkgutil.iter_modules(path=pkg.__path__, prefix=f'{fullname}.') | |
127 | ||
128 | ||
129 | def load_module(module, module_name, suffix): | |
130 | return inspect.getmembers(module, lambda obj: ( | |
131 | inspect.isclass(obj) | |
132 | and obj.__name__.endswith(suffix) | |
133 | and obj.__module__.startswith(module_name) | |
134 | and not obj.__name__.startswith('_') | |
135 | and obj.__name__ in getattr(module, '__all__', [obj.__name__]))) | |
136 | ||
137 | ||
138 | def load_plugins(name, suffix): | |
139 | classes = {} | |
140 | ||
141 | for finder, module_name, _ in iter_modules(name): | |
142 | if any(x.startswith('_') for x in module_name.split('.')): | |
143 | continue | |
144 | try: | |
145 | if sys.version_info < (3, 10) and isinstance(finder, zipimport.zipimporter): | |
146 | # zipimporter.load_module() is deprecated in 3.10 and removed in 3.12 | |
147 | # The exec_module branch below is the replacement for >= 3.10 | |
148 | # See: https://docs.python.org/3/library/zipimport.html#zipimport.zipimporter.exec_module | |
149 | module = finder.load_module(module_name) | |
150 | else: | |
151 | spec = finder.find_spec(module_name) | |
152 | module = importlib.util.module_from_spec(spec) | |
153 | sys.modules[module_name] = module | |
154 | spec.loader.exec_module(module) | |
155 | except Exception: | |
156 | write_string(f'Error while importing module {module_name!r}\n{traceback.format_exc(limit=-1)}') | |
157 | continue | |
158 | classes.update(load_module(module, module_name, suffix)) | |
159 | ||
160 | # Compat: old plugin system using __init__.py | |
161 | # Note: plugins imported this way do not show up in directories() | |
162 | # nor are considered part of the yt_dlp_plugins namespace package | |
163 | with contextlib.suppress(FileNotFoundError): | |
164 | spec = importlib.util.spec_from_file_location( | |
165 | name, Path(get_executable_path(), COMPAT_PACKAGE_NAME, name, '__init__.py')) | |
166 | plugins = importlib.util.module_from_spec(spec) | |
167 | sys.modules[spec.name] = plugins | |
168 | spec.loader.exec_module(plugins) | |
169 | classes.update(load_module(plugins, spec.name, suffix)) | |
170 | ||
171 | return classes | |
172 | ||
173 | ||
174 | sys.meta_path.insert(0, PluginFinder(f'{PACKAGE_NAME}.extractor', f'{PACKAGE_NAME}.postprocessor')) | |
175 | ||
176 | __all__ = ['directories', 'load_plugins', 'PACKAGE_NAME', 'COMPAT_PACKAGE_NAME'] |