1from __future__ import annotations
2
3import os
4import pathlib
5import sys
6from collections.abc import Callable, Iterable, Iterator, Sequence
7from dataclasses import dataclass
8from functools import partial
9from os import PathLike
10from typing import (
11 IO,
12 TYPE_CHECKING,
13 Any,
14 AnyStr,
15 AsyncIterator,
16 Final,
17 Generic,
18 overload,
19)
20
21from .. import to_thread
22from ..abc import AsyncResource
23
24if TYPE_CHECKING:
25 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
26else:
27 ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object
28
29
class AsyncFile(AsyncResource, Generic[AnyStr]):
    """
    An asynchronous file object.

    This class wraps a standard file object and provides async friendly versions of the
    following blocking methods (where available on the original file object):

    * read
    * read1
    * readline
    * readlines
    * readinto
    * readinto1
    * write
    * writelines
    * truncate
    * seek
    * tell
    * flush

    All other methods are directly passed through.

    This class supports the asynchronous context manager protocol which closes the
    underlying file at the end of the context block.

    This class also supports asynchronous iteration::

        async with await open_file(...) as f:
            async for line in f:
                print(line)
    """

    def __init__(self, fp: IO[AnyStr]) -> None:
        # Typed as Any because not every wrapped file object provides every method
        # used below (e.g. read1() / readinto1() only exist on some binary files).
        self._fp: Any = fp

    def __getattr__(self, name: str) -> object:
        # Only invoked for attributes not defined on AsyncFile itself; these are
        # passed straight through to the wrapped file object.
        return getattr(self._fp, name)

    @property
    def wrapped(self) -> IO[AnyStr]:
        """The wrapped file object."""
        return self._fp

    async def __aiter__(self) -> AsyncIterator[AnyStr]:
        # readline() returns an empty str/bytes object at end of file, which ends
        # the iteration.
        while True:
            line = await self.readline()
            if not line:
                break

            yield line

    async def aclose(self) -> None:
        """Close the wrapped file object."""
        return await to_thread.run_sync(self._fp.close)

    async def read(self, size: int = -1) -> AnyStr:
        return await to_thread.run_sync(self._fp.read, size)

    async def read1(self: AsyncFile[bytes], size: int = -1) -> bytes:
        return await to_thread.run_sync(self._fp.read1, size)

    async def readline(self) -> AnyStr:
        return await to_thread.run_sync(self._fp.readline)

    async def readlines(self) -> list[AnyStr]:
        return await to_thread.run_sync(self._fp.readlines)

    async def readinto(self: AsyncFile[bytes], b: WriteableBuffer) -> int:
        """
        Read bytes into the pre-allocated buffer ``b``.

        :return: whatever the wrapped file's ``readinto()`` returns, which for
            standard binary files is the number of bytes read

        """
        return await to_thread.run_sync(self._fp.readinto, b)

    async def readinto1(self: AsyncFile[bytes], b: WriteableBuffer) -> int:
        """
        Read bytes into the pre-allocated buffer ``b`` using ``readinto1()``.

        :return: whatever the wrapped file's ``readinto1()`` returns, which for
            standard binary files is the number of bytes read

        """
        return await to_thread.run_sync(self._fp.readinto1, b)

    @overload
    async def write(self: AsyncFile[bytes], b: ReadableBuffer) -> int:
        ...

    @overload
    async def write(self: AsyncFile[str], b: str) -> int:
        ...

    async def write(self, b: ReadableBuffer | str) -> int:
        return await to_thread.run_sync(self._fp.write, b)

    @overload
    async def writelines(
        self: AsyncFile[bytes], lines: Iterable[ReadableBuffer]
    ) -> None:
        ...

    @overload
    async def writelines(self: AsyncFile[str], lines: Iterable[str]) -> None:
        ...

    async def writelines(self, lines: Iterable[ReadableBuffer] | Iterable[str]) -> None:
        return await to_thread.run_sync(self._fp.writelines, lines)

    async def truncate(self, size: int | None = None) -> int:
        return await to_thread.run_sync(self._fp.truncate, size)

    async def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
        """
        Change the stream position of the wrapped file.

        :param offset: the offset, interpreted relative to ``whence``
        :param whence: one of the ``os.SEEK_*`` constants; forwarded positionally to
            the wrapped file's ``seek()``, so it must be an integer
        :return: the value returned by the wrapped file's ``seek()``

        """
        return await to_thread.run_sync(self._fp.seek, offset, whence)

    async def tell(self) -> int:
        return await to_thread.run_sync(self._fp.tell)

    async def flush(self) -> None:
        return await to_thread.run_sync(self._fp.flush)
