Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/core.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""upath.core module: UPath base class implementation"""
3from __future__ import annotations
5import sys
6import warnings
7from abc import ABCMeta
8from abc import abstractmethod
9from collections.abc import Iterator
10from collections.abc import Mapping
11from collections.abc import Sequence
12from copy import copy
13from pathlib import PurePath
14from types import MappingProxyType
15from typing import IO
16from typing import TYPE_CHECKING
17from typing import Any
18from typing import BinaryIO
19from typing import Literal
20from typing import NoReturn
21from typing import TextIO
22from typing import TypeVar
23from typing import overload
24from urllib.parse import SplitResult
25from urllib.parse import urlsplit
27from fsspec.registry import get_filesystem_class
28from fsspec.spec import AbstractFileSystem
30from upath._chain import DEFAULT_CHAIN_PARSER
31from upath._chain import Chain
32from upath._chain import FSSpecChainParser
33from upath._flavour import LazyFlavourDescriptor
34from upath._flavour import WrappedFileSystemFlavour
35from upath._flavour import upath_get_kwargs_from_url
36from upath._flavour import upath_urijoin
37from upath._info import UPathInfo
38from upath._protocol import compatible_protocol
39from upath._protocol import get_upath_protocol
40from upath._stat import UPathStatResult
41from upath.registry import _get_implementation_protocols
42from upath.registry import available_implementations
43from upath.registry import get_upath_class
44from upath.types import UNSET_DEFAULT
45from upath.types import JoinablePathLike
46from upath.types import OnNameCollisionFunc
47from upath.types import PathInfo
48from upath.types import ReadablePath
49from upath.types import ReadablePathLike
50from upath.types import StatResultType
51from upath.types import SupportsPathLike
52from upath.types import UPathParser
53from upath.types import WritablePath
54from upath.types import WritablePathLike
# ``pathlib.UnsupportedOperation`` can only be imported on Python 3.13+;
# earlier interpreters use ``NotImplementedError`` under the same name.
if sys.version_info < (3, 13):
    # Raised when an unsupported operation is called on a path object.
    UnsupportedOperation = NotImplementedError
else:
    from pathlib import UnsupportedOperation
if TYPE_CHECKING:
    # Names in this branch are needed for annotations only and are never
    # imported at runtime.
    import upath.implementations as _uimpl

    if sys.version_info >= (3, 11):
        from typing import Self
    else:
        from typing_extensions import Self

    from pydantic import GetCoreSchemaHandler
    from pydantic_core.core_schema import CoreSchema

    _MT = TypeVar("_MT")
    _WT = TypeVar("_WT", bound="WritablePath")

# Public names exported by ``from upath.core import *``.
__all__ = [
    "UPath",
    "UnsupportedOperation",
]

# Memoized result of `_check_fsspec_has_working_glob`; stays None until that
# function has been called once.
_FSSPEC_HAS_WORKING_GLOB = None
def _check_fsspec_has_working_glob():
    """Probe whether fsspec's ``**`` glob finds files at every depth.

    Two empty files (``a.txt`` and ``f/b.txt``) are created on a throwaway
    in-memory filesystem and globbed with ``**/*.txt``. The outcome is stored
    in the module-level ``_FSSPEC_HAS_WORKING_GLOB`` and also returned.

    Returns
    -------
    bool
        True when the glob reports exactly two matches.
    """
    global _FSSPEC_HAS_WORKING_GLOB
    from fsspec.implementations.memory import MemoryFileSystem

    # The ad-hoc subclass gets its own ``store`` / ``pseudo_dirs`` attributes
    # -- presumably so the probe files never show up in the regular
    # MemoryFileSystem contents (verify against fsspec).
    probe_cls = type("_M", (MemoryFileSystem,), {"store": {}, "pseudo_dirs": [""]})
    probe_fs = probe_cls()
    for file_name in ("a.txt", "f/b.txt"):
        probe_fs.touch(file_name)

    matches = probe_fs.glob("**/*.txt")
    _FSSPEC_HAS_WORKING_GLOB = len(matches) == 2
    return _FSSPEC_HAS_WORKING_GLOB
def _make_instance(cls, args, kwargs):
    """Rebuild a path instance from pickled constructor data.

    Parameters
    ----------
    cls :
        The class to instantiate.
    args :
        Positional arguments forwarded to ``cls``.
    kwargs :
        Keyword arguments forwarded to ``cls``. An optional
        ``"_relative_base"`` entry is not passed to the constructor; it is
        restored on the new instance afterwards. The mapping itself is left
        unmodified.

    Returns
    -------
    The newly created instance.
    """
    # Work on a copy so the caller's mapping keeps its "_relative_base" key.
    init_kwargs = dict(kwargs)
    relative_base = init_kwargs.pop("_relative_base", None)
    instance = cls(*args, **init_kwargs)
    if relative_base is not None:
        instance._relative_base = relative_base
    return instance
def _buffering2blocksize(mode: str, buffering: int) -> int | None:
    """Translate an ``open()``-style ``buffering`` value into a block size.

    Parameters
    ----------
    mode :
        The file mode; only the presence of ``"b"`` is inspected.
    buffering :
        ``-1`` selects the default, ``0`` disables buffering (binary modes
        only), any other integer is passed through unchanged.

    Returns
    -------
    int | None
        ``None`` for ``-1``, otherwise ``buffering`` itself.

    Raises
    ------
    TypeError
        If ``buffering`` is not an integer.
    ValueError
        If ``buffering`` is ``0`` and ``mode`` is a text mode.
    """
    if not isinstance(buffering, int):
        raise TypeError("buffering must be an integer")
    if buffering == -1:
        return None
    # unbuffered I/O only makes sense for binary modes
    if buffering == 0 and "b" not in mode:
        raise ValueError("can't have unbuffered text I/O")
    return buffering
def _raise_unsupported(cls_name: str, method: str) -> NoReturn:
    """Raise ``UnsupportedOperation`` naming ``cls_name.method()``.

    Never returns; the message has the form
    ``"<cls_name>.<method>() is unsupported"``.
    """
    message = f"{cls_name}.{method}() is unsupported"
    raise UnsupportedOperation(message)
class _IncompatibleProtocolError(TypeError, ValueError):
    """Transitional error for incompatible protocols.

    It derives from both ``TypeError`` and ``ValueError`` so that existing
    ``except ValueError`` handlers keep working while callers migrate to
    ``except TypeError``.

    !!! Do not use this exception directly !!!
    Catch TypeError instead, if you need to handle incompatible protocol errors.

    We'll do the switch in a future major release.
    """

    # evil: disguise the class as the built-in TypeError
    __module__ = "builtins"
    __qualname__ = "TypeError"

    def __repr__(self) -> str:
        """Render like ``TypeError(arg0, arg1, ...)``."""
        rendered_args = ", ".join(repr(arg) for arg in self.args)
        return f"TypeError({rendered_args})"
class _UPathMeta(ABCMeta):
    """Metaclass that customizes how UPath classes create instances.

    It exists for two reasons:
    - ``UPath(existing_upath)`` returns a copy of the given instance
    - ``__init__`` is always invoked on the object returned by ``__new__``,
      even when that object is an instance of a non-subclass
    """

    if sys.version_info < (3, 11):
        # pathlib 3.9 and 3.10 supported `Path[str]` but
        # did not return a GenericAlias but the class itself?
        def __getitem__(cls, key):
            return cls

    def __call__(cls: type[_MT], *args: Any, **kwargs: Any) -> _MT:
        """Create an instance, or copy a single UPath positional argument."""
        # exactly one UPath argument and no keywords -> plain copy
        if len(args) == 1 and isinstance(args[0], UPath) and not kwargs:
            return copy(args[0])  # type: ignore[return-value]

        # Both steps are performed by hand, because cls could be a registered
        # subclass of UPath that is not directly inheriting from UPath.
        inst = cls.__new__(cls, *args, **kwargs)
        inst.__init__(*args, **kwargs)  # type: ignore[misc]
        return inst
169class _UPathMixin(metaclass=_UPathMeta):
170 """Mixin class for UPath to allow sharing some common functionality
171 between UPath and PosixUPath/WindowsUPath.
172 """
174 __slots__ = ()
    @property
    @abstractmethod
    def parser(self) -> UPathParser:
        """The parser (flavour) for this UPath instance.

        Abstract: this base implementation only raises NotImplementedError.
        """
        raise NotImplementedError
182 @property
183 def _protocol(self) -> str:
184 return self._chain.nest().protocol
186 @_protocol.setter
187 def _protocol(self, value: str) -> None:
188 self._chain = self._chain.replace(protocol=value)
190 @property
191 def _storage_options(self) -> dict[str, Any]:
192 return self._chain.nest().storage_options
194 @_storage_options.setter
195 def _storage_options(self, value: dict[str, Any]) -> None:
196 self._chain = self._chain.replace(storage_options=value)
    # --- abstract per-instance state ---------------------------------
    # The properties below only declare the state that the shared mixin
    # code reads and writes; every getter and setter here raises
    # NotImplementedError, so a concrete class has to supply real storage.

    # chain object backing `_protocol`, `_storage_options` and `path`
    @property
    @abstractmethod
    def _chain(self) -> Chain:
        raise NotImplementedError

    @_chain.setter
    @abstractmethod
    def _chain(self, value: Chain) -> None:
        raise NotImplementedError

    # chain parser the instance was constructed with
    @property
    @abstractmethod
    def _chain_parser(self) -> FSSpecChainParser:
        raise NotImplementedError

    @_chain_parser.setter
    @abstractmethod
    def _chain_parser(self, value: FSSpecChainParser) -> None:
        raise NotImplementedError

    # filesystem instance cached by the `fs` property
    @property
    @abstractmethod
    def _fs_cached(self) -> AbstractFileSystem:
        raise NotImplementedError

    @_fs_cached.setter
    def _fs_cached(self, value: AbstractFileSystem):
        raise NotImplementedError

    # positional arguments as passed to the constructor
    @property
    @abstractmethod
    def _raw_urlpaths(self) -> Sequence[JoinablePathLike]:
        raise NotImplementedError

    @_raw_urlpaths.setter
    def _raw_urlpaths(self, value: Sequence[JoinablePathLike]) -> None:
        raise NotImplementedError

    # None for regular paths; a non-None value marks the path as relative
    @property
    @abstractmethod
    def _relative_base(self) -> str | None:
        raise NotImplementedError

    @_relative_base.setter
    def _relative_base(self, value: str | None) -> None:
        raise NotImplementedError
