1from __future__ import annotations
2
3import os
4import pathlib
5import sys
6from collections.abc import Callable, Iterable, Iterator, Sequence
7from dataclasses import dataclass
8from functools import partial
9from os import PathLike
10from typing import (
11 IO,
12 TYPE_CHECKING,
13 Any,
14 AnyStr,
15 AsyncIterator,
16 Final,
17 Generic,
18 overload,
19)
20
21from .. import to_thread
22from ..abc import AsyncResource
23
if TYPE_CHECKING:
    from _typeshed import (
        OpenBinaryMode,
        OpenTextMode,
        ReadableBuffer,
        WriteableBuffer,
    )
else:
    # ``_typeshed`` is only imported under TYPE_CHECKING, so at runtime each
    # alias is bound to ``object`` to keep the names defined.
    WriteableBuffer = object
    OpenTextMode = object
    OpenBinaryMode = object
    ReadableBuffer = object
28
29
class AsyncFile(AsyncResource, Generic[AnyStr]):
    """
    An asynchronous file object.

    This class wraps a standard file object and provides async friendly versions of the
    following blocking methods (where available on the original file object):

    * read
    * read1
    * readline
    * readlines
    * readinto
    * readinto1
    * write
    * writelines
    * truncate
    * seek
    * tell
    * flush

    All other methods are directly passed through.

    This class supports the asynchronous context manager protocol which closes the
    underlying file at the end of the context block.

    This class also supports asynchronous iteration::

        async with await open_file(...) as f:
            async for line in f:
                print(line)
    """

    def __init__(self, fp: IO[AnyStr]) -> None:
        # Kept as ``Any`` because some of the forwarded methods (e.g. ``read1``,
        # ``readinto``) are not part of the generic ``IO`` interface.
        self._fp: Any = fp

    def __getattr__(self, name: str) -> object:
        # Only reached for attributes not defined on the wrapper itself; those are
        # looked up on the wrapped file object instead.
        return getattr(self._fp, name)

    @property
    def wrapped(self) -> IO[AnyStr]:
        """The wrapped file object."""
        return self._fp

    async def __aiter__(self) -> AsyncIterator[AnyStr]:
        # Yield lines until readline() returns an empty (falsy) value.
        while True:
            line = await self.readline()
            if line:
                yield line
            else:
                break

    async def aclose(self) -> None:
        """Close the wrapped file object in a worker thread."""
        return await to_thread.run_sync(self._fp.close)

    async def read(self, size: int = -1) -> AnyStr:
        """Call ``read(size)`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.read, size)

    async def read1(self: AsyncFile[bytes], size: int = -1) -> bytes:
        """Call ``read1(size)`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.read1, size)

    async def readline(self) -> AnyStr:
        """Call ``readline()`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.readline)

    async def readlines(self) -> list[AnyStr]:
        """Call ``readlines()`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.readlines)

    async def readinto(self: AsyncFile[bytes], b: WriteableBuffer) -> int:
        """
        Call ``readinto(b)`` on the wrapped file in a worker thread.

        :param b: a pre-allocated, writable buffer to read into
        :return: whatever the wrapped ``readinto()`` returns; for standard binary
            files this is the number of bytes read

        """
        return await to_thread.run_sync(self._fp.readinto, b)

    async def readinto1(self: AsyncFile[bytes], b: WriteableBuffer) -> int:
        """
        Call ``readinto1(b)`` on the wrapped file in a worker thread.

        :param b: a pre-allocated, writable buffer to read into
        :return: whatever the wrapped ``readinto1()`` returns; for standard buffered
            binary files this is the number of bytes read

        """
        return await to_thread.run_sync(self._fp.readinto1, b)

    @overload
    async def write(self: AsyncFile[bytes], b: ReadableBuffer) -> int: ...

    @overload
    async def write(self: AsyncFile[str], b: str) -> int: ...

    async def write(self, b: ReadableBuffer | str) -> int:
        """Call ``write(b)`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.write, b)

    @overload
    async def writelines(
        self: AsyncFile[bytes], lines: Iterable[ReadableBuffer]
    ) -> None: ...

    @overload
    async def writelines(self: AsyncFile[str], lines: Iterable[str]) -> None: ...

    async def writelines(self, lines: Iterable[ReadableBuffer] | Iterable[str]) -> None:
        """Call ``writelines(lines)`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.writelines, lines)

    async def truncate(self, size: int | None = None) -> int:
        """Call ``truncate(size)`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.truncate, size)

    async def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
        """
        Call ``seek(offset, whence)`` on the wrapped file in a worker thread.

        :param offset: the offset, interpreted relative to ``whence``
        :param whence: one of the ``os.SEEK_*`` constants (default:
            ``os.SEEK_SET``); it is forwarded as-is, so it must be an integer
        :return: whatever the wrapped ``seek()`` returns

        """
        return await to_thread.run_sync(self._fp.seek, offset, whence)

    async def tell(self) -> int:
        """Call ``tell()`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.tell)

    async def flush(self) -> None:
        """Call ``flush()`` on the wrapped file in a worker thread."""
        return await to_thread.run_sync(self._fp.flush)
133
134
@overload
async def open_file(
    file: str | PathLike[str] | int,
    mode: OpenBinaryMode,
    buffering: int = ...,
    encoding: str | None = ...,
    errors: str | None = ...,
    newline: str | None = ...,
    closefd: bool = ...,
    opener: Callable[[str, int], int] | None = ...,
) -> AsyncFile[bytes]: ...


@overload
async def open_file(
    file: str | PathLike[str] | int,
    mode: OpenTextMode = ...,
    buffering: int = ...,
    encoding: str | None = ...,
    errors: str | None = ...,
    newline: str | None = ...,
    closefd: bool = ...,
    opener: Callable[[str, int], int] | None = ...,
) -> AsyncFile[str]: ...


async def open_file(
    file: str | PathLike[str] | int,
    mode: str = "r",
    buffering: int = -1,
    encoding: str | None = None,
    errors: str | None = None,
    newline: str | None = None,
    closefd: bool = True,
    opener: Callable[[str, int], int] | None = None,
) -> AsyncFile[Any]:
    """
    Open a file in a worker thread and wrap it as an asynchronous file.

    Every argument is forwarded, in the same order, to the builtin :func:`open`.

    :return: an asynchronous file object

    """
    # Bind the blocking call up front so that only a zero-argument callable is
    # handed over to the worker thread.
    blocking_open = partial(
        open, file, mode, buffering, encoding, errors, newline, closefd, opener
    )
    raw_file = await to_thread.run_sync(blocking_open)
    return AsyncFile(raw_file)
183
184
def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:
    """
    Create an asynchronous wrapper around an already opened file.

    :param file: an existing file-like object
    :return: an asynchronous file object

    """
    async_file: AsyncFile[AnyStr] = AsyncFile(file)
    return async_file
194
195
@dataclass(eq=False)
class _PathIterator(AsyncIterator["Path"]):
    """Async iterator that pulls items from a synchronous iterator of paths."""

    # The synchronous iterator being drained, one item per __anext__() call.
    iterator: Iterator[PathLike[str]]

    async def __anext__(self) -> Path:
        # ``None`` is passed as the default to next(), so it marks exhaustion.
        item = await to_thread.run_sync(
            next, self.iterator, None, abandon_on_cancel=True
        )
        if item is not None:
            return Path(item)

        raise StopAsyncIteration from None
208
209
class Path:
    """
    An asynchronous version of :class:`pathlib.Path`.

    This class cannot be substituted for :class:`pathlib.Path` or
    :class:`pathlib.PurePath`, but it is compatible with the :class:`os.PathLike`
    interface.

    It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for
    the deprecated :meth:`~pathlib.Path.link_to` method. Some newer additions are
    defined only when running on a new enough Python: ``walk()`` and the ``walk_up``
    argument of ``relative_to()`` on 3.12+, and ``parser``, ``from_uri()``,
    ``full_match()`` and the ``case_sensitive`` argument of ``match()`` on 3.13+.

    Any methods that do disk I/O need to be awaited on. These methods are:

    * :meth:`~pathlib.Path.absolute`
    * :meth:`~pathlib.Path.chmod`
    * :meth:`~pathlib.Path.cwd`
    * :meth:`~pathlib.Path.exists`
    * :meth:`~pathlib.Path.expanduser`
    * :meth:`~pathlib.Path.group`
    * :meth:`~pathlib.Path.hardlink_to`
    * :meth:`~pathlib.Path.home`
    * :meth:`~pathlib.Path.is_block_device`
    * :meth:`~pathlib.Path.is_char_device`
    * :meth:`~pathlib.Path.is_dir`
    * :meth:`~pathlib.Path.is_fifo`
    * :meth:`~pathlib.Path.is_file`
    * :meth:`~pathlib.Path.is_junction`
    * :meth:`~pathlib.Path.is_mount`
    * :meth:`~pathlib.Path.is_socket`
    * :meth:`~pathlib.Path.is_symlink`
    * :meth:`~pathlib.Path.lchmod`
    * :meth:`~pathlib.Path.lstat`
    * :meth:`~pathlib.Path.mkdir`
    * :meth:`~pathlib.Path.open`
    * :meth:`~pathlib.Path.owner`
    * :meth:`~pathlib.Path.read_bytes`
    * :meth:`~pathlib.Path.read_text`
    * :meth:`~pathlib.Path.readlink`
    * :meth:`~pathlib.Path.rename`
    * :meth:`~pathlib.Path.replace`
    * :meth:`~pathlib.Path.resolve`
    * :meth:`~pathlib.Path.rmdir`
    * :meth:`~pathlib.Path.samefile`
    * :meth:`~pathlib.Path.stat`
    * :meth:`~pathlib.Path.symlink_to`
    * :meth:`~pathlib.Path.touch`
    * :meth:`~pathlib.Path.unlink`
    * :meth:`~pathlib.Path.write_bytes`
    * :meth:`~pathlib.Path.write_text`

    Additionally, the following methods return an async iterator yielding
    :class:`~.Path` objects:

    * :meth:`~pathlib.Path.glob`
    * :meth:`~pathlib.Path.iterdir`
    * :meth:`~pathlib.Path.rglob`

    Where available, :meth:`~pathlib.Path.walk` is an async generator yielding
    ``(Path, dirnames, filenames)`` tuples.
    """

    __slots__ = "_path", "__weakref__"

    __weakref__: Any

    def __init__(self, *args: str | PathLike[str]) -> None:
        # All path logic is delegated to this wrapped synchronous path object.
        self._path: Final[pathlib.Path] = pathlib.Path(*args)

    def __fspath__(self) -> str:
        return self._path.__fspath__()

    def __str__(self) -> str:
        return self._path.__str__()

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.as_posix()!r})"

    def __bytes__(self) -> bytes:
        return self._path.__bytes__()

    def __hash__(self) -> int:
        return self._path.__hash__()

    # The comparison methods unwrap another ``Path`` to its underlying pathlib
    # object and then defer to pathlib's own comparison.

    def __eq__(self, other: object) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__eq__(target)

    def __lt__(self, other: pathlib.PurePath | Path) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__lt__(target)

    def __le__(self, other: pathlib.PurePath | Path) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__le__(target)

    def __gt__(self, other: pathlib.PurePath | Path) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__gt__(target)

    def __ge__(self, other: pathlib.PurePath | Path) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__ge__(target)

    def __truediv__(self, other: str | PathLike[str]) -> Path:
        return Path(self._path / other)

    def __rtruediv__(self, other: str | PathLike[str]) -> Path:
        return Path(other) / self

    @property
    def parts(self) -> tuple[str, ...]:
        return self._path.parts

    @property
    def drive(self) -> str:
        return self._path.drive

    @property
    def root(self) -> str:
        return self._path.root

    @property
    def anchor(self) -> str:
        return self._path.anchor

    @property
    def parents(self) -> Sequence[Path]:
        return tuple(Path(p) for p in self._path.parents)

    @property
    def parent(self) -> Path:
        return Path(self._path.parent)

    @property
    def name(self) -> str:
        return self._path.name

    @property
    def suffix(self) -> str:
        return self._path.suffix

    @property
    def suffixes(self) -> list[str]:
        return self._path.suffixes

    @property
    def stem(self) -> str:
        return self._path.stem

    async def absolute(self) -> Path:
        path = await to_thread.run_sync(self._path.absolute)
        return Path(path)

    def as_posix(self) -> str:
        return self._path.as_posix()

    def as_uri(self) -> str:
        return self._path.as_uri()

    if sys.version_info >= (3, 13):
        parser = pathlib.Path.parser

        @classmethod
        def from_uri(cls, uri: str) -> Path:
            return Path(pathlib.Path.from_uri(uri))

        def full_match(
            self, path_pattern: str, *, case_sensitive: bool | None = None
        ) -> bool:
            return self._path.full_match(path_pattern, case_sensitive=case_sensitive)

        def match(
            self, path_pattern: str, *, case_sensitive: bool | None = None
        ) -> bool:
            return self._path.match(path_pattern, case_sensitive=case_sensitive)
    else:

        def match(self, path_pattern: str) -> bool:
            return self._path.match(path_pattern)

    def is_relative_to(self, other: str | PathLike[str]) -> bool:
        # relative_to() raises ValueError when this path is not below ``other``.
        try:
            self.relative_to(other)
            return True
        except ValueError:
            return False

    async def is_junction(self) -> bool:
        # NOTE: delegates to pathlib.Path.is_junction(), which pathlib only provides
        # on Python 3.12+; unlike walk(), this method is not version-guarded.
        return await to_thread.run_sync(self._path.is_junction)

    async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
        func = partial(os.chmod, follow_symlinks=follow_symlinks)
        return await to_thread.run_sync(func, self._path, mode)

    @classmethod
    async def cwd(cls) -> Path:
        path = await to_thread.run_sync(pathlib.Path.cwd)
        return cls(path)

    async def exists(self) -> bool:
        return await to_thread.run_sync(self._path.exists, abandon_on_cancel=True)

    async def expanduser(self) -> Path:
        return Path(
            await to_thread.run_sync(self._path.expanduser, abandon_on_cancel=True)
        )

    def glob(self, pattern: str) -> AsyncIterator[Path]:
        gen = self._path.glob(pattern)
        return _PathIterator(gen)

    async def group(self) -> str:
        return await to_thread.run_sync(self._path.group, abandon_on_cancel=True)

    async def hardlink_to(
        self, target: str | bytes | PathLike[str] | PathLike[bytes]
    ) -> None:
        if isinstance(target, Path):
            target = target._path

        # Pass the underlying pathlib path, like every other method here does,
        # rather than this wrapper object.
        await to_thread.run_sync(os.link, target, self._path)

    @classmethod
    async def home(cls) -> Path:
        home_path = await to_thread.run_sync(pathlib.Path.home)
        return cls(home_path)

    def is_absolute(self) -> bool:
        return self._path.is_absolute()

    async def is_block_device(self) -> bool:
        return await to_thread.run_sync(
            self._path.is_block_device, abandon_on_cancel=True
        )

    async def is_char_device(self) -> bool:
        return await to_thread.run_sync(
            self._path.is_char_device, abandon_on_cancel=True
        )

    async def is_dir(self) -> bool:
        return await to_thread.run_sync(self._path.is_dir, abandon_on_cancel=True)

    async def is_fifo(self) -> bool:
        return await to_thread.run_sync(self._path.is_fifo, abandon_on_cancel=True)

    async def is_file(self) -> bool:
        return await to_thread.run_sync(self._path.is_file, abandon_on_cancel=True)

    async def is_mount(self) -> bool:
        return await to_thread.run_sync(
            os.path.ismount, self._path, abandon_on_cancel=True
        )

    def is_reserved(self) -> bool:
        return self._path.is_reserved()

    async def is_socket(self) -> bool:
        return await to_thread.run_sync(self._path.is_socket, abandon_on_cancel=True)

    async def is_symlink(self) -> bool:
        return await to_thread.run_sync(self._path.is_symlink, abandon_on_cancel=True)

    def iterdir(self) -> AsyncIterator[Path]:
        gen = self._path.iterdir()
        return _PathIterator(gen)

    def joinpath(self, *args: str | PathLike[str]) -> Path:
        return Path(self._path.joinpath(*args))

    async def lchmod(self, mode: int) -> None:
        await to_thread.run_sync(self._path.lchmod, mode)

    async def lstat(self) -> os.stat_result:
        return await to_thread.run_sync(self._path.lstat, abandon_on_cancel=True)

    async def mkdir(
        self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
    ) -> None:
        await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)

    @overload
    async def open(
        self,
        mode: OpenBinaryMode,
        buffering: int = ...,
        encoding: str | None = ...,
        errors: str | None = ...,
        newline: str | None = ...,
    ) -> AsyncFile[bytes]: ...

    @overload
    async def open(
        self,
        mode: OpenTextMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        errors: str | None = ...,
        newline: str | None = ...,
    ) -> AsyncFile[str]: ...

    async def open(
        self,
        mode: str = "r",
        buffering: int = -1,
        encoding: str | None = None,
        errors: str | None = None,
        newline: str | None = None,
    ) -> AsyncFile[Any]:
        fp = await to_thread.run_sync(
            self._path.open, mode, buffering, encoding, errors, newline
        )
        return AsyncFile(fp)

    async def owner(self) -> str:
        return await to_thread.run_sync(self._path.owner, abandon_on_cancel=True)

    async def read_bytes(self) -> bytes:
        return await to_thread.run_sync(self._path.read_bytes)

    async def read_text(
        self, encoding: str | None = None, errors: str | None = None
    ) -> str:
        return await to_thread.run_sync(self._path.read_text, encoding, errors)

    if sys.version_info >= (3, 12):

        def relative_to(
            self, *other: str | PathLike[str], walk_up: bool = False
        ) -> Path:
            return Path(self._path.relative_to(*other, walk_up=walk_up))

    else:

        def relative_to(self, *other: str | PathLike[str]) -> Path:
            return Path(self._path.relative_to(*other))

    async def readlink(self) -> Path:
        target = await to_thread.run_sync(os.readlink, self._path)
        return Path(target)

    async def rename(self, target: str | pathlib.PurePath | Path) -> Path:
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.rename, target)
        return Path(target)

    async def replace(self, target: str | pathlib.PurePath | Path) -> Path:
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.replace, target)
        return Path(target)

    async def resolve(self, strict: bool = False) -> Path:
        func = partial(self._path.resolve, strict=strict)
        return Path(await to_thread.run_sync(func, abandon_on_cancel=True))

    def rglob(self, pattern: str) -> AsyncIterator[Path]:
        gen = self._path.rglob(pattern)
        return _PathIterator(gen)

    async def rmdir(self) -> None:
        await to_thread.run_sync(self._path.rmdir)

    async def samefile(self, other_path: str | PathLike[str]) -> bool:
        if isinstance(other_path, Path):
            other_path = other_path._path

        return await to_thread.run_sync(
            self._path.samefile, other_path, abandon_on_cancel=True
        )

    async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:
        func = partial(os.stat, follow_symlinks=follow_symlinks)
        return await to_thread.run_sync(func, self._path, abandon_on_cancel=True)

    async def symlink_to(
        self,
        target: str | bytes | PathLike[str] | PathLike[bytes],
        target_is_directory: bool = False,
    ) -> None:
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)

    async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
        await to_thread.run_sync(self._path.touch, mode, exist_ok)

    async def unlink(self, missing_ok: bool = False) -> None:
        # ``missing_ok`` is handled here instead of being passed to pathlib's unlink().
        try:
            await to_thread.run_sync(self._path.unlink)
        except FileNotFoundError:
            if not missing_ok:
                raise

    if sys.version_info >= (3, 12):

        async def walk(
            self,
            top_down: bool = True,
            on_error: Callable[[OSError], object] | None = None,
            follow_symlinks: bool = False,
        ) -> AsyncIterator[tuple[Path, list[str], list[str]]]:
            def get_next_value() -> tuple[pathlib.Path, list[str], list[str]] | None:
                # Runs in the worker thread: exhaustion of the generator is
                # reported as ``None`` instead of a raised StopIteration.
                try:
                    return next(gen)
                except StopIteration:
                    return None

            gen = self._path.walk(top_down, on_error, follow_symlinks)
            while True:
                value = await to_thread.run_sync(get_next_value)
                if value is None:
                    return

                root, dirs, paths = value
                yield Path(root), dirs, paths

    def with_name(self, name: str) -> Path:
        return Path(self._path.with_name(name))

    def with_stem(self, stem: str) -> Path:
        # Built from with_name(): the new stem followed by the current suffix.
        return Path(self._path.with_name(stem + self._path.suffix))

    def with_suffix(self, suffix: str) -> Path:
        return Path(self._path.with_suffix(suffix))

    def with_segments(self, *pathsegments: str | PathLike[str]) -> Path:
        return Path(*pathsegments)

    async def write_bytes(self, data: bytes) -> int:
        return await to_thread.run_sync(self._path.write_bytes, data)

    async def write_text(
        self,
        data: str,
        encoding: str | None = None,
        errors: str | None = None,
        newline: str | None = None,
    ) -> int:
        # Path.write_text() does not support the "newline" parameter before Python 3.10
        def sync_write_text() -> int:
            with self._path.open(
                "w", encoding=encoding, errors=errors, newline=newline
            ) as fp:
                return fp.write(data)

        return await to_thread.run_sync(sync_write_text)


# Register Path as a virtual subclass of os.PathLike.
PathLike.register(Path)