Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/core.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

870 statements  

1"""upath.core module: UPath base class implementation""" 

2 

3from __future__ import annotations 

4 

5import sys 

6import warnings 

7from abc import ABCMeta 

8from abc import abstractmethod 

9from collections.abc import Iterator 

10from collections.abc import Mapping 

11from collections.abc import Sequence 

12from copy import copy 

13from pathlib import PurePath 

14from types import MappingProxyType 

15from typing import IO 

16from typing import TYPE_CHECKING 

17from typing import Any 

18from typing import BinaryIO 

19from typing import Literal 

20from typing import NoReturn 

21from typing import TextIO 

22from typing import TypeVar 

23from typing import overload 

24from urllib.parse import SplitResult 

25from urllib.parse import urlsplit 

26 

27from fsspec.registry import get_filesystem_class 

28from fsspec.spec import AbstractFileSystem 

29 

30from upath._chain import DEFAULT_CHAIN_PARSER 

31from upath._chain import Chain 

32from upath._chain import FSSpecChainParser 

33from upath._flavour import LazyFlavourDescriptor 

34from upath._flavour import WrappedFileSystemFlavour 

35from upath._flavour import upath_get_kwargs_from_url 

36from upath._flavour import upath_urijoin 

37from upath._info import UPathInfo 

38from upath._protocol import compatible_protocol 

39from upath._protocol import get_upath_protocol 

40from upath._stat import UPathStatResult 

41from upath.registry import _get_implementation_protocols 

42from upath.registry import available_implementations 

43from upath.registry import get_upath_class 

44from upath.types import UNSET_DEFAULT 

45from upath.types import JoinablePathLike 

46from upath.types import OnNameCollisionFunc 

47from upath.types import PathInfo 

48from upath.types import ReadablePath 

49from upath.types import ReadablePathLike 

50from upath.types import StatResultType 

51from upath.types import SupportsPathLike 

52from upath.types import UPathParser 

53from upath.types import WritablePath 

54from upath.types import WritablePathLike 

55 

56if sys.version_info >= (3, 13): 

57 from pathlib import UnsupportedOperation 

58else: 

59 UnsupportedOperation = NotImplementedError 

60 """Raised when an unsupported operation is called on a path object.""" 

61 

62if TYPE_CHECKING: 

63 import upath.implementations as _uimpl 

64 

65 if sys.version_info >= (3, 11): 

66 from typing import Self 

67 else: 

68 from typing_extensions import Self 

69 

70 from pydantic import GetCoreSchemaHandler 

71 from pydantic_core.core_schema import CoreSchema 

72 

73 _MT = TypeVar("_MT") 

74 _WT = TypeVar("_WT", bound="WritablePath") 

75 

76__all__ = [ 

77 "UPath", 

78 "UnsupportedOperation", 

79] 

80 

81_FSSPEC_HAS_WORKING_GLOB = None 

82 

83 

84def _check_fsspec_has_working_glob(): 

85 global _FSSPEC_HAS_WORKING_GLOB 

86 from fsspec.implementations.memory import MemoryFileSystem 

87 

88 m = type("_M", (MemoryFileSystem,), {"store": {}, "pseudo_dirs": [""]})() 

89 m.touch("a.txt") 

90 m.touch("f/b.txt") 

91 g = _FSSPEC_HAS_WORKING_GLOB = len(m.glob("**/*.txt")) == 2 

92 return g 

93 

94 

95def _make_instance(cls, args, kwargs): 

96 """helper for pickling UPath instances""" 

97 # Extract _relative_base if present 

98 relative_base = kwargs.pop("_relative_base", None) 

99 instance = cls(*args, **kwargs) 

100 if relative_base is not None: 

101 instance._relative_base = relative_base 

102 return instance 

103 

104 

105def _buffering2blocksize(mode: str, buffering: int) -> int | None: 

106 if not isinstance(buffering, int): 

107 raise TypeError("buffering must be an integer") 

108 if buffering == 0: # buffering disabled 

109 if "b" not in mode: # text mode 

110 raise ValueError("can't have unbuffered text I/O") 

111 return buffering 

112 elif buffering == -1: 

113 return None 

114 else: 

115 return buffering 

116 

117 

118def _raise_unsupported(cls_name: str, method: str) -> NoReturn: 

119 raise UnsupportedOperation(f"{cls_name}.{method}() is unsupported") 

120 

121 

122class _IncompatibleProtocolError(TypeError, ValueError): 

123 """switch to TypeError for incompatible protocols in a backward compatible way. 

124 

125 !!! Do not use this exception directly !!! 

126 Catch TypeError instead, if you need to handle incompatible protocol errors. 

127 

128 We'll do the switch in a future major release. 

129 """ 

130 

131 # evil: make this look like a built-in TypeError 

132 __module__ = "builtins" 

133 __qualname__ = "TypeError" 

134 

135 def __repr__(self) -> str: 

136 return f"TypeError({', '.join(map(repr, self.args))})" 

137 

138 

139class _UPathMeta(ABCMeta): 

140 """metaclass for UPath to customize instance creation 

141 

142 There are two main reasons for this metaclass: 

143 - support copying UPath instances via UPath(existing_upath) 

144 - force calling __init__ on instance creation for instances of a non-subclass 

145 """ 

146 

147 if sys.version_info < (3, 11): 

148 # pathlib 3.9 and 3.10 supported `Path[str]` but 

149 # did not return a GenericAlias but the class itself? 

150 def __getitem__(cls, key): 

151 return cls 

152 

153 def __call__(cls: type[_MT], *args: Any, **kwargs: Any) -> _MT: 

154 # create a copy if UPath class 

155 try: 

156 (arg0,) = args 

157 except ValueError: 

158 pass 

159 else: 

160 if isinstance(arg0, UPath) and not kwargs: 

161 return copy(arg0) # type: ignore[return-value] 

162 # We do this call manually, because cls could be a registered 

163 # subclass of UPath that is not directly inheriting from UPath. 

164 inst = cls.__new__(cls, *args, **kwargs) 

165 inst.__init__(*args, **kwargs) # type: ignore[misc] 

166 return inst 

167 

168 

169class _UPathMixin(metaclass=_UPathMeta): 

170 """Mixin class for UPath to allow sharing some common functionality 

171 between UPath and PosixUPath/WindowsUPath. 

172 """ 

173 

174 __slots__ = () 

175 

176 @property 

177 @abstractmethod 

178 def parser(self) -> UPathParser: 

179 """The parser (flavour) for this UPath instance.""" 

180 raise NotImplementedError 

181 

182 @property 

183 def _protocol(self) -> str: 

184 return self._chain.nest().protocol 

185 

186 @_protocol.setter 

187 def _protocol(self, value: str) -> None: 

188 self._chain = self._chain.replace(protocol=value) 

189 

190 @property 

191 def _storage_options(self) -> dict[str, Any]: 

192 return self._chain.nest().storage_options 

193 

194 @_storage_options.setter 

195 def _storage_options(self, value: dict[str, Any]) -> None: 

196 self._chain = self._chain.replace(storage_options=value) 

197 

198 @property 

199 @abstractmethod 

200 def _chain(self) -> Chain: 

201 raise NotImplementedError 

202 

203 @_chain.setter 

204 @abstractmethod 

205 def _chain(self, value: Chain) -> None: 

206 raise NotImplementedError 

207 

208 @property 

209 @abstractmethod 

210 def _chain_parser(self) -> FSSpecChainParser: 

211 raise NotImplementedError 

212 

213 @_chain_parser.setter 

214 @abstractmethod 

215 def _chain_parser(self, value: FSSpecChainParser) -> None: 

216 raise NotImplementedError 

217 

218 @property 

219 @abstractmethod 

220 def _fs_cached(self) -> AbstractFileSystem: 

221 raise NotImplementedError 

222 

223 @_fs_cached.setter 

224 def _fs_cached(self, value: AbstractFileSystem): 

225 raise NotImplementedError 

226 

227 @property 

228 @abstractmethod 

229 def _raw_urlpaths(self) -> Sequence[JoinablePathLike]: 

230 raise NotImplementedError 

231 

232 @_raw_urlpaths.setter 

233 def _raw_urlpaths(self, value: Sequence[JoinablePathLike]) -> None: 

234 raise NotImplementedError 

235 

236 @property 

237 @abstractmethod 

238 def _relative_base(self) -> str | None: 

239 raise NotImplementedError 

240 

241 @_relative_base.setter 

242 def _relative_base(self, value: str | None) -> None: 

243 raise NotImplementedError 

244 

245 # === upath.UPath PUBLIC ADDITIONAL API =========================== 

246 

247 @property 

248 def protocol(self) -> str: 

249 """The fsspec protocol for the path. 

250 

251 Note 

252 ---- 

253 Protocols are linked to upath and fsspec filesystems via the 

254 `upath.registry` and `fsspec.registry` modules. They basically 

255 represent the URI scheme used for the specific filesystem. 

256 

257 Examples 

258 -------- 

259 >>> from upath import UPath 

260 >>> p0 = UPath("s3://my-bucket/path/to/file.txt") 

261 >>> p0.protocol 

262 's3' 

263 >>> p1 = UPath("/foo/bar/baz.txt", protocol="memory") 

264 >>> p1.protocol 

265 'memory' 

266 

267 """ 

268 return self._protocol 

269 

270 @property 

271 def storage_options(self) -> Mapping[str, Any]: 

272 """The read-only fsspec storage options for the path. 

273 

274 Note 

275 ---- 

276 Storage options are specific to each fsspec filesystem and 

277 can include parameters such as authentication credentials, 

278 connection settings, and other options that affect how the 

279 filesystem interacts with the underlying storage. 

280 

281 Examples 

282 -------- 

283 >>> from upath import UPath 

284 >>> p = UPath("s3://my-bucket/path/to/file.txt", anon=True) 

285 >>> p.storage_options['anon'] 

286 True 

287 

288 """ 

289 return MappingProxyType(self._storage_options) 

290 

291 @property 

292 def fs(self) -> AbstractFileSystem: 

