Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/models/direct_url.py: 41%

121 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1""" PEP 610 """ 

2import json 

3import re 

4import urllib.parse 

5from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union 

6 

# Public names of this module: the direct URL model, the validation error,
# and the three info classes a direct URL can carry.
__all__ = [
    "DirectUrl",
    "DirectUrlValidationError",
    "DirInfo",
    "ArchiveInfo",
    "VcsInfo",
]

14 

# Generic placeholder used by the type-checked dictionary accessors.
T = TypeVar("T")

# File name of the PEP 610 metadata record.
DIRECT_URL_METADATA_NAME = "direct_url.json"

# Matches a user-info string made only of environment variable references:
# either ``${NAME}`` alone or ``${NAME}:${OTHER_NAME}``.
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")

19 

20 

class DirectUrlValidationError(Exception):
    """Raised when direct URL data is missing a value or is malformed."""

23 

24 

25def _get( 

26 d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None 

27) -> Optional[T]: 

28 """Get value from dictionary and verify expected type.""" 

29 if key not in d: 

30 return default 

31 value = d[key] 

32 if not isinstance(value, expected_type): 

33 raise DirectUrlValidationError( 

34 f"{value!r} has unexpected type for {key} (expected {expected_type})" 

35 ) 

36 return value 

37 

38 

def _get_required(
    d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None
) -> T:
    """Type-checked lookup of ``key`` in ``d`` that must produce a value.

    Delegates to ``_get`` and rejects a ``None`` result, which happens when
    ``key`` is absent and no ``default`` was supplied.

    Raises DirectUrlValidationError when no value is available, or (via
    ``_get``) when the stored value has the wrong type.
    """
    found = _get(d, expected_type, key, default)
    if found is not None:
        return found
    raise DirectUrlValidationError(f"{key} must have a value")

46 

47 

48def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType": 

49 infos = [info for info in infos if info is not None] 

50 if not infos: 

51 raise DirectUrlValidationError( 

52 "missing one of archive_info, dir_info, vcs_info" 

53 ) 

54 if len(infos) > 1: 

55 raise DirectUrlValidationError( 

56 "more than one of archive_info, dir_info, vcs_info" 

57 ) 

58 assert infos[0] is not None 

59 return infos[0] 

60 

61 

62def _filter_none(**kwargs: Any) -> Dict[str, Any]: 

63 """Make dict excluding None values.""" 

64 return {k: v for k, v in kwargs.items() if v is not None} 

65 

66 

class VcsInfo:
    """The ``vcs_info`` part of a direct URL.

    Holds the VCS name, the commit id and, optionally, the revision that
    was requested.
    """

    # Key under which this info is stored in the direct URL dictionary.
    name = "vcs_info"

    def __init__(
        self,
        vcs: str,
        commit_id: str,
        requested_revision: Optional[str] = None,
    ) -> None:
        self.vcs = vcs
        self.requested_revision = requested_revision
        self.commit_id = commit_id

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]:
        """Build an instance from ``d``, or return None when ``d`` is None.

        ``vcs`` and ``commit_id`` are required strings and
        ``requested_revision`` is an optional string; the accessors raise
        DirectUrlValidationError on a missing or mistyped value.
        """
        if d is None:
            return None
        vcs = _get_required(d, str, "vcs")
        commit_id = _get_required(d, str, "commit_id")
        requested_revision = _get(d, str, "requested_revision")
        return cls(
            vcs=vcs,
            commit_id=commit_id,
            requested_revision=requested_revision,
        )

    def _to_dict(self) -> Dict[str, Any]:
        """Return the attributes as a dict, leaving out those that are None."""
        fields = {
            "vcs": self.vcs,
            "requested_revision": self.requested_revision,
            "commit_id": self.commit_id,
        }
        return _filter_none(**fields)

96 

97 

class ArchiveInfo:
    """The ``archive_info`` part of a direct URL.

    Carries a legacy ``hash`` string of the form ``<name>=<value>`` and a
    ``hashes`` mapping from hash name to hash value. Setting ``hash`` also
    adds its entry to ``hashes`` if that name is not already present;
    ``hashes`` is never used to fill in ``hash``.
    """

    # Key under which this info is stored in the direct URL dictionary.
    name = "archive_info"

    def __init__(
        self,
        hash: Optional[str] = None,
        hashes: Optional[Dict[str, str]] = None,
    ) -> None:
        # Order matters: assigning ``hash`` may add an entry to ``hashes``.
        self.hashes = hashes
        self.hash = hash

    @property
    def hash(self) -> Optional[str]:
        """The legacy ``<name>=<value>`` hash string, or None."""
        return self._hash

    @hash.setter
    def hash(self, value: Optional[str]) -> None:
        if value is not None:
            self._merge_legacy_hash(value)
        # Stored only after a non-None value has been parsed successfully.
        self._hash = value

    def _merge_legacy_hash(self, legacy: str) -> None:
        """Record the ``<name>=<value>`` string ``legacy`` in ``hashes``.

        An entry already present under the same name is left untouched.
        Raises DirectUrlValidationError when ``legacy`` contains no ``=``.
        """
        try:
            algorithm, digest = legacy.split("=", 1)
        except ValueError:
            raise DirectUrlValidationError(
                f"invalid archive_info.hash format: {legacy!r}"
            )
        known = self.hashes
        if known is None:
            self.hashes = {algorithm: digest}
        elif algorithm not in known:
            # Work on a copy so the mapping passed in by the caller is not
            # modified.
            merged = known.copy()
            merged[algorithm] = digest
            self.hashes = merged

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]:
        """Build an instance from ``d``, or return None when ``d`` is None.

        ``hash`` must be a string and ``hashes`` a dict when present.
        """
        if d is None:
            return None
        legacy_hash = _get(d, str, "hash")
        hashes = _get(d, dict, "hashes")
        return cls(hash=legacy_hash, hashes=hashes)

    def _to_dict(self) -> Dict[str, Any]:
        """Return ``hash`` and ``hashes`` as a dict, leaving out None values."""
        return _filter_none(hash=self.hash, hashes=self.hashes)

