]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_internal/utils/misc.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _internal / utils / misc.py
1 import contextlib
2 import errno
3 import getpass
4 import hashlib
5 import io
6 import logging
7 import os
8 import posixpath
9 import shutil
10 import stat
11 import sys
12 import sysconfig
13 import urllib.parse
14 from io import StringIO
15 from itertools import filterfalse, tee, zip_longest
16 from types import TracebackType
17 from typing import (
18 Any,
19 BinaryIO,
20 Callable,
21 ContextManager,
22 Dict,
23 Generator,
24 Iterable,
25 Iterator,
26 List,
27 Optional,
28 TextIO,
29 Tuple,
30 Type,
31 TypeVar,
32 Union,
33 cast,
34 )
35
36 from pip._vendor.pyproject_hooks import BuildBackendHookCaller
37 from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
38
39 from pip import __version__
40 from pip._internal.exceptions import CommandError, ExternallyManagedEnvironment
41 from pip._internal.locations import get_major_minor_version
42 from pip._internal.utils.compat import WINDOWS
43 from pip._internal.utils.virtualenv import running_under_virtualenv
44
# Names exported by ``from pip._internal.utils.misc import *``.
__all__ = [
    "rmtree",
    "display_path",
    "backup_dir",
    "ask",
    "splitext",
    "format_size",
    "is_installable_dir",
    "normalize_path",
    "renames",
    "get_prog",
    "captured_stdout",
    "ensure_dir",
    "remove_auth_from_url",
    "check_externally_managed",
    "ConfiguredBuildBackendHookCaller",
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Generic element type used by ``partition``.
T = TypeVar("T")
# The (type, value, traceback) triple passed to ``shutil.rmtree`` error hooks.
ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
# A three-component version tuple, as produced by ``normalize_version_info``.
VersionInfo = Tuple[int, int, int]
# (netloc, (username, password)) as returned by ``split_auth_from_netloc``.
NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]
69
70
def get_pip_version() -> str:
    """Return a one-line description of pip's version, location and Python.

    The location is the absolute path two directory levels above the
    directory containing this file.
    """
    package_root = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "..", "..")
    )
    return (
        f"pip {__version__} from {package_root} "
        f"(python {get_major_minor_version()})"
    )
80
81
def normalize_version_info(py_version_info: Tuple[int, ...]) -> Tuple[int, int, int]:
    """
    Convert a tuple of ints representing a Python version to one of length
    three.

    :param py_version_info: a tuple of ints representing a Python version.
        The tuple can have any length.

    :return: a tuple of length three: shorter inputs are padded with
        zeros on the right, longer inputs are truncated.
    """
    missing = 3 - len(py_version_info)
    if missing > 0:
        # Too short: pad with zeros.
        py_version_info += missing * (0,)
    elif missing < 0:
        # Too long: keep only the first three components.
        py_version_info = py_version_info[:3]

    return cast("VersionInfo", py_version_info)
99
100
def ensure_dir(path: str) -> None:
    """Create *path* with os.makedirs, ignoring EEXIST and ENOTEMPTY errors."""
    try:
        os.makedirs(path)
    except OSError as exc:
        # Windows can raise spurious ENOTEMPTY errors. See #6426.
        if exc.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise
109
110
def get_prog() -> str:
    """Return the program name to show in usage and help text.

    This is the basename of ``sys.argv[0]``, except that ``__main__.py``
    and ``-c`` are reported as ``<python> -m pip``. Falls back to ``"pip"``
    when ``sys.argv[0]`` cannot be read.
    """
    try:
        prog = os.path.basename(sys.argv[0])
    except (AttributeError, TypeError, IndexError):
        return "pip"
    if prog in ("__main__.py", "-c"):
        return f"{sys.executable} -m pip"
    return prog
