Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/upath/extensions.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

308 statements  

1from __future__ import annotations 

2 

3import sys 

4from collections.abc import Iterator 

5from collections.abc import Mapping 

6from collections.abc import Sequence 

7from typing import IO 

8from typing import TYPE_CHECKING 

9from typing import Any 

10from typing import BinaryIO 

11from typing import Callable 

12from typing import Literal 

13from typing import TextIO 

14from typing import TypeVar 

15from typing import overload 

16from urllib.parse import SplitResult 

17 

18from fsspec import AbstractFileSystem 

19from pathlib_abc import vfspath 

20 

21from upath._chain import Chain 

22from upath._chain import ChainSegment 

23from upath.core import UnsupportedOperation 

24from upath.core import UPath 

25from upath.types import UNSET_DEFAULT 

26from upath.types import JoinablePathLike 

27from upath.types import PathInfo 

28from upath.types import ReadablePath 

29from upath.types import ReadablePathLike 

30from upath.types import StatResultType 

31from upath.types import SupportsPathLike 

32from upath.types import UPathParser 

33from upath.types import WritablePathLike 

34 

35if TYPE_CHECKING: 

36 if sys.version_info > (3, 11): 

37 from typing import Self 

38 else: 

39 from typing_extensions import Self 

40 

41 from pydantic import GetCoreSchemaHandler 

42 from pydantic_core.core_schema import CoreSchema 

43 

44__all__ = [ 

45 "ProxyUPath", 

46] 

47 

48T = TypeVar("T") 

49 

50 

51class classmethod_or_method(classmethod): 

52 """A decorator that can be used as a classmethod or an instance method. 

53 

54 When called on the class, it behaves like a classmethod. 

55 When called on an instance, it behaves like an instance method. 

56 

57 """ 

58 

59 def __get__( 

60 self, 

61 instance: T | None, 

62 owner: type[T] | None = None, 

63 /, 

64 ) -> Callable[..., T]: 

65 if instance is None: 

66 return self.__func__.__get__(owner) 

67 else: 

68 return self.__func__.__get__(instance) 

69 

70 

71class ProxyUPath: 

72 """ProxyUPath base class 

73 

74 ProxyUPath should be used when you want to extend the UPath class 

75 interface with additional methods, but still want to support 

76 all supported upath implementations. 

77 

78 """ 

79 

80 __slots__ = ("__wrapped__",) 

81 

82 # TODO: think about if and how to handle these 

83 # _transform_init_args 

84 # _parse_storage_options 

85 # _fs_factory 

86 # _protocol_dispatch 

87 

88 # === non-public methods / attributes ============================= 

89 

90 @classmethod 

91 def _from_upath(cls, upath: UPath, /) -> Self: 

92 if isinstance(upath, cls): 

93 return upath # type: ignore[unreachable] 

94 else: 

95 obj = object.__new__(cls) 

96 obj.__wrapped__ = upath 

97 return obj 

98 

99 @property 

100 def _chain(self): 

101 try: 

102 return self.__wrapped__._chain 

103 except AttributeError: 

104 return Chain( 

105 ChainSegment( 

106 path=self.__wrapped__.path, 

107 protocol=self.__wrapped__.protocol, 

108 storage_options=dict(self.__wrapped__.storage_options), 

109 ), 

110 ) 

111 

112 # === wrapped interface =========================================== 

113 

114 def __init__( 

115 self, 

116 *args: JoinablePathLike, 

117 protocol: str | None = None, 

118 **storage_options: Any, 

119 ) -> None: 

120 self.__wrapped__ = UPath(*args, protocol=protocol, **storage_options) 

121 

122 @property 

123 def parser(self) -> UPathParser: 

124 return self.__wrapped__.parser 

125 

126 def with_segments(self, *pathsegments: JoinablePathLike) -> Self: 

127 return self._from_upath(self.__wrapped__.with_segments(*pathsegments)) 

128 

129 def __str__(self) -> str: 

130 return self.__wrapped__.__str__() 

131 

132 def __vfspath__(self) -> str: 

133 return self.__wrapped__.__vfspath__() 

134 

135 def __repr__(self) -> str: 

