1import os 
    2import pathlib 
    3import tempfile 
    4import functools 
    5import contextlib 
    6import types 
    7import importlib 
    8import inspect 
    9import warnings 
    10import itertools 
    11 
    12from typing import Union, Optional, cast 
    13from .abc import ResourceReader, Traversable 
    14 
    15Package = Union[types.ModuleType, str] 
    16Anchor = Package 
    17 
    18 
    19def package_to_anchor(func): 
    20    """ 
    21    Replace 'package' parameter as 'anchor' and warn about the change. 
    22 
    23    Other errors should fall through. 
    24 
    25    >>> files('a', 'b') 
    26    Traceback (most recent call last): 
    27    TypeError: files() takes from 0 to 1 positional arguments but 2 were given 
    28 
    29    Remove this compatibility in Python 3.14. 
    30    """ 
    31    undefined = object() 
    32 
    33    @functools.wraps(func) 
    34    def wrapper(anchor=undefined, package=undefined): 
    35        if package is not undefined: 
    36            if anchor is not undefined: 
    37                return func(anchor, package) 
    38            warnings.warn( 
    39                "First parameter to files is renamed to 'anchor'", 
    40                DeprecationWarning, 
    41                stacklevel=2, 
    42            ) 
    43            return func(package) 
    44        elif anchor is undefined: 
    45            return func() 
    46        return func(anchor) 
    47 
    48    return wrapper 
    49 
    50 
    51@package_to_anchor 
    52def files(anchor: Optional[Anchor] = None) -> Traversable: 
    53    """ 
    54    Get a Traversable resource for an anchor. 
    55    """ 
    56    return from_package(resolve(anchor)) 
    57 
    58 
    59def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]: 
    60    """ 
    61    Return the package's loader if it's a ResourceReader. 
    62    """ 
    63    # We can't use 
    64    # a issubclass() check here because apparently abc.'s __subclasscheck__() 
    65    # hook wants to create a weak reference to the object, but 
    66    # zipimport.zipimporter does not support weak references, resulting in a 
    67    # TypeError.  That seems terrible. 
    68    spec = package.__spec__ 
    69    reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore[union-attr] 
    70    if reader is None: 
    71        return None 
    72    return reader(spec.name)  # type: ignore[union-attr] 
    73 
    74 
    75@functools.singledispatch 
    76def resolve(cand: Optional[Anchor]) -> types.ModuleType: 
    77    return cast(types.ModuleType, cand) 
    78 
    79 
    80@resolve.register 
    81def _(cand: str) -> types.ModuleType: 
    82    return importlib.import_module(cand) 
    83 
    84 
    85@resolve.register 
    86def _(cand: None) -> types.ModuleType: 
    87    return resolve(_infer_caller().f_globals['__name__']) 
    88 
    89 
    90def _infer_caller(): 
    91    """ 
    92    Walk the stack and find the frame of the first caller not in this module. 
    93    """ 
    94 
    95    def is_this_file(frame_info): 
    96        return frame_info.filename == stack[0].filename 
    97 
    98    def is_wrapper(frame_info): 
    99        return frame_info.function == 'wrapper' 
    100 
    101    stack = inspect.stack() 
    102    not_this_file = itertools.filterfalse(is_this_file, stack) 
    103    # also exclude 'wrapper' due to singledispatch in the call stack 
    104    callers = itertools.filterfalse(is_wrapper, not_this_file) 
    105    return next(callers).frame 
    106 
    107 
    108def from_package(package: types.ModuleType): 
    109    """ 
    110    Return a Traversable object for the given package. 
    111 
    112    """ 
    113    # deferred for performance (python/cpython#109829) 
    114    from .future.adapters import wrap_spec 
    115 
    116    spec = wrap_spec(package) 
    117    reader = spec.loader.get_resource_reader(spec.name) 
    118    return reader.files() 
    119 
    120 
    121@contextlib.contextmanager 
    122def _tempfile( 
    123    reader, 
    124    suffix='', 
    125    # gh-93353: Keep a reference to call os.remove() in late Python 
    126    # finalization. 
    127    *, 
    128    _os_remove=os.remove, 
    129): 
    130    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' 
    131    # blocks due to the need to close the temporary file to work on Windows 
    132    # properly. 
    133    fd, raw_path = tempfile.mkstemp(suffix=suffix) 
    134    try: 
    135        try: 
    136            os.write(fd, reader()) 
    137        finally: 
    138            os.close(fd) 
    139        del reader 
    140        yield pathlib.Path(raw_path) 
    141    finally: 
    142        try: 
    143            _os_remove(raw_path) 
    144        except FileNotFoundError: 
    145            pass 
    146 
    147 
    148def _temp_file(path): 
    149    return _tempfile(path.read_bytes, suffix=path.name) 
    150 
    151 
    152def _is_present_dir(path: Traversable) -> bool: 
    153    """ 
    154    Some Traversables implement ``is_dir()`` to raise an 
    155    exception (i.e. ``FileNotFoundError``) when the 
    156    directory doesn't exist. This function wraps that call 
    157    to always return a boolean and only return True 
    158    if there's a dir and it exists. 
    159    """ 
    160    with contextlib.suppress(FileNotFoundError): 
    161        return path.is_dir() 
    162    return False 
    163 
    164 
    165@functools.singledispatch 
    166def as_file(path): 
    167    """ 
    168    Given a Traversable object, return that object as a 
    169    path on the local file system in a context manager. 
    170    """ 
    171    return _temp_dir(path) if _is_present_dir(path) else _temp_file(path) 
    172 
    173 
    174@as_file.register(pathlib.Path) 
    175@contextlib.contextmanager 
    176def _(path): 
    177    """ 
    178    Degenerate behavior for pathlib.Path objects. 
    179    """ 
    180    yield path 
    181 
    182 
    183@contextlib.contextmanager 
    184def _temp_path(dir: tempfile.TemporaryDirectory): 
    185    """ 
    186    Wrap tempfile.TemporyDirectory to return a pathlib object. 
    187    """ 
    188    with dir as result: 
    189        yield pathlib.Path(result) 
    190 
    191 
    192@contextlib.contextmanager 
    193def _temp_dir(path): 
    194    """ 
    195    Given a traversable dir, recursively replicate the whole tree 
    196    to the file system in a context manager. 
    197    """ 
    198    assert path.is_dir() 
    199    with _temp_path(tempfile.TemporaryDirectory()) as temp_dir: 
    200        yield _write_contents(temp_dir, path) 
    201 
    202 
    203def _write_contents(target, source): 
    204    child = target.joinpath(source.name) 
    205    if source.is_dir(): 
    206        child.mkdir() 
    207        for item in source.iterdir(): 
    208            _write_contents(child, item) 
    209    else: 
    210        child.write_bytes(source.read_bytes()) 
    211    return child