245 # === upath.UPath PUBLIC ADDITIONAL API ===========================
247 @property
248 def protocol(self) -> str:
249 """The fsspec protocol for the path.
251 Note
252 ----
253 Protocols are linked to upath and fsspec filesystems via the
254 `upath.registry` and `fsspec.registry` modules. They basically
255 represent the URI scheme used for the specific filesystem.
257 Examples
258 --------
259 >>> from upath import UPath
260 >>> p0 = UPath("s3://my-bucket/path/to/file.txt")
261 >>> p0.protocol
262 's3'
263 >>> p1 = UPath("/foo/bar/baz.txt", protocol="memory")
264 >>> p1.protocol
265 'memory'
267 """
268 return self._protocol
270 @property
271 def storage_options(self) -> Mapping[str, Any]:
272 """The read-only fsspec storage options for the path.
274 Note
275 ----
276 Storage options are specific to each fsspec filesystem and
277 can include parameters such as authentication credentials,
278 connection settings, and other options that affect how the
279 filesystem interacts with the underlying storage.
281 Examples
282 --------
283 >>> from upath import UPath
284 >>> p = UPath("s3://my-bucket/path/to/file.txt", anon=True)
285 >>> p.storage_options['anon']
286 True
288 """
289 return MappingProxyType(self._storage_options)
291 @property
292 def fs(self) -> AbstractFileSystem:
293 """The cached fsspec filesystem instance for the path.
295 This is the underlying fsspec filesystem instance. It's
296 instantiated on first filesystem access and cached. Can
297 be used to access fsspec-specific functionality not exposed
298 by the UPath API.
300 Examples
301 --------
302 >>> from upath import UPath
303 >>> p = UPath("s3://my-bucket/path/to/file.txt")
304 >>> p.fs
305 <s3fs.core.S3FileSystem object at 0x...>
306 >>> p.fs.get_tags(p.path)
307 {'VersionId': 'null', 'ContentLength': 12345, ...}
309 """
310 try:
311 return self._fs_cached
312 except AttributeError:
313 fs = self._fs_cached = self._fs_factory(
314 str(self), self.protocol, self.storage_options
315 )
316 return fs
318 @property
319 def path(self) -> str:
320 """The path used by fsspec filesystem.
322 FSSpec filesystems usually handle paths stripped of protocol.
323 This property returns the path suitable for use with the
324 underlying fsspec filesystem. It guarantees that a filesystem's
325 strip_protocol method is applied correctly.
327 Examples
328 --------
329 >>> from upath import UPath
330 >>> p = UPath("memory:///foo/bar.txt")
331 >>> str(p)
332 'memory:///foo/bar.txt'
333 >>> p.path
334 '/foo/bar.txt'
335 >>> p.fs.exists(p.path)
336 True
338 """
339 if self._relative_base is not None:
340 try:
341 # For relative paths, we need to resolve to absolute path
342 current_dir = self.cwd() # type: ignore[attr-defined]
343 except NotImplementedError:
344 raise UnsupportedOperation(
345 f"fsspec paths can not be relative and"
346 f" {type(self).__name__}.cwd() is unsupported"
347 ) from None
348 # Join the current directory with the relative path
349 if (self_path := str(self)) == ".":
350 path = str(current_dir)
351 else:
352 path = current_dir.parser.join(str(current_dir), self_path)
353 return self.parser.strip_protocol(path)
354 return self._chain.active_path
356 def joinuri(self, uri: JoinablePathLike) -> UPath:
357 """Join with urljoin behavior for UPath instances.
359 Examples
360 --------
361 >>> from upath import UPath
362 >>> p = UPath("https://example.com/dir/subdir/")
363 >>> p.joinuri("file.txt")
364 HTTPSPath('https://example.com/dir/subdir/file.txt')
365 >>> p.joinuri("/anotherdir/otherfile.txt")
366 HTTPSPath('https://example.com/anotherdir/otherfile.txt')
367 >>> p.joinuri("memory:///foo/bar.txt"
368 MemoryPath('memory:///foo/bar.txt')
370 """
371 # short circuit if the new uri uses a different protocol
372 other_protocol = get_upath_protocol(uri)
373 if other_protocol and other_protocol != self._protocol:
374 return UPath(uri)
375 return UPath(
376 upath_urijoin(str(self), str(uri)),
377 protocol=other_protocol or self._protocol,
378 **self.storage_options,
379 )
381 # === upath.UPath CUSTOMIZABLE API ================================
383 @classmethod
384 def _transform_init_args(
385 cls,
386 args: tuple[JoinablePathLike, ...],
387 protocol: str,
388 storage_options: dict[str, Any],
389 ) -> tuple[tuple[JoinablePathLike, ...], str, dict[str, Any]]:
390 """allow customization of init args in subclasses"""
391 return args, protocol, storage_options
393 @classmethod
394 def _parse_storage_options(
395 cls,
396 urlpath: str,
397 protocol: str,
398 storage_options: Mapping[str, Any],
399 ) -> dict[str, Any]:
400 """Parse storage_options from the urlpath"""
401 pth_storage_options = upath_get_kwargs_from_url(urlpath)
402 return {**pth_storage_options, **storage_options}
404 @classmethod
405 def _fs_factory(
406 cls,
407 urlpath: str,
408 protocol: str,
409 storage_options: Mapping[str, Any],
410 ) -> AbstractFileSystem:
411 """Instantiate the filesystem_spec filesystem class"""
412 fs_cls = get_filesystem_class(protocol)
413 return fs_cls(**storage_options)
415 # === upath.UPath constructor =====================================
417 _protocol_dispatch: bool | None = None
419 def __new__( # noqa C901
420 cls,
421 *args: JoinablePathLike,
422 protocol: str | None = None,
423 chain_parser: FSSpecChainParser = DEFAULT_CHAIN_PARSER,
424 **storage_options: Any,
425 ) -> UPath:
426 # narrow type
427 if not issubclass(cls, UPath):
428 raise TypeError("UPath.__new__ can't instantiate non-UPath classes")
430 # deprecate 'scheme'
431 if "scheme" in storage_options:
432 warnings.warn(
433 "use 'protocol' kwarg instead of 'scheme'",
434 DeprecationWarning,
435 stacklevel=2,
436 )
437 protocol = storage_options.pop("scheme")
439 # determine the protocol
440 try:
441 pth_protocol = get_upath_protocol(
442 args[0] if args else "",
443 protocol=protocol,
444 storage_options=storage_options,
445 )
446 except ValueError as e:
447 if "incompatible with" in str(e):
448 raise _IncompatibleProtocolError(str(e)) from e
449 raise
451 # subclasses should default to their own protocol
452 if protocol is None and cls is not UPath:
453 impl_protocols = _get_implementation_protocols(cls)
454 if not pth_protocol and impl_protocols:
455 pth_protocol = impl_protocols[0]
456 elif pth_protocol and pth_protocol not in impl_protocols:
457 msg_protocol = pth_protocol
458 if not pth_protocol:
459 msg_protocol = "'' (empty string)"
460 msg = (
461 f"{cls.__name__!s}(...) detected protocol {msg_protocol!s}"
462 f" which is incompatible with {cls.__name__}."
463 )
464 if not pth_protocol or pth_protocol not in available_implementations():
465 msg += (
466 " Did you forget to register the subclass for this protocol"
467 " with upath.registry.register_implementation()?"
468 )
469 raise _IncompatibleProtocolError(msg)
471 # determine which UPath subclass to dispatch to
472 upath_cls: type[UPath] | None
473 if cls._protocol_dispatch or cls._protocol_dispatch is None:
474 upath_cls = get_upath_class(protocol=pth_protocol)
475 if upath_cls is None:
476 raise ValueError(f"Unsupported filesystem: {pth_protocol!r}")
477 else:
478 # user subclasses can request to disable protocol dispatch
479 # by setting MyUPathSubclass._protocol_dispatch to `False`.
480 # This will effectively ignore the registered UPath
481 # implementations and return an instance of MyUPathSubclass.
482 # This be useful if a subclass wants to extend the UPath
483 # api, and it is fine to rely on the default implementation
484 # for all supported user protocols.
485 #
486 # THIS IS DEPRECATED!
487 # Use upath.extensions.ProxyUPath to extend the UPath API
488 warnings.warn(
489 f"{cls.__name__}._protocol_dispatch = False is deprecated and"
490 " will be removed in future universal_pathlib versions."
491 " To extend the UPath API, subclass upath.extensions.ProxyUPath",
492 DeprecationWarning,
493 stacklevel=2,
494 )
495 upath_cls = cls
497 if issubclass(upath_cls, cls):
498 pass
500 elif not issubclass(upath_cls, UPath):
501 raise RuntimeError("UPath.__new__ expected cls to be subclass of UPath")
503 else:
504 msg_protocol = pth_protocol
505 if not pth_protocol:
506 msg_protocol = "'' (empty string)"
507 msg = (
508 f"{cls.__name__!s}(...) detected protocol {msg_protocol!s}"
509 f" which is incompatible with {cls.__name__}."
510 )
511 if (
512 # find a better way
513 (not pth_protocol and cls.__name__ not in ["CloudPath", "LocalPath"])
514 or pth_protocol
515 and pth_protocol not in available_implementations()
516 ):
517 msg += (
518 " Did you forget to register the subclass for this protocol"
519 " with upath.registry.register_implementation()?"
520 )
521 raise _IncompatibleProtocolError(msg)
523 return object.__new__(upath_cls)
    def __init__(
        self,
        *args: JoinablePathLike,
        protocol: str | None = None,
        chain_parser: FSSpecChainParser = DEFAULT_CHAIN_PARSER,
        **storage_options: Any,
    ) -> None:
        """Initialize a UPath instance

        When instantiating a `UPath`, the detected or provided protocol determines
        the `UPath` subclass that will be instantiated. The protocol is looked up
        via the `get_upath_protocol` function, which loads the registered `UPath`
        implementation from the registry. If no `UPath` implementation is found for
        the detected protocol, but a registered `fsspec` filesystem exists for the
        protocol, a default dynamically created `UPath` implementation will be used.

        Parameters
        ----------
        *args :
            The path (or uri) segments to construct the UPath from. The first
            argument is used to detect the protocol if no protocol is provided.
        protocol :
            The protocol to use for the path.
        chain_parser :
            A chain parser instance for chained urlpaths. _(experimental)_
        **storage_options :
            Additional storage options for the path.

        Raises
        ------
        ValueError
            If the positional arguments are not protocol-compatible.
        TypeError
            If the first argument is neither a UPath nor a str and supports
            neither ``__fspath__`` nor ``__vfspath__``.

        """
        # NOTE(review): the deprecated 'scheme' keyword is only translated to
        # `protocol` inside __new__; here it would still be part of
        # storage_options -- confirm this is intended.
        # todo: avoid duplicating this call from __new__
        protocol = get_upath_protocol(
            args[0] if args else "",
            protocol=protocol,
            storage_options=storage_options,
        )
        # give subclasses a chance to rewrite the constructor arguments
        args, protocol, storage_options = type(self)._transform_init_args(
            args, protocol, storage_options
        )

        # check that UPath subclasses in args are compatible
        # TODO:
        #   Future versions of UPath could verify that storage_options
        #   can be combined between UPath instances. Not sure if this
        #   is really necessary though. A warning might be enough...
        if not compatible_protocol(protocol, *args):
            raise ValueError("can't combine incompatible UPath protocols")

        # subclasses should default to their own protocol
        if not protocol:
            impl_protocols = _get_implementation_protocols(type(self))
            if impl_protocols:
                protocol = impl_protocols[0]

        # reduce the first argument to a string and collect storage options
        if args:
            args0 = args[0]
            if isinstance(args0, UPath):
                # explicit keyword options override those of the source path
                storage_options = {
                    **args0._chain.nest().storage_options,
                    **storage_options,
                }
                str_args0 = args0.__vfspath__()

            else:
                if hasattr(args0, "__fspath__") and args0.__fspath__ is not None:
                    str_args0 = args0.__fspath__()
                elif hasattr(args0, "__vfspath__") and args0.__vfspath__ is not None:
                    str_args0 = args0.__vfspath__()
                elif isinstance(args0, str):
                    str_args0 = args0
                else:
                    raise TypeError(
                        "argument should be a UPath, str, "
                        f"or support __vfspath__ or __fspath__, not {type(args0)!r}"
                    )
                storage_options = type(self)._parse_storage_options(
                    str_args0, protocol, storage_options
                )
        else:
            # no positional arguments at all
            str_args0 = "."

        segments = chain_parser.unchain(
            str_args0,
            protocol=protocol,
            storage_options=storage_options,
        )
        # FIXME: normalization needs to happen in unchain already...
        chain = Chain.from_list(Chain.from_list(segments).to_list())
        if len(args) > 1:
            # join the remaining positional arguments onto the active path
            flavour = WrappedFileSystemFlavour.from_protocol(chain.active_path_protocol)
            joined = flavour.join(chain.active_path, *args[1:])
            stripped = flavour.strip_protocol(joined)
            chain = chain.replace(path=stripped)
        self._chain = chain
        self._chain_parser = chain_parser
        self._raw_urlpaths = args
        # freshly constructed paths are never relative
        self._relative_base = None
