1import datetime
2import io
3import logging
4import os
5import os.path as osp
6import shutil
7import stat
8import tempfile
9
10from fsspec import AbstractFileSystem
11from fsspec.compression import compr
12from fsspec.core import get_compression
13from fsspec.utils import isfilelike, stringify_path
14
15logger = logging.getLogger("fsspec.local")
16
17
class LocalFileSystem(AbstractFileSystem):
    """Interface to files on local storage

    Parameters
    ----------
    auto_mkdir: bool
        Whether, when opening a file, the directory containing it should
        be created (if it doesn't already exist). This is assumed by pyarrow
        code.
    """

    root_marker = "/"
    protocol = "file", "local"
    local_file = True

    def __init__(self, auto_mkdir=False, **kwargs):
        super().__init__(**kwargs)
        self.auto_mkdir = auto_mkdir

    @property
    def fsid(self):
        """Constant identifier for this filesystem: always ``"local"``."""
        return "local"

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create a directory at ``path``.

        Raises ``FileExistsError`` if ``path`` already exists. When
        ``create_parents`` is true, missing intermediate directories are
        created too; otherwise ``kwargs`` are forwarded to ``os.mkdir``.
        """
        path = self._strip_protocol(path)
        if self.exists(path):
            raise FileExistsError(path)
        if create_parents:
            self.makedirs(path, exist_ok=True)
        else:
            os.mkdir(path, **kwargs)

    def makedirs(self, path, exist_ok=False):
        """Create ``path`` and any missing parents via ``os.makedirs``."""
        path = self._strip_protocol(path)
        os.makedirs(path, exist_ok=exist_ok)

    def rmdir(self, path):
        """Remove the directory at ``path`` via ``os.rmdir``."""
        path = self._strip_protocol(path)
        os.rmdir(path)

    def ls(self, path, detail=False, **kwargs):
        """List ``path``.

        For a directory, one entry per child is produced; for anything
        else, the single entry for ``path`` itself. Returns the info
        dicts when ``detail`` is true, otherwise just their ``"name"``
        values.
        """
        path = self._strip_protocol(path)
        info = self.info(path)
        if info["type"] == "directory":
            with os.scandir(path) as it:
                infos = [self.info(f) for f in it]
        else:
            infos = [info]

        if not detail:
            return [i["name"] for i in infos]
        return infos

    def info(self, path, **kwargs):
        """Return a dict describing ``path``.

        ``path`` may be a string/path-like or an ``os.DirEntry`` (as
        produced by ``os.scandir``). The result always has the keys
        ``name``, ``size``, ``type`` (``"directory"``, ``"file"`` or
        ``"other"``), ``created`` (``st_ctime``), ``islink``, ``mode``,
        ``uid``, ``gid``, ``mtime``, ``ino`` and ``nlink``. Symbolic
        links additionally get ``destination`` (from ``os.readlink``).
        """
        if isinstance(path, os.DirEntry):
            # scandir DirEntry
            out = path.stat(follow_symlinks=False)
            link = path.is_symlink()
            if path.is_dir(follow_symlinks=False):
                t = "directory"
            elif path.is_file(follow_symlinks=False):
                t = "file"
            else:
                t = "other"
            path = self._strip_protocol(path.path)
        else:
            # str or path-like
            path = self._strip_protocol(path)
            out = os.stat(path, follow_symlinks=False)
            link = stat.S_ISLNK(out.st_mode)
            if link:
                # classify by the link target rather than the link itself
                out = os.stat(path, follow_symlinks=True)
            if stat.S_ISDIR(out.st_mode):
                t = "directory"
            elif stat.S_ISREG(out.st_mode):
                t = "file"
            else:
                t = "other"
        result = {
            "name": path,
            "size": out.st_size,
            "type": t,
            "created": out.st_ctime,
            "islink": link,
        }
        for field in ["mode", "uid", "gid", "mtime", "ino", "nlink"]:
            result[field] = getattr(out, f"st_{field}")
        if result["islink"]:
            result["destination"] = os.readlink(path)
            try:
                # report the size of the target; 0 if it cannot be stat'ed
                out2 = os.stat(path, follow_symlinks=True)
                result["size"] = out2.st_size
            except OSError:
                result["size"] = 0
        return result

    def lexists(self, path, **kwargs):
        """Return whether ``path`` exists, without following a final symlink.

        The path is normalised with ``_strip_protocol`` first, so
        ``file://`` style URLs are accepted like in the other methods.
        """
        return osp.lexists(self._strip_protocol(path))

    def cp_file(self, path1, path2, **kwargs):
        """Copy a single file, or create the target for a directory source.

        If ``auto_mkdir`` is set, the parent of ``path2`` is created first.
        Raises ``FileNotFoundError`` when ``path1`` is neither a file nor
        a directory.
        """
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        if self.auto_mkdir:
            self.makedirs(self._parent(path2), exist_ok=True)
        if self.isfile(path1):
            shutil.copyfile(path1, path2)
        elif self.isdir(path1):
            # call the method defined on this class rather than relying on
            # an inherited ``mkdirs`` alias
            self.makedirs(path2, exist_ok=True)
        else:
            raise FileNotFoundError(path1)

    def isfile(self, path):
        """Return ``os.path.isfile`` for the normalised ``path``."""
        path = self._strip_protocol(path)
        return os.path.isfile(path)

    def isdir(self, path):
        """Return ``os.path.isdir`` for the normalised ``path``."""
        path = self._strip_protocol(path)
        return os.path.isdir(path)

    def get_file(self, path1, path2, callback=None, **kwargs):
        """Copy ``path1`` to ``path2``.

        ``path2`` may be an open file-like object, in which case the bytes
        of ``path1`` are streamed into it; otherwise this is ``cp_file``.
        ``callback`` is accepted but not used here.
        """
        if isfilelike(path2):
            # normalise the source so URL-style paths work here as they do
            # in the cp_file branch
            with open(self._strip_protocol(path1), "rb") as f:
                shutil.copyfileobj(f, path2)
        else:
            return self.cp_file(path1, path2, **kwargs)

    def put_file(self, path1, path2, callback=None, **kwargs):
        """Copy ``path1`` to ``path2`` via ``cp_file``; ``callback`` is unused."""
        return self.cp_file(path1, path2, **kwargs)

    def mv(self, path1, path2, **kwargs):
        """Move ``path1`` to ``path2`` using ``shutil.move``."""
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        shutil.move(path1, path2)

    def link(self, src, dst, **kwargs):
        """Create a hard link ``dst`` pointing at ``src`` (``os.link``)."""
        src = self._strip_protocol(src)
        dst = self._strip_protocol(dst)
        os.link(src, dst, **kwargs)

    def symlink(self, src, dst, **kwargs):
        """Create a symbolic link ``dst`` pointing at ``src`` (``os.symlink``)."""
        src = self._strip_protocol(src)
        dst = self._strip_protocol(dst)
        os.symlink(src, dst, **kwargs)

    def islink(self, path) -> bool:
        """Return whether the normalised ``path`` is a symbolic link."""
        return os.path.islink(self._strip_protocol(path))

    def rm_file(self, path):
        """Delete the single file at ``path`` via ``os.remove``."""
        os.remove(self._strip_protocol(path))

    def rm(self, path, recursive=False, maxdepth=None):
        """Delete one path or a list of paths.

        Directories require ``recursive=True`` and are removed with
        ``shutil.rmtree``; the current working directory is refused.
        Raises ``ValueError`` in both refusal cases. ``maxdepth`` is
        accepted but not used here.
        """
        if not isinstance(path, list):
            path = [path]

        for p in path:
            p = self._strip_protocol(p)
            if self.isdir(p):
                if not recursive:
                    raise ValueError("Cannot delete directory, set recursive=True")
                if osp.abspath(p) == os.getcwd():
                    raise ValueError("Cannot delete current working directory")
                shutil.rmtree(p)
            else:
                os.remove(p)

    def unstrip_protocol(self, name):
        """Return ``name`` as a ``file://`` URL."""
        name = self._strip_protocol(name)  # normalise for local/win/...
        return f"file://{name}"

    def _open(self, path, mode="rb", block_size=None, **kwargs):
        """Open ``path`` as a ``LocalFileOpener``.

        With ``auto_mkdir`` set and a ``"w"`` mode, the parent directory is
        created first. ``block_size`` is accepted but not used here.
        """
        path = self._strip_protocol(path)
        if self.auto_mkdir and "w" in mode:
            self.makedirs(self._parent(path), exist_ok=True)
        return LocalFileOpener(path, mode, fs=self, **kwargs)

    def touch(self, path, truncate=True, **kwargs):
        """Create ``path`` if missing, else update its timestamps.

        When ``truncate`` is true the file is cut to zero length
        afterwards in either case.
        """
        path = self._strip_protocol(path)
        if self.auto_mkdir:
            self.makedirs(self._parent(path), exist_ok=True)
        if self.exists(path):
            os.utime(path, None)
        else:
            open(path, "a").close()
        if truncate:
            os.truncate(path, 0)

    def created(self, path):
        """Return the ``created`` info entry (``st_ctime``) as a UTC datetime."""
        info = self.info(path=path)
        return datetime.datetime.fromtimestamp(
            info["created"], tz=datetime.timezone.utc
        )

    def modified(self, path):
        """Return the ``mtime`` info entry as a UTC datetime."""
        info = self.info(path=path)
        return datetime.datetime.fromtimestamp(info["mtime"], tz=datetime.timezone.utc)

    @classmethod
    def _parent(cls, path):
        """Return the parent directory of the normalised ``path``."""
        path = cls._strip_protocol(path)
        if os.sep == "/":
            # posix native
            return path.rsplit("/", 1)[0] or "/"
        else:
            # NT
            path_ = path.rsplit("/", 1)[0]
            if len(path_) <= 3:
                if path_[1:2] == ":":
                    # nt root (something like c:/)
                    return path_[0] + ":/"
            # More cases may be required here
            return path_

    @classmethod
    def _strip_protocol(cls, path):
        """Normalise ``path``: drop a ``file:``/``local:`` prefix, make it
        posix-style via ``make_path_posix`` and remove trailing slashes
        (an empty remainder becomes ``root_marker``).
        """
        path = stringify_path(path)
        if path.startswith("file://"):
            path = path[7:]
        elif path.startswith("file:"):
            path = path[5:]
        elif path.startswith("local://"):
            path = path[8:]
        elif path.startswith("local:"):
            path = path[6:]

        path = make_path_posix(path)
        if os.sep != "/":
            # This code-path is a stripped down version of
            # > drive, path = ntpath.splitdrive(path)
            if path[1:2] == ":":
                # Absolute drive-letter path, e.g. X:\Windows
                # Relative path with drive, e.g. X:Windows
                drive, path = path[:2], path[2:]
            elif path[:2] == "//":
                # UNC drives, e.g. \\server\share or \\?\UNC\server\share
                # Device drives, e.g. \\.\device or \\?\device
                if (index1 := path.find("/", 2)) == -1 or (
                    index2 := path.find("/", index1 + 1)
                ) == -1:
                    drive, path = path, ""
                else:
                    drive, path = path[:index2], path[index2:]
            else:
                # Relative path, e.g. Windows
                drive = ""

            path = path.rstrip("/") or cls.root_marker
            return drive + path

        else:
            return path.rstrip("/") or cls.root_marker

    def _isfilestore(self):
        # Inheriting from DaskFileSystem makes this False (S3, etc. were)
        # the original motivation. But we are a posix-like file system.
        # See https://github.com/dask/dask/issues/5526
        return True

    def chmod(self, path, mode):
        """Change the permission bits of ``path`` via ``os.chmod``.

        The path is normalised with ``_strip_protocol`` so that
        ``file://`` style URLs are handled like in the other methods.
        """
        path = self._strip_protocol(path)
        return os.chmod(path, mode)
