1from contextlib import suppress 
    2from io import TextIOWrapper 
    3 
    4from . import abc 
    5 
    6 
    7class SpecLoaderAdapter: 
    8    """ 
    9    Adapt a package spec to adapt the underlying loader. 
    10    """ 
    11 
    12    def __init__(self, spec, adapter=lambda spec: spec.loader): 
    13        self.spec = spec 
    14        self.loader = adapter(spec) 
    15 
    16    def __getattr__(self, name): 
    17        return getattr(self.spec, name) 
    18 
    19 
    20class TraversableResourcesLoader: 
    21    """ 
    22    Adapt a loader to provide TraversableResources. 
    23    """ 
    24 
    25    def __init__(self, spec): 
    26        self.spec = spec 
    27 
    28    def get_resource_reader(self, name): 
    29        return CompatibilityFiles(self.spec)._native() 
    30 
    31 
    32def _io_wrapper(file, mode='r', *args, **kwargs): 
    33    if mode == 'r': 
    34        return TextIOWrapper(file, *args, **kwargs) 
    35    elif mode == 'rb': 
    36        return file 
    37    raise ValueError(f"Invalid mode value '{mode}', only 'r' and 'rb' are supported") 
    38 
    39 
    40class CompatibilityFiles: 
    41    """ 
    42    Adapter for an existing or non-existent resource reader 
    43    to provide a compatibility .files(). 
    44    """ 
    45 
    46    class SpecPath(abc.Traversable): 
    47        """ 
    48        Path tied to a module spec. 
    49        Can be read and exposes the resource reader children. 
    50        """ 
    51 
    52        def __init__(self, spec, reader): 
    53            self._spec = spec 
    54            self._reader = reader 
    55 
    56        def iterdir(self): 
    57            if not self._reader: 
    58                return iter(()) 
    59            return iter( 
    60                CompatibilityFiles.ChildPath(self._reader, path) 
    61                for path in self._reader.contents() 
    62            ) 
    63 
    64        def is_file(self): 
    65            return False 
    66 
    67        is_dir = is_file 
    68 
    69        def joinpath(self, other): 
    70            if not self._reader: 
    71                return CompatibilityFiles.OrphanPath(other) 
    72            return CompatibilityFiles.ChildPath(self._reader, other) 
    73 
    74        @property 
    75        def name(self): 
    76            return self._spec.name 
    77 
    78        def open(self, mode='r', *args, **kwargs): 
    79            return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs) 
    80 
    81    class ChildPath(abc.Traversable): 
    82        """ 
    83        Path tied to a resource reader child. 
    84        Can be read but doesn't expose any meaningful children. 
    85        """ 
    86 
    87        def __init__(self, reader, name): 
    88            self._reader = reader 
    89            self._name = name 
    90 
    91        def iterdir(self): 
    92            return iter(()) 
    93 
    94        def is_file(self): 
    95            return self._reader.is_resource(self.name) 
    96 
    97        def is_dir(self): 
    98            return not self.is_file() 
    99 
    100        def joinpath(self, other): 
    101            return CompatibilityFiles.OrphanPath(self.name, other) 
    102 
    103        @property 
    104        def name(self): 
    105            return self._name 
    106 
    107        def open(self, mode='r', *args, **kwargs): 
    108            return _io_wrapper( 
    109                self._reader.open_resource(self.name), mode, *args, **kwargs 
    110            ) 
    111 
    112    class OrphanPath(abc.Traversable): 
    113        """ 
    114        Orphan path, not tied to a module spec or resource reader. 
    115        Can't be read and doesn't expose any meaningful children. 
    116        """ 
    117 
    118        def __init__(self, *path_parts): 
    119            if len(path_parts) < 1: 
    120                raise ValueError('Need at least one path part to construct a path') 
    121            self._path = path_parts 
    122 
    123        def iterdir(self): 
    124            return iter(()) 
    125 
    126        def is_file(self): 
    127            return False 
    128 
    129        is_dir = is_file 
    130 
    131        def joinpath(self, other): 
    132            return CompatibilityFiles.OrphanPath(*self._path, other) 
    133 
    134        @property 
    135        def name(self): 
    136            return self._path[-1] 
    137 
    138        def open(self, mode='r', *args, **kwargs): 
    139            raise FileNotFoundError("Can't open orphan path") 
    140 
    141    def __init__(self, spec): 
    142        self.spec = spec 
    143 
    144    @property 
    145    def _reader(self): 
    146        with suppress(AttributeError): 
    147            return self.spec.loader.get_resource_reader(self.spec.name) 
    148 
    149    def _native(self): 
    150        """ 
    151        Return the native reader if it supports files(). 
    152        """ 
    153        reader = self._reader 
    154        return reader if hasattr(reader, 'files') else self 
    155 
    156    def __getattr__(self, attr): 
    157        return getattr(self._reader, attr) 
    158 
    159    def files(self): 
    160        return CompatibilityFiles.SpecPath(self.spec, self._reader) 
    161 
    162 
    163def wrap_spec(package): 
    164    """ 
    165    Construct a package spec with traversable compatibility 
    166    on the spec/loader/reader. 
    167    """ 
    168    return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)