622 # --- deprecated attributes ---------------------------------------
624 @property
625 def _url(self) -> SplitResult:
626 # TODO:
627 # _url should be deprecated, but for now there is no good way of
628 # accessing query parameters from urlpaths...
629 return urlsplit(self.__str__())
632class UPath(_UPathMixin, WritablePath, ReadablePath):
633 """Base class for pathlike paths backed by an fsspec filesystem.
635 Note
636 ----
637 The following attributes and methods are specific to UPath instances and are not
638 available on pathlib.Path instances.
640 Attributes
641 ----------
642 protocol :
643 The fsspec protocol for the path.
644 storage_options :
645 The fsspec storage options for the path.
646 path :
647 The path that a fsspec filesystem can use.
648 fs :
649 The cached fsspec filesystem instance for the path.
651 Methods
652 -------
653 joinuri(*parts) :
654 Join URI parts to this path.
657 Info
658 ----
659 Below are pathlib attributes and methods available on UPath instances.
661 Attributes
662 ----------
663 drive :
664 The drive component of the path.
665 root :
666 The root component of the path.
667 anchor :
668 The concatenation of the drive and root.
669 parent :
670 The logical parent of the path.
671 parents :
672 An immutable sequence providing access to the logical ancestors of the path.
673 name :
674 The final path component, excluding the drive and root, if any.
675 suffix :
676 The file extension of the final component, if any.
677 suffixes :
678 A list of the path's file extensions.
679 stem :
680 The final path component, without its suffix.
681 info :
682 Filesystem information about the path.
683 parser :
684 The path parser instance for parsing path segments.
686 Methods
687 -------
688 __truediv__(key) :
689 Combine this path with the argument using the `/` operator.
690 __rtruediv__(key) :
691 Combine the argument with this path using the `/` operator.
692 as_posix() :
693 Return the string representation of the path with forward slashes.
694 is_absolute() :
695 Return True if the path is absolute.
696 is_relative_to(other) :
697 Return True if the path is relative to another path.
698 is_reserved() :
699 Return True if the path is reserved under Windows.
700 joinpath(*pathsegments) :
701 Combine this path with one or several arguments, and return a new path.
702 full_match(pattern, *, case_sensitive=None) :
703 Match this path against the provided glob-style pattern.
704 match(pattern, *, case_sensitive=None) :
705 Match this path against the provided glob-style pattern.
706 relative_to(other, walk_up=False) :
707 Return a version of this path relative to another path.
708 with_name(name) :
709 Return a new path with the name changed.
710 with_stem(stem) :
711 Return a new path with the stem changed.
712 with_suffix(suffix) :
713 Return a new path with the suffix changed.
714 with_segments(*pathsegments) :
715 Construct a new path object from any number of path-like objects.
716 from_uri(uri) :
717 Return a new path from the given URI.
718 as_uri() :
719 Return the path as a URI.
720 home() :
721 Return a new path pointing to the user's home directory.
722 expanduser() :
723 Return a new path with expanded `~` constructs.
724 cwd() :
725 Return a new path pointing to the current working directory.
726 absolute() :
727 Make the path absolute, without normalization or resolving symlinks.
728 resolve(strict=False) :
729 Make the path absolute, resolving any symlinks.
730 readlink() :
731 Return the path to which the symbolic link points.
732 stat(*, follow_symlinks=True) :
733 Return the result of the stat() system call on this path.
734 lstat() :
735 Like stat(), but if the path points to a symlink, return the symlink's
736 information.
737 exists(*, follow_symlinks=True) :
738 Return True if the path exists.
739 is_file(*, follow_symlinks=True) :
740 Return True if the path is a regular file.
741 is_dir(*, follow_symlinks=True) :
742 Return True if the path is a directory.
743 is_symlink() :
744 Return True if the path is a symbolic link.
745 is_junction() :
746 Return True if the path is a junction.
747 is_mount() :
748 Return True if the path is a mount point.
749 is_socket() :
750 Return True if the path is a socket.
751 is_fifo() :
752 Return True if the path is a FIFO.
753 is_block_device() :
754 Return True if the path is a block device.
755 is_char_device() :
756 Return True if the path is a character device.
757 samefile(other_path) :
758 Return True if this path points to the same file as other_path.
759 open(mode='r', buffering=-1, encoding=None, errors=None, newline=None) :
760 Open the file pointed to by the path.
761 read_text(encoding=None, errors=None, newline=None) :
762 Open the file in text mode, read it, and close the file.
763 read_bytes() :
764 Open the file in bytes mode, read it, and close the file.
765 write_text(data, encoding=None, errors=None, newline=None) :
766 Open the file in text mode, write to it, and close the file.
767 write_bytes(data) :
768 Open the file in bytes mode, write to it, and close the file.
769 iterdir() :
770 Yield path objects of the directory contents.
771 glob(pattern, *, case_sensitive=None) :
772 Iterate over this subtree and yield all existing files matching the
773 given pattern.
774 rglob(pattern, *, case_sensitive=None) :
775 Recursively yield all existing files matching the given pattern.
776 walk(top_down=True, on_error=None, follow_symlinks=False) :
777 Generate the file names in a directory tree by walking the tree.
778 touch(mode=0o666, exist_ok=True) :
779 Create this file with the given access mode, if it doesn't exist.
780 mkdir(mode=0o777, parents=False, exist_ok=False) :
781 Create a new directory at this given path.
782 symlink_to(target, target_is_directory=False) :
783 Make this path a symbolic link pointing to target.
784 hardlink_to(target) :
785 Make this path a hard link pointing to the same file as target.
786 copy(target, *, follow_symlinks=True, preserve_metadata=False) :
787 Copy the contents of this file to the target file.
788 copy_into(target_dir, *, follow_symlinks=True, preserve_metadata=False) :
789 Copy this file or directory into the target directory.
790 rename(target) :
791 Rename this path to the target path.
792 replace(target) :
793 Rename this path to the target path, overwriting if that path exists.
794 move(target) :
795 Move this file or directory tree to the target path.
796 move_into(target_dir) :
797 Move this file or directory into the target directory.
798 unlink(missing_ok=False) :
799 Remove this file or link.
800 rmdir() :
801 Remove this directory.
802 owner(*, follow_symlinks=True) :
803 Return the login name of the file owner.
804 group(*, follow_symlinks=True) :
805 Return the group name of the file gid.
806 chmod(mode, *, follow_symlinks=True) :
807 Change the permissions of the path.
808 lchmod(mode) :
809 Like chmod() but, if the path points to a symlink, modify the symlink's
810 permissions.
812 """
814 __slots__ = (
815 "_chain",
816 "_chain_parser",
817 "_fs_cached",
818 "_raw_urlpaths",
819 "_relative_base",
820 )
    # Typing-only declarations: attribute types for the slots above and
    # `__new__` overloads that map a literal `protocol` keyword to the path
    # class named in the return annotation. Nothing in this branch is
    # executed at runtime.
    if TYPE_CHECKING:  # noqa: C901
        _chain: Chain
        _chain_parser: FSSpecChainParser
        _fs_cached: AbstractFileSystem
        _raw_urlpaths: Sequence[JoinablePathLike]
        _relative_base: str | None

        # no arguments at all
        @overload
        def __new__(
            cls,
        ) -> Self: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["simplecache"],
            **_: Any,
        ) -> _uimpl.cached.SimpleCachePath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["gcs", "gs"],
            **_: Any,
        ) -> _uimpl.cloud.GCSPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["s3", "s3a"],
            **_: Any,
        ) -> _uimpl.cloud.S3Path: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["az", "abfs", "abfss", "adl"],
            **_: Any,
        ) -> _uimpl.cloud.AzurePath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["hf"],
            **_: Any,
        ) -> _uimpl.cloud.HfPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["data"],
            **_: Any,
        ) -> _uimpl.data.DataPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["ftp"],
            **_: Any,
        ) -> _uimpl.ftp.FTPPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["github"],
            **_: Any,
        ) -> _uimpl.github.GitHubPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["hdfs"],
            **_: Any,
        ) -> _uimpl.hdfs.HDFSPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["http", "https"],
            **_: Any,
        ) -> _uimpl.http.HTTPPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["file", "local"],
            **_: Any,
        ) -> _uimpl.local.FilePath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["memory"],
            **_: Any,
        ) -> _uimpl.memory.MemoryPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["sftp", "ssh"],
            **_: Any,
        ) -> _uimpl.sftp.SFTPPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["smb"],
            **_: Any,
        ) -> _uimpl.smb.SMBPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["tar"],
            **_: Any,
        ) -> _uimpl.tar.TarPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["webdav"],
            **_: Any,
        ) -> _uimpl.webdav.WebdavPath: ...
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: Literal["zip"],
            **_: Any,
        ) -> _uimpl.zip.ZipPath: ...

        # the empty protocol maps to a platform-dependent class
        if sys.platform == "win32":

            @overload  # noqa: E301
            def __new__(
                cls,
                *args: JoinablePathLike,
                protocol: Literal[""],
                **_: Any,
            ) -> _uimpl.local.WindowsUPath: ...

        else:

            @overload  # noqa: E301
            def __new__(
                cls,
                *args: JoinablePathLike,
                protocol: Literal[""],
                **_: Any,
            ) -> _uimpl.local.PosixUPath: ...

        # fallback for any other (or no) protocol value
        @overload  # noqa: E301
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: str | None = ...,
            **_: Any,
        ) -> Self: ...

        # signature that accompanies the overloads above
        def __new__(
            cls,
            *args: JoinablePathLike,
            protocol: str | None = ...,
            chain_parser: FSSpecChainParser = ...,
            **storage_options: Any,
        ) -> Self: ...
