Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/importlib_resources/_common.py: 52%
101 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:51 +0000
1import os
2import pathlib
3import tempfile
4import functools
5import contextlib
6import types
7import importlib
8import inspect
9import warnings
10import itertools
12from typing import Union, Optional, cast
13from .abc import ResourceReader, Traversable
15from ._compat import wrap_spec
# A package is referred to either by its imported module object or by its
# dotted name as a string.
Package = Union[types.ModuleType, str]
# 'Anchor' is an alias of 'Package'; both names refer to the same type.
Anchor = Package
def package_to_anchor(func):
    """
    Accept the legacy 'package' keyword in place of 'anchor', with a warning.

    When only 'package' is supplied, a DeprecationWarning is issued and the
    value is forwarded as the first positional argument. When both are
    supplied they are forwarded together so that the wrapped function
    raises its own error.

    >>> files('a', 'b')
    Traceback (most recent call last):
    TypeError: files() takes from 0 to 1 positional arguments but 2 were given
    """
    # Unique sentinel so that an explicit None can be told apart from an
    # argument that was not passed at all.
    missing = object()

    @functools.wraps(func)
    def wrapper(anchor=missing, package=missing):
        if package is missing:
            # Legacy keyword not used: forward whatever was given, if anything.
            return func() if anchor is missing else func(anchor)
        if anchor is not missing:
            # Both supplied: let the wrapped function reject the extra argument.
            return func(anchor, package)
        warnings.warn(
            "First parameter to files is renamed to 'anchor'",
            DeprecationWarning,
            stacklevel=2,
        )
        return func(package)

    return wrapper
@package_to_anchor
def files(anchor: Optional[Anchor] = None) -> Traversable:
    """
    Get a Traversable resource for an anchor.

    The anchor is first resolved to a module object and the Traversable
    is then obtained from that module.
    """
    module = resolve(anchor)
    return from_package(module)
def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]:
    """
    Return the package's loader if it's a ResourceReader.

    The reader is obtained by calling the loader's ``get_resource_reader``
    hook with the spec's name. ``None`` is returned when the module has no
    spec (``__spec__`` is missing or ``None``) or when the loader does not
    provide that hook.
    """
    # We can't use an issubclass() check here because apparently abc's
    # __subclasscheck__() hook wants to create a weak reference to the
    # object, but zipimport.zipimporter does not support weak references,
    # resulting in a TypeError. That seems terrible.
    spec = getattr(package, '__spec__', None)
    if spec is None:
        # Without a spec there is no loader to ask; report "no reader"
        # rather than failing with AttributeError on ``spec.loader``.
        return None
    reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
    if reader is None:
        return None
    return reader(spec.name)  # type: ignore
@functools.singledispatch
def resolve(cand: Optional[Anchor]) -> types.ModuleType:
    """
    Resolve an anchor to a module object.

    This fallback handles any candidate without a more specific
    registration and returns it unchanged.
    """
    return cast(types.ModuleType, cand)


@resolve.register(str)
def _(cand: str) -> types.ModuleType:
    # A string anchor is a module name: import it.
    module = importlib.import_module(cand)
    return module


@resolve.register(type(None))
def _(cand: None) -> types.ModuleType:
    # No anchor given: use the name of the module that called into this file.
    caller_name = _infer_caller().f_globals['__name__']
    return resolve(caller_name)
90def _infer_caller():
91 """
92 Walk the stack and find the frame of the first caller not in this module.
93 """
95 def is_this_file(frame_info):
96 return frame_info.filename == __file__
98 def is_wrapper(frame_info):
99 return frame_info.function == 'wrapper'
101 not_this_file = itertools.filterfalse(is_this_file, inspect.stack())
102 # also exclude 'wrapper' due to singledispatch in the call stack
103 callers = itertools.filterfalse(is_wrapper, not_this_file)
104 return next(callers).frame
def from_package(package: types.ModuleType):
    """
    Return a Traversable object for the given package.

    The package's wrapped spec supplies the loader; the loader's resource
    reader for the spec name provides ``files()``.
    """
    wrapped = wrap_spec(package)
    make_reader = wrapped.loader.get_resource_reader
    return make_reader(wrapped.name).files()
117@contextlib.contextmanager
118def _tempfile(
119 reader,
120 suffix='',
121 # gh-93353: Keep a reference to call os.remove() in late Python
122 # finalization.
123 *,
124 _os_remove=os.remove,
125):
126 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
127 # blocks due to the need to close the temporary file to work on Windows
128 # properly.
129 fd, raw_path = tempfile.mkstemp(suffix=suffix)
130 try:
131 try:
132 os.write(fd, reader())
133 finally:
134 os.close(fd)
135 del reader
136 yield pathlib.Path(raw_path)
137 finally:
138 try:
139 _os_remove(raw_path)
140 except FileNotFoundError:
141 pass
def _temp_file(path):
    """
    Return a context manager yielding a temporary file that holds the
    bytes of ``path``; the file name ends with ``path.name``.
    """
    return _tempfile(reader=path.read_bytes, suffix=path.name)
148def _is_present_dir(path: Traversable) -> bool:
149 """
150 Some Traversables implement ``is_dir()`` to raise an
151 exception (i.e. ``FileNotFoundError``) when the
152 directory doesn't exist. This function wraps that call
153 to always return a boolean and only return True
154 if there's a dir and it exists.
155 """
156 with contextlib.suppress(FileNotFoundError):
157 return path.is_dir()
158 return False
@functools.singledispatch
def as_file(path):
    """
    Given a Traversable object, return a context manager that provides
    the object as a path on the local file system.

    A present directory is handled by ``_temp_dir``; anything else is
    handled by ``_temp_file``.
    """
    if _is_present_dir(path):
        return _temp_dir(path)
    return _temp_file(path)


@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
    """
    pathlib.Path objects are already on the file system: yield them as-is.
    """
    yield path
179@contextlib.contextmanager
180def _temp_path(dir: tempfile.TemporaryDirectory):
181 """
182 Wrap tempfile.TemporyDirectory to return a pathlib object.
183 """
184 with dir as result:
185 yield pathlib.Path(result)
@contextlib.contextmanager
def _temp_dir(path):
    """
    Given a traversable dir, copy its whole tree into a temporary
    directory on the file system and yield the path of the copy.
    """
    assert path.is_dir()
    staging = tempfile.TemporaryDirectory()
    with _temp_path(staging) as root:
        yield _write_contents(root, path)
199def _write_contents(target, source):
200 child = target.joinpath(source.name)
201 if source.is_dir():
202 child.mkdir()
203 for item in source.iterdir():
204 _write_contents(child, item)
205 else:
206 child.write_bytes(source.read_bytes())
207 return child