1import os
2import pathlib
3import tempfile
4import functools
5import contextlib
6import types
7import importlib
8import inspect
9import warnings
10import itertools
11
12from typing import Union, Optional, cast
13from .abc import ResourceReader, Traversable
14
15Package = Union[types.ModuleType, str]
16Anchor = Package
17
18
19def package_to_anchor(func):
20 """
21 Replace 'package' parameter as 'anchor' and warn about the change.
22
23 Other errors should fall through.
24
25 >>> files('a', 'b')
26 Traceback (most recent call last):
27 TypeError: files() takes from 0 to 1 positional arguments but 2 were given
28
29 Remove this compatibility in Python 3.14.
30 """
31 undefined = object()
32
33 @functools.wraps(func)
34 def wrapper(anchor=undefined, package=undefined):
35 if package is not undefined:
36 if anchor is not undefined:
37 return func(anchor, package)
38 warnings.warn(
39 "First parameter to files is renamed to 'anchor'",
40 DeprecationWarning,
41 stacklevel=2,
42 )
43 return func(package)
44 elif anchor is undefined:
45 return func()
46 return func(anchor)
47
48 return wrapper
49
50
51@package_to_anchor
52def files(anchor: Optional[Anchor] = None) -> Traversable:
53 """
54 Get a Traversable resource for an anchor.
55 """
56 return from_package(resolve(anchor))
57
58
59def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]:
60 """
61 Return the package's loader if it's a ResourceReader.
62 """
63 # We can't use
64 # a issubclass() check here because apparently abc.'s __subclasscheck__()
65 # hook wants to create a weak reference to the object, but
66 # zipimport.zipimporter does not support weak references, resulting in a
67 # TypeError. That seems terrible.
68 spec = package.__spec__
69 reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore[union-attr]
70 if reader is None:
71 return None
72 return reader(spec.name) # type: ignore[union-attr]
73
74
75@functools.singledispatch
76def resolve(cand: Optional[Anchor]) -> types.ModuleType:
77 return cast(types.ModuleType, cand)
78
79
80@resolve.register
81def _(cand: str) -> types.ModuleType:
82 return importlib.import_module(cand)
83
84
85@resolve.register
86def _(cand: None) -> types.ModuleType:
87 return resolve(_infer_caller().f_globals['__name__'])
88
89
90def _infer_caller():
91 """
92 Walk the stack and find the frame of the first caller not in this module.
93 """
94
95 def is_this_file(frame_info):
96 return frame_info.filename == stack[0].filename
97
98 def is_wrapper(frame_info):
99 return frame_info.function == 'wrapper'
100
101 stack = inspect.stack()
102 not_this_file = itertools.filterfalse(is_this_file, stack)
103 # also exclude 'wrapper' due to singledispatch in the call stack
104 callers = itertools.filterfalse(is_wrapper, not_this_file)
105 return next(callers).frame
106
107
108def from_package(package: types.ModuleType):
109 """
110 Return a Traversable object for the given package.
111
112 """
113 # deferred for performance (python/cpython#109829)
114 from .future.adapters import wrap_spec
115
116 spec = wrap_spec(package)
117 reader = spec.loader.get_resource_reader(spec.name)
118 return reader.files()
119
120
121@contextlib.contextmanager
122def _tempfile(
123 reader,
124 suffix='',
125 # gh-93353: Keep a reference to call os.remove() in late Python
126 # finalization.
127 *,
128 _os_remove=os.remove,
129):
130 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
131 # blocks due to the need to close the temporary file to work on Windows
132 # properly.
133 fd, raw_path = tempfile.mkstemp(suffix=suffix)
134 try:
135 try:
136 os.write(fd, reader())
137 finally:
138 os.close(fd)
139 del reader
140 yield pathlib.Path(raw_path)
141 finally:
142 try:
143 _os_remove(raw_path)
144 except FileNotFoundError:
145 pass
146
147
148def _temp_file(path):
149 return _tempfile(path.read_bytes, suffix=path.name)
150
151
152def _is_present_dir(path: Traversable) -> bool:
153 """
154 Some Traversables implement ``is_dir()`` to raise an
155 exception (i.e. ``FileNotFoundError``) when the
156 directory doesn't exist. This function wraps that call
157 to always return a boolean and only return True
158 if there's a dir and it exists.
159 """
160 with contextlib.suppress(FileNotFoundError):
161 return path.is_dir()
162 return False
163
164
165@functools.singledispatch
166def as_file(path):
167 """
168 Given a Traversable object, return that object as a
169 path on the local file system in a context manager.
170 """
171 return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)
172
173
174@as_file.register(pathlib.Path)
175@contextlib.contextmanager
176def _(path):
177 """
178 Degenerate behavior for pathlib.Path objects.
179 """
180 yield path
181
182
183@contextlib.contextmanager
184def _temp_path(dir: tempfile.TemporaryDirectory):
185 """
186 Wrap tempfile.TemporyDirectory to return a pathlib object.
187 """
188 with dir as result:
189 yield pathlib.Path(result)
190
191
192@contextlib.contextmanager
193def _temp_dir(path):
194 """
195 Given a traversable dir, recursively replicate the whole tree
196 to the file system in a context manager.
197 """
198 assert path.is_dir()
199 with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
200 yield _write_contents(temp_dir, path)
201
202
203def _write_contents(target, source):
204 child = target.joinpath(source.name)
205 if source.is_dir():
206 child.mkdir()
207 for item in source.iterdir():
208 _write_contents(child, item)
209 else:
210 child.write_bytes(source.read_bytes())
211 return child