Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/models/direct_url.py: 41%

121 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1""" PEP 610 """ 

2import json 

3import re 

4import urllib.parse 

5from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union 

6 

7__all__ = [ 

8 "DirectUrl", 

9 "DirectUrlValidationError", 

10 "DirInfo", 

11 "ArchiveInfo", 

12 "VcsInfo", 

13] 

14 

15T = TypeVar("T") 

16 

17DIRECT_URL_METADATA_NAME = "direct_url.json" 

18ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$") 

19 

20 

21class DirectUrlValidationError(Exception): 

22 pass 

23 

24 

25def _get( 

26 d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None 

27) -> Optional[T]: 

28 """Get value from dictionary and verify expected type.""" 

29 if key not in d: 

30 return default 

31 value = d[key] 

32 if not isinstance(value, expected_type): 

33 raise DirectUrlValidationError( 

34 "{!r} has unexpected type for {} (expected {})".format( 

35 value, key, expected_type 

36 ) 

37 ) 

38 return value 

39 

40 

41def _get_required( 

42 d: Dict[str, Any], expected_type: Type[T], key: str, default: Optional[T] = None 

43) -> T: 

44 value = _get(d, expected_type, key, default) 

45 if value is None: 

46 raise DirectUrlValidationError(f"{key} must have a value") 

47 return value 

48 

49 

50def _exactly_one_of(infos: Iterable[Optional["InfoType"]]) -> "InfoType": 

51 infos = [info for info in infos if info is not None] 

52 if not infos: 

53 raise DirectUrlValidationError( 

54 "missing one of archive_info, dir_info, vcs_info" 

55 ) 

56 if len(infos) > 1: 

57 raise DirectUrlValidationError( 

58 "more than one of archive_info, dir_info, vcs_info" 

59 ) 

60 assert infos[0] is not None 

61 return infos[0] 

62 

63 

64def _filter_none(**kwargs: Any) -> Dict[str, Any]: 

65 """Make dict excluding None values.""" 

66 return {k: v for k, v in kwargs.items() if v is not None} 

67 

68 

69class VcsInfo: 

70 name = "vcs_info" 

71 

72 def __init__( 

73 self, 

74 vcs: str, 

75 commit_id: str, 

76 requested_revision: Optional[str] = None, 

77 ) -> None: 

78 self.vcs = vcs 

79 self.requested_revision = requested_revision 

80 self.commit_id = commit_id 

81 

82 @classmethod 

83 def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["VcsInfo"]: 

84 if d is None: 

85 return None 

86 return cls( 

87 vcs=_get_required(d, str, "vcs"), 

88 commit_id=_get_required(d, str, "commit_id"), 

89 requested_revision=_get(d, str, "requested_revision"), 

90 ) 

91 

92 def _to_dict(self) -> Dict[str, Any]: 

93 return _filter_none( 

94 vcs=self.vcs, 

95 requested_revision=self.requested_revision, 

96 commit_id=self.commit_id, 

97 ) 

98 

99 

100class ArchiveInfo: 

101 name = "archive_info" 

102 

103 def __init__( 

104 self, 

105 hash: Optional[str] = None, 

106 hashes: Optional[Dict[str, str]] = None, 

107 ) -> None: 

108 # set hashes before hash, since the hash setter will further populate hashes 

109 self.hashes = hashes 

110 self.hash = hash 

111 

112 @property 

113 def hash(self) -> Optional[str]: 

114 return self._hash 

115 

116 @hash.setter 

117 def hash(self, value: Optional[str]) -> None: 

118 if value is not None: 

119 # Auto-populate the hashes key to upgrade to the new format automatically. 

120 # We don't back-populate the legacy hash key from hashes. 

121 try: 

122 hash_name, hash_value = value.split("=", 1) 

123 except ValueError: 

124 raise DirectUrlValidationError( 

125 f"invalid archive_info.hash format: {value!r}" 

126 ) 

127 if self.hashes is None: 

