Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_fileio.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import pathlib
5import sys
6from collections.abc import (
7 AsyncIterator,
8 Callable,
9 Iterable,
10 Iterator,
11 Sequence,
12)
13from dataclasses import dataclass
14from functools import partial
15from os import PathLike
16from typing import (
17 IO,
18 TYPE_CHECKING,
19 Any,
20 AnyStr,
21 ClassVar,
22 Final,
23 Generic,
24 overload,
25)
27from .. import to_thread
28from ..abc import AsyncResource
30if TYPE_CHECKING:
31 from types import ModuleType
33 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
34else:
35 ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object
38class AsyncFile(AsyncResource, Generic[AnyStr]):
39 """
40 An asynchronous file object.
42 This class wraps a standard file object and provides async friendly versions of the
43 following blocking methods (where available on the original file object):
45 * read
46 * read1
47 * readline
48 * readlines
49 * readinto
50 * readinto1
51 * write
52 * writelines
53 * truncate
54 * seek
55 * tell
56 * flush
58 All other methods are directly passed through.
60 This class supports the asynchronous context manager protocol which closes the
61 underlying file at the end of the context block.
63 This class also supports asynchronous iteration::
65 async with await open_file(...) as f:
66 async for line in f:
67 print(line)
68 """
70 def __init__(self, fp: IO[AnyStr]) -> None:
71 self._fp: Any = fp
73 def __getattr__(self, name: str) -> object:
74 return getattr(self._fp, name)
76 @property
77 def wrapped(self) -> IO[AnyStr]:
78 """The wrapped file object."""
79 return self._fp
81 async def __aiter__(self) -> AsyncIterator[AnyStr]:
82 while True:
83 line = await self.readline()
84 if line:
85 yield line
86 else:
87 break
89 async def aclose(self) -> None:
90 return await to_thread.run_sync(self._fp.close)
92 async def read(self, size: int = -1) -> AnyStr:
93 return await to_thread.run_sync(self._fp.read, size)
95 async def read1(self: AsyncFile[bytes], size: int = -1) -> bytes:
96 return await to_thread.run_sync(self._fp.read1, size)
98 async def readline(self) -> AnyStr:
99 return await to_thread.run_sync(self._fp.readline)
101 async def readlines(self) -> list[AnyStr]:
102 return await to_thread.run_sync(self._fp.readlines)
104 async def readinto(self: AsyncFile[bytes], b: WriteableBuffer) -> int:
105 return await to_thread.run_sync(self._fp.readinto, b)
107 async def readinto1(self: AsyncFile[bytes], b: WriteableBuffer) -> int:
108 return await to_thread.run_sync(self._fp.readinto1, b)
110 @overload
111 async def write(self: AsyncFile[bytes], b: ReadableBuffer) -> int: ...
113 @overload
114 async def write(self: AsyncFile[str], b: str) -> int: ...
116 async def write(self, b: ReadableBuffer | str) -> int:
117 return await to_thread.run_sync(self._fp.write, b)
119 @overload
120 async def writelines(
121 self: AsyncFile[bytes], lines: Iterable[ReadableBuffer]
122 ) -> None: ...
124 @overload
125 async def writelines(self: AsyncFile[str], lines: Iterable[str]) -> None: ...
127 async def writelines(self, lines: Iterable[ReadableBuffer] | Iterable[str]) -> None:
128 return await to_thread.run_sync(self._fp.writelines, lines)
130 async def truncate(self, size: int | None = None) -> int:
131 return await to_thread.run_sync(self._fp.truncate, size)
133 async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int:
134 return await to_thread.run_sync(self._fp.seek, offset, whence)
136 async def tell(self) -> int:
137 return await to_thread.run_sync(self._fp.tell)
139 async def flush(self) -> None:
140 return await to_thread.run_sync(self._fp.flush)
143@overload
144async def open_file(
145 file: str | PathLike[str] | int,
146 mode: OpenBinaryMode,
147 buffering: int = ...,
148 encoding: str | None = ...,
149 errors: str | None = ...,
150 newline: str | None = ...,
151 closefd: bool = ...,
152 opener: Callable[[str, int], int] | None = ...,
153) -> AsyncFile[bytes]: ...
156@overload
157async def open_file(
158 file: str | PathLike[str] | int,
159 mode: OpenTextMode = ...,
160 buffering: int = ...,
161 encoding: str | None = ...,
162 errors: str | None = ...,
163 newline: str | None = ...,
164 closefd: bool = ...,
165 opener: Callable[[str, int], int] | None = ...,
166) -> AsyncFile[str]: ...
169async def open_file(
170 file: str | PathLike[str] | int,
171 mode: str = "r",
172 buffering: int = -1,
173 encoding: str | None = None,
174 errors: str | None = None,
175 newline: str | None = None,
176 closefd: bool = True,
177 opener: Callable[[str, int], int] | None = None,
178) -> AsyncFile[Any]:
179 """
180 Open a file asynchronously.
182 The arguments are exactly the same as for the builtin :func:`open`.
184 :return: an asynchronous file object
186 """
187 fp = await to_thread.run_sync(
188 open, file, mode, buffering, encoding, errors, newline, closefd, opener
189 )
190 return AsyncFile(fp)
193def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:
194 """
195 Wrap an existing file as an asynchronous file.
197 :param file: an existing file-like object
198 :return: an asynchronous file object
200 """
201 return AsyncFile(file)
204@dataclass(eq=False)
205class _PathIterator(AsyncIterator["Path"]):
206 iterator: Iterator[PathLike[str]]
208 async def __anext__(self) -> Path:
209 nextval = await to_thread.run_sync(
210 next, self.iterator, None, abandon_on_cancel=True
211 )
212 if nextval is None:
213 raise StopAsyncIteration from None
215 return Path(nextval)
218class Path:
219 """
220 An asynchronous version of :class:`pathlib.Path`.
222 This class cannot be substituted for :class:`pathlib.Path` or
223 :class:`pathlib.PurePath`, but it is compatible with the :class:`os.PathLike`
224 interface.
226 It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for
227 the deprecated :meth:`~pathlib.Path.link_to` method.
229 Some methods may be unavailable or have limited functionality, based on the Python
230 version:
232 * :meth:`~pathlib.Path.copy` (available on Python 3.14 or later)
233 * :meth:`~pathlib.Path.copy_into` (available on Python 3.14 or later)
234 * :meth:`~pathlib.Path.from_uri` (available on Python 3.13 or later)
235 * :meth:`~pathlib.PurePath.full_match` (available on Python 3.13 or later)
236 * :attr:`~pathlib.Path.info` (available on Python 3.14 or later)
237 * :meth:`~pathlib.Path.is_junction` (available on Python 3.12 or later)
238 * :meth:`~pathlib.PurePath.match` (the ``case_sensitive`` parameter is only
239 available on Python 3.13 or later)
240 * :meth:`~pathlib.Path.move` (available on Python 3.14 or later)
241 * :meth:`~pathlib.Path.move_into` (available on Python 3.14 or later)
242 * :meth:`~pathlib.PurePath.relative_to` (the ``walk_up`` parameter is only available
243 on Python 3.12 or later)
244 * :meth:`~pathlib.Path.walk` (available on Python 3.12 or later)
246 Any methods that do disk I/O need to be awaited on. These methods are:
248 * :meth:`~pathlib.Path.absolute`
249 * :meth:`~pathlib.Path.chmod`
250 * :meth:`~pathlib.Path.cwd`
251 * :meth:`~pathlib.Path.exists`
252 * :meth:`~pathlib.Path.expanduser`
253 * :meth:`~pathlib.Path.group`
254 * :meth:`~pathlib.Path.hardlink_to`
255 * :meth:`~pathlib.Path.home`
256 * :meth:`~pathlib.Path.is_block_device`
257 * :meth:`~pathlib.Path.is_char_device`
258 * :meth:`~pathlib.Path.is_dir`
259 * :meth:`~pathlib.Path.is_fifo`
260 * :meth:`~pathlib.Path.is_file`
261 * :meth:`~pathlib.Path.is_junction`
262 * :meth:`~pathlib.Path.is_mount`
263 * :meth:`~pathlib.Path.is_socket`
264 * :meth:`~pathlib.Path.is_symlink`
265 * :meth:`~pathlib.Path.lchmod`
266 * :meth:`~pathlib.Path.lstat`
267 * :meth:`~pathlib.Path.mkdir`
268 * :meth:`~pathlib.Path.open`
269 * :meth:`~pathlib.Path.owner`
270 * :meth:`~pathlib.Path.read_bytes`
271 * :meth:`~pathlib.Path.read_text`
272 * :meth:`~pathlib.Path.readlink`
273 * :meth:`~pathlib.Path.rename`
274 * :meth:`~pathlib.Path.replace`
275 * :meth:`~pathlib.Path.resolve`
276 * :meth:`~pathlib.Path.rmdir`
277 * :meth:`~pathlib.Path.samefile`
278 * :meth:`~pathlib.Path.stat`
279 * :meth:`~pathlib.Path.symlink_to`
280 * :meth:`~pathlib.Path.touch`
281 * :meth:`~pathlib.Path.unlink`
282 * :meth:`~pathlib.Path.walk`
283 * :meth:`~pathlib.Path.write_bytes`
284 * :meth:`~pathlib.Path.write_text`
286 Additionally, the following methods return an async iterator yielding
287 :class:`~.Path` objects:
289 * :meth:`~pathlib.Path.glob`
290 * :meth:`~pathlib.Path.iterdir`
291 * :meth:`~pathlib.Path.rglob`
292 """
294 __slots__ = "_path", "__weakref__"
296 __weakref__: Any
298 def __init__(self, *args: str | PathLike[str]) -> None:
299 self._path: Final[pathlib.Path] = pathlib.Path(*args)
301 def __fspath__(self) -> str:
302 return self._path.__fspath__()
304 def __str__(self) -> str:
305 return self._path.__str__()
307 def __repr__(self) -> str:
308 return f"{self.__class__.__name__}({self.as_posix()!r})"
310 def __bytes__(self) -> bytes:
311 return self._path.__bytes__()
313 def __hash__(self) -> int:
314 return self._path.__hash__()
316 def __eq__(self, other: object) -> bool:
317 target = other._path if isinstance(other, Path) else other
318 return self._path.__eq__(target)
320 def __lt__(self, other: pathlib.PurePath | Path) -> bool:
321 target = other._path if isinstance(other, Path) else other
322 return self._path.__lt__(target)
324 def __le__(self, other: pathlib.PurePath | Path) -> bool:
325 target = other._path if isinstance(other, Path) else other
326 return self._path.__le__(target)
328 def __gt__(self, other: pathlib.PurePath | Path) -> bool:
329 target = other._path if isinstance(other, Path) else other
330 return self._path.__gt__(target)
332 def __ge__(self, other: pathlib.PurePath | Path) -> bool:
333 target = other._path if isinstance(other, Path) else other
334 return self._path.__ge__(target)
336 def __truediv__(self, other: str | PathLike[str]) -> Path:
337 return Path(self._path / other)
339 def __rtruediv__(self, other: str | PathLike[str]) -> Path:
340 return Path(other) / self
342 @property
343 def parts(self) -> tuple[str, ...]:
344 return self._path.parts
346 @property
347 def drive(self) -> str:
348 return self._path.drive
350 @property
351 def root(self) -> str:
352 return self._path.root
354 @property
355 def anchor(self) -> str:
356 return self._path.anchor
358 @property
359 def parents(self) -> Sequence[Path]:
360 return tuple(Path(p) for p in self._path.parents)
362 @property
363 def parent(self) -> Path:
364 return Path(self._path.parent)
366 @property
367 def name(self) -> str:
368 return self._path.name
370 @property
371 def suffix(self) -> str:
372 return self._path.suffix
374 @property
375 def suffixes(self) -> list[str]:
376 return self._path.suffixes
378 @property
379 def stem(self) -> str:
380 return self._path.stem
382 async def absolute(self) -> Path:
383 path = await to_thread.run_sync(self._path.absolute)
384 return Path(path)
386 def as_posix(self) -> str:
387 return self._path.as_posix()
389 def as_uri(self) -> str:
390 return self._path.as_uri()
392 if sys.version_info >= (3, 13):
393 parser: ClassVar[ModuleType] = pathlib.Path.parser
395 @classmethod
396 def from_uri(cls, uri: str) -> Path:
397 return Path(pathlib.Path.from_uri(uri))
399 def full_match(
400 self, path_pattern: str, *, case_sensitive: bool | None = None
401 ) -> bool:
402 return self._path.full_match(path_pattern, case_sensitive=case_sensitive)
404 def match(
405 self, path_pattern: str, *, case_sensitive: bool | None = None
406 ) -> bool:
407 return self._path.match(path_pattern, case_sensitive=case_sensitive)
408 else:
410 def match(self, path_pattern: str) -> bool:
411 return self._path.match(path_pattern)
413 if sys.version_info >= (3, 14):
415 @property
416 def info(self) -> Any: # TODO: add return type annotation when Typeshed gets it
417 return self._path.info
419 async def copy(
420 self,
421 target: str | os.PathLike[str],
422 *,
423 follow_symlinks: bool = True,
424 preserve_metadata: bool = False,
425 ) -> Path:
426 func = partial(
427 self._path.copy,
428 follow_symlinks=follow_symlinks,
429 preserve_metadata=preserve_metadata,
430 )
431 return Path(await to_thread.run_sync(func, pathlib.Path(target)))
433 async def copy_into(
434 self,
435 target_dir: str | os.PathLike[str],
436 *,
437 follow_symlinks: bool = True,
438 preserve_metadata: bool = False,
439 ) -> Path:
440 func = partial(
441 self._path.copy_into,
442 follow_symlinks=follow_symlinks,
443 preserve_metadata=preserve_metadata,
444 )
445 return Path(await to_thread.run_sync(func, pathlib.Path(target_dir)))
447 async def move(self, target: str | os.PathLike[str]) -> Path:
448 # Upstream does not handle anyio.Path properly as a PathLike
449 target = pathlib.Path(target)
450 return Path(await to_thread.run_sync(self._path.move, target))
452 async def move_into(
453 self,
454 target_dir: str | os.PathLike[str],
455 ) -> Path:
456 return Path(await to_thread.run_sync(self._path.move_into, target_dir))
458 def is_relative_to(self, other: str | PathLike[str]) -> bool:
459 try:
460 self.relative_to(other)
461 return True
462 except ValueError:
463 return False
465 async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
466 func = partial(os.chmod, follow_symlinks=follow_symlinks)
467 return await to_thread.run_sync(func, self._path, mode)
469 @classmethod
470 async def cwd(cls) -> Path:
471 path = await to_thread.run_sync(pathlib.Path.cwd)
472 return cls(path)
474 async def exists(self) -> bool:
475 return await to_thread.run_sync(self._path.exists, abandon_on_cancel=True)
477 async def expanduser(self) -> Path:
478 return Path(
479 await to_thread.run_sync(self._path.expanduser, abandon_on_cancel=True)
480 )
482 def glob(self, pattern: str) -> AsyncIterator[Path]:
483 gen = self._path.glob(pattern)
484 return _PathIterator(gen)
486 async def group(self) -> str:
487 return await to_thread.run_sync(self._path.group, abandon_on_cancel=True)
489 async def hardlink_to(
490 self, target: str | bytes | PathLike[str] | PathLike[bytes]
491 ) -> None:
492 if isinstance(target, Path):
493 target = target._path
495 await to_thread.run_sync(os.link, target, self)
497 @classmethod
498 async def home(cls) -> Path:
499 home_path = await to_thread.run_sync(pathlib.Path.home)
500 return cls(home_path)
502 def is_absolute(self) -> bool:
503 return self._path.is_absolute()
505 async def is_block_device(self) -> bool:
506 return await to_thread.run_sync(
507 self._path.is_block_device, abandon_on_cancel=True
508 )
510 async def is_char_device(self) -> bool:
511 return await to_thread.run_sync(
512 self._path.is_char_device, abandon_on_cancel=True
513 )
515 async def is_dir(self) -> bool:
516 return await to_thread.run_sync(self._path.is_dir, abandon_on_cancel=True)
518 async def is_fifo(self) -> bool:
519 return await to_thread.run_sync(self._path.is_fifo, abandon_on_cancel=True)
521 async def is_file(self) -> bool:
522 return await to_thread.run_sync(self._path.is_file, abandon_on_cancel=True)
524 if sys.version_info >= (3, 12):
526 async def is_junction(self) -> bool:
527 return await to_thread.run_sync(self._path.is_junction)
529 async def is_mount(self) -> bool:
530 return await to_thread.run_sync(
531 os.path.ismount, self._path, abandon_on_cancel=True
532 )
534 def is_reserved(self) -> bool:
535 return self._path.is_reserved()
537 async def is_socket(self) -> bool:
538 return await to_thread.run_sync(self._path.is_socket, abandon_on_cancel=True)
540 async def is_symlink(self) -> bool:
541 return await to_thread.run_sync(self._path.is_symlink, abandon_on_cancel=True)
543 async def iterdir(self) -> AsyncIterator[Path]:
544 gen = (
545 self._path.iterdir()
546 if sys.version_info < (3, 13)
547 else await to_thread.run_sync(self._path.iterdir, abandon_on_cancel=True)
548 )
549 async for path in _PathIterator(gen):
550 yield path
552 def joinpath(self, *args: str | PathLike[str]) -> Path:
553 return Path(self._path.joinpath(*args))
555 async def lchmod(self, mode: int) -> None:
556 await to_thread.run_sync(self._path.lchmod, mode)
558 async def lstat(self) -> os.stat_result:
559 return await to_thread.run_sync(self._path.lstat, abandon_on_cancel=True)
561 async def mkdir(
562 self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
563 ) -> None:
564 await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)
566 @overload
567 async def open(
568 self,
569 mode: OpenBinaryMode,
570 buffering: int = ...,
571 encoding: str | None = ...,
572 errors: str | None = ...,
573 newline: str | None = ...,
574 ) -> AsyncFile[bytes]: ...
576 @overload
577 async def open(
578 self,
579 mode: OpenTextMode = ...,
580 buffering: int = ...,
581 encoding: str | None = ...,
582 errors: str | None = ...,
583 newline: str | None = ...,
584 ) -> AsyncFile[str]: ...
586 async def open(
587 self,
588 mode: str = "r",
589 buffering: int = -1,
590 encoding: str | None = None,
591 errors: str | None = None,
592 newline: str | None = None,
593 ) -> AsyncFile[Any]:
594 fp = await to_thread.run_sync(
595 self._path.open, mode, buffering, encoding, errors, newline
596 )
597 return AsyncFile(fp)
599 async def owner(self) -> str:
600 return await to_thread.run_sync(self._path.owner, abandon_on_cancel=True)
602 async def read_bytes(self) -> bytes:
603 return await to_thread.run_sync(self._path.read_bytes)
605 async def read_text(
606 self, encoding: str | None = None, errors: str | None = None
607 ) -> str:
608 return await to_thread.run_sync(self._path.read_text, encoding, errors)
610 if sys.version_info >= (3, 12):
612 def relative_to(
613 self, *other: str | PathLike[str], walk_up: bool = False
614 ) -> Path:
615 # relative_to() should work with any PathLike but it doesn't
616 others = [pathlib.Path(other) for other in other]
617 return Path(self._path.relative_to(*others, walk_up=walk_up))
619 else:
621 def relative_to(self, *other: str | PathLike[str]) -> Path:
622 return Path(self._path.relative_to(*other))
624 async def readlink(self) -> Path:
625 target = await to_thread.run_sync(os.readlink, self._path)
626 return Path(target)
628 async def rename(self, target: str | pathlib.PurePath | Path) -> Path:
629 if isinstance(target, Path):
630 target = target._path
632 await to_thread.run_sync(self._path.rename, target)
633 return Path(target)
635 async def replace(self, target: str | pathlib.PurePath | Path) -> Path:
636 if isinstance(target, Path):
637 target = target._path
639 await to_thread.run_sync(self._path.replace, target)
640 return Path(target)
642 async def resolve(self, strict: bool = False) -> Path:
643 func = partial(self._path.resolve, strict=strict)
644 return Path(await to_thread.run_sync(func, abandon_on_cancel=True))
646 def rglob(self, pattern: str) -> AsyncIterator[Path]:
647 gen = self._path.rglob(pattern)
648 return _PathIterator(gen)
650 async def rmdir(self) -> None:
651 await to_thread.run_sync(self._path.rmdir)
653 async def samefile(self, other_path: str | PathLike[str]) -> bool:
654 if isinstance(other_path, Path):
655 other_path = other_path._path
657 return await to_thread.run_sync(
658 self._path.samefile, other_path, abandon_on_cancel=True
659 )
661 async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:
662 func = partial(os.stat, follow_symlinks=follow_symlinks)
663 return await to_thread.run_sync(func, self._path, abandon_on_cancel=True)
665 async def symlink_to(
666 self,
667 target: str | bytes | PathLike[str] | PathLike[bytes],
668 target_is_directory: bool = False,
669 ) -> None:
670 if isinstance(target, Path):
671 target = target._path
673 await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)
675 async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
676 await to_thread.run_sync(self._path.touch, mode, exist_ok)
678 async def unlink(self, missing_ok: bool = False) -> None:
679 try:
680 await to_thread.run_sync(self._path.unlink)
681 except FileNotFoundError:
682 if not missing_ok:
683 raise
685 if sys.version_info >= (3, 12):
687 async def walk(
688 self,
689 top_down: bool = True,
690 on_error: Callable[[OSError], object] | None = None,
691 follow_symlinks: bool = False,
692 ) -> AsyncIterator[tuple[Path, list[str], list[str]]]:
693 def get_next_value() -> tuple[pathlib.Path, list[str], list[str]] | None:
694 try:
695 return next(gen)
696 except StopIteration:
697 return None
699 gen = self._path.walk(top_down, on_error, follow_symlinks)
700 while True:
701 value = await to_thread.run_sync(get_next_value)
702 if value is None:
703 return
705 root, dirs, paths = value
706 yield Path(root), dirs, paths
708 def with_name(self, name: str) -> Path:
709 return Path(self._path.with_name(name))
711 def with_stem(self, stem: str) -> Path:
712 return Path(self._path.with_name(stem + self._path.suffix))
714 def with_suffix(self, suffix: str) -> Path:
715 return Path(self._path.with_suffix(suffix))
717 def with_segments(self, *pathsegments: str | PathLike[str]) -> Path:
718 return Path(*pathsegments)
720 async def write_bytes(self, data: bytes) -> int:
721 return await to_thread.run_sync(self._path.write_bytes, data)
723 async def write_text(
724 self,
725 data: str,
726 encoding: str | None = None,
727 errors: str | None = None,
728 newline: str | None = None,
729 ) -> int:
730 # Path.write_text() does not support the "newline" parameter before Python 3.10
731 def sync_write_text() -> int:
732 with self._path.open(
733 "w", encoding=encoding, errors=errors, newline=newline
734 ) as fp:
735 return fp.write(data)
737 return await to_thread.run_sync(sync_write_text)
740PathLike.register(Path)