Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/extensions.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import sys
4from collections.abc import Iterator
5from collections.abc import Mapping
6from collections.abc import Sequence
7from typing import IO
8from typing import TYPE_CHECKING
9from typing import Any
10from typing import BinaryIO
11from typing import Callable
12from typing import Literal
13from typing import TextIO
14from typing import TypeVar
15from typing import overload
16from urllib.parse import SplitResult
18from fsspec import AbstractFileSystem
19from pathlib_abc import vfspath
21from upath._chain import Chain
22from upath._chain import ChainSegment
23from upath.core import UnsupportedOperation
24from upath.core import UPath
25from upath.types import UNSET_DEFAULT
26from upath.types import JoinablePathLike
27from upath.types import PathInfo
28from upath.types import ReadablePath
29from upath.types import ReadablePathLike
30from upath.types import StatResultType
31from upath.types import SupportsPathLike
32from upath.types import UPathParser
33from upath.types import WritablePathLike
35if TYPE_CHECKING:
36 if sys.version_info > (3, 11):
37 from typing import Self
38 else:
39 from typing_extensions import Self
41 from pydantic import GetCoreSchemaHandler
42 from pydantic_core.core_schema import CoreSchema
# Names exported by ``from upath.extensions import *``.
__all__ = [
    "ProxyUPath",
]

# Type variable used by the annotations of ``classmethod_or_method.__get__``.
T = TypeVar("T")
class classmethod_or_method(classmethod):
    """Descriptor usable both as a classmethod and as an instance method.

    Attribute access through the class binds the decorated function to the
    class; attribute access through an instance binds it to that instance.
    The decorated function therefore receives either the class or the
    instance as its first argument.

    """

    def __get__(
        self,
        instance: T | None,
        owner: type[T] | None = None,
        /,
    ) -> Callable[..., T]:
        # No instance means the lookup happened on the class itself, so the
        # owner becomes the bound first argument; otherwise the instance does.
        bind_to = owner if instance is None else instance
        return self.__func__.__get__(bind_to)
