Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_fileio.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

310 statements  

1from __future__ import annotations 

2 

3import os 

4import pathlib 

5import sys 

6from collections.abc import Callable, Iterable, Iterator, Sequence 

7from dataclasses import dataclass 

8from functools import partial 

9from os import PathLike 

10from typing import ( 

11 IO, 

12 TYPE_CHECKING, 

13 Any, 

14 AnyStr, 

15 AsyncIterator, 

16 Final, 

17 Generic, 

18 overload, 

19) 

20 

21from .. import to_thread 

22from ..abc import AsyncResource 

23 

24if TYPE_CHECKING: 

25 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer 

26else: 

27 ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object 

28 

29 

30class AsyncFile(AsyncResource, Generic[AnyStr]): 

31 """ 

32 An asynchronous file object. 

33 

34 This class wraps a standard file object and provides async friendly versions of the 

35 following blocking methods (where available on the original file object): 

36 

37 * read 

38 * read1 

39 * readline 

40 * readlines 

41 * readinto 

42 * readinto1 

43 * write 

44 * writelines 

45 * truncate 

46 * seek 

47 * tell 

48 * flush 

49 

50 All other methods are directly passed through. 

51 

52 This class supports the asynchronous context manager protocol which closes the 

53 underlying file at the end of the context block. 

54 

55 This class also supports asynchronous iteration:: 

56 

57 async with await open_file(...) as f: 

58 async for line in f: 

59 print(line) 

60 """ 

61 

62 def __init__(self, fp: IO[AnyStr]) -> None: 

63 self._fp: Any = fp 

64 

65 def __getattr__(self, name: str) -> object: 

66 return getattr(self._fp, name) 

67 

68 @property 

69 def wrapped(self) -> IO[AnyStr]: 

70 """The wrapped file object.""" 

71 return self._fp 

72 

73 async def __aiter__(self) -> AsyncIterator[AnyStr]: 

74 while True: 

75 line = await self.readline() 

76 if line: 

77 yield line 

78 else: 

79 break 

80 

81 async def aclose(self) -> None: 

82 return await to_thread.run_sync(self._fp.close) 

83 

84 async def read(self, size: int = -1) -> AnyStr: 

85 return await to_thread.run_sync(self._fp.read, size) 

86 

87 async def read1(self: AsyncFile[bytes], size: int = -1) -> bytes: 

88 return await to_thread.run_sync(self._fp.read1, size) 

89 

90 async def readline(self) -> AnyStr: 

91 return await to_thread.run_sync(self._fp.readline) 

92 

93 async def readlines(self) -> list[AnyStr]: 

94 return await to_thread.run_sync(self._fp.readlines) 

95 

96 async def readinto(self: AsyncFile[bytes], b: WriteableBuffer) -> bytes: 

97 return await to_thread.run_sync(self._fp.readinto, b) 

98 

99 async def readinto1(self: AsyncFile[bytes], b: WriteableBuffer) -> bytes: 

100 return await to_thread.run_sync(self._fp.readinto1, b) 

101 

102 @overload 

103 async def write(self: AsyncFile[bytes], b: ReadableBuffer) -> int: ... 

104 

105 @overload 

106 async def write(self: AsyncFile[str], b: str) -> int: ... 

107 

108 async def write(self, b: ReadableBuffer | str) -> int: 

109 return await to_thread.run_sync(self._fp.write, b) 

110 

111 @overload 

112 async def writelines( 

113 self: AsyncFile[bytes], lines: Iterable[ReadableBuffer] 

114 ) -> None: ... 

115 

116 @overload 

117 async def writelines(self: AsyncFile[str], lines: Iterable[str]) -> None: ... 

118 

119 async def writelines(self, lines: Iterable[ReadableBuffer] | Iterable[str]) -> None: 

120 return await to_thread.run_sync(self._fp.writelines, lines) 

121 

122 async def truncate(self, size: int | None = None) -> int: 

123 return await to_thread.run_sync(self._fp.truncate, size) 

124 

125 async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int: 

126 return await to_thread.run_sync(self._fp.seek, offset, whence) 

127 

128 async def tell(self) -> int: 

129 return await to_thread.run_sync(self._fp.tell) 

130 

131 async def flush(self) -> None: 

132 return await to_thread.run_sync(self._fp.flush) 

133 

134 

135@overload 

136async def open_file( 

137 file: str | PathLike[str] | int, 

138 mode: OpenBinaryMode, 

139 buffering: int = ..., 

140 encoding: str | None = ..., 

141 errors: str | None = ..., 

142 newline: str | None = ..., 

143 closefd: bool = ..., 

144 opener: Callable[[str, int], int] | None = ..., 

145) -> AsyncFile[bytes]: ... 

