Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/implementations/local.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

349 statements  

1import datetime 

2import io 

3import logging 

4import os 

5import os.path as osp 

6import shutil 

7import stat 

8import tempfile 

9from functools import lru_cache 

10 

11from fsspec import AbstractFileSystem 

12from fsspec.compression import compr 

13from fsspec.core import get_compression 

14from fsspec.utils import isfilelike, stringify_path 

15 

# Module-level logger; LocalFileOpener reports each opened path on it at DEBUG level.
logger = logging.getLogger("fsspec.local")

17 

18 

class LocalFileSystem(AbstractFileSystem):
    """Interface to files on local storage

    Paths handed to the methods below are normalised with
    ``_strip_protocol`` (protocol prefix removed, made absolute, forward
    slashes, no trailing slash) before being passed to ``os``/``shutil``.

    Parameters
    ----------
    auto_mkdir: bool
        Whether, when opening a file, the directory containing it should
        be created (if it doesn't already exist). This is assumed by pyarrow
        code.
    """

    root_marker = "/"
    protocol = "file", "local"
    local_file = True

    def __init__(self, auto_mkdir=False, **kwargs):
        super().__init__(**kwargs)
        self.auto_mkdir = auto_mkdir

    @property
    def fsid(self):
        """Constant identifier of this filesystem: ``"local"``."""
        return "local"

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create the directory ``path``.

        Raises ``FileExistsError`` if ``self.exists(path)`` is true. With
        ``create_parents`` the whole chain is created via ``makedirs``;
        otherwise ``os.mkdir`` is called and receives ``**kwargs``.
        """
        path = self._strip_protocol(path)
        if self.exists(path):
            raise FileExistsError(path)
        if create_parents:
            self.makedirs(path, exist_ok=True)
        else:
            os.mkdir(path, **kwargs)

    def makedirs(self, path, exist_ok=False):
        """Create ``path`` and any missing parents (``os.makedirs``)."""
        path = self._strip_protocol(path)
        os.makedirs(path, exist_ok=exist_ok)

    def rmdir(self, path):
        """Remove the directory ``path`` (``os.rmdir``)."""
        path = self._strip_protocol(path)
        os.rmdir(path)

    def ls(self, path, detail=False, **kwargs):
        """List ``path``.

        For a directory, return one entry per child; for anything else,
        return a single entry describing ``path`` itself. Entries are
        ``info()`` dicts when ``detail`` is true, otherwise path strings.
        """
        path = self._strip_protocol(path)
        path_info = self.info(path)
        infos = []
        if path_info["type"] == "directory":
            with os.scandir(path) as it:
                for f in it:
                    try:
                        # Only get the info if requested since it is a bit expensive (the stat call inside)
                        # The strip_protocol is also used in info() and calls make_path_posix to always return posix paths
                        info = self.info(f) if detail else self._strip_protocol(f.path)
                        infos.append(info)
                    except FileNotFoundError:
                        # Entries whose stat raises FileNotFoundError are skipped.
                        pass
        else:
            infos = [path_info] if detail else [path_info["name"]]

        return infos

    def info(self, path, **kwargs):
        """Return a dict describing ``path``.

        ``path`` may be an ``os.DirEntry`` (as produced by ``os.scandir``)
        or a str/path-like. The result has the keys ``name``, ``size``,
        ``type`` ("directory", "file" or "other"), ``created``, ``islink``,
        ``mode``, ``uid``, ``gid``, ``mtime``, ``ino``, ``nlink`` and, for
        symlinks, ``destination``.

        For a ``DirEntry`` symlink whose target cannot be stat'ed the size
        is reported as 0; for a str/path-like symlink the followed
        ``os.stat`` is not guarded, so its ``OSError`` propagates.
        """
        if isinstance(path, os.DirEntry):
            # scandir DirEntry
            out = path.stat(follow_symlinks=False)
            link = path.is_symlink()
            if path.is_dir(follow_symlinks=False):
                t = "directory"
            elif path.is_file(follow_symlinks=False):
                t = "file"
            else:
                t = "other"

            size = out.st_size
            if link:
                try:
                    out2 = path.stat(follow_symlinks=True)
                    size = out2.st_size
                except OSError:
                    size = 0
            path = self._strip_protocol(path.path)
        else:
            # str or path-like
            path = self._strip_protocol(path)
            out = os.stat(path, follow_symlinks=False)
            link = stat.S_ISLNK(out.st_mode)
            if link:
                out = os.stat(path, follow_symlinks=True)
            size = out.st_size
            if stat.S_ISDIR(out.st_mode):
                t = "directory"
            elif stat.S_ISREG(out.st_mode):
                t = "file"
            else:
                t = "other"

        # Check for the 'st_birthtime' attribute, which is not always present; fallback to st_ctime
        created_time = getattr(out, "st_birthtime", out.st_ctime)

        result = {
            "name": path,
            "size": size,
            "type": t,
            "created": created_time,
            "islink": link,
        }
        for field in ["mode", "uid", "gid", "mtime", "ino", "nlink"]:
            result[field] = getattr(out, f"st_{field}")
        if link:
            result["destination"] = os.readlink(path)
        return result

    def lexists(self, path, **kwargs):
        """Return True if ``path`` exists, without following a final symlink.

        The path is normalised with ``_strip_protocol`` first, as in the
        other path-taking methods of this class, so protocol-prefixed
        paths are accepted as well.
        """
        return osp.lexists(self._strip_protocol(path))

    def cp_file(self, path1, path2, **kwargs):
        """Copy the single file ``path1`` to ``path2``.

        If ``path1`` is a directory, only the directory ``path2`` is
        created (no contents are copied). Raises ``FileNotFoundError`` if
        ``path1`` is neither a file nor a directory.
        """
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        if self.auto_mkdir:
            self.makedirs(self._parent(path2), exist_ok=True)
        if self.isfile(path1):
            shutil.copyfile(path1, path2)
        elif self.isdir(path1):
            # Use this class's own makedirs; ``mkdirs`` is not defined here.
            self.makedirs(path2, exist_ok=True)
        else:
            raise FileNotFoundError(path1)

    def isfile(self, path):
        """Return True if ``path`` is a regular file (``os.path.isfile``)."""
        path = self._strip_protocol(path)
        return os.path.isfile(path)

    def isdir(self, path):
        """Return True if ``path`` is a directory (``os.path.isdir``)."""
        path = self._strip_protocol(path)
        return os.path.isdir(path)

    def get_file(self, path1, path2, callback=None, **kwargs):
        """Copy ``path1`` into ``path2``.

        ``path2`` may be a file-like object, in which case the bytes of
        ``path1`` are streamed into it; otherwise this is ``cp_file``.
        ``callback`` is accepted but not used here.
        """
        if isfilelike(path2):
            # Normalise the source like every other method so that
            # protocol-prefixed paths also work in this branch.
            with open(self._strip_protocol(path1), "rb") as f:
                shutil.copyfileobj(f, path2)
        else:
            return self.cp_file(path1, path2, **kwargs)

    def put_file(self, path1, path2, callback=None, **kwargs):
        """Copy ``path1`` to ``path2`` via ``cp_file``; ``callback`` is unused."""
        return self.cp_file(path1, path2, **kwargs)

    def mv(self, path1, path2, recursive: bool = True, **kwargs):
        """Move files/directories
        For the specific case of local, all ops on directories are recursive and
        the recursive= kwarg is ignored.
        """
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)

        if self.auto_mkdir:
            self.makedirs(self._parent(path2), exist_ok=True)

        shutil.move(path1, path2)

    def link(self, src, dst, **kwargs):
        """Create a hard link ``dst`` pointing at ``src`` (``os.link``)."""
        src = self._strip_protocol(src)
        dst = self._strip_protocol(dst)
        os.link(src, dst, **kwargs)

    def symlink(self, src, dst, **kwargs):
        """Create a symbolic link ``dst`` pointing at ``src`` (``os.symlink``)."""
        src = self._strip_protocol(src)
        dst = self._strip_protocol(dst)
        os.symlink(src, dst, **kwargs)

    def islink(self, path) -> bool:
        """Return True if ``path`` is a symbolic link."""
        return os.path.islink(self._strip_protocol(path))

    def rm_file(self, path):
        """Delete the single file ``path`` (``os.remove``)."""
        os.remove(self._strip_protocol(path))

    def rm(self, path, recursive=False, maxdepth=None):
        """Delete one path or a list of paths.

        Directories require ``recursive=True`` and are removed with
        ``shutil.rmtree``; the current working directory is refused. Both
        refusals raise ``ValueError``. ``maxdepth`` is accepted but not
        used by this implementation.
        """
        if not isinstance(path, list):
            path = [path]

        for p in path:
            p = self._strip_protocol(p)
            if self.isdir(p):
                if not recursive:
                    raise ValueError("Cannot delete directory, set recursive=True")
                if osp.abspath(p) == os.getcwd():
                    raise ValueError("Cannot delete current working directory")
                shutil.rmtree(p)
            else:
                os.remove(p)

    def unstrip_protocol(self, name):
        """Return ``name`` as a URL using the first declared protocol."""
        protocol = self.protocol if isinstance(self.protocol, str) else self.protocol[0]
        name = self._strip_protocol(name)  # normalise for local/win/...
        return f"{protocol}://{name}"

    def _open(self, path, mode="rb", block_size=None, **kwargs):
        """Return a ``LocalFileOpener`` for ``path``.

        With ``auto_mkdir`` and a mode containing "w", the parent
        directory is created first. ``block_size`` is not used.
        """
        path = self._strip_protocol(path)
        if self.auto_mkdir and "w" in mode:
            self.makedirs(self._parent(path), exist_ok=True)
        return LocalFileOpener(path, mode, fs=self, **kwargs)

    def touch(self, path, truncate=True, **kwargs):
        """Create ``path`` if missing, otherwise update its timestamps.

        With ``truncate`` (the default) the file is then cut to zero
        length, whether or not it existed before.
        """
        path = self._strip_protocol(path)
        if self.auto_mkdir:
            self.makedirs(self._parent(path), exist_ok=True)
        if self.exists(path):
            os.utime(path, None)
        else:
            open(path, "a").close()
        if truncate:
            os.truncate(path, 0)

    def created(self, path):
        """Return the ``created`` time from ``info`` as a UTC-aware datetime."""
        info = self.info(path=path)
        return datetime.datetime.fromtimestamp(
            info["created"], tz=datetime.timezone.utc
        )

    def modified(self, path):
        """Return the ``mtime`` from ``info`` as a UTC-aware datetime."""
        info = self.info(path=path)
        return datetime.datetime.fromtimestamp(info["mtime"], tz=datetime.timezone.utc)

    @classmethod
    def _parent(cls, path):
        """Return the parent directory of ``path`` (after normalisation)."""
        path = cls._strip_protocol(path)
        if os.sep == "/":
            # posix native
            return path.rsplit("/", 1)[0] or "/"
        else:
            # NT
            path_ = path.rsplit("/", 1)[0]
            if len(path_) <= 3:
                if path_[1:2] == ":":
                    # nt root (something like c:/)
                    return path_[0] + ":/"
            # More cases may be required here
            return path_

    @classmethod
    def _strip_protocol(cls, path):
        """Normalise ``path`` to an absolute, forward-slash path string.

        A leading ``file://``, ``file:``, ``local://`` or ``local:`` prefix
        is removed, the remainder is passed through ``make_path_posix``
        and trailing slashes are dropped (an empty result becomes
        ``root_marker``). On non-posix systems the drive / UNC share part
        is split off first so that it survives the trailing-slash strip.
        """
        path = stringify_path(path)
        protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
        prefixes = (protocol + sep for protocol in protos for sep in ("://", ":"))
        for prefix in prefixes:
            if path.startswith(prefix):
                path = path.removeprefix(prefix)
                break

        path = make_path_posix(path)
        if os.sep != "/":
            # This code-path is a stripped down version of
            # > drive, path = ntpath.splitdrive(path)
            if path[1:2] == ":":
                # Absolute drive-letter path, e.g. X:\Windows
                # Relative path with drive, e.g. X:Windows
                drive, path = path[:2], path[2:]
            elif path[:2] == "//":
                # UNC drives, e.g. \\server\share or \\?\UNC\server\share
                # Device drives, e.g. \\.\device or \\?\device
                if (index1 := path.find("/", 2)) == -1 or (
                    index2 := path.find("/", index1 + 1)
                ) == -1:
                    drive, path = path, ""
                else:
                    drive, path = path[:index2], path[index2:]
            else:
                # Relative path, e.g. Windows
                drive = ""

            path = path.rstrip("/") or cls.root_marker
            return drive + path

        else:
            return path.rstrip("/") or cls.root_marker

    def _isfilestore(self):
        # Inheriting from DaskFileSystem makes this False (S3, etc. were)
        # the original motivation. But we are a posix-like file system.
        # See https://github.com/dask/dask/issues/5526
        return True

    def chmod(self, path, mode):
        """Change the permission bits of ``path`` to ``mode`` (``os.chmod``).

        The path is normalised with ``_strip_protocol`` so that
        protocol-prefixed paths are accepted, as elsewhere in this class.
        """
        path = self._strip_protocol(path)
        return os.chmod(path, mode)

300 

301 

def make_path_posix(path):
    """Make path generic and absolute for current OS

    Parameters
    ----------
    path: str, path-like, or list/set/tuple of those
        Containers are converted element-wise and returned as the same
        container type.

    Returns
    -------
    The path as an absolute string using forward slashes. A leading ``~``
    is expanded with ``os.path.expanduser``; relative paths are resolved
    against the current working directory.

    Raises
    ------
    TypeError
        If ``path`` cannot be converted to ``str``.
    """
    if not isinstance(path, str):
        if isinstance(path, (list, set, tuple)):
            return type(path)(make_path_posix(p) for p in path)
        else:
            path = stringify_path(path)
            if not isinstance(path, str):
                raise TypeError(f"could not convert {path!r} to string")
    if os.sep == "/":
        # Native posix
        if path.startswith("/"):
            # most common fast case for posix
            return path
        elif path.startswith("~"):
            return osp.expanduser(path)
        elif path.startswith("./"):
            path = path[2:]
        elif path == ".":
            path = ""
        # Drop the cwd's trailing slash so that a cwd of "/" yields
        # "/name" rather than "//name".
        cwd = os.getcwd().rstrip("/")
        return f"{cwd}/{path}"
    else:
        # NT handling
        if path[0:1] == "/" and path[2:3] == ":":
            # path is like "/c:/local/path"
            path = path[1:]
        if path[1:2] == ":":
            # windows full path like "C:\\local\\path"
            if len(path) <= 3:
                # nt root (something like c:/)
                return path[0] + ":/"
            path = path.replace("\\", "/")
            return path
        elif path[0:1] == "~":
            return make_path_posix(osp.expanduser(path))
        elif path.startswith(("\\\\", "//")):
            # windows UNC/DFS-style paths
            return "//" + path[2:].replace("\\", "/")
        elif path.startswith(("\\", "/")):
            # windows relative path with root
            path = path.replace("\\", "/")
            return f"{osp.splitdrive(os.getcwd())[0]}{path}"
        else:
            path = path.replace("\\", "/")
            if path.startswith("./"):
                path = path[2:]
            elif path == ".":
                path = ""
            # A drive-root cwd comes back as "c:/"; strip the slash so the
            # join below does not produce "c://name".
            cwd = make_path_posix(os.getcwd()).rstrip("/")
            return f"{cwd}/{path}"

351 

352 

def trailing_sep(path):
    """Tell whether ``path`` finishes with a path separator.

    Both ``os.sep`` and, where the platform defines one, ``os.altsep``
    count as separators, so a forward slash is recognised even on
    Operating Systems that normally use a backslash.
    """
    # TODO: if all incoming paths were posix-compliant then separator would
    # always be a forward slash, simplifying this function.
    # See https://github.com/fsspec/filesystem_spec/pull/1250
    separators = (os.sep,) if os.altsep is None else (os.sep, os.altsep)
    return path.endswith(separators)

363 

364 

@lru_cache(maxsize=1)
def get_umask(mask: int = 0o666) -> int:
    """Return the process umask.

    ``os.umask`` can only report the current value by installing a new
    one, so ``mask`` is set momentarily and the previous value is put
    straight back (see https://stackoverflow.com/a/44130549). The result
    is memoised by ``lru_cache``.
    """
    previous = os.umask(mask)
    os.umask(previous)
    return previous

376 

377 

class LocalFileOpener(io.IOBase):
    """File-like wrapper around a local file opened with builtin ``open``.

    The underlying file object is kept in ``self.f``; most methods simply
    forward to it, and unknown attributes are looked up on it too.

    Parameters
    ----------
    path: str
        Path of the file to open.
    mode: str
        Mode string handed to ``open``.
    autocommit: bool
        If false and ``mode`` contains "w", data is written to a temporary
        file which ``commit()`` moves to ``path`` and ``discard()`` deletes.
    fs: object
        Stored as ``self.fs``; not otherwise used in this class.
    compression: str or None
        Passed to ``get_compression`` together with ``path``; when the
        result is truthy the file is wrapped by the matching ``compr``
        entry (only on the direct-open path, not for the temporary file).
    """

    def __init__(
        self, path, mode, autocommit=True, fs=None, compression=None, **kwargs
    ):
        logger.debug("open file: %s", path)
        self.path = path
        self.mode = mode
        self.fs = fs
        self.f = None
        self.autocommit = autocommit
        self.compression = get_compression(path, compression)
        self.blocksize = io.DEFAULT_BUFFER_SIZE
        self._open()

    def _open(self):
        """(Re)open the underlying file if it is missing or closed."""
        if self.f is None or self.f.closed:
            if self.autocommit or "w" not in self.mode:
                self.f = open(self.path, mode=self.mode)
                if self.compression:
                    compress = compr[self.compression]
                    self.f = compress(self.f, mode=self.mode)
            else:
                # TODO: check if path is writable?
                i, name = tempfile.mkstemp()
                os.close(i)  # we want normal open and normal buffered file
                self.temp = name
                self.f = open(name, mode=self.mode)
            if "w" not in self.mode:
                # Record the length by seeking to the end, then rewind.
                self.size = self.f.seek(0, 2)
                self.f.seek(0)
                self.f.size = self.size

    def _fetch_range(self, start, end):
        """Return the data between offsets ``start`` and ``end``.

        Raises ``ValueError`` if the mode does not contain "r".
        """
        # probably only used by cached FS
        if "r" not in self.mode:
            raise ValueError
        self._open()
        self.f.seek(start)
        return self.f.read(end - start)

    def __setstate__(self, state):
        """Restore from ``__getstate__``; read-mode files are reopened at ``loc``."""
        self.f = None
        loc = state.pop("loc", None)
        self.__dict__.update(state)
        if "r" in state["mode"]:
            self.f = None
            self._open()
            self.f.seek(loc)

    def __getstate__(self):
        """Return the instance dict without the file object.

        Read-mode files additionally store the current position as
        ``loc``. A write-mode file must be closed, else ``ValueError``.
        """
        d = self.__dict__.copy()
        d.pop("f")
        if "r" in self.mode:
            d["loc"] = self.f.tell()
        else:
            if not self.f.closed:
                raise ValueError("Cannot serialise open write-mode local file")
        return d

    def commit(self):
        """Move the temporary file to ``self.path`` (non-autocommit only).

        Raises ``RuntimeError`` when the file was opened with autocommit.
        """
        if self.autocommit:
            raise RuntimeError("Can only commit if not already set to autocommit")
        try:
            shutil.move(self.temp, self.path)
        except PermissionError as e:
            # shutil.move raises PermissionError if os.rename
            # and the default copy2 fallback with shutil.copystats fail.
            # The file should be there nonetheless, but without copied permissions.
            # If it doesn't exist, there was no permission to create the file.
            if not os.path.exists(self.path):
                raise e
        else:
            # If PermissionError is not raised, permissions can be set.
            try:
                mask = 0o666
                os.chmod(self.path, mask & ~get_umask(mask))
            except RuntimeError:
                pass

    def discard(self):
        """Delete the temporary file (non-autocommit only).

        Raises ``RuntimeError`` when the file was opened with autocommit.
        """
        if self.autocommit:
            raise RuntimeError("Cannot discard if set to autocommit")
        os.remove(self.temp)

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return "r" not in self.mode

    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)

    def tell(self, *args, **kwargs):
        return self.f.tell(*args, **kwargs)

    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)

    def seekable(self, *args, **kwargs):
        return self.f.seekable(*args, **kwargs)

    def readline(self, *args, **kwargs):
        return self.f.readline(*args, **kwargs)

    def readlines(self, *args, **kwargs):
        return self.f.readlines(*args, **kwargs)

    def close(self):
        """Close the underlying file; a no-op when there is none."""
        # ``f`` is None after unpickling a closed write-mode file, and may
        # be absent entirely on a partially initialised instance.
        f = self.__dict__.get("f")
        if f is not None:
            return f.close()

    def truncate(self, size=None) -> int:
        return self.f.truncate(size)

    @property
    def closed(self):
        """True when there is no underlying file or it is closed."""
        # Mirrors the check in ``_open``: a missing file counts as closed.
        f = self.__dict__.get("f")
        return True if f is None else f.closed

    def fileno(self):
        """Return the OS-level file descriptor of the underlying file."""
        # Ask the file object itself rather than going through ``.raw``:
        # text-mode files (TextIOWrapper) have no ``raw`` attribute.
        return self.f.fileno()

    def flush(self) -> None:
        self.f.flush()

    def __iter__(self):
        return self.f.__iter__()

    def __getattr__(self, item):
        """Delegate unknown attributes to the underlying file object."""
        # Read ``f`` from the instance dict directly: going through
        # ``self.f`` would re-enter __getattr__ without end if ``f`` has
        # not been assigned yet.
        f = self.__dict__.get("f")
        if f is None:
            raise AttributeError(item)
        return getattr(f, item)

    def __enter__(self):
        self._incontext = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._incontext = False
        self.f.__exit__(exc_type, exc_value, traceback)