class ProxyUPath:
    """ProxyUPath base class

    ProxyUPath should be used when you want to extend the UPath class
    interface with additional methods, but still want to support
    all supported upath implementations.

    Each instance keeps a path object in its single ``__wrapped__`` slot
    and forwards the path interface to it. Whenever the wrapped call
    returns a path, the result is passed through ``_from_upath`` so the
    caller gets an instance of the proxy class back.

    """

    __slots__ = ("__wrapped__",)

    # TODO: think about if and how to handle these
    #   _transform_init_args
    #   _parse_storage_options
    #   _fs_factory
    #   _protocol_dispatch

    # === non-public methods / attributes =============================

    @classmethod
    def _from_upath(cls, upath: UPath, /) -> Self:
        """Wrap ``upath`` in ``cls`` without running ``__init__``.

        An object that already is an instance of ``cls`` is returned as is.
        """
        if isinstance(upath, cls):
            return upath  # type: ignore[unreachable]
        else:
            obj = object.__new__(cls)
            obj.__wrapped__ = upath
            return obj

    @property
    def _chain(self):
        """Return the wrapped path's ``_chain``.

        If the wrapped object has no ``_chain`` attribute, a single-segment
        ``Chain`` is built from its ``path``, ``protocol`` and a copy of its
        ``storage_options``.
        """
        try:
            return self.__wrapped__._chain
        except AttributeError:
            return Chain(
                ChainSegment(
                    path=self.__wrapped__.path,
                    protocol=self.__wrapped__.protocol,
                    storage_options=dict(self.__wrapped__.storage_options),
                ),
            )

    # === wrapped interface ===========================================
    # Unless a docstring says otherwise, the methods below forward their
    # arguments unchanged to the same-named member of ``self.__wrapped__``.

    def __init__(
        self,
        *args: JoinablePathLike,
        protocol: str | None = None,
        **storage_options: Any,
    ) -> None:
        """Create the wrapped ``UPath`` from the given constructor arguments."""
        self.__wrapped__ = UPath(*args, protocol=protocol, **storage_options)

    @property
    def parser(self) -> UPathParser:
        return self.__wrapped__.parser

    def with_segments(self, *pathsegments: JoinablePathLike) -> Self:
        return self._from_upath(self.__wrapped__.with_segments(*pathsegments))

    def __str__(self) -> str:
        return self.__wrapped__.__str__()

    def __vfspath__(self) -> str:
        return self.__wrapped__.__vfspath__()

    def __repr__(self) -> str:
        """Show the proxy class name with the wrapped path and protocol."""
        return (
            f"{type(self).__name__}"
            f"({self.__wrapped__.path!r}, protocol={self.protocol!r})"
        )

    @property
    def parts(self) -> Sequence[str]:
        return self.__wrapped__.parts

    def with_name(self, name: str) -> Self:
        return self._from_upath(self.__wrapped__.with_name(name))

    @property
    def info(self) -> PathInfo:
        return self.__wrapped__.info

    def iterdir(self) -> Iterator[Self]:
        """Yield the wrapped path's directory entries, each re-wrapped."""
        for pth in self.__wrapped__.iterdir():
            yield self._from_upath(pth)

    def __open_reader__(self) -> BinaryIO:
        return self.__wrapped__.__open_reader__()

    def readlink(self) -> Self:
        return self._from_upath(self.__wrapped__.readlink())

    def symlink_to(
        self,
        target: ReadablePathLike,
        target_is_directory: bool = False,
    ) -> None:
        """Forward to the wrapped ``symlink_to``.

        A non-``str`` ``target`` is converted with ``vfspath`` first.
        """
        if not isinstance(target, str):
            target = vfspath(target)
        self.__wrapped__.symlink_to(target, target_is_directory=target_is_directory)

    def mkdir(
        self,
        mode: int = 0o777,
        parents: bool = False,
        exist_ok: bool = False,
    ) -> None:
        self.__wrapped__.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)

    def __open_writer__(self, mode: Literal["a", "w", "x"]) -> BinaryIO:
        return self.__wrapped__.__open_writer__(mode)

    @overload
    def open(
        self,
        mode: Literal["r", "w", "a"] = "r",
        buffering: int = ...,
        encoding: str = ...,
        errors: str = ...,
        newline: str = ...,
        **fsspec_kwargs: Any,
    ) -> TextIO: ...

    @overload
    def open(
        self,
        mode: Literal["rb", "wb", "ab"],
        buffering: int = ...,
        encoding: str = ...,
        errors: str = ...,
        newline: str = ...,
        **fsspec_kwargs: Any,
    ) -> BinaryIO: ...

    @overload
    def open(
        self,
        mode: str,
        *args: Any,
        **fsspec_kwargs: Any,
    ) -> IO[Any]: ...

    def open(
        self,
        mode: str = "r",
        buffering: int = UNSET_DEFAULT,
        encoding: str | None = UNSET_DEFAULT,
        errors: str | None = UNSET_DEFAULT,
        newline: str | None = UNSET_DEFAULT,
        **fsspec_kwargs: Any,
    ) -> IO[Any]:
        """Open the wrapped path.

        The five leading parameters are passed positionally, in order, so
        an ``UNSET_DEFAULT`` sentinel reaches the wrapped ``open`` as is.
        """
        return self.__wrapped__.open(
            mode,
            buffering,
            encoding,
            errors,
            newline,
            **fsspec_kwargs,
        )

    def stat(
        self,
        *,
        follow_symlinks: bool = True,
    ) -> StatResultType:
        return self.__wrapped__.stat(follow_symlinks=follow_symlinks)

    def lstat(self) -> StatResultType:
        """Return ``stat`` of the wrapped path with ``follow_symlinks=False``."""
        return self.__wrapped__.stat(follow_symlinks=False)

    def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
        self.__wrapped__.chmod(mode=mode, follow_symlinks=follow_symlinks)

    def exists(self, *, follow_symlinks: bool = True) -> bool:
        return self.__wrapped__.exists(follow_symlinks=follow_symlinks)

    def is_dir(self) -> bool:
        return self.__wrapped__.is_dir()

    def is_file(self) -> bool:
        return self.__wrapped__.is_file()

    def is_mount(self) -> bool:
        return self.__wrapped__.is_mount()

    def is_symlink(self) -> bool:
        return self.__wrapped__.is_symlink()

    def is_junction(self) -> bool:
        return self.__wrapped__.is_junction()

    def is_block_device(self) -> bool:
        return self.__wrapped__.is_block_device()

    def is_char_device(self) -> bool:
        return self.__wrapped__.is_char_device()

    def is_fifo(self) -> bool:
        return self.__wrapped__.is_fifo()

    def is_socket(self) -> bool:
        return self.__wrapped__.is_socket()

    def is_reserved(self) -> bool:
        return self.__wrapped__.is_reserved()

    def expanduser(self) -> Self:
        return self._from_upath(self.__wrapped__.expanduser())

    def glob(
        self,
        pattern: str,
        *,
        case_sensitive: bool | None = None,
        recurse_symlinks: bool = False,
    ) -> Iterator[Self]:
        """Yield the wrapped path's ``glob`` matches, each re-wrapped."""
        for p in self.__wrapped__.glob(
            pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks
        ):
            yield self._from_upath(p)

    def rglob(
        self,
        pattern: str,
        *,
        case_sensitive: bool | None = None,
        recurse_symlinks: bool = False,
    ) -> Iterator[Self]:
        """Yield the wrapped path's ``rglob`` matches, each re-wrapped."""
        for p in self.__wrapped__.rglob(
            pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks
        ):
            yield self._from_upath(p)

    def owner(self) -> str:
        return self.__wrapped__.owner()

    def group(self) -> str:
        return self.__wrapped__.group()

    def absolute(self) -> Self:
        return self._from_upath(self.__wrapped__.absolute())

    def is_absolute(self) -> bool:
        return self.__wrapped__.is_absolute()

    # --- comparisons -------------------------------------------------
    # Only objects that are instances of this proxy's own type are compared,
    # by delegating to the wrapped paths; anything else gets NotImplemented.

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.__wrapped__.__eq__(other.__wrapped__)

    def __hash__(self) -> int:
        return self.__wrapped__.__hash__()

    def __ne__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.__wrapped__.__ne__(other.__wrapped__)

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.__wrapped__.__lt__(other.__wrapped__)

    def __le__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.__wrapped__.__le__(other.__wrapped__)

    def __gt__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.__wrapped__.__gt__(other.__wrapped__)

    def __ge__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.__wrapped__.__ge__(other.__wrapped__)

    def resolve(self, strict: bool = False) -> Self:
        return self._from_upath(self.__wrapped__.resolve(strict=strict))

    def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
        self.__wrapped__.touch(mode=mode, exist_ok=exist_ok)

    def lchmod(self, mode: int) -> None:
        self.__wrapped__.lchmod(mode=mode)

    def unlink(self, missing_ok: bool = False) -> None:
        self.__wrapped__.unlink(missing_ok=missing_ok)

    def rmdir(self, recursive: bool = UNSET_DEFAULT) -> None:  # fixme: non-standard
        """Forward to the wrapped ``rmdir``.

        ``recursive`` is only passed on when the caller supplied it, so the
        wrapped implementation's own default applies otherwise.
        """
        kwargs: dict[str, Any] = {}
        if recursive is not UNSET_DEFAULT:
            kwargs["recursive"] = recursive
        self.__wrapped__.rmdir(**kwargs)

    def rename(
        self,
        target: WritablePathLike,
        *,  # note: non-standard compared to pathlib
        recursive: bool = UNSET_DEFAULT,
        maxdepth: int | None = UNSET_DEFAULT,
        **kwargs: Any,
    ) -> Self:
        """Forward to the wrapped ``rename`` and re-wrap the result.

        ``recursive`` and ``maxdepth`` are only passed on when supplied.
        A ``ProxyUPath`` target is unwrapped before the call.
        """
        if recursive is not UNSET_DEFAULT:
            kwargs["recursive"] = recursive
        if maxdepth is not UNSET_DEFAULT:
            kwargs["maxdepth"] = maxdepth
        return self._from_upath(
            self.__wrapped__.rename(
                target.__wrapped__ if isinstance(target, ProxyUPath) else target,
                **kwargs,
            )
        )

    def replace(self, target: WritablePathLike) -> Self:
        return self._from_upath(self.__wrapped__.replace(target))

    @property
    def drive(self) -> str:
        return self.__wrapped__.drive

    @property
    def root(self) -> str:
        return self.__wrapped__.root

    def __reduce__(self):
        """Pickle as a call to ``_from_upath`` with the wrapped path."""
        return type(self)._from_upath, (self.__wrapped__,)

    @classmethod
    def from_uri(cls, uri: str, **storage_options: Any) -> Self:
        return cls(uri, **storage_options)

    def as_uri(self) -> str:
        return self.__wrapped__.as_uri()

    def as_posix(self) -> str:
        return self.__wrapped__.as_posix()

    def samefile(self, other_path) -> bool:
        return self.__wrapped__.samefile(other_path)

    @classmethod_or_method
    def cwd(cls_or_self) -> Self:  # noqa: B902
        """Return the wrapped path's ``cwd()`` when called on an instance.

        Raises:
            UnsupportedOperation: when called on the class.
        """
        if isinstance(cls_or_self, type):
            raise UnsupportedOperation(".cwd() not supported")
        else:
            return cls_or_self._from_upath(cls_or_self.__wrapped__.cwd())

    @classmethod
    def home(cls) -> Self:
        """Always raise ``UnsupportedOperation``."""
        raise UnsupportedOperation(".home() not supported")

    def relative_to(  # type: ignore[override]
        self,
        other,
        /,
        *_deprecated,
        walk_up=False,
    ) -> Self:
        """Forward to the wrapped ``relative_to`` and re-wrap the result.

        A ``ProxyUPath`` ``other`` is unwrapped before the call.
        """
        if isinstance(other, ProxyUPath):
            other = other.__wrapped__
        return self._from_upath(
            self.__wrapped__.relative_to(other, *_deprecated, walk_up=walk_up)
        )

    def is_relative_to(self, other, /, *_deprecated) -> bool:  # type: ignore[override]
        """Forward to the wrapped ``is_relative_to``.

        A non-``str`` ``other`` is converted with ``vfspath`` first.
        """
        if not isinstance(other, str):
            other = vfspath(other)
        return self.__wrapped__.is_relative_to(other, *_deprecated)

    def hardlink_to(self, target: ReadablePathLike) -> None:
        """Forward to the wrapped ``hardlink_to``.

        A non-``str`` ``target`` is converted with ``vfspath`` first.
        """
        if not isinstance(target, str):
            target = vfspath(target)
        return self.__wrapped__.hardlink_to(target)

    def match(self, pattern: str, *, case_sensitive: bool | None = None) -> bool:
        """Forward to the wrapped ``match``.

        ``case_sensitive`` is passed on only when the caller set it, so the
        default call stays the same single-argument call as before while an
        explicit value is no longer silently dropped.
        """
        if case_sensitive is None:
            return self.__wrapped__.match(pattern)
        return self.__wrapped__.match(pattern, case_sensitive=case_sensitive)

    @property
    def protocol(self) -> str:
        return self.__wrapped__.protocol

    @property
    def storage_options(self) -> Mapping[str, Any]:
        return self.__wrapped__.storage_options

    @property
    def fs(self) -> AbstractFileSystem:
        return self.__wrapped__.fs

    @property
    def path(self) -> str:
        return self.__wrapped__.path

    def joinuri(self, uri: JoinablePathLike) -> Self:
        return self._from_upath(self.__wrapped__.joinuri(uri))

    @property
    def _url(self) -> SplitResult:
        return self.__wrapped__._url

    def read_bytes(self) -> bytes:
        return self.__wrapped__.read_bytes()

    def read_text(
        self,
        encoding: str | None = None,
        errors: str | None = None,
        newline: str | None = None,
    ) -> str:
        return self.__wrapped__.read_text(
            encoding=encoding, errors=errors, newline=newline
        )

    def walk(
        self,
        top_down: bool = True,
        on_error: Callable[[Exception], Any] | None = None,
        follow_symlinks: bool = False,
    ) -> Iterator[tuple[Self, list[str], list[str]]]:
        """Yield the wrapped ``walk`` triples with the path element re-wrapped.

        The ``dirnames`` and ``filenames`` lists are yielded unchanged.
        """
        for pth, dirnames, filenames in self.__wrapped__.walk(
            top_down=top_down, on_error=on_error, follow_symlinks=follow_symlinks
        ):
            yield self._from_upath(pth), dirnames, filenames

    def copy(self, target: WritablePathLike, **kwargs: Any) -> Self:  # type: ignore[override]  # noqa: E501
        return self._from_upath(self.__wrapped__.copy(target, **kwargs))

    def copy_into(self, target_dir: WritablePathLike, **kwargs: Any) -> Self:  # type: ignore[override]  # noqa: E501
        return self._from_upath(self.__wrapped__.copy_into(target_dir, **kwargs))

    def move(self, target: WritablePathLike, **kwargs: Any) -> Self:  # type: ignore[override]  # noqa: E501
        return self._from_upath(self.__wrapped__.move(target, **kwargs))

    def move_into(self, target_dir: WritablePathLike, **kwargs: Any) -> Self:  # type: ignore[override]  # noqa: E501
        return self._from_upath(self.__wrapped__.move_into(target_dir, **kwargs))

    def write_bytes(self, data: bytes) -> int:
        return self.__wrapped__.write_bytes(data)

    def write_text(
        self,
        data: str,
        encoding: str | None = None,
        errors: str | None = None,
        newline: str | None = None,
    ) -> int:
        return self.__wrapped__.write_text(
            data, encoding=encoding, errors=errors, newline=newline
        )

    def _copy_from(
        self, source: ReadablePath | Self, follow_symlinks: bool = True
    ) -> None:
        self.__wrapped__._copy_from(source, follow_symlinks=follow_symlinks)  # type: ignore  # noqa: E501

    @property
    def anchor(self) -> str:
        return self.__wrapped__.anchor

    @property
    def name(self) -> str:
        return self.__wrapped__.name

    @property
    def suffix(self) -> str:
        return self.__wrapped__.suffix

    @property
    def suffixes(self) -> list[str]:
        return self.__wrapped__.suffixes

    @property
    def stem(self) -> str:
        return self.__wrapped__.stem

    def with_stem(self, stem: str) -> Self:
        return self._from_upath(self.__wrapped__.with_stem(stem))

    def with_suffix(self, suffix: str) -> Self:
        return self._from_upath(self.__wrapped__.with_suffix(suffix))

    def joinpath(self, *pathsegments: JoinablePathLike) -> Self:
        return self._from_upath(self.__wrapped__.joinpath(*pathsegments))

    def __truediv__(self, other: str | Self) -> Self:
        return self._from_upath(
            self.__wrapped__.__truediv__(other)  # type: ignore[operator]
        )

    def __rtruediv__(self, other: str | Self) -> Self:
        return self._from_upath(
            self.__wrapped__.__rtruediv__(other)  # type: ignore[operator]
        )

    @property
    def parent(self) -> Self:
        return self._from_upath(self.__wrapped__.parent)

    @property
    def parents(self) -> Sequence[Self]:
        """Return the wrapped path's parents as a tuple of re-wrapped paths."""
        return tuple(self._from_upath(p) for p in self.__wrapped__.parents)

    def full_match(
        self,
        pattern: str | SupportsPathLike,
        *,
        case_sensitive: bool | None = None,
    ) -> bool:
        return self.__wrapped__.full_match(pattern, case_sensitive=case_sensitive)

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        """Build the pydantic core schema by reusing ``UPath``'s implementation.

        The function underlying ``UPath.__get_pydantic_core_schema__`` is
        called with this proxy class as ``cls`` instead of ``UPath``.
        """
        cs = UPath.__get_pydantic_core_schema__.__func__  # type: ignore[attr-defined]
        return cs(cls, source_type, handler)
# Register ProxyUPath with UPath through UPath's ``register`` hook.
# NOTE(review): presumably ABC-style virtual-subclass registration, so that
# isinstance(proxy, UPath) holds without inheritance — confirm in upath.core.
UPath.register(ProxyUPath)