146 

147 

148@overload 

149async def open_file( 

150 file: str | PathLike[str] | int, 

151 mode: OpenTextMode = ..., 

152 buffering: int = ..., 

153 encoding: str | None = ..., 

154 errors: str | None = ..., 

155 newline: str | None = ..., 

156 closefd: bool = ..., 

157 opener: Callable[[str, int], int] | None = ..., 

158) -> AsyncFile[str]: ... 

159 

160 

161async def open_file( 

162 file: str | PathLike[str] | int, 

163 mode: str = "r", 

164 buffering: int = -1, 

165 encoding: str | None = None, 

166 errors: str | None = None, 

167 newline: str | None = None, 

168 closefd: bool = True, 

169 opener: Callable[[str, int], int] | None = None, 

170) -> AsyncFile[Any]: 

171 """ 

172 Open a file asynchronously. 

173 

174 The arguments are exactly the same as for the builtin :func:`open`. 

175 

176 :return: an asynchronous file object 

177 

178 """ 

179 fp = await to_thread.run_sync( 

180 open, file, mode, buffering, encoding, errors, newline, closefd, opener 

181 ) 

182 return AsyncFile(fp) 

183 

184 

185def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]: 

186 """ 

187 Wrap an existing file as an asynchronous file. 

188 

189 :param file: an existing file-like object 

190 :return: an asynchronous file object 

191 

192 """ 

193 return AsyncFile(file) 

194 

195 

196@dataclass(eq=False) 

197class _PathIterator(AsyncIterator["Path"]): 

198 iterator: Iterator[PathLike[str]] 

199 

200 async def __anext__(self) -> Path: 

201 nextval = await to_thread.run_sync( 

202 next, self.iterator, None, abandon_on_cancel=True 

203 ) 

204 if nextval is None: 

205 raise StopAsyncIteration from None 

206 

207 return Path(nextval) 

208 

209 

210class Path: 

211 """ 

212 An asynchronous version of :class:`pathlib.Path`. 

213 

214 This class cannot be substituted for :class:`pathlib.Path` or 

215 :class:`pathlib.PurePath`, but it is compatible with the :class:`os.PathLike` 

216 interface. 

217 

218 It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for 

219 the deprecated :meth:`~pathlib.Path.link_to` method. 

220 

221 Any methods that do disk I/O need to be awaited on. These methods are: 

222 

223 * :meth:`~pathlib.Path.absolute` 

224 * :meth:`~pathlib.Path.chmod` 

225 * :meth:`~pathlib.Path.cwd` 

226 * :meth:`~pathlib.Path.exists` 

227 * :meth:`~pathlib.Path.expanduser` 

228 * :meth:`~pathlib.Path.group` 

229 * :meth:`~pathlib.Path.hardlink_to` 

230 * :meth:`~pathlib.Path.home` 

231 * :meth:`~pathlib.Path.is_block_device` 

232 * :meth:`~pathlib.Path.is_char_device` 

233 * :meth:`~pathlib.Path.is_dir` 

234 * :meth:`~pathlib.Path.is_fifo` 

235 * :meth:`~pathlib.Path.is_file` 

236 * :meth:`~pathlib.Path.is_mount` 

237 * :meth:`~pathlib.Path.lchmod` 

238 * :meth:`~pathlib.Path.lstat` 

239 * :meth:`~pathlib.Path.mkdir` 

240 * :meth:`~pathlib.Path.open` 

241 * :meth:`~pathlib.Path.owner` 

242 * :meth:`~pathlib.Path.read_bytes` 

243 * :meth:`~pathlib.Path.read_text` 

244 * :meth:`~pathlib.Path.readlink` 

245 * :meth:`~pathlib.Path.rename` 

246 * :meth:`~pathlib.Path.replace` 

247 * :meth:`~pathlib.Path.rmdir` 

248 * :meth:`~pathlib.Path.samefile` 

249 * :meth:`~pathlib.Path.stat` 

250 * :meth:`~pathlib.Path.touch` 

251 * :meth:`~pathlib.Path.unlink` 

252 * :meth:`~pathlib.Path.write_bytes` 

253 * :meth:`~pathlib.Path.write_text` 

254 

255 Additionally, the following methods return an async iterator yielding 

256 :class:`~.Path` objects: 

257 

258 * :meth:`~pathlib.Path.glob` 

259 * :meth:`~pathlib.Path.iterdir` 

260 * :meth:`~pathlib.Path.rglob` 

261 """ 

262 

263 __slots__ = "_path", "__weakref__" 

