14 from io
import StringIO
15 from itertools
import filterfalse
, tee
, zip_longest
16 from types
import TracebackType
36 from pip
._vendor
.pyproject_hooks
import BuildBackendHookCaller
37 from pip
._vendor
.tenacity
import retry
, stop_after_delay
, wait_fixed
39 from pip
import __version__
40 from pip
._internal
.exceptions
import CommandError
, ExternallyManagedEnvironment
41 from pip
._internal
.locations
import get_major_minor_version
42 from pip
._internal
.utils
.compat
import WINDOWS
43 from pip
._internal
.utils
.virtualenv
import running_under_virtualenv
58 "remove_auth_from_url",
59 "check_externally_managed",
60 "ConfiguredBuildBackendHookCaller",
63 logger
= logging
.getLogger(__name__
)
66 ExcInfo
= Tuple
[Type
[BaseException
], BaseException
, TracebackType
]
67 VersionInfo
= Tuple
[int, int, int]
68 NetlocTuple
= Tuple
[str, Tuple
[Optional
[str], Optional
[str]]]
71 def get_pip_version() -> str:
72 pip_pkg_dir
= os
.path
.join(os
.path
.dirname(__file__
), "..", "..")
73 pip_pkg_dir
= os
.path
.abspath(pip_pkg_dir
)
75 return "pip {} from {} (python {})".format(
78 get_major_minor_version(),
82 def normalize_version_info(py_version_info
: Tuple
[int, ...]) -> Tuple
[int, int, int]:
84 Convert a tuple of ints representing a Python version to one of length
87 :param py_version_info: a tuple of ints representing a Python version,
88 or None to specify no version. The tuple can have any length.
90 :return: a tuple of length three if `py_version_info` is non-None.
91 Otherwise, return `py_version_info` unchanged (i.e. None).
93 if len(py_version_info
) < 3:
94 py_version_info
+= (3 - len(py_version_info
)) * (0,)
95 elif len(py_version_info
) > 3:
96 py_version_info
= py_version_info
[:3]
98 return cast("VersionInfo", py_version_info
)
101 def ensure_dir(path
: str) -> None:
102 """os.path.makedirs without EEXIST."""
106 # Windows can raise spurious ENOTEMPTY errors. See #6426.
107 if e
.errno
!= errno
.EEXIST
and e
.errno
!= errno
.ENOTEMPTY
:
111 def get_prog() -> str:
113 prog
= os
.path
.basename(sys
.argv
[0])
114 if prog
in ("__main__.py", "-c"):
115 return f
"{sys.executable} -m pip"
118 except (AttributeError, TypeError, IndexError):
123 # Retry every half second for up to 3 seconds
124 # Tenacity raises RetryError by default, explicitly raise the original exception
125 @retry(reraise
=True, stop
=stop_after_delay(3), wait
=wait_fixed(0.5))
126 def rmtree(dir: str, ignore_errors
: bool = False) -> None:
127 if sys
.version_info
>= (3, 12):
128 shutil
.rmtree(dir, ignore_errors
=ignore_errors
, onexc
=rmtree_errorhandler
)
130 shutil
.rmtree(dir, ignore_errors
=ignore_errors
, onerror
=rmtree_errorhandler
)
133 def rmtree_errorhandler(
134 func
: Callable
[..., Any
], path
: str, exc_info
: Union
[ExcInfo
, BaseException
]
136 """On Windows, the files in .svn are read-only, so when rmtree() tries to
137 remove them, an exception is thrown. We catch that here, remove the
138 read-only attribute, and hopefully continue without problems."""
140 has_attr_readonly
= not (os
.stat(path
).st_mode
& stat
.S_IWRITE
)
142 # it's equivalent to os.path.exists
145 if has_attr_readonly
:
146 # convert to read/write
147 os
.chmod(path
, stat
.S_IWRITE
)
148 # use the original function to repeat the operation
155 def display_path(path
: str) -> str:
156 """Gives the display value for a given path, making it relative to cwd
158 path
= os
.path
.normcase(os
.path
.abspath(path
))
159 if path
.startswith(os
.getcwd() + os
.path
.sep
):
160 path
= "." + path
[len(os
.getcwd()) :]
164 def backup_dir(dir: str, ext
: str = ".bak") -> str:
165 """Figure out the name of a directory to back up the given dir to
166 (adding .bak, .bak2, etc)"""
169 while os
.path
.exists(dir + extension
):
171 extension
= ext
+ str(n
)
172 return dir + extension
175 def ask_path_exists(message
: str, options
: Iterable
[str]) -> str:
176 for action
in os
.environ
.get("PIP_EXISTS_ACTION", "").split():
177 if action
in options
:
179 return ask(message
, options
)
182 def _check_no_input(message
: str) -> None:
183 """Raise an error if no input is allowed."""
184 if os
.environ
.get("PIP_NO_INPUT"):
186 f
"No input was expected ($PIP_NO_INPUT set); question: {message}"
190 def ask(message
: str, options
: Iterable
[str]) -> str:
191 """Ask the message interactively, with the given possible responses"""
193 _check_no_input(message
)
194 response
= input(message
)
195 response
= response
.strip().lower()
196 if response
not in options
:
198 "Your response ({!r}) was not one of the expected responses: "
199 "{}".format(response
, ", ".join(options
))
205 def ask_input(message
: str) -> str:
206 """Ask for input interactively."""
207 _check_no_input(message
)
208 return input(message
)
211 def ask_password(message
: str) -> str:
212 """Ask for a password interactively."""
213 _check_no_input(message
)
214 return getpass
.getpass(message
)
217 def strtobool(val
: str) -> int:
218 """Convert a string representation of truth to true (1) or false (0).
220 True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
221 are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
222 'val' is anything else.
225 if val
in ("y", "yes", "t", "true", "on", "1"):
227 elif val
in ("n", "no", "f", "false", "off", "0"):
230 raise ValueError(f
"invalid truth value {val!r}")
233 def format_size(bytes: float) -> str:
234 if bytes > 1000 * 1000:
235 return "{:.1f} MB".format(bytes / 1000.0 / 1000)
236 elif bytes > 10 * 1000:
237 return "{} kB".format(int(bytes / 1000))
239 return "{:.1f} kB".format(bytes / 1000.0)
241 return "{} bytes".format(int(bytes))
244 def tabulate(rows
: Iterable
[Iterable
[Any
]]) -> Tuple
[List
[str], List
[int]]:
245 """Return a list of formatted rows and a list of column sizes.
249 >>> tabulate([['foobar', 2000], [0xdeadbeef]])
250 (['foobar 2000', '3735928559'], [10, 4])
252 rows
= [tuple(map(str, row
)) for row
in rows
]
253 sizes
= [max(map(len, col
)) for col
in zip_longest(*rows
, fillvalue
="")]
254 table
= [" ".join(map(str.ljust
, row
, sizes
)).rstrip() for row
in rows
]
258 def is_installable_dir(path
: str) -> bool:
259 """Is path is a directory containing pyproject.toml or setup.py?
261 If pyproject.toml exists, this is a PEP 517 project. Otherwise we look for
262 a legacy setuptools layout by identifying setup.py. We don't check for the
263 setup.cfg because using it without setup.py is only available for PEP 517
264 projects, which are already covered by the pyproject.toml check.
266 if not os
.path
.isdir(path
):
268 if os
.path
.isfile(os
.path
.join(path
, "pyproject.toml")):
270 if os
.path
.isfile(os
.path
.join(path
, "setup.py")):
276 file: BinaryIO
, size
: int = io
.DEFAULT_BUFFER_SIZE
277 ) -> Generator
[bytes, None, None]:
278 """Yield pieces of data from a file-like object until EOF."""
280 chunk
= file.read(size
)
286 def normalize_path(path
: str, resolve_symlinks
: bool = True) -> str:
288 Convert a path to its canonical, case-normalized, absolute version.
291 path
= os
.path
.expanduser(path
)
293 path
= os
.path
.realpath(path
)
295 path
= os
.path
.abspath(path
)
296 return os
.path
.normcase(path
)
299 def splitext(path
: str) -> Tuple
[str, str]:
300 """Like os.path.splitext, but take off .tar too"""
301 base
, ext
= posixpath
.splitext(path
)
302 if base
.lower().endswith(".tar"):
303 ext
= base
[-4:] + ext
308 def renames(old
: str, new
: str) -> None:
309 """Like os.renames(), but handles renaming across devices."""
310 # Implementation borrowed from os.renames().
311 head
, tail
= os
.path
.split(new
)
312 if head
and tail
and not os
.path
.exists(head
):
315 shutil
.move(old
, new
)
317 head
, tail
= os
.path
.split(old
)
325 def is_local(path
: str) -> bool:
327 Return True if path is within sys.prefix, if we're running in a virtualenv.
329 If we're not in a virtualenv, all paths are considered "local."
331 Caution: this function assumes the head of path has been normalized
334 if not running_under_virtualenv():
336 return path
.startswith(normalize_path(sys
.prefix
))
339 def write_output(msg
: Any
, *args
: Any
) -> None:
340 logger
.info(msg
, *args
)
343 class StreamWrapper(StringIO
):
347 def from_stream(cls
, orig_stream
: TextIO
) -> "StreamWrapper":
349 ret
.orig_stream
= orig_stream
352 # compileall.compile_dir() needs stdout.encoding to print to stdout
353 # type ignore is because TextIOBase.encoding is writeable
355 def encoding(self
) -> str: # type: ignore
356 return self
.orig_stream
.encoding
359 @contextlib.contextmanager
360 def captured_output(stream_name
: str) -> Generator
[StreamWrapper
, None, None]:
361 """Return a context manager used by captured_stdout/stdin/stderr
362 that temporarily replaces the sys stream *stream_name* with a StringIO.
364 Taken from Lib/support/__init__.py in the CPython repo.
366 orig_stdout
= getattr(sys
, stream_name
)
367 setattr(sys
, stream_name
, StreamWrapper
.from_stream(orig_stdout
))
369 yield getattr(sys
, stream_name
)
371 setattr(sys
, stream_name
, orig_stdout
)
374 def captured_stdout() -> ContextManager
[StreamWrapper
]:
375 """Capture the output of sys.stdout:
377 with captured_stdout() as stdout:
379 self.assertEqual(stdout.getvalue(), 'hello\n')
381 Taken from Lib/support/__init__.py in the CPython repo.
383 return captured_output("stdout")
386 def captured_stderr() -> ContextManager
[StreamWrapper
]:
388 See captured_stdout().
390 return captured_output("stderr")
394 def enum(*sequential
: Any
, **named
: Any
) -> Type
[Any
]:
395 enums
= dict(zip(sequential
, range(len(sequential
))), **named
)
396 reverse
= {value: key for key, value in enums.items()}
397 enums
["reverse_mapping"] = reverse
398 return type("Enum", (), enums
)
401 def build_netloc(host
: str, port
: Optional
[int]) -> str:
403 Build a netloc from a host-port pair
408 # Only wrap host with square brackets when it is IPv6
410 return f
"{host}:{port}"
413 def build_url_from_netloc(netloc
: str, scheme
: str = "https") -> str:
415 Build a full URL from a netloc.
417 if netloc
.count(":") >= 2 and "@" not in netloc
and "[" not in netloc
:
418 # It must be a bare IPv6 address, so wrap it with brackets.
419 netloc
= f
"[{netloc}]"
420 return f
"{scheme}://{netloc}"
423 def parse_netloc(netloc
: str) -> Tuple
[Optional
[str], Optional
[int]]:
425 Return the host-port pair from a netloc.
427 url
= build_url_from_netloc(netloc
)
428 parsed
= urllib
.parse
.urlparse(url
)
429 return parsed
.hostname
, parsed
.port
432 def split_auth_from_netloc(netloc
: str) -> NetlocTuple
:
434 Parse out and remove the auth information from a netloc.
436 Returns: (netloc, (username, password)).
438 if "@" not in netloc
:
439 return netloc
, (None, None)
441 # Split from the right because that's how urllib.parse.urlsplit()
442 # behaves if more than one @ is present (which can be checked using
443 # the password attribute of urlsplit()'s return value).
444 auth
, netloc
= netloc
.rsplit("@", 1)
445 pw
: Optional
[str] = None
447 # Split from the left because that's how urllib.parse.urlsplit()
448 # behaves if more than one : is present (which again can be checked
449 # using the password attribute of the return value)
450 user
, pw
= auth
.split(":", 1)
452 user
, pw
= auth
, None
454 user
= urllib
.parse
.unquote(user
)
456 pw
= urllib
.parse
.unquote(pw
)
458 return netloc
, (user
, pw
)
461 def redact_netloc(netloc
: str) -> str:
463 Replace the sensitive data in a netloc with "****", if it exists.
466 - "user:pass@example.com" returns "user:****@example.com"
467 - "accesstoken@example.com" returns "****@example.com"
469 netloc
, (user
, password
) = split_auth_from_netloc(netloc
)
476 user
= urllib
.parse
.quote(user
)
478 return "{user}{password}@{netloc}".format(
479 user
=user
, password
=password
, netloc
=netloc
484 url
: str, transform_netloc
: Callable
[[str], Tuple
[Any
, ...]]
485 ) -> Tuple
[str, NetlocTuple
]:
486 """Transform and replace netloc in a url.
488 transform_netloc is a function taking the netloc and returning a
489 tuple. The first element of this tuple is the new netloc. The
490 entire tuple is returned.
492 Returns a tuple containing the transformed url as item 0 and the
493 original tuple returned by transform_netloc as item 1.
495 purl
= urllib
.parse
.urlsplit(url
)
496 netloc_tuple
= transform_netloc(purl
.netloc
)
498 url_pieces
= (purl
.scheme
, netloc_tuple
[0], purl
.path
, purl
.query
, purl
.fragment
)
499 surl
= urllib
.parse
.urlunsplit(url_pieces
)
500 return surl
, cast("NetlocTuple", netloc_tuple
)
503 def _get_netloc(netloc
: str) -> NetlocTuple
:
504 return split_auth_from_netloc(netloc
)
507 def _redact_netloc(netloc
: str) -> Tuple
[str]:
508 return (redact_netloc(netloc
),)
511 def split_auth_netloc_from_url(
513 ) -> Tuple
[str, str, Tuple
[Optional
[str], Optional
[str]]]:
515 Parse a url into separate netloc, auth, and url with no auth.
517 Returns: (url_without_auth, netloc, (username, password))
519 url_without_auth
, (netloc
, auth
) = _transform_url(url
, _get_netloc
)
520 return url_without_auth
, netloc
, auth
523 def remove_auth_from_url(url
: str) -> str:
524 """Return a copy of url with 'username:password@' removed."""
525 # username/pass params are passed to subversion through flags
526 # and are not recognized in the url.
527 return _transform_url(url
, _get_netloc
)[0]
530 def redact_auth_from_url(url
: str) -> str:
531 """Replace the password in a given url with ****."""
532 return _transform_url(url
, _redact_netloc
)[0]
536 def __init__(self
, secret
: str, redacted
: str) -> None:
538 self
.redacted
= redacted
540 def __repr__(self
) -> str:
541 return "<HiddenText {!r}>".format(str(self
))
543 def __str__(self
) -> str:
546 # This is useful for testing.
547 def __eq__(self
, other
: Any
) -> bool:
548 if type(self
) != type(other
):
551 # The string being used for redaction doesn't also have to match,
552 # just the raw, original string.
553 return self
.secret
== other
.secret
556 def hide_value(value
: str) -> HiddenText
:
557 return HiddenText(value
, redacted
="****")
560 def hide_url(url
: str) -> HiddenText
:
561 redacted
= redact_auth_from_url(url
)
562 return HiddenText(url
, redacted
=redacted
)
565 def protect_pip_from_modification_on_windows(modifying_pip
: bool) -> None:
566 """Protection of pip.exe from modification on Windows
568 On Windows, any operation modifying pip should be run as:
573 f
"pip{sys.version_info.major}",
574 f
"pip{sys.version_info.major}.{sys.version_info.minor}",
577 # See https://github.com/pypa/pip/issues/1299 for more discussion
578 should_show_use_python_msg
= (
579 modifying_pip
and WINDOWS
and os
.path
.basename(sys
.argv
[0]) in pip_names
582 if should_show_use_python_msg
:
583 new_command
= [sys
.executable
, "-m", "pip"] + sys
.argv
[1:]
585 "To modify pip, please run the following command:\n{}".format(
586 " ".join(new_command
)
591 def check_externally_managed() -> None:
592 """Check whether the current environment is externally managed.
594 If the ``EXTERNALLY-MANAGED`` config file is found, the current environment
595 is considered externally managed, and an ExternallyManagedEnvironment is
598 if running_under_virtualenv():
600 marker
= os
.path
.join(sysconfig
.get_path("stdlib"), "EXTERNALLY-MANAGED")
601 if not os
.path
.isfile(marker
):
603 raise ExternallyManagedEnvironment
.from_config(marker
)
606 def is_console_interactive() -> bool:
607 """Is this console interactive?"""
608 return sys
.stdin
is not None and sys
.stdin
.isatty()
611 def hash_file(path
: str, blocksize
: int = 1 << 20) -> Tuple
[Any
, int]:
612 """Return (hash, length) for path using hashlib.sha256()"""
616 with open(path
, "rb") as f
:
617 for block
in read_chunks(f
, size
=blocksize
):
623 def pairwise(iterable
: Iterable
[Any
]) -> Iterator
[Tuple
[Any
, Any
]]:
625 Return paired elements.
628 s -> (s0, s1), (s2, s3), (s4, s5), ...
630 iterable
= iter(iterable
)
631 return zip_longest(iterable
, iterable
)
635 pred
: Callable
[[T
], bool],
636 iterable
: Iterable
[T
],
637 ) -> Tuple
[Iterable
[T
], Iterable
[T
]]:
639 Use a predicate to partition entries into false entries and true entries,
642 partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
644 t1
, t2
= tee(iterable
)
645 return filterfalse(pred
, t1
), filter(pred
, t2
)
648 class ConfiguredBuildBackendHookCaller(BuildBackendHookCaller
):
654 backend_path
: Optional
[str] = None,
655 runner
: Optional
[Callable
[..., None]] = None,
656 python_executable
: Optional
[str] = None,
659 source_dir
, build_backend
, backend_path
, runner
, python_executable
661 self
.config_holder
= config_holder
665 wheel_directory
: str,
666 config_settings
: Optional
[Dict
[str, Union
[str, List
[str]]]] = None,
667 metadata_directory
: Optional
[str] = None,
669 cs
= self
.config_holder
.config_settings
670 return super().build_wheel(
671 wheel_directory
, config_settings
=cs
, metadata_directory
=metadata_directory
676 sdist_directory
: str,
677 config_settings
: Optional
[Dict
[str, Union
[str, List
[str]]]] = None,
679 cs
= self
.config_holder
.config_settings
680 return super().build_sdist(sdist_directory
, config_settings
=cs
)
684 wheel_directory
: str,
685 config_settings
: Optional
[Dict
[str, Union
[str, List
[str]]]] = None,
686 metadata_directory
: Optional
[str] = None,
688 cs
= self
.config_holder
.config_settings
689 return super().build_editable(
690 wheel_directory
, config_settings
=cs
, metadata_directory
=metadata_directory
693 def get_requires_for_build_wheel(
694 self
, config_settings
: Optional
[Dict
[str, Union
[str, List
[str]]]] = None
696 cs
= self
.config_holder
.config_settings
697 return super().get_requires_for_build_wheel(config_settings
=cs
)
699 def get_requires_for_build_sdist(
700 self
, config_settings
: Optional
[Dict
[str, Union
[str, List
[str]]]] = None
702 cs
= self
.config_holder
.config_settings
703 return super().get_requires_for_build_sdist(config_settings
=cs
)
705 def get_requires_for_build_editable(
706 self
, config_settings
: Optional
[Dict
[str, Union
[str, List
[str]]]] = None
708 cs
= self
.config_holder
.config_settings
709 return super().get_requires_for_build_editable(config_settings
=cs
)
711 def prepare_metadata_for_build_wheel(
713 metadata_directory
: str,
714 config_settings
: Optional
[Dict
[str, Union
[str, List
[str]]]] = None,
715 _allow_fallback
: bool = True,
717 cs
= self
.config_holder
.config_settings
718 return super().prepare_metadata_for_build_wheel(
719 metadata_directory
=metadata_directory
,
721 _allow_fallback
=_allow_fallback
,
724 def prepare_metadata_for_build_editable(
726 metadata_directory
: str,
727 config_settings
: Optional
[Dict
[str, Union
[str, List
[str]]]] = None,
728 _allow_fallback
: bool = True,
730 cs
= self
.config_holder
.config_settings
731 return super().prepare_metadata_for_build_editable(
732 metadata_directory
=metadata_directory
,
734 _allow_fallback
=_allow_fallback
,