Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/metadata/pkg_resources.py: 46%

164 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1import email.message 

2import email.parser 

3import logging 

4import os 

5import zipfile 

6from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional 

7 

8from pip._vendor import pkg_resources 

9from pip._vendor.packaging.requirements import Requirement 

10from pip._vendor.packaging.utils import NormalizedName, canonicalize_name 

11from pip._vendor.packaging.version import parse as parse_version 

12 

13from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel 

14from pip._internal.utils.egg_link import egg_link_path_from_location 

15from pip._internal.utils.misc import display_path, normalize_path 

16from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file 

17 

18from .base import ( 

19 BaseDistribution, 

20 BaseEntryPoint, 

21 BaseEnvironment, 

22 DistributionVersion, 

23 InfoPath, 

24 Wheel, 

25) 

26 

27logger = logging.getLogger(__name__) 

28 

29 

class EntryPoint(NamedTuple):
    """Plain record describing one entry point of a distribution.

    Instances are produced by ``Distribution.iter_entry_points``.
    """

    # Text to the left of "=" in the entry point's string form.
    name: str
    # Text to the right of "=" in the entry point's string form.
    value: str
    # Key of the entry-point group the entry was listed under.
    group: str

34 

35 

class InMemoryMetadata:
    """IMetadataProvider backed by a mapping of file names to raw bytes.

    Decoding failures are re-raised as ``UnsupportedWheel`` so callers deal
    with pip's own exception type instead of ``UnicodeDecodeError``.
    """

    def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
        # ``wheel_name`` is only used to label decoding errors.
        self._wheel_name = wheel_name
        self._metadata = metadata

    def has_metadata(self, name: str) -> bool:
        """Return whether ``name`` is a key of the backing mapping."""
        return name in self._metadata

    def get_metadata(self, name: str) -> str:
        """Return the content stored under ``name`` decoded to text.

        :raises KeyError: if ``name`` is not in the backing mapping.
        :raises UnsupportedWheel: if the stored bytes cannot be decoded.
        """
        raw = self._metadata[name]
        try:
            text = raw.decode()
        except UnicodeDecodeError as exc:
            # Extend the stock decoding error with where the bytes came from.
            message = (
                f"Error decoding metadata for {self._wheel_name}: {exc} in {name} file"
            )
            raise UnsupportedWheel(message)
        return text

    def get_metadata_lines(self, name: str) -> Iterable[str]:
        """Return the decoded content of ``name`` split by ``yield_lines``."""
        text = self.get_metadata(name)
        return pkg_resources.yield_lines(text)

    def metadata_isdir(self, name: str) -> bool:
        """Always ``False``: this provider never reports a directory."""
        return False

    def metadata_listdir(self, name: str) -> List[str]:
        """Always an empty list: this provider has no directory listings."""
        return list()

    def run_script(self, script_name: str, namespace: str) -> None:
        """Do nothing; present so the provider interface is complete."""
        return None

69 

70 