278
279
def make_path_posix(path):
    """Return ``path`` as an absolute, forward-slash path for this OS.

    Lists, sets and tuples are converted element-wise into a container of
    the same type. Other non-string inputs go through ``stringify_path``;
    a ``TypeError`` is raised if that does not yield a string.
    """
    if isinstance(path, (list, set, tuple)):
        return type(path)(make_path_posix(item) for item in path)
    if not isinstance(path, str):
        path = stringify_path(path)
        if not isinstance(path, str):
            raise TypeError(f"could not convert {path!r} to string")

    if os.sep == "/":
        # --- native posix ---
        if path.startswith("/"):
            # already absolute: the common fast case
            return path
        if path.startswith("~"):
            return osp.expanduser(path)
        if path == ".":
            relative = ""
        elif path.startswith("./"):
            relative = path[2:]
        else:
            relative = path
        return f"{os.getcwd()}/{relative}"

    # --- NT ---
    if path[0:1] == "/" and path[2:3] == ":":
        # something like "/c:/local/path": drop the leading slash
        path = path[1:]
    if path[1:2] == ":":
        # drive-letter path such as "C:\\local\\path";
        # three characters or fewer means a bare root like "c:/"
        return path[0] + ":/" if len(path) <= 3 else path.replace("\\", "/")
    if path[0:1] == "~":
        return make_path_posix(osp.expanduser(path))
    if path.startswith(("\\\\", "//")):
        # UNC/DFS-style path
        return "//" + path[2:].replace("\\", "/")

    forward = path.replace("\\", "/")
    if path.startswith(("\\", "/")):
        # rooted but drive-less: borrow the drive of the working directory
        return f"{osp.splitdrive(os.getcwd())[0]}{forward}"
    if forward == ".":
        relative = ""
    elif forward.startswith("./"):
        relative = forward[2:]
    else:
        relative = forward
    return f"{make_path_posix(os.getcwd())}/{relative}"