121
122
# Retry every half second for up to 3 seconds
# Tenacity raises RetryError by default, explicitly raise the original exception
@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
def rmtree(dir: str, ignore_errors: bool = False) -> None:
    """Remove the directory tree *dir*, using ``rmtree_errorhandler`` as the
    error hook.

    The hook is passed as ``onerror`` before Python 3.12 and as ``onexc``
    from 3.12 on.
    """
    if sys.version_info < (3, 12):
        shutil.rmtree(dir, ignore_errors=ignore_errors, onerror=rmtree_errorhandler)
    else:
        shutil.rmtree(dir, ignore_errors=ignore_errors, onexc=rmtree_errorhandler)
131
132
def rmtree_errorhandler(
    func: Callable[..., Any], path: str, exc_info: Union[ExcInfo, BaseException]
) -> None:
    """On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown. We catch that here, remove the
    read-only attribute, and hopefully continue without problems.

    If *path* cannot be stat'ed the error is ignored. If *path* is already
    writable, the exception currently being handled is re-raised.
    """
    try:
        mode = os.stat(path).st_mode
    except OSError:
        # it's equivalent to os.path.exists
        return

    if mode & stat.S_IWRITE:
        # Not a read-only problem: propagate the active exception. The bare
        # raise relies on this hook being called while it is being handled.
        raise

    # convert to read/write
    os.chmod(path, stat.S_IWRITE)
    # use the original function to repeat the operation
    func(path)
153
154
def display_path(path: str) -> str:
    """Gives the display value for a given path, making it relative to cwd
    if possible.

    The path is made absolute and case-normalized. If it lies below the
    current working directory, the cwd prefix is replaced with ``"."``.
    """
    path = os.path.normcase(os.path.abspath(path))
    # Normalize the cwd the same way as the path; otherwise the prefix test
    # can never match on platforms where normcase() changes the string.
    cwd = os.path.normcase(os.getcwd())
    if path.startswith(cwd + os.path.sep):
        path = "." + path[len(cwd) :]
    return path
162
163
def backup_dir(dir: str, ext: str = ".bak") -> str:
    """Return the first unused backup name for *dir*.

    Candidates are ``dir + ext``, then ``dir + ext + "2"``, ``"3"`` and so
    on (e.g. .bak, .bak2, ...); the first that does not exist is returned.
    """
    suffix = ext
    counter = 1
    while os.path.exists(dir + suffix):
        counter += 1
        suffix = ext + str(counter)
    return dir + suffix
173
174
def ask_path_exists(message: str, options: Iterable[str]) -> str:
    """Choose an action from *options*.

    The whitespace-separated values of ``$PIP_EXISTS_ACTION`` are tried in
    order and the first one found in *options* is returned. If none
    matches, the user is asked interactively.
    """
    configured = os.environ.get("PIP_EXISTS_ACTION", "").split()
    for candidate in configured:
        if candidate in options:
            return candidate
    return ask(message, options)
180
181
182 def _check_no_input(message: str) -> None:
183 """Raise an error if no input is allowed."""
184 if os.environ.get("PIP_NO_INPUT"):
185 raise Exception(
186 f"No input was expected ($PIP_NO_INPUT set); question: {message}"
187 )
188
189
def ask(message: str, options: Iterable[str]) -> str:
    """Ask the message interactively, with the given possible responses.

    The reply is stripped and lower-cased. The question is repeated until
    the reply is one of *options*.
    """
    while True:
        _check_no_input(message)
        response = input(message).strip().lower()
        if response in options:
            return response
        print(
            "Your response ({!r}) was not one of the expected responses: "
            "{}".format(response, ", ".join(options))
        )
203
204
def ask_input(message: str) -> str:
    """Ask for input interactively, after checking that input is allowed."""
    _check_no_input(message)
    reply = input(message)
    return reply
209
210
def ask_password(message: str) -> str:
    """Ask for a password interactively, after checking that input is allowed."""
    _check_no_input(message)
    secret = getpass.getpass(message)
    return secret
