Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/tempfile.py: 21%

519 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-20 07:00 +0000

1"""Temporary files. 

2 

3This module provides generic, low- and high-level interfaces for 

4creating temporary files and directories. All of the interfaces 

5provided by this module can be used without fear of race conditions 

6except for 'mktemp'. 'mktemp' is subject to race conditions and 

7should not be used; it is provided for backward compatibility only. 

8 

9The default path names are returned as str. If you supply bytes as 

10input, all return values will be in bytes. Ex: 

11 

12 >>> tempfile.mkstemp() 

13 (4, '/tmp/tmptpu9nin8') 

14 >>> tempfile.mkdtemp(suffix=b'') 

15 b'/tmp/tmppbi8f0hy' 

16 

17This module also provides some data items to the user: 

18 

19 TMP_MAX - maximum number of names that will be tried before 

20 giving up. 

21 tempdir - If this is set to a string before the first use of 

22 any routine from this module, it will be considered as 

23 another candidate location to store temporary files. 

24""" 

25 

26__all__ = [ 

27 "NamedTemporaryFile", "TemporaryFile", # high level safe interfaces 

28 "SpooledTemporaryFile", "TemporaryDirectory", 

29 "mkstemp", "mkdtemp", # low level safe interfaces 

30 "mktemp", # deprecated unsafe interface 

31 "TMP_MAX", "gettempprefix", # constants 

32 "tempdir", "gettempdir", 

33 "gettempprefixb", "gettempdirb", 

34 ] 

35 

36 

37# Imports. 

38 

39import functools as _functools 

40import warnings as _warnings 

41import io as _io 

42import os as _os 

43try: 

44 import shutil as _shutil 

45 _rmtree = _shutil.rmtree 

46except ImportError: 

47 import sys as _sys 

48 import stat as _stat 

49 # version vulnerable to race conditions 

def _rmtree_unsafe(path, onerror):
    """Remove the tree rooted at *path* using path-based calls only.

    Every failing OS call is reported as
    ``onerror(func, path, sys.exc_info())``; what happens next is up to
    the callback.  A symbolic link passed as *path* is reported through
    ``onerror`` and nothing is removed.
    """
    try:
        if _os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(_os.path.islink, path, _sys.exc_info())
        # can't continue even if onerror hook returns
        return

    try:
        entries = _os.listdir(path)
    except OSError:
        # Nothing to walk, but still attempt the final rmdir below.
        entries = []
        onerror(_os.listdir, path, _sys.exc_info())

    for entry in entries:
        child = _os.path.join(path, entry)
        try:
            is_dir = _stat.S_ISDIR(_os.lstat(child).st_mode)
        except OSError:
            # An entry that cannot be stat'ed is handled like a plain file.
            is_dir = False

        if is_dir:
            _rmtree_unsafe(child, onerror)
            continue

        try:
            _os.unlink(child)
        except OSError:
            onerror(_os.unlink, child, _sys.exc_info())

    try:
        _os.rmdir(path)
    except OSError:
        onerror(_os.rmdir, path, _sys.exc_info())

81 

82 # Version using fd-based APIs to protect against races 

def _rmtree_safe_fd(topfd, path, onerror):
    """Remove everything inside the directory open as *topfd*.

    *path* is only used to build the names reported to *onerror*; all
    file-system calls are made relative to *topfd*.  The directory
    behind *topfd* itself is not removed here.
    """
    try:
        entries = _os.listdir(topfd)
    except OSError as err:
        entries = []
        # Report the human-readable path rather than the descriptor.
        err.filename = path
        onerror(_os.listdir, path, _sys.exc_info())

    for entry in entries:
        child_path = _os.path.join(path, entry)
        try:
            entry_st = _os.stat(entry, dir_fd=topfd, follow_symlinks=False)
        except OSError:
            # An entry that cannot be stat'ed is handled like a plain file.
            is_dir = False
        else:
            is_dir = _stat.S_ISDIR(entry_st.st_mode)

        if not is_dir:
            try:
                _os.unlink(entry, dir_fd=topfd)
            except OSError:
                onerror(_os.unlink, child_path, _sys.exc_info())
            continue

        try:
            child_fd = _os.open(entry, _os.O_RDONLY, dir_fd=topfd)
        except OSError:
            onerror(_os.open, child_path, _sys.exc_info())
            continue

        try:
            if _os.path.samestat(entry_st, _os.fstat(child_fd)):
                _rmtree_safe_fd(child_fd, child_path, onerror)
                try:
                    _os.rmdir(entry, dir_fd=topfd)
                except OSError:
                    onerror(_os.rmdir, child_path, _sys.exc_info())
            else:
                try:
                    # This can only happen if someone replaces
                    # a directory with a symlink after the call to
                    # stat.S_ISDIR above.
                    raise OSError("Cannot call rmtree on a symbolic "
                                  "link")
                except OSError:
                    onerror(_os.path.islink, child_path, _sys.exc_info())
        finally:
            _os.close(child_fd)