137
138
@overload
async def open_file(
    file: str | PathLike[str] | int,
    mode: OpenBinaryMode,
    buffering: int = ...,
    encoding: str | None = ...,
    errors: str | None = ...,
    newline: str | None = ...,
    closefd: bool = ...,
    opener: Callable[[str, int], int] | None = ...,
) -> AsyncFile[bytes]:
    ...


@overload
async def open_file(
    file: str | PathLike[str] | int,
    mode: OpenTextMode = ...,
    buffering: int = ...,
    encoding: str | None = ...,
    errors: str | None = ...,
    newline: str | None = ...,
    closefd: bool = ...,
    opener: Callable[[str, int], int] | None = ...,
) -> AsyncFile[str]:
    ...


async def open_file(
    file: str | PathLike[str] | int,
    mode: str = "r",
    buffering: int = -1,
    encoding: str | None = None,
    errors: str | None = None,
    newline: str | None = None,
    closefd: bool = True,
    opener: Callable[[str, int], int] | None = None,
) -> AsyncFile[Any]:
    """
    Open a file asynchronously.

    All arguments are forwarded, in order, to the builtin :func:`open`, which is
    called through :func:`to_thread.run_sync`; they carry the same meaning as there.

    :return: an :class:`AsyncFile` wrapping the opened file object

    """
    # Positional order matches the signature of the builtin open().
    open_args = (file, mode, buffering, encoding, errors, newline, closefd, opener)
    raw_file = await to_thread.run_sync(open, *open_args)
    return AsyncFile(raw_file)
189
190
def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:
    """
    Create an asynchronous wrapper around an already opened file.

    :param file: an existing file-like object
    :return: an :class:`AsyncFile` wrapping ``file``

    """
    async_file: AsyncFile[AnyStr] = AsyncFile(file)
    return async_file
200
201
@dataclass(eq=False)
class _PathIterator(AsyncIterator["Path"]):
    """Async iterator adapter that wraps each item of a synchronous iterator in Path."""

    # The synchronous iterator being driven through to_thread.run_sync().
    iterator: Iterator[PathLike[str]]

    async def __anext__(self) -> Path:
        # None is passed to next() as the default, so it doubles as the marker for
        # an exhausted iterator.
        candidate = await to_thread.run_sync(
            next, self.iterator, None, abandon_on_cancel=True
        )
        if candidate is not None:
            return Path(candidate)

        raise StopAsyncIteration from None
