5 from typing
import Any
, Dict
, Iterable
, Optional
, Type
, TypeVar
, Union
9 "DirectUrlValidationError",
17 DIRECT_URL_METADATA_NAME
= "direct_url.json"
18 ENV_VAR_RE
= re
.compile(r
"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
21 class DirectUrlValidationError(Exception):
26 d
: Dict
[str, Any
], expected_type
: Type
[T
], key
: str, default
: Optional
[T
] = None
28 """Get value from dictionary and verify expected type."""
32 if not isinstance(value
, expected_type
):
33 raise DirectUrlValidationError(
34 "{!r} has unexpected type for {} (expected {})".format(
35 value
, key
, expected_type
42 d
: Dict
[str, Any
], expected_type
: Type
[T
], key
: str, default
: Optional
[T
] = None
44 value
= _get(d
, expected_type
, key
, default
)
46 raise DirectUrlValidationError(f
"{key} must have a value")
50 def _exactly_one_of(infos
: Iterable
[Optional
["InfoType"]]) -> "InfoType":
51 infos
= [info
for info
in infos
if info
is not None]
53 raise DirectUrlValidationError(
54 "missing one of archive_info, dir_info, vcs_info"
57 raise DirectUrlValidationError(
58 "more than one of archive_info, dir_info, vcs_info"
60 assert infos
[0] is not None
64 def _filter_none(**kwargs
: Any
) -> Dict
[str, Any
]:
65 """Make dict excluding None values."""
66 return {k: v for k, v in kwargs.items() if v is not None}
76 requested_revision
: Optional
[str] = None,
79 self
.requested_revision
= requested_revision
80 self
.commit_id
= commit_id
83 def _from_dict(cls
, d
: Optional
[Dict
[str, Any
]]) -> Optional
["VcsInfo"]:
87 vcs
=_get_required(d
, str, "vcs"),
88 commit_id
=_get_required(d
, str, "commit_id"),
89 requested_revision
=_get(d
, str, "requested_revision"),
92 def _to_dict(self
) -> Dict
[str, Any
]:
95 requested_revision
=self
.requested_revision
,
96 commit_id
=self
.commit_id
,
101 name
= "archive_info"
105 hash: Optional
[str] = None,
106 hashes
: Optional
[Dict
[str, str]] = None,
108 # set hashes before hash, since the hash setter will further populate hashes
113 def hash(self
) -> Optional
[str]:
117 def hash(self
, value
: Optional
[str]) -> None:
118 if value
is not None:
119 # Auto-populate the hashes key to upgrade to the new format automatically.
120 # We don't back-populate the legacy hash key from hashes.
122 hash_name
, hash_value
= value
.split("=", 1)
124 raise DirectUrlValidationError(
125 f
"invalid archive_info.hash format: {value!r}"
127 if self
.hashes
is None:
128 self
.hashes
= {hash_name: hash_value}
129 elif hash_name
not in self
.hashes
:
130 self
.hashes
= self
.hashes
.copy()
131 self
.hashes
[hash_name
] = hash_value
135 def _from_dict(cls
, d
: Optional
[Dict
[str, Any
]]) -> Optional
["ArchiveInfo"]:
138 return cls(hash=_get(d
, str, "hash"), hashes
=_get(d
, dict, "hashes"))
140 def _to_dict(self
) -> Dict
[str, Any
]:
141 return _filter_none(hash=self
.hash, hashes
=self
.hashes
)
149 editable
: bool = False,
151 self
.editable
= editable
154 def _from_dict(cls
, d
: Optional
[Dict
[str, Any
]]) -> Optional
["DirInfo"]:
157 return cls(editable
=_get_required(d
, bool, "editable", default
=False))
159 def _to_dict(self
) -> Dict
[str, Any
]:
160 return _filter_none(editable
=self
.editable
or None)
163 InfoType
= Union
[ArchiveInfo
, DirInfo
, VcsInfo
]
171 subdirectory
: Optional
[str] = None,
175 self
.subdirectory
= subdirectory
177 def _remove_auth_from_netloc(self
, netloc
: str) -> str:
178 if "@" not in netloc
:
180 user_pass
, netloc_no_user_pass
= netloc
.split("@", 1)
182 isinstance(self
.info
, VcsInfo
)
183 and self
.info
.vcs
== "git"
184 and user_pass
== "git"
187 if ENV_VAR_RE
.match(user_pass
):
189 return netloc_no_user_pass
192 def redacted_url(self
) -> str:
193 """url with user:password part removed unless it is formed with
194 environment variables as specified in PEP 610, or it is ``git``
195 in the case of a git URL.
197 purl
= urllib
.parse
.urlsplit(self
.url
)
198 netloc
= self
._remove
_auth
_from
_netloc
(purl
.netloc
)
199 surl
= urllib
.parse
.urlunsplit(
200 (purl
.scheme
, netloc
, purl
.path
, purl
.query
, purl
.fragment
)
204 def validate(self
) -> None:
205 self
.from_dict(self
.to_dict())
208 def from_dict(cls
, d
: Dict
[str, Any
]) -> "DirectUrl":
210 url
=_get_required(d
, str, "url"),
211 subdirectory
=_get(d
, str, "subdirectory"),
212 info
=_exactly_one_of(
214 ArchiveInfo
._from
_dict
(_get(d
, dict, "archive_info")),
215 DirInfo
._from
_dict
(_get(d
, dict, "dir_info")),
216 VcsInfo
._from
_dict
(_get(d
, dict, "vcs_info")),
221 def to_dict(self
) -> Dict
[str, Any
]:
223 url
=self
.redacted_url
,
224 subdirectory
=self
.subdirectory
,
226 res
[self
.info
.name
] = self
.info
._to
_dict
()
230 def from_json(cls
, s
: str) -> "DirectUrl":
231 return cls
.from_dict(json
.loads(s
))
233 def to_json(self
) -> str:
234 return json
.dumps(self
.to_dict(), sort_keys
=True)
236 def is_local_editable(self
) -> bool:
237 return isinstance(self
.info
, DirInfo
) and self
.info
.editable