Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/tempfile.py: 21%
519 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-20 07:00 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-20 07:00 +0000
"""Temporary files.

This module provides generic, low- and high-level interfaces for
creating temporary files and directories.  All of the interfaces
provided by this module can be used without fear of race conditions
except for 'mktemp'.  'mktemp' is subject to race conditions and
should not be used; it is provided for backward compatibility only.

The default path names are returned as str.  If you supply bytes as
input, all return values will be in bytes.  Ex:

    >>> tempfile.mkstemp()
    (4, '/tmp/tmptpu9nin8')
    >>> tempfile.mkdtemp(suffix=b'')
    b'/tmp/tmppbi8f0hy'

This module also provides some data items to the user:

  TMP_MAX  - maximum number of names that will be tried before
             giving up.
  tempdir  - If this is set to a string before the first use of
             any routine from this module, it will be considered as
             another candidate location to store temporary files.
"""
# Names exported by ``from tempfile import *``.
__all__ = [
    # High-level safe interfaces.
    "NamedTemporaryFile",
    "TemporaryFile",
    "SpooledTemporaryFile",
    "TemporaryDirectory",
    # Low-level safe interfaces.
    "mkstemp",
    "mkdtemp",
    # Deprecated unsafe interface.
    "mktemp",
    # Constants and accessors.
    "TMP_MAX",
    "gettempprefix",
    "tempdir",
    "gettempdir",
    "gettempprefixb",
    "gettempdirb",
]
37# Imports.
39import functools as _functools
40import warnings as _warnings
41import io as _io
42import os as _os
43try:
44 import shutil as _shutil
45 _rmtree = _shutil.rmtree
46except ImportError:
47 import sys as _sys
48 import stat as _stat
49 # version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Remove the tree rooted at *path* using path-based calls only.

    Fallback used when shutil cannot be imported.  This version is
    vulnerable to race conditions.  Every OSError is handed to
    onerror(func, path, exc_info) instead of being raised here.
    """
    try:
        if _os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(_os.path.islink, path, _sys.exc_info())
        # can't continue even if onerror hook returns
        return

    try:
        entries = _os.listdir(path)
    except OSError:
        entries = []
        onerror(_os.listdir, path, _sys.exc_info())

    for entry in entries:
        child = _os.path.join(path, entry)
        try:
            is_dir = _stat.S_ISDIR(_os.lstat(child).st_mode)
        except OSError:
            # An entry that cannot be stat'ed is treated as a non-directory.
            is_dir = False

        if is_dir:
            _rmtree_unsafe(child, onerror)
            continue

        try:
            _os.unlink(child)
        except OSError:
            onerror(_os.unlink, child, _sys.exc_info())

    try:
        _os.rmdir(path)
    except OSError:
        onerror(_os.rmdir, path, _sys.exc_info())
82 # Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onerror):
    """Empty the directory open as *topfd*, using fd-relative calls.

    Version using fd-based APIs to protect against races; fallback used
    when shutil cannot be imported.  *path* is only used to build the
    names reported to onerror(func, path, exc_info).  The directory
    behind *topfd* itself is not removed here.
    """
    try:
        entries = _os.listdir(topfd)
    except OSError as err:
        entries = []
        err.filename = path
        onerror(_os.listdir, path, _sys.exc_info())

    for entry in entries:
        child = _os.path.join(path, entry)
        try:
            entry_st = _os.stat(entry, dir_fd=topfd, follow_symlinks=False)
        except OSError:
            # An entry that cannot be stat'ed is treated as a non-directory.
            entry_st = None

        if entry_st is None or not _stat.S_ISDIR(entry_st.st_mode):
            try:
                _os.unlink(entry, dir_fd=topfd)
            except OSError:
                onerror(_os.unlink, child, _sys.exc_info())
            continue

        try:
            child_fd = _os.open(entry, _os.O_RDONLY, dir_fd=topfd)
        except OSError:
            onerror(_os.open, child, _sys.exc_info())
            continue

        try:
            if _os.path.samestat(entry_st, _os.fstat(child_fd)):
                _rmtree_safe_fd(child_fd, child, onerror)
                try:
                    _os.rmdir(entry, dir_fd=topfd)
                except OSError:
                    onerror(_os.rmdir, child, _sys.exc_info())
            else:
                try:
                    # This can only happen if someone replaces
                    # a directory with a symlink after the call to
                    # stat.S_ISDIR above.
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(_os.path.islink, child, _sys.exc_info())
        finally:
            _os.close(child_fd)
# The fd-based implementation is usable only when os.open, os.stat,
# os.unlink and os.rmdir accept dir_fd, os.listdir accepts a file
# descriptor, and os.stat accepts follow_symlinks.
_use_fd_functions = ({_os.open, _os.stat, _os.unlink, _os.rmdir} <=
                     _os.supports_dir_fd and
                     _os.listdir in _os.supports_fd and
                     _os.stat in _os.supports_follow_symlinks)


def _rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    Fallback used when shutil cannot be imported.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    A failure to open *path* is reported with func set to os.open.
    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Called from inside an ``except`` block, so a bare raise
            # re-raises the error currently being handled.
            raise

    if not _use_fd_functions:
        return _rmtree_unsafe(path, onerror)

    # While the unsafe rmtree works fine on bytes, the fd based does not.
    if isinstance(path, bytes):
        path = _os.fsdecode(path)

    # Note: To guard against symlink races, we use the standard
    # lstat()/open()/fstat() trick.
    try:
        orig_st = _os.lstat(path)
    except Exception:
        onerror(_os.lstat, path, _sys.exc_info())
        return
    try:
        fd = _os.open(path, _os.O_RDONLY)
    except Exception:
        # Report the call that actually failed (os.open, not os.lstat).
        onerror(_os.open, path, _sys.exc_info())
        return
    try:
        if _os.path.samestat(orig_st, _os.fstat(fd)):
            _rmtree_safe_fd(fd, path, onerror)
            try:
                _os.rmdir(path)
            except OSError:
                onerror(_os.rmdir, path, _sys.exc_info())
        else:
            try:
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
            except OSError:
                onerror(_os.path.islink, path, _sys.exc_info())
    finally:
        _os.close(fd)
183import errno as _errno
184from random import Random as _Random
185import sys as _sys
186import types as _types
187import weakref as _weakref
188import _thread
_allocate_lock = _thread.allocate_lock

# Flags for creating a new file read/write; O_EXCL makes creation fail if
# the name already exists.  O_NOFOLLOW and O_BINARY are added only on
# platforms whose os module defines them.
_text_openflags = (_os.O_RDWR | _os.O_CREAT | _os.O_EXCL
                   | getattr(_os, 'O_NOFOLLOW', 0))
_bin_openflags = _text_openflags | getattr(_os, 'O_BINARY', 0)

# Upper bound on the number of candidate names tried before giving up.
TMP_MAX = getattr(_os, 'TMP_MAX', 10000)

# This variable _was_ unused for legacy reasons, see issue 10354.
# But as of 3.5 we actually use it at runtime so changing it would
# have a possibly desirable side effect...  But we do not want to support
# that as an API.  It is undocumented on purpose.  Do not depend on this.
template = "tmp"

# Internal routines.

_once_lock = _allocate_lock()
def _exists(fn):
    """Return True if os.lstat(fn) succeeds, False if it raises OSError."""
    try:
        _os.lstat(fn)
    except OSError:
        return False
    return True
def _infer_return_type(*args):
    """Return bytes or str according to the types of the non-None args.

    None arguments are ignored.  A bytes argument implies bytes, any
    other argument implies str.  Mixing the two raises TypeError.  With
    no usable argument the result is str.
    """
    inferred = None
    for arg in args:
        if arg is None:
            continue
        kind = bytes if isinstance(arg, bytes) else str
        if inferred is not None and inferred is not kind:
            raise TypeError("Can't mix bytes and non-bytes in "
                            "path components.")
        inferred = kind
    # tempfile APIs return a str by default.
    return str if inferred is None else inferred
def _sanitize_params(prefix, suffix, dir):
    """Fill in defaults for prefix, suffix and dir.

    Returns (prefix, suffix, dir, output_type), where output_type is the
    str/bytes type inferred from the supplied arguments and each None
    argument has been replaced by a default of that type.
    """
    output_type = _infer_return_type(prefix, suffix, dir)
    wants_str = output_type is str

    if suffix is None:
        suffix = output_type()
    if prefix is None:
        prefix = template if wants_str else _os.fsencode(template)
    if dir is None:
        dir = gettempdir() if wants_str else gettempdirb()

    return prefix, suffix, dir, output_type
class _RandomNameSequence:
    """An instance of _RandomNameSequence generates an endless
    sequence of unpredictable strings which can safely be incorporated
    into file names.  Each string is eight characters long.  Multiple
    threads can safely use the same instance at the same time.

    _RandomNameSequence is an iterator."""

    characters = "abcdefghijklmnopqrstuvwxyz0123456789_"

    @property
    def rng(self):
        # A new generator is created whenever the current pid differs
        # from the pid recorded when the cached generator was built.
        pid = _os.getpid()
        if getattr(self, '_rng_pid', None) != pid:
            self._rng = _Random()
            self._rng_pid = pid
        return self._rng

    def __iter__(self):
        return self

    def __next__(self):
        alphabet = self.characters
        pick = self.rng.choice
        return ''.join(pick(alphabet) for _ in range(8))
def _candidate_tempdir_list():
    """Generate a list of candidate temporary directories which
    _get_default_tempdir will try."""

    # First, try the environment.
    dirlist = [value
               for value in map(_os.getenv, ('TMPDIR', 'TEMP', 'TMP'))
               if value]

    # Failing that, try OS-specific locations.
    if _os.name == 'nt':
        dirlist += [_os.path.expanduser(r'~\AppData\Local\Temp'),
                    _os.path.expandvars(r'%SYSTEMROOT%\Temp'),
                    r'c:\temp', r'c:\tmp', r'\temp', r'\tmp']
    else:
        dirlist += ['/tmp', '/var/tmp', '/usr/tmp']

    # As a last resort, the current directory.
    try:
        last_resort = _os.getcwd()
    except (AttributeError, OSError):
        last_resort = _os.curdir
    dirlist.append(last_resort)

    return dirlist
def _get_default_tempdir():
    """Calculate the default directory to use for temporary files.
    This routine should be called exactly once.

    We determine whether or not a candidate temp dir is usable by
    trying to create and write to a file in that directory.  If this
    is successful, the test file is deleted.  To prevent denial of
    service, the name of the test file must be randomized.

    Raises FileNotFoundError when no candidate directory is usable."""

    names = _RandomNameSequence()
    candidates = _candidate_tempdir_list()

    for candidate in candidates:
        if candidate != _os.curdir:
            candidate = _os.path.abspath(candidate)
        # Try only a few names per directory.
        for _ in range(100):
            probe = _os.path.join(candidate, next(names))
            try:
                fd = _os.open(probe, _bin_openflags, 0o600)
                try:
                    try:
                        with _io.open(fd, 'wb', closefd=False) as fp:
                            fp.write(b'blat')
                    finally:
                        _os.close(fd)
                finally:
                    _os.unlink(probe)
                return candidate
            except FileExistsError:
                # Name already taken; try another name in this directory.
                pass
            except PermissionError:
                # This exception is thrown when a directory with the chosen
                # name already exists on windows.
                if (_os.name == 'nt' and _os.path.isdir(candidate) and
                        _os.access(candidate, _os.W_OK)):
                    continue
                break   # no point trying more names in this directory
            except OSError:
                break   # no point trying more names in this directory

    raise FileNotFoundError(_errno.ENOENT,
                            "No usable temporary directory found in %s" %
                            candidates)
# Shared name generator, created on first use by _get_candidate_names().
_name_sequence = None


def _get_candidate_names():
    """Return the module-wide name sequence, creating it on first use.

    Creation happens while holding _once_lock, and the check is repeated
    after the lock has been acquired.
    """
    global _name_sequence
    if _name_sequence is None:
        with _once_lock:
            if _name_sequence is None:
                _name_sequence = _RandomNameSequence()
    return _name_sequence
def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to TMP_MAX candidate names in *dir* and returns
    (fd, absolute_path) for the first one that os.open() creates.
    Raises FileExistsError if every candidate already exists.
    """
    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _ in range(TMP_MAX):
        path = _os.path.join(dir, pre + next(candidates) + suf)
        _sys.audit("tempfile.mkstemp", path)
        try:
            fd = _os.open(path, flags, 0o600)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            if (_os.name == 'nt' and _os.path.isdir(dir) and
                    _os.access(dir, _os.W_OK)):
                continue
            raise
        return (fd, _os.path.abspath(path))

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")
407# User visible interfaces.
def gettempprefix():
    """Return the module-level default prefix (the value of 'template')."""
    return template