293 """The cached fsspec filesystem instance for the path. 

294 

295 This is the underlying fsspec filesystem instance. It's 

296 instantiated on first filesystem access and cached. Can 

297 be used to access fsspec-specific functionality not exposed 

298 by the UPath API. 

299 

300 Examples 

301 -------- 

302 >>> from upath import UPath 

303 >>> p = UPath("s3://my-bucket/path/to/file.txt") 

304 >>> p.fs 

305 <s3fs.core.S3FileSystem object at 0x...> 

306 >>> p.fs.get_tags(p.path) 

307 {'VersionId': 'null', 'ContentLength': 12345, ...} 

308 

309 """ 

310 try: 

311 return self._fs_cached 

312 except AttributeError: 

313 fs = self._fs_cached = self._fs_factory( 

314 str(self), self.protocol, self.storage_options 

315 ) 

316 return fs 

317 

318 @property 

319 def path(self) -> str: 

320 """The path used by fsspec filesystem. 

321 

322 FSSpec filesystems usually handle paths stripped of protocol. 

323 This property returns the path suitable for use with the 

324 underlying fsspec filesystem. It guarantees that a filesystem's 

325 strip_protocol method is applied correctly. 

326 

327 Examples 

328 -------- 

329 >>> from upath import UPath 

330 >>> p = UPath("memory:///foo/bar.txt") 

331 >>> str(p) 

332 'memory:///foo/bar.txt' 

333 >>> p.path 

334 '/foo/bar.txt' 

335 >>> p.fs.exists(p.path) 

336 True 

337 

338 """ 

339 if self._relative_base is not None: 

340 try: 

341 # For relative paths, we need to resolve to absolute path 

342 current_dir = self.cwd() # type: ignore[attr-defined] 

343 except NotImplementedError: 

344 raise UnsupportedOperation( 

345 f"fsspec paths can not be relative and" 

346 f" {type(self).__name__}.cwd() is unsupported" 

347 ) from None 

348 # Join the current directory with the relative path 

349 if (self_path := str(self)) == ".": 

350 path = str(current_dir) 

351 else: 

352 path = current_dir.parser.join(str(current_dir), self_path) 

353 return self.parser.strip_protocol(path) 

354 return self._chain.active_path 

355 

356 def joinuri(self, uri: JoinablePathLike) -> UPath: 

357 """Join with urljoin behavior for UPath instances. 

358 

359 Examples 

360 -------- 

361 >>> from upath import UPath 

362 >>> p = UPath("https://example.com/dir/subdir/") 

363 >>> p.joinuri("file.txt") 

364 HTTPSPath('https://example.com/dir/subdir/file.txt') 

365 >>> p.joinuri("/anotherdir/otherfile.txt") 

366 HTTPSPath('https://example.com/anotherdir/otherfile.txt') 

367 >>> p.joinuri("memory:///foo/bar.txt" 

368 MemoryPath('memory:///foo/bar.txt') 

369 

370 """ 

371 # short circuit if the new uri uses a different protocol 

372 other_protocol = get_upath_protocol(uri) 

373 if other_protocol and other_protocol != self._protocol: 

374 return UPath(uri) 

375 return UPath( 

376 upath_urijoin(str(self), str(uri)), 

377 protocol=other_protocol or self._protocol, 

378 **self.storage_options, 

379 ) 

380 

381 # === upath.UPath CUSTOMIZABLE API ================================ 

382 

383 @classmethod 

384 def _transform_init_args( 

385 cls, 

386 args: tuple[JoinablePathLike, ...], 

387 protocol: str, 

388 storage_options: dict[str, Any], 

389 ) -> tuple[tuple[JoinablePathLike, ...], str, dict[str, Any]]: 

390 """allow customization of init args in subclasses""" 

391 return args, protocol, storage_options 

392 

393 @classmethod 

394 def _parse_storage_options( 

395 cls, 

396 urlpath: str, 

397 protocol: str, 

398 storage_options: Mapping[str, Any], 

399 ) -> dict[str, Any]: 

400 """Parse storage_options from the urlpath""" 

401 pth_storage_options = upath_get_kwargs_from_url(urlpath) 

402 return {**pth_storage_options, **storage_options} 

403 

404 @classmethod 

405 def _fs_factory( 

406 cls, 

407 urlpath: str, 

408 protocol: str, 

409 storage_options: Mapping[str, Any], 

410 ) -> AbstractFileSystem: 

411 """Instantiate the filesystem_spec filesystem class""" 

412 fs_cls = get_filesystem_class(protocol) 

413 return fs_cls(**storage_options) 

414 

415 # === upath.UPath constructor ===================================== 

416 

417 _protocol_dispatch: bool | None = None 

418 

419 def __new__( # noqa C901 

420 cls, 

421 *args: JoinablePathLike, 

422 protocol: str | None = None, 

423 chain_parser: FSSpecChainParser = DEFAULT_CHAIN_PARSER, 

424 **storage_options: Any, 

425 ) -> UPath: 

426 # narrow type 

427 if not issubclass(cls, UPath): 

428 raise TypeError("UPath.__new__ can't instantiate non-UPath classes") 

429 

430 # deprecate 'scheme' 

431 if "scheme" in storage_options: 

432 warnings.warn( 

433 "use 'protocol' kwarg instead of 'scheme'", 

434 DeprecationWarning, 

435 stacklevel=2, 

436 ) 

437 protocol = storage_options.pop("scheme") 

438 

439 # determine the protocol 

440 try: 

441 pth_protocol = get_upath_protocol( 

442 args[0] if args else "", 

443 protocol=protocol, 

444 storage_options=storage_options, 

445 ) 

446 except ValueError as e: 

447 if "incompatible with" in str(e): 

448 raise _IncompatibleProtocolError(str(e)) from e 

449 raise 

450 

451 # subclasses should default to their own protocol 

452 if protocol is None and cls is not UPath: 

453 impl_protocols = _get_implementation_protocols(cls) 

454 if not pth_protocol and impl_protocols: 

455 pth_protocol = impl_protocols[0] 

456 elif pth_protocol and pth_protocol not in impl_protocols: 

457 msg_protocol = pth_protocol 

458 if not pth_protocol: 

459 msg_protocol = "'' (empty string)" 

460 msg = ( 

461 f"{cls.__name__!s}(...) detected protocol {msg_protocol!s}" 

462 f" which is incompatible with {cls.__name__}." 

463 ) 

464 if not pth_protocol or pth_protocol not in available_implementations(): 

465 msg += ( 

466 " Did you forget to register the subclass for this protocol" 

467 " with upath.registry.register_implementation()?" 

468 ) 

469 raise _IncompatibleProtocolError(msg) 

470 

471 # determine which UPath subclass to dispatch to 

472 upath_cls: type[UPath] | None 

473 if cls._protocol_dispatch or cls._protocol_dispatch is None: 

474 upath_cls = get_upath_class(protocol=pth_protocol) 

475 if upath_cls is None: 

476 raise ValueError(f"Unsupported filesystem: {pth_protocol!r}") 

477 else: 

478 # user subclasses can request to disable protocol dispatch 

479 # by setting MyUPathSubclass._protocol_dispatch to `False`. 

480 # This will effectively ignore the registered UPath 

481 # implementations and return an instance of MyUPathSubclass. 

482 # This be useful if a subclass wants to extend the UPath 

483 # api, and it is fine to rely on the default implementation 

484 # for all supported user protocols. 

485 # 

486 # THIS IS DEPRECATED! 

487 # Use upath.extensions.ProxyUPath to extend the UPath API 

488 warnings.warn( 

489 f"{cls.__name__}._protocol_dispatch = False is deprecated and" 

490 " will be removed in future universal_pathlib versions." 

491 " To extend the UPath API, subclass upath.extensions.ProxyUPath", 

492 DeprecationWarning, 

493 stacklevel=2, 

494 ) 

495 upath_cls = cls 

496 

497 if issubclass(upath_cls, cls): 

498 pass 

499 

500 elif not issubclass(upath_cls, UPath): 

501 raise RuntimeError("UPath.__new__ expected cls to be subclass of UPath") 

502 

503 else: 

504 msg_protocol = pth_protocol 

505 if not pth_protocol: 

506 msg_protocol = "'' (empty string)" 

507 msg = ( 

508 f"{cls.__name__!s}(...) detected protocol {msg_protocol!s}" 

509 f" which is incompatible with {cls.__name__}." 

510 ) 

511 if ( 

512 # find a better way 

513 (not pth_protocol and cls.__name__ not in ["CloudPath", "LocalPath"]) 

514 or pth_protocol 

515 and pth_protocol not in available_implementations() 

516 ): 

517 msg += ( 

518 " Did you forget to register the subclass for this protocol" 

519 " with upath.registry.register_implementation()?" 

520 ) 

521 raise _IncompatibleProtocolError(msg) 

522 

523 return object.__new__(upath_cls) 

524 

525 def __init__( 

526 self, 

527 *args: JoinablePathLike, 

528 protocol: str | None = None, 

529 chain_parser: FSSpecChainParser = DEFAULT_CHAIN_PARSER, 

530 **storage_options: Any, 

531 ) -> None: 

532 """Initialize a UPath instance 

533 

534 When instantiating a `UPath`, the detected or provided protocol determines 

535 the `UPath` subclass that will be instantiated. The protocol is looked up 

536 via the `get_upath_protocol` function, which loads the registered `UPath` 

537 implementation from the registry. If no `UPath` implementation is found for 

538 the detected protocol, but a registered `fsspec` filesystem exists for the 

539 protocol, a default dynamically created `UPath` implementation will be used. 

540 

541 Parameters 

542 ---------- 

543 *args : 

544 The path (or uri) segments to construct the UPath from. The first 

545 argument is used to detect the protocol if no protocol is provided. 

546 protocol : 

547 The protocol to use for the path. 

548 chain_parser : 

549 A chain parser instance for chained urlpaths. _(experimental)_ 

550 **storage_options : 

551 Additional storage options for the path. 

552 

553 """ 

554 # todo: avoid duplicating this call from __new__ 

555 protocol = get_upath_protocol( 

556 args[0] if args else "", 

557 protocol=protocol, 

558 storage_options=storage_options, 

559 ) 

560 args, protocol, storage_options = type(self)._transform_init_args( 

561 args, protocol, storage_options 

562 ) 

563 

564 # check that UPath subclasses in args are compatible 

565 # TODO: 

566 # Future versions of UPath could verify that storage_options 

567 # can be combined between UPath instances. Not sure if this 

568 # is really necessary though. A warning might be enough... 

569 if not compatible_protocol(protocol, *args): 

570 raise ValueError("can't combine incompatible UPath protocols") 

