]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_internal/vcs/git.py
init: venv aand flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _internal / vcs / git.py
1 import logging
2 import os.path
3 import pathlib
4 import re
5 import urllib.parse
6 import urllib.request
7 from typing import List, Optional, Tuple
8
9 from pip._internal.exceptions import BadCommand, InstallationError
10 from pip._internal.utils.misc import HiddenText, display_path, hide_url
11 from pip._internal.utils.subprocess import make_command
12 from pip._internal.vcs.versioncontrol import (
13 AuthInfo,
14 RemoteNotFoundError,
15 RemoteNotValidError,
16 RevOptions,
17 VersionControl,
18 find_path_to_project_root_from_repo_root,
19 vcs,
20 )
21
# Short module-level aliases for the urllib.parse helpers used below.
urlsplit = urllib.parse.urlsplit
urlunsplit = urllib.parse.urlunsplit


logger = logging.getLogger(__name__)


# Matches the output of ``git version``. Groups are, in order:
#   1. major version
#   2. minor version
#   3. patch version (optional -- None when the output has no patch part)
# Anything after that (pre-/post-release segments, platform suffixes) is
# matched but not captured.
GIT_VERSION_REGEX = re.compile(r"^git version (\d+)\.(\d+)(?:\.(\d+))?.*$")

# A full-length (40 hex digit) commit hash, in either letter case.
HASH_REGEX = re.compile("^[a-fA-F0-9]{40}$")

# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
# Groups are, in order: optional "user@", server, server-side path.
SCP_REGEX = re.compile(
    r"""^
    # Optional user, e.g. 'git@'
    (\w+@)?
    # Server, e.g. 'github.com'.
    ([^/:]+):
    # The server-side path. e.g. 'user/project.git'. Must start with an
    # alphanumeric character so as not to be confusable with a Windows paths
    # like 'C:/foo/bar' or 'C:\foo\bar'.
    (\w[^:]*)
    $""",
    re.VERBOSE,
)
53
54
def looks_like_hash(sha: str) -> bool:
    """Return True if *sha* has the form of a full 40-hex-digit commit hash."""
    return HASH_REGEX.match(sha) is not None
