Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_fileio.py: 49%

290 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import os 

2import pathlib 

3import sys 

4from dataclasses import dataclass 

5from functools import partial 

6from os import PathLike 

7from typing import ( 

8 IO, 

9 TYPE_CHECKING, 

10 Any, 

11 AnyStr, 

12 AsyncIterator, 

13 Callable, 

14 Generic, 

15 Iterable, 

16 Iterator, 

17 List, 

18 Optional, 

19 Sequence, 

20 Tuple, 

21 Union, 

22 cast, 

23 overload, 

24) 

25 

26from .. import to_thread 

27from ..abc import AsyncResource 

28 

29if sys.version_info >= (3, 8): 

30 from typing import Final 

31else: 

32 from typing_extensions import Final 

33 

34if TYPE_CHECKING: 

35 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer 

36else: 

37 ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object 

38 

39 

40class AsyncFile(AsyncResource, Generic[AnyStr]): 

41 """ 

42 An asynchronous file object. 

43 

44 This class wraps a standard file object and provides async friendly versions of the following 

45 blocking methods (where available on the original file object): 

46 

47 * read 

48 * read1 

49 * readline 

50 * readlines 

51 * readinto 

52 * readinto1 

53 * write 

54 * writelines 

55 * truncate 

56 * seek 

57 * tell 

58 * flush 

59 

60 All other methods are directly passed through. 

61 

62 This class supports the asynchronous context manager protocol which closes the underlying file 

63 at the end of the context block. 

64 

65 This class also supports asynchronous iteration:: 

66 

67 async with await open_file(...) as f: 

68 async for line in f: 

69 print(line) 

70 """ 

71 

72 def __init__(self, fp: IO[AnyStr]) -> None: 

73 self._fp: Any = fp 

74 

75 def __getattr__(self, name: str) -> object: 

76 return getattr(self._fp, name) 

77 

78 @property 

79 def wrapped(self) -> IO[AnyStr]: 

80 """The wrapped file object.""" 

81 return self._fp 

82 

83 async def __aiter__(self) -> AsyncIterator[AnyStr]: 

84 while True: 

85 line = await self.readline() 

86 if line: 

87 yield line 

88 else: 

89 break 

90 

91 async def aclose(self) -> None: 

92 return await to_thread.run_sync(self._fp.close) 

93 

94 async def read(self, size: int = -1) -> AnyStr: 

95 return await to_thread.run_sync(self._fp.read, size) 

96 

97 async def read1(self: "AsyncFile[bytes]", size: int = -1) -> bytes: 

98 return await to_thread.run_sync(self._fp.read1, size) 

99 

100 async def readline(self) -> AnyStr: 

101 return await to_thread.run_sync(self._fp.readline) 

102 

103 async def readlines(self) -> List[AnyStr]: 

104 return await to_thread.run_sync(self._fp.readlines) 

105 

106 async def readinto(self: "AsyncFile[bytes]", b: WriteableBuffer) -> bytes: 

107 return await to_thread.run_sync(self._fp.readinto, b) 

108 

109 async def readinto1(self: "AsyncFile[bytes]", b: WriteableBuffer) -> bytes: 

110 return await to_thread.run_sync(self._fp.readinto1, b) 

111 

112 @overload 

113 async def write(self: "AsyncFile[bytes]", b: ReadableBuffer) -> int: 

114 ... 

115 

116 @overload 

117 async def write(self: "AsyncFile[str]", b: str) -> int: 

118 ... 

119 

120 async def write(self, b: Union[ReadableBuffer, str]) -> int: 

121 return await to_thread.run_sync(self._fp.write, b) 

122 

123 @overload 

124 async def writelines( 

125 self: "AsyncFile[bytes]", lines: Iterable[ReadableBuffer] 

126 ) -> None: 

127 ... 

128 

129 @overload 

130 async def writelines(self: "AsyncFile[str]", lines: Iterable[str]) -> None: 

131 ... 

132 

133 async def writelines( 

134 self, lines: Union[Iterable[ReadableBuffer], Iterable[str]] 

135 ) -> None: 

136 return await to_thread.run_sync(self._fp.writelines, lines) 

137 

138 async def truncate(self, size: Optional[int] = None) -> int: 

139 return await to_thread.run_sync(self._fp.truncate, size) 

140 