def gettempprefixb():
    """Return gettempprefix() encoded to bytes with os.fsencode()."""
    text_prefix = gettempprefix()
    return _os.fsencode(text_prefix)
# Default directory for temporary files; computed on first use unless
# it has already been set.
tempdir = None


def gettempdir():
    """Accessor for tempfile.tempdir.

    If tempdir is still None it is computed with _get_default_tempdir()
    while holding _once_lock, then stored and returned.
    """
    global tempdir
    if tempdir is None:
        with _once_lock:
            if tempdir is None:
                tempdir = _get_default_tempdir()
    return tempdir
def gettempdirb():
    """A bytes version of tempfile.gettempdir()."""
    text_dir = gettempdir()
    return _os.fsencode(text_dir)
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """User-callable function to create and return a unique temporary
    file.  The return value is a pair (fd, name) where fd is the
    file descriptor returned by os.open, and name is the filename.

    If 'suffix' is not None, the file name will end with that suffix,
    otherwise there will be no suffix.

    If 'prefix' is not None, the file name will begin with that prefix,
    otherwise a default prefix is used.

    If 'dir' is not None, the file will be created in that directory,
    otherwise a default directory is used.

    If 'text' is specified and true, the file is opened in text
    mode.  Else (the default) the file is opened in binary mode.

    If any of 'suffix', 'prefix' and 'dir' are not None, they must be the
    same type.  If they are bytes, the returned name will be bytes; str
    otherwise.

    The file is readable and writable only by the creating user ID.
    If the operating system uses permission bits to indicate whether a
    file is executable, the file is executable by no one. The file
    descriptor is not inherited by children of this process.

    Caller is responsible for deleting the file when done with it.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
    flags = _text_openflags if text else _bin_openflags
    return _mkstemp_inner(dir, prefix, suffix, flags, output_type)
def mkdtemp(suffix=None, prefix=None, dir=None):
    """User-callable function to create and return a unique temporary
    directory.  The return value is the pathname of the directory.

    Arguments are as for mkstemp, except that the 'text' argument is
    not accepted.

    The directory is readable, writable, and searchable only by the
    creating user.

    Caller is responsible for deleting the directory when done with it.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _ in range(TMP_MAX):
        path = _os.path.join(dir, prefix + next(candidates) + suffix)
        _sys.audit("tempfile.mkdtemp", path)
        try:
            _os.mkdir(path, 0o700)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            if (_os.name == 'nt' and _os.path.isdir(dir) and
                    _os.access(dir, _os.W_OK)):
                continue
            raise
        return path

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary directory name found")
def mktemp(suffix="", prefix=template, dir=None):
    """User-callable function to return a unique temporary file name.  The
    file is not created.

    Arguments are similar to mkstemp, except that the 'text' argument is
    not accepted, and suffix=None, prefix=None and bytes file names are not
    supported.

    THIS FUNCTION IS UNSAFE AND SHOULD NOT BE USED.  The file name may
    refer to a file that did not exist at some point, but by the time
    you get around to creating it, someone else may have beaten you to
    the punch.
    """
    # No RuntimeWarning is issued here; the call that would do so is
    # disabled in the original source.
    if dir is None:
        dir = gettempdir()

    candidates = _get_candidate_names()
    for _ in range(TMP_MAX):
        path = _os.path.join(dir, prefix + next(candidates) + suffix)
        if not _exists(path):
            return path

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary filename found")
class _TemporaryFileCloser:
    """A separate object allowing proper closing of a temporary file's
    underlying file object, without adding a __del__ method to the
    temporary file."""

    file = None  # Set here since __del__ checks it
    close_called = False

    def __init__(self, file, name, delete=True):
        self.file = file
        self.name = name
        self.delete = delete

    # NT provides delete-on-close as a primitive, so we don't need
    # the wrapper to do anything special.  We still use it so that
    # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
    if _os.name == 'nt':
        def close(self):
            if self.close_called:
                return
            self.close_called = True
            self.file.close()

    else:
        # Cache the unlinker so we don't get spurious errors at
        # shutdown when the module-level "os" is None'd out.  Note
        # that this must be referenced as self.unlink, because the
        # name TemporaryFileWrapper may also get None'd out before
        # __del__ is called.
        def close(self, unlink=_os.unlink):
            if self.close_called or self.file is None:
                return
            self.close_called = True
            try:
                self.file.close()
            finally:
                if self.delete:
                    unlink(self.name)

        # Need to ensure the file is deleted on __del__
        def __del__(self):
            self.close()
