1import io 
    2import posixpath 
    3import zipfile 
    4import itertools 
    5import contextlib 
    6import pathlib 
    7import re 
    8import stat 
    9import sys 
    10 
    11from .compat.py310 import text_encoding 
    12from .glob import Translator 
    13 
    14 
# Public names of this module: a star-import exposes only ``Path``.
__all__ = ['Path']
    16 
    17 
    18def _parents(path): 
    19    """ 
    20    Given a path with elements separated by 
    21    posixpath.sep, generate all parents of that path. 
    22 
    23    >>> list(_parents('b/d')) 
    24    ['b'] 
    25    >>> list(_parents('/b/d/')) 
    26    ['/b'] 
    27    >>> list(_parents('b/d/f/')) 
    28    ['b/d', 'b'] 
    29    >>> list(_parents('b')) 
    30    [] 
    31    >>> list(_parents('')) 
    32    [] 
    33    """ 
    34    return itertools.islice(_ancestry(path), 1, None) 
    35 
    36 
    37def _ancestry(path): 
    38    """ 
    39    Given a path with elements separated by 
    40    posixpath.sep, generate all elements of that path 
    41 
    42    >>> list(_ancestry('b/d')) 
    43    ['b/d', 'b'] 
    44    >>> list(_ancestry('/b/d/')) 
    45    ['/b/d', '/b'] 
    46    >>> list(_ancestry('b/d/f/')) 
    47    ['b/d/f', 'b/d', 'b'] 
    48    >>> list(_ancestry('b')) 
    49    ['b'] 
    50    >>> list(_ancestry('')) 
    51    [] 
    52    """ 
    53    path = path.rstrip(posixpath.sep) 
    54    while path and path != posixpath.sep: 
    55        yield path 
    56        path, tail = posixpath.split(path) 
    57 
    58 