141 async def seek(self, offset: int, whence: Optional[int] = os.SEEK_SET) -> int: 

142 return await to_thread.run_sync(self._fp.seek, offset, whence) 

143 

144 async def tell(self) -> int: 

145 return await to_thread.run_sync(self._fp.tell) 

146 

147 async def flush(self) -> None: 

148 return await to_thread.run_sync(self._fp.flush) 

149 

150 

151@overload 

152async def open_file( 

153 file: Union[str, "PathLike[str]", int], 

154 mode: OpenBinaryMode, 

155 buffering: int = ..., 

156 encoding: Optional[str] = ..., 

157 errors: Optional[str] = ..., 

158 newline: Optional[str] = ..., 

159 closefd: bool = ..., 

160 opener: Optional[Callable[[str, int], int]] = ..., 

161) -> AsyncFile[bytes]: 

162 ... 

163 

164 

165@overload 

166async def open_file( 

167 file: Union[str, "PathLike[str]", int], 

168 mode: OpenTextMode = ..., 

169 buffering: int = ..., 

170 encoding: Optional[str] = ..., 

171 errors: Optional[str] = ..., 

172 newline: Optional[str] = ..., 

173 closefd: bool = ..., 

174 opener: Optional[Callable[[str, int], int]] = ..., 

175) -> AsyncFile[str]: 

176 ... 

177 

178 

179async def open_file( 

180 file: Union[str, "PathLike[str]", int], 

181 mode: str = "r", 

182 buffering: int = -1, 

183 encoding: Optional[str] = None, 

184 errors: Optional[str] = None, 

185 newline: Optional[str] = None, 

186 closefd: bool = True, 

187 opener: Optional[Callable[[str, int], int]] = None, 

188) -> AsyncFile[Any]: 

189 """ 

190 Open a file asynchronously. 

191 

192 The arguments are exactly the same as for the builtin :func:`open`. 

193 

194 :return: an asynchronous file object 

195 

196 """ 

197 fp = await to_thread.run_sync( 

198 open, file, mode, buffering, encoding, errors, newline, closefd, opener 

199 ) 

200 return AsyncFile(fp) 

201 

202 

203def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]: 

204 """ 

205 Wrap an existing file as an asynchronous file. 

206 

207 :param file: an existing file-like object 

208 :return: an asynchronous file object 

209 

210 """ 

211 return AsyncFile(file) 

212 

213 

214@dataclass(eq=False) 

215class _PathIterator(AsyncIterator["Path"]): 

216 iterator: Iterator["PathLike[str]"] 

217 

218 async def __anext__(self) -> "Path": 

219 nextval = await to_thread.run_sync(next, self.iterator, None, cancellable=True) 

220 if nextval is None: 

221 raise StopAsyncIteration from None 

222 

223 return Path(cast("PathLike[str]", nextval)) 

224 

225 

226class Path: 

227 """ 

228 An asynchronous version of :class:`pathlib.Path`. 

229 

230 This class cannot be substituted for :class:`pathlib.Path` or :class:`pathlib.PurePath`, but 

231 it is compatible with the :class:`os.PathLike` interface. 

232 

233 It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for the 

234 deprecated :meth:`~pathlib.Path.link_to` method. 

235 

236 Any methods that do disk I/O need to be awaited on. These methods are: 

237 

238 * :meth:`~pathlib.Path.absolute` 

239 * :meth:`~pathlib.Path.chmod` 

240 * :meth:`~pathlib.Path.cwd` 

241 * :meth:`~pathlib.Path.exists` 

242 * :meth:`~pathlib.Path.expanduser` 

243 * :meth:`~pathlib.Path.group` 

244 * :meth:`~pathlib.Path.hardlink_to` 

245 * :meth:`~pathlib.Path.home` 

246 * :meth:`~pathlib.Path.is_block_device` 

247 * :meth:`~pathlib.Path.is_char_device` 

248 * :meth:`~pathlib.Path.is_dir` 

249 * :meth:`~pathlib.Path.is_fifo` 

250 * :meth:`~pathlib.Path.is_file` 

251 * :meth:`~pathlib.Path.is_mount` 

252 * :meth:`~pathlib.Path.lchmod` 

253 * :meth:`~pathlib.Path.lstat` 

254 * :meth:`~pathlib.Path.mkdir` 

255 * :meth:`~pathlib.Path.open` 

256 * :meth:`~pathlib.Path.owner` 

257 * :meth:`~pathlib.Path.read_bytes` 

258 * :meth:`~pathlib.Path.read_text` 

259 * :meth:`~pathlib.Path.readlink` 

260 * :meth:`~pathlib.Path.rename` 

261 * :meth:`~pathlib.Path.replace` 

262 * :meth:`~pathlib.Path.rmdir` 

263 * :meth:`~pathlib.Path.samefile` 

264 * :meth:`~pathlib.Path.stat` 

265 * :meth:`~pathlib.Path.touch` 

266 * :meth:`~pathlib.Path.unlink` 

267 * :meth:`~pathlib.Path.write_bytes` 

268 * :meth:`~pathlib.Path.write_text` 

269 

270 Additionally, the following methods return an async iterator yielding :class:`~.Path` objects: 

271 

272 * :meth:`~pathlib.Path.glob` 

273 * :meth:`~pathlib.Path.iterdir` 

274 * :meth:`~pathlib.Path.rglob` 

275 """ 