126 

# True only when the platform offers every fd-relative primitive that
# _rmtree_safe_fd needs.
_use_fd_functions = (
    _os.supports_dir_fd.issuperset(
        {_os.open, _os.stat, _os.unlink, _os.rmdir})
    and _os.listdir in _os.supports_fd
    and _os.stat in _os.supports_follow_symlinks
)

131 

def _rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is the function that failed; path is the
    argument to that function that caused it to fail; and exc_info is a
    tuple returned by sys.exc_info().  If ignore_errors is false and
    onerror is None, the exception is re-raised.
    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Always called from inside an ``except`` block, so a bare
            # raise propagates the error being handled.
            raise

    if not _use_fd_functions:
        return _rmtree_unsafe(path, onerror)

    # While the unsafe rmtree works fine on bytes, the fd based does not.
    if isinstance(path, bytes):
        path = _os.fsdecode(path)
    # Note: To guard against symlink races, we use the standard
    # lstat()/open()/fstat() trick.
    try:
        orig_st = _os.lstat(path)
    except Exception:
        onerror(_os.lstat, path, _sys.exc_info())
        return
    try:
        fd = _os.open(path, _os.O_RDONLY)
    except Exception:
        # Report the call that actually failed (previously this was
        # reported as _os.lstat).
        onerror(_os.open, path, _sys.exc_info())
        return
    try:
        if _os.path.samestat(orig_st, _os.fstat(fd)):
            _rmtree_safe_fd(fd, path, onerror)
            try:
                _os.rmdir(path)
            except OSError:
                onerror(_os.rmdir, path, _sys.exc_info())
        else:
            try:
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
            except OSError:
                onerror(_os.path.islink, path, _sys.exc_info())
    finally:
        _os.close(fd)

182 

183import errno as _errno 

184from random import Random as _Random 

185import sys as _sys 

186import types as _types 

187import weakref as _weakref 

188import _thread 

189_allocate_lock = _thread.allocate_lock 

190 

# Flags for creating a brand-new file; optional platform flags are added
# only where the os module defines them.
_text_openflags = (_os.O_RDWR | _os.O_CREAT | _os.O_EXCL
                   | getattr(_os, 'O_NOFOLLOW', 0))

_bin_openflags = _text_openflags | getattr(_os, 'O_BINARY', 0)

# Upper bound on the number of candidate names tried before giving up.
TMP_MAX = getattr(_os, 'TMP_MAX', 10000)

# This variable _was_ unused for legacy reasons, see issue 10354.
# But as of 3.5 we actually use it at runtime so changing it would
# have a possibly desirable side effect...  But we do not want to support
# that as an API.  It is undocumented on purpose.  Do not depend on this.
template = "tmp"

209 

210# Internal routines. 

211 

212_once_lock = _allocate_lock() 

213 

214 

def _exists(fn):
    """Return True if lstat() succeeds on *fn*, False on any OSError."""
    try:
        _os.lstat(fn)
    except OSError:
        return False
    return True

222 

223 

def _infer_return_type(*args):
    """Work out whether the given path components imply str or bytes.

    None arguments are skipped.  Mixing bytes with non-bytes raises
    TypeError.  With no usable argument the answer is str.
    """
    inferred = None
    for candidate in args:
        if candidate is None:
            continue
        kind = bytes if isinstance(candidate, bytes) else str
        if inferred is not None and inferred is not kind:
            raise TypeError("Can't mix bytes and non-bytes in "
                            "path components.")
        inferred = kind
    # tempfile APIs return a str by default.
    return str if inferred is None else inferred

243 

244 

