]> jfr.im git - dlqueue.git/blobdiff - venv/lib/python3.11/site-packages/pip/_internal/network/session.py
hello world
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _internal / network / session.py
diff --git a/venv/lib/python3.11/site-packages/pip/_internal/network/session.py b/venv/lib/python3.11/site-packages/pip/_internal/network/session.py
deleted file mode 100644 (file)
index 887dc14..0000000
+++ /dev/null
@@ -1,519 +0,0 @@
-"""PipSession and supporting code, containing all pip-specific
-network request configuration and behavior.
-"""
-
-import email.utils
-import io
-import ipaddress
-import json
-import logging
-import mimetypes
-import os
-import platform
-import shutil
-import subprocess
-import sys
-import urllib.parse
-import warnings
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Dict,
-    Generator,
-    List,
-    Mapping,
-    Optional,
-    Sequence,
-    Tuple,
-    Union,
-)
-
-from pip._vendor import requests, urllib3
-from pip._vendor.cachecontrol import CacheControlAdapter as _BaseCacheControlAdapter
-from pip._vendor.requests.adapters import DEFAULT_POOLBLOCK, BaseAdapter
-from pip._vendor.requests.adapters import HTTPAdapter as _BaseHTTPAdapter
-from pip._vendor.requests.models import PreparedRequest, Response
-from pip._vendor.requests.structures import CaseInsensitiveDict
-from pip._vendor.urllib3.connectionpool import ConnectionPool
-from pip._vendor.urllib3.exceptions import InsecureRequestWarning
-
-from pip import __version__
-from pip._internal.metadata import get_default_environment
-from pip._internal.models.link import Link
-from pip._internal.network.auth import MultiDomainBasicAuth
-from pip._internal.network.cache import SafeFileCache
-
-# Import ssl from compat so the initial import occurs in only one place.
-from pip._internal.utils.compat import has_tls
-from pip._internal.utils.glibc import libc_ver
-from pip._internal.utils.misc import build_url_from_netloc, parse_netloc
-from pip._internal.utils.urls import url_to_path
-
-if TYPE_CHECKING:
-    from ssl import SSLContext
-
-    from pip._vendor.urllib3.poolmanager import PoolManager
-
-
-logger = logging.getLogger(__name__)
-
-SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
-
-
-# Ignore warning raised when using --trusted-host.
-warnings.filterwarnings("ignore", category=InsecureRequestWarning)
-
-
-SECURE_ORIGINS: List[SecureOrigin] = [
-    # protocol, hostname, port
-    # Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
-    ("https", "*", "*"),
-    ("*", "localhost", "*"),
-    ("*", "127.0.0.0/8", "*"),
-    ("*", "::1/128", "*"),
-    ("file", "*", None),
-    # ssh is always secure.
-    ("ssh", "*", "*"),
-]
-
-
-# These are environment variables present when running under various
-# CI systems.  For each variable, some CI systems that use the variable
-# are indicated.  The collection was chosen so that for each of a number
-# of popular systems, at least one of the environment variables is used.
-# This list is used to provide some indication of and lower bound for
-# CI traffic to PyPI.  Thus, it is okay if the list is not comprehensive.
-# For more background, see: https://github.com/pypa/pip/issues/5499
-CI_ENVIRONMENT_VARIABLES = (
-    # Azure Pipelines
-    "BUILD_BUILDID",
-    # Jenkins
-    "BUILD_ID",
-    # AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI
-    "CI",
-    # Explicit environment variable.
-    "PIP_IS_CI",
-)
-
-
-def looks_like_ci() -> bool:
-    """
-    Return whether it looks like pip is running under CI.
-    """
-    # We don't use the method of checking for a tty (e.g. using isatty())
-    # because some CI systems mimic a tty (e.g. Travis CI).  Thus that
-    # method doesn't provide definitive information in either direction.
-    return any(name in os.environ for name in CI_ENVIRONMENT_VARIABLES)
-
-
-def user_agent() -> str:
-    """
-    Return a string representing the user agent.
-    """
-    data: Dict[str, Any] = {
-        "installer": {"name": "pip", "version": __version__},
-        "python": platform.python_version(),
-        "implementation": {
-            "name": platform.python_implementation(),
-        },
-    }
-
-    if data["implementation"]["name"] == "CPython":
-        data["implementation"]["version"] = platform.python_version()
-    elif data["implementation"]["name"] == "PyPy":
-        pypy_version_info = sys.pypy_version_info  # type: ignore
-        if pypy_version_info.releaselevel == "final":
-            pypy_version_info = pypy_version_info[:3]
-        data["implementation"]["version"] = ".".join(
-            [str(x) for x in pypy_version_info]
-        )
-    elif data["implementation"]["name"] == "Jython":
-        # Complete Guess
-        data["implementation"]["version"] = platform.python_version()
-    elif data["implementation"]["name"] == "IronPython":
-        # Complete Guess
-        data["implementation"]["version"] = platform.python_version()
-
-    if sys.platform.startswith("linux"):
-        from pip._vendor import distro
-
-        linux_distribution = distro.name(), distro.version(), distro.codename()
-        distro_infos: Dict[str, Any] = dict(
-            filter(
-                lambda x: x[1],
-                zip(["name", "version", "id"], linux_distribution),
-            )
-        )
-        libc = dict(
-            filter(
-                lambda x: x[1],
-                zip(["lib", "version"], libc_ver()),
-            )
-        )
-        if libc:
-            distro_infos["libc"] = libc
-        if distro_infos:
-            data["distro"] = distro_infos
-
-    if sys.platform.startswith("darwin") and platform.mac_ver()[0]:
-        data["distro"] = {"name": "macOS", "version": platform.mac_ver()[0]}
-
-    if platform.system():
-        data.setdefault("system", {})["name"] = platform.system()
-
-    if platform.release():
-        data.setdefault("system", {})["release"] = platform.release()
-
-    if platform.machine():
-        data["cpu"] = platform.machine()
-
-    if has_tls():
-        import _ssl as ssl
-
-        data["openssl_version"] = ssl.OPENSSL_VERSION
-
-    setuptools_dist = get_default_environment().get_distribution("setuptools")
-    if setuptools_dist is not None:
-        data["setuptools_version"] = str(setuptools_dist.version)
-
-    if shutil.which("rustc") is not None:
-        # If for any reason `rustc --version` fails, silently ignore it
-        try:
-            rustc_output = subprocess.check_output(
-                ["rustc", "--version"], stderr=subprocess.STDOUT, timeout=0.5
-            )
-        except Exception:
-            pass
-        else:
-            if rustc_output.startswith(b"rustc "):
-                # The format of `rustc --version` is:
-                # `b'rustc 1.52.1 (9bc8c42bb 2021-05-09)\n'`
-                # We extract just the middle (1.52.1) part
-                data["rustc_version"] = rustc_output.split(b" ")[1].decode()
-
-    # Use None rather than False so as not to give the impression that
-    # pip knows it is not being run under CI.  Rather, it is a null or
-    # inconclusive result.  Also, we include some value rather than no
-    # value to make it easier to know that the check has been run.
-    data["ci"] = True if looks_like_ci() else None
-
-    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
-    if user_data is not None:
-        data["user_data"] = user_data
-
-    return "{data[installer][name]}/{data[installer][version]} {json}".format(
-        data=data,
-        json=json.dumps(data, separators=(",", ":"), sort_keys=True),
-    )
-
-
-class LocalFSAdapter(BaseAdapter):
-    def send(
-        self,
-        request: PreparedRequest,
-        stream: bool = False,
-        timeout: Optional[Union[float, Tuple[float, float]]] = None,
-        verify: Union[bool, str] = True,
-        cert: Optional[Union[str, Tuple[str, str]]] = None,
-        proxies: Optional[Mapping[str, str]] = None,
-    ) -> Response:
-        pathname = url_to_path(request.url)
-
-        resp = Response()
-        resp.status_code = 200
-        resp.url = request.url
-
-        try:
-            stats = os.stat(pathname)
-        except OSError as exc:
-            # format the exception raised as a io.BytesIO object,
-            # to return a better error message:
-            resp.status_code = 404
-            resp.reason = type(exc).__name__
-            resp.raw = io.BytesIO(f"{resp.reason}: {exc}".encode("utf8"))
-        else:
-            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
-            content_type = mimetypes.guess_type(pathname)[0] or "text/plain"
-            resp.headers = CaseInsensitiveDict(
-                {
-                    "Content-Type": content_type,
-                    "Content-Length": stats.st_size,
-                    "Last-Modified": modified,
-                }
-            )
-
-            resp.raw = open(pathname, "rb")
-            resp.close = resp.raw.close
-
-        return resp
-
-    def close(self) -> None:
-        pass
-
-
-class _SSLContextAdapterMixin:
-    """Mixin to add the ``ssl_context`` constructor argument to HTTP adapters.
-
-    The additional argument is forwarded directly to the pool manager. This allows us
-    to dynamically decide what SSL store to use at runtime, which is used to implement
-    the optional ``truststore`` backend.
-    """
-
-    def __init__(
-        self,
-        *,
-        ssl_context: Optional["SSLContext"] = None,
-        **kwargs: Any,
-    ) -> None:
-        self._ssl_context = ssl_context
-        super().__init__(**kwargs)
-
-    def init_poolmanager(
-        self,
-        connections: int,
-        maxsize: int,
-        block: bool = DEFAULT_POOLBLOCK,
-        **pool_kwargs: Any,
-    ) -> "PoolManager":
-        if self._ssl_context is not None:
-            pool_kwargs.setdefault("ssl_context", self._ssl_context)
-        return super().init_poolmanager(  # type: ignore[misc]
-            connections=connections,
-            maxsize=maxsize,
-            block=block,
-            **pool_kwargs,
-        )
-
-
-class HTTPAdapter(_SSLContextAdapterMixin, _BaseHTTPAdapter):
-    pass
-
-
-class CacheControlAdapter(_SSLContextAdapterMixin, _BaseCacheControlAdapter):
-    pass
-
-
-class InsecureHTTPAdapter(HTTPAdapter):
-    def cert_verify(
-        self,
-        conn: ConnectionPool,
-        url: str,
-        verify: Union[bool, str],
-        cert: Optional[Union[str, Tuple[str, str]]],
-    ) -> None:
-        super().cert_verify(conn=conn, url=url, verify=False, cert=cert)
-
-
-class InsecureCacheControlAdapter(CacheControlAdapter):
-    def cert_verify(
-        self,
-        conn: ConnectionPool,
-        url: str,
-        verify: Union[bool, str],
-        cert: Optional[Union[str, Tuple[str, str]]],
-    ) -> None:
-        super().cert_verify(conn=conn, url=url, verify=False, cert=cert)
-
-
-class PipSession(requests.Session):
-    timeout: Optional[int] = None
-
-    def __init__(
-        self,
-        *args: Any,
-        retries: int = 0,
-        cache: Optional[str] = None,
-        trusted_hosts: Sequence[str] = (),
-        index_urls: Optional[List[str]] = None,
-        ssl_context: Optional["SSLContext"] = None,
-        **kwargs: Any,
-    ) -> None:
-        """
-        :param trusted_hosts: Domains not to emit warnings for when not using
-            HTTPS.
-        """
-        super().__init__(*args, **kwargs)
-
-        # Namespace the attribute with "pip_" just in case to prevent
-        # possible conflicts with the base class.
-        self.pip_trusted_origins: List[Tuple[str, Optional[int]]] = []
-
-        # Attach our User Agent to the request
-        self.headers["User-Agent"] = user_agent()
-
-        # Attach our Authentication handler to the session
-        self.auth = MultiDomainBasicAuth(index_urls=index_urls)
-
-        # Create our urllib3.Retry instance which will allow us to customize
-        # how we handle retries.
-        retries = urllib3.Retry(
-            # Set the total number of retries that a particular request can
-            # have.
-            total=retries,
-            # A 503 error from PyPI typically means that the Fastly -> Origin
-            # connection got interrupted in some way. A 503 error in general
-            # is typically considered a transient error so we'll go ahead and
-            # retry it.
-            # A 500 may indicate transient error in Amazon S3
-            # A 520 or 527 - may indicate transient error in CloudFlare
-            status_forcelist=[500, 503, 520, 527],
-            # Add a small amount of back off between failed requests in
-            # order to prevent hammering the service.
-            backoff_factor=0.25,
-        )  # type: ignore
-
-        # Our Insecure HTTPAdapter disables HTTPS validation. It does not
-        # support caching so we'll use it for all http:// URLs.
-        # If caching is disabled, we will also use it for
-        # https:// hosts that we've marked as ignoring
-        # TLS errors for (trusted-hosts).
-        insecure_adapter = InsecureHTTPAdapter(max_retries=retries)
-
-        # We want to _only_ cache responses on securely fetched origins or when
-        # the host is specified as trusted. We do this because
-        # we can't validate the response of an insecurely/untrusted fetched
-        # origin, and we don't want someone to be able to poison the cache and
-        # require manual eviction from the cache to fix it.
-        if cache:
-            secure_adapter = CacheControlAdapter(
-                cache=SafeFileCache(cache),
-                max_retries=retries,
-                ssl_context=ssl_context,
-            )
-            self._trusted_host_adapter = InsecureCacheControlAdapter(
-                cache=SafeFileCache(cache),
-                max_retries=retries,
-            )
-        else:
-            secure_adapter = HTTPAdapter(max_retries=retries, ssl_context=ssl_context)
-            self._trusted_host_adapter = insecure_adapter
-
-        self.mount("https://", secure_adapter)
-        self.mount("http://", insecure_adapter)
-
-        # Enable file:// urls
-        self.mount("file://", LocalFSAdapter())
-
-        for host in trusted_hosts:
-            self.add_trusted_host(host, suppress_logging=True)
-
-    def update_index_urls(self, new_index_urls: List[str]) -> None:
-        """
-        :param new_index_urls: New index urls to update the authentication
-            handler with.
-        """
-        self.auth.index_urls = new_index_urls
-
-    def add_trusted_host(
-        self, host: str, source: Optional[str] = None, suppress_logging: bool = False
-    ) -> None:
-        """
-        :param host: It is okay to provide a host that has previously been
-            added.
-        :param source: An optional source string, for logging where the host
-            string came from.
-        """
-        if not suppress_logging:
-            msg = f"adding trusted host: {host!r}"
-            if source is not None:
-                msg += f" (from {source})"
-            logger.info(msg)
-
-        parsed_host, parsed_port = parse_netloc(host)
-        if parsed_host is None:
-            raise ValueError(f"Trusted host URL must include a host part: {host!r}")
-        if (parsed_host, parsed_port) not in self.pip_trusted_origins:
-            self.pip_trusted_origins.append((parsed_host, parsed_port))
-
-        self.mount(
-            build_url_from_netloc(host, scheme="http") + "/", self._trusted_host_adapter
-        )
-        self.mount(build_url_from_netloc(host) + "/", self._trusted_host_adapter)
-        if not parsed_port:
-            self.mount(
-                build_url_from_netloc(host, scheme="http") + ":",
-                self._trusted_host_adapter,
-            )
-            # Mount wildcard ports for the same host.
-            self.mount(build_url_from_netloc(host) + ":", self._trusted_host_adapter)
-
-    def iter_secure_origins(self) -> Generator[SecureOrigin, None, None]:
-        yield from SECURE_ORIGINS
-        for host, port in self.pip_trusted_origins:
-            yield ("*", host, "*" if port is None else port)
-
-    def is_secure_origin(self, location: Link) -> bool:
-        # Determine if this url used a secure transport mechanism
-        parsed = urllib.parse.urlparse(str(location))
-        origin_protocol, origin_host, origin_port = (
-            parsed.scheme,
-            parsed.hostname,
-            parsed.port,
-        )
-
-        # The protocol to use to see if the protocol matches.
-        # Don't count the repository type as part of the protocol: in
-        # cases such as "git+ssh", only use "ssh". (I.e., Only verify against
-        # the last scheme.)
-        origin_protocol = origin_protocol.rsplit("+", 1)[-1]
-
-        # Determine if our origin is a secure origin by looking through our
-        # hardcoded list of secure origins, as well as any additional ones
-        # configured on this PackageFinder instance.
-        for secure_origin in self.iter_secure_origins():
-            secure_protocol, secure_host, secure_port = secure_origin
-            if origin_protocol != secure_protocol and secure_protocol != "*":
-                continue
-
-            try:
-                addr = ipaddress.ip_address(origin_host or "")
-                network = ipaddress.ip_network(secure_host)
-            except ValueError:
-                # We don't have both a valid address or a valid network, so
-                # we'll check this origin against hostnames.
-                if (
-                    origin_host
-                    and origin_host.lower() != secure_host.lower()
-                    and secure_host != "*"
-                ):
-                    continue
-            else:
-                # We have a valid address and network, so see if the address
-                # is contained within the network.
-                if addr not in network:
-                    continue
-
-            # Check to see if the port matches.
-            if (
-                origin_port != secure_port
-                and secure_port != "*"
-                and secure_port is not None
-            ):
-                continue
-
-            # If we've gotten here, then this origin matches the current
-            # secure origin and we should return True
-            return True
-
-        # If we've gotten to this point, then the origin isn't secure and we
-        # will not accept it as a valid location to search. We will however
-        # log a warning that we are ignoring it.
-        logger.warning(
-            "The repository located at %s is not a trusted or secure host and "
-            "is being ignored. If this repository is available via HTTPS we "
-            "recommend you use HTTPS instead, otherwise you may silence "
-            "this warning and allow it anyway with '--trusted-host %s'.",
-            origin_host,
-            origin_host,
-        )
-
-        return False
-
-    def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> Response:
-        # Allow setting a default timeout on a session
-        kwargs.setdefault("timeout", self.timeout)
-        # Allow setting a default proxies on a session
-        kwargs.setdefault("proxies", self.proxies)
-
-        # Dispatch the actual request
-        return super().request(method, url, *args, **kwargs)