264 

265 __weakref__: Any 

266 

267 def __init__(self, *args: str | PathLike[str]) -> None: 

268 self._path: Final[pathlib.Path] = pathlib.Path(*args) 

269 

270 def __fspath__(self) -> str: 

271 return self._path.__fspath__() 

272 

273 def __str__(self) -> str: 

274 return self._path.__str__() 

275 

276 def __repr__(self) -> str: 

277 return f"{self.__class__.__name__}({self.as_posix()!r})" 

278 

279 def __bytes__(self) -> bytes: 

280 return self._path.__bytes__() 

281 

282 def __hash__(self) -> int: 

283 return self._path.__hash__() 

284 

285 def __eq__(self, other: object) -> bool: 

286 target = other._path if isinstance(other, Path) else other 

287 return self._path.__eq__(target) 

288 

289 def __lt__(self, other: pathlib.PurePath | Path) -> bool: 

290 target = other._path if isinstance(other, Path) else other 

291 return self._path.__lt__(target) 

292 

293 def __le__(self, other: pathlib.PurePath | Path) -> bool: 

294 target = other._path if isinstance(other, Path) else other 

295 return self._path.__le__(target) 

296 

297 def __gt__(self, other: pathlib.PurePath | Path) -> bool: 

298 target = other._path if isinstance(other, Path) else other 

299 return self._path.__gt__(target) 

300 

301 def __ge__(self, other: pathlib.PurePath | Path) -> bool: 

302 target = other._path if isinstance(other, Path) else other 

303 return self._path.__ge__(target) 

304 

305 def __truediv__(self, other: str | PathLike[str]) -> Path: 

306 return Path(self._path / other) 

307 

308 def __rtruediv__(self, other: str | PathLike[str]) -> Path: 

309 return Path(other) / self 

310 

311 @property 

312 def parts(self) -> tuple[str, ...]: 

313 return self._path.parts 

314 

315 @property 

316 def drive(self) -> str: 

317 return self._path.drive 

318 

319 @property 

320 def root(self) -> str: 

321 return self._path.root 

322 

323 @property 

324 def anchor(self) -> str: 

325 return self._path.anchor 

326 

327 @property 

328 def parents(self) -> Sequence[Path]: 

329 return tuple(Path(p) for p in self._path.parents) 

330 

331 @property 

332 def parent(self) -> Path: 

333 return Path(self._path.parent) 

334 

335 @property 

336 def name(self) -> str: 

337 return self._path.name 

338 

339 @property 

340 def suffix(self) -> str: 

341 return self._path.suffix 

342 

343 @property 

344 def suffixes(self) -> list[str]: 

345 return self._path.suffixes 

346 

347 @property 

348 def stem(self) -> str: 

349 return self._path.stem 

350 

351 async def absolute(self) -> Path: 

352 path = await to_thread.run_sync(self._path.absolute) 

353 return Path(path) 

354 

355 def as_posix(self) -> str: 

356 return self._path.as_posix() 

357 

358 def as_uri(self) -> str: 

359 return self._path.as_uri() 

360 

361 if sys.version_info >= (3, 13): 

362 parser = pathlib.Path.parser 

363 

364 @classmethod 

365 def from_uri(cls, uri: str) -> Path: 

366 return Path(pathlib.Path.from_uri(uri)) 

367 

368 def full_match( 

369 self, path_pattern: str, *, case_sensitive: bool | None = None 

370 ) -> bool: 

371 return self._path.full_match(path_pattern, case_sensitive=case_sensitive) 

372 

373 def match( 

374 self, path_pattern: str, *, case_sensitive: bool | None = None 

375 ) -> bool: 

376 return self._path.match(path_pattern, case_sensitive=case_sensitive) 

377 else: 

378 

379 def match(self, path_pattern: str) -> bool: 

380 return self._path.match(path_pattern) 

381 

382 def is_relative_to(self, other: str | PathLike[str]) -> bool: 

383 try: 

384 self.relative_to(other) 

385 return True 

386 except ValueError: 

387 return False 

388 

389 async def is_junction(self) -> bool: 

390 return await to_thread.run_sync(self._path.is_junction) 

391 

392 async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: 

393 func = partial(os.chmod, follow_symlinks=follow_symlinks) 

394 return await to_thread.run_sync(func, self._path, mode) 

395 

396 @classmethod 

397 async def cwd(cls) -> Path: 

398 path = await to_thread.run_sync(pathlib.Path.cwd) 

399 return cls(path) 

400 

401 async def exists(self) -> bool: 

402 return await to_thread.run_sync(self._path.exists, abandon_on_cancel=True) 