57
58
class Git(VersionControl):
    """
    Git implementation of pip's ``VersionControl`` interface.

    Every Git invocation goes through ``run_command`` (inherited from the
    base class); the methods here only assemble arguments and interpret the
    output.
    """

    name = "git"
    dirname = ".git"
    repo_name = "clone"
    schemes = (
        "git+http",
        "git+https",
        "git+ssh",
        "git+git",
        "git+file",
    )
    # Prevent the user's environment variables from interfering with pip:
    # https://github.com/pypa/pip/issues/1130
    unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
    default_arg_rev = "HEAD"

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """Return the command-line arguments selecting *rev*: the rev itself."""
        return [rev]

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return True if the revision in *url* is a commit hash that is checked
        out at *dest* and is not also the name of a branch or tag.

        Args:
          url: the requirement URL, possibly carrying a revision.
          dest: the repository directory.
        """
        _, rev_options = self.get_url_rev_options(hide_url(url))
        if not rev_options.rev:
            return False
        if not self.is_commit_id_equal(dest, rev_options.rev):
            # the current commit is different from rev,
            # which means rev was something else than a commit hash
            return False
        # return False in the rare case rev is both a commit hash
        # and a tag or a branch; we don't want to cache in that case
        # because that branch/tag could point to something else in the future
        is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
        return not is_tag_or_branch

    def get_git_version(self) -> Tuple[int, ...]:
        """
        Return the installed Git version as a tuple of ints.

        The result is ``(major, minor, patch)`` when ``git version`` reports
        a numeric patch component and ``(major, minor)`` when it does not
        (e.g. ``git version 2.17`` or ``git version 2.37.GIT``). If the
        output cannot be parsed at all, a warning is logged and an empty
        tuple is returned.
        """
        version = self.run_command(
            ["version"],
            command_desc="git version",
            show_stdout=False,
            stdout_only=True,
        )
        match = GIT_VERSION_REGEX.match(version)
        if not match:
            logger.warning("Can't parse git version: %s", version)
            return ()
        # The patch group is optional and is None when it did not take part
        # in the match; int(None) would raise TypeError, so skip it.
        return tuple(int(part) for part in match.groups() if part is not None)

    @classmethod
    def get_current_branch(cls, location: str) -> Optional[str]:
        """
        Return the current branch, or None if HEAD isn't at a branch
        (e.g. detached HEAD).
        """
        # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
        # HEAD rather than a symbolic ref.  In addition, the -q causes the
        # command to exit with status code 1 instead of 128 in this case
        # and to suppress the message to stderr.
        args = ["symbolic-ref", "-q", "HEAD"]
        output = cls.run_command(
            args,
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        ref = output.strip()

        if ref.startswith("refs/heads/"):
            return ref[len("refs/heads/") :]

        return None

    @classmethod
    def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]:
        """
        Return (sha_or_none, is_branch), where sha_or_none is a commit hash
        if the revision names a remote branch or tag, otherwise None.

        Args:
          dest: the repository directory.
          rev: the revision name.

        Raises:
          ValueError: if a non-empty ``show-ref`` output line does not split
            into exactly a hash and a ref name.
        """
        # Pass rev to pre-filter the list.
        output = cls.run_command(
            ["show-ref", rev],
            cwd=dest,
            show_stdout=False,
            stdout_only=True,
            on_returncode="ignore",
        )
        refs = {}
        # NOTE: We do not use splitlines here since that would split on other
        #       unicode separators, which can be maliciously used to install a
        #       different revision.
        for line in output.strip().split("\n"):
            line = line.rstrip("\r")
            if not line:
                continue
            try:
                ref_sha, ref_name = line.split(" ", maxsplit=2)
            except ValueError:
                # Include the offending line to simplify troubleshooting if
                # this error ever occurs.
                raise ValueError(f"unexpected show-ref line: {line!r}")

            refs[ref_name] = ref_sha

        branch_ref = f"refs/remotes/origin/{rev}"
        tag_ref = f"refs/tags/{rev}"

        # A remote branch takes precedence over a tag of the same name.
        sha = refs.get(branch_ref)
        if sha is not None:
            return (sha, True)

        sha = refs.get(tag_ref)

        return (sha, False)

    @classmethod
    def _should_fetch(cls, dest: str, rev: str) -> bool:
        """
        Return true if rev is a ref or is a commit that we don't have locally.

        Branches and tags are not considered in this method because they are
        assumed to be always available locally (which is a normal outcome of
        ``git clone`` and ``git fetch --tags``).
        """
        if rev.startswith("refs/"):
            # Always fetch remote refs.
            return True

        if not looks_like_hash(rev):
            # Git fetch would fail with abbreviated commits.
            return False

        if cls.has_commit(dest, rev):
            # Don't fetch if we have the commit locally.
            return False

        return True

    @classmethod
    def resolve_revision(
        cls, dest: str, url: HiddenText, rev_options: RevOptions
    ) -> RevOptions:
        """
        Resolve a revision to a new RevOptions object with the SHA1 of the
        branch, tag, or ref if found.

        Args:
          dest: the repository directory.
          url: the remote to fetch from when the revision is not available
            locally.
          rev_options: a RevOptions object.
        """
        rev = rev_options.arg_rev
        # The arg_rev property's implementation for Git ensures that the
        # rev return value is always non-None.
        assert rev is not None

        sha, is_branch = cls.get_revision_sha(dest, rev)

        if sha is not None:
            rev_options = rev_options.make_new(sha)
            # Remember the branch name so that fetch_new() can check the
            # branch out by name rather than by hash.
            rev_options.branch_name = rev if is_branch else None

            return rev_options

        # Do not show a warning for the common case of something that has
        # the form of a Git commit hash.
        if not looks_like_hash(rev):
            logger.warning(
                "Did not find branch or tag '%s', assuming revision or ref.",
                rev,
            )

        if not cls._should_fetch(dest, rev):
            return rev_options

        # fetch the requested revision
        cls.run_command(
            make_command("fetch", "-q", url, rev_options.to_args()),
            cwd=dest,
        )
        # Change the revision to the SHA of the ref we fetched
        sha = cls.get_revision(dest, rev="FETCH_HEAD")
        rev_options = rev_options.make_new(sha)

        return rev_options

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """
        Return whether the current commit hash equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        if not name:
            # Then avoid an unnecessary subprocess call.
            return False

        return cls.get_revision(dest) == name

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Clone *url* into *dest*, check out the requested revision (if any)
        and update submodules.

        Args:
          dest: the directory to clone into.
          url: the repository URL.
          rev_options: the revision requested by the user, if any.
          verbosity: ``<= 0`` clones quietly, ``1`` uses Git's default output,
            anything higher adds ``--verbose --progress``.
        """
        rev_display = rev_options.to_display()
        logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
        if verbosity <= 0:
            flags: Tuple[str, ...] = ("--quiet",)
        elif verbosity == 1:
            flags = ()
        else:
            flags = ("--verbose", "--progress")
        if self.get_git_version() >= (2, 17):
            # Git added support for partial clone in 2.17
            # https://git-scm.com/docs/partial-clone
            # Speeds up cloning by functioning without a complete copy of repository
            self.run_command(
                make_command(
                    "clone",
                    "--filter=blob:none",
                    *flags,
                    url,
                    dest,
                )
            )
        else:
            self.run_command(make_command("clone", *flags, url, dest))

        if rev_options.rev:
            # Then a specific revision was requested.
            rev_options = self.resolve_revision(dest, url, rev_options)
            branch_name = getattr(rev_options, "branch_name", None)
            logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
            if branch_name is None:
                # Only do a checkout if the current commit id doesn't match
                # the requested revision.
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    cmd_args = make_command(
                        "checkout",
                        "-q",
                        rev_options.to_args(),
                    )
                    self.run_command(cmd_args, cwd=dest)
            elif self.get_current_branch(dest) != branch_name:
                # Then a specific branch was requested, and that branch
                # is not yet checked out.
                track_branch = f"origin/{branch_name}"
                cmd_args = [
                    "checkout",
                    "-b",
                    branch_name,
                    "--track",
                    track_branch,
                ]
                self.run_command(cmd_args, cwd=dest)
        else:
            # No revision requested: record whatever commit the clone is at.
            sha = self.get_revision(dest)
            rev_options = rev_options.make_new(sha)

        logger.info("Resolved %s to commit %s", url, rev_options.rev)

        #: repo may contain submodules
        self.update_submodules(dest)

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Point ``remote.origin.url`` of the repository at *dest* to *url*,
        check out the requested revision and update submodules.
        """
        self.run_command(
            make_command("config", "remote.origin.url", url),
            cwd=dest,
        )
        cmd_args = make_command("checkout", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)

        self.update_submodules(dest)

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Fetch from the default remote, hard-reset the repository at *dest* to
        the resolved revision and update submodules.
        """
        # First fetch changes from the default remote
        if self.get_git_version() >= (1, 9):
            # fetch tags in addition to everything else
            self.run_command(["fetch", "-q", "--tags"], cwd=dest)
        else:
            self.run_command(["fetch", "-q"], cwd=dest)
        # Then reset to wanted revision (maybe even origin/master)
        rev_options = self.resolve_revision(dest, url, rev_options)
        cmd_args = make_command("reset", "--hard", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)
        #: update submodules
        self.update_submodules(dest)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return URL of the first remote encountered, preferring ``origin``
        when it is configured.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        # We need to pass 1 for extra_ok_returncodes since the command
        # exits with return code 1 if there are no matching lines.
        stdout = cls.run_command(
            ["config", "--get-regexp", r"remote\..*\.url"],
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        remotes = stdout.splitlines()
        try:
            found_remote = remotes[0]
        except IndexError:
            raise RemoteNotFoundError

        for remote in remotes:
            if remote.startswith("remote.origin.url "):
                found_remote = remote
                break
        # Each line is "<key> <value>". Split on the first space only so
        # that a value containing spaces (e.g. a local path) stays intact.
        url = found_remote.split(" ", 1)[1]
        return cls._git_remote_to_pip_url(url.strip())

    @staticmethod
    def _git_remote_to_pip_url(url: str) -> str:
        """
        Convert a remote url from what git uses to what pip accepts.

        There are 3 legal forms **url** may take:

            1. A fully qualified url: ssh://git@example.com/foo/bar.git
            2. A local project.git folder: /path/to/bare/repository.git
            3. SCP shorthand for form 1: git@example.com:foo/bar.git

        Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
        be converted to form 1.

        See the corresponding test test_git_remote_url_to_pip() for examples of
        sample inputs/outputs.

        Raises RemoteNotValidError if **url** matches none of the forms.
        """
        if re.match(r"\w+://", url):
            # This is already valid. Pass it though as-is.
            return url
        if os.path.exists(url):
            # A local bare remote (git clone --mirror).
            # Needs a file:// prefix.
            return pathlib.PurePath(url).as_uri()
        scp_match = SCP_REGEX.match(url)
        if scp_match:
            # Add an ssh:// prefix and replace the ':' with a '/'.
            return scp_match.expand(r"ssh://\1\2/\3")
        # Otherwise, bail out.
        raise RemoteNotValidError(url)

    @classmethod
    def has_commit(cls, location: str, rev: str) -> bool:
        """
        Check if rev is a commit that is available in the local repository.

        Returns False when the underlying ``git rev-parse`` invocation fails
        with InstallationError, True otherwise.
        """
        try:
            cls.run_command(
                # NOTE(review): the literal "sha^" prefix is handed to
                # ``git rev-parse`` verbatim -- confirm this is the intended
                # revision syntax before relying on a True result.
                ["rev-parse", "-q", "--verify", "sha^" + rev],
                cwd=location,
                log_failed_cmd=False,
            )
        except InstallationError:
            return False
        else:
            return True

    @classmethod
    def get_revision(cls, location: str, rev: Optional[str] = None) -> str:
        """
        Return the stripped output of ``git rev-parse <rev>`` run in
        *location*; *rev* defaults to ``HEAD``.
        """
        if rev is None:
            rev = "HEAD"
        current_rev = cls.run_command(
            ["rev-parse", rev],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        return current_rev.strip()

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        # find the repo root
        git_dir = cls.run_command(
            ["rev-parse", "--git-dir"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        if not os.path.isabs(git_dir):
            git_dir = os.path.join(location, git_dir)
        # The repository root is taken to be the parent of the git directory.
        repo_root = os.path.abspath(os.path.join(git_dir, ".."))
        return find_path_to_project_root_from_repo_root(location, repo_root)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
        """
        Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
        That's required because although they use SSH they sometimes don't
        work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
        parsing. Hence we remove it again afterwards and return it as a stub.
        """
        # Works around an apparent Git bug
        # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
        scheme, netloc, path, query, fragment = urlsplit(url)
        if scheme.endswith("file"):
            # Keep the leading slashes exactly as given. A positive slice
            # bound is used because a negative one degenerates to path[:0]
            # when the path consists solely of slashes.
            initial_slashes = path[: len(path) - len(path.lstrip("/"))]
            newpath = initial_slashes + urllib.request.url2pathname(path).replace(
                "\\", "/"
            ).lstrip("/")
            after_plus = scheme.find("+") + 1
            url = scheme[:after_plus] + urlunsplit(
                (scheme[after_plus:], netloc, newpath, query, fragment),
            )

        if "://" not in url:
            assert "file:" not in url
            # Temporarily add a scheme so the base class can parse the URL,
            # then strip it again from the result.
            url = url.replace("git+", "git+ssh://")
            url, rev, user_pass = super().get_url_rev_and_auth(url)
            url = url.replace("ssh://", "")
        else:
            url, rev, user_pass = super().get_url_rev_and_auth(url)

        return url, rev, user_pass

    @classmethod
    def update_submodules(cls, location: str) -> None:
        """
        Initialise and recursively update submodules of the repository at
        *location*; do nothing if it has no ``.gitmodules`` file.
        """
        if not os.path.exists(os.path.join(location, ".gitmodules")):
            return
        cls.run_command(
            ["submodule", "update", "--init", "--recursive", "-q"],
            cwd=location,
        )

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """
        Return the root of the Git repository containing *location*.

        The base-class lookup is tried first; if it finds nothing,
        ``git rev-parse --show-toplevel`` is consulted. Returns None when
        Git is unavailable (BadCommand) or the command fails
        (InstallationError).
        """
        loc = super().get_repository_root(location)
        if loc:
            return loc
        try:
            r = cls.run_command(
                ["rev-parse", "--show-toplevel"],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode="raise",
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug(
                "could not determine if %s is under git control "
                "because git is not available",
                location,
            )
            return None
        except InstallationError:
            return None
        return os.path.normpath(r.rstrip("\r\n"))

    @staticmethod
    def should_add_vcs_url_prefix(repo_url: str) -> bool:
        """In either https or ssh form, requirements must be prefixed with git+."""
        return True


vcs.register(Git)