class _TemporaryFileWrapper:
    """Temporary file wrapper

    This class provides a wrapper around files opened for
    temporary use.  In particular, it seeks to automatically
    remove the file when it is no longer needed.
    """

    def __init__(self, file, name, delete=True):
        self.file = file
        self.name = name
        self.delete = delete
        self._closer = _TemporaryFileCloser(file, name, delete)

    def __getattr__(self, name):
        # Attribute lookups are delegated to the underlying file
        # and cached for non-numeric results
        # (i.e. methods are cached, closed and friends are not)
        underlying = self.__dict__['file']
        attr = getattr(underlying, name)
        if hasattr(attr, '__call__'):
            target = attr

            @_functools.wraps(target)
            def func_wrapper(*args, **kwargs):
                return target(*args, **kwargs)

            # Avoid closing the file as long as the wrapper is alive,
            # see issue #18879.
            func_wrapper._closer = self._closer
            attr = func_wrapper
        if not isinstance(attr, int):
            setattr(self, name, attr)
        return attr

    # The underlying __enter__ method returns the wrong object
    # (self.file) so override it to return the wrapper
    def __enter__(self):
        self.file.__enter__()
        return self

    # Need to trap __exit__ as well to ensure the file gets
    # deleted when used in a with statement
    def __exit__(self, exc, value, tb):
        outcome = self.file.__exit__(exc, value, tb)
        self.close()
        return outcome

    def close(self):
        """
        Close the temporary file, possibly deleting it.
        """
        self._closer.close()

    # iter() doesn't use __getattr__ to find the __iter__ method
    def __iter__(self):
        # Don't return iter(self.file), but yield from it to avoid closing
        # file as long as it's being used as iterator (see issue #23700).  We
        # can't use 'yield from' here because iter(file) returns the file
        # object itself, which has a close method, and thus the file would get
        # closed when the generator is finalized, due to PEP380 semantics.
        for chunk in self.file:
            yield chunk