276 

277 __slots__ = "_path", "__weakref__" 

278 

279 __weakref__: Any 

280 

281 def __init__(self, *args: Union[str, "PathLike[str]"]) -> None: 

282 self._path: Final[pathlib.Path] = pathlib.Path(*args) 

283 

284 def __fspath__(self) -> str: 

285 return self._path.__fspath__() 

286 

287 def __str__(self) -> str: 

288 return self._path.__str__() 

289 

290 def __repr__(self) -> str: 

291 return f"{self.__class__.__name__}({self.as_posix()!r})" 

292 

293 def __bytes__(self) -> bytes: 

294 return self._path.__bytes__() 

295 

296 def __hash__(self) -> int: 

297 return self._path.__hash__() 

298 

299 def __eq__(self, other: object) -> bool: 

300 target = other._path if isinstance(other, Path) else other 

301 return self._path.__eq__(target) 

302 

303 def __lt__(self, other: "Path") -> bool: 

304 target = other._path if isinstance(other, Path) else other 

305 return self._path.__lt__(target) 

306 

307 def __le__(self, other: "Path") -> bool: 

308 target = other._path if isinstance(other, Path) else other 

309 return self._path.__le__(target) 

310 

311 def __gt__(self, other: "Path") -> bool: 

312 target = other._path if isinstance(other, Path) else other 

313 return self._path.__gt__(target) 

314 

315 def __ge__(self, other: "Path") -> bool: 

316 target = other._path if isinstance(other, Path) else other 

317 return self._path.__ge__(target) 

318 

319 def __truediv__(self, other: Any) -> "Path": 

320 return Path(self._path / other) 

321 

322 def __rtruediv__(self, other: Any) -> "Path": 

323 return Path(other) / self 

324 

325 @property 

326 def parts(self) -> Tuple[str, ...]: 

327 return self._path.parts 

328 

329 @property 

330 def drive(self) -> str: 

331 return self._path.drive 

332 

333 @property 

334 def root(self) -> str: 

335 return self._path.root 

336 

337 @property 

338 def anchor(self) -> str: 

339 return self._path.anchor 

340 

341 @property 

342 def parents(self) -> Sequence["Path"]: 

343 return tuple(Path(p) for p in self._path.parents) 

344 

345 @property 

346 def parent(self) -> "Path": 

347 return Path(self._path.parent) 

348 

349 @property 

350 def name(self) -> str: 

351 return self._path.name 

352 

353 @property 

354 def suffix(self) -> str: 

355 return self._path.suffix 

356 

357 @property 

358 def suffixes(self) -> List[str]: 

359 return self._path.suffixes 

360 

361 @property 

362 def stem(self) -> str: 

363 return self._path.stem 

364 

365 async def absolute(self) -> "Path": 

366 path = await to_thread.run_sync(self._path.absolute) 

367 return Path(path) 

368 

369 def as_posix(self) -> str: 

370 return self._path.as_posix() 

371 

372 def as_uri(self) -> str: 

373 return self._path.as_uri() 

374 

375 def match(self, path_pattern: str) -> bool: 

376 return self._path.match(path_pattern) 

377 

378 def is_relative_to(self, *other: Union[str, "PathLike[str]"]) -> bool: 