136 return ( 

137 f"{type(self).__name__}" 

138 f"({self.__wrapped__.path!r}, protocol={self.protocol!r})" 

139 ) 

140 

141 @property 

142 def parts(self) -> Sequence[str]: 

143 return self.__wrapped__.parts 

144 

145 def with_name(self, name: str) -> Self: 

146 return self._from_upath(self.__wrapped__.with_name(name)) 

147 

148 @property 

149 def info(self) -> PathInfo: 

150 return self.__wrapped__.info 

151 

152 def iterdir(self) -> Iterator[Self]: 

153 for pth in self.__wrapped__.iterdir(): 

154 yield self._from_upath(pth) 

155 

156 def __open_reader__(self) -> BinaryIO: 

157 return self.__wrapped__.__open_reader__() 

158 

159 def readlink(self) -> Self: 

160 return self._from_upath(self.__wrapped__.readlink()) 

161 

162 def symlink_to( 

163 self, 

164 target: ReadablePathLike, 

165 target_is_directory: bool = False, 

166 ) -> None: 

167 if not isinstance(target, str): 

168 target = vfspath(target) 

169 self.__wrapped__.symlink_to(target, target_is_directory=target_is_directory) 

170 

171 def mkdir( 

172 self, 

173 mode: int = 0o777, 

174 parents: bool = False, 

175 exist_ok: bool = False, 

176 ) -> None: 

177 self.__wrapped__.mkdir(mode=mode, parents=parents, exist_ok=exist_ok) 

178 

179 def __open_writer__(self, mode: Literal["a", "w", "x"]) -> BinaryIO: 

180 return self.__wrapped__.__open_writer__(mode) 

181 

182 @overload 

183 def open( 

184 self, 

185 mode: Literal["r", "w", "a"] = "r", 

186 buffering: int = ..., 

187 encoding: str = ..., 

188 errors: str = ..., 

189 newline: str = ..., 

190 **fsspec_kwargs: Any, 

191 ) -> TextIO: ... 

192 

193 @overload 

194 def open( 

195 self, 

196 mode: Literal["rb", "wb", "ab"], 

197 buffering: int = ..., 

198 encoding: str = ..., 

199 errors: str = ..., 

200 newline: str = ..., 

201 **fsspec_kwargs: Any, 

202 ) -> BinaryIO: ... 

203 

204 @overload 

205 def open( 

206 self, 

207 mode: str, 

208 *args: Any, 

209 **fsspec_kwargs: Any, 

210 ) -> IO[Any]: ... 

211 

212 def open( 

213 self, 

214 mode: str = "r", 

215 buffering: int = UNSET_DEFAULT, 

216 encoding: str | None = UNSET_DEFAULT, 

217 errors: str | None = UNSET_DEFAULT, 

218 newline: str | None = UNSET_DEFAULT, 

219 **fsspec_kwargs: Any, 

220 ) -> IO[Any]: 

221 return self.__wrapped__.open( 

222 mode, 

223 buffering, 

224 encoding, 

225 errors, 

226 newline, 

227 **fsspec_kwargs, 

228 ) 

229 

230 def stat( 

231 self, 

232 *, 

233 follow_symlinks=True, 

234 ) -> StatResultType: 

235 return self.__wrapped__.stat(follow_symlinks=follow_symlinks) 

236 

237 def lstat(self) -> StatResultType: 

238 return self.__wrapped__.stat(follow_symlinks=False) 

239 

240 def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: 

241 self.__wrapped__.chmod(mode=mode, follow_symlinks=follow_symlinks) 

242 

243 def exists(self, *, follow_symlinks=True) -> bool: 

244 return self.__wrapped__.exists(follow_symlinks=follow_symlinks) 

245 

246 def is_dir(self) -> bool: 

247 return self.__wrapped__.is_dir() 

248 

249 def is_file(self) -> bool: 

250 return self.__wrapped__.is_file() 

251 

252 def is_mount(self) -> bool: 

253 return self.__wrapped__.is_mount() 

254 

255 def is_symlink(self) -> bool: 

256 return self.__wrapped__.is_symlink() 

257 

258 def is_junction(self) -> bool: 

259 return self.__wrapped__.is_junction() 

260 