def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
                       newline=None, suffix=None, prefix=None,
                       dir=None, delete=True, *, errors=None):
    """Create and return a temporary file.
    Arguments:
    'prefix', 'suffix', 'dir' -- as for mkstemp.
    'mode' -- the mode argument to io.open (default "w+b").
    'buffering' -- the buffer size argument to io.open (default -1).
    'encoding' -- the encoding argument to io.open (default None)
    'newline' -- the newline argument to io.open (default None)
    'delete' -- whether the file is deleted on close (default True).
    'errors' -- the errors argument to io.open (default None)
    The file is created as mkstemp() would do it.

    Returns an object with a file-like interface; the name of the file
    is accessible as its 'name' attribute.  The file will be automatically
    deleted when it is closed unless the 'delete' argument is set to False.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    open_flags = _bin_openflags

    # Setting O_TEMPORARY in the flags causes the OS to delete
    # the file when it is closed.  This is only supported by Windows.
    if _os.name == 'nt' and delete:
        open_flags |= _os.O_TEMPORARY

    fd, name = _mkstemp_inner(dir, prefix, suffix, open_flags, output_type)
    open_kwargs = {'buffering': buffering, 'newline': newline,
                   'encoding': encoding, 'errors': errors}
    try:
        stream = _io.open(fd, mode, **open_kwargs)
        return _TemporaryFileWrapper(stream, name, delete)
    except BaseException:
        # Remove the just-created file and release the descriptor
        # before propagating the error.
        _os.unlink(name)
        _os.close(fd)
        raise
if _os.name != 'posix' or _sys.platform == 'cygwin':
    # On non-POSIX and Cygwin systems, assume that we cannot unlink a file
    # while it is open.
    TemporaryFile = NamedTemporaryFile

else:
    # Is the O_TMPFILE flag available and does it work?
    # The flag is set to False if os.open(dir, os.O_TMPFILE) raises an
    # IsADirectoryError exception
    _O_TMPFILE_WORKS = hasattr(_os, 'O_TMPFILE')

    def TemporaryFile(mode='w+b', buffering=-1, encoding=None,
                      newline=None, suffix=None, prefix=None,
                      dir=None, *, errors=None):
        """Create and return a temporary file.
        Arguments:
        'prefix', 'suffix', 'dir' -- as for mkstemp.
        'mode' -- the mode argument to io.open (default "w+b").
        'buffering' -- the buffer size argument to io.open (default -1).
        'encoding' -- the encoding argument to io.open (default None)
        'newline' -- the newline argument to io.open (default None)
        'errors' -- the errors argument to io.open (default None)
        The file is created as mkstemp() would do it.

        Returns an object with a file-like interface.  The file has no
        name, and will cease to exist when it is closed.
        """
        global _O_TMPFILE_WORKS

        prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
        open_kwargs = {'buffering': buffering, 'newline': newline,
                       'encoding': encoding, 'errors': errors}

        flags = _bin_openflags
        if _O_TMPFILE_WORKS:
            try:
                tmpfile_flags = (flags | _os.O_TMPFILE) & ~_os.O_CREAT
                fd = _os.open(dir, tmpfile_flags, 0o600)
            except IsADirectoryError:
                # Linux kernel older than 3.11 ignores the O_TMPFILE flag:
                # O_TMPFILE is read as O_DIRECTORY. Trying to open a directory
                # with O_RDWR|O_DIRECTORY fails with IsADirectoryError, a
                # directory cannot be open to write. Set flag to False to not
                # try again.
                _O_TMPFILE_WORKS = False
            except OSError:
                # The filesystem of the directory does not support O_TMPFILE.
                # For example, OSError(95, 'Operation not supported').
                #
                # On Linux kernel older than 3.11, trying to open a regular
                # file (or a symbolic link to a regular file) with O_TMPFILE
                # fails with NotADirectoryError, because O_TMPFILE is read as
                # O_DIRECTORY.
                pass
            else:
                try:
                    return _io.open(fd, mode, **open_kwargs)
                except BaseException:
                    _os.close(fd)
                    raise
            # Fallback to _mkstemp_inner().

        fd, name = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
        try:
            # Unlink right away; the open descriptor is used from here on.
            _os.unlink(name)
            return _io.open(fd, mode, **open_kwargs)
        except BaseException:
            _os.close(fd)
            raise
class SpooledTemporaryFile:
    """Temporary file wrapper, specialized to switch from BytesIO
    or StringIO to a real file when it exceeds a certain size or
    when a fileno is needed.

    Data is held in an in-memory buffer until rollover() is called,
    either explicitly, by fileno(), by truncate() to a size above
    max_size, or after a write leaves the position beyond a non-zero
    max_size.  The constructor arguments other than max_size are passed
    to TemporaryFile() at rollover time.
    """
    _rolled = False

    def __init__(self, max_size=0, mode='w+b', buffering=-1,
                 encoding=None, newline=None,
                 suffix=None, prefix=None, dir=None, *, errors=None):
        if 'b' in mode:
            self._file = _io.BytesIO()
        else:
            self._file = _io.TextIOWrapper(_io.BytesIO(),
                                           encoding=encoding, errors=errors,
                                           newline=newline)
        self._max_size = max_size
        self._rolled = False
        # Kept until rollover(), which passes them to TemporaryFile().
        self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
                                   'suffix': suffix, 'prefix': prefix,
                                   'encoding': encoding, 'newline': newline,
                                   'dir': dir, 'errors': errors}

    __class_getitem__ = classmethod(_types.GenericAlias)

    def _check(self, file):
        """Roll over if *file*'s position exceeds a non-zero max_size."""
        if self._rolled:
            return
        max_size = self._max_size
        if max_size and file.tell() > max_size:
            self.rollover()

    def rollover(self):
        """Move the buffered contents into a TemporaryFile().

        Does nothing if the rollover has already happened.  The current
        position is carried over to the new file.
        """
        if self._rolled:
            return
        file = self._file
        newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
        del self._TemporaryFileArgs

        pos = file.tell()
        if hasattr(newfile, 'buffer'):
            # Text mode: copy the raw bytes beneath both text layers.
            newfile.buffer.write(file.detach().getvalue())
        else:
            newfile.write(file.getvalue())
        newfile.seek(pos, 0)

        self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # BytesIO/StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
    def __enter__(self):
        if self._file.closed:
            raise ValueError("Cannot enter context with closed file")
        return self

    def __exit__(self, exc, value, tb):
        self._file.close()

    # file protocol
    def __iter__(self):
        return self._file.__iter__()

    def close(self):
        self._file.close()

    @property
    def closed(self):
        return self._file.closed

    @property
    def encoding(self):
        return self._file.encoding

    @property
    def errors(self):
        return self._file.errors

    def fileno(self):
        """Roll over to a real file and return its file descriptor."""
        self.rollover()
        return self._file.fileno()

    def flush(self):
        self._file.flush()

    def isatty(self):
        return self._file.isatty()

    @property
    def mode(self):
        try:
            return self._file.mode
        except AttributeError:
            # The current file has no mode attribute; report the mode
            # given to the constructor.
            return self._TemporaryFileArgs['mode']

    @property
    def name(self):
        try:
            return self._file.name
        except AttributeError:
            return None

    @property
    def newlines(self):
        return self._file.newlines

    def read(self, *args):
        return self._file.read(*args)

    def readline(self, *args):
        return self._file.readline(*args)

    def readlines(self, *args):
        return self._file.readlines(*args)

    def seek(self, *args):
        return self._file.seek(*args)

    def tell(self):
        return self._file.tell()

    def truncate(self, size=None):
        """Resize the file and return the underlying truncate() result.

        With size=None the underlying file is truncated at its current
        position.  Otherwise, if size exceeds max_size (which is always
        the case for a positive size when max_size is 0), the file is
        rolled over before being truncated to *size*.
        """
        if size is None:
            return self._file.truncate()
        if size > self._max_size:
            self.rollover()
        return self._file.truncate(size)

    def write(self, s):
        file = self._file
        rv = file.write(s)
        self._check(file)
        return rv

    def writelines(self, iterable):
        file = self._file
        rv = file.writelines(iterable)
        self._check(file)
        return rv
