jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_internal/index/collector.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _internal / index / collector.py
1 """
2 The main purpose of this module is to expose LinkCollector.collect_sources().
3 """
4
5 import collections
6 import email.message
7 import functools
8 import itertools
9 import json
10 import logging
11 import os
12 import urllib.parse
13 import urllib.request
14 from html.parser import HTMLParser
15 from optparse import Values
16 from typing import (
17 TYPE_CHECKING,
18 Callable,
19 Dict,
20 Iterable,
21 List,
22 MutableMapping,
23 NamedTuple,
24 Optional,
25 Sequence,
26 Tuple,
27 Union,
28 )
29
30 from pip._vendor import requests
31 from pip._vendor.requests import Response
32 from pip._vendor.requests.exceptions import RetryError, SSLError
33
34 from pip._internal.exceptions import NetworkConnectionError
35 from pip._internal.models.link import Link
36 from pip._internal.models.search_scope import SearchScope
37 from pip._internal.network.session import PipSession
38 from pip._internal.network.utils import raise_for_status
39 from pip._internal.utils.filetypes import is_archive_file
40 from pip._internal.utils.misc import redact_auth_from_url
41 from pip._internal.vcs import vcs
42
43 from .sources import CandidatesFromPage, LinkSource, build_source
44
# ``Protocol`` is only imported for static type checkers; at runtime the name
# is bound to plain ``object`` so it can still be used as a base class.
if TYPE_CHECKING:
    from typing import Protocol
else:
    Protocol = object

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Alias for a mutable mapping of response header names to header values.
ResponseHeaders = MutableMapping[str, str]
53
54
def _match_vcs_scheme(url: str) -> Optional[str]:
    """Look for VCS schemes in the URL.

    A scheme from ``vcs.schemes`` matches when the lower-cased URL starts
    with it and the character immediately after the scheme is ``+`` or ``:``.

    :param url: the URL to inspect.
    :return: the matched VCS scheme, or None if there's no match.
    """
    # Lower-casing does not depend on the scheme, so do it once up front.
    lowered = url.lower()
    for scheme in vcs.schemes:
        if not lowered.startswith(scheme):
            continue
        # Slice rather than index: when the URL is nothing but the scheme
        # there is no following character, and indexing would raise
        # IndexError.  Compare against a tuple because the empty slice is
        # a substring of every string and must not count as a match.
        separator = url[len(scheme) : len(scheme) + 1]
        if separator in ("+", ":"):
            return scheme
    return None
64
65
66 class _NotAPIContent(Exception):
67 def __init__(self, content_type: str, request_desc: str) -> None:
68 super().__init__(content_type, request_desc)
69 self.content_type = content_type
70 self.request_desc = request_desc
71
72
def _ensure_api_header(response: Response) -> None:
    """
    Verify that the response's Content-Type header names a Simple API
    format.

    The header value is compared case-insensitively by prefix; a missing
    header is treated as the literal value "Unknown".

    Raises `_NotAPIContent` (carrying the header value and the request
    method) when the content type is not one of the accepted ones.
    """
    accepted_prefixes = (
        "text/html",
        "application/vnd.pypi.simple.v1+html",
        "application/vnd.pypi.simple.v1+json",
    )
    content_type = response.headers.get("Content-Type", "Unknown")
    if not content_type.lower().startswith(accepted_prefixes):
        raise _NotAPIContent(content_type, response.request.method)
93
94
95 class _NotHTTP(Exception):
96 pass
97
98
def _ensure_api_response(url: str, session: PipSession) -> None:
    """
    Issue a HEAD request for the URL and check that the response is a
    Simple API response.

    Raises `_NotHTTP` if the URL's scheme is not http or https (so no HEAD
    request can be made), or `_NotAPIContent` if the returned content type
    is not an accepted one.
    """
    if urllib.parse.urlsplit(url).scheme not in {"http", "https"}:
        raise _NotHTTP()

    head_response = session.head(url, allow_redirects=True)
    raise_for_status(head_response)

    _ensure_api_header(head_response)