261 def is_block_device(self) -> bool: 

262 return self.__wrapped__.is_block_device() 

263 

264 def is_char_device(self) -> bool: 

265 return self.__wrapped__.is_char_device() 

266 

267 def is_fifo(self) -> bool: 

268 return self.__wrapped__.is_fifo() 

269 

270 def is_socket(self) -> bool: 

271 return self.__wrapped__.is_socket() 

272 

273 def is_reserved(self) -> bool: 

274 return self.__wrapped__.is_reserved() 

275 

276 def expanduser(self) -> Self: 

277 return self._from_upath(self.__wrapped__.expanduser()) 

278 

279 def glob( 

280 self, 

281 pattern: str, 

282 *, 

283 case_sensitive: bool | None = None, 

284 recurse_symlinks: bool = False, 

285 ) -> Iterator[Self]: 

286 for p in self.__wrapped__.glob( 

287 pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks 

288 ): 

289 yield self._from_upath(p) 

290 

291 def rglob( 

292 self, 

293 pattern: str, 

294 *, 

295 case_sensitive: bool | None = None, 

296 recurse_symlinks: bool = False, 

297 ) -> Iterator[Self]: 

298 for p in self.__wrapped__.rglob( 

299 pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks 

300 ): 

301 yield self._from_upath(p) 

302 

303 def owner(self) -> str: 

304 return self.__wrapped__.owner() 

305 

306 def group(self) -> str: 

307 return self.__wrapped__.group() 

308 

309 def absolute(self) -> Self: 

310 return self._from_upath(self.__wrapped__.absolute()) 

311 

312 def is_absolute(self) -> bool: 

313 return self.__wrapped__.is_absolute() 

314 

315 def __eq__(self, other: object) -> bool: 

316 if not isinstance(other, type(self)): 

317 return NotImplemented 

318 return self.__wrapped__.__eq__(other.__wrapped__) 

319 

320 def __hash__(self) -> int: 

321 return self.__wrapped__.__hash__() 

322 

323 def __ne__(self, other: object) -> bool: 

324 if not isinstance(other, type(self)): 

325 return NotImplemented 

326 return self.__wrapped__.__ne__(other.__wrapped__) 

327 

328 def __lt__(self, other: object) -> bool: 

329 if not isinstance(other, type(self)): 

330 return NotImplemented 

331 return self.__wrapped__.__lt__(other.__wrapped__) 

332 

333 def __le__(self, other: object) -> bool: 

334 if not isinstance(other, type(self)): 

335 return NotImplemented 

336 return self.__wrapped__.__le__(other.__wrapped__) 

337 

338 def __gt__(self, other: object) -> bool: 

339 if not isinstance(other, type(self)): 

340 return NotImplemented 

341 return self.__wrapped__.__gt__(other.__wrapped__) 

342 

343 def __ge__(self, other: object) -> bool: 

344 if not isinstance(other, type(self)): 

345 return NotImplemented 

346 return self.__wrapped__.__ge__(other.__wrapped__) 

347 

348 def resolve(self, strict: bool = False) -> Self: 

349 return self._from_upath(self.__wrapped__.resolve(strict=strict)) 

350 

351 def touch(self, mode=0o666, exist_ok=True) -> None: 

352 self.__wrapped__.touch(mode=mode, exist_ok=exist_ok) 

353 

354 def lchmod(self, mode: int) -> None: 

355 self.__wrapped__.lchmod(mode=mode) 

356 

357 def unlink(self, missing_ok: bool = False) -> None: 

358 self.__wrapped__.unlink(missing_ok=missing_ok) 

359 

360 def rmdir(self, recursive: bool = UNSET_DEFAULT) -> None: # fixme: non-standard 

361 kwargs: dict[str, Any] = {} 

362 if recursive is not UNSET_DEFAULT: 

363 kwargs["recursive"] = recursive 

364 self.__wrapped__.rmdir(**kwargs) 

365 

366 def rename( 

367 self, 

368 target: WritablePathLike, 

369 *, # note: non-standard compared to pathlib 

370 recursive: bool = UNSET_DEFAULT, 

371 maxdepth: int | None = UNSET_DEFAULT, 

372 **kwargs: Any, 

373 ) -> Self: 