class TemporaryDirectory:
    """Create and return a temporary directory.  This has the same
    behavior as mkdtemp but can be used as a context manager.  For
    example:

        with TemporaryDirectory() as tmpdir:
            ...

    Upon exiting the context, the directory and everything contained
    in it are removed.
    """

    def __init__(self, suffix=None, prefix=None, dir=None):
        self.name = mkdtemp(suffix, prefix, dir)
        # Registered so that _cleanup() removes the directory, with a
        # ResourceWarning, if cleanup() has not detached the finalizer.
        self._finalizer = _weakref.finalize(
            self, self._cleanup, self.name,
            warn_message=f"Implicitly cleaning up {self!r}")

    @classmethod
    def _rmtree(cls, name):
        def resetperms(target):
            # Clear file flags where os.chflags exists, then make the
            # entry readable, writable and searchable by its owner.
            try:
                _os.chflags(target, 0)
            except AttributeError:
                pass
            _os.chmod(target, 0o700)

        def onerror(func, path, exc_info):
            exc_type = exc_info[0]
            if issubclass(exc_type, PermissionError):
                try:
                    if path != name:
                        resetperms(_os.path.dirname(path))
                    resetperms(path)

                    try:
                        _os.unlink(path)
                    # PermissionError is raised on FreeBSD for directories
                    except (IsADirectoryError, PermissionError):
                        cls._rmtree(path)
                except FileNotFoundError:
                    pass
            elif issubclass(exc_type, FileNotFoundError):
                pass
            else:
                # Re-raise the error the removal routine is handling.
                raise

        _rmtree(name, onerror=onerror)

    @classmethod
    def _cleanup(cls, name, warn_message):
        cls._rmtree(name)
        _warnings.warn(warn_message, ResourceWarning)

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.name!r}>"

    def __enter__(self):
        return self.name

    def __exit__(self, exc, value, tb):
        self.cleanup()

    def cleanup(self):
        # detach() is truthy only while the finalizer is still alive, so
        # the directory is removed at most once through this path.
        if self._finalizer.detach():
            self._rmtree(self.name)

    __class_getitem__ = classmethod(_types.GenericAlias)