Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/readers.py: 47%
100 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1import collections
2import contextlib
3import itertools
4import pathlib
5import operator
6import re
8from . import abc
10from ._itertools import only
11from ._compat import ZipPath, ensure_traversable
14def remove_duplicates(items):
15 return iter(collections.OrderedDict.fromkeys(items))
18class FileReader(abc.TraversableResources):
19 def __init__(self, loader):
20 self.path = pathlib.Path(loader.path).parent
22 def resource_path(self, resource):
23 """
24 Return the file system path to prevent
25 `resources.path()` from creating a temporary
26 copy.
27 """
28 return str(self.path.joinpath(resource))
30 def files(self):
31 return self.path
34class ZipReader(abc.TraversableResources):
35 def __init__(self, loader, module):
36 _, _, name = module.rpartition('.')
37 self.prefix = loader.prefix.replace('\\', '/') + name + '/'
38 self.archive = loader.archive
40 def open_resource(self, resource):
41 try:
42 return super().open_resource(resource)
43 except KeyError as exc:
44 raise FileNotFoundError(exc.args[0])
46 def is_resource(self, path):
47 """
48 Workaround for `zipfile.Path.is_file` returning true
49 for non-existent paths.
50 """
51 target = self.files().joinpath(path)
52 return target.is_file() and target.exists()
54 def files(self):
55 return ZipPath(self.archive, self.prefix)
58class MultiplexedPath(abc.Traversable):
59 """
60 Given a series of Traversable objects, implement a merged
61 version of the interface across all objects. Useful for
62 namespace packages which may be multihomed at a single
63 name.
64 """
66 def __init__(self, *paths):
67 self._paths = list(map(ensure_traversable, remove_duplicates(paths)))
68 if not self._paths:
69 message = 'MultiplexedPath must contain at least one path'
70 raise FileNotFoundError(message)
71 if not all(path.is_dir() for path in self._paths):
72 raise NotADirectoryError('MultiplexedPath only supports directories')
74 def iterdir(self):
75 children = (child for path in self._paths for child in path.iterdir())
76 by_name = operator.attrgetter('name')
77 groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
78 return map(self._follow, (locs for name, locs in groups))
80 def read_bytes(self):
81 raise FileNotFoundError(f'{self} is not a file')
83 def read_text(self, *args, **kwargs):
84 raise FileNotFoundError(f'{self} is not a file')
86 def is_dir(self):
87 return True
89 def is_file(self):
90 return False
92 def joinpath(self, *descendants):
93 try:
94 return super().joinpath(*descendants)
95 except abc.TraversalError:
96 # One of the paths did not resolve (a directory does not exist).
97 # Just return something that will not exist.
98 return self._paths[0].joinpath(*descendants)
100 @classmethod
101 def _follow(cls, children):
102 """
103 Construct a MultiplexedPath if needed.
105 If children contains a sole element, return it.
106 Otherwise, return a MultiplexedPath of the items.
107 Unless one of the items is not a Directory, then return the first.
108 """
109 subdirs, one_dir, one_file = itertools.tee(children, 3)
111 try:
112 return only(one_dir)
113 except ValueError:
114 try:
115 return cls(*subdirs)
116 except NotADirectoryError:
117 return next(one_file)
119 def open(self, *args, **kwargs):
120 raise FileNotFoundError(f'{self} is not a file')
122 @property
123 def name(self):
124 return self._paths[0].name
126 def __repr__(self):
127 paths = ', '.join(f"'{path}'" for path in self._paths)
128 return f'MultiplexedPath({paths})'
131class NamespaceReader(abc.TraversableResources):
132 def __init__(self, namespace_path):
133 if 'NamespacePath' not in str(namespace_path):
134 raise ValueError('Invalid path')
135 self.path = MultiplexedPath(*map(self._resolve, namespace_path))
137 @classmethod
138 def _resolve(cls, path_str) -> abc.Traversable:
139 r"""
140 Given an item from a namespace path, resolve it to a Traversable.
142 path_str might be a directory on the filesystem or a path to a
143 zipfile plus the path within the zipfile, e.g. ``/foo/bar`` or
144 ``/foo/baz.zip/inner_dir`` or ``foo\baz.zip\inner_dir\sub``.
145 """
146 (dir,) = (cand for cand in cls._candidate_paths(path_str) if cand.is_dir())
147 return dir
149 @classmethod
150 def _candidate_paths(cls, path_str):
151 yield pathlib.Path(path_str)
152 yield from cls._resolve_zip_path(path_str)
154 @staticmethod
155 def _resolve_zip_path(path_str):
156 for match in reversed(list(re.finditer(r'[\\/]', path_str))):
157 with contextlib.suppress(
158 FileNotFoundError, IsADirectoryError, PermissionError
159 ):
160 inner = path_str[match.end() :].replace('\\', '/') + '/'
161 yield ZipPath(path_str[: match.start()], inner.lstrip('/'))
163 def resource_path(self, resource):
164 """
165 Return the file system path to prevent
166 `resources.path()` from creating a temporary
167 copy.
168 """
169 return str(self.path.joinpath(resource))
171 def files(self):
172 return self.path