989 # === JoinablePath attributes =====================================
991 parser: UPathParser = LazyFlavourDescriptor() # type: ignore[assignment]
993 def with_segments(self, *pathsegments: JoinablePathLike) -> Self:
994 """Construct a new path object from any number of path-like objects."""
995 # we change joinpath behavior if called from a relative path
996 # this is not fully ideal, but currently the best way to move forward
997 if is_relative := self._relative_base is not None:
998 pathsegments = (self._relative_base, *pathsegments)
1000 new_instance = type(self)(
1001 *pathsegments,
1002 protocol=self._protocol,
1003 **self.storage_options,
1004 )
1005 if hasattr(self, "_fs_cached"):
1006 new_instance._fs_cached = self._fs_cached
1008 if is_relative:
1009 new_instance._relative_base = self._relative_base
1010 return new_instance
1012 def __str__(self) -> str:
1013 if self._relative_base is not None:
1014 active_path = self._chain.active_path
1015 stripped_base = self.parser.strip_protocol(
1016 self._relative_base
1017 ).removesuffix(self.parser.sep)
1018 if not active_path.startswith(stripped_base):
1019 raise RuntimeError(
1020 f"{active_path!r} is not a subpath of {stripped_base!r}"
1021 )
1023 return (
1024 active_path.removeprefix(stripped_base).removeprefix(self.parser.sep)
1025 or "."
1026 )
1027 else:
1028 return self._chain_parser.chain(self._chain.to_list())[0]
def __vfspath__(self) -> str:
    """Return the path string: `self.path`, or the relative string form
    when this path has a relative base."""
    return self.path if self._relative_base is None else self.__str__()
def __repr__(self) -> str:
    """Return a debug representation naming the class, path and protocol.

    Relative paths use the `<relative ClassName 'path'>` form instead.
    """
    name = type(self).__name__
    shown = self.__vfspath__()
    if self._relative_base is None:
        return f"{name}({shown!r}, protocol={self._protocol!r})"
    return f"<relative {name} {shown!r}>"
1044 # === JoinablePath overrides ======================================
@property
def parts(self) -> Sequence[str]:
    """Provides sequence-like access to the filesystem path components.

    For relative paths only the components of the relative string are
    returned (an empty tuple for "."). Otherwise the first element is the
    drive followed by the separator, and the remaining elements are the
    names obtained by repeatedly splitting the path.

    Examples
    --------
    >>> from upath import UPath
    >>> p = UPath("s3://my-bucket/path/to/file.txt")
    >>> p.parts
    ('my-bucket/', 'path', 'to', 'file.txt')
    >>> p2 = UPath("/foo/bar/baz.txt", protocol="memory")
    >>> p2.parts
    ('/', 'foo', 'bar', 'baz.txt')

    """
    # For relative paths, return parts of the relative path only
    if self._relative_base is not None:
        rel_str = str(self)
        if rel_str == ".":
            return ()
        return tuple(rel_str.split(self.parser.sep))

    split = self.parser.split
    sep = self.parser.sep

    path = self._chain.active_path
    drive = self.parser.splitdrive(self._chain.active_path)[0]
    stripped_path = self.parser.strip_protocol(path)
    if stripped_path:
        # keep whatever follows the first occurrence of the stripped path
        # in the original string
        # NOTE(review): presumably this preserves a trailing separator that
        # strip_protocol removed -- confirm against the flavour tests
        _, _, tail = path.partition(stripped_path)
        path = stripped_path + tail

    # walk upwards until splitting no longer changes the path; `names` is
    # collected leaf-first
    parent, name = split(path)
    names = []
    while path != parent:
        names.append(name)
        path = parent
        parent, name = split(path)

    # the topmost collected name may be the drive itself; it is re-added
    # below together with the separator
    if names and names[-1] == drive:
        names = names[:-1]
    if names and names[-1].startswith(sep):
        # drop the leading separator from the topmost name
        parts = [*names[:-1], names[-1].removeprefix(sep), drive + sep]
    else:
        parts = [*names, drive + sep]
    # `parts` was built leaf-first with the anchor last, so reverse it
    return tuple(reversed(parts))
def with_name(self, name: str) -> Self:
    """Return a new path whose final component is replaced by *name*.

    Raises
    ------
    ValueError
        If *name* contains the parser's separator.
    """
    parser = self.parser
    if parser.sep in name:  # `split(name)[0]`
        raise ValueError(f"Invalid name {name!r}")
    current = self.__vfspath__()
    old_name = parser.split(current)[1]
    return self.with_segments(current.removesuffix(old_name) + name)
@property
def anchor(self) -> str:
    """Drive and root joined together; empty for relative paths."""
    return "" if self._relative_base is not None else self.drive + self.root
@property
def parent(self) -> Self:
    """The logical parent of the path.

    For a relative path the parent is computed on the path rebuilt from
    its base and then marked as relative to the same base; the relative
    path "." is its own parent.

    Examples
    --------
    >>> from upath import UPath
    >>> p = UPath("s3://my-bucket/path/to/file.txt")
    >>> p.parent
    S3Path('s3://my-bucket/path/to')

    """
    base = self._relative_base
    if base is None:
        return super().parent

    relative_str = str(self)
    if relative_str == ".":
        return self

    # this needs to be revisited...
    rebuilt = type(self)(
        base,
        relative_str,
        protocol=self._protocol,
        **self.storage_options,
    )
    result = rebuilt.parent
    result._relative_base = base
    return result
@property
def parents(self) -> Sequence[Self]:
    """A sequence providing access to the logical ancestors of the path.

    For relative paths the ancestors are collected by following `parent`
    until the relative path "." is reached (which is included last).

    Examples
    --------
    >>> from upath import UPath
    >>> p = UPath("memory:///foo/bar/baz.txt")
    >>> list(p.parents)
    [
      MemoryPath('memory:///foo/bar'),
      MemoryPath('memory:///foo'),
      MemoryPath('memory:///'),
    ]

    """
    if self._relative_base is None:
        return super().parents

    ancestors = []
    current = self
    while str(current) != ".":
        current = current.parent
        ancestors.append(current)
    return tuple(ancestors)