def _sanitize_params(prefix, suffix, dir):
    """Fill in defaults for prefix, suffix and dir.

    Returns ``(prefix, suffix, dir, output_type)`` where output_type is
    str or bytes as inferred from the arguments that were supplied;
    defaults are produced in that same type.
    """
    output_type = _infer_return_type(prefix, suffix, dir)
    wants_str = output_type is str

    if suffix is None:
        suffix = output_type()
    if prefix is None:
        prefix = template if wants_str else _os.fsencode(template)
    if dir is None:
        dir = gettempdir() if wants_str else gettempdirb()

    return prefix, suffix, dir, output_type

261 

262 

class _RandomNameSequence:
    """Endless iterator of random eight-character strings.

    Each string is drawn from ``characters``.  The random generator is
    created lazily and replaced whenever the process id changes.
    """

    characters = "abcdefghijklmnopqrstuvwxyz0123456789_"

    @property
    def rng(self):
        """Random instance belonging to the current process."""
        pid = _os.getpid()
        if getattr(self, '_rng_pid', None) != pid:
            # First use, or the pid changed since the generator was made.
            self._rng = _Random()
            self._rng_pid = pid
        return self._rng

    def __iter__(self):
        return self

    def __next__(self):
        alphabet = self.characters
        pick = self.rng.choice
        return ''.join(pick(alphabet) for _ in range(8))

289 

def _candidate_tempdir_list():
    """Return the ordered list of directories _get_default_tempdir
    will try."""

    # First, try the environment.
    dirlist = [value
               for value in map(_os.getenv, ('TMPDIR', 'TEMP', 'TMP'))
               if value]

    # Failing that, try OS-specific locations.
    if _os.name == 'nt':
        platform_dirs = [_os.path.expanduser(r'~\AppData\Local\Temp'),
                         _os.path.expandvars(r'%SYSTEMROOT%\Temp'),
                         r'c:\temp', r'c:\tmp', r'\temp', r'\tmp']
    else:
        platform_dirs = ['/tmp', '/var/tmp', '/usr/tmp']
    dirlist.extend(platform_dirs)

    # As a last resort, the current directory.
    try:
        fallback = _os.getcwd()
    except (AttributeError, OSError):
        fallback = _os.curdir
    dirlist.append(fallback)

    return dirlist

316 

def _get_default_tempdir():
    """Pick the default directory for temporary files.

    Each candidate directory is probed by creating, writing and then
    deleting a randomly named file in it; the first directory where that
    works is returned.  The probe name is randomized to prevent denial
    of service.  Raises FileNotFoundError when no candidate is usable.
    This routine should be called exactly once.
    """
    name_source = _RandomNameSequence()
    candidates = _candidate_tempdir_list()

    for candidate_dir in candidates:
        if candidate_dir != _os.curdir:
            candidate_dir = _os.path.abspath(candidate_dir)
        # Try only a few names per directory.
        for _attempt in range(100):
            probe = _os.path.join(candidate_dir, next(name_source))
            try:
                fd = _os.open(probe, _bin_openflags, 0o600)
                try:
                    try:
                        with _io.open(fd, 'wb', closefd=False) as fp:
                            fp.write(b'blat')
                    finally:
                        _os.close(fd)
                finally:
                    _os.unlink(probe)
                return candidate_dir
            except FileExistsError:
                continue
            except PermissionError:
                # This exception is thrown when a directory with the chosen
                # name already exists on windows.
                name_clash = (_os.name == 'nt' and
                              _os.path.isdir(candidate_dir) and
                              _os.access(candidate_dir, _os.W_OK))
                if not name_clash:
                    break   # no point trying more names in this directory
            except OSError:
                break   # no point trying more names in this directory

    raise FileNotFoundError(_errno.ENOENT,
                            "No usable temporary directory found in %s" %
                            candidates)

361 

362_name_sequence = None 

363 

def _get_candidate_names():
    """Return the shared name generator, creating it on first use."""

    global _name_sequence
    if _name_sequence is None:
        with _once_lock:
            # Re-check while holding the lock before creating it.
            if _name_sequence is None:
                _name_sequence = _RandomNameSequence()
    return _name_sequence

376 

377 