215
216
def strtobool(val: str) -> int:
    """Convert a string representation of truth to true (1) or false (0).

    True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
    are 'n', 'no', 'f', 'false', 'off', and '0'. Matching is
    case-insensitive. Raises ValueError if 'val' is anything else.
    """
    truthy = ("y", "yes", "t", "true", "on", "1")
    falsy = ("n", "no", "f", "false", "off", "0")

    val = val.lower()
    if val in truthy:
        return 1
    if val in falsy:
        return 0
    # The message reports the lower-cased value.
    raise ValueError(f"invalid truth value {val!r}")
231
232
def format_size(bytes: float) -> str:
    """Format a byte count for display, using decimal (1000-based) units.

    Above 1,000,000 the value is shown in MB with one decimal; above
    10,000 in whole kB; above 1,000 in kB with one decimal; otherwise as
    a whole number of bytes.
    """
    if bytes > 1000 * 1000:
        return f"{bytes / 1000.0 / 1000:.1f} MB"
    if bytes > 10 * 1000:
        return f"{int(bytes / 1000)} kB"
    if bytes > 1000:
        return f"{bytes / 1000.0:.1f} kB"
    return f"{int(bytes)} bytes"
242
243
def tabulate(rows: Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]:
    """Return a list of formatted rows and a list of column sizes.

    Each cell is converted with ``str`` and left-justified to the width of
    its column; trailing whitespace is stripped from each row.

    For example::

    >>> tabulate([['foobar', 2000], [0xdeadbeef]])
    (['foobar     2000', '3735928559'], [10, 4])
    """
    text_rows = [tuple(str(cell) for cell in row) for row in rows]
    # Short rows are padded with "" so every column has a full set of cells.
    columns = zip_longest(*text_rows, fillvalue="")
    sizes = [max(len(cell) for cell in column) for column in columns]
    table = [
        " ".join(cell.ljust(width) for cell, width in zip(row, sizes)).rstrip()
        for row in text_rows
    ]
    return table, sizes
256
257
def is_installable_dir(path: str) -> bool:
    """Is path a directory containing pyproject.toml or setup.py?

    If pyproject.toml exists, this is a PEP 517 project. Otherwise we look for
    a legacy setuptools layout by identifying setup.py. We don't check for the
    setup.cfg because using it without setup.py is only available for PEP 517
    projects, which are already covered by the pyproject.toml check.
    """
    if not os.path.isdir(path):
        return False
    return any(
        os.path.isfile(os.path.join(path, marker))
        for marker in ("pyproject.toml", "setup.py")
    )
273
274
def read_chunks(
    file: BinaryIO, size: int = io.DEFAULT_BUFFER_SIZE
) -> Generator[bytes, None, None]:
    """Yield pieces of data from a file-like object until EOF.

    Each piece is the result of ``file.read(size)``; iteration stops at the
    first empty (falsy) read.
    """
    chunk = file.read(size)
    while chunk:
        yield chunk
        chunk = file.read(size)
284
285
def normalize_path(path: str, resolve_symlinks: bool = True) -> str:
    """
    Convert a path to its canonical, case-normalized, absolute version.

    ``~`` is expanded first. Symlinks are resolved with ``os.path.realpath``
    unless *resolve_symlinks* is false, in which case ``os.path.abspath``
    is used instead.
    """
    expanded = os.path.expanduser(path)
    make_absolute = os.path.realpath if resolve_symlinks else os.path.abspath
    return os.path.normcase(make_absolute(expanded))
297
298
def splitext(path: str) -> Tuple[str, str]:
    """Like os.path.splitext, but take off .tar too.

    For example ``"pkg.tar.gz"`` splits into ``("pkg", ".tar.gz")``. The
    ``.tar`` check is case-insensitive and the original casing is kept.
    """
    stem, suffix = posixpath.splitext(path)
    if stem.lower().endswith(".tar"):
        # Move the trailing ".tar" from the stem to the front of the suffix.
        stem, suffix = stem[:-4], stem[-4:] + suffix
    return stem, suffix