571 

572 # subclasses should default to their own protocol 

573 if not protocol: 

574 impl_protocols = _get_implementation_protocols(type(self)) 

575 if impl_protocols: 

576 protocol = impl_protocols[0] 

577 

578 if args: 

579 args0 = args[0] 

580 if isinstance(args0, UPath): 

581 storage_options = { 

582 **args0._chain.nest().storage_options, 

583 **storage_options, 

584 } 

585 str_args0 = args0.__vfspath__() 

586 

587 else: 

588 if hasattr(args0, "__fspath__") and args0.__fspath__ is not None: 

589 str_args0 = args0.__fspath__() 

590 elif hasattr(args0, "__vfspath__") and args0.__vfspath__ is not None: 

591 str_args0 = args0.__vfspath__() 

592 elif isinstance(args0, str): 

593 str_args0 = args0 

594 else: 

595 raise TypeError( 

596 "argument should be a UPath, str, " 

597 f"or support __vfspath__ or __fspath__, not {type(args0)!r}" 

598 ) 

599 storage_options = type(self)._parse_storage_options( 

600 str_args0, protocol, storage_options 

601 ) 

602 else: 

603 str_args0 = "." 

604 

605 segments = chain_parser.unchain( 

606 str_args0, 

607 protocol=protocol, 

608 storage_options=storage_options, 

609 ) 

610 # FIXME: normalization needs to happen in unchain already... 

611 chain = Chain.from_list(Chain.from_list(segments).to_list()) 

612 if len(args) > 1: 

613 flavour = WrappedFileSystemFlavour.from_protocol(chain.active_path_protocol) 

614 joined = flavour.join(chain.active_path, *args[1:]) 

615 stripped = flavour.strip_protocol(joined) 

616 chain = chain.replace(path=stripped) 

617 self._chain = chain 

618 self._chain_parser = chain_parser 

619 self._raw_urlpaths = args 

620 self._relative_base = None 

621 

622 # --- deprecated attributes --------------------------------------- 

623 

624 @property 

625 def _url(self) -> SplitResult: 

626 # TODO: 

627 # _url should be deprecated, but for now there is no good way of 

628 # accessing query parameters from urlpaths... 

629 return urlsplit(self.__str__()) 

630 

631 

632class UPath(_UPathMixin, WritablePath, ReadablePath): 

633 """Base class for pathlike paths backed by an fsspec filesystem. 

634 

635 Note 

636 ---- 

637 The following attributes and methods are specific to UPath instances and are not 

638 available on pathlib.Path instances. 

639 

640 Attributes 

641 ---------- 

642 protocol : 

643 The fsspec protocol for the path. 

644 storage_options : 

645 The fsspec storage options for the path. 

646 path : 

647 The path that a fsspec filesystem can use. 

648 fs : 

649 The cached fsspec filesystem instance for the path. 

650 

651 Methods 

652 ------- 

653 joinuri(*parts) : 

654 Join URI parts to this path. 

655 

656 

657 Info 

658 ---- 

659 Below are pathlib attributes and methods available on UPath instances. 

660 

661 Attributes 

662 ---------- 

663 drive : 

664 The drive component of the path. 

665 root : 

666 The root component of the path. 

667 anchor : 

668 The concatenation of the drive and root. 

669 parent : 

670 The logical parent of the path. 

671 parents : 

672 An immutable sequence providing access to the logical ancestors of the path. 

673 name : 

674 The final path component, excluding the drive and root, if any. 

675 suffix : 

676 The file extension of the final component, if any. 

677 suffixes : 

678 A list of the path's file extensions. 

679 stem : 

680 The final path component, without its suffix. 

681 info : 

682 Filesystem information about the path. 

683 parser : 

684 The path parser instance for parsing path segments. 

685 

686 Methods 

687 ------- 

688 __truediv__(key) : 

689 Combine this path with the argument using the `/` operator. 

690 __rtruediv__(key) : 

691 Combine the argument with this path using the `/` operator. 

692 as_posix() : 

693 Return the string representation of the path with forward slashes. 

694 is_absolute() : 

695 Return True if the path is absolute. 

696 is_relative_to(other) : 

697 Return True if the path is relative to another path. 

698 is_reserved() : 

699 Return True if the path is reserved under Windows. 

700 joinpath(*pathsegments) : 

701 Combine this path with one or several arguments, and return a new path. 

702 full_match(pattern, *, case_sensitive=None) : 

703 Match this path against the provided glob-style pattern. 

704 match(pattern, *, case_sensitive=None) : 

705 Match this path against the provided glob-style pattern. 

706 relative_to(other, walk_up=False) : 

707 Return a version of this path relative to another path. 

708 with_name(name) : 

709 Return a new path with the name changed. 

710 with_stem(stem) : 

711 Return a new path with the stem changed. 

712 with_suffix(suffix) : 

713 Return a new path with the suffix changed. 

714 with_segments(*pathsegments) : 

715 Construct a new path object from any number of path-like objects. 

716 from_uri(uri) : 

717 Return a new path from the given URI. 

718 as_uri() : 

719 Return the path as a URI. 

720 home() : 

721 Return a new path pointing to the user's home directory. 

722 expanduser() : 

723 Return a new path with expanded `~` constructs. 

724 cwd() : 

725 Return a new path pointing to the current working directory. 

726 absolute() : 

727 Make the path absolute, without normalization or resolving symlinks. 

728 resolve(strict=False) : 

729 Make the path absolute, resolving any symlinks. 

730 readlink() : 

731 Return the path to which the symbolic link points. 

732 stat(*, follow_symlinks=True) : 

733 Return the result of the stat() system call on this path. 

734 lstat() : 

735 Like stat(), but if the path points to a symlink, return the symlink's 

736 information. 

737 exists(*, follow_symlinks=True) : 

738 Return True if the path exists. 

739 is_file(*, follow_symlinks=True) : 

740 Return True if the path is a regular file. 

741 is_dir(*, follow_symlinks=True) : 

742 Return True if the path is a directory. 

743 is_symlink() : 

744 Return True if the path is a symbolic link. 

745 is_junction() : 

746 Return True if the path is a junction. 

747 is_mount() : 

748 Return True if the path is a mount point. 

749 is_socket() : 

750 Return True if the path is a socket. 

751 is_fifo() : 

752 Return True if the path is a FIFO. 

753 is_block_device() : 

754 Return True if the path is a block device. 

755 is_char_device() : 

756 Return True if the path is a character device. 

757 samefile(other_path) : 

758 Return True if this path points to the same file as other_path. 

759 open(mode='r', buffering=-1, encoding=None, errors=None, newline=None) : 

760 Open the file pointed to by the path. 

761 read_text(encoding=None, errors=None, newline=None) : 

762 Open the file in text mode, read it, and close the file. 

763 read_bytes() : 

764 Open the file in bytes mode, read it, and close the file. 

765 write_text(data, encoding=None, errors=None, newline=None) : 

766 Open the file in text mode, write to it, and close the file. 

767 write_bytes(data) : 

768 Open the file in bytes mode, write to it, and close the file. 

769 iterdir() : 

770 Yield path objects of the directory contents. 

771 glob(pattern, *, case_sensitive=None) : 

772 Iterate over this subtree and yield all existing files matching the 

773 given pattern. 

774 rglob(pattern, *, case_sensitive=None) : 

775 Recursively yield all existing files matching the given pattern. 

776 walk(top_down=True, on_error=None, follow_symlinks=False) : 

777 Generate the file names in a directory tree by walking the tree. 

778 touch(mode=0o666, exist_ok=True) : 

779 Create this file with the given access mode, if it doesn't exist. 

780 mkdir(mode=0o777, parents=False, exist_ok=False) : 

781 Create a new directory at this given path. 

782 symlink_to(target, target_is_directory=False) : 

783 Make this path a symbolic link pointing to target. 

784 hardlink_to(target) : 

785 Make this path a hard link pointing to the same file as target. 

786 copy(target, *, follow_symlinks=True, preserve_metadata=False) : 

787 Copy the contents of this file to the target file. 

788 copy_into(target_dir, *, follow_symlinks=True, preserve_metadata=False) : 

789 Copy this file or directory into the target directory. 

790 rename(target) : 

791 Rename this path to the target path. 

792 replace(target) : 

793 Rename this path to the target path, overwriting if that path exists. 

794 move(target) : 

795 Move this file or directory tree to the target path. 

796 move_into(target_dir) : 

797 Move this file or directory into the target directory. 

798 unlink(missing_ok=False) : 

799 Remove this file or link. 

800 rmdir() : 

801 Remove this directory. 

802 owner(*, follow_symlinks=True) : 

803 Return the login name of the file owner. 

804 group(*, follow_symlinks=True) : 

805 Return the group name of the file gid. 

806 chmod(mode, *, follow_symlinks=True) : 

807 Change the permissions of the path. 

808 lchmod(mode) : 

809 Like chmod() but, if the path points to a symlink, modify the symlink's 

810 permissions. 

811 

812 """ 

813 

814 __slots__ = ( 

815 "_chain", 

816 "_chain_parser", 

817 "_fs_cached", 

818 "_raw_urlpaths", 

819 "_relative_base", 

820 ) 

821 

822 if TYPE_CHECKING: # noqa: C901 

823 _chain: Chain 

824 _chain_parser: FSSpecChainParser 

825 _fs_cached: AbstractFileSystem 

826 _raw_urlpaths: Sequence[JoinablePathLike] 

827 _relative_base: str | None 

828 

829 @overload 

830 def __new__( 

831 cls, 

832 ) -> Self: ... 

833 @overload # noqa: E301 

834 def __new__( 

835 cls, 

836 *args: JoinablePathLike, 

837 protocol: Literal["simplecache"], 

838 **_: Any, 

839 ) -> _uimpl.cached.SimpleCachePath: ... 

840 @overload # noqa: E301 

841 def __new__( 

842 cls, 

843 *args: JoinablePathLike, 

844 protocol: Literal["gcs", "gs"], 

845 **_: Any, 

846 ) -> _uimpl.cloud.GCSPath: ... 

847 @overload # noqa: E301 

848 def __new__( 

849 cls, 

850 *args: JoinablePathLike, 

851 protocol: Literal["s3", "s3a"], 

852 **_: Any, 

853 ) -> _uimpl.cloud.S3Path: ... 

854 @overload # noqa: E301 