379 try: 

380 self.relative_to(*other) 

381 return True 

382 except ValueError: 

383 return False 

384 

385 async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: 

386 func = partial(os.chmod, follow_symlinks=follow_symlinks) 

387 return await to_thread.run_sync(func, self._path, mode) 

388 

389 @classmethod 

390 async def cwd(cls) -> "Path": 

391 path = await to_thread.run_sync(pathlib.Path.cwd) 

392 return cls(path) 

393 

394 async def exists(self) -> bool: 

395 return await to_thread.run_sync(self._path.exists, cancellable=True) 

396 

397 async def expanduser(self) -> "Path": 

398 return Path(await to_thread.run_sync(self._path.expanduser, cancellable=True)) 

399 

400 def glob(self, pattern: str) -> AsyncIterator["Path"]: 

401 gen = self._path.glob(pattern) 

402 return _PathIterator(gen) 

403 

404 async def group(self) -> str: 

405 return await to_thread.run_sync(self._path.group, cancellable=True) 

406 

407 async def hardlink_to(self, target: Union[str, pathlib.Path, "Path"]) -> None: 

408 if isinstance(target, Path): 

409 target = target._path 

410 

411 await to_thread.run_sync(os.link, target, self) 

412 

413 @classmethod 

414 async def home(cls) -> "Path": 

415 home_path = await to_thread.run_sync(pathlib.Path.home) 

416 return cls(home_path) 

417 

418 def is_absolute(self) -> bool: 

419 return self._path.is_absolute() 

420 

421 async def is_block_device(self) -> bool: 

422 return await to_thread.run_sync(self._path.is_block_device, cancellable=True) 

423 

424 async def is_char_device(self) -> bool: 

425 return await to_thread.run_sync(self._path.is_char_device, cancellable=True) 

426 

427 async def is_dir(self) -> bool: 

428 return await to_thread.run_sync(self._path.is_dir, cancellable=True) 

429 

430 async def is_fifo(self) -> bool: 

431 return await to_thread.run_sync(self._path.is_fifo, cancellable=True) 

432 

433 async def is_file(self) -> bool: 

434 return await to_thread.run_sync(self._path.is_file, cancellable=True) 

435 

436 async def is_mount(self) -> bool: 

437 return await to_thread.run_sync(os.path.ismount, self._path, cancellable=True) 

438 

439 def is_reserved(self) -> bool: 

440 return self._path.is_reserved() 

441 

442 async def is_socket(self) -> bool: 

443 return await to_thread.run_sync(self._path.is_socket, cancellable=True) 

444 

445 async def is_symlink(self) -> bool: 

446 return await to_thread.run_sync(self._path.is_symlink, cancellable=True) 

447 

448 def iterdir(self) -> AsyncIterator["Path"]: 

449 gen = self._path.iterdir() 

450 return _PathIterator(gen) 

451 

452 def joinpath(self, *args: Union[str, "PathLike[str]"]) -> "Path": 

453 return Path(self._path.joinpath(*args)) 

454 

455 async def lchmod(self, mode: int) -> None: 

456 await to_thread.run_sync(self._path.lchmod, mode) 

457 

458 async def lstat(self) -> os.stat_result: 

459 return await to_thread.run_sync(self._path.lstat, cancellable=True) 

460 

461 async def mkdir( 

462 self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False 

463 ) -> None: 

464 await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok) 

465 

466 @overload 

467 async def open( 

468 self, 

469 mode: OpenBinaryMode, 

470 buffering: int = ..., 

471 encoding: Optional[str] = ..., 

472 errors: Optional[str] = ..., 

473 newline: Optional[str] = ..., 

474 ) -> AsyncFile[bytes]: 

475 ... 

476 

477 @overload 

478 async def open( 

479 self, 

480 mode: OpenTextMode = ..., 

481 buffering: int = ..., 

482 encoding: Optional[str] = ..., 

483 errors: Optional[str] = ..., 

484 newline: Optional[str] = ..., 

485 ) -> AsyncFile[str]: 

486 ... 

487 

488 async def open( 

489 self, 

490 mode: str = "r", 

491 buffering: int = -1, 

492 encoding: Optional[str] = None, 

493 errors: Optional[str] = None, 

494 newline: Optional[str] = None, 

495 ) -> AsyncFile[Any]: 

