Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_compat.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

323 statements  

1from __future__ import annotations 

2 

3import ntpath 

4import os 

5import posixpath 

6import sys 

7import warnings 

8from collections.abc import Sequence 

9from functools import wraps 

10from pathlib import Path 

11from pathlib import PurePath 

12from typing import TYPE_CHECKING 

13from typing import Any 

14from typing import Callable 

15from typing import TypeVar 

16from urllib.parse import SplitResult 

17 

18from fsspec import get_filesystem_class 

19 

20if TYPE_CHECKING: 

21 from upath import UPath 

22 

23__all__ = [ 

24 "PathlibPathShim", 

25 "str_remove_prefix", 

26 "str_remove_suffix", 

27 "FSSpecAccessorShim", 

28 "deprecated", 

29] 

30 

31 

if sys.version_info >= (3, 12):  # noqa: C901

    class PathlibPathShim:
        """no need to shim pathlib.Path in Python 3.12+"""

        __slots__ = ()
        __missing_py312_slots__ = ()

        def __init__(self, *args):
            # Simply forward to the next class in the MRO.
            super().__init__(*args)

else:

    def _get_missing_py312_pathlib_slots():
        """Return a tuple of slots that are present in Python 3.12's
        pathlib.Path but not in the current version of pathlib.Path
        """
        py312_slots = (
            "_raw_paths",
            "_drv",
            "_root",
            "_tail_cached",
            "_str",
            "_str_normcase_cached",
            "_parts_normcase_cached",
            "_lines_cached",
            "_hash",
        )
        # Every slot name declared anywhere in the running Path's MRO; a set
        # gives O(1) membership tests below.
        current_slots = {
            slot for cls in Path.__mro__ for slot in getattr(cls, "__slots__", [])
        }
        # Keep the order of py312_slots in the result.
        return tuple(slot for slot in py312_slots if slot not in current_slots)

    class PathlibPathShim:
        """A compatibility shim for python < 3.12

        Basically vendoring the functionality of pathlib.Path from Python 3.12
        that's not overwritten in upath.core.UPath

        The methods below use ``self._flavour``, ``self.with_segments`` and
        ``self.chmod``, none of which are defined in this class; they are
        expected to be provided by the class this shim is combined with.
        """

        __slots__ = ()
        __missing_py312_slots__ = _get_missing_py312_pathlib_slots()

        def __init__(self, *args):
            """Collect the raw string segments of all *args* in ``_raw_paths``.

            Raises TypeError when an argument is neither a str nor an
            os.PathLike whose ``__fspath__`` returns a str.
            """
            paths = []
            for arg in args:
                if isinstance(arg, PurePath) and hasattr(arg, "_raw_paths"):
                    if arg._flavour is ntpath and self._flavour is posixpath:
                        # GH-103631: Convert separators for backwards compatibility.
                        paths.extend(path.replace("\\", "/") for path in arg._raw_paths)
                    else:
                        paths.extend(arg._raw_paths)
                else:
                    try:
                        path = os.fspath(arg)
                    except TypeError:
                        # Not path-like: fall through so the isinstance check
                        # below raises the descriptive TypeError.
                        path = arg
                    if not isinstance(path, str):
                        raise TypeError(
                            "argument should be a str or an os.PathLike "
                            "object where __fspath__ returns a str, "
                            f"not {type(path).__name__!r}"
                        )
                    paths.append(path)
            self._raw_paths = paths

        @classmethod
        def _parse_path(cls, path):
            """Split *path* into a ``(drive, root, tail_parts)`` triple.

            An empty path yields ``("", "", [])``. Empty and ``"."``
            components are dropped from the tail.
            """
            if not path:
                return "", "", []
            sep = cls._flavour.sep
            altsep = cls._flavour.altsep
            if altsep:
                path = path.replace(altsep, sep)
            drv, root, rel = cls._flavour.splitroot(path)
            if not root and drv.startswith(sep) and not drv.endswith(sep):
                drv_parts = drv.split(sep)
                if len(drv_parts) == 4 and drv_parts[2] not in "?.":
                    # e.g. //server/share
                    root = sep
                elif len(drv_parts) == 6:
                    # e.g. //?/unc/server/share
                    root = sep
            parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."]
            return drv, root, parsed

        def _load_parts(self):
            """Parse ``_raw_paths`` and cache ``_drv``, ``_root``, ``_tail_cached``."""
            paths = self._raw_paths
            if len(paths) == 0:
                path = ""
            elif len(paths) == 1:
                path = paths[0]
            else:
                path = self._flavour.join(*paths)
            drv, root, tail = self._parse_path(path)
            self._drv = drv
            self._root = root
            self._tail_cached = tail

        def _from_parsed_parts(self, drv, root, tail):
            """Build a new path from parsed parts with its caches pre-filled."""
            path_str = self._format_parsed_parts(drv, root, tail)
            path = self.with_segments(path_str)
            path._str = path_str or "."
            path._drv = drv
            path._root = root
            path._tail_cached = tail
            return path

        @classmethod
        def _format_parsed_parts(cls, drv, root, tail):
            """Join drive, root and tail parts back into a single string."""
            if drv or root:
                return drv + root + cls._flavour.sep.join(tail)
            elif tail and cls._flavour.splitdrive(tail[0])[0]:
                # The first component would itself parse as a drive; prefix
                # "." so that it stays a plain relative component.
                tail = ["."] + tail
            return cls._flavour.sep.join(tail)

        def __str__(self):
            """Return the string form of the path, computed once and cached."""
            try:
                return self._str
            except AttributeError:
                self._str = (
                    self._format_parsed_parts(self.drive, self.root, self._tail) or "."
                )
                return self._str

        @property
        def drive(self):
            """The drive part, parsed lazily from the raw paths."""
            try:
                return self._drv
            except AttributeError:
                self._load_parts()
                return self._drv

        @property
        def root(self):
            """The root part, parsed lazily from the raw paths."""
            try:
                return self._root
            except AttributeError:
                self._load_parts()
                return self._root

        @property
        def _tail(self):
            """The list of components after the anchor, parsed lazily."""
            try:
                return self._tail_cached
            except AttributeError:
                self._load_parts()
                return self._tail_cached

        @property
        def anchor(self):
            """The concatenation of drive and root."""
            anchor = self.drive + self.root
            return anchor

        @property
        def name(self):
            """The last tail component, or "" when the tail is empty."""
            tail = self._tail
            if not tail:
                return ""
            return tail[-1]

        @property
        def suffix(self):
            """The final ``.ext`` of the name, or "" if there is none.

            A leading dot (``.bashrc``) or a trailing dot (``foo.``) does not
            count as a suffix.
            """
            name = self.name
            i = name.rfind(".")
            if 0 < i < len(name) - 1:
                return name[i:]
            else:
                return ""

        @property
        def suffixes(self):
            """A list of all dot-separated suffixes of the name."""
            name = self.name
            if name.endswith("."):
                return []
            name = name.lstrip(".")
            return ["." + suffix for suffix in name.split(".")[1:]]

        @property
        def stem(self):
            """The name without its final suffix."""
            name = self.name
            i = name.rfind(".")
            if 0 < i < len(name) - 1:
                return name[:i]
            else:
                return name

        def with_name(self, name):
            """Return a new path with the last component replaced by *name*.

            Raises ValueError if this path has an empty name or if *name* is
            empty, is ".", or contains a path separator.
            """
            if not self.name:
                raise ValueError(f"{self!r} has an empty name")
            f = self._flavour
            if (
                not name
                or f.sep in name
                or (f.altsep and f.altsep in name)
                or name == "."
            ):
                raise ValueError("Invalid name %r" % (name))
            return self._from_parsed_parts(
                self.drive, self.root, self._tail[:-1] + [name]
            )

        def with_stem(self, stem):
            """Return a new path with the stem replaced, keeping the suffix."""
            return self.with_name(stem + self.suffix)

        def with_suffix(self, suffix):
            """Return a new path with the suffix replaced by *suffix*.

            An empty *suffix* removes the current suffix. Raises ValueError
            for a suffix that contains a separator, does not start with ".",
            or is exactly ".", and for a path with an empty name.
            """
            f = self._flavour
            if f.sep in suffix or f.altsep and f.altsep in suffix:
                raise ValueError(f"Invalid suffix {suffix!r}")
            if suffix and not suffix.startswith(".") or suffix == ".":
                raise ValueError("Invalid suffix %r" % (suffix))
            name = self.name
            if not name:
                raise ValueError(f"{self!r} has an empty name")
            old_suffix = self.suffix
            if not old_suffix:
                name = name + suffix
            else:
                name = name[: -len(old_suffix)] + suffix
            return self._from_parsed_parts(
                self.drive, self.root, self._tail[:-1] + [name]
            )

        def relative_to(self, other, /, *_deprecated, walk_up=False):
            """Return this path expressed relative to *other*.

            With ``walk_up=True`` the result may start with ".." entries.
            Raises ValueError when no relative path can be formed. Passing
            extra positional arguments emits a DeprecationWarning.
            """
            if _deprecated:
                msg = (
                    "support for supplying more than one positional argument "
                    "to pathlib.PurePath.relative_to() is deprecated and "
                    "scheduled for removal in Python 3.14"
                )
                warnings.warn(
                    f"pathlib.PurePath.relative_to(*args) {msg}",
                    DeprecationWarning,
                    stacklevel=2,
                )
            other = self.with_segments(other, *_deprecated)
            # Try `other` itself, then each of its ancestors; `step` counts
            # how many levels had to be walked up before a match.
            for step, path in enumerate([other] + list(other.parents)):  # noqa: B007
                if self.is_relative_to(path):
                    break
                elif not walk_up:
                    raise ValueError(
                        f"{str(self)!r} is not in the subpath of {str(other)!r}"
                    )
                elif path.name == "..":
                    raise ValueError(f"'..' segment in {str(other)!r} cannot be walked")
            else:
                # Loop exhausted without a common ancestor.
                raise ValueError(
                    f"{str(self)!r} and {str(other)!r} have different anchors"
                )
            parts = [".."] * step + self._tail[len(path._tail) :]
            return self.with_segments(*parts)

        def is_relative_to(self, other, /, *_deprecated):
            """Return True if *other* equals this path or is one of its parents.

            Passing extra positional arguments emits a DeprecationWarning.
            """
            if _deprecated:
                msg = (
                    "support for supplying more than one argument to "
                    "pathlib.PurePath.is_relative_to() is deprecated and "
                    "scheduled for removal in Python 3.14"
                )
                warnings.warn(
                    f"pathlib.PurePath.is_relative_to(*args) {msg}",
                    DeprecationWarning,
                    stacklevel=2,
                )
            other = self.with_segments(other, *_deprecated)
            return other == self or other in self.parents

        @property
        def parts(self):
            """A tuple of the path components, the anchor (if any) first."""
            if self.drive or self.root:
                return (self.drive + self.root,) + tuple(self._tail)
            else:
                return tuple(self._tail)

        @property
        def parent(self):
            """The logical parent; a path with an empty tail is its own parent."""
            drv = self.drive
            root = self.root
            tail = self._tail
            if not tail:
                return self
            return self._from_parsed_parts(drv, root, tail[:-1])

        @property
        def parents(self):
            """A sequence of this path's logical ancestors."""
            return _PathParents(self)

        def _make_child_relpath(self, name):
            """Return the child path *name*, built without re-parsing."""
            path_str = str(self)
            tail = self._tail
            if tail:
                path_str = f"{path_str}{self._flavour.sep}{name}"
            elif path_str != ".":
                # Empty tail but a non-"." string: append the name directly
                # to the existing string without adding a separator.
                path_str = f"{path_str}{name}"
            else:
                path_str = name
            path = self.with_segments(path_str)
            path._str = path_str
            path._drv = self.drive
            path._root = self.root
            path._tail_cached = tail + [name]
            return path

        def lchmod(self, mode):
            """
            Like chmod(), except if the path points to a symlink, the symlink's
            permissions are changed, rather than its target's.
            """
            self.chmod(mode, follow_symlinks=False)

    class _PathParents(Sequence):
        """Sequence view over the logical ancestors of a path."""

        __slots__ = ("_path", "_drv", "_root", "_tail")

        def __init__(self, path):
            self._path = path
            self._drv = path.drive
            self._root = path.root
            self._tail = path._tail

        def __len__(self):
            # One ancestor per tail component.
            return len(self._tail)

        def __getitem__(self, idx):
            """Return the ancestor at *idx*, or a tuple of them for a slice."""
            if isinstance(idx, slice):
                return tuple(self[i] for i in range(*idx.indices(len(self))))

            if idx >= len(self) or idx < -len(self):
                raise IndexError(idx)
            if idx < 0:
                idx += len(self)
            # Index 0 drops one tail component, index 1 drops two, and so on.
            return self._path._from_parsed_parts(
                self._drv, self._root, self._tail[: -idx - 1]
            )

        def __repr__(self):
            return f"<{type(self._path).__name__}.parents>"

