1import datetime 
    2import io 
    3import logging 
    4import os 
    5import os.path as osp 
    6import shutil 
    7import stat 
    8import tempfile 
    9from functools import lru_cache 
    10 
    11from fsspec import AbstractFileSystem 
    12from fsspec.compression import compr 
    13from fsspec.core import get_compression 
    14from fsspec.utils import isfilelike, stringify_path 
    15 
# Module-level logger; LocalFileOpener logs every path it opens at DEBUG level.
logger = logging.getLogger("fsspec.local")
    17 
    18 
    19class LocalFileSystem(AbstractFileSystem): 
    20    """Interface to files on local storage 
    21 
    22    Parameters 
    23    ---------- 
    24    auto_mkdir: bool 
    25        Whether, when opening a file, the directory containing it should 
    26        be created (if it doesn't already exist). This is assumed by pyarrow 
    27        code. 
    28    """ 
    29 
    30    root_marker = "/" 
    31    protocol = "file", "local" 
    32    local_file = True 
    33 
    34    def __init__(self, auto_mkdir=False, **kwargs): 
    35        super().__init__(**kwargs) 
    36        self.auto_mkdir = auto_mkdir 
    37 
    38    @property 
    39    def fsid(self): 
    40        return "local" 
    41 
    42    def mkdir(self, path, create_parents=True, **kwargs): 
    43        path = self._strip_protocol(path) 
    44        if self.exists(path): 
    45            raise FileExistsError(path) 
    46        if create_parents: 
    47            self.makedirs(path, exist_ok=True) 
    48        else: 
    49            os.mkdir(path, **kwargs) 
    50 
    51    def makedirs(self, path, exist_ok=False): 
    52        path = self._strip_protocol(path) 
    53        os.makedirs(path, exist_ok=exist_ok) 
    54 
    55    def rmdir(self, path): 
    56        path = self._strip_protocol(path) 
    57        os.rmdir(path) 
    58 
    59    def ls(self, path, detail=False, **kwargs): 
    60        path = self._strip_protocol(path) 
    61        path_info = self.info(path) 
    62        infos = [] 
    63        if path_info["type"] == "directory": 
    64            with os.scandir(path) as it: 
    65                for f in it: 
    66                    try: 
    67                        # Only get the info if requested since it is a bit expensive (the stat call inside) 
    68                        # The strip_protocol is also used in info() and calls make_path_posix to always return posix paths 
    69                        info = self.info(f) if detail else self._strip_protocol(f.path) 
    70                        infos.append(info) 
    71                    except FileNotFoundError: 
    72                        pass 
    73        else: 
    74            infos = [path_info] if detail else [path_info["name"]] 
    75 
    76        return infos 
    77 
    78    def info(self, path, **kwargs): 
    79        if isinstance(path, os.DirEntry): 
    80            # scandir DirEntry 
    81            out = path.stat(follow_symlinks=False) 
    82            link = path.is_symlink() 
    83            if path.is_dir(follow_symlinks=False): 
    84                t = "directory" 
    85            elif path.is_file(follow_symlinks=False): 
    86                t = "file" 
    87            else: 
    88                t = "other" 
    89 
    90            size = out.st_size 
    91            if link: 
    92                try: 
    93                    out2 = path.stat(follow_symlinks=True) 
    94                    size = out2.st_size 
    95                except OSError: 
    96                    size = 0 
    97            path = self._strip_protocol(path.path) 
    98        else: 
    99            # str or path-like 
    100            path = self._strip_protocol(path) 
    101            out = os.stat(path, follow_symlinks=False) 
    102            link = stat.S_ISLNK(out.st_mode) 
    103            if link: 
    104                out = os.stat(path, follow_symlinks=True) 
    105            size = out.st_size 
    106            if stat.S_ISDIR(out.st_mode): 
    107                t = "directory" 
    108            elif stat.S_ISREG(out.st_mode): 
    109                t = "file" 
    110            else: 
    111                t = "other" 
    112 
    113        # Check for the 'st_birthtime' attribute, which is not always present; fallback to st_ctime 
    114        created_time = getattr(out, "st_birthtime", out.st_ctime) 
    115 
    116        result = { 
    117            "name": path, 
    118            "size": size, 
    119            "type": t, 
    120            "created": created_time, 
    121            "islink": link, 
    122        } 
    123        for field in ["mode", "uid", "gid", "mtime", "ino", "nlink"]: 
    124            result[field] = getattr(out, f"st_{field}") 
    125        if link: 
    126            result["destination"] = os.readlink(path) 
    127        return result 
    128 
    129    def lexists(self, path, **kwargs): 
    130        return osp.lexists(path) 
    131 
    132    def cp_file(self, path1, path2, **kwargs): 
    133        path1 = self._strip_protocol(path1) 
    134        path2 = self._strip_protocol(path2) 
    135        if self.auto_mkdir: 
    136            self.makedirs(self._parent(path2), exist_ok=True) 
    137        if self.isfile(path1): 
    138            shutil.copyfile(path1, path2) 
    139        elif self.isdir(path1): 
    140            self.mkdirs(path2, exist_ok=True) 
    141        else: 
    142            raise FileNotFoundError(path1) 
    143 
    144    def isfile(self, path): 
    145        path = self._strip_protocol(path) 
    146        return os.path.isfile(path) 
    147 
    148    def isdir(self, path): 
    149        path = self._strip_protocol(path) 
    150        return os.path.isdir(path) 
    151 
    152    def get_file(self, path1, path2, callback=None, **kwargs): 
    153        if isfilelike(path2): 
    154            with open(path1, "rb") as f: 
    155                shutil.copyfileobj(f, path2) 
    156        else: 
    157            return self.cp_file(path1, path2, **kwargs) 
    158 
    159    def put_file(self, path1, path2, callback=None, **kwargs): 
    160        return self.cp_file(path1, path2, **kwargs) 
    161 
    162    def mv(self, path1, path2, recursive: bool = True, **kwargs): 
    163        """Move files/directories 
    164        For the specific case of local, all ops on directories are recursive and 
    165        the recursive= kwarg is ignored. 
    166        """ 
    167        path1 = self._strip_protocol(path1) 
    168        path2 = self._strip_protocol(path2) 
    169        shutil.move(path1, path2) 
    170 
    171    def link(self, src, dst, **kwargs): 
    172        src = self._strip_protocol(src) 
    173        dst = self._strip_protocol(dst) 
    174        os.link(src, dst, **kwargs) 
    175 
    176    def symlink(self, src, dst, **kwargs): 
    177        src = self._strip_protocol(src) 
    178        dst = self._strip_protocol(dst) 
    179        os.symlink(src, dst, **kwargs) 
    180 
    181    def islink(self, path) -> bool: 
    182        return os.path.islink(self._strip_protocol(path)) 
    183 
    184    def rm_file(self, path): 
    185        os.remove(self._strip_protocol(path)) 
    186 
    187    def rm(self, path, recursive=False, maxdepth=None): 
    188        if not isinstance(path, list): 
    189            path = [path] 
    190 
    191        for p in path: 
    192            p = self._strip_protocol(p) 
    193            if self.isdir(p): 
    194                if not recursive: 
    195                    raise ValueError("Cannot delete directory, set recursive=True") 
    196                if osp.abspath(p) == os.getcwd(): 
    197                    raise ValueError("Cannot delete current working directory") 
    198                shutil.rmtree(p) 
    199            else: 
    200                os.remove(p) 
    201 
    202    def unstrip_protocol(self, name): 
    203        name = self._strip_protocol(name)  # normalise for local/win/... 
    204        return f"file://{name}" 
    205 
    206    def _open(self, path, mode="rb", block_size=None, **kwargs): 
    207        path = self._strip_protocol(path) 
    208        if self.auto_mkdir and "w" in mode: 
    209            self.makedirs(self._parent(path), exist_ok=True) 
    210        return LocalFileOpener(path, mode, fs=self, **kwargs) 
    211 
    212    def touch(self, path, truncate=True, **kwargs): 
    213        path = self._strip_protocol(path) 
    214        if self.auto_mkdir: 
    215            self.makedirs(self._parent(path), exist_ok=True) 
    216        if self.exists(path): 
    217            os.utime(path, None) 
    218        else: 
    219            open(path, "a").close() 
    220        if truncate: 
    221            os.truncate(path, 0) 
    222 
    223    def created(self, path): 
    224        info = self.info(path=path) 
    225        return datetime.datetime.fromtimestamp( 
    226            info["created"], tz=datetime.timezone.utc 
    227        ) 
    228 
    229    def modified(self, path): 
    230        info = self.info(path=path) 
    231        return datetime.datetime.fromtimestamp(info["mtime"], tz=datetime.timezone.utc) 
    232 
    233    @classmethod 
    234    def _parent(cls, path): 
    235        path = cls._strip_protocol(path) 
    236        if os.sep == "/": 
    237            # posix native 
    238            return path.rsplit("/", 1)[0] or "/" 
    239        else: 
    240            # NT 
    241            path_ = path.rsplit("/", 1)[0] 
    242            if len(path_) <= 3: 
    243                if path_[1:2] == ":": 
    244                    # nt root (something like c:/) 
    245                    return path_[0] + ":/" 
    246            # More cases may be required here 
    247            return path_ 
    248 
    249    @classmethod 
    250    def _strip_protocol(cls, path): 
    251        path = stringify_path(path) 
    252        if path.startswith("file://"): 
    253            path = path[7:] 
    254        elif path.startswith("file:"): 
    255            path = path[5:] 
    256        elif path.startswith("local://"): 
    257            path = path[8:] 
    258        elif path.startswith("local:"): 
    259            path = path[6:] 
    260 
    261        path = make_path_posix(path) 
    262        if os.sep != "/": 
    263            # This code-path is a stripped down version of 
    264            # > drive, path = ntpath.splitdrive(path) 
    265            if path[1:2] == ":": 
    266                # Absolute drive-letter path, e.g. X:\Windows 
    267                # Relative path with drive, e.g. X:Windows 
    268                drive, path = path[:2], path[2:] 
    269            elif path[:2] == "//": 
    270                # UNC drives, e.g. \\server\share or \\?\UNC\server\share 
    271                # Device drives, e.g. \\.\device or \\?\device 
    272                if (index1 := path.find("/", 2)) == -1 or ( 
    273                    index2 := path.find("/", index1 + 1) 
    274                ) == -1: 
    275                    drive, path = path, "" 
    276                else: 
    277                    drive, path = path[:index2], path[index2:] 
    278            else: 
    279                # Relative path, e.g. Windows 
    280                drive = "" 
    281 
    282            path = path.rstrip("/") or cls.root_marker 
    283            return drive + path 
    284 
    285        else: 
    286            return path.rstrip("/") or cls.root_marker 
    287 
    288    def _isfilestore(self): 
    289        # Inheriting from DaskFileSystem makes this False (S3, etc. were) 
    290        # the original motivation. But we are a posix-like file system. 
    291        # See https://github.com/dask/dask/issues/5526 
    292        return True 
    293 
    294    def chmod(self, path, mode): 
    295        path = stringify_path(path) 
    296        return os.chmod(path, mode) 
    297 
    298 
    299def make_path_posix(path): 
    300    """Make path generic and absolute for current OS""" 
    301    if not isinstance(path, str): 
    302        if isinstance(path, (list, set, tuple)): 
    303            return type(path)(make_path_posix(p) for p in path) 
    304        else: 
    305            path = stringify_path(path) 
    306            if not isinstance(path, str): 
    307                raise TypeError(f"could not convert {path!r} to string") 
    308    if os.sep == "/": 
    309        # Native posix 
    310        if path.startswith("/"): 
    311            # most common fast case for posix 
    312            return path 
    313        elif path.startswith("~"): 
    314            return osp.expanduser(path) 
    315        elif path.startswith("./"): 
    316            path = path[2:] 
    317        elif path == ".": 
    318            path = "" 
    319        return f"{os.getcwd()}/{path}" 
    320    else: 
    321        # NT handling 
    322        if path[0:1] == "/" and path[2:3] == ":": 
    323            # path is like "/c:/local/path" 
    324            path = path[1:] 
    325        if path[1:2] == ":": 
    326            # windows full path like "C:\\local\\path" 
    327            if len(path) <= 3: 
    328                # nt root (something like c:/) 
    329                return path[0] + ":/" 
    330            path = path.replace("\\", "/") 
    331            return path 
    332        elif path[0:1] == "~": 
    333            return make_path_posix(osp.expanduser(path)) 
    334        elif path.startswith(("\\\\", "//")): 
    335            # windows UNC/DFS-style paths 
    336            return "//" + path[2:].replace("\\", "/") 
    337        elif path.startswith(("\\", "/")): 
    338            # windows relative path with root 
    339            path = path.replace("\\", "/") 
    340            return f"{osp.splitdrive(os.getcwd())[0]}{path}" 
    341        else: 
    342            path = path.replace("\\", "/") 
    343            if path.startswith("./"): 
    344                path = path[2:] 
    345            elif path == ".": 
    346                path = "" 
    347            return f"{make_path_posix(os.getcwd())}/{path}" 
    348 
    349 
    350def trailing_sep(path): 
    351    """Return True if the path ends with a path separator. 
    352 
    353    A forward slash is always considered a path separator, even on Operating 
    354    Systems that normally use a backslash. 
    355    """ 
    356    # TODO: if all incoming paths were posix-compliant then separator would 
    357    # always be a forward slash, simplifying this function. 
    358    # See https://github.com/fsspec/filesystem_spec/pull/1250 
    359    return path.endswith(os.sep) or (os.altsep is not None and path.endswith(os.altsep)) 
    360 
    361 
    362@lru_cache(maxsize=1) 
    363def get_umask(mask: int = 0o666) -> int: 
    364    """Get the current umask. 
    365 
    366    Follows https://stackoverflow.com/a/44130549 to get the umask. 
    367    Temporarily sets the umask to the given value, and then resets it to the 
    368    original value. 
    369    """ 
    370    value = os.umask(mask) 
    371    os.umask(value) 
    372    return value 
    373 
    374 
    375class LocalFileOpener(io.IOBase): 
    376    def __init__( 
    377        self, path, mode, autocommit=True, fs=None, compression=None, **kwargs 
    378    ): 
    379        logger.debug("open file: %s", path) 
    380        self.path = path 
    381        self.mode = mode 
    382        self.fs = fs 
    383        self.f = None 
    384        self.autocommit = autocommit 
    385        self.compression = get_compression(path, compression) 
    386        self.blocksize = io.DEFAULT_BUFFER_SIZE 
    387        self._open() 
    388 
    389    def _open(self): 
    390        if self.f is None or self.f.closed: 
    391            if self.autocommit or "w" not in self.mode: 
    392                self.f = open(self.path, mode=self.mode) 
    393                if self.compression: 
    394                    compress = compr[self.compression] 
    395                    self.f = compress(self.f, mode=self.mode) 
    396            else: 
    397                # TODO: check if path is writable? 
    398                i, name = tempfile.mkstemp() 
    399                os.close(i)  # we want normal open and normal buffered file 
    400                self.temp = name 
    401                self.f = open(name, mode=self.mode) 
    402            if "w" not in self.mode: 
    403                self.size = self.f.seek(0, 2) 
    404                self.f.seek(0) 
    405                self.f.size = self.size 
    406 
    407    def _fetch_range(self, start, end): 
    408        # probably only used by cached FS 
    409        if "r" not in self.mode: 
    410            raise ValueError 
    411        self._open() 
    412        self.f.seek(start) 
    413        return self.f.read(end - start) 
    414 
    415    def __setstate__(self, state): 
    416        self.f = None 
    417        loc = state.pop("loc", None) 
    418        self.__dict__.update(state) 
    419        if "r" in state["mode"]: 
    420            self.f = None 
    421            self._open() 
    422            self.f.seek(loc) 
    423 
    424    def __getstate__(self): 
    425        d = self.__dict__.copy() 
    426        d.pop("f") 
    427        if "r" in self.mode: 
    428            d["loc"] = self.f.tell() 
    429        else: 
    430            if not self.f.closed: 
    431                raise ValueError("Cannot serialise open write-mode local file") 
    432        return d 
    433 
    434    def commit(self): 
    435        if self.autocommit: 
    436            raise RuntimeError("Can only commit if not already set to autocommit") 
    437        try: 
    438            shutil.move(self.temp, self.path) 
    439        except PermissionError as e: 
    440            # shutil.move raises PermissionError if os.rename 
    441            # and the default copy2 fallback with shutil.copystats fail. 
    442            # The file should be there nonetheless, but without copied permissions. 
    443            # If it doesn't exist, there was no permission to create the file. 
    444            if not os.path.exists(self.path): 
    445                raise e 
    446        else: 
    447            # If PermissionError is not raised, permissions can be set. 
    448            try: 
    449                mask = 0o666 
    450                os.chmod(self.path, mask & ~get_umask(mask)) 
    451            except RuntimeError: 
    452                pass 
    453 
    454    def discard(self): 
    455        if self.autocommit: 
    456            raise RuntimeError("Cannot discard if set to autocommit") 
    457        os.remove(self.temp) 
    458 
    459    def readable(self) -> bool: 
    460        return True 
    461 
    462    def writable(self) -> bool: 
    463        return "r" not in self.mode 
    464 
    465    def read(self, *args, **kwargs): 
    466        return self.f.read(*args, **kwargs) 
    467 
    468    def write(self, *args, **kwargs): 
    469        return self.f.write(*args, **kwargs) 
    470 
    471    def tell(self, *args, **kwargs): 
    472        return self.f.tell(*args, **kwargs) 
    473 
    474    def seek(self, *args, **kwargs): 
    475        return self.f.seek(*args, **kwargs) 
    476 
    477    def seekable(self, *args, **kwargs): 
    478        return self.f.seekable(*args, **kwargs) 
    479 
    480    def readline(self, *args, **kwargs): 
    481        return self.f.readline(*args, **kwargs) 
    482 
    483    def readlines(self, *args, **kwargs): 
    484        return self.f.readlines(*args, **kwargs) 
    485 
    486    def close(self): 
    487        return self.f.close() 
    488 
    489    def truncate(self, size=None) -> int: 
    490        return self.f.truncate(size) 
    491 
    492    @property 
    493    def closed(self): 
    494        return self.f.closed 
    495 
    496    def fileno(self): 
    497        return self.raw.fileno() 
    498 
    499    def flush(self) -> None: 
    500        self.f.flush() 
    501 
    502    def __iter__(self): 
    503        return self.f.__iter__() 
    504 
    505    def __getattr__(self, item): 
    506        return getattr(self.f, item) 
    507 
    508    def __enter__(self): 
    509        self._incontext = True 
    510        return self 
    511 
    512    def __exit__(self, exc_type, exc_value, traceback): 
    513        self._incontext = False 
    514        self.f.__exit__(exc_type, exc_value, traceback)