128 self.hashes = {hash_name: hash_value} 

129 elif hash_name not in self.hashes: 

130 self.hashes = self.hashes.copy() 

131 self.hashes[hash_name] = hash_value 

132 self._hash = value 

133 

134 @classmethod 

135 def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["ArchiveInfo"]: 

136 if d is None: 

137 return None 

138 return cls(hash=_get(d, str, "hash"), hashes=_get(d, dict, "hashes")) 

139 

140 def _to_dict(self) -> Dict[str, Any]: 

141 return _filter_none(hash=self.hash, hashes=self.hashes) 

142 

143 

144class DirInfo: 

145 name = "dir_info" 

146 

147 def __init__( 

148 self, 

149 editable: bool = False, 

150 ) -> None: 

151 self.editable = editable 

152 

153 @classmethod 

154 def _from_dict(cls, d: Optional[Dict[str, Any]]) -> Optional["DirInfo"]: 

155 if d is None: 

156 return None 

157 return cls(editable=_get_required(d, bool, "editable", default=False)) 

158 

159 def _to_dict(self) -> Dict[str, Any]: 

160 return _filter_none(editable=self.editable or None) 

161 

162 

163InfoType = Union[ArchiveInfo, DirInfo, VcsInfo] 

164 

165 

166class DirectUrl: 

167 def __init__( 

168 self, 

169 url: str, 

170 info: InfoType, 

171 subdirectory: Optional[str] = None, 

172 ) -> None: 

173 self.url = url 

174 self.info = info 

175 self.subdirectory = subdirectory 

176 

177 def _remove_auth_from_netloc(self, netloc: str) -> str: 

178 if "@" not in netloc: 

179 return netloc 

180 user_pass, netloc_no_user_pass = netloc.split("@", 1) 

181 if ( 

182 isinstance(self.info, VcsInfo) 

183 and self.info.vcs == "git" 

184 and user_pass == "git" 

185 ): 

186 return netloc 

187 if ENV_VAR_RE.match(user_pass): 

188 return netloc 

189 return netloc_no_user_pass 

190 

191 @property 

192 def redacted_url(self) -> str: 

193 """url with user:password part removed unless it is formed with 

194 environment variables as specified in PEP 610, or it is ``git`` 

195 in the case of a git URL. 

196 """ 

197 purl = urllib.parse.urlsplit(self.url) 

198 netloc = self._remove_auth_from_netloc(purl.netloc) 

199 surl = urllib.parse.urlunsplit( 

200 (purl.scheme, netloc, purl.path, purl.query, purl.fragment) 

201 ) 

202 return surl 

203 

204 def validate(self) -> None: 

205 self.from_dict(self.to_dict()) 

206 

207 @classmethod 

208 def from_dict(cls, d: Dict[str, Any]) -> "DirectUrl": 

209 return DirectUrl( 

210 url=_get_required(d, str, "url"), 

211 subdirectory=_get(d, str, "subdirectory"), 

212 info=_exactly_one_of( 

213 [ 

214 ArchiveInfo._from_dict(_get(d, dict, "archive_info")), 

215 DirInfo._from_dict(_get(d, dict, "dir_info")), 

216 VcsInfo._from_dict(_get(d, dict, "vcs_info")), 

217 ] 

218 ), 

219 ) 

220 

221 def to_dict(self) -> Dict[str, Any]: 

222 res = _filter_none( 

223 url=self.redacted_url, 

224 subdirectory=self.subdirectory, 

225 ) 

226 res[self.info.name] = self.info._to_dict() 

227 return res 

228 

229 @classmethod 

230 def from_json(cls, s: str) -> "DirectUrl": 

231 return cls.from_dict(json.loads(s)) 

232 

233 def to_json(self) -> str: 

234 return json.dumps(self.to_dict(), sort_keys=True) 

235 

236 def is_local_editable(self) -> bool: 

237 return isinstance(self.info, DirInfo) and self.info.editable