1import datetime
2import io
3import logging
4import os
5import os.path as osp
6import shutil
7import stat
8import tempfile
9from functools import lru_cache
10
11from fsspec import AbstractFileSystem
12from fsspec.compression import compr
13from fsspec.core import get_compression
14from fsspec.utils import isfilelike, stringify_path
15
# Module-level logger; LocalFileOpener emits a debug record through it
# every time it is constructed for a path.
logger = logging.getLogger("fsspec.local")
17
18
class LocalFileSystem(AbstractFileSystem):
    """Interface to files on local storage

    Every public method normalises its path argument(s) with
    ``_strip_protocol`` so that ``file://`` / ``local://`` URLs, path-like
    objects and relative paths are all accepted.

    Parameters
    ----------
    auto_mkdir: bool
        Whether, when opening a file, the directory containing it should
        be created (if it doesn't already exist). This is assumed by pyarrow
        code.
    """

    root_marker = "/"
    protocol = "file", "local"
    local_file = True

    def __init__(self, auto_mkdir=False, **kwargs):
        super().__init__(**kwargs)
        self.auto_mkdir = auto_mkdir

    @property
    def fsid(self):
        """Constant identifier of this filesystem: ``"local"``."""
        return "local"

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create the directory ``path``.

        Raises ``FileExistsError`` if ``path`` already exists. With
        ``create_parents`` the missing parents are created as well; otherwise
        ``os.mkdir`` is called with the extra ``kwargs``.
        """
        path = self._strip_protocol(path)
        if self.exists(path):
            raise FileExistsError(path)
        if create_parents:
            self.makedirs(path, exist_ok=True)
        else:
            os.mkdir(path, **kwargs)

    def makedirs(self, path, exist_ok=False):
        """Create ``path`` and any missing parents (``os.makedirs``)."""
        path = self._strip_protocol(path)
        os.makedirs(path, exist_ok=exist_ok)

    def rmdir(self, path):
        """Remove the directory ``path`` (``os.rmdir``)."""
        path = self._strip_protocol(path)
        os.rmdir(path)

    def ls(self, path, detail=False, **kwargs):
        """List ``path``.

        For a directory, returns one entry per child; for anything else a
        single entry describing ``path`` itself. Entries are ``info`` dicts
        when ``detail`` is true, otherwise normalised path strings.
        """
        path = self._strip_protocol(path)
        path_info = self.info(path)
        infos = []
        if path_info["type"] == "directory":
            with os.scandir(path) as it:
                for f in it:
                    try:
                        # Only get the info if requested since it is a bit
                        # expensive (the stat call inside). The
                        # strip_protocol is also used in info() and calls
                        # make_path_posix to always return posix paths.
                        info = self.info(f) if detail else self._strip_protocol(f.path)
                        infos.append(info)
                    except FileNotFoundError:
                        # Entry vanished between scandir and stat: skip it.
                        pass
        else:
            infos = [path_info] if detail else [path_info["name"]]

        return infos

    def info(self, path, **kwargs):
        """Return a dict describing ``path``.

        ``path`` may be a string / path-like or an ``os.DirEntry`` (as
        produced by ``os.scandir``). The result always has the keys
        ``name``, ``size``, ``type`` (``"directory"``, ``"file"`` or
        ``"other"``), ``created``, ``islink``, ``mode``, ``uid``, ``gid``,
        ``mtime``, ``ino`` and ``nlink``; symlinks additionally get
        ``destination`` (the result of ``os.readlink``).
        """
        if isinstance(path, os.DirEntry):
            # scandir DirEntry: type and stat fields describe the entry
            # itself (symlinks are not followed) ...
            out = path.stat(follow_symlinks=False)
            link = path.is_symlink()
            if path.is_dir(follow_symlinks=False):
                t = "directory"
            elif path.is_file(follow_symlinks=False):
                t = "file"
            else:
                t = "other"

            size = out.st_size
            if link:
                # ... except the size, which is that of the link target,
                # or 0 when the target cannot be stat'ed.
                try:
                    out2 = path.stat(follow_symlinks=True)
                    size = out2.st_size
                except OSError:
                    size = 0
            path = self._strip_protocol(path.path)
        else:
            # str or path-like
            path = self._strip_protocol(path)
            out = os.stat(path, follow_symlinks=False)
            link = stat.S_ISLNK(out.st_mode)
            if link:
                # For links, report the stat of the target instead.
                out = os.stat(path, follow_symlinks=True)
            size = out.st_size
            if stat.S_ISDIR(out.st_mode):
                t = "directory"
            elif stat.S_ISREG(out.st_mode):
                t = "file"
            else:
                t = "other"

        # Check for the 'st_birthtime' attribute, which is not always
        # present; fallback to st_ctime
        created_time = getattr(out, "st_birthtime", out.st_ctime)

        result = {
            "name": path,
            "size": size,
            "type": t,
            "created": created_time,
            "islink": link,
        }
        for field in ["mode", "uid", "gid", "mtime", "ino", "nlink"]:
            result[field] = getattr(out, f"st_{field}")
        if link:
            result["destination"] = os.readlink(path)
        return result

    def lexists(self, path, **kwargs):
        """Return True if ``path`` exists, without following a final symlink.

        The path is normalised like in every other method, so protocol
        prefixed URLs are accepted here too.
        """
        return osp.lexists(self._strip_protocol(path))

    def cp_file(self, path1, path2, **kwargs):
        """Copy the single file ``path1`` to ``path2``.

        If ``path1`` is a directory, ``path2`` is created as a directory
        instead. Raises ``FileNotFoundError`` when ``path1`` is neither.
        With ``auto_mkdir`` the parent of ``path2`` is created first.
        """
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        if self.auto_mkdir:
            self.makedirs(self._parent(path2), exist_ok=True)
        if self.isfile(path1):
            shutil.copyfile(path1, path2)
        elif self.isdir(path1):
            self.mkdirs(path2, exist_ok=True)
        else:
            raise FileNotFoundError(path1)

    def isfile(self, path):
        """Return True if ``path`` is a regular file (``os.path.isfile``)."""
        path = self._strip_protocol(path)
        return os.path.isfile(path)

    def isdir(self, path):
        """Return True if ``path`` is a directory (``os.path.isdir``)."""
        path = self._strip_protocol(path)
        return os.path.isdir(path)

    def get_file(self, path1, path2, callback=None, **kwargs):
        """Copy ``path1`` into ``path2``.

        ``path2`` may be a path or an open file-like object; in the latter
        case the bytes of ``path1`` are streamed into it. ``callback`` is
        accepted for interface compatibility and not used.
        """
        if isfilelike(path2):
            # Normalise the source so that protocol-prefixed URLs work here
            # exactly as they do in the cp_file branch below.
            with open(self._strip_protocol(path1), "rb") as f:
                shutil.copyfileobj(f, path2)
        else:
            return self.cp_file(path1, path2, **kwargs)

    def put_file(self, path1, path2, callback=None, **kwargs):
        """Copy ``path1`` to ``path2``; same as ``cp_file`` for local files."""
        return self.cp_file(path1, path2, **kwargs)

    def mv(self, path1, path2, recursive: bool = True, **kwargs):
        """Move files/directories
        For the specific case of local, all ops on directories are recursive and
        the recursive= kwarg is ignored.
        """
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)

        if self.auto_mkdir:
            self.makedirs(self._parent(path2), exist_ok=True)

        shutil.move(path1, path2)

    def link(self, src, dst, **kwargs):
        """Create a hard link ``dst`` pointing to ``src`` (``os.link``)."""
        src = self._strip_protocol(src)
        dst = self._strip_protocol(dst)
        os.link(src, dst, **kwargs)

    def symlink(self, src, dst, **kwargs):
        """Create a symbolic link ``dst`` pointing to ``src`` (``os.symlink``)."""
        src = self._strip_protocol(src)
        dst = self._strip_protocol(dst)
        os.symlink(src, dst, **kwargs)

    def islink(self, path) -> bool:
        """Return True if ``path`` is a symbolic link."""
        return os.path.islink(self._strip_protocol(path))

    def rm_file(self, path):
        """Delete the single file ``path`` (``os.remove``)."""
        os.remove(self._strip_protocol(path))

    def rm(self, path, recursive=False, maxdepth=None):
        """Delete one path or a list of paths.

        Directories require ``recursive=True`` and are removed with
        ``shutil.rmtree``; the current working directory is refused.
        ``maxdepth`` is accepted but not used.

        Raises
        ------
        ValueError
            If a directory is given without ``recursive=True``, or if it is
            the current working directory.
        """
        if not isinstance(path, list):
            path = [path]

        for p in path:
            p = self._strip_protocol(p)
            if self.isdir(p):
                if not recursive:
                    raise ValueError("Cannot delete directory, set recursive=True")
                if osp.abspath(p) == os.getcwd():
                    raise ValueError("Cannot delete current working directory")
                shutil.rmtree(p)
            else:
                os.remove(p)

    def unstrip_protocol(self, name):
        """Return ``name`` as a URL using this class's first protocol."""
        protocol = self.protocol if isinstance(self.protocol, str) else self.protocol[0]
        name = self._strip_protocol(name)  # normalise for local/win/...
        return f"{protocol}://{name}"

    def _open(self, path, mode="rb", block_size=None, **kwargs):
        """Return a ``LocalFileOpener`` for ``path``.

        With ``auto_mkdir`` and a ``"w"`` mode the parent directory is
        created first. ``block_size`` is not used.
        """
        path = self._strip_protocol(path)
        if self.auto_mkdir and "w" in mode:
            self.makedirs(self._parent(path), exist_ok=True)
        return LocalFileOpener(path, mode, fs=self, **kwargs)

    def touch(self, path, truncate=True, **kwargs):
        """Create ``path`` if missing, otherwise update its timestamps.

        With ``truncate`` (the default) the file is cut to zero length
        afterwards.
        """
        path = self._strip_protocol(path)
        if self.auto_mkdir:
            self.makedirs(self._parent(path), exist_ok=True)
        if self.exists(path):
            os.utime(path, None)
        else:
            open(path, "a").close()
        if truncate:
            os.truncate(path, 0)

    def created(self, path):
        """Return the ``created`` timestamp of ``path`` as an aware UTC datetime."""
        info = self.info(path=path)
        return datetime.datetime.fromtimestamp(
            info["created"], tz=datetime.timezone.utc
        )

    def modified(self, path):
        """Return the ``mtime`` of ``path`` as an aware UTC datetime."""
        info = self.info(path=path)
        return datetime.datetime.fromtimestamp(info["mtime"], tz=datetime.timezone.utc)

    @classmethod
    def _parent(cls, path):
        """Return the parent directory of the normalised ``path``."""
        path = cls._strip_protocol(path)
        if os.sep == "/":
            # posix native
            return path.rsplit("/", 1)[0] or "/"
        else:
            # NT
            path_ = path.rsplit("/", 1)[0]
            if len(path_) <= 3:
                if path_[1:2] == ":":
                    # nt root (something like c:/)
                    return path_[0] + ":/"
            # More cases may be required here
            return path_

    @classmethod
    def _strip_protocol(cls, path):
        """Normalise ``path``: drop the protocol prefix, make it absolute
        with forward slashes, and remove trailing slashes (keeping the root).
        """
        path = stringify_path(path)
        protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
        # Both "proto://" and the shorter "proto:" spellings are recognised.
        prefixes = (protocol + sep for protocol in protos for sep in ("://", ":"))
        for prefix in prefixes:
            if path.startswith(prefix):
                path = path.removeprefix(prefix)
                break

        path = make_path_posix(path)
        if os.sep != "/":
            # This code-path is a stripped down version of
            # > drive, path = ntpath.splitdrive(path)
            if path[1:2] == ":":
                # Absolute drive-letter path, e.g. X:\Windows
                # Relative path with drive, e.g. X:Windows
                drive, path = path[:2], path[2:]
            elif path[:2] == "//":
                # UNC drives, e.g. \\server\share or \\?\UNC\server\share
                # Device drives, e.g. \\.\device or \\?\device
                if (index1 := path.find("/", 2)) == -1 or (
                    index2 := path.find("/", index1 + 1)
                ) == -1:
                    drive, path = path, ""
                else:
                    drive, path = path[:index2], path[index2:]
            else:
                # Relative path, e.g. Windows
                drive = ""

            path = path.rstrip("/") or cls.root_marker
            return drive + path

        else:
            return path.rstrip("/") or cls.root_marker

    def _isfilestore(self):
        # Inheriting from DaskFileSystem makes this False (S3, etc. were)
        # the original motivation. But we are a posix-like file system.
        # See https://github.com/dask/dask/issues/5526
        return True

    def chmod(self, path, mode):
        """Change the permission bits of ``path`` to ``mode`` (``os.chmod``)."""
        path = self._strip_protocol(path)
        return os.chmod(path, mode)