115
116
def _get_simple_response(url: str, session: PipSession) -> Response:
    """Fetch a Simple API response with GET and return it.

    The work happens in three steps:

    1. When the URL's filename looks like an archive, a HEAD request is sent
       first so that a large file is not downloaded by mistake. This raises
       `_NotHTTP` if the content type cannot be determined, or
       `_NotAPIContent` if it is not HTML or a Simple API response.
    2. The GET request itself is performed. HTTP exceptions are raised on
       network failures.
    3. The Content-Type header of the GET response is checked, raising
       `_NotAPIContent` if it is not a Simple API response.
    """
    if is_archive_file(Link(url).filename):
        _ensure_api_response(url, session=session)

    logger.debug("Getting page %s", redact_auth_from_url(url))

    accepted_types = (
        "application/vnd.pypi.simple.v1+json",
        "application/vnd.pypi.simple.v1+html; q=0.1",
        "text/html; q=0.01",
    )
    request_headers = {
        "Accept": ", ".join(accepted_types),
        # Cached data for /simple/ must not be returned blindly: authors
        # generally expect "twine upload && pip install" to work, which it
        # would not if they had run a pip install within the last ~10
        # minutes. max-age=0 is used rather than no-cache because it still
        # allows conditional requests, so traffic stays minimal when the
        # page has not changed; the cost is one round trip for the
        # conditional GET every time instead of once per 10 minutes.
        # For more information, please see pypa/pip#5670.
        "Cache-Control": "max-age=0",
    }
    response = session.get(url, headers=request_headers)
    raise_for_status(response)

    # The archive check above only helps when the URL ends with something
    # that looks like an archive, which URLs are not required to do. Short
    # of sending a HEAD request for every URL there is no way to know in
    # advance whether a Simple API response will come back, but it can be
    # verified once the response has been downloaded.
    _ensure_api_header(response)

    logger.debug(
        "Fetched page %s as %s",
        redact_auth_from_url(url),
        response.headers.get("Content-Type", "Unknown"),
    )

    return response
178
179
def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
    """Return the charset declared in the Content-Type header, if any.

    Returns None when there are no headers, no Content-Type header, or the
    header carries no (non-empty) ``charset`` parameter.
    """
    if not headers or "Content-Type" not in headers:
        return None

    # Let the email machinery parse the header's parameters.
    message = email.message.Message()
    message["content-type"] = headers["Content-Type"]
    charset = message.get_param("charset")
    return str(charset) if charset else None
189
190
class CacheablePageContent:
    """Hashable wrapper around a page, compared and hashed by the page URL.

    Only pages whose ``cache_link_parsing`` attribute is truthy may be
    wrapped; this is enforced with an assertion.
    """

    def __init__(self, page: "IndexContent") -> None:
        assert page.cache_link_parsing
        self.page = page

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return False
        return self.page.url == other.page.url

    def __hash__(self) -> int:
        return hash(self.page.url)
201
202
class ParseLinks(Protocol):
    """Signature of a callable that turns an index page into links."""

    def __call__(self, page: "IndexContent") -> Iterable[Link]:
        """Return the links parsed from ``page``."""
206
207
def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
    """
    Wrap a function that parses an Iterable[Link] out of an IndexContent so
    that its result is memoized (keyed by CacheablePageContent, i.e. by page
    URL). Pages with `page.cache_link_parsing == False` bypass the cache and
    are parsed on every call.

    In both cases the wrapped function's result is materialized as a list.
    """

    @functools.lru_cache(maxsize=None)
    def parse_cached(key: CacheablePageContent) -> List[Link]:
        return list(fn(key.page))

    @functools.wraps(fn)
    def parse(page: "IndexContent") -> List[Link]:
        if not page.cache_link_parsing:
            return list(fn(page))
        return parse_cached(CacheablePageContent(page))

    return parse
226
227
@with_cached_index_content
def parse_links(page: "IndexContent") -> Iterable[Link]:
    """
    Yield a Link for every file entry (JSON) or anchor element (HTML) found
    in a Simple API Index Content page.

    Entries for which no Link can be built are skipped.
    """
    json_type = "application/vnd.pypi.simple.v1+json"
    if page.content_type.lower().startswith(json_type):
        payload = json.loads(page.content)
        for file_info in payload.get("files", []):
            json_link = Link.from_json(file_info, page.url)
            if json_link is not None:
                yield json_link
        return

    # Anything else is treated as HTML; fall back to UTF-8 when the page
    # carries no encoding.
    page_url = page.url
    html_parser = HTMLLinkParser(page_url)
    html_parser.feed(page.content.decode(page.encoding or "utf-8"))

    # A <base href> found in the document takes precedence over the page URL.
    base_url = html_parser.base_url or page_url
    for anchor_attrs in html_parser.anchors:
        html_link = Link.from_element(
            anchor_attrs, page_url=page_url, base_url=base_url
        )
        if html_link is not None:
            yield html_link
