Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/metadata/pkg_resources.py: 49%

169 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1import email.message 

2import email.parser 

3import logging 

4import os 

5import zipfile 

6from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional 

7 

8from pip._vendor import pkg_resources 

9from pip._vendor.packaging.requirements import Requirement 

10from pip._vendor.packaging.utils import NormalizedName, canonicalize_name 

11from pip._vendor.packaging.version import parse as parse_version 

12 

13from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel 

14from pip._internal.utils.egg_link import egg_link_path_from_location 

15from pip._internal.utils.misc import display_path, normalize_path 

16from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file 

17 

18from .base import ( 

19 BaseDistribution, 

20 BaseEntryPoint, 

21 BaseEnvironment, 

22 DistributionVersion, 

23 InfoPath, 

24 Wheel, 

25) 

26 

# Names exported by ``from ... import *``.
__all__ = ["NAME", "Distribution", "Environment"]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): presumably the identifier of this metadata backend --
# confirm against the code that reads ``NAME``.
NAME = "pkg_resources"

32 

33 

class EntryPoint(NamedTuple):
    """Plain record describing one entry point of a distribution.

    Instances are produced by ``Distribution.iter_entry_points()``.
    """

    # Text before the ``=`` of the entry point declaration, stripped.
    name: str
    # Text after the ``=`` of the entry point declaration, stripped.
    value: str
    # Key of the entry map under which the entry point was found.
    group: str

38 

39 

class InMemoryMetadata:
    """IMetadataProvider that reads metadata files from a dictionary.

    This also maps metadata decoding exceptions to our internal exception type.
    """

    def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
        """Store the metadata mapping and the name used in error messages.

        :param metadata: Mapping of metadata file name to its raw bytes.
        :param wheel_name: Origin of the metadata, only used to build the
            message of decoding errors.
        """
        self._metadata = metadata
        self._wheel_name = wheel_name

    def has_metadata(self, name: str) -> bool:
        """Return whether a metadata file called ``name`` is present."""
        return name in self._metadata

    def get_metadata(self, name: str) -> str:
        """Return the decoded content of the metadata file ``name``.

        :raises KeyError: if ``name`` is not in the mapping.
        :raises UnsupportedWheel: if the bytes cannot be decoded.
        """
        try:
            return self._metadata[name].decode()
        except UnicodeDecodeError as e:
            # Augment the default error with the origin of the file, and keep
            # the decoding error attached as the explicit cause.
            raise UnsupportedWheel(
                f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
            ) from e

    def get_metadata_lines(self, name: str) -> Iterable[str]:
        """Return the lines of ``name`` as produced by ``yield_lines``."""
        return pkg_resources.yield_lines(self.get_metadata(name))

    def metadata_isdir(self, name: str) -> bool:
        """Always ``False``: the in-memory mapping has no directories."""
        return False

    def metadata_listdir(self, name: str) -> List[str]:
        """Always an empty list: the in-memory mapping has no directories."""
        return []

    def run_script(self, script_name: str, namespace: str) -> None:
        """Do nothing; present only to satisfy the provider interface."""
        pass

73 

74 