496 fp = await to_thread.run_sync( 

497 self._path.open, mode, buffering, encoding, errors, newline 

498 ) 

499 return AsyncFile(fp) 

500 

501 async def owner(self) -> str: 

502 return await to_thread.run_sync(self._path.owner, cancellable=True) 

503 

504 async def read_bytes(self) -> bytes: 

505 return await to_thread.run_sync(self._path.read_bytes) 

506 

507 async def read_text( 

508 self, encoding: Optional[str] = None, errors: Optional[str] = None 

509 ) -> str: 

510 return await to_thread.run_sync(self._path.read_text, encoding, errors) 

511 

512 def relative_to(self, *other: Union[str, "PathLike[str]"]) -> "Path": 

513 return Path(self._path.relative_to(*other)) 

514 

515 async def readlink(self) -> "Path": 

516 target = await to_thread.run_sync(os.readlink, self._path) 

517 return Path(cast(str, target)) 

518 

519 async def rename(self, target: Union[str, pathlib.PurePath, "Path"]) -> "Path": 

520 if isinstance(target, Path): 

521 target = target._path 

522 

523 await to_thread.run_sync(self._path.rename, target) 

524 return Path(target) 

525 

526 async def replace(self, target: Union[str, pathlib.PurePath, "Path"]) -> "Path": 

527 if isinstance(target, Path): 

528 target = target._path 

529 

530 await to_thread.run_sync(self._path.replace, target) 

531 return Path(target) 

532 

533 async def resolve(self, strict: bool = False) -> "Path": 

534 func = partial(self._path.resolve, strict=strict) 

535 return Path(await to_thread.run_sync(func, cancellable=True)) 

536 

537 def rglob(self, pattern: str) -> AsyncIterator["Path"]: 

538 gen = self._path.rglob(pattern) 

539 return _PathIterator(gen) 

540 

541 async def rmdir(self) -> None: 

542 await to_thread.run_sync(self._path.rmdir) 

543 

544 async def samefile( 

545 self, other_path: Union[str, bytes, int, pathlib.Path, "Path"] 

546 ) -> bool: 

547 if isinstance(other_path, Path): 

548 other_path = other_path._path 

549 

550 return await to_thread.run_sync( 

551 self._path.samefile, other_path, cancellable=True 

552 ) 

553 

554 async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result: 

555 func = partial(os.stat, follow_symlinks=follow_symlinks) 

556 return await to_thread.run_sync(func, self._path, cancellable=True) 

557 

558 async def symlink_to( 

559 self, 

560 target: Union[str, pathlib.Path, "Path"], 

561 target_is_directory: bool = False, 

562 ) -> None: 

563 if isinstance(target, Path): 

564 target = target._path 

565 

566 await to_thread.run_sync(self._path.symlink_to, target, target_is_directory) 

567 

568 async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None: 

569 await to_thread.run_sync(self._path.touch, mode, exist_ok) 

570 

571 async def unlink(self, missing_ok: bool = False) -> None: 

572 try: 

573 await to_thread.run_sync(self._path.unlink) 

574 except FileNotFoundError: 

575 if not missing_ok: 

576 raise 

577 

578 def with_name(self, name: str) -> "Path": 

579 return Path(self._path.with_name(name)) 

580 

581 def with_stem(self, stem: str) -> "Path": 

582 return Path(self._path.with_name(stem + self._path.suffix)) 

583 

584 def with_suffix(self, suffix: str) -> "Path": 

585 return Path(self._path.with_suffix(suffix)) 

586 

587 async def write_bytes(self, data: bytes) -> int: 

588 return await to_thread.run_sync(self._path.write_bytes, data) 

589 

590 async def write_text( 

591 self, 

592 data: str, 

593 encoding: Optional[str] = None, 

594 errors: Optional[str] = None, 

595 newline: Optional[str] = None, 

596 ) -> int: 

597 # Path.write_text() does not support the "newline" parameter before Python 3.10 

598 def sync_write_text() -> int: 

599 with self._path.open( 

600 "w", encoding=encoding, errors=errors, newline=newline 

601 ) as fp: 

602 return fp.write(data) 

603 

604 return await to_thread.run_sync(sync_write_text) 

605 

606 

607PathLike.register(Path)