306
307
def renames(old: str, new: str) -> None:
    """Like os.renames(), but handles renaming across devices.

    Missing parent directories of *new* are created, *old* is moved with
    ``shutil.move``, and the now-empty parent directories of *old* are
    pruned on a best-effort basis.
    """
    # Implementation borrowed from os.renames().
    new_parent, new_leaf = os.path.split(new)
    if new_parent and new_leaf and not os.path.exists(new_parent):
        os.makedirs(new_parent)

    shutil.move(old, new)

    old_parent, old_leaf = os.path.split(old)
    if old_parent and old_leaf:
        # removedirs stops at the first non-empty directory; that is fine.
        with contextlib.suppress(OSError):
            os.removedirs(old_parent)
323
324
def is_local(path: str) -> bool:
    """
    Return True if path is within sys.prefix, if we're running in a virtualenv.

    If we're not in a virtualenv, all paths are considered "local."

    Caution: this function assumes the head of path has been normalized
    with normalize_path.
    """
    if running_under_virtualenv():
        prefix = normalize_path(sys.prefix)
        return path.startswith(prefix)
    return True
337
338
def write_output(msg: Any, *args: Any) -> None:
    """Log *msg* (with %-style *args*) at INFO level on this module's logger."""
    logger.info(msg, *args)
341
342
class StreamWrapper(StringIO):
    """An in-memory text buffer that reports the encoding of another stream."""

    # The stream this wrapper stands in for; set by ``from_stream``.
    orig_stream: TextIO

    @classmethod
    def from_stream(cls, orig_stream: TextIO) -> "StreamWrapper":
        """Create an empty wrapper that remembers *orig_stream*."""
        wrapper = cls()
        wrapper.orig_stream = orig_stream
        return wrapper

    # compileall.compile_dir() needs stdout.encoding to print to stdout
    # type ignore is because TextIOBase.encoding is writeable
    @property
    def encoding(self) -> str:  # type: ignore
        """The encoding of the wrapped original stream."""
        return self.orig_stream.encoding
357
358
@contextlib.contextmanager
def captured_output(stream_name: str) -> Generator[StreamWrapper, None, None]:
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    The original stream is restored on exit, even if the body raises.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    original = getattr(sys, stream_name)
    replacement = StreamWrapper.from_stream(original)
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, original)
372
373
def captured_stdout() -> ContextManager[StreamWrapper]:
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print('hello')
       self.assertEqual(stdout.getvalue(), 'hello\n')

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    stream_name = "stdout"
    return captured_output(stream_name)
384
385
def captured_stderr() -> ContextManager[StreamWrapper]:
    """
    Capture the output of sys.stderr; see captured_stdout().
    """
    stream_name = "stderr"
    return captured_output(stream_name)
391
392
# Simulates an enum
def enum(*sequential: Any, **named: Any) -> Type[Any]:
    """Build a class named ``Enum`` whose attributes are the given members.

    Positional names are numbered from 0 in order; keyword arguments supply
    explicit values. The class also has a ``reverse_mapping`` dict from
    value back to name.
    """
    members = {name: index for index, name in enumerate(sequential)}
    members.update(named)
    # Built before "reverse_mapping" itself is added to the members.
    members["reverse_mapping"] = {value: name for name, value in members.items()}
    return type("Enum", (), members)
399
400
def build_netloc(host: str, port: Optional[int]) -> str:
    """
    Build a netloc from a host-port pair.

    Without a port the host is returned unchanged. With a port, a host
    containing ``:`` is wrapped in square brackets first.
    """
    if port is None:
        return host
    # Only wrap host with square brackets when it is IPv6
    wrapped = f"[{host}]" if ":" in host else host
    return f"{wrapped}:{port}"
411
412
def build_url_from_netloc(netloc: str, scheme: str = "https") -> str:
    """
    Build a full URL from a netloc.

    A netloc with at least two colons and neither ``@`` nor ``[`` is taken
    to be a bare IPv6 address and is wrapped in square brackets.
    """
    has_auth_or_brackets = "@" in netloc or "[" in netloc
    if netloc.count(":") >= 2 and not has_auth_or_brackets:
        # It must be a bare IPv6 address, so wrap it with brackets.
        netloc = f"[{netloc}]"
    return f"{scheme}://{netloc}"
