1from __future__ import annotations
2
3import os
4import sys
5import tempfile
6from collections.abc import Iterable
7from io import BytesIO, TextIOWrapper
8from types import TracebackType
9from typing import (
10 TYPE_CHECKING,
11 Any,
12 AnyStr,
13 Generic,
14 overload,
15)
16
17from .. import to_thread
18from .._core._fileio import AsyncFile
19from ..lowlevel import checkpoint_if_cancelled
20
21if TYPE_CHECKING:
22 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
23
24
class TemporaryFile(Generic[AnyStr]):
    """
    Asynchronous context manager around :func:`tempfile.TemporaryFile`.

    Constructing the object only records the arguments. The temporary file is opened
    in a worker thread when the context is entered, and the resulting file object is
    handed back wrapped in an :class:`AsyncFile`. Leaving the context closes that
    wrapper.

    :param mode: The mode in which the file is opened. Defaults to "w+b".
    :param buffering: The buffering policy (-1 means the default buffering).
    :param encoding: The encoding used to decode or encode the file. Only applicable in
        text mode.
    :param newline: Controls how universal newlines mode works (only applicable in text
        mode).
    :param suffix: The suffix for the temporary file name.
    :param prefix: The prefix for the temporary file name.
    :param dir: The directory in which the temporary file is created.
    :param errors: The error handling scheme used for encoding/decoding errors.
    """

    # Assigned by ``__aenter__``; not available before the context is entered.
    _async_file: AsyncFile[AnyStr]

    @overload
    def __init__(
        self: TemporaryFile[bytes],
        mode: OpenBinaryMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        *,
        errors: str | None = ...,
    ): ...
    @overload
    def __init__(
        self: TemporaryFile[str],
        mode: OpenTextMode,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        *,
        errors: str | None = ...,
    ): ...

    def __init__(
        self,
        mode: OpenTextMode | OpenBinaryMode = "w+b",
        buffering: int = -1,
        encoding: str | None = None,
        newline: str | None = None,
        suffix: str | None = None,
        prefix: str | None = None,
        dir: str | None = None,
        *,
        errors: str | None = None,
    ) -> None:
        # Stream configuration
        self.mode = mode
        self.buffering = buffering
        self.encoding = encoding
        self.newline = newline

        # Naming and placement of the file
        self.suffix: str | None = suffix
        self.prefix: str | None = prefix
        self.dir: str | None = dir

        # Text encoding error policy
        self.errors = errors

    async def __aenter__(self) -> AsyncFile[AnyStr]:
        """Open the temporary file in a worker thread and return it wrapped."""

        def open_file() -> Any:
            # Arguments are forwarded positionally, in the order the stdlib expects.
            return tempfile.TemporaryFile(
                self.mode,
                self.buffering,
                self.encoding,
                self.newline,
                self.suffix,
                self.prefix,
                self.dir,
                errors=self.errors,
            )

        raw_file = await to_thread.run_sync(open_file)
        wrapped = self._async_file = AsyncFile(raw_file)
        return wrapped

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Close the wrapped file; the exception details are not inspected."""
        await self._async_file.aclose()
118
119
class NamedTemporaryFile(Generic[AnyStr]):
    """
    Asynchronous context manager around :func:`tempfile.NamedTemporaryFile`.

    The constructor only collects the keyword arguments to pass on. Entering the
    context opens the named temporary file in a worker thread and returns it wrapped
    in an :class:`AsyncFile`; leaving the context closes that wrapper.

    :param mode: The mode in which the file is opened. Defaults to "w+b".
    :param buffering: The buffering policy (-1 means the default buffering).
    :param encoding: The encoding used to decode or encode the file. Only applicable in
        text mode.
    :param newline: Controls how universal newlines mode works (only applicable in text
        mode).
    :param suffix: The suffix for the temporary file name.
    :param prefix: The prefix for the temporary file name.
    :param dir: The directory in which the temporary file is created.
    :param delete: Whether to delete the file when it is closed.
    :param errors: The error handling scheme used for encoding/decoding errors.
    :param delete_on_close: (Python 3.12+) Whether to delete the file on close. Not
        forwarded on older Python versions.
    """

    # Assigned by ``__aenter__``; not available before the context is entered.
    _async_file: AsyncFile[AnyStr]

    @overload
    def __init__(
        self: NamedTemporaryFile[bytes],
        mode: OpenBinaryMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        delete: bool = ...,
        *,
        errors: str | None = ...,
        delete_on_close: bool = ...,
    ): ...
    @overload
    def __init__(
        self: NamedTemporaryFile[str],
        mode: OpenTextMode,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        delete: bool = ...,
        *,
        errors: str | None = ...,
        delete_on_close: bool = ...,
    ): ...

    def __init__(
        self,
        mode: OpenBinaryMode | OpenTextMode = "w+b",
        buffering: int = -1,
        encoding: str | None = None,
        newline: str | None = None,
        suffix: str | None = None,
        prefix: str | None = None,
        dir: str | None = None,
        delete: bool = True,
        *,
        errors: str | None = None,
        delete_on_close: bool = True,
    ) -> None:
        # Keyword arguments that every supported Python version accepts.
        options: dict[str, Any] = {
            "mode": mode,
            "buffering": buffering,
            "encoding": encoding,
            "newline": newline,
            "suffix": suffix,
            "prefix": prefix,
            "dir": dir,
            "delete": delete,
            "errors": errors,
        }
        # ``delete_on_close`` is only passed along on Python 3.12 and newer.
        if sys.version_info >= (3, 12):
            options["delete_on_close"] = delete_on_close

        self._params: dict[str, Any] = options

    async def __aenter__(self) -> AsyncFile[AnyStr]:
        """Open the named temporary file in a worker thread and return it wrapped."""

        def open_file() -> Any:
            return tempfile.NamedTemporaryFile(**self._params)

        raw_file = await to_thread.run_sync(open_file)
        wrapped = self._async_file = AsyncFile(raw_file)
        return wrapped

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Close the wrapped file; the exception details are not inspected."""
        await self._async_file.aclose()