369 

370 

# Prefix/suffix stripping helpers: use the built-in str methods where they
# exist (Python 3.9+), otherwise fall back to equivalent pure-Python versions.
if sys.version_info >= (3, 9):
    str_remove_suffix = str.removesuffix
    str_remove_prefix = str.removeprefix

else:

    def str_remove_suffix(s: str, suffix: str) -> str:
        """Return *s* without a trailing *suffix*; *s* unchanged if absent."""
        # The emptiness guard matters: every string ends with "", and
        # s[:-0] is s[:0] == "", which would wipe the whole string.
        if suffix and s.endswith(suffix):
            return s[: -len(suffix)]
        return s

    def str_remove_prefix(s: str, prefix: str) -> str:
        """Return *s* without a leading *prefix*; *s* unchanged if absent."""
        if s.startswith(prefix):
            return s[len(prefix) :]
        return s

388 

389 

class FSSpecAccessorShim:
    """this is a compatibility shim and will be removed

    Every operation method takes the path object as an argument and
    forwards to the same-named method on ``path.fs``, passing the string
    produced by ``_format_path``. Defining a subclass emits a
    DeprecationWarning.
    """

    def __init__(self, parsed_url: SplitResult | None, **kwargs: Any) -> None:
        """Instantiate a filesystem for *parsed_url* and store it as ``_fs``.

        Without a URL scheme the class returned by
        ``get_filesystem_class(None)`` is used. Explicit *kwargs* override
        the keyword arguments derived from the URL.
        """
        if parsed_url and parsed_url.scheme:
            cls = get_filesystem_class(parsed_url.scheme)
            url_kwargs = cls._get_kwargs_from_urls(parsed_url.geturl())
        else:
            cls = get_filesystem_class(None)
            url_kwargs = {}
        url_kwargs.update(kwargs)
        self._fs = cls(**url_kwargs)

    def __init_subclass__(cls, **kwargs):
        warnings.warn(
            "All _FSSpecAccessor subclasses have been deprecated. "
            " Please follow the universal_pathlib==0.2.0 migration guide at"
            " https://github.com/fsspec/universal_pathlib for more"
            " information.",
            DeprecationWarning,
            stacklevel=2,
        )

    @classmethod
    def from_path(cls, path: UPath) -> FSSpecAccessorShim:
        """internal accessor for backwards compatibility"""
        url = path._url._replace(scheme=path.protocol)
        obj = cls(url, **path.storage_options)
        # Replace the filesystem created by __init__ with the path's own.
        obj.__dict__["_fs"] = path.fs
        return obj

    def _format_path(self, path: UPath) -> str:
        """Return the string handed to the filesystem for *path*."""
        return path.path

    def open(self, path, mode="r", *args, **kwargs):
        return path.fs.open(self._format_path(path), mode, *args, **kwargs)

    def stat(self, path, **kwargs):
        return path.fs.stat(self._format_path(path), **kwargs)

    def listdir(self, path, **kwargs):
        """Return the filesystem's listing for *path*.

        Raises NotADirectoryError (carrying ``str(path)``) when the listing
        is empty and *path* is not a directory, or when the listing is a
        single "file" entry whose name is the formatted path itself.
        """
        p_fmt = self._format_path(path)
        contents = path.fs.listdir(p_fmt, **kwargs)
        if len(contents) == 0 and not path.fs.isdir(p_fmt):
            # Report the offending path, not this accessor object.
            raise NotADirectoryError(str(path))
        elif (
            len(contents) == 1
            and contents[0]["name"] == p_fmt
            and contents[0]["type"] == "file"
        ):
            raise NotADirectoryError(str(path))
        return contents

    def glob(self, _path, path_pattern, **kwargs):
        # The pattern, not _path, is what gets formatted and globbed.
        return _path.fs.glob(self._format_path(path_pattern), **kwargs)

    def exists(self, path, **kwargs):
        return path.fs.exists(self._format_path(path), **kwargs)

    def info(self, path, **kwargs):
        return path.fs.info(self._format_path(path), **kwargs)

    def rm(self, path, recursive, **kwargs):
        return path.fs.rm(self._format_path(path), recursive=recursive, **kwargs)

    def mkdir(self, path, create_parents=True, **kwargs):
        return path.fs.mkdir(
            self._format_path(path), create_parents=create_parents, **kwargs
        )

    def makedirs(self, path, exist_ok=False, **kwargs):
        return path.fs.makedirs(self._format_path(path), exist_ok=exist_ok, **kwargs)

    def touch(self, path, **kwargs):
        return path.fs.touch(self._format_path(path), **kwargs)

    def mv(self, path, target, recursive=False, maxdepth=None, **kwargs):
        """Move *path* to *target* on ``path.fs``.

        A *target* that has an ``_accessor`` attribute is first converted to
        a string with that accessor's ``_format_path``; anything else is
        passed through unchanged.
        """
        if hasattr(target, "_accessor"):
            target = target._accessor._format_path(target)
        return path.fs.mv(
            self._format_path(path),
            target,
            recursive=recursive,
            maxdepth=maxdepth,
            **kwargs,
        )