def joinpath(self, *pathsegments: JoinablePathLike) -> Self:
    """Return a new path made of this path followed by the given segments.

    With a single argument this matches the `/` operator.

    Examples
    --------
    >>> from upath import UPath
    >>> p = UPath("s3://my-bucket/path/to")
    >>> p.joinpath("file.txt")
    S3Path('s3://my-bucket/path/to/file.txt')

    """
    own_path = self.__vfspath__()
    return self.with_segments(own_path, *pathsegments)
def __truediv__(self, key: JoinablePathLike) -> Self:
    """Implement `self / key`; defer to the other operand on TypeError."""
    try:
        joined = self.with_segments(self.__vfspath__(), key)
    except TypeError:
        return NotImplemented
    return joined
def __rtruediv__(self, key: JoinablePathLike) -> Self:
    """Implement `key / self`; defer to the other operand on TypeError."""
    try:
        joined = self.with_segments(key, self.__vfspath__())
    except TypeError:
        return NotImplemented
    return joined
1191 # === ReadablePath attributes =====================================
@property
def info(self) -> PathInfo:
    """
    File type and other file attributes of this path.

    Returns
    -------
    : UPathInfo
        A new UPathInfo object wrapping this path.
    """
    path_info = UPathInfo(self)
    return path_info
def iterdir(self) -> Iterator[Self]:
    """Yield one path object per entry of this directory.

    The entries "." and ".." are skipped.

    Raises
    ------
    NotADirectoryError
        If the filesystem does not report this path as a directory.

    Examples
    --------
    >>> from upath import UPath
    >>> p = UPath("memory:///foo/")
    >>> p.joinpath("bar.txt").touch()
    >>> p.joinpath("baz.txt").touch()
    >>> sorted(child.name for child in p.iterdir())
    ['bar.txt', 'baz.txt']

    """
    sep = self.parser.sep
    # an empty last part is listed through the parent path instead
    root = self.parent if self.parts[-1:] == ("",) else self
    fs = root.fs
    root_path = root.path
    if not fs.isdir(root_path):
        raise NotADirectoryError(str(self))

    for entry in fs.listdir(root_path):
        # fsspec returns dictionaries
        entry_name = entry.get("name") if isinstance(entry, dict) else entry
        if entry_name in {".", ".."}:
            # Yielding a path object for these makes little sense
            continue
        # only want the path name with iterdir
        child_name = entry_name.removesuffix(sep).rpartition(sep)[2]
        yield root.with_segments(root_path, child_name)
def __open_reader__(self) -> BinaryIO:
    """Open this path for binary reading through its filesystem."""
    filesystem = self.fs
    return filesystem.open(self.path, mode="rb")
# only defined when running on Python 3.14 or newer
if sys.version_info >= (3, 14):

    def __open_rb__(self, buffering: int = UNSET_DEFAULT) -> BinaryIO:
        """Open this path in "rb" mode via `open`, forwarding *buffering*."""
        binary_reader = self.open("rb", buffering=buffering)
        return binary_reader
def readlink(self) -> Self:
    """Unsupported: always delegates to `_raise_unsupported`."""
    cls_name = type(self).__name__
    _raise_unsupported(cls_name, "readlink")
@overload
def copy(self, target: _WT, **kwargs: Any) -> _WT: ...

@overload
def copy(self, target: SupportsPathLike | str, **kwargs: Any) -> Self: ...

def copy(self, target: _WT | SupportsPathLike | str, **kwargs: Any) -> _WT | UPath:
    """
    Recursively copy this file or directory tree to the given destination.

    String targets with this path's protocol are built via `with_segments`;
    other strings and non-UPath objects are wrapped in `UPath`.

    Raises
    ------
    IsADirectoryError
        If the resolved target is an existing directory.
    """
    if isinstance(target, str):
        same_protocol = get_upath_protocol(target) == self.protocol
        target = self.with_segments(target) if same_protocol else UPath(target)
    elif not isinstance(target, UPath):
        target = UPath(target)

    if target.is_dir():
        raise IsADirectoryError(str(target))
    return super().copy(target, **kwargs)
@overload
def copy_into(self, target_dir: _WT, **kwargs: Any) -> _WT: ...

@overload
def copy_into(self, target_dir: SupportsPathLike | str, **kwargs: Any) -> Self: ...

def copy_into(
    self, target_dir: _WT | SupportsPathLike | str, **kwargs: Any
) -> _WT | UPath:
    """
    Copy this file or directory tree into the given existing directory.

    Raises
    ------
    FileNotFoundError
        If the target directory does not exist.
    NotADirectoryError
        If the target exists but is not a directory.
    """
    if isinstance(target_dir, str):
        same_protocol = get_upath_protocol(target_dir) == self.protocol
        if same_protocol:
            target_dir = self.with_segments(target_dir)
        else:
            target_dir = UPath(target_dir)
    elif not isinstance(target_dir, UPath):
        target_dir = UPath(target_dir)

    if not target_dir.exists():
        raise FileNotFoundError(str(target_dir))
    if not target_dir.is_dir():
        raise NotADirectoryError(str(target_dir))
    return super().copy_into(target_dir, **kwargs)
@overload
def move(self, target: _WT, **kwargs: Any) -> _WT: ...

@overload
def move(self, target: SupportsPathLike | str, **kwargs: Any) -> Self: ...

def move(self, target: _WT | SupportsPathLike | str, **kwargs: Any) -> _WT | UPath:
    """
    Recursively move this file or directory tree to the given destination.

    The tree is copied first (keyword arguments go to `copy`) and the
    source is removed afterwards. Returns the path produced by the copy.
    """
    destination = self.copy(target, **kwargs)
    # the source is only removed once the copy has returned
    filesystem = self.fs
    filesystem.rm(self.path, recursive=self.is_dir())
    return destination
@overload
def move_into(self, target_dir: _WT, **kwargs: Any) -> _WT: ...

@overload
def move_into(self, target_dir: SupportsPathLike | str, **kwargs: Any) -> Self: ...

def move_into(
    self, target_dir: _WT | SupportsPathLike | str, **kwargs: Any
) -> _WT | UPath:
    """
    Move this file or directory tree into the given existing directory.

    Parameters
    ----------
    target_dir:
        Existing directory that receives an entry named like this path.
    **kwargs:
        Forwarded to `move` (and from there to `copy`).

    Raises
    ------
    ValueError
        If this path has an empty name.
    FileNotFoundError
        If the target directory does not exist.
    NotADirectoryError
        If the target directory exists but is not a directory.
    """
    name = self.name
    if not name:
        raise ValueError(f"{self!r} has an empty name")

    if hasattr(target_dir, "with_segments"):
        target = target_dir.with_segments(target_dir, name)  # type: ignore
    elif isinstance(target_dir, PurePath):
        target = UPath(target_dir, name)
    else:
        target = self.with_segments(target_dir, name)

    td = target.parent
    if not td.exists():
        raise FileNotFoundError(str(td))
    if not td.is_dir():
        raise NotADirectoryError(str(td))
    # forward the keyword arguments: they were previously accepted but
    # silently dropped, unlike in `copy_into`
    return self.move(target, **kwargs)
def _copy_from(
    self,
    source: ReadablePath,
    follow_symlinks: bool = True,
    on_name_collision: OnNameCollisionFunc | None = None,
    **kwargs: Any,
) -> None:
    """
    UPath custom:: Recursively copy the given path to this path.

    Parameters
    ----------
    source:
        Path whose file or directory tree is copied onto this path.
    follow_symlinks:
        When False, a source reported as a symlink is recreated through
        `symlink_to` instead of being copied.
    on_name_collision:
        Optional callable consulted for sources whose info reports them
        as both a file and a directory. It is called with the source and
        destination and must return a `(dst_file, dst_dir)` pair; a None
        entry skips that half of the copy.
    **kwargs:
        Accepted but not used by this implementation.
    """
    # fixme: it would be best if this would be upstreamed
    from pathlib_abc import vfsopen
    from pathlib_abc import vfspath
    from pathlib_abc._os import copyfileobj
    from pathlib_abc._os import ensure_different_files

    # explicit stack of (source, destination) pairs instead of recursion
    stack: list[tuple[ReadablePath, WritablePath]] = [(source, self)]
    while stack:
        src, dst = stack.pop()
        info = src.info
        if not follow_symlinks and info.is_symlink():
            dst.symlink_to(vfspath(src.readlink()), src.info.is_dir())
        elif on_name_collision and info.is_file() and info.is_dir():
            # the source is both file and directory: let the callback pick
            # separate destinations for the file content and the children
            dst_file, dst_dir = on_name_collision(src, dst)
            if dst_file is not None:
                ensure_different_files(src, dst_file)
                with vfsopen(src, "rb") as source_f:
                    with vfsopen(dst_file, "wb") as target_f:
                        copyfileobj(source_f, target_f)
            if dst_dir is not None:
                children = src.iterdir()
                dst_dir.mkdir()
                # feed through dict.fromkeys to remove duplicates
                for child in dict.fromkeys(children):
                    stack.append((child, dst_dir.joinpath(child.name)))
        elif info.is_dir():
            children = src.iterdir()
            dst.mkdir()
            # feed through dict.fromkeys to remove duplicates
            for child in dict.fromkeys(children):
                stack.append((child, dst.joinpath(child.name)))
        else:
            # anything else is copied as file content
            ensure_different_files(src, dst)
            with vfsopen(src, "rb") as source_f:
                with vfsopen(dst, "wb") as target_f:
                    copyfileobj(source_f, target_f)
1388 # --- WritablePath attributes -------------------------------------
def symlink_to(
    self,
    target: ReadablePathLike,
    target_is_directory: bool = False,
) -> None:
    """Unsupported: always delegates to `_raise_unsupported`."""
    cls_name = type(self).__name__
    _raise_unsupported(cls_name, "symlink_to")
