]>
Commit | Line | Data |
---|---|---|
e0df8241 JR |
1 | import email.message |
2 | import email.parser | |
3 | import logging | |
4 | import os | |
5 | import zipfile | |
6 | from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional | |
7 | ||
8 | from pip._vendor import pkg_resources | |
9 | from pip._vendor.packaging.requirements import Requirement | |
10 | from pip._vendor.packaging.utils import NormalizedName, canonicalize_name | |
11 | from pip._vendor.packaging.version import parse as parse_version | |
12 | ||
13 | from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel | |
14 | from pip._internal.utils.egg_link import egg_link_path_from_location | |
15 | from pip._internal.utils.misc import display_path, normalize_path | |
16 | from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file | |
17 | ||
18 | from .base import ( | |
19 | BaseDistribution, | |
20 | BaseEntryPoint, | |
21 | BaseEnvironment, | |
22 | DistributionVersion, | |
23 | InfoPath, | |
24 | Wheel, | |
25 | ) | |
26 | ||
27 | logger = logging.getLogger(__name__) | |
28 | ||
29 | ||
30 | class EntryPoint(NamedTuple): | |
31 | name: str | |
32 | value: str | |
33 | group: str | |
34 | ||
35 | ||
36 | class InMemoryMetadata: | |
37 | """IMetadataProvider that reads metadata files from a dictionary. | |
38 | ||
39 | This also maps metadata decoding exceptions to our internal exception type. | |
40 | """ | |
41 | ||
42 | def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None: | |
43 | self._metadata = metadata | |
44 | self._wheel_name = wheel_name | |
45 | ||
46 | def has_metadata(self, name: str) -> bool: | |
47 | return name in self._metadata | |
48 | ||
49 | def get_metadata(self, name: str) -> str: | |
50 | try: | |
51 | return self._metadata[name].decode() | |
52 | except UnicodeDecodeError as e: | |
53 | # Augment the default error with the origin of the file. | |
54 | raise UnsupportedWheel( | |
55 | f"Error decoding metadata for {self._wheel_name}: {e} in {name} file" | |
56 | ) | |
57 | ||
58 | def get_metadata_lines(self, name: str) -> Iterable[str]: | |
59 | return pkg_resources.yield_lines(self.get_metadata(name)) | |
60 | ||
61 | def metadata_isdir(self, name: str) -> bool: | |
62 | return False | |
63 | ||
64 | def metadata_listdir(self, name: str) -> List[str]: | |
65 | return [] | |
66 | ||
67 | def run_script(self, script_name: str, namespace: str) -> None: | |
68 | pass | |
69 | ||
70 | ||
71 | class Distribution(BaseDistribution): | |
72 | def __init__(self, dist: pkg_resources.Distribution) -> None: | |
73 | self._dist = dist | |
74 | ||
75 | @classmethod | |
76 | def from_directory(cls, directory: str) -> BaseDistribution: | |
77 | dist_dir = directory.rstrip(os.sep) | |
78 | ||
79 | # Build a PathMetadata object, from path to metadata. :wink: | |
80 | base_dir, dist_dir_name = os.path.split(dist_dir) | |
81 | metadata = pkg_resources.PathMetadata(base_dir, dist_dir) | |
82 | ||
83 | # Determine the correct Distribution object type. | |
84 | if dist_dir.endswith(".egg-info"): | |
85 | dist_cls = pkg_resources.Distribution | |
86 | dist_name = os.path.splitext(dist_dir_name)[0] | |
87 | else: | |
88 | assert dist_dir.endswith(".dist-info") | |
89 | dist_cls = pkg_resources.DistInfoDistribution | |
90 | dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0] | |
91 | ||
92 | dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata) | |
93 | return cls(dist) | |
94 | ||
95 | @classmethod | |
96 | def from_metadata_file_contents( | |
97 | cls, | |
98 | metadata_contents: bytes, | |
99 | filename: str, | |
100 | project_name: str, | |
101 | ) -> BaseDistribution: | |
102 | metadata_dict = { | |
103 | "METADATA": metadata_contents, | |
104 | } | |
105 | dist = pkg_resources.DistInfoDistribution( | |
106 | location=filename, | |
107 | metadata=InMemoryMetadata(metadata_dict, filename), | |
108 | project_name=project_name, | |
109 | ) | |
110 | return cls(dist) | |
111 | ||
112 | @classmethod | |
113 | def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution: | |
114 | try: | |
115 | with wheel.as_zipfile() as zf: | |
116 | info_dir, _ = parse_wheel(zf, name) | |
117 | metadata_dict = { | |
118 | path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path) | |
119 | for path in zf.namelist() | |
120 | if path.startswith(f"{info_dir}/") | |
121 | } | |
122 | except zipfile.BadZipFile as e: | |
123 | raise InvalidWheel(wheel.location, name) from e | |
124 | except UnsupportedWheel as e: | |
125 | raise UnsupportedWheel(f"{name} has an invalid wheel, {e}") | |
126 | dist = pkg_resources.DistInfoDistribution( | |
127 | location=wheel.location, | |
128 | metadata=InMemoryMetadata(metadata_dict, wheel.location), | |
129 | project_name=name, | |
130 | ) | |
131 | return cls(dist) | |
132 | ||
133 | @property | |
134 | def location(self) -> Optional[str]: | |
135 | return self._dist.location | |
136 | ||
137 | @property | |
138 | def installed_location(self) -> Optional[str]: | |
139 | egg_link = egg_link_path_from_location(self.raw_name) | |
140 | if egg_link: | |
141 | location = egg_link | |
142 | elif self.location: | |
143 | location = self.location | |
144 | else: | |
145 | return None | |
146 | return normalize_path(location) | |
147 | ||
148 | @property | |
149 | def info_location(self) -> Optional[str]: | |
150 | return self._dist.egg_info | |
151 | ||
152 | @property | |
153 | def installed_by_distutils(self) -> bool: | |
154 | # A distutils-installed distribution is provided by FileMetadata. This | |
155 | # provider has a "path" attribute not present anywhere else. Not the | |
156 | # best introspection logic, but pip has been doing this for a long time. | |
157 | try: | |
158 | return bool(self._dist._provider.path) | |
159 | except AttributeError: | |
160 | return False | |
161 | ||
162 | @property | |
163 | def canonical_name(self) -> NormalizedName: | |
164 | return canonicalize_name(self._dist.project_name) | |
165 | ||
166 | @property | |
167 | def version(self) -> DistributionVersion: | |
168 | return parse_version(self._dist.version) | |
169 | ||
170 | def is_file(self, path: InfoPath) -> bool: | |
171 | return self._dist.has_metadata(str(path)) | |
172 | ||
173 | def iter_distutils_script_names(self) -> Iterator[str]: | |
174 | yield from self._dist.metadata_listdir("scripts") | |
175 | ||
176 | def read_text(self, path: InfoPath) -> str: | |
177 | name = str(path) | |
178 | if not self._dist.has_metadata(name): | |
179 | raise FileNotFoundError(name) | |
180 | content = self._dist.get_metadata(name) | |
181 | if content is None: | |
182 | raise NoneMetadataError(self, name) | |
183 | return content | |
184 | ||
185 | def iter_entry_points(self) -> Iterable[BaseEntryPoint]: | |
186 | for group, entries in self._dist.get_entry_map().items(): | |
187 | for name, entry_point in entries.items(): | |
188 | name, _, value = str(entry_point).partition("=") | |
189 | yield EntryPoint(name=name.strip(), value=value.strip(), group=group) | |
190 | ||
191 | def _metadata_impl(self) -> email.message.Message: | |
192 | """ | |
193 | :raises NoneMetadataError: if the distribution reports `has_metadata()` | |
194 | True but `get_metadata()` returns None. | |
195 | """ | |
196 | if isinstance(self._dist, pkg_resources.DistInfoDistribution): | |
197 | metadata_name = "METADATA" | |
198 | else: | |
199 | metadata_name = "PKG-INFO" | |
200 | try: | |
201 | metadata = self.read_text(metadata_name) | |
202 | except FileNotFoundError: | |
203 | if self.location: | |
204 | displaying_path = display_path(self.location) | |
205 | else: | |
206 | displaying_path = repr(self.location) | |
207 | logger.warning("No metadata found in %s", displaying_path) | |
208 | metadata = "" | |
209 | feed_parser = email.parser.FeedParser() | |
210 | feed_parser.feed(metadata) | |
211 | return feed_parser.close() | |
212 | ||
213 | def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]: | |
214 | if extras: # pkg_resources raises on invalid extras, so we sanitize. | |
215 | extras = frozenset(extras).intersection(self._dist.extras) | |
216 | return self._dist.requires(extras) | |
217 | ||
218 | def iter_provided_extras(self) -> Iterable[str]: | |
219 | return self._dist.extras | |
220 | ||
221 | ||
222 | class Environment(BaseEnvironment): | |
223 | def __init__(self, ws: pkg_resources.WorkingSet) -> None: | |
224 | self._ws = ws | |
225 | ||
226 | @classmethod | |
227 | def default(cls) -> BaseEnvironment: | |
228 | return cls(pkg_resources.working_set) | |
229 | ||
230 | @classmethod | |
231 | def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment: | |
232 | return cls(pkg_resources.WorkingSet(paths)) | |
233 | ||
234 | def _iter_distributions(self) -> Iterator[BaseDistribution]: | |
235 | for dist in self._ws: | |
236 | yield Distribution(dist) | |
237 | ||
238 | def _search_distribution(self, name: str) -> Optional[BaseDistribution]: | |
239 | """Find a distribution matching the ``name`` in the environment. | |
240 | ||
241 | This searches from *all* distributions available in the environment, to | |
242 | match the behavior of ``pkg_resources.get_distribution()``. | |
243 | """ | |
244 | canonical_name = canonicalize_name(name) | |
245 | for dist in self.iter_all_distributions(): | |
246 | if dist.canonical_name == canonical_name: | |
247 | return dist | |
248 | return None | |
249 | ||
250 | def get_distribution(self, name: str) -> Optional[BaseDistribution]: | |
251 | # Search the distribution by looking through the working set. | |
252 | dist = self._search_distribution(name) | |
253 | if dist: | |
254 | return dist | |
255 | ||
256 | # If distribution could not be found, call working_set.require to | |
257 | # update the working set, and try to find the distribution again. | |
258 | # This might happen for e.g. when you install a package twice, once | |
259 | # using setup.py develop and again using setup.py install. Now when | |
260 | # running pip uninstall twice, the package gets removed from the | |
261 | # working set in the first uninstall, so we have to populate the | |
262 | # working set again so that pip knows about it and the packages gets | |
263 | # picked up and is successfully uninstalled the second time too. | |
264 | try: | |
265 | # We didn't pass in any version specifiers, so this can never | |
266 | # raise pkg_resources.VersionConflict. | |
267 | self._ws.require(name) | |
268 | except pkg_resources.DistributionNotFound: | |
269 | return None | |
270 | return self._search_distribution(name) |