855 def __new__( 

856 cls, 

857 *args: JoinablePathLike, 

858 protocol: Literal["az", "abfs", "abfss", "adl"], 

859 **_: Any, 

860 ) -> _uimpl.cloud.AzurePath: ... 

861 @overload # noqa: E301 

862 def __new__( 

863 cls, 

864 *args: JoinablePathLike, 

865 protocol: Literal["hf"], 

866 **_: Any, 

867 ) -> _uimpl.cloud.HfPath: ... 

868 @overload # noqa: E301 

869 def __new__( 

870 cls, 

871 *args: JoinablePathLike, 

872 protocol: Literal["data"], 

873 **_: Any, 

874 ) -> _uimpl.data.DataPath: ... 

875 @overload # noqa: E301 

876 def __new__( 

877 cls, 

878 *args: JoinablePathLike, 

879 protocol: Literal["ftp"], 

880 **_: Any, 

881 ) -> _uimpl.ftp.FTPPath: ... 

882 @overload # noqa: E301 

883 def __new__( 

884 cls, 

885 *args: JoinablePathLike, 

886 protocol: Literal["github"], 

887 **_: Any, 

888 ) -> _uimpl.github.GitHubPath: ... 

889 @overload # noqa: E301 

890 def __new__( 

891 cls, 

892 *args: JoinablePathLike, 

893 protocol: Literal["hdfs"], 

894 **_: Any, 

895 ) -> _uimpl.hdfs.HDFSPath: ... 

896 @overload # noqa: E301 

897 def __new__( 

898 cls, 

899 *args: JoinablePathLike, 

900 protocol: Literal["http", "https"], 

901 **_: Any, 

902 ) -> _uimpl.http.HTTPPath: ... 

903 @overload # noqa: E301 

904 def __new__( 

905 cls, 

906 *args: JoinablePathLike, 

907 protocol: Literal["file", "local"], 

908 **_: Any, 

909 ) -> _uimpl.local.FilePath: ... 

910 @overload # noqa: E301 

911 def __new__( 

912 cls, 

913 *args: JoinablePathLike, 

914 protocol: Literal["memory"], 

915 **_: Any, 

916 ) -> _uimpl.memory.MemoryPath: ... 

917 @overload # noqa: E301 

918 def __new__( 

919 cls, 

920 *args: JoinablePathLike, 

921 protocol: Literal["sftp", "ssh"], 

922 **_: Any, 

923 ) -> _uimpl.sftp.SFTPPath: ... 

924 @overload # noqa: E301 

925 def __new__( 

926 cls, 

927 *args: JoinablePathLike, 

928 protocol: Literal["smb"], 

929 **_: Any, 

930 ) -> _uimpl.smb.SMBPath: ... 

931 @overload # noqa: E301 

932 def __new__( 

933 cls, 

934 *args: JoinablePathLike, 

935 protocol: Literal["tar"], 

936 **_: Any, 

937 ) -> _uimpl.tar.TarPath: ... 

938 @overload # noqa: E301 

939 def __new__( 

940 cls, 

941 *args: JoinablePathLike, 

942 protocol: Literal["webdav"], 

943 **_: Any, 

944 ) -> _uimpl.webdav.WebdavPath: ... 

945 @overload # noqa: E301 

946 def __new__( 

947 cls, 

948 *args: JoinablePathLike, 

949 protocol: Literal["zip"], 

950 **_: Any, 

951 ) -> _uimpl.zip.ZipPath: ... 

952 

953 if sys.platform == "win32": 

954 

955 @overload # noqa: E301 

956 def __new__( 

957 cls, 

958 *args: JoinablePathLike, 

959 protocol: Literal[""], 

960 **_: Any, 

961 ) -> _uimpl.local.WindowsUPath: ... 

962 

963 else: 

964 

965 @overload # noqa: E301 

966 def __new__( 

967 cls, 

968 *args: JoinablePathLike, 

969 protocol: Literal[""], 

970 **_: Any, 

971 ) -> _uimpl.local.PosixUPath: ... 

972 

973 @overload # noqa: E301 

974 def __new__( 

975 cls, 

976 *args: JoinablePathLike, 

977 protocol: str | None = ..., 

978 **_: Any, 

979 ) -> Self: ... 

980 

981 def __new__( 

982 cls, 

983 *args: JoinablePathLike, 

984 protocol: str | None = ..., 

985 chain_parser: FSSpecChainParser = ..., 

986 **storage_options: Any, 

987 ) -> Self: ... 

988 

989 # === JoinablePath attributes ===================================== 

990 

991 parser: UPathParser = LazyFlavourDescriptor() # type: ignore[assignment] 

992 

993 def with_segments(self, *pathsegments: JoinablePathLike) -> Self: 

994 """Construct a new path object from any number of path-like objects.""" 

995 # we change joinpath behavior if called from a relative path 

996 # this is not fully ideal, but currently the best way to move forward 

997 if is_relative := self._relative_base is not None: 

998 pathsegments = (self._relative_base, *pathsegments) 

999 

1000 new_instance = type(self)( 

1001 *pathsegments, 

1002 protocol=self._protocol, 

1003 **self.storage_options, 

1004 ) 

1005 if hasattr(self, "_fs_cached"): 

1006 new_instance._fs_cached = self._fs_cached 

1007 

1008 if is_relative: 

1009 new_instance._relative_base = self._relative_base 

1010 return new_instance 

1011 

1012 def __str__(self) -> str: 

1013 if self._relative_base is not None: 

1014 active_path = self._chain.active_path 

1015 stripped_base = self.parser.strip_protocol( 

1016 self._relative_base 

1017 ).removesuffix(self.parser.sep) 

1018 if not active_path.startswith(stripped_base): 

1019 raise RuntimeError( 

1020 f"{active_path!r} is not a subpath of {stripped_base!r}" 

1021 ) 

1022 

1023 return ( 

1024 active_path.removeprefix(stripped_base).removeprefix(self.parser.sep) 

1025 or "." 

1026 ) 

1027 else: 

1028 return self._chain_parser.chain(self._chain.to_list())[0] 

1029 

1030 def __vfspath__(self) -> str: 

1031 if self._relative_base is not None: 

1032 return self.__str__() 

1033 else: 

1034 return self.path 

1035 

1036 def __repr__(self) -> str: 

1037 cls_name = type(self).__name__ 

1038 path = self.__vfspath__() 

1039 if self._relative_base is not None: 

1040 return f"<relative {cls_name} {path!r}>" 

1041 else: 

1042 return f"{cls_name}({path!r}, protocol={self._protocol!r})" 

1043 

1044 # === JoinablePath overrides ====================================== 

1045 

1046 @property 

1047 def parts(self) -> Sequence[str]: 

1048 """Provides sequence-like access to the filesystem path components. 

1049 

1050 Examples 

1051 -------- 

1052 >>> from upath import UPath 

1053 >>> p = UPath("s3://my-bucket/path/to/file.txt") 

1054 >>> p.parts 

1055 ('my-bucket/', 'path', 'to', 'file.txt') 

1056 >>> p2 = UPath("/foo/bar/baz.txt", protocol="memory") 

1057 >>> p2.parts 

1058 ('/', 'foo', 'bar', 'baz.txt') 

1059 

1060 """ 

1061 # For relative paths, return parts of the relative path only 

1062 if self._relative_base is not None: 

1063 rel_str = str(self) 

1064 if rel_str == ".": 

1065 return () 

1066 return tuple(rel_str.split(self.parser.sep)) 

1067 

1068 split = self.parser.split 

1069 sep = self.parser.sep 

1070 

1071 path = self._chain.active_path 

1072 drive = self.parser.splitdrive(self._chain.active_path)[0] 

1073 stripped_path = self.parser.strip_protocol(path) 

1074 if stripped_path: 

1075 _, _, tail = path.partition(stripped_path) 

1076 path = stripped_path + tail 

1077 

1078 parent, name = split(path) 

1079 names = [] 

1080 while path != parent: 

1081 names.append(name) 

1082 path = parent 

1083 parent, name = split(path) 

1084 

1085 if names and names[-1] == drive: 

1086 names = names[:-1] 

1087 if names and names[-1].startswith(sep): 

1088 parts = [*names[:-1], names[-1].removeprefix(sep), drive + sep] 

1089 else: 

1090 parts = [*names, drive + sep] 

1091 return tuple(reversed(parts)) 

1092 

1093 def with_name(self, name: str) -> Self: 

1094 """Return a new path with the file name changed.""" 

1095 split = self.parser.split 

1096 if self.parser.sep in name: # `split(name)[0]` 

1097 raise ValueError(f"Invalid name {name!r}") 

1098 _path = self.__vfspath__() 

1099 _path = _path.removesuffix(split(_path)[1]) + name 

1100 return self.with_segments(_path) 

1101 

1102 @property 

1103 def anchor(self) -> str: 

1104 """The concatenation of the drive and root or an empty string.""" 

1105 if self._relative_base is not None: 

1106 return "" 

1107 return self.drive + self.root 

1108 

1109 @property 

1110 def parent(self) -> Self: 

1111 """The logical parent of the path. 

1112 

1113 Examples 

1114 -------- 

1115 >>> from upath import UPath 

1116 >>> p = UPath("s3://my-bucket/path/to/file.txt") 

1117 >>> p.parent 

1118 S3Path('s3://my-bucket/path/to') 

1119 

1120 """ 

1121 if self._relative_base is not None: 

1122 if str(self) == ".": 

1123 return self 

1124 else: 

1125 # this needs to be revisited... 

1126 pth = type(self)( 

1127 self._relative_base, 

1128 str(self), 

1129 protocol=self._protocol, 

1130 **self.storage_options, 

1131 ) 

1132 parent = pth.parent 

1133 parent._relative_base = self._relative_base 

1134 return parent 

1135 return super().parent 

1136 

1137 @property 

1138 def parents(self) -> Sequence[Self]: 

1139 """A sequence providing access to the logical ancestors of the path. 

1140 

1141 Examples 

1142 -------- 

1143 >>> from upath import UPath 

1144 >>> p = UPath("memory:///foo/bar/baz.txt") 

1145 >>> list(p.parents) 

1146 [ 

1147 MemoryPath('memory:///foo/bar'), 

1148 MemoryPath('memory:///foo'), 

1149 MemoryPath('memory:///'), 

1150 ] 

1151 

1152 """ 