214
215
class Path:
    """
    An asynchronous version of :class:`pathlib.Path`.

    This class cannot be substituted for :class:`pathlib.Path` or
    :class:`pathlib.PurePath`, but it is compatible with the :class:`os.PathLike`
    interface.

    It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for
    the deprecated :meth:`~pathlib.Path.link_to` method. In addition,
    :meth:`~pathlib.Path.walk` and the ``walk_up`` parameter of
    :meth:`~pathlib.Path.relative_to` are available when running on Python 3.12 or
    later.

    Any methods that do disk I/O need to be awaited on. These methods are:

    * :meth:`~pathlib.Path.absolute`
    * :meth:`~pathlib.Path.chmod`
    * :meth:`~pathlib.Path.cwd`
    * :meth:`~pathlib.Path.exists`
    * :meth:`~pathlib.Path.expanduser`
    * :meth:`~pathlib.Path.group`
    * :meth:`~pathlib.Path.hardlink_to`
    * :meth:`~pathlib.Path.home`
    * :meth:`~pathlib.Path.is_block_device`
    * :meth:`~pathlib.Path.is_char_device`
    * :meth:`~pathlib.Path.is_dir`
    * :meth:`~pathlib.Path.is_fifo`
    * :meth:`~pathlib.Path.is_file`
    * :meth:`~pathlib.Path.is_junction`
    * :meth:`~pathlib.Path.is_mount`
    * :meth:`~pathlib.Path.is_socket`
    * :meth:`~pathlib.Path.is_symlink`
    * :meth:`~pathlib.Path.lchmod`
    * :meth:`~pathlib.Path.lstat`
    * :meth:`~pathlib.Path.mkdir`
    * :meth:`~pathlib.Path.open`
    * :meth:`~pathlib.Path.owner`
    * :meth:`~pathlib.Path.read_bytes`
    * :meth:`~pathlib.Path.read_text`
    * :meth:`~pathlib.Path.readlink`
    * :meth:`~pathlib.Path.rename`
    * :meth:`~pathlib.Path.replace`
    * :meth:`~pathlib.Path.resolve`
    * :meth:`~pathlib.Path.rmdir`
    * :meth:`~pathlib.Path.samefile`
    * :meth:`~pathlib.Path.stat`
    * :meth:`~pathlib.Path.symlink_to`
    * :meth:`~pathlib.Path.touch`
    * :meth:`~pathlib.Path.unlink`
    * :meth:`~pathlib.Path.write_bytes`
    * :meth:`~pathlib.Path.write_text`

    Additionally, the following methods return an async iterator yielding
    :class:`~.Path` objects:

    * :meth:`~pathlib.Path.glob`
    * :meth:`~pathlib.Path.iterdir`
    * :meth:`~pathlib.Path.rglob`

    On Python 3.12 and later, :meth:`~pathlib.Path.walk` is an async iterator yielding
    ``(Path, dirnames, filenames)`` tuples.
    """

    __slots__ = "_path", "__weakref__"

    __weakref__: Any

    def __init__(self, *args: str | PathLike[str]) -> None:
        # Every operation is delegated to this synchronous pathlib.Path.
        self._path: Final[pathlib.Path] = pathlib.Path(*args)

    def __fspath__(self) -> str:
        return self._path.__fspath__()

    def __str__(self) -> str:
        return self._path.__str__()

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.as_posix()!r})"

    def __bytes__(self) -> bytes:
        return self._path.__bytes__()

    def __hash__(self) -> int:
        return self._path.__hash__()

    # In the comparison methods below, another async Path is unwrapped first so that
    # the comparison is carried out between the underlying pathlib objects.

    def __eq__(self, other: object) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__eq__(target)

    def __lt__(self, other: pathlib.PurePath | Path) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__lt__(target)

    def __le__(self, other: pathlib.PurePath | Path) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__le__(target)

    def __gt__(self, other: pathlib.PurePath | Path) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__gt__(target)

    def __ge__(self, other: pathlib.PurePath | Path) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__ge__(target)

    def __truediv__(self, other: str | PathLike[str]) -> Path:
        return Path(self._path / other)

    def __rtruediv__(self, other: str | PathLike[str]) -> Path:
        return Path(other) / self

    @property
    def parts(self) -> tuple[str, ...]:
        return self._path.parts

    @property
    def drive(self) -> str:
        return self._path.drive

    @property
    def root(self) -> str:
        return self._path.root

    @property
    def anchor(self) -> str:
        return self._path.anchor

    @property
    def parents(self) -> Sequence[Path]:
        return tuple(Path(p) for p in self._path.parents)

    @property
    def parent(self) -> Path:
        return Path(self._path.parent)

    @property
    def name(self) -> str:
        return self._path.name

    @property
    def suffix(self) -> str:
        return self._path.suffix

    @property
    def suffixes(self) -> list[str]:
        return self._path.suffixes

    @property
    def stem(self) -> str:
        return self._path.stem

    async def absolute(self) -> Path:
        path = await to_thread.run_sync(self._path.absolute)
        return Path(path)

    def as_posix(self) -> str:
        return self._path.as_posix()

    def as_uri(self) -> str:
        return self._path.as_uri()

    def match(self, path_pattern: str) -> bool:
        return self._path.match(path_pattern)

    def is_relative_to(self, other: str | PathLike[str]) -> bool:
        # Implemented on top of relative_to(): a ValueError from it means "no".
        try:
            self.relative_to(other)
            return True
        except ValueError:
            return False

    async def is_junction(self) -> bool:
        """
        Check whether this path is a junction.

        Uses :meth:`pathlib.Path.is_junction` on Python 3.12 and later. On older
        versions, where that method does not exist, the check is done directly on the
        result of :func:`os.lstat`.

        :return: ``True`` if the path is a junction, ``False`` otherwise

        """
        if sys.version_info >= (3, 12):
            return await to_thread.run_sync(self._path.is_junction)

        def sync_is_junction() -> bool:
            import stat

            try:
                stat_result = os.lstat(self._path)
            except (OSError, ValueError):
                return False

            # Both attributes only exist on Windows; where they are missing there
            # are no reparse points and therefore no junctions.
            reparse_tag = getattr(stat_result, "st_reparse_tag", None)
            junction_tag = getattr(stat, "IO_REPARSE_TAG_MOUNT_POINT", None)
            return reparse_tag is not None and reparse_tag == junction_tag

        return await to_thread.run_sync(sync_is_junction)

    async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
        # Keyword arguments given to run_sync() are consumed by run_sync() itself
        # (e.g. abandon_on_cancel), so follow_symlinks is bound with partial().
        func = partial(os.chmod, follow_symlinks=follow_symlinks)
        return await to_thread.run_sync(func, self._path, mode)

    @classmethod
    async def cwd(cls) -> Path:
        path = await to_thread.run_sync(pathlib.Path.cwd)
        return cls(path)

    async def exists(self) -> bool:
        return await to_thread.run_sync(self._path.exists, abandon_on_cancel=True)

    async def expanduser(self) -> Path:
        return Path(
            await to_thread.run_sync(self._path.expanduser, abandon_on_cancel=True)
        )

    def glob(self, pattern: str) -> AsyncIterator[Path]:
        gen = self._path.glob(pattern)
        return _PathIterator(gen)

    async def group(self) -> str:
        return await to_thread.run_sync(self._path.group, abandon_on_cancel=True)

    async def hardlink_to(
        self, target: str | bytes | PathLike[str] | PathLike[bytes]
    ) -> None:
        """
        Make this path a hard link to ``target``.

        :param target: the existing file the new link should point to

        """
        if isinstance(target, Path):
            target = target._path

        # Pass the underlying pathlib object, like every other method here does.
        await to_thread.run_sync(os.link, target, self._path)

    @classmethod
    async def home(cls) -> Path:
        home_path = await to_thread.run_sync(pathlib.Path.home)
        return cls(home_path)

    def is_absolute(self) -> bool:
        return self._path.is_absolute()

    async def is_block_device(self) -> bool:
        return await to_thread.run_sync(
            self._path.is_block_device, abandon_on_cancel=True
        )

    async def is_char_device(self) -> bool:
        return await to_thread.run_sync(
            self._path.is_char_device, abandon_on_cancel=True
        )

    async def is_dir(self) -> bool:
        return await to_thread.run_sync(self._path.is_dir, abandon_on_cancel=True)

    async def is_fifo(self) -> bool:
        return await to_thread.run_sync(self._path.is_fifo, abandon_on_cancel=True)

    async def is_file(self) -> bool:
        return await to_thread.run_sync(self._path.is_file, abandon_on_cancel=True)

    async def is_mount(self) -> bool:
        return await to_thread.run_sync(
            os.path.ismount, self._path, abandon_on_cancel=True
        )

    def is_reserved(self) -> bool:
        return self._path.is_reserved()

    async def is_socket(self) -> bool:
        return await to_thread.run_sync(self._path.is_socket, abandon_on_cancel=True)

    async def is_symlink(self) -> bool:
        return await to_thread.run_sync(self._path.is_symlink, abandon_on_cancel=True)

    def iterdir(self) -> AsyncIterator[Path]:
        gen = self._path.iterdir()
        return _PathIterator(gen)

    def joinpath(self, *args: str | PathLike[str]) -> Path:
        return Path(self._path.joinpath(*args))

    async def lchmod(self, mode: int) -> None:
        await to_thread.run_sync(self._path.lchmod, mode)

    async def lstat(self) -> os.stat_result:
        return await to_thread.run_sync(self._path.lstat, abandon_on_cancel=True)

    async def mkdir(
        self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
    ) -> None:
        await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)

    @overload
    async def open(
        self,
        mode: OpenBinaryMode,
        buffering: int = ...,
        encoding: str | None = ...,
        errors: str | None = ...,
        newline: str | None = ...,
    ) -> AsyncFile[bytes]:
        ...

    @overload
    async def open(
        self,
        mode: OpenTextMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        errors: str | None = ...,
        newline: str | None = ...,
    ) -> AsyncFile[str]:
        ...

    async def open(
        self,
        mode: str = "r",
        buffering: int = -1,
        encoding: str | None = None,
        errors: str | None = None,
        newline: str | None = None,
    ) -> AsyncFile[Any]:
        fp = await to_thread.run_sync(
            self._path.open, mode, buffering, encoding, errors, newline
        )
        return AsyncFile(fp)

    async def owner(self) -> str:
        return await to_thread.run_sync(self._path.owner, abandon_on_cancel=True)

    async def read_bytes(self) -> bytes:
        return await to_thread.run_sync(self._path.read_bytes)

    async def read_text(
        self, encoding: str | None = None, errors: str | None = None
    ) -> str:
        return await to_thread.run_sync(self._path.read_text, encoding, errors)

    if sys.version_info >= (3, 12):

        def relative_to(
            self, *other: str | PathLike[str], walk_up: bool = False
        ) -> Path:
            return Path(self._path.relative_to(*other, walk_up=walk_up))

    else:

        def relative_to(self, *other: str | PathLike[str]) -> Path:
            return Path(self._path.relative_to(*other))

    async def readlink(self) -> Path:
        target = await to_thread.run_sync(os.readlink, self._path)
        return Path(target)

    async def rename(self, target: str | pathlib.PurePath | Path) -> Path:
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.rename, target)
        return Path(target)

    async def replace(self, target: str | pathlib.PurePath | Path) -> Path:
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.replace, target)
        return Path(target)

    async def resolve(self, strict: bool = False) -> Path:
        func = partial(self._path.resolve, strict=strict)
        return Path(await to_thread.run_sync(func, abandon_on_cancel=True))

    def rglob(self, pattern: str) -> AsyncIterator[Path]:
        gen = self._path.rglob(pattern)
        return _PathIterator(gen)

    async def rmdir(self) -> None:
        await to_thread.run_sync(self._path.rmdir)

    async def samefile(self, other_path: str | PathLike[str]) -> bool:
        if isinstance(other_path, Path):
            other_path = other_path._path

        return await to_thread.run_sync(
            self._path.samefile, other_path, abandon_on_cancel=True
        )

    async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:
        func = partial(os.stat, follow_symlinks=follow_symlinks)
        return await to_thread.run_sync(func, self._path, abandon_on_cancel=True)

    async def symlink_to(
        self,
        target: str | bytes | PathLike[str] | PathLike[bytes],
        target_is_directory: bool = False,
    ) -> None:
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)

    async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
        await to_thread.run_sync(self._path.touch, mode, exist_ok)

    async def unlink(self, missing_ok: bool = False) -> None:
        try:
            await to_thread.run_sync(self._path.unlink)
        except FileNotFoundError:
            # A missing file is only an error if the caller did not opt out.
            if not missing_ok:
                raise

    if sys.version_info >= (3, 12):

        async def walk(
            self,
            top_down: bool = True,
            on_error: Callable[[OSError], object] | None = None,
            follow_symlinks: bool = False,
        ) -> AsyncIterator[tuple[Path, list[str], list[str]]]:
            def get_next_value() -> tuple[pathlib.Path, list[str], list[str]] | None:
                # Turn exhaustion of the synchronous generator into None so the
                # loop below can finish with a plain return.
                try:
                    return next(gen)
                except StopIteration:
                    return None

            gen = self._path.walk(top_down, on_error, follow_symlinks)
            while True:
                value = await to_thread.run_sync(get_next_value)
                if value is None:
                    return

                root, dirs, paths = value
                yield Path(root), dirs, paths

    def with_name(self, name: str) -> Path:
        return Path(self._path.with_name(name))

    def with_stem(self, stem: str) -> Path:
        # Built from with_name(): the new stem followed by the current suffix.
        return Path(self._path.with_name(stem + self._path.suffix))

    def with_suffix(self, suffix: str) -> Path:
        return Path(self._path.with_suffix(suffix))

    def with_segments(self, *pathsegments: str | PathLike[str]) -> Path:
        return Path(*pathsegments)

    async def write_bytes(self, data: bytes) -> int:
        return await to_thread.run_sync(self._path.write_bytes, data)

    async def write_text(
        self,
        data: str,
        encoding: str | None = None,
        errors: str | None = None,
        newline: str | None = None,
    ) -> int:
        # Path.write_text() does not support the "newline" parameter before Python 3.10
        def sync_write_text() -> int:
            with self._path.open(
                "w", encoding=encoding, errors=errors, newline=newline
            ) as fp:
                return fp.write(data)

        return await to_thread.run_sync(sync_write_text)
643
644
# Register the async Path class as a virtual subclass of os.PathLike.
PathLike.register(Path)