# dict.fromkeys builds a dict keyed by the items of an iterable. Dicts
# keep insertion order and hold each key once, so iterating the result
# gives every distinct item in first-seen order.
_dedupe = dict.fromkeys
"""Deduplicate an iterable in original order"""
    61 
    62 
    63def _difference(minuend, subtrahend): 
    64    """ 
    65    Return items in minuend not in subtrahend, retaining order 
    66    with O(1) lookup. 
    67    """ 
    68    return itertools.filterfalse(set(subtrahend).__contains__, minuend) 
    69 
    70 
    71class InitializedState: 
    72    """ 
    73    Mix-in to save the initialization state for pickling. 
    74    """ 
    75 
    76    def __init__(self, *args, **kwargs): 
    77        self.__args = args 
    78        self.__kwargs = kwargs 
    79        super().__init__(*args, **kwargs) 
    80 
    81    def __getstate__(self): 
    82        return self.__args, self.__kwargs 
    83 
    84    def __setstate__(self, state): 
    85        args, kwargs = state 
    86        super().__init__(*args, **kwargs) 
    87 
    88 
    89class SanitizedNames: 
    90    """ 
    91    ZipFile mix-in to ensure names are sanitized. 
    92    """ 
    93 
    94    def namelist(self): 
    95        return list(map(self._sanitize, super().namelist())) 
    96 
    97    @staticmethod 
    98    def _sanitize(name): 
    99        r""" 
    100        Ensure a relative path with posix separators and no dot names. 
    101 
    102        Modeled after 
    103        https://github.com/python/cpython/blob/bcc1be39cb1d04ad9fc0bd1b9193d3972835a57c/Lib/zipfile/__init__.py#L1799-L1813 
    104        but provides consistent cross-platform behavior. 
    105 
    106        >>> san = SanitizedNames._sanitize 
    107        >>> san('/foo/bar') 
    108        'foo/bar' 
    109        >>> san('//foo.txt') 
    110        'foo.txt' 
    111        >>> san('foo/.././bar.txt') 
    112        'foo/bar.txt' 
    113        >>> san('foo../.bar.txt') 
    114        'foo../.bar.txt' 
    115        >>> san('\\foo\\bar.txt') 
    116        'foo/bar.txt' 
    117        >>> san('D:\\foo.txt') 
    118        'D/foo.txt' 
    119        >>> san('\\\\server\\share\\file.txt') 
    120        'server/share/file.txt' 
    121        >>> san('\\\\?\\GLOBALROOT\\Volume3') 
    122        '?/GLOBALROOT/Volume3' 
    123        >>> san('\\\\.\\PhysicalDrive1\\root') 
    124        'PhysicalDrive1/root' 
    125 
    126        Retain any trailing slash. 
    127        >>> san('abc/') 
    128        'abc/' 
    129 
    130        Raises a ValueError if the result is empty. 
    131        >>> san('../..') 
    132        Traceback (most recent call last): 
    133        ... 
    134        ValueError: Empty filename 
    135        """ 
    136 
    137        def allowed(part): 
    138            return part and part not in {'..', '.'} 
    139 
    140        # Remove the drive letter. 
    141        # Don't use ntpath.splitdrive, because that also strips UNC paths 
    142        bare = re.sub('^([A-Z]):', r'\1', name, flags=re.IGNORECASE) 
    143        clean = bare.replace('\\', '/') 
    144        parts = clean.split('/') 
    145        joined = '/'.join(filter(allowed, parts)) 
    146        if not joined: 
    147            raise ValueError("Empty filename") 
    148        return joined + '/' * name.endswith('/') 
    149 
    150 
    151class CompleteDirs(InitializedState, SanitizedNames, zipfile.ZipFile): 
    152    """ 
    153    A ZipFile subclass that ensures that implied directories 
    154    are always included in the namelist. 
    155 
    156    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt'])) 
    157    ['foo/', 'foo/bar/'] 
    158    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/'])) 
    159    ['foo/'] 
    160    """ 
    161 
    162    @staticmethod 
    163    def _implied_dirs(names): 
    164        parents = itertools.chain.from_iterable(map(_parents, names)) 
    165        as_dirs = (p + posixpath.sep for p in parents) 
    166        return _dedupe(_difference(as_dirs, names)) 
    167 
    168    def namelist(self): 
    169        names = super().namelist() 
    170        return names + list(self._implied_dirs(names)) 
    171 
    172    def _name_set(self): 
    173        return set(self.namelist()) 
    174 
    175    def resolve_dir(self, name): 
    176        """ 
    177        If the name represents a directory, return that name 
    178        as a directory (with the trailing slash). 
    179        """ 
    180        names = self._name_set() 
    181        dirname = name + '/' 
    182        dir_match = name not in names and dirname in names 
    183        return dirname if dir_match else name 
    184 
    185    def getinfo(self, name): 
    186        """ 
    187        Supplement getinfo for implied dirs. 
    188        """ 
    189        try: 
    190            return super().getinfo(name) 
    191        except KeyError: 
    192            if not name.endswith('/') or name not in self._name_set(): 
    193                raise 
    194            return zipfile.ZipInfo(filename=name) 
    195 
    196    @classmethod 
    197    def make(cls, source): 
    198        """ 
    199        Given a source (filename or zipfile), return an 
    200        appropriate CompleteDirs subclass. 
    201        """ 
    202        if isinstance(source, CompleteDirs): 
    203            return source 
    204 
    205        if not isinstance(source, zipfile.ZipFile): 
    206            return cls(source) 
    207 
    208        # Only allow for FastLookup when supplied zipfile is read-only 
    209        if 'r' not in source.mode: 
    210            cls = CompleteDirs 
    211 
    212        source.__class__ = cls 
    213        return source 
    214 
    215    @classmethod 
    216    def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile: 
    217        """ 
    218        Given a writable zip file zf, inject directory entries for 
    219        any directories implied by the presence of children. 
    220        """ 
    221        for name in cls._implied_dirs(zf.namelist()): 
    222            zf.writestr(name, b"") 
    223        return zf 
    224 
    225 
    226class FastLookup(CompleteDirs): 
    227    """ 
    228    ZipFile subclass to ensure implicit 
    229    dirs exist and are resolved rapidly. 
    230    """ 
    231 
    232    def namelist(self): 
    233        with contextlib.suppress(AttributeError): 
    234            return self.__names 
    235        self.__names = super().namelist() 
    236        return self.__names 
    237 
    238    def _name_set(self): 
    239        with contextlib.suppress(AttributeError): 
    240            return self.__lookup 
    241        self.__lookup = super()._name_set() 
    242        return self.__lookup 
    243 
    244 
    245def _extract_text_encoding(encoding=None, *args, **kwargs): 
    246    # compute stack level so that the caller of the caller sees any warning. 
    247    is_pypy = sys.implementation.name == 'pypy' 
    248    stack_level = 3 + is_pypy 
    249    return text_encoding(encoding, stack_level), args, kwargs 
    250 
    251 
    252class Path: 
    253    """ 
    254    A :class:`importlib.resources.abc.Traversable` interface for zip files. 
    255 
    256    Implements many of the features users enjoy from 
    257    :class:`pathlib.Path`. 
    258 
    259    Consider a zip file with this structure:: 
    260 
    261        . 
    262        ├── a.txt 
    263        └── b 
    264            ├── c.txt 
    265            └── d 
    266                └── e.txt 
    267 
    268    >>> data = io.BytesIO() 
    269    >>> zf = zipfile.ZipFile(data, 'w') 
    270    >>> zf.writestr('a.txt', 'content of a') 
    271    >>> zf.writestr('b/c.txt', 'content of c') 
    272    >>> zf.writestr('b/d/e.txt', 'content of e') 
    273    >>> zf.filename = 'mem/abcde.zip' 
    274 
    275    Path accepts the zipfile object itself or a filename 
    276 
    277    >>> path = Path(zf) 
    278 
    279    From there, several path operations are available. 
    280 
    281    Directory iteration (including the zip file itself): 
    282 
    283    >>> a, b = path.iterdir() 
    284    >>> a 
    285    Path('mem/abcde.zip', 'a.txt') 
    286    >>> b 
    287    Path('mem/abcde.zip', 'b/') 
    288 
    289    name property: 
    290 
    291    >>> b.name 
    292    'b' 
    293 
    294    join with divide operator: 
    295 
    296    >>> c = b / 'c.txt' 
    297    >>> c 
    298    Path('mem/abcde.zip', 'b/c.txt') 
    299    >>> c.name 
    300    'c.txt' 
    301 
    302    Read text: 
    303 
    304    >>> c.read_text(encoding='utf-8') 
    305    'content of c' 
    306 
    307    existence: 
    308 
    309    >>> c.exists() 
    310    True 
    311    >>> (b / 'missing.txt').exists() 
    312    False 
    313 
    314    Coercion to string: 
    315 
    316    >>> import os 
    317    >>> str(c).replace(os.sep, posixpath.sep) 
    318    'mem/abcde.zip/b/c.txt' 
    319 
    320    At the root, ``name``, ``filename``, and ``parent`` 
    321    resolve to the zipfile. 
    322 
    323    >>> str(path) 
    324    'mem/abcde.zip/' 
    325    >>> path.name 
    326    'abcde.zip' 
    327    >>> path.filename == pathlib.Path('mem/abcde.zip') 
    328    True 
    329    >>> str(path.parent) 
    330    'mem' 
    331 
    332    If the zipfile has no filename, such attributes are not 
    333    valid and accessing them will raise an Exception. 
    334 
    335    >>> zf.filename = None 
    336    >>> path.name 
    337    Traceback (most recent call last): 
    338    ... 
    339    TypeError: ... 
    340 
    341    >>> path.filename 
    342    Traceback (most recent call last): 
    343    ... 
    344    TypeError: ... 
    345 
    346    >>> path.parent 
    347    Traceback (most recent call last): 
    348    ... 
    349    TypeError: ... 
    350 
    351    # workaround python/cpython#106763 
    352    >>> pass 
    353    """ 
    354 
    355    __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})" 
    356 
    357    def __init__(self, root, at=""): 
    358        """ 
    359        Construct a Path from a ZipFile or filename. 
    360 
    361        Note: When the source is an existing ZipFile object, 
    362        its type (__class__) will be mutated to a 
    363        specialized type. If the caller wishes to retain the 
    364        original type, the caller should either create a 
    365        separate ZipFile object or pass a filename. 
    366        """ 
    367        self.root = FastLookup.make(root) 
    368        self.at = at 
    369 
    370    def __eq__(self, other): 
    371        """ 
    372        >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo' 
    373        False 
    374        """ 
    375        if self.__class__ is not other.__class__: 
    376            return NotImplemented 
    377        return (self.root, self.at) == (other.root, other.at) 
    378 
    379    def __hash__(self): 
    380        return hash((self.root, self.at)) 
    381 
    382    def open(self, mode='r', *args, pwd=None, **kwargs): 
    383        """ 
    384        Open this entry as text or binary following the semantics 
    385        of ``pathlib.Path.open()`` by passing arguments through 
    386        to io.TextIOWrapper(). 
    387        """ 
    388        if self.is_dir(): 
    389            raise IsADirectoryError(self) 
    390        zip_mode = mode[0] 
    391        if not self.exists() and zip_mode == 'r': 
    392            raise FileNotFoundError(self) 
    393        stream = self.root.open(self.at, zip_mode, pwd=pwd) 
    394        if 'b' in mode: 
    395            if args or kwargs: 
    396                raise ValueError("encoding args invalid for binary operation") 
    397            return stream 
    398        # Text mode: 
    399        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs) 
    400        return io.TextIOWrapper(stream, encoding, *args, **kwargs) 
    401 
    402    def _base(self): 
    403        return pathlib.PurePosixPath(self.at or self.root.filename) 
    404 
    405    @property 
    406    def name(self): 
    407        return self._base().name 
    408 
    409    @property 
    410    def suffix(self): 
    411        return self._base().suffix 
    412 
    413    @property 
    414    def suffixes(self): 
    415        return self._base().suffixes 
    416 
    417    @property 
    418    def stem(self): 
    419        return self._base().stem 
    420 
    421    @property 
    422    def filename(self): 
    423        return pathlib.Path(self.root.filename).joinpath(self.at) 
    424 
    425    def read_text(self, *args, **kwargs): 
    426        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs) 
    427        with self.open('r', encoding, *args, **kwargs) as strm: 
    428            return strm.read() 
    429 
    430    def read_bytes(self): 
    431        with self.open('rb') as strm: 
    432            return strm.read() 
    433 
    434    def _is_child(self, path): 
    435        return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/") 
    436 
    437    def _next(self, at): 
    438        return self.__class__(self.root, at) 
    439 
    440    def is_dir(self): 
    441        return not self.at or self.at.endswith("/") 
    442 
    443    def is_file(self): 
    444        return self.exists() and not self.is_dir() 
    445 
    446    def exists(self): 
    447        return self.at in self.root._name_set() 
    448 
    449    def iterdir(self): 
    450        if not self.is_dir(): 
    451            raise ValueError("Can't listdir a file") 
    452        subs = map(self._next, self.root.namelist()) 
    453        return filter(self._is_child, subs) 
    454 
    455    def match(self, path_pattern): 
    456        return pathlib.PurePosixPath(self.at).match(path_pattern) 
    457 
    458    def is_symlink(self): 
    459        """ 
    460        Return whether this path is a symlink. 
    461        """ 
    462        info = self.root.getinfo(self.at) 
    463        mode = info.external_attr >> 16 
    464        return stat.S_ISLNK(mode) 
    465 
    466    def glob(self, pattern): 
    467        if not pattern: 
    468            raise ValueError(f"Unacceptable pattern: {pattern!r}") 
    469 
    470        prefix = re.escape(self.at) 
    471        tr = Translator(seps='/') 
    472        matches = re.compile(prefix + tr.translate(pattern)).fullmatch 
    473        names = (data.filename for data in self.root.filelist) 
    474        return map(self._next, filter(matches, names)) 
    475 
    476    def rglob(self, pattern): 
    477        return self.glob(f'**/{pattern}') 
    478 
    479    def relative_to(self, other, *extra): 
    480        return posixpath.relpath(str(self), str(other.joinpath(*extra))) 
    481 
    482    def __str__(self): 
    483        return posixpath.join(self.root.filename, self.at) 
    484 
    485    def __repr__(self): 
    486        return self.__repr.format(self=self) 
    487 
    488    def joinpath(self, *other): 
    489        next = posixpath.join(self.at, *other) 
    490        return self._next(self.root.resolve_dir(next)) 
    491 
    492    __truediv__ = joinpath 
    493 
    494    @property 
    495    def parent(self): 
    496        if not self.at: 
    497            return self.filename.parent 
    498        parent_at = posixpath.dirname(self.at.rstrip('/')) 
    499        if parent_at: 
    500            parent_at += '/' 
    501        return self._next(parent_at)