403 

404 async def expanduser(self) -> Path: 

405 return Path( 

406 await to_thread.run_sync(self._path.expanduser, abandon_on_cancel=True) 

407 ) 

408 

409 def glob(self, pattern: str) -> AsyncIterator[Path]: 

410 gen = self._path.glob(pattern) 

411 return _PathIterator(gen) 

412 

413 async def group(self) -> str: 

414 return await to_thread.run_sync(self._path.group, abandon_on_cancel=True) 

415 

416 async def hardlink_to( 

417 self, target: str | bytes | PathLike[str] | PathLike[bytes] 

418 ) -> None: 

419 if isinstance(target, Path): 

420 target = target._path 

421 

422 await to_thread.run_sync(os.link, target, self) 

423 

424 @classmethod 

425 async def home(cls) -> Path: 

426 home_path = await to_thread.run_sync(pathlib.Path.home) 

427 return cls(home_path) 

428 

429 def is_absolute(self) -> bool: 

430 return self._path.is_absolute() 

431 

432 async def is_block_device(self) -> bool: 

433 return await to_thread.run_sync( 

434 self._path.is_block_device, abandon_on_cancel=True 

435 ) 

436 

437 async def is_char_device(self) -> bool: 

438 return await to_thread.run_sync( 

439 self._path.is_char_device, abandon_on_cancel=True 

440 ) 

441 

442 async def is_dir(self) -> bool: 

443 return await to_thread.run_sync(self._path.is_dir, abandon_on_cancel=True) 

444 

445 async def is_fifo(self) -> bool: 

446 return await to_thread.run_sync(self._path.is_fifo, abandon_on_cancel=True) 

447 

448 async def is_file(self) -> bool: 

449 return await to_thread.run_sync(self._path.is_file, abandon_on_cancel=True) 

450 

451 async def is_mount(self) -> bool: 

452 return await to_thread.run_sync( 

453 os.path.ismount, self._path, abandon_on_cancel=True 

454 ) 

455 

456 def is_reserved(self) -> bool: 

457 return self._path.is_reserved() 

458 

459 async def is_socket(self) -> bool: 

460 return await to_thread.run_sync(self._path.is_socket, abandon_on_cancel=True) 

461 

462 async def is_symlink(self) -> bool: 

463 return await to_thread.run_sync(self._path.is_symlink, abandon_on_cancel=True) 

464 

465 def iterdir(self) -> AsyncIterator[Path]: 

466 gen = self._path.iterdir() 

467 return _PathIterator(gen) 

468 

469 def joinpath(self, *args: str | PathLike[str]) -> Path: 

470 return Path(self._path.joinpath(*args)) 

471 

472 async def lchmod(self, mode: int) -> None: 

473 await to_thread.run_sync(self._path.lchmod, mode) 

474 

475 async def lstat(self) -> os.stat_result: 

476 return await to_thread.run_sync(self._path.lstat, abandon_on_cancel=True) 

477 

478 async def mkdir( 

479 self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False 

480 ) -> None: 

481 await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok) 

482 

483 @overload 

484 async def open( 

485 self, 

486 mode: OpenBinaryMode, 

487 buffering: int = ..., 

488 encoding: str | None = ..., 

489 errors: str | None = ..., 

490 newline: str | None = ..., 

491 ) -> AsyncFile[bytes]: ... 

492 

493 @overload 

494 async def open( 

495 self, 

496 mode: OpenTextMode = ..., 

497 buffering: int = ..., 

498 encoding: str | None = ..., 

499 errors: str | None = ..., 

500 newline: str | None = ..., 

501 ) -> AsyncFile[str]: ... 

502 

503 async def open( 

504 self, 

505 mode: str = "r", 

506 buffering: int = -1, 

507 encoding: str | None = None, 

508 errors: str | None = None, 

509 newline: str | None = None, 

510 ) -> AsyncFile[Any]: 

511 fp = await to_thread.run_sync( 

512 self._path.open, mode, buffering, encoding, errors, newline 

513 ) 

514 return AsyncFile(fp) 

515 

516 async def owner(self) -> str: 

517 return await to_thread.run_sync(self._path.owner, abandon_on_cancel=True) 

518 

519 async def read_bytes(self) -> bytes: 

520 return await to_thread.run_sync(self._path.read_bytes) 

521 

522 async def read_text( 

523 self, encoding: str | None = None, errors: str | None = None 

524 ) -> str: 

525 return await to_thread.run_sync(self._path.read_text, encoding, errors) 

526 

527 if sys.version_info >= (3, 12): 

528 