1153 if self._relative_base is not None: 

1154 parents = [] 

1155 parent = self 

1156 while True: 

1157 if str(parent) == ".": 

1158 break 

1159 parent = parent.parent 

1160 parents.append(parent) 

1161 return tuple(parents) 

1162 return super().parents 

1163 

1164 def joinpath(self, *pathsegments: JoinablePathLike) -> Self: 

1165 """Combine this path with one or several arguments, and return a new path. 

1166 

1167 For one argument, this is equivalent to using the `/` operator. 

1168 

1169 Examples 

1170 -------- 

1171 >>> from upath import UPath 

1172 >>> p = UPath("s3://my-bucket/path/to") 

1173 >>> p.joinpath("file.txt") 

1174 S3Path('s3://my-bucket/path/to/file.txt') 

1175 

1176 """ 

1177 return self.with_segments(self.__vfspath__(), *pathsegments) 

1178 

1179 def __truediv__(self, key: JoinablePathLike) -> Self: 

1180 try: 

1181 return self.with_segments(self.__vfspath__(), key) 

1182 except TypeError: 

1183 return NotImplemented 

1184 

1185 def __rtruediv__(self, key: JoinablePathLike) -> Self: 

1186 try: 

1187 return self.with_segments(key, self.__vfspath__()) 

1188 except TypeError: 

1189 return NotImplemented 

1190 

1191 # === ReadablePath attributes ===================================== 

1192 

1193 @property 

1194 def info(self) -> PathInfo: 

1195 """ 

1196 A PathInfo object that exposes the file type and other file attributes 

1197 of this path. 

1198 

1199 Returns 

1200 ------- 

1201 : UPathInfo 

1202 The UPathInfo object for this path. 

1203 """ 

1204 return UPathInfo(self) 

1205 

1206 def iterdir(self) -> Iterator[Self]: 

1207 """Yield path objects of the directory contents. 

1208 

1209 Examples 

1210 -------- 

1211 >>> from upath import UPath 

1212 >>> p = UPath("memory:///foo/") 

1213 >>> p.joinpath("bar.txt").touch() 

1214 >>> p.joinpath("baz.txt").touch() 

1215 >>> for child in p.iterdir(): 

1216 ... print(child) 

1217 MemoryPath('memory:///foo/bar.txt') 

1218 MemoryPath('memory:///foo/baz.txt') 

1219 

1220 """ 

1221 sep = self.parser.sep 

1222 base = self 

1223 if self.parts[-1:] == ("",): 

1224 base = self.parent 

1225 fs = base.fs 

1226 base_path = base.path 

1227 if not fs.isdir(base_path): 

1228 raise NotADirectoryError(str(self)) 

1229 for name in fs.listdir(base_path): 

1230 # fsspec returns dictionaries 

1231 if isinstance(name, dict): 

1232 name = name.get("name") 

1233 if name in {".", ".."}: 

1234 # Yielding a path object for these makes little sense 

1235 continue 

1236 # only want the path name with iterdir 

1237 _, _, name = name.removesuffix(sep).rpartition(self.parser.sep) 

1238 yield base.with_segments(base_path, name) 

1239 

1240 def __open_reader__(self) -> BinaryIO: 

1241 return self.fs.open(self.path, mode="rb") 

1242 

1243 if sys.version_info >= (3, 14): 

1244 

1245 def __open_rb__(self, buffering: int = UNSET_DEFAULT) -> BinaryIO: 

1246 return self.open("rb", buffering=buffering) 

1247 

1248 def readlink(self) -> Self: 

1249 _raise_unsupported(type(self).__name__, "readlink") 

1250 

1251 @overload 

1252 def copy(self, target: _WT, **kwargs: Any) -> _WT: ... 

1253 

1254 @overload 

1255 def copy(self, target: SupportsPathLike | str, **kwargs: Any) -> Self: ... 

1256 

1257 def copy(self, target: _WT | SupportsPathLike | str, **kwargs: Any) -> _WT | UPath: 

1258 """ 

1259 Recursively copy this file or directory tree to the given destination. 

1260 """ 

1261 if isinstance(target, str): 

1262 proto = get_upath_protocol(target) 

1263 if proto != self.protocol: 

1264 target = UPath(target) 

1265 else: 

1266 target = self.with_segments(target) 

1267 elif not isinstance(target, UPath): 

1268 target = UPath(target) 

1269 if target.is_dir(): 

1270 raise IsADirectoryError(str(target)) 

1271 return super().copy(target, **kwargs) 

1272 

1273 @overload 

1274 def copy_into(self, target_dir: _WT, **kwargs: Any) -> _WT: ... 

1275 

1276 @overload 

1277 def copy_into(self, target_dir: SupportsPathLike | str, **kwargs: Any) -> Self: ... 

1278 

1279 def copy_into( 

1280 self, target_dir: _WT | SupportsPathLike | str, **kwargs: Any 

1281 ) -> _WT | UPath: 

1282 """ 

1283 Copy this file or directory tree into the given existing directory. 

1284 """ 

1285 if isinstance(target_dir, str): 

1286 proto = get_upath_protocol(target_dir) 

1287 if proto != self.protocol: 

1288 target_dir = UPath(target_dir) 

1289 else: 

1290 target_dir = self.with_segments(target_dir) 

1291 elif not isinstance(target_dir, UPath): 

1292 target_dir = UPath(target_dir) 

1293 if not target_dir.exists(): 

1294 raise FileNotFoundError(str(target_dir)) 

1295 if not target_dir.is_dir(): 

1296 raise NotADirectoryError(str(target_dir)) 

1297 return super().copy_into(target_dir, **kwargs) 

1298 

1299 @overload 

1300 def move(self, target: _WT, **kwargs: Any) -> _WT: ... 

1301 

1302 @overload 

1303 def move(self, target: SupportsPathLike | str, **kwargs: Any) -> Self: ... 

1304 

1305 def move(self, target: _WT | SupportsPathLike | str, **kwargs: Any) -> _WT | UPath: 

1306 """ 

1307 Recursively move this file or directory tree to the given destination. 

1308 """ 

1309 target = self.copy(target, **kwargs) 

1310 self.fs.rm(self.path, recursive=self.is_dir()) 

1311 return target 

1312 

1313 @overload 

1314 def move_into(self, target_dir: _WT, **kwargs: Any) -> _WT: ... 

1315 

1316 @overload 

1317 def move_into(self, target_dir: SupportsPathLike | str, **kwargs: Any) -> Self: ... 

1318 

1319 def move_into( 

1320 self, target_dir: _WT | SupportsPathLike | str, **kwargs: Any 

1321 ) -> _WT | UPath: 

1322 """ 

1323 Move this file or directory tree into the given existing directory. 

1324 """ 

1325 name = self.name 

1326 if not name: 

1327 raise ValueError(f"{self!r} has an empty name") 

1328 elif hasattr(target_dir, "with_segments"): 

1329 target = target_dir.with_segments(target_dir, name) # type: ignore 

1330 elif isinstance(target_dir, PurePath): 

1331 target = UPath(target_dir, name) 

1332 else: 

1333 target = self.with_segments(target_dir, name) 

1334 td = target.parent 

1335 if not td.exists(): 

1336 raise FileNotFoundError(str(td)) 

1337 elif not td.is_dir(): 

1338 raise NotADirectoryError(str(td)) 

1339 return self.move(target) 

1340 

1341 def _copy_from( 

1342 self, 

1343 source: ReadablePath, 

1344 follow_symlinks: bool = True, 

1345 on_name_collision: OnNameCollisionFunc | None = None, 

1346 **kwargs: Any, 

1347 ) -> None: 

1348 """ 

1349 UPath custom:: Recursively copy the given path to this path. 

1350 """ 

1351 # fixme: it would be best if this would be upstreamed 

1352 from pathlib_abc import vfsopen 

1353 from pathlib_abc import vfspath 

1354 from pathlib_abc._os import copyfileobj 

1355 from pathlib_abc._os import ensure_different_files 

1356 

1357 stack: list[tuple[ReadablePath, WritablePath]] = [(source, self)] 

1358 while stack: 

1359 src, dst = stack.pop() 

1360 info = src.info 

1361 if not follow_symlinks and info.is_symlink(): 

1362 dst.symlink_to(vfspath(src.readlink()), src.info.is_dir()) 

1363 elif on_name_collision and info.is_file() and info.is_dir(): 

1364 dst_file, dst_dir = on_name_collision(src, dst) 

1365 if dst_file is not None: 

1366 ensure_different_files(src, dst_file) 

1367 with vfsopen(src, "rb") as source_f: 

1368 with vfsopen(dst_file, "wb") as target_f: 

1369 copyfileobj(source_f, target_f) 

1370 if dst_dir is not None: 

1371 children = src.iterdir() 

1372 dst_dir.mkdir() 

1373 # feed through dict.fromkeys to remove duplicates 

1374 for child in dict.fromkeys(children): 

1375 stack.append((child, dst_dir.joinpath(child.name))) 

1376 elif info.is_dir(): 

1377 children = src.iterdir() 

1378 dst.mkdir() 

1379 # feed through dict.fromkeys to remove duplicates 

1380 for child in dict.fromkeys(children): 

1381 stack.append((child, dst.joinpath(child.name))) 

1382 else: 

1383 ensure_different_files(src, dst) 

1384 with vfsopen(src, "rb") as source_f: 

1385 with vfsopen(dst, "wb") as target_f: 

1386 copyfileobj(source_f, target_f) 

1387 

1388 # --- WritablePath attributes ------------------------------------- 

1389 

1390 def symlink_to( 

1391 self, 

1392 target: ReadablePathLike, 

1393 target_is_directory: bool = False, 

1394 ) -> None: 

1395 _raise_unsupported(type(self).__name__, "symlink_to") 

1396 

1397 def mkdir( 

1398 self, 

1399 mode: int = 0o777, 

1400 parents: bool = False, 

1401 exist_ok: bool = False, 

1402 ) -> None: 

1403 """ 

1404 Create a new directory at this given path. 

1405 """ 

1406 if parents and not exist_ok and self.exists(): 

1407 raise FileExistsError(str(self)) 

1408 try: 

1409 self.fs.mkdir( 

1410 self.path, 

1411 create_parents=parents, 

1412 mode=mode, 

1413 ) 

1414 except FileExistsError: 