255
256
class IndexContent:
    """A single response (page) together with the URL it came from."""

    def __init__(
        self,
        content: bytes,
        content_type: str,
        encoding: Optional[str],
        url: str,
        cache_link_parsing: bool = True,
    ) -> None:
        """
        :param content: the raw bytes of the response body.
        :param content_type: the Content-Type of the response.
        :param encoding: the encoding to decode the given content.
        :param url: the URL from which the HTML was downloaded.
        :param cache_link_parsing: whether links parsed from this page's url
                                   should be cached. PyPI index urls should
                                   have this set to False, for example.
        """
        self.url = url
        self.content = content
        self.content_type = content_type
        self.encoding = encoding
        self.cache_link_parsing = cache_link_parsing

    def __str__(self) -> str:
        # Never expose credentials embedded in the URL.
        return redact_auth_from_url(self.url)
283
284
class HTMLLinkParser(HTMLParser):
    """
    HTMLParser that records the href of the first <base> element that has
    one, plus the attributes of every anchor element.
    """

    def __init__(self, url: str) -> None:
        super().__init__(convert_charrefs=True)

        self.url: str = url
        # href of the first <base> tag carrying one; None until then.
        self.base_url: Optional[str] = None
        # One attribute dict per <a> tag, in document order.
        self.anchors: List[Dict[str, Optional[str]]] = []

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        if tag == "a":
            self.anchors.append(dict(attrs))
            return
        if tag != "base" or self.base_url is not None:
            return
        base_href = self.get_href(attrs)
        if base_href is not None:
            self.base_url = base_href

    def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
        """Return the value of the first ``href`` attribute, or None."""
        href_values = [value for name, value in attrs if name == "href"]
        return href_values[0] if href_values else None
311
312
def _handle_get_simple_fail(
    link: Link,
    reason: Union[str, Exception],
    meth: Optional[Callable[..., None]] = None,
) -> None:
    """Log that ``link`` could not be fetched and is being skipped.

    :param link: the link whose fetch failed.
    :param reason: why the fetch failed; interpolated into the message.
    :param meth: logging callable to use; defaults to ``logger.debug``.
    """
    log = logger.debug if meth is None else meth
    log("Could not fetch URL %s: %s - skipping", link, reason)
321
322
def _make_index_content(
    response: Response, cache_link_parsing: bool = True
) -> IndexContent:
    """Build an IndexContent from a response.

    The encoding is taken from the response's Content-Type header when it
    declares a charset, and is None otherwise.
    """
    headers = response.headers
    declared_encoding = _get_encoding_from_headers(headers)
    return IndexContent(
        content=response.content,
        content_type=headers["Content-Type"],
        encoding=declared_encoding,
        url=response.url,
        cache_link_parsing=cache_link_parsing,
    )
334
335
def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
    """Fetch the index page that ``link`` points to.

    The URL fragment is dropped before fetching, and a ``file:`` URL naming
    a directory is redirected to that directory's ``index.html``.

    Returns the page as an IndexContent, or None when the URL uses a VCS
    scheme or the page could not be fetched; every failure handled here is
    logged rather than raised.
    """
    url = link.url.split("#", 1)[0]

    # VCS schemes do not support lookup as web pages.
    vcs_scheme = _match_vcs_scheme(url)
    if vcs_scheme:
        logger.warning(
            "Cannot look at %s URL %s because it does not support lookup as web pages.",
            vcs_scheme,
            link,
        )
        return None

    # file:// URLs that point at a directory get index.html appended.
    parsed = urllib.parse.urlparse(url)
    if parsed.scheme == "file" and os.path.isdir(
        urllib.request.url2pathname(parsed.path)
    ):
        # Without a trailing slash urljoin would drop the final path segment.
        if not url.endswith("/"):
            url += "/"
        # TODO: In the future, it would be nice if pip supported PEP 691
        #       style responses in the file:// URLs, however there's no
        #       standard file extension for application/vnd.pypi.simple.v1+json
        #       so we'll need to come up with something on our own.
        url = urllib.parse.urljoin(url, "index.html")
        logger.debug(" file: URL is directory, getting %s", url)

    try:
        resp = _get_simple_response(url, session=session)
    except _NotHTTP:
        logger.warning(
            "Skipping page %s because it looks like an archive, and cannot "
            "be checked by a HTTP HEAD request.",
            link,
        )
    except _NotAPIContent as exc:
        logger.warning(
            "Skipping page %s because the %s request got Content-Type: %s. "
            "The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
            "application/vnd.pypi.simple.v1+html, and text/html",
            link,
            exc.request_desc,
            exc.content_type,
        )
    except (NetworkConnectionError, RetryError) as exc:
        _handle_get_simple_fail(link, exc)
    except SSLError as exc:
        # Must be handled before the more general connection error below.
        _handle_get_simple_fail(
            link,
            f"There was a problem confirming the ssl certificate: {exc}",
            meth=logger.info,
        )
    except requests.ConnectionError as exc:
        _handle_get_simple_fail(link, f"connection error: {exc}")
    except requests.Timeout:
        _handle_get_simple_fail(link, "timed out")
    else:
        return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)

    # Every handled failure ends up here.
    return None