140 

141 

class DirInfo:
    """The ``dir_info`` part of a direct URL: a single ``editable`` flag."""

    # Key under which this info is stored in the direct URL dictionary.
    name = "dir_info"

    def __init__(
        self,
        editable: bool = False,
    ) -> None:
        self.editable = editable

    @classmethod
    def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]:
        """Build an instance from ``d``, or return None when ``d`` is None.

        ``editable`` must be a bool when present and defaults to False.
        """
        if d is None:
            return None
        editable = _get_required(d, bool, "editable", default=False)
        return cls(editable=editable)

    def _to_dict(self) -> Dict[str, Any]:
        """Return ``{"editable": ...}`` when editable is truthy, else ``{}``."""
        if self.editable:
            return {"editable": self.editable}
        return {}

159 

160 

# Any one of the three info classes a DirectUrl can carry.
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]

162 

163 

class DirectUrl:
    """A direct URL record (PEP 610).

    Combines a URL, exactly one info object (``ArchiveInfo``, ``DirInfo`` or
    ``VcsInfo``) and an optional subdirectory, and converts to and from the
    dictionary / JSON representation.
    """

    def __init__(
        self,
        url: str,
        info: InfoType,
        subdirectory: Optional[str] = None,
    ) -> None:
        self.url = url
        self.info = info
        self.subdirectory = subdirectory

    def _remove_auth_from_netloc(self, netloc: str) -> str:
        """Return ``netloc`` without its ``user:password@`` part.

        The user-info part is kept when it is exactly ``git`` for a git
        ``VcsInfo``, or when it consists only of environment variable
        references (``ENV_VAR_RE``).
        """
        if "@" not in netloc:
            return netloc
        # Split on the LAST "@", which is where urllib.parse.urlsplit()
        # places the boundary between user-info and host. Splitting on the
        # first one would leave part of a password containing "@" in the
        # result.
        user_pass, _, netloc_no_user_pass = netloc.rpartition("@")
        if (
            isinstance(self.info, VcsInfo)
            and self.info.vcs == "git"
            and user_pass == "git"
        ):
            return netloc
        if ENV_VAR_RE.match(user_pass):
            return netloc
        return netloc_no_user_pass

    @property
    def redacted_url(self) -> str:
        """url with user:password part removed unless it is formed with
        environment variables as specified in PEP 610, or it is ``git``
        in the case of a git URL.
        """
        purl = urllib.parse.urlsplit(self.url)
        netloc = self._remove_auth_from_netloc(purl.netloc)
        surl = urllib.parse.urlunsplit(
            (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
        )
        return surl

    def validate(self) -> None:
        """Check this object by round-tripping it through its dict form.

        Raises DirectUrlValidationError if the dict form is rejected by
        ``from_dict``.
        """
        self.from_dict(self.to_dict())

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl":
        """Build an instance from its dictionary representation.

        ``url`` is a required string, ``subdirectory`` an optional string,
        and exactly one of ``archive_info``, ``dir_info`` and ``vcs_info``
        must be present.

        Raises DirectUrlValidationError on a missing or mistyped value, or
        when the number of info entries is not exactly one.
        """
        # Instantiate ``cls`` rather than DirectUrl so that subclasses get
        # an instance of their own type, as the info classes' ``_from_dict``
        # methods already do.
        return cls(
            url=_get_required(d, str, "url"),
            subdirectory=_get(d, str, "subdirectory"),
            info=_exactly_one_of(
                [
                    ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
                    DirInfo._from_dict(_get(d, dict, "dir_info")),
                    VcsInfo._from_dict(_get(d, dict, "vcs_info")),
                ]
            ),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the dictionary representation.

        The URL is written in its redacted form, ``subdirectory`` is left
        out when None, and the info object is stored under its ``name``.
        """
        res = _filter_none(
            url=self.redacted_url,
            subdirectory=self.subdirectory,
        )
        res[self.info.name] = self.info._to_dict()
        return res

    @classmethod
    def from_json(cls, s: str) -> "DirectUrl":
        """Parse the JSON document ``s`` and build an instance from it."""
        return cls.from_dict(json.loads(s))

    def to_json(self) -> str:
        """Serialize ``to_dict()`` as JSON with sorted keys."""
        return json.dumps(self.to_dict(), sort_keys=True)

    def is_local_editable(self) -> bool:
        """Return whether the info is a ``DirInfo`` with ``editable`` set."""
        return isinstance(self.info, DirInfo) and self.info.editable