300
301
def make_path_posix(path):
    """Return ``path`` as an absolute, forward-slash path for the current OS.

    Lists, sets and tuples are converted element by element and returned in
    a container of the same type. Any other non-string input is passed
    through ``stringify_path``; ``TypeError`` is raised when that does not
    yield a ``str``.
    """
    if isinstance(path, (list, set, tuple)):
        return type(path)(make_path_posix(member) for member in path)
    if not isinstance(path, str):
        path = stringify_path(path)
        if not isinstance(path, str):
            raise TypeError(f"could not convert {path!r} to string")

    if os.sep == "/":
        # Native posix; an already-absolute path is the common fast case.
        if path.startswith("/"):
            return path
        if path.startswith("~"):
            return osp.expanduser(path)
        # Relative: anchor at the working directory, dropping a leading
        # "./" (a bare "." contributes nothing).
        tail = "" if path == "." else path.removeprefix("./")
        return f"{os.getcwd()}/{tail}"

    # NT handling
    if path.startswith("/") and path[2:3] == ":":
        # path is like "/c:/local/path"
        path = path[1:]
    if path[1:2] == ":":
        # windows full path like "C:\\local\\path"
        if len(path) <= 3:
            # nt root (something like c:/)
            return path[0] + ":/"
        return path.replace("\\", "/")
    if path.startswith("~"):
        return make_path_posix(osp.expanduser(path))
    if path.startswith(("\\\\", "//")):
        # windows UNC/DFS-style paths
        return "//" + path[2:].replace("\\", "/")

    forward = path.replace("\\", "/")
    if path.startswith(("\\", "/")):
        # windows relative path with root: borrow the drive of the cwd
        drive = osp.splitdrive(os.getcwd())[0]
        return f"{drive}{forward}"
    tail = "" if forward == "." else forward.removeprefix("./")
    return f"{make_path_posix(os.getcwd())}/{tail}"