1415 if not exist_ok: 

1416 raise FileExistsError(str(self)) 

1417 if not self.is_dir(): 

1418 raise FileExistsError(str(self)) 

1419 

1420 def __open_writer__(self, mode: Literal["a", "w", "x"]) -> BinaryIO: 

1421 return self.fs.open(self.path, mode=f"{mode}b") 

1422 

1423 # --- upath overrides --------------------------------------------- 

1424 

1425 @overload 

1426 def open( 

1427 self, 

1428 mode: Literal["r", "w", "a"] = ..., 

1429 buffering: int = ..., 

1430 encoding: str = ..., 

1431 errors: str = ..., 

1432 newline: str = ..., 

1433 **fsspec_kwargs: Any, 

1434 ) -> TextIO: ... 

1435 

1436 @overload 

1437 def open( 

1438 self, 

1439 mode: Literal["rb", "wb", "ab"] = ..., 

1440 buffering: int = ..., 

1441 encoding: str = ..., 

1442 errors: str = ..., 

1443 newline: str = ..., 

1444 **fsspec_kwargs: Any, 

1445 ) -> BinaryIO: ... 

1446 

1447 @overload 

1448 def open( 

1449 self, 

1450 mode: str = ..., 

1451 buffering: int = ..., 

1452 encoding: str | None = ..., 

1453 errors: str | None = ..., 

1454 newline: str | None = ..., 

1455 **fsspec_kwargs: Any, 

1456 ) -> IO[Any]: ... 

1457 

1458 def open( 

1459 self, 

1460 mode: str = "r", 

1461 buffering: int = UNSET_DEFAULT, 

1462 encoding: str | None = UNSET_DEFAULT, 

1463 errors: str | None = UNSET_DEFAULT, 

1464 newline: str | None = UNSET_DEFAULT, 

1465 **fsspec_kwargs: Any, 

1466 ) -> IO[Any]: 

1467 """ 

1468 Open the file pointed by this path and return a file object, as 

1469 the built-in open() function does. 

1470 

1471 Parameters 

1472 ---------- 

1473 mode: 

1474 Opening mode. Default is 'r'. 

1475 buffering: 

1476 Default is the block size of the underlying fsspec filesystem. 

1477 encoding: 

1478 Encoding is only used in text mode. Default is None. 

1479 errors: 

1480 Error handling for encoding. Only used in text mode. Default is None. 

1481 newline: 

1482 Newline handling. Only used in text mode. Default is None. 

1483 **fsspec_kwargs: 

1484 Additional options for the fsspec filesystem. 

1485 """ 

1486 # match the signature of pathlib.Path.open() 

1487 if buffering is not UNSET_DEFAULT: 

1488 if "block_size" in fsspec_kwargs: 

1489 raise TypeError("cannot specify both 'buffering' and 'block_size'") 

1490 block_size = _buffering2blocksize(mode, buffering) 

1491 if block_size is not None: 

1492 fsspec_kwargs.setdefault("block_size", block_size) 

1493 if encoding is not UNSET_DEFAULT: 

1494 fsspec_kwargs["encoding"] = encoding 

1495 if errors is not UNSET_DEFAULT: 

1496 fsspec_kwargs["errors"] = errors 

1497 if newline is not UNSET_DEFAULT: 

1498 fsspec_kwargs["newline"] = newline 

1499 return self.fs.open(self.path, mode=mode, **fsspec_kwargs) 

1500 

1501 # === pathlib.Path ================================================ 

1502 

1503 def stat( 

1504 self, 

1505 *, 

1506 follow_symlinks: bool = True, 

1507 ) -> StatResultType: 

1508 """ 

1509 Return the result of the stat() system call on this path, like 

1510 os.stat() does. 

1511 

1512 Info 

1513 ---- 

1514 For fsspec filesystems follow_symlinks is currently ignored. 

1515 

1516 Returns 

1517 ------- 

1518 : UPathStatResult 

1519 The upath stat result for this path, emulating `os.stat_result`. 

1520 

1521 """ 

1522 if not follow_symlinks: 

1523 warnings.warn( 

1524 f"{type(self).__name__}.stat(follow_symlinks=False):" 

1525 " is currently ignored.", 

1526 UserWarning, 

1527 stacklevel=2, 

1528 ) 

1529 return UPathStatResult.from_info(self.fs.info(self.path)) 

1530 

1531 def lstat(self) -> StatResultType: 

1532 return self.stat(follow_symlinks=False) 

1533 

1534 def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: 

1535 _raise_unsupported(type(self).__name__, "chmod") 

1536 

1537 def exists(self, *, follow_symlinks: bool = True) -> bool: 

1538 """ 

1539 Whether this path exists. 

1540 

1541 Info 

1542 ---- 

1543 For fsspec filesystems follow_symlinks is currently ignored. 

1544 """ 

1545 if not follow_symlinks: 

1546 warnings.warn( 

1547 f"{type(self).__name__}.exists() follow_symlinks=False" 

1548 " is currently ignored.", 

1549 UserWarning, 

1550 stacklevel=2, 

1551 ) 

1552 return self.fs.exists(self.path) 

1553 

1554 def is_dir(self, *, follow_symlinks: bool = True) -> bool: 

1555 """ 

1556 Whether this path is a directory. 

1557 """ 

1558 if not follow_symlinks: 

1559 warnings.warn( 

1560 f"{type(self).__name__}.is_dir() follow_symlinks=False" 

1561 " is currently ignored.", 

1562 UserWarning, 

1563 stacklevel=2, 

1564 ) 

1565 return self.fs.isdir(self.path) 

1566 

1567 def is_file(self, *, follow_symlinks: bool = True) -> bool: 

1568 """ 

1569 Whether this path is a regular file. 

1570 """ 

1571 if not follow_symlinks: 

1572 warnings.warn( 

1573 f"{type(self).__name__}.is_file() follow_symlinks=False" 

1574 " is currently ignored.", 

1575 UserWarning, 

1576 stacklevel=2, 

1577 ) 

1578 return self.fs.isfile(self.path) 

1579 

1580 def is_mount(self) -> bool: 

1581 """ 

1582 Check if this path is a mount point 

1583 

1584 Info 

1585 ---- 

1586 For fsspec filesystems this is always False. 

1587 """ 

1588 return False 

1589 

1590 def is_symlink(self) -> bool: 

1591 """ 

1592 Whether this path is a symbolic link. 

1593 """ 

1594 try: 

1595 info = self.fs.info(self.path) 

1596 if "islink" in info: 

1597 return bool(info["islink"]) 

1598 except FileNotFoundError: 

1599 return False 

1600 return False 

1601 

1602 def is_junction(self) -> bool: 

1603 """ 

1604 Whether this path is a junction. 

1605 

1606 Info 

1607 ---- 

1608 For fsspec filesystems this is always False. 

1609 """ 

1610 return False 

1611 

1612 def is_block_device(self) -> bool: 

1613 """ 

1614 Whether this path is a block device. 

1615 

1616 Info 

1617 ---- 

1618 For fsspec filesystems this is always False. 

1619 """ 

1620 return False 

1621 

1622 def is_char_device(self) -> bool: 

1623 """ 

1624 Whether this path is a character device. 

1625 

1626 Info 

1627 ---- 

1628 For fsspec filesystems this is always False. 

1629 """ 

1630 return False 

1631 

1632 def is_fifo(self) -> bool: 

1633 """ 

1634 Whether this path is a FIFO (named pipe). 

1635 

1636 Info 

1637 ---- 

1638 For fsspec filesystems this is always False. 

1639 """ 

1640 return False 

1641 

1642 def is_socket(self) -> bool: 

1643 """ 

1644 Whether this path is a socket. 

1645 

1646 Info 

1647 ---- 

1648 For fsspec filesystems this is always False. 

1649 """ 

1650 return False 

1651 

1652 def is_reserved(self) -> bool: 

1653 """ 

1654 Whether this path is reserved under Windows. 

1655 

1656 Info 

1657 ---- 

1658 For fsspec filesystems this is always False. 

1659 """ 

1660 return False 

1661 

1662 def expanduser(self) -> Self: 

1663 """Return a new path with expanded `~` constructs. 

1664 

1665 Info 

1666 ---- 

1667 For fsspec filesystems this is currently a no-op. 

1668 """ 

1669 return self 

1670 

1671 def glob( 

1672 self, 

1673 pattern: str, 

1674 *, 

1675 case_sensitive: bool | None = None, 

1676 recurse_symlinks: bool = False, 

1677 ) -> Iterator[Self]: 

1678 """Iterate over this subtree and yield all existing files (of any 

1679 kind, including directories) matching the given relative pattern.""" 

1680 if case_sensitive is not None: 

1681 warnings.warn( 

1682 "UPath.glob(): case_sensitive is currently ignored.", 

1683 UserWarning, 

1684 stacklevel=2, 

1685 ) 

1686 if recurse_symlinks: 

1687 warnings.warn( 

1688 "UPath.glob(): recurse_symlinks=True is currently ignored.", 

1689 UserWarning, 

1690 stacklevel=2, 

1691 ) 

1692 if self._relative_base is not None: 

1693 self = self.absolute() 

1694 path_pattern = self.joinpath(pattern).path 

1695 sep = self.parser.sep 

1696 base = self.path 

1697 for name in self.fs.glob(path_pattern): 

1698 name = name.removeprefix(base).removeprefix(sep) 

1699 yield self.joinpath(name) 

1700 

1701 def rglob( 

1702 self, 

1703 pattern: str, 

1704 *, 

1705 case_sensitive: bool | None = None, 

1706 recurse_symlinks: bool = False, 

1707 ) -> Iterator[Self]: 

1708 """Recursively yield all existing files (of any kind, including 

1709 directories) matching the given relative pattern, anywhere in 

1710 this subtree. 

1711 """ 

1712 if case_sensitive is not None: 

1713 warnings.warn( 

1714 "UPath.glob(): case_sensitive is currently ignored.", 

1715 UserWarning, 

1716 stacklevel=2, 

1717 ) 

1718 if recurse_symlinks: 

1719 warnings.warn( 

1720 "UPath.glob(): recurse_symlinks=True is currently ignored.", 

1721 UserWarning, 

1722 stacklevel=2, 

1723 ) 

1724 if _FSSPEC_HAS_WORKING_GLOB is None: 

1725 _check_fsspec_has_working_glob() 