421
422
def parse_netloc(netloc: str) -> Tuple[Optional[str], Optional[int]]:
    """
    Return the host-port pair from a netloc.

    The netloc is turned into a URL and parsed with urllib.parse.urlparse.
    """
    parsed = urllib.parse.urlparse(build_url_from_netloc(netloc))
    host, port = parsed.hostname, parsed.port
    return host, port
430
431
def split_auth_from_netloc(netloc: str) -> NetlocTuple:
    """
    Parse out and remove the auth information from a netloc.

    The username and password are percent-decoded. The password is None
    when the auth part contains no ``:``.

    Returns: (netloc, (username, password)).
    """
    if "@" not in netloc:
        return netloc, (None, None)

    # Split from the right because that's how urllib.parse.urlsplit()
    # behaves if more than one @ is present (which can be checked using
    # the password attribute of urlsplit()'s return value).
    auth, _, host_part = netloc.rpartition("@")

    # Split from the left because that's how urllib.parse.urlsplit()
    # behaves if more than one : is present (which again can be checked
    # using the password attribute of the return value)
    raw_user, colon, raw_password = auth.partition(":")

    user = urllib.parse.unquote(raw_user)
    pw: Optional[str] = urllib.parse.unquote(raw_password) if colon else None

    return host_part, (user, pw)
459
460
def redact_netloc(netloc: str) -> str:
    """
    Replace the sensitive data in a netloc with "****", if it exists.

    For example:
    - "user:pass@example.com" returns "user:****@example.com"
    - "accesstoken@example.com" returns "****@example.com"
    """
    host_part, (user, password) = split_auth_from_netloc(netloc)
    if user is None:
        # No auth information at all.
        return host_part

    if password is None:
        # A lone user name may be an access token, so hide it entirely.
        shown_user, shown_password = "****", ""
    else:
        shown_user, shown_password = urllib.parse.quote(user), ":****"

    return f"{shown_user}{shown_password}@{host_part}"
481
482
def _transform_url(
    url: str, transform_netloc: Callable[[str], Tuple[Any, ...]]
) -> Tuple[str, NetlocTuple]:
    """Transform and replace netloc in a url.

    transform_netloc is a function taking the netloc and returning a
    tuple. The first element of this tuple is the new netloc. The
    entire tuple is returned.

    Returns a tuple containing the transformed url as item 0 and the
    original tuple returned by transform_netloc as item 1.
    """
    parts = urllib.parse.urlsplit(url)
    transformed = transform_netloc(parts.netloc)
    # Rebuild the url with only the netloc component swapped out.
    rebuilt = urllib.parse.urlunsplit(parts._replace(netloc=transformed[0]))
    return rebuilt, cast("NetlocTuple", transformed)
501
502
def _get_netloc(netloc: str) -> NetlocTuple:
    """Netloc transform for _transform_url that strips auth information."""
    stripped = split_auth_from_netloc(netloc)
    return stripped
505
506
def _redact_netloc(netloc: str) -> Tuple[str]:
    """Netloc transform for _transform_url that redacts auth information."""
    redacted = redact_netloc(netloc)
    return (redacted,)
509
510
def split_auth_netloc_from_url(
    url: str,
) -> Tuple[str, str, Tuple[Optional[str], Optional[str]]]:
    """
    Parse a url into separate netloc, auth, and url with no auth.

    Returns: (url_without_auth, netloc, (username, password))
    """
    url_without_auth, netloc_and_auth = _transform_url(url, _get_netloc)
    netloc, auth = netloc_and_auth
    return url_without_auth, netloc, auth
521
522
def remove_auth_from_url(url: str) -> str:
    """Return a copy of url with 'username:password@' removed."""
    # username/pass params are passed to subversion through flags
    # and are not recognized in the url.
    url_without_auth, _ = _transform_url(url, _get_netloc)
    return url_without_auth