476 

477 

RT = TypeVar("RT")
F = Callable[..., RT]


def deprecated(*, python_version: tuple[int, ...]) -> Callable[[F], F]:
    """marks function as deprecated

    Returns a decorator. When the running interpreter's version is at least
    *python_version*, the decorated function is wrapped so that every call
    emits a DeprecationWarning before delegating; otherwise the function is
    returned unchanged.
    """
    pyver_str = ".".join(map(str, python_version))

    def deprecated_decorator(func: F) -> F:
        # The version check happens once, at decoration time.
        if sys.version_info >= python_version:

            @wraps(func)
            def wrapper(*args, **kwargs):
                warnings.warn(
                    f"{func.__name__} is deprecated on py>={pyver_str}",
                    DeprecationWarning,
                    stacklevel=2,  # attribute the warning to the caller
                )
                return func(*args, **kwargs)

            return wrapper

        else:
            return func

    return deprecated_decorator

504 

505 

class method_and_classmethod:
    """Allow a method to be used as both a method and a classmethod

    Accessed on an instance, the wrapped function is bound to that
    instance; accessed on the class, it is bound to the class itself.
    """

    def __init__(self, method):
        self.method = method

    def __get__(self, instance, owner):
        # No instance means class-level access: bind to the owner class.
        target = owner if instance is None else instance
        return self.method.__get__(target)