1726 

1727 if _FSSPEC_HAS_WORKING_GLOB: 

1728 r_path_pattern = self.joinpath("**", pattern).path 

1729 sep = self.parser.sep 

1730 base = self.path 

1731 for name in self.fs.glob(r_path_pattern): 

1732 name = name.removeprefix(base).removeprefix(sep) 

1733 yield self.joinpath(name) 

1734 

1735 else: 

1736 path_pattern = self.joinpath(pattern).path 

1737 r_path_pattern = self.joinpath("**", pattern).path 

1738 sep = self.parser.sep 

1739 base = self.path 

1740 seen = set() 

1741 for p in (path_pattern, r_path_pattern): 

1742 for name in self.fs.glob(p): 

1743 name = name.removeprefix(base).removeprefix(sep) 

1744 if name in seen: 

1745 continue 

1746 else: 

1747 seen.add(name) 

1748 yield self.joinpath(name) 

1749 

1750 def owner(self, *, follow_symlinks: bool = True) -> str: 

1751 _raise_unsupported(type(self).__name__, "owner") 

1752 

1753 def group(self, *, follow_symlinks: bool = True) -> str: 

1754 _raise_unsupported(type(self).__name__, "group") 

1755 

1756 def absolute(self) -> Self: 

1757 """Return an absolute version of this path 

1758 No normalization or symlink resolution is performed. 

1759 

1760 Use resolve() to resolve symlinks and remove '..' segments. 

1761 """ 

1762 if self._relative_base is not None: 

1763 return self.cwd().joinpath(self.__vfspath__()) 

1764 return self 

1765 

1766 def is_absolute(self) -> bool: 

1767 """True if the path is absolute (has both a root and, if applicable, 

1768 a drive).""" 

1769 if self._relative_base is not None: 

1770 return False 

1771 else: 

1772 return self.parser.isabs(self.__vfspath__()) 

1773 

1774 def __eq__(self, other: object) -> bool: 

1775 """UPaths are considered equal if their protocol, path and 

1776 storage_options are equal.""" 

1777 if not isinstance(other, UPath): 

1778 return NotImplemented 

1779 

1780 # For relative paths, compare the string representation instead of path 

1781 if ( 

1782 self._relative_base is not None 

1783 or getattr(other, "_relative_base", None) is not None 

1784 ): 

1785 # If both are relative paths, compare just the relative strings 

1786 if ( 

1787 self._relative_base is not None 

1788 and getattr(other, "_relative_base", None) is not None 

1789 ): 

1790 return str(self) == str(other) 

1791 else: 

1792 # One is relative, one is not - they can't be equal 

1793 return False 

1794 

1795 return ( 

1796 self.__vfspath__() == other.__vfspath__() 

1797 and self.protocol == other.protocol 

1798 and self.storage_options == other.storage_options 

1799 ) 

1800 

1801 def __hash__(self) -> int: 

1802 """The returned hash is based on the protocol and path only. 

1803 

1804 Note: in the future, if hash collisions become an issue, we 

1805 can add `fsspec.utils.tokenize(storage_options)` 

1806 """ 

1807 return hash((self.protocol, self.__vfspath__())) 

1808 

1809 def __lt__(self, other: object) -> bool: 

1810 if not isinstance(other, UPath) or self.parser is not other.parser: 

1811 return NotImplemented 

1812 return self.__vfspath__() < other.__vfspath__() 

1813 

1814 def __le__(self, other: object) -> bool: 

1815 if not isinstance(other, UPath) or self.parser is not other.parser: 

1816 return NotImplemented 

1817 return self.__vfspath__() <= other.__vfspath__() 

1818 

1819 def __gt__(self, other: object) -> bool: 

1820 if not isinstance(other, UPath) or self.parser is not other.parser: 

1821 return NotImplemented 

1822 return self.__vfspath__() > other.__vfspath__() 

1823 

1824 def __ge__(self, other: object) -> bool: 

1825 if not isinstance(other, UPath) or self.parser is not other.parser: 

1826 return NotImplemented 

1827 return self.__vfspath__() >= other.__vfspath__() 

1828 

1829 def resolve(self, strict: bool = False) -> Self: 

1830 """ 

1831 Make the path absolute, resolving all symlinks on the way and also 

1832 normalizing it. 

1833 """ 

1834 if self._relative_base is not None: 

1835 self = self.absolute() 

1836 _parts = self.parts 

1837 

1838 # Do not attempt to normalize path if no parts are dots 

1839 if ".." not in _parts and "." not in _parts: 

1840 return self 

1841 

1842 resolved: list[str] = [] 

1843 resolvable_parts = _parts[1:] 

1844 for part in resolvable_parts: 

1845 if part == "..": 

1846 if resolved: 

1847 resolved.pop() 

1848 elif part != ".": 

1849 resolved.append(part) 

1850 

1851 return self.with_segments(*_parts[:1], *resolved) 

1852 

1853 def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None: 

1854 """Create this file with the given access mode, if it doesn't exist.""" 

1855 exists = self.fs.exists(self.path) 

1856 if exists and not exist_ok: 

1857 raise FileExistsError(str(self)) 

1858 if not exists: 

1859 try: 

1860 self.fs.touch(self.path, truncate=True) 

1861 except NotImplementedError: 

1862 _raise_unsupported(type(self).__name__, "touch") 

1863 else: 

1864 try: 

1865 self.fs.touch(self.path, truncate=False) 

1866 except (NotImplementedError, ValueError): 

1867 pass # unsupported by filesystem 

1868 

1869 def lchmod(self, mode: int) -> None: 

1870 _raise_unsupported(type(self).__name__, "lchmod") 

1871 

1872 def unlink(self, missing_ok: bool = False) -> None: 

1873 """ 

1874 Remove this file or link. 

1875 If the path is a directory, use rmdir() instead. 

1876 """ 

1877 if not self.exists(): 

1878 if not missing_ok: 

1879 raise FileNotFoundError(str(self)) 

1880 return 

1881 self.fs.rm(self.path, recursive=False) 

1882 

1883 def rmdir(self, recursive: bool = True) -> None: # fixme: non-standard 

1884 """ 

1885 Remove this directory. 

1886 

1887 Warning 

1888 ------- 

1889 This method is non-standard compared to pathlib.Path.rmdir(), 

1890 as it supports a `recursive` parameter to remove non-empty 

1891 directories and defaults to recursive deletion. 

1892 

1893 This behavior is likely to change in future releases once 

1894 `.delete()` is introduced. 

1895 

1896 """ 

1897 if not self.is_dir(): 

1898 raise NotADirectoryError(str(self)) 

1899 if not recursive and next(self.iterdir()): # type: ignore[arg-type] 

1900 raise OSError(f"Not recursive and directory not empty: {self}") 

1901 self.fs.rm(self.path, recursive=recursive) 

1902 

1903 def rename( 

1904 self, 

1905 target: WritablePathLike, 

1906 *, # note: non-standard compared to pathlib 

1907 recursive: bool = UNSET_DEFAULT, 

1908 maxdepth: int | None = UNSET_DEFAULT, 

1909 **kwargs: Any, 

1910 ) -> Self: 

1911 """ 

1912 Rename this file or directory to the given target. 

1913 

1914 The target path may be absolute or relative. Relative paths are 

1915 interpreted relative to the current working directory, *not* the 

1916 directory of the Path object. 

1917 

1918 Returns the new Path instance pointing to the target path. 

1919 

1920 Info 

1921 ---- 

1922 For filesystems that don't have a root character, i.e. for which 

1923 relative paths can be ambiguous, you can explicitly indicate a 

1924 relative path via prefixing with `./` 

1925 

1926 Warning 

1927 ------- 

1928 This method is non-standard compared to pathlib.Path.rename(), 

1929 as it supports `recursive` and `maxdepth` parameters for 

1930 directory moves. This will be revisited in future releases. 

1931 

1932 It's better to use `.move()` or `.move_into()` to avoid 

1933 running into future compatibility issues. 

1934 

1935 """ 

1936 # check protocol compatibility 

1937 target_protocol = get_upath_protocol(target) 

1938 if target_protocol and target_protocol != self.protocol: 

1939 raise ValueError( 

1940 f"expected protocol {self.protocol!r}, got: {target_protocol!r}" 

1941 ) 

1942 # ensure target is an absolute UPath 

1943 if not isinstance(target, type(self)): 

1944 if isinstance(target, (UPath, PurePath)): 

1945 target_str = target.as_posix() 

1946 else: 

1947 target_str = str(target) 

1948 if target_protocol: 

1949 # target protocol provided indicates absolute path 

1950 target = self.with_segments(target_str) 

1951 elif self.anchor and target_str.startswith(self.anchor): 

1952 # self.anchor can be used to indicate absolute path 

1953 target = self.with_segments(target_str) 

1954 elif not self.anchor and target_str.startswith("./"): 

1955 # indicate relative via "./" 

1956 target = ( 

1957 self.cwd() 

1958 .joinpath(target_str.removeprefix("./")) 

1959 .relative_to(self.cwd()) 

1960 ) 

1961 else: 

1962 # all other cases 

1963 target = self.cwd().joinpath(target_str).relative_to(self.cwd()) 

1964 # return early if renaming to same path 

1965 if target == self: 

1966 return self 

1967 # ensure source and target are absolute 

1968 source_abs = self.absolute() 

1969 target_abs = target.absolute() 

1970 # avoid calling .resolve for if not needed 

1971 if ".." in target_abs.parts or "." in target_abs.parts: 

1972 target_abs = target_abs.resolve() 

1973 if kwargs: 

1974 warnings.warn( 

1975 "Passing additional keyword arguments to " 

1976 f"{type(self).__name__}.rename() is deprecated and will be" 

1977 " removed in future univeral-pathlib versions.", 

1978 DeprecationWarning, 

1979 stacklevel=2, 

1980 ) 

1981 if recursive is not UNSET_DEFAULT: 

1982 warnings.warn( 

1983 f"{type(self).__name__}.rename()'s `recursive` keyword argument is" 

1984 " deprecated and will be removed in future universal-pathlib versions." 

1985 f" Please use {type(self).__name__}.move() or .move_into() instead.", 

1986 DeprecationWarning, 

1987 stacklevel=2, 

1988 ) 

1989 kwargs["recursive"] = recursive 

1990 if maxdepth is not UNSET_DEFAULT: 