374 if recursive is not UNSET_DEFAULT: 

375 kwargs["recursive"] = recursive 

376 if maxdepth is not UNSET_DEFAULT: 

377 kwargs["maxdepth"] = maxdepth 

378 return self._from_upath( 

379 self.__wrapped__.rename( 

380 target.__wrapped__ if isinstance(target, ProxyUPath) else target, 

381 **kwargs, 

382 ) 

383 ) 

384 

385 def replace(self, target: WritablePathLike) -> Self: 

386 return self._from_upath(self.__wrapped__.replace(target)) 

387 

388 @property 

389 def drive(self) -> str: 

390 return self.__wrapped__.drive 

391 

392 @property 

393 def root(self) -> str: 

394 return self.__wrapped__.root 

395 

396 def __reduce__(self): 

397 return type(self)._from_upath, (self.__wrapped__,) 

398 

399 @classmethod 

400 def from_uri(cls, uri: str, **storage_options: Any) -> Self: 

401 return cls(uri, **storage_options) 

402 

403 def as_uri(self) -> str: 

404 return self.__wrapped__.as_uri() 

405 

406 def as_posix(self) -> str: 

407 return self.__wrapped__.as_posix() 

408 

409 def samefile(self, other_path) -> bool: 

410 return self.__wrapped__.samefile(other_path) 

411 

412 @classmethod_or_method 

413 def cwd(cls_or_self) -> Self: # noqa: B902 

414 if isinstance(cls_or_self, type): 

415 raise UnsupportedOperation(".cwd() not supported") 

416 else: 

417 return cls_or_self._from_upath(cls_or_self.__wrapped__.cwd()) 

418 

419 @classmethod 

420 def home(cls) -> Self: 

421 raise UnsupportedOperation(".home() not supported") 

422 

423 def relative_to( # type: ignore[override] 

424 self, 

425 other, 

426 /, 

427 *_deprecated, 

428 walk_up=False, 

429 ) -> Self: 

430 if isinstance(other, ProxyUPath): 

431 other = other.__wrapped__ 

432 return self._from_upath( 

433 self.__wrapped__.relative_to(other, *_deprecated, walk_up=walk_up) 

434 ) 

435 

436 def is_relative_to(self, other, /, *_deprecated) -> bool: # type: ignore[override] 

437 if not isinstance(other, str): 

438 other = vfspath(other) 

439 return self.__wrapped__.is_relative_to(other, *_deprecated) 

440 

441 def hardlink_to(self, target: ReadablePathLike) -> None: 

442 if not isinstance(target, str): 

443 target = vfspath(target) 

444 return self.__wrapped__.hardlink_to(target) 

445 

446 def match(self, pattern: str, *, case_sensitive: bool | None = None) -> bool: 

447 return self.__wrapped__.match(pattern) 

448 

449 @property 

450 def protocol(self) -> str: 

451 return self.__wrapped__.protocol 

452 

453 @property 

454 def storage_options(self) -> Mapping[str, Any]: 

455 return self.__wrapped__.storage_options 

456 

457 @property 

458 def fs(self) -> AbstractFileSystem: 

459 return self.__wrapped__.fs 

460 

461 @property 

462 def path(self) -> str: 

463 return self.__wrapped__.path 

464 

465 def joinuri(self, uri: JoinablePathLike) -> Self: 

466 return self._from_upath(self.__wrapped__.joinuri(uri)) 

467 

468 @property 

469 def _url(self) -> SplitResult: 

470 return self.__wrapped__._url 

471 

472 def read_bytes(self) -> bytes: 

473 return self.__wrapped__.read_bytes() 

474 

475 def read_text( 

476 self, 

477 encoding: str | None = None, 

478 errors: str | None = None, 

479 newline: str | None = None, 

480 ) -> str: 

481 return self.__wrapped__.read_text( 

482 encoding=encoding, errors=errors, newline=newline 

483 ) 

484 

485 def walk( 

486 self, 

487 top_down: bool = True, 

488 on_error: Callable[[Exception], Any] | None = None, 

489 follow_symlinks: bool = False, 

490 ) -> Iterator[tuple[Self, list[str], list[str]]]: 