def mkdir(
    self,
    mode: int = 0o777,
    parents: bool = False,
    exist_ok: bool = False,
) -> None:
    """
    Create a new directory at this given path.

    Raises
    ------
    FileExistsError
        If the path already exists and *exist_ok* is false, or if it
        exists but is not a directory.
    """
    if parents and not exist_ok and self.exists():
        raise FileExistsError(str(self))
    try:
        self.fs.mkdir(self.path, create_parents=parents, mode=mode)
    except FileExistsError:
        # an existing directory is only acceptable with exist_ok
        tolerated = exist_ok and self.is_dir()
        if not tolerated:
            raise FileExistsError(str(self))
def __open_writer__(self, mode: Literal["a", "w", "x"]) -> BinaryIO:
    """Open this path for binary writing using *mode* with "b" appended."""
    filesystem = self.fs
    binary_mode = f"{mode}b"
    return filesystem.open(self.path, mode=binary_mode)
1423 # --- upath overrides ---------------------------------------------
@overload
def open(
    self,
    mode: Literal["r", "w", "a"] = ...,
    buffering: int = ...,
    encoding: str = ...,
    errors: str = ...,
    newline: str = ...,
    **fsspec_kwargs: Any,
) -> TextIO: ...

@overload
def open(
    self,
    mode: Literal["rb", "wb", "ab"] = ...,
    buffering: int = ...,
    encoding: str = ...,
    errors: str = ...,
    newline: str = ...,
    **fsspec_kwargs: Any,
) -> BinaryIO: ...

@overload
def open(
    self,
    mode: str = ...,
    buffering: int = ...,
    encoding: str | None = ...,
    errors: str | None = ...,
    newline: str | None = ...,
    **fsspec_kwargs: Any,
) -> IO[Any]: ...

def open(
    self,
    mode: str = "r",
    buffering: int = UNSET_DEFAULT,
    encoding: str | None = UNSET_DEFAULT,
    errors: str | None = UNSET_DEFAULT,
    newline: str | None = UNSET_DEFAULT,
    **fsspec_kwargs: Any,
) -> IO[Any]:
    """
    Open the file this path points to and return a file object, in the
    manner of the built-in open() function.

    Parameters
    ----------
    mode:
        Opening mode. Default is 'r'.
    buffering:
        Translated into the fsspec `block_size` option when given; may not
        be combined with an explicit `block_size`.
    encoding:
        Passed on to the filesystem only when given.
    errors:
        Passed on to the filesystem only when given.
    newline:
        Passed on to the filesystem only when given.
    **fsspec_kwargs:
        Additional options for the fsspec filesystem.

    Raises
    ------
    TypeError
        If both `buffering` and `block_size` are specified.
    """
    # match the signature of pathlib.Path.open()
    if buffering is not UNSET_DEFAULT:
        if "block_size" in fsspec_kwargs:
            raise TypeError("cannot specify both 'buffering' and 'block_size'")
        block_size = _buffering2blocksize(mode, buffering)
        if block_size is not None:
            fsspec_kwargs.setdefault("block_size", block_size)

    # only forward the text options the caller actually supplied
    text_options = {"encoding": encoding, "errors": errors, "newline": newline}
    for option, value in text_options.items():
        if value is not UNSET_DEFAULT:
            fsspec_kwargs[option] = value
    return self.fs.open(self.path, mode=mode, **fsspec_kwargs)
1501 # === pathlib.Path ================================================
def stat(
    self,
    *,
    follow_symlinks: bool = True,
) -> StatResultType:
    """
    Return stat-like information for this path, in the manner of
    os.stat().

    Info
    ----
    For fsspec filesystems follow_symlinks is currently ignored; passing
    False only emits a UserWarning.

    Returns
    -------
    : UPathStatResult
        The upath stat result for this path, emulating `os.stat_result`.

    """
    if not follow_symlinks:
        cls_name = type(self).__name__
        warnings.warn(
            f"{cls_name}.stat(follow_symlinks=False): is currently ignored.",
            UserWarning,
            stacklevel=2,
        )
    raw_info = self.fs.info(self.path)
    return UPathStatResult.from_info(raw_info)
def lstat(self) -> StatResultType:
    """Return `stat(follow_symlinks=False)` for this path."""
    result = self.stat(follow_symlinks=False)
    return result
def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
    """Unsupported: always delegates to `_raise_unsupported`."""
    cls_name = type(self).__name__
    _raise_unsupported(cls_name, "chmod")
def exists(self, *, follow_symlinks: bool = True) -> bool:
    """
    Return whether the filesystem reports this path as existing.

    Info
    ----
    For fsspec filesystems follow_symlinks is currently ignored; passing
    False only emits a UserWarning.
    """
    if not follow_symlinks:
        cls_name = type(self).__name__
        warnings.warn(
            f"{cls_name}.exists() follow_symlinks=False is currently ignored.",
            UserWarning,
            stacklevel=2,
        )
    return self.fs.exists(self.path)
def is_dir(self, *, follow_symlinks: bool = True) -> bool:
    """
    Return whether the filesystem reports this path as a directory.

    Passing follow_symlinks=False only emits a UserWarning.
    """
    if not follow_symlinks:
        cls_name = type(self).__name__
        warnings.warn(
            f"{cls_name}.is_dir() follow_symlinks=False is currently ignored.",
            UserWarning,
            stacklevel=2,
        )
    return self.fs.isdir(self.path)
def is_file(self, *, follow_symlinks: bool = True) -> bool:
    """
    Return whether the filesystem reports this path as a regular file.

    Passing follow_symlinks=False only emits a UserWarning.
    """
    if not follow_symlinks:
        cls_name = type(self).__name__
        warnings.warn(
            f"{cls_name}.is_file() follow_symlinks=False is currently ignored.",
            UserWarning,
            stacklevel=2,
        )
    return self.fs.isfile(self.path)
def is_mount(self) -> bool:
    """
    Whether this path is a mount point.

    Info
    ----
    Always False in this implementation.
    """
    return False
def is_symlink(self) -> bool:
    """
    Return whether the filesystem info marks this path as a symbolic link.

    False is returned when the info has no "islink" entry or the path
    cannot be found.
    """
    try:
        details = self.fs.info(self.path)
    except FileNotFoundError:
        return False
    return bool(details["islink"]) if "islink" in details else False
def is_junction(self) -> bool:
    """
    Whether this path is a junction.

    Info
    ----
    Always False in this implementation.
    """
    return False
def is_block_device(self) -> bool:
    """
    Whether this path is a block device.

    Info
    ----
    Always False in this implementation.
    """
    return False
def is_char_device(self) -> bool:
    """
    Whether this path is a character device.

    Info
    ----
    Always False in this implementation.
    """
    return False
def is_fifo(self) -> bool:
    """
    Whether this path is a FIFO (named pipe).

    Info
    ----
    Always False in this implementation.
    """
    return False
def is_socket(self) -> bool:
    """
    Whether this path is a socket.

    Info
    ----
    Always False in this implementation.
    """
    return False
def is_reserved(self) -> bool:
    """
    Whether this path is reserved under Windows.

    Info
    ----
    Always False in this implementation.
    """
    return False
def expanduser(self) -> Self:
    """Return a path with `~` constructs expanded.

    Info
    ----
    Currently a no-op: the path itself is returned unchanged.
    """
    return self
def glob(
    self,
    pattern: str,
    *,
    case_sensitive: bool | None = None,
    recurse_symlinks: bool = False,
) -> Iterator[Self]:
    """Yield every existing path (files and directories alike) below this
    path that matches the given relative pattern.

    `case_sensitive` and `recurse_symlinks` are currently ignored; using
    them only emits a UserWarning. Relative paths are made absolute first.
    """
    if case_sensitive is not None:
        warnings.warn(
            "UPath.glob(): case_sensitive is currently ignored.",
            UserWarning,
            stacklevel=2,
        )
    if recurse_symlinks:
        warnings.warn(
            "UPath.glob(): recurse_symlinks=True is currently ignored.",
            UserWarning,
            stacklevel=2,
        )

    root = self if self._relative_base is None else self.absolute()
    full_pattern = root.joinpath(pattern).path
    sep = root.parser.sep
    prefix = root.path
    for match in root.fs.glob(full_pattern):
        # re-join the part of the match below this path
        yield root.joinpath(match.removeprefix(prefix).removeprefix(sep))
def rglob(
    self,
    pattern: str,
    *,
    case_sensitive: bool | None = None,
    recurse_symlinks: bool = False,
) -> Iterator[Self]:
    """Recursively yield all existing files (of any kind, including
    directories) matching the given relative pattern, anywhere in
    this subtree.

    `case_sensitive` and `recurse_symlinks` are currently ignored; using
    them only emits a UserWarning. Each match is yielded once.
    """
    if case_sensitive is not None:
        warnings.warn(
            "UPath.rglob(): case_sensitive is currently ignored.",
            UserWarning,
            stacklevel=2,
        )
    if recurse_symlinks:
        warnings.warn(
            "UPath.rglob(): recurse_symlinks=True is currently ignored.",
            UserWarning,
            stacklevel=2,
        )
    if _FSSPEC_HAS_WORKING_GLOB is None:
        _check_fsspec_has_working_glob()

    recursive_pattern = self.joinpath("**", pattern).path
    if _FSSPEC_HAS_WORKING_GLOB:
        patterns = (recursive_pattern,)
    else:
        # fallback: also glob the pattern directly below this path and
        # merge both result sets
        patterns = (self.joinpath(pattern).path, recursive_pattern)

    sep = self.parser.sep
    base = self.path
    seen = set()
    for path_pattern in patterns:
        for name in self.fs.glob(path_pattern):
            name = name.removeprefix(base).removeprefix(sep)
            if name in seen:
                continue
            seen.add(name)
            yield self.joinpath(name)
def owner(self, *, follow_symlinks: bool = True) -> str:
    """Unsupported: always delegates to `_raise_unsupported`."""
    cls_name = type(self).__name__
    _raise_unsupported(cls_name, "owner")