1991 warnings.warn( 

1992 f"{type(self).__name__}.rename()'s `maxdepth` keyword argument is" 

1993 " deprecated and will be removed in future universal-pathlib versions.", 

1994 DeprecationWarning, 

1995 stacklevel=2, 

1996 ) 

1997 kwargs["maxdepth"] = maxdepth 

1998 self.fs.mv( 

1999 source_abs.path, 

2000 target_abs.path, 

2001 **kwargs, 

2002 ) 

2003 return target 

2004 

2005 def replace(self, target: WritablePathLike) -> Self: 

2006 """ 

2007 Rename this path to the target path, overwriting if that path exists. 

2008 

2009 The target path may be absolute or relative. Relative paths are 

2010 interpreted relative to the current working directory, *not* the 

2011 directory of the Path object. 

2012 

2013 Returns the new Path instance pointing to the target path. 

2014 

2015 Warning 

2016 ------- 

2017 This method is currently not implemented. 

2018 

2019 """ 

2020 _raise_unsupported(type(self).__name__, "replace") 

2021 

2022 @property 

2023 def drive(self) -> str: 

2024 """The drive prefix (letter or UNC path), if any. 

2025 

2026 Info 

2027 ---- 

2028 On non-Windows systems, the drive is always an empty string. 

2029 On cloud storage systems, the drive is the bucket name or equivalent. 

2030 """ 

2031 if self._relative_base is not None: 

2032 return "" 

2033 return self.parser.splitroot(str(self))[0] 

2034 

2035 @property 

2036 def root(self) -> str: 

2037 """The root of the path, if any.""" 

2038 if self._relative_base is not None: 

2039 return "" 

2040 return self.parser.splitroot(str(self))[1] 

2041 

2042 def __reduce__(self): 

2043 if self._relative_base is None: 

2044 args = (self.__vfspath__(),) 

2045 kwargs = { 

2046 "protocol": self._protocol, 

2047 **self.storage_options, 

2048 } 

2049 else: 

2050 args = (self._relative_base, self.__vfspath__()) 

2051 # Include _relative_base in the state if it's set 

2052 kwargs = { 

2053 "protocol": self._protocol, 

2054 **self.storage_options, 

2055 "_relative_base": self._relative_base, 

2056 } 

2057 return _make_instance, (type(self), args, kwargs) 

2058 

2059 @classmethod 

2060 def from_uri(cls, uri: str, **storage_options: Any) -> Self: 

2061 return cls(uri, **storage_options) 

2062 

2063 def as_uri(self) -> str: 

2064 """Return the string representation of the path as a URI.""" 

2065 if self._relative_base is not None: 

2066 raise ValueError( 

2067 f"relative path can't be expressed as a {self.protocol} URI" 

2068 ) 

2069 return str(self) 

2070 

2071 def as_posix(self) -> str: 

2072 """Return the string representation of the path with POSIX-style separators.""" 

2073 return str(self) 

2074 

2075 def samefile(self, other_path) -> bool: 

2076 st = self.stat() 

2077 if isinstance(other_path, UPath): 

2078 other_st = other_path.stat() 

2079 else: 

2080 other_st = self.with_segments(other_path).stat() 

2081 return st == other_st 

2082 

2083 @classmethod 

2084 def cwd(cls) -> Self: 

2085 """ 

2086 Return a new UPath object representing the current working directory. 

2087 

2088 Info 

2089 ---- 

2090 None of the fsspec filesystems support a global current working 

2091 directory, so this method only works for the base UPath class, 

2092 returning the local current working directory. 

2093 

2094 """ 

2095 if cls is UPath: 

2096 # default behavior for UPath.cwd() is to return local cwd 

2097 return get_upath_class("").cwd() # type: ignore[union-attr,return-value] 

2098 else: 

2099 _raise_unsupported(cls.__name__, "cwd") 

2100 

2101 @classmethod 

2102 def home(cls) -> Self: 

2103 """ 

2104 Return a new UPath object representing the user's home directory. 

2105 

2106 Info 

2107 ---- 

2108 None of the fsspec filesystems support user home directories, 

2109 so this method only works for the base UPath class, returning the 

2110 local user's home directory. 

2111 

2112 """ 

2113 if cls is UPath: 

2114 return get_upath_class("").home() # type: ignore[union-attr,return-value] 

2115 else: 

2116 _raise_unsupported(cls.__name__, "home") 

2117 

2118 def relative_to( # type: ignore[override] 

2119 self, 

2120 other: Self | str, 

2121 /, 

2122 *_deprecated: Any, 

2123 walk_up: bool = False, 

2124 ) -> Self: 

2125 """Return the relative path to another path identified by the passed 

2126 arguments. If the operation is not possible (because this is not 

2127 related to the other path), raise ValueError. 

2128 

2129 The *walk_up* parameter controls whether `..` may be used to resolve 

2130 the path. 

2131 """ 

2132 if walk_up: 

2133 raise NotImplementedError("walk_up=True is not implemented yet") 

2134 

2135 if isinstance(other, UPath): 

2136 # revisit: ... 

2137 if self.__class__ is not other.__class__: 

2138 raise ValueError( 

2139 "incompatible protocols:" 

2140 f" {self.protocol!r} != {other.protocol!r}" 

2141 ) 

2142 if self.storage_options != other.storage_options: 

2143 raise ValueError( 

2144 "incompatible storage_options:" 

2145 f" {self.storage_options!r} != {other.storage_options!r}" 

2146 ) 

2147 elif isinstance(other, str): 

2148 other = self.with_segments(other) 

2149 else: 

2150 raise TypeError(f"expected UPath or str, got {type(other).__name__}") 

2151 

2152 if other not in self.parents and self != other: 

2153 raise ValueError(f"{self!s} is not in the subpath of {other!s}") 

2154 else: 

2155 rel = copy(self) 

2156 rel._relative_base = other.path 

2157 return rel 

2158 

2159 def is_relative_to( 

2160 self, 

2161 other: Self | str, 

2162 /, 

2163 *_deprecated: Any, 

2164 ) -> bool: # type: ignore[override] 

2165 """Return True if the path is relative to another path identified.""" 

2166 if isinstance(other, UPath) and self.storage_options != other.storage_options: 

2167 return False 

2168 elif isinstance(other, str): 

2169 other = self.with_segments(other) 

2170 return self == other or other in self.parents 

2171 

2172 def hardlink_to(self, target: ReadablePathLike) -> None: 

2173 _raise_unsupported(type(self).__name__, "hardlink_to") 

2174 

2175 def full_match( 

2176 self, 

2177 pattern: str | SupportsPathLike, 

2178 *, 

2179 case_sensitive: bool | None = None, 

2180 ) -> bool: 

2181 """Match this path against the provided glob-style pattern. 

2182 Return True if matching is successful, False otherwise. 

2183 """ 

2184 if case_sensitive is not None: 

2185 warnings.warn( 

2186 f"{type(self).__name__}.full_match(): case_sensitive" 

2187 " is currently ignored.", 

2188 UserWarning, 

2189 stacklevel=2, 

2190 ) 

2191 return super().full_match(str(pattern)) 

2192 

2193 def match( 

2194 self, 

2195 path_pattern: str | SupportsPathLike, 

2196 *, 

2197 case_sensitive: bool | None = None, 

2198 ) -> bool: 

2199 """Match this path against the provided non-recursive glob-style pattern. 

2200 Return True if matching is successful, False otherwise. 

2201 """ 

2202 path_pattern = str(path_pattern) 

2203 if not path_pattern: 

2204 raise ValueError("pattern cannot be empty") 

2205 if case_sensitive is not None: 

2206 warnings.warn( 

2207 f"{type(self).__name__}.match(): case_sensitive is currently ignored.", 

2208 UserWarning, 

2209 stacklevel=2, 

2210 ) 

2211 return self.full_match(path_pattern.replace("**", "*")) 

2212 

2213 @classmethod 

2214 def __get_pydantic_core_schema__( 

2215 cls, _source_type: Any, _handler: GetCoreSchemaHandler 

2216 ) -> CoreSchema: 

2217 from pydantic_core import core_schema 

2218 

2219 str_schema = core_schema.chain_schema( 

2220 [ 

2221 core_schema.str_schema(), 

2222 core_schema.no_info_plain_validator_function( 

2223 lambda path: { 

2224 "path": path, 

2225 "protocol": None, 

2226 "storage_options": {}, 

2227 }, 

2228 ), 

2229 ] 

2230 ) 

2231 

2232 object_schema = core_schema.typed_dict_schema( 

2233 { 

2234 "path": core_schema.typed_dict_field( 

2235 core_schema.str_schema(), required=True 

2236 ), 

2237 "protocol": core_schema.typed_dict_field( 

2238 core_schema.with_default_schema( 

2239 core_schema.nullable_schema( 

2240 core_schema.str_schema(), 

2241 ), 

2242 default=None, 

2243 ), 

2244 required=False, 

2245 ), 

2246 "storage_options": core_schema.typed_dict_field( 

2247 core_schema.with_default_schema( 

2248 core_schema.dict_schema( 

2249 core_schema.str_schema(), 

2250 core_schema.any_schema(), 

2251 ), 

2252 default_factory=dict, 

2253 ), 

2254 required=False, 

2255 ), 

2256 }, 

2257 extra_behavior="forbid", 

2258 ) 

2259 

2260 deserialization_schema = core_schema.chain_schema( 

2261 [ 

2262 core_schema.union_schema([str_schema, object_schema]), 

2263 core_schema.no_info_plain_validator_function( 

2264 lambda dct: cls( 

2265 dct.pop("path"), 

2266 protocol=dct.pop("protocol"), 

2267 **dct["storage_options"], 

2268 ) 

2269 ), 

2270 ] 

2271 ) 

2272 

2273 serialization_schema = core_schema.plain_serializer_function_ser_schema( 

2274 lambda u: { 

2275 "path": u.path, 

2276 "protocol": u.protocol, 

2277 "storage_options": dict(u.storage_options), 

2278 } 

2279 ) 

2280 

2281 return core_schema.json_or_python_schema( 

2282 json_schema=deserialization_schema, 

2283 python_schema=core_schema.union_schema( 

2284 [core_schema.is_instance_schema(UPath), deserialization_schema] 

2285 ), 

2286 serialization=serialization_schema, 

2287 )