class Distribution(BaseDistribution):
    """Distribution backed by a ``pkg_resources`` distribution object.

    The wrapped object is kept in ``self._dist`` and every accessor below
    forwards to it.
    """

    def __init__(self, dist: pkg_resources.Distribution) -> None:
        self._dist = dist

    @classmethod
    def from_directory(cls, directory: str) -> BaseDistribution:
        """Wrap the ``.egg-info`` or ``.dist-info`` directory at ``directory``.

        Trailing path separators are ignored. A directory with any other
        suffix trips an assertion.
        """
        info_dir = directory.rstrip(os.sep)
        parent_dir, info_dir_name = os.path.split(info_dir)
        stem = os.path.splitext(info_dir_name)[0]

        # PathMetadata is built from the parent directory and the metadata
        # directory itself.
        provider = pkg_resources.PathMetadata(parent_dir, info_dir)

        # The directory suffix selects the pkg_resources distribution class.
        if info_dir.endswith(".egg-info"):
            factory = pkg_resources.Distribution
            project = stem
        else:
            assert info_dir.endswith(".dist-info")
            factory = pkg_resources.DistInfoDistribution
            # For .dist-info, only the text before the first dash is the name.
            project = stem.partition("-")[0]

        return cls(factory(parent_dir, project_name=project, metadata=provider))

    @classmethod
    def from_metadata_file_contents(
        cls,
        metadata_contents: bytes,
        filename: str,
        project_name: str,
    ) -> BaseDistribution:
        """Build a distribution from the raw bytes of a ``METADATA`` file.

        ``filename`` is used both as the distribution location and as the
        label in metadata decoding errors.
        """
        provider = InMemoryMetadata({"METADATA": metadata_contents}, filename)
        wrapped = pkg_resources.DistInfoDistribution(
            location=filename,
            metadata=provider,
            project_name=project_name,
        )
        return cls(wrapped)

    @classmethod
    def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
        """Build a distribution from the info directory inside a wheel.

        Every archive member under the directory reported by ``parse_wheel``
        is read into memory, keyed by its path with the first component
        removed.

        :raises InvalidWheel: if the archive is not a valid zip file.
        :raises UnsupportedWheel: if reading the wheel metadata fails; the
            message is prefixed with ``name``.
        """
        files = {}
        try:
            with wheel.as_zipfile() as archive:
                info_dir, _ = parse_wheel(archive, name)
                prefix = f"{info_dir}/"
                for member in archive.namelist():
                    if not member.startswith(prefix):
                        continue
                    relative = member.split("/", 1)[-1]
                    files[relative] = read_wheel_metadata_file(archive, member)
        except zipfile.BadZipFile as e:
            raise InvalidWheel(wheel.location, name) from e
        except UnsupportedWheel as e:
            raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")

        provider = InMemoryMetadata(files, wheel.location)
        wrapped = pkg_resources.DistInfoDistribution(
            location=wheel.location,
            metadata=provider,
            project_name=name,
        )
        return cls(wrapped)

    @property
    def location(self) -> Optional[str]:
        """The ``location`` attribute of the wrapped distribution."""
        return self._dist.location

    @property
    def installed_location(self) -> Optional[str]:
        """Normalized install path, or ``None`` when no path is known.

        The egg-link path found for ``raw_name`` wins; otherwise ``location``
        is used.
        """
        candidate = egg_link_path_from_location(self.raw_name) or self.location
        if not candidate:
            return None
        return normalize_path(candidate)

    @property
    def info_location(self) -> Optional[str]:
        """The ``egg_info`` attribute of the wrapped distribution."""
        return self._dist.egg_info

    @property
    def installed_by_distutils(self) -> bool:
        """Whether the metadata provider exposes a non-empty ``path``."""
        # A distutils-installed distribution is served by FileMetadata, the
        # only provider carrying a "path" attribute. Crude introspection, but
        # pip has relied on it for a long time.
        try:
            has_path = bool(self._dist._provider.path)
        except AttributeError:
            has_path = False
        return has_path

    @property
    def canonical_name(self) -> NormalizedName:
        """The wrapped project name passed through ``canonicalize_name``."""
        project_name = self._dist.project_name
        return canonicalize_name(project_name)

    @property
    def version(self) -> DistributionVersion:
        """The wrapped version string passed through ``parse_version``."""
        raw_version = self._dist.version
        return parse_version(raw_version)

    def is_file(self, path: InfoPath) -> bool:
        """Return whether the wrapped distribution has metadata at ``path``."""
        filename = str(path)
        return self._dist.has_metadata(filename)

    def iter_distutils_script_names(self) -> Iterator[str]:
        """Yield each entry of the ``scripts`` metadata directory listing."""
        script_names = self._dist.metadata_listdir("scripts")
        yield from script_names

    def read_text(self, path: InfoPath) -> str:
        """Return the text of the metadata file at ``path``.

        :raises FileNotFoundError: if the distribution has no such file.
        :raises NoneMetadataError: if the file is reported as present but its
            content comes back as ``None``.
        """
        filename = str(path)
        if self._dist.has_metadata(filename):
            text = self._dist.get_metadata(filename)
            if text is not None:
                return text
            raise NoneMetadataError(self, filename)
        raise FileNotFoundError(filename)

    def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
        """Yield an ``EntryPoint`` for every entry in the entry map.

        Each entry's string form is split at the first ``=``; both halves are
        stripped of surrounding whitespace.
        """
        entry_map = self._dist.get_entry_map()
        for group, entries in entry_map.items():
            for entry_point in entries.values():
                key, _, target = str(entry_point).partition("=")
                yield EntryPoint(name=key.strip(), value=target.strip(), group=group)

    def _metadata_impl(self) -> email.message.Message:
        """
        Parse the core metadata file into an email message.

        ``METADATA`` is read for dist-info distributions and ``PKG-INFO`` for
        everything else. A missing file is logged as a warning and parsed as
        empty text.

        :raises NoneMetadataError: if the distribution reports `has_metadata()`
            True but `get_metadata()` returns None.
        """
        is_dist_info = isinstance(self._dist, pkg_resources.DistInfoDistribution)
        filename = "METADATA" if is_dist_info else "PKG-INFO"

        try:
            text = self.read_text(filename)
        except FileNotFoundError:
            where = self.location
            shown = display_path(where) if where else repr(where)
            logger.warning("No metadata found in %s", shown)
            text = ""

        feeder = email.parser.FeedParser()
        feeder.feed(text)
        return feeder.close()

    def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
        """Return the requirements of the distribution for ``extras``.

        Extras the distribution does not declare are dropped first, because
        pkg_resources raises on invalid extras.
        """
        wanted = frozenset(extras).intersection(self._dist.extras) if extras else extras
        return self._dist.requires(wanted)

    def iter_provided_extras(self) -> Iterable[str]:
        """Return the ``extras`` attribute of the wrapped distribution."""
        return self._dist.extras