def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Create a uniquely named file in *dir* and return (fd, abspath).

    Shared by mkstemp, TemporaryFile, and NamedTemporaryFile.  Up to
    TMP_MAX candidate names are tried; FileExistsError is raised if none
    of them could be created.
    """
    names = _get_candidate_names()
    if output_type is bytes:
        names = map(_os.fsencode, names)

    for _attempt in range(TMP_MAX):
        candidate = _os.path.join(dir, pre + next(names) + suf)
        _sys.audit("tempfile.mkstemp", candidate)
        try:
            fd = _os.open(candidate, flags, 0o600)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            name_clash = (_os.name == 'nt' and _os.path.isdir(dir) and
                          _os.access(dir, _os.W_OK))
            if not name_clash:
                raise
            continue
        else:
            return fd, _os.path.abspath(candidate)

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")

405 

406 

407# User visible interfaces. 

408 

def gettempprefix():
    """Return the module-level default name prefix (``template``)."""
    return template

412 

def gettempprefixb():
    """Return gettempprefix() encoded to bytes with os.fsencode."""
    prefix = gettempprefix()
    return _os.fsencode(prefix)

416 

417tempdir = None 

418 

def gettempdir():
    """Return tempfile.tempdir, computing and caching it on first use."""
    global tempdir
    if tempdir is None:
        with _once_lock:
            # Re-check while holding the lock before computing it.
            if tempdir is None:
                tempdir = _get_default_tempdir()
    return tempdir

430 

def gettempdirb():
    """Return gettempdir() encoded to bytes with os.fsencode."""
    directory = gettempdir()
    return _os.fsencode(directory)

434 

def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """Create a unique temporary file and return ``(fd, name)``.

    fd is the file descriptor returned by os.open and name is the
    file's path.

    suffix -- appended to the file name; no suffix when None.
    prefix -- prepended to the file name; a default prefix when None.
    dir    -- directory to create the file in; a default directory
              when None.
    text   -- when true the file is opened in text mode, otherwise
              (the default) in binary mode.

    Those of suffix, prefix and dir that are not None must share one
    type.  If they are bytes the returned name is bytes; str otherwise.

    The file is readable and writable only by the creating user ID.
    If the operating system uses permission bits to indicate whether a
    file is executable, the file is executable by no one.  The file
    descriptor is not inherited by children of this process.

    Caller is responsible for deleting the file when done with it.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
    flags = _text_openflags if text else _bin_openflags
    return _mkstemp_inner(dir, prefix, suffix, flags, output_type)

472 

473 

def mkdtemp(suffix=None, prefix=None, dir=None):
    """Create a unique temporary directory and return its pathname.

    Arguments are as for mkstemp, except that the 'text' argument is
    not accepted.

    The directory is created with mode 0o700: readable, writable, and
    searchable only by the creating user.  Up to TMP_MAX candidate names
    are tried; FileExistsError is raised if none could be created.

    Caller is responsible for deleting the directory when done with it.
    """
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    names = _get_candidate_names()
    if output_type is bytes:
        names = map(_os.fsencode, names)

    for _attempt in range(TMP_MAX):
        candidate = _os.path.join(dir, prefix + next(names) + suffix)
        _sys.audit("tempfile.mkdtemp", candidate)
        try:
            _os.mkdir(candidate, 0o700)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            name_clash = (_os.name == 'nt' and _os.path.isdir(dir) and
                          _os.access(dir, _os.W_OK))
            if not name_clash:
                raise
            continue
        else:
            return candidate

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary directory name found")

513 

def mktemp(suffix="", prefix=template, dir=None):
    """Return a temporary file name that did not exist when checked.
    The file is not created.

    Arguments are similar to mkstemp, except that the 'text' argument is
    not accepted, and suffix=None, prefix=None and bytes file names are not
    supported.

    THIS FUNCTION IS UNSAFE AND SHOULD NOT BE USED.  Nothing reserves
    the returned name: between this check and your attempt to create the
    file, someone else may create it first.
    """
    # A RuntimeWarning about the security risk used to be sketched here
    # in commented-out code; no warning is emitted.

    if dir is None:
        dir = gettempdir()

    names = _get_candidate_names()
    for _attempt in range(TMP_MAX):
        candidate = _os.path.join(dir, prefix + next(names) + suffix)
        if not _exists(candidate):
            return candidate

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary filename found")

544 

545 

