]> jfr.im git - dlqueue.git/blob - venv/lib/python3.11/site-packages/pip/_internal/models/direct_url.py
init: venv and flask
[dlqueue.git] / venv / lib / python3.11 / site-packages / pip / _internal / models / direct_url.py
1 """ PEP 610 """
2 import json
3 import re
4 import urllib.parse
5 from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
6
# Names exported by a star-import of this module.
__all__ = [
    "DirectUrl",
    "DirectUrlValidationError",
    "DirInfo",
    "ArchiveInfo",
    "VcsInfo",
]

# Type variable linking the ``expected_type`` argument of _get() and
# _get_required() to the type of the value they return.
T = TypeVar("T")

# NOTE(review): not referenced anywhere else in this module; presumably the
# file name under which the JSON produced by DirectUrl.to_json() is stored --
# confirm against callers.
DIRECT_URL_METADATA_NAME = "direct_url.json"
# Matches "${NAME}" optionally followed by ":${NAME}".  Inside the character
# class the "-" that follows the "0-9" range is a literal hyphen, so names may
# contain ASCII letters, digits, "-" and "_".  DirectUrl uses this pattern to
# decide whether the user:password part of a URL may be kept when redacting.
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
19
20
class DirectUrlValidationError(Exception):
    """Raised when direct URL data has a missing, mistyped or malformed value."""
