Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/_compat.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import ntpath
4import os
5import posixpath
6import sys
7import warnings
8from collections.abc import Sequence
9from functools import wraps
10from pathlib import Path
11from pathlib import PurePath
12from typing import TYPE_CHECKING
13from typing import Any
14from typing import Callable
15from typing import TypeVar
16from urllib.parse import SplitResult
18from fsspec import get_filesystem_class
20if TYPE_CHECKING:
21 from upath import UPath
23__all__ = [
24 "PathlibPathShim",
25 "str_remove_prefix",
26 "str_remove_suffix",
27 "FSSpecAccessorShim",
28 "deprecated",
29]
32if sys.version_info >= (3, 12): # noqa: C901
34 class PathlibPathShim:
35 """no need to shim pathlib.Path in Python 3.12+"""
37 __slots__ = ()
38 __missing_py312_slots__ = ()
40 def __init__(self, *args):
41 super().__init__(*args)
43else:
45 def _get_missing_py312_pathlib_slots():
46 """Return a tuple of slots that are present in Python 3.12's
47 pathlib.Path but not in the current version of pathlib.Path
48 """
49 py312_slots = (
50 "_raw_paths",
51 "_drv",
52 "_root",
53 "_tail_cached",
54 "_str",
55 "_str_normcase_cached",
56 "_parts_normcase_cached",
57 "_lines_cached",
58 "_hash",
59 )
60 current_slots = [
61 slot for cls in Path.__mro__ for slot in getattr(cls, "__slots__", [])
62 ]
63 return tuple([slot for slot in py312_slots if slot not in current_slots])
65 class PathlibPathShim:
66 """A compatibility shim for python < 3.12
68 Basically vendoring the functionality of pathlib.Path from Python 3.12
69 that's not overwritten in upath.core.UPath
71 """
73 __slots__ = ()
74 __missing_py312_slots__ = _get_missing_py312_pathlib_slots()
76 def __init__(self, *args):
77 paths = []
78 for arg in args:
79 if isinstance(arg, PurePath) and hasattr(arg, "_raw_paths"):
80 if arg._flavour is ntpath and self._flavour is posixpath:
81 # GH-103631: Convert separators for backwards compatibility.
82 paths.extend(path.replace("\\", "/") for path in arg._raw_paths)
83 else:
84 paths.extend(arg._raw_paths)
85 else:
86 try:
87 path = os.fspath(arg)
88 except TypeError:
89 path = arg
90 if not isinstance(path, str):
91 raise TypeError(
92 "argument should be a str or an os.PathLike "
93 "object where __fspath__ returns a str, "
94 f"not {type(path).__name__!r}"
95 )
96 paths.append(path)
97 self._raw_paths = paths
99 @classmethod
100 def _parse_path(cls, path):
101 if not path:
102 return "", "", []
103 sep = cls._flavour.sep
104 altsep = cls._flavour.altsep
105 if altsep:
106 path = path.replace(altsep, sep)
107 drv, root, rel = cls._flavour.splitroot(path)
108 if not root and drv.startswith(sep) and not drv.endswith(sep):
109 drv_parts = drv.split(sep)
110 if len(drv_parts) == 4 and drv_parts[2] not in "?.":
111 # e.g. //server/share
112 root = sep
113 elif len(drv_parts) == 6:
114 # e.g. //?/unc/server/share
115 root = sep
116 parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."]
117 return drv, root, parsed
119 def _load_parts(self):
120 paths = self._raw_paths
121 if len(paths) == 0:
122 path = ""
123 elif len(paths) == 1:
124 path = paths[0]
125 else:
126 path = self._flavour.join(*paths)
127 drv, root, tail = self._parse_path(path)
128 self._drv = drv
129 self._root = root
130 self._tail_cached = tail
132 def _from_parsed_parts(self, drv, root, tail):
133 path_str = self._format_parsed_parts(drv, root, tail)
134 path = self.with_segments(path_str)
135 path._str = path_str or "."
136 path._drv = drv
137 path._root = root
138 path._tail_cached = tail
139 return path
141 @classmethod
142 def _format_parsed_parts(cls, drv, root, tail):
143 if drv or root:
144 return drv + root + cls._flavour.sep.join(tail)
145 elif tail and cls._flavour.splitdrive(tail[0])[0]:
146 tail = ["."] + tail
147 return cls._flavour.sep.join(tail)
149 def __str__(self):
150 try:
151 return self._str
152 except AttributeError:
153 self._str = (
154 self._format_parsed_parts(self.drive, self.root, self._tail) or "."
155 )
156 return self._str
158 @property
159 def drive(self):
160 try:
161 return self._drv
162 except AttributeError:
163 self._load_parts()
164 return self._drv
166 @property
167 def root(self):
168 try:
169 return self._root
170 except AttributeError:
171 self._load_parts()
172 return self._root
174 @property
175 def _tail(self):
176 try:
177 return self._tail_cached
178 except AttributeError:
179 self._load_parts()
180 return self._tail_cached
182 @property
183 def anchor(self):
184 anchor = self.drive + self.root
185 return anchor
187 @property
188 def name(self):
189 tail = self._tail
190 if not tail:
191 return ""
192 return tail[-1]
194 @property
195 def suffix(self):
196 name = self.name
197 i = name.rfind(".")
198 if 0 < i < len(name) - 1:
199 return name[i:]
200 else:
201 return ""
203 @property
204 def suffixes(self):
205 name = self.name
206 if name.endswith("."):
207 return []
208 name = name.lstrip(".")
209 return ["." + suffix for suffix in name.split(".")[1:]]
211 @property
212 def stem(self):
213 name = self.name
214 i = name.rfind(".")
215 if 0 < i < len(name) - 1:
216 return name[:i]
217 else:
218 return name
220 def with_name(self, name):
221 if not self.name:
222 raise ValueError(f"{self!r} has an empty name")
223 f = self._flavour
224 if (
225 not name
226 or f.sep in name
227 or (f.altsep and f.altsep in name)
228 or name == "."
229 ):
230 raise ValueError("Invalid name %r" % (name))
231 return self._from_parsed_parts(
232 self.drive, self.root, self._tail[:-1] + [name]
233 )
235 def with_stem(self, stem):
236 return self.with_name(stem + self.suffix)
238 def with_suffix(self, suffix):
239 f = self._flavour
240 if f.sep in suffix or f.altsep and f.altsep in suffix:
241 raise ValueError(f"Invalid suffix {suffix!r}")
242 if suffix and not suffix.startswith(".") or suffix == ".":
243 raise ValueError("Invalid suffix %r" % (suffix))
244 name = self.name
245 if not name:
246 raise ValueError(f"{self!r} has an empty name")
247 old_suffix = self.suffix
248 if not old_suffix:
249 name = name + suffix
250 else:
251 name = name[: -len(old_suffix)] + suffix
252 return self._from_parsed_parts(
253 self.drive, self.root, self._tail[:-1] + [name]
254 )
256 def relative_to(self, other, /, *_deprecated, walk_up=False):
257 if _deprecated:
258 msg = (
259 "support for supplying more than one positional argument "
260 "to pathlib.PurePath.relative_to() is deprecated and "
261 "scheduled for removal in Python 3.14"
262 )
263 warnings.warn(
264 f"pathlib.PurePath.relative_to(*args) {msg}",
265 DeprecationWarning,
266 stacklevel=2,
267 )
268 other = self.with_segments(other, *_deprecated)
269 for step, path in enumerate([other] + list(other.parents)): # noqa: B007
270 if self.is_relative_to(path):
271 break
272 elif not walk_up:
273 raise ValueError(
274 f"{str(self)!r} is not in the subpath of {str(other)!r}"
275 )
276 elif path.name == "..":
277 raise ValueError(f"'..' segment in {str(other)!r} cannot be walked")
278 else:
279 raise ValueError(
280 f"{str(self)!r} and {str(other)!r} have different anchors"
281 )
282 parts = [".."] * step + self._tail[len(path._tail) :]
283 return self.with_segments(*parts)
285 def is_relative_to(self, other, /, *_deprecated):
286 if _deprecated:
287 msg = (
288 "support for supplying more than one argument to "
289 "pathlib.PurePath.is_relative_to() is deprecated and "
290 "scheduled for removal in Python 3.14"
291 )
292 warnings.warn(
293 f"pathlib.PurePath.is_relative_to(*args) {msg}",
294 DeprecationWarning,
295 stacklevel=2,
296 )
297 other = self.with_segments(other, *_deprecated)
298 return other == self or other in self.parents
300 @property
301 def parts(self):
302 if self.drive or self.root:
303 return (self.drive + self.root,) + tuple(self._tail)
304 else:
305 return tuple(self._tail)
307 @property
308 def parent(self):
309 drv = self.drive
310 root = self.root
311 tail = self._tail
312 if not tail:
313 return self
314 return self._from_parsed_parts(drv, root, tail[:-1])
316 @property
317 def parents(self):
318 return _PathParents(self)
320 def _make_child_relpath(self, name):
321 path_str = str(self)
322 tail = self._tail
323 if tail:
324 path_str = f"{path_str}{self._flavour.sep}{name}"
325 elif path_str != ".":
326 path_str = f"{path_str}{name}"
327 else:
328 path_str = name
329 path = self.with_segments(path_str)
330 path._str = path_str
331 path._drv = self.drive
332 path._root = self.root
333 path._tail_cached = tail + [name]
334 return path
336 def lchmod(self, mode):
337 """
338 Like chmod(), except if the path points to a symlink, the symlink's
339 permissions are changed, rather than its target's.
340 """
341 self.chmod(mode, follow_symlinks=False)
343 class _PathParents(Sequence):
344 __slots__ = ("_path", "_drv", "_root", "_tail")
346 def __init__(self, path):
347 self._path = path
348 self._drv = path.drive
349 self._root = path.root
350 self._tail = path._tail
352 def __len__(self):
353 return len(self._tail)
355 def __getitem__(self, idx):
356 if isinstance(idx, slice):
357 return tuple(self[i] for i in range(*idx.indices(len(self))))
359 if idx >= len(self) or idx < -len(self):
360 raise IndexError(idx)
361 if idx < 0:
362 idx += len(self)
363 return self._path._from_parsed_parts(
364 self._drv, self._root, self._tail[: -idx - 1]
365 )
367 def __repr__(self):
368 return f"<{type(self._path).__name__}.parents>"
371if sys.version_info >= (3, 9):
372 str_remove_suffix = str.removesuffix
373 str_remove_prefix = str.removeprefix
375else:
377 def str_remove_suffix(s: str, suffix: str) -> str:
378 if s.endswith(suffix):
379 return s[: -len(suffix)]
380 else:
381 return s
383 def str_remove_prefix(s: str, prefix: str) -> str:
384 if s.startswith(prefix):
385 return s[len(prefix) :]
386 else:
387 return s
390class FSSpecAccessorShim:
391 """this is a compatibility shim and will be removed"""
393 def __init__(self, parsed_url: SplitResult | None, **kwargs: Any) -> None:
394 if parsed_url and parsed_url.scheme:
395 cls = get_filesystem_class(parsed_url.scheme)
396 url_kwargs = cls._get_kwargs_from_urls(parsed_url.geturl())
397 else:
398 cls = get_filesystem_class(None)
399 url_kwargs = {}
400 url_kwargs.update(kwargs)
401 self._fs = cls(**url_kwargs)
403 def __init_subclass__(cls, **kwargs):
404 warnings.warn(
405 "All _FSSpecAccessor subclasses have been deprecated. "
406 " Please follow the universal_pathlib==0.2.0 migration guide at"
407 " https://github.com/fsspec/universal_pathlib for more"
408 " information.",
409 DeprecationWarning,
410 stacklevel=2,
411 )
413 @classmethod
414 def from_path(cls, path: UPath) -> FSSpecAccessorShim:
415 """internal accessor for backwards compatibility"""
416 url = path._url._replace(scheme=path.protocol)
417 obj = cls(url, **path.storage_options)
418 obj.__dict__["_fs"] = path.fs
419 return obj
421 def _format_path(self, path: UPath) -> str:
422 return path.path
424 def open(self, path, mode="r", *args, **kwargs):
425 return path.fs.open(self._format_path(path), mode, *args, **kwargs)
427 def stat(self, path, **kwargs):
428 return path.fs.stat(self._format_path(path), **kwargs)
430 def listdir(self, path, **kwargs):
431 p_fmt = self._format_path(path)
432 contents = path.fs.listdir(p_fmt, **kwargs)
433 if len(contents) == 0 and not path.fs.isdir(p_fmt):
434 raise NotADirectoryError(str(self))
435 elif (
436 len(contents) == 1
437 and contents[0]["name"] == p_fmt
438 and contents[0]["type"] == "file"
439 ):
440 raise NotADirectoryError(str(self))
441 return contents
443 def glob(self, _path, path_pattern, **kwargs):
444 return _path.fs.glob(self._format_path(path_pattern), **kwargs)
446 def exists(self, path, **kwargs):
447 return path.fs.exists(self._format_path(path), **kwargs)
449 def info(self, path, **kwargs):
450 return path.fs.info(self._format_path(path), **kwargs)
452 def rm(self, path, recursive, **kwargs):
453 return path.fs.rm(self._format_path(path), recursive=recursive, **kwargs)
455 def mkdir(self, path, create_parents=True, **kwargs):
456 return path.fs.mkdir(
457 self._format_path(path), create_parents=create_parents, **kwargs
458 )
460 def makedirs(self, path, exist_ok=False, **kwargs):
461 return path.fs.makedirs(self._format_path(path), exist_ok=exist_ok, **kwargs)
463 def touch(self, path, **kwargs):
464 return path.fs.touch(self._format_path(path), **kwargs)
466 def mv(self, path, target, recursive=False, maxdepth=None, **kwargs):
467 if hasattr(target, "_accessor"):
468 target = target._accessor._format_path(target)
469 return path.fs.mv(
470 self._format_path(path),
471 target,
472 recursive=recursive,
473 maxdepth=maxdepth,
474 **kwargs,
475 )
478RT = TypeVar("RT")
479F = Callable[..., RT]
482def deprecated(*, python_version: tuple[int, ...]) -> Callable[[F], F]:
483 """marks function as deprecated"""
484 pyver_str = ".".join(map(str, python_version))
486 def deprecated_decorator(func: F) -> F:
487 if sys.version_info >= python_version:
489 @wraps(func)
490 def wrapper(*args, **kwargs):
491 warnings.warn(
492 f"{func.__name__} is deprecated on py>={pyver_str}",
493 DeprecationWarning,
494 stacklevel=2,
495 )
496 return func(*args, **kwargs)
498 return wrapper
500 else:
501 return func
503 return deprecated_decorator
506class method_and_classmethod:
507 """Allow a method to be used as both a method and a classmethod"""
509 def __init__(self, method):
510 self.method = method
512 def __get__(self, instance, owner):
513 if instance is None:
514 return self.method.__get__(owner)
515 return self.method.__get__(instance)