1import os
2import pathlib
3import tempfile
4import functools
5import contextlib
6import types
7import importlib
8import inspect
9import warnings
10import itertools
11
12from typing import Union, Optional, cast
13from .abc import ResourceReader, Traversable
14
15Package = Union[types.ModuleType, str]
16Anchor = Package
17
18
19def package_to_anchor(func):
20 """
21 Replace 'package' parameter as 'anchor' and warn about the change.
22
23 Other errors should fall through.
24
25 >>> files('a', 'b')
26 Traceback (most recent call last):
27 TypeError: files() takes from 0 to 1 positional arguments but 2 were given
28
29 Remove this compatibility in Python 3.14.
30 """
31 undefined = object()
32
33 @functools.wraps(func)
34 def wrapper(anchor=undefined, package=undefined):
35 if package is not undefined:
36 if anchor is not undefined:
37 return func(anchor, package)
38 warnings.warn(
39 "First parameter to files is renamed to 'anchor'",
40 DeprecationWarning,
41 stacklevel=2,
42 )
43 return func(package)
44 elif anchor is undefined:
45 return func()
46 return func(anchor)
47
48 return wrapper
49
50
51@package_to_anchor
52def files(anchor: Optional[Anchor] = None) -> Traversable:
53 """
54 Get a Traversable resource for an anchor.
55 """
56 return from_package(resolve(anchor))
57
58
59def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]:
60 """
61 Return the package's loader if it's a ResourceReader.
62 """
63 # We can't use
64 # a issubclass() check here because apparently abc.'s __subclasscheck__()
65 # hook wants to create a weak reference to the object, but
66 # zipimport.zipimporter does not support weak references, resulting in a
67 # TypeError. That seems terrible.
68 spec = package.__spec__
69 reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore
70 if reader is None:
71 return None
72 return reader(spec.name) # type: ignore
73
74
75@functools.singledispatch
76def resolve(cand: Optional[Anchor]) -> types.ModuleType:
77 return cast(types.ModuleType, cand)
78
79
80@resolve.register
81def _(cand: str) -> types.ModuleType:
82 return importlib.import_module(cand)
83
84
85@resolve.register
86def _(cand: None) -> types.ModuleType:
87 return resolve(_infer_caller().f_globals['__name__'])
88
89
90def _infer_caller():
91 """
92 Walk the stack and find the frame of the first caller not in this module.
93 """
94
95 def is_this_file(frame_info):
96 return frame_info.filename == __file__
97
98 def is_wrapper(frame_info):
99 return frame_info.function == 'wrapper'
100
101 not_this_file = itertools.filterfalse(is_this_file, inspect.stack())
102 # also exclude 'wrapper' due to singledispatch in the call stack
103 callers = itertools.filterfalse(is_wrapper, not_this_file)
104 return next(callers).frame
105
106
107def from_package(package: types.ModuleType):
108 """
109 Return a Traversable object for the given package.
110
111 """
112 # deferred for performance (python/cpython#109829)
113 from .future.adapters import wrap_spec
114
115 spec = wrap_spec(package)
116 reader = spec.loader.get_resource_reader(spec.name)
117 return reader.files()
118
119
120@contextlib.contextmanager
121def _tempfile(
122 reader,
123 suffix='',
124 # gh-93353: Keep a reference to call os.remove() in late Python
125 # finalization.
126 *,
127 _os_remove=os.remove,
128):
129 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
130 # blocks due to the need to close the temporary file to work on Windows
131 # properly.
132 fd, raw_path = tempfile.mkstemp(suffix=suffix)
133 try:
134 try:
135 os.write(fd, reader())
136 finally:
137 os.close(fd)
138 del reader
139 yield pathlib.Path(raw_path)
140 finally:
141 try:
142 _os_remove(raw_path)
143 except FileNotFoundError:
144 pass
145
146
147def _temp_file(path):
148 return _tempfile(path.read_bytes, suffix=path.name)
149
150
151def _is_present_dir(path: Traversable) -> bool:
152 """
153 Some Traversables implement ``is_dir()`` to raise an
154 exception (i.e. ``FileNotFoundError``) when the
155 directory doesn't exist. This function wraps that call
156 to always return a boolean and only return True
157 if there's a dir and it exists.
158 """
159 with contextlib.suppress(FileNotFoundError):
160 return path.is_dir()
161 return False
162
163
164@functools.singledispatch
165def as_file(path):
166 """
167 Given a Traversable object, return that object as a
168 path on the local file system in a context manager.
169 """
170 return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)
171
172
173@as_file.register(pathlib.Path)
174@contextlib.contextmanager
175def _(path):
176 """
177 Degenerate behavior for pathlib.Path objects.
178 """
179 yield path
180
181
182@contextlib.contextmanager
183def _temp_path(dir: tempfile.TemporaryDirectory):
184 """
185 Wrap tempfile.TemporyDirectory to return a pathlib object.
186 """
187 with dir as result:
188 yield pathlib.Path(result)
189
190
191@contextlib.contextmanager
192def _temp_dir(path):
193 """
194 Given a traversable dir, recursively replicate the whole tree
195 to the file system in a context manager.
196 """
197 assert path.is_dir()
198 with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
199 yield _write_contents(temp_dir, path)
200
201
202def _write_contents(target, source):
203 child = target.joinpath(source.name)
204 if source.is_dir():
205 child.mkdir()
206 for item in source.iterdir():
207 _write_contents(child, item)
208 else:
209 child.write_bytes(source.read_bytes())
210 return child