528
529
def redact_auth_from_url(url: str) -> str:
    """Replace the password in a given url with ****."""
    redacted_url, _ = _transform_url(url, _redact_netloc)
    return redacted_url
533
534
class HiddenText:
    """A secret string paired with a redacted form that is safe to display.

    ``str()`` and ``repr()`` only ever show the redacted form.
    """

    def __init__(self, secret: str, redacted: str) -> None:
        self.secret = secret
        self.redacted = redacted

    def __repr__(self) -> str:
        return f"<HiddenText {str(self)!r}>"

    def __str__(self) -> str:
        return self.redacted

    # This is useful for testing.
    def __eq__(self, other: Any) -> bool:
        if type(self) is not type(other):
            return False

        # The string being used for redaction doesn't also have to match,
        # just the raw, original string.
        return self.secret == other.secret
554
555
def hide_value(value: str) -> HiddenText:
    """Wrap *value* in a HiddenText that displays as ``****``."""
    placeholder = "****"
    return HiddenText(value, redacted=placeholder)
558
559
def hide_url(url: str) -> HiddenText:
    """Wrap *url* in a HiddenText that displays with its auth part redacted."""
    return HiddenText(url, redacted=redact_auth_from_url(url))
563
564
def protect_pip_from_modification_on_windows(modifying_pip: bool) -> None:
    """Protection of pip.exe from modification on Windows

    On Windows, any operation modifying pip should be run as:
        python -m pip ...

    Raises CommandError with the equivalent ``python -m pip`` command line
    when pip is being modified on Windows and the program was started as
    ``pip``, ``pipX`` or ``pipX.Y``.
    """
    # See https://github.com/pypa/pip/issues/1299 for more discussion
    if not (modifying_pip and WINDOWS):
        return

    version = sys.version_info
    pip_names = (
        "pip",
        f"pip{version.major}",
        f"pip{version.major}.{version.minor}",
    )
    if os.path.basename(sys.argv[0]) not in pip_names:
        return

    new_command = [sys.executable, "-m", "pip"] + sys.argv[1:]
    command_line = " ".join(new_command)
    raise CommandError(
        f"To modify pip, please run the following command:\n{command_line}"
    )
589
590
def check_externally_managed() -> None:
    """Check whether the current environment is externally managed.

    If the ``EXTERNALLY-MANAGED`` config file is found, the current environment
    is considered externally managed, and an ExternallyManagedEnvironment is
    raised.

    The check is skipped when running under a virtualenv. The file is looked
    up in the ``stdlib`` sysconfig path.
    """
    if running_under_virtualenv():
        return
    stdlib_dir = sysconfig.get_path("stdlib")
    marker = os.path.join(stdlib_dir, "EXTERNALLY-MANAGED")
    if os.path.isfile(marker):
        raise ExternallyManagedEnvironment.from_config(marker)
604
605
def is_console_interactive() -> bool:
    """Is this console interactive?

    True when ``sys.stdin`` exists and reports itself as a tty.
    """
    if sys.stdin is None:
        return False
    return sys.stdin.isatty()
609
610
def hash_file(path: str, blocksize: int = 1 << 20) -> Tuple[Any, int]:
    """Return (hash, length) for path using hashlib.sha256()

    The file is read in binary mode in pieces of *blocksize* bytes. The
    first item returned is the hash object itself, not a digest string.
    """
    digest = hashlib.sha256()
    total = 0
    with open(path, "rb") as stream:
        for chunk in read_chunks(stream, size=blocksize):
            digest.update(chunk)
            total += len(chunk)
    return digest, total
621
622
def pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
    """
    Return paired elements.

    For example:
        s -> (s0, s1), (s2, s3), (s4, s5), ...

    If the input has an odd number of elements, the last pair is padded
    with None.
    """
    # Both positions draw from the same iterator, so each pair consumes two
    # consecutive items.
    shared = iter(iterable)
    return zip_longest(shared, shared)