491 for pth, dirnames, filenames in self.__wrapped__.walk( 

492 top_down=top_down, on_error=on_error, follow_symlinks=follow_symlinks 

493 ): 

494 yield self._from_upath(pth), dirnames, filenames 

495 

496 def copy(self, target: WritablePathLike, **kwargs: Any) -> Self: # type: ignore[override] # noqa: E501 

497 return self._from_upath(self.__wrapped__.copy(target, **kwargs)) 

498 

499 def copy_into(self, target_dir: WritablePathLike, **kwargs: Any) -> Self: # type: ignore[override] # noqa: E501 

500 return self._from_upath(self.__wrapped__.copy_into(target_dir, **kwargs)) 

501 

502 def move(self, target: WritablePathLike, **kwargs: Any) -> Self: # type: ignore[override] # noqa: E501 

503 return self._from_upath(self.__wrapped__.move(target, **kwargs)) 

504 

505 def move_into(self, target_dir: WritablePathLike, **kwargs: Any) -> Self: # type: ignore[override] # noqa: E501 

506 return self._from_upath(self.__wrapped__.move_into(target_dir, **kwargs)) 

507 

508 def write_bytes(self, data: bytes) -> int: 

509 return self.__wrapped__.write_bytes(data) 

510 

511 def write_text( 

512 self, 

513 data: str, 

514 encoding: str | None = None, 

515 errors: str | None = None, 

516 newline: str | None = None, 

517 ) -> int: 

518 return self.__wrapped__.write_text( 

519 data, encoding=encoding, errors=errors, newline=newline 

520 ) 

521 

522 def _copy_from( 

523 self, source: ReadablePath | Self, follow_symlinks: bool = True 

524 ) -> None: 

525 self.__wrapped__._copy_from(source, follow_symlinks=follow_symlinks) # type: ignore # noqa: E501 

526 

527 @property 

528 def anchor(self) -> str: 

529 return self.__wrapped__.anchor 

530 

531 @property 

532 def name(self) -> str: 

533 return self.__wrapped__.name 

534 

535 @property 

536 def suffix(self) -> str: 

537 return self.__wrapped__.suffix 

538 

539 @property 

540 def suffixes(self) -> list[str]: 

541 return self.__wrapped__.suffixes 

542 

543 @property 

544 def stem(self) -> str: 

545 return self.__wrapped__.stem 

546 

547 def with_stem(self, stem: str) -> Self: 

548 return self._from_upath(self.__wrapped__.with_stem(stem)) 

549 

550 def with_suffix(self, suffix: str) -> Self: 

551 return self._from_upath(self.__wrapped__.with_suffix(suffix)) 

552 

553 def joinpath(self, *pathsegments: JoinablePathLike) -> Self: 

554 return self._from_upath(self.__wrapped__.joinpath(*pathsegments)) 

555 

556 def __truediv__(self, other: str | Self) -> Self: 

557 return self._from_upath( 

558 self.__wrapped__.__truediv__(other) # type: ignore[operator] 

559 ) 

560 

561 def __rtruediv__(self, other: str | Self) -> Self: 

562 return self._from_upath( 

563 self.__wrapped__.__rtruediv__(other) # type: ignore[operator] 

564 ) 

565 

566 @property 

567 def parent(self) -> Self: 

568 return self._from_upath(self.__wrapped__.parent) 

569 

570 @property 

571 def parents(self) -> Sequence[Self]: 

572 return tuple(self._from_upath(p) for p in self.__wrapped__.parents) 

573 

574 def full_match( 

575 self, 

576 pattern: str | SupportsPathLike, 

577 *, 

578 case_sensitive: bool | None = None, 

579 ) -> bool: 

580 return self.__wrapped__.full_match(pattern, case_sensitive=case_sensitive) 

581 

582 @classmethod 

583 def __get_pydantic_core_schema__( 

584 cls, source_type: Any, handler: GetCoreSchemaHandler 

585 ) -> CoreSchema: 

586 cs = UPath.__get_pydantic_core_schema__.__func__ # type: ignore[attr-defined] 

587 return cs(cls, source_type, handler) 

588 

589 

590UPath.register(ProxyUPath)