23
24
def _get(
    d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> Optional[T]:
    """Look up ``key`` in ``d`` and check the type of what is found.

    Returns ``default`` when the key is absent (the default itself is not
    type-checked), otherwise the stored value.

    Raises DirectUrlValidationError when the key is present but its value is
    not an instance of ``expected_type``.
    """
    if key in d:
        found = d[key]
        if isinstance(found, expected_type):
            return found
        message = "{!r} has unexpected type for {} (expected {})".format(
            found, key, expected_type
        )
        raise DirectUrlValidationError(message)
    return default
39
40
def _get_required(
    d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> T:
    """Same lookup as _get(), but a result of None is treated as an error.

    Raises DirectUrlValidationError when the key is absent and no non-None
    default was supplied, or when _get() rejects the stored value.
    """
    found = _get(d, expected_type, key, default)
    if found is not None:
        return found
    raise DirectUrlValidationError(f"{key} must have a value")
48
49
def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType":
    """Return the single non-None entry of ``infos``.

    Raises DirectUrlValidationError when every entry is None, or when more
    than one entry is not None.
    """
    present = [candidate for candidate in infos if candidate is not None]
    count = len(present)
    if count == 0:
        raise DirectUrlValidationError(
            "missing one of archive_info, dir_info, vcs_info"
        )
    if count > 1:
        raise DirectUrlValidationError(
            "more than one of archive_info, dir_info, vcs_info"
        )
    (only,) = present
    # Always true after the filtering above; kept to narrow the Optional type.
    assert only is not None
    return only
62
63
def _filter_none(**kwargs: Any) -> Dict[str, Any]:
    """Return the keyword arguments as a dict, leaving out those that are None.

    Other falsy values (0, "", False, empty containers) are kept.
    """
    kept: Dict[str, Any] = {}
    for name, val in kwargs.items():
        if val is not None:
            kept[name] = val
    return kept
67
68
class VcsInfo:
    """Holder for the ``vcs_info`` part of a direct URL record.

    The three constructor arguments are stored unchanged as the attributes
    ``vcs``, ``commit_id`` and ``requested_revision`` (the last one optional).
    """

    # Key under which DirectUrl.to_dict() files this object's dictionary.
    name = "vcs_info"

    def __init__(
        self,
        vcs: str,
        commit_id: str,
        requested_revision: Optional[str] = None,
    ) -> None:
        self.vcs = vcs
        self.requested_revision = requested_revision
        self.commit_id = commit_id

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
        """Build an instance from its dictionary form; None maps to None.

        ``vcs`` and ``commit_id`` must be present strings, while
        ``requested_revision`` may be absent.  The lookup helpers raise
        DirectUrlValidationError on a missing or mistyped entry.
        """
        if d is not None:
            vcs_name = _get_required(d, str, "vcs")
            commit = _get_required(d, str, "commit_id")
            revision = _get(d, str, "requested_revision")
            return cls(vcs=vcs_name, commit_id=commit, requested_revision=revision)
        return None

    def _to_dict(self) -> Dict[str, Any]:
        """Return the dictionary form, without entries whose value is None."""
        fields = {
            "vcs": self.vcs,
            "requested_revision": self.requested_revision,
            "commit_id": self.commit_id,
        }
        return _filter_none(**fields)
98
99
class ArchiveInfo:
    """Holder for the ``archive_info`` part of a direct URL record.

    Two attributes are exposed: ``hash``, a legacy ``"<name>=<value>"``
    string, and ``hashes``, a name -> value mapping.  Assigning ``hash``
    also records that entry in ``hashes`` when the name is not there yet.
    """

    # Key under which DirectUrl.to_dict() files this object's dictionary.
    name = "archive_info"

    def __init__(
        self,
        hash: Optional[str] = None,
        hashes: Optional[Dict[str, str]] = None,
    ) -> None:
        # Order matters: the ``hash`` setter reads, and may extend, ``hashes``.
        self.hashes = hashes
        self.hash = hash

    @property
    def hash(self) -> Optional[str]:
        """The legacy ``"<name>=<value>"`` hash string, or None."""
        return self._hash

    @hash.setter
    def hash(self, value: Optional[str]) -> None:
        if value is None:
            self._hash = value
            return
        # Upgrade to the newer format by mirroring the legacy value into
        # ``hashes``.  Nothing is copied in the other direction.
        try:
            algorithm, digest = value.split("=", 1)
        except ValueError:
            raise DirectUrlValidationError(
                f"invalid archive_info.hash format: {value!r}"
            )
        known = self.hashes
        if known is None:
            self.hashes = {algorithm: digest}
        elif algorithm not in known:
            # Work on a copy so the mapping handed in by the caller is not
            # modified.
            extended = known.copy()
            extended[algorithm] = digest
            self.hashes = extended
        self._hash = value

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
        """Build an instance from its dictionary form; None maps to None."""
        if d is not None:
            legacy = _get(d, str, "hash")
            mapping = _get(d, dict, "hashes")
            return cls(hash=legacy, hashes=mapping)
        return None

    def _to_dict(self) -> Dict[str, Any]:
        """Return the dictionary form, without entries whose value is None."""
        return _filter_none(hash=self.hash, hashes=self.hashes)
142
143
class DirInfo:
    """Holder for the ``dir_info`` part of a direct URL record.

    Its only attribute is the boolean ``editable``, which defaults to False.
    """

    # Key under which DirectUrl.to_dict() files this object's dictionary.
    name = "dir_info"

    def __init__(
        self,
        editable: bool = False,
    ) -> None:
        self.editable = editable

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
        """Build an instance from its dictionary form; None maps to None.

        A missing ``editable`` key counts as False; a present value that is
        not a bool makes the lookup helper raise DirectUrlValidationError.
        """
        if d is not None:
            flag = _get_required(d, bool, "editable", default=False)
            return cls(editable=flag)
        return None

    def _to_dict(self) -> Dict[str, Any]:
        """Return the dictionary form; a falsy ``editable`` is left out."""
        # A falsy flag is turned into None so that _filter_none() drops it.
        flag = self.editable or None
        return _filter_none(editable=flag)
161
162
# Any one of the three info classes that a DirectUrl can carry.
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
164
165
class DirectUrl:
    """A direct URL record: a URL, one info object and an optional subdirectory.

    Instances convert to and from a plain dictionary (to_dict / from_dict)
    and to and from JSON text (to_json / from_json).
    """

    def __init__(
        self,
        url: str,
        info: InfoType,
        subdirectory: Optional[str] = None,
    ) -> None:
        self.url = url
        self.info = info
        self.subdirectory = subdirectory

    def _remove_auth_from_netloc(self, netloc: str) -> str:
        """Return ``netloc`` without its ``user:password@`` prefix.

        The prefix is kept, and ``netloc`` returned unchanged, when:

        * there is no ``@`` at all;
        * ``info`` is a VcsInfo whose ``vcs`` is ``"git"`` and the prefix is
          exactly ``git``;
        * the prefix matches ENV_VAR_RE, i.e. it is written with
          ``${...}`` placeholders.
        """
        if "@" not in netloc:
            return netloc
        # Split at the LAST "@": that is where urllib.parse separates the
        # credentials from the host, and a password may contain an unescaped
        # "@".  Splitting at the first one would leave the tail of such a
        # password in the supposedly redacted result.
        user_pass, netloc_no_user_pass = netloc.rsplit("@", 1)
        if (
            isinstance(self.info, VcsInfo)
            and self.info.vcs == "git"
            and user_pass == "git"
        ):
            return netloc
        if ENV_VAR_RE.match(user_pass):
            return netloc
        return netloc_no_user_pass

    @property
    def redacted_url(self) -> str:
        """url with user:password part removed unless it is formed with
        environment variables as specified in PEP 610, or it is ``git``
        in the case of a git URL.
        """
        purl = urllib.parse.urlsplit(self.url)
        netloc = self._remove_auth_from_netloc(purl.netloc)
        surl = urllib.parse.urlunsplit(
            (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
        )
        return surl

    def validate(self) -> None:
        """Check this object by round-tripping it through to_dict/from_dict.

        Raises DirectUrlValidationError when from_dict() rejects the
        dictionary produced by to_dict().
        """
        self.from_dict(self.to_dict())

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
        """Build an instance from its dictionary form.

        ``url`` is required, ``subdirectory`` is optional, and exactly one of
        ``archive_info``, ``dir_info`` and ``vcs_info`` must be present.

        Raises DirectUrlValidationError on a missing or mistyped entry, or
        when the number of info entries is not exactly one.
        """
        # Instantiate through ``cls`` so that calling this on a subclass
        # yields an instance of that subclass, as the info classes'
        # _from_dict() constructors already do.
        return cls(
            url=_get_required(d, str, "url"),
            subdirectory=_get(d, str, "subdirectory"),
            info=_exactly_one_of(
                [
                    ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
                    DirInfo._from_dict(_get(d, dict, "dir_info")),
                    VcsInfo._from_dict(_get(d, dict, "vcs_info")),
                ]
            ),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary form of this object.

        The URL is written in its redacted form, ``subdirectory`` is left out
        when it is None, and the info object is stored under its own ``name``.
        """
        res = _filter_none(
            url=self.redacted_url,
            subdirectory=self.subdirectory,
        )
        res[self.info.name] = self.info._to_dict()
        return res

    @classmethod
    def from_json(cls, s: str) -> "DirectUrl":
        """Parse JSON text and build an instance from the resulting dict."""
        return cls.from_dict(json.loads(s))

    def to_json(self) -> str:
        """Serialize to_dict() as JSON text with keys sorted."""
        return json.dumps(self.to_dict(), sort_keys=True)

    def is_local_editable(self) -> bool:
        """Tell whether ``info`` is a DirInfo whose ``editable`` flag is set."""
        return isinstance(self.info, DirInfo) and self.info.editable