220 

221 

class Environment(BaseEnvironment):
    """Environment backed by a ``pkg_resources.WorkingSet``."""

    def __init__(self, ws: pkg_resources.WorkingSet) -> None:
        self._ws = ws

    @classmethod
    def default(cls) -> BaseEnvironment:
        """Wrap the module-level ``pkg_resources.working_set``."""
        working_set = pkg_resources.working_set
        return cls(working_set)

    @classmethod
    def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
        """Wrap a new ``WorkingSet`` constructed from ``paths``."""
        working_set = pkg_resources.WorkingSet(paths)
        return cls(working_set)

    def _iter_distributions(self) -> Iterator[BaseDistribution]:
        """Yield a ``Distribution`` wrapper for each working set entry."""
        yield from map(Distribution, self._ws)

    def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Return the first distribution whose canonical name matches ``name``.

        This searches from *all* distributions available in the environment, to
        match the behavior of ``pkg_resources.get_distribution()``. ``None`` is
        returned when nothing matches.
        """
        wanted = canonicalize_name(name)
        matches = (
            candidate
            for candidate in self.iter_all_distributions()
            if candidate.canonical_name == wanted
        )
        return next(matches, None)

    def get_distribution(self, name: str) -> Optional[BaseDistribution]:
        """Look up ``name``, refreshing the working set once if needed.

        Returns ``None`` when the distribution cannot be found even after the
        working set has been asked to require it.
        """
        # First attempt: look through the working set as it stands.
        found = self._search_distribution(name)
        if not found:
            # The working set may be stale. For example, a package installed
            # twice (once with "setup.py develop", once with "setup.py
            # install") is dropped from the working set by the first
            # "pip uninstall". Requiring it repopulates the working set so the
            # second uninstall can still find and remove it.
            try:
                # No version specifier is passed, so this cannot raise
                # pkg_resources.VersionConflict.
                self._ws.require(name)
            except pkg_resources.DistributionNotFound:
                return None
            found = self._search_distribution(name)
        return found

268 except pkg_resources.DistributionNotFound: 

269 return None 

270 return self._search_distribution(name)