351
352
def trailing_sep(path):
    """Tell whether ``path`` ends with a path separator.

    Both ``os.sep`` and, where the platform defines one, ``os.altsep``
    count as separators, so a forward slash is recognised even on systems
    whose native separator is a backslash.
    """
    # TODO: if all incoming paths were posix-compliant then separator would
    # always be a forward slash, simplifying this function.
    # See https://github.com/fsspec/filesystem_spec/pull/1250
    separators = (os.sep,) if os.altsep is None else (os.sep, os.altsep)
    return path.endswith(separators)
363
364
@lru_cache(maxsize=1)
def get_umask(mask: int = 0o666) -> int:
    """Return the process umask.

    ``os.umask`` can only report the current mask as a side effect of
    setting a new one, so ``mask`` is installed for a moment and the
    reported value is put back straight away (see
    https://stackoverflow.com/a/44130549). The result is memoised by
    ``lru_cache``.
    """
    original = os.umask(mask)  # installs ``mask``, hands back the old one
    os.umask(original)  # restore what was there before
    return original
376
377
class LocalFileOpener(io.IOBase):
    """File-like object wrapping a file opened on the local disk.

    Parameters
    ----------
    path: str
        Location of the file.
    mode: str
        Mode string handed to the builtin ``open``.
    autocommit: bool
        If False and ``mode`` contains ``"w"``, data is written to a
        temporary file which only replaces ``path`` on ``commit()`` (or is
        deleted by ``discard()``).
    fs:
        Stored on the instance as ``self.fs``; not otherwise used here.
    compression: str or None
        Resolved with ``get_compression(path, compression)``; when set, the
        opened file is wrapped with the matching entry of ``compr``.
    """

    def __init__(
        self, path, mode, autocommit=True, fs=None, compression=None, **kwargs
    ):
        logger.debug("open file: %s", path)
        self.path = path
        self.mode = mode
        self.fs = fs
        # Must exist before anything else touches the instance: __getattr__
        # forwards unknown attributes to ``self.f``.
        self.f = None
        self.autocommit = autocommit
        self.compression = get_compression(path, compression)
        self.blocksize = io.DEFAULT_BUFFER_SIZE
        self._open()

    def _open(self):
        """(Re)open the underlying file if it is missing or closed."""
        if self.f is None or self.f.closed:
            if self.autocommit or "w" not in self.mode:
                self.f = open(self.path, mode=self.mode)
                if self.compression:
                    compress = compr[self.compression]
                    self.f = compress(self.f, mode=self.mode)
            else:
                # Deferred write: go through a temporary file until commit().
                # TODO: check if path is writable?
                i, name = tempfile.mkstemp()
                os.close(i)  # we want normal open and normal buffered file
                self.temp = name
                self.f = open(name, mode=self.mode)
            if "w" not in self.mode:
                # Record the size by seeking to the end and back.
                self.size = self.f.seek(0, 2)
                self.f.seek(0)
                self.f.size = self.size

    def _fetch_range(self, start, end):
        """Read the bytes ``start``..``end``; only valid in a read mode."""
        # probably only used by cached FS
        if "r" not in self.mode:
            raise ValueError
        self._open()
        self.f.seek(start)
        return self.f.read(end - start)

    def __setstate__(self, state):
        """Restore from ``__getstate__`` output, reopening read-mode files
        at the saved position.
        """
        self.f = None
        loc = state.pop("loc", None)
        self.__dict__.update(state)
        if "r" in state["mode"]:
            self.f = None
            self._open()
            self.f.seek(loc)

    def __getstate__(self):
        """Return the picklable state (everything but the open file).

        Read-mode files also save the current position as ``loc``. Raises
        ``ValueError`` for a write-mode file that is still open.
        """
        d = self.__dict__.copy()
        d.pop("f")
        if "r" in self.mode:
            d["loc"] = self.f.tell()
        else:
            if not self.f.closed:
                raise ValueError("Cannot serialise open write-mode local file")
        return d

    def commit(self):
        """Move the temporary file onto ``path``.

        Raises ``RuntimeError`` when the file was opened with autocommit.
        """
        if self.autocommit:
            raise RuntimeError("Can only commit if not already set to autocommit")
        try:
            shutil.move(self.temp, self.path)
        except PermissionError as e:
            # shutil.move raises PermissionError if os.rename
            # and the default copy2 fallback with shutil.copystats fail.
            # The file should be there nonetheless, but without copied permissions.
            # If it doesn't exist, there was no permission to create the file.
            if not os.path.exists(self.path):
                raise e
        else:
            # If PermissionError is not raised, permissions can be set.
            try:
                mask = 0o666
                os.chmod(self.path, mask & ~get_umask(mask))
            except RuntimeError:
                pass

    def discard(self):
        """Delete the temporary file instead of committing it.

        Raises ``RuntimeError`` when the file was opened with autocommit.
        """
        if self.autocommit:
            raise RuntimeError("Cannot discard if set to autocommit")
        os.remove(self.temp)

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return "r" not in self.mode

    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)

    def tell(self, *args, **kwargs):
        return self.f.tell(*args, **kwargs)

    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)

    def seekable(self, *args, **kwargs):
        return self.f.seekable(*args, **kwargs)

    def readline(self, *args, **kwargs):
        return self.f.readline(*args, **kwargs)

    def readlines(self, *args, **kwargs):
        return self.f.readlines(*args, **kwargs)

    def close(self):
        return self.f.close()

    def truncate(self, size=None) -> int:
        return self.f.truncate(size)

    @property
    def closed(self):
        return self.f.closed

    def fileno(self):
        """Return the OS-level file descriptor of the underlying file."""
        # Ask the wrapped file directly rather than its ``raw`` attribute:
        # only buffered binary files have ``raw``, so going through it
        # raised AttributeError for text-mode files.
        return self.f.fileno()

    def flush(self) -> None:
        self.f.flush()

    def __iter__(self):
        return self.f.__iter__()

    def __getattr__(self, item):
        # Anything not defined here is looked up on the wrapped file.
        return getattr(self.f, item)

    def __enter__(self):
        self._incontext = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._incontext = False
        self.f.__exit__(exc_type, exc_value, traceback)