218
219
class SpooledTemporaryFile(AsyncFile[AnyStr]):
    """
    An asynchronous spooled temporary file that starts in memory and is spooled to disk.

    This class provides an asynchronous interface to a spooled temporary file, much like
    Python's standard :class:`~tempfile.SpooledTemporaryFile`. Data is first kept in an
    in-memory buffer, which is accessed synchronously (after a cancellation check).
    Once the file has been rolled over to a real temporary file, every operation is
    delegated to the :class:`AsyncFile` implementation instead.

    :param max_size: Maximum size in bytes before the file is rolled over to disk.
    :param mode: The mode in which the file is opened. Defaults to "w+b".
    :param buffering: The buffering policy (-1 means the default buffering).
    :param encoding: The encoding used to decode or encode the file (text mode only).
    :param newline: Controls how universal newlines mode works (text mode only).
    :param suffix: The suffix for the temporary file name.
    :param prefix: The prefix for the temporary file name.
    :param dir: The directory in which the temporary file is created.
    :param errors: The error handling scheme used for encoding/decoding errors.
    """

    # True once the in-memory buffer has been replaced by an on-disk temporary file.
    _rolled: bool = False

    @overload
    def __init__(
        self: SpooledTemporaryFile[bytes],
        max_size: int = ...,
        mode: OpenBinaryMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        *,
        errors: str | None = ...,
    ): ...
    @overload
    def __init__(
        self: SpooledTemporaryFile[str],
        max_size: int = ...,
        mode: OpenTextMode = ...,
        buffering: int = ...,
        encoding: str | None = ...,
        newline: str | None = ...,
        suffix: str | None = ...,
        prefix: str | None = ...,
        dir: str | None = ...,
        *,
        errors: str | None = ...,
    ): ...

    def __init__(
        self,
        max_size: int = 0,
        mode: OpenBinaryMode | OpenTextMode = "w+b",
        buffering: int = -1,
        encoding: str | None = None,
        newline: str | None = None,
        suffix: str | None = None,
        prefix: str | None = None,
        dir: str | None = None,
        *,
        errors: str | None = None,
    ) -> None:
        # Kept for later: these are the arguments used to create the on-disk file at
        # rollover time.
        self._tempfile_params: dict[str, Any] = {
            "mode": mode,
            "buffering": buffering,
            "encoding": encoding,
            "newline": newline,
            "suffix": suffix,
            "prefix": prefix,
            "dir": dir,
            "errors": errors,
        }
        self._max_size = max_size

        # Start out with an in-memory buffer matching the requested mode.
        if "b" in mode:
            super().__init__(BytesIO())  # type: ignore[arg-type]
        else:
            super().__init__(
                TextIOWrapper(  # type: ignore[arg-type]
                    BytesIO(),
                    encoding=encoding,
                    errors=errors,
                    newline=newline,
                    write_through=True,
                )
            )

    async def aclose(self) -> None:
        """Close the file, synchronously while it is still held in memory."""
        if not self._rolled:
            self._fp.close()
            return

        await super().aclose()

    async def _check(self) -> None:
        """
        Roll over to disk once the stream position has reached ``max_size``.

        The comparison uses the current stream position, so with the default
        ``max_size`` of 0 the first write already triggers the rollover.
        """
        if self._rolled or self._fp.tell() < self._max_size:
            return

        await self.rollover()

    async def rollover(self) -> None:
        """
        Move the contents of the in-memory buffer to an on-disk temporary file.

        Does nothing if the file has already been rolled over. The stream position is
        the same after the rollover as it was before it.
        """
        if self._rolled:
            return

        self._rolled = True
        buffer = self._fp
        # Remember where the stream was positioned; copying the contents requires
        # rewinding the buffer and leaves the new file positioned at its end.
        position = buffer.tell()
        buffer.seek(0)
        self._fp = await to_thread.run_sync(
            lambda: tempfile.TemporaryFile(**self._tempfile_params)
        )
        await self.write(buffer.read())
        buffer.close()
        # Put the stream back where the caller left it.
        await self.seek(position)

    @property
    def closed(self) -> bool:
        """Whether the current underlying file object is closed."""
        return self._fp.closed

    async def read(self, size: int = -1) -> AnyStr:
        """Read up to ``size`` bytes/characters (everything if negative)."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.read(size)

        return await super().read(size)  # type: ignore[return-value]

    async def read1(self: SpooledTemporaryFile[bytes], size: int = -1) -> bytes:
        """Binary mode only: delegate to the underlying file's ``read1()``."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.read1(size)

        return await super().read1(size)

    async def readline(self) -> AnyStr:
        """Read and return a single line."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.readline()

        return await super().readline()  # type: ignore[return-value]

    async def readlines(self) -> list[AnyStr]:
        """Read and return all remaining lines as a list."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.readlines()

        return await super().readlines()  # type: ignore[return-value]

    async def readinto(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int:
        """
        Binary mode only: read data into the buffer ``b``.

        :param b: The writable buffer to fill.
        :return: The value returned by the underlying file's ``readinto()``.
        """
        if not self._rolled:
            await checkpoint_if_cancelled()
            # Return here: falling through would perform a second read that
            # overwrites ``b`` and reports the wrong count.
            return self._fp.readinto(b)

        return await super().readinto(b)

    async def readinto1(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int:
        """
        Binary mode only: read data into the buffer ``b`` using ``readinto1()``.

        :param b: The writable buffer to fill.
        :return: The value returned by the underlying file's ``readinto1()``.
        """
        if not self._rolled:
            await checkpoint_if_cancelled()
            # Use the same operation as the rolled-over path, and return its result
            # instead of falling through to a second read.
            return self._fp.readinto1(b)

        return await super().readinto1(b)

    async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int:
        """Change the stream position and return the new position."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.seek(offset, whence)

        return await super().seek(offset, whence)

    async def tell(self) -> int:
        """Return the current stream position."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.tell()

        return await super().tell()

    async def truncate(self, size: int | None = None) -> int:
        """Resize the file to ``size`` (``None`` is passed through unchanged)."""
        if not self._rolled:
            await checkpoint_if_cancelled()
            return self._fp.truncate(size)

        return await super().truncate(size)

    @overload
    async def write(self: SpooledTemporaryFile[bytes], b: ReadableBuffer) -> int: ...
    @overload
    async def write(self: SpooledTemporaryFile[str], b: str) -> int: ...

    async def write(self, b: ReadableBuffer | str) -> int:
        """
        Asynchronously write data to the spooled temporary file.

        If the file has not yet been rolled over, the data is written synchronously,
        and a rollover is triggered once the stream position reaches the maximum size.

        :param b: The data to write.
        :return: The value returned by the underlying file's ``write()``.

        """
        if not self._rolled:
            await checkpoint_if_cancelled()
            result = self._fp.write(b)
            await self._check()
            return result

        return await super().write(b)  # type: ignore[misc]

    @overload
    async def writelines(
        self: SpooledTemporaryFile[bytes], lines: Iterable[ReadableBuffer]
    ) -> None: ...
    @overload
    async def writelines(
        self: SpooledTemporaryFile[str], lines: Iterable[str]
    ) -> None: ...

    async def writelines(self, lines: Iterable[str] | Iterable[ReadableBuffer]) -> None:
        """
        Asynchronously write a list of lines to the spooled temporary file.

        If the file has not yet been rolled over, the lines are written synchronously,
        and a rollover is triggered once the stream position reaches the maximum size.

        :param lines: An iterable of lines to write.

        """
        if not self._rolled:
            await checkpoint_if_cancelled()
            result = self._fp.writelines(lines)
            await self._check()
            return result

        return await super().writelines(lines)  # type: ignore[misc]
452
453
class TemporaryDirectory(Generic[AnyStr]):
    """
    Asynchronous context manager around :class:`tempfile.TemporaryDirectory`.

    The underlying :class:`~tempfile.TemporaryDirectory` object is created when the
    context is entered. Its creation, its ``__enter__``/``__exit__`` calls and its
    ``cleanup()`` are all run in a worker thread.

    :param suffix: Suffix to be added to the temporary directory name.
    :param prefix: Prefix to be added to the temporary directory name.
    :param dir: The parent directory where the temporary directory is created.
    :param ignore_cleanup_errors: Whether to ignore errors during cleanup
        (only forwarded on Python 3.10+).
    :param delete: Whether to delete the directory upon closing (only forwarded on
        Python 3.12+).
    """

    def __init__(
        self,
        suffix: AnyStr | None = None,
        prefix: AnyStr | None = None,
        dir: AnyStr | None = None,
        *,
        ignore_cleanup_errors: bool = False,
        delete: bool = True,
    ) -> None:
        # Naming and placement of the directory
        self.suffix: AnyStr | None = suffix
        self.prefix: AnyStr | None = prefix
        self.dir: AnyStr | None = dir

        # Cleanup behaviour
        self.ignore_cleanup_errors = ignore_cleanup_errors
        self.delete = delete

        # Set by ``__aenter__``; remains ``None`` until the context is entered.
        self._tempdir: tempfile.TemporaryDirectory | None = None

    async def __aenter__(self) -> str:
        """Create the temporary directory and return what its ``__enter__`` returns."""
        options: dict[str, Any] = {
            "suffix": self.suffix,
            "prefix": self.prefix,
            "dir": self.dir,
        }
        # The remaining options are only passed on where this code's version checks
        # allow it.
        if sys.version_info >= (3, 10):
            options["ignore_cleanup_errors"] = self.ignore_cleanup_errors

        if sys.version_info >= (3, 12):
            options["delete"] = self.delete

        def make_directory() -> tempfile.TemporaryDirectory:
            return tempfile.TemporaryDirectory(**options)

        tempdir = self._tempdir = await to_thread.run_sync(make_directory)
        return await to_thread.run_sync(tempdir.__enter__)

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Forward the exit to the wrapped directory object, if one was created."""
        tempdir = self._tempdir
        if tempdir is None:
            return

        await to_thread.run_sync(tempdir.__exit__, exc_type, exc_value, traceback)

    async def cleanup(self) -> None:
        """Run the wrapped object's ``cleanup()``; a no-op before the context is entered."""
        tempdir = self._tempdir
        if tempdir is None:
            return

        await to_thread.run_sync(tempdir.cleanup)
518
519
@overload
async def mkstemp(
    suffix: str | None = None,
    prefix: str | None = None,
    dir: str | None = None,
    text: bool = False,
) -> tuple[int, str]: ...


@overload
async def mkstemp(
    suffix: bytes | None = None,
    prefix: bytes | None = None,
    dir: bytes | None = None,
    text: bool = False,
) -> tuple[int, bytes]: ...


async def mkstemp(
    suffix: AnyStr | None = None,
    prefix: AnyStr | None = None,
    dir: AnyStr | None = None,
    text: bool = False,
) -> tuple[int, str | bytes]:
    """
    Run :func:`tempfile.mkstemp` in a worker thread.

    The arguments are handed to :func:`tempfile.mkstemp` positionally and unchanged.

    :param suffix: Suffix to be added to the file name.
    :param prefix: Prefix to be added to the file name.
    :param dir: Directory in which the temporary file is created.
    :param text: Whether the file is opened in text mode.
    :return: A tuple containing the file descriptor and the file name.

    """
    descriptor, path = await to_thread.run_sync(
        tempfile.mkstemp, suffix, prefix, dir, text
    )
    return descriptor, path
558
559
@overload
async def mkdtemp(
    suffix: str | None = None,
    prefix: str | None = None,
    dir: str | None = None,
) -> str: ...


@overload
async def mkdtemp(
    suffix: bytes | None = None,
    prefix: bytes | None = None,
    dir: bytes | None = None,
) -> bytes: ...


async def mkdtemp(
    suffix: AnyStr | None = None,
    prefix: AnyStr | None = None,
    dir: AnyStr | None = None,
) -> str | bytes:
    """
    Run :func:`tempfile.mkdtemp` in a worker thread.

    The arguments are handed to :func:`tempfile.mkdtemp` positionally and unchanged.

    :param suffix: Suffix to be added to the directory name.
    :param prefix: Prefix to be added to the directory name.
    :param dir: Parent directory where the temporary directory is created.
    :return: The path of the created temporary directory.

    """
    directory_path = await to_thread.run_sync(tempfile.mkdtemp, suffix, prefix, dir)
    return directory_path
593
594
async def gettempdir() -> str:
    """
    Run :func:`tempfile.gettempdir` in a worker thread.

    :return: The value returned by :func:`tempfile.gettempdir`, i.e. the name of the
        directory used for temporary files, as a string.

    """
    directory = await to_thread.run_sync(tempfile.gettempdir)
    return directory
605
606
async def gettempdirb() -> bytes:
    """
    Run :func:`tempfile.gettempdirb` in a worker thread.

    :return: The value returned by :func:`tempfile.gettempdirb`, i.e. the name of the
        directory used for temporary files, as bytes.

    """
    directory = await to_thread.run_sync(tempfile.gettempdirb)
    return directory