]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_internal/network/auth.py
init: venv aand flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _internal / network / auth.py
1 """Network Authentication Helpers
2
3 Contains interface (MultiDomainBasicAuth) and associated glue code for
4 providing credentials in the context of network requests.
5 """
6 import logging
7 import os
8 import shutil
9 import subprocess
10 import sysconfig
11 import typing
12 import urllib.parse
13 from abc import ABC, abstractmethod
14 from functools import lru_cache
15 from os.path import commonprefix
16 from pathlib import Path
17 from typing import Any, Dict, List, NamedTuple, Optional, Tuple
18
19 from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
20 from pip._vendor.requests.models import Request, Response
21 from pip._vendor.requests.utils import get_netrc_auth
22
23 from pip._internal.utils.logging import getLogger
24 from pip._internal.utils.misc import (
25 ask,
26 ask_input,
27 ask_password,
28 remove_auth_from_url,
29 split_auth_netloc_from_url,
30 )
31 from pip._internal.vcs.versioncontrol import AuthInfo
32
33 logger = getLogger(__name__)
34
35 KEYRING_DISABLED = False
36
37
38 class Credentials(NamedTuple):
39 url: str
40 username: str
41 password: str
42
43
44 class KeyRingBaseProvider(ABC):
45 """Keyring base provider interface"""
46
47 has_keyring: bool
48
49 @abstractmethod
50 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
51 ...
52
53 @abstractmethod
54 def save_auth_info(self, url: str, username: str, password: str) -> None:
55 ...
56
57
58 class KeyRingNullProvider(KeyRingBaseProvider):
59 """Keyring null provider"""
60
61 has_keyring = False
62
63 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
64 return None
65
66 def save_auth_info(self, url: str, username: str, password: str) -> None:
67 return None
68
69
70 class KeyRingPythonProvider(KeyRingBaseProvider):
71 """Keyring interface which uses locally imported `keyring`"""
72
73 has_keyring = True
74
75 def __init__(self) -> None:
76 import keyring
77
78 self.keyring = keyring
79
80 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
81 # Support keyring's get_credential interface which supports getting
82 # credentials without a username. This is only available for
83 # keyring>=15.2.0.
84 if hasattr(self.keyring, "get_credential"):
85 logger.debug("Getting credentials from keyring for %s", url)
86 cred = self.keyring.get_credential(url, username)
87 if cred is not None:
88 return cred.username, cred.password
89 return None
90
91 if username is not None:
92 logger.debug("Getting password from keyring for %s", url)
93 password = self.keyring.get_password(url, username)
94 if password:
95 return username, password
96 return None
97
98 def save_auth_info(self, url: str, username: str, password: str) -> None:
99 self.keyring.set_password(url, username, password)
100
101
102 class KeyRingCliProvider(KeyRingBaseProvider):
103 """Provider which uses `keyring` cli
104
105 Instead of calling the keyring package installed alongside pip
106 we call keyring on the command line which will enable pip to
107 use which ever installation of keyring is available first in
108 PATH.
109 """
110
111 has_keyring = True
112
113 def __init__(self, cmd: str) -> None:
114 self.keyring = cmd
115
116 def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]:
117 # This is the default implementation of keyring.get_credential
118 # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
119 if username is not None:
120 password = self._get_password(url, username)
121 if password is not None:
122 return username, password
123 return None
124
125 def save_auth_info(self, url: str, username: str, password: str) -> None:
126 return self._set_password(url, username, password)
127
128 def _get_password(self, service_name: str, username: str) -> Optional[str]:
129 """Mirror the implementation of keyring.get_password using cli"""
130 if self.keyring is None:
131 return None
132
133 cmd = [self.keyring, "get", service_name, username]
134 env = os.environ.copy()
135 env["PYTHONIOENCODING"] = "utf-8"
136 res = subprocess.run(
137 cmd,
138 stdin=subprocess.DEVNULL,
139 stdout=subprocess.PIPE,
140 env=env,
141 )
142 if res.returncode:
143 return None
144 return res.stdout.decode("utf-8").strip(os.linesep)
145
146 def _set_password(self, service_name: str, username: str, password: str) -> None:
147 """Mirror the implementation of keyring.set_password using cli"""
148 if self.keyring is None:
149 return None
150 env = os.environ.copy()
151 env["PYTHONIOENCODING"] = "utf-8"
152 subprocess.run(
153 [self.keyring, "set", service_name, username],
154 input=f"{password}{os.linesep}".encode("utf-8"),
155 env=env,
156 check=True,
157 )
158 return None
159
160
161 @lru_cache(maxsize=None)
162 def get_keyring_provider(provider: str) -> KeyRingBaseProvider:
163 logger.verbose("Keyring provider requested: %s", provider)
164
165 # keyring has previously failed and been disabled
166 if KEYRING_DISABLED:
167 provider = "disabled"
168 if provider in ["import", "auto"]:
169 try:
170 impl = KeyRingPythonProvider()
171 logger.verbose("Keyring provider set: import")
172 return impl
173 except ImportError:
174 pass
175 except Exception as exc:
176 # In the event of an unexpected exception
177 # we should warn the user
178 msg = "Installed copy of keyring fails with exception %s"
179 if provider == "auto":
180 msg = msg + ", trying to find a keyring executable as a fallback"
181 logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG))
182 if provider in ["subprocess", "auto"]:
183 cli = shutil.which("keyring")
184 if cli and cli.startswith(sysconfig.get_path("scripts")):
185 # all code within this function is stolen from shutil.which implementation
186 @typing.no_type_check
187 def PATH_as_shutil_which_determines_it() -> str:
188 path = os.environ.get("PATH", None)
189 if path is None:
190 try:
191 path = os.confstr("CS_PATH")
192 except (AttributeError, ValueError):
193 # os.confstr() or CS_PATH is not available
194 path = os.defpath
195 # bpo-35755: Don't use os.defpath if the PATH environment variable is
196 # set to an empty string
197
198 return path
199
200 scripts = Path(sysconfig.get_path("scripts"))
201
202 paths = []
203 for path in PATH_as_shutil_which_determines_it().split(os.pathsep):
204 p = Path(path)
205 try:
206 if not p.samefile(scripts):
207 paths.append(path)
208 except FileNotFoundError:
209 pass
210
211 path = os.pathsep.join(paths)
212
213 cli = shutil.which("keyring", path=path)
214
215 if cli:
216 logger.verbose("Keyring provider set: subprocess with executable %s", cli)
217 return KeyRingCliProvider(cli)
218
219 logger.verbose("Keyring provider set: disabled")
220 return KeyRingNullProvider()
221
222
223 class MultiDomainBasicAuth(AuthBase):
224 def __init__(
225 self,
226 prompting: bool = True,
227 index_urls: Optional[List[str]] = None,
228 keyring_provider: str = "auto",
229 ) -> None:
230 self.prompting = prompting
231 self.index_urls = index_urls
232 self.keyring_provider = keyring_provider # type: ignore[assignment]
233 self.passwords: Dict[str, AuthInfo] = {}
234 # When the user is prompted to enter credentials and keyring is
235 # available, we will offer to save them. If the user accepts,
236 # this value is set to the credentials they entered. After the
237 # request authenticates, the caller should call
238 # ``save_credentials`` to save these.
239 self._credentials_to_save: Optional[Credentials] = None
240
241 @property
242 def keyring_provider(self) -> KeyRingBaseProvider:
243 return get_keyring_provider(self._keyring_provider)
244
245 @keyring_provider.setter
246 def keyring_provider(self, provider: str) -> None:
247 # The free function get_keyring_provider has been decorated with
248 # functools.cache. If an exception occurs in get_keyring_auth that
249 # cache will be cleared and keyring disabled, take that into account
250 # if you want to remove this indirection.
251 self._keyring_provider = provider
252
253 @property
254 def use_keyring(self) -> bool:
255 # We won't use keyring when --no-input is passed unless
256 # a specific provider is requested because it might require
257 # user interaction
258 return self.prompting or self._keyring_provider not in ["auto", "disabled"]
259
260 def _get_keyring_auth(
261 self,
262 url: Optional[str],
263 username: Optional[str],
264 ) -> Optional[AuthInfo]:
265 """Return the tuple auth for a given url from keyring."""
266 # Do nothing if no url was provided
267 if not url:
268 return None
269
270 try:
271 return self.keyring_provider.get_auth_info(url, username)
272 except Exception as exc:
273 logger.warning(
274 "Keyring is skipped due to an exception: %s",
275 str(exc),
276 )
277 global KEYRING_DISABLED
278 KEYRING_DISABLED = True
279 get_keyring_provider.cache_clear()
280 return None
281
282 def _get_index_url(self, url: str) -> Optional[str]:
283 """Return the original index URL matching the requested URL.
284
285 Cached or dynamically generated credentials may work against
286 the original index URL rather than just the netloc.
287
288 The provided url should have had its username and password
289 removed already. If the original index url had credentials then
290 they will be included in the return value.
291
292 Returns None if no matching index was found, or if --no-index
293 was specified by the user.
294 """
295 if not url or not self.index_urls:
296 return None
297
298 url = remove_auth_from_url(url).rstrip("/") + "/"
299 parsed_url = urllib.parse.urlsplit(url)
300
301 candidates = []
302
303 for index in self.index_urls:
304 index = index.rstrip("/") + "/"
305 parsed_index = urllib.parse.urlsplit(remove_auth_from_url(index))
306 if parsed_url == parsed_index:
307 return index
308
309 if parsed_url.netloc != parsed_index.netloc:
310 continue
311
312 candidate = urllib.parse.urlsplit(index)
313 candidates.append(candidate)
314
315 if not candidates:
316 return None
317
318 candidates.sort(
319 reverse=True,
320 key=lambda candidate: commonprefix(
321 [
322 parsed_url.path,
323 candidate.path,
324 ]
325 ).rfind("/"),
326 )
327
328 return urllib.parse.urlunsplit(candidates[0])
329
330 def _get_new_credentials(
331 self,
332 original_url: str,
333 *,
334 allow_netrc: bool = True,
335 allow_keyring: bool = False,
336 ) -> AuthInfo:
337 """Find and return credentials for the specified URL."""
338 # Split the credentials and netloc from the url.
339 url, netloc, url_user_password = split_auth_netloc_from_url(
340 original_url,
341 )
342
343 # Start with the credentials embedded in the url
344 username, password = url_user_password
345 if username is not None and password is not None:
346 logger.debug("Found credentials in url for %s", netloc)
347 return url_user_password
348
349 # Find a matching index url for this request
350 index_url = self._get_index_url(url)
351 if index_url:
352 # Split the credentials from the url.
353 index_info = split_auth_netloc_from_url(index_url)
354 if index_info:
355 index_url, _, index_url_user_password = index_info
356 logger.debug("Found index url %s", index_url)
357
358 # If an index URL was found, try its embedded credentials
359 if index_url and index_url_user_password[0] is not None:
360 username, password = index_url_user_password
361 if username is not None and password is not None:
362 logger.debug("Found credentials in index url for %s", netloc)
363 return index_url_user_password
364
365 # Get creds from netrc if we still don't have them
366 if allow_netrc:
367 netrc_auth = get_netrc_auth(original_url)
368 if netrc_auth:
369 logger.debug("Found credentials in netrc for %s", netloc)
370 return netrc_auth
371
372 # If we don't have a password and keyring is available, use it.
373 if allow_keyring:
374 # The index url is more specific than the netloc, so try it first
375 # fmt: off
376 kr_auth = (
377 self._get_keyring_auth(index_url, username) or
378 self._get_keyring_auth(netloc, username)
379 )
380 # fmt: on
381 if kr_auth:
382 logger.debug("Found credentials in keyring for %s", netloc)
383 return kr_auth
384
385 return username, password
386
387 def _get_url_and_credentials(
388 self, original_url: str
389 ) -> Tuple[str, Optional[str], Optional[str]]:
390 """Return the credentials to use for the provided URL.
391
392 If allowed, netrc and keyring may be used to obtain the
393 correct credentials.
394
395 Returns (url_without_credentials, username, password). Note
396 that even if the original URL contains credentials, this
397 function may return a different username and password.
398 """
399 url, netloc, _ = split_auth_netloc_from_url(original_url)
400
401 # Try to get credentials from original url
402 username, password = self._get_new_credentials(original_url)
403
404 # If credentials not found, use any stored credentials for this netloc.
405 # Do this if either the username or the password is missing.
406 # This accounts for the situation in which the user has specified
407 # the username in the index url, but the password comes from keyring.
408 if (username is None or password is None) and netloc in self.passwords:
409 un, pw = self.passwords[netloc]
410 # It is possible that the cached credentials are for a different username,
411 # in which case the cache should be ignored.
412 if username is None or username == un:
413 username, password = un, pw
414
415 if username is not None or password is not None:
416 # Convert the username and password if they're None, so that
417 # this netloc will show up as "cached" in the conditional above.
418 # Further, HTTPBasicAuth doesn't accept None, so it makes sense to
419 # cache the value that is going to be used.
420 username = username or ""
421 password = password or ""
422
423 # Store any acquired credentials.
424 self.passwords[netloc] = (username, password)
425
426 assert (
427 # Credentials were found
428 (username is not None and password is not None)
429 # Credentials were not found
430 or (username is None and password is None)
431 ), f"Could not load credentials from url: {original_url}"
432
433 return url, username, password
434
435 def __call__(self, req: Request) -> Request:
436 # Get credentials for this request
437 url, username, password = self._get_url_and_credentials(req.url)
438
439 # Set the url of the request to the url without any credentials
440 req.url = url
441
442 if username is not None and password is not None:
443 # Send the basic auth with this request
444 req = HTTPBasicAuth(username, password)(req)
445
446 # Attach a hook to handle 401 responses
447 req.register_hook("response", self.handle_401)
448
449 return req
450
451 # Factored out to allow for easy patching in tests
452 def _prompt_for_password(
453 self, netloc: str
454 ) -> Tuple[Optional[str], Optional[str], bool]:
455 username = ask_input(f"User for {netloc}: ") if self.prompting else None
456 if not username:
457 return None, None, False
458 if self.use_keyring:
459 auth = self._get_keyring_auth(netloc, username)
460 if auth and auth[0] is not None and auth[1] is not None:
461 return auth[0], auth[1], False
462 password = ask_password("Password: ")
463 return username, password, True
464
465 # Factored out to allow for easy patching in tests
466 def _should_save_password_to_keyring(self) -> bool:
467 if (
468 not self.prompting
469 or not self.use_keyring
470 or not self.keyring_provider.has_keyring
471 ):
472 return False
473 return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
474
475 def handle_401(self, resp: Response, **kwargs: Any) -> Response:
476 # We only care about 401 responses, anything else we want to just
477 # pass through the actual response
478 if resp.status_code != 401:
479 return resp
480
481 username, password = None, None
482
483 # Query the keyring for credentials:
484 if self.use_keyring:
485 username, password = self._get_new_credentials(
486 resp.url,
487 allow_netrc=False,
488 allow_keyring=True,
489 )
490
491 # We are not able to prompt the user so simply return the response
492 if not self.prompting and not username and not password:
493 return resp
494
495 parsed = urllib.parse.urlparse(resp.url)
496
497 # Prompt the user for a new username and password
498 save = False
499 if not username and not password:
500 username, password, save = self._prompt_for_password(parsed.netloc)
501
502 # Store the new username and password to use for future requests
503 self._credentials_to_save = None
504 if username is not None and password is not None:
505 self.passwords[parsed.netloc] = (username, password)
506
507 # Prompt to save the password to keyring
508 if save and self._should_save_password_to_keyring():
509 self._credentials_to_save = Credentials(
510 url=parsed.netloc,
511 username=username,
512 password=password,
513 )
514
515 # Consume content and release the original connection to allow our new
516 # request to reuse the same one.
517 # The result of the assignment isn't used, it's just needed to consume
518 # the content.
519 _ = resp.content
520 resp.raw.release_conn()
521
522 # Add our new username and password to the request
523 req = HTTPBasicAuth(username or "", password or "")(resp.request)
524 req.register_hook("response", self.warn_on_401)
525
526 # On successful request, save the credentials that were used to
527 # keyring. (Note that if the user responded "no" above, this member
528 # is not set and nothing will be saved.)
529 if self._credentials_to_save:
530 req.register_hook("response", self.save_credentials)
531
532 # Send our new request
533 new_resp = resp.connection.send(req, **kwargs)
534 new_resp.history.append(resp)
535
536 return new_resp
537
538 def warn_on_401(self, resp: Response, **kwargs: Any) -> None:
539 """Response callback to warn about incorrect credentials."""
540 if resp.status_code == 401:
541 logger.warning(
542 "401 Error, Credentials not correct for %s",
543 resp.request.url,
544 )
545
546 def save_credentials(self, resp: Response, **kwargs: Any) -> None:
547 """Response callback to save credentials on success."""
548 assert (
549 self.keyring_provider.has_keyring
550 ), "should never reach here without keyring"
551
552 creds = self._credentials_to_save
553 self._credentials_to_save = None
554 if creds and resp.status_code < 400:
555 try:
556 logger.info("Saving credentials to keyring")
557 self.keyring_provider.save_auth_info(
558 creds.url, creds.username, creds.password
559 )
560 except Exception:
561 logger.exception("Failed to save credentials")