Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/models/direct_url.py: 41%
121 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
1""" PEP 610 """
2import json
3import re
4import urllib.parse
5from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
7__all__ = [
8 "DirectUrl",
9 "DirectUrlValidationError",
10 "DirInfo",
11 "ArchiveInfo",
12 "VcsInfo",
13]
# Generic placeholder tying an expected type to the value type returned by
# the dictionary helpers in this module.
T = TypeVar("T")

# File name under which the direct URL metadata is stored.
DIRECT_URL_METADATA_NAME = "direct_url.json"

# Matches a credentials part written purely with environment variable
# references: ``${NAME}`` optionally followed by ``:${NAME}``.
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
class DirectUrlValidationError(Exception):
    """Raised when direct URL data does not have the expected shape."""
def _get(
    d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> Optional[T]:
    """Look up ``key`` in ``d`` and check the type of the value found.

    Returns:
        The stored value, or ``default`` when ``key`` is absent. The default
        is returned as given, without a type check.

    Raises:
        DirectUrlValidationError: the stored value is not an instance of
            ``expected_type``.
    """
    if key in d:
        found = d[key]
        if isinstance(found, expected_type):
            return found
        raise DirectUrlValidationError(
            f"{found!r} has unexpected type for {key} (expected {expected_type})"
        )
    return default
def _get_required(
    d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> T:
    """Fetch ``key`` from ``d`` via ``_get`` and insist on a non-None result.

    Raises:
        DirectUrlValidationError: the lookup produced ``None`` (key absent
            and no usable default), or the stored value has the wrong type.
    """
    found = _get(d, expected_type, key, default)
    if found is not None:
        return found
    raise DirectUrlValidationError(f"{key} must have a value")
48def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType":
49 infos = [info for info in infos if info is not None]
50 if not infos:
51 raise DirectUrlValidationError(
52 "missing one of archive_info, dir_info, vcs_info"
53 )
54 if len(infos) > 1:
55 raise DirectUrlValidationError(
56 "more than one of archive_info, dir_info, vcs_info"
57 )
58 assert infos[0] is not None
59 return infos[0]
62def _filter_none(**kwargs: Any) -> Dict[str, Any]:
63 """Make dict excluding None values."""
64 return {k: v for k, v in kwargs.items() if v is not None}
class VcsInfo:
    """Version-control details, stored under the ``vcs_info`` key."""

    name = "vcs_info"

    def __init__(
        self,
        vcs: str,
        commit_id: str,
        requested_revision: Optional[str] = None,
    ) -> None:
        self.vcs = vcs
        self.requested_revision = requested_revision
        self.commit_id = commit_id

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
        """Build an instance from ``d``; ``None`` in gives ``None`` out.

        ``vcs`` and ``commit_id`` are required strings, while
        ``requested_revision`` is an optional string.
        """
        if d is not None:
            return cls(
                vcs=_get_required(d, str, "vcs"),
                commit_id=_get_required(d, str, "commit_id"),
                requested_revision=_get(d, str, "requested_revision"),
            )
        return None

    def _to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, omitting those that are None."""
        fields = {
            "vcs": self.vcs,
            "requested_revision": self.requested_revision,
            "commit_id": self.commit_id,
        }
        return _filter_none(**fields)
class ArchiveInfo:
    """Archive details, stored under the ``archive_info`` key.

    ``hash`` is the legacy single ``<name>=<value>`` string and ``hashes``
    maps hash names to hash values. Assigning ``hash`` also records that
    hash in ``hashes`` unless an entry with the same name is already there.
    """

    name = "archive_info"

    def __init__(
        self,
        hash: Optional[str] = None,
        hashes: Optional[Dict[str, str]] = None,
    ) -> None:
        # set hashes before hash, since the hash setter will further populate hashes
        self.hashes = hashes
        self.hash = hash

    @property
    def hash(self) -> Optional[str]:
        """The legacy ``<name>=<value>`` hash string, or ``None``."""
        return self._hash

    @hash.setter
    def hash(self, value: Optional[str]) -> None:
        """Store ``value`` and mirror it into ``hashes``.

        Raises:
            DirectUrlValidationError: ``value`` contains no ``=`` separator.
        """
        if value is not None:
            # Auto-populate the hashes key to upgrade to the new format automatically.
            # We don't back-populate the legacy hash key from hashes.
            hash_name, separator, hash_value = value.partition("=")
            if not separator:
                # Checked explicitly instead of catching an unpacking
                # ValueError, so the traceback is not cluttered with an
                # unrelated "not enough values to unpack" context.
                raise DirectUrlValidationError(
                    f"invalid archive_info.hash format: {value!r}"
                )
            if self.hashes is None:
                self.hashes = {hash_name: hash_value}
            elif hash_name not in self.hashes:
                # Work on a copy so the dict handed in by the caller is
                # never mutated.
                self.hashes = self.hashes.copy()
                self.hashes[hash_name] = hash_value
        self._hash = value

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
        """Build an instance from ``d``; ``None`` in gives ``None`` out."""
        if d is None:
            return None
        return cls(hash=_get(d, str, "hash"), hashes=_get(d, dict, "hashes"))

    def _to_dict(self) -> Dict[str, Any]:
        """Return ``hash`` and ``hashes`` as a dict, omitting None values."""
        return _filter_none(hash=self.hash, hashes=self.hashes)
class DirInfo:
    """Local directory details, stored under the ``dir_info`` key."""

    name = "dir_info"

    def __init__(
        self,
        editable: bool = False,
    ) -> None:
        self.editable = editable

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
        """Build an instance from ``d``; ``None`` in gives ``None`` out.

        A missing ``editable`` key is treated as ``False``.
        """
        if d is not None:
            editable = _get_required(d, bool, "editable", default=False)
            return cls(editable=editable)
        return None

    def _to_dict(self) -> Dict[str, Any]:
        """Return ``{"editable": ...}`` when editable is truthy, else ``{}``."""
        # A falsy flag is mapped to None so that it gets filtered out.
        flag = self.editable if self.editable else None
        return _filter_none(editable=flag)
# The kinds of info object a DirectUrl can carry; exactly one is present.
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]


class DirectUrl:
    """A direct URL record: a URL, one info object and an optional subdirectory.

    Instances convert to and from plain dicts (``to_dict`` / ``from_dict``)
    and JSON strings (``to_json`` / ``from_json``).
    """

    def __init__(
        self,
        url: str,
        info: InfoType,
        subdirectory: Optional[str] = None,
    ) -> None:
        self.url = url
        self.info = info
        self.subdirectory = subdirectory

    def _remove_auth_from_netloc(self, netloc: str) -> str:
        """Return ``netloc`` with its ``user:password@`` part stripped.

        The credentials part is kept when it is exactly ``git`` on a git
        VCS URL, or when it matches ``ENV_VAR_RE``.
        """
        # Split on the LAST "@": the host part never contains one, whereas an
        # unescaped "@" can appear inside the password. Splitting on the
        # first "@" would leave the tail of such a password in the result.
        user_pass, separator, netloc_no_user_pass = netloc.rpartition("@")
        if not separator:
            return netloc
        if (
            isinstance(self.info, VcsInfo)
            and self.info.vcs == "git"
            and user_pass == "git"
        ):
            return netloc
        if ENV_VAR_RE.match(user_pass):
            return netloc
        return netloc_no_user_pass

    @property
    def redacted_url(self) -> str:
        """url with user:password part removed unless it is formed with
        environment variables as specified in PEP 610, or it is ``git``
        in the case of a git URL.
        """
        purl = urllib.parse.urlsplit(self.url)
        netloc = self._remove_auth_from_netloc(purl.netloc)
        surl = urllib.parse.urlunsplit(
            (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
        )
        return surl

    def validate(self) -> None:
        """Round-trip through ``to_dict`` / ``from_dict`` to check the fields.

        Raises:
            DirectUrlValidationError: a field is missing or has the wrong
                type.
        """
        self.from_dict(self.to_dict())

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
        """Build an instance from its dict representation.

        Raises:
            DirectUrlValidationError: ``d`` is not a dict, ``url`` is missing
                or not a string, a field has the wrong type, or ``d`` does
                not hold exactly one of ``archive_info``, ``dir_info`` and
                ``vcs_info``.
        """
        # Decoded JSON may be a list, string, number, ... Reject those up
        # front so callers only ever have to handle the validation error
        # rather than a stray TypeError from indexing a non-dict.
        if not isinstance(d, dict):
            raise DirectUrlValidationError(
                f"direct URL data must be a dict, got {type(d).__name__}"
            )
        # ``cls`` rather than a hard-coded class, so that subclasses get
        # instances of their own type from this alternate constructor.
        return cls(
            url=_get_required(d, str, "url"),
            subdirectory=_get(d, str, "subdirectory"),
            info=_exactly_one_of(
                [
                    ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
                    DirInfo._from_dict(_get(d, dict, "dir_info")),
                    VcsInfo._from_dict(_get(d, dict, "vcs_info")),
                ]
            ),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the dict representation.

        The ``url`` entry holds ``redacted_url``, not the raw URL; the info
        object is stored under the key given by its ``name`` attribute.
        """
        res = _filter_none(
            url=self.redacted_url,
            subdirectory=self.subdirectory,
        )
        res[self.info.name] = self.info._to_dict()
        return res

    @classmethod
    def from_json(cls, s: str) -> "DirectUrl":
        """Parse the JSON string ``s`` and build an instance from it."""
        return cls.from_dict(json.loads(s))

    def to_json(self) -> str:
        """Serialize ``to_dict()`` as JSON with sorted keys."""
        return json.dumps(self.to_dict(), sort_keys=True)

    def is_local_editable(self) -> bool:
        """Return True when the info is a ``DirInfo`` flagged as editable."""
        return isinstance(self.info, DirInfo) and self.info.editable