Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/readers.py: 47%

100 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1import collections 

2import contextlib 

3import itertools 

4import pathlib 

5import operator 

6import re 

7 

8from . import abc 

9 

10from ._itertools import only 

11from ._compat import ZipPath, ensure_traversable 

12 

13 

def remove_duplicates(items):
    """
    Return an iterator over ``items`` with repeated values dropped.

    The first occurrence of each value is kept, in its original
    position. Values are used as mapping keys, so they must be hashable.
    """
    unique = collections.OrderedDict.fromkeys(items)
    return iter(unique)

16 

17 

class FileReader(abc.TraversableResources):
    """
    Resource reader rooted at the directory that contains ``loader.path``.
    """

    def __init__(self, loader):
        # Resources live next to the loaded file, so keep its parent directory.
        loader_file = pathlib.Path(loader.path)
        self.path = loader_file.parent

    def resource_path(self, resource):
        """
        Give back the on-disk location of ``resource`` as a string so
        that `resources.path()` does not have to make a temporary
        copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the directory this reader is rooted at."""
        return self.path

32 

33 

class ZipReader(abc.TraversableResources):
    """
    Resource reader for a module stored inside the archive named by
    ``loader.archive``.
    """

    def __init__(self, loader, module):
        # Only the last dotted component of the module name is appended
        # to the loader's prefix.
        name = module.rpartition('.')[2]
        normalized_prefix = loader.prefix.replace('\\', '/')
        self.prefix = normalized_prefix + name + '/'
        self.archive = loader.archive

    def open_resource(self, resource):
        """
        Open ``resource`` through the parent implementation, reporting
        a ``KeyError`` from it as ``FileNotFoundError`` instead.
        """
        try:
            return super().open_resource(resource)
        except KeyError as missing:
            raise FileNotFoundError(missing.args[0])

    def is_resource(self, path):
        """
        Guard against `zipfile.Path.is_file` reporting true for
        paths that do not exist by also requiring ``exists()``.
        """
        candidate = self.files().joinpath(path)
        return candidate.is_file() and candidate.exists()

    def files(self):
        """Return a ``ZipPath`` for ``self.prefix`` inside ``self.archive``."""
        return ZipPath(self.archive, self.prefix)

56 

57 

class MultiplexedPath(abc.Traversable):
    """
    A single Traversable view merged from several Traversable
    directories. Intended for namespace packages, whose contents
    may be spread over more than one location under the same name.
    """

    def __init__(self, *paths):
        """
        Store the de-duplicated ``paths`` as traversables.

        Raises FileNotFoundError when no path is given and
        NotADirectoryError when any path is not a directory.
        """
        self._paths = [
            ensure_traversable(entry) for entry in remove_duplicates(paths)
        ]
        if not self._paths:
            message = 'MultiplexedPath must contain at least one path'
            raise FileNotFoundError(message)
        if any(not entry.is_dir() for entry in self._paths):
            raise NotADirectoryError('MultiplexedPath only supports directories')

    def iterdir(self):
        """
        Iterate over the children of every underlying path, combining
        children that share a name into one entry via ``_follow``.
        """
        name_of = operator.attrgetter('name')
        every_child = itertools.chain.from_iterable(
            root.iterdir() for root in self._paths
        )
        # groupby only merges adjacent items, hence the sort by the same key.
        ordered = sorted(every_child, key=name_of)
        grouped = itertools.groupby(ordered, key=name_of)
        same_named = (members for _, members in grouped)
        return map(self._follow, same_named)

    def read_bytes(self):
        """Always raise: a multiplexed path is never a file."""
        raise FileNotFoundError(f'{self} is not a file')

    def read_text(self, *args, **kwargs):
        """Always raise: a multiplexed path is never a file."""
        raise FileNotFoundError(f'{self} is not a file')

    def is_dir(self):
        """A multiplexed path is always reported as a directory."""
        return True

    def is_file(self):
        """A multiplexed path is never reported as a file."""
        return False

    def joinpath(self, *descendants):
        """
        Resolve ``descendants`` through the parent implementation; if
        that raises ``TraversalError``, join against the first
        underlying path instead.
        """
        try:
            return super().joinpath(*descendants)
        except abc.TraversalError:
            # One of the paths did not resolve (a directory does not exist).
            # Just return something that will not exist.
            fallback_root = self._paths[0]
            return fallback_root.joinpath(*descendants)

    @classmethod
    def _follow(cls, children):
        """
        Reduce a group of same-named children to one Traversable.

        A lone child is returned as is. Several children are wrapped
        in a new MultiplexedPath, unless constructing it raises
        NotADirectoryError, in which case the first child is returned.
        """
        # Three independent iterators over the same children: one per attempt.
        for_multiplex, for_single, for_first = itertools.tee(children, 3)

        try:
            return only(for_single)
        except ValueError:
            try:
                return cls(*for_multiplex)
            except NotADirectoryError:
                return next(for_first)

    def open(self, *args, **kwargs):
        """Always raise: a multiplexed path is never a file."""
        raise FileNotFoundError(f'{self} is not a file')

    @property
    def name(self):
        """Name of the first underlying path."""
        first = self._paths[0]
        return first.name

    def __repr__(self):
        paths = ', '.join(f"'{path}'" for path in self._paths)
        return f'MultiplexedPath({paths})'

