Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/readers.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

110 statements  

1from __future__ import annotations 

2 

3import collections 

4import contextlib 

5import itertools 

6import pathlib 

7import operator 

8import re 

9import warnings 

10from collections.abc import Iterator 

11 

12from . import abc 

13 

14from ._itertools import only 

15from .compat.py39 import ZipPath 

16 

17 

def remove_duplicates(items):
    """
    Return an iterator over ``items`` with repeated values dropped.

    Only the first occurrence of each value is kept, and values come
    out in the order they were first seen. Items are used as
    dictionary keys, so they must be hashable.
    """
    unique = collections.OrderedDict.fromkeys(items)
    return iter(unique)

20 

21 

class FileReader(abc.TraversableResources):
    """
    Resource reader rooted at the directory containing ``loader.path``.
    """

    def __init__(self, loader):
        # The resources root is the parent directory of the loader's path.
        loader_file = pathlib.Path(loader.path)
        self.path = loader_file.parent

    def resource_path(self, resource):
        """
        Give back the real file system location of ``resource`` as a
        string, so that `resources.path()` does not have to create a
        temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the root directory that resources are read from."""
        return self.path

36 

37 

class ZipReader(abc.TraversableResources):
    """
    Resource reader for a module served from an archive by ``loader``.

    ``self.archive`` is the loader's archive and ``self.prefix`` is the
    location inside that archive that `files()` is rooted at.
    """

    def __init__(self, loader, module):
        # Normalise backslashes so the prefix always uses '/'.
        self.prefix = loader.prefix.replace('\\', '/')
        if loader.is_package(module):
            # For a package, extend the prefix with the final dotted
            # component of the module name, as a directory.
            leaf = module.rpartition('.')[-1]
            self.prefix = f'{self.prefix}{leaf}/'
        self.archive = loader.archive

    def open_resource(self, resource):
        """
        Open ``resource`` through the base implementation, reporting a
        `KeyError` from it as `FileNotFoundError` with the same first
        argument.
        """
        try:
            return super().open_resource(resource)
        except KeyError as missing:
            raise FileNotFoundError(missing.args[0])

    def is_resource(self, path):
        """
        Report whether ``path`` names an existing file.

        `zipfile.Path.is_file` returns true for paths that do not
        exist, so existence is checked as well.
        """
        candidate = self.files().joinpath(path)
        looks_like_file = candidate.is_file()
        return looks_like_file and candidate.exists()

    def files(self):
        """Return a `ZipPath` for ``self.prefix`` inside ``self.archive``."""
        return ZipPath(self.archive, self.prefix)

62 

63 

