Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/core.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import sys
5import warnings
6from copy import copy
7from pathlib import Path
8from types import MappingProxyType
9from typing import IO
10from typing import TYPE_CHECKING
11from typing import Any
12from typing import BinaryIO
13from typing import Generator
14from typing import Literal
15from typing import Mapping
16from typing import Sequence
17from typing import TextIO
18from typing import TypeVar
19from typing import overload
20from urllib.parse import urlsplit
22from fsspec.registry import get_filesystem_class
23from fsspec.spec import AbstractFileSystem
25from upath._compat import FSSpecAccessorShim
26from upath._compat import PathlibPathShim
27from upath._compat import method_and_classmethod
28from upath._compat import str_remove_prefix
29from upath._compat import str_remove_suffix
30from upath._flavour import LazyFlavourDescriptor
31from upath._flavour import upath_get_kwargs_from_url
32from upath._flavour import upath_urijoin
33from upath._protocol import compatible_protocol
34from upath._protocol import get_upath_protocol
35from upath._stat import UPathStatResult
36from upath.registry import get_upath_class
38if TYPE_CHECKING:
39 from urllib.parse import SplitResult
41 if sys.version_info >= (3, 11):
42 from typing import Self
43 else:
44 from typing_extensions import Self
46__all__ = ["UPath"]
49def __getattr__(name):
50 if name == "_UriFlavour":
51 from upath._flavour import default_flavour
53 warnings.warn(
54 "upath.core._UriFlavour should not be used anymore."
55 " Please follow the universal_pathlib==0.2.0 migration guide at"
56 " https://github.com/fsspec/universal_pathlib for more"
57 " information.",
58 DeprecationWarning,
59 stacklevel=2,
60 )
61 return default_flavour
62 elif name == "PT":
63 warnings.warn(
64 "upath.core.PT should not be used anymore."
65 " Please follow the universal_pathlib==0.2.0 migration guide at"
66 " https://github.com/fsspec/universal_pathlib for more"
67 " information.",
68 DeprecationWarning,
69 stacklevel=2,
70 )
71 return TypeVar("PT", bound="UPath")
72 else:
73 raise AttributeError(name)
76_FSSPEC_HAS_WORKING_GLOB = None
79def _check_fsspec_has_working_glob():
80 global _FSSPEC_HAS_WORKING_GLOB
81 from fsspec.implementations.memory import MemoryFileSystem
83 m = type("_M", (MemoryFileSystem,), {"store": {}, "pseudo_dirs": [""]})()
84 m.touch("a.txt")
85 m.touch("f/b.txt")
86 g = _FSSPEC_HAS_WORKING_GLOB = len(m.glob("**/*.txt")) == 2
87 return g
90def _make_instance(cls, args, kwargs):
91 """helper for pickling UPath instances"""
92 return cls(*args, **kwargs)
95_unset: Any = object()
98# accessors are deprecated
99_FSSpecAccessor = FSSpecAccessorShim
102class UPath(PathlibPathShim, Path):
103 __slots__ = (
104 "_protocol",
105 "_storage_options",
106 "_fs_cached",
107 *PathlibPathShim.__missing_py312_slots__,
108 "__drv",
109 "__root",
110 "__parts",
111 )
113 if TYPE_CHECKING:
114 # public
115 anchor: str
116 drive: str
117 parent: Self
118 parents: Sequence[Self]
119 parts: tuple[str, ...]
120 root: str
121 stem: str
122 suffix: str
123 suffixes: list[str]
125 def with_name(self, name: str) -> Self: ...
126 def with_stem(self, stem: str) -> Self: ...
127 def with_suffix(self, suffix: str) -> Self: ...
129 # private attributes
130 _protocol: str
131 _storage_options: dict[str, Any]
132 _fs_cached: AbstractFileSystem
133 _tail: str
135 _protocol_dispatch: bool | None = None
136 _flavour = LazyFlavourDescriptor()
138 if sys.version_info >= (3, 13):
139 parser = _flavour
141 # === upath.UPath constructor =====================================
143 def __new__(
144 cls, *args, protocol: str | None = None, **storage_options: Any
145 ) -> UPath:
146 # fill empty arguments
147 if not args:
148 args = (".",)
150 # create a copy if UPath class
151 part0, *parts = args
152 if not parts and not storage_options and isinstance(part0, cls):
153 return copy(part0)
155 # deprecate 'scheme'
156 if "scheme" in storage_options:
157 warnings.warn(
158 "use 'protocol' kwarg instead of 'scheme'",
159 DeprecationWarning,
160 stacklevel=2,
161 )
162 protocol = storage_options.pop("scheme")
164 # determine the protocol
165 pth_protocol = get_upath_protocol(
166 part0, protocol=protocol, storage_options=storage_options
167 )
168 # determine which UPath subclass to dispatch to
169 if cls._protocol_dispatch or cls._protocol_dispatch is None:
170 upath_cls = get_upath_class(protocol=pth_protocol)
171 if upath_cls is None:
172 raise ValueError(f"Unsupported filesystem: {pth_protocol!r}")
173 else:
174 # user subclasses can request to disable protocol dispatch
175 # by setting MyUPathSubclass._protocol_dispatch to `False`.
176 # This will effectively ignore the registered UPath
177 # implementations and return an instance of MyUPathSubclass.
178 # This can be useful if a subclass wants to extend the UPath
179 # api, and it is fine to rely on the default implementation
180 # for all supported user protocols.
181 upath_cls = cls
183 # create a new instance
184 if cls is UPath:
185 # we called UPath() directly, and want an instance based on the
186 # provided or detected protocol (i.e. upath_cls)
187 obj: UPath = object.__new__(upath_cls)
188 obj._protocol = pth_protocol
190 elif issubclass(cls, upath_cls):
191 # we called a sub- or sub-sub-class of UPath, i.e. S3Path() and the
192 # corresponding upath_cls based on protocol is equal-to or a
193 # parent-of the cls.
194 obj = object.__new__(cls)
195 obj._protocol = pth_protocol
197 elif issubclass(cls, UPath):
198 # we called a subclass of UPath directly, i.e. S3Path() but the
199 # detected protocol would return a non-related UPath subclass, i.e.
200 # S3Path("file:///abc"). This behavior is going to raise an error
201 # in future versions
202 msg_protocol = repr(pth_protocol)
203 if not pth_protocol:
204 msg_protocol += " (empty string)"
205 msg = (
206 f"{cls.__name__!s}(...) detected protocol {msg_protocol!s} and"
207 f" returns a {upath_cls.__name__} instance that isn't a direct"
208 f" subclass of {cls.__name__}. This will raise an exception in"
209 " future universal_pathlib versions. To prevent the issue, use"
210 " UPath(...) to create instances of unrelated protocols or you"
211 f" can instead derive your subclass {cls.__name__!s}(...) from"
212 f" {upath_cls.__name__} or alternatively override behavior via"
213 f" registering the {cls.__name__} implementation with protocol"
214 f" {msg_protocol!s} replacing the default implementation."
215 )
216 warnings.warn(msg, DeprecationWarning, stacklevel=2)
218 obj = object.__new__(upath_cls)
219 obj._protocol = pth_protocol
221 upath_cls.__init__(
222 obj, *args, protocol=pth_protocol, **storage_options
223 ) # type: ignore
225 else:
226 raise RuntimeError("UPath.__new__ expected cls to be subclass of UPath")
228 return obj
230 def __init__(
231 self, *args, protocol: str | None = None, **storage_options: Any
232 ) -> None:
233 # allow subclasses to customize __init__ arg parsing
234 base_options = getattr(self, "_storage_options", {})
235 args, protocol, storage_options = type(self)._transform_init_args(
236 args, protocol or self._protocol, {**base_options, **storage_options}
237 )
238 if self._protocol != protocol and protocol:
239 self._protocol = protocol
241 # retrieve storage_options
242 if args:
243 args0 = args[0]
244 if isinstance(args0, UPath):
245 self._storage_options = {**args0.storage_options, **storage_options}
246 else:
247 if hasattr(args0, "__fspath__"):
248 _args0 = args0.__fspath__()
249 else:
250 _args0 = str(args0)
251 self._storage_options = type(self)._parse_storage_options(
252 _args0, protocol, storage_options
253 )
254 else:
255 self._storage_options = storage_options.copy()
257 # check that UPath subclasses in args are compatible
258 # TODO:
259 # Future versions of UPath could verify that storage_options
260 # can be combined between UPath instances. Not sure if this
261 # is really necessary though. A warning might be enough...
262 if not compatible_protocol(self._protocol, *args):
263 raise ValueError("can't combine incompatible UPath protocols")
265 # fill ._raw_paths
266 if hasattr(self, "_raw_paths"):
267 return
268 super().__init__(*args)
270 # === upath.UPath PUBLIC ADDITIONAL API ===========================
272 @property
273 def protocol(self) -> str:
274 """The fsspec protocol for the path."""
275 return self._protocol
277 @property
278 def storage_options(self) -> Mapping[str, Any]:
279 """The fsspec storage options for the path."""
280 return MappingProxyType(self._storage_options)
282 @property
283 def fs(self) -> AbstractFileSystem:
284 """The cached fsspec filesystem instance for the path."""
285 try:
286 return self._fs_cached
287 except AttributeError:
288 fs = self._fs_cached = self._fs_factory(
289 str(self), self.protocol, self.storage_options
290 )
291 return fs
293 @property
294 def path(self) -> str:
295 """The path that a fsspec filesystem can use."""
296 return super().__str__()
298 def joinuri(self, uri: str | os.PathLike[str]) -> UPath:
299 """Join with urljoin behavior for UPath instances"""
300 # short circuit if the new uri uses a different protocol
301 other_protocol = get_upath_protocol(uri)
302 if other_protocol and other_protocol != self._protocol:
303 return UPath(uri)
304 return UPath(
305 upath_urijoin(str(self), str(uri)),
306 protocol=other_protocol or self._protocol,
307 **self.storage_options,
308 )
310 # === upath.UPath CUSTOMIZABLE API ================================
312 @classmethod
313 def _transform_init_args(
314 cls,
315 args: tuple[str | os.PathLike, ...],
316 protocol: str,
317 storage_options: dict[str, Any],
318 ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]:
319 """allow customization of init args in subclasses"""
320 return args, protocol, storage_options
322 @classmethod
323 def _parse_storage_options(
324 cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
325 ) -> dict[str, Any]:
326 """Parse storage_options from the urlpath"""
327 pth_storage_options = upath_get_kwargs_from_url(urlpath)
328 return {**pth_storage_options, **storage_options}
330 @classmethod
331 def _fs_factory(
332 cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
333 ) -> AbstractFileSystem:
334 """Instantiate the filesystem_spec filesystem class"""
335 fs_cls = get_filesystem_class(protocol)
336 so_dct = fs_cls._get_kwargs_from_urls(urlpath)
337 so_dct.update(storage_options)
338 return fs_cls(**storage_options)
340 # === upath.UPath COMPATIBILITY API ===============================
342 def __init_subclass__(cls, **kwargs):
343 """provide a clean migration path for custom user subclasses"""
345 # Check if the user subclass has a custom `__new__` method
346 has_custom_new_method = (
347 cls.__new__ is not UPath.__new__
348 and cls.__name__ not in {"PosixUPath", "WindowsUPath"}
349 )
351 if has_custom_new_method and cls._protocol_dispatch is None:
352 warnings.warn(
353 "Detected a customized `__new__` method in subclass"
354 f" {cls.__name__!r}. Protocol dispatch will be disabled"
355 " for this subclass. Please follow the"
356 " universal_pathlib==0.2.0 migration guide at"
357 " https://github.com/fsspec/universal_pathlib for more"
358 " information.",
359 DeprecationWarning,
360 stacklevel=2,
361 )
362 cls._protocol_dispatch = False
364 # Check if the user subclass has defined a custom accessor class
365 accessor_cls = getattr(cls, "_default_accessor", None)
367 has_custom_legacy_accessor = (
368 accessor_cls is not None
369 and issubclass(accessor_cls, FSSpecAccessorShim)
370 and accessor_cls is not FSSpecAccessorShim
371 )
372 has_customized_fs_instantiation = (
373 accessor_cls.__init__ is not FSSpecAccessorShim.__init__
374 or hasattr(accessor_cls, "_fs")
375 )
377 if has_custom_legacy_accessor and has_customized_fs_instantiation:
378 warnings.warn(
379 "Detected a customized `__init__` method or `_fs` attribute"
380 f" in the provided `_FSSpecAccessor` subclass of {cls.__name__!r}."
381 " It is recommended to instead override the `UPath._fs_factory`"
382 " classmethod to customize filesystem instantiation. Please follow"
383 " the universal_pathlib==0.2.0 migration guide at"
384 " https://github.com/fsspec/universal_pathlib for more"
385 " information.",
386 DeprecationWarning,
387 stacklevel=2,
388 )
390 def _fs_factory(
391 cls_, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
392 ) -> AbstractFileSystem:
393 url = urlsplit(urlpath)
394 if protocol:
395 url = url._replace(scheme=protocol)
396 inst = cls_._default_accessor(url, **storage_options)
397 return inst._fs
399 def _parse_storage_options(
400 cls_, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
401 ) -> dict[str, Any]:
402 url = urlsplit(urlpath)
403 if protocol:
404 url = url._replace(scheme=protocol)
405 inst = cls_._default_accessor(url, **storage_options)
406 return inst._fs.storage_options
408 cls._fs_factory = classmethod(_fs_factory)
409 cls._parse_storage_options = classmethod(_parse_storage_options)
411 @property
412 def _path(self):
413 warnings.warn(
414 "UPath._path is deprecated and should not be used."
415 " Please follow the universal_pathlib==0.2.0 migration guide at"
416 " https://github.com/fsspec/universal_pathlib for more"
417 " information.",
418 DeprecationWarning,
419 stacklevel=2,
420 )
421 return self.path
423 @property
424 def _kwargs(self):
425 warnings.warn(
426 "UPath._kwargs is deprecated. Please use"
427 " UPath.storage_options instead. Follow the"
428 " universal_pathlib==0.2.0 migration guide at"
429 " https://github.com/fsspec/universal_pathlib for more"
430 " information.",
431 DeprecationWarning,
432 stacklevel=2,
433 )
434 return self.storage_options
436 @property
437 def _url(self) -> SplitResult:
438 # TODO:
439 # _url should be deprecated, but for now there is no good way of
440 # accessing query parameters from urlpaths...
441 return urlsplit(self.as_posix())
443 if not TYPE_CHECKING:
444 # allow mypy to catch missing attributes
446 def __getattr__(self, item):
447 if item == "_accessor":
448 warnings.warn(
449 "UPath._accessor is deprecated. Please use"
450 " UPath.fs instead. Follow the"
451 " universal_pathlib==0.2.0 migration guide at"
452 " https://github.com/fsspec/universal_pathlib for more"
453 " information.",
454 DeprecationWarning,
455 stacklevel=2,
456 )
457 if hasattr(self, "_default_accessor"):
458 accessor_cls = self._default_accessor
459 else:
460 accessor_cls = FSSpecAccessorShim
461 return accessor_cls.from_path(self)
462 else:
463 raise AttributeError(item)
465 @classmethod
466 def _from_parts(cls, parts, **kwargs):
467 warnings.warn(
468 "UPath._from_parts is deprecated and should not be used."
469 " Please follow the universal_pathlib==0.2.0 migration guide at"
470 " https://github.com/fsspec/universal_pathlib for more"
471 " information.",
472 DeprecationWarning,
473 stacklevel=2,
474 )
475 parsed_url = kwargs.pop("url", None)
476 if parsed_url:
477 if protocol := parsed_url.scheme:
478 kwargs["protocol"] = protocol
479 if netloc := parsed_url.netloc:
480 kwargs["netloc"] = netloc
481 obj = UPath.__new__(cls, parts, **kwargs)
482 obj.__init__(*parts, **kwargs)
483 return obj
485 @classmethod
486 def _parse_args(cls, args):
487 warnings.warn(
488 "UPath._parse_args is deprecated and should not be used."
489 " Please follow the universal_pathlib==0.2.0 migration guide at"
490 " https://github.com/fsspec/universal_pathlib for more"
491 " information.",
492 DeprecationWarning,
493 stacklevel=2,
494 )
495 # TODO !!!
496 pth = cls._flavour.join(*args)
497 return cls._parse_path(pth)
499 @property
500 def _drv(self):
501 # direct access to ._drv should emit a warning,
502 # but there is no good way of doing this for now...
503 try:
504 return self.__drv
505 except AttributeError:
506 self._load_parts()
507 return self.__drv
509 @_drv.setter
510 def _drv(self, value):
511 self.__drv = value
513 @property
514 def _root(self):
515 # direct access to ._root should emit a warning,
516 # but there is no good way of doing this for now...
517 try:
518 return self.__root
519 except AttributeError:
520 self._load_parts()
521 return self.__root
523 @_root.setter
524 def _root(self, value):
525 self.__root = value
527 @property
528 def _parts(self):
529 # UPath._parts is not used anymore, and not available
530 # in pathlib.Path for Python 3.12 and later.
531 # Direct access to ._parts should emit a deprecation warning,
532 # but there is no good way of doing this for now...
533 try:
534 return self.__parts
535 except AttributeError:
536 self._load_parts()
537 self.__parts = super().parts
538 return list(self.__parts)
540 @_parts.setter
541 def _parts(self, value):
542 self.__parts = value
544 @property
545 def _cparts(self):
546 # required for pathlib.Path.__eq__ compatibility on Python <3.12
547 return self.parts
549 # === pathlib.PurePath ============================================
551 def __reduce__(self):
552 args = tuple(self._raw_paths)
553 kwargs = {
554 "protocol": self._protocol,
555 **self._storage_options,
556 }
557 return _make_instance, (type(self), args, kwargs)
559 def with_segments(self, *pathsegments: str | os.PathLike[str]) -> Self:
560 return type(self)(
561 *pathsegments,
562 protocol=self._protocol,
563 **self._storage_options,
564 )
566 def joinpath(self, *pathsegments: str | os.PathLike[str]) -> Self:
567 return self.with_segments(self, *pathsegments)
569 def __truediv__(self, key: str | os.PathLike[str]) -> Self:
570 try:
571 return self.joinpath(key)
572 except TypeError:
573 return NotImplemented
575 def __rtruediv__(self, key: str | os.PathLike[str]) -> Self:
576 try:
577 return self.with_segments(key, self)
578 except TypeError:
579 return NotImplemented
581 # === upath.UPath non-standard changes ============================
583 # NOTE:
584 # this is a classmethod on the parent class, but we need to
585 # override it here to make it possible to provide the _flavour
586 # with the correct protocol...
587 # pathlib 3.12 never calls this on the class. Only on the instance.
588 @method_and_classmethod
589 def _parse_path(self_or_cls, path): # noqa: B902
590 if isinstance(self_or_cls, type):
591 warnings.warn(
592 "UPath._parse_path should not be used as a classmethod."
593 " Please file an issue on the universal_pathlib issue tracker"
594 " and describe your use case.",
595 DeprecationWarning,
596 stacklevel=2,
597 )
598 flavour = self_or_cls._flavour
600 if flavour.supports_empty_parts:
601 drv, root, rel = flavour.splitroot(path)
602 if not root:
603 parsed = []
604 else:
605 parsed = list(map(sys.intern, rel.split(flavour.sep)))
606 if parsed[-1] == ".":
607 parsed[-1] = ""
608 parsed = [x for x in parsed if x != "."]
609 if not flavour.has_meaningful_trailing_slash and parsed[-1] == "":
610 parsed.pop()
611 return drv, root, parsed
612 if not path:
613 return "", "", []
614 sep = flavour.sep
615 altsep = flavour.altsep
616 if altsep:
617 path = path.replace(altsep, sep)
618 drv, root, rel = flavour.splitroot(path)
619 if not root and drv.startswith(sep) and not drv.endswith(sep):
620 drv_parts = drv.split(sep)
621 if len(drv_parts) == 4 and drv_parts[2] not in "?.":
622 # e.g. //server/share
623 root = sep
624 elif len(drv_parts) == 6:
625 # e.g. //?/unc/server/share
626 root = sep
627 parsed = [sys.intern(str(x)) for x in rel.split(sep) if x and x != "."]
628 return drv, root, parsed
630 @method_and_classmethod
631 def _format_parsed_parts(self_or_cls, drv, root, tail, **kwargs): # noqa: B902
632 if isinstance(self_or_cls, type):
633 warnings.warn(
634 "UPath._format_parsed_path should not be used as a classmethod."
635 " Please file an issue on the universal_pathlib issue tracker"
636 " and describe your use case.",
637 DeprecationWarning,
638 stacklevel=2,
639 )
640 flavour = self_or_cls._flavour
642 if kwargs:
643 warnings.warn(
644 "UPath._format_parsed_parts should not be used with"
645 " additional kwargs. Please follow the"
646 " universal_pathlib==0.2.0 migration guide at"
647 " https://github.com/fsspec/universal_pathlib for more"
648 " information.",
649 DeprecationWarning,
650 stacklevel=2,
651 )
652 if "url" in kwargs and tail[:1] == [f"{drv}{root}"]:
653 # This was called from code that expected py38-py311 behavior
654 # of _format_parsed_parts, which takes drv, root and parts
655 tail = tail[1:]
657 if drv or root:
658 return drv + root + flavour.sep.join(tail)
659 elif tail and flavour.splitdrive(tail[0])[0]:
660 tail = ["."] + tail
661 return flavour.sep.join(tail)
663 # === upath.UPath changes =========================================
665 def __str__(self):
666 if self._protocol:
667 return f"{self._protocol}://{self.path}"
668 else:
669 return self.path
671 def __fspath__(self):
672 msg = (
673 "in a future version of UPath this will be set to None"
674 " unless the filesystem is local (or caches locally)"
675 )
676 warnings.warn(msg, PendingDeprecationWarning, stacklevel=2)
677 return str(self)
679 def __bytes__(self):
680 msg = (
681 "in a future version of UPath this will be set to None"
682 " unless the filesystem is local (or caches locally)"
683 )
684 warnings.warn(msg, PendingDeprecationWarning, stacklevel=2)
685 return os.fsencode(self)
687 def as_uri(self) -> str:
688 return str(self)
690 def is_reserved(self) -> bool:
691 return False
693 def __eq__(self, other: object) -> bool:
694 """UPaths are considered equal if their protocol, path and
695 storage_options are equal."""
696 if not isinstance(other, UPath):
697 return NotImplemented
698 return (
699 self.path == other.path
700 and self.protocol == other.protocol
701 and self.storage_options == other.storage_options
702 )
704 def __hash__(self) -> int:
705 """The returned hash is based on the protocol and path only.
707 Note: in the future, if hash collisions become an issue, we
708 can add `fsspec.utils.tokenize(storage_options)`
709 """
710 return hash((self.protocol, self.path))
712 def relative_to( # type: ignore[override]
713 self,
714 other,
715 /,
716 *_deprecated,
717 walk_up=False,
718 ) -> Self:
719 if isinstance(other, UPath) and self.storage_options != other.storage_options:
720 raise ValueError(
721 "paths have different storage_options:"
722 f" {self.storage_options!r} != {other.storage_options!r}"
723 )
724 return super().relative_to(other, *_deprecated, walk_up=walk_up)
726 def is_relative_to(self, other, /, *_deprecated) -> bool: # type: ignore[override]
727 if isinstance(other, UPath) and self.storage_options != other.storage_options:
728 return False
729 return super().is_relative_to(other, *_deprecated)
731 @property
732 def name(self) -> str:
733 tail = self._tail
734 if not tail:
735 return ""
736 name = tail[-1]
737 if not name and len(tail) >= 2:
738 return tail[-2]
739 else:
740 return name
742 # === pathlib.Path ================================================
744 def stat( # type: ignore[override]
745 self,
746 *,
747 follow_symlinks=True,
748 ) -> UPathStatResult:
749 if not follow_symlinks:
750 warnings.warn(
751 f"{type(self).__name__}.stat(follow_symlinks=False):"
752 " is currently ignored.",
753 UserWarning,
754 stacklevel=2,
755 )
756 return UPathStatResult.from_info(self.fs.stat(self.path))
758 def lstat(self) -> UPathStatResult: # type: ignore[override]
759 return self.stat(follow_symlinks=False)
761 def exists(self, *, follow_symlinks=True) -> bool:
762 return self.fs.exists(self.path)
764 def is_dir(self) -> bool:
765 return self.fs.isdir(self.path)
767 def is_file(self) -> bool:
768 return self.fs.isfile(self.path)
770 def is_mount(self) -> bool:
771 return False
773 def is_symlink(self) -> bool:
774 try:
775 info = self.fs.info(self.path)
776 if "islink" in info:
777 return bool(info["islink"])
778 except FileNotFoundError:
779 return False
780 return False
782 def is_junction(self) -> bool:
783 return False
785 def is_block_device(self) -> bool:
786 return False
788 def is_char_device(self) -> bool:
789 return False
791 def is_fifo(self) -> bool:
792 return False
794 def is_socket(self) -> bool:
795 return False
797 def samefile(self, other_path) -> bool:
798 st = self.stat()
799 if isinstance(other_path, UPath):
800 other_st = other_path.stat()
801 else:
802 other_st = self.with_segments(other_path).stat()
803 return st == other_st
805 @overload # type: ignore[override]
806 def open(
807 self,
808 mode: Literal["r", "w", "a"] = "r",
809 buffering: int = ...,
810 encoding: str = ...,
811 errors: str = ...,
812 newline: str = ...,
813 **fsspec_kwargs: Any,
814 ) -> TextIO: ...
816 @overload
817 def open( # type: ignore[override]
818 self,
819 mode: Literal["rb", "wb", "ab"],
820 buffering: int = ...,
821 encoding: str = ...,
822 errors: str = ...,
823 newline: str = ...,
824 **fsspec_kwargs: Any,
825 ) -> BinaryIO: ...
827 def open(
828 self,
829 mode: str = "r",
830 *args: Any,
831 **fsspec_kwargs: Any,
832 ) -> IO[Any]:
833 """
834 Open the file pointed by this path and return a file object, as
835 the built-in open() function does.
837 Parameters
838 ----------
839 mode:
840 Opening mode. Default is 'r'.
841 buffering:
842 Default is the block size of the underlying fsspec filesystem.
843 encoding:
844 Encoding is only used in text mode. Default is None.
845 errors:
846 Error handling for encoding. Only used in text mode. Default is None.
847 newline:
848 Newline handling. Only used in text mode. Default is None.
849 **fsspec_kwargs:
850 Additional options for the fsspec filesystem.
851 """
852 # match the signature of pathlib.Path.open()
853 for key, value in zip(["buffering", "encoding", "errors", "newline"], args):
854 if key in fsspec_kwargs:
855 raise TypeError(
856 f"{type(self).__name__}.open() got multiple values for '{key}'"
857 )
858 fsspec_kwargs[key] = value
859 # translate pathlib buffering to fs block_size
860 if "buffering" in fsspec_kwargs:
861 fsspec_kwargs.setdefault("block_size", fsspec_kwargs.pop("buffering"))
862 return self.fs.open(self.path, mode=mode, **fsspec_kwargs)
864 def iterdir(self) -> Generator[UPath, None, None]:
865 for name in self.fs.listdir(self.path):
866 # fsspec returns dictionaries
867 if isinstance(name, dict):
868 name = name.get("name")
869 if name in {".", ".."}:
870 # Yielding a path object for these makes little sense
871 continue
872 # only want the path name with iterdir
873 _, _, name = str_remove_suffix(name, "/").rpartition(self._flavour.sep)
874 yield self.with_segments(*self.parts, name)
876 def _scandir(self):
877 raise NotImplementedError # todo
879 def _make_child_relpath(self, name):
880 path = super()._make_child_relpath(name)
881 del path._str # fix _str = str(self) assignment
882 return path
884 def glob(
885 self, pattern: str, *, case_sensitive=None
886 ) -> Generator[UPath, None, None]:
887 path_pattern = self.joinpath(pattern).path
888 sep = self._flavour.sep
889 base = self.fs._strip_protocol(self.path)
890 for name in self.fs.glob(path_pattern):
891 name = str_remove_prefix(str_remove_prefix(name, base), sep)
892 yield self.joinpath(name)
894 def rglob(
895 self, pattern: str, *, case_sensitive=None
896 ) -> Generator[UPath, None, None]:
897 if _FSSPEC_HAS_WORKING_GLOB is None:
898 _check_fsspec_has_working_glob()
900 if _FSSPEC_HAS_WORKING_GLOB:
901 r_path_pattern = self.joinpath("**", pattern).path
902 sep = self._flavour.sep
903 base = self.fs._strip_protocol(self.path)
904 for name in self.fs.glob(r_path_pattern):
905 name = str_remove_prefix(str_remove_prefix(name, base), sep)
906 yield self.joinpath(name)
908 else:
909 path_pattern = self.joinpath(pattern).path
910 r_path_pattern = self.joinpath("**", pattern).path
911 sep = self._flavour.sep
912 base = self.fs._strip_protocol(self.path)
913 seen = set()
914 for p in (path_pattern, r_path_pattern):
915 for name in self.fs.glob(p):
916 name = str_remove_prefix(str_remove_prefix(name, base), sep)
917 if name in seen:
918 continue
919 else:
920 seen.add(name)
921 yield self.joinpath(name)
923 @classmethod
924 def cwd(cls) -> UPath:
925 if cls is UPath:
926 return get_upath_class("").cwd() # type: ignore[union-attr]
927 else:
928 raise NotImplementedError
930 @classmethod
931 def home(cls) -> UPath:
932 if cls is UPath:
933 return get_upath_class("").home() # type: ignore[union-attr]
934 else:
935 raise NotImplementedError
937 def absolute(self) -> Self:
938 return self
940 def is_absolute(self) -> bool:
941 return self._flavour.isabs(str(self))
943 def resolve(self, strict: bool = False) -> Self:
944 _parts = self.parts
946 # Do not attempt to normalize path if no parts are dots
947 if ".." not in _parts and "." not in _parts:
948 return self
950 resolved: list[str] = []
951 resolvable_parts = _parts[1:]
952 for part in resolvable_parts:
953 if part == "..":
954 if resolved:
955 resolved.pop()
956 elif part != ".":
957 resolved.append(part)
959 return self.with_segments(*_parts[:1], *resolved)
961 def owner(self) -> str:
962 raise NotImplementedError
964 def group(self) -> str:
965 raise NotImplementedError
967 def readlink(self) -> Self:
968 raise NotImplementedError
970 def touch(self, mode=0o666, exist_ok=True) -> None:
971 exists = self.fs.exists(self.path)
972 if exists and not exist_ok:
973 raise FileExistsError(str(self))
974 if not exists:
975 self.fs.touch(self.path, truncate=True)
976 else:
977 try:
978 self.fs.touch(self.path, truncate=False)
979 except (NotImplementedError, ValueError):
980 pass # unsupported by filesystem
982 def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None:
983 if parents and not exist_ok and self.exists():
984 raise FileExistsError(str(self))
985 try:
986 self.fs.mkdir(
987 self.path,
988 create_parents=parents,
989 mode=mode,
990 )
991 except FileExistsError:
992 if not exist_ok:
993 raise FileExistsError(str(self))
994 if not self.is_dir():
995 raise FileExistsError(str(self))
997 def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
998 raise NotImplementedError
1000 def lchmod(self, mode: int) -> None:
1001 raise NotImplementedError
1003 def unlink(self, missing_ok: bool = False) -> None:
1004 if not self.exists():
1005 if not missing_ok:
1006 raise FileNotFoundError(str(self))
1007 return
1008 self.fs.rm(self.path, recursive=False)
1010 def rmdir(self, recursive: bool = True) -> None: # fixme: non-standard
1011 if not self.is_dir():
1012 raise NotADirectoryError(str(self))
1013 if not recursive and next(self.iterdir()): # type: ignore[arg-type]
1014 raise OSError(f"Not recursive and directory not empty: {self}")
1015 self.fs.rm(self.path, recursive=recursive)
1017 def rename(
1018 self,
1019 target: str | os.PathLike[str] | UPath,
1020 *, # note: non-standard compared to pathlib
1021 recursive: bool = _unset,
1022 maxdepth: int | None = _unset,
1023 **kwargs: Any,
1024 ) -> Self:
1025 if isinstance(target, str) and self.storage_options:
1026 target = UPath(target, **self.storage_options)
1027 target_protocol = get_upath_protocol(target)
1028 if target_protocol:
1029 if target_protocol != self.protocol:
1030 raise ValueError(
1031 f"expected protocol {self.protocol!r}, got: {target_protocol!r}"
1032 )
1033 if not isinstance(target, UPath):
1034 target_ = UPath(target, **self.storage_options)
1035 else:
1036 target_ = target
1037 # avoid calling .resolve for subclasses of UPath
1038 if ".." in target_.parts or "." in target_.parts:
1039 target_ = target_.resolve()
1040 else:
1041 parent = self.parent
1042 # avoid calling .resolve for subclasses of UPath
1043 if ".." in parent.parts or "." in parent.parts:
1044 parent = parent.resolve()
1045 target_ = parent.joinpath(os.path.normpath(target))
1046 assert isinstance(target_, type(self)), "identical protocols enforced above"
1047 if recursive is not _unset:
1048 kwargs["recursive"] = recursive
1049 if maxdepth is not _unset:
1050 kwargs["maxdepth"] = maxdepth
1051 self.fs.mv(
1052 self.path,
1053 target_.path,
1054 **kwargs,
1055 )
1056 return target_
1058 def replace(self, target: str | os.PathLike[str] | UPath) -> UPath:
1059 raise NotImplementedError # todo
1061 def symlink_to( # type: ignore[override]
1062 self,
1063 target: str | os.PathLike[str] | UPath,
1064 target_is_directory: bool = False,
1065 ) -> None:
1066 raise NotImplementedError
1068 def hardlink_to( # type: ignore[override]
1069 self,
1070 target: str | os.PathLike[str] | UPath,
1071 ) -> None:
1072 raise NotImplementedError
1074 def expanduser(self) -> Self:
1075 return self