class _TemporaryFileCloser:
    """Helper that closes a temporary file's underlying file object.

    Keeping this logic in a separate object means the temporary file
    wrapper itself needs no __del__ method.
    """

    file = None  # Set here since __del__ checks it
    close_called = False

    def __init__(self, file, name, delete=True):
        self.file = file
        self.name = name
        self.delete = delete

    # NT provides delete-on-close as a primitive, so we don't need
    # the wrapper to do anything special.  We still use it so that
    # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
    if _os.name == 'nt':
        def close(self):
            """Close the file once; later calls do nothing."""
            if self.close_called:
                return
            self.close_called = True
            self.file.close()

    else:
        # Cache the unlinker so we don't get spurious errors at
        # shutdown when the module-level "os" is None'd out.  Note
        # that this must be referenced as self.unlink, because the
        # name TemporaryFileWrapper may also get None'd out before
        # __del__ is called.

        def close(self, unlink=_os.unlink):
            """Close the file once and, if ``delete`` is set, unlink it."""
            if self.close_called or self.file is None:
                return
            self.close_called = True
            try:
                self.file.close()
            finally:
                if self.delete:
                    unlink(self.name)

        # Need to ensure the file is deleted on __del__
        def __del__(self):
            self.close()

587 

588 

class _TemporaryFileWrapper:
    """Wrapper around a file object opened for temporary use.

    Unknown attributes are delegated to the wrapped file, and closing
    goes through a _TemporaryFileCloser so the file can be removed
    automatically when it is no longer needed.
    """

    def __init__(self, file, name, delete=True):
        self.file = file
        self.name = name
        self.delete = delete
        self._closer = _TemporaryFileCloser(file, name, delete)

    def __getattr__(self, name):
        # Attribute lookups are delegated to the underlying file
        # and cached for non-numeric results
        # (i.e. methods are cached, closed and friends are not)
        wrapped_file = self.__dict__['file']
        attr = getattr(wrapped_file, name)
        if hasattr(attr, '__call__'):
            target = attr

            @_functools.wraps(target)
            def func_wrapper(*args, **kwargs):
                return target(*args, **kwargs)

            # Avoid closing the file as long as the wrapper is alive,
            # see issue #18879.
            func_wrapper._closer = self._closer
            attr = func_wrapper
        if not isinstance(attr, int):
            setattr(self, name, attr)
        return attr

    # The underlying __enter__ method returns the wrong object
    # (self.file) so override it to return the wrapper
    def __enter__(self):
        self.file.__enter__()
        return self

    # Need to trap __exit__ as well to ensure the file gets
    # deleted when used in a with statement
    def __exit__(self, exc, value, tb):
        outcome = self.file.__exit__(exc, value, tb)
        self.close()
        return outcome

    def close(self):
        """
        Close the temporary file, possibly deleting it.
        """
        self._closer.close()

    # iter() doesn't use __getattr__ to find the __iter__ method
    def __iter__(self):
        # Don't return iter(self.file), but yield from it to avoid closing
        # file as long as it's being used as iterator (see issue #23700).  We
        # can't use 'yield from' here because iter(file) returns the file
        # object itself, which has a close method, and thus the file would get
        # closed when the generator is finalized, due to PEP380 semantics.
        for line in self.file:
            yield line

650 

651 

