Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_fileio.py: 49%
290 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import os
2import pathlib
3import sys
4from dataclasses import dataclass
5from functools import partial
6from os import PathLike
7from typing import (
8 IO,
9 TYPE_CHECKING,
10 Any,
11 AnyStr,
12 AsyncIterator,
13 Callable,
14 Generic,
15 Iterable,
16 Iterator,
17 List,
18 Optional,
19 Sequence,
20 Tuple,
21 Union,
22 cast,
23 overload,
24)
26from .. import to_thread
27from ..abc import AsyncResource
29if sys.version_info >= (3, 8):
30 from typing import Final
31else:
32 from typing_extensions import Final
34if TYPE_CHECKING:
35 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
36else:
37 ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object
class AsyncFile(AsyncResource, Generic[AnyStr]):
    """
    An asynchronous file object.

    This class wraps a standard file object and provides async friendly versions of the following
    blocking methods (where available on the original file object):

    * read
    * read1
    * readline
    * readlines
    * readinto
    * readinto1
    * write
    * writelines
    * truncate
    * seek
    * tell
    * flush

    All other methods are directly passed through.

    This class supports the asynchronous context manager protocol which closes the underlying file
    at the end of the context block.

    This class also supports asynchronous iteration::

        async with await open_file(...) as f:
            async for line in f:
                print(line)
    """

    def __init__(self, fp: IO[AnyStr]) -> None:
        # Typed as Any because the set of available methods depends on the
        # concrete file object that was wrapped.
        self._fp: Any = fp

    def __getattr__(self, name: str) -> object:
        """
        Forward any attribute not defined on this class to the wrapped file.

        :raises AttributeError: if the wrapped file has no such attribute, or if the wrapped
            file itself has not been set on this instance yet

        """
        # __getattr__ only runs after normal lookup has failed. If "_fp" itself is
        # the missing attribute (an instance created without __init__, for example
        # while copy.copy() rebuilds it), reading self._fp below would re-enter
        # this method until RecursionError.
        if name == "_fp":
            raise AttributeError(name)

        return getattr(self._fp, name)

    @property
    def wrapped(self) -> IO[AnyStr]:
        """The wrapped file object."""
        return self._fp

    async def __aiter__(self) -> AsyncIterator[AnyStr]:
        # Yield lines until readline() returns an empty (falsy) value.
        while True:
            line = await self.readline()
            if line:
                yield line
            else:
                break

    async def aclose(self) -> None:
        """Call ``close()`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.close)

    async def read(self, size: int = -1) -> AnyStr:
        """Call ``read(size)`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.read, size)

    async def read1(self: "AsyncFile[bytes]", size: int = -1) -> bytes:
        """Call ``read1(size)`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.read1, size)

    async def readline(self) -> AnyStr:
        """Call ``readline()`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.readline)

    async def readlines(self) -> List[AnyStr]:
        """Call ``readlines()`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.readlines)

    async def readinto(self: "AsyncFile[bytes]", b: WriteableBuffer) -> int:
        """
        Call ``readinto(b)`` on the wrapped file through ``to_thread.run_sync``.

        :param b: the buffer to fill
        :return: the value returned by the wrapped ``readinto()`` (for standard binary files,
            the number of bytes read)

        """
        return await to_thread.run_sync(self._fp.readinto, b)

    async def readinto1(self: "AsyncFile[bytes]", b: WriteableBuffer) -> int:
        """
        Call ``readinto1(b)`` on the wrapped file through ``to_thread.run_sync``.

        :param b: the buffer to fill
        :return: the value returned by the wrapped ``readinto1()`` (for standard binary files,
            the number of bytes read)

        """
        return await to_thread.run_sync(self._fp.readinto1, b)

    @overload
    async def write(self: "AsyncFile[bytes]", b: ReadableBuffer) -> int:
        ...

    @overload
    async def write(self: "AsyncFile[str]", b: str) -> int:
        ...

    async def write(self, b: Union[ReadableBuffer, str]) -> int:
        """Call ``write(b)`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.write, b)

    @overload
    async def writelines(
        self: "AsyncFile[bytes]", lines: Iterable[ReadableBuffer]
    ) -> None:
        ...

    @overload
    async def writelines(self: "AsyncFile[str]", lines: Iterable[str]) -> None:
        ...

    async def writelines(
        self, lines: Union[Iterable[ReadableBuffer], Iterable[str]]
    ) -> None:
        """Call ``writelines(lines)`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.writelines, lines)

    async def truncate(self, size: Optional[int] = None) -> int:
        """Call ``truncate(size)`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.truncate, size)

    async def seek(self, offset: int, whence: Optional[int] = os.SEEK_SET) -> int:
        """Call ``seek(offset, whence)`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.seek, offset, whence)

    async def tell(self) -> int:
        """Call ``tell()`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.tell)

    async def flush(self) -> None:
        """Call ``flush()`` on the wrapped file through ``to_thread.run_sync``."""
        return await to_thread.run_sync(self._fp.flush)
@overload
async def open_file(
    file: Union[str, "PathLike[str]", int],
    mode: OpenBinaryMode,
    buffering: int = ...,
    encoding: Optional[str] = ...,
    errors: Optional[str] = ...,
    newline: Optional[str] = ...,
    closefd: bool = ...,
    opener: Optional[Callable[[str, int], int]] = ...,
) -> AsyncFile[bytes]:
    ...


@overload
async def open_file(
    file: Union[str, "PathLike[str]", int],
    mode: OpenTextMode = ...,
    buffering: int = ...,
    encoding: Optional[str] = ...,
    errors: Optional[str] = ...,
    newline: Optional[str] = ...,
    closefd: bool = ...,
    opener: Optional[Callable[[str, int], int]] = ...,
) -> AsyncFile[str]:
    ...


async def open_file(
    file: Union[str, "PathLike[str]", int],
    mode: str = "r",
    buffering: int = -1,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
    closefd: bool = True,
    opener: Optional[Callable[[str, int], int]] = None,
) -> AsyncFile[Any]:
    """
    Open a file without calling the builtin :func:`open` directly from the caller.

    Every argument is forwarded positionally, in the same order, to the builtin :func:`open`,
    which is invoked through ``to_thread.run_sync``.

    :return: an :class:`AsyncFile` wrapping the file object that :func:`open` returned

    """
    open_args = (file, mode, buffering, encoding, errors, newline, closefd, opener)
    raw_file = await to_thread.run_sync(open, *open_args)
    return AsyncFile(raw_file)
def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:
    """
    Create an asynchronous wrapper around a file object that is already open.

    :param file: an existing file-like object
    :return: an :class:`AsyncFile` whose ``wrapped`` attribute is ``file``

    """
    async_file: AsyncFile[AnyStr] = AsyncFile(file)
    return async_file
@dataclass(eq=False)
class _PathIterator(AsyncIterator["Path"]):
    """
    Async iterator that wraps a synchronous iterator of path-like objects.

    Each step calls ``next()`` on the wrapped iterator through ``to_thread.run_sync`` and
    converts the produced value to a :class:`Path`.
    """

    # The synchronous iterator being drained, one item per __anext__() call.
    iterator: Iterator["PathLike[str]"]

    async def __anext__(self) -> "Path":
        # next() is given None as its default, so None marks exhaustion
        # instead of StopIteration being raised inside run_sync().
        item = await to_thread.run_sync(next, self.iterator, None, cancellable=True)
        if item is not None:
            return Path(cast("PathLike[str]", item))

        raise StopAsyncIteration from None
class Path:
    """
    An asynchronous counterpart of :class:`pathlib.Path`.

    Instances cannot stand in for :class:`pathlib.Path` or :class:`pathlib.PurePath`, but they
    do support the :class:`os.PathLike` interface.

    The class implements the Python 3.10 version of the :class:`pathlib.Path` interface, except
    for the deprecated :meth:`~pathlib.Path.link_to` method.

    Every method that performs disk I/O is a coroutine function and has to be awaited. These
    methods are:

    * :meth:`~pathlib.Path.absolute`
    * :meth:`~pathlib.Path.chmod`
    * :meth:`~pathlib.Path.cwd`
    * :meth:`~pathlib.Path.exists`
    * :meth:`~pathlib.Path.expanduser`
    * :meth:`~pathlib.Path.group`
    * :meth:`~pathlib.Path.hardlink_to`
    * :meth:`~pathlib.Path.home`
    * :meth:`~pathlib.Path.is_block_device`
    * :meth:`~pathlib.Path.is_char_device`
    * :meth:`~pathlib.Path.is_dir`
    * :meth:`~pathlib.Path.is_fifo`
    * :meth:`~pathlib.Path.is_file`
    * :meth:`~pathlib.Path.is_mount`
    * :meth:`~pathlib.Path.is_socket`
    * :meth:`~pathlib.Path.is_symlink`
    * :meth:`~pathlib.Path.lchmod`
    * :meth:`~pathlib.Path.lstat`
    * :meth:`~pathlib.Path.mkdir`
    * :meth:`~pathlib.Path.open`
    * :meth:`~pathlib.Path.owner`
    * :meth:`~pathlib.Path.read_bytes`
    * :meth:`~pathlib.Path.read_text`
    * :meth:`~pathlib.Path.readlink`
    * :meth:`~pathlib.Path.rename`
    * :meth:`~pathlib.Path.replace`
    * :meth:`~pathlib.Path.resolve`
    * :meth:`~pathlib.Path.rmdir`
    * :meth:`~pathlib.Path.samefile`
    * :meth:`~pathlib.Path.stat`
    * :meth:`~pathlib.Path.symlink_to`
    * :meth:`~pathlib.Path.touch`
    * :meth:`~pathlib.Path.unlink`
    * :meth:`~pathlib.Path.write_bytes`
    * :meth:`~pathlib.Path.write_text`

    In addition, these methods return an async iterator that yields :class:`~.Path` objects:

    * :meth:`~pathlib.Path.glob`
    * :meth:`~pathlib.Path.iterdir`
    * :meth:`~pathlib.Path.rglob`
    """

    __slots__ = "_path", "__weakref__"

    __weakref__: Any

    def __init__(self, *args: Union[str, "PathLike[str]"]) -> None:
        # All operations are delegated to this wrapped synchronous path.
        self._path: Final[pathlib.Path] = pathlib.Path(*args)

    # -- protocol methods, delegated to the wrapped pathlib.Path --

    def __fspath__(self) -> str:
        return self._path.__fspath__()

    def __str__(self) -> str:
        return self._path.__str__()

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.as_posix()!r})"

    def __bytes__(self) -> bytes:
        return self._path.__bytes__()

    def __hash__(self) -> int:
        return self._path.__hash__()

    # In the comparisons below another async Path is unwrapped first, so the
    # wrapped pathlib.Path is compared against a pathlib.Path; anything else
    # is handed to pathlib unchanged.

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Path):
            return self._path.__eq__(other._path)

        return self._path.__eq__(other)

    def __lt__(self, other: "Path") -> bool:
        if isinstance(other, Path):
            return self._path.__lt__(other._path)

        return self._path.__lt__(other)

    def __le__(self, other: "Path") -> bool:
        if isinstance(other, Path):
            return self._path.__le__(other._path)

        return self._path.__le__(other)

    def __gt__(self, other: "Path") -> bool:
        if isinstance(other, Path):
            return self._path.__gt__(other._path)

        return self._path.__gt__(other)

    def __ge__(self, other: "Path") -> bool:
        if isinstance(other, Path):
            return self._path.__ge__(other._path)

        return self._path.__ge__(other)

    def __truediv__(self, other: Any) -> "Path":
        joined = self._path / other
        return Path(joined)

    def __rtruediv__(self, other: Any) -> "Path":
        left = Path(other)
        return left / self

    # -- pure path properties (no disk I/O) --

    @property
    def parts(self) -> Tuple[str, ...]:
        return self._path.parts

    @property
    def drive(self) -> str:
        return self._path.drive

    @property
    def root(self) -> str:
        return self._path.root

    @property
    def anchor(self) -> str:
        return self._path.anchor

    @property
    def parents(self) -> Sequence["Path"]:
        # Materialized as a tuple of async Path objects.
        return tuple(Path(ancestor) for ancestor in self._path.parents)

    @property
    def parent(self) -> "Path":
        return Path(self._path.parent)

    @property
    def name(self) -> str:
        return self._path.name

    @property
    def suffix(self) -> str:
        return self._path.suffix

    @property
    def suffixes(self) -> List[str]:
        return self._path.suffixes

    @property
    def stem(self) -> str:
        return self._path.stem

    # -- methods --

    async def absolute(self) -> "Path":
        absolute_path = await to_thread.run_sync(self._path.absolute)
        return Path(absolute_path)

    def as_posix(self) -> str:
        return self._path.as_posix()

    def as_uri(self) -> str:
        return self._path.as_uri()

    def match(self, path_pattern: str) -> bool:
        return self._path.match(path_pattern)

    def is_relative_to(self, *other: Union[str, "PathLike[str]"]) -> bool:
        # relative_to() raises ValueError when this path is not below *other*.
        try:
            self.relative_to(*other)
        except ValueError:
            return False
        else:
            return True

    async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
        # os.chmod is used so follow_symlinks can be passed as a keyword.
        chmod = partial(os.chmod, follow_symlinks=follow_symlinks)
        return await to_thread.run_sync(chmod, self._path, mode)

    @classmethod
    async def cwd(cls) -> "Path":
        current_dir = await to_thread.run_sync(pathlib.Path.cwd)
        return cls(current_dir)

    async def exists(self) -> bool:
        return await to_thread.run_sync(self._path.exists, cancellable=True)

    async def expanduser(self) -> "Path":
        expanded = await to_thread.run_sync(self._path.expanduser, cancellable=True)
        return Path(expanded)

    def glob(self, pattern: str) -> AsyncIterator["Path"]:
        matches = self._path.glob(pattern)
        return _PathIterator(matches)

    async def group(self) -> str:
        return await to_thread.run_sync(self._path.group, cancellable=True)

    async def hardlink_to(self, target: Union[str, pathlib.Path, "Path"]) -> None:
        if isinstance(target, Path):
            target = target._path

        # os.link(src, dst): this path becomes the new link to *target*.
        await to_thread.run_sync(os.link, target, self)

    @classmethod
    async def home(cls) -> "Path":
        home_dir = await to_thread.run_sync(pathlib.Path.home)
        return cls(home_dir)

    def is_absolute(self) -> bool:
        return self._path.is_absolute()

    async def is_block_device(self) -> bool:
        return await to_thread.run_sync(self._path.is_block_device, cancellable=True)

    async def is_char_device(self) -> bool:
        return await to_thread.run_sync(self._path.is_char_device, cancellable=True)

    async def is_dir(self) -> bool:
        return await to_thread.run_sync(self._path.is_dir, cancellable=True)

    async def is_fifo(self) -> bool:
        return await to_thread.run_sync(self._path.is_fifo, cancellable=True)

    async def is_file(self) -> bool:
        return await to_thread.run_sync(self._path.is_file, cancellable=True)

    async def is_mount(self) -> bool:
        return await to_thread.run_sync(os.path.ismount, self._path, cancellable=True)

    def is_reserved(self) -> bool:
        return self._path.is_reserved()

    async def is_socket(self) -> bool:
        return await to_thread.run_sync(self._path.is_socket, cancellable=True)

    async def is_symlink(self) -> bool:
        return await to_thread.run_sync(self._path.is_symlink, cancellable=True)

    def iterdir(self) -> AsyncIterator["Path"]:
        entries = self._path.iterdir()
        return _PathIterator(entries)

    def joinpath(self, *args: Union[str, "PathLike[str]"]) -> "Path":
        joined = self._path.joinpath(*args)
        return Path(joined)

    async def lchmod(self, mode: int) -> None:
        await to_thread.run_sync(self._path.lchmod, mode)

    async def lstat(self) -> os.stat_result:
        return await to_thread.run_sync(self._path.lstat, cancellable=True)

    async def mkdir(
        self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
    ) -> None:
        await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)

    @overload
    async def open(
        self,
        mode: OpenBinaryMode,
        buffering: int = ...,
        encoding: Optional[str] = ...,
        errors: Optional[str] = ...,
        newline: Optional[str] = ...,
    ) -> AsyncFile[bytes]:
        ...

    @overload
    async def open(
        self,
        mode: OpenTextMode = ...,
        buffering: int = ...,
        encoding: Optional[str] = ...,
        errors: Optional[str] = ...,
        newline: Optional[str] = ...,
    ) -> AsyncFile[str]:
        ...

    async def open(
        self,
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ) -> AsyncFile[Any]:
        open_args = (mode, buffering, encoding, errors, newline)
        raw_file = await to_thread.run_sync(self._path.open, *open_args)
        return AsyncFile(raw_file)

    async def owner(self) -> str:
        return await to_thread.run_sync(self._path.owner, cancellable=True)

    async def read_bytes(self) -> bytes:
        return await to_thread.run_sync(self._path.read_bytes)

    async def read_text(
        self, encoding: Optional[str] = None, errors: Optional[str] = None
    ) -> str:
        return await to_thread.run_sync(self._path.read_text, encoding, errors)

    def relative_to(self, *other: Union[str, "PathLike[str]"]) -> "Path":
        relative = self._path.relative_to(*other)
        return Path(relative)

    async def readlink(self) -> "Path":
        link_target = await to_thread.run_sync(os.readlink, self._path)
        return Path(cast(str, link_target))

    async def rename(self, target: Union[str, pathlib.PurePath, "Path"]) -> "Path":
        destination = target._path if isinstance(target, Path) else target
        await to_thread.run_sync(self._path.rename, destination)
        return Path(destination)

    async def replace(self, target: Union[str, pathlib.PurePath, "Path"]) -> "Path":
        destination = target._path if isinstance(target, Path) else target
        await to_thread.run_sync(self._path.replace, destination)
        return Path(destination)

    async def resolve(self, strict: bool = False) -> "Path":
        resolver = partial(self._path.resolve, strict=strict)
        resolved = await to_thread.run_sync(resolver, cancellable=True)
        return Path(resolved)

    def rglob(self, pattern: str) -> AsyncIterator["Path"]:
        matches = self._path.rglob(pattern)
        return _PathIterator(matches)

    async def rmdir(self) -> None:
        await to_thread.run_sync(self._path.rmdir)

    async def samefile(
        self, other_path: Union[str, bytes, int, pathlib.Path, "Path"]
    ) -> bool:
        if isinstance(other_path, Path):
            other_path = other_path._path

        return await to_thread.run_sync(
            self._path.samefile, other_path, cancellable=True
        )

    async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:
        # os.stat is used so follow_symlinks can be passed as a keyword.
        stat = partial(os.stat, follow_symlinks=follow_symlinks)
        return await to_thread.run_sync(stat, self._path, cancellable=True)

    async def symlink_to(
        self,
        target: Union[str, pathlib.Path, "Path"],
        target_is_directory: bool = False,
    ) -> None:
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)

    async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
        await to_thread.run_sync(self._path.touch, mode, exist_ok)

    async def unlink(self, missing_ok: bool = False) -> None:
        # missing_ok is handled here rather than being passed to pathlib's unlink().
        try:
            await to_thread.run_sync(self._path.unlink)
        except FileNotFoundError:
            if missing_ok:
                return

            raise

    def with_name(self, name: str) -> "Path":
        return Path(self._path.with_name(name))

    def with_stem(self, stem: str) -> "Path":
        # Keep the current suffix and swap only the stem.
        new_name = stem + self._path.suffix
        return Path(self._path.with_name(new_name))

    def with_suffix(self, suffix: str) -> "Path":
        return Path(self._path.with_suffix(suffix))

    async def write_bytes(self, data: bytes) -> int:
        return await to_thread.run_sync(self._path.write_bytes, data)

    async def write_text(
        self,
        data: str,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ) -> int:
        # Path.write_text() does not support the "newline" parameter before Python 3.10,
        # so the file is opened and written explicitly instead.
        def write_in_thread() -> int:
            with self._path.open(
                "w", encoding=encoding, errors=errors, newline=newline
            ) as text_file:
                return text_file.write(data)

        return await to_thread.run_sync(write_in_thread)
# Explicitly register the async Path class as a virtual subclass of os.PathLike.
PathLike.register(Path)