class MultiplexedPath(abc.Traversable):
    """
    A single Traversable view merged from several Traversable
    directories.

    Useful for namespace packages, which may be multihomed at a
    single name.
    """

    def __init__(self, *paths):
        """
        Store the distinct ``paths``, converting deprecated string
        arguments along the way.

        Raises `FileNotFoundError` when no path is given and
        `NotADirectoryError` when any path is not a directory.
        """
        self._paths = [
            _ensure_traversable(path) for path in remove_duplicates(paths)
        ]
        if not self._paths:
            raise FileNotFoundError(
                'MultiplexedPath must contain at least one path'
            )
        if any(not path.is_dir() for path in self._paths):
            raise NotADirectoryError('MultiplexedPath only supports directories')

    def iterdir(self):
        """
        Iterate over the merged children of every underlying path.

        Children sharing a name are grouped together and each group
        is collapsed into one entry by `_follow`.
        """
        name_of = operator.attrgetter('name')
        everything = itertools.chain.from_iterable(
            path.iterdir() for path in self._paths
        )
        # groupby only merges adjacent items, hence the sort by name.
        ordered = sorted(everything, key=name_of)
        grouped = itertools.groupby(ordered, key=name_of)
        return map(self._follow, (members for _, members in grouped))

    def read_bytes(self):
        """Always raise: a multiplexed path is a directory, not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    def read_text(self, *args, **kwargs):
        """Always raise: a multiplexed path is a directory, not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    def is_dir(self):
        """A multiplexed path always counts as a directory."""
        return True

    def is_file(self):
        """A multiplexed path never counts as a file."""
        return False

    def joinpath(self, *descendants):
        """
        Resolve ``descendants`` beneath this path.

        When the base implementation cannot traverse to the target,
        the result is built from the first underlying path instead.
        """
        try:
            return super().joinpath(*descendants)
        except abc.TraversalError:
            # Some segment did not resolve (a directory is missing).
            # Hand back a path that simply will not exist.
            first = self._paths[0]
            return first.joinpath(*descendants)

    @classmethod
    def _follow(cls, children):
        """
        Collapse same-named ``children`` into one Traversable.

        A lone child is returned unchanged. Several children become a
        MultiplexedPath, unless one of them is not a directory, in
        which case the first child is returned.
        """
        # Three independent views of the same iterator, one per attempt.
        as_dirs, as_single, as_first = itertools.tee(children, 3)

        try:
            return only(as_single)
        except ValueError:
            try:
                return cls(*as_dirs)
            except NotADirectoryError:
                return next(as_first)

    def open(self, *args, **kwargs):
        """Always raise: a multiplexed path is a directory, not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    @property
    def name(self):
        """The name of the first underlying path."""
        first = self._paths[0]
        return first.name

    def __repr__(self):
        quoted = [f"'{path}'" for path in self._paths]
        paths = ', '.join(quoted)
        return f'MultiplexedPath({paths})'

135 

136 

class NamespaceReader(abc.TraversableResources):
    """
    Resource reader whose root is a `MultiplexedPath` built from the
    resolvable entries of a namespace path.
    """

    def __init__(self, namespace_path):
        """
        Resolve each entry of ``namespace_path`` and multiplex the
        ones that resolved.

        Raises `ValueError` when the string form of ``namespace_path``
        does not mention ``NamespacePath``.
        """
        is_namespace_path = 'NamespacePath' in str(namespace_path)
        if not is_namespace_path:
            raise ValueError('Invalid path')
        resolved = map(self._resolve, namespace_path)
        # Entries that resolved to nothing are dropped.
        self.path = MultiplexedPath(*(found for found in resolved if found))

    @classmethod
    def _resolve(cls, path_str) -> abc.Traversable | None:
        r"""
        Turn one namespace path entry into a Traversable directory.

        path_str may be a directory on the filesystem, or the path to
        a zipfile followed by a path inside it, e.g. ``/foo/bar`` or
        ``/foo/baz.zip/inner_dir`` or ``foo\baz.zip\inner_dir\sub``.

        path_str may also be a sentinel used by editable packages to
        trigger other behaviors (see python/importlib_resources#311).
        None is returned in that case.
        """
        for candidate in cls._candidate_paths(path_str):
            if candidate.is_dir():
                return candidate
        return None

    @classmethod
    def _candidate_paths(cls, path_str: str) -> Iterator[abc.Traversable]:
        """
        Yield the plain filesystem reading of ``path_str`` first,
        followed by every zip-archive reading of it.
        """
        yield pathlib.Path(path_str)
        yield from cls._resolve_zip_path(path_str)

    @staticmethod
    def _resolve_zip_path(path_str: str):
        """
        Yield a `ZipPath` for each way of splitting ``path_str`` at a
        path separator into an archive path and an inner directory,
        trying the rightmost separator first.

        Splits whose `ZipPath` raises one of the suppressed errors
        are skipped.
        """
        separators = list(re.finditer(r'[\\/]', path_str))
        for sep in reversed(separators):
            with contextlib.suppress(
                FileNotFoundError,
                IsADirectoryError,
                NotADirectoryError,
                PermissionError,
            ):
                archive = path_str[: sep.start()]
                # Inside the archive: forward slashes, a trailing slash
                # and no leading slash.
                tail = path_str[sep.end() :].replace('\\', '/')
                inner = f'{tail}/'.lstrip('/')
                yield ZipPath(archive, inner)

    def resource_path(self, resource):
        """
        Give back the location of ``resource`` as a string, so that
        `resources.path()` does not have to create a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the multiplexed root that resources are read from."""
        return self.path

186 

187 

def _ensure_traversable(path):
    """
    Accept a deprecated string argument by turning it into a
    traversable (pathlib.Path); anything else is returned untouched.

    Remove with Python 3.15.
    """
    if isinstance(path, str):
        warnings.warn(
            "String arguments are deprecated. Pass a Traversable instead.",
            DeprecationWarning,
            stacklevel=3,
        )
        return pathlib.Path(path)

    return path