329
330
def trailing_sep(path):
    """Tell whether ``path`` ends in a path separator.

    Both ``os.sep`` and, where the platform defines one, ``os.altsep``
    count as separators, so a forward slash is accepted even on systems
    whose native separator is a backslash.
    """
    # TODO: if all incoming paths were posix-compliant then separator would
    # always be a forward slash, simplifying this function.
    # See https://github.com/fsspec/filesystem_spec/pull/1250
    separators = (os.sep,) if os.altsep is None else (os.sep, os.altsep)
    return path.endswith(separators)
341
342
class LocalFileOpener(io.IOBase):
    """File-like object wrapping a local file handle.

    The real handle lives in ``self.f``; most methods delegate to it.
    With ``autocommit=False`` and a ``"w"`` mode, data is written to a
    temporary file that is moved to ``path`` by ``commit()`` or deleted
    by ``discard()``.
    """

    def __init__(
        self, path, mode, autocommit=True, fs=None, compression=None, **kwargs
    ):
        logger.debug("open file: %s", path)
        self.path = path
        self.mode = mode
        self.fs = fs
        self.f = None
        self.autocommit = autocommit
        self.compression = get_compression(path, compression)
        self.blocksize = io.DEFAULT_BUFFER_SIZE
        self._open()

    def _open(self):
        """(Re)open the underlying handle if it is missing or closed."""
        if self.f is None or self.f.closed:
            if self.autocommit or "w" not in self.mode:
                self.f = open(self.path, mode=self.mode)
                if self.compression:
                    compress = compr[self.compression]
                    self.f = compress(self.f, mode=self.mode)
            else:
                # TODO: check if path is writable?
                i, name = tempfile.mkstemp()
                os.close(i)  # we want normal open and normal buffered file
                self.temp = name
                self.f = open(name, mode=self.mode)
            if "w" not in self.mode:
                # record the total size by seeking to the end and back
                self.size = self.f.seek(0, 2)
                self.f.seek(0)
                self.f.size = self.size

    def _fetch_range(self, start, end):
        """Read the bytes in ``[start, end)``; read modes only."""
        # probably only used by cached FS
        if "r" not in self.mode:
            raise ValueError
        self._open()
        self.f.seek(start)
        return self.f.read(end - start)

    def __setstate__(self, state):
        """Restore from pickled state, reopening read-mode files at their
        saved position."""
        self.f = None
        loc = state.pop("loc", None)
        self.__dict__.update(state)
        if "r" in state["mode"]:
            self.f = None
            self._open()
            self.f.seek(loc)

    def __getstate__(self):
        """Return picklable state without the open handle.

        Read-mode files also store their current position under ``loc``.
        Raises ``ValueError`` for a write-mode file that is still open.
        """
        d = self.__dict__.copy()
        d.pop("f")
        if "r" in self.mode:
            d["loc"] = self.f.tell()
        else:
            if not self.f.closed:
                raise ValueError("Cannot serialise open write-mode local file")
        return d

    def commit(self):
        """Move the temporary file into place; not valid with autocommit."""
        if self.autocommit:
            raise RuntimeError("Can only commit if not already set to autocommit")
        shutil.move(self.temp, self.path)

    def discard(self):
        """Delete the temporary file; not valid with autocommit."""
        if self.autocommit:
            raise RuntimeError("Cannot discard if set to autocommit")
        os.remove(self.temp)

    def readable(self) -> bool:
        # NOTE(review): reports True regardless of mode; left unchanged
        return True

    def writable(self) -> bool:
        """Return True unless the mode is read-only.

        Update modes such as ``"r+b"`` contain ``"r"`` but are writable,
        so a ``"+"`` in the mode also counts.
        """
        return "r" not in self.mode or "+" in self.mode

    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)

    def tell(self, *args, **kwargs):
        return self.f.tell(*args, **kwargs)

    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)

    def seekable(self, *args, **kwargs):
        return self.f.seekable(*args, **kwargs)

    def readline(self, *args, **kwargs):
        return self.f.readline(*args, **kwargs)

    def readlines(self, *args, **kwargs):
        return self.f.readlines(*args, **kwargs)

    def close(self):
        return self.f.close()

    def truncate(self, size=None) -> int:
        return self.f.truncate(size)

    @property
    def closed(self):
        return self.f.closed

    def fileno(self):
        """Return the OS-level file descriptor of the wrapped handle.

        Delegates to ``self.f.fileno()`` instead of ``self.raw.fileno()``
        so that handles without a ``raw`` attribute (text-mode files,
        compression wrappers) are supported as well.
        """
        return self.f.fileno()

    def flush(self) -> None:
        self.f.flush()

    def __iter__(self):
        return self.f.__iter__()

    def __getattr__(self, item):
        """Fall back to the wrapped handle for unknown attributes."""
        if item == "f":
            # ``f`` itself is absent from the instance dict (e.g. an
            # instance made via ``__new__``); looking it up below would
            # re-enter this method without end.
            raise AttributeError(item)
        return getattr(self.f, item)

    def __enter__(self):
        self._incontext = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._incontext = False
        self.f.__exit__(exc_type, exc_value, traceback)