1import collections
2import contextlib
3import itertools
4import pathlib
5import operator
6import re
7import warnings
8
9from . import abc
10
11from ._itertools import only
12from .compat.py39 import ZipPath
13
14
15def remove_duplicates(items):
16 return iter(collections.OrderedDict.fromkeys(items))
17
18
19class FileReader(abc.TraversableResources):
20 def __init__(self, loader):
21 self.path = pathlib.Path(loader.path).parent
22
23 def resource_path(self, resource):
24 """
25 Return the file system path to prevent
26 `resources.path()` from creating a temporary
27 copy.
28 """
29 return str(self.path.joinpath(resource))
30
31 def files(self):
32 return self.path
33
34
35class ZipReader(abc.TraversableResources):
36 def __init__(self, loader, module):
37 _, _, name = module.rpartition('.')
38 self.prefix = loader.prefix.replace('\\', '/') + name + '/'
39 self.archive = loader.archive
40
41 def open_resource(self, resource):
42 try:
43 return super().open_resource(resource)
44 except KeyError as exc:
45 raise FileNotFoundError(exc.args[0])
46
47 def is_resource(self, path):
48 """
49 Workaround for `zipfile.Path.is_file` returning true
50 for non-existent paths.
51 """
52 target = self.files().joinpath(path)
53 return target.is_file() and target.exists()
54
55 def files(self):
56 return ZipPath(self.archive, self.prefix)
57
58
59class MultiplexedPath(abc.Traversable):
60 """
61 Given a series of Traversable objects, implement a merged
62 version of the interface across all objects. Useful for
63 namespace packages which may be multihomed at a single
64 name.
65 """
66
67 def __init__(self, *paths):
68 self._paths = list(map(_ensure_traversable, remove_duplicates(paths)))
69 if not self._paths:
70 message = 'MultiplexedPath must contain at least one path'
71 raise FileNotFoundError(message)
72 if not all(path.is_dir() for path in self._paths):
73 raise NotADirectoryError('MultiplexedPath only supports directories')
74
75 def iterdir(self):
76 children = (child for path in self._paths for child in path.iterdir())
77 by_name = operator.attrgetter('name')
78 groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
79 return map(self._follow, (locs for name, locs in groups))
80
81 def read_bytes(self):
82 raise FileNotFoundError(f'{self} is not a file')
83
84 def read_text(self, *args, **kwargs):
85 raise FileNotFoundError(f'{self} is not a file')
86
87 def is_dir(self):
88 return True
89
90 def is_file(self):
91 return False
92
93 def joinpath(self, *descendants):
94 try:
95 return super().joinpath(*descendants)
96 except abc.TraversalError:
97 # One of the paths did not resolve (a directory does not exist).
98 # Just return something that will not exist.
99 return self._paths[0].joinpath(*descendants)
100
101 @classmethod
102 def _follow(cls, children):
103 """
104 Construct a MultiplexedPath if needed.
105
106 If children contains a sole element, return it.
107 Otherwise, return a MultiplexedPath of the items.
108 Unless one of the items is not a Directory, then return the first.
109 """
110 subdirs, one_dir, one_file = itertools.tee(children, 3)
111
112 try:
113 return only(one_dir)
114 except ValueError:
115 try:
116 return cls(*subdirs)
117 except NotADirectoryError:
118 return next(one_file)
119
120 def open(self, *args, **kwargs):
121 raise FileNotFoundError(f'{self} is not a file')
122
123 @property
124 def name(self):
125 return self._paths[0].name
126
127 def __repr__(self):
128 paths = ', '.join(f"'{path}'" for path in self._paths)
129 return f'MultiplexedPath({paths})'
130
131
132class NamespaceReader(abc.TraversableResources):
133 def __init__(self, namespace_path):
134 if 'NamespacePath' not in str(namespace_path):
135 raise ValueError('Invalid path')
136 self.path = MultiplexedPath(*map(self._resolve, namespace_path))
137
138 @classmethod
139 def _resolve(cls, path_str) -> abc.Traversable:
140 r"""
141 Given an item from a namespace path, resolve it to a Traversable.
142
143 path_str might be a directory on the filesystem or a path to a
144 zipfile plus the path within the zipfile, e.g. ``/foo/bar`` or
145 ``/foo/baz.zip/inner_dir`` or ``foo\baz.zip\inner_dir\sub``.
146 """
147 (dir,) = (cand for cand in cls._candidate_paths(path_str) if cand.is_dir())
148 return dir
149
150 @classmethod
151 def _candidate_paths(cls, path_str):
152 yield pathlib.Path(path_str)
153 yield from cls._resolve_zip_path(path_str)
154
155 @staticmethod
156 def _resolve_zip_path(path_str):
157 for match in reversed(list(re.finditer(r'[\\/]', path_str))):
158 with contextlib.suppress(
159 FileNotFoundError,
160 IsADirectoryError,
161 NotADirectoryError,
162 PermissionError,
163 ):
164 inner = path_str[match.end() :].replace('\\', '/') + '/'
165 yield ZipPath(path_str[: match.start()], inner.lstrip('/'))
166
167 def resource_path(self, resource):
168 """
169 Return the file system path to prevent
170 `resources.path()` from creating a temporary
171 copy.
172 """
173 return str(self.path.joinpath(resource))
174
175 def files(self):
176 return self.path
177
178
179def _ensure_traversable(path):
180 """
181 Convert deprecated string arguments to traversables (pathlib.Path).
182
183 Remove with Python 3.15.
184 """
185 if not isinstance(path, str):
186 return path
187
188 warnings.warn(
189 "String arguments are deprecated. Pass a Traversable instead.",
190 DeprecationWarning,
191 stacklevel=3,
192 )
193
194 return pathlib.Path(path)