def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
                       newline=None, suffix=None, prefix=None,
                       dir=None, delete=True, *, errors=None):
    """Create and return a temporary file.
    Arguments:
    'prefix', 'suffix', 'dir' -- as for mkstemp.
    'mode' -- the mode argument to io.open (default "w+b").
    'buffering' -- the buffer size argument to io.open (default -1).
    'encoding' -- the encoding argument to io.open (default None)
    'newline' -- the newline argument to io.open (default None)
    'delete' -- whether the file is deleted on close (default True).
    'errors' -- the errors argument to io.open (default None)
    The file is created as mkstemp() would do it.

    Returns an object with a file-like interface; the name of the file
    is accessible as its 'name' attribute.  The file will be automatically
    deleted when it is closed unless the 'delete' argument is set to False.

    If wrapping the descriptor fails, the file is unlinked, the
    descriptor is closed and the error is re-raised.
    """

    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    flags = _bin_openflags

    # Setting O_TEMPORARY in the flags causes the OS to delete
    # the file when it is closed.  This is only supported by Windows.
    if _os.name == 'nt' and delete:
        flags |= _os.O_TEMPORARY

    (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
    try:
        file = _io.open(fd, mode, buffering=buffering,
                        newline=newline, encoding=encoding, errors=errors)

        return _TemporaryFileWrapper(file, name, delete)
    except BaseException:
        # Close the descriptor even when the unlink itself fails, so a
        # failed cleanup cannot leak the fd.
        try:
            _os.unlink(name)
        finally:
            _os.close(fd)
        raise

690 

691if _os.name != 'posix' or _sys.platform == 'cygwin': 

692 # On non-POSIX and Cygwin systems, assume that we cannot unlink a file 

693 # while it is open. 

694 TemporaryFile = NamedTemporaryFile 

695 

696else: 

697 # Is the O_TMPFILE flag available and does it work? 

698 # The flag is set to False if os.open(dir, os.O_TMPFILE) raises an 

699 # IsADirectoryError exception 

700 _O_TMPFILE_WORKS = hasattr(_os, 'O_TMPFILE') 

701 

def TemporaryFile(mode='w+b', buffering=-1, encoding=None,
                  newline=None, suffix=None, prefix=None,
                  dir=None, *, errors=None):
    """Create and return a temporary file.
    Arguments:
    'prefix', 'suffix', 'dir' -- as for mkstemp.
    'mode' -- the mode argument to io.open (default "w+b").
    'buffering' -- the buffer size argument to io.open (default -1).
    'encoding' -- the encoding argument to io.open (default None)
    'newline' -- the newline argument to io.open (default None)
    'errors' -- the errors argument to io.open (default None)
    The file is created as mkstemp() would do it.

    Returns an object with a file-like interface.  The file has no
    name, and will cease to exist when it is closed.
    """
    global _O_TMPFILE_WORKS

    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
    open_kwargs = dict(buffering=buffering, newline=newline,
                       encoding=encoding, errors=errors)

    flags = _bin_openflags
    if _O_TMPFILE_WORKS:
        try:
            tmpfile_flags = (flags | _os.O_TMPFILE) & ~_os.O_CREAT
            fd = _os.open(dir, tmpfile_flags, 0o600)
        except IsADirectoryError:
            # Linux kernel older than 3.11 ignores the O_TMPFILE flag:
            # O_TMPFILE is read as O_DIRECTORY. Trying to open a directory
            # with O_RDWR|O_DIRECTORY fails with IsADirectoryError, a
            # directory cannot be open to write. Set flag to False to not
            # try again.
            _O_TMPFILE_WORKS = False
        except OSError:
            # The filesystem of the directory does not support O_TMPFILE.
            # For example, OSError(95, 'Operation not supported').
            #
            # On Linux kernel older than 3.11, trying to open a regular
            # file (or a symbolic link to a regular file) with O_TMPFILE
            # fails with NotADirectoryError, because O_TMPFILE is read as
            # O_DIRECTORY.
            pass
        else:
            try:
                return _io.open(fd, mode, **open_kwargs)
            except BaseException:
                _os.close(fd)
                raise
        # Fallback to _mkstemp_inner().

    (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
    try:
        # Unlink right away; the open descriptor is all that is needed.
        _os.unlink(name)
        return _io.open(fd, mode, **open_kwargs)
    except BaseException:
        _os.close(fd)
        raise

761 

class SpooledTemporaryFile:
    """Temporary file wrapper, specialized to switch from BytesIO
    or StringIO to a real file when it exceeds a certain size or
    when a fileno is needed.

    Data is held in an in-memory buffer until rollover() replaces the
    buffer with a file from TemporaryFile().  A max_size of 0 means
    that writes never trigger a rollover.
    """
    _rolled = False

    def __init__(self, max_size=0, mode='w+b', buffering=-1,
                 encoding=None, newline=None,
                 suffix=None, prefix=None, dir=None, *, errors=None):
        if 'b' in mode:
            self._file = _io.BytesIO()
        else:
            self._file = _io.TextIOWrapper(_io.BytesIO(),
                            encoding=encoding, errors=errors,
                            newline=newline)
        self._max_size = max_size
        self._rolled = False
        # Kept until rollover(), which passes them to TemporaryFile().
        self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
                                   'suffix': suffix, 'prefix': prefix,
                                   'encoding': encoding, 'newline': newline,
                                   'dir': dir, 'errors': errors}

    __class_getitem__ = classmethod(_types.GenericAlias)

    def _check(self, file):
        """Roll over if *file* has grown past a non-zero max_size."""
        if self._rolled: return
        max_size = self._max_size
        if max_size and file.tell() > max_size:
            self.rollover()

    def rollover(self):
        """Move the buffered data into a real temporary file (once)."""
        if self._rolled: return
        file = self._file
        newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
        del self._TemporaryFileArgs

        pos = file.tell()
        if hasattr(newfile, 'buffer'):
            # Text mode: copy the raw bytes behind the TextIOWrapper.
            newfile.buffer.write(file.detach().getvalue())
        else:
            newfile.write(file.getvalue())
        newfile.seek(pos, 0)

        self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # BytesIO/StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
    def __enter__(self):
        if self._file.closed:
            raise ValueError("Cannot enter context with closed file")
        return self

    def __exit__(self, exc, value, tb):
        self._file.close()

    # file protocol
    def __iter__(self):
        return self._file.__iter__()

    def close(self):
        self._file.close()

    @property
    def closed(self):
        return self._file.closed

    @property
    def encoding(self):
        return self._file.encoding

    @property
    def errors(self):
        return self._file.errors

    def fileno(self):
        # A file descriptor requires a real file, so force the rollover.
        self.rollover()
        return self._file.fileno()

    def flush(self):
        self._file.flush()

    def isatty(self):
        return self._file.isatty()

    @property
    def mode(self):
        try:
            return self._file.mode
        except AttributeError:
            # The in-memory buffer has no mode; report the requested one.
            return self._TemporaryFileArgs['mode']

    @property
    def name(self):
        try:
            return self._file.name
        except AttributeError:
            return None

    @property
    def newlines(self):
        return self._file.newlines

    def read(self, *args):
        return self._file.read(*args)

    def readline(self, *args):
        return self._file.readline(*args)

    def readlines(self, *args):
        return self._file.readlines(*args)

    def seek(self, *args):
        return self._file.seek(*args)

    def tell(self):
        return self._file.tell()

    def truncate(self, size=None):
        """Truncate the file and return the underlying truncate() result.

        With size=None the underlying file truncates at its current
        position.  An explicit size greater than max_size rolls the data
        over to a real file first.
        """
        if size is None:
            return self._file.truncate()
        if size > self._max_size:
            self.rollover()
        return self._file.truncate(size)

    def write(self, s):
        file = self._file
        rv = file.write(s)
        self._check(file)
        return rv

    def writelines(self, iterable):
        file = self._file
        rv = file.writelines(iterable)
        self._check(file)
        return rv

903 

904 

class TemporaryDirectory:
    """Create a temporary directory with mkdtemp and manage its removal.

    Usable as a context manager, for example:

        with TemporaryDirectory() as tmpdir:
            ...

    Upon exiting the context, the directory and everything contained
    in it are removed.
    """

    def __init__(self, suffix=None, prefix=None, dir=None):
        self.name = mkdtemp(suffix, prefix, dir)
        warning_text = "Implicitly cleaning up {!r}".format(self)
        # The finalizer removes the directory (and warns) if cleanup()
        # was never called.
        self._finalizer = _weakref.finalize(
            self, self._cleanup, self.name, warn_message=warning_text)

    @classmethod
    def _rmtree(cls, name):
        """Remove the tree at *name*, retrying after permission errors."""

        def make_removable(target):
            try:
                _os.chflags(target, 0)
            except AttributeError:
                # os.chflags is not available here.
                pass
            _os.chmod(target, 0o700)

        def onerror(func, path, exc_info):
            exc_type = exc_info[0]
            if not issubclass(exc_type, PermissionError):
                if issubclass(exc_type, FileNotFoundError):
                    # Already gone; nothing left to do.
                    return
                raise

            try:
                if path != name:
                    make_removable(_os.path.dirname(path))
                make_removable(path)

                try:
                    _os.unlink(path)
                # PermissionError is raised on FreeBSD for directories
                except (IsADirectoryError, PermissionError):
                    cls._rmtree(path)
            except FileNotFoundError:
                pass

        _rmtree(name, onerror=onerror)

    @classmethod
    def _cleanup(cls, name, warn_message):
        cls._rmtree(name)
        _warnings.warn(warn_message, ResourceWarning)

    def __repr__(self):
        return "<{} {!r}>".format(self.__class__.__name__, self.name)

    def __enter__(self):
        return self.name

    def __exit__(self, exc, value, tb):
        self.cleanup()

    def cleanup(self):
        """Remove the directory now, unless it was already cleaned up."""
        if self._finalizer.detach():
            self._rmtree(self.name)

    __class_getitem__ = classmethod(_types.GenericAlias)