1from __future__ import annotations
2
3import os
4import pathlib
5import sys
6from collections.abc import Callable, Iterable, Iterator, Sequence
7from dataclasses import dataclass
8from functools import partial
9from os import PathLike
10from typing import (
11 IO,
12 TYPE_CHECKING,
13 Any,
14 AnyStr,
15 AsyncIterator,
16 Final,
17 Generic,
18 overload,
19)
20
21from .. import to_thread
22from ..abc import AsyncResource
23
24if TYPE_CHECKING:
25 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
26else:
27 ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object
28
29
30class AsyncFile(AsyncResource, Generic[AnyStr]):
31 """
32 An asynchronous file object.
33
34 This class wraps a standard file object and provides async friendly versions of the
35 following blocking methods (where available on the original file object):
36
37 * read
38 * read1
39 * readline
40 * readlines
41 * readinto
42 * readinto1
43 * write
44 * writelines
45 * truncate
46 * seek
47 * tell
48 * flush
49
50 All other methods are directly passed through.
51
52 This class supports the asynchronous context manager protocol which closes the
53 underlying file at the end of the context block.
54
55 This class also supports asynchronous iteration::
56
57 async with await open_file(...) as f:
58 async for line in f:
59 print(line)
60 """
61
62 def __init__(self, fp: IO[AnyStr]) -> None:
63 self._fp: Any = fp
64
65 def __getattr__(self, name: str) -> object:
66 return getattr(self._fp, name)
67
68 @property
69 def wrapped(self) -> IO[AnyStr]:
70 """The wrapped file object."""
71 return self._fp
72
73 async def __aiter__(self) -> AsyncIterator[AnyStr]:
74 while True:
75 line = await self.readline()
76 if line:
77 yield line
78 else:
79 break
80
81 async def aclose(self) -> None:
82 return await to_thread.run_sync(self._fp.close)
83
84 async def read(self, size: int = -1) -> AnyStr:
85 return await to_thread.run_sync(self._fp.read, size)
86
87 async def read1(self: AsyncFile[bytes], size: int = -1) -> bytes:
88 return await to_thread.run_sync(self._fp.read1, size)
89
90 async def readline(self) -> AnyStr:
91 return await to_thread.run_sync(self._fp.readline)
92
93 async def readlines(self) -> list[AnyStr]:
94 return await to_thread.run_sync(self._fp.readlines)
95
96 async def readinto(self: AsyncFile[bytes], b: WriteableBuffer) -> bytes:
97 return await to_thread.run_sync(self._fp.readinto, b)
98
99 async def readinto1(self: AsyncFile[bytes], b: WriteableBuffer) -> bytes:
100 return await to_thread.run_sync(self._fp.readinto1, b)
101
102 @overload
103 async def write(self: AsyncFile[bytes], b: ReadableBuffer) -> int: ...
104
105 @overload
106 async def write(self: AsyncFile[str], b: str) -> int: ...
107
108 async def write(self, b: ReadableBuffer | str) -> int:
109 return await to_thread.run_sync(self._fp.write, b)
110
111 @overload
112 async def writelines(
113 self: AsyncFile[bytes], lines: Iterable[ReadableBuffer]
114 ) -> None: ...
115
116 @overload
117 async def writelines(self: AsyncFile[str], lines: Iterable[str]) -> None: ...
118
119 async def writelines(self, lines: Iterable[ReadableBuffer] | Iterable[str]) -> None:
120 return await to_thread.run_sync(self._fp.writelines, lines)
121
122 async def truncate(self, size: int | None = None) -> int:
123 return await to_thread.run_sync(self._fp.truncate, size)
124
125 async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int:
126 return await to_thread.run_sync(self._fp.seek, offset, whence)
127
128 async def tell(self) -> int:
129 return await to_thread.run_sync(self._fp.tell)
130
131 async def flush(self) -> None:
132 return await to_thread.run_sync(self._fp.flush)
133
134
135@overload
136async def open_file(
137 file: str | PathLike[str] | int,
138 mode: OpenBinaryMode,
139 buffering: int = ...,
140 encoding: str | None = ...,
141 errors: str | None = ...,
142 newline: str | None = ...,
143 closefd: bool = ...,
144 opener: Callable[[str, int], int] | None = ...,
145) -> AsyncFile[bytes]: ...
146
147
148@overload
149async def open_file(
150 file: str | PathLike[str] | int,
151 mode: OpenTextMode = ...,
152 buffering: int = ...,
153 encoding: str | None = ...,
154 errors: str | None = ...,
155 newline: str | None = ...,
156 closefd: bool = ...,
157 opener: Callable[[str, int], int] | None = ...,
158) -> AsyncFile[str]: ...
159
160
161async def open_file(
162 file: str | PathLike[str] | int,
163 mode: str = "r",
164 buffering: int = -1,
165 encoding: str | None = None,
166 errors: str | None = None,
167 newline: str | None = None,
168 closefd: bool = True,
169 opener: Callable[[str, int], int] | None = None,
170) -> AsyncFile[Any]:
171 """
172 Open a file asynchronously.
173
174 The arguments are exactly the same as for the builtin :func:`open`.
175
176 :return: an asynchronous file object
177
178 """
179 fp = await to_thread.run_sync(
180 open, file, mode, buffering, encoding, errors, newline, closefd, opener
181 )
182 return AsyncFile(fp)
183
184
185def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:
186 """
187 Wrap an existing file as an asynchronous file.
188
189 :param file: an existing file-like object
190 :return: an asynchronous file object
191
192 """
193 return AsyncFile(file)
194
195
196@dataclass(eq=False)
197class _PathIterator(AsyncIterator["Path"]):
198 iterator: Iterator[PathLike[str]]
199
200 async def __anext__(self) -> Path:
201 nextval = await to_thread.run_sync(
202 next, self.iterator, None, abandon_on_cancel=True
203 )
204 if nextval is None:
205 raise StopAsyncIteration from None
206
207 return Path(nextval)
208
209
210class Path:
211 """
212 An asynchronous version of :class:`pathlib.Path`.
213
214 This class cannot be substituted for :class:`pathlib.Path` or
215 :class:`pathlib.PurePath`, but it is compatible with the :class:`os.PathLike`
216 interface.
217
218 It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for
219 the deprecated :meth:`~pathlib.Path.link_to` method.
220
221 Any methods that do disk I/O need to be awaited on. These methods are:
222
223 * :meth:`~pathlib.Path.absolute`
224 * :meth:`~pathlib.Path.chmod`
225 * :meth:`~pathlib.Path.cwd`
226 * :meth:`~pathlib.Path.exists`
227 * :meth:`~pathlib.Path.expanduser`
228 * :meth:`~pathlib.Path.group`
229 * :meth:`~pathlib.Path.hardlink_to`
230 * :meth:`~pathlib.Path.home`
231 * :meth:`~pathlib.Path.is_block_device`
232 * :meth:`~pathlib.Path.is_char_device`
233 * :meth:`~pathlib.Path.is_dir`
234 * :meth:`~pathlib.Path.is_fifo`
235 * :meth:`~pathlib.Path.is_file`
236 * :meth:`~pathlib.Path.is_mount`
237 * :meth:`~pathlib.Path.lchmod`
238 * :meth:`~pathlib.Path.lstat`
239 * :meth:`~pathlib.Path.mkdir`
240 * :meth:`~pathlib.Path.open`
241 * :meth:`~pathlib.Path.owner`
242 * :meth:`~pathlib.Path.read_bytes`
243 * :meth:`~pathlib.Path.read_text`
244 * :meth:`~pathlib.Path.readlink`
245 * :meth:`~pathlib.Path.rename`
246 * :meth:`~pathlib.Path.replace`
247 * :meth:`~pathlib.Path.rmdir`
248 * :meth:`~pathlib.Path.samefile`
249 * :meth:`~pathlib.Path.stat`
250 * :meth:`~pathlib.Path.touch`
251 * :meth:`~pathlib.Path.unlink`
252 * :meth:`~pathlib.Path.write_bytes`
253 * :meth:`~pathlib.Path.write_text`
254
255 Additionally, the following methods return an async iterator yielding
256 :class:`~.Path` objects:
257
258 * :meth:`~pathlib.Path.glob`
259 * :meth:`~pathlib.Path.iterdir`
260 * :meth:`~pathlib.Path.rglob`
261 """
262
263 __slots__ = "_path", "__weakref__"
264
265 __weakref__: Any
266
267 def __init__(self, *args: str | PathLike[str]) -> None:
268 self._path: Final[pathlib.Path] = pathlib.Path(*args)
269
270 def __fspath__(self) -> str:
271 return self._path.__fspath__()
272
273 def __str__(self) -> str:
274 return self._path.__str__()
275
276 def __repr__(self) -> str:
277 return f"{self.__class__.__name__}({self.as_posix()!r})"
278
279 def __bytes__(self) -> bytes:
280 return self._path.__bytes__()
281
282 def __hash__(self) -> int:
283 return self._path.__hash__()
284
285 def __eq__(self, other: object) -> bool:
286 target = other._path if isinstance(other, Path) else other
287 return self._path.__eq__(target)
288
289 def __lt__(self, other: pathlib.PurePath | Path) -> bool:
290 target = other._path if isinstance(other, Path) else other
291 return self._path.__lt__(target)
292
293 def __le__(self, other: pathlib.PurePath | Path) -> bool:
294 target = other._path if isinstance(other, Path) else other
295 return self._path.__le__(target)
296
297 def __gt__(self, other: pathlib.PurePath | Path) -> bool:
298 target = other._path if isinstance(other, Path) else other
299 return self._path.__gt__(target)
300
301 def __ge__(self, other: pathlib.PurePath | Path) -> bool:
302 target = other._path if isinstance(other, Path) else other
303 return self._path.__ge__(target)
304
305 def __truediv__(self, other: str | PathLike[str]) -> Path:
306 return Path(self._path / other)
307
308 def __rtruediv__(self, other: str | PathLike[str]) -> Path:
309 return Path(other) / self
310
311 @property
312 def parts(self) -> tuple[str, ...]:
313 return self._path.parts
314
315 @property
316 def drive(self) -> str:
317 return self._path.drive
318
319 @property
320 def root(self) -> str:
321 return self._path.root
322
323 @property
324 def anchor(self) -> str:
325 return self._path.anchor
326
327 @property
328 def parents(self) -> Sequence[Path]:
329 return tuple(Path(p) for p in self._path.parents)
330
331 @property
332 def parent(self) -> Path:
333 return Path(self._path.parent)
334
335 @property
336 def name(self) -> str:
337 return self._path.name
338
339 @property
340 def suffix(self) -> str:
341 return self._path.suffix
342
343 @property
344 def suffixes(self) -> list[str]:
345 return self._path.suffixes
346
347 @property
348 def stem(self) -> str:
349 return self._path.stem
350
351 async def absolute(self) -> Path:
352 path = await to_thread.run_sync(self._path.absolute)
353 return Path(path)
354
355 def as_posix(self) -> str:
356 return self._path.as_posix()
357
358 def as_uri(self) -> str:
359 return self._path.as_uri()
360
361 def match(self, path_pattern: str) -> bool:
362 return self._path.match(path_pattern)
363
364 def is_relative_to(self, other: str | PathLike[str]) -> bool:
365 try:
366 self.relative_to(other)
367 return True
368 except ValueError:
369 return False
370
371 async def is_junction(self) -> bool:
372 return await to_thread.run_sync(self._path.is_junction)
373
374 async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
375 func = partial(os.chmod, follow_symlinks=follow_symlinks)
376 return await to_thread.run_sync(func, self._path, mode)
377
378 @classmethod
379 async def cwd(cls) -> Path:
380 path = await to_thread.run_sync(pathlib.Path.cwd)
381 return cls(path)
382
383 async def exists(self) -> bool:
384 return await to_thread.run_sync(self._path.exists, abandon_on_cancel=True)
385
386 async def expanduser(self) -> Path:
387 return Path(
388 await to_thread.run_sync(self._path.expanduser, abandon_on_cancel=True)
389 )
390
391 def glob(self, pattern: str) -> AsyncIterator[Path]:
392 gen = self._path.glob(pattern)
393 return _PathIterator(gen)
394
395 async def group(self) -> str:
396 return await to_thread.run_sync(self._path.group, abandon_on_cancel=True)
397
398 async def hardlink_to(
399 self, target: str | bytes | PathLike[str] | PathLike[bytes]
400 ) -> None:
401 if isinstance(target, Path):
402 target = target._path
403
404 await to_thread.run_sync(os.link, target, self)
405
406 @classmethod
407 async def home(cls) -> Path:
408 home_path = await to_thread.run_sync(pathlib.Path.home)
409 return cls(home_path)
410
411 def is_absolute(self) -> bool:
412 return self._path.is_absolute()
413
414 async def is_block_device(self) -> bool:
415 return await to_thread.run_sync(
416 self._path.is_block_device, abandon_on_cancel=True
417 )
418
419 async def is_char_device(self) -> bool:
420 return await to_thread.run_sync(
421 self._path.is_char_device, abandon_on_cancel=True
422 )
423
424 async def is_dir(self) -> bool:
425 return await to_thread.run_sync(self._path.is_dir, abandon_on_cancel=True)
426
427 async def is_fifo(self) -> bool:
428 return await to_thread.run_sync(self._path.is_fifo, abandon_on_cancel=True)
429
430 async def is_file(self) -> bool:
431 return await to_thread.run_sync(self._path.is_file, abandon_on_cancel=True)
432
433 async def is_mount(self) -> bool:
434 return await to_thread.run_sync(
435 os.path.ismount, self._path, abandon_on_cancel=True
436 )
437
438 def is_reserved(self) -> bool:
439 return self._path.is_reserved()
440
441 async def is_socket(self) -> bool:
442 return await to_thread.run_sync(self._path.is_socket, abandon_on_cancel=True)
443
444 async def is_symlink(self) -> bool:
445 return await to_thread.run_sync(self._path.is_symlink, abandon_on_cancel=True)
446
447 def iterdir(self) -> AsyncIterator[Path]:
448 gen = self._path.iterdir()
449 return _PathIterator(gen)
450
451 def joinpath(self, *args: str | PathLike[str]) -> Path:
452 return Path(self._path.joinpath(*args))
453
454 async def lchmod(self, mode: int) -> None:
455 await to_thread.run_sync(self._path.lchmod, mode)
456
457 async def lstat(self) -> os.stat_result:
458 return await to_thread.run_sync(self._path.lstat, abandon_on_cancel=True)
459
460 async def mkdir(
461 self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
462 ) -> None:
463 await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)
464
465 @overload
466 async def open(
467 self,
468 mode: OpenBinaryMode,
469 buffering: int = ...,
470 encoding: str | None = ...,
471 errors: str | None = ...,
472 newline: str | None = ...,
473 ) -> AsyncFile[bytes]: ...
474
475 @overload
476 async def open(
477 self,
478 mode: OpenTextMode = ...,
479 buffering: int = ...,
480 encoding: str | None = ...,
481 errors: str | None = ...,
482 newline: str | None = ...,
483 ) -> AsyncFile[str]: ...
484
485 async def open(
486 self,
487 mode: str = "r",
488 buffering: int = -1,
489 encoding: str | None = None,
490 errors: str | None = None,
491 newline: str | None = None,
492 ) -> AsyncFile[Any]:
493 fp = await to_thread.run_sync(
494 self._path.open, mode, buffering, encoding, errors, newline
495 )
496 return AsyncFile(fp)
497
498 async def owner(self) -> str:
499 return await to_thread.run_sync(self._path.owner, abandon_on_cancel=True)
500
501 async def read_bytes(self) -> bytes:
502 return await to_thread.run_sync(self._path.read_bytes)
503
504 async def read_text(
505 self, encoding: str | None = None, errors: str | None = None
506 ) -> str:
507 return await to_thread.run_sync(self._path.read_text, encoding, errors)
508
509 if sys.version_info >= (3, 12):
510
511 def relative_to(
512 self, *other: str | PathLike[str], walk_up: bool = False
513 ) -> Path:
514 return Path(self._path.relative_to(*other, walk_up=walk_up))
515
516 else:
517
518 def relative_to(self, *other: str | PathLike[str]) -> Path:
519 return Path(self._path.relative_to(*other))
520
521 async def readlink(self) -> Path:
522 target = await to_thread.run_sync(os.readlink, self._path)
523 return Path(target)
524
525 async def rename(self, target: str | pathlib.PurePath | Path) -> Path:
526 if isinstance(target, Path):
527 target = target._path
528
529 await to_thread.run_sync(self._path.rename, target)
530 return Path(target)
531
532 async def replace(self, target: str | pathlib.PurePath | Path) -> Path:
533 if isinstance(target, Path):
534 target = target._path
535
536 await to_thread.run_sync(self._path.replace, target)
537 return Path(target)
538
539 async def resolve(self, strict: bool = False) -> Path:
540 func = partial(self._path.resolve, strict=strict)
541 return Path(await to_thread.run_sync(func, abandon_on_cancel=True))
542
543 def rglob(self, pattern: str) -> AsyncIterator[Path]:
544 gen = self._path.rglob(pattern)
545 return _PathIterator(gen)
546
547 async def rmdir(self) -> None:
548 await to_thread.run_sync(self._path.rmdir)
549
550 async def samefile(self, other_path: str | PathLike[str]) -> bool:
551 if isinstance(other_path, Path):
552 other_path = other_path._path
553
554 return await to_thread.run_sync(
555 self._path.samefile, other_path, abandon_on_cancel=True
556 )
557
558 async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:
559 func = partial(os.stat, follow_symlinks=follow_symlinks)
560 return await to_thread.run_sync(func, self._path, abandon_on_cancel=True)
561
562 async def symlink_to(
563 self,
564 target: str | bytes | PathLike[str] | PathLike[bytes],
565 target_is_directory: bool = False,
566 ) -> None:
567 if isinstance(target, Path):
568 target = target._path
569
570 await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)
571
572 async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
573 await to_thread.run_sync(self._path.touch, mode, exist_ok)
574
575 async def unlink(self, missing_ok: bool = False) -> None:
576 try:
577 await to_thread.run_sync(self._path.unlink)
578 except FileNotFoundError:
579 if not missing_ok:
580 raise
581
582 if sys.version_info >= (3, 12):
583
584 async def walk(
585 self,
586 top_down: bool = True,
587 on_error: Callable[[OSError], object] | None = None,
588 follow_symlinks: bool = False,
589 ) -> AsyncIterator[tuple[Path, list[str], list[str]]]:
590 def get_next_value() -> tuple[pathlib.Path, list[str], list[str]] | None:
591 try:
592 return next(gen)
593 except StopIteration:
594 return None
595
596 gen = self._path.walk(top_down, on_error, follow_symlinks)
597 while True:
598 value = await to_thread.run_sync(get_next_value)
599 if value is None:
600 return
601
602 root, dirs, paths = value
603 yield Path(root), dirs, paths
604
605 def with_name(self, name: str) -> Path:
606 return Path(self._path.with_name(name))
607
608 def with_stem(self, stem: str) -> Path:
609 return Path(self._path.with_name(stem + self._path.suffix))
610
611 def with_suffix(self, suffix: str) -> Path:
612 return Path(self._path.with_suffix(suffix))
613
614 def with_segments(self, *pathsegments: str | PathLike[str]) -> Path:
615 return Path(*pathsegments)
616
617 async def write_bytes(self, data: bytes) -> int:
618 return await to_thread.run_sync(self._path.write_bytes, data)
619
620 async def write_text(
621 self,
622 data: str,
623 encoding: str | None = None,
624 errors: str | None = None,
625 newline: str | None = None,
626 ) -> int:
627 # Path.write_text() does not support the "newline" parameter before Python 3.10
628 def sync_write_text() -> int:
629 with self._path.open(
630 "w", encoding=encoding, errors=errors, newline=newline
631 ) as fp:
632 return fp.write(data)
633
634 return await to_thread.run_sync(sync_write_text)
635
636
637PathLike.register(Path)