]>
Commit | Line | Data |
---|---|---|
e0df8241 JR |
1 | """ PEP 610 """ |
2 | import json | |
3 | import re | |
4 | import urllib.parse | |
5 | from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union | |
6 | ||
# Names exported by a star-import of this module.
__all__ = [
    "DirectUrl",
    "DirectUrlValidationError",
    "DirInfo",
    "ArchiveInfo",
    "VcsInfo",
]

# Type variable linking the ``expected_type`` argument of ``_get`` and
# ``_get_required`` to the type of the value they return.
T = TypeVar("T")

# File name for the direct URL metadata. It is not referenced by the code
# visible here; presumably it is consumed by importers of this module
# (TODO confirm).
DIRECT_URL_METADATA_NAME = "direct_url.json"
# Matches a string made only of environment variable references:
# ``${NAME}`` optionally followed by ``:${NAME}``. Used to decide whether the
# user-info part of a URL may be kept when redacting credentials.
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
19 | ||
20 | ||
class DirectUrlValidationError(Exception):
    """Raised when direct URL data fails validation."""
23 | ||
24 | ||
def _get(
    d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> Optional[T]:
    """Look up ``key`` in ``d`` and check the type of the value found.

    Returns ``default`` when ``key`` is absent; the default itself is not
    type-checked. Raises ``DirectUrlValidationError`` when ``key`` is present
    but its value is not an instance of ``expected_type``.
    """
    if key in d:
        found = d[key]
        if isinstance(found, expected_type):
            return found
        message = "{!r} has unexpected type for {} (expected {})".format(
            found, key, expected_type
        )
        raise DirectUrlValidationError(message)
    return default
39 | ||
40 | ||
def _get_required(
    d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> T:
    """Same lookup as ``_get``, but a ``None`` result is an error.

    Raises ``DirectUrlValidationError`` when the lookup yields ``None``, i.e.
    the key is absent and no non-``None`` default was supplied. A wrong value
    type is reported by ``_get`` itself.
    """
    result = _get(d, expected_type, key, default)
    if result is not None:
        return result
    raise DirectUrlValidationError(f"{key} must have a value")
48 | ||
49 | ||
50 | def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType": | |
51 | infos = [info for info in infos if info is not None] | |
52 | if not infos: | |
53 | raise DirectUrlValidationError( | |
54 | "missing one of archive_info, dir_info, vcs_info" | |
55 | ) | |
56 | if len(infos) > 1: | |
57 | raise DirectUrlValidationError( | |
58 | "more than one of archive_info, dir_info, vcs_info" | |
59 | ) | |
60 | assert infos[0] is not None | |
61 | return infos[0] | |
62 | ||
63 | ||
64 | def _filter_none(**kwargs: Any) -> Dict[str, Any]: | |
65 | """Make dict excluding None values.""" | |
66 | return {k: v for k, v in kwargs.items() if v is not None} | |
67 | ||
68 | ||
class VcsInfo:
    """The ``vcs_info`` entry of a direct URL.

    Holds the VCS name, the commit id and, optionally, the revision that was
    requested.
    """

    name = "vcs_info"

    def __init__(
        self,
        vcs: str,
        commit_id: str,
        requested_revision: Optional[str] = None,
    ) -> None:
        self.vcs = vcs
        self.requested_revision = requested_revision
        self.commit_id = commit_id

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
        """Build an instance from ``d``, or return ``None`` when ``d`` is ``None``.

        ``vcs`` and ``commit_id`` are required strings; ``requested_revision``
        is an optional string. Validation failures surface as
        ``DirectUrlValidationError`` from the lookup helpers.
        """
        if d is None:
            return None
        # Fields are read in this order so the first invalid one is reported.
        vcs = _get_required(d, str, "vcs")
        commit_id = _get_required(d, str, "commit_id")
        requested_revision = _get(d, str, "requested_revision")
        return cls(
            vcs=vcs,
            commit_id=commit_id,
            requested_revision=requested_revision,
        )

    def _to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, leaving out any that are ``None``."""
        fields = {
            "vcs": self.vcs,
            "requested_revision": self.requested_revision,
            "commit_id": self.commit_id,
        }
        return _filter_none(**fields)
98 | ||
99 | ||
class ArchiveInfo:
    """The ``archive_info`` entry of a direct URL.

    ``hash`` is the legacy single ``<algorithm>=<value>`` string and ``hashes``
    is a mapping of algorithm name to value. Assigning ``hash`` also records
    it in ``hashes``; ``hashes`` is never used to fill in ``hash``.
    """

    name = "archive_info"

    def __init__(
        self,
        hash: Optional[str] = None,
        hashes: Optional[Dict[str, str]] = None,
    ) -> None:
        # set hashes before hash, since the hash setter will further populate hashes
        self.hashes = hashes
        self.hash = hash

    @property
    def hash(self) -> Optional[str]:
        """The legacy ``<algorithm>=<value>`` hash string, or ``None``."""
        return self._hash

    @hash.setter
    def hash(self, value: Optional[str]) -> None:
        """Store ``value`` and mirror it into ``hashes``.

        Raises ``DirectUrlValidationError`` when ``value`` is not ``None`` and
        contains no ``=`` separator.
        """
        if value is not None:
            # Auto-populate the hashes key to upgrade to the new format automatically.
            # We don't back-populate the legacy hash key from hashes.
            #
            # partition() splits on the first "=" only, so "=" inside the
            # value is preserved, and a missing separator is detected without
            # raising from inside an ``except`` block (which would chain an
            # unrelated unpacking ValueError onto the validation error).
            hash_name, separator, hash_value = value.partition("=")
            if not separator:
                raise DirectUrlValidationError(
                    f"invalid archive_info.hash format: {value!r}"
                )
            if self.hashes is None:
                self.hashes = {hash_name: hash_value}
            elif hash_name not in self.hashes:
                # Copy first so the mapping passed in by the caller is not
                # modified; an existing entry for this algorithm is kept.
                self.hashes = self.hashes.copy()
                self.hashes[hash_name] = hash_value
        self._hash = value

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
        """Build an instance from ``d``, or return ``None`` when ``d`` is ``None``.

        ``hash`` must be a string and ``hashes`` a dict when present; both are
        optional.
        """
        if d is None:
            return None
        return cls(hash=_get(d, str, "hash"), hashes=_get(d, dict, "hashes"))

    def _to_dict(self) -> Dict[str, Any]:
        """Return ``hash`` and ``hashes`` as a dict, leaving out ``None`` values."""
        return _filter_none(hash=self.hash, hashes=self.hashes)
142 | ||
143 | ||
class DirInfo:
    """The ``dir_info`` entry of a direct URL; carries the ``editable`` flag."""

    name = "dir_info"

    def __init__(
        self,
        editable: bool = False,
    ) -> None:
        self.editable = editable

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
        """Build an instance from ``d``, or return ``None`` when ``d`` is ``None``.

        ``editable`` must be a bool when present and is ``False`` when absent.
        """
        if d is None:
            return None
        editable = _get_required(d, bool, "editable", default=False)
        return cls(editable=editable)

    def _to_dict(self) -> Dict[str, Any]:
        """Return ``{"editable": ...}`` when the flag is truthy, else ``{}``."""
        flag = self.editable
        # A falsy flag is turned into None so that the key is left out.
        editable = flag if flag else None
        return _filter_none(editable=editable)
161 | ||
162 | ||
# Any one of the three info classes that a DirectUrl can carry.
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
164 | ||
165 | ||
class DirectUrl:
    """A direct URL record: a URL, one info object and an optional subdirectory.

    ``info`` is an ``ArchiveInfo``, ``DirInfo`` or ``VcsInfo``; its ``name``
    attribute is used as the key under which it is serialized.
    """

    def __init__(
        self,
        url: str,
        info: InfoType,
        subdirectory: Optional[str] = None,
    ) -> None:
        self.url = url
        self.info = info
        self.subdirectory = subdirectory

    def _remove_auth_from_netloc(self, netloc: str) -> str:
        """Return ``netloc`` without its ``user:password@`` part.

        The user-info part is kept when it is exactly ``git`` and ``info`` is
        a git ``VcsInfo``, or when it consists only of environment variable
        references (``ENV_VAR_RE``).
        """
        # Split on the LAST "@": that is where urllib.parse locates the host.
        # Splitting on the first one would leave the tail of a password that
        # contains "@" in the supposedly redacted result.
        user_pass, separator, netloc_no_user_pass = netloc.rpartition("@")
        if not separator:
            return netloc
        if (
            isinstance(self.info, VcsInfo)
            and self.info.vcs == "git"
            and user_pass == "git"
        ):
            return netloc
        if ENV_VAR_RE.match(user_pass):
            return netloc
        return netloc_no_user_pass

    @property
    def redacted_url(self) -> str:
        """url with user:password part removed unless it is formed with
        environment variables as specified in PEP 610, or it is ``git``
        in the case of a git URL.
        """
        purl = urllib.parse.urlsplit(self.url)
        netloc = self._remove_auth_from_netloc(purl.netloc)
        return urllib.parse.urlunsplit(
            (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
        )

    def validate(self) -> None:
        """Check this object by round-tripping it through its dict form.

        Raises ``DirectUrlValidationError`` when ``from_dict`` rejects the
        output of ``to_dict``.
        """
        self.from_dict(self.to_dict())

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
        """Build an instance from its dict form.

        ``url`` is a required string and ``subdirectory`` an optional string.
        Exactly one of ``archive_info``, ``dir_info`` and ``vcs_info`` must be
        present. Raises ``DirectUrlValidationError`` otherwise.
        """
        # Instantiate through ``cls`` so that subclasses get instances of
        # themselves from from_dict() and from_json().
        return cls(
            url=_get_required(d, str, "url"),
            subdirectory=_get(d, str, "subdirectory"),
            info=_exactly_one_of(
                [
                    ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
                    DirInfo._from_dict(_get(d, dict, "dir_info")),
                    VcsInfo._from_dict(_get(d, dict, "vcs_info")),
                ]
            ),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the dict form, with the redacted URL in place of ``url``.

        ``subdirectory`` is left out when it is ``None``; the info object is
        stored under the key given by its ``name`` attribute.
        """
        res = _filter_none(
            url=self.redacted_url,
            subdirectory=self.subdirectory,
        )
        res[self.info.name] = self.info._to_dict()
        return res

    @classmethod
    def from_json(cls, s: str) -> "DirectUrl":
        """Parse the JSON text ``s`` and build an instance via ``from_dict``."""
        return cls.from_dict(json.loads(s))

    def to_json(self) -> str:
        """Return ``to_dict()`` serialized as JSON with sorted keys."""
        return json.dumps(self.to_dict(), sort_keys=True)

    def is_local_editable(self) -> bool:
        """Return whether ``info`` is a ``DirInfo`` with its editable flag set."""
        return isinstance(self.info, DirInfo) and self.info.editable