632
633
def partition(
    pred: Callable[[T], bool],
    iterable: Iterable[T],
) -> Tuple[Iterable[T], Iterable[T]]:
    """
    Use a predicate to partition entries into false entries and true entries,
    like

        partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9

    Both results are lazy iterators; the false entries come first.
    """
    for_rejects, for_matches = tee(iterable)
    rejects = filterfalse(pred, for_rejects)
    matches = filter(pred, for_matches)
    return rejects, matches
646
647
class ConfiguredBuildBackendHookCaller(BuildBackendHookCaller):
    """A BuildBackendHookCaller that takes config settings from a holder.

    Every hook below ignores its own ``config_settings`` argument and
    forwards ``config_holder.config_settings`` instead, read at call time.
    """

    def __init__(
        self,
        config_holder: Any,
        source_dir: str,
        build_backend: str,
        backend_path: Optional[str] = None,
        runner: Optional[Callable[..., None]] = None,
        python_executable: Optional[str] = None,
    ):
        super().__init__(
            source_dir, build_backend, backend_path, runner, python_executable
        )
        # Object whose ``config_settings`` attribute is passed to every hook.
        self.config_holder = config_holder

    def build_wheel(
        self,
        wheel_directory: str,
        config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
        metadata_directory: Optional[str] = None,
    ) -> str:
        """Call the ``build_wheel`` hook with the holder's config settings."""
        return super().build_wheel(
            wheel_directory,
            config_settings=self.config_holder.config_settings,
            metadata_directory=metadata_directory,
        )

    def build_sdist(
        self,
        sdist_directory: str,
        config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
    ) -> str:
        """Call the ``build_sdist`` hook with the holder's config settings."""
        return super().build_sdist(
            sdist_directory,
            config_settings=self.config_holder.config_settings,
        )

    def build_editable(
        self,
        wheel_directory: str,
        config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
        metadata_directory: Optional[str] = None,
    ) -> str:
        """Call the ``build_editable`` hook with the holder's config settings."""
        return super().build_editable(
            wheel_directory,
            config_settings=self.config_holder.config_settings,
            metadata_directory=metadata_directory,
        )

    def get_requires_for_build_wheel(
        self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
    ) -> List[str]:
        """Call ``get_requires_for_build_wheel`` with the holder's settings."""
        return super().get_requires_for_build_wheel(
            config_settings=self.config_holder.config_settings
        )

    def get_requires_for_build_sdist(
        self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
    ) -> List[str]:
        """Call ``get_requires_for_build_sdist`` with the holder's settings."""
        return super().get_requires_for_build_sdist(
            config_settings=self.config_holder.config_settings
        )

    def get_requires_for_build_editable(
        self, config_settings: Optional[Dict[str, Union[str, List[str]]]] = None
    ) -> List[str]:
        """Call ``get_requires_for_build_editable`` with the holder's settings."""
        return super().get_requires_for_build_editable(
            config_settings=self.config_holder.config_settings
        )

    def prepare_metadata_for_build_wheel(
        self,
        metadata_directory: str,
        config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
        _allow_fallback: bool = True,
    ) -> str:
        """Call ``prepare_metadata_for_build_wheel`` with the holder's settings."""
        return super().prepare_metadata_for_build_wheel(
            metadata_directory=metadata_directory,
            config_settings=self.config_holder.config_settings,
            _allow_fallback=_allow_fallback,
        )

    def prepare_metadata_for_build_editable(
        self,
        metadata_directory: str,
        config_settings: Optional[Dict[str, Union[str, List[str]]]] = None,
        _allow_fallback: bool = True,
    ) -> str:
        """Call ``prepare_metadata_for_build_editable`` with the holder's settings."""
        return super().prepare_metadata_for_build_editable(
            metadata_directory=metadata_directory,
            config_settings=self.config_holder.config_settings,
            _allow_fallback=_allow_fallback,
        )