395
396
class CollectedSources(NamedTuple):
    """Link sources grouped by where they were configured."""

    # Sources built from the find-links locations (entries may be None).
    find_links: Sequence[Optional[LinkSource]]
    # Sources built from the index URL locations (entries may be None).
    index_urls: Sequence[Optional[LinkSource]]
400
401
class LinkCollector:
    """
    Responsible for collecting Link objects from all configured locations,
    making network requests as needed.

    The class's main method is its collect_sources() method.
    """

    def __init__(
        self,
        session: PipSession,
        search_scope: SearchScope,
    ) -> None:
        """
        :param session: The Session to use to make requests.
        :param search_scope: The find-links and index locations to search.
        """
        self.search_scope = search_scope
        self.session = session

    @classmethod
    def create(
        cls,
        session: PipSession,
        options: Values,
        suppress_no_index: bool = False,
    ) -> "LinkCollector":
        """
        Build a collector from parsed command-line options.

        :param session: The Session to use to make requests.
        :param options: Parsed options; ``index_url``, ``extra_index_urls``,
            ``no_index`` and ``find_links`` are read from it.
        :param suppress_no_index: Whether to ignore the --no-index option
            when constructing the SearchScope object.
        :return: A new instance of the class ``create`` was called on.
        """
        index_urls = [options.index_url] + options.extra_index_urls
        if options.no_index and not suppress_no_index:
            logger.debug(
                "Ignoring indexes: %s",
                ",".join(redact_auth_from_url(url) for url in index_urls),
            )
            index_urls = []

        # Make sure find_links is a list before passing to create().
        find_links = options.find_links or []

        search_scope = SearchScope.create(
            find_links=find_links,
            index_urls=index_urls,
            no_index=options.no_index,
        )
        # Instantiate through ``cls`` so that calling create() on a subclass
        # yields an instance of that subclass rather than of the base class.
        return cls(
            session=session,
            search_scope=search_scope,
        )

    @property
    def find_links(self) -> List[str]:
        """The find-links locations of the underlying search scope."""
        return self.search_scope.find_links

    def fetch_response(self, location: Link) -> Optional[IndexContent]:
        """
        Fetch the index page (HTML or JSON Simple API response) at
        ``location``.

        :return: The fetched page, or None if it could not be retrieved.
        """
        return _get_index_content(location, session=self.session)

    def collect_sources(
        self,
        project_name: str,
        candidates_from_page: CandidatesFromPage,
    ) -> CollectedSources:
        """
        Build the link sources for ``project_name`` from the index URLs and
        the find-links locations.

        :param project_name: The project whose locations should be searched.
        :param candidates_from_page: Passed through to every built source.
        :return: The find-links and index sources, each deduplicated by URL.
        """
        # The OrderedDict calls deduplicate sources by URL.
        index_url_sources = collections.OrderedDict(
            build_source(
                loc,
                candidates_from_page=candidates_from_page,
                page_validator=self.session.is_secure_origin,
                expand_dir=False,
                cache_link_parsing=False,
            )
            for loc in self.search_scope.get_index_urls_locations(project_name)
        ).values()
        find_links_sources = collections.OrderedDict(
            build_source(
                loc,
                candidates_from_page=candidates_from_page,
                page_validator=self.session.is_secure_origin,
                expand_dir=True,
                cache_link_parsing=True,
            )
            for loc in self.find_links
        ).values()

        # Only build the (potentially long) listing when it will be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            locations = [
                f"* {s.link}"
                for s in itertools.chain(find_links_sources, index_url_sources)
                if s is not None and s.link is not None
            ]
            header = (
                f"{len(locations)} location(s) to search "
                f"for versions of {project_name}:"
            )
            logger.debug("\n".join([header, *locations]))

        return CollectedSources(
            find_links=list(find_links_sources),
            index_urls=list(index_url_sources),
        )