class Distribution(BaseDistribution):
    """Distribution backed by a ``pkg_resources.Distribution`` object."""

    def __init__(self, dist: pkg_resources.Distribution) -> None:
        self._dist = dist

    @classmethod
    def from_directory(cls, directory: str) -> BaseDistribution:
        """Build a distribution from a ``.egg-info`` or ``.dist-info`` directory.

        :param directory: Path to the metadata directory; trailing path
            separators are ignored.
        """
        dist_dir = directory.rstrip(os.sep)

        # Build a PathMetadata object, from path to metadata. :wink:
        base_dir, dist_dir_name = os.path.split(dist_dir)
        metadata = pkg_resources.PathMetadata(base_dir, dist_dir)

        # Determine the correct Distribution object type.
        if dist_dir.endswith(".egg-info"):
            dist_cls = pkg_resources.Distribution
            dist_name = os.path.splitext(dist_dir_name)[0]
        else:
            assert dist_dir.endswith(".dist-info")
            dist_cls = pkg_resources.DistInfoDistribution
            # Keep only the text before the first "-" of the directory name.
            dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]

        dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
        return cls(dist)

    @classmethod
    def from_metadata_file_contents(
        cls,
        metadata_contents: bytes,
        filename: str,
        project_name: str,
    ) -> BaseDistribution:
        """Build a distribution whose only metadata file is ``METADATA``.

        :param metadata_contents: Raw bytes served as the ``METADATA`` file.
        :param filename: Used as the distribution location and as the origin
            named in metadata decoding errors.
        :param project_name: Project name given to the distribution.
        """
        metadata_dict = {
            "METADATA": metadata_contents,
        }
        dist = pkg_resources.DistInfoDistribution(
            location=filename,
            metadata=InMemoryMetadata(metadata_dict, filename),
            project_name=project_name,
        )
        return cls(dist)

    @classmethod
    def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
        """Build a distribution from the metadata files inside a wheel.

        Every archive member under the info directory reported by
        ``parse_wheel`` is read into memory, keyed by its path with the
        leading directory component removed.

        :raises InvalidWheel: if the archive is not a valid zip file.
        :raises UnsupportedWheel: if the wheel is rejected while parsing or
            reading its metadata.
        """
        try:
            with wheel.as_zipfile() as zf:
                info_dir, _ = parse_wheel(zf, name)
                metadata_dict = {
                    path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
                    for path in zf.namelist()
                    if path.startswith(f"{info_dir}/")
                }
        except zipfile.BadZipFile as e:
            raise InvalidWheel(wheel.location, name) from e
        except UnsupportedWheel as e:
            # Chain explicitly, like the BadZipFile handler above.
            raise UnsupportedWheel(f"{name} has an invalid wheel, {e}") from e
        dist = pkg_resources.DistInfoDistribution(
            location=wheel.location,
            metadata=InMemoryMetadata(metadata_dict, wheel.location),
            project_name=name,
        )
        return cls(dist)

    @property
    def location(self) -> Optional[str]:
        """The ``location`` attribute of the wrapped distribution."""
        return self._dist.location

    @property
    def installed_location(self) -> Optional[str]:
        """Normalized install location, or ``None`` if none is known.

        An egg-link path found for ``self.raw_name`` takes precedence over
        ``self.location``.
        """
        egg_link = egg_link_path_from_location(self.raw_name)
        if egg_link:
            location = egg_link
        elif self.location:
            location = self.location
        else:
            return None
        return normalize_path(location)

    @property
    def info_location(self) -> Optional[str]:
        """The ``egg_info`` attribute of the wrapped distribution."""
        return self._dist.egg_info

    @property
    def installed_by_distutils(self) -> bool:
        # A distutils-installed distribution is provided by FileMetadata. This
        # provider has a "path" attribute not present anywhere else. Not the
        # best introspection logic, but pip has been doing this for a long time.
        try:
            return bool(self._dist._provider.path)
        except AttributeError:
            return False

    @property
    def canonical_name(self) -> NormalizedName:
        """The project name passed through ``canonicalize_name``."""
        return canonicalize_name(self._dist.project_name)

    @property
    def version(self) -> DistributionVersion:
        """The version of the wrapped distribution, parsed by ``parse_version``."""
        return parse_version(self._dist.version)

    def is_file(self, path: InfoPath) -> bool:
        """Return whether the wrapped distribution has metadata at ``path``."""
        return self._dist.has_metadata(str(path))

    def iter_distutils_script_names(self) -> Iterator[str]:
        """Yield the entries of the ``scripts`` metadata directory."""
        yield from self._dist.metadata_listdir("scripts")

    def read_text(self, path: InfoPath) -> str:
        """Return the content of the metadata file at ``path``.

        :raises FileNotFoundError: if the distribution has no such metadata.
        :raises NoneMetadataError: if the metadata exists but is ``None``.
        """
        name = str(path)
        if not self._dist.has_metadata(name):
            raise FileNotFoundError(name)
        content = self._dist.get_metadata(name)
        if content is None:
            raise NoneMetadataError(self, name)
        return content

    def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
        """Yield an ``EntryPoint`` for every entry in the entry map.

        Name and value are taken from the string form of each entry, split
        on the first ``=`` and stripped of surrounding whitespace.
        """
        for group, entries in self._dist.get_entry_map().items():
            # Only the entry objects are needed; the mapping keys are unused.
            for entry_point in entries.values():
                name, _, value = str(entry_point).partition("=")
                yield EntryPoint(name=name.strip(), value=value.strip(), group=group)

    def _metadata_impl(self) -> email.message.Message:
        """
        :raises NoneMetadataError: if the distribution reports `has_metadata()`
            True but `get_metadata()` returns None.
        """
        if isinstance(self._dist, pkg_resources.DistInfoDistribution):
            metadata_name = "METADATA"
        else:
            metadata_name = "PKG-INFO"
        try:
            metadata = self.read_text(metadata_name)
        except FileNotFoundError:
            if self.location:
                displaying_path = display_path(self.location)
            else:
                displaying_path = repr(self.location)
            logger.warning("No metadata found in %s", displaying_path)
            # Fall back to empty metadata so that parsing still succeeds.
            metadata = ""
        feed_parser = email.parser.FeedParser()
        feed_parser.feed(metadata)
        return feed_parser.close()

    def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
        """Return the requirements of the distribution for the given extras.

        Extras the distribution does not provide are dropped before asking
        the wrapped distribution for its requirements.
        """
        if extras:  # pkg_resources raises on invalid extras, so we sanitize.
            extras = frozenset(pkg_resources.safe_extra(e) for e in extras)
            extras = extras.intersection(self._dist.extras)
        return self._dist.requires(extras)

    def iter_provided_extras(self) -> Iterable[str]:
        """Return the ``extras`` of the wrapped distribution."""
        return self._dist.extras

    def is_extra_provided(self, extra: str) -> bool:
        """Return whether ``extra``, after ``safe_extra``, is a provided extra."""
        return pkg_resources.safe_extra(extra) in self._dist.extras