129 

130 

class NamespaceReader(abc.TraversableResources):
    """
    Resource reader for a namespace path, exposing all of its
    locations as a single ``MultiplexedPath``.
    """

    def __init__(self, namespace_path):
        """
        Resolve each entry of ``namespace_path`` and multiplex them.

        Raises ValueError when the string form of ``namespace_path``
        does not contain ``NamespacePath``.
        """
        described = str(namespace_path)
        if 'NamespacePath' not in described:
            raise ValueError('Invalid path')
        resolved_entries = map(self._resolve, namespace_path)
        self.path = MultiplexedPath(*resolved_entries)

    @classmethod
    def _resolve(cls, path_str) -> abc.Traversable:
        r"""
        Turn one namespace path entry into a Traversable directory.

        The entry may name a filesystem directory, or a zipfile followed
        by a path inside it, e.g. ``/foo/bar`` or
        ``/foo/baz.zip/inner_dir`` or ``foo\baz.zip\inner_dir\sub``.

        Exactly one candidate must be a directory; otherwise the
        unpacking below raises ValueError.
        """
        directories = (
            candidate
            for candidate in cls._candidate_paths(path_str)
            if candidate.is_dir()
        )
        (resolved,) = directories
        return resolved

    @classmethod
    def _candidate_paths(cls, path_str):
        """
        Yield the plain filesystem path first, then each zip-based
        interpretation of ``path_str``.
        """
        yield pathlib.Path(path_str)
        yield from cls._resolve_zip_path(path_str)

    @staticmethod
    def _resolve_zip_path(path_str):
        """
        Yield a ``ZipPath`` for each way of splitting ``path_str`` at a
        path separator, starting from the rightmost separator. The text
        before the separator is treated as the archive and the text
        after it as the path inside the archive.
        """
        separators = list(re.finditer(r'[\\/]', path_str))
        for separator in reversed(separators):
            # Splits for which these errors are raised are skipped.
            with contextlib.suppress(
                FileNotFoundError, IsADirectoryError, PermissionError
            ):
                archive = path_str[: separator.start()]
                inner = path_str[separator.end() :].replace('\\', '/') + '/'
                yield ZipPath(archive, inner.lstrip('/'))

    def resource_path(self, resource):
        """
        Give back the location of ``resource`` as a string so that
        `resources.path()` does not have to make a temporary
        copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the multiplexed path built in ``__init__``."""
        return self.path

172 return self.path