def group(self, *, follow_symlinks: bool = True) -> str:
    """Unsupported: always delegates to `_raise_unsupported`."""
    cls_name = type(self).__name__
    _raise_unsupported(cls_name, "group")
def absolute(self) -> Self:
    """Return an absolute version of this path.

    Relative paths are joined onto `cwd()`; other paths are returned
    unchanged. No normalization or symlink resolution is performed.

    Use resolve() to resolve symlinks and remove '..' segments.
    """
    if self._relative_base is None:
        return self
    return self.cwd().joinpath(self.__vfspath__())
def is_absolute(self) -> bool:
    """Return whether the path is absolute.

    Relative paths are never absolute; otherwise the parser's `isabs`
    decides.
    """
    if self._relative_base is not None:
        return False
    return self.parser.isabs(self.__vfspath__())
def __eq__(self, other: object) -> bool:
    """Compare two UPaths.

    Two relative paths are equal when their relative strings match; a
    relative and a non-relative path are never equal. Otherwise the path,
    protocol and storage_options must all be equal.
    """
    if not isinstance(other, UPath):
        return NotImplemented

    self_is_relative = self._relative_base is not None
    other_is_relative = getattr(other, "_relative_base", None) is not None
    if self_is_relative and other_is_relative:
        # both relative: compare just the relative strings
        return str(self) == str(other)
    if self_is_relative or other_is_relative:
        # One is relative, one is not - they can't be equal
        return False

    return (
        self.__vfspath__() == other.__vfspath__()
        and self.protocol == other.protocol
        and self.storage_options == other.storage_options
    )
def __hash__(self) -> int:
    """Hash the (protocol, path) pair; storage_options do not take part.

    Note: in the future, if hash collisions become an issue, we
    can add `fsspec.utils.tokenize(storage_options)`
    """
    key = (self.protocol, self.__vfspath__())
    return hash(key)
def __lt__(self, other: object) -> bool:
    """Order by path string; only UPaths sharing the same parser compare."""
    comparable = isinstance(other, UPath) and self.parser is other.parser
    if not comparable:
        return NotImplemented
    return self.__vfspath__() < other.__vfspath__()
def __le__(self, other: object) -> bool:
    """Order by path string; only UPaths sharing the same parser compare."""
    comparable = isinstance(other, UPath) and self.parser is other.parser
    if not comparable:
        return NotImplemented
    return self.__vfspath__() <= other.__vfspath__()
def __gt__(self, other: object) -> bool:
    """Order by path string; only UPaths sharing the same parser compare."""
    comparable = isinstance(other, UPath) and self.parser is other.parser
    if not comparable:
        return NotImplemented
    return self.__vfspath__() > other.__vfspath__()
def __ge__(self, other: object) -> bool:
    """Order by path string; only UPaths sharing the same parser compare."""
    comparable = isinstance(other, UPath) and self.parser is other.parser
    if not comparable:
        return NotImplemented
    return self.__vfspath__() >= other.__vfspath__()
def resolve(self, strict: bool = False) -> Self:
    """
    Make the path absolute and normalize it by collapsing "." and ".."
    parts.

    The *strict* argument is accepted but not used here. A path without
    dot parts is returned as is.
    """
    target = self if self._relative_base is None else self.absolute()
    all_parts = target.parts

    # Do not attempt to normalize path if no parts are dots
    if ".." not in all_parts and "." not in all_parts:
        return target

    anchor_parts = all_parts[:1]
    kept: list[str] = []
    for segment in all_parts[1:]:
        if segment == "..":
            # step up one level, but never above the first part
            if kept:
                kept.pop()
        elif segment != ".":
            kept.append(segment)

    return target.with_segments(*anchor_parts, *kept)
def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
    """Create this file if it doesn't exist; otherwise try a non-truncating
    touch.

    The *mode* argument is accepted but not used here.

    Raises
    ------
    FileExistsError
        If the file exists and *exist_ok* is false.
    """
    fs, path = self.fs, self.path
    if fs.exists(path):
        if not exist_ok:
            raise FileExistsError(str(self))
        try:
            fs.touch(path, truncate=False)
        except (NotImplementedError, ValueError):
            pass  # unsupported by filesystem
    else:
        try:
            fs.touch(path, truncate=True)
        except NotImplementedError:
            _raise_unsupported(type(self).__name__, "touch")
def lchmod(self, mode: int) -> None:
    """Unsupported: always delegates to `_raise_unsupported`."""
    cls_name = type(self).__name__
    _raise_unsupported(cls_name, "lchmod")
def unlink(self, missing_ok: bool = False) -> None:
    """
    Remove this file or link (non-recursively).
    If the path is a directory, use rmdir() instead.

    Raises
    ------
    FileNotFoundError
        If the path does not exist and *missing_ok* is false.
    """
    if self.exists():
        self.fs.rm(self.path, recursive=False)
    elif not missing_ok:
        raise FileNotFoundError(str(self))
def rmdir(self, recursive: bool = True) -> None:  # fixme: non-standard
    """
    Remove this directory.

    Parameters
    ----------
    recursive:
        When True (the default) the directory is removed together with
        its contents. When False only an empty directory is removed.

    Raises
    ------
    NotADirectoryError
        If this path is not a directory.
    OSError
        If *recursive* is false and the directory is not empty.

    Warning
    -------
    This method is non-standard compared to pathlib.Path.rmdir(),
    as it supports a `recursive` parameter to remove non-empty
    directories and defaults to recursive deletion.

    This behavior is likely to change in future releases once
    `.delete()` is introduced.

    """
    if not self.is_dir():
        raise NotADirectoryError(str(self))
    if not recursive:
        # probe for a first child only; an empty directory yields nothing
        # and must fall through to the removal instead of leaking
        # StopIteration out of a bare next() call
        for _child in self.iterdir():
            raise OSError(f"Not recursive and directory not empty: {self}")
    self.fs.rm(self.path, recursive=recursive)