228 

229 

class Environment(BaseEnvironment):
    """Environment backed by a ``pkg_resources.WorkingSet``."""

    def __init__(self, ws: pkg_resources.WorkingSet) -> None:
        self._ws = ws

    @classmethod
    def default(cls) -> BaseEnvironment:
        """Return an environment wrapping ``pkg_resources.working_set``."""
        return cls(pkg_resources.working_set)

    @classmethod
    def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
        """Return an environment wrapping a new working set built from ``paths``."""
        return cls(pkg_resources.WorkingSet(paths))

    def _iter_distributions(self) -> Iterator[BaseDistribution]:
        """Yield a ``Distribution`` wrapper for each entry of the working set."""
        for dist in self._ws:
            yield Distribution(dist)

    def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Find a distribution matching the ``name`` in the environment.

        This searches from *all* distributions available in the environment, to
        match the behavior of ``pkg_resources.get_distribution()``.
        """
        canonical_name = canonicalize_name(name)
        for dist in self.iter_all_distributions():
            if dist.canonical_name == canonical_name:
                return dist
        return None

    def get_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Return the distribution called ``name``, or ``None`` if not found.

        If the first search finds nothing, the working set is asked to
        ``require`` the name and the search is repeated once.
        """
        # Search the distribution by looking through the working set.
        dist = self._search_distribution(name)
        if dist:
            return dist

        # If distribution could not be found, call working_set.require to
        # update the working set, and try to find the distribution again.
        # This might happen for e.g. when you install a package twice, once
        # using setup.py develop and again using setup.py install. Now when
        # running pip uninstall twice, the package gets removed from the
        # working set in the first uninstall, so we have to populate the
        # working set again so that pip knows about it and the packages gets
        # picked up and is successfully uninstalled the second time too.
        try:
            # We didn't pass in any version specifiers, so this can never
            # raise pkg_resources.VersionConflict.
            self._ws.require(name)
        except pkg_resources.DistributionNotFound:
            return None
        return self._search_distribution(name)