1from __future__ import annotations
2
3import os
4import sys
5import tempfile
6from collections.abc import Iterable
7from io import BytesIO, TextIOWrapper
8from types import TracebackType
9from typing import (
10 TYPE_CHECKING,
11 Any,
12 AnyStr,
13 Generic,
14 overload,
15)
16
17from .. import to_thread
18from .._core._fileio import AsyncFile
19from ..lowlevel import checkpoint_if_cancelled
20
21if TYPE_CHECKING:
22 from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
23
24
25class TemporaryFile(Generic[AnyStr]):
26 """
27 An asynchronous temporary file that is automatically created and cleaned up.
28
29 This class provides an asynchronous context manager interface to a temporary file.
30 The file is created using Python's standard `tempfile.TemporaryFile` function in a
31 background thread, and is wrapped as an asynchronous file using `AsyncFile`.
32
33 :param mode: The mode in which the file is opened. Defaults to "w+b".
34 :param buffering: The buffering policy (-1 means the default buffering).
35 :param encoding: The encoding used to decode or encode the file. Only applicable in
36 text mode.
37 :param newline: Controls how universal newlines mode works (only applicable in text
38 mode).
39 :param suffix: The suffix for the temporary file name.
40 :param prefix: The prefix for the temporary file name.
41 :param dir: The directory in which the temporary file is created.
42 :param errors: The error handling scheme used for encoding/decoding errors.
43 """
44
45 _async_file: AsyncFile[AnyStr]
46
47 @overload
48 def __init__(
49 self: TemporaryFile[bytes],
50 mode: OpenBinaryMode = ...,
51 buffering: int = ...,
52 encoding: str | None = ...,
53 newline: str | None = ...,
54 suffix: str | None = ...,
55 prefix: str | None = ...,
56 dir: str | None = ...,
57 *,
58 errors: str | None = ...,
59 ): ...
60 @overload
61 def __init__(
62 self: TemporaryFile[str],
63 mode: OpenTextMode,
64 buffering: int = ...,
65 encoding: str | None = ...,
66 newline: str | None = ...,
67 suffix: str | None = ...,
68 prefix: str | None = ...,
69 dir: str | None = ...,
70 *,
71 errors: str | None = ...,
72 ): ...
73
74 def __init__(
75 self,
76 mode: OpenTextMode | OpenBinaryMode = "w+b",
77 buffering: int = -1,
78 encoding: str | None = None,
79 newline: str | None = None,
80 suffix: str | None = None,
81 prefix: str | None = None,
82 dir: str | None = None,
83 *,
84 errors: str | None = None,
85 ) -> None:
86 self.mode = mode
87 self.buffering = buffering
88 self.encoding = encoding
89 self.newline = newline
90 self.suffix: str | None = suffix
91 self.prefix: str | None = prefix
92 self.dir: str | None = dir
93 self.errors = errors
94
95 async def __aenter__(self) -> AsyncFile[AnyStr]:
96 fp = await to_thread.run_sync(
97 lambda: tempfile.TemporaryFile(
98 self.mode,
99 self.buffering,
100 self.encoding,
101 self.newline,
102 self.suffix,
103 self.prefix,
104 self.dir,
105 errors=self.errors,
106 )
107 )
108 self._async_file = AsyncFile(fp)
109 return self._async_file
110
111 async def __aexit__(
112 self,
113 exc_type: type[BaseException] | None,
114 exc_value: BaseException | None,
115 traceback: TracebackType | None,
116 ) -> None:
117 await self._async_file.aclose()
118
119
120class NamedTemporaryFile(Generic[AnyStr]):
121 """
122 An asynchronous named temporary file that is automatically created and cleaned up.
123
124 This class provides an asynchronous context manager for a temporary file with a
125 visible name in the file system. It uses Python's standard
126 :func:`~tempfile.NamedTemporaryFile` function and wraps the file object with
127 :class:`AsyncFile` for asynchronous operations.
128
129 :param mode: The mode in which the file is opened. Defaults to "w+b".
130 :param buffering: The buffering policy (-1 means the default buffering).
131 :param encoding: The encoding used to decode or encode the file. Only applicable in
132 text mode.
133 :param newline: Controls how universal newlines mode works (only applicable in text
134 mode).
135 :param suffix: The suffix for the temporary file name.
136 :param prefix: The prefix for the temporary file name.
137 :param dir: The directory in which the temporary file is created.
138 :param delete: Whether to delete the file when it is closed.
139 :param errors: The error handling scheme used for encoding/decoding errors.
140 :param delete_on_close: (Python 3.12+) Whether to delete the file on close.
141 """
142
143 _async_file: AsyncFile[AnyStr]
144
145 @overload
146 def __init__(
147 self: NamedTemporaryFile[bytes],
148 mode: OpenBinaryMode = ...,
149 buffering: int = ...,
150 encoding: str | None = ...,
151 newline: str | None = ...,
152 suffix: str | None = ...,
153 prefix: str | None = ...,
154 dir: str | None = ...,
155 delete: bool = ...,
156 *,
157 errors: str | None = ...,
158 delete_on_close: bool = ...,
159 ): ...
160 @overload
161 def __init__(
162 self: NamedTemporaryFile[str],
163 mode: OpenTextMode,
164 buffering: int = ...,
165 encoding: str | None = ...,
166 newline: str | None = ...,
167 suffix: str | None = ...,
168 prefix: str | None = ...,
169 dir: str | None = ...,
170 delete: bool = ...,
171 *,
172 errors: str | None = ...,
173 delete_on_close: bool = ...,
174 ): ...
175
176 def __init__(
177 self,
178 mode: OpenBinaryMode | OpenTextMode = "w+b",
179 buffering: int = -1,
180 encoding: str | None = None,
181 newline: str | None = None,
182 suffix: str | None = None,
183 prefix: str | None = None,
184 dir: str | None = None,
185 delete: bool = True,
186 *,
187 errors: str | None = None,
188 delete_on_close: bool = True,
189 ) -> None:
190 self._params: dict[str, Any] = {
191 "mode": mode,
192 "buffering": buffering,
193 "encoding": encoding,
194 "newline": newline,
195 "suffix": suffix,
196 "prefix": prefix,
197 "dir": dir,
198 "delete": delete,
199 "errors": errors,
200 }
201 if sys.version_info >= (3, 12):
202 self._params["delete_on_close"] = delete_on_close
203
204 async def __aenter__(self) -> AsyncFile[AnyStr]:
205 fp = await to_thread.run_sync(
206 lambda: tempfile.NamedTemporaryFile(**self._params)
207 )
208 self._async_file = AsyncFile(fp)
209 return self._async_file
210
211 async def __aexit__(
212 self,
213 exc_type: type[BaseException] | None,
214 exc_value: BaseException | None,
215 traceback: TracebackType | None,
216 ) -> None:
217 await self._async_file.aclose()
218
219
220class SpooledTemporaryFile(AsyncFile[AnyStr]):
221 """
222 An asynchronous spooled temporary file that starts in memory and is spooled to disk.
223
224 This class provides an asynchronous interface to a spooled temporary file, much like
225 Python's standard :class:`~tempfile.SpooledTemporaryFile`. It supports asynchronous
226 write operations and provides a method to force a rollover to disk.
227
228 :param max_size: Maximum size in bytes before the file is rolled over to disk.
229 :param mode: The mode in which the file is opened. Defaults to "w+b".
230 :param buffering: The buffering policy (-1 means the default buffering).
231 :param encoding: The encoding used to decode or encode the file (text mode only).
232 :param newline: Controls how universal newlines mode works (text mode only).
233 :param suffix: The suffix for the temporary file name.
234 :param prefix: The prefix for the temporary file name.
235 :param dir: The directory in which the temporary file is created.
236 :param errors: The error handling scheme used for encoding/decoding errors.
237 """
238
239 _rolled: bool = False
240
241 @overload
242 def __init__(
243 self: SpooledTemporaryFile[bytes],
244 max_size: int = ...,
245 mode: OpenBinaryMode = ...,
246 buffering: int = ...,
247 encoding: str | None = ...,
248 newline: str | None = ...,
249 suffix: str | None = ...,
250 prefix: str | None = ...,
251 dir: str | None = ...,
252 *,
253 errors: str | None = ...,
254 ): ...
255 @overload
256 def __init__(
257 self: SpooledTemporaryFile[str],
258 max_size: int = ...,
259 mode: OpenTextMode = ...,
260 buffering: int = ...,
261 encoding: str | None = ...,
262 newline: str | None = ...,
263 suffix: str | None = ...,
264 prefix: str | None = ...,
265 dir: str | None = ...,
266 *,
267 errors: str | None = ...,
268 ): ...
269
270 def __init__(
271 self,
272 max_size: int = 0,
273 mode: OpenBinaryMode | OpenTextMode = "w+b",
274 buffering: int = -1,
275 encoding: str | None = None,
276 newline: str | None = None,
277 suffix: str | None = None,
278 prefix: str | None = None,
279 dir: str | None = None,
280 *,
281 errors: str | None = None,
282 ) -> None:
283 self._tempfile_params: dict[str, Any] = {
284 "mode": mode,
285 "buffering": buffering,
286 "encoding": encoding,
287 "newline": newline,
288 "suffix": suffix,
289 "prefix": prefix,
290 "dir": dir,
291 "errors": errors,
292 }
293 self._max_size = max_size
294 if "b" in mode:
295 super().__init__(BytesIO()) # type: ignore[arg-type]
296 else:
297 super().__init__(
298 TextIOWrapper( # type: ignore[arg-type]
299 BytesIO(),
300 encoding=encoding,
301 errors=errors,
302 newline=newline,
303 write_through=True,
304 )
305 )
306
307 async def aclose(self) -> None:
308 if not self._rolled:
309 self._fp.close()
310 return
311
312 await super().aclose()
313
314 async def _check(self) -> None:
315 if self._rolled or self._fp.tell() <= self._max_size:
316 return
317
318 await self.rollover()
319
320 async def rollover(self) -> None:
321 if self._rolled:
322 return
323
324 self._rolled = True
325 buffer = self._fp
326 buffer.seek(0)
327 self._fp = await to_thread.run_sync(
328 lambda: tempfile.TemporaryFile(**self._tempfile_params)
329 )
330 await self.write(buffer.read())
331 buffer.close()
332
333 @property
334 def closed(self) -> bool:
335 return self._fp.closed
336
337 async def read(self, size: int = -1) -> AnyStr:
338 if not self._rolled:
339 await checkpoint_if_cancelled()
340 return self._fp.read(size)
341
342 return await super().read(size) # type: ignore[return-value]
343
344 async def read1(self: SpooledTemporaryFile[bytes], size: int = -1) -> bytes:
345 if not self._rolled:
346 await checkpoint_if_cancelled()
347 return self._fp.read1(size)
348
349 return await super().read1(size)
350
351 async def readline(self) -> AnyStr:
352 if not self._rolled:
353 await checkpoint_if_cancelled()
354 return self._fp.readline()
355
356 return await super().readline() # type: ignore[return-value]
357
358 async def readlines(self) -> list[AnyStr]:
359 if not self._rolled:
360 await checkpoint_if_cancelled()
361 return self._fp.readlines()
362
363 return await super().readlines() # type: ignore[return-value]
364
365 async def readinto(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int:
366 if not self._rolled:
367 await checkpoint_if_cancelled()
368 self._fp.readinto(b)
369
370 return await super().readinto(b)
371
372 async def readinto1(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int:
373 if not self._rolled:
374 await checkpoint_if_cancelled()
375 self._fp.readinto(b)
376
377 return await super().readinto1(b)
378
379 async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int:
380 if not self._rolled:
381 await checkpoint_if_cancelled()
382 return self._fp.seek(offset, whence)
383
384 return await super().seek(offset, whence)
385
386 async def tell(self) -> int:
387 if not self._rolled:
388 await checkpoint_if_cancelled()
389 return self._fp.tell()
390
391 return await super().tell()
392
393 async def truncate(self, size: int | None = None) -> int:
394 if not self._rolled:
395 await checkpoint_if_cancelled()
396 return self._fp.truncate(size)
397
398 return await super().truncate(size)
399
400 @overload
401 async def write(self: SpooledTemporaryFile[bytes], b: ReadableBuffer) -> int: ...
402 @overload
403 async def write(self: SpooledTemporaryFile[str], b: str) -> int: ...
404
405 async def write(self, b: ReadableBuffer | str) -> int:
406 """
407 Asynchronously write data to the spooled temporary file.
408
409 If the file has not yet been rolled over, the data is written synchronously,
410 and a rollover is triggered if the size exceeds the maximum size.
411
412 :param s: The data to write.
413 :return: The number of bytes written.
414 :raises RuntimeError: If the underlying file is not initialized.
415
416 """
417 if not self._rolled:
418 await checkpoint_if_cancelled()
419 result = self._fp.write(b)
420 await self._check()
421 return result
422
423 return await super().write(b) # type: ignore[misc]
424
425 @overload
426 async def writelines(
427 self: SpooledTemporaryFile[bytes], lines: Iterable[ReadableBuffer]
428 ) -> None: ...
429 @overload
430 async def writelines(
431 self: SpooledTemporaryFile[str], lines: Iterable[str]
432 ) -> None: ...
433
434 async def writelines(self, lines: Iterable[str] | Iterable[ReadableBuffer]) -> None:
435 """
436 Asynchronously write a list of lines to the spooled temporary file.
437
438 If the file has not yet been rolled over, the lines are written synchronously,
439 and a rollover is triggered if the size exceeds the maximum size.
440
441 :param lines: An iterable of lines to write.
442 :raises RuntimeError: If the underlying file is not initialized.
443
444 """
445 if not self._rolled:
446 await checkpoint_if_cancelled()
447 result = self._fp.writelines(lines)
448 await self._check()
449 return result
450
451 return await super().writelines(lines) # type: ignore[misc]
452
453
454class TemporaryDirectory(Generic[AnyStr]):
455 """
456 An asynchronous temporary directory that is created and cleaned up automatically.
457
458 This class provides an asynchronous context manager for creating a temporary
459 directory. It wraps Python's standard :class:`~tempfile.TemporaryDirectory` to
460 perform directory creation and cleanup operations in a background thread.
461
462 :param suffix: Suffix to be added to the temporary directory name.
463 :param prefix: Prefix to be added to the temporary directory name.
464 :param dir: The parent directory where the temporary directory is created.
465 :param ignore_cleanup_errors: Whether to ignore errors during cleanup
466 :param delete: Whether to delete the directory upon closing (Python 3.12+).
467 """
468
469 def __init__(
470 self,
471 suffix: AnyStr | None = None,
472 prefix: AnyStr | None = None,
473 dir: AnyStr | None = None,
474 *,
475 ignore_cleanup_errors: bool = False,
476 delete: bool = True,
477 ) -> None:
478 self.suffix: AnyStr | None = suffix
479 self.prefix: AnyStr | None = prefix
480 self.dir: AnyStr | None = dir
481 self.ignore_cleanup_errors = ignore_cleanup_errors
482 self.delete = delete
483
484 self._tempdir: tempfile.TemporaryDirectory | None = None
485
486 async def __aenter__(self) -> str:
487 params: dict[str, Any] = {
488 "suffix": self.suffix,
489 "prefix": self.prefix,
490 "dir": self.dir,
491 "ignore_cleanup_errors": self.ignore_cleanup_errors,
492 }
493 if sys.version_info >= (3, 12):
494 params["delete"] = self.delete
495
496 self._tempdir = await to_thread.run_sync(
497 lambda: tempfile.TemporaryDirectory(**params)
498 )
499 return await to_thread.run_sync(self._tempdir.__enter__)
500
501 async def __aexit__(
502 self,
503 exc_type: type[BaseException] | None,
504 exc_value: BaseException | None,
505 traceback: TracebackType | None,
506 ) -> None:
507 if self._tempdir is not None:
508 await to_thread.run_sync(
509 self._tempdir.__exit__, exc_type, exc_value, traceback
510 )
511
512 async def cleanup(self) -> None:
513 if self._tempdir is not None:
514 await to_thread.run_sync(self._tempdir.cleanup)
515
516
517@overload
518async def mkstemp(
519 suffix: str | None = None,
520 prefix: str | None = None,
521 dir: str | None = None,
522 text: bool = False,
523) -> tuple[int, str]: ...
524
525
526@overload
527async def mkstemp(
528 suffix: bytes | None = None,
529 prefix: bytes | None = None,
530 dir: bytes | None = None,
531 text: bool = False,
532) -> tuple[int, bytes]: ...
533
534
535async def mkstemp(
536 suffix: AnyStr | None = None,
537 prefix: AnyStr | None = None,
538 dir: AnyStr | None = None,
539 text: bool = False,
540) -> tuple[int, str | bytes]:
541 """
542 Asynchronously create a temporary file and return an OS-level handle and the file
543 name.
544
545 This function wraps `tempfile.mkstemp` and executes it in a background thread.
546
547 :param suffix: Suffix to be added to the file name.
548 :param prefix: Prefix to be added to the file name.
549 :param dir: Directory in which the temporary file is created.
550 :param text: Whether the file is opened in text mode.
551 :return: A tuple containing the file descriptor and the file name.
552
553 """
554 return await to_thread.run_sync(tempfile.mkstemp, suffix, prefix, dir, text)
555
556
557@overload
558async def mkdtemp(
559 suffix: str | None = None,
560 prefix: str | None = None,
561 dir: str | None = None,
562) -> str: ...
563
564
565@overload
566async def mkdtemp(
567 suffix: bytes | None = None,
568 prefix: bytes | None = None,
569 dir: bytes | None = None,
570) -> bytes: ...
571
572
573async def mkdtemp(
574 suffix: AnyStr | None = None,
575 prefix: AnyStr | None = None,
576 dir: AnyStr | None = None,
577) -> str | bytes:
578 """
579 Asynchronously create a temporary directory and return its path.
580
581 This function wraps `tempfile.mkdtemp` and executes it in a background thread.
582
583 :param suffix: Suffix to be added to the directory name.
584 :param prefix: Prefix to be added to the directory name.
585 :param dir: Parent directory where the temporary directory is created.
586 :return: The path of the created temporary directory.
587
588 """
589 return await to_thread.run_sync(tempfile.mkdtemp, suffix, prefix, dir)
590
591
592async def gettempdir() -> str:
593 """
594 Asynchronously return the name of the directory used for temporary files.
595
596 This function wraps `tempfile.gettempdir` and executes it in a background thread.
597
598 :return: The path of the temporary directory as a string.
599
600 """
601 return await to_thread.run_sync(tempfile.gettempdir)
602
603
604async def gettempdirb() -> bytes:
605 """
606 Asynchronously return the name of the directory used for temporary files in bytes.
607
608 This function wraps `tempfile.gettempdirb` and executes it in a background thread.
609
610 :return: The path of the temporary directory as bytes.
611
612 """
613 return await to_thread.run_sync(tempfile.gettempdirb)