1 """Network Authentication Helpers
3 Contains interface (MultiDomainBasicAuth) and associated glue code for
4 providing credentials in the context of network requests.
13 from abc
import ABC
, abstractmethod
14 from functools
import lru_cache
15 from os
.path
import commonprefix
16 from pathlib
import Path
17 from typing
import Any
, Dict
, List
, NamedTuple
, Optional
, Tuple
19 from pip
._vendor
.requests
.auth
import AuthBase
, HTTPBasicAuth
20 from pip
._vendor
.requests
.models
import Request
, Response
21 from pip
._vendor
.requests
.utils
import get_netrc_auth
23 from pip
._internal
.utils
.logging
import getLogger
24 from pip
._internal
.utils
.misc
import (
29 split_auth_netloc_from_url
,
31 from pip
._internal
.vcs
.versioncontrol
import AuthInfo
33 logger
= getLogger(__name__
)
35 KEYRING_DISABLED
= False
38 class Credentials(NamedTuple
):
44 class KeyRingBaseProvider(ABC
):
45 """Keyring base provider interface"""
50 def get_auth_info(self
, url
: str, username
: Optional
[str]) -> Optional
[AuthInfo
]:
54 def save_auth_info(self
, url
: str, username
: str, password
: str) -> None:
58 class KeyRingNullProvider(KeyRingBaseProvider
):
59 """Keyring null provider"""
63 def get_auth_info(self
, url
: str, username
: Optional
[str]) -> Optional
[AuthInfo
]:
66 def save_auth_info(self
, url
: str, username
: str, password
: str) -> None:
70 class KeyRingPythonProvider(KeyRingBaseProvider
):
71 """Keyring interface which uses locally imported `keyring`"""
75 def __init__(self
) -> None:
78 self
.keyring
= keyring
80 def get_auth_info(self
, url
: str, username
: Optional
[str]) -> Optional
[AuthInfo
]:
81 # Support keyring's get_credential interface which supports getting
82 # credentials without a username. This is only available for
84 if hasattr(self
.keyring
, "get_credential"):
85 logger
.debug("Getting credentials from keyring for %s", url
)
86 cred
= self
.keyring
.get_credential(url
, username
)
88 return cred
.username
, cred
.password
91 if username
is not None:
92 logger
.debug("Getting password from keyring for %s", url
)
93 password
= self
.keyring
.get_password(url
, username
)
95 return username
, password
98 def save_auth_info(self
, url
: str, username
: str, password
: str) -> None:
99 self
.keyring
.set_password(url
, username
, password
)
102 class KeyRingCliProvider(KeyRingBaseProvider
):
103 """Provider which uses `keyring` cli
105 Instead of calling the keyring package installed alongside pip
106 we call keyring on the command line which will enable pip to
107 use which ever installation of keyring is available first in
113 def __init__(self
, cmd
: str) -> None:
116 def get_auth_info(self
, url
: str, username
: Optional
[str]) -> Optional
[AuthInfo
]:
117 # This is the default implementation of keyring.get_credential
118 # https://github.com/jaraco/keyring/blob/97689324abcf01bd1793d49063e7ca01e03d7d07/keyring/backend.py#L134-L139
119 if username
is not None:
120 password
= self
._get
_password
(url
, username
)
121 if password
is not None:
122 return username
, password
125 def save_auth_info(self
, url
: str, username
: str, password
: str) -> None:
126 return self
._set
_password
(url
, username
, password
)
128 def _get_password(self
, service_name
: str, username
: str) -> Optional
[str]:
129 """Mirror the implementation of keyring.get_password using cli"""
130 if self
.keyring
is None:
133 cmd
= [self
.keyring
, "get", service_name
, username
]
134 env
= os
.environ
.copy()
135 env
["PYTHONIOENCODING"] = "utf-8"
136 res
= subprocess
.run(
138 stdin
=subprocess
.DEVNULL
,
139 stdout
=subprocess
.PIPE
,
144 return res
.stdout
.decode("utf-8").strip(os
.linesep
)
146 def _set_password(self
, service_name
: str, username
: str, password
: str) -> None:
147 """Mirror the implementation of keyring.set_password using cli"""
148 if self
.keyring
is None:
150 env
= os
.environ
.copy()
151 env
["PYTHONIOENCODING"] = "utf-8"
153 [self
.keyring
, "set", service_name
, username
],
154 input=f
"{password}{os.linesep}".encode("utf-8"),
161 @lru_cache(maxsize
=None)
162 def get_keyring_provider(provider
: str) -> KeyRingBaseProvider
:
163 logger
.verbose("Keyring provider requested: %s", provider
)
165 # keyring has previously failed and been disabled
167 provider
= "disabled"
168 if provider
in ["import", "auto"]:
170 impl
= KeyRingPythonProvider()
171 logger
.verbose("Keyring provider set: import")
175 except Exception as exc
:
176 # In the event of an unexpected exception
177 # we should warn the user
178 msg
= "Installed copy of keyring fails with exception %s"
179 if provider
== "auto":
180 msg
= msg
+ ", trying to find a keyring executable as a fallback"
181 logger
.warning(msg
, exc
, exc_info
=logger
.isEnabledFor(logging
.DEBUG
))
182 if provider
in ["subprocess", "auto"]:
183 cli
= shutil
.which("keyring")
184 if cli
and cli
.startswith(sysconfig
.get_path("scripts")):
185 # all code within this function is stolen from shutil.which implementation
186 @typing.no_type_check
187 def PATH_as_shutil_which_determines_it() -> str:
188 path
= os
.environ
.get("PATH", None)
191 path
= os
.confstr("CS_PATH")
192 except (AttributeError, ValueError):
193 # os.confstr() or CS_PATH is not available
195 # bpo-35755: Don't use os.defpath if the PATH environment variable is
196 # set to an empty string
200 scripts
= Path(sysconfig
.get_path("scripts"))
203 for path
in PATH_as_shutil_which_determines_it().split(os
.pathsep
):
206 if not p
.samefile(scripts
):
208 except FileNotFoundError
:
211 path
= os
.pathsep
.join(paths
)
213 cli
= shutil
.which("keyring", path
=path
)
216 logger
.verbose("Keyring provider set: subprocess with executable %s", cli
)
217 return KeyRingCliProvider(cli
)
219 logger
.verbose("Keyring provider set: disabled")
220 return KeyRingNullProvider()
223 class MultiDomainBasicAuth(AuthBase
):
226 prompting
: bool = True,
227 index_urls
: Optional
[List
[str]] = None,
228 keyring_provider
: str = "auto",
230 self
.prompting
= prompting
231 self
.index_urls
= index_urls
232 self
.keyring_provider
= keyring_provider
# type: ignore[assignment]
233 self
.passwords
: Dict
[str, AuthInfo
] = {}
234 # When the user is prompted to enter credentials and keyring is
235 # available, we will offer to save them. If the user accepts,
236 # this value is set to the credentials they entered. After the
237 # request authenticates, the caller should call
238 # ``save_credentials`` to save these.
239 self
._credentials
_to
_save
: Optional
[Credentials
] = None
242 def keyring_provider(self
) -> KeyRingBaseProvider
:
243 return get_keyring_provider(self
._keyring
_provider
)
245 @keyring_provider.setter
246 def keyring_provider(self
, provider
: str) -> None:
247 # The free function get_keyring_provider has been decorated with
248 # functools.cache. If an exception occurs in get_keyring_auth that
249 # cache will be cleared and keyring disabled, take that into account
250 # if you want to remove this indirection.
251 self
._keyring
_provider
= provider
254 def use_keyring(self
) -> bool:
255 # We won't use keyring when --no-input is passed unless
256 # a specific provider is requested because it might require
258 return self
.prompting
or self
._keyring
_provider
not in ["auto", "disabled"]
260 def _get_keyring_auth(
263 username
: Optional
[str],
264 ) -> Optional
[AuthInfo
]:
265 """Return the tuple auth for a given url from keyring."""
266 # Do nothing if no url was provided
271 return self
.keyring_provider
.get_auth_info(url
, username
)
272 except Exception as exc
:
274 "Keyring is skipped due to an exception: %s",
277 global KEYRING_DISABLED
278 KEYRING_DISABLED
= True
279 get_keyring_provider
.cache_clear()
282 def _get_index_url(self
, url
: str) -> Optional
[str]:
283 """Return the original index URL matching the requested URL.
285 Cached or dynamically generated credentials may work against
286 the original index URL rather than just the netloc.
288 The provided url should have had its username and password
289 removed already. If the original index url had credentials then
290 they will be included in the return value.
292 Returns None if no matching index was found, or if --no-index
293 was specified by the user.
295 if not url
or not self
.index_urls
:
298 url
= remove_auth_from_url(url
).rstrip("/") + "/"
299 parsed_url
= urllib
.parse
.urlsplit(url
)
303 for index
in self
.index_urls
:
304 index
= index
.rstrip("/") + "/"
305 parsed_index
= urllib
.parse
.urlsplit(remove_auth_from_url(index
))
306 if parsed_url
== parsed_index
:
309 if parsed_url
.netloc
!= parsed_index
.netloc
:
312 candidate
= urllib
.parse
.urlsplit(index
)
313 candidates
.append(candidate
)
320 key
=lambda candidate
: commonprefix(
328 return urllib
.parse
.urlunsplit(candidates
[0])
330 def _get_new_credentials(
334 allow_netrc
: bool = True,
335 allow_keyring
: bool = False,
337 """Find and return credentials for the specified URL."""
338 # Split the credentials and netloc from the url.
339 url
, netloc
, url_user_password
= split_auth_netloc_from_url(
343 # Start with the credentials embedded in the url
344 username
, password
= url_user_password
345 if username
is not None and password
is not None:
346 logger
.debug("Found credentials in url for %s", netloc
)
347 return url_user_password
349 # Find a matching index url for this request
350 index_url
= self
._get
_index
_url
(url
)
352 # Split the credentials from the url.
353 index_info
= split_auth_netloc_from_url(index_url
)
355 index_url
, _
, index_url_user_password
= index_info
356 logger
.debug("Found index url %s", index_url
)
358 # If an index URL was found, try its embedded credentials
359 if index_url
and index_url_user_password
[0] is not None:
360 username
, password
= index_url_user_password
361 if username
is not None and password
is not None:
362 logger
.debug("Found credentials in index url for %s", netloc
)
363 return index_url_user_password
365 # Get creds from netrc if we still don't have them
367 netrc_auth
= get_netrc_auth(original_url
)
369 logger
.debug("Found credentials in netrc for %s", netloc
)
372 # If we don't have a password and keyring is available, use it.
374 # The index url is more specific than the netloc, so try it first
377 self
._get
_keyring
_auth
(index_url
, username
) or
378 self
._get
_keyring
_auth
(netloc
, username
)
382 logger
.debug("Found credentials in keyring for %s", netloc
)
385 return username
, password
387 def _get_url_and_credentials(
388 self
, original_url
: str
389 ) -> Tuple
[str, Optional
[str], Optional
[str]]:
390 """Return the credentials to use for the provided URL.
392 If allowed, netrc and keyring may be used to obtain the
395 Returns (url_without_credentials, username, password). Note
396 that even if the original URL contains credentials, this
397 function may return a different username and password.
399 url
, netloc
, _
= split_auth_netloc_from_url(original_url
)
401 # Try to get credentials from original url
402 username
, password
= self
._get
_new
_credentials
(original_url
)
404 # If credentials not found, use any stored credentials for this netloc.
405 # Do this if either the username or the password is missing.
406 # This accounts for the situation in which the user has specified
407 # the username in the index url, but the password comes from keyring.
408 if (username
is None or password
is None) and netloc
in self
.passwords
:
409 un
, pw
= self
.passwords
[netloc
]
410 # It is possible that the cached credentials are for a different username,
411 # in which case the cache should be ignored.
412 if username
is None or username
== un
:
413 username
, password
= un
, pw
415 if username
is not None or password
is not None:
416 # Convert the username and password if they're None, so that
417 # this netloc will show up as "cached" in the conditional above.
418 # Further, HTTPBasicAuth doesn't accept None, so it makes sense to
419 # cache the value that is going to be used.
420 username
= username
or ""
421 password
= password
or ""
423 # Store any acquired credentials.
424 self
.passwords
[netloc
] = (username
, password
)
427 # Credentials were found
428 (username
is not None and password
is not None)
429 # Credentials were not found
430 or (username
is None and password
is None)
431 ), f
"Could not load credentials from url: {original_url}"
433 return url
, username
, password
435 def __call__(self
, req
: Request
) -> Request
:
436 # Get credentials for this request
437 url
, username
, password
= self
._get
_url
_and
_credentials
(req
.url
)
439 # Set the url of the request to the url without any credentials
442 if username
is not None and password
is not None:
443 # Send the basic auth with this request
444 req
= HTTPBasicAuth(username
, password
)(req
)
446 # Attach a hook to handle 401 responses
447 req
.register_hook("response", self
.handle_401
)
451 # Factored out to allow for easy patching in tests
452 def _prompt_for_password(
454 ) -> Tuple
[Optional
[str], Optional
[str], bool]:
455 username
= ask_input(f
"User for {netloc}: ") if self
.prompting
else None
457 return None, None, False
459 auth
= self
._get
_keyring
_auth
(netloc
, username
)
460 if auth
and auth
[0] is not None and auth
[1] is not None:
461 return auth
[0], auth
[1], False
462 password
= ask_password("Password: ")
463 return username
, password
, True
465 # Factored out to allow for easy patching in tests
466 def _should_save_password_to_keyring(self
) -> bool:
469 or not self
.use_keyring
470 or not self
.keyring_provider
.has_keyring
473 return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
475 def handle_401(self
, resp
: Response
, **kwargs
: Any
) -> Response
:
476 # We only care about 401 responses, anything else we want to just
477 # pass through the actual response
478 if resp
.status_code
!= 401:
481 username
, password
= None, None
483 # Query the keyring for credentials:
485 username
, password
= self
._get
_new
_credentials
(
491 # We are not able to prompt the user so simply return the response
492 if not self
.prompting
and not username
and not password
:
495 parsed
= urllib
.parse
.urlparse(resp
.url
)
497 # Prompt the user for a new username and password
499 if not username
and not password
:
500 username
, password
, save
= self
._prompt
_for
_password
(parsed
.netloc
)
502 # Store the new username and password to use for future requests
503 self
._credentials
_to
_save
= None
504 if username
is not None and password
is not None:
505 self
.passwords
[parsed
.netloc
] = (username
, password
)
507 # Prompt to save the password to keyring
508 if save
and self
._should
_save
_password
_to
_keyring
():
509 self
._credentials
_to
_save
= Credentials(
515 # Consume content and release the original connection to allow our new
516 # request to reuse the same one.
517 # The result of the assignment isn't used, it's just needed to consume
520 resp
.raw
.release_conn()
522 # Add our new username and password to the request
523 req
= HTTPBasicAuth(username
or "", password
or "")(resp
.request
)
524 req
.register_hook("response", self
.warn_on_401
)
526 # On successful request, save the credentials that were used to
527 # keyring. (Note that if the user responded "no" above, this member
528 # is not set and nothing will be saved.)
529 if self
._credentials
_to
_save
:
530 req
.register_hook("response", self
.save_credentials
)
532 # Send our new request
533 new_resp
= resp
.connection
.send(req
, **kwargs
)
534 new_resp
.history
.append(resp
)
538 def warn_on_401(self
, resp
: Response
, **kwargs
: Any
) -> None:
539 """Response callback to warn about incorrect credentials."""
540 if resp
.status_code
== 401:
542 "401 Error, Credentials not correct for %s",
546 def save_credentials(self
, resp
: Response
, **kwargs
: Any
) -> None:
547 """Response callback to save credentials on success."""
549 self
.keyring_provider
.has_keyring
550 ), "should never reach here without keyring"
552 creds
= self
._credentials
_to
_save
553 self
._credentials
_to
_save
= None
554 if creds
and resp
.status_code
< 400:
556 logger
.info("Saving credentials to keyring")
557 self
.keyring_provider
.save_auth_info(
558 creds
.url
, creds
.username
, creds
.password
561 logger
.exception("Failed to save credentials")