529 def relative_to( 

530 self, *other: str | PathLike[str], walk_up: bool = False 

531 ) -> Path: 

532 return Path(self._path.relative_to(*other, walk_up=walk_up)) 

533 

534 else: 

535 

536 def relative_to(self, *other: str | PathLike[str]) -> Path: 

537 return Path(self._path.relative_to(*other)) 

538 

539 async def readlink(self) -> Path: 

540 target = await to_thread.run_sync(os.readlink, self._path) 

541 return Path(target) 

542 

543 async def rename(self, target: str | pathlib.PurePath | Path) -> Path: 

544 if isinstance(target, Path): 

545 target = target._path 

546 

547 await to_thread.run_sync(self._path.rename, target) 

548 return Path(target) 

549 

550 async def replace(self, target: str | pathlib.PurePath | Path) -> Path: 

551 if isinstance(target, Path): 

552 target = target._path 

553 

554 await to_thread.run_sync(self._path.replace, target) 

555 return Path(target) 

556 

557 async def resolve(self, strict: bool = False) -> Path: 

558 func = partial(self._path.resolve, strict=strict) 

559 return Path(await to_thread.run_sync(func, abandon_on_cancel=True)) 

560 

561 def rglob(self, pattern: str) -> AsyncIterator[Path]: 

562 gen = self._path.rglob(pattern) 

563 return _PathIterator(gen) 

564 

565 async def rmdir(self) -> None: 

566 await to_thread.run_sync(self._path.rmdir) 

567 

568 async def samefile(self, other_path: str | PathLike[str]) -> bool: 

569 if isinstance(other_path, Path): 

570 other_path = other_path._path 

571 

572 return await to_thread.run_sync( 

573 self._path.samefile, other_path, abandon_on_cancel=True 

574 ) 

575 

576 async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result: 

577 func = partial(os.stat, follow_symlinks=follow_symlinks) 

578 return await to_thread.run_sync(func, self._path, abandon_on_cancel=True) 

579 

580 async def symlink_to( 

581 self, 

582 target: str | bytes | PathLike[str] | PathLike[bytes], 

583 target_is_directory: bool = False, 

584 ) -> None: 

585 if isinstance(target, Path): 

586 target = target._path 

587 

588 await to_thread.run_sync(self._path.symlink_to, target, target_is_directory) 

589 

590 async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None: 

591 await to_thread.run_sync(self._path.touch, mode, exist_ok) 

592 

593 async def unlink(self, missing_ok: bool = False) -> None: 

594 try: 

595 await to_thread.run_sync(self._path.unlink) 

596 except FileNotFoundError: 

597 if not missing_ok: 

598 raise 

599 

600 if sys.version_info >= (3, 12): 

601 

602 async def walk( 

603 self, 

604 top_down: bool = True, 

605 on_error: Callable[[OSError], object] | None = None, 

606 follow_symlinks: bool = False, 

607 ) -> AsyncIterator[tuple[Path, list[str], list[str]]]: 

608 def get_next_value() -> tuple[pathlib.Path, list[str], list[str]] | None: 

609 try: 

610 return next(gen) 

611 except StopIteration: 

612 return None 

613 

614 gen = self._path.walk(top_down, on_error, follow_symlinks) 

615 while True: 

616 value = await to_thread.run_sync(get_next_value) 

617 if value is None: 

618 return 

619 

620 root, dirs, paths = value 

621 yield Path(root), dirs, paths 

622 

623 def with_name(self, name: str) -> Path: 

624 return Path(self._path.with_name(name)) 

625 

626 def with_stem(self, stem: str) -> Path: 

627 return Path(self._path.with_name(stem + self._path.suffix)) 

628 

629 def with_suffix(self, suffix: str) -> Path: 

630 return Path(self._path.with_suffix(suffix)) 

631 

632 def with_segments(self, *pathsegments: str | PathLike[str]) -> Path: 

633 return Path(*pathsegments) 

634 

635 async def write_bytes(self, data: bytes) -> int: 

636 return await to_thread.run_sync(self._path.write_bytes, data) 

637 

638 async def write_text( 

639 self, 

640 data: str, 

641 encoding: str | None = None, 

642 errors: str | None = None, 

643 newline: str | None = None, 

644 ) -> int: 

645 # Path.write_text() does not support the "newline" parameter before Python 3.10 

646 def sync_write_text() -> int: 

647 with self._path.open( 

648 "w", encoding=encoding, errors=errors, newline=newline 

649 ) as fp: 

650 return fp.write(data) 

651 

652 return await to_thread.run_sync(sync_write_text) 

653 

654 

655PathLike.register(Path)