1from __future__ import annotations
2
3import collections
4import contextlib
5import itertools
6import pathlib
7import operator
8import re
9import warnings
10from collections.abc import Iterator
11
12from . import abc
13
14from ._itertools import only
15from .compat.py39 import ZipPath
16
17
18def remove_duplicates(items):
19 return iter(collections.OrderedDict.fromkeys(items))
20
21
22class FileReader(abc.TraversableResources):
23 def __init__(self, loader):
24 self.path = pathlib.Path(loader.path).parent
25
26 def resource_path(self, resource):
27 """
28 Return the file system path to prevent
29 `resources.path()` from creating a temporary
30 copy.
31 """
32 return str(self.path.joinpath(resource))
33
34 def files(self):
35 return self.path
36
37
38class ZipReader(abc.TraversableResources):
39 def __init__(self, loader, module):
40 self.prefix = loader.prefix.replace('\\', '/')
41 if loader.is_package(module):
42 _, _, name = module.rpartition('.')
43 self.prefix += name + '/'
44 self.archive = loader.archive
45
46 def open_resource(self, resource):
47 try:
48 return super().open_resource(resource)
49 except KeyError as exc:
50 raise FileNotFoundError(exc.args[0])
51
52 def is_resource(self, path):
53 """
54 Workaround for `zipfile.Path.is_file` returning true
55 for non-existent paths.
56 """
57 target = self.files().joinpath(path)
58 return target.is_file() and target.exists()
59
60 def files(self):
61 return ZipPath(self.archive, self.prefix)
62
63
64class MultiplexedPath(abc.Traversable):
65 """
66 Given a series of Traversable objects, implement a merged
67 version of the interface across all objects. Useful for
68 namespace packages which may be multihomed at a single
69 name.
70 """
71
72 def __init__(self, *paths):
73 self._paths = list(map(_ensure_traversable, remove_duplicates(paths)))
74 if not self._paths:
75 message = 'MultiplexedPath must contain at least one path'
76 raise FileNotFoundError(message)
77 if not all(path.is_dir() for path in self._paths):
78 raise NotADirectoryError('MultiplexedPath only supports directories')
79
80 def iterdir(self):
81 children = (child for path in self._paths for child in path.iterdir())
82 by_name = operator.attrgetter('name')
83 groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
84 return map(self._follow, (locs for name, locs in groups))
85
86 def read_bytes(self):
87 raise FileNotFoundError(f'{self} is not a file')
88
89 def read_text(self, *args, **kwargs):
90 raise FileNotFoundError(f'{self} is not a file')
91
92 def is_dir(self):
93 return True
94
95 def is_file(self):
96 return False
97
98 def joinpath(self, *descendants):
99 try:
100 return super().joinpath(*descendants)
101 except abc.TraversalError:
102 # One of the paths did not resolve (a directory does not exist).
103 # Just return something that will not exist.
104 return self._paths[0].joinpath(*descendants)
105
106 @classmethod
107 def _follow(cls, children):
108 """
109 Construct a MultiplexedPath if needed.
110
111 If children contains a sole element, return it.
112 Otherwise, return a MultiplexedPath of the items.
113 Unless one of the items is not a Directory, then return the first.
114 """
115 subdirs, one_dir, one_file = itertools.tee(children, 3)
116
117 try:
118 return only(one_dir)
119 except ValueError:
120 try:
121 return cls(*subdirs)
122 except NotADirectoryError:
123 return next(one_file)
124
125 def open(self, *args, **kwargs):
126 raise FileNotFoundError(f'{self} is not a file')
127
128 @property
129 def name(self):
130 return self._paths[0].name
131
132 def __repr__(self):
133 paths = ', '.join(f"'{path}'" for path in self._paths)
134 return f'MultiplexedPath({paths})'
135
136
137class NamespaceReader(abc.TraversableResources):
138 def __init__(self, namespace_path):
139 if 'NamespacePath' not in str(namespace_path):
140 raise ValueError('Invalid path')
141 self.path = MultiplexedPath(*filter(bool, map(self._resolve, namespace_path)))
142
143 @classmethod
144 def _resolve(cls, path_str) -> abc.Traversable | None:
145 r"""
146 Given an item from a namespace path, resolve it to a Traversable.
147
148 path_str might be a directory on the filesystem or a path to a
149 zipfile plus the path within the zipfile, e.g. ``/foo/bar`` or
150 ``/foo/baz.zip/inner_dir`` or ``foo\baz.zip\inner_dir\sub``.
151
152 path_str might also be a sentinel used by editable packages to
153 trigger other behaviors (see python/importlib_resources#311).
154 In that case, return None.
155 """
156 dirs = (cand for cand in cls._candidate_paths(path_str) if cand.is_dir())
157 return next(dirs, None)
158
159 @classmethod
160 def _candidate_paths(cls, path_str: str) -> Iterator[abc.Traversable]:
161 yield pathlib.Path(path_str)
162 yield from cls._resolve_zip_path(path_str)
163
164 @staticmethod
165 def _resolve_zip_path(path_str: str):
166 for match in reversed(list(re.finditer(r'[\\/]', path_str))):
167 with contextlib.suppress(
168 FileNotFoundError,
169 IsADirectoryError,
170 NotADirectoryError,
171 PermissionError,
172 ):
173 inner = path_str[match.end() :].replace('\\', '/') + '/'
174 yield ZipPath(path_str[: match.start()], inner.lstrip('/'))
175
176 def resource_path(self, resource):
177 """
178 Return the file system path to prevent
179 `resources.path()` from creating a temporary
180 copy.
181 """
182 return str(self.path.joinpath(resource))
183
184 def files(self):
185 return self.path
186
187
188def _ensure_traversable(path):
189 """
190 Convert deprecated string arguments to traversables (pathlib.Path).
191
192 Remove with Python 3.15.
193 """
194 if not isinstance(path, str):
195 return path
196
197 warnings.warn(
198 "String arguments are deprecated. Pass a Traversable instead.",
199 DeprecationWarning,
200 stacklevel=3,
201 )
202
203 return pathlib.Path(path)