def rename(
    self,
    target: WritablePathLike,
    *,  # note: non-standard compared to pathlib
    recursive: bool = UNSET_DEFAULT,
    maxdepth: int | None = UNSET_DEFAULT,
    **kwargs: Any,
) -> Self:
    """
    Rename this file or directory to the given target.

    The target path may be absolute or relative. Relative paths are
    interpreted relative to the current working directory, *not* the
    directory of the Path object.

    Returns the new Path instance pointing to the target path.

    Raises
    ------
    ValueError
        If the target carries a protocol different from this path's.

    Info
    ----
    For filesystems that don't have a root character, i.e. for which
    relative paths can be ambiguous, you can explicitly indicate a
    relative path via prefixing with `./`

    Warning
    -------
    This method is non-standard compared to pathlib.Path.rename(),
    as it supports `recursive` and `maxdepth` parameters for
    directory moves. This will be revisited in future releases.

    It's better to use `.move()` or `.move_into()` to avoid
    running into future compatibility issues.

    """
    # check protocol compatibility
    target_protocol = get_upath_protocol(target)
    if target_protocol and target_protocol != self.protocol:
        raise ValueError(
            f"expected protocol {self.protocol!r}, got: {target_protocol!r}"
        )
    # ensure target is an absolute UPath
    if not isinstance(target, type(self)):
        if isinstance(target, (UPath, PurePath)):
            target_str = target.as_posix()
        else:
            target_str = str(target)
        if target_protocol:
            # target protocol provided indicates absolute path
            target = self.with_segments(target_str)
        elif self.anchor and target_str.startswith(self.anchor):
            # self.anchor can be used to indicate absolute path
            target = self.with_segments(target_str)
        elif not self.anchor and target_str.startswith("./"):
            # indicate relative via "./"
            target = (
                self.cwd()
                .joinpath(target_str.removeprefix("./"))
                .relative_to(self.cwd())
            )
        else:
            # all other cases
            target = self.cwd().joinpath(target_str).relative_to(self.cwd())
    # return early if renaming to same path
    if target == self:
        return self
    # ensure source and target are absolute
    source_abs = self.absolute()
    target_abs = target.absolute()
    # avoid calling .resolve for if not needed
    if ".." in target_abs.parts or "." in target_abs.parts:
        target_abs = target_abs.resolve()
    if kwargs:
        warnings.warn(
            "Passing additional keyword arguments to "
            f"{type(self).__name__}.rename() is deprecated and will be"
            " removed in future universal-pathlib versions.",
            DeprecationWarning,
            stacklevel=2,
        )
    if recursive is not UNSET_DEFAULT:
        warnings.warn(
            f"{type(self).__name__}.rename()'s `recursive` keyword argument is"
            " deprecated and will be removed in future universal-pathlib versions."
            f" Please use {type(self).__name__}.move() or .move_into() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        kwargs["recursive"] = recursive
    if maxdepth is not UNSET_DEFAULT:
        warnings.warn(
            f"{type(self).__name__}.rename()'s `maxdepth` keyword argument is"
            " deprecated and will be removed in future universal-pathlib versions.",
            DeprecationWarning,
            stacklevel=2,
        )
        kwargs["maxdepth"] = maxdepth
    # the explicit recursive/maxdepth values are handed to the filesystem
    # together with any extra keyword arguments
    self.fs.mv(
        source_abs.path,
        target_abs.path,
        **kwargs,
    )
    return target
def replace(self, target: WritablePathLike) -> Self:
    """
    Rename this path to the target path, overwriting if that path exists.

    The target path may be absolute or relative. Relative paths are
    interpreted relative to the current working directory, *not* the
    directory of the Path object.

    Warning
    -------
    This method is currently not implemented and always delegates to
    `_raise_unsupported`.

    """
    cls_name = type(self).__name__
    _raise_unsupported(cls_name, "replace")
@property
def drive(self) -> str:
    """The drive prefix (letter or UNC path), if any.

    Relative paths have an empty drive; otherwise this is the first
    element of the parser's `splitroot` result.

    Info
    ----
    On non-Windows systems, the drive is always an empty string.
    On cloud storage systems, the drive is the bucket name or equivalent.
    """
    if self._relative_base is not None:
        return ""
    split_result = self.parser.splitroot(str(self))
    return split_result[0]
@property
def root(self) -> str:
    """The root of the path, if any; empty for relative paths."""
    if self._relative_base is not None:
        return ""
    split_result = self.parser.splitroot(str(self))
    return split_result[1]
def __reduce__(self):
    """Support pickling by rebuilding the path through `_make_instance`.

    The protocol and storage options are always passed along; relative
    paths also carry their base, both as first argument and as the
    `_relative_base` keyword.
    """
    base = self._relative_base
    kwargs = {
        "protocol": self._protocol,
        **self.storage_options,
    }
    if base is None:
        args = (self.__vfspath__(),)
    else:
        args = (base, self.__vfspath__())
        # Include _relative_base in the state if it's set
        kwargs["_relative_base"] = base
    return _make_instance, (type(self), args, kwargs)
@classmethod
def from_uri(cls, uri: str, **storage_options: Any) -> Self:
    """Construct a path of this class from *uri* and storage options."""
    instance = cls(uri, **storage_options)
    return instance
def as_uri(self) -> str:
    """Return the string representation of the path as a URI.

    Raises
    ------
    ValueError
        If this is a relative path.
    """
    if self._relative_base is None:
        return str(self)
    raise ValueError(
        f"relative path can't be expressed as a {self.protocol} URI"
    )
def as_posix(self) -> str:
    """Return the string form of the path; identical to `str(self)`."""
    text = str(self)
    return text
def samefile(self, other_path) -> bool:
    """Return whether this path and *other_path* have equal stat results.

    Anything that is not a UPath is first converted with `with_segments`.
    """
    own_stat = self.stat()
    if not isinstance(other_path, UPath):
        other_path = self.with_segments(other_path)
    return own_stat == other_path.stat()
@classmethod
def cwd(cls) -> Self:
    """
    Return a new UPath object representing the current working directory.

    Info
    ----
    None of the fsspec filesystems support a global current working
    directory, so this method only works for the base UPath class,
    returning the local current working directory.

    """
    if cls is not UPath:
        _raise_unsupported(cls.__name__, "cwd")
    else:
        # default behavior for UPath.cwd() is to return local cwd
        return get_upath_class("").cwd()  # type: ignore[union-attr,return-value]
@classmethod
def home(cls) -> Self:
    """
    Return a new UPath object representing the user's home directory.

    Info
    ----
    None of the fsspec filesystems support user home directories,
    so this method only works for the base UPath class, returning the
    local user's home directory.

    """
    if cls is not UPath:
        _raise_unsupported(cls.__name__, "home")
    else:
        # delegate to the implementation registered for the "" protocol
        return get_upath_class("").home()  # type: ignore[union-attr,return-value]
def relative_to(  # type: ignore[override]
    self,
    other: Self | str,
    /,
    *_deprecated: Any,
    walk_up: bool = False,
) -> Self:
    """Return a copy of this path expressed relative to *other*.

    *other* may be a ``UPath`` of the same class with equal storage
    options, or a string (converted with ``self.with_segments``). Extra
    positional arguments are accepted and ignored.

    Raises
    ------
    NotImplementedError
        If *walk_up* is true; ``..`` resolution is not implemented yet.
    TypeError
        If *other* is neither a ``UPath`` nor a ``str``.
    ValueError
        If *other* has a different class or storage options, or if this
        path is neither equal to *other* nor located beneath it.
    """
    if walk_up:
        raise NotImplementedError("walk_up=True is not implemented yet")

    if not isinstance(other, (UPath, str)):
        raise TypeError(f"expected UPath or str, got {type(other).__name__}")

    if isinstance(other, UPath):
        # revisit: ...
        if self.__class__ is not other.__class__:
            raise ValueError(
                "incompatible protocols:"
                f" {self.protocol!r} != {other.protocol!r}"
            )
        if self.storage_options != other.storage_options:
            raise ValueError(
                "incompatible storage_options:"
                f" {self.storage_options!r} != {other.storage_options!r}"
            )
    else:
        other = self.with_segments(other)

    is_ancestor = other in self.parents
    if not is_ancestor and self != other:
        raise ValueError(f"{self!s} is not in the subpath of {other!s}")

    # The result is the same path, tagged with the base it is relative to.
    relative = copy(self)
    relative._relative_base = other.path
    return relative
def is_relative_to(
    self,
    other: Self | str,
    /,
    *_deprecated: Any,
) -> bool:  # type: ignore[override]
    """Return True if this path equals *other* or lies beneath it.

    A ``UPath`` with different storage options never matches. A string
    is converted with ``self.with_segments`` before the comparison. Extra
    positional arguments are accepted and ignored.
    """
    if isinstance(other, UPath):
        if self.storage_options != other.storage_options:
            return False
    if isinstance(other, str):
        other = self.with_segments(other)
    return self == other or other in self.parents
def hardlink_to(self, target: ReadablePathLike) -> None:
    """Hard links are not supported.

    Hands this class's name and the method name to
    ``_raise_unsupported``; *target* is not used.
    """
    cls_name = type(self).__name__
    _raise_unsupported(cls_name, "hardlink_to")
def full_match(
    self,
    pattern: str | SupportsPathLike,
    *,
    case_sensitive: bool | None = None,
) -> bool:
    """Match this path against the provided glob-style pattern.

    *pattern* is converted to ``str`` and handed to the parent class
    implementation. *case_sensitive* is accepted for signature
    compatibility only: any non-``None`` value triggers a ``UserWarning``
    and is otherwise ignored.

    Return True if matching is successful, False otherwise.
    """
    if case_sensitive is not None:
        notice = (
            f"{type(self).__name__}.full_match(): case_sensitive"
            " is currently ignored."
        )
        # stacklevel=2 attributes the warning to the caller of full_match
        warnings.warn(notice, UserWarning, stacklevel=2)
    glob_text = str(pattern)
    return super().full_match(glob_text)
def match(
    self,
    path_pattern: str | SupportsPathLike,
    *,
    case_sensitive: bool | None = None,
) -> bool:
    """Match this path against the provided non-recursive glob-style pattern.

    Every run of two or more consecutive ``*`` in the pattern is collapsed
    to a single ``*`` before it is delegated to ``full_match``, so no
    recursive ``**`` wildcard can reach it.

    Parameters
    ----------
    path_pattern:
        The glob pattern; it is converted with ``str()`` first.
    case_sensitive:
        Accepted for signature compatibility only. Any non-``None`` value
        triggers a ``UserWarning`` and is otherwise ignored.

    Returns
    -------
    bool
        True if matching is successful, False otherwise.

    Raises
    ------
    ValueError
        If the pattern is empty.
    """
    path_pattern = str(path_pattern)
    if not path_pattern:
        raise ValueError("pattern cannot be empty")
    if case_sensitive is not None:
        warnings.warn(
            f"{type(self).__name__}.match(): case_sensitive is currently ignored.",
            UserWarning,
            stacklevel=2,
        )
    # A single replace("**", "*") pass turns "***" into "**", which
    # full_match would then treat as a recursive wildcard. Keep collapsing
    # until no double asterisk remains.
    while "**" in path_pattern:
        path_pattern = path_pattern.replace("**", "*")
    return self.full_match(path_pattern)
@classmethod
def __get_pydantic_core_schema__(
    cls, _source_type: Any, _handler: GetCoreSchemaHandler
) -> CoreSchema:
    """Build the pydantic-core schema used to validate and serialize paths.

    Two input forms are accepted for deserialization: a plain string, or
    a mapping with a required ``"path"`` plus optional ``"protocol"`` and
    ``"storage_options"`` keys (unknown keys are forbidden). In Python
    mode an existing ``UPath`` instance is accepted as well. Serialization
    produces a dict with the ``path``, ``protocol`` and
    ``storage_options`` of the instance.
    """
    from pydantic_core import core_schema

    # Input form 1: a bare string, normalised to the mapping form.
    from_string = core_schema.chain_schema(
        [
            core_schema.str_schema(),
            core_schema.no_info_plain_validator_function(
                lambda path: {
                    "path": path,
                    "protocol": None,
                    "storage_options": {},
                },
            ),
        ]
    )

    # Input form 2: a mapping; only "path" is mandatory.
    path_field = core_schema.typed_dict_field(
        core_schema.str_schema(), required=True
    )
    protocol_field = core_schema.typed_dict_field(
        core_schema.with_default_schema(
            core_schema.nullable_schema(
                core_schema.str_schema(),
            ),
            default=None,
        ),
        required=False,
    )
    options_field = core_schema.typed_dict_field(
        core_schema.with_default_schema(
            core_schema.dict_schema(
                core_schema.str_schema(),
                core_schema.any_schema(),
            ),
            default_factory=dict,
        ),
        required=False,
    )
    from_mapping = core_schema.typed_dict_schema(
        {
            "path": path_field,
            "protocol": protocol_field,
            "storage_options": options_field,
        },
        extra_behavior="forbid",
    )

    # Either form ends up as a mapping, which is then turned into ``cls``.
    deserialization_schema = core_schema.chain_schema(
        [
            core_schema.union_schema([from_string, from_mapping]),
            core_schema.no_info_plain_validator_function(
                lambda dct: cls(
                    dct.pop("path"),
                    protocol=dct.pop("protocol"),
                    **dct["storage_options"],
                )
            ),
        ]
    )

    serialization_schema = core_schema.plain_serializer_function_ser_schema(
        lambda u: {
            "path": u.path,
            "protocol": u.protocol,
            "storage_options": dict(u.storage_options),
        }
    )

    # Python mode additionally lets ready-made UPath instances through.
    python_schema = core_schema.union_schema(
        [core_schema.is_instance_schema(UPath), deserialization_schema]
    )
    return core_schema.json_or_python_schema(
        json_schema=deserialization_schema,
        python_schema=python_schema,
        serialization=serialization_schema,
    )